| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- import sqlite3
- import os
- from datetime import datetime, timedelta
- from contextlib import contextmanager
# Path of the SQLite database file opened by get_db_connection().
# It is a relative path, so it resolves against the process's working directory.
DATABASE_PATH = 'files.db'
def init_database():
    """Initialize the database by creating the required tables.

    The ``files`` table is created only if it does not already exist, so
    calling this function repeatedly is harmless.
    """
    # Schema of the table that holds one metadata row per uploaded file.
    files_table_ddl = '''
        CREATE TABLE IF NOT EXISTS files (
            id TEXT PRIMARY KEY,
            original_filename TEXT NOT NULL,
            stored_filename TEXT NOT NULL,
            file_size INTEGER NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            expires_at TIMESTAMP NOT NULL,
            download_count INTEGER DEFAULT 0,
            is_deleted BOOLEAN DEFAULT FALSE
        )
    '''

    with get_db_connection() as connection:
        connection.execute(files_table_ddl)
        connection.commit()
@contextmanager
def get_db_connection():
    """Context manager that yields a SQLite connection to ``DATABASE_PATH``.

    Rows are returned as ``sqlite3.Row`` objects so columns can be read by
    name. The connection is always closed when the ``with`` body finishes,
    even if it raises. Nothing is committed automatically: callers that
    write must call ``commit()`` themselves.
    """
    connection = sqlite3.connect(DATABASE_PATH)
    # sqlite3.Row allows access to columns by name as well as by index.
    connection.row_factory = sqlite3.Row
    try:
        yield connection
    finally:
        connection.close()
def save_file(file_id, original_filename, stored_filename, file_size, expires_hours=24):
    """Store the metadata of a file in the database.

    Args:
        file_id: Value for the ``id`` column (the table's primary key).
        original_filename: Value for the ``original_filename`` column.
        stored_filename: Value for the ``stored_filename`` column.
        file_size: Value for the ``file_size`` column.
        expires_hours: Hours from now until the file expires (default 24).

    The expiry moment is ``datetime.now()`` plus ``expires_hours`` and is
    stored as an ISO-8601 string.
    """
    expiry_moment = datetime.now() + timedelta(hours=expires_hours)
    row_values = (
        file_id,
        original_filename,
        stored_filename,
        file_size,
        expiry_moment.isoformat(),
    )
    insert_sql = '''
        INSERT INTO files (id, original_filename, stored_filename, file_size, expires_at)
        VALUES (?, ?, ?, ?, ?)
    '''

    with get_db_connection() as connection:
        connection.execute(insert_sql, row_values)
        connection.commit()
def get_file_by_id(file_id):
    """Fetch the metadata row of a file by its ID.

    Rows flagged as deleted are ignored.

    Returns:
        The matching row, or ``None`` when no non-deleted row has that ID.
    """
    lookup_sql = '''
        SELECT * FROM files
        WHERE id = ? AND is_deleted = FALSE
    '''
    with get_db_connection() as connection:
        result = connection.execute(lookup_sql, (file_id,))
        return result.fetchone()
def get_all_files():
    """Fetch every file that is not flagged as deleted.

    Expired files are included. Rows are ordered by ``created_at``,
    newest first.

    Returns:
        A list of rows (empty when there are none).
    """
    listing_sql = '''
        SELECT * FROM files
        WHERE is_deleted = FALSE
        ORDER BY created_at DESC
    '''
    with get_db_connection() as connection:
        result = connection.execute(listing_sql)
        return result.fetchall()
def get_active_files():
    """Fetch only the active files: not deleted and not yet expired.

    A file counts as not expired when its ``expires_at`` value is greater
    than the ISO-8601 string of ``datetime.now()``. Rows are ordered by
    ``created_at``, newest first.

    Returns:
        A list of rows (empty when there are none).
    """
    active_sql = '''
        SELECT * FROM files
        WHERE is_deleted = FALSE AND expires_at > ?
        ORDER BY created_at DESC
    '''
    with get_db_connection() as connection:
        current_moment = datetime.now().isoformat()
        result = connection.execute(active_sql, (current_moment,))
        return result.fetchall()
def delete_file(file_id):
    """Flag a file as deleted (soft delete); the row itself is kept.

    Returns:
        ``True`` if a row with that ID was updated, ``False`` otherwise.
        The update does not check the current flag, so a row that is
        already flagged still counts as updated.
    """
    soft_delete_sql = '''
        UPDATE files
        SET is_deleted = TRUE
        WHERE id = ?
    '''
    with get_db_connection() as connection:
        result = connection.execute(soft_delete_sql, (file_id,))
        connection.commit()
        affected_rows = result.rowcount
        return affected_rows > 0
def hard_delete_file(file_id):
    """Remove a file's row from the database entirely.

    Returns:
        ``True`` if a row was removed, ``False`` if no row had that ID.
    """
    with get_db_connection() as connection:
        result = connection.execute('DELETE FROM files WHERE id = ?', (file_id,))
        connection.commit()
        affected_rows = result.rowcount
        return affected_rows > 0
def increment_download_count(file_id):
    """Add one to the download counter of the file with the given ID.

    The update matches on ID only; it does nothing when no row has it.
    """
    bump_sql = '''
        UPDATE files
        SET download_count = download_count + 1
        WHERE id = ?
    '''
    with get_db_connection() as connection:
        connection.execute(bump_sql, (file_id,))
        connection.commit()
def cleanup_expired_files():
    """Flag every expired file as deleted (soft delete).

    A file is expired when its ``expires_at`` value is less than or equal
    to the ISO-8601 string of ``datetime.now()``. Rows already flagged as
    deleted are left alone, and no rows are removed from the table.

    Returns:
        The number of rows that were flagged by this call.
    """
    expire_sql = '''
        UPDATE files
        SET is_deleted = TRUE
        WHERE expires_at <= ? AND is_deleted = FALSE
    '''
    with get_db_connection() as connection:
        current_moment = datetime.now().isoformat()
        result = connection.execute(expire_sql, (current_moment,))
        connection.commit()
        return result.rowcount
def get_file_stats():
    """Return aggregate statistics over the files not flagged as deleted.

    All three figures come from a single SELECT, so they describe the same
    snapshot of the table and share one "now" value. Running separate
    queries could yield mutually inconsistent numbers if another writer
    modified the table between them.

    Returns:
        A dict with these keys:
            'total_files': number of non-deleted files.
            'active_files': non-deleted files whose ``expires_at`` is
                still greater than the current time.
            'total_downloads': sum of ``download_count`` over non-deleted
                files (0 when there are none).
    """
    stats_sql = '''
        SELECT
            COUNT(*),
            COUNT(CASE WHEN expires_at > ? THEN 1 END),
            COALESCE(SUM(download_count), 0)
        FROM files
        WHERE is_deleted = FALSE
    '''
    with get_db_connection() as conn:
        # An aggregate query without GROUP BY always yields exactly one row.
        # COUNT ignores the NULLs produced by the CASE for expired files, and
        # COALESCE turns the NULL that SUM gives on an empty set into 0.
        row = conn.execute(stats_sql, (datetime.now().isoformat(),)).fetchone()

        return {
            'total_files': row[0],
            'active_files': row[1],
            'total_downloads': row[2]
        }
|