||
import {
  Injectable,
  Logger,
  OnModuleDestroy,
  OnModuleInit,
} from '@nestjs/common';
import { ConfigService } from './config.service';
import { DbService } from './db.service';
import { EventsGateway } from './events.gateway';
import { HandbrakeService } from './handbrake.service';
/**
 * Shape of a task row as the queue service reads it from `DbService`
 * (results of `getPendingTasks` / `getAllTasks` are cast to this type).
 */
interface Task {
  // --- identity & scheduling ---
  /** Task identifier; used as the key for updates and for active-task tracking. */
  id: number;
  /** Task kind. This service creates tasks with the value `'handbrake'`. */
  type: string;
  /** Value supplied at creation time (defaults to 0 in `createTask`). */
  priority: number;

  // --- lifecycle ---
  /** Lifecycle state. This service writes `'pending'`, `'processing'`, `'completed'` and `'failed'`. */
  status: string;
  /** Progress indicator; the service resets it to 0 on retry and sets 100 on completion. */
  progress: number;
  /** Retry counter maintained by the queue service; treated as 0 when absent. */
  retry_count?: number;
  /** Not read by the queue service in this file. */
  max_retries?: number;
  /** Message recorded when the task is marked as failed. */
  error_message?: string;

  // --- payload (all three of input/output/preset are required to run a task) ---
  /** Dataset name used to locate the matching file record, when present. */
  dataset?: string;
  /** Input passed to the Handbrake service. */
  input?: string;
  /** Output passed to the Handbrake service. */
  output?: string;
  /** Preset passed to the Handbrake service. */
  preset?: string;

  // --- timestamps ---
  /** Creation timestamp as stored by the database layer. */
  created_at: string;
  /** Last-update timestamp; parsed with `new Date(...)` to decide when a failed task may be retried. */
  updated_at: string;
}
/**
 * Tunable parameters of the task queue.
 *
 * Exported as a type so other modules can describe queue settings.
 */
export interface QueueSettings {
  /** Maximum number of pending tasks requested from the database per tick. */
  batchSize: number;
  /** Maximum number of tasks allowed to run at the same time. */
  concurrency: number;
  /** Whether failed tasks are put back into the queue. */
  retryEnabled: boolean;
  /** Upper bound compared against a task's retry counter. */
  maxRetries: number;
  /** Minimum time, in milliseconds, between a failure and its retry. */
  retryDelay: number;
  /** Delay, in milliseconds, between two polling ticks of the queue. */
  processingInterval: number;
}
/** A task whose transcode payload (input, output and preset) is fully present. */
type RunnableTask = Task & { input: string; output: string; preset: string };

/**
 * Polls the database for pending Handbrake tasks and runs them, honouring the
 * configured batch size, concurrency limit and retry policy.
 *
 * Retry bookkeeping: `retry_count` is the number of times a task has been put
 * back into the queue. It is incremented in exactly one place
 * (`processRetryTasks`); a failure is permanent once `retry_count` has reached
 * `maxRetries`, at which point the matching file record is marked `error`.
 */
@Injectable()
export class TaskQueueService implements OnModuleInit, OnModuleDestroy {
  /** Values used for any setting that is missing or invalid in the stored configuration. */
  private static readonly DEFAULT_SETTINGS: Readonly<QueueSettings> = {
    batchSize: 10,
    concurrency: 1,
    retryEnabled: true,
    maxRetries: 3,
    retryDelay: 30000, // 30 seconds
    processingInterval: 5000, // 5 seconds
  };

  private readonly logger = new Logger('TaskQueueService');
  /** True while a polling tick is in flight; prevents overlapping ticks. */
  private isProcessing = false;
  /** Handle of the polling timer, or null when polling is stopped. */
  private processingInterval: ReturnType<typeof setInterval> | null = null;
  /** Ids of the tasks currently being executed. */
  private readonly activeTasks = new Set<number>();
  private queueSettings: QueueSettings;

  constructor(
    private readonly db: DbService,
    private readonly handbrake: HandbrakeService,
    private readonly eventsGateway: EventsGateway,
    private readonly config: ConfigService,
  ) {
    this.queueSettings = this.loadQueueSettings();
  }

