diff --git a/src/lib/db.ts b/src/lib/db.ts index 73d1f6b..e27df2b 100644 --- a/src/lib/db.ts +++ b/src/lib/db.ts @@ -5,6 +5,29 @@ import { PrismaClient } from '@/generated/prisma/client'; +/** + * Append connection pool parameters to DATABASE_URL if not already present. + * - connection_limit=20: up from default 9, fits 22 max workers + API routes + * - pool_timeout=30: up from default 10s, gives queued requests time + */ +function getPooledDatabaseUrl(): string { + const baseUrl = process.env.DATABASE_URL || ''; + if (!baseUrl) return baseUrl; + + const separator = baseUrl.includes('?') ? '&' : '?'; + const params: string[] = []; + + if (!baseUrl.includes('connection_limit')) { + params.push('connection_limit=20'); + } + if (!baseUrl.includes('pool_timeout')) { + params.push('pool_timeout=30'); + } + + if (params.length === 0) return baseUrl; + return `${baseUrl}${separator}${params.join('&')}`; +} + // Prevent multiple instances of Prisma Client in development const globalForPrisma = globalThis as unknown as { prisma: PrismaClient | undefined; @@ -14,6 +37,11 @@ export const prisma = globalForPrisma.prisma ?? new PrismaClient({ log: process.env.NODE_ENV === 'development' ? ['query', 'error', 'warn'] : ['error'], + datasources: { + db: { + url: getPooledDatabaseUrl(), + }, + }, }); if (process.env.NODE_ENV !== 'production') globalForPrisma.prisma = prisma; diff --git a/src/lib/processors/direct-download.processor.ts b/src/lib/processors/direct-download.processor.ts index 2dbdad4..2f4a2f7 100644 --- a/src/lib/processors/direct-download.processor.ts +++ b/src/lib/processors/direct-download.processor.ts @@ -316,6 +316,7 @@ async function downloadFileWithProgress( let bytesDownloaded = 0; let lastLogTime = Date.now(); let lastDbUpdateTime = Date.now(); + let dbUpdatePending = false; // Guard against stacking unresolved DB updates response.data.on('data', (chunk: Buffer) => { bytesDownloaded += chunk.length; @@ -332,18 +333,18 @@ async function downloadFileWithProgress( logger.info(`Download progress: ${percent}% (${(bytesDownloaded / (1024 * 1024)).toFixed(1)} MB, ${speedMBps.toFixed(2)} MB/s)`); lastLogTime = now; - // Update database with progress (non-blocking) - if (now - lastDbUpdateTime >= PROGRESS_UPDATE_INTERVAL_MS) { + // Update database with progress (non-blocking, at most 1 in-flight at a time) + if (now - lastDbUpdateTime >= PROGRESS_UPDATE_INTERVAL_MS && !dbUpdatePending) { lastDbUpdateTime = now; + dbUpdatePending = true; - // Non-blocking update - fire and forget prisma.request.update({ where: { id: tracking.requestId }, data: { progress: Math.min(percent, 99), // Cap at 99% until fully complete updatedAt: new Date(), }, - }).catch(() => {}); // Ignore errors during progress update + }).catch(() => {}).finally(() => { dbUpdatePending = false; }); } } }); diff --git a/src/lib/processors/monitor-download.processor.ts b/src/lib/processors/monitor-download.processor.ts index 39c57a4..dada097 100644 --- a/src/lib/processors/monitor-download.processor.ts +++ b/src/lib/processors/monitor-download.processor.ts @@ -16,8 +16,23 @@ import { CLIENT_PROTOCOL_MAP, DownloadClientType } from '../interfaces/download- * Checks download progress from download client and updates request status * Re-schedules itself if download is still in progress */ +/** Base polling interval in seconds */ +const BASE_POLL_INTERVAL = 10; +/** Maximum polling interval in seconds (5 minutes) */ +const MAX_POLL_INTERVAL = 300; + +/** + * Compute next poll delay with exponential backoff for stalled downloads. + * Active downloads poll every 10s; stalled downloads back off up to 5 min. + */ +function getBackoffDelay(stallCount: number): number { + if (stallCount <= 0) return BASE_POLL_INTERVAL; + return Math.min(BASE_POLL_INTERVAL * Math.pow(2, stallCount), MAX_POLL_INTERVAL); +} + export async function processMonitorDownload(payload: MonitorDownloadPayload): Promise { - const { requestId, downloadHistoryId, downloadClientId, downloadClient, jobId } = payload; + const { requestId, downloadHistoryId, downloadClientId, downloadClient, jobId, + lastProgress: prevProgress, stallCount: prevStallCount } = payload; const logger = RMABLogger.forJob(jobId, 'MonitorDownload'); @@ -199,22 +214,35 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P progress: progressPercent, }; } else { - // Still downloading - schedule another check in 10 seconds + // Still downloading — compute adaptive poll interval + const isStalled = info.downloadSpeed === 0 + || progressPercent === (prevProgress ?? -1) + || progressState === 'paused' + || progressState === 'queued' + || progressState === 'checking'; + + const stallCount = isStalled ? (prevStallCount ?? 0) + 1 : 0; + const delay = getBackoffDelay(stallCount); + const jobQueue = getJobQueueService(); await jobQueue.addMonitorJob( requestId, downloadHistoryId, downloadClientId, downloadClient, - 10 // Delay 10 seconds between checks + delay, + progressPercent, + stallCount ); - // Only log every 5% progress to reduce log spam - const shouldLog = progressPercent % 5 === 0 || progressPercent < 5; + // Only log every 5% progress to reduce log spam, but always log stall transitions + const shouldLog = progressPercent % 5 === 0 || progressPercent < 5 + || (stallCount === 1) || (stallCount > 0 && stallCount % 10 === 0); if (shouldLog) { logger.info(`Request ${requestId}: ${progressPercent}% complete (${progressState})`, { speed: info.downloadSpeed, eta: info.eta, + ...(stallCount > 0 && { stallCount, nextPollSec: delay }), }); } @@ -227,6 +255,8 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P speed: info.downloadSpeed, eta: info.eta, state: progressState, + stallCount, + nextPollSec: delay, }; } } catch (error) { diff --git a/src/lib/processors/monitor-rss-feeds.processor.ts b/src/lib/processors/monitor-rss-feeds.processor.ts index f0f986a..807d70e 100644 --- a/src/lib/processors/monitor-rss-feeds.processor.ts +++ b/src/lib/processors/monitor-rss-feeds.processor.ts @@ -124,6 +124,9 @@ export async function processMonitorRssFeeds(payload: MonitorRssFeedsPayload): P break; } } + + // Spread DB operations over time to avoid connection pool exhaustion + await new Promise(resolve => setTimeout(resolve, 100)); } logger.info(`RSS monitoring complete: ${matched} matches found and queued for processing`); diff --git a/src/lib/processors/retry-failed-imports.processor.ts b/src/lib/processors/retry-failed-imports.processor.ts index 2a31c63..3f03383 100644 --- a/src/lib/processors/retry-failed-imports.processor.ts +++ b/src/lib/processors/retry-failed-imports.processor.ts @@ -157,6 +157,9 @@ export async function processRetryFailedImports(payload: RetryFailedImportsPaylo ); triggered++; logger.info(`Triggered organize job for ${request.type || 'audiobook'} request ${request.id}: ${request.audiobook.title}`); + + // Spread DB operations over time to avoid connection pool exhaustion + await new Promise(resolve => setTimeout(resolve, 100)); } catch (error) { logger.error(`Failed to trigger organize for request ${request.id}: ${error instanceof Error ? error.message : 'Unknown error'}`); skipped++; diff --git a/src/lib/processors/retry-missing-torrents.processor.ts b/src/lib/processors/retry-missing-torrents.processor.ts index 5c56532..72cd5fd 100644 --- a/src/lib/processors/retry-missing-torrents.processor.ts +++ b/src/lib/processors/retry-missing-torrents.processor.ts @@ -44,6 +44,7 @@ export async function processRetryMissingTorrents(payload: RetryMissingTorrentsP } // Trigger appropriate search job for each request based on type + // Throttle: 100ms delay between jobs to avoid connection pool burst const jobQueue = getJobQueueService(); let triggered = 0; @@ -73,6 +74,9 @@ export async function processRetryMissingTorrents(payload: RetryMissingTorrentsP } catch (error) { logger.error(`Failed to trigger search for request ${request.id}: ${error instanceof Error ? error.message : 'Unknown error'}`); } + + // Spread DB operations over time to avoid connection pool exhaustion + await new Promise(resolve => setTimeout(resolve, 100)); } logger.info(`Triggered ${triggered}/${requests.length} search jobs`); diff --git a/src/lib/services/job-queue.service.ts b/src/lib/services/job-queue.service.ts index 1075bae..ea7c015 100644 --- a/src/lib/services/job-queue.service.ts +++ b/src/lib/services/job-queue.service.ts @@ -63,6 +63,8 @@ export interface MonitorDownloadPayload extends JobPayload { downloadHistoryId: string; downloadClientId: string; downloadClient: DownloadClientType; + lastProgress?: number; // Previous poll's progress (0-100) for stall detection + stallCount?: number; // Consecutive polls with no progress change (drives backoff) } export interface OrganizeFilesPayload extends JobPayload { @@ -277,19 +279,19 @@ export class JobQueueService { */ private startProcessors(): void { // Search indexers processor - this.queue.process('search_indexers', 3, async (job: BullJob) => { + this.queue.process('search_indexers', 2, async (job: BullJob) => { const { processSearchIndexers } = await import('../processors/search-indexers.processor'); return await processSearchIndexers(job.data); }); // Download torrent processor - this.queue.process('download_torrent', 3, async (job: BullJob) => { + this.queue.process('download_torrent', 2, async (job: BullJob) => { const { processDownloadTorrent } = await import('../processors/download-torrent.processor'); return await processDownloadTorrent(job.data); }); // Monitor download processor - this.queue.process('monitor_download', 5, async (job: BullJob) => { + this.queue.process('monitor_download', 2, async (job: BullJob) => { const { processMonitorDownload } = await import('../processors/monitor-download.processor'); return await processMonitorDownload(job.data); }); @@ -357,23 +359,23 @@ export class JobQueueService { }); // Send notification processor - this.queue.process('send_notification', 5, async (job: BullJob) => { + this.queue.process('send_notification', 2, async (job: BullJob) => { const { processSendNotification } = await import('../processors/send-notification.processor'); return await processSendNotification(job.data); }); // Ebook-specific processors - this.queue.process('search_ebook', 3, async (job: BullJob) => { + this.queue.process('search_ebook', 2, async (job: BullJob) => { const { processSearchEbook } = await import('../processors/search-ebook.processor'); return await processSearchEbook(job.data); }); - this.queue.process('start_direct_download', 3, async (job: BullJob) => { + this.queue.process('start_direct_download', 2, async (job: BullJob) => { const { processStartDirectDownload } = await import('../processors/direct-download.processor'); return await processStartDirectDownload(job.data); }); - this.queue.process('monitor_direct_download', 5, async (job: BullJob) => { + this.queue.process('monitor_direct_download', 2, async (job: BullJob) => { const { processMonitorDirectDownload } = await import('../processors/direct-download.processor'); return await processMonitorDirectDownload(job.data); }); @@ -563,7 +565,9 @@ export class JobQueueService { downloadHistoryId: string, downloadClientId: string, downloadClient: DownloadClientType, - delaySeconds: number = 0 + delaySeconds: number = 0, + lastProgress?: number, + stallCount?: number ): Promise { return await this.addJob( 'monitor_download', @@ -572,6 +576,8 @@ export class JobQueueService { downloadHistoryId, downloadClientId, downloadClient, + lastProgress, + stallCount, } as MonitorDownloadPayload, { priority: 5, // Medium priority diff --git a/src/lib/services/scheduler.service.ts b/src/lib/services/scheduler.service.ts index b95ae81..f64fd6a 100644 --- a/src/lib/services/scheduler.service.ts +++ b/src/lib/services/scheduler.service.ts @@ -491,6 +491,9 @@ export class SchedulerService { if (this.isJobOverdue(job)) { logger.info(`Job "${job.name}" is overdue, triggering now...`); await this.triggerJobNow(job.id); + + // Stagger triggers to avoid connection pool burst on startup + await new Promise(resolve => setTimeout(resolve, 500)); } } catch (error) { logger.error(`Failed to trigger overdue job "${job.name}"`, { error: error instanceof Error ? error.message : String(error) }); diff --git a/src/lib/utils/audiobook-matcher.ts b/src/lib/utils/audiobook-matcher.ts index d1f193d..ec86f45 100644 --- a/src/lib/utils/audiobook-matcher.ts +++ b/src/lib/utils/audiobook-matcher.ts @@ -163,7 +163,20 @@ export async function enrichAudiobooksWithMatches( audiobooks: Array>, userId?: string ) { - const results = await Promise.all(audiobooks.map((book) => enrichAudiobookWithMatch(book))); + // Batch parallel DB queries to avoid connection pool exhaustion + const BATCH_SIZE = 5; + const results: Awaited>[] = []; + for (let i = 0; i < audiobooks.length; i += BATCH_SIZE) { + const batch = audiobooks.slice(i, i + BATCH_SIZE); + const batchResults = await Promise.allSettled(batch.map((book) => enrichAudiobookWithMatch(book))); + for (const result of batchResults) { + if (result.status === 'fulfilled') { + results.push(result.value); + } else { + logger.error('Failed to enrich audiobook', { error: result.reason instanceof Error ? result.reason.message : String(result.reason) }); + } + } + } // Always enrich with request status (check ANY user's requests) const asins = audiobooks.map(book => book.asin); diff --git a/tests/processors/monitor-download.processor.test.ts b/tests/processors/monitor-download.processor.test.ts index 1fba0b9..fcdbc60 100644 --- a/tests/processors/monitor-download.processor.test.ts +++ b/tests/processors/monitor-download.processor.test.ts @@ -150,7 +150,9 @@ describe('processMonitorDownload', () => { 'dh-2', 'hash-2', 'qbittorrent', - 10 + 10, + 45, // progressPercent passed as lastProgress + 0, // stallCount reset (download is actively progressing) ); });