mirror of
https://github.com/kikootwo/ReadMeABook.git
synced 2026-06-03 12:50:09 +00:00
Add DB pooling, throttling and monitor backoff
Add connection pool params to DATABASE_URL and configure Prisma to use the pooled URL (connection_limit=20, pool_timeout=30) to reduce connection exhaustion. Introduce safeguards and throttling across processors: limit in-flight progress DB updates in direct-download, add short delays when processing RSS, retry-failed-imports, and retry-missing-torrents, and stagger scheduler triggers to avoid bursts. Implement adaptive monitor-download polling with stallCount/lastProgress and exponential backoff, and thread these fields through JobQueueService (including reduced worker concurrency for several queues). Batch audiobook enrichment queries to small parallel batches to limit DB load. Update tests to reflect new monitor payload parameters. Overall intent: reduce DB connection pool pressure and smooth load spikes during startup and heavy processing.
This commit is contained in:
@@ -5,6 +5,29 @@
|
|||||||
|
|
||||||
import { PrismaClient } from '@/generated/prisma/client';
|
import { PrismaClient } from '@/generated/prisma/client';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Append connection pool parameters to DATABASE_URL if not already present.
|
||||||
|
* - connection_limit=20: up from default 9, fits 22 max workers + API routes
|
||||||
|
* - pool_timeout=30: up from default 10s, gives queued requests time
|
||||||
|
*/
|
||||||
|
function getPooledDatabaseUrl(): string {
|
||||||
|
const baseUrl = process.env.DATABASE_URL || '';
|
||||||
|
if (!baseUrl) return baseUrl;
|
||||||
|
|
||||||
|
const separator = baseUrl.includes('?') ? '&' : '?';
|
||||||
|
const params: string[] = [];
|
||||||
|
|
||||||
|
if (!baseUrl.includes('connection_limit')) {
|
||||||
|
params.push('connection_limit=20');
|
||||||
|
}
|
||||||
|
if (!baseUrl.includes('pool_timeout')) {
|
||||||
|
params.push('pool_timeout=30');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (params.length === 0) return baseUrl;
|
||||||
|
return `${baseUrl}${separator}${params.join('&')}`;
|
||||||
|
}
|
||||||
|
|
||||||
// Prevent multiple instances of Prisma Client in development
|
// Prevent multiple instances of Prisma Client in development
|
||||||
const globalForPrisma = globalThis as unknown as {
|
const globalForPrisma = globalThis as unknown as {
|
||||||
prisma: PrismaClient | undefined;
|
prisma: PrismaClient | undefined;
|
||||||
@@ -14,6 +37,11 @@ export const prisma =
|
|||||||
globalForPrisma.prisma ??
|
globalForPrisma.prisma ??
|
||||||
new PrismaClient({
|
new PrismaClient({
|
||||||
log: process.env.NODE_ENV === 'development' ? ['query', 'error', 'warn'] : ['error'],
|
log: process.env.NODE_ENV === 'development' ? ['query', 'error', 'warn'] : ['error'],
|
||||||
|
datasources: {
|
||||||
|
db: {
|
||||||
|
url: getPooledDatabaseUrl(),
|
||||||
|
},
|
||||||
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
if (process.env.NODE_ENV !== 'production') globalForPrisma.prisma = prisma;
|
if (process.env.NODE_ENV !== 'production') globalForPrisma.prisma = prisma;
|
||||||
|
|||||||
@@ -316,6 +316,7 @@ async function downloadFileWithProgress(
|
|||||||
let bytesDownloaded = 0;
|
let bytesDownloaded = 0;
|
||||||
let lastLogTime = Date.now();
|
let lastLogTime = Date.now();
|
||||||
let lastDbUpdateTime = Date.now();
|
let lastDbUpdateTime = Date.now();
|
||||||
|
let dbUpdatePending = false; // Guard against stacking unresolved DB updates
|
||||||
|
|
||||||
response.data.on('data', (chunk: Buffer) => {
|
response.data.on('data', (chunk: Buffer) => {
|
||||||
bytesDownloaded += chunk.length;
|
bytesDownloaded += chunk.length;
|
||||||
@@ -332,18 +333,18 @@ async function downloadFileWithProgress(
|
|||||||
logger.info(`Download progress: ${percent}% (${(bytesDownloaded / (1024 * 1024)).toFixed(1)} MB, ${speedMBps.toFixed(2)} MB/s)`);
|
logger.info(`Download progress: ${percent}% (${(bytesDownloaded / (1024 * 1024)).toFixed(1)} MB, ${speedMBps.toFixed(2)} MB/s)`);
|
||||||
lastLogTime = now;
|
lastLogTime = now;
|
||||||
|
|
||||||
// Update database with progress (non-blocking)
|
// Update database with progress (non-blocking, at most 1 in-flight at a time)
|
||||||
if (now - lastDbUpdateTime >= PROGRESS_UPDATE_INTERVAL_MS) {
|
if (now - lastDbUpdateTime >= PROGRESS_UPDATE_INTERVAL_MS && !dbUpdatePending) {
|
||||||
lastDbUpdateTime = now;
|
lastDbUpdateTime = now;
|
||||||
|
dbUpdatePending = true;
|
||||||
|
|
||||||
// Non-blocking update - fire and forget
|
|
||||||
prisma.request.update({
|
prisma.request.update({
|
||||||
where: { id: tracking.requestId },
|
where: { id: tracking.requestId },
|
||||||
data: {
|
data: {
|
||||||
progress: Math.min(percent, 99), // Cap at 99% until fully complete
|
progress: Math.min(percent, 99), // Cap at 99% until fully complete
|
||||||
updatedAt: new Date(),
|
updatedAt: new Date(),
|
||||||
},
|
},
|
||||||
}).catch(() => {}); // Ignore errors during progress update
|
}).catch(() => {}).finally(() => { dbUpdatePending = false; });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -16,8 +16,23 @@ import { CLIENT_PROTOCOL_MAP, DownloadClientType } from '../interfaces/download-
|
|||||||
* Checks download progress from download client and updates request status
|
* Checks download progress from download client and updates request status
|
||||||
* Re-schedules itself if download is still in progress
|
* Re-schedules itself if download is still in progress
|
||||||
*/
|
*/
|
||||||
|
/** Base polling interval in seconds */
|
||||||
|
const BASE_POLL_INTERVAL = 10;
|
||||||
|
/** Maximum polling interval in seconds (5 minutes) */
|
||||||
|
const MAX_POLL_INTERVAL = 300;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute next poll delay with exponential backoff for stalled downloads.
|
||||||
|
* Active downloads poll every 10s; stalled downloads back off up to 5 min.
|
||||||
|
*/
|
||||||
|
function getBackoffDelay(stallCount: number): number {
|
||||||
|
if (stallCount <= 0) return BASE_POLL_INTERVAL;
|
||||||
|
return Math.min(BASE_POLL_INTERVAL * Math.pow(2, stallCount), MAX_POLL_INTERVAL);
|
||||||
|
}
|
||||||
|
|
||||||
export async function processMonitorDownload(payload: MonitorDownloadPayload): Promise<any> {
|
export async function processMonitorDownload(payload: MonitorDownloadPayload): Promise<any> {
|
||||||
const { requestId, downloadHistoryId, downloadClientId, downloadClient, jobId } = payload;
|
const { requestId, downloadHistoryId, downloadClientId, downloadClient, jobId,
|
||||||
|
lastProgress: prevProgress, stallCount: prevStallCount } = payload;
|
||||||
|
|
||||||
const logger = RMABLogger.forJob(jobId, 'MonitorDownload');
|
const logger = RMABLogger.forJob(jobId, 'MonitorDownload');
|
||||||
|
|
||||||
@@ -199,22 +214,35 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
|
|||||||
progress: progressPercent,
|
progress: progressPercent,
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
// Still downloading - schedule another check in 10 seconds
|
// Still downloading — compute adaptive poll interval
|
||||||
|
const isStalled = info.downloadSpeed === 0
|
||||||
|
|| progressPercent === (prevProgress ?? -1)
|
||||||
|
|| progressState === 'paused'
|
||||||
|
|| progressState === 'queued'
|
||||||
|
|| progressState === 'checking';
|
||||||
|
|
||||||
|
const stallCount = isStalled ? (prevStallCount ?? 0) + 1 : 0;
|
||||||
|
const delay = getBackoffDelay(stallCount);
|
||||||
|
|
||||||
const jobQueue = getJobQueueService();
|
const jobQueue = getJobQueueService();
|
||||||
await jobQueue.addMonitorJob(
|
await jobQueue.addMonitorJob(
|
||||||
requestId,
|
requestId,
|
||||||
downloadHistoryId,
|
downloadHistoryId,
|
||||||
downloadClientId,
|
downloadClientId,
|
||||||
downloadClient,
|
downloadClient,
|
||||||
10 // Delay 10 seconds between checks
|
delay,
|
||||||
|
progressPercent,
|
||||||
|
stallCount
|
||||||
);
|
);
|
||||||
|
|
||||||
// Only log every 5% progress to reduce log spam
|
// Only log every 5% progress to reduce log spam, but always log stall transitions
|
||||||
const shouldLog = progressPercent % 5 === 0 || progressPercent < 5;
|
const shouldLog = progressPercent % 5 === 0 || progressPercent < 5
|
||||||
|
|| (stallCount === 1) || (stallCount > 0 && stallCount % 10 === 0);
|
||||||
if (shouldLog) {
|
if (shouldLog) {
|
||||||
logger.info(`Request ${requestId}: ${progressPercent}% complete (${progressState})`, {
|
logger.info(`Request ${requestId}: ${progressPercent}% complete (${progressState})`, {
|
||||||
speed: info.downloadSpeed,
|
speed: info.downloadSpeed,
|
||||||
eta: info.eta,
|
eta: info.eta,
|
||||||
|
...(stallCount > 0 && { stallCount, nextPollSec: delay }),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -227,6 +255,8 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
|
|||||||
speed: info.downloadSpeed,
|
speed: info.downloadSpeed,
|
||||||
eta: info.eta,
|
eta: info.eta,
|
||||||
state: progressState,
|
state: progressState,
|
||||||
|
stallCount,
|
||||||
|
nextPollSec: delay,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
|||||||
@@ -124,6 +124,9 @@ export async function processMonitorRssFeeds(payload: MonitorRssFeedsPayload): P
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Spread DB operations over time to avoid connection pool exhaustion
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 100));
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info(`RSS monitoring complete: ${matched} matches found and queued for processing`);
|
logger.info(`RSS monitoring complete: ${matched} matches found and queued for processing`);
|
||||||
|
|||||||
@@ -157,6 +157,9 @@ export async function processRetryFailedImports(payload: RetryFailedImportsPaylo
|
|||||||
);
|
);
|
||||||
triggered++;
|
triggered++;
|
||||||
logger.info(`Triggered organize job for ${request.type || 'audiobook'} request ${request.id}: ${request.audiobook.title}`);
|
logger.info(`Triggered organize job for ${request.type || 'audiobook'} request ${request.id}: ${request.audiobook.title}`);
|
||||||
|
|
||||||
|
// Spread DB operations over time to avoid connection pool exhaustion
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 100));
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(`Failed to trigger organize for request ${request.id}: ${error instanceof Error ? error.message : 'Unknown error'}`);
|
logger.error(`Failed to trigger organize for request ${request.id}: ${error instanceof Error ? error.message : 'Unknown error'}`);
|
||||||
skipped++;
|
skipped++;
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ export async function processRetryMissingTorrents(payload: RetryMissingTorrentsP
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Trigger appropriate search job for each request based on type
|
// Trigger appropriate search job for each request based on type
|
||||||
|
// Throttle: 100ms delay between jobs to avoid connection pool burst
|
||||||
const jobQueue = getJobQueueService();
|
const jobQueue = getJobQueueService();
|
||||||
let triggered = 0;
|
let triggered = 0;
|
||||||
|
|
||||||
@@ -73,6 +74,9 @@ export async function processRetryMissingTorrents(payload: RetryMissingTorrentsP
|
|||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(`Failed to trigger search for request ${request.id}: ${error instanceof Error ? error.message : 'Unknown error'}`);
|
logger.error(`Failed to trigger search for request ${request.id}: ${error instanceof Error ? error.message : 'Unknown error'}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Spread DB operations over time to avoid connection pool exhaustion
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 100));
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info(`Triggered ${triggered}/${requests.length} search jobs`);
|
logger.info(`Triggered ${triggered}/${requests.length} search jobs`);
|
||||||
|
|||||||
@@ -63,6 +63,8 @@ export interface MonitorDownloadPayload extends JobPayload {
|
|||||||
downloadHistoryId: string;
|
downloadHistoryId: string;
|
||||||
downloadClientId: string;
|
downloadClientId: string;
|
||||||
downloadClient: DownloadClientType;
|
downloadClient: DownloadClientType;
|
||||||
|
lastProgress?: number; // Previous poll's progress (0-100) for stall detection
|
||||||
|
stallCount?: number; // Consecutive polls with no progress change (drives backoff)
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface OrganizeFilesPayload extends JobPayload {
|
export interface OrganizeFilesPayload extends JobPayload {
|
||||||
@@ -277,19 +279,19 @@ export class JobQueueService {
|
|||||||
*/
|
*/
|
||||||
private startProcessors(): void {
|
private startProcessors(): void {
|
||||||
// Search indexers processor
|
// Search indexers processor
|
||||||
this.queue.process('search_indexers', 3, async (job: BullJob<SearchIndexersPayload>) => {
|
this.queue.process('search_indexers', 2, async (job: BullJob<SearchIndexersPayload>) => {
|
||||||
const { processSearchIndexers } = await import('../processors/search-indexers.processor');
|
const { processSearchIndexers } = await import('../processors/search-indexers.processor');
|
||||||
return await processSearchIndexers(job.data);
|
return await processSearchIndexers(job.data);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Download torrent processor
|
// Download torrent processor
|
||||||
this.queue.process('download_torrent', 3, async (job: BullJob<DownloadTorrentPayload>) => {
|
this.queue.process('download_torrent', 2, async (job: BullJob<DownloadTorrentPayload>) => {
|
||||||
const { processDownloadTorrent } = await import('../processors/download-torrent.processor');
|
const { processDownloadTorrent } = await import('../processors/download-torrent.processor');
|
||||||
return await processDownloadTorrent(job.data);
|
return await processDownloadTorrent(job.data);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Monitor download processor
|
// Monitor download processor
|
||||||
this.queue.process('monitor_download', 5, async (job: BullJob<MonitorDownloadPayload>) => {
|
this.queue.process('monitor_download', 2, async (job: BullJob<MonitorDownloadPayload>) => {
|
||||||
const { processMonitorDownload } = await import('../processors/monitor-download.processor');
|
const { processMonitorDownload } = await import('../processors/monitor-download.processor');
|
||||||
return await processMonitorDownload(job.data);
|
return await processMonitorDownload(job.data);
|
||||||
});
|
});
|
||||||
@@ -357,23 +359,23 @@ export class JobQueueService {
|
|||||||
});
|
});
|
||||||
|
|
||||||
// Send notification processor
|
// Send notification processor
|
||||||
this.queue.process('send_notification', 5, async (job: BullJob<SendNotificationPayload>) => {
|
this.queue.process('send_notification', 2, async (job: BullJob<SendNotificationPayload>) => {
|
||||||
const { processSendNotification } = await import('../processors/send-notification.processor');
|
const { processSendNotification } = await import('../processors/send-notification.processor');
|
||||||
return await processSendNotification(job.data);
|
return await processSendNotification(job.data);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Ebook-specific processors
|
// Ebook-specific processors
|
||||||
this.queue.process('search_ebook', 3, async (job: BullJob<SearchEbookPayload>) => {
|
this.queue.process('search_ebook', 2, async (job: BullJob<SearchEbookPayload>) => {
|
||||||
const { processSearchEbook } = await import('../processors/search-ebook.processor');
|
const { processSearchEbook } = await import('../processors/search-ebook.processor');
|
||||||
return await processSearchEbook(job.data);
|
return await processSearchEbook(job.data);
|
||||||
});
|
});
|
||||||
|
|
||||||
this.queue.process('start_direct_download', 3, async (job: BullJob<StartDirectDownloadPayload>) => {
|
this.queue.process('start_direct_download', 2, async (job: BullJob<StartDirectDownloadPayload>) => {
|
||||||
const { processStartDirectDownload } = await import('../processors/direct-download.processor');
|
const { processStartDirectDownload } = await import('../processors/direct-download.processor');
|
||||||
return await processStartDirectDownload(job.data);
|
return await processStartDirectDownload(job.data);
|
||||||
});
|
});
|
||||||
|
|
||||||
this.queue.process('monitor_direct_download', 5, async (job: BullJob<MonitorDirectDownloadPayload>) => {
|
this.queue.process('monitor_direct_download', 2, async (job: BullJob<MonitorDirectDownloadPayload>) => {
|
||||||
const { processMonitorDirectDownload } = await import('../processors/direct-download.processor');
|
const { processMonitorDirectDownload } = await import('../processors/direct-download.processor');
|
||||||
return await processMonitorDirectDownload(job.data);
|
return await processMonitorDirectDownload(job.data);
|
||||||
});
|
});
|
||||||
@@ -563,7 +565,9 @@ export class JobQueueService {
|
|||||||
downloadHistoryId: string,
|
downloadHistoryId: string,
|
||||||
downloadClientId: string,
|
downloadClientId: string,
|
||||||
downloadClient: DownloadClientType,
|
downloadClient: DownloadClientType,
|
||||||
delaySeconds: number = 0
|
delaySeconds: number = 0,
|
||||||
|
lastProgress?: number,
|
||||||
|
stallCount?: number
|
||||||
): Promise<string> {
|
): Promise<string> {
|
||||||
return await this.addJob(
|
return await this.addJob(
|
||||||
'monitor_download',
|
'monitor_download',
|
||||||
@@ -572,6 +576,8 @@ export class JobQueueService {
|
|||||||
downloadHistoryId,
|
downloadHistoryId,
|
||||||
downloadClientId,
|
downloadClientId,
|
||||||
downloadClient,
|
downloadClient,
|
||||||
|
lastProgress,
|
||||||
|
stallCount,
|
||||||
} as MonitorDownloadPayload,
|
} as MonitorDownloadPayload,
|
||||||
{
|
{
|
||||||
priority: 5, // Medium priority
|
priority: 5, // Medium priority
|
||||||
|
|||||||
@@ -491,6 +491,9 @@ export class SchedulerService {
|
|||||||
if (this.isJobOverdue(job)) {
|
if (this.isJobOverdue(job)) {
|
||||||
logger.info(`Job "${job.name}" is overdue, triggering now...`);
|
logger.info(`Job "${job.name}" is overdue, triggering now...`);
|
||||||
await this.triggerJobNow(job.id);
|
await this.triggerJobNow(job.id);
|
||||||
|
|
||||||
|
// Stagger triggers to avoid connection pool burst on startup
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 500));
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(`Failed to trigger overdue job "${job.name}"`, { error: error instanceof Error ? error.message : String(error) });
|
logger.error(`Failed to trigger overdue job "${job.name}"`, { error: error instanceof Error ? error.message : String(error) });
|
||||||
|
|||||||
@@ -163,7 +163,20 @@ export async function enrichAudiobooksWithMatches(
|
|||||||
audiobooks: Array<AudiobookMatchInput & Record<string, any>>,
|
audiobooks: Array<AudiobookMatchInput & Record<string, any>>,
|
||||||
userId?: string
|
userId?: string
|
||||||
) {
|
) {
|
||||||
const results = await Promise.all(audiobooks.map((book) => enrichAudiobookWithMatch(book)));
|
// Batch parallel DB queries to avoid connection pool exhaustion
|
||||||
|
const BATCH_SIZE = 5;
|
||||||
|
const results: Awaited<ReturnType<typeof enrichAudiobookWithMatch>>[] = [];
|
||||||
|
for (let i = 0; i < audiobooks.length; i += BATCH_SIZE) {
|
||||||
|
const batch = audiobooks.slice(i, i + BATCH_SIZE);
|
||||||
|
const batchResults = await Promise.allSettled(batch.map((book) => enrichAudiobookWithMatch(book)));
|
||||||
|
for (const result of batchResults) {
|
||||||
|
if (result.status === 'fulfilled') {
|
||||||
|
results.push(result.value);
|
||||||
|
} else {
|
||||||
|
logger.error('Failed to enrich audiobook', { error: result.reason instanceof Error ? result.reason.message : String(result.reason) });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Always enrich with request status (check ANY user's requests)
|
// Always enrich with request status (check ANY user's requests)
|
||||||
const asins = audiobooks.map(book => book.asin);
|
const asins = audiobooks.map(book => book.asin);
|
||||||
|
|||||||
@@ -150,7 +150,9 @@ describe('processMonitorDownload', () => {
|
|||||||
'dh-2',
|
'dh-2',
|
||||||
'hash-2',
|
'hash-2',
|
||||||
'qbittorrent',
|
'qbittorrent',
|
||||||
10
|
10,
|
||||||
|
45, // progressPercent passed as lastProgress
|
||||||
|
0, // stallCount reset (download is actively progressing)
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user