task-queue.service.ts 14 KB


  1. import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
  2. import * as fs from 'fs';
  3. import { ConfigService } from './config.service';
  4. import { DbService } from './db.service';
  5. import { EventsGateway } from './events.gateway';
  6. import { HandbrakeService } from './handbrake.service';
  /**
   * A queued unit of work as it is read back from the database layer.
   *
   * Field names use snake_case where they mirror the stored record.
   */
  interface Task {
    // Identity and classification
    id: number;
    type: string;
    priority: number;

    // Lifecycle state
    status: string;
    progress: number;
    error_message?: string;

    // Retry bookkeeping
    retry_count?: number;
    max_retries?: number;

    // What to process; a task cannot run unless input, output and preset are set
    dataset?: string;
    input?: string;
    output?: string;
    preset?: string;

    // Timestamps, stored as strings
    created_at: string;
    updated_at: string;
  }
  /**
   * Tunable parameters of the task queue. Exported as a type so other
   * modules can describe settings payloads.
   */
  export interface QueueSettings {
    /** Upper bound on how many pending tasks are fetched per cycle. */
    batchSize: number;
    /** Upper bound on how many tasks may be active at the same time. */
    concurrency: number;
    /** Whether failed tasks are put back into the queue. */
    retryEnabled: boolean;
    /** How many retries a failed task may receive. */
    maxRetries: number;
    /** Wait before a failed task is retried, in milliseconds. */
    retryDelay: number;
    /** Time between two processing cycles, in milliseconds. */
    processingInterval: number;
  }
  /** Outcome reported to callers of `TaskQueueService.stop`. */
  export interface StopResult {
    /** False only when a pending graceful stop was cancelled by a new start request. */
    stopped: boolean;
    /** Set for graceful stops: true once every active task has finished. */
    drained?: boolean;
    /** Number of tasks still running when the result was produced. */
    activeTasks: number;
  }

  /** Values used when the stored configuration has no usable value for a key. */
  const DEFAULT_QUEUE_SETTINGS: QueueSettings = {
    batchSize: 10,
    concurrency: 1,
    retryEnabled: true,
    maxRetries: 3,
    retryDelay: 30000, // 30 seconds
    processingInterval: 5000, // 5 seconds
  };

  /** Extracts a printable message from anything that can be thrown. */
  function describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Reads a numeric setting, accepting numbers and numeric strings.
   * Anything missing, non-finite or below `min` yields `fallback`.
   */
  function numericSetting(value: unknown, fallback: number, min: number): number {
    if (typeof value !== 'number' && typeof value !== 'string') {
      return fallback;
    }
    if (typeof value === 'string' && value.trim() === '') {
      return fallback;
    }
    const parsed = Number(value);
    return Number.isFinite(parsed) && parsed >= min ? parsed : fallback;
  }

  /**
   * Builds a complete, validated settings object from possibly partial or
   * malformed input, taking every unusable key from `base`.
   */
  function resolveQueueSettings(
    raw: Partial<Record<keyof QueueSettings, unknown>>,
    base: QueueSettings,
  ): QueueSettings {
    return {
      // These three must be at least 1: a zero batch, zero concurrency or a
      // zero-length interval would stall the queue or spin the timer.
      batchSize: numericSetting(raw.batchSize, base.batchSize, 1),
      concurrency: numericSetting(raw.concurrency, base.concurrency, 1),
      processingInterval: numericSetting(
        raw.processingInterval,
        base.processingInterval,
        1,
      ),
      // Zero is meaningful here: "never retry" and "retry without waiting".
      maxRetries: numericSetting(raw.maxRetries, base.maxRetries, 0),
      retryDelay: numericSetting(raw.retryDelay, base.retryDelay, 0),
      retryEnabled:
        typeof raw.retryEnabled === 'boolean'
          ? raw.retryEnabled
          : base.retryEnabled,
    };
  }

  /**
   * Polls the database for pending tasks on a timer and runs them through
   * HandBrake, honouring the configured batch size, concurrency limit and
   * retry policy. Processing only starts after an explicit `start()` call.
   */
  @Injectable()
  export class TaskQueueService implements OnModuleInit {
    private readonly logger = new Logger('TaskQueueService');
    /** True while a processing cycle is in flight. */
    private isProcessing = false;
    /** Handle of the polling timer; null while the queue is stopped. */
    private processingInterval: NodeJS.Timeout | null = null;
    /** Ids of tasks that have been started and have not finished yet. */
    private readonly activeTasks = new Set<number>();
    /** True while a graceful stop is waiting for active tasks to finish. */
    private isStopping = false;
    private gracefulStopResolver: ((result: StopResult) => void) | null = null;
    /** Shared by every caller that requests a graceful stop while one is pending. */
    private gracefulStopPromise: Promise<StopResult> | null = null;
    private queueSettings: QueueSettings;

    constructor(
      private readonly db: DbService,
      private readonly handbrake: HandbrakeService,
      private readonly eventsGateway: EventsGateway,
      private readonly config: ConfigService,
    ) {
      this.queueSettings = this.loadQueueSettings();
    }

    /** Reads the `queue` section of the configuration and fills in defaults. */
    private loadQueueSettings(): QueueSettings {
      const stored = this.config.getSettings('queue', {});
      return resolveQueueSettings(stored ?? {}, DEFAULT_QUEUE_SETTINGS);
    }

    /**
     * Applies a partial settings change, persists it and restarts the polling
     * timer when the interval changed while the queue is running.
     *
     * Keys that are absent, `undefined` or invalid keep their current value.
     *
     * @param settings Settings to change.
     */
    updateQueueSettings(settings: Partial<QueueSettings>): void {
      const previousInterval = this.queueSettings.processingInterval;
      const next = resolveQueueSettings(settings, this.queueSettings);
      this.queueSettings = next;

      // Persist only the keys the caller supplied, with their validated values.
      const applied: Partial<QueueSettings> = {};
      if (settings.batchSize !== undefined) applied.batchSize = next.batchSize;
      if (settings.concurrency !== undefined) {
        applied.concurrency = next.concurrency;
      }
      if (settings.retryEnabled !== undefined) {
        applied.retryEnabled = next.retryEnabled;
      }
      if (settings.maxRetries !== undefined) applied.maxRetries = next.maxRetries;
      if (settings.retryDelay !== undefined) applied.retryDelay = next.retryDelay;
      if (settings.processingInterval !== undefined) {
        applied.processingInterval = next.processingInterval;
      }

      // Save to config
      const currentSettings = this.config.getSettings('queue', {});
      this.config.setSettings({
        queue: { ...currentSettings, ...applied },
      });

      // The timer captured the old interval, so it has to be recreated.
      if (
        this.processingInterval &&
        next.processingInterval !== previousInterval
      ) {
        this.stopProcessing();
        this.startProcessing();
      }
    }

    /** Returns a copy of the active settings so callers cannot mutate them. */
    getQueueSettings(): QueueSettings {
      return { ...this.queueSettings };
    }

    onModuleInit(): void {
      // Don't start processing automatically - wait for explicit start command
    }

    /**
     * Starts the polling timer. Does nothing if it is already running.
     * A graceful stop that is still waiting is cancelled.
     */
    startProcessing(): void {
      // Settle, rather than discard, a pending graceful stop so whoever awaits
      // stop() is told the stop was cancelled instead of waiting forever.
      this.settleGracefulStop({
        stopped: false,
        drained: false,
        activeTasks: this.activeTasks.size,
      });

      if (this.processingInterval) {
        return;
      }

      this.processingInterval = setInterval(() => {
        // processPendingTasks handles its own errors and never rejects.
        void this.processPendingTasks();
      }, this.queueSettings.processingInterval);
    }

    /** Stops the polling timer. Tasks that are already running are not touched. */
    stopProcessing(): void {
      if (this.processingInterval) {
        clearInterval(this.processingInterval);
        this.processingInterval = null;
      }
    }

    /**
     * Clears the graceful-stop state and hands `result` to the pending
     * graceful-stop promise, if there is one.
     */
    private settleGracefulStop(result: StopResult): void {
      const resolver = this.gracefulStopResolver;
      this.isStopping = false;
      this.gracefulStopResolver = null;
      this.gracefulStopPromise = null;
      if (resolver) {
        resolver(result);
      }
    }

    /** Stops polling at once and reports how many tasks are still running. */
    private stopImmediate(): StopResult {
      this.stopProcessing();
      const result: StopResult = {
        stopped: true,
        activeTasks: this.activeTasks.size,
      };
      // An immediate stop supersedes a graceful stop that is still waiting.
      this.settleGracefulStop({ ...result, drained: false });
      return result;
    }

    /** Completes a pending graceful stop once nothing is running any more. */
    private resolveGracefulStopIfDrained(): void {
      if (this.isStopping && this.activeTasks.size === 0 && !this.isProcessing) {
        this.logger.log('All active tasks completed; queue stopped.');
        this.settleGracefulStop({ stopped: true, drained: true, activeTasks: 0 });
      }
    }

    /**
     * Stops polling and resolves once every active task has finished.
     * Repeated calls while a stop is pending share the same promise.
     */
    private async stopGracefully(): Promise<StopResult> {
      this.isStopping = true;
      this.stopProcessing();
      this.logger.log(
        `Graceful stop requested; waiting for ${this.activeTasks.size} active task(s)`,
      );

      if (this.activeTasks.size === 0 && !this.isProcessing) {
        this.isStopping = false;
        return { stopped: true, drained: true, activeTasks: 0 };
      }

      if (!this.gracefulStopPromise) {
        this.gracefulStopPromise = new Promise<StopResult>((resolve) => {
          this.gracefulStopResolver = resolve;
        });
      }
      return this.gracefulStopPromise;
    }

    /**
     * Stops the queue.
     *
     * @param graceful When true (default) the returned promise resolves after
     *   all active tasks have finished; when false it resolves immediately and
     *   running tasks are left to finish on their own.
     * @returns What happened; `stopped` is false if a later start request
     *   cancelled a graceful stop.
     */
    async stop(graceful = true): Promise<StopResult> {
      if (!graceful) {
        return this.stopImmediate();
      }
      return this.stopGracefully();
    }

    // Public API methods

    /** Starts queue processing. */
    start(): { started: boolean } {
      this.startProcessing();
      return { started: true };
    }

    /**
     * One processing cycle: requeue retryable failures, fetch a batch of
     * pending tasks and run as many as the concurrency limit allows. The cycle
     * stays open until every task it started has settled.
     */
    private async processPendingTasks(): Promise<void> {
      if (this.isProcessing) {
        return; // Already processing
      }
      if (this.isStopping) {
        this.resolveGracefulStopIfDrained();
        return;
      }

      try {
        this.isProcessing = true;

        // Check for tasks that need retry
        await this.processRetryTasks();

        // Get pending tasks up to batch size
        const pendingTasks = this.db.getPendingTasks(
          this.queueSettings.batchSize,
        ) as Task[];

        const processingPromises: Promise<void>[] = [];
        for (const task of pendingTasks) {
          if (this.activeTasks.size >= this.queueSettings.concurrency) {
            break; // Respect concurrency limit
          }

          if (!task.input || !task.output || !task.preset) {
            this.logger.error(`Task ${task.id} is missing required fields`);
            this.db.updateTask(task.id, {
              status: 'failed',
              error_message: 'Missing required fields: input, output, or preset',
            });
            continue;
          }

          // Mark task as processing
          this.db.updateTask(task.id, { status: 'processing' });
          this.activeTasks.add(task.id);

          // Emit task update
          this.eventsGateway.emitTaskUpdate({
            type: 'started',
            taskId: task.id,
            task: 'handbrake',
            input: task.input,
            output: task.output,
            preset: task.preset,
          });

          // Process task asynchronously
          processingPromises.push(this.processTask(task));
        }

        // Wait for all concurrent tasks to complete
        await Promise.allSettled(processingPromises);
      } catch (error) {
        this.logger.error(
          `Error in processPendingTasks: ${describeError(error)}`,
        );
      } finally {
        this.isProcessing = false;
        this.resolveGracefulStopIfDrained();
      }
    }

    /**
     * Puts failed tasks back to `pending` once the retry delay has elapsed.
     * This is the only place where `retry_count` is incremented, so it always
     * equals the number of retries a task has actually been given.
     */
    private async processRetryTasks(): Promise<void> {
      if (!this.queueSettings.retryEnabled) {
        return;
      }

      try {
        const { maxRetries, retryDelay } = this.queueSettings;
        const now = Date.now();

        for (const task of this.db.getAllTasks() as Task[]) {
          const retriesSoFar = task.retry_count ?? 0;
          if (task.status !== 'failed' || retriesSoFar >= maxRetries) {
            continue;
          }
          // A task without its required fields can never succeed; requeueing
          // it would only fail again on the next cycle.
          if (!task.input || !task.output || !task.preset) {
            continue;
          }

          const timeSinceFailure = now - new Date(task.updated_at).getTime();
          // Negated ">=" so that an unparsable timestamp (NaN) is not retried.
          if (!(timeSinceFailure >= retryDelay)) {
            continue;
          }

          const retryCount = retriesSoFar + 1;

          // Reset task for retry
          this.db.updateTask(task.id, {
            status: 'pending',
            progress: 0,
            retry_count: retryCount,
            error_message: undefined,
          });

          // Emit retry event
          this.eventsGateway.emitTaskUpdate({
            type: 'retry',
            taskId: task.id,
            task: 'handbrake',
            retryCount,
          });
        }
      } catch (error) {
        this.logger.error(`Error in processRetryTasks: ${describeError(error)}`);
      }
    }

    /**
     * Runs a single task and records the outcome in the database, the file
     * record and the event stream. Never rejects: failures are recorded as a
     * `failed` status. The task is always removed from the active set.
     */
    private async processTask(task: Task): Promise<void> {
      try {
        const { input, output, preset } = task;
        if (!input || !output || !preset) {
          throw new Error('Missing required fields: input, output, or preset');
        }

        // Skip work whose output already exists, unless the task was requeued
        // (automatically via retry_count or manually via a raised priority).
        if (fs.existsSync(output)) {
          const wasRequeued =
            (task.retry_count || 0) > 0 || (task.priority || 0) > 0;
          if (!wasRequeued) {
            this.db.updateTask(task.id, { status: 'skipped', progress: 100 });

            // Update file status if it exists
            if (task.dataset) {
              this.db.setFile(task.dataset, input, {
                status: 'success',
                date: new Date().toISOString(),
              });
            }

            // Emit skipped event
            this.eventsGateway.emitTaskUpdate({
              type: 'skipped',
              taskId: task.id,
              task: 'handbrake',
              input,
              output,
              preset,
              success: true,
            });
            return;
          }
        }

        // Process the file
        const success = await this.handbrake.processWithHandbrake(
          input,
          output,
          preset,
          task.id,
        );
        if (!success) {
          throw new Error('Handbrake processing failed');
        }

        // Update task status
        this.db.updateTask(task.id, { status: 'completed', progress: 100 });

        // Update file status if it exists
        if (task.dataset) {
          this.db.setFile(task.dataset, input, {
            status: 'success',
            date: new Date().toISOString(),
          });
        }

        // Emit completion event
        this.eventsGateway.emitTaskUpdate({
          type: 'completed',
          taskId: task.id,
          task: 'handbrake',
          input,
          output,
          preset,
          success: true,
        });
      } catch (error) {
        const message = describeError(error);
        const retryCount = task.retry_count || 0;

        if (
          this.queueSettings.retryEnabled &&
          retryCount < this.queueSettings.maxRetries
        ) {
          // Leave retry_count alone: it is incremented when the task is
          // requeued. Counting the failure here as well would use up two
          // retries per failure and the final failure below would never run.
          this.db.updateTask(task.id, {
            status: 'failed',
            error_message: message,
          });
          this.logger.warn(
            `Task ${task.id} failed, will retry (attempt ${retryCount + 1}): ${message}`,
          );
        } else {
          // Final failure
          this.db.updateTask(task.id, {
            status: 'failed',
            error_message: message,
          });

          // Update file status if it exists
          if (task.dataset && task.input) {
            this.db.setFile(task.dataset, task.input, {
              status: 'error',
              date: new Date().toISOString(),
            });
          }
          this.logger.error(`Task ${task.id} failed permanently: ${message}`);
        }

        // Emit failure event; retryCount here is the 1-based attempt that failed
        this.eventsGateway.emitTaskUpdate({
          type: 'failed',
          taskId: task.id,
          task: 'handbrake',
          input: task.input,
          output: task.output,
          preset: task.preset,
          success: false,
          error: message,
          retryCount: retryCount + 1,
          maxRetries: this.queueSettings.maxRetries,
        });
      } finally {
        // Remove from active tasks
        this.activeTasks.delete(task.id);
        this.resolveGracefulStopIfDrained();
      }
    }

    /**
     * Manual task creation (for requeueing from web interface).
     *
     * Creates the file record if the database does not have one yet, then
     * creates the task itself.
     *
     * @param taskData Description of the work; `status` defaults to `pending`
     *   and `priority` to 0.
     * @returns The task as created by the database layer.
     * @throws Error if the output file already exists.
     */
    createTask(taskData: {
      dataset: string;
      input: string;
      output: string;
      preset: string;
      priority?: number;
      status?: string;
    }) {
      // Check if output file already exists (case-insensitive matching)
      if (this.handbrake.outputFileExists(taskData.output)) {
        this.logger.warn(
          `Output file already exists: ${taskData.output}, skipping task creation`,
        );
        throw new Error(`Output file already exists: ${taskData.output}`);
      }

      // Check if file already exists in database
      const existingFile = this.db.findFile(taskData.dataset, taskData.input);
      if (!existingFile) {
        // Create file record
        this.db.setFile(taskData.dataset, taskData.input, {
          output: taskData.output,
          status: 'pending',
          date: new Date().toISOString(),
        });
      }

      // Create task
      const task = this.db.createTask({
        type: 'handbrake',
        status: taskData.status || 'pending',
        dataset: taskData.dataset,
        input: taskData.input,
        output: taskData.output,
        preset: taskData.preset,
        priority: taskData.priority ?? 0,
      });

      this.logger.log(`Created task ${task.id} for file: ${taskData.input}`);
      return task;
    }

    /**
     * Summarises the queue: runtime flags, per-status task counts and the
     * active settings.
     */
    getQueueStatus() {
      const allTasks = this.db.getAllTasks() as Task[];

      // Count all statuses in a single pass over the task list.
      let pending = 0;
      let processing = 0;
      let completed = 0;
      let failed = 0;
      for (const task of allTasks) {
        switch (task.status) {
          case 'pending':
            pending += 1;
            break;
          case 'processing':
            processing += 1;
            break;
          case 'completed':
            completed += 1;
            break;
          case 'failed':
            failed += 1;
            break;
          default:
            break; // other statuses (e.g. skipped) only count towards total
        }
      }

      return {
        isProcessing: !!this.processingInterval, // Whether task processing is enabled/running
        isProcessingCycle: this.isProcessing, // Whether currently in a processing cycle
        isStopping: this.isStopping, // Whether a graceful stop has been requested
        activeTasks: this.activeTasks.size,
        pending,
        processing,
        completed,
        failed,
        total: allTasks.length,
        settings: this.getQueueSettings(),
      };
    }

    /** Looks up the task recorded for the given input file, if any. */
    getTaskByInput(input: string): Task | undefined {
      return this.db.getTaskByInput(input) as Task | undefined;
    }

    /**
     * Sets a task's status.
     *
     * @param taskId Id of the task to update.
     * @param status New status value.
     * @param errorMessage Stored as `error_message` when non-empty.
     * @returns Whatever the database layer returns for the update.
     */
    updateTaskStatus(taskId: number, status: string, errorMessage?: string) {
      const updateData: { status: string; error_message?: string } = { status };
      if (errorMessage) {
        updateData.error_message = errorMessage;
      }
      return this.db.updateTask(taskId, updateData);
    }
  }