import os import sqlite3 import uuid import hashlib import threading import time import secrets import string import argparse from datetime import datetime, timedelta from werkzeug.utils import secure_filename from flask import Flask, request, jsonify, render_template, send_from_directory, abort, make_response from werkzeug.security import generate_password_hash import re def generate_secret_key(length=32): """Generate a random secret key with specified length""" alphabet = string.ascii_letters + string.digits + string.punctuation return ''.join(secrets.choice(alphabet) for _ in range(length)) def parse_arguments(): """Parse command line arguments""" parser = argparse.ArgumentParser(description='Flask Temporary File Upload Server') parser.add_argument('--host', default='127.0.0.1', help='Host to bind to (default: 127.0.0.1)') parser.add_argument('--port', type=int, default=5000, help='Port to bind to (default: 5000)') parser.add_argument('--debug', action='store_true', help='Enable debug mode') parser.add_argument('--base-url', default=None, help='Base URL for download links (e.g., https://mysite.com)') parser.add_argument('--network', action='store_true', help='Bind to all network interfaces (0.0.0.0)') parser.add_argument('--max-file-size', type=int, default=16, help='Maximum file size in MB (default: 16)') return parser.parse_args() # Parse command line arguments args = parse_arguments() # Set host based on arguments if args.network: HOST = '0.0.0.0' else: HOST = args.host # Set base URL for download links BASE_URL = args.base_url or os.environ.get('BASE_URL', f'http://{HOST}:{args.port}') # Validate max file size if args.max_file_size <= 0: print("❌ Error: Maximum file size must be a positive integer") exit(1) app = Flask(__name__) app.config['SECRET_KEY'] = generate_secret_key(32) app.config['UPLOAD_FOLDER'] = 'uploads' app.config['MAX_CONTENT_LENGTH'] = args.max_file_size * 1024 * 1024 # Convert MB to bytes app.config['BASE_URL'] = BASE_URL # Ensure upload directory exists os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) # Database initialization def init_db(): conn = sqlite3.connect('files.db') cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS files ( id TEXT PRIMARY KEY, original_filename TEXT NOT NULL, sanitized_filename TEXT NOT NULL, file_path TEXT NOT NULL, user_session TEXT NOT NULL, upload_date TEXT NOT NULL, expiration_date TEXT NOT NULL, duration_hours INTEGER NOT NULL, file_size INTEGER NOT NULL, mime_type TEXT ) ''') conn.commit() conn.close() def sanitize_filename(filename): """Sanitize filename for safe storage""" # Remove special characters and spaces name, ext = os.path.splitext(filename) sanitized = re.sub(r'[^a-zA-Z0-9_-]', '_', name) # Generate unique identifier unique_id = str(uuid.uuid4())[:8] return f"{sanitized}_{unique_id}{ext}" def cleanup_expired_files(): """Remove expired files from database and filesystem""" conn = sqlite3.connect('files.db') cursor = conn.cursor() # Get expired files cursor.execute(''' SELECT file_path FROM files WHERE expiration_date < ? ''', (datetime.now().isoformat(),)) expired_files = cursor.fetchall() # Delete files from filesystem for (file_path,) in expired_files: full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path) try: if os.path.exists(full_path): os.remove(full_path) print(f"Deleted expired file: {file_path}") except Exception as e: print(f"Error deleting file {file_path}: {e}") # Remove from database cursor.execute('DELETE FROM files WHERE expiration_date < ?', (datetime.now().isoformat(),)) conn.commit() conn.close() print(f"Cleaned up {len(expired_files)} expired files") def cleanup_worker(): """Background worker to cleanup expired files every 10 minutes""" while True: time.sleep(600) # 10 minutes cleanup_expired_files() @app.route('/') def index(): max_file_size = app.config['MAX_CONTENT_LENGTH'] // (1024 * 1024) # Convert bytes to MB return render_template('index.html', max_file_size=max_file_size) @app.route('/upload', methods=['POST']) def upload_file(): if 'file' not in request.files: return jsonify({'error': 'No file provided'}), 400 file = request.files['file'] user_session = request.form.get('user_session') duration_hours = int(request.form.get('duration_hours', 24)) if file.filename == '': return jsonify({'error': 'No file selected'}), 400 if not user_session: return jsonify({'error': 'No user session provided'}), 400 if file: # Generate file info file_id = str(uuid.uuid4()) original_filename = file.filename sanitized_filename = sanitize_filename(original_filename) upload_date = datetime.now() expiration_date = upload_date + timedelta(hours=duration_hours) # Save file file_path = sanitized_filename full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path) file.save(full_path) # Get file info file_size = os.path.getsize(full_path) mime_type = file.content_type # Save to database conn = sqlite3.connect('files.db') cursor = conn.cursor() cursor.execute(''' INSERT INTO files (id, original_filename, sanitized_filename, file_path, user_session, upload_date, expiration_date, duration_hours, file_size, mime_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', (file_id, original_filename, sanitized_filename, file_path, user_session, upload_date.isoformat(), expiration_date.isoformat(), duration_hours, file_size, mime_type)) conn.commit() conn.close() return jsonify({ 'success': True, 'file_id': file_id, 'original_filename': original_filename, 'download_url': f'{app.config["BASE_URL"]}/download/{file_id}', 'expiration_date': expiration_date.isoformat(), 'file_size': file_size }) @app.route('/files/') def get_user_files(user_session): conn = sqlite3.connect('files.db') cursor = conn.cursor() cursor.execute(''' SELECT id, original_filename, upload_date, expiration_date, duration_hours, file_size, mime_type FROM files WHERE user_session = ? AND expiration_date > ? ORDER BY upload_date DESC ''', (user_session, datetime.now().isoformat())) files = cursor.fetchall() conn.close() file_list = [] for file_data in files: file_list.append({ 'id': file_data[0], 'original_filename': file_data[1], 'upload_date': file_data[2], 'expiration_date': file_data[3], 'duration_hours': file_data[4], 'file_size': file_data[5], 'mime_type': file_data[6], 'download_url': f'{app.config["BASE_URL"]}/download/{file_data[0]}' }) return jsonify(file_list) @app.route('/download/') def download_page(file_id): conn = sqlite3.connect('files.db') cursor = conn.cursor() cursor.execute(''' SELECT original_filename, file_path, expiration_date, file_size, mime_type FROM files WHERE id = ? ''', (file_id,)) file_data = cursor.fetchone() conn.close() if not file_data: return make_response(render_template('file_not_found.html'), 404) original_filename, file_path, expiration_date, file_size, mime_type = file_data # Check if file has expired if datetime.fromisoformat(expiration_date) < datetime.now(): return make_response(render_template('file_not_found.html'), 410) # Check if file exists full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path) if not os.path.exists(full_path): return make_response(render_template('file_not_found.html'), 404) # Check if it's a text file for preview is_text_file = mime_type and (mime_type.startswith('text/') or mime_type == 'application/json') file_content = None if is_text_file: try: with open(full_path, 'r', encoding='utf-8') as f: file_content = f.read() except (UnicodeDecodeError, Exception): # If can't read as text, treat as binary is_text_file = False return render_template('download.html', file_id=file_id, filename=original_filename, file_size=file_size, mime_type=mime_type, expiration_date=expiration_date, is_text_file=is_text_file, file_content=file_content) @app.route('/download//direct') def download_file_direct(file_id): conn = sqlite3.connect('files.db') cursor = conn.cursor() cursor.execute(''' SELECT original_filename, file_path, expiration_date FROM files WHERE id = ? ''', (file_id,)) file_data = cursor.fetchone() conn.close() if not file_data: return make_response(render_template('file_not_found.html'), 404) original_filename, file_path, expiration_date = file_data # Check if file has expired if datetime.fromisoformat(expiration_date) < datetime.now(): return make_response(render_template('file_not_found.html'), 410) # Check if file exists full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path) if not os.path.exists(full_path): return make_response(render_template('file_not_found.html'), 404) return send_from_directory(app.config['UPLOAD_FOLDER'], file_path, as_attachment=True, download_name=original_filename) @app.route('/delete/', methods=['DELETE']) def delete_file(file_id): user_session = request.json.get('user_session') if not user_session: return jsonify({'error': 'No user session provided'}), 400 conn = sqlite3.connect('files.db') cursor = conn.cursor() # Get file info and verify ownership cursor.execute(''' SELECT file_path FROM files WHERE id = ? AND user_session = ? ''', (file_id, user_session)) file_data = cursor.fetchone() if not file_data: conn.close() return jsonify({'error': 'File not found or unauthorized'}), 404 file_path = file_data[0] # Delete from filesystem full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path) try: if os.path.exists(full_path): os.remove(full_path) except Exception as e: print(f"Error deleting file {file_path}: {e}") # Delete from database cursor.execute('DELETE FROM files WHERE id = ? AND user_session = ?', (file_id, user_session)) conn.commit() conn.close() return jsonify({'success': True}) if __name__ == '__main__': # Print configuration info print(f"🚀 Starting Flask Temporary File Upload Server") print(f"📁 Upload folder: {app.config['UPLOAD_FOLDER']}") print(f"🔑 Secret key: {app.config['SECRET_KEY'][:8]}... (32 chars)") print(f"🌐 Base URL: {app.config['BASE_URL']}") print(f"🖥️ Server: http://{HOST}:{args.port}") print(f"🔒 Max file size: {args.max_file_size}MB") print(f"⏰ Cleanup interval: 10 minutes") print(f"🐛 Debug mode: {'ON' if args.debug else 'OFF'}") print("-" * 50) init_db() # Start cleanup worker in background cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True) cleanup_thread.start() # Initial cleanup cleanup_expired_files() app.run(debug=args.debug, host=HOST, port=args.port)