// watcher.service.ts
  1. import { Inject, Injectable, Logger } from '@nestjs/common';
  2. import chokidar, { FSWatcher } from 'chokidar';
  3. import fs from 'fs';
  4. import path from 'path';
  5. import { DatasetsService } from './datasets.service';
  6. import { DbService } from './db.service';
  7. import { EventsGateway } from './events.gateway';
  8. import { TaskQueueService } from './task-queue.service';
  /**
   * Shape this service assumes for a per-file record returned by
   * `DbService.findFile` (the result is cast to this type in
   * `handleFileAdded`).
   */
  interface FileRecord {
    /** Presumably the dataset name the file belongs to — confirm against DbService. */
    dataset: string;
    /** Presumably the source file path — confirm against DbService. */
    input: string;
    /** Output path computed by `handleFileAdded` and stored via `setFile`. */
    output: string;
    /** ISO-8601 timestamp (`new Date().toISOString()`) written by `handleFileAdded`. */
    date: string;
    /**
     * Processing status. `handleFileAdded` writes `'pending'` once a task has
     * been created; records stored before that point have no status.
     */
    status?: string;
  }
  15. @Injectable()
  16. export class WatcherService {
  /** Logger tagged with this service's name. */
  private logger = new Logger('WatcherService');

  /** Active chokidar watcher, or `null` when none has been created. */
  private watcher: FSWatcher | null = null;

  /** True between a successful `start()` and the next successful `stop()`. */
  private isWatching = false;

  /** Paths passed to the most recent `chokidar.watch` call (reported by `status()`). */
  private lastWatches: string[] = [];

  /** Options passed to the most recent `chokidar.watch` call (reported by `status()`). */
  private lastOptions: any = {};

  constructor(
    @Inject(DatasetsService)
    private readonly datasetsService: DatasetsService,
    @Inject(DbService)
    private readonly db: DbService,
    @Inject(EventsGateway)
    private readonly eventsGateway: EventsGateway,
    @Inject(TaskQueueService)
    private readonly taskQueue: TaskQueueService,
  ) {}
  /**
   * Starts the chokidar watcher and wires its events to the file handler and
   * the events gateway.
   *
   * @param watches Paths to watch. When omitted or empty, the paths returned by
   *   `DatasetsService.getEnabledDatasetPaths()` are used instead.
   * @param options Chokidar options from the caller. They are passed through,
   *   except that `interval` is raised to at least 30000 ms, `depth` defaults
   *   to 1 when not given, and `ignorePermissionErrors` is always `true`.
   * @returns `{ started: true }` when the watcher was created, or
   *   `{ started: false, message }` when one is already running.
   */
  start(watches?: string[], options: any = {}) {
    if (this.isWatching) {
      this.logger.warn('Watcher already running.');
      return { started: false, message: 'Watcher already running.' };
    }

    // The default parameter only covers `undefined`; tolerate an explicit null.
    const callerOptions = options || {};

    // If no watches provided, use all enabled dataset paths
    const enabledWatches =
      watches && watches.length > 0
        ? watches
        : this.datasetsService.getEnabledDatasetPaths();

    // Override options to be more conservative for file descriptor limits
    const conservativeOptions = {
      ...callerOptions,
      // Minimum 30 seconds; a missing or invalid interval falls back to the floor.
      interval: Math.max(callerOptions.interval || 0, 30000),
      depth: callerOptions.depth !== undefined ? callerOptions.depth : 1,
      ignorePermissionErrors: true,
    };

    this.watcher = chokidar.watch(enabledWatches, conservativeOptions);
    this.isWatching = true;
    this.lastWatches = enabledWatches;
    this.lastOptions = conservativeOptions;

    this.watcher
      .on('add', (file: string) => {
        this.logger.log(`File added: ${file}`);
        try {
          this.handleFileAdded(file);
        } catch (error) {
          // A failure for one file must not escape the event listener: it
          // would propagate out of the emitter and could take the process down.
          const reason = error instanceof Error ? error.message : String(error);
          this.logger.error(`Failed to handle added file ${file}: ${reason}`);
        }
      })
      .on('change', (file: string) => {
        this.logger.log(`File changed: ${file}`);
        this.eventsGateway.emitFileUpdate({ type: 'change', file });
      })
      .on('unlink', (file: string) => {
        this.logger.log(`File removed: ${file}`);
        this.eventsGateway.emitFileUpdate({ type: 'unlink', file });
      })
      .on('error', (error: Error) => {
        this.logger.error(`Watcher error: ${error}`);
        this.eventsGateway.emitWatcherUpdate({
          type: 'error',
          error: error.message,
        });
      });

    this.logger.log('Watcher started.');
    this.eventsGateway.emitWatcherUpdate({
      type: 'started',
      watches: enabledWatches,
    });
    return { started: true };
  }
  /**
   * Handles a file reported by the watcher's `add` event.
   *
   * Steps, in order:
   *  1. Resolve the dataset; skip files that are not videos or whose header
   *     check fails, and files whose dataset is not enabled.
   *  2. Resolve preset / destination / extension / clean rules / folder flag.
   *     Values from the most specific matching path entry win; dataset-level
   *     values (old config format) are used only where the path entry is
   *     silent; otherwise the defaults `Fast 1080p30`, `.mkv`, no subfolder.
   *  3. Compute the output path (creating the output directory when a
   *     destination is configured) and store it on the file record.
   *  4. Unless the output already exists or a task already exists for this
   *     input, create a task and mark the file record as `pending`. An
   *     existing task in `processing` state is reset to `pending`.
   *
   * @param file Path of the added file, as reported by the watcher.
   */
  private handleFileAdded(file: string) {
    // Determine dataset from file path
    const dataset = this.getDatasetFromPath(file);
    if (!dataset) {
      this.logger.warn(`Could not determine dataset for file: ${file}`);
      return;
    }

    // Check if this is a video file (basic extension check)
    if (!this.isVideoFile(file)) {
      this.logger.log(`Skipping non-video file: ${file}`);
      return;
    }

    // Validate that the file has proper video headers
    if (!this.isValidVideoFile(file)) {
      this.logger.warn(`File appears to be corrupted or incomplete: ${file}`);
      return;
    }

    // Get dataset configuration
    const datasetConfig = this.datasetsService.getDatasetConfig();
    const datasetObj = datasetConfig[dataset];
    if (!datasetObj || !datasetObj.enabled) {
      this.logger.log(
        `Dataset ${dataset} is not enabled, skipping file: ${file}`,
      );
      return;
    }

    // A configured path only matches on a path-segment boundary, so
    // "/data/tv" does not claim "/data/tvshows/x.mkv".
    const isUnder = (root: string): boolean => {
      if (root.length === 0 || !file.startsWith(root)) {
        return false;
      }
      if (file.length === root.length) {
        return true;
      }
      return /[\\/]$/.test(root) || /[\\/]/.test(file.charAt(root.length));
    };

    // Most specific (longest) matching path entry wins.
    const matchingKeys = Object.keys(datasetObj)
      .filter((key) => key !== 'enabled' && isUnder(key))
      .sort((a, b) => b.length - a.length);
    const pathConfig =
      matchingKeys.length > 0 ? datasetObj[matchingKeys[0]] : undefined;

    // Path-level values take precedence; dataset-level values (old format)
    // only fill in what the path entry leaves unset.
    const preset: string =
      (pathConfig && pathConfig.preset) || datasetObj.preset || 'Fast 1080p30';
    const destination: string | undefined =
      (pathConfig && pathConfig.destination) ||
      datasetObj.destination ||
      undefined;
    const configuredExt = (pathConfig && pathConfig.ext) || datasetObj.ext;
    // Ensure extension starts with a dot
    const ext: string = configuredExt
      ? configuredExt.startsWith('.')
        ? configuredExt
        : '.' + configuredExt
      : '.mkv';
    const clean: unknown = (pathConfig && pathConfig.clean) || datasetObj.clean;
    let folder = false;
    if (pathConfig && typeof pathConfig.folder === 'boolean') {
      folder = pathConfig.folder;
    } else if (typeof datasetObj.folder === 'boolean') {
      folder = datasetObj.folder;
    }

    // Create output path based on configuration
    const baseName = path.basename(file, path.extname(file));
    let output: string;
    if (destination) {
      const cleaned = this.applyCleanRules(baseName, clean);
      // Cleaning rules that erase the whole name would map every such file
      // to "<destination>/<ext>"; keep the original name in that case.
      const outputName = cleaned.trim().length > 0 ? cleaned : baseName;

      output = folder
        ? path.join(
            destination,
            this.deriveFolderName(outputName),
            outputName + ext,
          )
        : path.join(destination, outputName + ext);

      // Ensure destination directory exists
      const outputDir = path.dirname(output);
      if (!fs.existsSync(outputDir)) {
        try {
          fs.mkdirSync(outputDir, { recursive: true });
          this.logger.log(`Created output directory: ${outputDir}`);
        } catch (error) {
          const reason = error instanceof Error ? error.message : String(error);
          this.logger.error(
            `Failed to create output directory ${outputDir}: ${reason}`,
          );
          return;
        }
      }
    } else {
      // Default behavior: same directory with new extension
      output = path.join(path.dirname(file), baseName + ext);
    }

    // Always create/update file record for discovered files
    const existingFileRecord = this.db.findFile(dataset, file) as
      | FileRecord
      | undefined;
    if (!existingFileRecord) {
      // Create file record for newly discovered file
      this.db.setFile(dataset, file, {
        date: new Date().toISOString(),
        output: output,
      });
      this.logger.log(`Discovered new file: ${file}`);
    } else {
      // Update existing file record with current output path (in case config changed)
      this.db.setFile(dataset, file, {
        output: output,
      });
    }

    // Automatic task creation: only when output doesn't exist
    if (fs.existsSync(output)) {
      this.logger.log(
        `Output file already exists, skipping automatic task creation: ${output}`,
      );
      return;
    }

    // Check if task already exists for this input file
    const existingTask = this.taskQueue.getTaskByInput(file);
    if (existingTask) {
      // If task exists and is currently processing, reset to pending for retry
      if (existingTask.status === 'processing') {
        this.logger.log(
          `Resetting stuck processing task ${existingTask.id} to pending for file: ${file}`,
        );
        this.taskQueue.updateTaskStatus(existingTask.id, 'pending');
        this.eventsGateway.emitTaskUpdate({
          type: 'reset',
          taskId: existingTask.id,
          file,
        });
      } else {
        this.logger.log(
          `Task already exists for file: ${file} (status: ${existingTask.status})`,
        );
      }
      return;
    }

    // Create task for processing
    try {
      const task = this.taskQueue.createTask({
        dataset,
        input: file,
        output,
        preset,
      });
      // Update file record to indicate processing has started
      this.db.setFile(dataset, file, {
        status: 'pending',
        date: new Date().toISOString(),
      });
      this.logger.log(`Created task ${task.id} for file: ${file}`);
      // Emit file update event
      this.eventsGateway.emitFileUpdate({
        type: 'add',
        file,
        dataset,
        taskId: task.id,
      });
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.error(`Failed to create task for file ${file}: ${reason}`);
    }
  }

  /**
   * Applies the `clean` config (a map of regex source -> replacement) to a
   * file name. Every pattern is applied globally; an invalid pattern is
   * logged and skipped. Returns the name unchanged when `clean` is not an object.
   */
  private applyCleanRules(name: string, clean: unknown): string {
    if (!clean || typeof clean !== 'object') {
      return name;
    }
    let result = name;
    for (const [pattern, replacement] of Object.entries(clean)) {
      try {
        result = result.replace(new RegExp(pattern, 'g'), String(replacement));
      } catch (error) {
        this.logger.warn(`Invalid regex pattern in clean config: ${pattern}`);
      }
    }
    return result;
  }

  /**
   * Derives the subfolder name for a cleaned file name.
   *
   * The first date/episode-like pattern that matches after position 0 decides:
   * the text before it, cut at its first dot, is the folder (e.g.
   * "Site.Series.24.12.17" -> "Site"). Without such a match, a dotted name
   * uses its first dot-separated part, a name starting with "the " uses the
   * upper-cased initial of its second word, and anything else uses the
   * upper-cased first character of the name.
   */
  private deriveFolderName(cleanFileName: string): string {
    const markers = [
      /\d{2}\.\d{2}\.\d{2}/, // 24.12.17
      /[A-Za-z]\d{3,4}/, // E651, S123, etc.
      /\d{4}/, // 2024, 1234, etc.
      /\.\d+/, // .123, .2024, etc.
    ];

    for (const marker of markers) {
      const match = cleanFileName.match(marker);
      if (match && match.index !== undefined && match.index > 0) {
        // Text before the marker, without a trailing dot; when it still
        // contains dots only the first part (the site name) is kept.
        const prefix = cleanFileName
          .substring(0, match.index)
          .trim()
          .replace(/\.$/, '');
        // A matched marker is final: the "The ..." rule below applies only
        // when no marker matched.
        return prefix.split('.')[0];
      }
    }

    const initial = cleanFileName.charAt(0).toUpperCase();

    if (cleanFileName.includes('.')) {
      const head = cleanFileName.split('.')[0];
      return head.length > 0 ? head : initial;
    }

    if (cleanFileName.toLowerCase().startsWith('the ')) {
      // For titles starting with "The", use the next word
      const words = cleanFileName.split(' ');
      if (words.length > 1) {
        return words[1].charAt(0).toUpperCase();
      }
    }

    return initial;
  }
  /**
   * Finds the dataset whose configured path contains `file`.
   *
   * Every key of a dataset entry other than `enabled` is treated as a path.
   * A path matches only on a path-segment boundary ("/data/tv" does not match
   * "/data/tvshows/x.mkv"), and when several paths match, the longest (most
   * specific) one decides; ties keep the first one encountered.
   *
   * @param file Path of the file to classify.
   * @returns The dataset name (e.g. "tvshows"), or `null` when no configured
   *   path contains the file.
   */
  private getDatasetFromPath(file: string): string | null {
    const datasetConfig = this.datasetsService.getDatasetConfig();

    const isUnder = (root: string): boolean => {
      if (root.length === 0 || !file.startsWith(root)) {
        return false;
      }
      if (file.length === root.length) {
        return true;
      }
      // Either the root already ends with a separator, or the character
      // right after the prefix must be one.
      return /[\\/]$/.test(root) || /[\\/]/.test(file.charAt(root.length));
    };

    let bestDataset: string | null = null;
    let bestLength = -1;

    for (const datasetName of Object.keys(datasetConfig)) {
      const datasetObj = datasetConfig[datasetName];
      if (typeof datasetObj !== 'object' || datasetObj === null) {
        continue;
      }
      for (const pathKey of Object.keys(datasetObj)) {
        if (pathKey === 'enabled' || !isUnder(pathKey)) {
          continue;
        }
        if (pathKey.length > bestLength) {
          bestDataset = datasetName;
          bestLength = pathKey.length;
        }
      }
    }

    return bestDataset;
  }
  /**
   * Extension-only check: true when the file's extension (case-insensitive)
   * is one of the video container extensions this service handles.
   */
  private isVideoFile(file: string): boolean {
    switch (path.extname(file).toLowerCase()) {
      case '.mp4':
      case '.mkv':
      case '.avi':
      case '.mov':
      case '.wmv':
      case '.flv':
      case '.webm':
      case '.m4v':
        return true;
      default:
        return false;
    }
  }
  /**
   * Cheap sanity check that a file looks like a real video.
   *
   * Returns false for missing or empty files. Otherwise the first 12 bytes
   * are compared against known container signatures; when none matches, the
   * file is still accepted if it is larger than 1 MB, since not every format
   * has an easily detectable header.
   *
   * @param file Path of the file to inspect.
   * @returns True when the file passes the check; false otherwise, including
   *   when any filesystem call throws (the error is logged as a warning).
   */
  private isValidVideoFile(file: string): boolean {
    try {
      // Check if file exists and is readable
      if (!fs.existsSync(file)) {
        return false;
      }
      const stats = fs.statSync(file);
      if (stats.size === 0) {
        return false;
      }

      // Read first few bytes to check for video file signatures. The buffer
      // is zero-filled, so a short read cannot produce a false match.
      const header = Buffer.alloc(12);
      const fd = fs.openSync(file, 'r');
      try {
        fs.readSync(fd, header, 0, header.length, 0);
      } finally {
        fs.closeSync(fd);
      }

      if (this.hasKnownVideoSignature(header)) {
        return true;
      }

      // For other formats, just check if file is large enough to be a video (> 1MB)
      return stats.size > 1024 * 1024;
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.warn(`Error validating video file ${file}: ${reason}`);
      return false;
    }
  }

  /**
   * True when the 12-byte header starts like a known video container.
   * All comparisons are byte-aligned.
   */
  private hasKnownVideoSignature(header: Buffer): boolean {
    // MP4 / M4V / modern MOV: an "ftyp" box within the first 12 bytes.
    if (header.includes('ftyp', 0, 'latin1')) {
      return true;
    }

    // Classic QuickTime MOV without "ftyp": the first atom type sits at bytes 4-7.
    const quickTimeAtoms = ['moov', 'mdat', 'free', 'skip', 'wide', 'pnot'];
    if (quickTimeAtoms.includes(header.toString('latin1', 4, 8))) {
      return true;
    }

    // MKV/WebM signature (EBML)
    if (header.subarray(0, 4).equals(Buffer.from('1a45dfa3', 'hex'))) {
      return true;
    }

    // AVI signature (RIFF container with an "AVI " form type)
    if (
      header.toString('latin1', 0, 4) === 'RIFF' &&
      header.toString('latin1', 8, 12) === 'AVI '
    ) {
      return true;
    }

    // WMV/ASF: first 8 bytes of the ASF header object GUID.
    if (header.subarray(0, 8).equals(Buffer.from('3026b2758e66cf11', 'hex'))) {
      return true;
    }

    // FLV signature
    return header.toString('latin1', 0, 3) === 'FLV';
  }
  /**
   * Stops the running watcher, if any.
   *
   * @returns `{ stopped: true }` after the watcher has been closed, or
   *   `{ stopped: false, message }` when no watcher is running. If closing
   *   rejects, the rejection propagates and the service stays marked as watching.
   */
  async stop() {
    const activeWatcher = this.watcher;
    if (!activeWatcher || !this.isWatching) {
      this.logger.warn('Watcher is not running.');
      return { stopped: false, message: 'Watcher is not running.' };
    }

    await activeWatcher.close();

    // Drop the closed instance so it is not retained until the next start().
    // Only clear it if start() has not installed a new watcher in the meantime.
    if (this.watcher === activeWatcher) {
      this.watcher = null;
    }
    this.isWatching = false;
    this.logger.log('Watcher stopped.');
    this.eventsGateway.emitWatcherUpdate({ type: 'stopped' });
    return { stopped: true };
  }
  /**
   * Reports whether the watcher is running, together with the paths and
   * options used by the most recent `start()` call. The returned `watches`
   * and `options` are the service's own stored values, not copies.
   */
  status() {
    const { isWatching, lastWatches: watches, lastOptions: options } = this;
    return { isWatching, watches, options };
  }
  413. }