import {
  Injectable,
  Logger,
  OnModuleDestroy,
  OnModuleInit,
} from '@nestjs/common';
import { ConfigService } from './config.service';
import { DbService } from './db.service';
import { EventsGateway } from './events.gateway';
import { HandbrakeService } from './handbrake.service';

/** Shape of a task row as this service reads it from `DbService`. */
interface Task {
  id: number;
  type: string;
  status: string;
  progress: number;
  dataset?: string;
  input?: string;
  output?: string;
  preset?: string;
  priority: number;
  retry_count?: number;
  max_retries?: number;
  error_message?: string;
  created_at: string;
  updated_at: string;
}

/** Tunable parameters of the polling task queue. */
interface QueueSettings {
  batchSize: number;
  concurrency: number;
  retryEnabled: boolean;
  maxRetries: number;
  retryDelay: number; // in milliseconds
  processingInterval: number; // in milliseconds
}

export type { QueueSettings };

/** Extracts a printable message from whatever value was thrown. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Polls the database for pending tasks on a timer, runs them through
 * `HandbrakeService`, re-queues failed tasks after a delay, and reports
 * progress through `EventsGateway`.
 */
@Injectable()
export class TaskQueueService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger('TaskQueueService');
  private isProcessing = false;
  private processingInterval: NodeJS.Timeout | null = null;
  private readonly activeTasks = new Set<number>();
  private queueSettings: QueueSettings;

  constructor(
    private readonly db: DbService,
    private readonly handbrake: HandbrakeService,
    private readonly eventsGateway: EventsGateway,
    private readonly config: ConfigService,
  ) {
    // Assigned directly in the constructor so the field is definitely
    // initialised before any other method can read it.
    this.queueSettings = this.loadQueueSettings();
  }

  /**
   * Reads the `queue` section of the configuration and fills in defaults.
   *
   * @returns The effective queue settings.
   */
  private loadQueueSettings(): QueueSettings {
    const settings = this.config.getSettings('queue', {});

    const loaded: QueueSettings = {
      // `||` on purpose: 0 is not a usable value for these three settings.
      batchSize: settings.batchSize || 10,
      concurrency: settings.concurrency || 1,
      retryEnabled: settings.retryEnabled !== false, // default true
      // `??` on purpose: an explicit 0 ("never retry" / "retry immediately")
      // must not be replaced by the default.
      maxRetries: settings.maxRetries ?? 3,
      retryDelay: settings.retryDelay ?? 30000, // 30 seconds default
      processingInterval: settings.processingInterval || 5000, // 5 seconds default
    };

    this.logger.log('Loaded queue settings:', loaded);
    return loaded;
  }

  /**
   * Merges `settings` into the current settings, persists the change through
   * `ConfigService`, and restarts the polling timer when a new interval was
   * supplied while the timer is running.
   *
   * @param settings Subset of settings to override.
   */
  updateQueueSettings(settings: Partial<QueueSettings>): void {
    this.queueSettings = { ...this.queueSettings, ...settings };

    // Save to config
    const currentSettings = this.config.getSettings('queue', {});
    this.config.setSettings({
      queue: { ...currentSettings, ...settings },
    });

    this.logger.log('Updated queue settings:', this.queueSettings);

    // Restart processing with new interval if changed
    if (settings.processingInterval && this.processingInterval) {
      this.stopProcessing();
      this.startProcessing();
    }
  }

  /** @returns A shallow copy of the current settings. */
  getQueueSettings(): QueueSettings {
    return { ...this.queueSettings };
  }

  /** Starts the polling timer when the module is initialised. */
  onModuleInit(): void {
    this.startProcessing();
  }

  /** Clears the polling timer when the module is torn down. */
  onModuleDestroy(): void {
    this.stopProcessing();
  }

  /** Starts the polling timer; logs a warning and does nothing if it is already running. */
  startProcessing(): void {
    if (this.processingInterval) {
      this.logger.warn('Task queue processing already running');
      return;
    }

    this.logger.log('Starting automatic task processing');
    this.processingInterval = setInterval(() => {
      // processPendingTasks catches its own errors; `void` marks the
      // promise as intentionally not awaited.
      void this.processPendingTasks();
    }, this.queueSettings.processingInterval);
  }

  /** Stops the polling timer if it is running. */
  stopProcessing(): void {
    if (this.processingInterval) {
      clearInterval(this.processingInterval);
      this.processingInterval = null;
      this.logger.log('Stopped automatic task processing');
    }
  }

  /**
   * One polling tick: re-queues failed tasks that are due for a retry, then
   * starts up to `concurrency` pending tasks and waits for them to settle.
   * Ticks that arrive while a previous tick is still awaiting its tasks
   * return immediately.
   */
  private async processPendingTasks(): Promise<void> {
    if (this.isProcessing) {
      return; // Already processing
    }

    try {
      this.isProcessing = true;

      // Check for tasks that need retry
      await this.processRetryTasks();

      // Get pending tasks up to batch size
      const pendingTasks = this.db.getPendingTasks(
        this.queueSettings.batchSize,
      ) as Task[];

      if (pendingTasks.length === 0) {
        return; // No tasks to process
      }

      // Process tasks up to concurrency limit
      const processingPromises: Promise<void>[] = [];
      const tasksToProcess = pendingTasks.slice(
        0,
        this.queueSettings.concurrency,
      );

      for (const task of tasksToProcess) {
        if (this.activeTasks.size >= this.queueSettings.concurrency) {
          break; // Respect concurrency limit
        }

        if (!task.input || !task.output || !task.preset) {
          this.logger.error(`Task ${task.id} is missing required fields`);
          this.db.updateTask(task.id, {
            status: 'failed',
            error_message: 'Missing required fields: input, output, or preset',
          });
          continue;
        }

        // Mark task as processing
        this.db.updateTask(task.id, { status: 'processing' });
        this.activeTasks.add(task.id);

        // Emit task update
        this.eventsGateway.emitTaskUpdate({
          type: 'started',
          taskId: task.id,
          task: 'handbrake',
          input: task.input,
          output: task.output,
          preset: task.preset,
        });

        // Process task asynchronously
        processingPromises.push(this.processTask(task));
      }

      // Wait for all concurrent tasks to complete
      await Promise.allSettled(processingPromises);
    } catch (error: unknown) {
      this.logger.error(
        `Error in processPendingTasks: ${errorMessage(error)}`,
      );
    } finally {
      this.isProcessing = false;
    }
  }

  /**
   * Moves failed tasks back to `pending` once `retryDelay` has elapsed since
   * their last update, provided their `retry_count` is below `maxRetries`.
   * This is the only place where `retry_count` is incremented.
   */
  private async processRetryTasks(): Promise<void> {
    if (!this.queueSettings.retryEnabled) {
      return;
    }

    try {
      // Get failed tasks that haven't exceeded max retries
      const failedTasks = (this.db.getAllTasks() as Task[]).filter(
        (task) =>
          task.status === 'failed' &&
          (task.retry_count ?? 0) < this.queueSettings.maxRetries,
      );

      for (const task of failedTasks) {
        const retryCount = (task.retry_count ?? 0) + 1;
        const lastUpdate = new Date(task.updated_at);
        const timeSinceFailure = Date.now() - lastUpdate.getTime();

        // Check if enough time has passed for retry
        if (timeSinceFailure >= this.queueSettings.retryDelay) {
          this.logger.log(`Retrying task ${task.id} (attempt ${retryCount})`);

          // Reset task for retry
          this.db.updateTask(task.id, {
            status: 'pending',
            progress: 0,
            retry_count: retryCount,
            error_message: undefined,
          });

          // Emit retry event
          this.eventsGateway.emitTaskUpdate({
            type: 'retry',
            taskId: task.id,
            task: 'handbrake',
            retryCount,
          });
        }
      }
    } catch (error: unknown) {
      this.logger.error(`Error in processRetryTasks: ${errorMessage(error)}`);
    }
  }

  /**
   * Runs one task through Handbrake and records the outcome in the database,
   * the file record (when the task has a dataset) and the events gateway.
   * The task id is always removed from `activeTasks` afterwards.
   *
   * @param task Task whose `input`, `output` and `preset` were already
   *   validated by the caller.
   */
  private async processTask(task: Task): Promise<void> {
    try {
      // Process the file
      const success = await this.handbrake.processWithHandbrake(
        task.input!,
        task.output!,
        task.preset!,
        task.id,
      );

      if (!success) {
        throw new Error('Handbrake processing failed');
      }

      // Update task status
      this.db.updateTask(task.id, { status: 'completed', progress: 100 });

      // Update file status if it exists
      if (task.dataset) {
        this.db.setFile(task.dataset, task.input!, {
          status: 'success',
          date: new Date().toISOString(),
        });
      }

      // Emit completion event
      this.eventsGateway.emitTaskUpdate({
        type: 'completed',
        taskId: task.id,
        task: 'handbrake',
        input: task.input,
        output: task.output,
        preset: task.preset,
        success: true,
      });

      this.logger.log(`Task ${task.id} completed successfully`);
    } catch (error: unknown) {
      const message = errorMessage(error);
      const retryCount = task.retry_count ?? 0;
      const willRetry =
        this.queueSettings.retryEnabled &&
        retryCount < this.queueSettings.maxRetries;

      // retry_count is deliberately left untouched here: processRetryTasks
      // increments it when it re-queues the task. Incrementing in both places
      // counted every failure twice and exhausted the retry budget early.
      this.db.updateTask(task.id, {
        status: 'failed',
        error_message: message,
      });

      if (willRetry) {
        this.logger.warn(
          `Task ${task.id} failed, will retry (attempt ${retryCount + 1}): ${message}`,
        );
      } else {
        // Final failure: update file status if it exists
        if (task.dataset) {
          this.db.setFile(task.dataset, task.input!, {
            status: 'error',
            date: new Date().toISOString(),
          });
        }

        this.logger.error(`Task ${task.id} failed permanently: ${message}`);
      }

      // Emit failure event
      this.eventsGateway.emitTaskUpdate({
        type: 'failed',
        taskId: task.id,
        task: 'handbrake',
        input: task.input,
        output: task.output,
        preset: task.preset,
        success: false,
        error: message,
        // Number of attempts made so far (initial run plus retries).
        retryCount: retryCount + 1,
        maxRetries: this.queueSettings.maxRetries,
      });
    } finally {
      // Remove from active tasks
      this.activeTasks.delete(task.id);
    }
  }

  /**
   * Manual task creation (for requeueing from web interface).
   *
   * Creates a `pending` file record when none exists for the dataset/input
   * pair, then creates a `pending` handbrake task.
   *
   * @param taskData Dataset, paths, preset and optional priority (default 0).
   * @returns The task returned by `DbService.createTask`.
   */
  createTask(taskData: {
    dataset: string;
    input: string;
    output: string;
    preset: string;
    priority?: number;
  }) {
    // Check if file already exists in database
    const existingFile = this.db.findFile(taskData.dataset, taskData.input);

    if (!existingFile) {
      // Create file record
      this.db.setFile(taskData.dataset, taskData.input, {
        output: taskData.output,
        status: 'pending',
        date: new Date().toISOString(),
      });
    }

    // Create task
    const task = this.db.createTask({
      type: 'handbrake',
      status: 'pending',
      dataset: taskData.dataset,
      input: taskData.input,
      output: taskData.output,
      preset: taskData.preset,
      priority: taskData.priority ?? 0,
    });

    this.logger.log(`Created task ${task.id} for file: ${taskData.input}`);
    return task;
  }

  /**
   * Get queue status.
   *
   * @returns Per-status task counts, the total number of tasks, the
   *   in-memory processing state and a copy of the current settings.
   */
  getQueueStatus() {
    const allTasks = this.db.getAllTasks() as Task[];

    // Single pass instead of one filter() per status.
    let pending = 0;
    let processing = 0;
    let completed = 0;
    let failed = 0;
    for (const task of allTasks) {
      switch (task.status) {
        case 'pending':
          pending += 1;
          break;
        case 'processing':
          processing += 1;
          break;
        case 'completed':
          completed += 1;
          break;
        case 'failed':
          failed += 1;
          break;
        default:
          break;
      }
    }

    return {
      isProcessing: this.isProcessing,
      activeTasks: this.activeTasks.size,
      pending,
      processing,
      completed,
      failed,
      total: allTasks.length,
      settings: this.getQueueSettings(),
    };
  }
}