watcher.service.ts 23 KB


  1. import { Inject, Injectable, Logger, OnModuleDestroy } from '@nestjs/common';
  2. import chokidar, { FSWatcher } from 'chokidar';
  3. import fs from 'fs';
  4. import path from 'path';
  5. import { Worker } from 'worker_threads';
  6. import { DatasetsService } from './datasets.service';
  7. import { DbService } from './db.service';
  8. import { EventsGateway } from './events.gateway';
  9. import { TaskQueueService } from './task-queue.service';
  /**
   * Row shape this service expects back from `DbService.findFile`.
   * Only the presence of a record is checked here; individual fields are
   * not read by this file.
   */
  interface FileRecord {
    /** Dataset name the record is stored under. */
    dataset: string;
    /** Presumably the source file path — confirm against DbService. */
    input: string;
    /** Output path computed for the file. */
    output: string;
    /** Timestamp string; this service writes an ISO-8601 value on discovery. */
    date: string;
  }
  16. @Injectable()
  17. export class WatcherService implements OnModuleDestroy {
  // --- Logging -------------------------------------------------------------
  private logger = new Logger('WatcherService');

  // --- Watcher lifecycle ---------------------------------------------------
  /** Active chokidar instance; null while no watcher is running. */
  private watcher: FSWatcher | null = null;
  /** True between a successful start() and the matching stop(). */
  private isWatching = false;
  /** Paths handed to the most recent chokidar.watch() call. */
  private lastWatches: string[] = [];
  /** Options handed to the most recent chokidar.watch() call. */
  private lastOptions: any = {};

  // --- File validation (worker thread) -------------------------------------
  /** Worker thread that receives `validate_file` messages. */
  private validationWorker: Worker;
  /** Pending validation callbacks, keyed by file path. */
  private validationCallbacks = new Map<string, (result: any) => void>();
  /** Timers that discard a pending callback if the worker never answers. */
  private callbackTimeouts = new Map<string, NodeJS.Timeout>();

  // --- Activity / health monitoring ----------------------------------------
  /** Time of the most recent add/change/unlink event. */
  private lastEventTime: Date = new Date();
  /** Handle of the periodic health-check interval, if one is running. */
  private activityCheckInterval: NodeJS.Timeout | null = null;
  /** Number of add/change/unlink events seen since the last start(). */
  private eventCount = 0;
  /**
   * Spawns the file-validation worker thread, wires its `message` and `error`
   * handlers, and then loads any watcher state stored by a previous run.
   */
  constructor(
    @Inject(DatasetsService) private readonly datasetsService: DatasetsService,
    @Inject(DbService) private readonly db: DbService,
    @Inject(EventsGateway) private readonly eventsGateway: EventsGateway,
    @Inject(TaskQueueService) private readonly taskQueue: TaskQueueService,
  ) {
    this.validationWorker = new Worker(
      path.join(__dirname, 'file-validation-worker.js'),
    );

    this.validationWorker.on('message', (message) => {
      if (!message || message.type !== 'validation_result') {
        return;
      }
      const callback = this.validationCallbacks.get(message.file);
      if (!callback) {
        return;
      }
      // Remove the entry BEFORE invoking it: if the callback throws, the map
      // must not keep a stale entry for this file.
      this.validationCallbacks.delete(message.file);
      try {
        callback(message);
      } catch (error) {
        // An exception escaping an event listener would surface as an
        // uncaught exception; log it instead so one bad file cannot take the
        // service down.
        const reason = error instanceof Error ? error.message : String(error);
        this.logger.error(
          `Failed to handle validation result for ${message.file}: ${reason}`,
        );
      }
    });

    this.validationWorker.on('error', (error) => {
      this.logger.error(`Validation worker error: ${error}`);
    });

    // Load persisted state on startup
    this.loadPersistedState();
  }
  /**
   * Reads the `watcher_state` row from the settings table and, if the watcher
   * was running when that state was saved, starts it again with the stored
   * watches and options.
   *
   * Any failure (missing table, malformed JSON, start() throwing) is logged
   * and swallowed so that construction of the service still succeeds.
   */
  private loadPersistedState() {
    try {
      const db = this.db.getDb();
      const row = db
        .prepare('SELECT value FROM settings WHERE key = ?')
        .get('watcher_state') as { value?: string } | undefined;
      if (!row || !row.value) {
        return;
      }

      const state = JSON.parse(row.value);
      if (!state || typeof state !== 'object') {
        return;
      }

      const wasWatching = Boolean(state.isWatching);
      this.lastWatches = Array.isArray(state.lastWatches)
        ? state.lastWatches
        : [];
      this.lastOptions =
        state.lastOptions && typeof state.lastOptions === 'object'
          ? state.lastOptions
          : {};

      // Do NOT copy the stored flag into this.isWatching: start() returns
      // early with "already running" when the flag is true, which would make
      // the resume below a no-op. start() sets the flag itself on success.
      this.isWatching = false;

      // If we were watching before restart, resume watching
      if (wasWatching && this.lastWatches.length > 0) {
        this.logger.log('Resuming watcher from persisted state');
        this.start(this.lastWatches, this.lastOptions);
      }
    } catch (error) {
      this.logger.error(`Failed to load persisted watcher state: ${error}`);
    }
  }
  /**
   * Writes the current running flag, watch list and options into the
   * settings table under the `watcher_state` key. Failures are logged and
   * otherwise ignored.
   */
  private savePersistedState() {
    try {
      const upsert = this.db
        .getDb()
        .prepare('INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)');
      const { isWatching, lastWatches, lastOptions } = this;
      const snapshot = { isWatching, lastWatches, lastOptions };
      upsert.run('watcher_state', JSON.stringify(snapshot));
    } catch (error) {
      this.logger.error(`Failed to save persisted watcher state: ${error}`);
    }
  }
  /**
   * Starts a chokidar watcher and wires its events into this service.
   *
   * @param watches Paths to watch; when omitted or empty, the enabled dataset
   *   paths reported by DatasetsService are used instead.
   * @param options Caller-supplied chokidar options. They are spread first and
   *   then overridden by the fixed settings below.
   * @returns `{ started: false, message }` when a watcher is already running,
   *   otherwise `{ started: true }`.
   */
  start(watches?: string[], options: any = {}) {
    if (this.isWatching) {
      this.logger.warn('Watcher already running.');
      return { started: false, message: 'Watcher already running.' };
    }

    // An explicit, non-empty list wins; otherwise use all enabled dataset paths.
    const enabledWatches =
      !watches || watches.length === 0
        ? this.datasetsService.getEnabledDatasetPaths()
        : watches;

    // Snapshot of the dataset configuration used by the path filter below.
    const datasetConfig = this.datasetsService.getDatasetConfig();

    // Decides whether chokidar should keep (true) or ignore (false) a path.
    const shouldWatchFile = (filePath: string): boolean => {
      // Directories are always traversed.
      let isDirectory = false;
      try {
        isDirectory = fs.statSync(filePath).isDirectory();
      } catch {
        // stat failed: treat the path as a file and keep filtering.
      }
      if (isDirectory) {
        return true;
      }

      // Files outside every dataset are not watched.
      const owningDataset = this.getDatasetFromPath(filePath);
      if (!owningDataset) {
        return false;
      }

      const settings = datasetConfig[owningDataset];
      if (!settings || !settings.enabled) {
        return false;
      }

      // First configured path key (other than 'enabled') that prefixes the file.
      const matchedKey = Object.keys(settings).find(
        (key) => key !== 'enabled' && filePath.startsWith(key),
      );
      const pathConfig = matchedKey === undefined ? null : settings[matchedKey];
      if (!pathConfig) {
        return false;
      }

      // No extension list means every file is watched (backward compatibility).
      const allowedExts = pathConfig.exts;
      if (!Array.isArray(allowedExts) || allowedExts.length === 0) {
        return true;
      }

      // Case-insensitive comparison; configured entries may omit the leading dot.
      const actualExt = path.extname(filePath).toLowerCase();
      for (const configured of allowedExts as string[]) {
        const wanted = configured.startsWith('.')
          ? configured.toLowerCase()
          : `.${configured.toLowerCase()}`;
        if (actualExt === wanted) {
          return true;
        }
      }
      return false;
    };

    // Caller options first, then the fixed long-running-stability settings.
    const conservativeOptions = {
      ...options,
      // Polling defaults to on unless the caller set it explicitly.
      usePolling: options.usePolling === undefined ? true : options.usePolling,
      interval: Math.max(options.interval || 10000, 30000), // never below 30 s
      binaryInterval: 60000,
      depth: options.depth === undefined ? 1 : options.depth,
      ignorePermissionErrors: true,
      // Hold events back until the file has stopped changing.
      awaitWriteFinish: {
        stabilityThreshold: 5000,
        pollInterval: 1000,
      },
      persistent: true,
      ignoreInitial: false,
      followSymlinks: false,
      atomic: true,
      ignored: (filePath: string) => !shouldWatchFile(filePath),
    };

    const watcher = chokidar.watch(enabledWatches, conservativeOptions);
    this.watcher = watcher;
    this.isWatching = true;
    this.lastWatches = enabledWatches;
    this.lastOptions = conservativeOptions;
    this.lastEventTime = new Date();
    this.eventCount = 0;

    watcher.on('add', (file: string) => {
      this.updateActivity('add');
      this.handleFileAdded(file);
    });

    watcher.on('change', (file: string) => {
      this.updateActivity('change');
      this.eventsGateway.emitFileUpdate({ type: 'change', file });
    });

    watcher.on('unlink', (file: string) => {
      this.updateActivity('unlink');
      this.eventsGateway.emitFileUpdate({ type: 'unlink', file });
    });

    watcher.on('error', (error: Error) => {
      this.logger.error(`Watcher error: ${error.message}`);
      this.logger.error(`Error stack: ${error.stack}`);
      this.eventsGateway.emitWatcherUpdate({
        type: 'error',
        error: error.message,
      });
      // Inspect the watcher afterwards instead of tearing it down.
      this.handleWatcherError(error);
    });

    watcher.on('ready', () => {
      this.logger.log('Watcher is ready and monitoring for changes');
      this.logger.log(`Watching ${enabledWatches.length} path(s)`);
      this.logger.log(`Polling enabled: ${conservativeOptions.usePolling}`);
      this.startActivityMonitor();
    });

    watcher.on('raw', (rawEvent, rawPath) => {
      // Debug-level trace of the underlying filesystem events.
      this.logger.debug(`Raw event: ${rawEvent} on ${rawPath}`);
    });

    this.eventsGateway.emitWatcherUpdate({
      type: 'started',
      watches: enabledWatches,
    });

    // Persist the running state.
    this.savePersistedState();
    return { started: true };
  }
  /**
   * Handles a chokidar `add` event: checks that the file belongs to an
   * enabled dataset path with a matching extension, then asks the validation
   * worker to verify it. A valid result leads to processValidFile().
   *
   * A pending validation is abandoned if the worker has not answered within
   * five minutes.
   *
   * @param file Path reported by the watcher.
   */
  private handleFileAdded(file: string) {
    // Determine dataset from file path
    const dataset = this.getDatasetFromPath(file);
    if (!dataset) {
      this.logger.warn(`Could not determine dataset for file: ${file}`);
      return;
    }

    // Get dataset configuration to check extensions
    const datasetConfig = this.datasetsService.getDatasetConfig();
    const datasetSettings = datasetConfig[dataset];
    if (!datasetSettings || !datasetSettings.enabled) {
      return;
    }

    // Find the specific path configuration that matches this file
    let pathConfig = null;
    for (const pathKey of Object.keys(datasetSettings)) {
      if (pathKey !== 'enabled' && file.startsWith(pathKey)) {
        pathConfig = datasetSettings[pathKey];
        break;
      }
    }
    if (!pathConfig) {
      this.logger.warn(
        `File path ${file} doesn't match any configured path in dataset ${dataset}`,
      );
      return;
    }

    // Check if file extension matches the path's exts array
    const exts = pathConfig.exts;
    if (exts && Array.isArray(exts) && exts.length > 0) {
      const fileExt = path.extname(file).toLowerCase();
      const extensionMatches = exts.some((ext: string) => {
        const normalizedExt = ext.startsWith('.')
          ? ext.toLowerCase()
          : `.${ext.toLowerCase()}`;
        return fileExt === normalizedExt;
      });
      if (!extensionMatches) {
        // File extension doesn't match, skip processing
        this.logger.debug(
          `Skipping file ${file} - extension not in dataset exts array`,
        );
        return;
      }
    }

    // If an earlier 'add' for the same path is still awaiting validation, its
    // timer must be cancelled before it is replaced. Otherwise the orphaned
    // timer fires later and deletes the callback registered below.
    const staleTimeout = this.callbackTimeouts.get(file);
    if (staleTimeout) {
      clearTimeout(staleTimeout);
      this.callbackTimeouts.delete(file);
    }

    // Offload validation to worker with timeout to prevent memory leaks
    this.validationCallbacks.set(file, (result) => {
      // Clear timeout when callback is called
      const pendingTimeout = this.callbackTimeouts.get(file);
      if (pendingTimeout) {
        clearTimeout(pendingTimeout);
        this.callbackTimeouts.delete(file);
      }
      if (!result.isValid) {
        this.logger.warn(`File appears to be corrupted or incomplete: ${file}`);
        return;
      }
      // Proceed with task creation
      this.processValidFile(file, dataset);
    });

    // Set timeout to cleanup callback if worker doesn't respond within 5 minutes
    const timeout = setTimeout(() => {
      if (this.validationCallbacks.has(file)) {
        this.logger.warn(`Validation timeout for file: ${file}`);
        this.validationCallbacks.delete(file);
        this.callbackTimeouts.delete(file);
      }
    }, 300000); // 5 minutes
    this.callbackTimeouts.set(file, timeout);

    this.validationWorker.postMessage({ type: 'validate_file', file });
  }
  /**
   * Records a validated file and, when appropriate, queues a task for it.
   *
   * Steps: resolve preset/destination/extension settings for the file, compute
   * the output path (creating the output directory if a destination is
   * configured), create or update the file record, and finally create a task
   * unless the output already exists or a task for this input is already
   * known. An existing task in status `processing` is reset to `pending`.
   *
   * @param file Path of the validated input file.
   * @param dataset Name of the dataset the file belongs to.
   */
  private processValidFile(file: string, dataset: string) {
    const datasetConfig = this.datasetsService.getDatasetConfig();
    const datasetSettings = datasetConfig[dataset];
    if (!datasetSettings || !datasetSettings.enabled) {
      return;
    }

    const { preset, destination, ext, clean, folder } =
      this.resolveOutputSettings(datasetSettings, file);

    // Create output path based on configuration
    let output: string;
    if (destination) {
      // A destination is configured: build the path underneath it.
      const fileName = path.basename(file, path.extname(file));
      const cleanFileName = this.applyCleanRules(fileName, clean);
      output = folder
        ? path.join(
            destination,
            this.deriveFolderName(cleanFileName),
            cleanFileName + ext,
          )
        : path.join(destination, cleanFileName + ext);

      // Ensure destination directory exists
      const outputDir = path.dirname(output);
      if (!fs.existsSync(outputDir)) {
        try {
          fs.mkdirSync(outputDir, { recursive: true });
        } catch (error) {
          const reason = error instanceof Error ? error.message : String(error);
          this.logger.error(
            `Failed to create output directory ${outputDir}: ${reason}`,
          );
          return;
        }
      }
    } else {
      // Default behavior: same directory with new extension
      output = path.join(
        path.dirname(file),
        path.basename(file, path.extname(file)) + ext,
      );
    }

    // Always create/update file record for discovered files
    const existingFileRecord = this.db.findFile(dataset, file) as
      | FileRecord
      | undefined;
    if (!existingFileRecord) {
      // Create file record for newly discovered file
      this.db.setFile(dataset, file, {
        date: new Date().toISOString(),
        output: output,
      });
      this.logger.log(`Discovered new file: ${file}`);
    } else {
      // Update existing file record with current output path (in case config changed)
      this.db.setFile(dataset, file, {
        output: output,
      });
    }

    // Automatic task creation: only when output doesn't exist
    if (fs.existsSync(output)) {
      return;
    }

    // Check if task already exists for this input file
    const existingTask = this.taskQueue.getTaskByInput(file);
    if (existingTask) {
      // If task exists and is currently processing, reset to pending for retry
      if (existingTask.status === 'processing') {
        this.taskQueue.updateTaskStatus(existingTask.id, 'pending');
        this.eventsGateway.emitTaskUpdate({
          type: 'reset',
          taskId: existingTask.id,
          file,
        });
      }
      return;
    }

    // Create task for processing
    try {
      const task = this.taskQueue.createTask({
        dataset,
        input: file,
        output,
        preset,
      });
      // Announce the newly queued file together with its task id.
      this.eventsGateway.emitFileUpdate({
        type: 'add',
        file,
        dataset,
        taskId: task.id,
      });
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.error(`Failed to create task for file ${file}: ${reason}`);
    }
  }

  /**
   * Resolves preset, destination, output extension, clean rules and the
   * folder flag for a file. Values come from the first path entry of the
   * dataset whose key prefixes the file; anything still at its default is
   * then filled from dataset-level keys (old configuration format).
   */
  private resolveOutputSettings(
    datasetObj: any,
    file: string,
  ): {
    preset: string;
    destination: string | undefined;
    ext: string;
    clean: any;
    folder: boolean;
  } {
    let preset = 'Fast 1080p30'; // Default fallback
    let destination: string | undefined;
    let ext = '.mkv'; // Default extension
    let clean: any;
    let folder = false; // Default: don't create subfolders

    // Find the path configuration that matches this file
    for (const pathKey of Object.keys(datasetObj)) {
      if (pathKey !== 'enabled' && file.startsWith(pathKey)) {
        const pathConfig = datasetObj[pathKey];
        if (pathConfig) {
          if (pathConfig.preset) {
            preset = pathConfig.preset;
          }
          if (pathConfig.destination) {
            destination = pathConfig.destination;
          }
          if (pathConfig.ext) {
            // Ensure extension starts with a dot
            ext = pathConfig.ext.startsWith('.')
              ? pathConfig.ext
              : '.' + pathConfig.ext;
          }
          if (pathConfig.clean) {
            clean = pathConfig.clean;
          }
          if (typeof pathConfig.folder === 'boolean') {
            folder = pathConfig.folder;
          }
        }
        break; // only the first matching key is considered
      }
    }

    // If no path-specific config found, try the old format (for backward compatibility)
    if (preset === 'Fast 1080p30' && datasetObj.preset) {
      preset = datasetObj.preset;
    }
    if (!destination && datasetObj.destination) {
      destination = datasetObj.destination;
    }
    if (ext === '.mkv' && datasetObj.ext) {
      ext = datasetObj.ext.startsWith('.')
        ? datasetObj.ext
        : '.' + datasetObj.ext;
    }
    if (!clean && datasetObj.clean) {
      clean = datasetObj.clean;
    }
    if (!folder && typeof datasetObj.folder === 'boolean') {
      folder = datasetObj.folder;
    }

    return { preset, destination, ext, clean, folder };
  }

  /**
   * Applies each `pattern -> replacement` entry of the clean config to the
   * file name as a global regex replace. Invalid patterns are logged and
   * skipped.
   */
  private applyCleanRules(fileName: string, clean: any): string {
    if (!clean || typeof clean !== 'object') {
      return fileName;
    }
    let cleaned = fileName;
    for (const [pattern, replacement] of Object.entries(clean)) {
      try {
        cleaned = cleaned.replace(
          new RegExp(pattern, 'g'),
          replacement as string,
        );
      } catch {
        this.logger.warn(`Invalid regex pattern in clean config: ${pattern}`);
      }
    }
    return cleaned;
  }

  /**
   * Derives the sub-folder name for a cleaned file name: the text before the
   * first date/episode-like pattern, else the first dot-separated part, else
   * the first letter upper-cased.
   */
  private deriveFolderName(cleanFileName: string): string {
    // Look for common date/episode patterns and take everything before the first one
    const patterns = [
      /\d{2}\.\d{2}\.\d{2}/, // 24.12.17
      /[A-Za-z]\d{3,4}/, // E651, S123, etc.
      /\d{4}/, // 2024, 1234, etc.
      /\.\d+/, // .123, .2024, etc.
    ];
    let folderName = cleanFileName.charAt(0).toUpperCase(); // fallback
    let foundMatch = false;

    for (const pattern of patterns) {
      const match = cleanFileName.match(pattern);
      if (match && match.index !== undefined && match.index > 0) {
        // Everything before the pattern, without a trailing dot
        const potentialFolderName = cleanFileName
          .substring(0, match.index)
          .trim()
          .replace(/\.$/, '');
        // "Site.Series.Date..." -> keep only "Site"
        folderName = potentialFolderName.includes('.')
          ? potentialFolderName.split('.')[0]
          : potentialFolderName;
        foundMatch = true;
        break;
      }
    }

    if (!foundMatch && cleanFileName.includes('.')) {
      // No pattern matched but the name has dots: use the first part
      const parts = cleanFileName.split('.');
      if (parts.length > 1 && parts[0].length > 0) {
        folderName = parts[0];
      }
    } else if (cleanFileName.toLowerCase().startsWith('the ')) {
      // NOTE(review): this branch also runs when a pattern DID match above,
      // replacing the matched name with a single letter (e.g. "The Show 2024"
      // -> "S"). Kept as-is because changing it moves existing output paths;
      // confirm whether it was meant to apply only when nothing matched.
      const words = cleanFileName.split(' ');
      if (words.length > 1) {
        folderName = words[1].charAt(0).toUpperCase();
      }
    }
    return folderName;
  }
  /**
   * Finds the dataset whose configuration contains a key (other than
   * `enabled`) that is a string prefix of the given path.
   *
   * @param file Path to look up.
   * @returns The name of the first matching dataset, or null when none match.
   */
  private getDatasetFromPath(file: string): string | null {
    const datasetConfig = this.datasetsService.getDatasetConfig();

    for (const [datasetName, datasetObj] of Object.entries(datasetConfig)) {
      // Skip entries that are not objects.
      if (datasetObj === null || typeof datasetObj !== 'object') {
        continue;
      }
      const ownsFile = Object.keys(datasetObj).some(
        (pathKey) => pathKey !== 'enabled' && file.startsWith(pathKey),
      );
      if (ownsFile) {
        return datasetName;
      }
    }

    return null;
  }
  /**
   * Records that a watcher event just happened and logs a progress line on
   * every 100th event.
   *
   * @param eventType Name of the event, used only in the log line.
   */
  private updateActivity(eventType: string) {
    this.lastEventTime = new Date();
    this.eventCount += 1;

    const isMilestone = this.eventCount % 100 === 0;
    if (!isMilestone) {
      return;
    }
    this.logger.log(
      `Watcher activity: ${this.eventCount} events processed, last: ${eventType}`,
    );
  }
  /**
   * (Re)starts the five-minute health check. Each tick logs event count and
   * idle time, and raises a `health_alert` through the events gateway when
   * the watcher reports no watched directories although watches were set.
   */
  private startActivityMonitor() {
    // Only one monitor at a time.
    if (this.activityCheckInterval) {
      clearInterval(this.activityCheckInterval);
    }

    const runHealthCheck = () => {
      const idleMs = Date.now() - this.lastEventTime.getTime();
      const minutesSinceLastEvent = Math.floor(idleMs / 60000);
      const statusLabel = this.isWatching ? 'active' : 'inactive';
      this.logger.log(
        `Watcher health check - Events: ${this.eventCount}, Last activity: ${minutesSinceLastEvent}m ago, Status: ${statusLabel}`,
      );

      // Nothing further to verify unless a watcher is running.
      if (!this.watcher || !this.isWatching) {
        return;
      }

      const pathCount = Object.keys(this.watcher.getWatched()).length;
      this.logger.log(`Currently watching ${pathCount} directories`);

      const lostAllPaths = pathCount === 0 && this.lastWatches.length > 0;
      if (!lostAllPaths) {
        return;
      }
      this.logger.error(
        'CRITICAL: Watcher has no watched paths but should be watching!',
      );
      this.eventsGateway.emitWatcherUpdate({
        type: 'health_alert',
        healthy: false,
        reason: 'Watcher lost all watched paths',
      });
    };

    this.activityCheckInterval = setInterval(runHealthCheck, 300000); // 5 minutes
  }
  /**
   * Called after a chokidar `error` event. Leaves the watcher running and
   * logs whether it still reports any watched directories.
   *
   * @param error The error chokidar emitted. Not used here; the caller in
   *   start() logs its message and stack before invoking this method.
   */
  private handleWatcherError(error: Error) {
    this.logger.error(
      'Watcher encountered an error, attempting to continue...',
    );

    // Check if watcher is still functional
    if (!this.watcher) {
      return;
    }
    try {
      const pathCount = Object.keys(this.watcher.getWatched()).length;
      if (pathCount === 0) {
        this.logger.error('Watcher has stopped watching paths after error!');
      } else {
        this.logger.log(`Watcher still monitoring ${pathCount} directories`);
      }
    } catch (e) {
      // Anything can be thrown, so narrow before reading `.message`.
      const reason = e instanceof Error ? e.message : String(e);
      this.logger.error(`Cannot check watcher status: ${reason}`);
    }
  }
  /**
   * Stops the activity monitor and, if the service is marked as watching,
   * closes the chokidar watcher, clears the running flag, emits a `stopped`
   * update and persists the stopped state.
   *
   * @returns `{ stopped: true }` when a running watcher was stopped, otherwise
   *   `{ stopped: false, message }`.
   */
  async stop() {
    // Stop activity monitor
    if (this.activityCheckInterval) {
      clearInterval(this.activityCheckInterval);
      this.activityCheckInterval = null;
    }

    if (!this.isWatching) {
      return { stopped: false, message: 'Watcher is not running.' };
    }

    // The flag says we are watching: force the stop even if the watcher
    // object is missing or fails to close.
    if (this.watcher) {
      try {
        await this.watcher.close();
      } catch (error) {
        // Anything can be thrown, so narrow before reading `.message`.
        const reason = error instanceof Error ? error.message : String(error);
        this.logger.warn(`Error closing watcher: ${reason}`);
      }
    }
    this.watcher = null;
    this.isWatching = false;
    this.eventsGateway.emitWatcherUpdate({ type: 'stopped' });

    // Save the stopped state
    this.savePersistedState();
    return { stopped: true };
  }
  /**
   * Reports whether the watcher is running, together with the watches and
   * options of the most recent start.
   */
  status() {
    const { isWatching, lastWatches, lastOptions } = this;
    return { isWatching, watches: lastWatches, options: lastOptions };
  }
  /**
   * Releases resources on application shutdown: stops the activity monitor,
   * cancels pending validation timers, closes the watcher and terminates the
   * validation worker.
   *
   * Each step is isolated so that a failure while closing the watcher cannot
   * prevent the worker thread from being terminated. This method does not
   * call savePersistedState(), so the stored `watcher_state` is left as-is.
   */
  async onModuleDestroy() {
    // Stop activity monitor
    if (this.activityCheckInterval) {
      clearInterval(this.activityCheckInterval);
      this.activityCheckInterval = null;
    }

    // Clear all callback timeouts
    for (const timeout of this.callbackTimeouts.values()) {
      clearTimeout(timeout);
    }
    this.callbackTimeouts.clear();

    // Close the watcher if it's running
    if (this.watcher && this.isWatching) {
      try {
        await this.watcher.close();
        this.logger.log('Watcher closed on module destroy');
      } catch (error) {
        this.logger.error(`Error during module destroy: ${error}`);
      }
    }

    // Terminate the validation worker even if closing the watcher failed
    if (this.validationWorker) {
      try {
        await this.validationWorker.terminate();
        this.logger.log('Validation worker terminated on module destroy');
      } catch (error) {
        this.logger.error(`Error during module destroy: ${error}`);
      }
    }

    // Clear callbacks
    this.validationCallbacks.clear();

    this.logger.log(
      `Watcher destroyed. Total events processed: ${this.eventCount}`,
    );
  }
  636. }