||
- import { Inject, Injectable, Logger, OnModuleDestroy } from '@nestjs/common';
- import chokidar, { FSWatcher } from 'chokidar';
- import fs from 'fs';
- import path from 'path';
- import { Worker } from 'worker_threads';
- import { DatasetsService } from './datasets.service';
- import { DbService } from './db.service';
- import { EventsGateway } from './events.gateway';
- import { TaskQueueService } from './task-queue.service';
/**
 * Shape this service assumes for a row returned by `DbService.findFile`.
 * It is only used as a cast target when checking whether a file is already tracked.
 */
type FileRecord = {
  /** Dataset the file belongs to (presumably the dataset name — confirm against DbService). */
  dataset: string;
  /** Source file path (assumed to be the watched input path — TODO confirm). */
  input: string;
  /** Output path computed for the file; this service rewrites it on every add. */
  output: string;
  /** Discovery timestamp; this service writes it via `new Date().toISOString()`. */
  date: string;
};
/** How long to wait for the validation worker before abandoning a file (5 minutes). */
const VALIDATION_TIMEOUT_MS = 5 * 60 * 1000;
/** Interval between watcher health-check log lines (5 minutes). */
const HEALTH_CHECK_INTERVAL_MS = 5 * 60 * 1000;
/** Preset used when neither the path nor the dataset configures one. */
const DEFAULT_PRESET = 'Fast 1080p30';
/** Output extension used when neither the path nor the dataset configures one. */
const DEFAULT_OUTPUT_EXT = '.mkv';
/**
 * Markers used to split a cleaned file name into "series/site name" + rest
 * when `folder` is enabled. Tried in order; the first one that matches past
 * index 0 wins.
 */
const FOLDER_SPLIT_PATTERNS: readonly RegExp[] = [
  /\d{2}\.\d{2}\.\d{2}/, // 24.12.17
  /[A-Za-z]\d{3,4}/, // E651, S123, etc.
  /\d{4}/, // 2024, 1234, etc.
  /\.\d+/, // .123, .2024, etc.
];

/**
 * Loosely-typed per-dataset configuration object as read from
 * `DatasetsService.getDatasetConfig()[name]`.
 * It holds an `enabled` flag, optional legacy dataset-level settings
 * (`preset`, `destination`, `ext`, `clean`, `folder`), and one entry per
 * configured path prefix.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type DatasetSettings = Record<string, any>;

/** Output settings resolved for a single input file. */
interface OutputSettings {
  preset: string;
  destination: string | undefined;
  ext: string;
  clean: unknown;
  folder: boolean;
}

/** Fields this service reads from messages posted by `file-validation-worker.js`. */
interface ValidationMessage {
  type?: string;
  file?: string;
  isValid?: boolean;
}

/** Extracts a printable message from a caught value of unknown type. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * True when `file` is `base` itself or lies beneath it.
 * A bare `startsWith` would also accept sibling directories that share a
 * prefix (`/media/tv` vs `/media/tv2/x.mkv`).
 */
function isUnderPath(file: string, base: string): boolean {
  if (base.length === 0) {
    return false;
  }
  if (file === base) {
    return true;
  }
  const prefix =
    base.endsWith('/') || base.endsWith(path.sep) ? base : base + path.sep;
  return file.startsWith(prefix);
}

/**
 * Returns the per-path configuration entry of `settings` whose key is a
 * directory containing `file`, or null.
 * The first matching key in insertion order wins; the `enabled` flag is never
 * treated as a path.
 */
function findPathConfig(
  settings: DatasetSettings,
  file: string,
): DatasetSettings | null {
  for (const key of Object.keys(settings)) {
    if (key !== 'enabled' && isUnderPath(file, key)) {
      const entry = settings[key];
      return entry && typeof entry === 'object' ? entry : null;
    }
  }
  return null;
}

/** Normalizes an extension so it starts with a dot (`mkv` -> `.mkv`). */
function withLeadingDot(ext: string): string {
  return ext.startsWith('.') ? ext : `.${ext}`;
}

