Przeglądaj źródła

Add worker thread for file validation in Chokidar scanner to prevent blocking I/O

Timothy Pomeroy 4 tygodni temu
rodzic
commit
5077ed36d1

+ 46 - 0
apps/service/src/file-validation-worker.ts

@@ -0,0 +1,46 @@
+import { parentPort } from 'worker_threads';
+import fs from 'fs';
+import path from 'path';
+
+function isVideoFile(file: string): boolean {
+  const ext = path.extname(file).toLowerCase();
+  const videoExts = ['.avi', '.mkv', '.mov', '.mp4', '.m4v'];
+  return videoExts.includes(ext);
+}
+
+function isValidVideoFile(file: string): boolean {
+  try {
+    // Read first 64KB to check for video headers
+    const buffer = fs.readFileSync(file, { length: 65536 });
+    // Simple check for common video signatures
+    const signatures = [
+      Buffer.from([0x00, 0x00, 0x00, 0x20, 0x66, 0x74, 0x79, 0x70]), // MP4
+      Buffer.from([0x1A, 0x45, 0xDF, 0xA3]), // MKV/WebM
+      Buffer.from([0x52, 0x49, 0x46, 0x46]), // AVI
+      Buffer.from([0x00, 0x00, 0x00, 0x14, 0x66, 0x74, 0x79, 0x70, 0x71, 0x74, 0x20, 0x20]), // MOV
+    ];
+    for (const sig of signatures) {
+      if (buffer.indexOf(sig) !== -1) {
+        return true;
+      }
+    }
+    return false;
+  } catch (error) {
+    console.warn(`Failed to validate file ${file}: ${error}`);
+    return false;
+  }
+}
+
+parentPort?.on('message', (message) => {
+  if (message.type === 'validate_file') {
+    const { file } = message;
+    const isVideo = isVideoFile(file);
+    const isValid = isVideo && isValidVideoFile(file);
+    parentPort?.postMessage({
+      type: 'validation_result',
+      file,
+      isVideo,
+      isValid,
+    });
+  }
+});

+ 31 - 79
apps/service/src/watcher.service.ts

@@ -1,4 +1,5 @@
 import { Inject, Injectable, Logger } from '@nestjs/common';
+import { Worker } from 'worker_threads';
 import chokidar, { FSWatcher } from 'chokidar';
 import fs from 'fs';
 import path from 'path';
@@ -21,13 +22,29 @@ export class WatcherService {
   private lastWatches: string[] = [];
   private lastOptions: any = {};
   private logger = new Logger('WatcherService');
+  private validationWorker: Worker;
+  private validationCallbacks = new Map<string, (result: any) => void>();
 
   constructor(
     @Inject(DatasetsService) private readonly datasetsService: DatasetsService,
     @Inject(DbService) private readonly db: DbService,
     @Inject(EventsGateway) private readonly eventsGateway: EventsGateway,
     @Inject(TaskQueueService) private readonly taskQueue: TaskQueueService,
-  ) {}
+  ) {
+    this.validationWorker = new Worker(path.join(__dirname, 'file-validation-worker.js'));
+    this.validationWorker.on('message', (message) => {
+      if (message.type === 'validation_result') {
+        const callback = this.validationCallbacks.get(message.file);
+        if (callback) {
+          callback(message);
+          this.validationCallbacks.delete(message.file);
+        }
+      }
+    });
+    this.validationWorker.on('error', (error) => {
+      this.logger.error(`Validation worker error: ${error}`);
+    });
+  }
 
   start(watches?: string[], options: any = {}) {
     if (this.isWatching) {
@@ -84,17 +101,21 @@ export class WatcherService {
       return;
     }
 
-    // Check if this is a video file (basic extension check)
-    if (!this.isVideoFile(file)) {
-      return;
-    }
+    // Offload validation to worker
+    this.validationCallbacks.set(file, (result) => {
+      if (!result.isValid) {
+        this.logger.warn(`File appears to be corrupted or incomplete: ${file}`);
+        return;
+      }
 
-    // Validate that the file has proper video headers
-    if (!this.isValidVideoFile(file)) {
-      this.logger.warn(`File appears to be corrupted or incomplete: ${file}`);
-      return;
-    }
+      // Proceed with task creation
+      this.processValidFile(file, dataset);
+    });
+
+    this.validationWorker.postMessage({ type: 'validate_file', file });
+  }
 
+  private processValidFile(file: string, dataset: string) {
     // Get dataset configuration
     const datasetConfig = this.datasetsService.getDatasetConfig();
     const datasetSettings = datasetConfig[dataset];
@@ -341,76 +362,7 @@ export class WatcherService {
     return null;
   }
 
-  private isVideoFile(file: string): boolean {
-    const videoExtensions = [
-      '.mp4',
-      '.mkv',
-      '.avi',
-      '.mov',
-      '.wmv',
-      '.flv',
-      '.webm',
-      '.m4v',
-    ];
-    const ext = path.extname(file).toLowerCase();
-    return videoExtensions.includes(ext);
-  }
-
-  private isValidVideoFile(file: string): boolean {
-    try {
-      // Check if file exists and is readable
-      if (!fs.existsSync(file)) {
-        return false;
-      }
 
-      const stats = fs.statSync(file);
-      if (stats.size === 0) {
-        return false;
-      }
-
-      // Read first few bytes to check for video file signatures
-      const buffer = Buffer.alloc(12);
-      const fd = fs.openSync(file, 'r');
-      try {
-        fs.readSync(fd, buffer, 0, 12, 0);
-      } finally {
-        fs.closeSync(fd);
-      }
-
-      // Check for common video file signatures
-      const signature = buffer.toString('hex');
-
-      // MP4 signature (ftyp box)
-      if (signature.includes('66747970')) {
-        return true;
-      }
-
-      // MKV/WebM signature (EBML)
-      if (signature.startsWith('1a45dfa3')) {
-        return true;
-      }
-
-      // AVI signature (RIFF)
-      if (
-        signature.startsWith('52494646') &&
-        buffer.toString('ascii', 8, 12) === 'AVI '
-      ) {
-        return true;
-      }
-
-      // MOV signature (ftyp)
-      if (signature.includes('66747971') || signature.includes('66747970')) {
-        return true;
-      }
-
-      // For other formats, just check if file is large enough to be a video (> 1MB)
-      // This is a basic heuristic since not all video formats have easily detectable headers
-      return stats.size > 1024 * 1024;
-    } catch (error) {
-      this.logger.warn(`Error validating video file ${file}: ${error.message}`);
-      return false;
-    }
-  }
 
   async stop() {
     if (this.watcher && this.isWatching) {