diff --git a/src/app/api/user/goodreads-shelves/route.ts b/src/app/api/user/goodreads-shelves/route.ts index b598349..8626fc0 100644 --- a/src/app/api/user/goodreads-shelves/route.ts +++ b/src/app/api/user/goodreads-shelves/route.ts @@ -16,13 +16,10 @@ const logger = RMABLogger.create('API.GoodreadsShelves'); const GOODREADS_RSS_PATTERN = /goodreads\.com\/review\/list_rss\//; const AddShelfSchema = z.object({ - rssUrl: z - .string() - .url() - .refine((url) => GOODREADS_RSS_PATTERN.test(url), { - message: - 'URL must be a Goodreads shelf RSS URL (goodreads.com/review/list_rss/...)', - }), + rssUrl: z.string().url().refine( + (url) => GOODREADS_RSS_PATTERN.test(url), + { message: 'URL must be a Goodreads shelf RSS URL (goodreads.com/review/list_rss/...)' } + ), }); /** @@ -43,12 +40,7 @@ export async function GET(request: NextRequest) { const shelvesWithMeta = shelves.map((shelf) => { // Normalize coverUrls: old format (string[]) → new format ({coverUrl,asin,title,author}[]) - let books: { - coverUrl: string; - asin: string | null; - title: string; - author: string; - }[] = []; + let books: { coverUrl: string; asin: string | null; title: string; author: string }[] = []; if (shelf.coverUrls) { const parsed = JSON.parse(shelf.coverUrls); if (Array.isArray(parsed)) { @@ -80,13 +72,8 @@ export async function GET(request: NextRequest) { return NextResponse.json({ success: true, shelves: shelvesWithMeta }); } catch (error) { - logger.error('Failed to list shelves', { - error: error instanceof Error ? error.message : String(error), - }); - return NextResponse.json( - { error: 'Failed to list shelves' }, - { status: 500 }, - ); + logger.error('Failed to list shelves', { error: error instanceof Error ? error.message : String(error) }); + return NextResponse.json({ error: 'Failed to list shelves' }, { status: 500 }); } }); } @@ -112,43 +99,30 @@ export async function POST(request: NextRequest) { if (existing) { return NextResponse.json( - { - error: 'DuplicateShelf', - message: 'You have already added this shelf', - }, - { status: 409 }, + { error: 'DuplicateShelf', message: 'You have already added this shelf' }, + { status: 409 } ); } // Validate by fetching the RSS feed let shelfName: string; let bookCount: number; - let initialBooks: { - coverUrl: string; - asin: null; - title: string; - author: string; - }[] = []; + let initialBooks: { coverUrl: string; asin: null; title: string; author: string }[] = []; try { const rssData = await fetchAndValidateRss(rssUrl); shelfName = rssData.shelfName; bookCount = rssData.books.length; initialBooks = rssData.books - .filter((b) => b.coverUrl) + .filter(b => b.coverUrl) .slice(0, 8) - .map((b) => ({ - coverUrl: b.coverUrl!, - asin: null, - title: b.title, - author: b.author, - })); + .map(b => ({ coverUrl: b.coverUrl!, asin: null, title: b.title, author: b.author })); } catch (error) { return NextResponse.json( { error: 'InvalidRSS', message: `Could not fetch or parse the RSS feed: ${error instanceof Error ? error.message : 'Unknown error'}`, }, - { status: 400 }, + { status: 400 } ); } @@ -158,55 +132,43 @@ export async function POST(request: NextRequest) { name: shelfName, rssUrl, bookCount, - coverUrls: - initialBooks.length > 0 ? JSON.stringify(initialBooks) : null, + coverUrls: initialBooks.length > 0 ? JSON.stringify(initialBooks) : null, }, }); + // Trigger immediate sync for this shelf (unlimited lookups, process all books) try { const jobQueue = getJobQueueService(); await jobQueue.addSyncShelvesJob(undefined, shelf.id, 'goodreads', 0); - logger.info( - `Triggered immediate sync for Goodreads shelf "${shelfName}" (${shelf.id})`, - ); + logger.info(`Triggered immediate sync for Goodreads shelf "${shelfName}" (${shelf.id})`); } catch (error) { - logger.error('Failed to trigger immediate shelf sync', { - error: error instanceof Error ? error.message : String(error), - }); + logger.error('Failed to trigger immediate shelf sync', { error: error instanceof Error ? error.message : String(error) }); } - return NextResponse.json( - { - success: true, - shelf: { - id: shelf.id, - name: shelf.name, - rssUrl: shelf.rssUrl, - lastSyncAt: shelf.lastSyncAt, - createdAt: shelf.createdAt, - bookCount: shelf.bookCount, - books: initialBooks, - }, - bookCount, + return NextResponse.json({ + success: true, + shelf: { + id: shelf.id, + name: shelf.name, + rssUrl: shelf.rssUrl, + lastSyncAt: shelf.lastSyncAt, + createdAt: shelf.createdAt, + bookCount: shelf.bookCount, + books: initialBooks, }, - { status: 201 }, - ); + bookCount, + }, { status: 201 }); } catch (error) { - logger.error('Failed to add shelf', { - error: error instanceof Error ? error.message : String(error), - }); + logger.error('Failed to add shelf', { error: error instanceof Error ? error.message : String(error) }); if (error instanceof z.ZodError) { return NextResponse.json( { error: 'ValidationError', details: error.errors }, - { status: 400 }, + { status: 400 } ); } - return NextResponse.json( - { error: 'Failed to add shelf' }, - { status: 500 }, - ); + return NextResponse.json({ error: 'Failed to add shelf' }, { status: 500 }); } }); } diff --git a/src/app/profile/page.tsx b/src/app/profile/page.tsx index 6fcd163..5d69501 100644 --- a/src/app/profile/page.tsx +++ b/src/app/profile/page.tsx @@ -19,11 +19,7 @@ const statConfig = [ { key: 'waiting', label: 'Waiting', color: 'text-amber-500' }, { key: 'completed', label: 'Complete', color: 'text-emerald-500' }, { key: 'failed', label: 'Failed', color: 'text-red-500' }, - { - key: 'cancelled', - label: 'Cancelled', - color: 'text-gray-400 dark:text-gray-500', - }, + { key: 'cancelled', label: 'Cancelled', color: 'text-gray-400 dark:text-gray-500' }, ] as const; type StatKey = (typeof statConfig)[number]['key']; @@ -34,45 +30,25 @@ export default function ProfilePage() { const stats = useMemo(() => { if (!requests.length) { - return { - total: 0, - completed: 0, - active: 0, - waiting: 0, - failed: 0, - cancelled: 0, - }; + return { total: 0, completed: 0, active: 0, waiting: 0, failed: 0, cancelled: 0 }; } return { total: requests.length, - completed: requests.filter((r: any) => - ['available', 'downloaded'].includes(r.status), - ).length, - active: requests.filter((r: any) => - ['pending', 'searching', 'downloading', 'processing'].includes( - r.status, - ), - ).length, - waiting: requests.filter((r: any) => - ['awaiting_search', 'awaiting_import'].includes(r.status), - ).length, + completed: requests.filter((r: any) => ['available', 'downloaded'].includes(r.status)).length, + active: requests.filter((r: any) => ['pending', 'searching', 'downloading', 'processing'].includes(r.status)).length, + waiting: requests.filter((r: any) => ['awaiting_search', 'awaiting_import'].includes(r.status)).length, failed: requests.filter((r: any) => r.status === 'failed').length, cancelled: requests.filter((r: any) => r.status === 'cancelled').length, }; }, [requests]); const activeDownloads = useMemo(() => { - return requests.filter((r: any) => - ['downloading', 'processing'].includes(r.status), - ); + return requests.filter((r: any) => ['downloading', 'processing'].includes(r.status)); }, [requests]); const recentRequests = useMemo(() => { return [...requests] - .sort( - (a: any, b: any) => - new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(), - ) + .sort((a: any, b: any) => new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime()) .slice(0, 5); }, [requests]); @@ -82,18 +58,8 @@ export default function ProfilePage() {
- - + +

