import { Inject, Injectable, Logger } from '@nestjs/common';
import chokidar, { FSWatcher } from 'chokidar';
import fs from 'fs';
import path from 'path';
import { DatasetsService } from './datasets.service';
import { DbService } from './db.service';
import { EventsGateway } from './events.gateway';
import { TaskQueueService } from './task-queue.service';

interface FileRecord {
  dataset: string;
  input: string;
  output: string;
  date: string;
}

/** Output settings resolved for one discovered file. */
interface OutputSettings {
  preset: string;
  destination: string | undefined;
  ext: string;
  clean: unknown;
  folder: boolean;
}

/** Preset used when neither the path nor the dataset configures one. */
const DEFAULT_PRESET = 'Fast 1080p30';

/** Output extension used when neither the path nor the dataset configures one. */
const DEFAULT_EXT = '.mkv';

/** Lower bound applied to the `interval` option handed to chokidar. */
const MIN_WATCH_INTERVAL_MS = 30000;

/** Number of leading bytes inspected when sniffing a container signature. */
const HEADER_BYTES = 12;

/** Files without a recognised signature are accepted only above this size. */
const MIN_UNRECOGNISED_VIDEO_BYTES = 1024 * 1024;

/** Extensions (lower-case, with dot) treated as video files. */
const VIDEO_EXTENSIONS: readonly string[] = [
  '.mp4',
  '.mkv',
  '.avi',
  '.mov',
  '.wmv',
  '.flv',
  '.webm',
  '.m4v',
];

/**
 * Date/episode markers; the text in front of the first hit becomes the
 * candidate sub-folder name. Order matters: the first matching pattern wins.
 */
const FOLDER_NAME_PATTERNS: readonly RegExp[] = [
  /\d{2}\.\d{2}\.\d{2}/, // 24.12.17
  /[A-Za-z]\d{3,4}/, // E651, S123, etc.
  /\d{4}/, // 2024, 1234, etc.
  /\.\d+/, // .123, .2024, etc.
];

/** Extracts a printable message from a caught value of unknown type. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Watches dataset directories with chokidar, records discovered video files
 * in the database and queues a task for every file whose output is missing.
 */
@Injectable()
export class WatcherService {
  private watcher: FSWatcher | null = null;
  private isWatching = false;
  private lastWatches: string[] = [];
  private lastOptions: any = {};
  private readonly logger = new Logger('WatcherService');

  constructor(
    @Inject(DatasetsService)
    private readonly datasetsService: DatasetsService,
    @Inject(DbService) private readonly db: DbService,
    @Inject(EventsGateway) private readonly eventsGateway: EventsGateway,
    @Inject(TaskQueueService) private readonly taskQueue: TaskQueueService,
  ) {}

  /**
   * Starts the watcher.
   *
   * @param watches Paths to watch; when omitted or empty, the enabled dataset
   *   paths reported by the datasets service are used.
   * @param options Options forwarded to `chokidar.watch`. `interval` is raised
   *   to at least 30 s, `depth` defaults to 1 and `ignorePermissionErrors` is
   *   always forced on.
   * @returns `{ started: true }`, or `{ started: false, message }` when a
   *   watcher is already running.
   */
  start(watches?: string[], options: any = {}) {
    if (this.isWatching) {
      this.logger.warn('Watcher already running.');
      return { started: false, message: 'Watcher already running.' };
    }

    // A caller passing null explicitly bypasses the default parameter value.
    const requested = options === null || options === undefined ? {} : options;

    // If no watches provided, use all enabled dataset paths
    const enabledWatches =
      watches && watches.length > 0
        ? watches
        : this.datasetsService.getEnabledDatasetPaths();

    // Override options to be more conservative for file descriptor limits
    const conservativeOptions = {
      ...requested,
      interval: Math.max(requested.interval || 0, MIN_WATCH_INTERVAL_MS),
      depth: requested.depth !== undefined ? requested.depth : 1,
      ignorePermissionErrors: true,
    };

    this.watcher = chokidar.watch(enabledWatches, conservativeOptions);
    this.isWatching = true;
    this.lastWatches = enabledWatches;
    this.lastOptions = conservativeOptions;

    this.watcher
      .on('add', (file: string) => {
        this.logger.log(`File added: ${file}`);
        // Keep a failure for one file from escaping into chokidar's emitter.
        try {
          this.handleFileAdded(file);
        } catch (error) {
          this.logger.error(
            `Failed to handle added file ${file}: ${errorMessage(error)}`,
          );
        }
      })
      .on('change', (file: string) => {
        this.logger.log(`File changed: ${file}`);
        this.eventsGateway.emitFileUpdate({ type: 'change', file });
      })
      .on('unlink', (file: string) => {
        this.logger.log(`File removed: ${file}`);
        this.eventsGateway.emitFileUpdate({ type: 'unlink', file });
      })
      .on('error', (error: Error) => {
        this.logger.error(`Watcher error: ${error}`);
        this.eventsGateway.emitWatcherUpdate({
          type: 'error',
          error: error.message,
        });
      });

    this.logger.log('Watcher started.');
    this.eventsGateway.emitWatcherUpdate({
      type: 'started',
      watches: enabledWatches,
    });
    return { started: true };
  }