/**
 * True when `file`'s extension is listed in `exts`.
 * Matching is case-insensitive and the leading dot is optional. A missing or
 * empty list accepts every file, for backward compatibility with configs that
 * predate `exts`.
 */
function matchesExtension(file: string, exts: unknown): boolean {
  if (!Array.isArray(exts) || exts.length === 0) {
    return true;
  }
  const fileExt = path.extname(file).toLowerCase();
  return exts.some(
    (ext) =>
      typeof ext === 'string' && withLeadingDot(ext.toLowerCase()) === fileExt,
  );
}

/**
 * Resolves preset/destination/ext/clean/folder for `file`.
 * Each field comes from the matching path entry first, then from the legacy
 * dataset-level field, then from the built-in default. A value set explicitly
 * on the path (even one equal to the default) is never overridden by the
 * dataset level.
 */
function resolveOutputSettings(
  datasetSettings: DatasetSettings,
  file: string,
): OutputSettings {
  const pathConfig: DatasetSettings = findPathConfig(datasetSettings, file) ?? {};
  const extSource = pathConfig.ext || datasetSettings.ext;
  let folder = false;
  if (typeof pathConfig.folder === 'boolean') {
    folder = pathConfig.folder;
  } else if (typeof datasetSettings.folder === 'boolean') {
    folder = datasetSettings.folder;
  }
  return {
    preset: pathConfig.preset || datasetSettings.preset || DEFAULT_PRESET,
    destination:
      pathConfig.destination || datasetSettings.destination || undefined,
    ext:
      typeof extSource === 'string' && extSource.length > 0
        ? withLeadingDot(extSource)
        : DEFAULT_OUTPUT_EXT,
    clean: pathConfig.clean || datasetSettings.clean || undefined,
    folder,
  };
}

/**
 * Picks a sub-folder name for a cleaned file name.
 * - If the name contains a date/episode/year marker, the folder is the text
 *   before the marker, reduced to its first dot-separated part ("Site").
 * - Otherwise a dotted name uses its first part.
 * - Otherwise the folder is a single upper-case letter bucket; a leading
 *   "The " is skipped when choosing that letter.
 */
function deriveFolderName(cleanFileName: string): string {
  const fallback = cleanFileName.charAt(0).toUpperCase();
  for (const pattern of FOLDER_SPLIT_PATTERNS) {
    const match = cleanFileName.match(pattern);
    if (match && match.index !== undefined && match.index > 0) {
      // Everything before the marker, minus surrounding whitespace and one trailing dot.
      const candidate = cleanFileName
        .substring(0, match.index)
        .trim()
        .replace(/\.$/, '');
      // "Site.Series.Date..." -> keep only "Site".
      return candidate.includes('.') ? candidate.split('.')[0] : candidate;
    }
  }
  if (cleanFileName.includes('.')) {
    const first = cleanFileName.split('.')[0];
    return first.length > 0 ? first : fallback;
  }
  if (cleanFileName.toLowerCase().startsWith('the ')) {
    // Letter bucket by the word after a leading "The".
    const words = cleanFileName.split(' ');
    if (words.length > 1) {
      return words[1].charAt(0).toUpperCase();
    }
  }
  return fallback;
}

/**
 * Watches dataset paths with chokidar and validates newly added files in a
 * worker thread.
 * For files whose output does not exist yet, it records them in the DB and
 * queues transcode tasks. The running/stopped state is persisted in the
 * `settings` table under `watcher_state`; a watcher that was running is
 * resumed when the service is constructed.
 */
