import Database from 'better-sqlite3';
import crypto from 'crypto';
import fs from 'fs';
import fsPromises from 'fs/promises';
import path from 'path';
import { parentPort } from 'worker_threads';

/** A group of files that share the same content hash and size. */
interface ScanResult {
  dataset: string;
  destination: string;
  hash: string;
  size: number;
  files: string[];
}

/** A group of files whose lower-cased base name (without extension) is equal. */
interface SimilarResult {
  baseName: string;
  files: string[];
}

/** Message received from the parent thread. */
interface WorkerMessage {
  type: string;
  dataset: string;
  destination: string;
  /** New flag to use DB-based scanning. */
  useDatabase?: boolean;
  /** Path to the database. */
  dbPath?: string;
}

/** Files accumulated under one `hash:size` key during a file-system scan. */
interface DuplicateGroup {
  hash: string;
  size: number;
  files: string[];
}

/** A progress line is logged every time this many files have been processed. */
const PROGRESS_INTERVAL = 100;

/**
 * Iteratively collects every regular file below `root`.
 *
 * Entries are inspected with `fs.statSync`, so symbolic links are followed.
 * Entries that cannot be stat'ed are skipped silently; directories that cannot
 * be listed are logged and skipped so one unreadable folder does not abort the
 * whole scan.
 *
 * @param root Directory (or file) to start from.
 * @returns Paths of all regular files found.
 */
function walkFiles(root: string): string[] {
  const pending: string[] = [root];
  const files: string[] = [];

  while (pending.length > 0) {
    const current = pending.pop();
    if (!current) continue;

    let stat: fs.Stats;
    try {
      stat = fs.statSync(current);
    } catch {
      continue;
    }

    if (stat.isDirectory()) {
      let children: string[];
      try {
        children = fs.readdirSync(current);
      } catch (error) {
        console.warn(`Worker: Failed to list directory ${current}: ${error}`);
        continue;
      }
      for (const child of children) {
        pending.push(path.join(current, child));
      }
    } else if (stat.isFile()) {
      files.push(current);
    }
  }

  return files;
}

/**
 * Computes the SHA-1 hex digest of a file.
 *
 * The file is streamed into the hash instead of being read into memory in one
 * piece, so large files neither exhaust memory nor hit the Buffer size limit.
 *
 * @param filePath File to hash.
 * @returns The hex digest, or `null` if the file could not be read (the
 *   failure is logged).
 */
async function hashFileAsync(filePath: string): Promise<string | null> {
  try {
    const hash = crypto.createHash('sha1');
    for await (const chunk of fs.createReadStream(filePath)) {
      hash.update(chunk as Buffer);
    }
    return hash.digest('hex');
  } catch (error) {
    console.warn(`Hashing failed for ${filePath}: ${error}`);
    return null;
  }
}

/**
 * Finds duplicate files below `destination` by walking the file system.
 *
 * Two files are reported as duplicates when both their size and their SHA-1
 * digest match. Files whose size is unique are never hashed, because they
 * cannot be part of any group with more than one member.
 *
 * @param destination Root directory to scan.
 * @param dataset Dataset name copied into every result; defaults to `''`.
 * @returns One entry per group of two or more identical files.
 */
async function scanDestinationForDuplicates(
  destination: string,
  dataset = '',
): Promise<ScanResult[]> {
  const files = walkFiles(destination);
  console.log(`Worker: Found ${files.length} files to scan in ${destination}`);

  // Pass 1: bucket files by size (cheap) so that only size collisions get hashed.
  const bySize = new Map<number, string[]>();
  for (const filePath of files) {
    try {
      const stat = await fsPromises.stat(filePath);
      if (!stat.isFile()) continue;
      const sameSize = bySize.get(stat.size) ?? [];
      sameSize.push(filePath);
      bySize.set(stat.size, sameSize);
    } catch (error) {
      console.warn(
        `Worker: Failed to process file for duplicate scan: ${filePath} (${error})`,
      );
    }
  }

  const groups = new Map<string, DuplicateGroup>();
  let processed = 0;
  const markProcessed = (): void => {
    processed++;
    if (processed % PROGRESS_INTERVAL === 0) {
      console.log(
        `Worker: Processed ${processed}/${files.length} files in ${destination}`,
      );
    }
  };

  // Pass 2: hash only the files that share their size with at least one other file.
  for (const [size, sameSize] of bySize) {
    for (const filePath of sameSize) {
      if (sameSize.length > 1) {
        const hash = await hashFileAsync(filePath);
        if (hash !== null) {
          const key = `${hash}:${size}`;
          const group = groups.get(key) ?? { hash, size, files: [] };
          group.files.push(filePath);
          groups.set(key, group);
        }
      }
      markProcessed();
    }
  }

  console.log(
    `Worker: Completed scanning ${processed} files in ${destination}`,
  );

  const duplicates: ScanResult[] = [];
  for (const group of groups.values()) {
    if (group.files.length > 1) {
      duplicates.push({
        dataset,
        destination,
        hash: group.hash,
        size: group.size,
        files: group.files,
      });
    }
  }
  return duplicates;
}

