Add watched series/authors feature

Introduce watched lists for series and authors end-to-end.

- Add DB migration to create watched_series and watched_authors tables with indexes and foreign keys.
- Implement API routes: GET/POST for listing/adding and DELETE by id for both /api/user/watched-series and /api/user/watched-authors. Validation, ownership checks, and immediate targeted job triggers are included.
- Add client hooks (useWatchedSeries, useWatchedAuthors) with add/delete helpers and SWR revalidation.
- Add UI components: WatchButton (toggle/confirm) and WatchedListsSection for profile display and removal UX.
- Add processor (check-watched-lists.processor) and service (watched-lists.service) to scrape Audible, deduplicate, check library ownership, and auto-create requests; supports targeted checks for newly watched items.
- Include tests for the watched-lists service.

These changes implement the watched-lists feature to let users watch series/authors and have the system automatically detect and request new releases.
This commit is contained in:
kikootwo
2026-03-03 21:57:38 -05:00
parent 610873af6b
commit cbf02d3e24
23 changed files with 2392 additions and 32 deletions
+50
View File
@@ -27,6 +27,7 @@ export type JobType =
| 'cleanup_seeded_torrents'
| 'monitor_rss_feeds'
| 'sync_goodreads_shelves'
| 'check_watched_lists'
| 'send_notification'
// Ebook-specific job types
| 'search_ebook'
@@ -113,6 +114,16 @@ export interface SyncGoodreadsShelvesPayload extends JobPayload {
maxLookupsPerShelf?: number;
}
export interface CheckWatchedListsPayload extends JobPayload {
scheduledJobId?: string;
/** If set, only process watched items for this user */
userId?: string;
/** If set, only process this specific series */
seriesAsin?: string;
/** If set, only process this specific author */
authorAsin?: string;
}
// Ebook-specific payload interfaces
export interface SearchEbookPayload extends JobPayload {
requestId: string;
@@ -384,6 +395,12 @@ export class JobQueueService {
return await processSyncGoodreadsShelves(payloadWithJobId);
});
this.queue.process('check_watched_lists', 1, async (job: BullJob<CheckWatchedListsPayload>) => {
const { processCheckWatchedLists } = await import('../processors/check-watched-lists.processor');
const payloadWithJobId = await this.ensureJobRecord(job, 'check_watched_lists');
return await processCheckWatchedLists(payloadWithJobId);
});
// Send notification processor
this.queue.process('send_notification', 2, async (job: BullJob<SendNotificationPayload>) => {
const { processSendNotification } = await import('../processors/send-notification.processor');
@@ -766,6 +783,39 @@ export class JobQueueService {
);
}
/**
* Add check watched lists job (watched series + watched authors)
*/
async addCheckWatchedListsJob(scheduledJobId?: string): Promise<string> {
return await this.addJob(
'check_watched_lists',
{
scheduledJobId,
} as CheckWatchedListsPayload,
{
priority: 7,
}
);
}
/**
* Add a targeted check for a specific watched series or author for a specific user.
* Used for immediate processing when a user adds a new watch.
*/
async addCheckWatchedItemJob(userId: string, seriesAsin?: string, authorAsin?: string): Promise<string> {
return await this.addJob(
'check_watched_lists',
{
userId,
seriesAsin,
authorAsin,
} as CheckWatchedListsPayload,
{
priority: 8, // Higher than scheduled (7) since user-initiated
}
);
}
// =========================================================================
// EBOOK-SPECIFIC JOB METHODS
// =========================================================================
+18 -1
View File
@@ -10,7 +10,7 @@ import { RMABLogger } from '../utils/logger';
const logger = RMABLogger.create('Scheduler');
export type ScheduledJobType = 'plex_library_scan' | 'plex_recently_added_check' | 'audible_refresh' | 'retry_missing_torrents' | 'retry_failed_imports' | 'cleanup_seeded_torrents' | 'monitor_rss_feeds' | 'sync_goodreads_shelves';
export type ScheduledJobType = 'plex_library_scan' | 'plex_recently_added_check' | 'audible_refresh' | 'retry_missing_torrents' | 'retry_failed_imports' | 'cleanup_seeded_torrents' | 'monitor_rss_feeds' | 'sync_goodreads_shelves' | 'check_watched_lists';
export interface ScheduledJob {
id: string;
@@ -133,6 +133,13 @@ export class SchedulerService {
enabled: true, // Enable by default
payload: {},
},
{
name: 'Check Watched Lists',
type: 'check_watched_lists' as ScheduledJobType,
schedule: '0 0 * * *', // Daily at midnight (every 24 hours)
enabled: true, // Enable by default
payload: {},
},
];
let created = 0;
@@ -353,6 +360,9 @@ export class SchedulerService {
case 'sync_goodreads_shelves':
bullJobId = await this.triggerSyncGoodreadsShelves(job);
break;
case 'check_watched_lists':
bullJobId = await this.triggerCheckWatchedLists(job);
break;
default:
throw new Error(`Unknown job type: ${job.type}`);
}
@@ -627,6 +637,13 @@ export class SchedulerService {
private async triggerSyncGoodreadsShelves(job: any): Promise<string> {
return await this.jobQueue.addSyncGoodreadsShelvesJob(job.id);
}
/**
* Trigger watched lists check (watched series + watched authors)
*/
private async triggerCheckWatchedLists(job: any): Promise<string> {
return await this.jobQueue.addCheckWatchedListsJob(job.id);
}
}
// Singleton instance
+414
View File
@@ -0,0 +1,414 @@
/**
* Component: Watched Lists Service
* Documentation: documentation/features/watched-lists.md
*
* Checks watched series and watched authors for new releases.
* Deduplicates results using the works table, checks against user's library,
* and auto-creates requests via the shared request-creator service.
* Follows the same pattern as goodreads-sync.service.ts.
*/
import { prisma } from '@/lib/db';
import { getAudibleService, AudibleAudiobook } from '@/lib/integrations/audible.service';
import { scrapeSeriesPage } from '@/lib/integrations/audible-series';
import { deduplicateAndCollectGroups } from '@/lib/utils/deduplicate-audiobooks';
import { persistDedupGroups } from '@/lib/services/works.service';
import { createRequestForUser } from '@/lib/services/request-creator.service';
import { findPlexMatch } from '@/lib/utils/audiobook-matcher';
import { getSiblingAsins } from '@/lib/services/works.service';
import { RMABLogger } from '@/lib/utils/logger';
const logger = RMABLogger.create('WatchedLists');
/** Max books to process per series (avoid excessively long runs) */
const MAX_BOOKS_PER_SERIES = 200;
/** Max author book pages to scrape */
const MAX_AUTHOR_PAGES = 4;
/** Delay between scrapes to avoid rate limiting (ms) */
const SCRAPE_DELAY_MS = 2000;
export interface WatchedListsSyncStats {
seriesChecked: number;
authorsChecked: number;
booksFound: number;
requestsCreated: number;
skippedOwned: number;
skippedExisting: number;
errors: number;
}
export interface WatchedListsSyncOptions {
/** Process only this specific user (for targeted sync) */
userId?: string;
/** Process only this specific series (for immediate sync on watch) */
seriesAsin?: string;
/** Process only this specific author (for immediate sync on watch) */
authorAsin?: string;
}
/**
* Process all watched series and authors: scrape for new releases,
* deduplicate, check library ownership, and create requests.
* Called from the check_watched_lists processor.
*/
export async function processWatchedLists(
jobLogger?: ReturnType<typeof RMABLogger.forJob>,
options: WatchedListsSyncOptions = {}
): Promise<WatchedListsSyncStats> {
const log = jobLogger || logger;
const stats: WatchedListsSyncStats = {
seriesChecked: 0,
authorsChecked: 0,
booksFound: 0,
requestsCreated: 0,
skippedOwned: 0,
skippedExisting: 0,
errors: 0,
};
// ---- Watched Series ----
await processAllWatchedSeries(log, stats, options);
// ---- Watched Authors ----
await processAllWatchedAuthors(log, stats, options);
log.info('Watched lists sync complete', {
seriesChecked: stats.seriesChecked,
authorsChecked: stats.authorsChecked,
booksFound: stats.booksFound,
requestsCreated: stats.requestsCreated,
skippedOwned: stats.skippedOwned,
skippedExisting: stats.skippedExisting,
errors: stats.errors,
});
return stats;
}
// ---------------------------------------------------------------------------
// Watched Series
// ---------------------------------------------------------------------------
async function processAllWatchedSeries(
log: ReturnType<typeof RMABLogger.forJob> | ReturnType<typeof RMABLogger.create>,
stats: WatchedListsSyncStats,
options: WatchedListsSyncOptions
): Promise<void> {
const whereClause: any = {};
if (options.userId) whereClause.userId = options.userId;
if (options.seriesAsin) whereClause.seriesAsin = options.seriesAsin;
const watchedSeries = await prisma.watchedSeries.findMany({
where: whereClause,
include: { user: { select: { id: true, plexUsername: true } } },
});
if (watchedSeries.length === 0) {
log.info('No watched series to process');
return;
}
// Group by seriesAsin to avoid re-scraping the same series for multiple users
const seriesByAsin = new Map<string, typeof watchedSeries>();
for (const ws of watchedSeries) {
const list = seriesByAsin.get(ws.seriesAsin) || [];
list.push(ws);
seriesByAsin.set(ws.seriesAsin, list);
}
log.info(`Processing ${seriesByAsin.size} unique watched series (${watchedSeries.length} total subscriptions)`);
for (const [seriesAsin, subscriptions] of seriesByAsin) {
try {
await processSeriesForUsers(seriesAsin, subscriptions, log, stats);
} catch (error) {
stats.errors++;
log.error(`Failed to process watched series ${seriesAsin}`, {
error: error instanceof Error ? error.message : String(error),
});
}
// Rate limit between series
await delay(SCRAPE_DELAY_MS);
}
}
async function processSeriesForUsers(
seriesAsin: string,
subscriptions: Array<{ id: string; seriesTitle: string; user: { id: string; plexUsername: string } }>,
log: ReturnType<typeof RMABLogger.forJob> | ReturnType<typeof RMABLogger.create>,
stats: WatchedListsSyncStats
): Promise<void> {
const title = subscriptions[0].seriesTitle;
log.info(`Scraping watched series: "${title}" (${seriesAsin})`);
// Scrape all pages of the series (up to MAX_BOOKS_PER_SERIES)
const allBooks: AudibleAudiobook[] = [];
let page = 1;
let hasMore = true;
while (hasMore && allBooks.length < MAX_BOOKS_PER_SERIES) {
const result = await scrapeSeriesPage(seriesAsin, page);
if (!result || result.books.length === 0) break;
allBooks.push(...result.books);
hasMore = result.hasMore;
page++;
if (hasMore) await delay(1000);
}
if (allBooks.length === 0) {
log.info(`No books found for series "${title}"`);
stats.seriesChecked++;
return;
}
stats.booksFound += allBooks.length;
// Deduplicate
const { books: dedupedBooks, groups } = deduplicateAndCollectGroups(allBooks);
// Persist dedup groups (fire-and-forget)
if (groups.length > 0) {
persistDedupGroups(groups).catch(() => {});
}
// For each user watching this series, create requests for new books
for (const subscription of subscriptions) {
await createRequestsForUser(
subscription.user.id,
subscription.user.plexUsername,
dedupedBooks,
log,
stats
);
// Update lastCheckedAt
await prisma.watchedSeries.update({
where: { id: subscription.id },
data: { lastCheckedAt: new Date() },
}).catch(() => {});
}
stats.seriesChecked++;
}
// ---------------------------------------------------------------------------
// Watched Authors
// ---------------------------------------------------------------------------
async function processAllWatchedAuthors(
log: ReturnType<typeof RMABLogger.forJob> | ReturnType<typeof RMABLogger.create>,
stats: WatchedListsSyncStats,
options: WatchedListsSyncOptions
): Promise<void> {
const whereClause: any = {};
if (options.userId) whereClause.userId = options.userId;
if (options.authorAsin) whereClause.authorAsin = options.authorAsin;
const watchedAuthors = await prisma.watchedAuthor.findMany({
where: whereClause,
include: { user: { select: { id: true, plexUsername: true } } },
});
if (watchedAuthors.length === 0) {
log.info('No watched authors to process');
return;
}
// Group by authorAsin to avoid re-scraping the same author for multiple users
const authorsByAsin = new Map<string, typeof watchedAuthors>();
for (const wa of watchedAuthors) {
const list = authorsByAsin.get(wa.authorAsin) || [];
list.push(wa);
authorsByAsin.set(wa.authorAsin, list);
}
log.info(`Processing ${authorsByAsin.size} unique watched authors (${watchedAuthors.length} total subscriptions)`);
for (const [authorAsin, subscriptions] of authorsByAsin) {
try {
await processAuthorForUsers(authorAsin, subscriptions, log, stats);
} catch (error) {
stats.errors++;
log.error(`Failed to process watched author ${authorAsin}`, {
error: error instanceof Error ? error.message : String(error),
});
}
// Rate limit between authors
await delay(SCRAPE_DELAY_MS);
}
}
async function processAuthorForUsers(
authorAsin: string,
subscriptions: Array<{ id: string; authorName: string; user: { id: string; plexUsername: string } }>,
log: ReturnType<typeof RMABLogger.forJob> | ReturnType<typeof RMABLogger.create>,
stats: WatchedListsSyncStats
): Promise<void> {
const authorName = subscriptions[0].authorName;
log.info(`Scraping watched author: "${authorName}" (${authorAsin})`);
const audibleService = getAudibleService();
const allBooks: AudibleAudiobook[] = [];
let page = 1;
let hasMore = true;
while (hasMore && page <= MAX_AUTHOR_PAGES) {
try {
const result = await audibleService.searchByAuthorAsin(authorName, authorAsin, page);
if (result.books.length === 0) break;
allBooks.push(...result.books);
hasMore = result.hasMore;
page++;
if (hasMore) await delay(1000);
} catch (error) {
log.error(`Failed to scrape author page ${page} for "${authorName}"`, {
error: error instanceof Error ? error.message : String(error),
});
break;
}
}
if (allBooks.length === 0) {
log.info(`No books found for author "${authorName}"`);
stats.authorsChecked++;
return;
}
stats.booksFound += allBooks.length;
// Deduplicate
const { books: dedupedBooks, groups } = deduplicateAndCollectGroups(allBooks);
// Persist dedup groups (fire-and-forget)
if (groups.length > 0) {
persistDedupGroups(groups).catch(() => {});
}
// For each user watching this author, create requests for new books
for (const subscription of subscriptions) {
await createRequestsForUser(
subscription.user.id,
subscription.user.plexUsername,
dedupedBooks,
log,
stats
);
// Update lastCheckedAt
await prisma.watchedAuthor.update({
where: { id: subscription.id },
data: { lastCheckedAt: new Date() },
}).catch(() => {});
}
stats.authorsChecked++;
}
// ---------------------------------------------------------------------------
// Shared: Create requests for a user from a list of books
// ---------------------------------------------------------------------------
async function createRequestsForUser(
userId: string,
username: string,
books: AudibleAudiobook[],
log: ReturnType<typeof RMABLogger.forJob> | ReturnType<typeof RMABLogger.create>,
stats: WatchedListsSyncStats
): Promise<void> {
// Filter to books that have an ASIN
const booksWithAsin = books.filter(b => b.asin);
if (booksWithAsin.length === 0) return;
// Batch check: which ASINs are already in library (direct + sibling expansion)
const ownedAsins = await getOwnedAsins(booksWithAsin.map(b => b.asin));
for (const book of booksWithAsin) {
// Skip if user already owns this (direct or via sibling ASIN)
if (ownedAsins.has(book.asin)) {
stats.skippedOwned++;
continue;
}
try {
const result = await createRequestForUser(userId, {
asin: book.asin,
title: book.title,
author: book.author,
narrator: book.narrator,
description: book.description,
coverArtUrl: book.coverArtUrl,
});
if (result.success) {
stats.requestsCreated++;
log.info(`Auto-requested "${book.title}" by ${book.author} for ${username}`);
} else {
// already_available, being_processed, duplicate — all expected
stats.skippedExisting++;
}
} catch (error) {
log.error(`Failed to create request for "${book.title}" for ${username}`, {
error: error instanceof Error ? error.message : String(error),
});
}
}
}
/**
* Get the set of ASINs that are already in the library (direct match + sibling expansion).
*/
async function getOwnedAsins(asins: string[]): Promise<Set<string>> {
const owned = new Set<string>();
// Direct library lookup
const libraryItems = await prisma.plexLibrary.findMany({
where: { asin: { in: asins } },
select: { asin: true },
});
for (const item of libraryItems) {
if (item.asin) owned.add(item.asin);
}
// Sibling expansion via works table
try {
const siblingMap = await getSiblingAsins(asins);
if (siblingMap.size > 0) {
const allSiblings = new Set<string>();
for (const siblings of siblingMap.values()) {
for (const s of siblings) allSiblings.add(s);
}
if (allSiblings.size > 0) {
const siblingLibrary = await prisma.plexLibrary.findMany({
where: { asin: { in: [...allSiblings] } },
select: { asin: true },
});
for (const item of siblingLibrary) {
if (item.asin) {
// Mark the original ASIN as owned (not the sibling)
for (const [originalAsin, siblings] of siblingMap) {
if (siblings.includes(item.asin)) {
owned.add(originalAsin);
}
}
}
}
}
}
} catch {
// Works table expansion is best-effort
}
return owned;
}
function delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}