@Injectable()
export class WatcherService implements OnModuleDestroy {
  private watcher: FSWatcher | null = null;
  private isWatching = false;
  private lastWatches: string[] = [];
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private lastOptions: any = {};
  private readonly logger = new Logger('WatcherService');
  private readonly validationWorker: Worker;
  /** Pending validation callbacks, keyed by file path. */
  private readonly validationCallbacks = new Map<
    string,
    (result: ValidationMessage) => void
  >();
  /** Timers that abandon a pending validation after VALIDATION_TIMEOUT_MS. */
  private readonly callbackTimeouts = new Map<string, NodeJS.Timeout>();
  private lastEventTime: Date = new Date();
  private activityCheckInterval: NodeJS.Timeout | null = null;
  private eventCount = 0;

  constructor(
    @Inject(DatasetsService) private readonly datasetsService: DatasetsService,
    @Inject(DbService) private readonly db: DbService,
    @Inject(EventsGateway) private readonly eventsGateway: EventsGateway,
    @Inject(TaskQueueService) private readonly taskQueue: TaskQueueService,
  ) {
    this.validationWorker = new Worker(
      path.join(__dirname, 'file-validation-worker.js'),
    );
    this.validationWorker.on('message', (message: ValidationMessage) =>
      this.handleWorkerMessage(message),
    );
    this.validationWorker.on('error', (error) => {
      this.logger.error(`Validation worker error: ${error}`);
    });
    // Load persisted state on startup (may resume the watcher).
    this.loadPersistedState();
  }

  /**
   * Dispatches a worker result to the callback registered for its file.
   * The pending entry and its timer are removed before the callback runs, so a
   * throwing callback cannot leave stale state behind or escape this listener.
   */
  private handleWorkerMessage(message: ValidationMessage | null | undefined) {
    if (
      !message ||
      message.type !== 'validation_result' ||
      typeof message.file !== 'string'
    ) {
      return;
    }
    const file = message.file;
    const callback = this.validationCallbacks.get(file);
    if (!callback) {
      return;
    }
    this.clearPendingValidation(file);
    try {
      callback(message);
    } catch (error) {
      this.logger.error(
        `Failed to process validated file ${file}: ${errorMessage(error)}`,
      );
    }
  }

  /** Drops any pending validation for `file`: its callback and its timeout timer. */
  private clearPendingValidation(file: string) {
    const timeout = this.callbackTimeouts.get(file);
    if (timeout) {
      clearTimeout(timeout);
    }
    this.callbackTimeouts.delete(file);
    this.validationCallbacks.delete(file);
  }

  /**
   * Restores `lastWatches`/`lastOptions` from the `settings` table.
   * If the persisted state says the watcher was running, it restarts it.
   */
  private loadPersistedState() {
    try {
      const row = this.db
        .getDb()
        .prepare('SELECT value FROM settings WHERE key = ?')
        .get('watcher_state') as { value?: string } | undefined;
      if (!row || !row.value) {
        return;
      }
      const state: unknown = JSON.parse(row.value);
      if (!state || typeof state !== 'object') {
        return;
      }
      const { isWatching, lastWatches, lastOptions } = state as {
        isWatching?: unknown;
        lastWatches?: unknown;
        lastOptions?: unknown;
      };
      this.lastWatches = Array.isArray(lastWatches)
        ? lastWatches.filter((w): w is string => typeof w === 'string')
        : [];
      this.lastOptions =
        lastOptions && typeof lastOptions === 'object' ? lastOptions : {};
      // Leave this.isWatching false here. start() returns early when the flag
      // is already set, so copying the persisted flag would make the resume
      // below a no-op.
      if (isWatching && this.lastWatches.length > 0) {
        this.logger.log('Resuming watcher from persisted state');
        this.start(this.lastWatches, this.lastOptions);
      }
    } catch (error) {
      this.logger.error(`Failed to load persisted watcher state: ${error}`);
    }
  }

  /**
   * Writes the running flag, watch list and options to `settings`.
   * JSON.stringify drops the non-serializable `ignored` filter function.
   */
  private savePersistedState() {
    try {
      const state = {
        isWatching: this.isWatching,
        lastWatches: this.lastWatches,
        lastOptions: this.lastOptions,
      };
      this.db
        .getDb()
        .prepare('INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)')
        .run('watcher_state', JSON.stringify(state));
    } catch (error) {
      this.logger.error(`Failed to save persisted watcher state: ${error}`);
    }
  }

