|
@@ -24,6 +24,10 @@ export class WatcherService implements OnModuleDestroy {
|
|
|
private logger = new Logger('WatcherService');
|
|
private logger = new Logger('WatcherService');
|
|
|
private validationWorker: Worker;
|
|
private validationWorker: Worker;
|
|
|
private validationCallbacks = new Map<string, (result: any) => void>();
|
|
private validationCallbacks = new Map<string, (result: any) => void>();
|
|
|
|
|
+ private callbackTimeouts = new Map<string, NodeJS.Timeout>();
|
|
|
|
|
+ private lastEventTime: Date = new Date();
|
|
|
|
|
+ private activityCheckInterval: NodeJS.Timeout | null = null;
|
|
|
|
|
+ private eventCount = 0;
|
|
|
|
|
|
|
|
constructor(
|
|
constructor(
|
|
|
@Inject(DatasetsService) private readonly datasetsService: DatasetsService,
|
|
@Inject(DatasetsService) private readonly datasetsService: DatasetsService,
|
|
@@ -159,12 +163,27 @@ export class WatcherService implements OnModuleDestroy {
|
|
|
});
|
|
});
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
- // Override options to be more conservative for file descriptor limits
|
|
|
|
|
|
|
+ // Override options with robust settings for long-running stability
|
|
|
const conservativeOptions = {
|
|
const conservativeOptions = {
|
|
|
...options,
|
|
...options,
|
|
|
|
|
+ // Polling is more reliable for network filesystems and prevents watcher from dying
|
|
|
|
|
+ usePolling: options.usePolling !== undefined ? options.usePolling : true,
|
|
|
interval: Math.max(options.interval || 10000, 30000), // Minimum 30 seconds
|
|
interval: Math.max(options.interval || 10000, 30000), // Minimum 30 seconds
|
|
|
|
|
+ binaryInterval: 60000, // Check binary files less frequently
|
|
|
depth: options.depth !== undefined ? options.depth : 1,
|
|
depth: options.depth !== undefined ? options.depth : 1,
|
|
|
ignorePermissionErrors: true,
|
|
ignorePermissionErrors: true,
|
|
|
|
|
+ // Wait for file writes to finish before emitting events
|
|
|
|
|
+ awaitWriteFinish: {
|
|
|
|
|
+ stabilityThreshold: 5000, // Wait 5 seconds after last change
|
|
|
|
|
+ pollInterval: 1000, // Check every second
|
|
|
|
|
+ },
|
|
|
|
|
+ // Prevent file descriptor leaks
|
|
|
|
|
+ persistent: true,
|
|
|
|
|
+ // Better error handling
|
|
|
|
|
+ ignoreInitial: false,
|
|
|
|
|
+ followSymlinks: false,
|
|
|
|
|
+ // Atomic write detection
|
|
|
|
|
+ atomic: true,
|
|
|
ignored: (filePath: string) => {
|
|
ignored: (filePath: string) => {
|
|
|
// Use the shouldWatchFile function to filter files
|
|
// Use the shouldWatchFile function to filter files
|
|
|
return !shouldWatchFile(filePath);
|
|
return !shouldWatchFile(filePath);
|
|
@@ -175,26 +194,43 @@ export class WatcherService implements OnModuleDestroy {
|
|
|
this.isWatching = true;
|
|
this.isWatching = true;
|
|
|
this.lastWatches = enabledWatches;
|
|
this.lastWatches = enabledWatches;
|
|
|
this.lastOptions = conservativeOptions;
|
|
this.lastOptions = conservativeOptions;
|
|
|
|
|
+ this.lastEventTime = new Date();
|
|
|
|
|
+ this.eventCount = 0;
|
|
|
|
|
+
|
|
|
this.watcher
|
|
this.watcher
|
|
|
.on('add', (file: string) => {
|
|
.on('add', (file: string) => {
|
|
|
|
|
+ this.updateActivity('add');
|
|
|
this.handleFileAdded(file);
|
|
this.handleFileAdded(file);
|
|
|
})
|
|
})
|
|
|
.on('change', (file: string) => {
|
|
.on('change', (file: string) => {
|
|
|
|
|
+ this.updateActivity('change');
|
|
|
this.eventsGateway.emitFileUpdate({ type: 'change', file });
|
|
this.eventsGateway.emitFileUpdate({ type: 'change', file });
|
|
|
})
|
|
})
|
|
|
.on('unlink', (file: string) => {
|
|
.on('unlink', (file: string) => {
|
|
|
|
|
+ this.updateActivity('unlink');
|
|
|
this.eventsGateway.emitFileUpdate({ type: 'unlink', file });
|
|
this.eventsGateway.emitFileUpdate({ type: 'unlink', file });
|
|
|
})
|
|
})
|
|
|
.on('error', (error: Error) => {
|
|
.on('error', (error: Error) => {
|
|
|
- this.logger.error(`Watcher error: ${error}`);
|
|
|
|
|
|
|
+ this.logger.error(`Watcher error: ${error.message}`);
|
|
|
|
|
+ this.logger.error(`Error stack: ${error.stack}`);
|
|
|
this.eventsGateway.emitWatcherUpdate({
|
|
this.eventsGateway.emitWatcherUpdate({
|
|
|
type: 'error',
|
|
type: 'error',
|
|
|
error: error.message,
|
|
error: error.message,
|
|
|
});
|
|
});
|
|
|
|
|
+ // Don't let errors kill the watcher - try to recover
|
|
|
|
|
+ this.handleWatcherError(error);
|
|
|
})
|
|
})
|
|
|
.on('ready', () => {
|
|
.on('ready', () => {
|
|
|
this.logger.log('Watcher is ready and monitoring for changes');
|
|
this.logger.log('Watcher is ready and monitoring for changes');
|
|
|
|
|
+ this.logger.log(`Watching ${enabledWatches.length} path(s)`);
|
|
|
|
|
+ this.logger.log(`Polling enabled: ${conservativeOptions.usePolling}`);
|
|
|
|
|
+ this.startActivityMonitor();
|
|
|
|
|
+ })
|
|
|
|
|
+ .on('raw', (event, path, details) => {
|
|
|
|
|
+ // Log raw events for debugging (can be disabled in production)
|
|
|
|
|
+ this.logger.debug(`Raw event: ${event} on ${path}`);
|
|
|
});
|
|
});
|
|
|
|
|
+
|
|
|
this.eventsGateway.emitWatcherUpdate({
|
|
this.eventsGateway.emitWatcherUpdate({
|
|
|
type: 'started',
|
|
type: 'started',
|
|
|
watches: enabledWatches,
|
|
watches: enabledWatches,
|
|
@@ -258,8 +294,15 @@ export class WatcherService implements OnModuleDestroy {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Offload validation to worker
|
|
|
|
|
|
|
+ // Offload validation to worker with timeout to prevent memory leaks
|
|
|
this.validationCallbacks.set(file, (result) => {
|
|
this.validationCallbacks.set(file, (result) => {
|
|
|
|
|
+ // Clear timeout when callback is called
|
|
|
|
|
+ const timeout = this.callbackTimeouts.get(file);
|
|
|
|
|
+ if (timeout) {
|
|
|
|
|
+ clearTimeout(timeout);
|
|
|
|
|
+ this.callbackTimeouts.delete(file);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
if (!result.isValid) {
|
|
if (!result.isValid) {
|
|
|
this.logger.warn(`File appears to be corrupted or incomplete: ${file}`);
|
|
this.logger.warn(`File appears to be corrupted or incomplete: ${file}`);
|
|
|
return;
|
|
return;
|
|
@@ -269,6 +312,16 @@ export class WatcherService implements OnModuleDestroy {
|
|
|
this.processValidFile(file, dataset);
|
|
this.processValidFile(file, dataset);
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
|
|
+ // Set timeout to cleanup callback if worker doesn't respond within 5 minutes
|
|
|
|
|
+ const timeout = setTimeout(() => {
|
|
|
|
|
+ if (this.validationCallbacks.has(file)) {
|
|
|
|
|
+ this.logger.warn(`Validation timeout for file: ${file}`);
|
|
|
|
|
+ this.validationCallbacks.delete(file);
|
|
|
|
|
+ this.callbackTimeouts.delete(file);
|
|
|
|
|
+ }
|
|
|
|
|
+ }, 300000); // 5 minutes
|
|
|
|
|
+
|
|
|
|
|
+ this.callbackTimeouts.set(file, timeout);
|
|
|
this.validationWorker.postMessage({ type: 'validate_file', file });
|
|
this.validationWorker.postMessage({ type: 'validate_file', file });
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -519,7 +572,81 @@ export class WatcherService implements OnModuleDestroy {
|
|
|
return null;
|
|
return null;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ private updateActivity(eventType: string) {
|
|
|
|
|
+ this.lastEventTime = new Date();
|
|
|
|
|
+ this.eventCount++;
|
|
|
|
|
+ if (this.eventCount % 100 === 0) {
|
|
|
|
|
+ this.logger.log(
|
|
|
|
|
+ `Watcher activity: ${this.eventCount} events processed, last: ${eventType}`,
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private startActivityMonitor() {
|
|
|
|
|
+ // Stop any existing monitor
|
|
|
|
|
+ if (this.activityCheckInterval) {
|
|
|
|
|
+ clearInterval(this.activityCheckInterval);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Check for watcher activity every 5 minutes
|
|
|
|
|
+ this.activityCheckInterval = setInterval(() => {
|
|
|
|
|
+ const now = new Date();
|
|
|
|
|
+ const timeSinceLastEvent = now.getTime() - this.lastEventTime.getTime();
|
|
|
|
|
+ const minutesSinceLastEvent = Math.floor(timeSinceLastEvent / 60000);
|
|
|
|
|
+
|
|
|
|
|
+ this.logger.log(
|
|
|
|
|
+ `Watcher health check - Events: ${this.eventCount}, Last activity: ${minutesSinceLastEvent}m ago, Status: ${this.isWatching ? 'active' : 'inactive'}`,
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ // Verify watcher is still watching
|
|
|
|
|
+ if (this.watcher && this.isWatching) {
|
|
|
|
|
+ const watchedPaths = this.watcher.getWatched();
|
|
|
|
|
+ const pathCount = Object.keys(watchedPaths).length;
|
|
|
|
|
+ this.logger.log(`Currently watching ${pathCount} directories`);
|
|
|
|
|
+
|
|
|
|
|
+ if (pathCount === 0 && this.lastWatches.length > 0) {
|
|
|
|
|
+ this.logger.error(
|
|
|
|
|
+ 'CRITICAL: Watcher has no watched paths but should be watching!',
|
|
|
|
|
+ );
|
|
|
|
|
+ this.eventsGateway.emitWatcherUpdate({
|
|
|
|
|
+ type: 'health_alert',
|
|
|
|
|
+ healthy: false,
|
|
|
|
|
+ reason: 'Watcher lost all watched paths',
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }, 300000); // Every 5 minutes
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private handleWatcherError(error: Error) {
|
|
|
|
|
+ // Log detailed error information
|
|
|
|
|
+ this.logger.error(
|
|
|
|
|
+ 'Watcher encountered an error, attempting to continue...',
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ // Check if watcher is still functional
|
|
|
|
|
+ if (this.watcher) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ const watchedPaths = this.watcher.getWatched();
|
|
|
|
|
+ const pathCount = Object.keys(watchedPaths).length;
|
|
|
|
|
+ if (pathCount === 0) {
|
|
|
|
|
+ this.logger.error('Watcher has stopped watching paths after error!');
|
|
|
|
|
+ } else {
|
|
|
|
|
+ this.logger.log(`Watcher still monitoring ${pathCount} directories`);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (e) {
|
|
|
|
|
+ this.logger.error(`Cannot check watcher status: ${e.message}`);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
async stop() {
|
|
async stop() {
|
|
|
|
|
+ // Stop activity monitor
|
|
|
|
|
+ if (this.activityCheckInterval) {
|
|
|
|
|
+ clearInterval(this.activityCheckInterval);
|
|
|
|
|
+ this.activityCheckInterval = null;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// If status shows we're watching, force stop regardless of watcher object state
|
|
// If status shows we're watching, force stop regardless of watcher object state
|
|
|
if (this.isWatching) {
|
|
if (this.isWatching) {
|
|
|
if (this.watcher) {
|
|
if (this.watcher) {
|
|
@@ -553,6 +680,18 @@ export class WatcherService implements OnModuleDestroy {
|
|
|
async onModuleDestroy() {
|
|
async onModuleDestroy() {
|
|
|
// Clean up resources on application shutdown
|
|
// Clean up resources on application shutdown
|
|
|
try {
|
|
try {
|
|
|
|
|
+ // Stop activity monitor
|
|
|
|
|
+ if (this.activityCheckInterval) {
|
|
|
|
|
+ clearInterval(this.activityCheckInterval);
|
|
|
|
|
+ this.activityCheckInterval = null;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Clear all callback timeouts
|
|
|
|
|
+ for (const timeout of this.callbackTimeouts.values()) {
|
|
|
|
|
+ clearTimeout(timeout);
|
|
|
|
|
+ }
|
|
|
|
|
+ this.callbackTimeouts.clear();
|
|
|
|
|
+
|
|
|
// Close the watcher if it's running
|
|
// Close the watcher if it's running
|
|
|
if (this.watcher && this.isWatching) {
|
|
if (this.watcher && this.isWatching) {
|
|
|
await this.watcher.close();
|
|
await this.watcher.close();
|
|
@@ -567,6 +706,10 @@ export class WatcherService implements OnModuleDestroy {
|
|
|
|
|
|
|
|
// Clear callbacks
|
|
// Clear callbacks
|
|
|
this.validationCallbacks.clear();
|
|
this.validationCallbacks.clear();
|
|
|
|
|
+
|
|
|
|
|
+ this.logger.log(
|
|
|
|
|
+ `Watcher destroyed. Total events processed: ${this.eventCount}`,
|
|
|
|
|
+ );
|
|
|
} catch (error) {
|
|
} catch (error) {
|
|
|
this.logger.error(`Error during module destroy: ${error}`);
|
|
this.logger.error(`Error during module destroy: ${error}`);
|
|
|
}
|
|
}
|