import { Inject, Injectable, Logger, OnModuleDestroy } from '@nestjs/common'; import chokidar, { FSWatcher } from 'chokidar'; import fs from 'fs'; import path from 'path'; import { Worker } from 'worker_threads'; import { DatasetsService } from './datasets.service'; import { DbService } from './db.service'; import { EventsGateway } from './events.gateway'; import { TaskQueueService } from './task-queue.service'; interface FileRecord { dataset: string; input: string; output: string; date: string; } @Injectable() export class WatcherService implements OnModuleDestroy { private watcher: FSWatcher | null = null; private isWatching = false; private lastWatches: string[] = []; private lastOptions: any = {}; private logger = new Logger('WatcherService'); private validationWorker: Worker; private validationCallbacks = new Map void>(); private callbackTimeouts = new Map(); private lastEventTime: Date = new Date(); private activityCheckInterval: NodeJS.Timeout | null = null; private eventCount = 0; constructor( @Inject(DatasetsService) private readonly datasetsService: DatasetsService, @Inject(DbService) private readonly db: DbService, @Inject(EventsGateway) private readonly eventsGateway: EventsGateway, @Inject(TaskQueueService) private readonly taskQueue: TaskQueueService, ) { this.validationWorker = new Worker( path.join(__dirname, 'file-validation-worker.js'), ); this.validationWorker.on('message', (message) => { if (message.type === 'validation_result') { const callback = this.validationCallbacks.get(message.file); if (callback) { callback(message); this.validationCallbacks.delete(message.file); } } }); this.validationWorker.on('error', (error) => { this.logger.error(`Validation worker error: ${error}`); }); // Load persisted state on startup this.loadPersistedState(); } private loadPersistedState() { try { const db = this.db.getDb(); const row = db .prepare('SELECT value FROM settings WHERE key = ?') .get('watcher_state') as { value?: string } | undefined; if (row && row.value) { const state = JSON.parse(row.value); this.isWatching = state.isWatching || false; this.lastWatches = state.lastWatches || []; this.lastOptions = state.lastOptions || {}; // If we were watching before restart, resume watching if (this.isWatching && this.lastWatches.length > 0) { this.logger.log('Resuming watcher from persisted state'); this.start(this.lastWatches, this.lastOptions); } } } catch (error) { this.logger.error(`Failed to load persisted watcher state: ${error}`); } } private savePersistedState() { try { const db = this.db.getDb(); const state = { isWatching: this.isWatching, lastWatches: this.lastWatches, lastOptions: this.lastOptions, }; db.prepare( 'INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)', ).run('watcher_state', JSON.stringify(state)); } catch (error) { this.logger.error(`Failed to save persisted watcher state: ${error}`); } } start(watches?: string[], options: any = {}) { if (this.isWatching) { this.logger.warn('Watcher already running.'); return { started: false, message: 'Watcher already running.' }; } // If no watches provided, use all enabled dataset paths const enabledWatches = watches && watches.length > 0 ? watches : this.datasetsService.getEnabledDatasetPaths(); // Get dataset configuration to determine extensions to watch const datasetConfig = this.datasetsService.getDatasetConfig(); // Create a function to determine if a file should be watched based on dataset extensions const shouldWatchFile = (filePath: string): boolean => { // Always allow directories to be traversed try { if (fs.statSync(filePath).isDirectory()) { return true; } } catch { // If we can't stat the file, assume it's a file and continue with filtering } // Get the dataset for this file path const dataset = this.getDatasetFromPath(filePath); if (!dataset) { return false; // Don't watch files that don't belong to any dataset } // Get the dataset configuration const datasetSettings = datasetConfig[dataset]; if (!datasetSettings || !datasetSettings.enabled) { return false; } // Find the specific path configuration that matches this file let pathConfig = null; for (const pathKey of Object.keys(datasetSettings)) { if (pathKey !== 'enabled' && filePath.startsWith(pathKey)) { pathConfig = datasetSettings[pathKey]; break; } } if (!pathConfig) { return false; // File path doesn't match any configured path in the dataset } // Get the exts array for this path const exts = pathConfig.exts; if (!exts || !Array.isArray(exts) || exts.length === 0) { // If no exts specified, watch all files (backward compatibility) return true; } // Check if file extension matches any of the allowed extensions const fileExt = path.extname(filePath).toLowerCase(); return exts.some((ext: string) => { const normalizedExt = ext.startsWith('.') ? ext.toLowerCase() : `.${ext.toLowerCase()}`; return fileExt === normalizedExt; }); }; // Override options with robust settings for long-running stability const conservativeOptions = { ...options, // Polling is more reliable for network filesystems and prevents watcher from dying usePolling: options.usePolling !== undefined ? options.usePolling : true, interval: Math.max(options.interval || 10000, 30000), // Minimum 30 seconds binaryInterval: 60000, // Check binary files less frequently depth: options.depth !== undefined ? options.depth : 1, ignorePermissionErrors: true, // Wait for file writes to finish before emitting events awaitWriteFinish: { stabilityThreshold: 5000, // Wait 5 seconds after last change pollInterval: 1000, // Check every second }, // Prevent file descriptor leaks persistent: true, // Better error handling ignoreInitial: false, followSymlinks: false, // Atomic write detection atomic: true, ignored: (filePath: string) => { // Use the shouldWatchFile function to filter files return !shouldWatchFile(filePath); }, }; this.watcher = chokidar.watch(enabledWatches, conservativeOptions); this.isWatching = true; this.lastWatches = enabledWatches; this.lastOptions = conservativeOptions; this.lastEventTime = new Date(); this.eventCount = 0; this.watcher .on('add', (file: string) => { this.updateActivity('add'); this.handleFileAdded(file); }) .on('change', (file: string) => { this.updateActivity('change'); this.eventsGateway.emitFileUpdate({ type: 'change', file }); }) .on('unlink', (file: string) => { this.updateActivity('unlink'); this.eventsGateway.emitFileUpdate({ type: 'unlink', file }); }) .on('error', (error: Error) => { this.logger.error(`Watcher error: ${error.message}`); this.logger.error(`Error stack: ${error.stack}`); this.eventsGateway.emitWatcherUpdate({ type: 'error', error: error.message, }); // Don't let errors kill the watcher - try to recover this.handleWatcherError(error); }) .on('ready', () => { this.logger.log('Watcher is ready and monitoring for changes'); this.logger.log(`Watching ${enabledWatches.length} path(s)`); this.logger.log(`Polling enabled: ${conservativeOptions.usePolling}`); this.startActivityMonitor(); }) .on('raw', (event, path, details) => { // Log raw events for debugging (can be disabled in production) this.logger.debug(`Raw event: ${event} on ${path}`); }); this.eventsGateway.emitWatcherUpdate({ type: 'started', watches: enabledWatches, }); // Save the running state this.savePersistedState(); return { started: true }; } private handleFileAdded(file: string) { // Determine dataset from file path const dataset = this.getDatasetFromPath(file); if (!dataset) { this.logger.warn(`Could not determine dataset for file: ${file}`); return; } // Get dataset configuration to check extensions const datasetConfig = this.datasetsService.getDatasetConfig(); const datasetSettings = datasetConfig[dataset]; if (!datasetSettings || !datasetSettings.enabled) { return; } // Find the specific path configuration that matches this file let pathConfig = null; for (const pathKey of Object.keys(datasetSettings)) { if (pathKey !== 'enabled' && file.startsWith(pathKey)) { pathConfig = datasetSettings[pathKey]; break; } } if (!pathConfig) { this.logger.warn( `File path ${file} doesn't match any configured path in dataset ${dataset}`, ); return; } // Check if file extension matches the path's exts array const exts = pathConfig.exts; if (exts && Array.isArray(exts) && exts.length > 0) { const fileExt = path.extname(file).toLowerCase(); const extensionMatches = exts.some((ext: string) => { const normalizedExt = ext.startsWith('.') ? ext.toLowerCase() : `.${ext.toLowerCase()}`; return fileExt === normalizedExt; }); if (!extensionMatches) { // File extension doesn't match, skip processing this.logger.debug( `Skipping file ${file} - extension not in dataset exts array`, ); return; } } // Offload validation to worker with timeout to prevent memory leaks this.validationCallbacks.set(file, (result) => { // Clear timeout when callback is called const timeout = this.callbackTimeouts.get(file); if (timeout) { clearTimeout(timeout); this.callbackTimeouts.delete(file); } if (!result.isValid) { this.logger.warn(`File appears to be corrupted or incomplete: ${file}`); return; } // Proceed with task creation this.processValidFile(file, dataset); }); // Set timeout to cleanup callback if worker doesn't respond within 5 minutes const timeout = setTimeout(() => { if (this.validationCallbacks.has(file)) { this.logger.warn(`Validation timeout for file: ${file}`); this.validationCallbacks.delete(file); this.callbackTimeouts.delete(file); } }, 300000); // 5 minutes this.callbackTimeouts.set(file, timeout); this.validationWorker.postMessage({ type: 'validate_file', file }); } private processValidFile(file: string, dataset: string) { // Get dataset configuration const datasetConfig = this.datasetsService.getDatasetConfig(); const datasetSettings = datasetConfig[dataset]; if (!datasetSettings || !datasetSettings.enabled) { return; } // Determine preset and output configuration - find the specific path configuration let preset = 'Fast 1080p30'; // Default fallback let destination: string | undefined; let ext = '.mkv'; // Default extension let clean: any; let folder = false; // Default: don't create subfolders if (datasetConfig[dataset]) { const datasetObj = datasetConfig[dataset]; // Find the path configuration that matches this file for (const pathKey of Object.keys(datasetObj)) { if (pathKey !== 'enabled' && file.startsWith(pathKey)) { const pathConfig = datasetObj[pathKey]; if (pathConfig) { if (pathConfig.preset) { preset = pathConfig.preset; } if (pathConfig.destination) { destination = pathConfig.destination; } if (pathConfig.ext) { // Ensure extension starts with a dot ext = pathConfig.ext.startsWith('.') ? pathConfig.ext : '.' + pathConfig.ext; } if (pathConfig.clean) { clean = pathConfig.clean; } if (typeof pathConfig.folder === 'boolean') { folder = pathConfig.folder; } } break; } } // If no path-specific config found, try the old format (for backward compatibility) if (preset === 'Fast 1080p30' && datasetObj.preset) { preset = datasetObj.preset; } if (!destination && datasetObj.destination) { destination = datasetObj.destination; } if (ext === '.mkv' && datasetObj.ext) { ext = datasetObj.ext.startsWith('.') ? datasetObj.ext : '.' + datasetObj.ext; } if (!clean && datasetObj.clean) { clean = datasetObj.clean; } if (!folder && typeof datasetObj.folder === 'boolean') { folder = datasetObj.folder; } } // Create output path based on configuration let output: string; if (destination) { // If destination is specified, use it as the base path const fileName = path.basename(file, path.extname(file)); let cleanFileName = fileName; // Apply cleaning rules if specified if (clean && typeof clean === 'object') { for (const [pattern, replacement] of Object.entries(clean)) { try { const regex = new RegExp(pattern, 'g'); cleanFileName = cleanFileName.replace(regex, replacement as string); } catch (error) { this.logger.warn( `Invalid regex pattern in clean config: ${pattern}`, ); } } } // If folder is enabled, create a subdirectory based on the cleaned filename if (folder) { // Try to extract series/site name from filename pattern // Look for common date/episode patterns and take everything before the first separator const patterns = [ /\d{2}\.\d{2}\.\d{2}/, // 24.12.17 /[A-Za-z]\d{3,4}/, // E651, S123, etc. /\d{4}/, // 2024, 1234, etc. /\.\d+/, // .123, .2024, etc. ]; let folderName = cleanFileName.charAt(0).toUpperCase(); // fallback let foundMatch = false; for (const pattern of patterns) { const match = cleanFileName.match(pattern); if (match && match.index !== undefined && match.index > 0) { // Take everything before the pattern as the potential folder name let potentialFolderName = cleanFileName .substring(0, match.index) .trim(); // Remove trailing dots if any potentialFolderName = potentialFolderName.replace(/\.$/, ''); // If the potential folder name contains dots, take only the first part (site name) // This handles patterns like "Site.Series.Date..." where we want just "Site" if (potentialFolderName.includes('.')) { folderName = potentialFolderName.split('.')[0]; } else { folderName = potentialFolderName; } foundMatch = true; break; } } // If no pattern matched but filename contains dots, try to extract site name if (!foundMatch && cleanFileName.includes('.')) { const parts = cleanFileName.split('.'); if (parts.length > 1 && parts[0].length > 0) { folderName = parts[0]; } } else if (cleanFileName.toLowerCase().startsWith('the ')) { // For titles starting with "The", use the next word const words = cleanFileName.split(' '); if (words.length > 1) { folderName = words[1].charAt(0).toUpperCase(); } } output = path.join(destination, folderName, cleanFileName + ext); } else { output = path.join(destination, cleanFileName + ext); } // Ensure destination directory exists const outputDir = path.dirname(output); if (!fs.existsSync(outputDir)) { try { fs.mkdirSync(outputDir, { recursive: true }); } catch (error) { this.logger.error( `Failed to create output directory ${outputDir}: ${error.message}`, ); return; } } } else { // Default behavior: same directory with new extension output = path.join( path.dirname(file), path.basename(file, path.extname(file)) + ext, ); } // Always create/update file record for discovered files const existingFileRecord = this.db.findFile(dataset, file) as | FileRecord | undefined; if (!existingFileRecord) { // Create file record for newly discovered file this.db.setFile(dataset, file, { date: new Date().toISOString(), output: output, }); this.logger.log(`Discovered new file: ${file}`); } else { // Update existing file record with current output path (in case config changed) this.db.setFile(dataset, file, { output: output, }); } // Automatic task creation: only when output doesn't exist const outputExists = fs.existsSync(output); if (outputExists) { return; } // Check if task already exists for this input file const existingTask = this.taskQueue.getTaskByInput(file); if (existingTask) { // If task exists and is currently processing, reset to pending for retry if (existingTask.status === 'processing') { this.taskQueue.updateTaskStatus(existingTask.id, 'pending'); this.eventsGateway.emitTaskUpdate({ type: 'reset', taskId: existingTask.id, file, }); } return; } // Create task for processing try { const task = this.taskQueue.createTask({ dataset, input: file, output, preset, }); // Update file record to indicate processing has started this.eventsGateway.emitFileUpdate({ type: 'add', file, dataset, taskId: task.id, }); } catch (error) { this.logger.error( `Failed to create task for file ${file}: ${error.message}`, ); } } private getDatasetFromPath(file: string): string | null { const datasetConfig = this.datasetsService.getDatasetConfig(); // Iterate through each dataset and its paths for (const datasetName of Object.keys(datasetConfig)) { const datasetObj = datasetConfig[datasetName]; if (typeof datasetObj === 'object' && datasetObj !== null) { // Check each path in the dataset configuration for (const pathKey of Object.keys(datasetObj)) { if (pathKey !== 'enabled' && file.startsWith(pathKey)) { // Return the actual dataset name (e.g., "tvshows", "pr0n") return datasetName; } } } } return null; } private updateActivity(eventType: string) { this.lastEventTime = new Date(); this.eventCount++; if (this.eventCount % 100 === 0) { this.logger.log( `Watcher activity: ${this.eventCount} events processed, last: ${eventType}`, ); } } private startActivityMonitor() { // Stop any existing monitor if (this.activityCheckInterval) { clearInterval(this.activityCheckInterval); } // Check for watcher activity every 5 minutes this.activityCheckInterval = setInterval(() => { const now = new Date(); const timeSinceLastEvent = now.getTime() - this.lastEventTime.getTime(); const minutesSinceLastEvent = Math.floor(timeSinceLastEvent / 60000); this.logger.log( `Watcher health check - Events: ${this.eventCount}, Last activity: ${minutesSinceLastEvent}m ago, Status: ${this.isWatching ? 'active' : 'inactive'}`, ); // Verify watcher is still watching if (this.watcher && this.isWatching) { const watchedPaths = this.watcher.getWatched(); const pathCount = Object.keys(watchedPaths).length; this.logger.log(`Currently watching ${pathCount} directories`); if (pathCount === 0 && this.lastWatches.length > 0) { this.logger.error( 'CRITICAL: Watcher has no watched paths but should be watching!', ); this.eventsGateway.emitWatcherUpdate({ type: 'health_alert', healthy: false, reason: 'Watcher lost all watched paths', }); } } }, 300000); // Every 5 minutes } private handleWatcherError(error: Error) { // Log detailed error information this.logger.error( 'Watcher encountered an error, attempting to continue...', ); // Check if watcher is still functional if (this.watcher) { try { const watchedPaths = this.watcher.getWatched(); const pathCount = Object.keys(watchedPaths).length; if (pathCount === 0) { this.logger.error('Watcher has stopped watching paths after error!'); } else { this.logger.log(`Watcher still monitoring ${pathCount} directories`); } } catch (e) { this.logger.error(`Cannot check watcher status: ${e.message}`); } } } async stop() { // Stop activity monitor if (this.activityCheckInterval) { clearInterval(this.activityCheckInterval); this.activityCheckInterval = null; } // If status shows we're watching, force stop regardless of watcher object state if (this.isWatching) { if (this.watcher) { try { await this.watcher.close(); } catch (error) { this.logger.warn(`Error closing watcher: ${error.message}`); } } this.watcher = null; this.isWatching = false; this.eventsGateway.emitWatcherUpdate({ type: 'stopped' }); // Save the stopped state this.savePersistedState(); return { stopped: true }; } return { stopped: false, message: 'Watcher is not running.' }; } status() { return { isWatching: this.isWatching, watches: this.lastWatches, options: this.lastOptions, }; } async onModuleDestroy() { // Clean up resources on application shutdown try { // Stop activity monitor if (this.activityCheckInterval) { clearInterval(this.activityCheckInterval); this.activityCheckInterval = null; } // Clear all callback timeouts for (const timeout of this.callbackTimeouts.values()) { clearTimeout(timeout); } this.callbackTimeouts.clear(); // Close the watcher if it's running if (this.watcher && this.isWatching) { await this.watcher.close(); this.logger.log('Watcher closed on module destroy'); } // Terminate the validation worker if (this.validationWorker) { await this.validationWorker.terminate(); this.logger.log('Validation worker terminated on module destroy'); } // Clear callbacks this.validationCallbacks.clear(); this.logger.log( `Watcher destroyed. Total events processed: ${this.eventCount}`, ); } catch (error) { this.logger.error(`Error during module destroy: ${error}`); } } }