duplicate-worker.ts

import Database from 'better-sqlite3';
import crypto from 'crypto';
import fs from 'fs';
import fsPromises from 'fs/promises';
import path from 'path';
import { parentPort } from 'worker_threads';

interface ScanResult {
  // dataset and destination are filled in by the database-backed scan;
  // the filesystem scan reports them once per result message instead,
  // so they are optional here.
  dataset?: string;
  destination?: string;
  hash: string;
  size: number;
  files: string[];
}

interface SimilarResult {
  baseName: string;
  files: string[];
}

interface WorkerMessage {
  type: string;
  dataset: string;
  destination: string;
  useDatabase?: boolean; // When true, scan via the database index instead of the filesystem
  dbPath?: string; // Path to the SQLite database (required when useDatabase is set)
}
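
/**
 * Iteratively walk a directory tree and collect all regular file paths.
 * Entries that vanish or are unreadable mid-walk are skipped rather than
 * aborting the scan.
 */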
function walkFiles(root: string): string[] {
  const pending = [root];
  const files: string[] = [];
  while (pending.length) {
    const current = pending.pop();
    if (!current) continue;
    let stat: fs.Stats;
    try {
      stat = fs.statSync(current);
    } catch {
      continue;
    }
    if (stat.isDirectory()) {
      const children = fs.readdirSync(current);
      for (const child of children) {
        pending.push(path.join(current, child));
      }
    } else if (stat.isFile()) {
      files.push(current);
    }
  }
  return files;
}
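
/**
 * SHA-1 hash of a file's contents, or null if the file cannot be read.
 * Reads the whole file into memory, which is fine for typical file sizes;
 * very large files would benefit from a streaming hash instead.
 */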
async function hashFileAsync(filePath: string): Promise<string | null> {
  try {
    const data = await fsPromises.readFile(filePath);
    const hash = crypto.createHash('sha1');
    hash.update(data);
    return hash.digest('hex');
  } catch (error) {
    console.warn(`Hashing failed for ${filePath}: ${error}`);
    return null;
  }
}
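
/**
 * Filesystem-based duplicate scan: hash every file under `destination` and
 * group paths whose hash and size both match. Only groups containing more
 * than one file are returned.
 */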
async function scanDestinationForDuplicates(
  destination: string,
): Promise<ScanResult[]> {
  const files = walkFiles(destination);
  console.log(`Worker: Found ${files.length} files to scan in ${destination}`);
  const groups = new Map<string, { size: number; files: string[] }>();
  let processed = 0;
  for (const filePath of files) {
    try {
      const stat = await fsPromises.stat(filePath);
      if (!stat.isFile()) continue;
      const hash = await hashFileAsync(filePath);
      if (hash) {
        // Key on hash and size together so a (rare) hash collision between
        // files of different sizes is not reported as a duplicate.
        const key = `${hash}:${stat.size}`;
        const group = groups.get(key) || { size: stat.size, files: [] };
        group.files.push(filePath);
        groups.set(key, group);
      }
      processed++;
      if (processed % 100 === 0) {
        console.log(
          `Worker: Processed ${processed}/${files.length} files in ${destination}`,
        );
      }
    } catch (error) {
      console.warn(
        `Worker: Failed to process file for duplicate scan: ${filePath} (${error})`,
      );
    }
  }
  console.log(
    `Worker: Completed scanning ${processed} files in ${destination}`,
  );
  return Array.from(groups.entries())
    .filter(([, group]) => group.files.length > 1)
    .map(([key, group]) => ({
      hash: key.split(':')[0],
      size: group.size,
      files: group.files,
    }));
}
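
/**
 * Group files under `destination` whose base names collide once the
 * extension is stripped and case is ignored, e.g. `IMG_001.jpg` and
 * `img_001.png`.
 */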
async function scanForSimilarNames(
  destination: string,
): Promise<SimilarResult[]> {
  const files = walkFiles(destination);
  console.log(
    `Worker: Checking ${files.length} files for similar names in ${destination}`,
  );
  const nameGroups = new Map<string, string[]>();
  let processed = 0;
  for (const filePath of files) {
    try {
      const stat = await fsPromises.stat(filePath);
      if (!stat.isFile()) continue;
      const baseName = path
        .basename(filePath, path.extname(filePath))
        .toLowerCase();
      const group = nameGroups.get(baseName) || [];
      group.push(filePath);
      nameGroups.set(baseName, group);
      processed++;
      if (processed % 100 === 0) {
        console.log(
          `Worker: Processed ${processed}/${files.length} files for similar names in ${destination}`,
        );
      }
    } catch (error) {
      console.warn(
        `Worker: Failed to process file for similar name scan: ${filePath} (${error})`,
      );
    }
  }
  console.log(
    `Worker: Completed similar name check for ${processed} files in ${destination}`,
  );
  return Array.from(nameGroups.entries())
    .filter(([, files]) => files.length > 1)
    .map(([baseName, files]) => ({ baseName, files }));
}

/**
 * Scan using database-indexed files for much faster duplicate detection.
 */
async function scanDestinationWithDatabase(
  dataset: string,
  destination: string,
  dbPath: string,
): Promise<ScanResult[]> {
  console.log(
    `Worker: Scanning ${destination} using database index at ${dbPath}`,
  );
  const db = new Database(dbPath, { readonly: true });
  try {
    // Query duplicate groups straight from the files table: rows in this
    // dataset that share a hash and size are duplicates.
    const duplicates = db
      .prepare(
        `
        SELECT
          hash,
          file_size,
          COUNT(*) as file_count,
          GROUP_CONCAT(
            CASE
              WHEN destination_path IS NOT NULL THEN destination_path
              ELSE input
            END,
            '|||'
          ) as file_paths
        FROM files
        WHERE dataset = ?
          AND hash IS NOT NULL
          AND (destination_path LIKE ? OR destination_path IS NULL)
        GROUP BY hash, file_size
        HAVING COUNT(*) > 1
        `,
      )
      .all(dataset, `${destination}%`) as Array<{
      hash: string;
      file_size: number;
      file_count: number;
      file_paths: string;
    }>;
    console.log(
      `Worker: Found ${duplicates.length} duplicate groups from database`,
    );
    return duplicates.map((dup) => ({
      dataset,
      destination,
      hash: dup.hash,
      size: dup.file_size,
      files: dup.file_paths.split('|||'),
    }));
  } finally {
    db.close();
  }
}
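
// Entry point: the parent thread posts a WorkerMessage; the worker replies
// with a 'scan_result' message on success or an 'error' message on failure.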
parentPort?.on('message', (message: WorkerMessage) => {
  void (async () => {
    const { type, destination, dataset, useDatabase, dbPath } = message;
    if (type === 'scan_duplicates') {
      try {
        let duplicates: ScanResult[];
        // Use database-based scanning if enabled and a DB path is provided
        if (useDatabase && dbPath) {
          duplicates = await scanDestinationWithDatabase(
            dataset,
            destination,
            dbPath,
          );
        } else {
          // Fall back to traditional file-system scanning
          duplicates = await scanDestinationForDuplicates(destination);
        }
        const similars = await scanForSimilarNames(destination);
        parentPort?.postMessage({
          type: 'scan_result',
          dataset,
          destination,
          duplicates,
          similars,
        });
      } catch (error) {
        // `error` is `unknown` in a catch clause, so narrow it before
        // reading `.message`.
        parentPort?.postMessage({
          type: 'error',
          error: error instanceof Error ? error.message : String(error),
        });
      }
    }
  })();
});
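
// Illustrative usage from the main thread (a sketch, not part of this module;
// the compiled worker path and the concrete field values are assumptions):
//
//   import { Worker } from 'worker_threads';
//
//   const worker = new Worker('./duplicate-worker.js');
//   worker.on('message', (msg) => {
//     if (msg.type === 'scan_result') console.log(msg.duplicates, msg.similars);
//     if (msg.type === 'error') console.error(msg.error);
//   });
//   worker.postMessage({
//     type: 'scan_duplicates',
//     dataset: 'photos',
//     destination: '/data/archive',
//     useDatabase: true,
//     dbPath: '/data/index.db',
//   });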