Selaa lähdekoodia

Add state persistence for Watcher and TaskQueue services

- Add getDb() method to DbService for direct database access
- Add loadPersistedState/savePersistedState to WatcherService
- Persist isWatching, lastWatches, and lastOptions to settings table
- Auto-resume watching on startup if previously running
- Add loadPersistedState/savePersistedState to TaskQueueService
- Persist isProcessing state and auto-resume processing on startup
- Update tests with proper DbService mocking
- State survives application restarts and crashes
Timothy Pomeroy 4 viikkoa sitten
vanhempi
commit
dbb93713f7

+ 7 - 0
apps/service/src/db.service.ts

@@ -67,6 +67,13 @@ export class DbService {
     }
   }
 
+  /**
+   * Get the database instance for direct queries
+   */
+  getDb(): Database.Database {
+    return this.db;
+  }
+
   /**
    * Delete file records older than X days (autoExpireDays from settings)
    * @param days Days to keep (optional, overrides settings)

+ 48 - 0
apps/service/src/task-queue.service.ts

@@ -50,6 +50,48 @@ export class TaskQueueService implements OnModuleInit {
     private readonly config: ConfigService,
   ) {
     this.loadQueueSettings();
+
+    // Load persisted state on startup
+    this.loadPersistedState();
+  }
+
+  private async loadPersistedState() {
+    try {
+      const db = this.db.getDb();
+      const row = db
+        .prepare('SELECT value FROM settings WHERE key = ?')
+        .get('task_queue_state') as { value?: string } | undefined;
+
+      if (row && row.value) {
+        const state = JSON.parse(row.value);
+        const wasProcessing = state.isProcessing || false;
+
+        // If we were processing before restart, resume processing
+        if (wasProcessing) {
+          this.logger.log(
+            'Resuming task queue processing from persisted state',
+          );
+          this.startProcessing();
+        }
+      }
+    } catch (error) {
+      this.logger.error(`Failed to load persisted task queue state: ${error}`);
+    }
+  }
+
+  private savePersistedState() {
+    try {
+      const db = this.db.getDb();
+      const state = {
+        isProcessing: this.processingInterval !== null,
+      };
+
+      db.prepare(
+        'INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)',
+      ).run('task_queue_state', JSON.stringify(state));
+    } catch (error) {
+      this.logger.error(`Failed to save persisted task queue state: ${error}`);
+    }
   }
 
   private loadQueueSettings() {
@@ -99,12 +141,18 @@ export class TaskQueueService implements OnModuleInit {
     this.processingInterval = setInterval(() => {
       this.processPendingTasks();
     }, this.queueSettings.processingInterval);
+
+    // Save the processing state
+    this.savePersistedState();
   }
 
   stopProcessing() {
     if (this.processingInterval) {
       clearInterval(this.processingInterval);
       this.processingInterval = null;
+
+      // Save the stopped state
+      this.savePersistedState();
     }
   }
 

+ 16 - 2
apps/service/src/watcher.service.spec.ts

@@ -37,7 +37,14 @@ jest.mock('./datasets.service', () => ({
 
 // Mock DbService
 jest.mock('./db.service', () => ({
-  DbService: jest.fn().mockImplementation(() => ({})),
+  DbService: jest.fn().mockImplementation(() => ({
+    getDb: jest.fn(() => ({
+      prepare: jest.fn(() => ({
+        get: jest.fn(() => null),
+        run: jest.fn(),
+      })),
+    })),
+  })),
 }));
 
 // Mock EventsGateway
@@ -67,7 +74,14 @@ describe('WatcherService', () => {
       getDatasetExts: jest.fn(),
     };
 
-    const mockDbService = {};
+    const mockDbService = {
+      getDb: jest.fn(() => ({
+        prepare: jest.fn(() => ({
+          get: jest.fn(() => null),
+          run: jest.fn(),
+        })),
+      })),
+    };
     const mockEventsGateway = {
       emitFileUpdate: jest.fn(),
       emitWatcherUpdate: jest.fn(),

+ 52 - 0
apps/service/src/watcher.service.ts

@@ -46,6 +46,50 @@ export class WatcherService {
     this.validationWorker.on('error', (error) => {
       this.logger.error(`Validation worker error: ${error}`);
     });
+
+    // Load persisted state on startup
+    this.loadPersistedState();
+  }
+
+  private loadPersistedState() {
+    try {
+      const db = this.db.getDb();
+      const row = db
+        .prepare('SELECT value FROM settings WHERE key = ?')
+        .get('watcher_state') as { value?: string } | undefined;
+
+      if (row && row.value) {
+        const state = JSON.parse(row.value);
+        this.isWatching = state.isWatching || false;
+        this.lastWatches = state.lastWatches || [];
+        this.lastOptions = state.lastOptions || {};
+
+        // If we were watching before restart, resume watching
+        if (this.isWatching && this.lastWatches.length > 0) {
+          this.logger.log('Resuming watcher from persisted state');
+          this.start(this.lastWatches, this.lastOptions);
+        }
+      }
+    } catch (error) {
+      this.logger.error(`Failed to load persisted watcher state: ${error}`);
+    }
+  }
+
+  private savePersistedState() {
+    try {
+      const db = this.db.getDb();
+      const state = {
+        isWatching: this.isWatching,
+        lastWatches: this.lastWatches,
+        lastOptions: this.lastOptions,
+      };
+
+      db.prepare(
+        'INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)',
+      ).run('watcher_state', JSON.stringify(state));
+    } catch (error) {
+      this.logger.error(`Failed to save persisted watcher state: ${error}`);
+    }
   }
 
   start(watches?: string[], options: any = {}) {
@@ -152,6 +196,10 @@ export class WatcherService {
       type: 'started',
       watches: enabledWatches,
     });
+
+    // Save the running state
+    this.savePersistedState();
+
     return { started: true };
   }
 
@@ -473,6 +521,10 @@ export class WatcherService {
       await this.watcher.close();
       this.isWatching = false;
       this.eventsGateway.emitWatcherUpdate({ type: 'stopped' });
+
+      // Save the stopped state
+      this.savePersistedState();
+
       return { stopped: true };
     }
     return { stopped: false, message: 'Watcher is not running.' };