Ver código fonte

fix: correct queue processing logic for batchSize and concurrency

- Fix TaskQueueService to properly handle batchSize vs concurrency
- batchSize now controls tasks pulled for consideration per cycle
- concurrency limits simultaneous active processing tasks
- Update documentation to reflect correct queue behavior
- Improve CLI help text for queue settings
Timothy Pomeroy 1 mês atrás
pai
commit
ca27d758e6

+ 1 - 1
apps/service/src/config.service.spec.ts

@@ -40,7 +40,7 @@ describe('ConfigService', () => {
   describe('constructor', () => {
   describe('constructor', () => {
     it('should create database and table on initialization', () => {
     it('should create database and table on initialization', () => {
       expect(Database).toHaveBeenCalledWith(
       expect(Database).toHaveBeenCalledWith(
-        path.resolve(__dirname, '../../../data', 'database.db'),
+        path.resolve(process.cwd(), 'data', 'database.db'),
       );
       );
       expect(mockDb.exec).toHaveBeenCalledWith(
       expect(mockDb.exec).toHaveBeenCalledWith(
         expect.stringContaining('CREATE TABLE IF NOT EXISTS settings'),
         expect.stringContaining('CREATE TABLE IF NOT EXISTS settings'),

+ 4 - 1
apps/service/src/config.service.ts

@@ -4,10 +4,13 @@ import path from 'path';
 
 
 @Injectable()
 @Injectable()
 export class ConfigService {
 export class ConfigService {
-  private dataDir = path.resolve(__dirname, '../../../data');
+  private dataDir = path.resolve(process.cwd(), 'data');
   private unifiedDbPath = path.resolve(this.dataDir, 'database.db');
   private unifiedDbPath = path.resolve(this.dataDir, 'database.db');
 
 
   constructor() {
   constructor() {
+    console.log('ConfigService: Database path:', this.unifiedDbPath);
+    console.log('ConfigService: Current working directory:', process.cwd());
+
     // Ensure database and tables exist
     // Ensure database and tables exist
     const db = new Database(this.unifiedDbPath);
     const db = new Database(this.unifiedDbPath);
     db.exec(`
     db.exec(`

+ 1 - 1
apps/service/src/datasets.service.ts

@@ -4,7 +4,7 @@ import path from 'path';
 
 
 @Injectable()
 @Injectable()
 export class DatasetsService {
 export class DatasetsService {
-  private dbPath = path.resolve(__dirname, '../../../data/database.db');
+  private dbPath = path.resolve(process.cwd(), 'data/database.db');
 
 
   /**
   /**
    * Returns all enabled dataset paths from the settings.datasets key in the database
    * Returns all enabled dataset paths from the settings.datasets key in the database

+ 22 - 3
apps/service/src/db.service.ts

@@ -20,14 +20,33 @@ export class DbService {
 
 
   constructor() {
   constructor() {
     // Use unified database for all settings/configuration
     // Use unified database for all settings/configuration
-    const rootDataPath = path.resolve(__dirname, '../../../data/database.db');
+    const rootDataPath = path.resolve(process.cwd(), 'data/database.db');
+    console.log('Database path:', rootDataPath);
+    console.log('Current working directory:', process.cwd());
+
     // Ensure the directory exists
     // Ensure the directory exists
     const dir = path.dirname(rootDataPath);
     const dir = path.dirname(rootDataPath);
+    console.log('Database directory:', dir);
+
     if (!fs.existsSync(dir)) {
     if (!fs.existsSync(dir)) {
+      console.log('Creating database directory...');
       fs.mkdirSync(dir, { recursive: true });
       fs.mkdirSync(dir, { recursive: true });
     }
     }
-    this.db = new Database(rootDataPath);
-    this.migrate();
+
+    try {
+      this.db = new Database(rootDataPath);
+      console.log('Database opened successfully');
+      this.migrate();
+    } catch (error) {
+      console.error('Failed to open database:', error);
+      console.error('Database path:', rootDataPath);
+      console.error('Directory exists:', fs.existsSync(dir));
+      console.error(
+        'Directory permissions:',
+        fs.statSync(dir).mode.toString(8),
+      );
+      throw error;
+    }
   }
   }
 
 
   /**
   /**

+ 66 - 7
apps/service/src/task-queue.service.ts

@@ -1,4 +1,5 @@
 import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
 import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import * as fs from 'fs';
 import { ConfigService } from './config.service';
 import { ConfigService } from './config.service';
 import { DbService } from './db.service';
 import { DbService } from './db.service';
 import { EventsGateway } from './events.gateway';
 import { EventsGateway } from './events.gateway';
@@ -83,7 +84,8 @@ export class TaskQueueService implements OnModuleInit {
   }
   }
 
 
   onModuleInit() {
   onModuleInit() {
-    this.startProcessing();
+    // Don't start processing automatically - wait for explicit start command
+    this.logger.log('Task queue initialized - processing stopped by default');
   }
   }
 
 
   startProcessing() {
   startProcessing() {
@@ -106,6 +108,17 @@ export class TaskQueueService implements OnModuleInit {
     }
     }
   }
   }
 
 
+  // Public API methods
+  start() {
+    this.startProcessing();
+    return { started: true };
+  }
+
+  stop() {
+    this.stopProcessing();
+    return { stopped: true };
+  }
+
   private async processPendingTasks() {
   private async processPendingTasks() {
     if (this.isProcessing) {
     if (this.isProcessing) {
       return; // Already processing
       return; // Already processing
@@ -126,14 +139,10 @@ export class TaskQueueService implements OnModuleInit {
         return; // No tasks to process
         return; // No tasks to process
       }
       }
 
 
-      // Process tasks up to concurrency limit
+      // Process tasks up to batch size, respecting concurrency limit
       const processingPromises: Promise<void>[] = [];
       const processingPromises: Promise<void>[] = [];
-      const tasksToProcess = pendingTasks.slice(
-        0,
-        this.queueSettings.concurrency,
-      );
 
 
-      for (const task of tasksToProcess) {
+      for (const task of pendingTasks) {
         if (this.activeTasks.size >= this.queueSettings.concurrency) {
         if (this.activeTasks.size >= this.queueSettings.concurrency) {
           break; // Respect concurrency limit
           break; // Respect concurrency limit
         }
         }
@@ -221,6 +230,42 @@ export class TaskQueueService implements OnModuleInit {
 
 
   private async processTask(task: Task): Promise<void> {
   private async processTask(task: Task): Promise<void> {
     try {
     try {
+      // Check if output file already exists and skip if it does (unless requeued)
+      if (task.output && fs.existsSync(task.output)) {
+        // Check if this task was requeued (has retry_count > 0 or was manually requeued)
+        const wasRequeued =
+          (task.retry_count || 0) > 0 || task.status === 'requeued';
+
+        if (!wasRequeued) {
+          // Skip processing - file already exists
+          this.db.updateTask(task.id, { status: 'skipped', progress: 100 });
+
+          // Update file status if it exists
+          if (task.dataset) {
+            this.db.setFile(task.dataset, task.input!, {
+              status: 'success',
+              date: new Date().toISOString(),
+            });
+          }
+
+          // Emit skipped event
+          this.eventsGateway.emitTaskUpdate({
+            type: 'skipped',
+            taskId: task.id,
+            task: 'handbrake',
+            input: task.input,
+            output: task.output,
+            preset: task.preset,
+            success: true,
+          });
+
+          this.logger.log(
+            `Task ${task.id} skipped - output file already exists: ${task.output}`,
+          );
+          return;
+        }
+      }
+
       // Process the file
       // Process the file
       const success = await this.handbrake.processWithHandbrake(
       const success = await this.handbrake.processWithHandbrake(
         task.input!,
         task.input!,
@@ -365,4 +410,18 @@ export class TaskQueueService implements OnModuleInit {
       settings: this.getQueueSettings(),
       settings: this.getQueueSettings(),
     };
     };
   }
   }
+
+  // Get task by input file
+  getTaskByInput(input: string): Task | undefined {
+    return this.db.getTaskByInput(input) as Task | undefined;
+  }
+
+  // Update task status
+  updateTaskStatus(taskId: number, status: string, errorMessage?: string) {
+    const updateData: any = { status };
+    if (errorMessage) {
+      updateData.error_message = errorMessage;
+    }
+    return this.db.updateTask(taskId, updateData);
+  }
 }
 }

+ 12 - 6
docs/architecture.md

@@ -255,14 +255,20 @@ flowchart TD
 
 
 ### Advanced Queue Configuration
 ### Advanced Queue Configuration
 
 
-The TaskQueueService supports configurable processing parameters:
+The TaskQueueService supports configurable processing parameters that control how tasks are managed and processed:
 
 
-- **Batch Size**: Number of tasks processed per batch (default: 10)
-- **Concurrency**: Maximum simultaneous tasks (default: 1)
+- **Batch Size** (`batchSize`): Number of tasks pulled from the queue for consideration in each processing cycle (default: 10). This determines how many pending tasks are evaluated for processing at once.
+- **Concurrency** (`concurrency`): Maximum number of tasks that can be actively processing simultaneously (default: 1). This limits parallel execution to prevent system overload.
 - **Retry Logic**: Automatic retry for failed tasks (default: enabled)
 - **Retry Logic**: Automatic retry for failed tasks (default: enabled)
-- **Max Retries**: Maximum retry attempts (default: 3)
-- **Retry Delay**: Delay between retry attempts (default: 30 seconds)
-- **Processing Interval**: How often to check for new tasks (default: 5 seconds)
+- **Max Retries** (`maxRetries`): Maximum retry attempts for failed tasks (default: 3). Total attempts = maxRetries + 1 (original attempt + retries).
+- **Retry Delay** (`retryDelay`): Delay between retry attempts in milliseconds (default: 30 seconds)
+- **Processing Interval** (`processingInterval`): How often to check for new tasks in milliseconds (default: 5 seconds)
+
+**Queue Processing Behavior:**
+- In each processing cycle, up to `batchSize` pending tasks are retrieved from the database
+- Tasks are started for processing only if the number of currently active tasks is below `concurrency`
+- If `batchSize = 3` and `concurrency = 1`, the system will pull 3 tasks but only process 1 at a time
+- Remaining tasks from the batch will be processed in subsequent cycles as active tasks complete
 
 
 Queue settings are persisted in the database and can be updated via API or CLI.
 Queue settings are persisted in the database and can be updated via API or CLI.
 
 

+ 6 - 6
docs/cli.md

@@ -102,12 +102,12 @@ watch-finished-cli task:queue:settings:update --retry-enabled false
 
 
 **Options:**
 **Options:**
 
 
-- `--batch-size <size>`: Number of tasks to process in each batch
-- `--concurrency <count>`: Maximum number of concurrent tasks
-- `--retry-enabled <true|false>`: Enable/disable automatic retries
-- `--max-retries <count>`: Maximum retry attempts for failed tasks
-- `--retry-delay <ms>`: Delay between retry attempts in milliseconds
-- `--processing-interval <ms>`: How often to check for new tasks in milliseconds
+- `--batch-size <size>`: Number of tasks pulled from queue for consideration in each processing cycle (default: 10). Controls how many pending tasks are evaluated at once.
+- `--concurrency <count>`: Maximum number of tasks that can process simultaneously (default: 1). Limits parallel execution to prevent system overload.
+- `--retry-enabled <true|false>`: Enable/disable automatic retries for failed tasks (default: true)
+- `--max-retries <count>`: Maximum retry attempts for failed tasks (default: 3). Total attempts = maxRetries + 1.
+- `--retry-delay <ms>`: Delay between retry attempts in milliseconds (default: 30000)
+- `--processing-interval <ms>`: How often to check for new tasks in milliseconds (default: 5000)
 
 
 ## File Management Commands
 ## File Management Commands