mirror of
https://github.com/kikootwo/ReadMeABook.git
synced 2026-06-03 04:40:09 +00:00
Merge branch 'main' into feature/hardover-shelves
This commit is contained in:
@@ -27,6 +27,7 @@ export type JobType =
|
||||
| 'cleanup_seeded_torrents'
|
||||
| 'monitor_rss_feeds'
|
||||
| 'sync_reading_shelves'
|
||||
| 'check_watched_lists'
|
||||
| 'send_notification'
|
||||
// Ebook-specific job types
|
||||
| 'search_ebook'
|
||||
@@ -114,6 +115,16 @@ export interface SyncShelvesPayload extends JobPayload {
|
||||
maxLookupsPerShelf?: number;
|
||||
}
|
||||
|
||||
export interface CheckWatchedListsPayload extends JobPayload {
|
||||
scheduledJobId?: string;
|
||||
/** If set, only process watched items for this user */
|
||||
userId?: string;
|
||||
/** If set, only process this specific series */
|
||||
seriesAsin?: string;
|
||||
/** If set, only process this specific author */
|
||||
authorAsin?: string;
|
||||
}
|
||||
|
||||
// Ebook-specific payload interfaces
|
||||
export interface SearchEbookPayload extends JobPayload {
|
||||
requestId: string;
|
||||
@@ -385,6 +396,12 @@ export class JobQueueService {
|
||||
return await processSyncShelves(payloadWithJobId);
|
||||
});
|
||||
|
||||
this.queue.process('check_watched_lists', 1, async (job: BullJob<CheckWatchedListsPayload>) => {
|
||||
const { processCheckWatchedLists } = await import('../processors/check-watched-lists.processor');
|
||||
const payloadWithJobId = await this.ensureJobRecord(job, 'check_watched_lists');
|
||||
return await processCheckWatchedLists(payloadWithJobId);
|
||||
});
|
||||
|
||||
// Send notification processor
|
||||
this.queue.process('send_notification', 2, async (job: BullJob<SendNotificationPayload>) => {
|
||||
const { processSendNotification } = await import('../processors/send-notification.processor');
|
||||
@@ -768,6 +785,39 @@ export class JobQueueService {
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add check watched lists job (watched series + watched authors)
|
||||
*/
|
||||
async addCheckWatchedListsJob(scheduledJobId?: string): Promise<string> {
|
||||
return await this.addJob(
|
||||
'check_watched_lists',
|
||||
{
|
||||
scheduledJobId,
|
||||
} as CheckWatchedListsPayload,
|
||||
{
|
||||
priority: 7,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a targeted check for a specific watched series or author for a specific user.
|
||||
* Used for immediate processing when a user adds a new watch.
|
||||
*/
|
||||
async addCheckWatchedItemJob(userId: string, seriesAsin?: string, authorAsin?: string): Promise<string> {
|
||||
return await this.addJob(
|
||||
'check_watched_lists',
|
||||
{
|
||||
userId,
|
||||
seriesAsin,
|
||||
authorAsin,
|
||||
} as CheckWatchedListsPayload,
|
||||
{
|
||||
priority: 8, // Higher than scheduled (7) since user-initiated
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// EBOOK-SPECIFIC JOB METHODS
|
||||
// =========================================================================
|
||||
|
||||
@@ -12,6 +12,7 @@ import { getJobQueueService } from '@/lib/services/job-queue.service';
|
||||
import { findPlexMatch } from '@/lib/utils/audiobook-matcher';
|
||||
import { getAudibleService } from '@/lib/integrations/audible.service';
|
||||
import { RMABLogger } from '@/lib/utils/logger';
|
||||
import { seedAsin } from '@/lib/services/works.service';
|
||||
|
||||
const logger = RMABLogger.create('RequestCreator');
|
||||
|
||||
@@ -147,6 +148,15 @@ export async function createRequestForUser(
|
||||
}
|
||||
}
|
||||
|
||||
// Seed works table for cross-ASIN matching (Layer 2: request-time seeding)
|
||||
seedAsin(
|
||||
audiobook.asin,
|
||||
audiobookRecord.title,
|
||||
audiobookRecord.author,
|
||||
audiobookRecord.narrator || undefined,
|
||||
undefined // duration not available at request time
|
||||
).catch(() => {});
|
||||
|
||||
// Check if user already has an active request for this audiobook
|
||||
const existingRequest = await prisma.request.findFirst({
|
||||
where: {
|
||||
|
||||
@@ -10,7 +10,7 @@ import { RMABLogger } from '../utils/logger';
|
||||
|
||||
const logger = RMABLogger.create('Scheduler');
|
||||
|
||||
export type ScheduledJobType = 'plex_library_scan' | 'plex_recently_added_check' | 'audible_refresh' | 'retry_missing_torrents' | 'retry_failed_imports' | 'cleanup_seeded_torrents' | 'monitor_rss_feeds' | 'sync_reading_shelves';
|
||||
export type ScheduledJobType = 'plex_library_scan' | 'plex_recently_added_check' | 'audible_refresh' | 'retry_missing_torrents' | 'retry_failed_imports' | 'cleanup_seeded_torrents' | 'monitor_rss_feeds' | 'sync_reading_shelves' | 'check_watched_lists';
|
||||
|
||||
export interface ScheduledJob {
|
||||
id: string;
|
||||
@@ -136,6 +136,13 @@ export class SchedulerService {
|
||||
enabled: true, // Enable by default
|
||||
payload: {},
|
||||
},
|
||||
{
|
||||
name: 'Check Watched Lists',
|
||||
type: 'check_watched_lists' as ScheduledJobType,
|
||||
schedule: '0 0 * * *', // Daily at midnight (every 24 hours)
|
||||
enabled: true, // Enable by default
|
||||
payload: {},
|
||||
},
|
||||
];
|
||||
|
||||
let created = 0;
|
||||
@@ -381,6 +388,9 @@ export class SchedulerService {
|
||||
case 'sync_reading_shelves':
|
||||
bullJobId = await this.triggerSyncShelves(job);
|
||||
break;
|
||||
case 'check_watched_lists':
|
||||
bullJobId = await this.triggerCheckWatchedLists(job);
|
||||
break;
|
||||
default:
|
||||
throw new Error(`Unknown job type: ${job.type}`);
|
||||
}
|
||||
@@ -655,6 +665,13 @@ export class SchedulerService {
|
||||
private async triggerSyncShelves(job: any): Promise<string> {
|
||||
return await this.jobQueue.addSyncShelvesJob(job.id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Trigger watched lists check (watched series + watched authors)
|
||||
*/
|
||||
private async triggerCheckWatchedLists(job: any): Promise<string> {
|
||||
return await this.jobQueue.addCheckWatchedListsJob(job.id);
|
||||
}
|
||||
}
|
||||
|
||||
// Singleton instance
|
||||
|
||||
@@ -0,0 +1,414 @@
|
||||
/**
|
||||
* Component: Watched Lists Service
|
||||
* Documentation: documentation/features/watched-lists.md
|
||||
*
|
||||
* Checks watched series and watched authors for new releases.
|
||||
* Deduplicates results using the works table, checks against user's library,
|
||||
* and auto-creates requests via the shared request-creator service.
|
||||
* Follows the same pattern as goodreads-sync.service.ts.
|
||||
*/
|
||||
|
||||
import { prisma } from '@/lib/db';
|
||||
import { getAudibleService, AudibleAudiobook } from '@/lib/integrations/audible.service';
|
||||
import { scrapeSeriesPage } from '@/lib/integrations/audible-series';
|
||||
import { deduplicateAndCollectGroups } from '@/lib/utils/deduplicate-audiobooks';
|
||||
import { persistDedupGroups } from '@/lib/services/works.service';
|
||||
import { createRequestForUser } from '@/lib/services/request-creator.service';
|
||||
import { findPlexMatch } from '@/lib/utils/audiobook-matcher';
|
||||
import { getSiblingAsins } from '@/lib/services/works.service';
|
||||
import { RMABLogger } from '@/lib/utils/logger';
|
||||
|
||||
const logger = RMABLogger.create('WatchedLists');
|
||||
|
||||
/** Max books to process per series (avoid excessively long runs) */
|
||||
const MAX_BOOKS_PER_SERIES = 200;
|
||||
|
||||
/** Max author book pages to scrape */
|
||||
const MAX_AUTHOR_PAGES = 4;
|
||||
|
||||
/** Delay between scrapes to avoid rate limiting (ms) */
|
||||
const SCRAPE_DELAY_MS = 2000;
|
||||
|
||||
export interface WatchedListsSyncStats {
|
||||
seriesChecked: number;
|
||||
authorsChecked: number;
|
||||
booksFound: number;
|
||||
requestsCreated: number;
|
||||
skippedOwned: number;
|
||||
skippedExisting: number;
|
||||
errors: number;
|
||||
}
|
||||
|
||||
export interface WatchedListsSyncOptions {
|
||||
/** Process only this specific user (for targeted sync) */
|
||||
userId?: string;
|
||||
/** Process only this specific series (for immediate sync on watch) */
|
||||
seriesAsin?: string;
|
||||
/** Process only this specific author (for immediate sync on watch) */
|
||||
authorAsin?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process all watched series and authors: scrape for new releases,
|
||||
* deduplicate, check library ownership, and create requests.
|
||||
* Called from the check_watched_lists processor.
|
||||
*/
|
||||
export async function processWatchedLists(
|
||||
jobLogger?: ReturnType<typeof RMABLogger.forJob>,
|
||||
options: WatchedListsSyncOptions = {}
|
||||
): Promise<WatchedListsSyncStats> {
|
||||
const log = jobLogger || logger;
|
||||
const stats: WatchedListsSyncStats = {
|
||||
seriesChecked: 0,
|
||||
authorsChecked: 0,
|
||||
booksFound: 0,
|
||||
requestsCreated: 0,
|
||||
skippedOwned: 0,
|
||||
skippedExisting: 0,
|
||||
errors: 0,
|
||||
};
|
||||
|
||||
// ---- Watched Series ----
|
||||
await processAllWatchedSeries(log, stats, options);
|
||||
|
||||
// ---- Watched Authors ----
|
||||
await processAllWatchedAuthors(log, stats, options);
|
||||
|
||||
log.info('Watched lists sync complete', {
|
||||
seriesChecked: stats.seriesChecked,
|
||||
authorsChecked: stats.authorsChecked,
|
||||
booksFound: stats.booksFound,
|
||||
requestsCreated: stats.requestsCreated,
|
||||
skippedOwned: stats.skippedOwned,
|
||||
skippedExisting: stats.skippedExisting,
|
||||
errors: stats.errors,
|
||||
});
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Watched Series
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function processAllWatchedSeries(
|
||||
log: ReturnType<typeof RMABLogger.forJob> | ReturnType<typeof RMABLogger.create>,
|
||||
stats: WatchedListsSyncStats,
|
||||
options: WatchedListsSyncOptions
|
||||
): Promise<void> {
|
||||
const whereClause: any = {};
|
||||
if (options.userId) whereClause.userId = options.userId;
|
||||
if (options.seriesAsin) whereClause.seriesAsin = options.seriesAsin;
|
||||
const watchedSeries = await prisma.watchedSeries.findMany({
|
||||
where: whereClause,
|
||||
include: { user: { select: { id: true, plexUsername: true } } },
|
||||
});
|
||||
|
||||
if (watchedSeries.length === 0) {
|
||||
log.info('No watched series to process');
|
||||
return;
|
||||
}
|
||||
|
||||
// Group by seriesAsin to avoid re-scraping the same series for multiple users
|
||||
const seriesByAsin = new Map<string, typeof watchedSeries>();
|
||||
for (const ws of watchedSeries) {
|
||||
const list = seriesByAsin.get(ws.seriesAsin) || [];
|
||||
list.push(ws);
|
||||
seriesByAsin.set(ws.seriesAsin, list);
|
||||
}
|
||||
|
||||
log.info(`Processing ${seriesByAsin.size} unique watched series (${watchedSeries.length} total subscriptions)`);
|
||||
|
||||
for (const [seriesAsin, subscriptions] of seriesByAsin) {
|
||||
try {
|
||||
await processSeriesForUsers(seriesAsin, subscriptions, log, stats);
|
||||
} catch (error) {
|
||||
stats.errors++;
|
||||
log.error(`Failed to process watched series ${seriesAsin}`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
|
||||
// Rate limit between series
|
||||
await delay(SCRAPE_DELAY_MS);
|
||||
}
|
||||
}
|
||||
|
||||
async function processSeriesForUsers(
|
||||
seriesAsin: string,
|
||||
subscriptions: Array<{ id: string; seriesTitle: string; user: { id: string; plexUsername: string } }>,
|
||||
log: ReturnType<typeof RMABLogger.forJob> | ReturnType<typeof RMABLogger.create>,
|
||||
stats: WatchedListsSyncStats
|
||||
): Promise<void> {
|
||||
const title = subscriptions[0].seriesTitle;
|
||||
log.info(`Scraping watched series: "${title}" (${seriesAsin})`);
|
||||
|
||||
// Scrape all pages of the series (up to MAX_BOOKS_PER_SERIES)
|
||||
const allBooks: AudibleAudiobook[] = [];
|
||||
let page = 1;
|
||||
let hasMore = true;
|
||||
|
||||
while (hasMore && allBooks.length < MAX_BOOKS_PER_SERIES) {
|
||||
const result = await scrapeSeriesPage(seriesAsin, page);
|
||||
if (!result || result.books.length === 0) break;
|
||||
|
||||
allBooks.push(...result.books);
|
||||
hasMore = result.hasMore;
|
||||
page++;
|
||||
|
||||
if (hasMore) await delay(1000);
|
||||
}
|
||||
|
||||
if (allBooks.length === 0) {
|
||||
log.info(`No books found for series "${title}"`);
|
||||
stats.seriesChecked++;
|
||||
return;
|
||||
}
|
||||
|
||||
stats.booksFound += allBooks.length;
|
||||
|
||||
// Deduplicate
|
||||
const { books: dedupedBooks, groups } = deduplicateAndCollectGroups(allBooks);
|
||||
|
||||
// Persist dedup groups (fire-and-forget)
|
||||
if (groups.length > 0) {
|
||||
persistDedupGroups(groups).catch(() => {});
|
||||
}
|
||||
|
||||
// For each user watching this series, create requests for new books
|
||||
for (const subscription of subscriptions) {
|
||||
await createRequestsForUser(
|
||||
subscription.user.id,
|
||||
subscription.user.plexUsername,
|
||||
dedupedBooks,
|
||||
log,
|
||||
stats
|
||||
);
|
||||
|
||||
// Update lastCheckedAt
|
||||
await prisma.watchedSeries.update({
|
||||
where: { id: subscription.id },
|
||||
data: { lastCheckedAt: new Date() },
|
||||
}).catch(() => {});
|
||||
}
|
||||
|
||||
stats.seriesChecked++;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Watched Authors
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function processAllWatchedAuthors(
|
||||
log: ReturnType<typeof RMABLogger.forJob> | ReturnType<typeof RMABLogger.create>,
|
||||
stats: WatchedListsSyncStats,
|
||||
options: WatchedListsSyncOptions
|
||||
): Promise<void> {
|
||||
const whereClause: any = {};
|
||||
if (options.userId) whereClause.userId = options.userId;
|
||||
if (options.authorAsin) whereClause.authorAsin = options.authorAsin;
|
||||
const watchedAuthors = await prisma.watchedAuthor.findMany({
|
||||
where: whereClause,
|
||||
include: { user: { select: { id: true, plexUsername: true } } },
|
||||
});
|
||||
|
||||
if (watchedAuthors.length === 0) {
|
||||
log.info('No watched authors to process');
|
||||
return;
|
||||
}
|
||||
|
||||
// Group by authorAsin to avoid re-scraping the same author for multiple users
|
||||
const authorsByAsin = new Map<string, typeof watchedAuthors>();
|
||||
for (const wa of watchedAuthors) {
|
||||
const list = authorsByAsin.get(wa.authorAsin) || [];
|
||||
list.push(wa);
|
||||
authorsByAsin.set(wa.authorAsin, list);
|
||||
}
|
||||
|
||||
log.info(`Processing ${authorsByAsin.size} unique watched authors (${watchedAuthors.length} total subscriptions)`);
|
||||
|
||||
for (const [authorAsin, subscriptions] of authorsByAsin) {
|
||||
try {
|
||||
await processAuthorForUsers(authorAsin, subscriptions, log, stats);
|
||||
} catch (error) {
|
||||
stats.errors++;
|
||||
log.error(`Failed to process watched author ${authorAsin}`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
|
||||
// Rate limit between authors
|
||||
await delay(SCRAPE_DELAY_MS);
|
||||
}
|
||||
}
|
||||
|
||||
async function processAuthorForUsers(
|
||||
authorAsin: string,
|
||||
subscriptions: Array<{ id: string; authorName: string; user: { id: string; plexUsername: string } }>,
|
||||
log: ReturnType<typeof RMABLogger.forJob> | ReturnType<typeof RMABLogger.create>,
|
||||
stats: WatchedListsSyncStats
|
||||
): Promise<void> {
|
||||
const authorName = subscriptions[0].authorName;
|
||||
log.info(`Scraping watched author: "${authorName}" (${authorAsin})`);
|
||||
|
||||
const audibleService = getAudibleService();
|
||||
const allBooks: AudibleAudiobook[] = [];
|
||||
let page = 1;
|
||||
let hasMore = true;
|
||||
|
||||
while (hasMore && page <= MAX_AUTHOR_PAGES) {
|
||||
try {
|
||||
const result = await audibleService.searchByAuthorAsin(authorName, authorAsin, page);
|
||||
if (result.books.length === 0) break;
|
||||
|
||||
allBooks.push(...result.books);
|
||||
hasMore = result.hasMore;
|
||||
page++;
|
||||
|
||||
if (hasMore) await delay(1000);
|
||||
} catch (error) {
|
||||
log.error(`Failed to scrape author page ${page} for "${authorName}"`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (allBooks.length === 0) {
|
||||
log.info(`No books found for author "${authorName}"`);
|
||||
stats.authorsChecked++;
|
||||
return;
|
||||
}
|
||||
|
||||
stats.booksFound += allBooks.length;
|
||||
|
||||
// Deduplicate
|
||||
const { books: dedupedBooks, groups } = deduplicateAndCollectGroups(allBooks);
|
||||
|
||||
// Persist dedup groups (fire-and-forget)
|
||||
if (groups.length > 0) {
|
||||
persistDedupGroups(groups).catch(() => {});
|
||||
}
|
||||
|
||||
// For each user watching this author, create requests for new books
|
||||
for (const subscription of subscriptions) {
|
||||
await createRequestsForUser(
|
||||
subscription.user.id,
|
||||
subscription.user.plexUsername,
|
||||
dedupedBooks,
|
||||
log,
|
||||
stats
|
||||
);
|
||||
|
||||
// Update lastCheckedAt
|
||||
await prisma.watchedAuthor.update({
|
||||
where: { id: subscription.id },
|
||||
data: { lastCheckedAt: new Date() },
|
||||
}).catch(() => {});
|
||||
}
|
||||
|
||||
stats.authorsChecked++;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Shared: Create requests for a user from a list of books
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function createRequestsForUser(
|
||||
userId: string,
|
||||
username: string,
|
||||
books: AudibleAudiobook[],
|
||||
log: ReturnType<typeof RMABLogger.forJob> | ReturnType<typeof RMABLogger.create>,
|
||||
stats: WatchedListsSyncStats
|
||||
): Promise<void> {
|
||||
// Filter to books that have an ASIN
|
||||
const booksWithAsin = books.filter(b => b.asin);
|
||||
if (booksWithAsin.length === 0) return;
|
||||
|
||||
// Batch check: which ASINs are already in library (direct + sibling expansion)
|
||||
const ownedAsins = await getOwnedAsins(booksWithAsin.map(b => b.asin));
|
||||
|
||||
for (const book of booksWithAsin) {
|
||||
// Skip if user already owns this (direct or via sibling ASIN)
|
||||
if (ownedAsins.has(book.asin)) {
|
||||
stats.skippedOwned++;
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await createRequestForUser(userId, {
|
||||
asin: book.asin,
|
||||
title: book.title,
|
||||
author: book.author,
|
||||
narrator: book.narrator,
|
||||
description: book.description,
|
||||
coverArtUrl: book.coverArtUrl,
|
||||
});
|
||||
|
||||
if (result.success) {
|
||||
stats.requestsCreated++;
|
||||
log.info(`Auto-requested "${book.title}" by ${book.author} for ${username}`);
|
||||
} else {
|
||||
// already_available, being_processed, duplicate — all expected
|
||||
stats.skippedExisting++;
|
||||
}
|
||||
} catch (error) {
|
||||
log.error(`Failed to create request for "${book.title}" for ${username}`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the set of ASINs that are already in the library (direct match + sibling expansion).
|
||||
*/
|
||||
async function getOwnedAsins(asins: string[]): Promise<Set<string>> {
|
||||
const owned = new Set<string>();
|
||||
|
||||
// Direct library lookup
|
||||
const libraryItems = await prisma.plexLibrary.findMany({
|
||||
where: { asin: { in: asins } },
|
||||
select: { asin: true },
|
||||
});
|
||||
for (const item of libraryItems) {
|
||||
if (item.asin) owned.add(item.asin);
|
||||
}
|
||||
|
||||
// Sibling expansion via works table
|
||||
try {
|
||||
const siblingMap = await getSiblingAsins(asins);
|
||||
if (siblingMap.size > 0) {
|
||||
const allSiblings = new Set<string>();
|
||||
for (const siblings of siblingMap.values()) {
|
||||
for (const s of siblings) allSiblings.add(s);
|
||||
}
|
||||
|
||||
if (allSiblings.size > 0) {
|
||||
const siblingLibrary = await prisma.plexLibrary.findMany({
|
||||
where: { asin: { in: [...allSiblings] } },
|
||||
select: { asin: true },
|
||||
});
|
||||
|
||||
for (const item of siblingLibrary) {
|
||||
if (item.asin) {
|
||||
// Mark the original ASIN as owned (not the sibling)
|
||||
for (const [originalAsin, siblings] of siblingMap) {
|
||||
if (siblings.includes(item.asin)) {
|
||||
owned.add(originalAsin);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Works table expansion is best-effort
|
||||
}
|
||||
|
||||
return owned;
|
||||
}
|
||||
|
||||
function delay(ms: number): Promise<void> {
|
||||
return new Promise(resolve => setTimeout(resolve, ms));
|
||||
}
|
||||
@@ -0,0 +1,248 @@
|
||||
/**
|
||||
* Component: Works Service
|
||||
* Documentation: documentation/integrations/audible.md
|
||||
*
|
||||
* Manages the works table — persistent cross-ASIN audiobook identity mapping.
|
||||
* Layer 1: Auto-populated from dedup logic when users browse search/author/series pages.
|
||||
* Layer 2: Seeded at request time to ensure requested ASINs are tracked.
|
||||
*/
|
||||
|
||||
import { prisma } from '@/lib/db';
|
||||
import { RMABLogger } from '@/lib/utils/logger';
|
||||
import type { DedupGroup } from '@/lib/utils/deduplicate-audiobooks';
|
||||
|
||||
const logger = RMABLogger.create('WorksService');
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Layer 1: Persist dedup groups (fire-and-forget from API routes)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Persist dedup groups to the works table. For each group of 2+ ASINs that
|
||||
* were identified as the same audiobook, create or update a Work record
|
||||
* linking all ASINs together.
|
||||
*
|
||||
* Safe to call fire-and-forget — never throws.
|
||||
*/
|
||||
export async function persistDedupGroups(groups: DedupGroup[]): Promise<void> {
|
||||
try {
|
||||
for (const group of groups) {
|
||||
await persistSingleGroup(group);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to persist dedup groups', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
groupCount: groups.length,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist a single dedup group. Handles merging when ASINs span multiple
|
||||
* existing works.
|
||||
*/
|
||||
async function persistSingleGroup(group: DedupGroup): Promise<void> {
|
||||
const { canonicalAsin, allAsins, title, author, narrator, durationMinutes } = group;
|
||||
|
||||
// Find which of these ASINs already exist in work_asins
|
||||
const existingEntries = await prisma.workAsin.findMany({
|
||||
where: { asin: { in: allAsins } },
|
||||
select: { asin: true, workId: true },
|
||||
});
|
||||
|
||||
// Collect unique work IDs that already contain any of our ASINs
|
||||
const existingWorkIds = [...new Set(existingEntries.map(e => e.workId))];
|
||||
const existingAsinSet = new Set(existingEntries.map(e => e.asin));
|
||||
|
||||
if (existingWorkIds.length === 0) {
|
||||
// No existing works — create a new one with all ASINs
|
||||
const work = await prisma.work.create({
|
||||
data: { title, author },
|
||||
});
|
||||
|
||||
await Promise.all(
|
||||
allAsins.map(asin =>
|
||||
prisma.workAsin.create({
|
||||
data: {
|
||||
workId: work.id,
|
||||
asin,
|
||||
narrator: asin === canonicalAsin ? narrator : undefined,
|
||||
durationMinutes: asin === canonicalAsin ? durationMinutes : undefined,
|
||||
isCanonical: asin === canonicalAsin,
|
||||
source: 'dedup_auto',
|
||||
},
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
logger.debug('Created new work', { workId: work.id, asinCount: allAsins.length });
|
||||
} else {
|
||||
// Use the first existing work as the target
|
||||
const targetWorkId = existingWorkIds[0];
|
||||
|
||||
// If multiple existing works, merge them into the target
|
||||
if (existingWorkIds.length > 1) {
|
||||
const mergeWorkIds = existingWorkIds.slice(1);
|
||||
|
||||
// Move all ASINs from other works to the target
|
||||
await prisma.workAsin.updateMany({
|
||||
where: { workId: { in: mergeWorkIds } },
|
||||
data: { workId: targetWorkId },
|
||||
});
|
||||
|
||||
// Delete the now-empty works
|
||||
await prisma.work.deleteMany({
|
||||
where: { id: { in: mergeWorkIds } },
|
||||
});
|
||||
|
||||
logger.debug('Merged works', {
|
||||
targetWorkId,
|
||||
mergedWorkIds: mergeWorkIds,
|
||||
});
|
||||
}
|
||||
|
||||
// Add any new ASINs that don't already exist
|
||||
const newAsins = allAsins.filter(a => !existingAsinSet.has(a));
|
||||
if (newAsins.length > 0) {
|
||||
await Promise.all(
|
||||
newAsins.map(asin =>
|
||||
prisma.workAsin.create({
|
||||
data: {
|
||||
workId: targetWorkId,
|
||||
asin,
|
||||
narrator: asin === canonicalAsin ? narrator : undefined,
|
||||
durationMinutes: asin === canonicalAsin ? durationMinutes : undefined,
|
||||
isCanonical: asin === canonicalAsin,
|
||||
source: 'dedup_auto',
|
||||
},
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
logger.debug('Added ASINs to existing work', {
|
||||
workId: targetWorkId,
|
||||
newAsinCount: newAsins.length,
|
||||
});
|
||||
}
|
||||
|
||||
// Update canonical status: ensure the canonical ASIN is marked
|
||||
await prisma.workAsin.updateMany({
|
||||
where: { workId: targetWorkId, asin: canonicalAsin },
|
||||
data: { isCanonical: true },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Layer 2: Seed ASIN at request time
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Ensure an ASIN is tracked in the works table. Creates a single-ASIN work
|
||||
* if the ASIN isn't already present. Called at request creation time.
|
||||
*
|
||||
* Safe to call fire-and-forget — never throws.
|
||||
*/
|
||||
export async function seedAsin(
|
||||
asin: string,
|
||||
title: string,
|
||||
author: string,
|
||||
narrator?: string,
|
||||
durationMinutes?: number
|
||||
): Promise<void> {
|
||||
try {
|
||||
// Check if ASIN already tracked
|
||||
const existing = await prisma.workAsin.findUnique({
|
||||
where: { asin },
|
||||
});
|
||||
if (existing) return;
|
||||
|
||||
// Create a new single-ASIN work
|
||||
const work = await prisma.work.create({
|
||||
data: { title, author },
|
||||
});
|
||||
|
||||
await prisma.workAsin.create({
|
||||
data: {
|
||||
workId: work.id,
|
||||
asin,
|
||||
narrator,
|
||||
durationMinutes,
|
||||
isCanonical: true,
|
||||
source: 'dedup_auto',
|
||||
},
|
||||
});
|
||||
|
||||
logger.debug('Seeded ASIN', { workId: work.id, asin });
|
||||
} catch (error) {
|
||||
logger.error('Failed to seed ASIN', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
asin,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Sibling ASIN lookup (for library matching expansion)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Given a list of ASINs, return a map of each input ASIN to its sibling ASINs
|
||||
* (other ASINs in the same work, NOT including the input ASIN itself).
|
||||
*
|
||||
* ASINs not found in the works table are simply omitted from the result.
|
||||
*/
|
||||
export async function getSiblingAsins(
|
||||
asins: string[]
|
||||
): Promise<Map<string, string[]>> {
|
||||
const result = new Map<string, string[]>();
|
||||
if (asins.length === 0) return result;
|
||||
|
||||
// Step 1: Find which input ASINs are in work_asins and their work IDs
|
||||
const inputEntries = await prisma.workAsin.findMany({
|
||||
where: { asin: { in: asins } },
|
||||
select: { asin: true, workId: true },
|
||||
});
|
||||
|
||||
if (inputEntries.length === 0) return result;
|
||||
|
||||
// Build map of workId -> input ASINs in that work
|
||||
const workIdToInputAsins = new Map<string, string[]>();
|
||||
for (const entry of inputEntries) {
|
||||
const list = workIdToInputAsins.get(entry.workId);
|
||||
if (list) {
|
||||
list.push(entry.asin);
|
||||
} else {
|
||||
workIdToInputAsins.set(entry.workId, [entry.asin]);
|
||||
}
|
||||
}
|
||||
|
||||
// Step 2: Get ALL ASINs in those works
|
||||
const workIds = [...workIdToInputAsins.keys()];
|
||||
const allWorkAsins = await prisma.workAsin.findMany({
|
||||
where: { workId: { in: workIds } },
|
||||
select: { asin: true, workId: true },
|
||||
});
|
||||
|
||||
// Build map of workId -> all ASINs
|
||||
const workIdToAllAsins = new Map<string, string[]>();
|
||||
for (const entry of allWorkAsins) {
|
||||
const list = workIdToAllAsins.get(entry.workId);
|
||||
if (list) {
|
||||
list.push(entry.asin);
|
||||
} else {
|
||||
workIdToAllAsins.set(entry.workId, [entry.asin]);
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3: For each input ASIN, compute siblings (all ASINs in same work minus self)
|
||||
for (const entry of inputEntries) {
|
||||
const allInWork = workIdToAllAsins.get(entry.workId) || [];
|
||||
const siblings = allInWork.filter(a => a !== entry.asin);
|
||||
if (siblings.length > 0) {
|
||||
result.set(entry.asin, siblings);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
Reference in New Issue
Block a user