  /**
   * Processes a newly discovered file: validates it, resolves its output
   * path, creates or updates its file record and, when the output does not
   * exist yet and no task is known for the input, creates a task.
   */
  private handleFileAdded(file: string): void {
    // Determine dataset from file path
    const dataset = this.getDatasetFromPath(file);
    if (!dataset) {
      this.logger.warn(`Could not determine dataset for file: ${file}`);
      return;
    }

    // Check if this is a video file (basic extension check)
    if (!this.isVideoFile(file)) {
      this.logger.log(`Skipping non-video file: ${file}`);
      return;
    }

    // Validate that the file has proper video headers
    if (!this.isValidVideoFile(file)) {
      this.logger.warn(`File appears to be corrupted or incomplete: ${file}`);
      return;
    }

    // Get dataset configuration
    const datasetConfig = this.datasetsService.getDatasetConfig();
    const datasetSettings = datasetConfig[dataset];
    if (!datasetSettings || !datasetSettings.enabled) {
      this.logger.log(
        `Dataset ${dataset} is not enabled, skipping file: ${file}`,
      );
      return;
    }

    const settings = this.resolveOutputSettings(datasetSettings, file);
    const output = this.buildOutputPath(file, settings);
    if (output === null) {
      // The output directory could not be created; already logged.
      return;
    }

    // Always create/update file record for discovered files
    const existingFileRecord = this.db.findFile(dataset, file) as
      | FileRecord
      | undefined;
    if (!existingFileRecord) {
      // Create file record for newly discovered file
      this.db.setFile(dataset, file, {
        date: new Date().toISOString(),
        output: output,
      });
      this.logger.log(`Discovered new file: ${file}`);
    } else {
      // Update existing file record with current output path (in case config changed)
      this.db.setFile(dataset, file, {
        output: output,
      });
    }

    // Automatic task creation: only when output doesn't exist
    if (fs.existsSync(output)) {
      this.logger.log(
        `Output file already exists, skipping automatic task creation: ${output}`,
      );
      return;
    }

    // Check if task already exists for this input file
    const existingTask = this.taskQueue.getTaskByInput(file);
    if (existingTask) {
      // If task exists and is currently processing, reset to pending for retry
      if (existingTask.status === 'processing') {
        this.logger.log(
          `Resetting stuck processing task ${existingTask.id} to pending for file: ${file}`,
        );
        this.taskQueue.updateTaskStatus(existingTask.id, 'pending');
        this.eventsGateway.emitTaskUpdate({
          type: 'reset',
          taskId: existingTask.id,
          file,
        });
      } else {
        this.logger.log(
          `Task already exists for file: ${file} (status: ${existingTask.status})`,
        );
      }
      return;
    }

    // Create task for processing
    try {
      const task = this.taskQueue.createTask({
        dataset,
        input: file,
        output,
        preset: settings.preset,
      });

      // Update file record to indicate processing has started
      this.db.setFile(dataset, file, {
        status: 'pending',
        date: new Date().toISOString(),
      });

      this.logger.log(`Created task ${task.id} for file: ${file}`);

      // Emit file update event
      this.eventsGateway.emitFileUpdate({
        type: 'add',
        file,
        dataset,
        taskId: task.id,
      });
    } catch (error) {
      this.logger.error(
        `Failed to create task for file ${file}: ${errorMessage(error)}`,
      );
    }
  }

