Remove boy scout formatting changes

This commit is contained in:
Rob Walsh
2026-02-27 16:08:34 -07:00
parent 41d45d1210
commit 3861d07cf4
4 changed files with 190 additions and 463 deletions
+22 -60
View File
@@ -16,13 +16,10 @@ const logger = RMABLogger.create('API.GoodreadsShelves');
const GOODREADS_RSS_PATTERN = /goodreads\.com\/review\/list_rss\//; const GOODREADS_RSS_PATTERN = /goodreads\.com\/review\/list_rss\//;
const AddShelfSchema = z.object({ const AddShelfSchema = z.object({
rssUrl: z rssUrl: z.string().url().refine(
.string() (url) => GOODREADS_RSS_PATTERN.test(url),
.url() { message: 'URL must be a Goodreads shelf RSS URL (goodreads.com/review/list_rss/...)' }
.refine((url) => GOODREADS_RSS_PATTERN.test(url), { ),
message:
'URL must be a Goodreads shelf RSS URL (goodreads.com/review/list_rss/...)',
}),
}); });
/** /**
@@ -43,12 +40,7 @@ export async function GET(request: NextRequest) {
const shelvesWithMeta = shelves.map((shelf) => { const shelvesWithMeta = shelves.map((shelf) => {
// Normalize coverUrls: old format (string[]) → new format ({coverUrl,asin,title,author}[]) // Normalize coverUrls: old format (string[]) → new format ({coverUrl,asin,title,author}[])
let books: { let books: { coverUrl: string; asin: string | null; title: string; author: string }[] = [];
coverUrl: string;
asin: string | null;
title: string;
author: string;
}[] = [];
if (shelf.coverUrls) { if (shelf.coverUrls) {
const parsed = JSON.parse(shelf.coverUrls); const parsed = JSON.parse(shelf.coverUrls);
if (Array.isArray(parsed)) { if (Array.isArray(parsed)) {
@@ -80,13 +72,8 @@ export async function GET(request: NextRequest) {
return NextResponse.json({ success: true, shelves: shelvesWithMeta }); return NextResponse.json({ success: true, shelves: shelvesWithMeta });
} catch (error) { } catch (error) {
logger.error('Failed to list shelves', { logger.error('Failed to list shelves', { error: error instanceof Error ? error.message : String(error) });
error: error instanceof Error ? error.message : String(error), return NextResponse.json({ error: 'Failed to list shelves' }, { status: 500 });
});
return NextResponse.json(
{ error: 'Failed to list shelves' },
{ status: 500 },
);
} }
}); });
} }
@@ -112,43 +99,30 @@ export async function POST(request: NextRequest) {
if (existing) { if (existing) {
return NextResponse.json( return NextResponse.json(
{ { error: 'DuplicateShelf', message: 'You have already added this shelf' },
error: 'DuplicateShelf', { status: 409 }
message: 'You have already added this shelf',
},
{ status: 409 },
); );
} }
// Validate by fetching the RSS feed // Validate by fetching the RSS feed
let shelfName: string; let shelfName: string;
let bookCount: number; let bookCount: number;
let initialBooks: { let initialBooks: { coverUrl: string; asin: null; title: string; author: string }[] = [];
coverUrl: string;
asin: null;
title: string;
author: string;
}[] = [];
try { try {
const rssData = await fetchAndValidateRss(rssUrl); const rssData = await fetchAndValidateRss(rssUrl);
shelfName = rssData.shelfName; shelfName = rssData.shelfName;
bookCount = rssData.books.length; bookCount = rssData.books.length;
initialBooks = rssData.books initialBooks = rssData.books
.filter((b) => b.coverUrl) .filter(b => b.coverUrl)
.slice(0, 8) .slice(0, 8)
.map((b) => ({ .map(b => ({ coverUrl: b.coverUrl!, asin: null, title: b.title, author: b.author }));
coverUrl: b.coverUrl!,
asin: null,
title: b.title,
author: b.author,
}));
} catch (error) { } catch (error) {
return NextResponse.json( return NextResponse.json(
{ {
error: 'InvalidRSS', error: 'InvalidRSS',
message: `Could not fetch or parse the RSS feed: ${error instanceof Error ? error.message : 'Unknown error'}`, message: `Could not fetch or parse the RSS feed: ${error instanceof Error ? error.message : 'Unknown error'}`,
}, },
{ status: 400 }, { status: 400 }
); );
} }
@@ -158,25 +132,20 @@ export async function POST(request: NextRequest) {
name: shelfName, name: shelfName,
rssUrl, rssUrl,
bookCount, bookCount,
coverUrls: coverUrls: initialBooks.length > 0 ? JSON.stringify(initialBooks) : null,
initialBooks.length > 0 ? JSON.stringify(initialBooks) : null,
}, },
}); });
// Trigger immediate sync for this shelf (unlimited lookups, process all books)
try { try {
const jobQueue = getJobQueueService(); const jobQueue = getJobQueueService();
await jobQueue.addSyncShelvesJob(undefined, shelf.id, 'goodreads', 0); await jobQueue.addSyncShelvesJob(undefined, shelf.id, 'goodreads', 0);
logger.info( logger.info(`Triggered immediate sync for Goodreads shelf "${shelfName}" (${shelf.id})`);
`Triggered immediate sync for Goodreads shelf "${shelfName}" (${shelf.id})`,
);
} catch (error) { } catch (error) {
logger.error('Failed to trigger immediate shelf sync', { logger.error('Failed to trigger immediate shelf sync', { error: error instanceof Error ? error.message : String(error) });
error: error instanceof Error ? error.message : String(error),
});
} }
return NextResponse.json( return NextResponse.json({
{
success: true, success: true,
shelf: { shelf: {
id: shelf.id, id: shelf.id,
@@ -188,25 +157,18 @@ export async function POST(request: NextRequest) {
books: initialBooks, books: initialBooks,
}, },
bookCount, bookCount,
}, }, { status: 201 });
{ status: 201 },
);
} catch (error) { } catch (error) {
logger.error('Failed to add shelf', { logger.error('Failed to add shelf', { error: error instanceof Error ? error.message : String(error) });
error: error instanceof Error ? error.message : String(error),
});
if (error instanceof z.ZodError) { if (error instanceof z.ZodError) {
return NextResponse.json( return NextResponse.json(
{ error: 'ValidationError', details: error.errors }, { error: 'ValidationError', details: error.errors },
{ status: 400 }, { status: 400 }
); );
} }
return NextResponse.json( return NextResponse.json({ error: 'Failed to add shelf' }, { status: 500 });
{ error: 'Failed to add shelf' },
{ status: 500 },
);
} }
}); });
} }
+16 -77
View File
@@ -19,11 +19,7 @@ const statConfig = [
{ key: 'waiting', label: 'Waiting', color: 'text-amber-500' }, { key: 'waiting', label: 'Waiting', color: 'text-amber-500' },
{ key: 'completed', label: 'Complete', color: 'text-emerald-500' }, { key: 'completed', label: 'Complete', color: 'text-emerald-500' },
{ key: 'failed', label: 'Failed', color: 'text-red-500' }, { key: 'failed', label: 'Failed', color: 'text-red-500' },
{ { key: 'cancelled', label: 'Cancelled', color: 'text-gray-400 dark:text-gray-500' },
key: 'cancelled',
label: 'Cancelled',
color: 'text-gray-400 dark:text-gray-500',
},
] as const; ] as const;
type StatKey = (typeof statConfig)[number]['key']; type StatKey = (typeof statConfig)[number]['key'];
@@ -34,45 +30,25 @@ export default function ProfilePage() {
const stats = useMemo(() => { const stats = useMemo(() => {
if (!requests.length) { if (!requests.length) {
return { return { total: 0, completed: 0, active: 0, waiting: 0, failed: 0, cancelled: 0 };
total: 0,
completed: 0,
active: 0,
waiting: 0,
failed: 0,
cancelled: 0,
};
} }
return { return {
total: requests.length, total: requests.length,
completed: requests.filter((r: any) => completed: requests.filter((r: any) => ['available', 'downloaded'].includes(r.status)).length,
['available', 'downloaded'].includes(r.status), active: requests.filter((r: any) => ['pending', 'searching', 'downloading', 'processing'].includes(r.status)).length,
).length, waiting: requests.filter((r: any) => ['awaiting_search', 'awaiting_import'].includes(r.status)).length,
active: requests.filter((r: any) =>
['pending', 'searching', 'downloading', 'processing'].includes(
r.status,
),
).length,
waiting: requests.filter((r: any) =>
['awaiting_search', 'awaiting_import'].includes(r.status),
).length,
failed: requests.filter((r: any) => r.status === 'failed').length, failed: requests.filter((r: any) => r.status === 'failed').length,
cancelled: requests.filter((r: any) => r.status === 'cancelled').length, cancelled: requests.filter((r: any) => r.status === 'cancelled').length,
}; };
}, [requests]); }, [requests]);
const activeDownloads = useMemo(() => { const activeDownloads = useMemo(() => {
return requests.filter((r: any) => return requests.filter((r: any) => ['downloading', 'processing'].includes(r.status));
['downloading', 'processing'].includes(r.status),
);
}, [requests]); }, [requests]);
const recentRequests = useMemo(() => { const recentRequests = useMemo(() => {
return [...requests] return [...requests]
.sort( .sort((a: any, b: any) => new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime())
(a: any, b: any) =>
new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(),
)
.slice(0, 5); .slice(0, 5);
}, [requests]); }, [requests]);
@@ -82,18 +58,8 @@ export default function ProfilePage() {
<Header /> <Header />
<main className="container mx-auto px-4 py-20 max-w-5xl text-center"> <main className="container mx-auto px-4 py-20 max-w-5xl text-center">
<div className="w-16 h-16 rounded-full bg-gray-100 dark:bg-gray-800 flex items-center justify-center mx-auto mb-5"> <div className="w-16 h-16 rounded-full bg-gray-100 dark:bg-gray-800 flex items-center justify-center mx-auto mb-5">
<svg <svg className="w-8 h-8 text-gray-400" fill="none" stroke="currentColor" viewBox="0 0 24 24" strokeWidth={1.5}>
className="w-8 h-8 text-gray-400" <path strokeLinecap="round" strokeLinejoin="round" d="M16.5 10.5V6.75a4.5 4.5 0 10-9 0v3.75m-.75 11.25h10.5a2.25 2.25 0 002.25-2.25v-6.75a2.25 2.25 0 00-2.25-2.25H6.75a2.25 2.25 0 00-2.25 2.25v6.75a2.25 2.25 0 002.25 2.25z" />
fill="none"
stroke="currentColor"
viewBox="0 0 24 24"
strokeWidth={1.5}
>
<path
strokeLinecap="round"
strokeLinejoin="round"
d="M16.5 10.5V6.75a4.5 4.5 0 10-9 0v3.75m-.75 11.25h10.5a2.25 2.25 0 002.25-2.25v-6.75a2.25 2.25 0 00-2.25-2.25H6.75a2.25 2.25 0 00-2.25 2.25v6.75a2.25 2.25 0 002.25 2.25z"
/>
</svg> </svg>
</div> </div>
<h2 className="text-2xl font-bold text-gray-900 dark:text-white mb-2"> <h2 className="text-2xl font-bold text-gray-900 dark:text-white mb-2">
@@ -147,7 +113,7 @@ export default function ProfilePage() {
'inline-flex items-center px-3 py-1 rounded-full text-xs font-semibold uppercase tracking-wide', 'inline-flex items-center px-3 py-1 rounded-full text-xs font-semibold uppercase tracking-wide',
user.role === 'admin' user.role === 'admin'
? 'bg-purple-50 text-purple-600 dark:bg-purple-500/15 dark:text-purple-400' ? 'bg-purple-50 text-purple-600 dark:bg-purple-500/15 dark:text-purple-400'
: 'bg-gray-100 text-gray-500 dark:bg-gray-700/50 dark:text-gray-400', : 'bg-gray-100 text-gray-500 dark:bg-gray-700/50 dark:text-gray-400'
)} )}
> >
{user.role === 'admin' ? 'Administrator' : 'User'} {user.role === 'admin' ? 'Administrator' : 'User'}
@@ -162,12 +128,7 @@ export default function ProfilePage() {
key={stat.key} key={stat.key}
className="py-5 sm:py-6 px-3 text-center bg-white dark:bg-gray-800" className="py-5 sm:py-6 px-3 text-center bg-white dark:bg-gray-800"
> >
<div <div className={cn('text-2xl sm:text-3xl font-bold tabular-nums', stat.color)}>
className={cn(
'text-2xl sm:text-3xl font-bold tabular-nums',
stat.color,
)}
>
{isLoading ? '\u2013' : stats[stat.key as StatKey]} {isLoading ? '\u2013' : stats[stat.key as StatKey]}
</div> </div>
<div className="text-xs font-medium text-gray-400 dark:text-gray-500 uppercase tracking-wider mt-1.5"> <div className="text-xs font-medium text-gray-400 dark:text-gray-500 uppercase tracking-wider mt-1.5">
@@ -197,11 +158,7 @@ export default function ProfilePage() {
</div> </div>
<div className="space-y-4"> <div className="space-y-4">
{activeDownloads.map((request: any) => ( {activeDownloads.map((request: any) => (
<RequestCard <RequestCard key={request.id} request={request} showActions={false} />
key={request.id}
request={request}
showActions={false}
/>
))} ))}
</div> </div>
</section> </section>
@@ -244,11 +201,7 @@ export default function ProfilePage() {
) : recentRequests.length > 0 ? ( ) : recentRequests.length > 0 ? (
<div className="space-y-4"> <div className="space-y-4">
{recentRequests.map((request: any) => ( {recentRequests.map((request: any) => (
<RequestCard <RequestCard key={request.id} request={request} showActions={false} />
key={request.id}
request={request}
showActions={false}
/>
))} ))}
</div> </div>
) : ( ) : (
@@ -260,11 +213,7 @@ export default function ProfilePage() {
viewBox="0 0 24 24" viewBox="0 0 24 24"
strokeWidth={1.5} strokeWidth={1.5}
> >
<path <path strokeLinecap="round" strokeLinejoin="round" d="M9 9l10.5-3m0 6.553v3.75a2.25 2.25 0 01-1.632 2.163l-1.32.377a1.803 1.803 0 11-.99-3.467l2.31-.66a2.25 2.25 0 001.632-2.163zm0 0V2.25L9 5.25v10.303m0 0v3.75a2.25 2.25 0 01-1.632 2.163l-1.32.377a1.803 1.803 0 01-.99-3.467l2.31-.66A2.25 2.25 0 009 15.553z" />
strokeLinecap="round"
strokeLinejoin="round"
d="M9 9l10.5-3m0 6.553v3.75a2.25 2.25 0 01-1.632 2.163l-1.32.377a1.803 1.803 0 11-.99-3.467l2.31-.66a2.25 2.25 0 001.632-2.163zm0 0V2.25L9 5.25v10.303m0 0v3.75a2.25 2.25 0 01-1.632 2.163l-1.32.377a1.803 1.803 0 01-.99-3.467l2.31-.66A2.25 2.25 0 009 15.553z"
/>
</svg> </svg>
<p className="text-base font-medium text-gray-500 dark:text-gray-400"> <p className="text-base font-medium text-gray-500 dark:text-gray-400">
No requests yet No requests yet
@@ -276,18 +225,8 @@ export default function ProfilePage() {
href="/search" href="/search"
className="inline-flex items-center gap-2 mt-5 px-5 py-2.5 text-sm font-medium text-white bg-blue-600 hover:bg-blue-700 rounded-lg transition-colors" className="inline-flex items-center gap-2 mt-5 px-5 py-2.5 text-sm font-medium text-white bg-blue-600 hover:bg-blue-700 rounded-lg transition-colors"
> >
<svg <svg className="w-4 h-4" fill="none" stroke="currentColor" viewBox="0 0 24 24" strokeWidth={2}>
className="w-4 h-4" <path strokeLinecap="round" strokeLinejoin="round" d="M21 21l-5.197-5.197m0 0A7.5 7.5 0 105.196 5.196a7.5 7.5 0 0010.607 10.607z" />
fill="none"
stroke="currentColor"
viewBox="0 0 24 24"
strokeWidth={2}
>
<path
strokeLinecap="round"
strokeLinejoin="round"
d="M21 21l-5.197-5.197m0 0A7.5 7.5 0 105.196 5.196a7.5 7.5 0 0010.607 10.607z"
/>
</svg> </svg>
Search Audiobooks Search Audiobooks
</a> </a>
+99 -241
View File
@@ -227,15 +227,13 @@ export class JobQueueService {
'failed', 'failed',
null, null,
error.message, error.message,
error.stack, error.stack
); );
// Handle permanent failures for specific job types after all retries exhausted // Handle permanent failures for specific job types after all retries exhausted
if (job.name === 'monitor_download' && job.data) { if (job.name === 'monitor_download' && job.data) {
const payload = job.data as MonitorDownloadPayload; const payload = job.data as MonitorDownloadPayload;
logger.error( logger.error(`MonitorDownload job permanently failed for request ${payload.requestId} after ${job.attemptsMade} attempts`);
`MonitorDownload job permanently failed for request ${payload.requestId} after ${job.attemptsMade} attempts`,
);
// Update request status to failed (only happens after all retries exhausted) // Update request status to failed (only happens after all retries exhausted)
try { try {
@@ -243,9 +241,7 @@ export class JobQueueService {
where: { id: payload.requestId }, where: { id: payload.requestId },
data: { data: {
status: 'failed', status: 'failed',
errorMessage: errorMessage: error.message || 'Failed to monitor download after multiple retries',
error.message ||
'Failed to monitor download after multiple retries',
updatedAt: new Date(), updatedAt: new Date(),
}, },
}); });
@@ -261,12 +257,7 @@ export class JobQueueService {
}); });
} }
} catch (updateError) { } catch (updateError) {
logger.error('Failed to update request/download status', { logger.error('Failed to update request/download status', { error: updateError instanceof Error ? updateError.message : String(updateError) });
error:
updateError instanceof Error
? updateError.message
: String(updateError),
});
} }
} }
}); });
@@ -290,211 +281,106 @@ export class JobQueueService {
*/ */
private startProcessors(): void { private startProcessors(): void {
// Search indexers processor // Search indexers processor
this.queue.process( this.queue.process('search_indexers', 2, async (job: BullJob<SearchIndexersPayload>) => {
'search_indexers', const { processSearchIndexers } = await import('../processors/search-indexers.processor');
2,
async (job: BullJob<SearchIndexersPayload>) => {
const { processSearchIndexers } =
await import('../processors/search-indexers.processor');
return await processSearchIndexers(job.data); return await processSearchIndexers(job.data);
}, });
);
// Download torrent processor // Download torrent processor
this.queue.process( this.queue.process('download_torrent', 2, async (job: BullJob<DownloadTorrentPayload>) => {
'download_torrent', const { processDownloadTorrent } = await import('../processors/download-torrent.processor');
2,
async (job: BullJob<DownloadTorrentPayload>) => {
const { processDownloadTorrent } =
await import('../processors/download-torrent.processor');
return await processDownloadTorrent(job.data); return await processDownloadTorrent(job.data);
}, });
);
// Monitor download processor // Monitor download processor
this.queue.process( this.queue.process('monitor_download', 2, async (job: BullJob<MonitorDownloadPayload>) => {
'monitor_download', const { processMonitorDownload } = await import('../processors/monitor-download.processor');
2,
async (job: BullJob<MonitorDownloadPayload>) => {
const { processMonitorDownload } =
await import('../processors/monitor-download.processor');
return await processMonitorDownload(job.data); return await processMonitorDownload(job.data);
}, });
);
// Organize files processor // Organize files processor
this.queue.process( this.queue.process('organize_files', 2, async (job: BullJob<OrganizeFilesPayload>) => {
'organize_files', const { processOrganizeFiles } = await import('../processors/organize-files.processor');
2,
async (job: BullJob<OrganizeFilesPayload>) => {
const { processOrganizeFiles } =
await import('../processors/organize-files.processor');
return await processOrganizeFiles(job.data); return await processOrganizeFiles(job.data);
}, });
);
// Scan Plex processor // Scan Plex processor
this.queue.process( this.queue.process('scan_plex', 1, async (job: BullJob<ScanPlexPayload>) => {
'scan_plex', const { processScanPlex } = await import('../processors/scan-plex.processor');
1,
async (job: BullJob<ScanPlexPayload>) => {
const { processScanPlex } =
await import('../processors/scan-plex.processor');
return await processScanPlex(job.data); return await processScanPlex(job.data);
}, });
);
// Scheduled job processors // Scheduled job processors
this.queue.process('plex_library_scan', 1, async (job: BullJob) => { this.queue.process('plex_library_scan', 1, async (job: BullJob) => {
// plex_library_scan is just an alias for scan_plex // plex_library_scan is just an alias for scan_plex
const { processScanPlex } = const { processScanPlex } = await import('../processors/scan-plex.processor');
await import('../processors/scan-plex.processor'); const payloadWithJobId = await this.ensureJobRecord(job, 'plex_library_scan');
const payloadWithJobId = await this.ensureJobRecord(
job,
'plex_library_scan',
);
return await processScanPlex(payloadWithJobId); return await processScanPlex(payloadWithJobId);
}); });
this.queue.process( this.queue.process('plex_recently_added_check', 1, async (job: BullJob<PlexRecentlyAddedPayload>) => {
'plex_recently_added_check', const { processPlexRecentlyAddedCheck } = await import('../processors/plex-recently-added.processor');
1, const payloadWithJobId = await this.ensureJobRecord(job, 'plex_recently_added_check');
async (job: BullJob<PlexRecentlyAddedPayload>) => {
const { processPlexRecentlyAddedCheck } =
await import('../processors/plex-recently-added.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'plex_recently_added_check',
);
return await processPlexRecentlyAddedCheck(payloadWithJobId); return await processPlexRecentlyAddedCheck(payloadWithJobId);
}, });
);
this.queue.process( this.queue.process('monitor_rss_feeds', 1, async (job: BullJob<MonitorRssFeedsPayload>) => {
'monitor_rss_feeds', const { processMonitorRssFeeds } = await import('../processors/monitor-rss-feeds.processor');
1, const payloadWithJobId = await this.ensureJobRecord(job, 'monitor_rss_feeds');
async (job: BullJob<MonitorRssFeedsPayload>) => {
const { processMonitorRssFeeds } =
await import('../processors/monitor-rss-feeds.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'monitor_rss_feeds',
);
return await processMonitorRssFeeds(payloadWithJobId); return await processMonitorRssFeeds(payloadWithJobId);
}, });
);
this.queue.process( this.queue.process('audible_refresh', 1, async (job: BullJob<AudibleRefreshPayload>) => {
'audible_refresh', const { processAudibleRefresh } = await import('../processors/audible-refresh.processor');
1, const payloadWithJobId = await this.ensureJobRecord(job, 'audible_refresh');
async (job: BullJob<AudibleRefreshPayload>) => {
const { processAudibleRefresh } =
await import('../processors/audible-refresh.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'audible_refresh',
);
return await processAudibleRefresh(payloadWithJobId); return await processAudibleRefresh(payloadWithJobId);
}, });
);
this.queue.process( this.queue.process('retry_missing_torrents', 1, async (job: BullJob<RetryMissingTorrentsPayload>) => {
'retry_missing_torrents', const { processRetryMissingTorrents } = await import('../processors/retry-missing-torrents.processor');
1, const payloadWithJobId = await this.ensureJobRecord(job, 'retry_missing_torrents');
async (job: BullJob<RetryMissingTorrentsPayload>) => {
const { processRetryMissingTorrents } =
await import('../processors/retry-missing-torrents.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'retry_missing_torrents',
);
return await processRetryMissingTorrents(payloadWithJobId); return await processRetryMissingTorrents(payloadWithJobId);
}, });
);
this.queue.process( this.queue.process('retry_failed_imports', 1, async (job: BullJob<RetryFailedImportsPayload>) => {
'retry_failed_imports', const { processRetryFailedImports } = await import('../processors/retry-failed-imports.processor');
1, const payloadWithJobId = await this.ensureJobRecord(job, 'retry_failed_imports');
async (job: BullJob<RetryFailedImportsPayload>) => {
const { processRetryFailedImports } =
await import('../processors/retry-failed-imports.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'retry_failed_imports',
);
return await processRetryFailedImports(payloadWithJobId); return await processRetryFailedImports(payloadWithJobId);
}, });
);
this.queue.process( this.queue.process('cleanup_seeded_torrents', 1, async (job: BullJob<CleanupSeededTorrentsPayload>) => {
'cleanup_seeded_torrents', const { processCleanupSeededTorrents } = await import('../processors/cleanup-seeded-torrents.processor');
1, const payloadWithJobId = await this.ensureJobRecord(job, 'cleanup_seeded_torrents');
async (job: BullJob<CleanupSeededTorrentsPayload>) => {
const { processCleanupSeededTorrents } =
await import('../processors/cleanup-seeded-torrents.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'cleanup_seeded_torrents',
);
return await processCleanupSeededTorrents(payloadWithJobId); return await processCleanupSeededTorrents(payloadWithJobId);
}, });
);
this.queue.process( this.queue.process('sync_reading_shelves', 1, async (job: BullJob<SyncShelvesPayload>) => {
'sync_reading_shelves', const { processSyncShelves } = await import('../processors/sync-shelves.processor');
1, const payloadWithJobId = await this.ensureJobRecord(job, 'sync_reading_shelves');
async (job: BullJob<SyncShelvesPayload>) => {
const { processSyncShelves } =
await import('../processors/sync-shelves.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'sync_reading_shelves',
);
return await processSyncShelves(payloadWithJobId); return await processSyncShelves(payloadWithJobId);
}, });
);
// Send notification processor // Send notification processor
this.queue.process( this.queue.process('send_notification', 2, async (job: BullJob<SendNotificationPayload>) => {
'send_notification', const { processSendNotification } = await import('../processors/send-notification.processor');
2,
async (job: BullJob<SendNotificationPayload>) => {
const { processSendNotification } =
await import('../processors/send-notification.processor');
return await processSendNotification(job.data); return await processSendNotification(job.data);
}, });
);
// Ebook-specific processors // Ebook-specific processors
this.queue.process( this.queue.process('search_ebook', 2, async (job: BullJob<SearchEbookPayload>) => {
'search_ebook', const { processSearchEbook } = await import('../processors/search-ebook.processor');
2,
async (job: BullJob<SearchEbookPayload>) => {
const { processSearchEbook } =
await import('../processors/search-ebook.processor');
return await processSearchEbook(job.data); return await processSearchEbook(job.data);
}, });
);
this.queue.process( this.queue.process('start_direct_download', 2, async (job: BullJob<StartDirectDownloadPayload>) => {
'start_direct_download', const { processStartDirectDownload } = await import('../processors/direct-download.processor');
2,
async (job: BullJob<StartDirectDownloadPayload>) => {
const { processStartDirectDownload } =
await import('../processors/direct-download.processor');
return await processStartDirectDownload(job.data); return await processStartDirectDownload(job.data);
}, });
);
this.queue.process( this.queue.process('monitor_direct_download', 2, async (job: BullJob<MonitorDirectDownloadPayload>) => {
'monitor_direct_download', const { processMonitorDirectDownload } = await import('../processors/direct-download.processor');
2,
async (job: BullJob<MonitorDirectDownloadPayload>) => {
const { processMonitorDirectDownload } =
await import('../processors/direct-download.processor');
return await processMonitorDirectDownload(job.data); return await processMonitorDirectDownload(job.data);
}, });
);
} }
/** /**
@@ -519,16 +405,11 @@ export class JobQueueService {
if (existingJob) { if (existingJob) {
// Update lastRun for the scheduled job if this is a timer-triggered job // Update lastRun for the scheduled job if this is a timer-triggered job
if (payload.scheduledJobId) { if (payload.scheduledJobId) {
await prisma.scheduledJob await prisma.scheduledJob.update({
.update({
where: { id: payload.scheduledJobId }, where: { id: payload.scheduledJobId },
data: { lastRun: new Date() }, data: { lastRun: new Date() },
}) }).catch(err => {
.catch((err) => { logger.error(`Failed to update lastRun for scheduled job ${payload.scheduledJobId}`, { error: err instanceof Error ? err.message : String(err) });
logger.error(
`Failed to update lastRun for scheduled job ${payload.scheduledJobId}`,
{ error: err instanceof Error ? err.message : String(err) },
);
}); });
} }
return { ...payload, jobId: existingJob.id }; return { ...payload, jobId: existingJob.id };
@@ -549,16 +430,11 @@ export class JobQueueService {
// Update lastRun for the scheduled job if this is a timer-triggered job // Update lastRun for the scheduled job if this is a timer-triggered job
if (payload.scheduledJobId) { if (payload.scheduledJobId) {
await prisma.scheduledJob await prisma.scheduledJob.update({
.update({
where: { id: payload.scheduledJobId }, where: { id: payload.scheduledJobId },
data: { lastRun: new Date() }, data: { lastRun: new Date() },
}) }).catch(err => {
.catch((err) => { logger.error(`Failed to update lastRun for scheduled job ${payload.scheduledJobId}`, { error: err instanceof Error ? err.message : String(err) });
logger.error(
`Failed to update lastRun for scheduled job ${payload.scheduledJobId}`,
{ error: err instanceof Error ? err.message : String(err) },
);
}); });
} }
@@ -573,7 +449,7 @@ export class JobQueueService {
status: string, status: string,
result?: any, result?: any,
errorMessage?: string, errorMessage?: string,
stackTrace?: string, stackTrace?: string
): Promise<void> { ): Promise<void> {
try { try {
const updateData: any = { const updateData: any = {
@@ -606,9 +482,7 @@ export class JobQueueService {
data: updateData, data: updateData,
}); });
} catch (error) { } catch (error) {
logger.error('Failed to update job in database', { logger.error('Failed to update job in database', { error: error instanceof Error ? error.message : String(error) });
error: error instanceof Error ? error.message : String(error),
});
} }
} }
@@ -618,7 +492,7 @@ export class JobQueueService {
private async addJob( private async addJob(
type: JobType, type: JobType,
payload: JobPayload, payload: JobPayload,
options?: JobOptions, options?: JobOptions
): Promise<string> { ): Promise<string> {
// First create the database job record // First create the database job record
const dbJob = await prisma.job.create({ const dbJob = await prisma.job.create({
@@ -651,10 +525,7 @@ export class JobQueueService {
/** /**
* Add search indexers job * Add search indexers job
*/ */
async addSearchJob( async addSearchJob(requestId: string, audiobook: { id: string; title: string; author: string; asin?: string }): Promise<string> {
requestId: string,
audiobook: { id: string; title: string; author: string; asin?: string },
): Promise<string> {
return await this.addJob( return await this.addJob(
'search_indexers', 'search_indexers',
{ {
@@ -663,7 +534,7 @@ export class JobQueueService {
} as SearchIndexersPayload, } as SearchIndexersPayload,
{ {
priority: 10, // High priority for user-initiated requests priority: 10, // High priority for user-initiated requests
}, }
); );
} }
@@ -673,7 +544,7 @@ export class JobQueueService {
async addDownloadJob( async addDownloadJob(
requestId: string, requestId: string,
audiobook: { id: string; title: string; author: string }, audiobook: { id: string; title: string; author: string },
torrent: TorrentResult, torrent: TorrentResult
): Promise<string> { ): Promise<string> {
return await this.addJob( return await this.addJob(
'download_torrent', 'download_torrent',
@@ -684,7 +555,7 @@ export class JobQueueService {
} as DownloadTorrentPayload, } as DownloadTorrentPayload,
{ {
priority: 9, // High priority - download selected torrent priority: 9, // High priority - download selected torrent
}, }
); );
} }
@@ -699,7 +570,7 @@ export class JobQueueService {
delaySeconds: number = 0, delaySeconds: number = 0,
lastProgress?: number, lastProgress?: number,
stallCount?: number, stallCount?: number,
pathWaitCount?: number, pathWaitCount?: number
): Promise<string> { ): Promise<string> {
return await this.addJob( return await this.addJob(
'monitor_download', 'monitor_download',
@@ -715,7 +586,7 @@ export class JobQueueService {
{ {
priority: 5, // Medium priority priority: 5, // Medium priority
delay: delaySeconds * 1000, // Convert seconds to milliseconds delay: delaySeconds * 1000, // Convert seconds to milliseconds
}, }
); );
} }
@@ -727,7 +598,7 @@ export class JobQueueService {
requestId: string, requestId: string,
audiobookId: string, audiobookId: string,
downloadPath: string, downloadPath: string,
targetPath?: string, targetPath?: string
): Promise<string> { ): Promise<string> {
return await this.addJob( return await this.addJob(
'organize_files', 'organize_files',
@@ -739,18 +610,14 @@ export class JobQueueService {
} as OrganizeFilesPayload, } as OrganizeFilesPayload,
{ {
priority: 8, priority: 8,
}, }
); );
} }
/** /**
* Add Plex scan job * Add Plex scan job
*/ */
async addPlexScanJob( async addPlexScanJob(libraryId: string, partial?: boolean, path?: string): Promise<string> {
libraryId: string,
partial?: boolean,
path?: string,
): Promise<string> {
return await this.addJob( return await this.addJob(
'scan_plex', 'scan_plex',
{ {
@@ -760,7 +627,7 @@ export class JobQueueService {
} as ScanPlexPayload, } as ScanPlexPayload,
{ {
priority: 7, priority: 7,
}, }
); );
} }
@@ -775,7 +642,7 @@ export class JobQueueService {
} as PlexRecentlyAddedPayload, } as PlexRecentlyAddedPayload,
{ {
priority: 8, priority: 8,
}, }
); );
} }
@@ -790,7 +657,7 @@ export class JobQueueService {
} as MonitorRssFeedsPayload, } as MonitorRssFeedsPayload,
{ {
priority: 8, priority: 8,
}, }
); );
} }
@@ -805,7 +672,7 @@ export class JobQueueService {
} as AudibleRefreshPayload, } as AudibleRefreshPayload,
{ {
priority: 9, priority: 9,
}, }
); );
} }
@@ -820,7 +687,7 @@ export class JobQueueService {
} as RetryMissingTorrentsPayload, } as RetryMissingTorrentsPayload,
{ {
priority: 7, priority: 7,
}, }
); );
} }
@@ -835,7 +702,7 @@ export class JobQueueService {
} as RetryFailedImportsPayload, } as RetryFailedImportsPayload,
{ {
priority: 7, priority: 7,
}, }
); );
} }
@@ -850,19 +717,14 @@ export class JobQueueService {
} as CleanupSeededTorrentsPayload, } as CleanupSeededTorrentsPayload,
{ {
priority: 10, priority: 10,
}, }
); );
} }
/** /**
* Add sync reading shelves job * Add sync reading shelves job
*/ */
async addSyncShelvesJob( async addSyncShelvesJob(scheduledJobId?: string, shelfId?: string, shelfType?: 'goodreads' | 'hardcover', maxLookupsPerShelf?: number): Promise<string> {
scheduledJobId?: string,
shelfId?: string,
shelfType?: 'goodreads' | 'hardcover',
maxLookupsPerShelf?: number,
): Promise<string> {
return await this.addJob( return await this.addJob(
'sync_reading_shelves', 'sync_reading_shelves',
{ {
@@ -873,7 +735,7 @@ export class JobQueueService {
} as SyncShelvesPayload, } as SyncShelvesPayload,
{ {
priority: 7, priority: 7,
}, }
); );
} }
@@ -887,7 +749,7 @@ export class JobQueueService {
async addSearchEbookJob( async addSearchEbookJob(
requestId: string, requestId: string,
audiobook: { id: string; title: string; author: string; asin?: string }, audiobook: { id: string; title: string; author: string; asin?: string },
preferredFormat?: string, preferredFormat?: string
): Promise<string> { ): Promise<string> {
return await this.addJob( return await this.addJob(
'search_ebook', 'search_ebook',
@@ -898,7 +760,7 @@ export class JobQueueService {
} as SearchEbookPayload, } as SearchEbookPayload,
{ {
priority: 10, // High priority for user-initiated requests priority: 10, // High priority for user-initiated requests
}, }
); );
} }
@@ -910,7 +772,7 @@ export class JobQueueService {
downloadHistoryId: string, downloadHistoryId: string,
downloadUrl: string, downloadUrl: string,
targetFilename: string, targetFilename: string,
expectedSize?: number, expectedSize?: number
): Promise<string> { ): Promise<string> {
return await this.addJob( return await this.addJob(
'start_direct_download', 'start_direct_download',
@@ -923,7 +785,7 @@ export class JobQueueService {
} as StartDirectDownloadPayload, } as StartDirectDownloadPayload,
{ {
priority: 9, // High priority - download selected ebook priority: 9, // High priority - download selected ebook
}, }
); );
} }
@@ -936,7 +798,7 @@ export class JobQueueService {
downloadId: string, downloadId: string,
targetPath: string, targetPath: string,
expectedSize?: number, expectedSize?: number,
delaySeconds: number = 0, delaySeconds: number = 0
): Promise<string> { ): Promise<string> {
return await this.addJob( return await this.addJob(
'monitor_direct_download', 'monitor_direct_download',
@@ -950,7 +812,7 @@ export class JobQueueService {
{ {
priority: 5, // Medium priority priority: 5, // Medium priority
delay: delaySeconds * 1000, delay: delaySeconds * 1000,
}, }
); );
} }
@@ -1099,13 +961,9 @@ export class JobQueueService {
author: string, author: string,
userName: string, userName: string,
message?: string, message?: string,
requestType?: string, requestType?: string
): Promise<string> { ): Promise<string> {
logger.info(`Queueing notification: ${event}`, { logger.info(`Queueing notification: ${event}`, { requestId, title, userName });
requestId,
title,
userName,
});
return await this.addJob( return await this.addJob(
'send_notification', 'send_notification',
{ {
@@ -1125,7 +983,7 @@ export class JobQueueService {
} as SendNotificationPayload, } as SendNotificationPayload,
{ {
priority: 5, // Medium priority priority: 5, // Medium priority
}, }
); );
} }
@@ -1136,7 +994,7 @@ export class JobQueueService {
jobType: string, jobType: string,
payload: JobPayload, payload: JobPayload,
cronExpression: string, cronExpression: string,
jobId: string, jobId: string
): Promise<void> { ): Promise<void> {
await this.queue.add(jobType, payload, { await this.queue.add(jobType, payload, {
repeat: { repeat: {
@@ -1153,7 +1011,7 @@ export class JobQueueService {
async removeRepeatableJob( async removeRepeatableJob(
jobType: string, jobType: string,
cronExpression: string, cronExpression: string,
jobId: string, jobId: string
): Promise<void> { ): Promise<void> {
await this.queue.removeRepeatable(jobType, { await this.queue.removeRepeatable(jobType, {
cron: cronExpression, cron: cronExpression,
+19 -51
View File
@@ -10,15 +10,7 @@ import { RMABLogger } from '../utils/logger';
const logger = RMABLogger.create('Scheduler'); const logger = RMABLogger.create('Scheduler');
export type ScheduledJobType = export type ScheduledJobType = 'plex_library_scan' | 'plex_recently_added_check' | 'audible_refresh' | 'retry_missing_torrents' | 'retry_failed_imports' | 'cleanup_seeded_torrents' | 'monitor_rss_feeds' | 'sync_reading_shelves';
| 'plex_library_scan'
| 'plex_recently_added_check'
| 'audible_refresh'
| 'retry_missing_torrents'
| 'retry_failed_imports'
| 'cleanup_seeded_torrents'
| 'monitor_rss_feeds'
| 'sync_reading_shelves';
export interface ScheduledJob { export interface ScheduledJob {
id: string; id: string;
@@ -160,9 +152,7 @@ export class SchedulerService {
data: defaultJob, data: defaultJob,
}); });
created++; created++;
logger.info( logger.info(`Created default job: ${defaultJob.name} (enabled: ${defaultJob.enabled})`);
`Created default job: ${defaultJob.name} (enabled: ${defaultJob.enabled})`,
);
} }
} catch (error) { } catch (error) {
failed++; failed++;
@@ -174,9 +164,7 @@ export class SchedulerService {
} }
if (failed > 0) { if (failed > 0) {
logger.warn( logger.warn(`Default jobs: ${created} created, ${failed} failed — failed jobs will be retried on next restart`);
`Default jobs: ${created} created, ${failed} failed — failed jobs will be retried on next restart`,
);
} else if (created > 0) { } else if (created > 0) {
logger.info(`Default jobs: ${created} created`); logger.info(`Default jobs: ${created} created`);
} }
@@ -201,9 +189,7 @@ export class SchedulerService {
await this.unscheduleJob(job); await this.unscheduleJob(job);
} }
await prisma.scheduledJob.delete({ where: { id: job.id } }); await prisma.scheduledJob.delete({ where: { id: job.id } });
logger.info( logger.info(`Removed deprecated scheduled job: ${job.name} (${job.type})`);
`Removed deprecated scheduled job: ${job.name} (${job.type})`,
);
} }
} catch (error) { } catch (error) {
logger.error('Failed to cleanup deprecated scheduled jobs', { logger.error('Failed to cleanup deprecated scheduled jobs', {
@@ -236,13 +222,11 @@ export class SchedulerService {
job.type, job.type,
{ scheduledJobId: job.id }, { scheduledJobId: job.id },
job.schedule, job.schedule,
`scheduled-${job.id}`, `scheduled-${job.id}`
); );
logger.info(`Job scheduled: ${job.name} (${job.schedule})`); logger.info(`Job scheduled: ${job.name} (${job.schedule})`);
} catch (error) { } catch (error) {
logger.error(`Failed to schedule job ${job.name}`, { logger.error(`Failed to schedule job ${job.name}`, { error: error instanceof Error ? error.message : String(error) });
error: error instanceof Error ? error.message : String(error),
});
throw error; throw error;
} }
} }
@@ -255,13 +239,11 @@ export class SchedulerService {
await this.jobQueue.removeRepeatableJob( await this.jobQueue.removeRepeatableJob(
job.type, job.type,
job.schedule, job.schedule,
`scheduled-${job.id}`, `scheduled-${job.id}`
); );
logger.info(`Job unscheduled: ${job.name}`); logger.info(`Job unscheduled: ${job.name}`);
} catch (error) { } catch (error) {
logger.error(`Failed to unschedule job ${job.name}`, { logger.error(`Failed to unschedule job ${job.name}`, { error: error instanceof Error ? error.message : String(error) });
error: error instanceof Error ? error.message : String(error),
});
// Don't throw - job might not exist in Bull yet // Don't throw - job might not exist in Bull yet
} }
} }
@@ -313,7 +295,7 @@ export class SchedulerService {
*/ */
async updateScheduledJob( async updateScheduledJob(
id: string, id: string,
dto: UpdateScheduledJobDto, dto: UpdateScheduledJobDto
): Promise<ScheduledJob> { ): Promise<ScheduledJob> {
if (dto.schedule) { if (dto.schedule) {
this.validateCronExpression(dto.schedule); this.validateCronExpression(dto.schedule);
@@ -457,8 +439,7 @@ export class SchedulerService {
throw new Error(errorMsg); throw new Error(errorMsg);
} }
libraryId = libraryId = job.payload?.libraryId || absConfig['audiobookshelf.library_id'];
job.payload?.libraryId || absConfig['audiobookshelf.library_id'];
} else { } else {
const plexConfig = await configService.getMany([ const plexConfig = await configService.getMany([
'plex_url', 'plex_url',
@@ -482,18 +463,15 @@ export class SchedulerService {
throw new Error(errorMsg); throw new Error(errorMsg);
} }
libraryId = libraryId = job.payload?.libraryId || plexConfig.plex_audiobook_library_id;
job.payload?.libraryId || plexConfig.plex_audiobook_library_id;
} }
logger.info( logger.info(`Triggering ${backendMode} library scan for library: ${libraryId}`);
`Triggering ${backendMode} library scan for library: ${libraryId}`,
);
return await this.jobQueue.addPlexScanJob( return await this.jobQueue.addPlexScanJob(
libraryId || '', libraryId || '',
job.payload?.partial, job.payload?.partial,
job.payload?.path, job.payload?.path
); );
} }
@@ -514,6 +492,7 @@ export class SchedulerService {
return await this.jobQueue.addAudibleRefreshJob(job.id); return await this.jobQueue.addAudibleRefreshJob(job.id);
} }
/** /**
* Enable a scheduled job * Enable a scheduled job
*/ */
@@ -545,12 +524,10 @@ export class SchedulerService {
await this.triggerJobNow(job.id); await this.triggerJobNow(job.id);
// Stagger triggers to avoid connection pool burst on startup // Stagger triggers to avoid connection pool burst on startup
await new Promise((resolve) => setTimeout(resolve, 500)); await new Promise(resolve => setTimeout(resolve, 500));
} }
} catch (error) { } catch (error) {
logger.error(`Failed to trigger overdue job "${job.name}"`, { logger.error(`Failed to trigger overdue job "${job.name}"`, { error: error instanceof Error ? error.message : String(error) });
error: error instanceof Error ? error.message : String(error),
});
} }
} }
} }
@@ -623,22 +600,13 @@ export class SchedulerService {
if (dayOfMonth === '*' && month === '*' && dayOfWeek === '*') { if (dayOfMonth === '*' && month === '*' && dayOfWeek === '*') {
const hourNum = parseInt(hour, 10); const hourNum = parseInt(hour, 10);
const minuteNum = parseInt(minute, 10); const minuteNum = parseInt(minute, 10);
if ( if (!isNaN(hourNum) && !isNaN(minuteNum) && hourNum >= 0 && hourNum <= 23 && minuteNum >= 0 && minuteNum <= 59) {
!isNaN(hourNum) &&
!isNaN(minuteNum) &&
hourNum >= 0 &&
hourNum <= 23 &&
minuteNum >= 0 &&
minuteNum <= 59
) {
return 24 * 60 * 60 * 1000; // 24 hours return 24 * 60 * 60 * 1000; // 24 hours
} }
} }
// For other patterns, return a conservative default (24 hours) // For other patterns, return a conservative default (24 hours)
logger.warn( logger.warn(`Unknown cron pattern "${cronExpression}", defaulting to 24 hours`);
`Unknown cron pattern "${cronExpression}", defaulting to 24 hours`,
);
return 24 * 60 * 60 * 1000; return 24 * 60 * 60 * 1000;
} }
@@ -688,7 +656,7 @@ export class SchedulerService {
* Trigger Reading shelves sync * Trigger Reading shelves sync
*/ */
private async triggerSyncShelves(job: any): Promise<string> { private async triggerSyncShelves(job: any): Promise<string> {
return await this.jobQueue.addSyncShelvesJob(job.id); return await this.jobQueue.addSyncShelvesJob(job.id, undefined, 'goodreads');
} }
} }