Add notification system with admin UI and backend

Introduces a full notification system with support for Discord and Pushover backends, event triggers, and message formatting. Adds backend services, processors, and API endpoints for managing notifications, as well as a new Notifications tab in the admin settings UI. Updates documentation, database schema, and tests to cover notification features and approval workflow improvements. Also changes project license from MIT to AGPL v3.
This commit is contained in:
kikootwo
2026-01-21 15:28:23 -05:00
parent ac2ad8aac2
commit dc7e557694
51 changed files with 5065 additions and 264 deletions
@@ -196,12 +196,14 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
} else if (progress.state === 'failed') {
logger.error(`Download failed for request ${requestId}`);
const errorMessage = 'Download failed in qBittorrent';
// Update request to failed
await prisma.request.update({
where: { id: requestId },
data: {
status: 'failed',
errorMessage: 'Download failed in qBittorrent',
errorMessage,
updatedAt: new Date(),
},
});
@@ -211,10 +213,33 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
where: { id: downloadHistoryId },
data: {
downloadStatus: 'failed',
downloadError: 'Download failed in qBittorrent',
downloadError: errorMessage,
},
});
// Send notification for request failure
const request = await prisma.request.findUnique({
where: { id: requestId },
include: {
audiobook: true,
user: { select: { plexUsername: true } },
},
});
if (request) {
const jobQueue = getJobQueueService();
await jobQueue.addNotificationJob(
'request_error',
request.id,
request.audiobook.title,
request.audiobook.author,
request.user.plexUsername || 'Unknown User',
errorMessage
).catch((error) => {
logger.error('Failed to queue notification', { error: error instanceof Error ? error.message : String(error) });
});
}
return {
success: false,
completed: true,
@@ -266,14 +291,38 @@ export async function processMonitorDownload(payload: MonitorDownloadPayload): P
logger.warn(`Transient error for request ${requestId}, allowing Bull to retry`);
} else {
// Permanent error - mark request as failed immediately
const failureMessage = errorMessage || 'Monitor download failed';
await prisma.request.update({
where: { id: requestId },
data: {
status: 'failed',
errorMessage: errorMessage || 'Monitor download failed',
errorMessage: failureMessage,
updatedAt: new Date(),
},
});
// Send notification for request failure
const request = await prisma.request.findUnique({
where: { id: requestId },
include: {
audiobook: true,
user: { select: { plexUsername: true } },
},
});
if (request) {
const jobQueue = getJobQueueService();
await jobQueue.addNotificationJob(
'request_error',
request.id,
request.audiobook.title,
request.audiobook.author,
request.user.plexUsername || 'Unknown User',
failureMessage
).catch((error) => {
logger.error('Failed to queue notification', { error: error instanceof Error ? error.message : String(error) });
});
}
}
// Rethrow to trigger Bull's retry mechanism
+49 -1
View File
@@ -253,16 +253,41 @@ export async function processOrganizeFiles(payload: OrganizeFilesPayload): Promi
// Max retries exceeded - move to warn status
logger.warn(`Max retries (${currentRequest.maxImportRetries}) exceeded for request ${requestId}, moving to warn status`);
const warnMessage = `${errorMessage}. Max retries (${currentRequest.maxImportRetries}) exceeded. Manual retry available.`;
await prisma.request.update({
where: { id: requestId },
data: {
status: 'warn',
importAttempts: newAttempts,
errorMessage: `${errorMessage}. Max retries (${currentRequest.maxImportRetries}) exceeded. Manual retry available.`,
errorMessage: warnMessage,
updatedAt: new Date(),
},
});
// Send notification for request failure
const request = await prisma.request.findUnique({
where: { id: requestId },
include: {
audiobook: true,
user: { select: { plexUsername: true } },
},
});
if (request) {
const jobQueue = getJobQueueService();
await jobQueue.addNotificationJob(
'request_error',
request.id,
request.audiobook.title,
request.audiobook.author,
request.user.plexUsername || 'Unknown User',
warnMessage
).catch((error) => {
logger.error('Failed to queue notification', { error: error instanceof Error ? error.message : String(error) });
});
}
return {
success: false,
message: 'Max import retries exceeded, manual intervention required',
@@ -282,6 +307,29 @@ export async function processOrganizeFiles(payload: OrganizeFilesPayload): Promi
},
});
// Send notification for request failure
const request = await prisma.request.findUnique({
where: { id: requestId },
include: {
audiobook: true,
user: { select: { plexUsername: true } },
},
});
if (request) {
const jobQueue = getJobQueueService();
await jobQueue.addNotificationJob(
'request_error',
request.id,
request.audiobook.title,
request.audiobook.author,
request.user.plexUsername || 'Unknown User',
errorMessage
).catch((error) => {
logger.error('Failed to queue notification', { error: error instanceof Error ? error.message : String(error) });
});
}
throw error;
}
}
@@ -184,7 +184,14 @@ export async function processPlexRecentlyAddedCheck(payload: PlexRecentlyAddedPa
status: { notIn: ['available', 'cancelled'] },
deletedAt: null,
},
include: { audiobook: true },
include: {
audiobook: true,
user: {
select: {
plexUsername: true,
},
},
},
take: 100,
});
@@ -237,6 +244,19 @@ export async function processPlexRecentlyAddedCheck(payload: PlexRecentlyAddedPa
},
});
// Send notification that audiobook is now available
const { getJobQueueService } = await import('../services/job-queue.service');
const jobQueue = getJobQueueService();
await jobQueue.addNotificationJob(
'request_available',
request.id,
audiobook.title,
audiobook.author,
request.user.plexUsername || 'Unknown User'
).catch((error) => {
logger.error('Failed to queue notification', { error: error instanceof Error ? error.message : String(error) });
});
matchedDownloads++;
// Trigger metadata match for Audiobookshelf items (only for our downloaded requests)
+21 -1
View File
@@ -366,7 +366,14 @@ export async function processScanPlex(payload: ScanPlexPayload): Promise<any> {
status: { notIn: ['available', 'cancelled'] },
deletedAt: null,
},
include: { audiobook: true },
include: {
audiobook: true,
user: {
select: {
plexUsername: true,
},
},
},
take: 100, // Increased from 50 to handle more eligible requests
});
@@ -423,6 +430,19 @@ export async function processScanPlex(payload: ScanPlexPayload): Promise<any> {
},
});
// Send notification that audiobook is now available
const { getJobQueueService } = await import('../services/job-queue.service');
const jobQueue = getJobQueueService();
await jobQueue.addNotificationJob(
'request_available',
request.id,
audiobook.title,
audiobook.author,
request.user.plexUsername || 'Unknown User'
).catch((error) => {
logger.error('Failed to queue notification', { error: error instanceof Error ? error.message : String(error) });
});
matchedCount++;
// Trigger metadata match for Audiobookshelf items (only for our downloaded requests)
@@ -0,0 +1,55 @@
/**
* Component: Send Notification Job Processor
* Documentation: documentation/backend/services/notifications.md
*
* Processes notification jobs by calling NotificationService to send alerts
* to all enabled backends subscribed to the event.
*/
import { getNotificationService } from '../services/notification.service';
import { RMABLogger } from '../utils/logger';
export interface SendNotificationPayload {
jobId?: string;
event: 'request_pending_approval' | 'request_approved' | 'request_available' | 'request_error';
requestId: string;
title: string;
author: string;
userName: string;
message?: string;
timestamp: Date;
}
/**
* Process send notification job
* Calls NotificationService to send notifications to all enabled backends
*/
export async function processSendNotification(payload: SendNotificationPayload): Promise<void> {
const { event, requestId, title, author, userName, message, jobId } = payload;
const logger = RMABLogger.forJob(jobId, 'SendNotification');
logger.info(`Processing notification: ${event}`, { requestId });
try {
const notificationService = getNotificationService();
await notificationService.sendNotification({
event,
requestId,
title,
author,
userName,
message,
timestamp: new Date(),
});
logger.info(`Notification processed: ${event}`, { requestId });
} catch (error) {
logger.error('Failed to process notification', {
event,
requestId,
error: error instanceof Error ? error.message : String(error),
});
// Don't throw - non-blocking
}
}