@@ -147,7 +113,7 @@ export default function ProfilePage() { 'inline-flex items-center px-3 py-1 rounded-full text-xs font-semibold uppercase tracking-wide', user.role === 'admin' ? 'bg-purple-50 text-purple-600 dark:bg-purple-500/15 dark:text-purple-400' - : 'bg-gray-100 text-gray-500 dark:bg-gray-700/50 dark:text-gray-400', + : 'bg-gray-100 text-gray-500 dark:bg-gray-700/50 dark:text-gray-400' )} > {user.role === 'admin' ? 'Administrator' : 'User'} @@ -162,12 +128,7 @@ export default function ProfilePage() { key={stat.key} className="py-5 sm:py-6 px-3 text-center bg-white dark:bg-gray-800" > -
+
{isLoading ? '\u2013' : stats[stat.key as StatKey]}
@@ -197,11 +158,7 @@ export default function ProfilePage() {
{activeDownloads.map((request: any) => ( - + ))}
@@ -244,11 +201,7 @@ export default function ProfilePage() { ) : recentRequests.length > 0 ? (
{recentRequests.map((request: any) => ( - + ))}
) : ( @@ -260,11 +213,7 @@ export default function ProfilePage() { viewBox="0 0 24 24" strokeWidth={1.5} > - +

No requests yet @@ -276,18 +225,8 @@ export default function ProfilePage() { href="/search" className="inline-flex items-center gap-2 mt-5 px-5 py-2.5 text-sm font-medium text-white bg-blue-600 hover:bg-blue-700 rounded-lg transition-colors" > - - + + Search Audiobooks diff --git a/src/lib/services/job-queue.service.ts b/src/lib/services/job-queue.service.ts index e0f7f97..0050a43 100644 --- a/src/lib/services/job-queue.service.ts +++ b/src/lib/services/job-queue.service.ts @@ -63,8 +63,8 @@ export interface MonitorDownloadPayload extends JobPayload { downloadHistoryId: string; downloadClientId: string; downloadClient: DownloadClientType; - lastProgress?: number; // Previous poll's progress (0-100) for stall detection - stallCount?: number; // Consecutive polls with no progress change (drives backoff) + lastProgress?: number; // Previous poll's progress (0-100) for stall detection + stallCount?: number; // Consecutive polls with no progress change (drives backoff) pathWaitCount?: number; // Consecutive polls waiting for content_path to relocate to save_path } @@ -227,15 +227,13 @@ export class JobQueueService { 'failed', null, error.message, - error.stack, + error.stack ); // Handle permanent failures for specific job types after all retries exhausted if (job.name === 'monitor_download' && job.data) { const payload = job.data as MonitorDownloadPayload; - logger.error( - `MonitorDownload job permanently failed for request ${payload.requestId} after ${job.attemptsMade} attempts`, - ); + logger.error(`MonitorDownload job permanently failed for request ${payload.requestId} after ${job.attemptsMade} attempts`); // Update request status to failed (only happens after all retries exhausted) try { @@ -243,9 +241,7 @@ export class JobQueueService { where: { id: payload.requestId }, data: { status: 'failed', - errorMessage: - error.message || - 'Failed to monitor download after multiple retries', + errorMessage: error.message || 'Failed to monitor download after multiple retries', updatedAt: new Date(), }, }); @@ -261,12 +257,7 @@ export class JobQueueService { }); } } catch (updateError) { - logger.error('Failed to update request/download status', { - error: - updateError instanceof Error - ? updateError.message - : String(updateError), - }); + logger.error('Failed to update request/download status', { error: updateError instanceof Error ? updateError.message : String(updateError) }); } } }); @@ -290,211 +281,106 @@ export class JobQueueService { */ private startProcessors(): void { // Search indexers processor - this.queue.process( - 'search_indexers', - 2, - async (job: BullJob) => { - const { processSearchIndexers } = - await import('../processors/search-indexers.processor'); - return await processSearchIndexers(job.data); - }, - ); + this.queue.process('search_indexers', 2, async (job: BullJob) => { + const { processSearchIndexers } = await import('../processors/search-indexers.processor'); + return await processSearchIndexers(job.data); + }); // Download torrent processor - this.queue.process( - 'download_torrent', - 2, - async (job: BullJob) => { - const { processDownloadTorrent } = - await import('../processors/download-torrent.processor'); - return await processDownloadTorrent(job.data); - }, - ); + this.queue.process('download_torrent', 2, async (job: BullJob) => { + const { processDownloadTorrent } = await import('../processors/download-torrent.processor'); + return await processDownloadTorrent(job.data); + }); // Monitor download processor - this.queue.process( - 'monitor_download', - 2, - async (job: BullJob) => { - const { processMonitorDownload } = - await import('../processors/monitor-download.processor'); - return await processMonitorDownload(job.data); - }, - ); + this.queue.process('monitor_download', 2, async (job: BullJob) => { + const { processMonitorDownload } = await import('../processors/monitor-download.processor'); + return await processMonitorDownload(job.data); + }); // Organize files processor - this.queue.process( - 'organize_files', - 2, - async (job: BullJob) => { - const { processOrganizeFiles } = - await import('../processors/organize-files.processor'); - return await processOrganizeFiles(job.data); - }, - ); + this.queue.process('organize_files', 2, async (job: BullJob) => { + const { processOrganizeFiles } = await import('../processors/organize-files.processor'); + return await processOrganizeFiles(job.data); + }); // Scan Plex processor - this.queue.process( - 'scan_plex', - 1, - async (job: BullJob) => { - const { processScanPlex } = - await import('../processors/scan-plex.processor'); - return await processScanPlex(job.data); - }, - ); + this.queue.process('scan_plex', 1, async (job: BullJob) => { + const { processScanPlex } = await import('../processors/scan-plex.processor'); + return await processScanPlex(job.data); + }); // Scheduled job processors this.queue.process('plex_library_scan', 1, async (job: BullJob) => { // plex_library_scan is just an alias for scan_plex - const { processScanPlex } = - await import('../processors/scan-plex.processor'); - const payloadWithJobId = await this.ensureJobRecord( - job, - 'plex_library_scan', - ); + const { processScanPlex } = await import('../processors/scan-plex.processor'); + const payloadWithJobId = await this.ensureJobRecord(job, 'plex_library_scan'); return await processScanPlex(payloadWithJobId); }); - this.queue.process( - 'plex_recently_added_check', - 1, - async (job: BullJob) => { - const { processPlexRecentlyAddedCheck } = - await import('../processors/plex-recently-added.processor'); - const payloadWithJobId = await this.ensureJobRecord( - job, - 'plex_recently_added_check', - ); - return await processPlexRecentlyAddedCheck(payloadWithJobId); - }, - ); + this.queue.process('plex_recently_added_check', 1, async (job: BullJob) => { + const { processPlexRecentlyAddedCheck } = await import('../processors/plex-recently-added.processor'); + const payloadWithJobId = await this.ensureJobRecord(job, 'plex_recently_added_check'); + return await processPlexRecentlyAddedCheck(payloadWithJobId); + }); - this.queue.process( - 'monitor_rss_feeds', - 1, - async (job: BullJob) => { - const { processMonitorRssFeeds } = - await import('../processors/monitor-rss-feeds.processor'); - const payloadWithJobId = await this.ensureJobRecord( - job, - 'monitor_rss_feeds', - ); - return await processMonitorRssFeeds(payloadWithJobId); - }, - ); + this.queue.process('monitor_rss_feeds', 1, async (job: BullJob) => { + const { processMonitorRssFeeds } = await import('../processors/monitor-rss-feeds.processor'); + const payloadWithJobId = await this.ensureJobRecord(job, 'monitor_rss_feeds'); + return await processMonitorRssFeeds(payloadWithJobId); + }); - this.queue.process( - 'audible_refresh', - 1, - async (job: BullJob) => { - const { processAudibleRefresh } = - await import('../processors/audible-refresh.processor'); - const payloadWithJobId = await this.ensureJobRecord( - job, - 'audible_refresh', - ); - return await processAudibleRefresh(payloadWithJobId); - }, - ); + this.queue.process('audible_refresh', 1, async (job: BullJob) => { + const { processAudibleRefresh } = await import('../processors/audible-refresh.processor'); + const payloadWithJobId = await this.ensureJobRecord(job, 'audible_refresh'); + return await processAudibleRefresh(payloadWithJobId); + }); - this.queue.process( - 'retry_missing_torrents', - 1, - async (job: BullJob) => { - const { processRetryMissingTorrents } = - await import('../processors/retry-missing-torrents.processor'); - const payloadWithJobId = await this.ensureJobRecord( - job, - 'retry_missing_torrents', - ); - return await processRetryMissingTorrents(payloadWithJobId); - }, - ); + this.queue.process('retry_missing_torrents', 1, async (job: BullJob) => { + const { processRetryMissingTorrents } = await import('../processors/retry-missing-torrents.processor'); + const payloadWithJobId = await this.ensureJobRecord(job, 'retry_missing_torrents'); + return await processRetryMissingTorrents(payloadWithJobId); + }); - this.queue.process( - 'retry_failed_imports', - 1, - async (job: BullJob) => { - const { processRetryFailedImports } = - await import('../processors/retry-failed-imports.processor'); - const payloadWithJobId = await this.ensureJobRecord( - job, - 'retry_failed_imports', - ); - return await processRetryFailedImports(payloadWithJobId); - }, - ); + this.queue.process('retry_failed_imports', 1, async (job: BullJob) => { + const { processRetryFailedImports } = await import('../processors/retry-failed-imports.processor'); + const payloadWithJobId = await this.ensureJobRecord(job, 'retry_failed_imports'); + return await processRetryFailedImports(payloadWithJobId); + }); - this.queue.process( - 'cleanup_seeded_torrents', - 1, - async (job: BullJob) => { - const { processCleanupSeededTorrents } = - await import('../processors/cleanup-seeded-torrents.processor'); - const payloadWithJobId = await this.ensureJobRecord( - job, - 'cleanup_seeded_torrents', - ); - return await processCleanupSeededTorrents(payloadWithJobId); - }, - ); + this.queue.process('cleanup_seeded_torrents', 1, async (job: BullJob) => { + const { processCleanupSeededTorrents } = await import('../processors/cleanup-seeded-torrents.processor'); + const payloadWithJobId = await this.ensureJobRecord(job, 'cleanup_seeded_torrents'); + return await processCleanupSeededTorrents(payloadWithJobId); + }); - this.queue.process( - 'sync_reading_shelves', - 1, - async (job: BullJob) => { - const { processSyncShelves } = - await import('../processors/sync-shelves.processor'); - const payloadWithJobId = await this.ensureJobRecord( - job, - 'sync_reading_shelves', - ); - return await processSyncShelves(payloadWithJobId); - }, - ); + this.queue.process('sync_reading_shelves', 1, async (job: BullJob) => { + const { processSyncShelves } = await import('../processors/sync-shelves.processor'); + const payloadWithJobId = await this.ensureJobRecord(job, 'sync_reading_shelves'); + return await processSyncShelves(payloadWithJobId); + }); // Send notification processor - this.queue.process( - 'send_notification', - 2, - async (job: BullJob) => { - const { processSendNotification } = - await import('../processors/send-notification.processor'); - return await processSendNotification(job.data); - }, - ); + this.queue.process('send_notification', 2, async (job: BullJob) => { + const { processSendNotification } = await import('../processors/send-notification.processor'); + return await processSendNotification(job.data); + }); // Ebook-specific processors - this.queue.process( - 'search_ebook', - 2, - async (job: BullJob) => { - const { processSearchEbook } = - await import('../processors/search-ebook.processor'); - return await processSearchEbook(job.data); - }, - ); + this.queue.process('search_ebook', 2, async (job: BullJob) => { + const { processSearchEbook } = await import('../processors/search-ebook.processor'); + return await processSearchEbook(job.data); + }); - this.queue.process( - 'start_direct_download', - 2, - async (job: BullJob) => { - const { processStartDirectDownload } = - await import('../processors/direct-download.processor'); - return await processStartDirectDownload(job.data); - }, - ); + this.queue.process('start_direct_download', 2, async (job: BullJob) => { + const { processStartDirectDownload } = await import('../processors/direct-download.processor'); + return await processStartDirectDownload(job.data); + }); - this.queue.process( - 'monitor_direct_download', - 2, - async (job: BullJob) => { - const { processMonitorDirectDownload } = - await import('../processors/direct-download.processor'); - return await processMonitorDirectDownload(job.data); - }, - ); + this.queue.process('monitor_direct_download', 2, async (job: BullJob) => { + const { processMonitorDirectDownload } = await import('../processors/direct-download.processor'); + return await processMonitorDirectDownload(job.data); + }); } /** @@ -519,17 +405,12 @@ export class JobQueueService { if (existingJob) { // Update lastRun for the scheduled job if this is a timer-triggered job if (payload.scheduledJobId) { - await prisma.scheduledJob - .update({ - where: { id: payload.scheduledJobId }, - data: { lastRun: new Date() }, - }) - .catch((err) => { - logger.error( - `Failed to update lastRun for scheduled job ${payload.scheduledJobId}`, - { error: err instanceof Error ? err.message : String(err) }, - ); - }); + await prisma.scheduledJob.update({ + where: { id: payload.scheduledJobId }, + data: { lastRun: new Date() }, + }).catch(err => { + logger.error(`Failed to update lastRun for scheduled job ${payload.scheduledJobId}`, { error: err instanceof Error ? err.message : String(err) }); + }); } return { ...payload, jobId: existingJob.id }; } @@ -549,17 +430,12 @@ export class JobQueueService { // Update lastRun for the scheduled job if this is a timer-triggered job if (payload.scheduledJobId) { - await prisma.scheduledJob - .update({ - where: { id: payload.scheduledJobId }, - data: { lastRun: new Date() }, - }) - .catch((err) => { - logger.error( - `Failed to update lastRun for scheduled job ${payload.scheduledJobId}`, - { error: err instanceof Error ? err.message : String(err) }, - ); - }); + await prisma.scheduledJob.update({ + where: { id: payload.scheduledJobId }, + data: { lastRun: new Date() }, + }).catch(err => { + logger.error(`Failed to update lastRun for scheduled job ${payload.scheduledJobId}`, { error: err instanceof Error ? err.message : String(err) }); + }); } return { ...payload, jobId: dbJob.id }; @@ -573,7 +449,7 @@ export class JobQueueService { status: string, result?: any, errorMessage?: string, - stackTrace?: string, + stackTrace?: string ): Promise { try { const updateData: any = { @@ -606,9 +482,7 @@ export class JobQueueService { data: updateData, }); } catch (error) { - logger.error('Failed to update job in database', { - error: error instanceof Error ? error.message : String(error), - }); + logger.error('Failed to update job in database', { error: error instanceof Error ? error.message : String(error) }); } } @@ -618,7 +492,7 @@ export class JobQueueService { private async addJob( type: JobType, payload: JobPayload, - options?: JobOptions, + options?: JobOptions ): Promise { // First create the database job record const dbJob = await prisma.job.create({ @@ -651,10 +525,7 @@ export class JobQueueService { /** * Add search indexers job */ - async addSearchJob( - requestId: string, - audiobook: { id: string; title: string; author: string; asin?: string }, - ): Promise { + async addSearchJob(requestId: string, audiobook: { id: string; title: string; author: string; asin?: string }): Promise { return await this.addJob( 'search_indexers', { @@ -663,7 +534,7 @@ export class JobQueueService { } as SearchIndexersPayload, { priority: 10, // High priority for user-initiated requests - }, + } ); } @@ -673,7 +544,7 @@ export class JobQueueService { async addDownloadJob( requestId: string, audiobook: { id: string; title: string; author: string }, - torrent: TorrentResult, + torrent: TorrentResult ): Promise { return await this.addJob( 'download_torrent', @@ -684,7 +555,7 @@ export class JobQueueService { } as DownloadTorrentPayload, { priority: 9, // High priority - download selected torrent - }, + } ); } @@ -699,7 +570,7 @@ export class JobQueueService { delaySeconds: number = 0, lastProgress?: number, stallCount?: number, - pathWaitCount?: number, + pathWaitCount?: number ): Promise { return await this.addJob( 'monitor_download', @@ -715,7 +586,7 @@ export class JobQueueService { { priority: 5, // Medium priority delay: delaySeconds * 1000, // Convert seconds to milliseconds - }, + } ); } @@ -727,7 +598,7 @@ export class JobQueueService { requestId: string, audiobookId: string, downloadPath: string, - targetPath?: string, + targetPath?: string ): Promise { return await this.addJob( 'organize_files', @@ -739,18 +610,14 @@ export class JobQueueService { } as OrganizeFilesPayload, { priority: 8, - }, + } ); } /** * Add Plex scan job */ - async addPlexScanJob( - libraryId: string, - partial?: boolean, - path?: string, - ): Promise { + async addPlexScanJob(libraryId: string, partial?: boolean, path?: string): Promise { return await this.addJob( 'scan_plex', { @@ -760,7 +627,7 @@ export class JobQueueService { } as ScanPlexPayload, { priority: 7, - }, + } ); } @@ -775,7 +642,7 @@ export class JobQueueService { } as PlexRecentlyAddedPayload, { priority: 8, - }, + } ); } @@ -790,7 +657,7 @@ export class JobQueueService { } as MonitorRssFeedsPayload, { priority: 8, - }, + } ); } @@ -805,7 +672,7 @@ export class JobQueueService { } as AudibleRefreshPayload, { priority: 9, - }, + } ); } @@ -820,7 +687,7 @@ export class JobQueueService { } as RetryMissingTorrentsPayload, { priority: 7, - }, + } ); } @@ -835,7 +702,7 @@ export class JobQueueService { } as RetryFailedImportsPayload, { priority: 7, - }, + } ); } @@ -850,19 +717,14 @@ export class JobQueueService { } as CleanupSeededTorrentsPayload, { priority: 10, - }, + } ); } /** * Add sync reading shelves job */ - async addSyncShelvesJob( - scheduledJobId?: string, - shelfId?: string, - shelfType?: 'goodreads' | 'hardcover', - maxLookupsPerShelf?: number, - ): Promise { + async addSyncShelvesJob(scheduledJobId?: string, shelfId?: string, shelfType?: 'goodreads' | 'hardcover', maxLookupsPerShelf?: number): Promise { return await this.addJob( 'sync_reading_shelves', { @@ -873,7 +735,7 @@ export class JobQueueService { } as SyncShelvesPayload, { priority: 7, - }, + } ); } @@ -887,7 +749,7 @@ export class JobQueueService { async addSearchEbookJob( requestId: string, audiobook: { id: string; title: string; author: string; asin?: string }, - preferredFormat?: string, + preferredFormat?: string ): Promise { return await this.addJob( 'search_ebook', @@ -898,7 +760,7 @@ export class JobQueueService { } as SearchEbookPayload, { priority: 10, // High priority for user-initiated requests - }, + } ); } @@ -910,7 +772,7 @@ export class JobQueueService { downloadHistoryId: string, downloadUrl: string, targetFilename: string, - expectedSize?: number, + expectedSize?: number ): Promise { return await this.addJob( 'start_direct_download', @@ -923,7 +785,7 @@ export class JobQueueService { } as StartDirectDownloadPayload, { priority: 9, // High priority - download selected ebook - }, + } ); } @@ -936,7 +798,7 @@ export class JobQueueService { downloadId: string, targetPath: string, expectedSize?: number, - delaySeconds: number = 0, + delaySeconds: number = 0 ): Promise { return await this.addJob( 'monitor_direct_download', @@ -950,7 +812,7 @@ export class JobQueueService { { priority: 5, // Medium priority delay: delaySeconds * 1000, - }, + } ); } @@ -1099,13 +961,9 @@ export class JobQueueService { author: string, userName: string, message?: string, - requestType?: string, + requestType?: string ): Promise { - logger.info(`Queueing notification: ${event}`, { - requestId, - title, - userName, - }); + logger.info(`Queueing notification: ${event}`, { requestId, title, userName }); return await this.addJob( 'send_notification', { @@ -1125,7 +983,7 @@ export class JobQueueService { } as SendNotificationPayload, { priority: 5, // Medium priority - }, + } ); } @@ -1136,7 +994,7 @@ export class JobQueueService { jobType: string, payload: JobPayload, cronExpression: string, - jobId: string, + jobId: string ): Promise { await this.queue.add(jobType, payload, { repeat: { @@ -1153,7 +1011,7 @@ export class JobQueueService { async removeRepeatableJob( jobType: string, cronExpression: string, - jobId: string, + jobId: string ): Promise { await this.queue.removeRepeatable(jobType, { cron: cronExpression, diff --git a/src/lib/services/scheduler.service.ts b/src/lib/services/scheduler.service.ts index 28ee30f..7b1b2eb 100644 --- a/src/lib/services/scheduler.service.ts +++ b/src/lib/services/scheduler.service.ts @@ -10,15 +10,7 @@ import { RMABLogger } from '../utils/logger'; const logger = RMABLogger.create('Scheduler'); -export type ScheduledJobType = - | 'plex_library_scan' - | 'plex_recently_added_check' - | 'audible_refresh' - | 'retry_missing_torrents' - | 'retry_failed_imports' - | 'cleanup_seeded_torrents' - | 'monitor_rss_feeds' - | 'sync_reading_shelves'; +export type ScheduledJobType = 'plex_library_scan' | 'plex_recently_added_check' | 'audible_refresh' | 'retry_missing_torrents' | 'retry_failed_imports' | 'cleanup_seeded_torrents' | 'monitor_rss_feeds' | 'sync_reading_shelves'; export interface ScheduledJob { id: string; @@ -160,9 +152,7 @@ export class SchedulerService { data: defaultJob, }); created++; - logger.info( - `Created default job: ${defaultJob.name} (enabled: ${defaultJob.enabled})`, - ); + logger.info(`Created default job: ${defaultJob.name} (enabled: ${defaultJob.enabled})`); } } catch (error) { failed++; @@ -174,9 +164,7 @@ export class SchedulerService { } if (failed > 0) { - logger.warn( - `Default jobs: ${created} created, ${failed} failed — failed jobs will be retried on next restart`, - ); + logger.warn(`Default jobs: ${created} created, ${failed} failed — failed jobs will be retried on next restart`); } else if (created > 0) { logger.info(`Default jobs: ${created} created`); } @@ -201,9 +189,7 @@ export class SchedulerService { await this.unscheduleJob(job); } await prisma.scheduledJob.delete({ where: { id: job.id } }); - logger.info( - `Removed deprecated scheduled job: ${job.name} (${job.type})`, - ); + logger.info(`Removed deprecated scheduled job: ${job.name} (${job.type})`); } } catch (error) { logger.error('Failed to cleanup deprecated scheduled jobs', { @@ -236,13 +222,11 @@ export class SchedulerService { job.type, { scheduledJobId: job.id }, job.schedule, - `scheduled-${job.id}`, + `scheduled-${job.id}` ); logger.info(`Job scheduled: ${job.name} (${job.schedule})`); } catch (error) { - logger.error(`Failed to schedule job ${job.name}`, { - error: error instanceof Error ? error.message : String(error), - }); + logger.error(`Failed to schedule job ${job.name}`, { error: error instanceof Error ? error.message : String(error) }); throw error; } } @@ -255,13 +239,11 @@ export class SchedulerService { await this.jobQueue.removeRepeatableJob( job.type, job.schedule, - `scheduled-${job.id}`, + `scheduled-${job.id}` ); logger.info(`Job unscheduled: ${job.name}`); } catch (error) { - logger.error(`Failed to unschedule job ${job.name}`, { - error: error instanceof Error ? error.message : String(error), - }); + logger.error(`Failed to unschedule job ${job.name}`, { error: error instanceof Error ? error.message : String(error) }); // Don't throw - job might not exist in Bull yet } } @@ -313,7 +295,7 @@ export class SchedulerService { */ async updateScheduledJob( id: string, - dto: UpdateScheduledJobDto, + dto: UpdateScheduledJobDto ): Promise { if (dto.schedule) { this.validateCronExpression(dto.schedule); @@ -457,8 +439,7 @@ export class SchedulerService { throw new Error(errorMsg); } - libraryId = - job.payload?.libraryId || absConfig['audiobookshelf.library_id']; + libraryId = job.payload?.libraryId || absConfig['audiobookshelf.library_id']; } else { const plexConfig = await configService.getMany([ 'plex_url', @@ -482,18 +463,15 @@ export class SchedulerService { throw new Error(errorMsg); } - libraryId = - job.payload?.libraryId || plexConfig.plex_audiobook_library_id; + libraryId = job.payload?.libraryId || plexConfig.plex_audiobook_library_id; } - logger.info( - `Triggering ${backendMode} library scan for library: ${libraryId}`, - ); + logger.info(`Triggering ${backendMode} library scan for library: ${libraryId}`); return await this.jobQueue.addPlexScanJob( libraryId || '', job.payload?.partial, - job.payload?.path, + job.payload?.path ); } @@ -514,6 +492,7 @@ export class SchedulerService { return await this.jobQueue.addAudibleRefreshJob(job.id); } + /** * Enable a scheduled job */ @@ -545,12 +524,10 @@ export class SchedulerService { await this.triggerJobNow(job.id); // Stagger triggers to avoid connection pool burst on startup - await new Promise((resolve) => setTimeout(resolve, 500)); + await new Promise(resolve => setTimeout(resolve, 500)); } } catch (error) { - logger.error(`Failed to trigger overdue job "${job.name}"`, { - error: error instanceof Error ? error.message : String(error), - }); + logger.error(`Failed to trigger overdue job "${job.name}"`, { error: error instanceof Error ? error.message : String(error) }); } } } @@ -623,22 +600,13 @@ export class SchedulerService { if (dayOfMonth === '*' && month === '*' && dayOfWeek === '*') { const hourNum = parseInt(hour, 10); const minuteNum = parseInt(minute, 10); - if ( - !isNaN(hourNum) && - !isNaN(minuteNum) && - hourNum >= 0 && - hourNum <= 23 && - minuteNum >= 0 && - minuteNum <= 59 - ) { + if (!isNaN(hourNum) && !isNaN(minuteNum) && hourNum >= 0 && hourNum <= 23 && minuteNum >= 0 && minuteNum <= 59) { return 24 * 60 * 60 * 1000; // 24 hours } } // For other patterns, return a conservative default (24 hours) - logger.warn( - `Unknown cron pattern "${cronExpression}", defaulting to 24 hours`, - ); + logger.warn(`Unknown cron pattern "${cronExpression}", defaulting to 24 hours`); return 24 * 60 * 60 * 1000; } @@ -688,7 +656,7 @@ export class SchedulerService { * Trigger Reading shelves sync */ private async triggerSyncShelves(job: any): Promise { - return await this.jobQueue.addSyncShelvesJob(job.id); + return await this.jobQueue.addSyncShelvesJob(job.id, undefined, 'goodreads'); } }