  /**
   * Resolves preset, destination, extension, clean rules and folder flag for
   * a file. Values from the first path entry containing the file win; a
   * dataset-level value (old config format) is used only when the path entry
   * does not provide one; built-in defaults apply last.
   *
   * @param datasetObj Configuration object of the file's dataset.
   * @param file Path of the discovered file.
   */
  private resolveOutputSettings(datasetObj: any, file: string): OutputSettings {
    // Find the path configuration that matches this file
    let pathConfig: any;
    for (const pathKey of Object.keys(datasetObj)) {
      if (pathKey !== 'enabled' && this.isPathWithin(file, pathKey)) {
        pathConfig = datasetObj[pathKey];
        break;
      }
    }

    // Path-level value first, dataset-level value second. Falling back on
    // "is it still the default?" would wrongly discard a path-level value
    // that happens to equal the default.
    const configured = (key: string): any =>
      (pathConfig && pathConfig[key]) || datasetObj[key];

    const rawExt = configured('ext');
    let ext = DEFAULT_EXT;
    if (rawExt) {
      // Ensure extension starts with a dot
      const text = String(rawExt);
      ext = text.startsWith('.') ? text : '.' + text;
    }

    let folder = false; // Default: don't create subfolders
    if (pathConfig && typeof pathConfig.folder === 'boolean') {
      folder = pathConfig.folder;
    } else if (typeof datasetObj.folder === 'boolean') {
      folder = datasetObj.folder;
    }

    return {
      preset: configured('preset') || DEFAULT_PRESET,
      destination: configured('destination') || undefined,
      ext,
      clean: configured('clean'),
      folder,
    };
  }

  /**
   * Computes the output path for a file and makes sure its directory exists.
   *
   * @returns The output path, or `null` when the output directory is missing
   *   and could not be created (the failure is logged).
   */
  private buildOutputPath(
    file: string,
    settings: OutputSettings,
  ): string | null {
    const baseName = path.basename(file, path.extname(file));

    if (!settings.destination) {
      // Default behavior: same directory with new extension
      return path.join(path.dirname(file), baseName + settings.ext);
    }

    const cleanFileName = this.applyCleanRules(baseName, settings.clean);
    const outputName = cleanFileName + settings.ext;

    // If folder is enabled, create a subdirectory based on the cleaned filename
    const output = settings.folder
      ? path.join(
          settings.destination,
          this.deriveFolderName(cleanFileName),
          outputName,
        )
      : path.join(settings.destination, outputName);

    // Ensure destination directory exists
    const outputDir = path.dirname(output);
    if (!fs.existsSync(outputDir)) {
      try {
        fs.mkdirSync(outputDir, { recursive: true });
        this.logger.log(`Created output directory: ${outputDir}`);
      } catch (error) {
        this.logger.error(
          `Failed to create output directory ${outputDir}: ${errorMessage(error)}`,
        );
        return null;
      }
    }

    return output;
  }

  /**
   * Applies the configured clean rules to a file name. Each key of `clean`
   * is compiled as a global regular expression and every match is replaced
   * with the corresponding value. Invalid patterns are logged and skipped.
   */
  private applyCleanRules(fileName: string, clean: unknown): string {
    if (typeof clean !== 'object' || clean === null) {
      return fileName;
    }

    let cleaned = fileName;
    for (const [pattern, replacement] of Object.entries(clean)) {
      try {
        cleaned = cleaned.replace(
          new RegExp(pattern, 'g'),
          replacement as string,
        );
      } catch (error) {
        this.logger.warn(`Invalid regex pattern in clean config: ${pattern}`);
      }
    }
    return cleaned;
  }

  /**
   * Derives a sub-folder name from a cleaned file name.
   *
   * The text in front of the first date/episode marker is used; if it still
   * contains dots only its first segment is kept ("Site.Series.Date" gives
   * "Site"). Without a marker, the first dot-separated segment is used; names
   * without dots fall back to the upper-cased first letter, skipping a
   * leading "The ".
   */
  private deriveFolderName(cleanFileName: string): string {
    for (const pattern of FOLDER_NAME_PATTERNS) {
      const match = cleanFileName.match(pattern);
      if (match && match.index !== undefined && match.index > 0) {
        // Take everything before the pattern, minus one trailing dot.
        const candidate = cleanFileName
          .substring(0, match.index)
          .trim()
          .replace(/\.$/, '');
        // A marker-derived name is final; the "The ..." letter rule below
        // applies only to the fallback.
        return candidate.includes('.') ? candidate.split('.')[0] : candidate;
      }
    }

    let folderName = cleanFileName.charAt(0).toUpperCase(); // fallback
    if (cleanFileName.includes('.')) {
      // No marker matched but the name contains dots: use the site name.
      const parts = cleanFileName.split('.');
      if (parts.length > 1 && parts[0].length > 0) {
        folderName = parts[0];
      }
    } else if (cleanFileName.toLowerCase().startsWith('the ')) {
      // For titles starting with "The", use the next word
      const words = cleanFileName.split(' ');
      if (words.length > 1) {
        folderName = words[1].charAt(0).toUpperCase();
      }
    }
    return folderName;
  }

