indexing-worker.ts

import Database from 'better-sqlite3';
import crypto from 'crypto';
import fs from 'fs';
import fsPromises from 'fs/promises';
import path from 'path';
import { parentPort } from 'worker_threads';

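/** Message sent by the parent thread to request an indexing run. */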
interface WorkerMessage {
  type: string;
  dataset: string;
  destination: string;
  dbPath: string;
  reindex?: boolean;
  batchSize?: number;
}

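/** Counters summarising an indexing run, reported back to the parent thread. */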
interface IndexResult {
  indexed: number;
  skipped: number;
  errors: number;
}

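/** Iteratively walk `root` and return the paths of all regular files beneath it. */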
function walkFiles(root: string): string[] {
  const pending = [root];
  const files: string[] = [];
  while (pending.length) {
    const current = pending.pop();
    if (!current) continue;
    let stat: fs.Stats;
    try {
      stat = fs.statSync(current);
    } catch {
      continue;
    }
    if (stat.isDirectory()) {
      const children = fs.readdirSync(current);
      for (const child of children) {
        pending.push(path.join(current, child));
      }
    } else if (stat.isFile()) {
      files.push(current);
    }
  }
  return files;
}

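/** Read a file and return its SHA-1 hex digest, or null if it cannot be read. */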
async function hashFileAsync(filePath: string): Promise<string | null> {
  try {
    const data = await fsPromises.readFile(filePath);
    const hash = crypto.createHash('sha1');
    hash.update(data);
    return hash.digest('hex');
  } catch (error) {
    console.warn(`Worker: Hashing failed for ${filePath}: ${error}`);
    return null;
  }
}

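/**
 * Index every file under `destination` for the given dataset into the SQLite
 * database at `dbPath`. Files are hashed in batches of `batchSize`, upserted
 * into the `files` table, and progress is reported to the parent thread.
 */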
async function indexDestination(
  dataset: string,
  destination: string,
  dbPath: string,
  reindex = false,
  batchSize = 100,
): Promise<IndexResult> {
  console.log(`Worker: Starting indexing for ${dataset} at ${destination}`);
  console.log(`Worker: Database path: ${dbPath}`);
  console.log(`Worker: Reindex: ${reindex}, Batch size: ${batchSize}`);
  let db: Database.Database | null = null;
  try {
    db = new Database(dbPath);
    // Clear existing entries if reindexing
    if (reindex) {
      const stmt = db.prepare(
        'DELETE FROM files WHERE dataset = ? AND destination_path LIKE ?',
      );
      const result = stmt.run(dataset, `${destination}%`);
      console.log(
        `Worker: Cleared ${result.changes} existing destination file entries for reindexing`,
      );
    }
    // Walk the destination directory
    console.log(`Worker: Scanning directory: ${destination}`);
    const files = walkFiles(destination);
    console.log(`Worker: Found ${files.length} files to index`);
    let indexed = 0;
    let skipped = 0;
    let errors = 0;
    // Prepare statement for inserting/updating files
    const upsertStmt = db.prepare(`
      INSERT INTO files (dataset, input, destination_path, hash, file_size, date)
      VALUES (?, ?, ?, ?, ?, datetime('now'))
      ON CONFLICT(dataset, input) DO UPDATE SET
        destination_path = excluded.destination_path,
        hash = excluded.hash,
        file_size = excluded.file_size,
        date = excluded.date
    `);
    // Process files in batches
    for (let i = 0; i < files.length; i += batchSize) {
      const batch = files.slice(i, i + batchSize);
      const progressPercent = Math.round((i / files.length) * 100);
      console.log(
        `Worker: Processing batch ${Math.floor(i / batchSize) + 1}/${Math.ceil(files.length / batchSize)} (${progressPercent}% complete, ${indexed} indexed, ${skipped} skipped, ${errors} errors)`,
      );
      // Send progress update
      parentPort?.postMessage({
        type: 'progress',
        indexed,
        skipped,
        errors,
        total: files.length,
        progress: progressPercent,
      });
      await Promise.all(
        batch.map(async (filePath) => {
          try {
            const stat = await fsPromises.stat(filePath);
            if (!stat.isFile()) {
              skipped++;
              return;
            }
            console.log(`Worker: Indexing ${filePath}`);
            const hash = await hashFileAsync(filePath);
            if (!hash) {
              console.warn(`Worker: Failed to hash ${filePath}`);
              errors++;
              return;
            }
            // Store in database
            upsertStmt.run(dataset, filePath, filePath, hash, stat.size);
            indexed++;
            // Log every 50 files
            if (indexed % 50 === 0) {
              console.log(
                `Worker: Indexed ${indexed}/${files.length} files (${Math.round((indexed / files.length) * 100)}%)`,
              );
            }
          } catch (error) {
            console.error(`Worker: Failed to index file ${filePath}: ${error}`);
            errors++;
          }
        }),
      );
    }
    console.log(
      `Worker: Indexing complete - ${indexed} indexed, ${skipped} skipped, ${errors} errors`,
    );
    return { indexed, skipped, errors };
  } catch (error) {
    console.error(`Worker: Fatal error during indexing: ${error}`);
    throw error;
  } finally {
    if (db) {
      db.close();
    }
  }
}

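// Entry point: the parent thread posts an 'index_destination' message; the
// worker replies with an 'index_result' message on success or an 'error'
// message on failure ('progress' messages are sent along the way).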
parentPort?.on('message', async (message: WorkerMessage) => {
  if (message.type === 'index_destination') {
    try {
      const result = await indexDestination(
        message.dataset,
        message.destination,
        message.dbPath,
        message.reindex,
        message.batchSize,
      );
      parentPort?.postMessage({
        type: 'index_result',
        dataset: message.dataset,
        destination: message.destination,
        ...result,
      });
    } catch (error) {
      parentPort?.postMessage({
        type: 'error',
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }
});
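
/*
 * Usage sketch (illustration only, not part of this worker): a hypothetical
 * parent thread, assuming this file is compiled to `indexing-worker.js` and
 * using made-up dataset and path values.
 *
 *   import { Worker } from 'worker_threads';
 *
 *   const worker = new Worker(new URL('./indexing-worker.js', import.meta.url));
 *   worker.on('message', (msg) => {
 *     if (msg.type === 'progress') console.log(`Indexing ${msg.progress}%`);
 *     if (msg.type === 'index_result') console.log(`Indexed ${msg.indexed} files`);
 *     if (msg.type === 'error') console.error(msg.error);
 *   });
 *   worker.postMessage({
 *     type: 'index_destination',
 *     dataset: 'example-dataset',
 *     destination: '/data/output',
 *     dbPath: '/data/index.db',
 *     reindex: false,
 *     batchSize: 100,
 *   });
 */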