  /**
   * Starts watching `watches`, or every enabled dataset path when omitted or
   * empty.
   * `options` are merged with conservative polling/stability settings.
   * @returns `{ started: true }`, or `{ started: false, message }` when already
   *   running or when there is nothing to watch.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  start(watches?: string[], options: any = {}) {
    if (this.isWatching) {
      this.logger.warn('Watcher already running.');
      return { started: false, message: 'Watcher already running.' };
    }
    // If no watches provided, use all enabled dataset paths
    const enabledWatches =
      watches && watches.length > 0
        ? watches
        : this.datasetsService.getEnabledDatasetPaths();
    if (!enabledWatches || enabledWatches.length === 0) {
      this.logger.warn('No dataset paths to watch.');
      return { started: false, message: 'No dataset paths to watch.' };
    }
    // Snapshot of the dataset config used by the chokidar filter below.
    const datasetConfig = this.datasetsService.getDatasetConfig();
    const shouldWatchFile = (filePath: string): boolean => {
      // Always allow directories to be traversed
      try {
        if (fs.statSync(filePath).isDirectory()) {
          return true;
        }
      } catch {
        // Unstat-able path: treat it as a file and apply the filters below.
      }
      const dataset = this.getDatasetFromPath(filePath);
      if (!dataset) {
        return false; // Not under any dataset path
      }
      const datasetSettings = datasetConfig[dataset];
      if (!datasetSettings || !datasetSettings.enabled) {
        return false;
      }
      const pathConfig = findPathConfig(datasetSettings, filePath);
      if (!pathConfig) {
        return false;
      }
      return matchesExtension(filePath, pathConfig.exts);
    };
    // Override options with robust settings for long-running stability
    const conservativeOptions = {
      ...options,
      // Polling is more reliable for network filesystems and prevents watcher from dying
      usePolling: options.usePolling !== undefined ? options.usePolling : true,
      interval: Math.max(options.interval || 10000, 30000), // Minimum 30 seconds
      binaryInterval: 60000, // Check binary files less frequently
      depth: options.depth !== undefined ? options.depth : 1,
      ignorePermissionErrors: true,
      // Wait for file writes to finish before emitting events
      awaitWriteFinish: {
        stabilityThreshold: 5000, // Wait 5 seconds after last change
        pollInterval: 1000, // Check every second
      },
      persistent: true,
      ignoreInitial: false,
      followSymlinks: false,
      atomic: true,
      ignored: (filePath: string) => !shouldWatchFile(filePath),
    };
    this.watcher = chokidar.watch(enabledWatches, conservativeOptions);
    this.isWatching = true;
    this.lastWatches = enabledWatches;
    this.lastOptions = conservativeOptions;
    this.lastEventTime = new Date();
    this.eventCount = 0;
    this.watcher
      .on('add', (file: string) => {
        this.updateActivity('add');
        // Keep a failure for one file from propagating into chokidar's internals.
        try {
          this.handleFileAdded(file);
        } catch (error) {
          this.logger.error(
            `Failed to handle added file ${file}: ${errorMessage(error)}`,
          );
        }
      })
      .on('change', (file: string) => {
        this.updateActivity('change');
        this.eventsGateway.emitFileUpdate({ type: 'change', file });
      })
      .on('unlink', (file: string) => {
        this.updateActivity('unlink');
        this.eventsGateway.emitFileUpdate({ type: 'unlink', file });
      })
      .on('error', (error: unknown) => {
        const message = errorMessage(error);
        this.logger.error(`Watcher error: ${message}`);
        this.logger.error(
          `Error stack: ${error instanceof Error ? error.stack : undefined}`,
        );
        this.eventsGateway.emitWatcherUpdate({
          type: 'error',
          error: message,
        });
        // Don't let errors kill the watcher - just report its state.
        this.handleWatcherError();
      })
      .on('ready', () => {
        this.logger.log('Watcher is ready and monitoring for changes');
        this.logger.log(`Watching ${enabledWatches.length} path(s)`);
        this.logger.log(`Polling enabled: ${conservativeOptions.usePolling}`);
        this.startActivityMonitor();
      })
      .on('raw', (event, rawPath) => {
        // Raw events are logged at debug level only.
        this.logger.debug(`Raw event: ${event} on ${rawPath}`);
      });
    this.eventsGateway.emitWatcherUpdate({
      type: 'started',
      watches: enabledWatches,
    });
    this.savePersistedState();
    return { started: true };
  }

  /**
   * Filters an added file against its dataset/path configuration.
   * Files that pass are sent to the validation worker; on a valid result,
   * processValidFile runs.
   */
  private handleFileAdded(file: string) {
    const dataset = this.getDatasetFromPath(file);
    if (!dataset) {
      this.logger.warn(`Could not determine dataset for file: ${file}`);
      return;
    }
    const datasetSettings = this.datasetsService.getDatasetConfig()[dataset];
    if (!datasetSettings || !datasetSettings.enabled) {
      return;
    }
    const pathConfig = findPathConfig(datasetSettings, file);
    if (!pathConfig) {
      this.logger.warn(
        `File path ${file} doesn't match any configured path in dataset ${dataset}`,
      );
      return;
    }
    if (!matchesExtension(file, pathConfig.exts)) {
      this.logger.debug(
        `Skipping file ${file} - extension not in dataset exts array`,
      );
      return;
    }
    // A repeated 'add' for the same path replaces the earlier request. Clear
    // its timer so it cannot later delete the new callback.
    this.clearPendingValidation(file);
    this.validationCallbacks.set(file, (result) => {
      if (!result.isValid) {
        this.logger.warn(`File appears to be corrupted or incomplete: ${file}`);
        return;
      }
      this.processValidFile(file, dataset);
    });
    // Abandon the request if the worker never answers, so the maps don't grow forever.
    const timeout = setTimeout(() => {
      if (this.validationCallbacks.has(file)) {
        this.logger.warn(`Validation timeout for file: ${file}`);
      }
      this.clearPendingValidation(file);
    }, VALIDATION_TIMEOUT_MS);
    this.callbackTimeouts.set(file, timeout);
    this.validationWorker.postMessage({ type: 'validate_file', file });
  }