  /** Extracts a printable message from anything that can be thrown. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Returns `value` as a number when it is finite and at least `min`
   * (numeric strings are accepted), otherwise `fallback`.
   * When `integer` is set the value is floored before the range check.
   */
  private static pickNumber(
    value: unknown,
    fallback: number,
    min: number,
    integer: boolean,
  ): number {
    const parsed =
      typeof value === 'string' && value.trim() !== '' ? Number(value) : value;
    if (typeof parsed !== 'number' || !Number.isFinite(parsed)) {
      return fallback;
    }
    const candidate = integer ? Math.floor(parsed) : parsed;
    return candidate >= min ? candidate : fallback;
  }

  /**
   * Builds a complete, valid settings object from possibly partial or invalid
   * input. Zero is accepted for `maxRetries` and `retryDelay`; the other
   * numeric settings must be positive because zero would stall the queue.
   */
  private normalizeSettings(
    raw: Partial<Record<keyof QueueSettings, unknown>>,
    fallback: Readonly<QueueSettings>,
  ): QueueSettings {
    const pick = TaskQueueService.pickNumber;
    return {
      batchSize: pick(raw.batchSize, fallback.batchSize, 1, true),
      concurrency: pick(raw.concurrency, fallback.concurrency, 1, true),
      retryEnabled:
        raw.retryEnabled == null
          ? fallback.retryEnabled
          : raw.retryEnabled !== false,
      maxRetries: pick(raw.maxRetries, fallback.maxRetries, 0, true),
      retryDelay: pick(raw.retryDelay, fallback.retryDelay, 0, false),
      processingInterval: pick(
        raw.processingInterval,
        fallback.processingInterval,
        1,
        false,
      ),
    };
  }

  /** Reads the `queue` section of the configuration and fills in defaults. */
  private loadQueueSettings(): QueueSettings {
    const stored = this.config.getSettings('queue', {}) ?? {};
    const loaded = this.normalizeSettings(
      stored,
      TaskQueueService.DEFAULT_SETTINGS,
    );
    this.logger.log('Loaded queue settings:', loaded);
    return loaded;
  }

  /**
   * Applies a partial settings change, persists it to the configuration and
   * restarts the polling timer when the interval actually changed.
   *
   * @param settings Settings to change. Keys that are `undefined`/`null` or
   *   hold an invalid value keep their current value.
   */
  updateQueueSettings(settings: Partial<QueueSettings>): void {
    const previous = this.queueSettings;
    const next = this.normalizeSettings(settings, previous);
    this.queueSettings = next;

    // Persist only the keys the caller supplied, using their validated values.
    const suppliedKeys = (Object.keys(next) as (keyof QueueSettings)[]).filter(
      (key) => settings[key] != null,
    );
    const patch = Object.fromEntries(
      suppliedKeys.map((key) => [key, next[key]]),
    );
    const stored = this.config.getSettings('queue', {}) ?? {};
    this.config.setSettings({
      queue: { ...stored, ...patch },
    });

    this.logger.log('Updated queue settings:', this.queueSettings);

    // The timer captures the interval at start time, so it must be recreated.
    if (
      this.processingInterval &&
      next.processingInterval !== previous.processingInterval
    ) {
      this.stopProcessing();
      this.startProcessing();
    }
  }

  /** Returns a copy of the current settings (callers cannot mutate the live object). */
  getQueueSettings(): QueueSettings {
    return { ...this.queueSettings };
  }

  /** Nest lifecycle hook: begins polling once the module is initialised. */
  onModuleInit(): void {
    this.startProcessing();
  }

  /** Nest lifecycle hook: clears the polling timer so shutdown is not held up by it. */
  onModuleDestroy(): void {
    this.stopProcessing();
  }

  /** Starts the polling timer; a no-op (with a warning) when already running. */
  startProcessing(): void {
    if (this.processingInterval) {
      this.logger.warn('Task queue processing already running');
      return;
    }
    this.logger.log('Starting automatic task processing');
    this.processingInterval = setInterval(() => {
      // processPendingTasks handles its own errors; `void` marks the
      // fire-and-forget call as intentional.
      void this.processPendingTasks();
    }, this.queueSettings.processingInterval);
  }

