import sqlite3
import os
from datetime import datetime, timedelta
from contextlib import contextmanager

# SQLite database file; a relative path is resolved against the current
# working directory at the moment a connection is opened.
DATABASE_PATH = 'files.db'


def init_database():
    """Create the ``files`` table if it does not exist yet.

    Safe to call repeatedly: the statement uses ``CREATE TABLE IF NOT EXISTS``.
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()

        # File metadata table. created_at is filled in by SQLite itself
        # (CURRENT_TIMESTAMP), expires_at is always supplied by save_file().
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS files (
                id TEXT PRIMARY KEY,
                original_filename TEXT NOT NULL,
                stored_filename TEXT NOT NULL,
                file_size INTEGER NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                expires_at TIMESTAMP NOT NULL,
                download_count INTEGER DEFAULT 0,
                is_deleted BOOLEAN DEFAULT FALSE
            )
        ''')

        conn.commit()


@contextmanager
def get_db_connection():
    """Yield a SQLite connection to ``DATABASE_PATH`` and always close it.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    The context manager never commits: callers that write must call
    ``conn.commit()`` themselves, otherwise pending changes are discarded
    when the connection is closed.
    """
    conn = sqlite3.connect(DATABASE_PATH)
    conn.row_factory = sqlite3.Row  # allow access to columns by name
    try:
        yield conn
    finally:
        conn.close()


def save_file(file_id, original_filename, stored_filename, file_size, expires_hours=24):
    """Insert the metadata of a newly stored file.

    Args:
        file_id: Primary key of the new row.
        original_filename: Name the file was uploaded with.
        stored_filename: Name under which the file is kept on disk.
        file_size: Size value stored in the ``file_size`` column.
        expires_hours: Lifetime in hours, counted from now (default 24).

    Raises:
        sqlite3.IntegrityError: If ``file_id`` already exists or a NOT NULL
            column receives ``None``.
    """
    # NOTE(review): expires_at is a naive local-time ISO string, while
    # created_at (CURRENT_TIMESTAMP) is UTC. Every expiry comparison in this
    # module uses the same naive local clock, so they agree with each other;
    # confirm before comparing the two columns directly.
    expires_at = datetime.now() + timedelta(hours=expires_hours)

    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO files (id, original_filename, stored_filename, file_size, expires_at)
            VALUES (?, ?, ?, ?, ?)
        ''', (file_id, original_filename, stored_filename, file_size, expires_at.isoformat()))
        conn.commit()


def get_file_by_id(file_id):
    """Return the non-deleted row with the given id, or ``None``.

    Expiry is not checked here; an expired but not yet deleted file is
    still returned.
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT * FROM files
            WHERE id = ? AND is_deleted = FALSE
        ''', (file_id,))
        return cursor.fetchone()


def get_all_files():
    """Return every non-deleted row (expired or not), newest first."""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT * FROM files
            WHERE is_deleted = FALSE
            ORDER BY created_at DESC
        ''')
        return cursor.fetchall()


def get_active_files():
    """Return rows that are neither deleted nor expired, newest first."""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT * FROM files
            WHERE is_deleted = FALSE AND expires_at > ?
            ORDER BY created_at DESC
        ''', (datetime.now().isoformat(),))
        return cursor.fetchall()


def delete_file(file_id):
    """Soft-delete a file by setting its ``is_deleted`` flag.

    Returns:
        ``True`` if a live row was marked as deleted, ``False`` if the id is
        unknown or the row had already been deleted.
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()
        # Only touch rows that are still live so the result reflects an
        # actual state change, matching what get_file_by_id() can see.
        cursor.execute('''
            UPDATE files
            SET is_deleted = TRUE
            WHERE id = ? AND is_deleted = FALSE
        ''', (file_id,))
        conn.commit()
        return cursor.rowcount > 0


def hard_delete_file(file_id):
    """Remove a file's row from the database entirely.

    Returns:
        ``True`` if a row was removed, ``False`` if the id was not found.
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('DELETE FROM files WHERE id = ?', (file_id,))
        conn.commit()
        return cursor.rowcount > 0


def increment_download_count(file_id):
    """Add one to the download counter of a file.

    Returns:
        ``True`` if a row with this id was updated, ``False`` otherwise.
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE files
            SET download_count = download_count + 1
            WHERE id = ?
        ''', (file_id,))
        conn.commit()
        return cursor.rowcount > 0


def cleanup_expired_files():
    """Soft-delete every live file whose expiry time has passed.

    Returns:
        The number of rows that were newly marked as deleted.
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE files
            SET is_deleted = TRUE
            WHERE expires_at <= ? AND is_deleted = FALSE
        ''', (datetime.now().isoformat(),))
        conn.commit()
        return cursor.rowcount


def get_file_stats():
    """Return aggregate statistics over the non-deleted files.

    Returns:
        A dict with ``total_files`` (non-deleted rows), ``active_files``
        (non-deleted and not expired) and ``total_downloads`` (sum of
        ``download_count`` over non-deleted rows, 0 when there are none).
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()
        # One aggregate query so all three figures describe the same
        # snapshot. "expires_at > ?" evaluates to 1 or 0 per row, so its SUM
        # is the number of active files; COALESCE covers the empty case,
        # where SUM yields NULL.
        cursor.execute('''
            SELECT COUNT(*),
                   COALESCE(SUM(expires_at > ?), 0),
                   COALESCE(SUM(download_count), 0)
            FROM files
            WHERE is_deleted = FALSE
        ''', (datetime.now().isoformat(),))
        total_files, active_files, total_downloads = cursor.fetchone()

        return {
            'total_files': total_files,
            'active_files': active_files,
            'total_downloads': total_downloads
        }