/**
 * Groups files below `destination` whose base name (extension removed,
 * lower-cased) is identical.
 *
 * @param destination Root directory to scan.
 * @returns One entry per base name shared by two or more files.
 */
async function scanForSimilarNames(
  destination: string,
): Promise<SimilarResult[]> {
  const files = walkFiles(destination);
  console.log(
    `Worker: Checking ${files.length} files for similar names in ${destination}`,
  );

  const nameGroups = new Map<string, string[]>();
  let processed = 0;

  for (const filePath of files) {
    try {
      const stat = await fsPromises.stat(filePath);
      if (!stat.isFile()) continue;

      const baseName = path
        .basename(filePath, path.extname(filePath))
        .toLowerCase();
      const group = nameGroups.get(baseName) ?? [];
      group.push(filePath);
      nameGroups.set(baseName, group);

      processed++;
      if (processed % PROGRESS_INTERVAL === 0) {
        console.log(
          `Worker: Processed ${processed}/${files.length} files for similar names in ${destination}`,
        );
      }
    } catch (error) {
      console.warn(
        `Worker: Failed to process file for similar name scan: ${filePath} (${error})`,
      );
    }
  }

  console.log(
    `Worker: Completed similar name check for ${processed} files in ${destination}`,
  );

  const similars: SimilarResult[] = [];
  for (const [baseName, group] of nameGroups) {
    if (group.length > 1) {
      similars.push({ baseName, files: group });
    }
  }
  return similars;
}

/**
 * Escapes `\`, `%` and `_` so that `value` is matched literally inside a SQL
 * `LIKE` pattern that uses `\` as its ESCAPE character.
 */
function escapeLikePattern(value: string): string {
  return value.replace(/[\\%_]/g, '\\$&');
}

/**
 * Scan using database-indexed files for much faster duplicate detection.
 *
 * Reads the `files` table of the SQLite database at `dbPath` (opened
 * read-only) and groups rows of `dataset` by `hash` and `file_size`. Rows are
 * included when `destination_path` starts with `destination` or is NULL; for
 * NULL rows the `input` column is reported as the path.
 *
 * NOTE(review): the match is a plain string prefix, so a sibling directory
 * such as `<destination>-old` also matches — confirm whether that is intended.
 *
 * @param dataset Dataset whose rows are considered.
 * @param destination Path prefix that `destination_path` must start with.
 * @param dbPath Path of the SQLite database file.
 * @returns One entry per hash/size group containing more than one row.
 */
async function scanDestinationWithDatabase(
  dataset: string,
  destination: string,
  dbPath: string,
): Promise<ScanResult[]> {
  console.log(
    `Worker: Scanning ${destination} using database index at ${dbPath}`,
  );

  const db = new Database(dbPath, { readonly: true });
  try {
    // The '|||' separator below must match the split() further down.
    // `destination` is escaped so that '%' and '_' inside it are not treated
    // as LIKE wildcards.
    const duplicates = db
      .prepare(
        `
        SELECT
          hash,
          file_size,
          COUNT(*) as file_count,
          GROUP_CONCAT(
            CASE
              WHEN destination_path IS NOT NULL THEN destination_path
              ELSE input
            END,
            '|||'
          ) as file_paths
        FROM files
        WHERE dataset = ?
          AND hash IS NOT NULL
          AND (destination_path LIKE ? ESCAPE '\\' OR destination_path IS NULL)
        GROUP BY hash, file_size
        HAVING COUNT(*) > 1
      `,
      )
      .all(dataset, `${escapeLikePattern(destination)}%`) as Array<{
      hash: string;
      file_size: number;
      file_count: number;
      file_paths: string | null;
    }>;

    console.log(
      `Worker: Found ${duplicates.length} duplicate groups from database`,
    );

    return duplicates.map((dup) => ({
      dataset,
      destination,
      hash: dup.hash,
      size: dup.file_size,
      // GROUP_CONCAT yields NULL when every concatenated value is NULL.
      files: dup.file_paths ? dup.file_paths.split('|||') : [],
    }));
  } finally {
    db.close();
  }
}

// Entry point: handle `scan_duplicates` requests from the parent thread and
// reply with either a `scan_result` or an `error` message. Other message types
// are ignored.
parentPort?.on('message', (message: WorkerMessage) => {
  void (async () => {
    const { type, destination, dataset, useDatabase, dbPath } = message;
    if (type !== 'scan_duplicates') return;

    try {
      let duplicates: ScanResult[];
      // Use database-based scanning if enabled and DB path is provided
      if (useDatabase && dbPath) {
        duplicates = await scanDestinationWithDatabase(
          dataset,
          destination,
          dbPath,
        );
      } else {
        // Fall back to traditional file-system scanning
        duplicates = await scanDestinationForDuplicates(destination, dataset);
      }

      const similars = await scanForSimilarNames(destination);

      parentPort?.postMessage({
        type: 'scan_result',
        dataset,
        destination,
        duplicates,
        similars,
      });
    } catch (error) {
      // `error` is `unknown` under strict mode; narrow before reading `.message`.
      parentPort?.postMessage({
        type: 'error',
        dataset,
        destination,
        error: error instanceof Error ? error.message : String(error),
      });
    }
  })();
});