  /**
   * Computes the output path for a validated file and records it in the DB.
   * If the output does not exist yet, it queues a task (or resets a task that
   * is already processing).
   */
  private processValidFile(file: string, dataset: string) {
    const datasetSettings = this.datasetsService.getDatasetConfig()[dataset];
    if (!datasetSettings || !datasetSettings.enabled) {
      return;
    }
    const settings = resolveOutputSettings(datasetSettings, file);
    const output = this.buildOutputPath(file, settings);
    if (output === null) {
      return; // Output directory could not be created (already logged).
    }
    this.recordDiscoveredFile(dataset, file, output);
    // Automatic task creation: only when output doesn't exist
    if (fs.existsSync(output)) {
      return;
    }
    this.queueTask(dataset, file, output, settings.preset);
  }

  /**
   * Builds the output path for `file`.
   * Without a destination, the file's own directory is used with the new
   * extension. With one, the cleaned name goes under `destination`, optionally
   * inside a derived sub-folder, and the directory is created.
   * @returns The output path, or null when the directory cannot be created.
   */
  private buildOutputPath(file: string, settings: OutputSettings): string | null {
    const { destination, ext, clean, folder } = settings;
    const baseName = path.basename(file, path.extname(file));
    if (!destination) {
      return path.join(path.dirname(file), baseName + ext);
    }
    const cleanFileName = this.applyCleanRules(baseName, clean);
    const output = folder
      ? path.join(destination, deriveFolderName(cleanFileName), cleanFileName + ext)
      : path.join(destination, cleanFileName + ext);
    const outputDir = path.dirname(output);
    if (!fs.existsSync(outputDir)) {
      try {
        fs.mkdirSync(outputDir, { recursive: true });
      } catch (error) {
        this.logger.error(
          `Failed to create output directory ${outputDir}: ${errorMessage(error)}`,
        );
        return null;
      }
    }
    return output;
  }

