Add custom search terms & retry download (admin)

Add support for per-request custom search terms and an admin retry-download flow.

- DB/schema: add custom_search_terms column via Prisma migration and schema update.
- Admin UI: new AdjustSearchTermsModal component and UI badges to show custom search status; RequestActionsDropdown and RecentRequestsTable updated to surface adjust/retry actions.
- API: new PATCH /api/admin/requests/[id]/search-terms to set/clear custom terms (optionally trigger a new search) and new POST /api/admin/requests/[id]/retry-download to resume monitoring or re-add downloads using DownloadHistory metadata.
- Behavior: interactive search now prefers customSearchTerms when present; manual import exposes cleanupSource option to organize job; admin requests listing returns downloadAttempts and customSearchTerms.
- UX: add SectionToolbar, LoadMoreBar and HideAvailableToggle components and wire hide-available preference across home, search, author and series pages; authors/series endpoints/page handlers gain pagination metadata.
- Misc: add connection-errors util and update related processors/services and tests to cover the new flows.

These changes enable admins to override search terms per request, trigger searches from the admin UI, and retry failed downloads more robustly.
This commit is contained in:
kikootwo
2026-03-02 17:05:21 -05:00
parent 3ee67c8763
commit d25a6ebf79
39 changed files with 2034 additions and 311 deletions
+51 -11
View File
@@ -5,7 +5,9 @@
'use client';
import { useRef, useEffect, useCallback } from 'react';
import useSWR from 'swr';
import useSWRInfinite from 'swr/infinite';
import { authenticatedFetcher } from '@/lib/utils/api';
export interface Audiobook {
@@ -57,20 +59,58 @@ export function useAudiobooks(type: 'popular' | 'new-releases', limit: number =
};
}
export function useSearch(query: string, page: number = 1) {
const shouldFetch = query && query.length > 0;
const endpoint = shouldFetch ? `/api/audiobooks/search?q=${encodeURIComponent(query)}&page=${page}` : null;
const { data, error, isLoading } = useSWR(endpoint, authenticatedFetcher, {
revalidateOnFocus: false,
dedupingInterval: 30000, // Cache for 30 seconds
function dedupeByAsin<T extends { asin: string }>(items: T[]): T[] {
const seen = new Set<string>();
return items.filter(item => {
if (seen.has(item.asin)) return false;
seen.add(item.asin);
return true;
});
}
export function useSearch(query: string) {
const prevQueryRef = useRef(query);
const { data, error, size, setSize, isLoading, isValidating } = useSWRInfinite(
(pageIndex, prevPageData) => {
if (!query || query.length === 0) return null;
if (pageIndex === 0) return `/api/audiobooks/search?q=${encodeURIComponent(query)}&page=1`;
if (!prevPageData?.hasMore) return null;
return `/api/audiobooks/search?q=${encodeURIComponent(query)}&page=${pageIndex + 1}`;
},
authenticatedFetcher,
{
revalidateOnFocus: false,
dedupingInterval: 30000,
revalidateFirstPage: false,
}
);
// Reset to page 1 when query changes
useEffect(() => {
if (query !== prevQueryRef.current) {
prevQueryRef.current = query;
setSize(1);
}
}, [query, setSize]);
const results = data ? dedupeByAsin(data.flatMap(page => page?.results || [])) : [];
const totalResults = data?.[0]?.totalResults || 0;
const hasMore = !!(data && data.length > 0 && data[data.length - 1]?.hasMore);
const isLoadingInitial = !data && !error && !!query;
const isLoadingMore = !!(data && typeof data[size - 1] === 'undefined' && isValidating);
const loadMore = useCallback(() => {
setSize(prev => prev + 1);
}, [setSize]);
return {
results: data?.results || [],
totalResults: data?.totalResults || 0,
hasMore: data?.hasMore || false,
isLoading: shouldFetch && isLoading,
results,
totalResults,
hasMore,
isLoading: isLoadingInitial,
isLoadingMore,
loadMore,
error,
};
}
+52 -12
View File
@@ -5,7 +5,9 @@
'use client';
import { useRef, useEffect, useCallback } from 'react';
import useSWR from 'swr';
import useSWRInfinite from 'swr/infinite';
import { authenticatedFetcher } from '@/lib/utils/api';
import { Audiobook } from './useAudiobooks';
@@ -68,21 +70,59 @@ export function useAuthorDetail(asin: string | null) {
};
}
export function useAuthorBooks(asin: string | null, authorName: string | null) {
const shouldFetch = asin && authorName;
const endpoint = shouldFetch
? `/api/authors/${asin}/books?name=${encodeURIComponent(authorName)}`
: null;
const { data, error, isLoading } = useSWR(endpoint, authenticatedFetcher, {
revalidateOnFocus: false,
dedupingInterval: 60000, // Cache for 1 minute
function dedupeByAsin<T extends { asin: string }>(items: T[]): T[] {
const seen = new Set<string>();
return items.filter(item => {
if (seen.has(item.asin)) return false;
seen.add(item.asin);
return true;
});
}
export function useAuthorBooks(asin: string | null, authorName: string | null) {
const prevIdentityRef = useRef<string | null>(null);
const identity = asin && authorName ? `${asin}:${authorName}` : null;
const { data, error, size, setSize, isLoading, isValidating } = useSWRInfinite(
(pageIndex, prevPageData) => {
if (!asin || !authorName) return null;
if (pageIndex === 0) return `/api/authors/${asin}/books?name=${encodeURIComponent(authorName)}&page=1`;
if (!prevPageData?.hasMore) return null;
return `/api/authors/${asin}/books?name=${encodeURIComponent(authorName)}&page=${pageIndex + 1}`;
},
authenticatedFetcher,
{
revalidateOnFocus: false,
dedupingInterval: 60000,
revalidateFirstPage: false,
}
);
// Reset when author changes
useEffect(() => {
if (identity !== prevIdentityRef.current) {
prevIdentityRef.current = identity;
setSize(1);
}
}, [identity, setSize]);
const books = (data ? dedupeByAsin(data.flatMap(page => page?.books || [])) : []) as Audiobook[];
const totalBooks = data?.[0]?.totalBooks || 0;
const hasMore = !!(data && data.length > 0 && data[data.length - 1]?.hasMore);
const isLoadingInitial = !data && !error && !!identity;
const isLoadingMore = !!(data && typeof data[size - 1] === 'undefined' && isValidating);
const loadMore = useCallback(() => {
setSize(prev => prev + 1);
}, [setSize]);
return {
books: (data?.books || []) as Audiobook[],
totalBooks: data?.totalBooks || 0,
isLoading: !!shouldFetch && isLoading,
books,
totalBooks,
hasMore,
isLoading: isLoadingInitial || (!!identity && isLoading),
isLoadingMore,
loadMore,
error,
};
}
+56 -8
View File
@@ -5,7 +5,9 @@
'use client';
import { useRef, useEffect, useCallback } from 'react';
import useSWR from 'swr';
import useSWRInfinite from 'swr/infinite';
import { authenticatedFetcher } from '@/lib/utils/api';
import { Audiobook } from './useAudiobooks';
@@ -59,17 +61,63 @@ export function useSeriesSearch(query: string) {
};
}
export function useSeriesDetail(asin: string | null) {
const endpoint = asin ? `/api/series/${asin}` : null;
const { data, error, isLoading } = useSWR(endpoint, authenticatedFetcher, {
revalidateOnFocus: false,
dedupingInterval: 300000, // Cache for 5 minutes
function dedupeByAsin<T extends { asin: string }>(items: T[]): T[] {
const seen = new Set<string>();
return items.filter(item => {
if (seen.has(item.asin)) return false;
seen.add(item.asin);
return true;
});
}
export function useSeriesDetail(asin: string | null) {
const prevAsinRef = useRef<string | null>(null);
const { data, error, size, setSize, isLoading, isValidating } = useSWRInfinite(
(pageIndex, prevPageData) => {
if (!asin) return null;
if (pageIndex === 0) return `/api/series/${asin}?page=1`;
if (!prevPageData?.hasMore) return null;
return `/api/series/${asin}?page=${pageIndex + 1}`;
},
authenticatedFetcher,
{
revalidateOnFocus: false,
dedupingInterval: 300000,
revalidateFirstPage: false,
}
);
// Reset when series changes
useEffect(() => {
if (asin !== prevAsinRef.current) {
prevAsinRef.current = asin;
setSize(1);
}
}, [asin, setSize]);
// Merge pages: use first page's metadata, accumulate all books
const firstPageSeries = data?.[0]?.series as SeriesDetail | undefined;
const allBooks = (data ? dedupeByAsin(data.flatMap(page => page?.series?.books || [])) : []) as Audiobook[];
const series: SeriesDetail | null = firstPageSeries
? { ...firstPageSeries, books: allBooks }
: null;
const hasMore = !!(data && data.length > 0 && data[data.length - 1]?.hasMore);
const isLoadingInitial = !data && !error && !!asin;
const isLoadingMore = !!(data && typeof data[size - 1] === 'undefined' && isValidating);
const loadMore = useCallback(() => {
setSize(prev => prev + 1);
}, [setSize]);
return {
series: (data?.series || null) as SeriesDetail | null,
isLoading,
series,
hasMore,
isLoading: isLoadingInitial || (!!asin && isLoading),
isLoadingMore,
loadMore,
error,
};
}
+11 -4
View File
@@ -288,17 +288,17 @@ function parseSeriesPageSummary(
* Scrape a series page for full detail data including books and similar series.
* Used by the detail API endpoint.
*/
export async function scrapeSeriesPage(asin: string): Promise<SeriesDetail | null> {
export async function scrapeSeriesPage(asin: string, page: number = 1): Promise<(SeriesDetail & { hasMore: boolean; page: number }) | null> {
const service = getAudibleService();
const region = service.getRegion();
const baseUrl = service.getBaseUrl();
const langConfig = getLanguageForRegion(region);
logger.info(`Scraping series detail page: ${asin}`);
logger.info(`Scraping series detail page: ${asin}, page ${page}`);
try {
const { data: response } = await service.fetch(`/series/${asin}`, {
params: { ipRedirectOverride: 'true', pageSize: AUDIBLE_PAGE_SIZE },
params: { ipRedirectOverride: 'true', pageSize: AUDIBLE_PAGE_SIZE, page },
});
const $ = cheerio.load(response.data);
@@ -316,10 +316,15 @@ export async function scrapeSeriesPage(asin: string): Promise<SeriesDetail | nul
// Use actual book count if we got more from scraping
const bookCount = Math.max(summary.bookCount, books.length);
// Calculate hasMore: use header bookCount if available, otherwise check if full page
const hasMore = bookCount > 0
? page * AUDIBLE_PAGE_SIZE < bookCount
: books.length >= AUDIBLE_PAGE_SIZE;
// Parse similar series ("Listeners also enjoyed" or similar section)
const similarSeries = parseSimilarSeries($);
logger.info(`Series detail complete: "${summary.title}" (${books.length} books, ${similarSeries.length} similar)`);
logger.info(`Series detail complete: "${summary.title}" (${books.length} books, page ${page}, hasMore: ${hasMore})`);
return {
asin,
@@ -332,6 +337,8 @@ export async function scrapeSeriesPage(asin: string): Promise<SeriesDetail | nul
books,
similarSeries,
audibleUrl: `${baseUrl}/series/${asin}`,
hasMore,
page,
};
} catch (error) {
logger.error(`Failed to scrape series detail ${asin}`, {
+93 -96
View File
@@ -59,6 +59,13 @@ export interface AudibleSearchResult {
hasMore: boolean;
}
export interface AuthorBooksResult {
books: AudibleAudiobook[];
hasMore: boolean;
page: number;
totalResults: number;
}
export class AudibleService {
private client!: AxiosInstance;
private baseUrl: string = 'https://www.audible.com';
@@ -564,7 +571,9 @@ export class AudibleService {
results: audiobooks,
totalResults,
page,
hasMore: audiobooks.length > 0 && totalResults > page * AUDIBLE_PAGE_SIZE,
hasMore: audiobooks.length > 0 && (totalResults > 0
? totalResults > page * AUDIBLE_PAGE_SIZE
: audiobooks.length >= AUDIBLE_PAGE_SIZE),
};
} catch (error) {
logger.error('Search failed', { error: error instanceof Error ? error.message : String(error) });
@@ -583,123 +592,111 @@ export class AudibleService {
* Uses Audible's searchAuthor parameter and paginates through all results.
* Filters: (1) author link must contain the target ASIN, (2) language must be English.
*/
async searchByAuthorAsin(authorName: string, authorAsin: string): Promise<AudibleAudiobook[]> {
async searchByAuthorAsin(authorName: string, authorAsin: string, page: number = 1): Promise<AuthorBooksResult> {
await this.initialize();
const MAX_PAGES = 10;
const allBooks: AudibleAudiobook[] = [];
const books: AudibleAudiobook[] = [];
const seenAsins = new Set<string>();
try {
logger.info(`Searching books by author "${authorName}" (ASIN: ${authorAsin})...`);
logger.info(`Searching books by author "${authorName}" (ASIN: ${authorAsin}), page ${page}...`);
for (let page = 1; page <= MAX_PAGES; page++) {
const { data: response, meta } = await this.fetchWithRetry('/search', {
params: {
ipRedirectOverride: 'true',
searchAuthor: authorName,
pageSize: AUDIBLE_PAGE_SIZE,
page,
},
const { data: response } = await this.fetchWithRetry('/search', {
params: {
ipRedirectOverride: 'true',
searchAuthor: authorName,
pageSize: AUDIBLE_PAGE_SIZE,
page,
},
});
const $ = cheerio.load(response.data);
// Count raw items on page before filtering (for hasMore fallback)
const pageItemCount = $('.s-result-item, .productListItem').length;
$('.s-result-item, .productListItem').each((_index, element) => {
const $el = $(element);
// --- Language filter: require matching language for region ---
const langConfig = this.getLangConfig();
const langText = $el.find(buildContainsSelector('span', langConfig.scraping.languageLabels)).text().trim() ||
$el.find('.languageLabel').text().trim();
const langLabelPattern = new RegExp(`(?:${langConfig.scraping.languageLabels.map(l => l.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')).join('|')})\\s*(.+)`, 'i');
const langMatch = langText.match(langLabelPattern);
const language = langMatch?.[1]?.trim();
if (!language || !isAcceptedLanguage(language, langConfig)) return;
// --- Author ASIN filter: verify target ASIN in author links ---
const authorLinks = $el.find('a[href*="/author/"]');
let hasMatchingAuthor = false;
authorLinks.each((_i, link) => {
const href = $(link).attr('href') || '';
const asinMatch = href.match(/\/author\/[^\/]+\/([A-Z0-9]{10})/);
if (asinMatch && asinMatch[1] === authorAsin) {
hasMatchingAuthor = true;
return false; // break .each()
}
});
if (!hasMatchingAuthor) return;
const $ = cheerio.load(response.data);
let pageResults = 0;
// --- Extract book ASIN ---
const bookAsin = $el.find('li').attr('data-asin') ||
$el.find('a[href*="/pd/"]').attr('href')?.match(/\/pd\/[^\/]+\/([A-Z0-9]{10})/)?.[1] ||
$el.find('a[href*="/ac/"]').attr('href')?.match(/\/ac\/[^\/]+\/([A-Z0-9]{10})/)?.[1] ||
$el.find('a').attr('href')?.match(/\/(?:pd|ac)\/[^\/]+\/([A-Z0-9]{10})/)?.[1] || '';
if (!bookAsin || seenAsins.has(bookAsin)) return;
seenAsins.add(bookAsin);
$('.s-result-item, .productListItem').each((_index, element) => {
const $el = $(element);
// --- Parse book details ---
const title = $el.find('h2').first().text().trim() ||
$el.find('h3 a').text().trim() ||
$el.find('.bc-heading a').text().trim();
// --- Language filter: require matching language for region ---
const langConfig = this.getLangConfig();
const langText = $el.find(buildContainsSelector('span', langConfig.scraping.languageLabels)).text().trim() ||
$el.find('.languageLabel').text().trim();
// Extract language value (e.g. "Language: English" -> "English", "Sprache: Deutsch" -> "Deutsch")
const langLabelPattern = new RegExp(`(?:${langConfig.scraping.languageLabels.map(l => l.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')).join('|')})\\s*(.+)`, 'i');
const langMatch = langText.match(langLabelPattern);
const language = langMatch?.[1]?.trim();
if (!language || !isAcceptedLanguage(language, langConfig)) return;
const authorText = $el.find('a[href*="/author/"]').first().text().trim() ||
$el.find('.authorLabel').text().trim() ||
$el.find('.bc-size-small .bc-text-bold').first().text().trim();
// --- Author ASIN filter: verify target ASIN in author links ---
const authorLinks = $el.find('a[href*="/author/"]');
let hasMatchingAuthor = false;
authorLinks.each((_i, link) => {
const href = $(link).attr('href') || '';
const asinMatch = href.match(/\/author\/[^\/]+\/([A-Z0-9]{10})/);
if (asinMatch && asinMatch[1] === authorAsin) {
hasMatchingAuthor = true;
return false; // break .each()
}
});
if (!hasMatchingAuthor) return;
const narratorText = $el.find('a[href*="searchNarrator="]').first().text().trim() ||
$el.find('.narratorLabel').text().trim();
// --- Extract book ASIN ---
const bookAsin = $el.find('li').attr('data-asin') ||
$el.find('a[href*="/pd/"]').attr('href')?.match(/\/pd\/[^\/]+\/([A-Z0-9]{10})/)?.[1] ||
$el.find('a[href*="/ac/"]').attr('href')?.match(/\/ac\/[^\/]+\/([A-Z0-9]{10})/)?.[1] ||
$el.find('a').attr('href')?.match(/\/(?:pd|ac)\/[^\/]+\/([A-Z0-9]{10})/)?.[1] || '';
if (!bookAsin || seenAsins.has(bookAsin)) return;
seenAsins.add(bookAsin);
const coverArtUrl = $el.find('img').attr('src') || '';
// --- Parse book details ---
const title = $el.find('h2').first().text().trim() ||
$el.find('h3 a').text().trim() ||
$el.find('.bc-heading a').text().trim();
const runtimeText = $el.find('.runtimeLabel').text().trim() ||
$el.find(buildContainsSelector('span', langConfig.scraping.lengthLabels)).text().trim();
const durationMinutes = this.parseRuntime(runtimeText);
const authorText = $el.find('a[href*="/author/"]').first().text().trim() ||
$el.find('.authorLabel').text().trim() ||
$el.find('.bc-size-small .bc-text-bold').first().text().trim();
const ratingText = $el.find('.ratingsLabel').text().trim() ||
$el.find('.a-icon-star span').first().text().trim();
const rating = ratingText ? parseFloat(ratingText.split(' ')[0]) : undefined;
const narratorText = $el.find('a[href*="searchNarrator="]').first().text().trim() ||
$el.find('.narratorLabel').text().trim();
const coverArtUrl = $el.find('img').attr('src') || '';
const runtimeText = $el.find('.runtimeLabel').text().trim() ||
$el.find(buildContainsSelector('span', langConfig.scraping.lengthLabels)).text().trim();
const durationMinutes = this.parseRuntime(runtimeText);
const ratingText = $el.find('.ratingsLabel').text().trim() ||
$el.find('.a-icon-star span').first().text().trim();
const rating = ratingText ? parseFloat(ratingText.split(' ')[0]) : undefined;
allBooks.push({
asin: bookAsin,
title,
author: stripPrefixes(authorText, langConfig.scraping.authorPrefixes),
authorAsin,
narrator: stripPrefixes(narratorText, langConfig.scraping.narratorPrefixes),
coverArtUrl: coverArtUrl.replace(/\._.*_\./, '._SL500_.'),
durationMinutes,
rating,
});
pageResults++;
books.push({
asin: bookAsin,
title,
author: stripPrefixes(authorText, langConfig.scraping.authorPrefixes),
authorAsin,
narrator: stripPrefixes(narratorText, langConfig.scraping.narratorPrefixes),
coverArtUrl: coverArtUrl.replace(/\._.*_\./, '._SL500_.'),
durationMinutes,
rating,
});
});
// Check if there are more pages
const resultsText = $('.resultsInfo').text().trim();
const totalResults = parseInt(resultsText.match(/of ([\d,]+)/)?.[1]?.replace(/,/g, '') || '0');
const hasMore = totalResults > page * AUDIBLE_PAGE_SIZE;
// Check total results for pagination
const resultsText = $('.resultsInfo').text().trim();
const totalResults = parseInt(resultsText.match(/of ([\d,]+)/)?.[1]?.replace(/,/g, '') || '0');
// Use totalResults if available; otherwise fall back to whether Audible returned a full page
const hasMore = books.length > 0 && (totalResults > 0
? totalResults > page * AUDIBLE_PAGE_SIZE
: pageItemCount >= AUDIBLE_PAGE_SIZE);
logger.info(`Author books page ${page}: ${pageResults} valid results (${allBooks.length} total, ${totalResults} Audible total)`);
if (!hasMore || pageResults === 0) break;
// Pace between pages
if (page < MAX_PAGES) {
await this.delay(this.pacer.reportPageResult(meta));
}
}
logger.info(`Author books search complete: "${authorName}" → ${allBooks.length} books`);
return allBooks;
logger.info(`Author books page ${page}: ${books.length} valid results (${totalResults} Audible total)`);
return { books, hasMore, page, totalResults };
} catch (error) {
logger.error(`Author books search failed for "${authorName}"`, {
error: error instanceof Error ? error.message : String(error),
collectedSoFar: allBooks.length,
});
// Return what we collected before the error
return allBooks;
return { books, hasMore: false, page, totalResults: 0 };
}
}
@@ -9,6 +9,7 @@ import { getConfigService } from '../services/config.service';
import { getDownloadClientManager } from '../services/download-client-manager.service';
import { ProwlarrService } from '../integrations/prowlarr.service';
import { RMABLogger } from '../utils/logger';
import { isTransientConnectionError } from '../utils/connection-errors';
/**
* Process download job
@@ -121,15 +122,22 @@ export async function processDownloadTorrent(payload: DownloadTorrentPayload): P
} catch (error) {
logger.error(`Error: ${error instanceof Error ? error.message : 'Unknown error'}`);
// Update request status to failed
await prisma.request.update({
where: { id: requestId },
data: {
status: 'failed',
errorMessage: error instanceof Error ? error.message : 'Failed to add download to client',
updatedAt: new Date(),
},
});
if (isTransientConnectionError(error)) {
// Connection error — don't mark request as failed yet.
// Bull will retry this job (3 attempts with exponential backoff).
// If all retries are exhausted, the global failed handler marks it failed.
logger.warn(`Download client unreachable for request ${requestId}, allowing Bull to retry`);
} else {
// Permanent error — mark request as failed immediately
await prisma.request.update({
where: { id: requestId },
data: {
status: 'failed',
errorMessage: error instanceof Error ? error.message : 'Failed to add download to client',
updatedAt: new Date(),
},
});
}
throw error;
}
@@ -10,6 +10,7 @@ import { PathMapper, PathMappingConfig } from '../utils/path-mapper';
import { getConfigService } from '../services/config.service';
import { getDownloadClientManager } from '../services/download-client-manager.service';
import { CLIENT_PROTOCOL_MAP, DownloadClientType } from '../interfaces/download-client.interface';
import { isTransientConnectionError } from '../utils/connection-errors';
/**
* Process monitor download job
@@ -20,6 +21,12 @@ import { CLIENT_PROTOCOL_MAP, DownloadClientType } from '../interfaces/download-
const BASE_POLL_INTERVAL = 10;
/** Maximum polling interval in seconds (5 minutes) */
const MAX_POLL_INTERVAL = 300;
/**
* Maximum consecutive connection failures before permanently failing the download.
* With exponential backoff (10s base, 300s cap), 30 failures spans roughly 30-45 minutes —
* enough to survive a Docker restart, service update, or transient network outage.
*/
const MAX_CONNECTION_FAILURES = 30;
/**
* Compute next poll delay with exponential backoff for stalled downloads.
@@ -32,7 +39,8 @@ function getBackoffDelay(stallCount: number): number {
export async function processMonitorDownload(payload: MonitorDownloadPayload): Promise<any> {
const { requestId, downloadHistoryId, downloadClientId, downloadClient, jobId,
lastProgress: prevProgress, stallCount: prevStallCount, pathWaitCount: prevPathWaitCount } = payload;
lastProgress: prevProgress, stallCount: prevStallCount, pathWaitCount: prevPathWaitCount,
connectionFailureCount: prevConnectionFailures } = payload;
const logger = RMABLogger.forJob(jobId, 'MonitorDownload');
@@ -288,51 +296,99 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
} catch (error) {
logger.error(`Error: ${error instanceof Error ? error.message : 'Unknown error'}`);
// Check if this is a transient "not found" error
const errorMessage = error instanceof Error ? error.message : '';
const isNotFound = errorMessage.includes('not found');
const isConnectionError = isTransientConnectionError(error);
if (isNotFound) {
// Transient error - don't mark request as failed, let Bull retry
// The request stays in 'downloading' status until Bull exhausts all retries
// PATH 1: "Not found" — transient race condition.
// Don't mark request as failed; let Bull retry the same job.
logger.warn(`Transient error for request ${requestId}, allowing Bull to retry`);
} else {
// Permanent error - mark request as failed immediately
const failureMessage = errorMessage || 'Monitor download failed';
await prisma.request.update({
where: { id: requestId },
data: {
status: 'failed',
errorMessage: failureMessage,
updatedAt: new Date(),
},
});
throw error;
}
// Send notification for request failure
const request = await prisma.request.findUnique({
where: { id: requestId },
include: {
audiobook: true,
user: { select: { plexUsername: true } },
},
});
if (isConnectionError) {
// PATH 2: Connection failure — download client is temporarily unreachable.
// Instead of failing the download, self-schedule the next poll with backoff.
// This reuses the same adaptive backoff as stalled downloads, giving the
// client time to recover (restart, network blip, update, etc.).
const failureCount = (prevConnectionFailures ?? 0) + 1;
if (failureCount >= MAX_CONNECTION_FAILURES) {
// Exhausted patience — treat as permanent failure
logger.error(
`Download client unreachable for ${failureCount} consecutive checks, giving up on request ${requestId}`
);
// Fall through to permanent failure handling below
} else {
const delay = getBackoffDelay(failureCount);
logger.warn(
`Download client unreachable (${failureCount}/${MAX_CONNECTION_FAILURES}), ` +
`retrying in ${delay}s for request ${requestId}`,
{ error: errorMessage }
);
if (request) {
const jobQueue = getJobQueueService();
await jobQueue.addNotificationJob(
'request_error',
request.id,
request.audiobook.title,
request.audiobook.author,
request.user.plexUsername || 'Unknown User',
failureMessage
).catch((error) => {
logger.error('Failed to queue notification', { error: error instanceof Error ? error.message : String(error) });
});
await jobQueue.addMonitorJob(
requestId,
downloadHistoryId,
downloadClientId,
downloadClient,
delay,
prevProgress,
prevStallCount ?? 0,
prevPathWaitCount,
failureCount
);
// Return success — the monitoring loop continues via the new job.
// Do NOT throw: that would trigger Bull's retry on this job as well.
return {
success: true,
completed: false,
message: `Download client unreachable, will retry in ${delay}s`,
requestId,
connectionFailureCount: failureCount,
};
}
}
// Rethrow to trigger Bull's retry mechanism
// PATH 3: Permanent error (or connection failures exhausted).
// Mark request as failed immediately.
const failureMessage = errorMessage || 'Monitor download failed';
await prisma.request.update({
where: { id: requestId },
data: {
status: 'failed',
errorMessage: failureMessage,
updatedAt: new Date(),
},
});
// Send notification for request failure
const request = await prisma.request.findUnique({
where: { id: requestId },
include: {
audiobook: true,
user: { select: { plexUsername: true } },
},
});
if (request) {
const jobQueue = getJobQueueService();
await jobQueue.addNotificationJob(
'request_error',
request.id,
request.audiobook.title,
request.audiobook.author,
request.user.plexUsername || 'Unknown User',
failureMessage
).catch((notifError) => {
logger.error('Failed to queue notification', { error: notifError instanceof Error ? notifError.message : String(notifError) });
});
}
// Rethrow to trigger Bull's retry mechanism as a safety net
throw error;
}
}
+74 -2
View File
@@ -22,7 +22,7 @@ import { removeEmptyParentDirectories } from '../utils/cleanup-helpers';
* Handles both audiobook and ebook request types with appropriate branching
*/
export async function processOrganizeFiles(payload: OrganizeFilesPayload): Promise<any> {
const { requestId, audiobookId, downloadPath, jobId } = payload;
const { requestId, audiobookId, downloadPath, jobId, cleanupSource } = payload;
const logger = RMABLogger.forJob(jobId, 'OrganizeFiles');
@@ -264,6 +264,11 @@ export async function processOrganizeFiles(payload: OrganizeFilesPayload): Promi
// Cleanup downloads if configured (uses IDownloadClient.postProcess for client-specific cleanup)
await cleanupDownloadAfterOrganize(requestId, downloadPath, configService, jobId, logger);
// Cleanup source files if requested (manual import feature)
if (cleanupSource) {
await cleanupSourceAfterOrganize(downloadPath, configService, jobId, logger);
}
return {
success: true,
message: 'Files organized successfully',
@@ -467,7 +472,7 @@ async function processEbookOrganization(
request: { id: string; userId: string; type: string; user: { plexUsername: string | null } },
logger: RMABLogger
): Promise<any> {
const { requestId, audiobookId, downloadPath, jobId } = payload;
const { requestId, audiobookId, downloadPath, jobId, cleanupSource } = payload;
logger.info(`Processing ebook organization for request ${requestId}`);
@@ -726,6 +731,11 @@ async function processEbookOrganization(
// Cleanup downloads if configured (uses IDownloadClient.postProcess for client-specific cleanup)
await cleanupDownloadAfterOrganize(requestId, downloadPath, configService, jobId, logger);
// Cleanup source files if requested (manual import feature)
if (cleanupSource) {
await cleanupSourceAfterOrganize(downloadPath, configService, jobId, logger);
}
return {
success: true,
message: 'Ebook organized successfully',
@@ -1003,6 +1013,68 @@ async function cleanupDownloadAfterOrganize(
}
}
// =========================================================================
// SOURCE FILE CLEANUP (MANUAL IMPORT)
// =========================================================================
/**
* Delete source files after successful manual import.
* Non-fatal: logs a warning on failure but does not fail the job.
* Files are already safely copied to the media library at this point.
*/
async function cleanupSourceAfterOrganize(
downloadPath: string,
configService: any,
jobId: string | undefined,
logger: RMABLogger
): Promise<void> {
try {
const fs = await import('fs/promises');
logger.info(`Cleaning up source files: ${downloadPath}`);
const stats = await fs.stat(downloadPath);
if (stats.isDirectory()) {
await fs.rm(downloadPath, { recursive: true, force: true });
logger.info(`Removed source directory: ${downloadPath}`);
} else {
await fs.unlink(downloadPath);
logger.info(`Removed source file: ${downloadPath}`);
}
// Determine boundary path based on download path prefix
const BOOKDROP_PATH = '/bookdrop';
const downloadDir = await configService.get('download_dir') || '/downloads';
const mediaDir = await configService.get('media_dir') || '/media';
let boundaryPath = downloadDir;
if (downloadPath.startsWith(BOOKDROP_PATH)) {
boundaryPath = BOOKDROP_PATH;
} else if (downloadPath.startsWith(mediaDir)) {
boundaryPath = mediaDir;
}
const cleanupResult = await removeEmptyParentDirectories(downloadPath, {
boundaryPath,
logContext: jobId ? { jobId, context: 'CleanupSourceParents' } : undefined,
});
if (cleanupResult.removedDirectories.length > 0) {
logger.info(`Cleaned up ${cleanupResult.removedDirectories.length} empty parent directories`);
}
} catch (error) {
// Non-fatal - files are already safely in the media library
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
logger.info(`Source path already deleted: ${downloadPath}`);
} else {
logger.warn(
`Failed to cleanup source files: ${error instanceof Error ? error.message : 'Unknown error'}`,
{ error: error instanceof Error ? error.stack : undefined }
);
}
}
}
// =========================================================================
// HELPER FUNCTIONS
// =========================================================================
@@ -34,6 +34,13 @@ export async function processSearchIndexers(payload: SearchIndexersPayload): Pro
},
});
// Check for custom search terms override
const requestRecord = await prisma.request.findUnique({
where: { id: requestId },
select: { customSearchTerms: true },
});
const effectiveSearchTitle = requestRecord?.customSearchTerms || audiobook.title;
// Get enabled indexers from configuration
const { getConfigService } = await import('../services/config.service');
const configService = getConfigService();
@@ -77,7 +84,11 @@ export async function processSearchIndexers(payload: SearchIndexersPayload): Pro
// Get Prowlarr service
const prowlarr = await getProwlarrService();
logger.info(`Searching for: "${audiobook.title}" by "${audiobook.author}"`);
if (requestRecord?.customSearchTerms) {
logger.info(`Searching with custom terms: "${effectiveSearchTitle}" (original: "${audiobook.title}") by "${audiobook.author}"`);
} else {
logger.info(`Searching for: "${audiobook.title}" by "${audiobook.author}"`);
}
// Search Prowlarr for each group and combine results
const allResults = [];
@@ -87,7 +98,7 @@ export async function processSearchIndexers(payload: SearchIndexersPayload): Pro
logger.info(`Searching group ${i + 1}/${groups.length}: ${getGroupDescription(group)}`);
try {
const groupResults = await prowlarr.searchWithVariations(audiobook.title, audiobook.author, {
const groupResults = await prowlarr.searchWithVariations(effectiveSearchTitle, audiobook.author, {
categories: group.categories,
indexerIds: group.indexerIds,
minSeeders: 1, // Only torrents with at least 1 seeder
+31 -2
View File
@@ -66,6 +66,7 @@ export interface MonitorDownloadPayload extends JobPayload {
lastProgress?: number; // Previous poll's progress (0-100) for stall detection
stallCount?: number; // Consecutive polls with no progress change (drives backoff)
pathWaitCount?: number; // Consecutive polls waiting for content_path to relocate to save_path
connectionFailureCount?: number; // Consecutive polls where the download client was unreachable
}
export interface OrganizeFilesPayload extends JobPayload {
@@ -73,6 +74,7 @@ export interface OrganizeFilesPayload extends JobPayload {
audiobookId: string;
downloadPath: string;
targetPath?: string; // Optional - not used by processor (reads from database config)
cleanupSource?: boolean; // If true, delete source files after successful import
}
export interface ScanPlexPayload extends JobPayload {
@@ -259,6 +261,29 @@ export class JobQueueService {
logger.error('Failed to update request/download status', { error: updateError instanceof Error ? updateError.message : String(updateError) });
}
}
// Safety net for download_torrent: if the processor skipped marking the
// request as failed (e.g. connection error with Bull retries), ensure the
// request is marked failed after all retries are exhausted.
if (job.name === 'download_torrent' && job.data) {
const payload = job.data as DownloadTorrentPayload;
logger.error(`DownloadTorrent job permanently failed for request ${payload.requestId} after ${job.attemptsMade} attempts`);
try {
await prisma.request.update({
where: { id: payload.requestId },
data: {
status: 'failed',
errorMessage: error.message || 'Failed to add download after multiple retries',
updatedAt: new Date(),
},
});
} catch (updateError) {
logger.error('Failed to update request status after download_torrent failure', {
error: updateError instanceof Error ? updateError.message : String(updateError),
});
}
}
});
this.queue.on('stalled', async (job: BullJob) => {
@@ -569,7 +594,8 @@ export class JobQueueService {
delaySeconds: number = 0,
lastProgress?: number,
stallCount?: number,
pathWaitCount?: number
pathWaitCount?: number,
connectionFailureCount?: number
): Promise<string> {
return await this.addJob(
'monitor_download',
@@ -581,6 +607,7 @@ export class JobQueueService {
lastProgress,
stallCount,
pathWaitCount,
connectionFailureCount,
} as MonitorDownloadPayload,
{
priority: 5, // Medium priority
@@ -597,7 +624,8 @@ export class JobQueueService {
requestId: string,
audiobookId: string,
downloadPath: string,
targetPath?: string
targetPath?: string,
cleanupSource?: boolean
): Promise<string> {
return await this.addJob(
'organize_files',
@@ -606,6 +634,7 @@ export class JobQueueService {
audiobookId,
downloadPath,
targetPath, // Not used by processor
cleanupSource,
} as OrganizeFilesPayload,
{
priority: 8,
@@ -45,13 +45,31 @@ export class AppriseProvider implements INotificationProvider {
const meta = getEventMeta(payload.event);
const { title, body } = this.formatMessage(payload);
const serverUrl = appriseConfig.serverUrl.replace(/\/+$/, '');
const notificationType = SEVERITY_TYPES[meta.severity];
// Parse URL to extract embedded HTTP Basic Auth credentials (e.g. https://user:pass@host/)
let serverUrl: string;
const headers: Record<string, string> = {
'Content-Type': 'application/json',
};
try {
const parsed = new URL(appriseConfig.serverUrl);
if (parsed.username) {
const username = decodeURIComponent(parsed.username);
const password = decodeURIComponent(parsed.password);
headers['Authorization'] = `Basic ${Buffer.from(`${username}:${password}`).toString('base64')}`;
parsed.username = '';
parsed.password = '';
serverUrl = parsed.toString().replace(/\/+$/, '');
} else {
serverUrl = appriseConfig.serverUrl.replace(/\/+$/, '');
}
} catch {
serverUrl = appriseConfig.serverUrl.replace(/\/+$/, '');
}
const notificationType = SEVERITY_TYPES[meta.severity];
// Explicit authToken (Bearer) takes precedence over URL-embedded credentials
if (appriseConfig.authToken) {
headers['Authorization'] = `Bearer ${appriseConfig.authToken}`;
}
+80
View File
@@ -0,0 +1,80 @@
/**
* Component: Connection Error Classification Utility
* Documentation: documentation/phase3/README.md
*
* Classifies errors as transient connection failures (e.g. download client
* restarting, network blip) vs permanent failures. Used by download
* processors to decide whether to retry with backoff or fail immediately.
*/
/** Node/Axios error codes that indicate the remote service is temporarily unreachable. */
const TRANSIENT_ERROR_CODES = new Set([
'ECONNREFUSED',
'ECONNRESET',
'ECONNABORTED',
'ETIMEDOUT',
'ENOTFOUND',
'EHOSTUNREACH',
'ENETUNREACH',
'EPIPE',
'EAI_AGAIN',
]);
/** HTTP status codes that indicate a gateway / upstream service issue. */
const TRANSIENT_HTTP_STATUSES = new Set([502, 503, 504]);
/**
* Substrings in error messages that strongly indicate a connection-level
* failure. Checked as a fallback when structured error properties are
* unavailable (e.g. errors re-thrown as plain Error with a message string).
*/
const TRANSIENT_MESSAGE_PATTERNS = [
'ECONNREFUSED',
'ECONNRESET',
'ECONNABORTED',
'ETIMEDOUT',
'ENOTFOUND',
'EHOSTUNREACH',
'ENETUNREACH',
'EPIPE',
'EAI_AGAIN',
'connect ECONNREFUSED',
'socket hang up',
'network error',
'Client network socket disconnected',
] as const;
/**
* Returns `true` when the error looks like a transient connection failure
* rather than a permanent / logical error.
*
* Checks (in order):
* 1. `error.code` — Node.js / Axios error codes
* 2. `error.response.status` — HTTP gateway errors (502/503/504)
* 3. `error.message` — fallback substring matching
*/
export function isTransientConnectionError(error: unknown): boolean {
if (!error) return false;
// 1. Structured error code (Node.js / Axios)
const code = (error as any)?.code;
if (typeof code === 'string' && TRANSIENT_ERROR_CODES.has(code)) {
return true;
}
// 2. HTTP gateway status from Axios response
const status = (error as any)?.response?.status;
if (typeof status === 'number' && TRANSIENT_HTTP_STATUSES.has(status)) {
return true;
}
// 3. Fallback: substring match on the error message
const message = (error instanceof Error ? error.message : String(error)).toUpperCase();
for (const pattern of TRANSIENT_MESSAGE_PATTERNS) {
if (message.includes(pattern.toUpperCase())) {
return true;
}
}
return false;
}