task-queue.service.ts 10 KB


  1. import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
  2. import { ConfigService } from './config.service';
  3. import { DbService } from './db.service';
  4. import { EventsGateway } from './events.gateway';
  5. import { HandbrakeService } from './handbrake.service';
  6. interface Task {
  7. id: number;
  8. type: string;
  9. status: string;
  10. progress: number;
  11. dataset?: string;
  12. input?: string;
  13. output?: string;
  14. preset?: string;
  15. priority: number;
  16. retry_count?: number;
  17. max_retries?: number;
  18. error_message?: string;
  19. created_at: string;
  20. updated_at: string;
  21. }
  22. interface QueueSettings {
  23. batchSize: number;
  24. concurrency: number;
  25. retryEnabled: boolean;
  26. maxRetries: number;
  27. retryDelay: number; // in milliseconds
  28. processingInterval: number; // in milliseconds
  29. }
  30. export type { QueueSettings };
  31. @Injectable()
  32. export class TaskQueueService implements OnModuleInit {
  33. private logger = new Logger('TaskQueueService');
  34. private isProcessing = false;
  35. private processingInterval: NodeJS.Timeout | null = null;
  36. private activeTasks = new Set<number>();
  37. private queueSettings: QueueSettings;
  38. constructor(
  39. private readonly db: DbService,
  40. private readonly handbrake: HandbrakeService,
  41. private readonly eventsGateway: EventsGateway,
  42. private readonly config: ConfigService,
  43. ) {
  44. this.loadQueueSettings();
  45. }
  46. private loadQueueSettings() {
  47. const settings = this.config.getSettings('queue', {});
  48. this.queueSettings = {
  49. batchSize: settings.batchSize || 10,
  50. concurrency: settings.concurrency || 1,
  51. retryEnabled: settings.retryEnabled !== false, // default true
  52. maxRetries: settings.maxRetries || 3,
  53. retryDelay: settings.retryDelay || 30000, // 30 seconds default
  54. processingInterval: settings.processingInterval || 5000, // 5 seconds default
  55. };
  56. this.logger.log('Loaded queue settings:', this.queueSettings);
  57. }
  58. updateQueueSettings(settings: Partial<QueueSettings>) {
  59. this.queueSettings = { ...this.queueSettings, ...settings };
  60. // Save to config
  61. const currentSettings = this.config.getSettings('queue', {});
  62. this.config.setSettings({
  63. queue: { ...currentSettings, ...settings },
  64. });
  65. this.logger.log('Updated queue settings:', this.queueSettings);
  66. // Restart processing with new interval if changed
  67. if (settings.processingInterval && this.processingInterval) {
  68. this.stopProcessing();
  69. this.startProcessing();
  70. }
  71. }
  72. getQueueSettings(): QueueSettings {
  73. return { ...this.queueSettings };
  74. }
  75. onModuleInit() {
  76. this.startProcessing();
  77. }
  78. startProcessing() {
  79. if (this.processingInterval) {
  80. this.logger.warn('Task queue processing already running');
  81. return;
  82. }
  83. this.logger.log('Starting automatic task processing');
  84. this.processingInterval = setInterval(() => {
  85. this.processPendingTasks();
  86. }, this.queueSettings.processingInterval);
  87. }
  88. stopProcessing() {
  89. if (this.processingInterval) {
  90. clearInterval(this.processingInterval);
  91. this.processingInterval = null;
  92. this.logger.log('Stopped automatic task processing');
  93. }
  94. }
  95. private async processPendingTasks() {
  96. if (this.isProcessing) {
  97. return; // Already processing
  98. }
  99. try {
  100. this.isProcessing = true;
  101. // Check for tasks that need retry
  102. await this.processRetryTasks();
  103. // Get pending tasks up to batch size
  104. const pendingTasks = this.db.getPendingTasks(
  105. this.queueSettings.batchSize,
  106. ) as Task[];
  107. if (pendingTasks.length === 0) {
  108. return; // No tasks to process
  109. }
  110. // Process tasks up to concurrency limit
  111. const processingPromises: Promise<void>[] = [];
  112. const tasksToProcess = pendingTasks.slice(
  113. 0,
  114. this.queueSettings.concurrency,
  115. );
  116. for (const task of tasksToProcess) {
  117. if (this.activeTasks.size >= this.queueSettings.concurrency) {
  118. break; // Respect concurrency limit
  119. }
  120. if (!task.input || !task.output || !task.preset) {
  121. this.logger.error(`Task ${task.id} is missing required fields`);
  122. this.db.updateTask(task.id, {
  123. status: 'failed',
  124. error_message: 'Missing required fields: input, output, or preset',
  125. });
  126. continue;
  127. }
  128. // Mark task as processing
  129. this.db.updateTask(task.id, { status: 'processing' });
  130. this.activeTasks.add(task.id);
  131. // Emit task update
  132. this.eventsGateway.emitTaskUpdate({
  133. type: 'started',
  134. taskId: task.id,
  135. task: 'handbrake',
  136. input: task.input,
  137. output: task.output,
  138. preset: task.preset,
  139. });
  140. // Process task asynchronously
  141. const processPromise = this.processTask(task);
  142. processingPromises.push(processPromise);
  143. }
  144. // Wait for all concurrent tasks to complete
  145. await Promise.allSettled(processingPromises);
  146. } catch (error) {
  147. this.logger.error(`Error in processPendingTasks: ${error.message}`);
  148. } finally {
  149. this.isProcessing = false;
  150. }
  151. }
  152. private async processRetryTasks() {
  153. if (!this.queueSettings.retryEnabled) {
  154. return;
  155. }
  156. try {
  157. // Get failed tasks that haven't exceeded max retries
  158. const failedTasks = (this.db.getAllTasks() as Task[]).filter(
  159. (task) =>
  160. task.status === 'failed' &&
  161. (task.retry_count || 0) < this.queueSettings.maxRetries,
  162. );
  163. for (const task of failedTasks) {
  164. const retryCount = (task.retry_count || 0) + 1;
  165. const lastUpdate = new Date(task.updated_at);
  166. const timeSinceFailure = Date.now() - lastUpdate.getTime();
  167. // Check if enough time has passed for retry
  168. if (timeSinceFailure >= this.queueSettings.retryDelay) {
  169. this.logger.log(`Retrying task ${task.id} (attempt ${retryCount})`);
  170. // Reset task for retry
  171. this.db.updateTask(task.id, {
  172. status: 'pending',
  173. progress: 0,
  174. retry_count: retryCount,
  175. error_message: undefined,
  176. });
  177. // Emit retry event
  178. this.eventsGateway.emitTaskUpdate({
  179. type: 'retry',
  180. taskId: task.id,
  181. task: 'handbrake',
  182. retryCount,
  183. });
  184. }
  185. }
  186. } catch (error) {
  187. this.logger.error(`Error in processRetryTasks: ${error.message}`);
  188. }
  189. }
  190. private async processTask(task: Task): Promise<void> {
  191. try {
  192. // Process the file
  193. const success = await this.handbrake.processWithHandbrake(
  194. task.input!,
  195. task.output!,
  196. task.preset!,
  197. task.id,
  198. );
  199. if (success) {
  200. // Update task status
  201. this.db.updateTask(task.id, { status: 'completed', progress: 100 });
  202. // Update file status if it exists
  203. if (task.dataset) {
  204. this.db.setFile(task.dataset, task.input!, {
  205. status: 'success',
  206. date: new Date().toISOString(),
  207. });
  208. }
  209. // Emit completion event
  210. this.eventsGateway.emitTaskUpdate({
  211. type: 'completed',
  212. taskId: task.id,
  213. task: 'handbrake',
  214. input: task.input,
  215. output: task.output,
  216. preset: task.preset,
  217. success: true,
  218. });
  219. this.logger.log(`Task ${task.id} completed successfully`);
  220. } else {
  221. throw new Error('Handbrake processing failed');
  222. }
  223. } catch (error) {
  224. const retryCount = task.retry_count || 0;
  225. if (
  226. this.queueSettings.retryEnabled &&
  227. retryCount < this.queueSettings.maxRetries
  228. ) {
  229. // Mark for retry
  230. this.db.updateTask(task.id, {
  231. status: 'failed',
  232. error_message: error.message,
  233. retry_count: retryCount + 1,
  234. });
  235. this.logger.warn(
  236. `Task ${task.id} failed, will retry (attempt ${retryCount + 1}): ${error.message}`,
  237. );
  238. } else {
  239. // Final failure
  240. this.db.updateTask(task.id, {
  241. status: 'failed',
  242. error_message: error.message,
  243. });
  244. // Update file status if it exists
  245. if (task.dataset) {
  246. this.db.setFile(task.dataset, task.input!, {
  247. status: 'error',
  248. date: new Date().toISOString(),
  249. });
  250. }
  251. this.logger.error(
  252. `Task ${task.id} failed permanently: ${error.message}`,
  253. );
  254. }
  255. // Emit failure event
  256. this.eventsGateway.emitTaskUpdate({
  257. type: 'failed',
  258. taskId: task.id,
  259. task: 'handbrake',
  260. input: task.input,
  261. output: task.output,
  262. preset: task.preset,
  263. success: false,
  264. error: error.message,
  265. retryCount: retryCount + 1,
  266. maxRetries: this.queueSettings.maxRetries,
  267. });
  268. } finally {
  269. // Remove from active tasks
  270. this.activeTasks.delete(task.id);
  271. }
  272. }
  273. // Manual task creation (for requeueing from web interface)
  274. createTask(taskData: {
  275. dataset: string;
  276. input: string;
  277. output: string;
  278. preset: string;
  279. priority?: number;
  280. }) {
  281. // Check if file already exists in database
  282. const existingFile = this.db.findFile(taskData.dataset, taskData.input);
  283. if (!existingFile) {
  284. // Create file record
  285. this.db.setFile(taskData.dataset, taskData.input, {
  286. output: taskData.output,
  287. status: 'pending',
  288. date: new Date().toISOString(),
  289. });
  290. }
  291. // Create task
  292. const task = this.db.createTask({
  293. type: 'handbrake',
  294. status: 'pending',
  295. dataset: taskData.dataset,
  296. input: taskData.input,
  297. output: taskData.output,
  298. preset: taskData.preset,
  299. priority: taskData.priority || 0,
  300. });
  301. this.logger.log(`Created task ${task.id} for file: ${taskData.input}`);
  302. return task;
  303. }
  304. // Get queue status
  305. getQueueStatus() {
  306. const allTasks = this.db.getAllTasks() as Task[];
  307. const pending = allTasks.filter((t) => t.status === 'pending').length;
  308. const processing = allTasks.filter((t) => t.status === 'processing').length;
  309. const completed = allTasks.filter((t) => t.status === 'completed').length;
  310. const failed = allTasks.filter((t) => t.status === 'failed').length;
  311. return {
  312. isProcessing: this.isProcessing,
  313. activeTasks: this.activeTasks.size,
  314. pending,
  315. processing,
  316. completed,
  317. failed,
  318. total: allTasks.length,
  319. settings: this.getQueueSettings(),
  320. };
  321. }
  322. }