From 3820b9b21da3bb574d0b8017bd9ff84855b68fd7 Mon Sep 17 00:00:00 2001 From: kikootwo Date: Wed, 18 Feb 2026 02:43:00 -0500 Subject: [PATCH] Add DB pooling, throttling and monitor backoff Add connection pool params to DATABASE_URL and configure Prisma to use the pooled URL (connection_limit=20, pool_timeout=30) to reduce connection exhaustion. Introduce safeguards and throttling across processors: limit in-flight progress DB updates in direct-download, add short delays when processing RSS, retry-failed-imports, and retry-missing-torrents, and stagger scheduler triggers to avoid bursts. Implement adaptive monitor-download polling with stallCount/lastProgress and exponential backoff, and thread these fields through JobQueueService (including reduced worker concurrency for several queues). Batch audiobook enrichment queries to small parallel batches to limit DB load. Update tests to reflect new monitor payload parameters. Overall intent: reduce DB connection pool pressure and smooth load spikes during startup and heavy processing. --- src/lib/db.ts | 28 +++++++++++++ .../processors/direct-download.processor.ts | 9 +++-- .../processors/monitor-download.processor.ts | 40 ++++++++++++++++--- .../processors/monitor-rss-feeds.processor.ts | 3 ++ .../retry-failed-imports.processor.ts | 3 ++ .../retry-missing-torrents.processor.ts | 4 ++ src/lib/services/job-queue.service.ts | 22 ++++++---- src/lib/services/scheduler.service.ts | 3 ++ src/lib/utils/audiobook-matcher.ts | 15 ++++++- .../monitor-download.processor.test.ts | 4 +- 10 files changed, 112 insertions(+), 19 deletions(-) diff --git a/src/lib/db.ts b/src/lib/db.ts index 73d1f6b..e27df2b 100644 --- a/src/lib/db.ts +++ b/src/lib/db.ts @@ -5,6 +5,29 @@ import { PrismaClient } from '@/generated/prisma/client'; +/** + * Append connection pool parameters to DATABASE_URL if not already present. + * - connection_limit=20: up from default 9, fits 22 max workers + API routes + * - pool_timeout=30: up from default 10s, gives queued requests time + */ +function getPooledDatabaseUrl(): string { + const baseUrl = process.env.DATABASE_URL || ''; + if (!baseUrl) return baseUrl; + + const separator = baseUrl.includes('?') ? '&' : '?'; + const params: string[] = []; + + if (!baseUrl.includes('connection_limit')) { + params.push('connection_limit=20'); + } + if (!baseUrl.includes('pool_timeout')) { + params.push('pool_timeout=30'); + } + + if (params.length === 0) return baseUrl; + return `${baseUrl}${separator}${params.join('&')}`; +} + // Prevent multiple instances of Prisma Client in development const globalForPrisma = globalThis as unknown as { prisma: PrismaClient | undefined; @@ -14,6 +37,11 @@ export const prisma = globalForPrisma.prisma ?? new PrismaClient({ log: process.env.NODE_ENV === 'development' ? ['query', 'error', 'warn'] : ['error'], + datasources: { + db: { + url: getPooledDatabaseUrl(), + }, + }, }); if (process.env.NODE_ENV !== 'production') globalForPrisma.prisma = prisma; diff --git a/src/lib/processors/direct-download.processor.ts b/src/lib/processors/direct-download.processor.ts index 2dbdad4..2f4a2f7 100644 --- a/src/lib/processors/direct-download.processor.ts +++ b/src/lib/processors/direct-download.processor.ts @@ -316,6 +316,7 @@ async function downloadFileWithProgress( let bytesDownloaded = 0; let lastLogTime = Date.now(); let lastDbUpdateTime = Date.now(); + let dbUpdatePending = false; // Guard against stacking unresolved DB updates response.data.on('data', (chunk: Buffer) => { bytesDownloaded += chunk.length; @@ -332,18 +333,18 @@ async function downloadFileWithProgress( logger.info(`Download progress: ${percent}% (${(bytesDownloaded / (1024 * 1024)).toFixed(1)} MB, ${speedMBps.toFixed(2)} MB/s)`); lastLogTime = now; - // Update database with progress (non-blocking) - if (now - lastDbUpdateTime >= PROGRESS_UPDATE_INTERVAL_MS) { + // Update database with progress (non-blocking, at most 1 in-flight at a time) + if (now - lastDbUpdateTime >= PROGRESS_UPDATE_INTERVAL_MS && !dbUpdatePending) { lastDbUpdateTime = now; + dbUpdatePending = true; - // Non-blocking update - fire and forget prisma.request.update({ where: { id: tracking.requestId }, data: { progress: Math.min(percent, 99), // Cap at 99% until fully complete updatedAt: new Date(), }, - }).catch(() => {}); // Ignore errors during progress update + }).catch(() => {}).finally(() => { dbUpdatePending = false; }); } } }); diff --git a/src/lib/processors/monitor-download.processor.ts b/src/lib/processors/monitor-download.processor.ts index 39c57a4..dada097 100644 --- a/src/lib/processors/monitor-download.processor.ts +++ b/src/lib/processors/monitor-download.processor.ts @@ -16,8 +16,23 @@ import { CLIENT_PROTOCOL_MAP, DownloadClientType } from '../interfaces/download- * Checks download progress from download client and updates request status * Re-schedules itself if download is still in progress */ +/** Base polling interval in seconds */ +const BASE_POLL_INTERVAL = 10; +/** Maximum polling interval in seconds (5 minutes) */ +const MAX_POLL_INTERVAL = 300; + +/** + * Compute next poll delay with exponential backoff for stalled downloads. + * Active downloads poll every 10s; stalled downloads back off up to 5 min. + */ +function getBackoffDelay(stallCount: number): number { + if (stallCount <= 0) return BASE_POLL_INTERVAL; + return Math.min(BASE_POLL_INTERVAL * Math.pow(2, stallCount), MAX_POLL_INTERVAL); +} + export async function processMonitorDownload(payload: MonitorDownloadPayload): Promise { - const { requestId, downloadHistoryId, downloadClientId, downloadClient, jobId } = payload; + const { requestId, downloadHistoryId, downloadClientId, downloadClient, jobId, + lastProgress: prevProgress, stallCount: prevStallCount } = payload; const logger = RMABLogger.forJob(jobId, 'MonitorDownload'); @@ -199,22 +214,35 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P progress: progressPercent, }; } else { - // Still downloading - schedule another check in 10 seconds + // Still downloading — compute adaptive poll interval + const isStalled = info.downloadSpeed === 0 + || progressPercent === (prevProgress ?? -1) + || progressState === 'paused' + || progressState === 'queued' + || progressState === 'checking'; + + const stallCount = isStalled ? (prevStallCount ?? 0) + 1 : 0; + const delay = getBackoffDelay(stallCount); + const jobQueue = getJobQueueService(); await jobQueue.addMonitorJob( requestId, downloadHistoryId, downloadClientId, downloadClient, - 10 // Delay 10 seconds between checks + delay, + progressPercent, + stallCount ); - // Only log every 5% progress to reduce log spam - const shouldLog = progressPercent % 5 === 0 || progressPercent < 5; + // Only log every 5% progress to reduce log spam, but always log stall transitions + const shouldLog = progressPercent % 5 === 0 || progressPercent < 5 + || (stallCount === 1) || (stallCount > 0 && stallCount % 10 === 0); if (shouldLog) { logger.info(`Request ${requestId}: ${progressPercent}% complete (${progressState})`, { speed: info.downloadSpeed, eta: info.eta, + ...(stallCount > 0 && { stallCount, nextPollSec: delay }), }); } @@ -227,6 +255,8 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P speed: info.downloadSpeed, eta: info.eta, state: progressState, + stallCount, + nextPollSec: delay, }; } } catch (error) { diff --git a/src/lib/processors/monitor-rss-feeds.processor.ts b/src/lib/processors/monitor-rss-feeds.processor.ts index f0f986a..807d70e 100644 --- a/src/lib/processors/monitor-rss-feeds.processor.ts +++ b/src/lib/processors/monitor-rss-feeds.processor.ts @@ -124,6 +124,9 @@ export async function processMonitorRssFeeds(payload: MonitorRssFeedsPayload): P break; } } + + // Spread DB operations over time to avoid connection pool exhaustion + await new Promise(resolve => setTimeout(resolve, 100)); } logger.info(`RSS monitoring complete: ${matched} matches found and queued for processing`); diff --git a/src/lib/processors/retry-failed-imports.processor.ts b/src/lib/processors/retry-failed-imports.processor.ts index 2a31c63..3f03383 100644 --- a/src/lib/processors/retry-failed-imports.processor.ts +++ b/src/lib/processors/retry-failed-imports.processor.ts @@ -157,6 +157,9 @@ export async function processRetryFailedImports(payload: RetryFailedImportsPaylo ); triggered++; logger.info(`Triggered organize job for ${request.type || 'audiobook'} request ${request.id}: ${request.audiobook.title}`); + + // Spread DB operations over time to avoid connection pool exhaustion + await new Promise(resolve => setTimeout(resolve, 100)); } catch (error) { logger.error(`Failed to trigger organize for request ${request.id}: ${error instanceof Error ? error.message : 'Unknown error'}`); skipped++; diff --git a/src/lib/processors/retry-missing-torrents.processor.ts b/src/lib/processors/retry-missing-torrents.processor.ts index 5c56532..72cd5fd 100644 --- a/src/lib/processors/retry-missing-torrents.processor.ts +++ b/src/lib/processors/retry-missing-torrents.processor.ts @@ -44,6 +44,7 @@ export async function processRetryMissingTorrents(payload: RetryMissingTorrentsP } // Trigger appropriate search job for each request based on type + // Throttle: 100ms delay between jobs to avoid connection pool burst const jobQueue = getJobQueueService(); let triggered = 0; @@ -73,6 +74,9 @@ export async function processRetryMissingTorrents(payload: RetryMissingTorrentsP } catch (error) { logger.error(`Failed to trigger search for request ${request.id}: ${error instanceof Error ? error.message : 'Unknown error'}`); } + + // Spread DB operations over time to avoid connection pool exhaustion + await new Promise(resolve => setTimeout(resolve, 100)); } logger.info(`Triggered ${triggered}/${requests.length} search jobs`); diff --git a/src/lib/services/job-queue.service.ts b/src/lib/services/job-queue.service.ts index 1075bae..ea7c015 100644 --- a/src/lib/services/job-queue.service.ts +++ b/src/lib/services/job-queue.service.ts @@ -63,6 +63,8 @@ export interface MonitorDownloadPayload extends JobPayload { downloadHistoryId: string; downloadClientId: string; downloadClient: DownloadClientType; + lastProgress?: number; // Previous poll's progress (0-100) for stall detection + stallCount?: number; // Consecutive polls with no progress change (drives backoff) } export interface OrganizeFilesPayload extends JobPayload { @@ -277,19 +279,19 @@ export class JobQueueService { */ private startProcessors(): void { // Search indexers processor - this.queue.process('search_indexers', 3, async (job: BullJob) => { + this.queue.process('search_indexers', 2, async (job: BullJob) => { const { processSearchIndexers } = await import('../processors/search-indexers.processor'); return await processSearchIndexers(job.data); }); // Download torrent processor - this.queue.process('download_torrent', 3, async (job: BullJob) => { + this.queue.process('download_torrent', 2, async (job: BullJob) => { const { processDownloadTorrent } = await import('../processors/download-torrent.processor'); return await processDownloadTorrent(job.data); }); // Monitor download processor - this.queue.process('monitor_download', 5, async (job: BullJob) => { + this.queue.process('monitor_download', 2, async (job: BullJob) => { const { processMonitorDownload } = await import('../processors/monitor-download.processor'); return await processMonitorDownload(job.data); }); @@ -357,23 +359,23 @@ export class JobQueueService { }); // Send notification processor - this.queue.process('send_notification', 5, async (job: BullJob) => { + this.queue.process('send_notification', 2, async (job: BullJob) => { const { processSendNotification } = await import('../processors/send-notification.processor'); return await processSendNotification(job.data); }); // Ebook-specific processors - this.queue.process('search_ebook', 3, async (job: BullJob) => { + this.queue.process('search_ebook', 2, async (job: BullJob) => { const { processSearchEbook } = await import('../processors/search-ebook.processor'); return await processSearchEbook(job.data); }); - this.queue.process('start_direct_download', 3, async (job: BullJob) => { + this.queue.process('start_direct_download', 2, async (job: BullJob) => { const { processStartDirectDownload } = await import('../processors/direct-download.processor'); return await processStartDirectDownload(job.data); }); - this.queue.process('monitor_direct_download', 5, async (job: BullJob) => { + this.queue.process('monitor_direct_download', 2, async (job: BullJob) => { const { processMonitorDirectDownload } = await import('../processors/direct-download.processor'); return await processMonitorDirectDownload(job.data); }); @@ -563,7 +565,9 @@ export class JobQueueService { downloadHistoryId: string, downloadClientId: string, downloadClient: DownloadClientType, - delaySeconds: number = 0 + delaySeconds: number = 0, + lastProgress?: number, + stallCount?: number ): Promise { return await this.addJob( 'monitor_download', @@ -572,6 +576,8 @@ export class JobQueueService { downloadHistoryId, downloadClientId, downloadClient, + lastProgress, + stallCount, } as MonitorDownloadPayload, { priority: 5, // Medium priority diff --git a/src/lib/services/scheduler.service.ts b/src/lib/services/scheduler.service.ts index b95ae81..f64fd6a 100644 --- a/src/lib/services/scheduler.service.ts +++ b/src/lib/services/scheduler.service.ts @@ -491,6 +491,9 @@ export class SchedulerService { if (this.isJobOverdue(job)) { logger.info(`Job "${job.name}" is overdue, triggering now...`); await this.triggerJobNow(job.id); + + // Stagger triggers to avoid connection pool burst on startup + await new Promise(resolve => setTimeout(resolve, 500)); } } catch (error) { logger.error(`Failed to trigger overdue job "${job.name}"`, { error: error instanceof Error ? error.message : String(error) }); diff --git a/src/lib/utils/audiobook-matcher.ts b/src/lib/utils/audiobook-matcher.ts index d1f193d..ec86f45 100644 --- a/src/lib/utils/audiobook-matcher.ts +++ b/src/lib/utils/audiobook-matcher.ts @@ -163,7 +163,20 @@ export async function enrichAudiobooksWithMatches( audiobooks: Array>, userId?: string ) { - const results = await Promise.all(audiobooks.map((book) => enrichAudiobookWithMatch(book))); + // Batch parallel DB queries to avoid connection pool exhaustion + const BATCH_SIZE = 5; + const results: Awaited>[] = []; + for (let i = 0; i < audiobooks.length; i += BATCH_SIZE) { + const batch = audiobooks.slice(i, i + BATCH_SIZE); + const batchResults = await Promise.allSettled(batch.map((book) => enrichAudiobookWithMatch(book))); + for (const result of batchResults) { + if (result.status === 'fulfilled') { + results.push(result.value); + } else { + logger.error('Failed to enrich audiobook', { error: result.reason instanceof Error ? result.reason.message : String(result.reason) }); + } + } + } // Always enrich with request status (check ANY user's requests) const asins = audiobooks.map(book => book.asin); diff --git a/tests/processors/monitor-download.processor.test.ts b/tests/processors/monitor-download.processor.test.ts index 1fba0b9..fcdbc60 100644 --- a/tests/processors/monitor-download.processor.test.ts +++ b/tests/processors/monitor-download.processor.test.ts @@ -150,7 +150,9 @@ describe('processMonitorDownload', () => { 'dh-2', 'hash-2', 'qbittorrent', - 10 + 10, + 45, // progressPercent passed as lastProgress + 0, // stallCount reset (download is actively progressing) ); });