// Worker thread: walks a destination directory, hashes every file, and
// upserts the results into a SQLite catalog via better-sqlite3.
import Database from 'better-sqlite3';
import crypto from 'crypto';
import fs from 'fs';
import fsPromises from 'fs/promises';
import path from 'path';
import { parentPort } from 'worker_threads';

interface WorkerMessage {
  type: string;
  dataset: string;
  destination: string;
  dbPath: string;
  reindex?: boolean;
  batchSize?: number;
}

interface IndexResult {
  indexed: number;
  skipped: number;
  errors: number;
}

// Iteratively walk a directory tree and collect regular file paths.
// Entries that cannot be stat'ed (permissions, deleted mid-scan) are skipped.
function walkFiles(root: string): string[] {
  const pending = [root];
  const files: string[] = [];
  while (pending.length) {
    const current = pending.pop();
    if (!current) continue;
    let stat: fs.Stats;
    try {
      stat = fs.statSync(current);
    } catch {
      continue;
    }
    if (stat.isDirectory()) {
      const children = fs.readdirSync(current);
      for (const child of children) {
        pending.push(path.join(current, child));
      }
    } else if (stat.isFile()) {
      files.push(current);
    }
  }
  return files;
}

// Compute the SHA-1 hex digest of a file, or return null if it cannot be read.
async function hashFileAsync(filePath: string): Promise<string | null> {
  try {
    const data = await fsPromises.readFile(filePath);
    const hash = crypto.createHash('sha1');
    hash.update(data);
    return hash.digest('hex');
  } catch (error) {
    console.warn(`Worker: Hashing failed for ${filePath}: ${error}`);
    return null;
  }
}

async function indexDestination(
  dataset: string,
  destination: string,
  dbPath: string,
  reindex = false,
  batchSize = 100,
): Promise<IndexResult> {
  console.log(`Worker: Starting indexing for ${dataset} at ${destination}`);
  console.log(`Worker: Database path: ${dbPath}`);
  console.log(`Worker: Reindex: ${reindex}, Batch size: ${batchSize}`);

  let db: Database.Database | null = null;
  try {
    db = new Database(dbPath);

    // Clear existing entries if reindexing
    if (reindex) {
      const stmt = db.prepare(
        'DELETE FROM files WHERE dataset = ? AND destination_path LIKE ?',
      );
      const result = stmt.run(dataset, `${destination}%`);
      console.log(
        `Worker: Cleared ${result.changes} existing destination file entries for reindexing`,
      );
    }

    // Walk the destination directory
    console.log(`Worker: Scanning directory: ${destination}`);
    const files = walkFiles(destination);
    console.log(`Worker: Found ${files.length} files to index`);

    let indexed = 0;
    let skipped = 0;
    let errors = 0;

    // Prepare statement for inserting/updating files. The ON CONFLICT clause
    // assumes the files table has a unique constraint on (dataset, input).
    const upsertStmt = db.prepare(`
      INSERT INTO files (dataset, input, destination_path, hash, file_size, date)
      VALUES (?, ?, ?, ?, ?, datetime('now'))
      ON CONFLICT(dataset, input) DO UPDATE SET
        destination_path = excluded.destination_path,
        hash = excluded.hash,
        file_size = excluded.file_size,
        date = excluded.date
    `);

    // Process files in batches
    for (let i = 0; i < files.length; i += batchSize) {
      const batch = files.slice(i, i + batchSize);
      const progressPercent = Math.round((i / files.length) * 100);
      console.log(
        `Worker: Processing batch ${Math.floor(i / batchSize) + 1}/${Math.ceil(files.length / batchSize)} (${progressPercent}% complete, ${indexed} indexed, ${skipped} skipped, ${errors} errors)`,
      );

      // Send progress update to the main thread
      parentPort?.postMessage({
        type: 'progress',
        indexed,
        skipped,
        errors,
        total: files.length,
        progress: progressPercent,
      });

      await Promise.all(
        batch.map(async (filePath) => {
          try {
            const stat = await fsPromises.stat(filePath);
            if (!stat.isFile()) {
              skipped++;
              return;
            }

            console.log(`Worker: Indexing ${filePath}`);
            const hash = await hashFileAsync(filePath);
            if (!hash) {
              console.warn(`Worker: Failed to hash ${filePath}`);
              errors++;
              return;
            }

            // Store in database; destination files are keyed by their own
            // path, so filePath serves as both input and destination_path.
            upsertStmt.run(dataset, filePath, filePath, hash, stat.size);
            indexed++;

            // Log every 50 files
            if (indexed % 50 === 0) {
              console.log(
                `Worker: Indexed ${indexed}/${files.length} files (${Math.round((indexed / files.length) * 100)}%)`,
              );
            }
          } catch (error) {
            console.error(`Worker: Failed to index file ${filePath}: ${error}`);
            errors++;
          }
        }),
      );
    }

    console.log(
      `Worker: Indexing complete - ${indexed} indexed, ${skipped} skipped, ${errors} errors`,
    );
    return { indexed, skipped, errors };
  } catch (error) {
    console.error(`Worker: Fatal error during indexing: ${error}`);
    throw error;
  } finally {
    if (db) {
      db.close();
    }
  }
}

parentPort?.on('message', async (message: WorkerMessage) => {
  if (message.type === 'index_destination') {
    try {
      const result = await indexDestination(
        message.dataset,
        message.destination,
        message.dbPath,
        message.reindex,
        message.batchSize,
      );
      parentPort?.postMessage({
        type: 'index_result',
        dataset: message.dataset,
        destination: message.destination,
        ...result,
      });
    } catch (error) {
      parentPort?.postMessage({
        type: 'error',
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }
});
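
/*
 * Usage sketch (illustrative only, not part of this module): how a main
 * thread might drive this worker. The worker filename, dataset name, and
 * paths below are assumptions for the example; the message shapes mirror the
 * WorkerMessage interface and the 'progress' / 'index_result' / 'error'
 * messages posted above. The ON CONFLICT(dataset, input) clause in the upsert
 * requires the files table to declare a matching unique constraint, e.g.:
 *
 *   CREATE TABLE IF NOT EXISTS files (
 *     dataset          TEXT NOT NULL,
 *     input            TEXT NOT NULL,
 *     destination_path TEXT,
 *     hash             TEXT,
 *     file_size        INTEGER,
 *     date             TEXT,
 *     UNIQUE (dataset, input)
 *   );
 *
 *   import { Worker } from 'worker_threads';
 *
 *   const worker = new Worker('./indexWorker.js'); // assumed compiled filename
 *   worker.on('message', (msg) => {
 *     if (msg.type === 'progress') console.log(`Progress: ${msg.progress}%`);
 *     else if (msg.type === 'index_result') console.log('Done:', msg);
 *     else if (msg.type === 'error') console.error('Worker error:', msg.error);
 *   });
 *   worker.postMessage({
 *     type: 'index_destination',
 *     dataset: 'example-dataset',
 *     destination: '/data/example',
 *     dbPath: '/data/index.db',
 *     reindex: false,
 *     batchSize: 100,
 *   });
 *
 * Batching with Promise.all bounds the number of concurrently open file
 * handles to batchSize while still overlapping reads and hashing; the
 * better-sqlite3 writes themselves are synchronous on this worker thread.
 */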