mirror of
https://github.com/kikootwo/ReadMeABook.git
synced 2026-06-03 04:40:09 +00:00
Add Transmission/NZBGet and per-client paths and much more
Extend multi-download-client support to include Transmission and NZBGet and introduce per-client custom download paths. Adds protocol mapping and new client types, Transmission/NZBGet integration services, API CRUD and validation changes, UI components/modal updates and live path previews, and manager routing by protocol. Includes DB migrations (download_path on download_history, interactive_search_access on users), schema updates, and related processor/service fixes and tests to ensure backward compatibility and proper path resolution.
This commit is contained in:
@@ -2,11 +2,13 @@
|
||||
* Component: Cleanup Seeded Torrents Processor
|
||||
* Documentation: documentation/backend/services/scheduler.md
|
||||
*
|
||||
* Cleans up torrents that have met their seeding requirements
|
||||
* Cleans up downloads that have met their seeding requirements.
|
||||
* Uses the IDownloadClient interface for client-agnostic operation.
|
||||
*/
|
||||
|
||||
import { prisma } from '../db';
|
||||
import { RMABLogger } from '../utils/logger';
|
||||
import { CLIENT_PROTOCOL_MAP, DownloadClientType } from '../interfaces/download-client.interface';
|
||||
|
||||
export interface CleanupSeededTorrentsPayload {
|
||||
jobId?: string;
|
||||
@@ -22,7 +24,9 @@ export async function processCleanupSeededTorrents(payload: CleanupSeededTorrent
|
||||
try {
|
||||
// Get indexer configuration with per-indexer seeding times
|
||||
const { getConfigService } = await import('../services/config.service');
|
||||
const { getDownloadClientManager } = await import('../services/download-client-manager.service');
|
||||
const configService = getConfigService();
|
||||
const manager = getDownloadClientManager(configService);
|
||||
const indexersConfigStr = await configService.get('prowlarr_indexers');
|
||||
|
||||
if (!indexersConfigStr) {
|
||||
@@ -44,22 +48,28 @@ export async function processCleanupSeededTorrents(payload: CleanupSeededTorrent
|
||||
|
||||
logger.info(`Loaded configuration for ${indexerConfigMap.size} indexers`);
|
||||
|
||||
// Find all completed audiobook requests + soft-deleted audiobook requests (orphaned downloads)
|
||||
// Find all completed requests + soft-deleted requests (orphaned downloads)
|
||||
// IMPORTANT: Only cleanup requests that are truly complete and not being actively processed
|
||||
// NOTE: Multiple requests can share the same torrent hash (e.g., re-requesting same audiobook)
|
||||
// Before deleting torrent, we check if other active requests are using it
|
||||
// NOTE: Ebook requests use direct HTTP downloads (no torrent seeding), so they're excluded
|
||||
// NOTE: Ebooks downloaded via indexer search use torrent clients and need seeding cleanup too.
|
||||
// Direct HTTP ebook downloads are naturally skipped (no torrent hash / unknown client type).
|
||||
const completedRequests = await prisma.request.findMany({
|
||||
where: {
|
||||
type: 'audiobook', // Only audiobook requests (ebooks don't have torrents to seed)
|
||||
OR: [
|
||||
// Active requests that are fully available (scanned by Plex/ABS)
|
||||
// Audiobook requests that are fully available (matched in Plex/ABS)
|
||||
{
|
||||
type: 'audiobook',
|
||||
status: 'available',
|
||||
deletedAt: null,
|
||||
},
|
||||
// Soft-deleted requests (orphaned downloads)
|
||||
// We'll check if torrent is shared with active requests before deletion
|
||||
// Ebook requests that are fully downloaded (terminal state for ebooks)
|
||||
{
|
||||
type: 'ebook',
|
||||
status: 'downloaded',
|
||||
deletedAt: null,
|
||||
},
|
||||
// Soft-deleted requests of any type (orphaned downloads)
|
||||
{
|
||||
deletedAt: { not: null },
|
||||
},
|
||||
@@ -78,11 +88,12 @@ export async function processCleanupSeededTorrents(payload: CleanupSeededTorrent
|
||||
take: 100, // Limit to 100 requests per run
|
||||
});
|
||||
|
||||
logger.info(`Found ${completedRequests.length} requests to check (status: 'available' or soft-deleted)`);
|
||||
logger.info(`Found ${completedRequests.length} requests to check (audiobook: available, ebook: downloaded, or soft-deleted)`);
|
||||
|
||||
let cleaned = 0;
|
||||
let skipped = 0;
|
||||
let noConfig = 0;
|
||||
const deletedHashes = new Set<string>(); // Track torrents already deleted this run
|
||||
|
||||
for (const request of completedRequests) {
|
||||
try {
|
||||
@@ -92,18 +103,27 @@ export async function processCleanupSeededTorrents(payload: CleanupSeededTorrent
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip SABnzbd downloads - Usenet doesn't have seeding concept
|
||||
// Skip Usenet downloads - no seeding concept
|
||||
if (downloadHistory.nzbId && !downloadHistory.torrentHash) {
|
||||
// For soft-deleted SABnzbd requests, hard delete immediately (no seeding needed)
|
||||
// For soft-deleted Usenet requests, hard delete immediately (no seeding needed)
|
||||
if (request.deletedAt) {
|
||||
await prisma.request.delete({ where: { id: request.id } });
|
||||
logger.info(`Hard-deleted orphaned SABnzbd request ${request.id}`);
|
||||
logger.info(`Hard-deleted orphaned Usenet request ${request.id}`);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Only process torrent downloads
|
||||
if (!downloadHistory.torrentHash) {
|
||||
// Only process downloads that have a client ID
|
||||
if (!downloadHistory.downloadClientId && !downloadHistory.torrentHash) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Determine the download client ID and protocol
|
||||
const clientId = downloadHistory.downloadClientId || downloadHistory.torrentHash!;
|
||||
const clientType = downloadHistory.downloadClient || 'qbittorrent';
|
||||
const protocol = CLIENT_PROTOCOL_MAP[clientType as DownloadClientType];
|
||||
if (!protocol) {
|
||||
logger.warn(`Unknown download client type: ${clientType}, skipping`);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -126,20 +146,40 @@ export async function processCleanupSeededTorrents(payload: CleanupSeededTorrent
|
||||
|
||||
const seedingTimeSeconds = seedingConfig.seedingTimeMinutes * 60;
|
||||
|
||||
// Get torrent info from qBittorrent to check seeding time
|
||||
const { getQBittorrentService } = await import('../integrations/qbittorrent.service');
|
||||
const qbt = await getQBittorrentService();
|
||||
// Skip if this torrent was already deleted earlier in this run
|
||||
if (deletedHashes.has(clientId.toLowerCase())) {
|
||||
if (request.deletedAt) {
|
||||
await prisma.request.delete({ where: { id: request.id } });
|
||||
logger.info(`Hard-deleted orphaned request ${request.id} (torrent already cleaned this run)`);
|
||||
}
|
||||
cleaned++;
|
||||
continue;
|
||||
}
|
||||
|
||||
let torrent;
|
||||
// Get download info from the appropriate client via the interface
|
||||
const client = await manager.getClientServiceForProtocol(protocol as 'torrent' | 'usenet');
|
||||
|
||||
if (!client) {
|
||||
logger.warn(`No ${clientType} client configured, skipping request ${request.id}`);
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
let downloadInfo;
|
||||
try {
|
||||
torrent = await qbt.getTorrent(downloadHistory.torrentHash);
|
||||
downloadInfo = await client.getDownload(clientId);
|
||||
} catch (error) {
|
||||
// Torrent might already be deleted, skip
|
||||
// Download not found in client (already removed), skip
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!downloadInfo) {
|
||||
// Download not found in client (already removed)
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if seeding time requirement is met
|
||||
const actualSeedingTime = torrent.seeding_time || 0;
|
||||
const actualSeedingTime = downloadInfo.seedingTime || 0;
|
||||
const hasMetRequirement = actualSeedingTime >= seedingTimeSeconds;
|
||||
|
||||
if (!hasMetRequirement) {
|
||||
@@ -148,47 +188,49 @@ export async function processCleanupSeededTorrents(payload: CleanupSeededTorrent
|
||||
continue;
|
||||
}
|
||||
|
||||
logger.info(`Torrent ${torrent.name} (${indexerName}) has met seeding requirement (${Math.floor(actualSeedingTime / 60)}/${seedingConfig.seedingTimeMinutes} minutes)`);
|
||||
logger.info(`Download ${downloadInfo.name} (${indexerName}) has met seeding requirement (${Math.floor(actualSeedingTime / 60)}/${seedingConfig.seedingTimeMinutes} minutes)`);
|
||||
|
||||
// CRITICAL: Check if any other active (non-deleted) audiobook request is using this same torrent hash
|
||||
// This prevents deleting shared torrents when user re-requests the same audiobook
|
||||
const otherActiveRequests = await prisma.request.findMany({
|
||||
where: {
|
||||
id: { not: request.id }, // Exclude current request
|
||||
type: 'audiobook', // Only check audiobook requests
|
||||
deletedAt: null, // Only check active requests
|
||||
downloadHistory: {
|
||||
some: {
|
||||
torrentHash: downloadHistory.torrentHash,
|
||||
selected: true,
|
||||
// CRITICAL: Check if any other active (non-deleted) request is using this same download
|
||||
const hashToCheck = downloadHistory.torrentHash;
|
||||
if (hashToCheck) {
|
||||
const otherActiveRequests = await prisma.request.findMany({
|
||||
where: {
|
||||
id: { not: request.id }, // Exclude current request
|
||||
deletedAt: null, // Only check active requests
|
||||
downloadHistory: {
|
||||
some: {
|
||||
torrentHash: hashToCheck,
|
||||
selected: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
select: { id: true, status: true },
|
||||
});
|
||||
select: { id: true, status: true },
|
||||
});
|
||||
|
||||
if (otherActiveRequests.length > 0) {
|
||||
logger.info(`Skipping torrent deletion - ${otherActiveRequests.length} other active request(s) still using this torrent (IDs: ${otherActiveRequests.map(r => r.id).join(', ')})`);
|
||||
if (otherActiveRequests.length > 0) {
|
||||
logger.info(`Skipping download deletion - ${otherActiveRequests.length} other active request(s) still using this download (IDs: ${otherActiveRequests.map(r => r.id).join(', ')})`);
|
||||
|
||||
// If this is a soft-deleted request, hard delete it but DON'T delete the torrent
|
||||
if (request.deletedAt) {
|
||||
await prisma.request.delete({ where: { id: request.id } });
|
||||
logger.info(`Hard-deleted orphaned request ${request.id} (kept shared torrent for active requests)`);
|
||||
// If this is a soft-deleted request, hard delete it but DON'T delete the download
|
||||
if (request.deletedAt) {
|
||||
await prisma.request.delete({ where: { id: request.id } });
|
||||
logger.info(`Hard-deleted orphaned request ${request.id} (kept shared download for active requests)`);
|
||||
}
|
||||
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Safe to delete - no other active requests using this torrent
|
||||
await qbt.deleteTorrent(downloadHistory.torrentHash, true); // true = delete files
|
||||
// Safe to delete - no other active requests using this download
|
||||
await client.deleteDownload(clientId, true); // true = delete files
|
||||
deletedHashes.add(clientId.toLowerCase());
|
||||
|
||||
// If this is a soft-deleted request (orphaned download), hard delete it now
|
||||
if (request.deletedAt) {
|
||||
await prisma.request.delete({ where: { id: request.id } });
|
||||
logger.info(`Hard-deleted orphaned request ${request.id} after torrent cleanup`);
|
||||
logger.info(`Hard-deleted orphaned request ${request.id} after download cleanup`);
|
||||
} else {
|
||||
logger.info(`Deleted torrent and files for active request ${request.id}`);
|
||||
logger.info(`Deleted download and files for active request ${request.id}`);
|
||||
}
|
||||
|
||||
cleaned++;
|
||||
@@ -197,7 +239,7 @@ export async function processCleanupSeededTorrents(payload: CleanupSeededTorrent
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(`Cleanup complete: ${cleaned} torrents cleaned, ${skipped} still seeding, ${noConfig} unlimited`);
|
||||
logger.info(`Cleanup complete: ${cleaned} downloads cleaned, ${skipped} still seeding, ${noConfig} unlimited`);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
|
||||
@@ -78,7 +78,7 @@ export async function processStartDirectDownload(payload: StartDirectDownloadPay
|
||||
|
||||
// Get download configuration
|
||||
const configService = getConfigService();
|
||||
const downloadsDir = await configService.get('downloads_dir') || '/downloads';
|
||||
const downloadsDir = await configService.get('download_dir') || '/downloads';
|
||||
const baseUrl = await configService.get('ebook_sidecar_base_url') || 'https://annas-archive.li';
|
||||
const preferredFormat = await configService.get('ebook_sidecar_preferred_format') || 'epub';
|
||||
const flaresolverrUrl = await configService.get('ebook_sidecar_flaresolverr_url') || undefined;
|
||||
|
||||
@@ -5,8 +5,6 @@
|
||||
|
||||
import { DownloadTorrentPayload, getJobQueueService } from '../services/job-queue.service';
|
||||
import { prisma } from '../db';
|
||||
import { getQBittorrentService } from '../integrations/qbittorrent.service';
|
||||
import { getSABnzbdService } from '../integrations/sabnzbd.service';
|
||||
import { getConfigService } from '../services/config.service';
|
||||
import { getDownloadClientManager } from '../services/download-client-manager.service';
|
||||
import { ProwlarrService } from '../integrations/prowlarr.service';
|
||||
@@ -14,7 +12,7 @@ import { RMABLogger } from '../utils/logger';
|
||||
|
||||
/**
|
||||
* Process download job
|
||||
* Routes to appropriate download client based on configuration
|
||||
* Routes to appropriate download client based on protocol detection
|
||||
* Adds selected result to download client and starts monitoring
|
||||
*/
|
||||
export async function processDownloadTorrent(payload: DownloadTorrentPayload): Promise<any> {
|
||||
@@ -41,151 +39,85 @@ export async function processDownloadTorrent(payload: DownloadTorrentPayload): P
|
||||
},
|
||||
});
|
||||
|
||||
// Detect protocol from result and route to appropriate client
|
||||
// Detect protocol from result and get appropriate client
|
||||
const isUsenet = ProwlarrService.isNZBResult(torrent);
|
||||
const protocol = isUsenet ? 'usenet' : 'torrent';
|
||||
const config = await getConfigService();
|
||||
const manager = getDownloadClientManager(config);
|
||||
|
||||
const clientConfig = await manager.getClientForProtocol(isUsenet ? 'usenet' : 'torrent');
|
||||
const client = await manager.getClientServiceForProtocol(protocol);
|
||||
|
||||
if (!clientConfig) {
|
||||
const protocol = isUsenet ? 'Usenet (SABnzbd)' : 'Torrent (qBittorrent)';
|
||||
throw new Error(`No ${protocol} client configured`);
|
||||
if (!client) {
|
||||
throw new Error(`No ${protocol} download client configured. Please add a ${protocol} client in Settings > Download Clients.`);
|
||||
}
|
||||
|
||||
let downloadClientId: string;
|
||||
let downloadClient: 'qbittorrent' | 'sabnzbd';
|
||||
// Get client config for category
|
||||
const clientConfig = await manager.getClientForProtocol(protocol);
|
||||
const category = clientConfig?.category || 'readmeabook';
|
||||
|
||||
if (isUsenet) {
|
||||
// Route to SABnzbd
|
||||
logger.info(`Routing to SABnzbd`);
|
||||
logger.info(`Routing to ${client.clientType} (${client.protocol})`);
|
||||
|
||||
const sabnzbd = await getSABnzbdService();
|
||||
downloadClientId = await sabnzbd.addNZB(torrent.downloadUrl, {
|
||||
category: clientConfig.category || 'readmeabook',
|
||||
priority: 'normal',
|
||||
});
|
||||
downloadClient = 'sabnzbd';
|
||||
// Add download via unified interface
|
||||
const downloadClientId = await client.addDownload(torrent.downloadUrl, {
|
||||
category,
|
||||
priority: 'normal',
|
||||
});
|
||||
|
||||
logger.info(`NZB added with ID: ${downloadClientId}`);
|
||||
logger.info(`Download added with ID: ${downloadClientId}`);
|
||||
|
||||
// Create DownloadHistory record
|
||||
// Determine indexer page URL - exclude magnet links from guid fallback
|
||||
const indexerPageUrl = torrent.infoUrl || (torrent.guid?.startsWith('magnet:') ? null : torrent.guid);
|
||||
// Create DownloadHistory record
|
||||
// Determine indexer page URL - exclude magnet links from guid fallback
|
||||
const indexerPageUrl = torrent.infoUrl || (torrent.guid?.startsWith('magnet:') ? null : torrent.guid);
|
||||
|
||||
const downloadHistory = await prisma.downloadHistory.create({
|
||||
data: {
|
||||
requestId,
|
||||
indexerName: torrent.indexer,
|
||||
indexerId: torrent.indexerId, // Store indexer ID for configuration lookup
|
||||
downloadClient: 'sabnzbd',
|
||||
downloadClientId,
|
||||
torrentName: torrent.title,
|
||||
nzbId: downloadClientId, // Store NZB ID
|
||||
torrentSizeBytes: torrent.size,
|
||||
torrentUrl: indexerPageUrl, // Indexer page URL (only if available and not a magnet/download link)
|
||||
magnetLink: torrent.downloadUrl, // Download URL (.nzb file)
|
||||
seeders: torrent.seeders || 0, // Usenet doesn't have seeders, but include for consistency
|
||||
leechers: 0,
|
||||
downloadStatus: 'downloading',
|
||||
selected: true,
|
||||
startedAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
logger.info(`Created download history record: ${downloadHistory.id}`);
|
||||
|
||||
// Trigger monitor download job with initial delay
|
||||
const jobQueue = getJobQueueService();
|
||||
await jobQueue.addMonitorJob(
|
||||
const downloadHistory = await prisma.downloadHistory.create({
|
||||
data: {
|
||||
requestId,
|
||||
downloadHistory.id,
|
||||
indexerName: torrent.indexer,
|
||||
indexerId: torrent.indexerId,
|
||||
downloadClient: client.clientType,
|
||||
downloadClientId,
|
||||
'sabnzbd',
|
||||
3 // Wait 3 seconds before first check
|
||||
);
|
||||
torrentName: torrent.title,
|
||||
// Set protocol-specific ID fields for backward compatibility
|
||||
torrentHash: client.protocol === 'torrent' ? (torrent.infoHash || downloadClientId) : undefined,
|
||||
nzbId: client.protocol === 'usenet' ? downloadClientId : undefined,
|
||||
torrentSizeBytes: torrent.size,
|
||||
torrentUrl: indexerPageUrl,
|
||||
magnetLink: torrent.downloadUrl,
|
||||
seeders: torrent.seeders || 0,
|
||||
leechers: torrent.leechers || 0,
|
||||
downloadStatus: 'downloading',
|
||||
selected: true,
|
||||
startedAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
logger.info(`Started monitoring job for request ${requestId} (SABnzbd, 3s initial delay)`);
|
||||
logger.info(`Created download history record: ${downloadHistory.id}`);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: 'NZB added to SABnzbd and monitoring started',
|
||||
requestId,
|
||||
downloadHistoryId: downloadHistory.id,
|
||||
nzbId: downloadClientId,
|
||||
torrent: {
|
||||
title: torrent.title,
|
||||
size: torrent.size,
|
||||
format: torrent.format,
|
||||
},
|
||||
};
|
||||
} else {
|
||||
// Route to qBittorrent (default)
|
||||
logger.info(`Routing to qBittorrent`);
|
||||
// Trigger monitor download job with initial delay
|
||||
const jobQueue = getJobQueueService();
|
||||
await jobQueue.addMonitorJob(
|
||||
requestId,
|
||||
downloadHistory.id,
|
||||
downloadClientId,
|
||||
client.clientType,
|
||||
3 // Wait 3 seconds before first check
|
||||
);
|
||||
|
||||
const qbt = await getQBittorrentService();
|
||||
downloadClientId = await qbt.addTorrent(torrent.downloadUrl, {
|
||||
category: clientConfig.category || 'readmeabook',
|
||||
tags: ['audiobook'],
|
||||
sequentialDownload: true,
|
||||
paused: false,
|
||||
});
|
||||
downloadClient = 'qbittorrent';
|
||||
logger.info(`Started monitoring job for request ${requestId} (${client.clientType}, 3s initial delay)`);
|
||||
|
||||
logger.info(`Torrent added with hash: ${downloadClientId}`);
|
||||
|
||||
// Create DownloadHistory record
|
||||
// Determine indexer page URL - exclude magnet links from guid fallback
|
||||
const indexerPageUrl = torrent.infoUrl || (torrent.guid?.startsWith('magnet:') ? null : torrent.guid);
|
||||
|
||||
const downloadHistory = await prisma.downloadHistory.create({
|
||||
data: {
|
||||
requestId,
|
||||
indexerName: torrent.indexer,
|
||||
indexerId: torrent.indexerId, // Store indexer ID for configuration lookup
|
||||
downloadClient: 'qbittorrent',
|
||||
downloadClientId,
|
||||
torrentName: torrent.title,
|
||||
torrentHash: torrent.infoHash || downloadClientId, // Store torrent hash
|
||||
torrentSizeBytes: torrent.size,
|
||||
torrentUrl: indexerPageUrl, // Indexer page URL (only if available and not a magnet/download link)
|
||||
magnetLink: torrent.downloadUrl,
|
||||
seeders: torrent.seeders || 0,
|
||||
leechers: torrent.leechers || 0,
|
||||
downloadStatus: 'downloading',
|
||||
selected: true,
|
||||
startedAt: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
logger.info(`Created download history record: ${downloadHistory.id}`);
|
||||
|
||||
// Trigger monitor download job with initial delay
|
||||
const jobQueue = getJobQueueService();
|
||||
await jobQueue.addMonitorJob(
|
||||
requestId,
|
||||
downloadHistory.id,
|
||||
downloadClientId,
|
||||
'qbittorrent',
|
||||
3 // Wait 3 seconds before first check to avoid race condition
|
||||
);
|
||||
|
||||
logger.info(`Started monitoring job for request ${requestId} (qBittorrent, 3s initial delay)`);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: 'Torrent added to qBittorrent and monitoring started',
|
||||
requestId,
|
||||
downloadHistoryId: downloadHistory.id,
|
||||
torrentHash: downloadClientId,
|
||||
torrent: {
|
||||
title: torrent.title,
|
||||
size: torrent.size,
|
||||
seeders: torrent.seeders || 0,
|
||||
format: torrent.format,
|
||||
},
|
||||
};
|
||||
}
|
||||
return {
|
||||
success: true,
|
||||
message: `Download added to ${client.clientType} and monitoring started`,
|
||||
requestId,
|
||||
downloadHistoryId: downloadHistory.id,
|
||||
downloadClientId,
|
||||
torrent: {
|
||||
title: torrent.title,
|
||||
size: torrent.size,
|
||||
seeders: torrent.seeders || 0,
|
||||
format: torrent.format,
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error(`Error: ${error instanceof Error ? error.message : 'Unknown error'}`);
|
||||
|
||||
|
||||
@@ -3,50 +3,13 @@
|
||||
* Documentation: documentation/phase3/README.md
|
||||
*/
|
||||
|
||||
import path from 'path';
|
||||
import { MonitorDownloadPayload, getJobQueueService } from '../services/job-queue.service';
|
||||
import { prisma } from '../db';
|
||||
import { getQBittorrentService } from '../integrations/qbittorrent.service';
|
||||
import { RMABLogger } from '../utils/logger';
|
||||
import { PathMapper, PathMappingConfig } from '../utils/path-mapper';
|
||||
import { getConfigService } from '../services/config.service';
|
||||
import { getDownloadClientManager } from '../services/download-client-manager.service';
|
||||
|
||||
/**
|
||||
* Helper function to retry getTorrent with exponential backoff
|
||||
* Handles race condition where torrent isn't immediately available after adding
|
||||
*/
|
||||
async function getTorrentWithRetry(
|
||||
qbt: any,
|
||||
hash: string,
|
||||
logger: RMABLogger,
|
||||
maxRetries: number = 3,
|
||||
initialDelayMs: number = 500
|
||||
): Promise<any> {
|
||||
let lastError: Error | null = null;
|
||||
|
||||
for (let attempt = 0; attempt < maxRetries; attempt++) {
|
||||
try {
|
||||
return await qbt.getTorrent(hash);
|
||||
} catch (error) {
|
||||
lastError = error as Error;
|
||||
|
||||
// If this is the last attempt, throw the error
|
||||
if (attempt === maxRetries - 1) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Exponential backoff: 500ms, 1000ms, 2000ms
|
||||
const delayMs = initialDelayMs * Math.pow(2, attempt);
|
||||
logger.warn(`Torrent ${hash} not found, retrying in ${delayMs}ms (attempt ${attempt + 1}/${maxRetries})`);
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, delayMs));
|
||||
}
|
||||
}
|
||||
|
||||
// All retries failed
|
||||
throw lastError || new Error('Failed to get torrent after retries');
|
||||
}
|
||||
import { CLIENT_PROTOCOL_MAP, DownloadClientType } from '../interfaces/download-client.interface';
|
||||
|
||||
/**
|
||||
* Process monitor download job
|
||||
@@ -59,57 +22,42 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
|
||||
const logger = RMABLogger.forJob(jobId, 'MonitorDownload');
|
||||
|
||||
try {
|
||||
let progress: any;
|
||||
let downloadPath: string | undefined;
|
||||
// Get the download client service via the manager
|
||||
const configService = getConfigService();
|
||||
const manager = getDownloadClientManager(configService);
|
||||
const protocol = CLIENT_PROTOCOL_MAP[downloadClient as DownloadClientType];
|
||||
if (!protocol) {
|
||||
throw new Error(`Unknown download client type: ${downloadClient}`);
|
||||
}
|
||||
const client = await manager.getClientServiceForProtocol(protocol);
|
||||
|
||||
if (downloadClient === 'qbittorrent') {
|
||||
// qBittorrent flow
|
||||
const qbt = await getQBittorrentService();
|
||||
if (!client) {
|
||||
throw new Error(`No ${downloadClient} client configured`);
|
||||
}
|
||||
|
||||
// Get torrent status with retry logic (handles race condition)
|
||||
const torrent = await getTorrentWithRetry(qbt, downloadClientId, logger);
|
||||
progress = qbt.getDownloadProgress(torrent);
|
||||
// Get download status via unified interface
|
||||
const info = await client.getDownload(downloadClientId);
|
||||
|
||||
// Store download path for later use
|
||||
downloadPath = torrent.content_path || path.join(torrent.save_path, torrent.name);
|
||||
} else if (downloadClient === 'sabnzbd') {
|
||||
// SABnzbd flow
|
||||
const { getSABnzbdService } = await import('../integrations/sabnzbd.service');
|
||||
const sabnzbd = await getSABnzbdService();
|
||||
if (!info) {
|
||||
throw new Error(`Download ${downloadClientId} not found in ${downloadClient}`);
|
||||
}
|
||||
|
||||
// Get NZB status
|
||||
const nzbInfo = await sabnzbd.getNZB(downloadClientId);
|
||||
// Build progress object for request updates
|
||||
const progressPercent = Math.round(info.progress * 100);
|
||||
const progressState = info.status;
|
||||
|
||||
if (!nzbInfo) {
|
||||
throw new Error(`NZB ${downloadClientId} not found in SABnzbd queue or history`);
|
||||
}
|
||||
|
||||
// Convert NZBInfo to progress format
|
||||
progress = {
|
||||
percent: nzbInfo.progress * 100, // Convert 0.0-1.0 to 0-100 (matches qBittorrent format)
|
||||
bytesDownloaded: nzbInfo.size * nzbInfo.progress,
|
||||
bytesTotal: nzbInfo.size,
|
||||
speed: nzbInfo.downloadSpeed,
|
||||
eta: nzbInfo.timeLeft,
|
||||
state: nzbInfo.status,
|
||||
};
|
||||
|
||||
// Store download path if available (only set after completion)
|
||||
downloadPath = nzbInfo.downloadPath;
|
||||
|
||||
logger.info(`SABnzbd status: ${nzbInfo.status}`, {
|
||||
progress: `${(nzbInfo.progress * 100).toFixed(1)}%`,
|
||||
speed: `${(nzbInfo.downloadSpeed / 1024 / 1024).toFixed(2)} MB/s`,
|
||||
if (client.protocol === 'usenet') {
|
||||
logger.info(`${client.clientType} status: ${info.status}`, {
|
||||
progress: `${(info.progress * 100).toFixed(1)}%`,
|
||||
speed: `${(info.downloadSpeed / 1024 / 1024).toFixed(2)} MB/s`,
|
||||
});
|
||||
} else {
|
||||
throw new Error(`Download client ${downloadClient} not supported`);
|
||||
}
|
||||
|
||||
// Update request progress
|
||||
await prisma.request.update({
|
||||
where: { id: requestId },
|
||||
data: {
|
||||
progress: progress.percent,
|
||||
progress: progressPercent,
|
||||
updatedAt: new Date(),
|
||||
},
|
||||
});
|
||||
@@ -118,23 +66,21 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
|
||||
await prisma.downloadHistory.update({
|
||||
where: { id: downloadHistoryId },
|
||||
data: {
|
||||
downloadStatus: progress.state,
|
||||
downloadStatus: progressState,
|
||||
},
|
||||
});
|
||||
|
||||
// Check download state
|
||||
if (progress.state === 'completed') {
|
||||
if (progressState === 'completed' || progressState === 'seeding') {
|
||||
logger.info(`Download completed for request ${requestId}`);
|
||||
|
||||
// Ensure we have a download path
|
||||
const downloadPath = info.downloadPath;
|
||||
if (!downloadPath) {
|
||||
throw new Error('Download path not available from download client');
|
||||
}
|
||||
|
||||
// Get path mapping configuration from the specific download client
|
||||
const configService = getConfigService();
|
||||
const manager = getDownloadClientManager(configService);
|
||||
const protocol = downloadClient === 'sabnzbd' ? 'usenet' : 'torrent';
|
||||
const clientConfig = await manager.getClientForProtocol(protocol);
|
||||
|
||||
// Build path mapping config from client settings
|
||||
@@ -150,17 +96,18 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
|
||||
const organizePath = PathMapper.transform(downloadPath, pathMappingConfig);
|
||||
|
||||
logger.info(`Download completed`, {
|
||||
downloadClient,
|
||||
downloadClient: client.clientType,
|
||||
downloadPath,
|
||||
organizePath: organizePath !== downloadPath ? `${organizePath} (mapped)` : organizePath,
|
||||
});
|
||||
|
||||
// Update download history to completed
|
||||
// Update download history to completed (store mapped path for retry reliability)
|
||||
await prisma.downloadHistory.update({
|
||||
where: { id: downloadHistoryId },
|
||||
data: {
|
||||
downloadStatus: 'completed',
|
||||
completedAt: new Date(),
|
||||
downloadPath: organizePath,
|
||||
},
|
||||
});
|
||||
|
||||
@@ -197,10 +144,10 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
|
||||
progress: 100,
|
||||
downloadPath: organizePath,
|
||||
};
|
||||
} else if (progress.state === 'failed') {
|
||||
} else if (progressState === 'failed') {
|
||||
logger.error(`Download failed for request ${requestId}`);
|
||||
|
||||
const errorMessage = 'Download failed in qBittorrent';
|
||||
const errorMessage = `Download failed in ${client.clientType}`;
|
||||
|
||||
// Update request to failed
|
||||
await prisma.request.update({
|
||||
@@ -249,7 +196,7 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
|
||||
completed: true,
|
||||
message: 'Download failed',
|
||||
requestId,
|
||||
progress: progress.percent,
|
||||
progress: progressPercent,
|
||||
};
|
||||
} else {
|
||||
// Still downloading - schedule another check in 10 seconds
|
||||
@@ -263,11 +210,11 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
|
||||
);
|
||||
|
||||
// Only log every 5% progress to reduce log spam
|
||||
const shouldLog = progress.percent % 5 === 0 || progress.percent < 5;
|
||||
const shouldLog = progressPercent % 5 === 0 || progressPercent < 5;
|
||||
if (shouldLog) {
|
||||
logger.info(`Request ${requestId}: ${progress.percent}% complete (${progress.state})`, {
|
||||
speed: progress.speed,
|
||||
eta: progress.eta,
|
||||
logger.info(`Request ${requestId}: ${progressPercent}% complete (${progressState})`, {
|
||||
speed: info.downloadSpeed,
|
||||
eta: info.eta,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -276,20 +223,20 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
|
||||
completed: false,
|
||||
message: 'Download in progress, monitoring continues',
|
||||
requestId,
|
||||
progress: progress.percent,
|
||||
speed: progress.speed,
|
||||
eta: progress.eta,
|
||||
state: progress.state,
|
||||
progress: progressPercent,
|
||||
speed: info.downloadSpeed,
|
||||
eta: info.eta,
|
||||
state: progressState,
|
||||
};
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`Error: ${error instanceof Error ? error.message : 'Unknown error'}`);
|
||||
|
||||
// Check if this is a transient "torrent not found" error
|
||||
// Check if this is a transient "not found" error
|
||||
const errorMessage = error instanceof Error ? error.message : '';
|
||||
const isTorrentNotFound = errorMessage.includes('not found') || errorMessage.includes('Torrent') && errorMessage.includes('not found');
|
||||
const isNotFound = errorMessage.includes('not found');
|
||||
|
||||
if (isTorrentNotFound) {
|
||||
if (isNotFound) {
|
||||
// Transient error - don't mark request as failed, let Bull retry
|
||||
// The request stays in 'downloading' status until Bull exhausts all retries
|
||||
logger.warn(`Transient error for request ${requestId}, allowing Bull to retry`);
|
||||
|
||||
@@ -9,6 +9,8 @@ import { getFileOrganizer } from '../utils/file-organizer';
|
||||
import { RMABLogger } from '../utils/logger';
|
||||
import { getLibraryService } from '../services/library';
|
||||
import { getConfigService } from '../services/config.service';
|
||||
import { getDownloadClientManager } from '../services/download-client-manager.service';
|
||||
import { CLIENT_PROTOCOL_MAP, DownloadClientType } from '../interfaces/download-client.interface';
|
||||
import { generateFilesHash } from '../utils/files-hash';
|
||||
import { fixEpubForKindle, cleanupFixedEpub } from '../utils/epub-fixer';
|
||||
import { removeEmptyParentDirectories } from '../utils/cleanup-helpers';
|
||||
@@ -242,106 +244,8 @@ export async function processOrganizeFiles(payload: OrganizeFilesPayload): Promi
|
||||
);
|
||||
}
|
||||
|
||||
// Cleanup Usenet downloads if configured
|
||||
try {
|
||||
logger.info('Checking if cleanup is needed for this download');
|
||||
|
||||
// Get download history to find NZB ID and indexer
|
||||
const downloadHistory = await prisma.downloadHistory.findFirst({
|
||||
where: { requestId },
|
||||
orderBy: { createdAt: 'desc' },
|
||||
});
|
||||
|
||||
logger.info(`Download history found: ${downloadHistory ? 'yes' : 'no'}`, {
|
||||
hasNzbId: !!downloadHistory?.nzbId,
|
||||
hasIndexerId: !!downloadHistory?.indexerId,
|
||||
nzbId: downloadHistory?.nzbId || 'none',
|
||||
indexerId: downloadHistory?.indexerId || 'none',
|
||||
});
|
||||
|
||||
if (downloadHistory?.nzbId && downloadHistory?.indexerId) {
|
||||
// Get indexer configuration
|
||||
const indexersConfig = await configService.get('prowlarr_indexers');
|
||||
logger.info(`Indexers config found: ${indexersConfig ? 'yes' : 'no'}`);
|
||||
|
||||
if (indexersConfig) {
|
||||
const indexers: Array<{ id: number; protocol: string; removeAfterProcessing?: boolean }> = JSON.parse(indexersConfig);
|
||||
const indexer = indexers.find(idx => idx.id === downloadHistory.indexerId);
|
||||
|
||||
logger.info(`Indexer found in config: ${indexer ? 'yes' : 'no'}`, {
|
||||
indexerId: downloadHistory.indexerId,
|
||||
protocol: indexer?.protocol || 'none',
|
||||
removeAfterProcessing: indexer?.removeAfterProcessing ?? 'undefined',
|
||||
});
|
||||
|
||||
// Check if this is a Usenet indexer with cleanup enabled
|
||||
if (indexer && indexer.protocol?.toLowerCase() !== 'torrent' && indexer.removeAfterProcessing) {
|
||||
logger.info(`Cleaning up NZB ${downloadHistory.nzbId} (cleanup enabled for indexer ${indexer.id})`);
|
||||
|
||||
// First, manually delete files from filesystem
|
||||
if (downloadPath) {
|
||||
logger.info(`Removing download files from filesystem: ${downloadPath}`);
|
||||
|
||||
const fs = await import('fs/promises');
|
||||
|
||||
try {
|
||||
// Check if it's a file or directory
|
||||
const stats = await fs.stat(downloadPath);
|
||||
|
||||
if (stats.isDirectory()) {
|
||||
// Remove directory and all contents
|
||||
await fs.rm(downloadPath, { recursive: true, force: true });
|
||||
logger.info(`Removed directory: ${downloadPath}`);
|
||||
} else {
|
||||
// Remove single file
|
||||
await fs.unlink(downloadPath);
|
||||
logger.info(`Removed file: ${downloadPath}`);
|
||||
}
|
||||
|
||||
// Clean up empty parent directories (e.g., empty category folders)
|
||||
// Get download_dir as the boundary - never delete above this
|
||||
const downloadDir = await configService.get('download_dir') || '/downloads';
|
||||
const cleanupResult = await removeEmptyParentDirectories(downloadPath, {
|
||||
boundaryPath: downloadDir,
|
||||
logContext: jobId ? { jobId, context: 'CleanupParents' } : undefined,
|
||||
});
|
||||
|
||||
if (cleanupResult.removedDirectories.length > 0) {
|
||||
logger.info(`Cleaned up ${cleanupResult.removedDirectories.length} empty parent directories`);
|
||||
}
|
||||
} catch (fsError) {
|
||||
// File/directory might already be deleted or not exist
|
||||
if ((fsError as NodeJS.ErrnoException).code === 'ENOENT') {
|
||||
logger.info(`Download path already deleted: ${downloadPath}`);
|
||||
} else {
|
||||
throw fsError;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.warn(`No download path available, skipping filesystem deletion`);
|
||||
}
|
||||
|
||||
// Then archive from SABnzbd history (hides from UI but preserves for troubleshooting)
|
||||
// Note: We only archive from history, not queue. If the NZB is still in the queue
|
||||
// when we're organizing files, something went wrong with the download monitoring.
|
||||
const { getSABnzbdService } = await import('../integrations/sabnzbd.service');
|
||||
const sabnzbd = await getSABnzbdService();
|
||||
|
||||
await sabnzbd.archiveCompletedNZB(downloadHistory.nzbId);
|
||||
|
||||
logger.info(`Successfully archived NZB ${downloadHistory.nzbId} and removed files`);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
// Log error but don't fail the job - cleanup is optional
|
||||
logger.warn(
|
||||
`Failed to cleanup NZB download: ${error instanceof Error ? error.message : 'Unknown error'}`,
|
||||
{
|
||||
error: error instanceof Error ? error.stack : undefined,
|
||||
}
|
||||
);
|
||||
}
|
||||
// Cleanup downloads if configured (uses IDownloadClient.postProcess for client-specific cleanup)
|
||||
await cleanupDownloadAfterOrganize(requestId, downloadPath, configService, jobId, logger);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
@@ -592,12 +496,20 @@ async function processEbookOrganization(
|
||||
const isIndexerDownload = downloadHistory?.downloadClient !== 'direct';
|
||||
logger.info(`Download source: ${downloadHistory?.downloadClient || 'unknown'} (indexer download: ${isIndexerDownload})`);
|
||||
|
||||
// Get file organizer and template
|
||||
// Get file organizer and ebook-specific template (falls back to audiobook template)
|
||||
const organizer = await getFileOrganizer();
|
||||
const templateConfig = await prisma.configuration.findUnique({
|
||||
where: { key: 'audiobook_path_template' },
|
||||
const ebookTemplateConfig = await prisma.configuration.findUnique({
|
||||
where: { key: 'ebook_path_template' },
|
||||
});
|
||||
const template = templateConfig?.value || '{author}/{title} {asin}';
|
||||
let template: string;
|
||||
if (ebookTemplateConfig?.value) {
|
||||
template = ebookTemplateConfig.value;
|
||||
} else {
|
||||
const audiobookTemplateConfig = await prisma.configuration.findUnique({
|
||||
where: { key: 'audiobook_path_template' },
|
||||
});
|
||||
template = audiobookTemplateConfig?.value || '{author}/{title} {asin}';
|
||||
}
|
||||
|
||||
// Check if Kindle EPUB fix is needed
|
||||
let effectiveDownloadPath = downloadPath;
|
||||
@@ -739,99 +651,8 @@ async function processEbookOrganization(
|
||||
logger.debug(`Ebook library scan disabled (scanEnabled=${scanEnabled})`);
|
||||
}
|
||||
|
||||
// Cleanup Usenet downloads if configured (same logic as audiobooks)
|
||||
try {
|
||||
logger.info('Checking if cleanup is needed for ebook download');
|
||||
|
||||
// downloadHistory was already fetched earlier in this function
|
||||
logger.info(`Download history found: ${downloadHistory ? 'yes' : 'no'}`, {
|
||||
hasNzbId: !!downloadHistory?.nzbId,
|
||||
hasIndexerId: !!downloadHistory?.indexerId,
|
||||
nzbId: downloadHistory?.nzbId || 'none',
|
||||
indexerId: downloadHistory?.indexerId || 'none',
|
||||
});
|
||||
|
||||
if (downloadHistory?.nzbId && downloadHistory?.indexerId) {
|
||||
// Get indexer configuration
|
||||
const indexersConfig = await configService.get('prowlarr_indexers');
|
||||
logger.info(`Indexers config found: ${indexersConfig ? 'yes' : 'no'}`);
|
||||
|
||||
if (indexersConfig) {
|
||||
const indexers: Array<{ id: number; protocol: string; removeAfterProcessing?: boolean }> = JSON.parse(indexersConfig);
|
||||
const indexer = indexers.find(idx => idx.id === downloadHistory.indexerId);
|
||||
|
||||
logger.info(`Indexer found in config: ${indexer ? 'yes' : 'no'}`, {
|
||||
indexerId: downloadHistory.indexerId,
|
||||
protocol: indexer?.protocol || 'none',
|
||||
removeAfterProcessing: indexer?.removeAfterProcessing ?? 'undefined',
|
||||
});
|
||||
|
||||
// Check if this is a Usenet indexer with cleanup enabled
|
||||
if (indexer && indexer.protocol?.toLowerCase() !== 'torrent' && indexer.removeAfterProcessing) {
|
||||
logger.info(`Cleaning up NZB ${downloadHistory.nzbId} (cleanup enabled for indexer ${indexer.id})`);
|
||||
|
||||
// First, manually delete files from filesystem
|
||||
if (downloadPath) {
|
||||
logger.info(`Removing download files from filesystem: ${downloadPath}`);
|
||||
|
||||
const fs = await import('fs/promises');
|
||||
|
||||
try {
|
||||
// Check if it's a file or directory
|
||||
const stats = await fs.stat(downloadPath);
|
||||
|
||||
if (stats.isDirectory()) {
|
||||
// Remove directory and all contents
|
||||
await fs.rm(downloadPath, { recursive: true, force: true });
|
||||
logger.info(`Removed directory: ${downloadPath}`);
|
||||
} else {
|
||||
// Remove single file
|
||||
await fs.unlink(downloadPath);
|
||||
logger.info(`Removed file: ${downloadPath}`);
|
||||
}
|
||||
|
||||
// Clean up empty parent directories (e.g., empty category folders)
|
||||
// Get download_dir as the boundary - never delete above this
|
||||
const downloadDir = await configService.get('download_dir') || '/downloads';
|
||||
const cleanupResult = await removeEmptyParentDirectories(downloadPath, {
|
||||
boundaryPath: downloadDir,
|
||||
logContext: jobId ? { jobId, context: 'CleanupParents' } : undefined,
|
||||
});
|
||||
|
||||
if (cleanupResult.removedDirectories.length > 0) {
|
||||
logger.info(`Cleaned up ${cleanupResult.removedDirectories.length} empty parent directories`);
|
||||
}
|
||||
} catch (fsError) {
|
||||
// File/directory might already be deleted or not exist
|
||||
if ((fsError as NodeJS.ErrnoException).code === 'ENOENT') {
|
||||
logger.info(`Download path already deleted: ${downloadPath}`);
|
||||
} else {
|
||||
throw fsError;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.warn(`No download path available, skipping filesystem deletion`);
|
||||
}
|
||||
|
||||
// Then archive from SABnzbd history (hides from UI but preserves for troubleshooting)
|
||||
const { getSABnzbdService } = await import('../integrations/sabnzbd.service');
|
||||
const sabnzbd = await getSABnzbdService();
|
||||
|
||||
await sabnzbd.archiveCompletedNZB(downloadHistory.nzbId);
|
||||
|
||||
logger.info(`Successfully archived NZB ${downloadHistory.nzbId} and removed files`);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
// Log error but don't fail the job - cleanup is optional
|
||||
logger.warn(
|
||||
`Failed to cleanup NZB download: ${error instanceof Error ? error.message : 'Unknown error'}`,
|
||||
{
|
||||
error: error instanceof Error ? error.stack : undefined,
|
||||
}
|
||||
);
|
||||
}
|
||||
// Cleanup downloads if configured (uses IDownloadClient.postProcess for client-specific cleanup)
|
||||
await cleanupDownloadAfterOrganize(requestId, downloadPath, configService, jobId, logger);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
@@ -932,6 +753,129 @@ async function createEbookRequestIfEnabled(
|
||||
}
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// DOWNLOAD CLEANUP
|
||||
// =========================================================================
|
||||
|
||||
/**
|
||||
* Cleanup download files and archive from download client after successful organization.
|
||||
* Uses the IDownloadClient.postProcess() method for client-specific cleanup (e.g., SABnzbd archive).
|
||||
* Shared between audiobook and ebook organization flows.
|
||||
*/
|
||||
async function cleanupDownloadAfterOrganize(
|
||||
requestId: string,
|
||||
downloadPath: string,
|
||||
configService: any,
|
||||
jobId: string | undefined,
|
||||
logger: RMABLogger
|
||||
): Promise<void> {
|
||||
try {
|
||||
logger.info('Checking if cleanup is needed for this download');
|
||||
|
||||
// Get download history to find client ID and indexer
|
||||
const downloadHistory = await prisma.downloadHistory.findFirst({
|
||||
where: { requestId },
|
||||
orderBy: { createdAt: 'desc' },
|
||||
});
|
||||
|
||||
logger.info(`Download history found: ${downloadHistory ? 'yes' : 'no'}`, {
|
||||
hasDownloadClientId: !!downloadHistory?.downloadClientId,
|
||||
hasIndexerId: !!downloadHistory?.indexerId,
|
||||
downloadClient: downloadHistory?.downloadClient || 'none',
|
||||
});
|
||||
|
||||
if (!downloadHistory?.indexerId || !downloadHistory?.downloadClientId) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get indexer configuration
|
||||
const indexersConfig = await configService.get('prowlarr_indexers');
|
||||
if (!indexersConfig) {
|
||||
return;
|
||||
}
|
||||
|
||||
const indexers: Array<{ id: number; protocol: string; removeAfterProcessing?: boolean }> = JSON.parse(indexersConfig);
|
||||
const indexer = indexers.find(idx => idx.id === downloadHistory.indexerId);
|
||||
|
||||
logger.info(`Indexer found in config: ${indexer ? 'yes' : 'no'}`, {
|
||||
indexerId: downloadHistory.indexerId,
|
||||
protocol: indexer?.protocol || 'none',
|
||||
removeAfterProcessing: indexer?.removeAfterProcessing ?? 'undefined',
|
||||
});
|
||||
|
||||
// Check if this is a non-torrent indexer with cleanup enabled
|
||||
if (!indexer || indexer.protocol?.toLowerCase() === 'torrent' || !indexer.removeAfterProcessing) {
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info(`Cleaning up download ${downloadHistory.downloadClientId} (cleanup enabled for indexer ${indexer.id})`);
|
||||
|
||||
// First, manually delete files from filesystem
|
||||
if (downloadPath) {
|
||||
logger.info(`Removing download files from filesystem: ${downloadPath}`);
|
||||
|
||||
const fs = await import('fs/promises');
|
||||
|
||||
try {
|
||||
const stats = await fs.stat(downloadPath);
|
||||
|
||||
if (stats.isDirectory()) {
|
||||
await fs.rm(downloadPath, { recursive: true, force: true });
|
||||
logger.info(`Removed directory: ${downloadPath}`);
|
||||
} else {
|
||||
await fs.unlink(downloadPath);
|
||||
logger.info(`Removed file: ${downloadPath}`);
|
||||
}
|
||||
|
||||
// Clean up empty parent directories
|
||||
const downloadDir = await configService.get('download_dir') || '/downloads';
|
||||
const cleanupResult = await removeEmptyParentDirectories(downloadPath, {
|
||||
boundaryPath: downloadDir,
|
||||
logContext: jobId ? { jobId, context: 'CleanupParents' } : undefined,
|
||||
});
|
||||
|
||||
if (cleanupResult.removedDirectories.length > 0) {
|
||||
logger.info(`Cleaned up ${cleanupResult.removedDirectories.length} empty parent directories`);
|
||||
}
|
||||
} catch (fsError) {
|
||||
if ((fsError as NodeJS.ErrnoException).code === 'ENOENT') {
|
||||
logger.info(`Download path already deleted: ${downloadPath}`);
|
||||
} else {
|
||||
throw fsError;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.warn(`No download path available, skipping filesystem deletion`);
|
||||
}
|
||||
|
||||
// Then use the download client interface for client-specific post-processing
|
||||
// (e.g., usenet clients archive from history, torrent clients are a no-op)
|
||||
const clientType = downloadHistory.downloadClient;
|
||||
if (clientType && clientType !== 'direct') {
|
||||
const manager = getDownloadClientManager(configService);
|
||||
const protocol = CLIENT_PROTOCOL_MAP[clientType as DownloadClientType];
|
||||
if (!protocol) {
|
||||
logger.warn(`Unknown download client type: ${clientType}, skipping post-processing`);
|
||||
return;
|
||||
}
|
||||
const client = await manager.getClientServiceForProtocol(protocol as 'torrent' | 'usenet');
|
||||
|
||||
if (client) {
|
||||
await client.postProcess(downloadHistory.downloadClientId);
|
||||
logger.info(`Successfully post-processed download ${downloadHistory.downloadClientId} via ${client.clientType}`);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
// Log error but don't fail the job - cleanup is optional
|
||||
logger.warn(
|
||||
`Failed to cleanup download: ${error instanceof Error ? error.message : 'Unknown error'}`,
|
||||
{
|
||||
error: error instanceof Error ? error.stack : undefined,
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// HELPER FUNCTIONS
|
||||
// =========================================================================
|
||||
|
||||
@@ -2,15 +2,18 @@
|
||||
* Component: Retry Failed Imports Processor
|
||||
* Documentation: documentation/backend/services/scheduler.md
|
||||
*
|
||||
* Retries file organization for requests that are awaiting import
|
||||
* Retries file organization for requests that are awaiting import.
|
||||
* Uses the IDownloadClient interface for client-agnostic path resolution.
|
||||
*/
|
||||
|
||||
import path from 'path';
|
||||
import { prisma } from '../db';
|
||||
import { RMABLogger } from '../utils/logger';
|
||||
import { getJobQueueService } from '../services/job-queue.service';
|
||||
import { getConfigService } from '../services/config.service';
|
||||
import { getDownloadClientManager } from '../services/download-client-manager.service';
|
||||
import { getDownloadClientManager, DownloadClientManager } from '../services/download-client-manager.service';
|
||||
import { PathMapper, PathMappingConfig } from '../utils/path-mapper';
|
||||
import { CLIENT_PROTOCOL_MAP, DownloadClientType, ProtocolType } from '../interfaces/download-client.interface';
|
||||
|
||||
export interface RetryFailedImportsPayload {
|
||||
jobId?: string;
|
||||
@@ -30,7 +33,7 @@ export async function processRetryFailedImports(payload: RetryFailedImportsPaylo
|
||||
|
||||
// Helper function to get path mapping config for a specific download client type
|
||||
const getPathMappingForClient = async (clientType: string): Promise<PathMappingConfig> => {
|
||||
const protocol = clientType === 'sabnzbd' ? 'usenet' : 'torrent';
|
||||
const protocol = CLIENT_PROTOCOL_MAP[clientType as DownloadClientType] || 'torrent';
|
||||
const clientConfig = await manager.getClientForProtocol(protocol);
|
||||
|
||||
if (clientConfig && clientConfig.remotePathMappingEnabled) {
|
||||
@@ -43,11 +46,10 @@ export async function processRetryFailedImports(payload: RetryFailedImportsPaylo
|
||||
return { enabled: false, remotePath: '', localPath: '' };
|
||||
};
|
||||
|
||||
// Find all active audiobook requests in awaiting_import status
|
||||
// Note: Ebook requests use the same organize_files processor but with type branching
|
||||
// Find all requests in awaiting_import status (both audiobook and ebook)
|
||||
// The organize_files processor handles both types with type-based branching
|
||||
const requests = await prisma.request.findMany({
|
||||
where: {
|
||||
type: 'audiobook', // Only audiobook requests (ebooks handled by same processor but different flow)
|
||||
status: 'awaiting_import',
|
||||
deletedAt: null,
|
||||
},
|
||||
@@ -90,111 +92,62 @@ export async function processRetryFailedImports(payload: RetryFailedImportsPaylo
|
||||
|
||||
let downloadPath: string;
|
||||
|
||||
// Try to get download path from the appropriate download client
|
||||
// Get path mapping for this specific download client
|
||||
const clientType = downloadHistory.downloadClient || 'qbittorrent';
|
||||
const mappingConfig = await getPathMappingForClient(clientType);
|
||||
|
||||
if (downloadHistory.torrentHash) {
|
||||
// qBittorrent download
|
||||
try {
|
||||
const { getQBittorrentService } = await import('../integrations/qbittorrent.service');
|
||||
const qbt = await getQBittorrentService();
|
||||
const torrent = await qbt.getTorrent(downloadHistory.torrentHash);
|
||||
const qbPath = `${torrent.save_path}/${torrent.name}`;
|
||||
downloadPath = PathMapper.transform(qbPath, mappingConfig);
|
||||
logger.info(
|
||||
`Got download path from qBittorrent for request ${request.id}: ${qbPath}` +
|
||||
(downloadPath !== qbPath ? ` → ${downloadPath} (mapped)` : '')
|
||||
);
|
||||
} catch (qbtError) {
|
||||
// Torrent not found in qBittorrent - try to construct path from config
|
||||
logger.warn(`Torrent not found in qBittorrent for request ${request.id}, falling back to configured path`);
|
||||
|
||||
if (!downloadHistory.torrentName) {
|
||||
logger.warn(`No torrent name stored for request ${request.id}, cannot construct fallback path, skipping`);
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
const downloadDir = await configService.get('download_dir');
|
||||
|
||||
if (!downloadDir) {
|
||||
logger.error(`download_dir not configured, cannot retry request ${request.id}, skipping`);
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
const fallbackPath = `${downloadDir}/${downloadHistory.torrentName}`;
|
||||
downloadPath = PathMapper.transform(fallbackPath, mappingConfig);
|
||||
logger.info(
|
||||
`Using fallback download path for request ${request.id}: ${fallbackPath}` +
|
||||
(downloadPath !== fallbackPath ? ` → ${downloadPath} (mapped)` : '')
|
||||
);
|
||||
}
|
||||
} else if (downloadHistory.nzbId) {
|
||||
// SABnzbd download
|
||||
try {
|
||||
const { getSABnzbdService } = await import('../integrations/sabnzbd.service');
|
||||
const sabnzbd = await getSABnzbdService();
|
||||
const nzbInfo = await sabnzbd.getNZB(downloadHistory.nzbId);
|
||||
if (nzbInfo && nzbInfo.downloadPath) {
|
||||
downloadPath = PathMapper.transform(nzbInfo.downloadPath, mappingConfig);
|
||||
logger.info(
|
||||
`Got download path from SABnzbd for request ${request.id}: ${nzbInfo.downloadPath}` +
|
||||
(downloadPath !== nzbInfo.downloadPath ? ` → ${downloadPath} (mapped)` : '')
|
||||
);
|
||||
} else {
|
||||
logger.warn(`NZB ${downloadHistory.nzbId} not found or has no download path for request ${request.id}, falling back to configured path`);
|
||||
|
||||
if (!downloadHistory.torrentName) {
|
||||
logger.warn(`No name stored for request ${request.id}, cannot construct fallback path, skipping`);
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
const downloadDir = await configService.get('download_dir');
|
||||
|
||||
if (!downloadDir) {
|
||||
logger.error(`download_dir not configured, cannot retry request ${request.id}, skipping`);
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
const fallbackPath = `${downloadDir}/${downloadHistory.torrentName}`;
|
||||
downloadPath = PathMapper.transform(fallbackPath, mappingConfig);
|
||||
logger.info(
|
||||
`Using fallback download path for request ${request.id}: ${fallbackPath}` +
|
||||
(downloadPath !== fallbackPath ? ` → ${downloadPath} (mapped)` : '')
|
||||
);
|
||||
}
|
||||
} catch (sabnzbdError) {
|
||||
logger.warn(`SABnzbd error for request ${request.id}: ${sabnzbdError instanceof Error ? sabnzbdError.message : 'Unknown error'}, skipping`);
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
// Direct downloads (e.g. Anna's Archive ebooks) have no external download client
|
||||
// Use stored path or construct from download_dir directly
|
||||
if (clientType === 'direct') {
|
||||
const noMapping: PathMappingConfig = { enabled: false, remotePath: '', localPath: '' };
|
||||
downloadPath = getStoredPath(downloadHistory, request.id, logger) || await getFallbackPath(downloadHistory, configService, noMapping, request.id, logger);
|
||||
} else {
|
||||
// No download client ID - use fallback path
|
||||
if (!downloadHistory.torrentName) {
|
||||
logger.warn(`No download client ID or name for request ${request.id}, skipping`);
|
||||
// Real download client — resolve path via client API with path mapping
|
||||
const mappingConfig = await getPathMappingForClient(clientType);
|
||||
const clientId = downloadHistory.downloadClientId || downloadHistory.torrentHash || downloadHistory.nzbId;
|
||||
|
||||
const protocol = CLIENT_PROTOCOL_MAP[clientType as DownloadClientType] as ProtocolType | undefined;
|
||||
if (!protocol) {
|
||||
logger.warn(`Unknown download client type: ${clientType} for request ${request.id}, skipping`);
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
const downloadDir = await configService.get('download_dir');
|
||||
if (clientId) {
|
||||
// Try to get path from download client via unified interface
|
||||
const client = await manager.getClientServiceForProtocol(protocol);
|
||||
|
||||
if (!downloadDir) {
|
||||
logger.error(`download_dir not configured, cannot retry request ${request.id}, skipping`);
|
||||
skipped++;
|
||||
continue;
|
||||
if (client) {
|
||||
try {
|
||||
const info = await client.getDownload(clientId);
|
||||
if (info?.downloadPath) {
|
||||
downloadPath = PathMapper.transform(info.downloadPath, mappingConfig);
|
||||
logger.info(
|
||||
`Got download path from ${client.clientType} for request ${request.id}: ${info.downloadPath}` +
|
||||
(downloadPath !== info.downloadPath ? ` → ${downloadPath} (mapped)` : '')
|
||||
);
|
||||
} else {
|
||||
// Download found but no path — try stored path, then fallback
|
||||
downloadPath = getStoredPath(downloadHistory, request.id, logger) || await getFallbackPath(downloadHistory, configService, mappingConfig, request.id, logger, manager, protocol);
|
||||
}
|
||||
} catch (clientError) {
|
||||
// Client error — try stored path, then fallback
|
||||
logger.warn(`${client.clientType} error for request ${request.id}: ${clientError instanceof Error ? clientError.message : 'Unknown error'}, using fallback path`);
|
||||
downloadPath = getStoredPath(downloadHistory, request.id, logger) || await getFallbackPath(downloadHistory, configService, mappingConfig, request.id, logger, manager, protocol);
|
||||
}
|
||||
} else {
|
||||
// No client configured — try stored path, then fallback
|
||||
downloadPath = getStoredPath(downloadHistory, request.id, logger) || await getFallbackPath(downloadHistory, configService, mappingConfig, request.id, logger, manager, protocol);
|
||||
}
|
||||
} else {
|
||||
// No client ID — try stored path, then fallback
|
||||
downloadPath = getStoredPath(downloadHistory, request.id, logger) || await getFallbackPath(downloadHistory, configService, mappingConfig, request.id, logger, manager, protocol);
|
||||
}
|
||||
}
|
||||
|
||||
const configuredPath = `${downloadDir}/${downloadHistory.torrentName}`;
|
||||
downloadPath = PathMapper.transform(configuredPath, mappingConfig);
|
||||
logger.info(
|
||||
`Using configured download path for request ${request.id}: ${configuredPath}` +
|
||||
(downloadPath !== configuredPath ? ` → ${downloadPath} (mapped)` : '')
|
||||
);
|
||||
// Check if we got a valid path (getFallbackPath returns empty string on failure)
|
||||
if (!downloadPath) {
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
await jobQueue.addOrganizeJob(
|
||||
@@ -203,7 +156,7 @@ export async function processRetryFailedImports(payload: RetryFailedImportsPaylo
|
||||
downloadPath
|
||||
);
|
||||
triggered++;
|
||||
logger.info(`Triggered organize job for request ${request.id}: ${request.audiobook.title}`);
|
||||
logger.info(`Triggered organize job for ${request.type || 'audiobook'} request ${request.id}: ${request.audiobook.title}`);
|
||||
} catch (error) {
|
||||
logger.error(`Failed to trigger organize for request ${request.id}: ${error instanceof Error ? error.message : 'Unknown error'}`);
|
||||
skipped++;
|
||||
@@ -224,3 +177,62 @@ export async function processRetryFailedImports(payload: RetryFailedImportsPaylo
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the stored download path from the database (saved at download completion time).
|
||||
* Returns empty string if not available (old records won't have this field).
|
||||
*/
|
||||
function getStoredPath(
|
||||
downloadHistory: { downloadPath?: string | null },
|
||||
requestId: string,
|
||||
logger: RMABLogger
|
||||
): string {
|
||||
if (downloadHistory.downloadPath) {
|
||||
logger.info(`Using stored download path for request ${requestId}: ${downloadHistory.downloadPath}`);
|
||||
return downloadHistory.downloadPath;
|
||||
}
|
||||
return '';
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a fallback download path from config when the download client can't provide one.
|
||||
* Returns empty string if path cannot be determined (caller should skip the request).
|
||||
*/
|
||||
async function getFallbackPath(
|
||||
downloadHistory: { torrentName: string | null },
|
||||
configService: any,
|
||||
mappingConfig: PathMappingConfig,
|
||||
requestId: string,
|
||||
logger: RMABLogger,
|
||||
manager?: DownloadClientManager,
|
||||
protocol?: ProtocolType
|
||||
): Promise<string> {
|
||||
if (!downloadHistory.torrentName) {
|
||||
logger.warn(`No download name stored for request ${requestId}, cannot construct fallback path, skipping`);
|
||||
return '';
|
||||
}
|
||||
|
||||
const baseDir = await configService.get('download_dir');
|
||||
|
||||
if (!baseDir) {
|
||||
logger.error(`download_dir not configured, cannot retry request ${requestId}, skipping`);
|
||||
return '';
|
||||
}
|
||||
|
||||
// Resolve customPath from the client config if available
|
||||
let downloadDir = baseDir;
|
||||
if (manager && protocol) {
|
||||
const clientConfig = await manager.getClientForProtocol(protocol);
|
||||
if (clientConfig?.customPath) {
|
||||
downloadDir = path.join(baseDir, clientConfig.customPath);
|
||||
}
|
||||
}
|
||||
|
||||
const fallbackPath = `${downloadDir}/${downloadHistory.torrentName}`;
|
||||
const mappedPath = PathMapper.transform(fallbackPath, mappingConfig);
|
||||
logger.info(
|
||||
`Using fallback download path for request ${requestId}: ${fallbackPath}` +
|
||||
(mappedPath !== fallbackPath ? ` → ${mappedPath} (mapped)` : '')
|
||||
);
|
||||
return mappedPath;
|
||||
}
|
||||
|
||||
@@ -243,9 +243,14 @@ async function searchIndexers(
|
||||
const flagConfigs = flagConfigStr ? JSON.parse(flagConfigStr) : [];
|
||||
|
||||
// Group indexers by their EBOOK category configuration
|
||||
const groups = groupIndexersByCategories(indexersConfig, 'ebook');
|
||||
const { groups, skippedIndexers } = groupIndexersByCategories(indexersConfig, 'ebook');
|
||||
|
||||
logger.info(`Searching ${indexersConfig.length} enabled indexers in ${groups.length} group${groups.length > 1 ? 's' : ''}`);
|
||||
if (skippedIndexers.length > 0) {
|
||||
const skippedNames = skippedIndexers.map(idx => idx.name).join(', ');
|
||||
logger.info(`Skipping ${skippedIndexers.length} indexer(s) with no ebook categories: ${skippedNames}`);
|
||||
}
|
||||
|
||||
logger.info(`Searching ${indexersConfig.length - skippedIndexers.length} enabled indexers in ${groups.length} group${groups.length > 1 ? 's' : ''}`);
|
||||
|
||||
// Log each group for transparency
|
||||
groups.forEach((group, index) => {
|
||||
|
||||
@@ -58,9 +58,14 @@ export async function processSearchIndexers(payload: SearchIndexersPayload): Pro
|
||||
|
||||
// Group indexers by their category configuration
|
||||
// This minimizes API calls while ensuring each indexer only searches its configured categories
|
||||
const groups = groupIndexersByCategories(indexersConfig);
|
||||
const { groups, skippedIndexers } = groupIndexersByCategories(indexersConfig);
|
||||
|
||||
logger.info(`Searching ${indexersConfig.length} enabled indexers in ${groups.length} group${groups.length > 1 ? 's' : ''}`);
|
||||
if (skippedIndexers.length > 0) {
|
||||
const skippedNames = skippedIndexers.map(idx => idx.name).join(', ');
|
||||
logger.info(`Skipping ${skippedIndexers.length} indexer(s) with no audiobook categories: ${skippedNames}`);
|
||||
}
|
||||
|
||||
logger.info(`Searching ${indexersConfig.length - skippedIndexers.length} enabled indexers in ${groups.length} group${groups.length > 1 ? 's' : ''}`);
|
||||
|
||||
// Log each group for transparency
|
||||
groups.forEach((group, index) => {
|
||||
|
||||
Reference in New Issue
Block a user