  /**
   * Reports whether `file` is `base` itself or lies underneath it. Unlike a
   * plain `startsWith`, the match must end on a path separator, so
   * `/data/tv` does not claim `/data/tvshows/a.mkv`. An empty `base` never
   * matches.
   */
  private isPathWithin(file: string, base: string): boolean {
    if (!base) {
      return false;
    }
    const normalizedFile = path.normalize(file);
    const normalizedBase = path.normalize(base);
    if (normalizedFile === normalizedBase) {
      return true;
    }
    const prefix = normalizedBase.endsWith(path.sep)
      ? normalizedBase
      : normalizedBase + path.sep;
    return normalizedFile.startsWith(prefix);
  }

  /**
   * Returns the name of the first dataset that has a path entry containing
   * `file`, or `null` when no dataset matches.
   */
  private getDatasetFromPath(file: string): string | null {
    const datasetConfig = this.datasetsService.getDatasetConfig();

    // Iterate through each dataset and its paths
    for (const datasetName of Object.keys(datasetConfig)) {
      const datasetObj = datasetConfig[datasetName];
      if (typeof datasetObj !== 'object' || datasetObj === null) {
        continue;
      }
      // Check each path in the dataset configuration
      for (const pathKey of Object.keys(datasetObj)) {
        if (pathKey !== 'enabled' && this.isPathWithin(file, pathKey)) {
          return datasetName;
        }
      }
    }

    return null;
  }

  /** Reports whether the file's extension is one of the known video extensions. */
  private isVideoFile(file: string): boolean {
    return VIDEO_EXTENSIONS.includes(path.extname(file).toLowerCase());
  }

  /**
   * Sniffs the first bytes of a file for a known container signature.
   *
   * @returns `true` for an `ftyp` box (MP4/MOV family), an EBML header
   *   (MKV/WebM) or a RIFF/AVI header; for anything else, `true` only when the
   *   file is larger than 1 MiB. Returns `false` for missing or empty files
   *   and when the file cannot be read (the error is logged).
   */
  private isValidVideoFile(file: string): boolean {
    try {
      // Check if file exists and is readable
      if (!fs.existsSync(file)) {
        return false;
      }

      const stats = fs.statSync(file);
      if (stats.size === 0) {
        return false;
      }

      // Read first few bytes to check for video file signatures
      const buffer = Buffer.alloc(HEADER_BYTES);
      const fd = fs.openSync(file, 'r');
      let bytesRead = 0;
      try {
        bytesRead = fs.readSync(fd, buffer, 0, HEADER_BYTES, 0);
      } finally {
        fs.closeSync(fd);
      }
      // Compare raw bytes: searching a hex string could match a signature
      // that straddles two half-bytes.
      const header = buffer.subarray(0, bytesRead);

      // MP4/MOV signature (ftyp box)
      if (header.includes('ftyp', 0, 'latin1')) {
        return true;
      }

      // MKV/WebM signature (EBML)
      if (header.length >= 4 && header.readUInt32BE(0) === 0x1a45dfa3) {
        return true;
      }

      // AVI signature (RIFF ... "AVI ")
      if (
        header.toString('latin1', 0, 4) === 'RIFF' &&
        header.toString('latin1', 8, 12) === 'AVI '
      ) {
        return true;
      }

      // For other formats, just check if file is large enough to be a video (> 1MB)
      // This is a basic heuristic since not all video formats have easily detectable headers
      return stats.size > MIN_UNRECOGNISED_VIDEO_BYTES;
    } catch (error) {
      this.logger.warn(
        `Error validating video file ${file}: ${errorMessage(error)}`,
      );
      return false;
    }
  }

  /**
   * Stops the watcher if one is running.
   *
   * @returns `{ stopped: true }`, or `{ stopped: false, message }` when no
   *   watcher is running. A rejection from `close()` propagates to the caller
   *   and leaves the service state untouched.
   */
  async stop() {
    if (this.watcher && this.isWatching) {
      await this.watcher.close();
      this.watcher = null; // drop the reference to the closed watcher
      this.isWatching = false;
      this.logger.log('Watcher stopped.');
      this.eventsGateway.emitWatcherUpdate({ type: 'stopped' });
      return { stopped: true };
    }
    this.logger.warn('Watcher is not running.');
    return { stopped: false, message: 'Watcher is not running.' };
  }

  /** Returns the running flag plus the watches and options of the last start. */
  status() {
    return {
      isWatching: this.isWatching,
      watches: this.lastWatches,
      options: this.lastOptions,
    };
  }
}