import Database from 'better-sqlite3';
import crypto from 'crypto';
import fs from 'fs';
import fsPromises from 'fs/promises';
import path from 'path';
import { parentPort } from 'worker_threads';
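
// Worker thread that indexes the contents of a destination directory into a
// SQLite `files` table (schema inferred from the upsert statement below): it
// walks the tree, SHA-1 hashes each regular file, upserts one row per file,
// and reports progress and results back to the parent via parentPort messages.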
interface WorkerMessage {
  type: string;
  dataset: string;
  destination: string;
  dbPath: string;
  reindex?: boolean;
  batchSize?: number;
}

interface IndexResult {
  indexed: number;
  skipped: number;
  errors: number;
}
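
// Iterative depth-first walk with an explicit stack, so deep trees cannot
// overflow the call stack. Unreadable or vanished entries are skipped
// silently. Note that statSync follows symlinks, so a symlinked directory
// cycle would loop forever; a visited-set would guard against that
// (assumption: such cycles do not occur in the trees being indexed).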
function walkFiles(root: string): string[] {
  const pending = [root];
  const files: string[] = [];
  while (pending.length) {
    const current = pending.pop();
    if (!current) continue;
    let stat: fs.Stats;
    try {
      stat = fs.statSync(current);
    } catch {
      continue;
    }
    if (stat.isDirectory()) {
      const children = fs.readdirSync(current);
      for (const child of children) {
        pending.push(path.join(current, child));
      }
    } else if (stat.isFile()) {
      files.push(current);
    }
  }
  return files;
}
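
// Hashes a file by buffering it fully in memory. For very large files a
// streaming variant would bound memory use; a minimal sketch (not wired in):
//   const hash = crypto.createHash('sha1');
//   for await (const chunk of fs.createReadStream(filePath)) hash.update(chunk);
//   return hash.digest('hex');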
async function hashFileAsync(filePath: string): Promise<string | null> {
  try {
    const data = await fsPromises.readFile(filePath);
    const hash = crypto.createHash('sha1');
    hash.update(data);
    return hash.digest('hex');
  } catch (error) {
    console.warn(`Worker: Hashing failed for ${filePath}: ${error}`);
    return null;
  }
}
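
// Walks `destination`, hashes every regular file, and upserts the results into
// the `files` table at `dbPath` under the given `dataset`. When `reindex` is
// true, existing rows under the destination prefix are deleted first. Files
// are processed in concurrent batches of `batchSize`; returns aggregate
// counts of indexed, skipped, and errored files.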
async function indexDestination(
  dataset: string,
  destination: string,
  dbPath: string,
  reindex = false,
  batchSize = 100,
): Promise<IndexResult> {
  console.log(`Worker: Starting indexing for ${dataset} at ${destination}`);
  console.log(`Worker: Database path: ${dbPath}`);
  console.log(`Worker: Reindex: ${reindex}, Batch size: ${batchSize}`);
  let db: Database.Database | null = null;
  try {
    db = new Database(dbPath);
    // Clear existing entries if reindexing
    if (reindex) {
      const stmt = db.prepare(
        'DELETE FROM files WHERE dataset = ? AND destination_path LIKE ?',
      );
      const result = stmt.run(dataset, `${destination}%`);
      console.log(
        `Worker: Cleared ${result.changes} existing destination file entries for reindexing`,
      );
    }
    // Walk the destination directory
    console.log(`Worker: Scanning directory: ${destination}`);
    const files = walkFiles(destination);
    console.log(`Worker: Found ${files.length} files to index`);
    let indexed = 0;
    let skipped = 0;
    let errors = 0;
    // Prepare statement for inserting/updating files
    const upsertStmt = db.prepare(`
      INSERT INTO files (dataset, input, destination_path, hash, file_size, date)
      VALUES (?, ?, ?, ?, ?, datetime('now'))
      ON CONFLICT(dataset, input) DO UPDATE SET
        destination_path = excluded.destination_path,
        hash = excluded.hash,
        file_size = excluded.file_size,
        date = excluded.date
    `);
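
    // Note: better-sqlite3 statements run synchronously; wrapping each batch
    // in db.transaction(...) would cut per-row commit overhead if throughput
    // matters (an optional optimization, not part of the original design).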
    // Process files in batches
    for (let i = 0; i < files.length; i += batchSize) {
      const batch = files.slice(i, i + batchSize);
      const progressPercent = Math.round((i / files.length) * 100);
      console.log(
        `Worker: Processing batch ${Math.floor(i / batchSize) + 1}/${Math.ceil(files.length / batchSize)} (${progressPercent}% complete, ${indexed} indexed, ${skipped} skipped, ${errors} errors)`,
      );
      // Send progress update
      parentPort?.postMessage({
        type: 'progress',
        indexed,
        skipped,
        errors,
        total: files.length,
        progress: progressPercent,
      });
      await Promise.all(
        batch.map(async (filePath) => {
          try {
            const stat = await fsPromises.stat(filePath);
            if (!stat.isFile()) {
              skipped++;
              return;
            }
            console.log(`Worker: Indexing ${filePath}`);
            const hash = await hashFileAsync(filePath);
            if (!hash) {
              console.warn(`Worker: Failed to hash ${filePath}`);
              errors++;
              return;
            }
            // Store in database
            upsertStmt.run(dataset, filePath, filePath, hash, stat.size);
            indexed++;
            // Log every 50 files
            if (indexed % 50 === 0) {
              console.log(
                `Worker: Indexed ${indexed}/${files.length} files (${Math.round((indexed / files.length) * 100)}%)`,
              );
            }
          } catch (error) {
            console.error(`Worker: Failed to index file ${filePath}: ${error}`);
            errors++;
          }
        }),
      );
    }
    console.log(
      `Worker: Indexing complete - ${indexed} indexed, ${skipped} skipped, ${errors} errors`,
    );
    return { indexed, skipped, errors };
  } catch (error) {
    console.error(`Worker: Fatal error during indexing: ${error}`);
    throw error;
  } finally {
    if (db) {
      db.close();
    }
  }
}
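
// Entry point: the parent thread posts an 'index_destination' request and
// listens for 'progress', 'index_result', and 'error' messages. A
// hypothetical parent-side usage (file name and values assumed for
// illustration only):
//   const worker = new Worker(new URL('./indexWorker.js', import.meta.url));
//   worker.postMessage({
//     type: 'index_destination',
//     dataset: 'photos',
//     destination: '/mnt/archive',
//     dbPath: '/var/lib/files.db',
//   });
//   worker.on('message', (msg) => console.log(msg.type));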
parentPort?.on('message', async (message: WorkerMessage) => {
  if (message.type === 'index_destination') {
    try {
      const result = await indexDestination(
        message.dataset,
        message.destination,
        message.dbPath,
        message.reindex,
        message.batchSize,
      );
      parentPort?.postMessage({
        type: 'index_result',
        dataset: message.dataset,
        destination: message.destination,
        ...result,
      });
    } catch (error) {
      parentPort?.postMessage({
        type: 'error',
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }
});