
Move indexing to background worker with enhanced logging

- Created indexing-worker.ts to handle file indexing in a separate thread
- Updated the maintenance service to delegate to the worker instead of blocking the main thread
- Added progress updates every 5% with detailed logging
- Enhanced logging shows batch progress, file counts, and percentage complete
- Worker logs every 50 indexed files for detailed progress tracking
- Prevents the main thread from blocking on file I/O during large indexing operations
Timothy Pomeroy 4 weeks ago
parent commit 30c70cfd5e

+ 206 - 0
apps/service/src/indexing-worker.ts

@@ -0,0 +1,206 @@
+import Database from 'better-sqlite3';
+import crypto from 'crypto';
+import fs from 'fs';
+import fsPromises from 'fs/promises';
+import path from 'path';
+import { parentPort } from 'worker_threads';
+
+interface WorkerMessage {
+  type: string;
+  dataset: string;
+  destination: string;
+  dbPath: string;
+  reindex?: boolean;
+  batchSize?: number;
+}
+
+interface IndexResult {
+  indexed: number;
+  skipped: number;
+  errors: number;
+}
+
+function walkFiles(root: string): string[] {
+  const pending = [root];
+  const files: string[] = [];
+
+  while (pending.length) {
+    const current = pending.pop();
+    if (!current) continue;
+
+    let stat: fs.Stats;
+    try {
+      stat = fs.statSync(current);
+    } catch {
+      continue;
+    }
+
+    if (stat.isDirectory()) {
+      const children = fs.readdirSync(current);
+      for (const child of children) {
+        pending.push(path.join(current, child));
+      }
+    } else if (stat.isFile()) {
+      files.push(current);
+    }
+  }
+
+  return files;
+}
+
+async function hashFileAsync(filePath: string): Promise<string | null> {
+  try {
+    const data = await fsPromises.readFile(filePath);
+    const hash = crypto.createHash('sha1');
+    hash.update(data);
+    return hash.digest('hex');
+  } catch (error) {
+    console.warn(`Worker: Hashing failed for ${filePath}: ${error}`);
+    return null;
+  }
+}
+
+async function indexDestination(
+  dataset: string,
+  destination: string,
+  dbPath: string,
+  reindex = false,
+  batchSize = 100,
+): Promise<IndexResult> {
+  console.log(
+    `Worker: Starting indexing for ${dataset} at ${destination}`,
+  );
+  console.log(`Worker: Database path: ${dbPath}`);
+  console.log(`Worker: Reindex: ${reindex}, Batch size: ${batchSize}`);
+
+  let db: Database.Database | null = null;
+  try {
+    db = new Database(dbPath);
+
+    // Clear existing entries if reindexing
+    if (reindex) {
+      const stmt = db.prepare(
+        'DELETE FROM files WHERE dataset = ? AND destination_path LIKE ?',
+      );
+      const result = stmt.run(dataset, `${destination}%`);
+      console.log(
+        `Worker: Cleared ${result.changes} existing destination file entries for reindexing`,
+      );
+    }
+
+    // Walk the destination directory
+    console.log(`Worker: Scanning directory: ${destination}`);
+    const files = walkFiles(destination);
+    console.log(`Worker: Found ${files.length} files to index`);
+
+    let indexed = 0;
+    let skipped = 0;
+    let errors = 0;
+
+    // Prepare statement for inserting/updating files
+    const upsertStmt = db.prepare(`
+      INSERT INTO files (dataset, input, destination_path, hash, file_size, date)
+      VALUES (?, ?, ?, ?, ?, datetime('now'))
+      ON CONFLICT(dataset, input) DO UPDATE SET
+        destination_path = excluded.destination_path,
+        hash = excluded.hash,
+        file_size = excluded.file_size,
+        date = excluded.date
+    `);
+
+    // Process files in batches
+    for (let i = 0; i < files.length; i += batchSize) {
+      const batch = files.slice(i, i + batchSize);
+      const progressPercent = Math.round((i / files.length) * 100);
+
+      console.log(
+        `Worker: Processing batch ${Math.floor(i / batchSize) + 1}/${Math.ceil(files.length / batchSize)} (${progressPercent}% complete, ${indexed} indexed, ${skipped} skipped, ${errors} errors)`,
+      );
+
+      // Send progress update
+      parentPort?.postMessage({
+        type: 'progress',
+        indexed,
+        skipped,
+        errors,
+        total: files.length,
+        progress: progressPercent,
+      });
+
+      await Promise.all(
+        batch.map(async (filePath) => {
+          try {
+            const stat = await fsPromises.stat(filePath);
+            if (!stat.isFile()) {
+              skipped++;
+              return;
+            }
+
+            console.log(`Worker: Indexing ${filePath}`);
+            const hash = await hashFileAsync(filePath);
+            if (!hash) {
+              console.warn(`Worker: Failed to hash ${filePath}`);
+              errors++;
+              return;
+            }
+
+            // Store in database
+            upsertStmt.run(dataset, filePath, filePath, hash, stat.size);
+            indexed++;
+
+            // Log every 50 files
+            if (indexed % 50 === 0) {
+              console.log(
+                `Worker: Indexed ${indexed}/${files.length} files (${Math.round((indexed / files.length) * 100)}%)`,
+              );
+            }
+          } catch (error) {
+            console.error(
+              `Worker: Failed to index file ${filePath}: ${error}`,
+            );
+            errors++;
+          }
+        }),
+      );
+    }
+
+    console.log(
+      `Worker: Indexing complete - ${indexed} indexed, ${skipped} skipped, ${errors} errors`,
+    );
+
+    return { indexed, skipped, errors };
+  } catch (error) {
+    console.error(`Worker: Fatal error during indexing: ${error}`);
+    throw error;
+  } finally {
+    if (db) {
+      db.close();
+    }
+  }
+}
+
+parentPort?.on('message', async (message: WorkerMessage) => {
+  if (message.type === 'index_destination') {
+    try {
+      const result = await indexDestination(
+        message.dataset,
+        message.destination,
+        message.dbPath,
+        message.reindex,
+        message.batchSize,
+      );
+
+      parentPort?.postMessage({
+        type: 'index_result',
+        dataset: message.dataset,
+        destination: message.destination,
+        ...result,
+      });
+    } catch (error) {
+      parentPort?.postMessage({
+        type: 'error',
+        error: error instanceof Error ? error.message : String(error),
+      });
+    }
+  }
+});
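
For reference, the worker can be exercised in isolation, which is handy for testing the message protocol without the service. A minimal driver sketch in TypeScript — the compiled worker path, dataset name, and destination directory below are assumptions, not part of this commit, and data/database.db must already contain the files table:

// drive-indexing-worker.ts — standalone test driver (sketch, not part of this commit)
import path from 'path';
import { Worker } from 'worker_threads';

// Assumes the worker has been compiled to dist/indexing-worker.js
const worker = new Worker(path.resolve('dist/indexing-worker.js'));

worker.on('message', (message) => {
  if (message.type === 'progress') {
    console.log(`progress: ${message.progress}% (${message.indexed}/${message.total})`);
  } else if (message.type === 'index_result') {
    console.log(`done: ${message.indexed} indexed, ${message.skipped} skipped, ${message.errors} errors`);
    worker.terminate();
  } else if (message.type === 'error') {
    console.error(`worker failed: ${message.error}`);
    worker.terminate();
  }
});

// Kick off indexing; the dataset and destination are hypothetical values
worker.postMessage({
  type: 'index_destination',
  dataset: 'example-dataset',
  destination: '/srv/media/example',
  dbPath: path.resolve('data/database.db'),
  reindex: false,
  batchSize: 100,
});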

+ 72 - 49
apps/service/src/maintenance.service.ts

@@ -373,7 +373,7 @@ export class MaintenanceService {
   /**
    * Index all files in a destination directory with their hashes
    * This populates the files table with destination_path, hash, and file_size
-   * for fast duplicate detection
+   * for fast duplicate detection. Runs in a worker thread to avoid blocking.
    */
   async indexDestinationFiles(
     dataset: string,
@@ -390,63 +390,86 @@
     const { reindex = false, batchSize = 100 } = options;
 
     this.logger.log(
-      `Indexing destination files for ${dataset} at ${destinationPath}`,
+      `Starting indexing worker for ${dataset} at ${destinationPath}`,
+    );
+    this.logger.log(
+      `Options: reindex=${reindex}, batchSize=${batchSize}`,
     );
 
-    // Clear existing entries if reindexing
-    if (reindex) {
-      const cleared = this.db.clearDestinationFiles(dataset, destinationPath);
-      this.logger.log(`Cleared ${cleared} existing destination file entries`);
+    // Get database path
+    let projectRoot = process.cwd();
+    while (projectRoot !== path.dirname(projectRoot)) {
+      if (fs.existsSync(path.join(projectRoot, 'package.json'))) {
+        try {
+          const pkg = JSON.parse(
+            fs.readFileSync(path.join(projectRoot, 'package.json'), 'utf-8'),
+          );
+          if (pkg.name === 'watch-finished-turbo') {
+            break;
+          }
+        } catch (e) {
+          // ignore
+        }
+      }
+      projectRoot = path.dirname(projectRoot);
     }
+    const dbPath = path.resolve(projectRoot, 'data/database.db');
 
-    // Walk the destination directory
-    const files = this.walkFiles(destinationPath);
-    this.logger.log(`Found ${files.length} files to index`);
-
-    let indexed = 0;
-    let skipped = 0;
-    let errors = 0;
-
-    // Process files in batches
-    for (let i = 0; i < files.length; i += batchSize) {
-      const batch = files.slice(i, i + batchSize);
-
-      await Promise.all(
-        batch.map(async (filePath) => {
-          try {
-            const stat = await fsPromises.stat(filePath);
-            if (!stat.isFile()) {
-              skipped++;
-              return;
-            }
-
-            const hash = await this.hashFile(filePath);
-            if (!hash) {
-              errors++;
-              return;
-            }
+    return new Promise((resolve, reject) => {
+      const worker = new Worker(path.join(__dirname, 'indexing-worker.js'));
+      let lastProgress = 0;
+      let settled = false; // avoid a spurious reject when terminate() exits nonzero
 
-            this.db.storeDestinationFile(dataset, filePath, hash, stat.size);
-            indexed++;
-          } catch (error) {
-            this.logger.error(`Failed to index file ${filePath}: ${error}`);
-            errors++;
+      worker.on('message', (message) => {
+        if (message.type === 'progress') {
+          // Log progress updates
+          if (message.progress >= lastProgress + 5 || message.progress === 100) {
+            this.logger.log(
+              `Indexing progress: ${message.progress}% (${message.indexed} indexed, ${message.skipped} skipped, ${message.errors} errors)`,
+            );
+            lastProgress = message.progress;
           }
-        }),
-      );
+        } else if (message.type === 'index_result') {
+          this.logger.log(
+            `Indexing complete: ${message.indexed} indexed, ${message.skipped} skipped, ${message.errors} errors`,
+          );
+          settled = true;
+          worker.terminate();
+          resolve({
+            indexed: message.indexed,
+            skipped: message.skipped,
+            errors: message.errors,
+          });
+        } else if (message.type === 'error') {
+          this.logger.error(`Indexing worker error: ${message.error}`);
+          settled = true;
+          worker.terminate();
+          reject(new Error(message.error));
+        }
+      });
 
-      if ((i + batchSize) % 1000 === 0 || i + batchSize >= files.length) {
-        this.logger.log(
-          `Indexed ${indexed}/${files.length} files (${skipped} skipped, ${errors} errors)`,
-        );
-      }
-    }
+      worker.on('error', (error) => {
+        this.logger.error(`Indexing worker error: ${error}`);
+        settled = true;
+        reject(error);
+      });
 
-    this.logger.log(
-      `Indexing complete: ${indexed} indexed, ${skipped} skipped, ${errors} errors`,
-    );
+      worker.on('exit', (code) => {
+        if (code !== 0 && !settled) {
+          this.logger.error(`Indexing worker exited with code ${code}`);
+          reject(new Error(`Indexing worker exited with code ${code}`));
+        }
+      });
 
-    return { indexed, skipped, errors };
+      worker.postMessage({
+        type: 'index_destination',
+        dataset,
+        destination: destinationPath,
+        dbPath,
+        reindex,
+        batchSize,
+      });
+    });
   }
 
   /**
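
From the caller's side, nothing changes with this commit: indexDestinationFiles still resolves to the same { indexed, skipped, errors } shape, and the worker is an implementation detail. A hedged usage sketch — the maintenanceService instance and the argument values are assumptions, not shown in this diff:

// Somewhere with access to a MaintenanceService instance (assumed wiring)
const result = await maintenanceService.indexDestinationFiles(
  'example-dataset',    // hypothetical dataset name
  '/srv/media/example', // hypothetical destination path
  { reindex: true, batchSize: 200 },
);
console.log(
  `Indexed ${result.indexed} files (${result.skipped} skipped, ${result.errors} errors)`,
);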