  /** Stops the polling timer if it is running. Tasks already started are not interrupted. */
  stopProcessing(): void {
    if (this.processingInterval) {
      clearInterval(this.processingInterval);
      this.processingInterval = null;
      this.logger.log('Stopped automatic task processing');
    }
  }

  /** Type guard: true when the task carries input, output and preset. */
  private isRunnable(task: Task): task is RunnableTask {
    return Boolean(task.input && task.output && task.preset);
  }

  /** Marks a task without a complete payload as failed; never throws. */
  private failInvalidTask(task: Task): void {
    this.logger.error(`Task ${task.id} is missing required fields`);
    try {
      this.db.updateTask(task.id, {
        status: 'failed',
        error_message: 'Missing required fields: input, output, or preset',
      });
    } catch (error: unknown) {
      this.logger.error(
        `Error in processPendingTasks: ${TaskQueueService.describeError(error)}`,
      );
    }
  }

  /**
   * One polling tick: re-queues failed tasks that are due for retry, then
   * starts pending tasks up to the concurrency limit and waits for them.
   * Ticks never overlap; a tick that finds another one in flight returns.
   */
  private async processPendingTasks(): Promise<void> {
    if (this.isProcessing) {
      return; // Already processing
    }
    this.isProcessing = true;
    try {
      await this.processRetryTasks();

      const { batchSize, concurrency } = this.queueSettings;
      const pendingTasks = (this.db.getPendingTasks(batchSize) ?? []) as Task[];

      const running: Promise<void>[] = [];
      // Walk the whole batch so that invalid tasks do not use up a slot.
      for (const task of pendingTasks) {
        if (this.activeTasks.size >= concurrency) {
          break; // Respect concurrency limit
        }
        if (!this.isRunnable(task)) {
          this.failInvalidTask(task);
          continue;
        }
        // Reserve the slot synchronously; processTask releases it in `finally`.
        this.activeTasks.add(task.id);
        running.push(this.processTask(task));
      }

      const outcomes = await Promise.allSettled(running);
      for (const outcome of outcomes) {
        if (outcome.status === 'rejected') {
          this.logger.error(
            `Error in processPendingTasks: ${TaskQueueService.describeError(outcome.reason)}`,
          );
        }
      }
    } catch (error: unknown) {
      this.logger.error(
        `Error in processPendingTasks: ${TaskQueueService.describeError(error)}`,
      );
    } finally {
      this.isProcessing = false;
    }
  }

  /**
   * Puts failed tasks back to `pending` once `retryDelay` has elapsed since
   * their last update, as long as their retry counter is below `maxRetries`.
   * This is the only place where `retry_count` is incremented.
   */
  private async processRetryTasks(): Promise<void> {
    const { retryEnabled, maxRetries, retryDelay } = this.queueSettings;
    if (!retryEnabled) {
      return;
    }
    try {
      const now = Date.now();
      const allTasks = (this.db.getAllTasks() ?? []) as Task[];
      for (const task of allTasks) {
        const retriesSoFar = task.retry_count ?? 0;
        if (task.status !== 'failed' || retriesSoFar >= maxRetries) {
          continue;
        }
        // NOTE(review): an unparsable `updated_at` yields NaN, which never
        // satisfies the comparison below, so such a task is never retried.
        const timeSinceFailure = now - new Date(task.updated_at).getTime();
        if (!(timeSinceFailure >= retryDelay)) {
          continue;
        }

        const retryCount = retriesSoFar + 1;
        this.logger.log(`Retrying task ${task.id} (attempt ${retryCount})`);
        this.db.updateTask(task.id, {
          status: 'pending',
          progress: 0,
          retry_count: retryCount,
          // NOTE(review): intended to clear the previous message — confirm
          // that DbService.updateTask treats `undefined` that way.
          error_message: undefined,
        });
        this.eventsGateway.emitTaskUpdate({
          type: 'retry',
          taskId: task.id,
          task: 'handbrake',
          retryCount,
        });
      }
    } catch (error: unknown) {
      this.logger.error(
        `Error in processRetryTasks: ${TaskQueueService.describeError(error)}`,
      );
    }
  }

