
Add worker threads for duplicate scanning to prevent blocking the event loop

Timothy Pomeroy, 4 weeks ago
parent commit ff75774cf6
2 changed files with 222 additions and 147 deletions
  1. apps/service/src/duplicate-worker.ts (+150 -0)
  2. apps/service/src/maintenance.service.ts (+72 -147)
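
The commit moves the hashing-heavy duplicate scan off the main thread so the event loop stays responsive while files are walked and hashed. As a minimal sketch of that pattern only (the helper name, generic result type, and message shape below are illustrative and not taken from this commit):

import { Worker } from 'worker_threads';

// Hypothetical helper: run one job in a worker thread and resolve with its first reply.
// The heavy work happens in the worker, so HTTP handlers and cron jobs on the main
// thread keep running while the scan is in progress.
function runInWorker<T>(workerScript: string, job: unknown): Promise<T> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerScript);
    worker.once('message', (result: T) => {
      void worker.terminate();
      resolve(result);
    });
    worker.once('error', reject);
    worker.postMessage(job);
  });
}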

+ 150 - 0
apps/service/src/duplicate-worker.ts

@@ -0,0 +1,150 @@
+import { parentPort } from 'worker_threads';
+import crypto from 'crypto';
+import fs from 'fs';
+import fsPromises from 'fs/promises';
+import path from 'path';
+
+interface ScanResult {
+  hash: string;
+  size: number;
+  files: string[];
+}
+
+interface SimilarResult {
+  baseName: string;
+  files: string[];
+}
+
+function walkFiles(root: string): string[] {
+  const pending = [root];
+  const files: string[] = [];
+
+  while (pending.length) {
+    const current = pending.pop();
+    if (!current) continue;
+
+    let stat: fs.Stats;
+    try {
+      stat = fs.statSync(current);
+    } catch {
+      continue;
+    }
+
+    if (stat.isDirectory()) {
+      const children = fs.readdirSync(current);
+      for (const child of children) {
+        pending.push(path.join(current, child));
+      }
+    } else if (stat.isFile()) {
+      files.push(current);
+    }
+  }
+
+  return files;
+}
+
+async function hashFileAsync(filePath: string): Promise<string | null> {
+  try {
+    const data = await fsPromises.readFile(filePath);
+    const hash = crypto.createHash('sha1');
+    hash.update(data);
+    return hash.digest('hex');
+  } catch (error) {
+    console.warn(`Hashing failed for ${filePath}: ${error}`);
+    return null;
+  }
+}
+
+async function scanDestinationForDuplicates(destination: string): Promise<ScanResult[]> {
+  const files = walkFiles(destination);
+  console.log(`Worker: Found ${files.length} files to scan in ${destination}`);
+  const groups = new Map<string, { size: number; files: string[] }>();
+  let processed = 0;
+
+  for (const filePath of files) {
+    try {
+      const stat = await fsPromises.stat(filePath);
+      if (!stat.isFile()) continue;
+
+      const hash = await hashFileAsync(filePath);
+      if (hash) {
+        const key = `${hash}:${stat.size}`;
+        const group = groups.get(key) || { size: stat.size, files: [] };
+        group.files.push(filePath);
+        groups.set(key, group);
+      }
+      processed++;
+      if (processed % 100 === 0) {
+        console.log(`Worker: Processed ${processed}/${files.length} files in ${destination}`);
+      }
+    } catch (error) {
+      console.warn(`Worker: Failed to process file for duplicate scan: ${filePath} (${error})`);
+    }
+  }
+
+  console.log(`Worker: Completed scanning ${processed} files in ${destination}`);
+
+  return Array.from(groups.entries())
+    .filter(([, group]) => group.files.length > 1)
+    .map(([key, group]) => ({
+      hash: key.split(':')[0],
+      size: group.size,
+      files: group.files,
+    }));
+}
+
+async function scanForSimilarNames(destination: string): Promise<SimilarResult[]> {
+  const files = walkFiles(destination);
+  console.log(`Worker: Checking ${files.length} files for similar names in ${destination}`);
+  const nameGroups = new Map<string, string[]>();
+  let processed = 0;
+
+  for (const filePath of files) {
+    try {
+      const stat = await fsPromises.stat(filePath);
+      if (!stat.isFile()) continue;
+
+      const baseName = path.basename(filePath, path.extname(filePath)).toLowerCase();
+      const group = nameGroups.get(baseName) || [];
+      group.push(filePath);
+      nameGroups.set(baseName, group);
+      processed++;
+      if (processed % 100 === 0) {
+        console.log(`Worker: Processed ${processed}/${files.length} files for similar names in ${destination}`);
+      }
+    } catch (error) {
+      console.warn(`Worker: Failed to process file for similar name scan: ${filePath} (${error})`);
+    }
+  }
+
+  console.log(`Worker: Completed similar name check for ${processed} files in ${destination}`);
+
+  return Array.from(nameGroups.entries())
+    .filter(([, files]) => files.length > 1)
+    .map(([baseName, files]) => ({ baseName, files }));
+}
+
+parentPort?.on('message', async (message) => {
+  const { type, destination, dataset } = message;
+
+  if (type === 'scan_duplicates') {
+    try {
+      const duplicates = await scanDestinationForDuplicates(destination);
+      const similars = await scanForSimilarNames(destination);
+      parentPort?.postMessage({
+        type: 'scan_result',
+        dataset,
+        destination,
+        duplicates,
+        similars,
+      });
+    } catch (error) {
+      parentPort?.postMessage({
+        type: 'error',
+        error: error instanceof Error ? error.message : String(error),
+      });
+    }
+  }
+});
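
One design note on hashFileAsync above: it reads each file fully into memory before hashing, which is simple but can spike memory when a destination contains very large files. A streaming variant (a sketch, not part of this commit) keeps memory use bounded:

import crypto from 'crypto';
import fs from 'fs';

// Hypothetical streaming alternative to hashFileAsync: hashes the file in chunks,
// so memory stays roughly constant regardless of file size.
function hashFileStream(filePath: string): Promise<string | null> {
  return new Promise((resolve) => {
    const hash = crypto.createHash('sha1');
    const stream = fs.createReadStream(filePath);
    stream.on('data', (chunk) => hash.update(chunk));
    stream.on('end', () => resolve(hash.digest('hex')));
    // Mirror the worker's behaviour of returning null when a file cannot be read.
    stream.on('error', () => resolve(null));
  });
}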

+ 72 - 147
apps/service/src/maintenance.service.ts

@@ -1,8 +1,8 @@
 import { Injectable, Logger } from '@nestjs/common';
 import { Cron, CronExpression } from '@nestjs/schedule';
+import { Worker } from 'worker_threads';
 import crypto from 'crypto';
 import fs from 'fs';
-import fsPromises from 'fs/promises';
 import path from 'path';
 import { DatasetsService } from './datasets.service';
 import { DbService } from './db.service';
@@ -140,13 +140,7 @@ export class MaintenanceService {
     }
 
     const datasetConfig = this.datasetsService.getDatasetConfig();
-    const duplicates: Array<{
-      dataset: string;
-      destination: string;
-      hash: string;
-      size: number;
-      files: string[];
-    }> = [];
+    const scanPromises: Promise<void>[] = [];
 
     for (const [datasetName, datasetObj] of Object.entries(datasetConfig)) {
       if (
@@ -169,51 +163,13 @@ export class MaintenanceService {
         }
 
         this.logger.log(`Scanning destination: ${destination}`);
-        const groups = await this.scanDestinationForDuplicates(destination);
-        await this.scanForSimilarNames(destination);
-        for (const group of groups) {
-          const entry = {
-            dataset: datasetName,
-            destination,
-            hash: group.hash,
-            size: group.size,
-            files: group.files,
-          };
-
-          const key = `${entry.dataset}|${entry.destination}|${entry.hash}|${entry.size}`;
-          const existingEntry = existingMap.get(key);
-
-          // Skip groups that were marked reviewed/ignored previously
-          if (existingEntry && existingEntry.status === 'reviewed') {
-            continue;
-          }
-
-          duplicates.push(entry);
-
-          if (existingEntry) {
-            this.db.updateDuplicateGroupFiles(
-              existingEntry.id,
-              entry.files,
-              'pending',
-            );
-          } else {
-            this.db.saveDuplicateGroup(entry);
-          }
-        }
-
-        if (groups.length) {
-          this.logger.warn(
-            `Found ${groups.length} duplicate group(s) in destination ${destination} (dataset: ${datasetName})`,
-          );
-        }
+        scanPromises.push(this.scanDestinationWithWorker(datasetName, destination, existingMap));
       }
     }
 
-    this.logger.log(
-      `Duplicate scan completed. Processed ${duplicates.length} groups.`,
-    );
+    await Promise.all(scanPromises);
 
-    return duplicates;
+    this.logger.log('Duplicate scan completed');
   }
 
   private collectDestinations(datasetObj: Record<string, any>): Set<string> {
@@ -233,101 +189,80 @@ export class MaintenanceService {
     return destinations;
   }
 
-  private async scanDestinationForDuplicates(destination: string) {
-    const files = this.walkFiles(destination);
-    this.logger.log(`Found ${files.length} files to scan in ${destination}`);
-    const groups = new Map<string, { size: number; files: string[] }>();
-    let processed = 0;
-
-    for (const filePath of files) {
-      try {
-        const stat = await fsPromises.stat(filePath);
-        if (!stat.isFile()) continue;
-
-        const hash = await this.hashFileAsync(filePath);
-        if (hash) {
-          const key = `${hash}:${stat.size}`;
-          const group = groups.get(key) || { size: stat.size, files: [] };
-          group.files.push(filePath);
-          groups.set(key, group);
-        }
-        processed++;
-        if (processed % 100 === 0) {
-          this.logger.log(
-            `Processed ${processed}/${files.length} files in ${destination}`,
-          );
-        }
-      } catch (error) {
-        this.logger.warn(
-          `Failed to process file for duplicate scan: ${filePath} (${error})`,
-        );
-      }
-    }
+  private async scanDestinationWithWorker(
+    dataset: string,
+    destination: string,
+    existingMap: Map<string, { id: number; status: string; files: string[] }>
+  ): Promise<void> {
+    return new Promise((resolve, reject) => {
+      // Guard so the intentional terminate() after a result does not trigger
+      // the non-zero-exit rejection registered below.
+      let settled = false;
+      const worker = new Worker(path.join(__dirname, 'duplicate-worker.js'));
+
+      worker.on('message', (message) => {
+        if (message.type === 'scan_result') {
+          // Save duplicates
+          for (const dup of message.duplicates) {
+            const entry = {
+              dataset: message.dataset,
+              destination: message.destination,
+              hash: dup.hash,
+              size: dup.size,
+              files: dup.files,
+            };
+
+            const key = `${entry.dataset}|${entry.destination}|${entry.hash}|${entry.size}`;
+            const existingEntry = existingMap.get(key);
+
+            // Skip groups that were marked reviewed/ignored previously
+            if (existingEntry && existingEntry.status === 'reviewed') {
+              continue;
+            }
+
+            if (existingEntry) {
+              this.db.updateDuplicateGroupFiles(
+                existingEntry.id,
+                entry.files,
+                'pending',
+              );
+            } else {
+              this.db.saveDuplicateGroup(entry);
+            }
+          }
 
-    this.logger.log(`Completed scanning ${processed} files in ${destination}`);
+          // Log similars
+          if (message.similars.length) {
+            this.logger.log(`Found ${message.similars.length} groups of files with similar names in ${message.destination}`);
+            for (const group of message.similars) {
+              this.logger.log(`Similar: ${group.baseName} - ${group.files.join(', ')}`);
+            }
+          }
 
-    return Array.from(groups.entries())
-      .filter(([, group]) => group.files.length > 1)
-      .map(([key, group]) => ({
-        hash: key.split(':')[0],
-        size: group.size,
-        files: group.files,
-      }));
-  }
+          if (message.duplicates.length) {
+            this.logger.warn(`Found ${message.duplicates.length} duplicate group(s) in destination ${message.destination} (dataset: ${message.dataset})`);
+          }
+        } else if (message.type === 'error') {
+          this.logger.error(`Worker error: ${message.error}`);
+        }
+        settled = true;
+        worker.terminate();
+        resolve();
+      });
 
-  private async scanForSimilarNames(destination: string) {
-    const files = this.walkFiles(destination);
-    this.logger.log(
-      `Checking ${files.length} files for similar names in ${destination}`,
-    );
-    const nameGroups = new Map<string, string[]>();
-    let processed = 0;
+      worker.on('error', (error) => {
+        this.logger.error(`Worker error: ${error}`);
+        settled = true;
+        reject(error);
+      });
 
-    for (const filePath of files) {
-      try {
-        const stat = await fsPromises.stat(filePath);
-        if (!stat.isFile()) continue;
-
-        const baseName = path
-          .basename(filePath, path.extname(filePath))
-          .toLowerCase();
-        const group = nameGroups.get(baseName) || [];
-        group.push(filePath);
-        nameGroups.set(baseName, group);
-        processed++;
-        if (processed % 100 === 0) {
-          this.logger.log(
-            `Processed ${processed}/${files.length} files for similar names in ${destination}`,
-          );
+      worker.on('exit', (code) => {
+        if (code !== 0 && !settled) {
+          this.logger.error(`Worker exited with code ${code}`);
+          reject(new Error(`Worker exited with code ${code}`));
         }
-      } catch (error) {
-        this.logger.warn(
-          `Failed to process file for similar name scan: ${filePath} (${error})`,
-        );
-      }
-    }
-
-    this.logger.log(
-      `Completed similar name check for ${processed} files in ${destination}`,
-    );
+      });
 
-    const similarGroups = Array.from(nameGroups.entries())
-      .filter(([, files]) => files.length > 1)
-      .map(([baseName, files]) => ({ baseName, files }));
+      worker.postMessage({ type: 'scan_duplicates', dataset, destination });
+    });
+  }
 
-    if (similarGroups.length) {
-      this.logger.log(
-        `Found ${similarGroups.length} groups of files with similar names in ${destination}`,
-      );
-      for (const group of similarGroups) {
-        this.logger.log(
-          `Similar: ${group.baseName} - ${group.files.join(', ')}`,
-        );
-      }
-    }
 
-    return similarGroups;
-  }
 
   private walkFiles(root: string) {
     const pending = [root];
@@ -357,17 +292,7 @@ export class MaintenanceService {
     return files;
   }
 
-  private async hashFileAsync(filePath: string): Promise<string | null> {
-    try {
-      const data = await fsPromises.readFile(filePath);
-      const hash = crypto.createHash('sha1');
-      hash.update(data);
-      return hash.digest('hex');
-    } catch (error) {
-      this.logger.warn(`Hashing failed for ${filePath}: ${error}`);
-      return null;
-    }
-  }
+
 
   purgeDuplicateFiles(id: number, files: string[], note?: string) {
     const group = this.db.getDuplicateGroup(id);