  /**
   * Applies `clean` rules to a name.
   * `clean` is treated as a `{ regexSource: replacement }` map; each rule is
   * applied globally in key order, and invalid regexes are logged and skipped.
   */
  private applyCleanRules(name: string, clean: unknown): string {
    if (!clean || typeof clean !== 'object') {
      return name;
    }
    let result = name;
    for (const [pattern, replacement] of Object.entries(clean)) {
      try {
        result = result.replace(new RegExp(pattern, 'g'), String(replacement));
      } catch {
        this.logger.warn(`Invalid regex pattern in clean config: ${pattern}`);
      }
    }
    return result;
  }

  /**
   * Creates the file record on first sight.
   * Otherwise it refreshes the stored output path, in case the config changed.
   */
  private recordDiscoveredFile(dataset: string, file: string, output: string) {
    const existingFileRecord = this.db.findFile(dataset, file) as
      | FileRecord
      | undefined;
    if (!existingFileRecord) {
      this.db.setFile(dataset, file, {
        date: new Date().toISOString(),
        output: output,
      });
      this.logger.log(`Discovered new file: ${file}`);
    } else {
      this.db.setFile(dataset, file, {
        output: output,
      });
    }
  }

  /**
   * Queues a transcode task for `file`.
   * If a task for this input already exists, no new one is created; a task
   * currently `processing` is reset to `pending` so it is retried.
   */
  private queueTask(dataset: string, file: string, output: string, preset: string) {
    const existingTask = this.taskQueue.getTaskByInput(file);
    if (existingTask) {
      if (existingTask.status === 'processing') {
        this.taskQueue.updateTaskStatus(existingTask.id, 'pending');
        this.eventsGateway.emitTaskUpdate({
          type: 'reset',
          taskId: existingTask.id,
          file,
        });
      }
      return;
    }
    try {
      const task = this.taskQueue.createTask({
        dataset,
        input: file,
        output,
        preset,
      });
      // Notify clients that the file was picked up and which task handles it.
      this.eventsGateway.emitFileUpdate({
        type: 'add',
        file,
        dataset,
        taskId: task.id,
      });
    } catch (error) {
      this.logger.error(
        `Failed to create task for file ${file}: ${errorMessage(error)}`,
      );
    }
  }

  /**
   * Returns the name of the first dataset with a configured path containing
   * `file`, or null.
   * Whether the dataset is enabled is not checked here.
   */
  private getDatasetFromPath(file: string): string | null {
    const datasetConfig = this.datasetsService.getDatasetConfig();
    for (const datasetName of Object.keys(datasetConfig)) {
      const datasetObj = datasetConfig[datasetName];
      if (typeof datasetObj !== 'object' || datasetObj === null) {
        continue;
      }
      for (const pathKey of Object.keys(datasetObj)) {
        if (pathKey !== 'enabled' && isUnderPath(file, pathKey)) {
          return datasetName;
        }
      }
    }
    return null;
  }

  /** Records the time of the latest event and logs a summary every 100 events. */
  private updateActivity(eventType: string) {
    this.lastEventTime = new Date();
    this.eventCount++;
    if (this.eventCount % 100 === 0) {
      this.logger.log(
        `Watcher activity: ${this.eventCount} events processed, last: ${eventType}`,
      );
    }
  }

  /** Clears the periodic health check, if one is scheduled. */
  private stopActivityMonitor() {
    if (this.activityCheckInterval) {
      clearInterval(this.activityCheckInterval);
      this.activityCheckInterval = null;
    }
  }

