import { Injectable } from '@nestjs/common';
import Database from 'better-sqlite3';
import fs from 'fs';
import path from 'path';
import { MigrationRunner } from './migration-runner';

@Injectable()
export class DbService {
  private db: Database.Database;

  constructor() {
    // Use a unified database for all settings/configuration.
    // Find the project root by walking up from the current directory until
    // we find the root package.json (identified by its package name).
    let projectRoot = process.cwd();
    while (projectRoot !== path.dirname(projectRoot)) {
      if (fs.existsSync(path.join(projectRoot, 'package.json'))) {
        try {
          const pkg = JSON.parse(
            fs.readFileSync(path.join(projectRoot, 'package.json'), 'utf-8'),
          );
          if (pkg.name === 'watch-finished-turbo') {
            break;
          }
        } catch (e) {
          // Not a parseable package.json; keep walking up.
        }
      }
      projectRoot = path.dirname(projectRoot);
    }
    const rootDataPath = path.resolve(projectRoot, 'data/database.db');

    // Ensure the data directory exists before opening the database
    const dir = path.dirname(rootDataPath);
    if (!fs.existsSync(dir)) {
      fs.mkdirSync(dir, { recursive: true });
    }

    try {
      this.db = new Database(rootDataPath);

      // Run migrations
      const migrationsDir = path.resolve(projectRoot, 'data/migrations');
      const migrationRunner = new MigrationRunner(this.db, migrationsDir);
      migrationRunner.applyPendingMigrations();
    } catch (error) {
      console.error('Failed to open database:', error);
      console.error('Database path:', rootDataPath);
      console.error('Directory exists:', fs.existsSync(dir));
      console.error(
        'Directory permissions:',
        fs.statSync(dir).mode.toString(8),
      );
      throw error;
    }
  }

  /**
   * Get the database instance for direct queries
   */
  getDb(): Database.Database {
    return this.db;
  }

  // List all files
  listAllFiles() {
    return this.db.prepare('SELECT * FROM files').all();
  }

  // List all files for a dataset
  listFilesForDataset(dataset: string) {
    return this.db
      .prepare('SELECT * FROM files WHERE dataset = ?')
      .all(dataset);
  }

  /**
   * Delete file records older than X days (autoExpireDays from settings)
   * @param days Days to keep (optional, overrides settings)
   */
  deleteExpiredFiles(days?: number): number {
    // Fall back to 180 days if no setting is available. Use ?? rather than ||
    // so an explicit 0 is respected instead of silently replaced.
    const keepDays = days ?? 180;
    const cutoff = new Date(
      Date.now() - keepDays * 24 * 60 * 60 * 1000,
    ).toISOString();
    const stmt = this.db.prepare('DELETE FROM files WHERE date < ?');
    const info = stmt.run(cutoff);
    return info.changes;
  }
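  // Usage sketch for the retention helper above (hypothetical caller; the
  // `dbService` instance and the 90-day policy are illustrative, not part of
  // this service):
  //
  //   const removed = dbService.deleteExpiredFiles(90);
  //   console.log(`Pruned ${removed} file records older than 90 days`);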
  /**
   * Migrate legacy JSON files to the SQLite database.
   * @param opts Options for migration.
   * @param dbOverride Optional override for db instance.
   */
  migrateJsonToSqlite(
    opts: { datasets?: string[]; dataDir?: string } = {},
    dbOverride?: Database.Database,
  ) {
    const datasets = opts.datasets || [
      'movies',
      'tvshows',
      'kids',
      'pr0n',
      'sports',
    ];
    const dataDir = opts.dataDir || path.join(process.cwd(), 'legacy/data');
    const dbInstance = dbOverride || this.db;
    dbInstance.exec(`
      CREATE TABLE IF NOT EXISTS files (
        dataset TEXT,
        input TEXT,
        output TEXT,
        date TEXT,
        status TEXT DEFAULT 'pending',
        PRIMARY KEY (dataset, input)
      );
    `);
    // OR IGNORE keeps the migration idempotent: re-running it skips rows that
    // already exist under the (dataset, input) primary key instead of
    // throwing a constraint error.
    const insert = dbInstance.prepare(
      'INSERT OR IGNORE INTO files (dataset, input, output, date) VALUES (?, ?, ?, ?)',
    );
    for (const dataset of datasets) {
      const filePath = path.join(dataDir, `${dataset}.json`);
      if (!fs.existsSync(filePath)) continue;
      const json = JSON.parse(fs.readFileSync(filePath, 'utf8'));
      if (!json.files || !Array.isArray(json.files)) continue;
      for (const rec of json.files) {
        insert.run(
          dataset,
          rec.input || null,
          rec.output || null,
          rec.date || null,
        );
      }
    }
  }

  // Duplicate file review helpers
  clearDuplicateGroups() {
    this.db.prepare('DELETE FROM duplicate_files').run();
  }

  getDuplicateGroup(id: number) {
    const row = this.db
      .prepare('SELECT * FROM duplicate_files WHERE id = ?')
      .get(id) as
      | {
          id: number;
          dataset: string;
          destination: string;
          hash: string;
          size: number;
          files: string;
          status: string;
          note?: string;
          created_at: string;
          reviewed_at?: string;
        }
      | undefined;
    if (!row) return undefined;
    return { ...row, files: this.safeParseFiles(row.files) };
  }

  getDuplicateGroupByKey(
    dataset: string,
    destination: string,
    hash: string,
    size: number,
  ) {
    const row = this.db
      .prepare(
        'SELECT * FROM duplicate_files WHERE dataset = ? AND destination = ? AND hash = ? AND size = ?',
      )
      .get(dataset, destination, hash, size) as
      | {
          id: number;
          dataset: string;
          destination: string;
          hash: string;
          size: number;
          files: string;
          status: string;
          note?: string;
          created_at: string;
          reviewed_at?: string;
        }
      | undefined;
    if (!row) return undefined;
    return { ...row, files: this.safeParseFiles(row.files) };
  }

  saveDuplicateGroup(entry: {
    dataset: string;
    destination: string;
    hash: string;
    size: number;
    files: string[];
  }) {
    const existing = this.getDuplicateGroupByKey(
      entry.dataset,
      entry.destination,
      entry.hash,
      entry.size,
    );
    // Do not re-flag entries that were manually reviewed/ignored
    if (existing && existing.status === 'reviewed') {
      return existing;
    }
    const filesJson = JSON.stringify(entry.files);
    if (existing) {
      // Re-open the group as pending; the existing note is left untouched.
      this.db
        .prepare(
          `UPDATE duplicate_files
           SET files = ?, size = ?, status = 'pending', reviewed_at = NULL
           WHERE id = ?`,
        )
        .run(filesJson, entry.size, existing.id);
      return {
        ...existing,
        files: entry.files,
        size: entry.size,
        status: 'pending',
      };
    }
    const result = this.db
      .prepare(
        `INSERT INTO duplicate_files (dataset, destination, hash, size, files)
         VALUES (?, ?, ?, ?, ?)`,
      )
      .run(entry.dataset, entry.destination, entry.hash, entry.size, filesJson);
    return {
      ...entry,
      id: result.lastInsertRowid as number,
      status: 'pending',
      note: null,
    };
  }

  listDuplicateGroups(status?: string, dataset?: string) {
    let query = 'SELECT * FROM duplicate_files';
    const params: any[] = [];
    if (status) {
      query += ' WHERE status = ?';
      params.push(status);
    }
    if (dataset) {
      query += status ? ' AND dataset = ?' : ' WHERE dataset = ?';
      params.push(dataset);
    }
    query += ' ORDER BY created_at DESC';
    const rows = this.db.prepare(query).all(...params) as Array<{
      id: number;
      dataset: string;
      destination: string;
      hash: string;
      size: number;
      files: string;
      status: string;
      note?: string;
      created_at: string;
      reviewed_at?: string;
    }>;
    return rows.map((row) => ({
      ...row,
      files: this.safeParseFiles(row.files),
    }));
  }
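  // Review-flow sketch for the duplicate-group helpers above (hypothetical
  // values; `dbService` is an injected instance): save a flagged group, then
  // record the manual decision so it is not re-flagged.
  //
  //   const group = dbService.saveDuplicateGroup({
  //     dataset: 'movies',
  //     destination: '/media/movies',
  //     hash: 'abc123',
  //     size: 1024,
  //     files: ['/media/movies/a.mkv', '/media/movies/b.mkv'],
  //   });
  //   dbService.markDuplicateGroup(group.id, 'reviewed', 'kept a.mkv');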
  markDuplicateGroup(
    id: number,
    status: 'pending' | 'reviewed' | 'purged',
    note?: string,
  ) {
    return this.db
      .prepare(
        `UPDATE duplicate_files
         SET status = ?, note = COALESCE(?, note), reviewed_at = CURRENT_TIMESTAMP
         WHERE id = ?`,
      )
      .run(status, note ?? null, id);
  }

  updateDuplicateGroupFiles(
    id: number,
    files: string[],
    status?: 'pending' | 'reviewed' | 'purged',
    note?: string,
  ) {
    return this.db
      .prepare(
        `UPDATE duplicate_files
         SET files = ?,
             status = COALESCE(?, status),
             note = COALESCE(?, note),
             reviewed_at = CASE WHEN ? IS NOT NULL THEN CURRENT_TIMESTAMP ELSE reviewed_at END
         WHERE id = ?`,
      )
      .run(
        JSON.stringify(files),
        status ?? null,
        note ?? null,
        status ?? null,
        id,
      );
  }

  deleteDuplicateGroup(id: number) {
    return this.db.prepare('DELETE FROM duplicate_files WHERE id = ?').run(id);
  }

  private safeParseFiles(value: string): string[] {
    try {
      const parsed = JSON.parse(value);
      return Array.isArray(parsed) ? parsed : [];
    } catch {
      return [];
    }
  }

  findFile(dataset: string, file: string) {
    return this.db
      .prepare('SELECT * FROM files WHERE dataset = ? AND input = ?')
      .get(dataset, file);
  }

  setFile(dataset: string, file: string, payload: any) {
    const existing = this.findFile(dataset, file) as
      | {
          dataset: string;
          input: string;
          output?: string;
          date?: string;
          status?: string;
          hash?: string;
          file_size?: number;
        }
      | undefined;
    // For each column, prefer the payload value, then the existing row, then
    // a sensible default.
    const outputValue =
      payload && payload.output !== undefined
        ? payload.output
        : (existing?.output ?? null);
    const statusValue =
      payload && payload.status !== undefined
        ? payload.status
        : (existing?.status ?? 'pending');
    const dateValue =
      payload && payload.date !== undefined
        ? new Date(payload.date).toISOString()
        : existing?.date || new Date().toISOString();
    const hashValue =
      payload && payload.hash !== undefined
        ? payload.hash
        : (existing?.hash ?? null);
    const fileSizeValue =
      payload && payload.file_size !== undefined
        ? payload.file_size
        : (existing?.file_size ?? null);
    if (existing) {
      this.db
        .prepare(
          `UPDATE files
           SET output = COALESCE(?, output),
               date = COALESCE(?, date),
               status = COALESCE(?, status),
               hash = COALESCE(?, hash),
               file_size = COALESCE(?, file_size)
           WHERE dataset = ? AND input = ?`,
        )
        .run(
          outputValue,
          dateValue,
          statusValue,
          hashValue,
          fileSizeValue,
          dataset,
          file,
        );
    } else {
      this.db
        .prepare(
          'INSERT INTO files (dataset, input, output, date, status, hash, file_size) VALUES (?, ?, ?, ?, ?, ?, ?)',
        )
        .run(
          dataset,
          file,
          outputValue,
          dateValue,
          statusValue,
          hashValue,
          fileSizeValue,
        );
    }
    return this.findFile(dataset, file);
  }

  removeFile(dataset: string, file: string, soft = true) {
    if (soft) {
      this.db
        .prepare(
          'UPDATE files SET status = ?, date = ? WHERE dataset = ? AND input = ?',
        )
        .run('deleted', new Date().toISOString(), dataset, file);
    } else {
      this.db
        .prepare('DELETE FROM files WHERE dataset = ? AND input = ?')
        .run(dataset, file);
    }
  }

  clearAllFiles() {
    this.db.prepare('DELETE FROM files').run();
  }

  getAllFiles(dataset: string) {
    return this.db
      .prepare('SELECT * FROM files WHERE dataset = ?')
      .all(dataset);
  }

  getDeletedOlderThan(dataset: string, isoDate: string) {
    // Restrict to soft-deleted rows; without the status filter this would
    // return every file older than the cutoff, not just deleted ones.
    return this.db
      .prepare(
        "SELECT * FROM files WHERE dataset = ? AND status = 'deleted' AND date < ?",
      )
      .all(dataset, isoDate);
  }
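  // Soft-delete lifecycle sketch (hypothetical paths): removeFile() marks a
  // row 'deleted' and stamps the date, getDeletedOlderThan() then finds stale
  // tombstones that a cleanup job could purge with removeFile(..., false).
  //
  //   dbService.removeFile('movies', '/in/old.mkv'); // soft delete
  //   const cutoff = new Date(Date.now() - 30 * 86400000).toISOString();
  //   const stale = dbService.getDeletedOlderThan('movies', cutoff);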
  // Task CRUD methods
  getAllTasks() {
    return this.db
      .prepare('SELECT * FROM tasks ORDER BY created_at DESC')
      .all();
  }

  getPendingTasks(limit: number = 10) {
    return this.db
      .prepare(
        'SELECT * FROM tasks WHERE status = ? ORDER BY priority DESC, created_at ASC LIMIT ?',
      )
      .all('pending', limit);
  }

  getTaskById(id: number) {
    return this.db.prepare('SELECT * FROM tasks WHERE id = ?').get(id);
  }

  getTaskByInput(input: string) {
    return this.db.prepare('SELECT * FROM tasks WHERE input = ?').get(input);
  }

  createTask(task: {
    type: string;
    status?: string;
    progress?: number;
    dataset?: string;
    input?: string;
    output?: string;
    preset?: string;
    priority?: number;
    error_message?: string;
    retry_count?: number;
    max_retries?: number;
  }) {
    const result = this.db
      .prepare(
        `INSERT INTO tasks (type, status, progress, dataset, input, output, preset, priority, retry_count, max_retries, error_message)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
      )
      .run(
        task.type,
        task.status || 'pending',
        task.progress || 0,
        task.dataset || null,
        task.input || null,
        task.output || null,
        task.preset || null,
        task.priority || 0,
        task.retry_count || 0,
        task.max_retries || null,
        task.error_message || null,
      );
    return { id: result.lastInsertRowid, ...task };
  }

  deleteTask(id: number) {
    return this.db.prepare('DELETE FROM tasks WHERE id = ?').run(id);
  }

  updateTask(
    id: number,
    updates: {
      status?: string;
      progress?: number;
      dataset?: string;
      input?: string;
      output?: string;
      preset?: string;
      priority?: number;
      error_message?: string;
      retry_count?: number;
      max_retries?: number;
    },
  ) {
    // Build the SET clause dynamically so only the provided fields change.
    const setParts: string[] = [];
    const values: any[] = [];
    if (updates.status !== undefined) {
      setParts.push('status = ?');
      values.push(updates.status);
    }
    if (updates.progress !== undefined) {
      setParts.push('progress = ?');
      values.push(updates.progress);
    }
    if (updates.dataset !== undefined) {
      setParts.push('dataset = ?');
      values.push(updates.dataset);
    }
    if (updates.input !== undefined) {
      setParts.push('input = ?');
      values.push(updates.input);
    }
    if (updates.output !== undefined) {
      setParts.push('output = ?');
      values.push(updates.output);
    }
    if (updates.preset !== undefined) {
      setParts.push('preset = ?');
      values.push(updates.preset);
    }
    if (updates.priority !== undefined) {
      setParts.push('priority = ?');
      values.push(updates.priority);
    }
    if (updates.error_message !== undefined) {
      setParts.push('error_message = ?');
      values.push(updates.error_message);
    }
    if (updates.retry_count !== undefined) {
      setParts.push('retry_count = ?');
      values.push(updates.retry_count);
    }
    if (updates.max_retries !== undefined) {
      setParts.push('max_retries = ?');
      values.push(updates.max_retries);
    }
    if (setParts.length > 0) {
      setParts.push('updated_at = CURRENT_TIMESTAMP');
      values.push(id);
      this.db
        .prepare(`UPDATE tasks SET ${setParts.join(', ')} WHERE id = ?`)
        .run(...values);
    }
    return this.db.prepare('SELECT * FROM tasks WHERE id = ?').get(id);
  }

  // Task maintenance methods
  deleteTasksByStatus(status: string, olderThanDays?: number) {
    let query = 'DELETE FROM tasks WHERE status = ?';
    const params: any[] = [status];
    if (olderThanDays !== undefined) {
      // Bind the datetime modifier instead of interpolating it into the SQL.
      query += " AND created_at < datetime('now', ?)";
      params.push(`-${olderThanDays} days`);
    }
    return this.db.prepare(query).run(...params);
  }

  deleteTasksOlderThan(days: number) {
    return this.db
      .prepare("DELETE FROM tasks WHERE created_at < datetime('now', ?)")
      .run(`-${days} days`);
  }

  getTaskStats() {
    const stats = this.db
      .prepare(
        `SELECT status,
                COUNT(*) as count,
                MIN(created_at) as oldest,
                MAX(created_at) as newest
         FROM tasks
         GROUP BY status`,
      )
      .all();
    const total = this.db
      .prepare('SELECT COUNT(*) as total FROM tasks')
      .get() as { total: number };
    return {
      total: total.total,
      byStatus: stats,
    };
  }
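  // Task lifecycle sketch (hypothetical values): create a task, advance it,
  // then inspect aggregate counts.
  //
  //   const task = dbService.createTask({ type: 'encode', input: '/in/a.mkv' });
  //   dbService.updateTask(task.id as number, { status: 'running', progress: 50 });
  //   console.log(dbService.getTaskStats()); // { total, byStatus: [...] }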
  archiveOldTasks(daysOld: number = 30): { changes?: number } {
    // Create the archive table if it doesn't exist
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS tasks_archive (
        id INTEGER PRIMARY KEY,
        type TEXT NOT NULL,
        status TEXT NOT NULL,
        progress REAL DEFAULT 0,
        dataset TEXT,
        input TEXT,
        output TEXT,
        preset TEXT,
        priority INTEGER DEFAULT 0,
        retry_count INTEGER DEFAULT 0,
        max_retries INTEGER,
        error_message TEXT,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        archived_at DATETIME DEFAULT CURRENT_TIMESTAMP
      )
    `);
    const cutoffModifier = `-${daysOld} days`;
    // Move old completed/failed tasks to the archive. Note: SELECT * relies
    // on the tasks table's column order matching tasks_archive (minus the
    // trailing archived_at column).
    const insertResult = this.db
      .prepare(
        `INSERT INTO tasks_archive
         SELECT *, CURRENT_TIMESTAMP as archived_at FROM tasks
         WHERE status IN ('completed', 'failed', 'skipped')
           AND created_at < datetime('now', ?)`,
      )
      .run(cutoffModifier);
    // Delete the archived tasks from the main table
    this.db
      .prepare(
        `DELETE FROM tasks
         WHERE status IN ('completed', 'failed', 'skipped')
           AND created_at < datetime('now', ?)`,
      )
      .run(cutoffModifier);
    return { changes: insertResult.changes };
  }

  // Purge all tasks from the database
  purgeAllTasks() {
    const result = this.db.prepare('DELETE FROM tasks').run();
    return result;
  }

  // ============================================================
  // Hash-based duplicate detection methods
  // ============================================================

  /**
   * Store a destination file with its hash and size
   */
  storeDestinationFile(
    dataset: string,
    destinationPath: string,
    hash: string,
    fileSize: number,
  ) {
    // Use destination_path as both input and destination_path for consistency
    // with the indexing worker. This prevents duplicates by reusing the
    // (dataset, input) primary key.
    const existing = this.db
      .prepare('SELECT * FROM files WHERE dataset = ? AND input = ?')
      .get(dataset, destinationPath) as
      | {
          dataset: string;
          input: string | null;
          output: string | null;
          destination_path: string;
          hash: string | null;
          file_size: number | null;
        }
      | undefined;
    const now = new Date().toISOString();
    if (existing) {
      this.db
        .prepare(
          `UPDATE files
           SET destination_path = ?, hash = ?, file_size = ?, date = ?
           WHERE dataset = ? AND input = ?`,
        )
        .run(destinationPath, hash, fileSize, now, dataset, destinationPath);
    } else {
      // Use destination_path as input for the primary key
      this.db
        .prepare(
          `INSERT INTO files (dataset, input, destination_path, hash, file_size, date, status)
           VALUES (?, ?, ?, ?, ?, ?, 'indexed')`,
        )
        .run(dataset, destinationPath, destinationPath, hash, fileSize, now);
    }
  }

  /**
   * Find duplicate files by hash and size
   */
  findDuplicatesByHash(
    hash: string,
    fileSize: number,
    dataset?: string,
  ): Array<{
    dataset: string;
    input: string | null;
    output: string | null;
    destination_path: string | null;
    hash: string;
    file_size: number;
    date: string;
    status: string;
  }> {
    let query = 'SELECT * FROM files WHERE hash = ? AND file_size = ?';
    const params: any[] = [hash, fileSize];
    if (dataset) {
      query += ' AND dataset = ?';
      params.push(dataset);
    }
    return this.db.prepare(query).all(...params) as Array<{
      dataset: string;
      input: string | null;
      output: string | null;
      destination_path: string | null;
      hash: string;
      file_size: number;
      date: string;
      status: string;
    }>;
  }
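  // Indexing + detection sketch (hypothetical hash/size values): index a
  // destination file, then look for other rows with the same content.
  //
  //   dbService.storeDestinationFile('movies', '/media/movies/a.mkv', 'abc123', 1024);
  //   const dupes = dbService.findDuplicatesByHash('abc123', 1024, 'movies');
  //   if (dupes.length > 1) { /* flag for review via saveDuplicateGroup() */ }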
  /**
   * Get all duplicates from the view
   */
  getAllDuplicates(dataset?: string) {
    let query = 'SELECT * FROM file_duplicates';
    const params: any[] = [];
    if (dataset) {
      query += ' WHERE dataset = ?';
      params.push(dataset);
    }
    return this.db.prepare(query).all(...params) as Array<{
      hash: string;
      file_size: number;
      dataset: string;
      file_count: number;
      file_paths: string;
    }>;
  }

  /**
   * Get files in a destination that need hash indexing
   */
  getDestinationFilesWithoutHash(dataset: string, destinationPath?: string) {
    let query = `
      SELECT * FROM files
      WHERE dataset = ?
        AND destination_path IS NOT NULL
        AND hash IS NULL
    `;
    const params: any[] = [dataset];
    if (destinationPath) {
      query += ' AND destination_path LIKE ?';
      params.push(`${destinationPath}%`);
    }
    return this.db.prepare(query).all(...params);
  }

  /**
   * Remove all destination file entries (for re-indexing)
   */
  clearDestinationFiles(dataset: string, destinationPath?: string) {
    let query =
      'DELETE FROM files WHERE dataset = ? AND destination_path IS NOT NULL';
    const params: any[] = [dataset];
    if (destinationPath) {
      query += ' AND destination_path LIKE ?';
      params.push(`${destinationPath}%`);
    }
    const result = this.db.prepare(query).run(...params);
    return result.changes;
  }

  /**
   * Get count of indexed destination files
   */
  getDestinationFileCount(dataset: string, destinationPath?: string) {
    let query =
      'SELECT COUNT(*) as count FROM files WHERE dataset = ? AND destination_path IS NOT NULL';
    const params: any[] = [dataset];
    if (destinationPath) {
      query += ' AND destination_path LIKE ?';
      params.push(`${destinationPath}%`);
    }
    const result = this.db.prepare(query).get(...params) as { count: number };
    return result.count;
  }

  /**
   * Update hash and file_size for a file after processing
   * Used by handbrake service after encoding completes
   */
  updateFileHash(
    dataset: string,
    filePath: string,
    hash: string,
    fileSize: number,
  ) {
    const now = new Date().toISOString();
    const stmt = this.db.prepare(`
      UPDATE files SET hash = ?, file_size = ?, date = ?
      WHERE dataset = ? AND input = ?
    `);
    return stmt.run(hash, fileSize, now, dataset, filePath);
  }
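  // Post-encode sketch (hypothetical values): once an encode completes,
  // record the file's hash so later duplicate scans can match on content.
  //
  //   dbService.updateFileHash('movies', '/in/a.mkv', 'abc123', 1024);
  //   console.log(dbService.getDestinationFileCount('movies'));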
  /**
   * Remove duplicate file entries
   * Removes:
   * 1. Legacy duplicates where input = NULL and a matching entry with input = destination_path exists
   * 2. Multiple entries with the same output path (keeps the most recent)
   * 3. Multiple entries with the same destination_path (keeps the most recent)
   */
  removeDuplicateFileEntries() {
    let totalRemoved = 0;

    // 1. Remove legacy duplicates (input IS NULL)
    const deleteLegacyStmt = this.db.prepare(`
      DELETE FROM files
      WHERE input IS NULL
        AND destination_path IS NOT NULL
        AND EXISTS (
          SELECT 1 FROM files f2
          WHERE f2.dataset = files.dataset
            AND f2.input = files.destination_path
            AND f2.destination_path = files.destination_path
        )
    `);
    const legacyResult = deleteLegacyStmt.run();
    totalRemoved += legacyResult.changes;

    // 2. Remove duplicate entries with the same output path (keep most recent)
    const deleteOutputDupsStmt = this.db.prepare(`
      DELETE FROM files
      WHERE rowid NOT IN (
        SELECT MAX(rowid) FROM files
        WHERE output IS NOT NULL
        GROUP BY dataset, output
      )
      AND output IS NOT NULL
    `);
    const outputDupsResult = deleteOutputDupsStmt.run();
    totalRemoved += outputDupsResult.changes;

    // 3. Remove duplicate entries with the same destination_path (keep most recent)
    const deleteDestDupsStmt = this.db.prepare(`
      DELETE FROM files
      WHERE rowid NOT IN (
        SELECT MAX(rowid) FROM files
        WHERE destination_path IS NOT NULL
        GROUP BY dataset, destination_path
      )
      AND destination_path IS NOT NULL
    `);
    const destDupsResult = deleteDestDupsStmt.run();
    totalRemoved += destDupsResult.changes;

    return totalRemoved;
  }
}