  /**
   * Runs a single task end to end: marks it as processing, invokes Handbrake,
   * records the outcome and emits the matching events. The task id is always
   * removed from the active set, whatever happens.
   */
  private async processTask(task: RunnableTask): Promise<void> {
    const { id, input, output, preset } = task;
    try {
      this.db.updateTask(id, { status: 'processing' });
      this.eventsGateway.emitTaskUpdate({
        type: 'started',
        taskId: id,
        task: 'handbrake',
        input,
        output,
        preset,
      });

      const success = await this.handbrake.processWithHandbrake(
        input,
        output,
        preset,
        id,
      );
      if (!success) {
        throw new Error('Handbrake processing failed');
      }

      this.db.updateTask(id, { status: 'completed', progress: 100 });
      if (task.dataset) {
        this.db.setFile(task.dataset, input, {
          status: 'success',
          date: new Date().toISOString(),
        });
      }
      this.eventsGateway.emitTaskUpdate({
        type: 'completed',
        taskId: id,
        task: 'handbrake',
        input,
        output,
        preset,
        success: true,
      });
      this.logger.log(`Task ${id} completed successfully`);
    } catch (error: unknown) {
      const message = TaskQueueService.describeError(error);
      const retryCount = task.retry_count ?? 0;
      const willRetry =
        this.queueSettings.retryEnabled &&
        retryCount < this.queueSettings.maxRetries;

      // The retry counter is deliberately left untouched here: it is bumped
      // when the task is re-queued, so counting it here as well would halve
      // the number of retries actually performed.
      this.db.updateTask(id, {
        status: 'failed',
        error_message: message,
      });

      if (willRetry) {
        this.logger.warn(
          `Task ${id} failed, will retry (attempt ${retryCount + 1}): ${message}`,
        );
      } else {
        // Final failure: reflect it on the file record as well.
        if (task.dataset) {
          this.db.setFile(task.dataset, input, {
            status: 'error',
            date: new Date().toISOString(),
          });
        }
        this.logger.error(`Task ${id} failed permanently: ${message}`);
      }

      this.eventsGateway.emitTaskUpdate({
        type: 'failed',
        taskId: id,
        task: 'handbrake',
        input,
        output,
        preset,
        success: false,
        error: message,
        retryCount: retryCount + 1,
        maxRetries: this.queueSettings.maxRetries,
      });
    } finally {
      this.activeTasks.delete(id);
    }
  }

  /**
   * Creates a pending Handbrake task (used for re-queueing from the web
   * interface). A file record is created first when none exists for the
   * dataset/input pair.
   *
   * @param taskData Payload of the task; `priority` defaults to 0.
   * @returns The task object returned by `DbService.createTask`.
   */
  createTask(taskData: {
    dataset: string;
    input: string;
    output: string;
    preset: string;
    priority?: number;
  }) {
    const { dataset, input, output, preset } = taskData;

    if (!this.db.findFile(dataset, input)) {
      this.db.setFile(dataset, input, {
        output,
        status: 'pending',
        date: new Date().toISOString(),
      });
    }

    const task = this.db.createTask({
      type: 'handbrake',
      status: 'pending',
      dataset,
      input,
      output,
      preset,
      priority: taskData.priority ?? 0,
    });
    this.logger.log(`Created task ${task.id} for file: ${input}`);
    return task;
  }

  /**
   * Summarises the queue: per-status task counts, the number of running
   * tasks, whether a tick is in flight, and a copy of the settings.
   */
  getQueueStatus() {
    const allTasks = (this.db.getAllTasks() ?? []) as Task[];
    const counts = { pending: 0, processing: 0, completed: 0, failed: 0 };
    // Single pass instead of one filter per status.
    for (const { status } of allTasks) {
      if (Object.prototype.hasOwnProperty.call(counts, status)) {
        counts[status as keyof typeof counts] += 1;
      }
    }
    return {
      isProcessing: this.isProcessing,
      activeTasks: this.activeTasks.size,
      pending: counts.pending,
      processing: counts.processing,
      completed: counts.completed,
      failed: counts.failed,
      total: allTasks.length,
      settings: this.getQueueSettings(),
    };
  }
}
|