  /**
   * (Re)schedules the periodic health check.
   * It logs event counts and emits a `health_alert` when the watcher has lost
   * every watched directory.
   */
  private startActivityMonitor() {
    this.stopActivityMonitor();
    this.activityCheckInterval = setInterval(() => {
      const minutesSinceLastEvent = Math.floor(
        (Date.now() - this.lastEventTime.getTime()) / 60000,
      );
      this.logger.log(
        `Watcher health check - Events: ${this.eventCount}, Last activity: ${minutesSinceLastEvent}m ago, Status: ${this.isWatching ? 'active' : 'inactive'}`,
      );
      if (!this.watcher || !this.isWatching) {
        return;
      }
      // getWatched() is guarded: an exception in a timer callback would be uncaught.
      try {
        const pathCount = Object.keys(this.watcher.getWatched()).length;
        this.logger.log(`Currently watching ${pathCount} directories`);
        if (pathCount === 0 && this.lastWatches.length > 0) {
          this.logger.error(
            'CRITICAL: Watcher has no watched paths but should be watching!',
          );
          this.eventsGateway.emitWatcherUpdate({
            type: 'health_alert',
            healthy: false,
            reason: 'Watcher lost all watched paths',
          });
        }
      } catch (error) {
        this.logger.error(`Cannot check watcher status: ${errorMessage(error)}`);
      }
    }, HEALTH_CHECK_INTERVAL_MS);
  }

  /**
   * Logs whether the watcher still has watched directories after an error.
   * No recovery action is taken.
   */
  private handleWatcherError() {
    this.logger.error(
      'Watcher encountered an error, attempting to continue...',
    );
    if (!this.watcher) {
      return;
    }
    try {
      const pathCount = Object.keys(this.watcher.getWatched()).length;
      if (pathCount === 0) {
        this.logger.error('Watcher has stopped watching paths after error!');
      } else {
        this.logger.log(`Watcher still monitoring ${pathCount} directories`);
      }
    } catch (error) {
      this.logger.error(`Cannot check watcher status: ${errorMessage(error)}`);
    }
  }

  /**
   * Stops the watcher, if running, and persists the stopped state.
   * The watcher is marked stopped even if closing it fails.
   */
  async stop() {
    this.stopActivityMonitor();
    if (!this.isWatching) {
      return { stopped: false, message: 'Watcher is not running.' };
    }
    if (this.watcher) {
      try {
        await this.watcher.close();
      } catch (error) {
        this.logger.warn(`Error closing watcher: ${errorMessage(error)}`);
      }
    }
    this.watcher = null;
    this.isWatching = false;
    this.eventsGateway.emitWatcherUpdate({ type: 'stopped' });
    this.savePersistedState();
    return { stopped: true };
  }

  /** Current running flag, watch list and effective options. */
  status() {
    return {
      isWatching: this.isWatching,
      watches: this.lastWatches,
      options: this.lastOptions,
    };
  }

  /**
   * Releases timers, the watcher and the validation worker on shutdown.
   * The persisted state is not updated here, so a watcher that was running
   * is resumed by loadPersistedState on the next boot.
   */
  async onModuleDestroy() {
    this.stopActivityMonitor();
    for (const timeout of this.callbackTimeouts.values()) {
      clearTimeout(timeout);
    }
    this.callbackTimeouts.clear();
    // Closed in its own try so a failing close cannot skip worker termination.
    if (this.watcher && this.isWatching) {
      try {
        await this.watcher.close();
        this.logger.log('Watcher closed on module destroy');
      } catch (error) {
        this.logger.error(`Error during module destroy: ${error}`);
      }
    }
    try {
      if (this.validationWorker) {
        await this.validationWorker.terminate();
        this.logger.log('Validation worker terminated on module destroy');
      }
    } catch (error) {
      this.logger.error(`Error during module destroy: ${error}`);
    }
    this.validationCallbacks.clear();
    this.logger.log(
      `Watcher destroyed. Total events processed: ${this.eventCount}`,
    );
  }
}
|