Hardcover API support

This commit is contained in:
Rob Walsh
2026-02-27 15:10:27 -07:00
parent 3ee67c8763
commit cfe780c6f0
14 changed files with 2086 additions and 162 deletions
+479
View File
@@ -0,0 +1,479 @@
/**
* Component: Hardcover Shelf Sync Service
* Documentation: documentation/backend/services/hardcover-sync.md
*
* Fetches Hardcover books using their GraphQL API, resolves books to Audible ASINs,
* and creates requests via the shared request-creator service.
*/
import axios from 'axios';
import { prisma } from '@/lib/db';
import { getAudibleService } from '@/lib/integrations/audible.service';
import { createRequestForUser } from '@/lib/services/request-creator.service';
import { RMABLogger } from '@/lib/utils/logger';
const logger = RMABLogger.create('HardcoverSync');
/** Default max Audible lookups per shelf per scheduled sync cycle */
const DEFAULT_MAX_LOOKUPS_PER_SHELF = 10;
/** Days before retrying a noMatch book */
const NO_MATCH_RETRY_DAYS = 7;
const HARDCOVER_API_URL = 'https://api.hardcover.app/v1/graphql';
interface HardcoverApiBook {
bookId: string;
title: string;
author: string;
coverUrl?: string;
}
/**
* Fetch a Hardcover List using their GraphQL API.
* This handles both 'status_id' user_books or 'list_id' list_books queries.
* For simplicity, we assume `listId` provided by the user is an Int corresponding to a list_id or status_id.
*/
export async function fetchHardcoverList(
apiToken: string,
listIdStr: string,
): Promise<{ listName: string; books: HardcoverApiBook[] }> {
// If we can parse as integer, it could be a List ID or Status ID. If UUID, we adjust query
const isUuid =
/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i.test(
listIdStr,
);
// Example generic query to Hardcover. Adjust the table/format as needed for their schema.
// Hardcover lists use custom lists (list_books) or statuses (user_books).
// Assuming list_books for this implementation.
const query = `
query GetListBooks($listId: Int!) {
list_books(where: {list_id: {_eq: $listId}}) {
list {
name
}
book {
id
title
author_books {
author {
name
}
}
cached_image
image {
url
}
}
}
}
`;
// Provide fallback UUID query if Hardcover uses UUIDs instead.
const queryUuid = `
query GetListBooksUuid($listId: uuid!) {
list_books(where: {list_id: {_eq: $listId}}) {
list {
name
}
book {
id
title
author_books {
author {
name
}
}
cached_image
image {
url
}
}
}
}
`;
const response = await axios.post(
HARDCOVER_API_URL,
{
query: isUuid ? queryUuid : query,
variables: {
listId: isUuid ? listIdStr : parseInt(listIdStr, 10),
},
},
{
headers: {
Authorization: `Bearer ${apiToken}`,
'Content-Type': 'application/json',
},
timeout: 15000,
},
);
if (response.data?.errors) {
throw new Error(`Hardcover API Error: ${response.data.errors[0]?.message}`);
}
const listBooks = response.data?.data?.list_books || [];
let listName = 'Hardcover List';
if (listBooks.length > 0 && listBooks[0].list?.name) {
listName = listBooks[0].list.name;
}
const books: HardcoverApiBook[] = [];
for (const item of listBooks) {
const book = item.book;
if (!book || !book.id) continue;
// Hardcover authors can be multiple, we pick the first one or join them
const authorName = book.author_books?.[0]?.author?.name || 'Unknown Author';
const coverUrl = book.cached_image || book.image?.url || undefined;
books.push({
bookId: book.id.toString(),
title: book.title || 'Unknown Title',
author: authorName,
coverUrl,
});
}
return { listName, books };
}
export interface HardcoverSyncStats {
shelvesProcessed: number;
booksFound: number;
lookupsPerformed: number;
requestsCreated: number;
errors: number;
}
export interface HardcoverSyncOptions {
shelfId?: string;
maxLookupsPerShelf?: number;
}
export async function processHardcoverShelves(
jobLogger?: ReturnType<typeof RMABLogger.forJob>,
options: HardcoverSyncOptions = {},
): Promise<HardcoverSyncStats> {
const log = jobLogger || logger;
const stats: HardcoverSyncStats = {
shelvesProcessed: 0,
booksFound: 0,
lookupsPerformed: 0,
requestsCreated: 0,
errors: 0,
};
const maxLookups =
options.maxLookupsPerShelf ?? DEFAULT_MAX_LOOKUPS_PER_SHELF;
const whereClause = options.shelfId ? { id: options.shelfId } : {};
const shelves = await prisma.hardcoverShelf.findMany({
where: whereClause,
include: { user: { select: { id: true, plexUsername: true } } },
});
if (shelves.length === 0) {
log.info(
options.shelfId
? 'Hardcover list not found'
: 'No Hardcover lists configured, skipping',
);
return stats;
}
log.info(
`Processing ${shelves.length} Hardcover list(s)${maxLookups > 0 ? ` (max ${maxLookups} lookups/list)` : ' (unlimited lookups)'}`,
);
for (const shelf of shelves) {
try {
await processShelf(shelf, stats, log, maxLookups);
stats.shelvesProcessed++;
} catch (error) {
stats.errors++;
log.error(
`Failed to process list "${shelf.name}" for user ${shelf.user.plexUsername}: ${error instanceof Error ? error.message : 'Unknown error'}`,
);
}
}
log.info(
`Hardcover sync complete: ${stats.shelvesProcessed} lists, ${stats.booksFound} books, ${stats.lookupsPerformed} lookups, ${stats.requestsCreated} requests created, ${stats.errors} errors`,
);
return stats;
}
async function processShelf(
shelf: {
id: string;
listId: string;
apiToken: string;
name: string;
user: { id: string; plexUsername: string };
},
stats: HardcoverSyncStats,
log:
| ReturnType<typeof RMABLogger.forJob>
| ReturnType<typeof RMABLogger.create>,
maxLookups: number,
) {
log.info(
`Fetching Hardcover List "${shelf.name}" (user: ${shelf.user.plexUsername})`,
);
let fetchedData: { listName: string; books: HardcoverApiBook[] };
try {
fetchedData = await fetchHardcoverList(shelf.apiToken, shelf.listId);
} catch (error) {
log.error(
`Failed to fetch Hardcover list "${shelf.name}": ${error instanceof Error ? error.message : 'Unknown error'}`,
);
return;
}
const books = fetchedData.books;
stats.booksFound += books.length;
log.info(
`Found ${books.length} books in list "${shelf.name}" (Hardcover API)`,
);
let lookupsThisCycle = 0;
const unlimitedLookups = maxLookups === 0;
for (const book of books) {
let mapping = await prisma.hardcoverBookMapping.findUnique({
where: { hardcoverBookId: book.bookId },
});
if (!mapping) {
if (!unlimitedLookups && lookupsThisCycle >= maxLookups) continue;
mapping = await performAudibleLookup(book, log);
lookupsThisCycle++;
stats.lookupsPerformed++;
if (!mapping?.audibleAsin) continue;
}
if (mapping.noMatch) {
if (mapping.lastSearchAt) {
const daysSinceSearch =
(Date.now() - mapping.lastSearchAt.getTime()) / (1000 * 60 * 60 * 24);
if (
daysSinceSearch >= NO_MATCH_RETRY_DAYS &&
(unlimitedLookups || lookupsThisCycle < maxLookups)
) {
log.info(
`Retrying Audible lookup for "${book.title}" (${NO_MATCH_RETRY_DAYS}+ days since last search)`,
);
mapping = await performAudibleLookup(book, log, mapping.id);
lookupsThisCycle++;
stats.lookupsPerformed++;
if (!mapping?.audibleAsin) continue;
} else {
continue;
}
} else {
continue;
}
}
if (mapping.audibleAsin) {
try {
const result = await createRequestForUser(shelf.user.id, {
asin: mapping.audibleAsin,
title: mapping.title,
author: mapping.author,
coverArtUrl: mapping.coverUrl || undefined,
});
if (result.success) {
stats.requestsCreated++;
log.info(
`Created request for "${mapping.title}" by ${mapping.author} (ASIN: ${mapping.audibleAsin})`,
);
}
} catch (error) {
log.error(
`Failed to create request for "${mapping.title}": ${error instanceof Error ? error.message : 'Unknown error'}`,
);
}
}
}
// Collect enriched book data for display
const bookIds = books.map((b) => b.bookId);
const mappings =
bookIds.length > 0
? await prisma.hardcoverBookMapping.findMany({
where: { hardcoverBookId: { in: bookIds } },
select: {
hardcoverBookId: true,
audibleAsin: true,
title: true,
author: true,
coverUrl: true,
},
})
: [];
const mappingsByBookId = new Map(mappings.map((m) => [m.hardcoverBookId, m]));
const matchedAsins = mappings
.map((m) => m.audibleAsin)
.filter((asin): asin is string => !!asin);
const cachedCovers =
matchedAsins.length > 0
? await prisma.audibleCache.findMany({
where: { asin: { in: matchedAsins } },
select: { asin: true, coverArtUrl: true, cachedCoverPath: true },
})
: [];
const coverByAsin = new Map(
cachedCovers
.filter((c) => c.cachedCoverPath || c.coverArtUrl)
.map((c) => {
let coverUrl = c.coverArtUrl || '';
if (c.cachedCoverPath) {
const filename = c.cachedCoverPath.split('/').pop();
coverUrl = `/api/cache/thumbnails/${filename}`;
}
return [c.asin, coverUrl] as const;
}),
);
const bookData = books
.map((b) => {
const mapping = mappingsByBookId.get(b.bookId);
const coverUrl =
coverByAsin.get(mapping?.audibleAsin || '') ||
mapping?.coverUrl ||
b.coverUrl;
if (!coverUrl) return null;
return {
coverUrl,
asin: mapping?.audibleAsin || null,
title: mapping?.title || b.title,
author: mapping?.author || b.author,
};
})
.filter((b): b is NonNullable<typeof b> => b !== null)
.slice(0, 8);
const finalListName =
fetchedData.listName !== 'Hardcover List'
? fetchedData.listName
: shelf.name;
await prisma.hardcoverShelf.update({
where: { id: shelf.id },
data: {
name: finalListName,
lastSyncAt: new Date(),
bookCount: books.length,
coverUrls: bookData.length > 0 ? JSON.stringify(bookData) : null,
},
});
}
async function performAudibleLookup(
book: HardcoverApiBook,
log:
| ReturnType<typeof RMABLogger.forJob>
| ReturnType<typeof RMABLogger.create>,
existingMappingId?: string,
): Promise<any> {
const audibleService = getAudibleService();
try {
const fullQuery = `${book.title} ${book.author}`;
log.info(`Searching Audible for: "${fullQuery}"`);
let searchResult = await audibleService.search(fullQuery);
let firstResult = searchResult.results[0];
if (!firstResult?.asin) {
const cleanTitle = book.title.replace(/\s*\(.*\)\s*$/, '').trim();
if (cleanTitle !== book.title) {
const cleanQuery = `${cleanTitle} ${book.author}`;
log.info(
`No results with full title, retrying without series info: "${cleanQuery}"`,
);
searchResult = await audibleService.search(cleanQuery);
firstResult = searchResult.results[0];
}
}
if (firstResult?.asin) {
log.info(
`Audible match: "${book.title}" → ASIN ${firstResult.asin} ("${firstResult.title}" by ${firstResult.author})`,
);
const data = {
title: firstResult.title,
author: firstResult.author,
audibleAsin: firstResult.asin,
coverUrl: firstResult.coverArtUrl || book.coverUrl || null,
noMatch: false,
lastSearchAt: new Date(),
};
if (existingMappingId) {
return prisma.hardcoverBookMapping.update({
where: { id: existingMappingId },
data,
});
}
return prisma.hardcoverBookMapping.create({
data: { hardcoverBookId: book.bookId, ...data },
});
}
log.info(`No Audible match for "${book.title}" by ${book.author}`);
const noMatchData = {
title: book.title,
author: book.author,
coverUrl: book.coverUrl || null,
noMatch: true,
lastSearchAt: new Date(),
audibleAsin: null,
};
if (existingMappingId) {
return prisma.hardcoverBookMapping.update({
where: { id: existingMappingId },
data: noMatchData,
});
}
return prisma.hardcoverBookMapping.create({
data: { hardcoverBookId: book.bookId, ...noMatchData },
});
} catch (error) {
log.error(
`Audible lookup failed for "${book.title}": ${error instanceof Error ? error.message : 'Unknown error'}`,
);
const errorData = {
title: book.title,
author: book.author,
coverUrl: book.coverUrl || null,
noMatch: true,
lastSearchAt: new Date(),
};
if (existingMappingId) {
return prisma.hardcoverBookMapping.update({
where: { id: existingMappingId },
data: errorData,
});
}
return prisma.hardcoverBookMapping.create({
data: { hardcoverBookId: book.bookId, ...errorData },
});
}
}
+306 -123
View File
@@ -27,6 +27,7 @@ export type JobType =
| 'cleanup_seeded_torrents'
| 'monitor_rss_feeds'
| 'sync_goodreads_shelves'
| 'sync_hardcover_shelves'
| 'send_notification'
// Ebook-specific job types
| 'search_ebook'
@@ -63,8 +64,8 @@ export interface MonitorDownloadPayload extends JobPayload {
downloadHistoryId: string;
downloadClientId: string;
downloadClient: DownloadClientType;
lastProgress?: number; // Previous poll's progress (0-100) for stall detection
stallCount?: number; // Consecutive polls with no progress change (drives backoff)
lastProgress?: number; // Previous poll's progress (0-100) for stall detection
stallCount?: number; // Consecutive polls with no progress change (drives backoff)
pathWaitCount?: number; // Consecutive polls waiting for content_path to relocate to save_path
}
@@ -111,6 +112,12 @@ export interface SyncGoodreadsShelvesPayload extends JobPayload {
maxLookupsPerShelf?: number;
}
export interface SyncHardcoverShelvesPayload extends JobPayload {
scheduledJobId?: string;
shelfId?: string;
maxLookupsPerShelf?: number;
}
// Ebook-specific payload interfaces
export interface SearchEbookPayload extends JobPayload {
requestId: string;
@@ -226,13 +233,15 @@ export class JobQueueService {
'failed',
null,
error.message,
error.stack
error.stack,
);
// Handle permanent failures for specific job types after all retries exhausted
if (job.name === 'monitor_download' && job.data) {
const payload = job.data as MonitorDownloadPayload;
logger.error(`MonitorDownload job permanently failed for request ${payload.requestId} after ${job.attemptsMade} attempts`);
logger.error(
`MonitorDownload job permanently failed for request ${payload.requestId} after ${job.attemptsMade} attempts`,
);
// Update request status to failed (only happens after all retries exhausted)
try {
@@ -240,7 +249,9 @@ export class JobQueueService {
where: { id: payload.requestId },
data: {
status: 'failed',
errorMessage: error.message || 'Failed to monitor download after multiple retries',
errorMessage:
error.message ||
'Failed to monitor download after multiple retries',
updatedAt: new Date(),
},
});
@@ -256,7 +267,12 @@ export class JobQueueService {
});
}
} catch (updateError) {
logger.error('Failed to update request/download status', { error: updateError instanceof Error ? updateError.message : String(updateError) });
logger.error('Failed to update request/download status', {
error:
updateError instanceof Error
? updateError.message
: String(updateError),
});
}
}
});
@@ -280,106 +296,225 @@ export class JobQueueService {
*/
private startProcessors(): void {
// Search indexers processor
this.queue.process('search_indexers', 2, async (job: BullJob<SearchIndexersPayload>) => {
const { processSearchIndexers } = await import('../processors/search-indexers.processor');
return await processSearchIndexers(job.data);
});
this.queue.process(
'search_indexers',
2,
async (job: BullJob<SearchIndexersPayload>) => {
const { processSearchIndexers } =
await import('../processors/search-indexers.processor');
return await processSearchIndexers(job.data);
},
);
// Download torrent processor
this.queue.process('download_torrent', 2, async (job: BullJob<DownloadTorrentPayload>) => {
const { processDownloadTorrent } = await import('../processors/download-torrent.processor');
return await processDownloadTorrent(job.data);
});
this.queue.process(
'download_torrent',
2,
async (job: BullJob<DownloadTorrentPayload>) => {
const { processDownloadTorrent } =
await import('../processors/download-torrent.processor');
return await processDownloadTorrent(job.data);
},
);
// Monitor download processor
this.queue.process('monitor_download', 2, async (job: BullJob<MonitorDownloadPayload>) => {
const { processMonitorDownload } = await import('../processors/monitor-download.processor');
return await processMonitorDownload(job.data);
});
this.queue.process(
'monitor_download',
2,
async (job: BullJob<MonitorDownloadPayload>) => {
const { processMonitorDownload } =
await import('../processors/monitor-download.processor');
return await processMonitorDownload(job.data);
},
);
// Organize files processor
this.queue.process('organize_files', 2, async (job: BullJob<OrganizeFilesPayload>) => {
const { processOrganizeFiles } = await import('../processors/organize-files.processor');
return await processOrganizeFiles(job.data);
});
this.queue.process(
'organize_files',
2,
async (job: BullJob<OrganizeFilesPayload>) => {
const { processOrganizeFiles } =
await import('../processors/organize-files.processor');
return await processOrganizeFiles(job.data);
},
);
// Scan Plex processor
this.queue.process('scan_plex', 1, async (job: BullJob<ScanPlexPayload>) => {
const { processScanPlex } = await import('../processors/scan-plex.processor');
return await processScanPlex(job.data);
});
this.queue.process(
'scan_plex',
1,
async (job: BullJob<ScanPlexPayload>) => {
const { processScanPlex } =
await import('../processors/scan-plex.processor');
return await processScanPlex(job.data);
},
);
// Scheduled job processors
this.queue.process('plex_library_scan', 1, async (job: BullJob) => {
// plex_library_scan is just an alias for scan_plex
const { processScanPlex } = await import('../processors/scan-plex.processor');
const payloadWithJobId = await this.ensureJobRecord(job, 'plex_library_scan');
const { processScanPlex } =
await import('../processors/scan-plex.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'plex_library_scan',
);
return await processScanPlex(payloadWithJobId);
});
this.queue.process('plex_recently_added_check', 1, async (job: BullJob<PlexRecentlyAddedPayload>) => {
const { processPlexRecentlyAddedCheck } = await import('../processors/plex-recently-added.processor');
const payloadWithJobId = await this.ensureJobRecord(job, 'plex_recently_added_check');
return await processPlexRecentlyAddedCheck(payloadWithJobId);
});
this.queue.process(
'plex_recently_added_check',
1,
async (job: BullJob<PlexRecentlyAddedPayload>) => {
const { processPlexRecentlyAddedCheck } =
await import('../processors/plex-recently-added.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'plex_recently_added_check',
);
return await processPlexRecentlyAddedCheck(payloadWithJobId);
},
);
this.queue.process('monitor_rss_feeds', 1, async (job: BullJob<MonitorRssFeedsPayload>) => {
const { processMonitorRssFeeds } = await import('../processors/monitor-rss-feeds.processor');
const payloadWithJobId = await this.ensureJobRecord(job, 'monitor_rss_feeds');
return await processMonitorRssFeeds(payloadWithJobId);
});
this.queue.process(
'monitor_rss_feeds',
1,
async (job: BullJob<MonitorRssFeedsPayload>) => {
const { processMonitorRssFeeds } =
await import('../processors/monitor-rss-feeds.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'monitor_rss_feeds',
);
return await processMonitorRssFeeds(payloadWithJobId);
},
);
this.queue.process('audible_refresh', 1, async (job: BullJob<AudibleRefreshPayload>) => {
const { processAudibleRefresh } = await import('../processors/audible-refresh.processor');
const payloadWithJobId = await this.ensureJobRecord(job, 'audible_refresh');
return await processAudibleRefresh(payloadWithJobId);
});
this.queue.process(
'audible_refresh',
1,
async (job: BullJob<AudibleRefreshPayload>) => {
const { processAudibleRefresh } =
await import('../processors/audible-refresh.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'audible_refresh',
);
return await processAudibleRefresh(payloadWithJobId);
},
);
this.queue.process('retry_missing_torrents', 1, async (job: BullJob<RetryMissingTorrentsPayload>) => {
const { processRetryMissingTorrents } = await import('../processors/retry-missing-torrents.processor');
const payloadWithJobId = await this.ensureJobRecord(job, 'retry_missing_torrents');
return await processRetryMissingTorrents(payloadWithJobId);
});
this.queue.process(
'retry_missing_torrents',
1,
async (job: BullJob<RetryMissingTorrentsPayload>) => {
const { processRetryMissingTorrents } =
await import('../processors/retry-missing-torrents.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'retry_missing_torrents',
);
return await processRetryMissingTorrents(payloadWithJobId);
},
);
this.queue.process('retry_failed_imports', 1, async (job: BullJob<RetryFailedImportsPayload>) => {
const { processRetryFailedImports } = await import('../processors/retry-failed-imports.processor');
const payloadWithJobId = await this.ensureJobRecord(job, 'retry_failed_imports');
return await processRetryFailedImports(payloadWithJobId);
});
this.queue.process(
'retry_failed_imports',
1,
async (job: BullJob<RetryFailedImportsPayload>) => {
const { processRetryFailedImports } =
await import('../processors/retry-failed-imports.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'retry_failed_imports',
);
return await processRetryFailedImports(payloadWithJobId);
},
);
this.queue.process('cleanup_seeded_torrents', 1, async (job: BullJob<CleanupSeededTorrentsPayload>) => {
const { processCleanupSeededTorrents } = await import('../processors/cleanup-seeded-torrents.processor');
const payloadWithJobId = await this.ensureJobRecord(job, 'cleanup_seeded_torrents');
return await processCleanupSeededTorrents(payloadWithJobId);
});
this.queue.process(
'cleanup_seeded_torrents',
1,
async (job: BullJob<CleanupSeededTorrentsPayload>) => {
const { processCleanupSeededTorrents } =
await import('../processors/cleanup-seeded-torrents.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'cleanup_seeded_torrents',
);
return await processCleanupSeededTorrents(payloadWithJobId);
},
);
this.queue.process('sync_goodreads_shelves', 1, async (job: BullJob<SyncGoodreadsShelvesPayload>) => {
const { processSyncGoodreadsShelves } = await import('../processors/sync-goodreads-shelves.processor');
const payloadWithJobId = await this.ensureJobRecord(job, 'sync_goodreads_shelves');
return await processSyncGoodreadsShelves(payloadWithJobId);
});
this.queue.process(
'sync_goodreads_shelves',
1,
async (job: BullJob<SyncGoodreadsShelvesPayload>) => {
const { processSyncGoodreadsShelves } =
await import('../processors/sync-goodreads-shelves.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'sync_goodreads_shelves',
);
return await processSyncGoodreadsShelves(payloadWithJobId);
},
);
this.queue.process(
'sync_hardcover_shelves',
1,
async (job: BullJob<SyncHardcoverShelvesPayload>) => {
const { processSyncHardcoverShelves } =
await import('../processors/sync-hardcover-shelves.processor');
const payloadWithJobId = await this.ensureJobRecord(
job,
'sync_hardcover_shelves',
);
return await processSyncHardcoverShelves(payloadWithJobId);
},
);
// Send notification processor
this.queue.process('send_notification', 2, async (job: BullJob<SendNotificationPayload>) => {
const { processSendNotification } = await import('../processors/send-notification.processor');
return await processSendNotification(job.data);
});
this.queue.process(
'send_notification',
2,
async (job: BullJob<SendNotificationPayload>) => {
const { processSendNotification } =
await import('../processors/send-notification.processor');
return await processSendNotification(job.data);
},
);
// Ebook-specific processors
this.queue.process('search_ebook', 2, async (job: BullJob<SearchEbookPayload>) => {
const { processSearchEbook } = await import('../processors/search-ebook.processor');
return await processSearchEbook(job.data);
});
this.queue.process(
'search_ebook',
2,
async (job: BullJob<SearchEbookPayload>) => {
const { processSearchEbook } =
await import('../processors/search-ebook.processor');
return await processSearchEbook(job.data);
},
);
this.queue.process('start_direct_download', 2, async (job: BullJob<StartDirectDownloadPayload>) => {
const { processStartDirectDownload } = await import('../processors/direct-download.processor');
return await processStartDirectDownload(job.data);
});
this.queue.process(
'start_direct_download',
2,
async (job: BullJob<StartDirectDownloadPayload>) => {
const { processStartDirectDownload } =
await import('../processors/direct-download.processor');
return await processStartDirectDownload(job.data);
},
);
this.queue.process('monitor_direct_download', 2, async (job: BullJob<MonitorDirectDownloadPayload>) => {
const { processMonitorDirectDownload } = await import('../processors/direct-download.processor');
return await processMonitorDirectDownload(job.data);
});
this.queue.process(
'monitor_direct_download',
2,
async (job: BullJob<MonitorDirectDownloadPayload>) => {
const { processMonitorDirectDownload } =
await import('../processors/direct-download.processor');
return await processMonitorDirectDownload(job.data);
},
);
}
/**
@@ -404,12 +539,17 @@ export class JobQueueService {
if (existingJob) {
// Update lastRun for the scheduled job if this is a timer-triggered job
if (payload.scheduledJobId) {
await prisma.scheduledJob.update({
where: { id: payload.scheduledJobId },
data: { lastRun: new Date() },
}).catch(err => {
logger.error(`Failed to update lastRun for scheduled job ${payload.scheduledJobId}`, { error: err instanceof Error ? err.message : String(err) });
});
await prisma.scheduledJob
.update({
where: { id: payload.scheduledJobId },
data: { lastRun: new Date() },
})
.catch((err) => {
logger.error(
`Failed to update lastRun for scheduled job ${payload.scheduledJobId}`,
{ error: err instanceof Error ? err.message : String(err) },
);
});
}
return { ...payload, jobId: existingJob.id };
}
@@ -429,12 +569,17 @@ export class JobQueueService {
// Update lastRun for the scheduled job if this is a timer-triggered job
if (payload.scheduledJobId) {
await prisma.scheduledJob.update({
where: { id: payload.scheduledJobId },
data: { lastRun: new Date() },
}).catch(err => {
logger.error(`Failed to update lastRun for scheduled job ${payload.scheduledJobId}`, { error: err instanceof Error ? err.message : String(err) });
});
await prisma.scheduledJob
.update({
where: { id: payload.scheduledJobId },
data: { lastRun: new Date() },
})
.catch((err) => {
logger.error(
`Failed to update lastRun for scheduled job ${payload.scheduledJobId}`,
{ error: err instanceof Error ? err.message : String(err) },
);
});
}
return { ...payload, jobId: dbJob.id };
@@ -448,7 +593,7 @@ export class JobQueueService {
status: string,
result?: any,
errorMessage?: string,
stackTrace?: string
stackTrace?: string,
): Promise<void> {
try {
const updateData: any = {
@@ -481,7 +626,9 @@ export class JobQueueService {
data: updateData,
});
} catch (error) {
logger.error('Failed to update job in database', { error: error instanceof Error ? error.message : String(error) });
logger.error('Failed to update job in database', {
error: error instanceof Error ? error.message : String(error),
});
}
}
@@ -491,7 +638,7 @@ export class JobQueueService {
private async addJob(
type: JobType,
payload: JobPayload,
options?: JobOptions
options?: JobOptions,
): Promise<string> {
// First create the database job record
const dbJob = await prisma.job.create({
@@ -524,7 +671,10 @@ export class JobQueueService {
/**
* Add search indexers job
*/
async addSearchJob(requestId: string, audiobook: { id: string; title: string; author: string; asin?: string }): Promise<string> {
async addSearchJob(
requestId: string,
audiobook: { id: string; title: string; author: string; asin?: string },
): Promise<string> {
return await this.addJob(
'search_indexers',
{
@@ -533,7 +683,7 @@ export class JobQueueService {
} as SearchIndexersPayload,
{
priority: 10, // High priority for user-initiated requests
}
},
);
}
@@ -543,7 +693,7 @@ export class JobQueueService {
async addDownloadJob(
requestId: string,
audiobook: { id: string; title: string; author: string },
torrent: TorrentResult
torrent: TorrentResult,
): Promise<string> {
return await this.addJob(
'download_torrent',
@@ -554,7 +704,7 @@ export class JobQueueService {
} as DownloadTorrentPayload,
{
priority: 9, // High priority - download selected torrent
}
},
);
}
@@ -569,7 +719,7 @@ export class JobQueueService {
delaySeconds: number = 0,
lastProgress?: number,
stallCount?: number,
pathWaitCount?: number
pathWaitCount?: number,
): Promise<string> {
return await this.addJob(
'monitor_download',
@@ -585,7 +735,7 @@ export class JobQueueService {
{
priority: 5, // Medium priority
delay: delaySeconds * 1000, // Convert seconds to milliseconds
}
},
);
}
@@ -597,7 +747,7 @@ export class JobQueueService {
requestId: string,
audiobookId: string,
downloadPath: string,
targetPath?: string
targetPath?: string,
): Promise<string> {
return await this.addJob(
'organize_files',
@@ -609,14 +759,18 @@ export class JobQueueService {
} as OrganizeFilesPayload,
{
priority: 8,
}
},
);
}
/**
* Add Plex scan job
*/
async addPlexScanJob(libraryId: string, partial?: boolean, path?: string): Promise<string> {
async addPlexScanJob(
libraryId: string,
partial?: boolean,
path?: string,
): Promise<string> {
return await this.addJob(
'scan_plex',
{
@@ -626,7 +780,7 @@ export class JobQueueService {
} as ScanPlexPayload,
{
priority: 7,
}
},
);
}
@@ -641,7 +795,7 @@ export class JobQueueService {
} as PlexRecentlyAddedPayload,
{
priority: 8,
}
},
);
}
@@ -656,7 +810,7 @@ export class JobQueueService {
} as MonitorRssFeedsPayload,
{
priority: 8,
}
},
);
}
@@ -671,7 +825,7 @@ export class JobQueueService {
} as AudibleRefreshPayload,
{
priority: 9,
}
},
);
}
@@ -686,7 +840,7 @@ export class JobQueueService {
} as RetryMissingTorrentsPayload,
{
priority: 7,
}
},
);
}
@@ -701,7 +855,7 @@ export class JobQueueService {
} as RetryFailedImportsPayload,
{
priority: 7,
}
},
);
}
@@ -716,14 +870,18 @@ export class JobQueueService {
} as CleanupSeededTorrentsPayload,
{
priority: 10,
}
},
);
}
/**
* Add sync Goodreads shelves job
*/
async addSyncGoodreadsShelvesJob(scheduledJobId?: string, shelfId?: string, maxLookupsPerShelf?: number): Promise<string> {
async addSyncGoodreadsShelvesJob(
scheduledJobId?: string,
shelfId?: string,
maxLookupsPerShelf?: number,
): Promise<string> {
return await this.addJob(
'sync_goodreads_shelves',
{
@@ -733,7 +891,28 @@ export class JobQueueService {
} as SyncGoodreadsShelvesPayload,
{
priority: 7,
}
},
);
}
/**
* Add sync Hardcover shelves job
*/
async addSyncHardcoverShelvesJob(
scheduledJobId?: string,
shelfId?: string,
maxLookupsPerShelf?: number,
): Promise<string> {
return await this.addJob(
'sync_hardcover_shelves',
{
scheduledJobId,
shelfId,
maxLookupsPerShelf,
} as SyncHardcoverShelvesPayload,
{
priority: 7,
},
);
}
@@ -747,7 +926,7 @@ export class JobQueueService {
async addSearchEbookJob(
requestId: string,
audiobook: { id: string; title: string; author: string; asin?: string },
preferredFormat?: string
preferredFormat?: string,
): Promise<string> {
return await this.addJob(
'search_ebook',
@@ -758,7 +937,7 @@ export class JobQueueService {
} as SearchEbookPayload,
{
priority: 10, // High priority for user-initiated requests
}
},
);
}
@@ -770,7 +949,7 @@ export class JobQueueService {
downloadHistoryId: string,
downloadUrl: string,
targetFilename: string,
expectedSize?: number
expectedSize?: number,
): Promise<string> {
return await this.addJob(
'start_direct_download',
@@ -783,7 +962,7 @@ export class JobQueueService {
} as StartDirectDownloadPayload,
{
priority: 9, // High priority - download selected ebook
}
},
);
}
@@ -796,7 +975,7 @@ export class JobQueueService {
downloadId: string,
targetPath: string,
expectedSize?: number,
delaySeconds: number = 0
delaySeconds: number = 0,
): Promise<string> {
return await this.addJob(
'monitor_direct_download',
@@ -810,7 +989,7 @@ export class JobQueueService {
{
priority: 5, // Medium priority
delay: delaySeconds * 1000,
}
},
);
}
@@ -959,9 +1138,13 @@ export class JobQueueService {
author: string,
userName: string,
message?: string,
requestType?: string
requestType?: string,
): Promise<string> {
logger.info(`Queueing notification: ${event}`, { requestId, title, userName });
logger.info(`Queueing notification: ${event}`, {
requestId,
title,
userName,
});
return await this.addJob(
'send_notification',
{
@@ -981,7 +1164,7 @@ export class JobQueueService {
} as SendNotificationPayload,
{
priority: 5, // Medium priority
}
},
);
}
@@ -992,7 +1175,7 @@ export class JobQueueService {
jobType: string,
payload: JobPayload,
cronExpression: string,
jobId: string
jobId: string,
): Promise<void> {
await this.queue.add(jobType, payload, {
repeat: {
@@ -1009,7 +1192,7 @@ export class JobQueueService {
async removeRepeatableJob(
jobType: string,
cronExpression: string,
jobId: string
jobId: string,
): Promise<void> {
await this.queue.removeRepeatable(jobType, {
cron: cronExpression,
+65 -17
View File
@@ -10,7 +10,16 @@ import { RMABLogger } from '../utils/logger';
const logger = RMABLogger.create('Scheduler');
export type ScheduledJobType = 'plex_library_scan' | 'plex_recently_added_check' | 'audible_refresh' | 'retry_missing_torrents' | 'retry_failed_imports' | 'cleanup_seeded_torrents' | 'monitor_rss_feeds' | 'sync_goodreads_shelves';
export type ScheduledJobType =
| 'plex_library_scan'
| 'plex_recently_added_check'
| 'audible_refresh'
| 'retry_missing_torrents'
| 'retry_failed_imports'
| 'cleanup_seeded_torrents'
| 'monitor_rss_feeds'
| 'sync_goodreads_shelves'
| 'sync_hardcover_shelves';
export interface ScheduledJob {
id: string;
@@ -133,6 +142,13 @@ export class SchedulerService {
enabled: true, // Enable by default
payload: {},
},
{
name: 'Sync Hardcover Lists',
type: 'sync_hardcover_shelves' as ScheduledJobType,
schedule: '0 */6 * * *', // Every 6 hours
enabled: true, // Enable by default
payload: {},
},
];
let created = 0;
@@ -149,7 +165,9 @@ export class SchedulerService {
data: defaultJob,
});
created++;
logger.info(`Created default job: ${defaultJob.name} (enabled: ${defaultJob.enabled})`);
logger.info(
`Created default job: ${defaultJob.name} (enabled: ${defaultJob.enabled})`,
);
}
} catch (error) {
failed++;
@@ -161,7 +179,9 @@ export class SchedulerService {
}
if (failed > 0) {
logger.warn(`Default jobs: ${created} created, ${failed} failed — failed jobs will be retried on next restart`);
logger.warn(
`Default jobs: ${created} created, ${failed} failed — failed jobs will be retried on next restart`,
);
} else if (created > 0) {
logger.info(`Default jobs: ${created} created`);
}
@@ -191,11 +211,13 @@ export class SchedulerService {
job.type,
{ scheduledJobId: job.id },
job.schedule,
`scheduled-${job.id}`
`scheduled-${job.id}`,
);
logger.info(`Job scheduled: ${job.name} (${job.schedule})`);
} catch (error) {
logger.error(`Failed to schedule job ${job.name}`, { error: error instanceof Error ? error.message : String(error) });
logger.error(`Failed to schedule job ${job.name}`, {
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
@@ -208,11 +230,13 @@ export class SchedulerService {
await this.jobQueue.removeRepeatableJob(
job.type,
job.schedule,
`scheduled-${job.id}`
`scheduled-${job.id}`,
);
logger.info(`Job unscheduled: ${job.name}`);
} catch (error) {
logger.error(`Failed to unschedule job ${job.name}`, { error: error instanceof Error ? error.message : String(error) });
logger.error(`Failed to unschedule job ${job.name}`, {
error: error instanceof Error ? error.message : String(error),
});
// Don't throw - job might not exist in Bull yet
}
}
@@ -264,7 +288,7 @@ export class SchedulerService {
*/
async updateScheduledJob(
id: string,
dto: UpdateScheduledJobDto
dto: UpdateScheduledJobDto,
): Promise<ScheduledJob> {
if (dto.schedule) {
this.validateCronExpression(dto.schedule);
@@ -353,6 +377,9 @@ export class SchedulerService {
case 'sync_goodreads_shelves':
bullJobId = await this.triggerSyncGoodreadsShelves(job);
break;
case 'sync_hardcover_shelves':
bullJobId = await this.triggerSyncHardcoverShelves(job);
break;
default:
throw new Error(`Unknown job type: ${job.type}`);
}
@@ -408,7 +435,8 @@ export class SchedulerService {
throw new Error(errorMsg);
}
libraryId = job.payload?.libraryId || absConfig['audiobookshelf.library_id'];
libraryId =
job.payload?.libraryId || absConfig['audiobookshelf.library_id'];
} else {
const plexConfig = await configService.getMany([
'plex_url',
@@ -432,15 +460,18 @@ export class SchedulerService {
throw new Error(errorMsg);
}
libraryId = job.payload?.libraryId || plexConfig.plex_audiobook_library_id;
libraryId =
job.payload?.libraryId || plexConfig.plex_audiobook_library_id;
}
logger.info(`Triggering ${backendMode} library scan for library: ${libraryId}`);
logger.info(
`Triggering ${backendMode} library scan for library: ${libraryId}`,
);
return await this.jobQueue.addPlexScanJob(
libraryId || '',
job.payload?.partial,
job.payload?.path
job.payload?.path,
);
}
@@ -461,7 +492,6 @@ export class SchedulerService {
return await this.jobQueue.addAudibleRefreshJob(job.id);
}
/**
* Enable a scheduled job
*/
@@ -493,10 +523,12 @@ export class SchedulerService {
await this.triggerJobNow(job.id);
// Stagger triggers to avoid connection pool burst on startup
await new Promise(resolve => setTimeout(resolve, 500));
await new Promise((resolve) => setTimeout(resolve, 500));
}
} catch (error) {
logger.error(`Failed to trigger overdue job "${job.name}"`, { error: error instanceof Error ? error.message : String(error) });
logger.error(`Failed to trigger overdue job "${job.name}"`, {
error: error instanceof Error ? error.message : String(error),
});
}
}
}
@@ -569,13 +601,22 @@ export class SchedulerService {
if (dayOfMonth === '*' && month === '*' && dayOfWeek === '*') {
const hourNum = parseInt(hour, 10);
const minuteNum = parseInt(minute, 10);
if (!isNaN(hourNum) && !isNaN(minuteNum) && hourNum >= 0 && hourNum <= 23 && minuteNum >= 0 && minuteNum <= 59) {
if (
!isNaN(hourNum) &&
!isNaN(minuteNum) &&
hourNum >= 0 &&
hourNum <= 23 &&
minuteNum >= 0 &&
minuteNum <= 59
) {
return 24 * 60 * 60 * 1000; // 24 hours
}
}
// For other patterns, return a conservative default (24 hours)
logger.warn(`Unknown cron pattern "${cronExpression}", defaulting to 24 hours`);
logger.warn(
`Unknown cron pattern "${cronExpression}", defaulting to 24 hours`,
);
return 24 * 60 * 60 * 1000;
}
@@ -627,6 +668,13 @@ export class SchedulerService {
private async triggerSyncGoodreadsShelves(job: any): Promise<string> {
return await this.jobQueue.addSyncGoodreadsShelvesJob(job.id);
}
/**
* Trigger Hardcover lists sync
*/
private async triggerSyncHardcoverShelves(job: any): Promise<string> {
return await this.jobQueue.addSyncHardcoverShelvesJob(job.id);
}
}
// Singleton instance