"""Flask temporary file upload server.

Files are uploaded with a per-client "user session" token and a lifetime in
hours. Metadata lives in a local SQLite database (``files.db``); the payload
is stored in the ``uploads`` folder. A background thread periodically removes
expired files. Download pages can preview text, archive listings and media.
"""

import argparse
import hashlib
import os
import re
import secrets
import sqlite3
import string
import tarfile
import threading
import time
import uuid
import zipfile
from contextlib import closing
from datetime import datetime, timedelta

from flask import (
    Flask,
    abort,
    jsonify,
    make_response,
    render_template,
    request,
    send_from_directory,
)
from werkzeug.security import generate_password_hash
from werkzeug.utils import secure_filename

try:
    import rarfile
    RARFILE_AVAILABLE = True
except ImportError:
    RARFILE_AVAILABLE = False

# Path of the SQLite database holding the upload metadata.
DB_PATH = 'files.db'

# Seconds between two runs of the background cleanup worker (10 minutes).
CLEANUP_INTERVAL_SECONDS = 600


def generate_secret_key(length=32):
    """Generate a random secret key with specified length.

    Args:
        length: Number of characters in the generated key.

    Returns:
        A string of ``length`` characters drawn from ASCII letters, digits
        and punctuation using the ``secrets`` module.
    """
    alphabet = string.ascii_letters + string.digits + string.punctuation
    return ''.join(secrets.choice(alphabet) for _ in range(length))


def get_archive_contents(file_path, mime_type):
    """Get contents of archive files (ZIP, TAR, RAR).

    Args:
        file_path: Path of the archive on disk.
        mime_type: MIME type recorded for the file (may be ``None``); only
            used to decide whether to try reading the file as RAR.

    Returns:
        ``{'type': 'zip'|'tar'|'rar', 'files': [...]}`` describing the
        archive members, or ``None`` when the file is not a readable,
        supported archive.
    """
    try:
        # Check if it's a ZIP file
        if zipfile.is_zipfile(file_path):
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                file_list = []
                for info in zip_ref.infolist():
                    if not info.is_dir():
                        file_list.append({
                            'name': info.filename,
                            'size': info.file_size,
                            'compressed_size': info.compress_size,
                            'date_time': info.date_time,
                            'type': 'file',
                        })
                    else:
                        file_list.append({
                            'name': info.filename,
                            'size': 0,
                            'type': 'directory',
                        })
                return {'type': 'zip', 'files': file_list}

        # Check if it's a TAR file
        if tarfile.is_tarfile(file_path):
            with tarfile.open(file_path, 'r:*') as tar_ref:
                file_list = [
                    {
                        'name': member.name,
                        'size': member.size,
                        'type': 'file' if member.isfile() else (
                            'directory' if member.isdir() else 'other'),
                        'mode': oct(member.mode) if member.mode else None,
                        'uid': member.uid,
                        'gid': member.gid,
                        'mtime': member.mtime,
                    }
                    for member in tar_ref.getmembers()
                ]
                return {'type': 'tar', 'files': file_list}

        # Check if it's a RAR file (if rarfile is available). Any failure is
        # handled by the outer ``except Exception`` instead of a bare
        # ``except:`` that would also swallow KeyboardInterrupt/SystemExit.
        if RARFILE_AVAILABLE and mime_type and 'rar' in mime_type.lower():
            with rarfile.RarFile(file_path) as rar_ref:
                file_list = [
                    {
                        'name': info.filename,
                        'size': info.file_size,
                        'compressed_size': info.compress_size,
                        'date_time': info.date_time,
                        'type': 'file',
                    }
                    for info in rar_ref.infolist()
                ]
                return {'type': 'rar', 'files': file_list}

        return None
    except Exception as e:
        # Best effort: a broken archive must not break the download page.
        print(f"Error reading archive {file_path}: {e}")
        return None


def is_archive_file(file_path, mime_type, filename):
    """Check if file is a supported archive format.

    The check is done by file extension, then by MIME type, then by looking
    at the actual file content.

    Args:
        file_path: Path of the stored file on disk.
        mime_type: MIME type recorded for the file (may be ``None``).
        filename: Original file name, used for the extension check.

    Returns:
        ``True`` if the file looks like a ZIP/TAR (or RAR, when ``rarfile``
        is installed) archive, ``False`` otherwise.
    """
    # Check by file extension
    lower_filename = filename.lower()
    archive_extensions = ('.zip', '.tar', '.tar.gz', '.tar.bz2', '.tar.xz', '.tgz')
    if lower_filename.endswith(archive_extensions):
        return True
    if RARFILE_AVAILABLE and lower_filename.endswith('.rar'):
        return True

    # Check by MIME type
    if mime_type:
        archive_mimes = [
            'application/zip',
            'application/x-zip-compressed',
            'application/x-tar',
            'application/x-gtar',
            'application/x-compressed-tar',
            'application/vnd.rar',
            'application/x-rar-compressed',
        ]
        lower_mime = mime_type.lower()
        if any(mime in lower_mime for mime in archive_mimes):
            return True

    # Check by actual file content
    try:
        if zipfile.is_zipfile(file_path) or tarfile.is_tarfile(file_path):
            return True
    except OSError:
        # Unreadable/missing file: treat as "not an archive".
        pass

    return False


def is_media_file(mime_type, filename):
    """Check if file is a supported media format (image, video, audio, pdf).

    Args:
        mime_type: MIME type recorded for the file (may be ``None``).
        filename: Original file name (currently unused, kept for callers).

    Returns:
        A ``(is_media, media_type)`` tuple where ``media_type`` is one of
        ``'image'``, ``'video'``, ``'audio'``, ``'pdf'`` or ``None``.
    """
    if not mime_type:
        return False, None

    # Image files (including HEIC/HEIF)
    if mime_type.startswith('image/'):
        supported_images = ['image/jpeg', 'image/jpg', 'image/png', 'image/gif',
                            'image/webp', 'image/svg+xml', 'image/bmp',
                            'image/heic', 'image/heif']
        if mime_type in supported_images:
            return True, 'image'

    # Video files
    if mime_type.startswith('video/'):
        supported_videos = ['video/mp4', 'video/webm', 'video/ogg', 'video/avi',
                            'video/mov', 'video/quicktime']
        if mime_type in supported_videos:
            return True, 'video'

    # Audio files
    if mime_type.startswith('audio/'):
        supported_audio = ['audio/mpeg', 'audio/mp3', 'audio/wav', 'audio/ogg',
                           'audio/webm', 'audio/aac', 'audio/flac']
        if mime_type in supported_audio:
            return True, 'audio'

    # PDF files
    if mime_type == 'application/pdf':
        return True, 'pdf'

    return False, None


def parse_arguments():
    """Parse command line arguments.

    Returns:
        The ``argparse.Namespace`` with ``host``, ``port``, ``debug``,
        ``base_url``, ``network`` and ``max_file_size`` attributes.
    """
    parser = argparse.ArgumentParser(description='Flask Temporary File Upload Server')
    parser.add_argument('--host', default='127.0.0.1',
                        help='Host to bind to (default: 127.0.0.1)')
    parser.add_argument('--port', type=int, default=5000,
                        help='Port to bind to (default: 5000)')
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug mode')
    parser.add_argument('--base-url', default=None,
                        help='Base URL for download links (e.g., https://mysite.com)')
    parser.add_argument('--network', action='store_true',
                        help='Bind to all network interfaces (0.0.0.0)')
    parser.add_argument('--max-file-size', type=int, default=16,
                        help='Maximum file size in MB (default: 16)')
    return parser.parse_args()


# Parse command line arguments
args = parse_arguments()

# Set host based on arguments
if args.network:
    HOST = '0.0.0.0'
else:
    HOST = args.host

# Set base URL for download links
BASE_URL = args.base_url or os.environ.get('BASE_URL', f'http://{HOST}:{args.port}')

# Validate max file size
if args.max_file_size <= 0:
    print("❌ Error: Maximum file size must be a positive integer")
    # SystemExit instead of the interactive ``exit`` helper, which is only
    # injected by the ``site`` module.
    raise SystemExit(1)

app = Flask(__name__)
app.config['SECRET_KEY'] = generate_secret_key(32)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = args.max_file_size * 1024 * 1024  # Convert MB to bytes
app.config['BASE_URL'] = BASE_URL

# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)


# Database initialization
def init_db():
    """Create the ``files`` table in the SQLite database if it is missing."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS files (
                id TEXT PRIMARY KEY,
                original_filename TEXT NOT NULL,
                sanitized_filename TEXT NOT NULL,
                file_path TEXT NOT NULL,
                user_session TEXT NOT NULL,
                upload_date TEXT NOT NULL,
                expiration_date TEXT NOT NULL,
                duration_hours INTEGER NOT NULL,
                file_size INTEGER NOT NULL,
                mime_type TEXT
            )
        ''')
        conn.commit()


def sanitize_filename(filename):
    """Sanitize filename for safe storage.

    Every character of the base name outside ``[a-zA-Z0-9_-]`` is replaced
    by ``_``, the extension is reduced to alphanumerics, and a short random
    identifier is inserted so that two uploads never collide.

    Args:
        filename: Client-supplied file name.

    Returns:
        ``"<sanitized name>_<8 hex chars><extension>"``.
    """
    # Remove special characters and spaces
    name, ext = os.path.splitext(filename)
    sanitized = re.sub(r'[^a-zA-Z0-9_-]', '_', name)
    # The extension is client-controlled too; keep only the dot and
    # alphanumerics so nothing unexpected reaches the filesystem.
    safe_ext = re.sub(r'[^a-zA-Z0-9.]', '', ext)
    # Generate unique identifier
    unique_id = str(uuid.uuid4())[:8]
    return f"{sanitized}_{unique_id}{safe_ext}"


def cleanup_expired_files():
    """Remove expired files from database and filesystem."""
    # Use ONE cutoff for both the SELECT and the DELETE: with two separate
    # ``datetime.now()`` calls a row expiring in between would be deleted
    # from the database while its file stayed on disk forever.
    cutoff = datetime.now().isoformat()

    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()

        # Get expired files
        cursor.execute('''
            SELECT file_path FROM files
            WHERE expiration_date < ?
        ''', (cutoff,))
        expired_files = cursor.fetchall()

        # Delete files from filesystem
        for (file_path,) in expired_files:
            full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
            try:
                if os.path.exists(full_path):
                    os.remove(full_path)
                    print(f"Deleted expired file: {file_path}")
            except Exception as e:
                print(f"Error deleting file {file_path}: {e}")

        # Remove from database
        cursor.execute('DELETE FROM files WHERE expiration_date < ?', (cutoff,))
        conn.commit()

    print(f"Cleaned up {len(expired_files)} expired files")


def cleanup_worker():
    """Background worker to cleanup expired files every 10 minutes."""
    while True:
        time.sleep(CLEANUP_INTERVAL_SECONDS)  # 10 minutes
        cleanup_expired_files()


@app.route('/')
def index():
    """Render the upload page, passing the size limit in MB."""
    max_file_size = app.config['MAX_CONTENT_LENGTH'] // (1024 * 1024)  # Convert bytes to MB
    return render_template('index.html', max_file_size=max_file_size)


@app.route('/upload', methods=['POST'])
def upload_file():
    """Store an uploaded file and register it in the database.

    Form fields: ``file`` (the upload), ``user_session`` (owner token) and
    optional ``duration_hours`` (positive integer, default 24).

    Returns:
        JSON with the file id, download URL, expiration date and size, or a
        JSON error with status 400 when the request is invalid.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    file = request.files['file']
    user_session = request.form.get('user_session')

    # ``not file.filename`` also covers a missing (None) filename, which used
    # to fall through and make the view return None.
    if not file.filename:
        return jsonify({'error': 'No file selected'}), 400

    if not user_session:
        return jsonify({'error': 'No user session provided'}), 400

    # Validate the lifetime instead of letting int()/timedelta raise (HTTP 500).
    try:
        duration_hours = int(request.form.get('duration_hours', 24))
    except (TypeError, ValueError):
        return jsonify({'error': 'Invalid duration'}), 400
    if duration_hours <= 0:
        return jsonify({'error': 'Invalid duration'}), 400

    upload_date = datetime.now()
    try:
        expiration_date = upload_date + timedelta(hours=duration_hours)
    except OverflowError:
        return jsonify({'error': 'Invalid duration'}), 400

    # Generate file info
    file_id = str(uuid.uuid4())
    original_filename = file.filename
    sanitized_filename = sanitize_filename(original_filename)

    # Save file
    file_path = sanitized_filename
    full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
    file.save(full_path)

    # Get file info
    file_size = os.path.getsize(full_path)
    mime_type = file.content_type

    # Save to database
    try:
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.execute('''
                INSERT INTO files (id, original_filename, sanitized_filename, file_path,
                                   user_session, upload_date, expiration_date,
                                   duration_hours, file_size, mime_type)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (file_id, original_filename, sanitized_filename, file_path,
                  user_session, upload_date.isoformat(), expiration_date.isoformat(),
                  duration_hours, file_size, mime_type))
            conn.commit()
    except sqlite3.Error:
        # Without a database row nothing would ever clean this file up.
        try:
            os.remove(full_path)
        except OSError:
            pass
        raise

    return jsonify({
        'success': True,
        'file_id': file_id,
        'original_filename': original_filename,
        'download_url': f'{app.config["BASE_URL"]}/download/{file_id}',
        'expiration_date': expiration_date.isoformat(),
        'file_size': file_size
    })


@app.route('/files/<user_session>')
def get_user_files(user_session):
    """Return the non-expired files of ``user_session`` as a JSON list."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT id, original_filename, upload_date, expiration_date,
                   duration_hours, file_size, mime_type
            FROM files
            WHERE user_session = ? AND expiration_date > ?
            ORDER BY upload_date DESC
        ''', (user_session, datetime.now().isoformat()))
        files = cursor.fetchall()

    file_list = [
        {
            'id': file_data[0],
            'original_filename': file_data[1],
            'upload_date': file_data[2],
            'expiration_date': file_data[3],
            'duration_hours': file_data[4],
            'file_size': file_data[5],
            'mime_type': file_data[6],
            'download_url': f'{app.config["BASE_URL"]}/download/{file_data[0]}'
        }
        for file_data in files
    ]

    return jsonify(file_list)


@app.route('/download/<file_id>')
def download_page(file_id):
    """Render the download page for ``file_id`` with an optional preview.

    Responds with the "file not found" page and status 404 when the file is
    unknown or missing on disk, and status 410 when it has expired.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT original_filename, file_path, expiration_date, file_size, mime_type
            FROM files
            WHERE id = ?
        ''', (file_id,))
        file_data = cursor.fetchone()

    if not file_data:
        return make_response(render_template('file_not_found.html'), 404)

    original_filename, file_path, expiration_date, file_size, mime_type = file_data

    # Check if file has expired
    if datetime.fromisoformat(expiration_date) < datetime.now():
        return make_response(render_template('file_not_found.html'), 410)

    # Check if file exists
    full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
    if not os.path.exists(full_path):
        return make_response(render_template('file_not_found.html'), 404)

    # Check if it's a text file for preview
    is_text_file = bool(
        mime_type and (mime_type.startswith('text/') or mime_type == 'application/json')
    )
    file_content = None

    if is_text_file:
        try:
            with open(full_path, 'r', encoding='utf-8') as f:
                file_content = f.read()
        except (UnicodeDecodeError, OSError):
            # If can't read as text, treat as binary
            is_text_file = False

    # Check if it's an archive file for preview
    is_archive_file_flag = is_archive_file(full_path, mime_type, original_filename)
    archive_contents = None

    if is_archive_file_flag:
        archive_contents = get_archive_contents(full_path, mime_type)

    # Check if it's a media file for preview
    is_media_file_flag, media_type = is_media_file(mime_type, original_filename)

    return render_template('download.html',
                           file_id=file_id,
                           filename=original_filename,
                           file_size=file_size,
                           mime_type=mime_type,
                           expiration_date=expiration_date,
                           is_text_file=is_text_file,
                           file_content=file_content,
                           is_archive_file=is_archive_file_flag,
                           archive_contents=archive_contents,
                           is_media_file=is_media_file_flag,
                           media_type=media_type)


@app.route('/preview/<file_id>')
def preview_file(file_id):
    """Serve media files for preview.

    Aborts with 404 (unknown or missing file), 410 (expired) or 403 (the
    file is not a supported media type).
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT original_filename, file_path, expiration_date, mime_type
            FROM files
            WHERE id = ?
        ''', (file_id,))
        file_data = cursor.fetchone()

    if not file_data:
        abort(404)

    original_filename, file_path, expiration_date, mime_type = file_data

    # Check if file has expired
    if datetime.fromisoformat(expiration_date) < datetime.now():
        abort(410)

    # Check if file exists
    full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
    if not os.path.exists(full_path):
        abort(404)

    # Only allow media files for preview
    is_media_file_flag, media_type = is_media_file(mime_type, original_filename)
    if not is_media_file_flag:
        abort(403)

    return send_from_directory(app.config['UPLOAD_FOLDER'], file_path, mimetype=mime_type)


@app.route('/download/<file_id>/direct')
def download_file_direct(file_id):
    """Send the stored file as an attachment under its original name.

    Responds with the "file not found" page and status 404 when the file is
    unknown or missing on disk, and status 410 when it has expired.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT original_filename, file_path, expiration_date
            FROM files
            WHERE id = ?
        ''', (file_id,))
        file_data = cursor.fetchone()

    if not file_data:
        return make_response(render_template('file_not_found.html'), 404)

    original_filename, file_path, expiration_date = file_data

    # Check if file has expired
    if datetime.fromisoformat(expiration_date) < datetime.now():
        return make_response(render_template('file_not_found.html'), 410)

    # Check if file exists
    full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
    if not os.path.exists(full_path):
        return make_response(render_template('file_not_found.html'), 404)

    return send_from_directory(app.config['UPLOAD_FOLDER'], file_path,
                               as_attachment=True, download_name=original_filename)


@app.route('/delete/<file_id>', methods=['DELETE'])
def delete_file(file_id):
    """Delete ``file_id`` if it belongs to the ``user_session`` in the JSON body.

    Returns:
        ``{'success': True}`` on success, a 400 JSON error when no session
        is supplied, or a 404 JSON error when the file does not exist or is
        owned by another session.
    """
    # ``silent=True``: a missing or malformed JSON body yields None instead
    # of an error response, so it can be reported as a regular 400.
    payload = request.get_json(silent=True)
    user_session = payload.get('user_session') if isinstance(payload, dict) else None

    if not user_session:
        return jsonify({'error': 'No user session provided'}), 400

    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()

        # Get file info and verify ownership
        cursor.execute('''
            SELECT file_path FROM files
            WHERE id = ? AND user_session = ?
        ''', (file_id, user_session))
        file_data = cursor.fetchone()

        if not file_data:
            return jsonify({'error': 'File not found or unauthorized'}), 404

        file_path = file_data[0]

        # Delete from filesystem
        full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
        try:
            if os.path.exists(full_path):
                os.remove(full_path)
        except Exception as e:
            print(f"Error deleting file {file_path}: {e}")

        # Delete from database
        cursor.execute('DELETE FROM files WHERE id = ? AND user_session = ?',
                       (file_id, user_session))
        conn.commit()

    return jsonify({'success': True})


if __name__ == '__main__':
    # Print configuration info
    print("🚀 Starting Flask Temporary File Upload Server")
    print(f"📁 Upload folder: {app.config['UPLOAD_FOLDER']}")
    print(f"🔑 Secret key: {app.config['SECRET_KEY'][:8]}... (32 chars)")
    print(f"🌐 Base URL: {app.config['BASE_URL']}")
    print(f"🖥️ Server: http://{HOST}:{args.port}")
    print(f"🔒 Max file size: {args.max_file_size}MB")
    print("⏰ Cleanup interval: 10 minutes")
    print(f"🐛 Debug mode: {'ON' if args.debug else 'OFF'}")
    print("-" * 50)

    init_db()

    # Start cleanup worker in background
    cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
    cleanup_thread.start()

    # Initial cleanup
    cleanup_expired_files()

    app.run(debug=args.debug, host=HOST, port=args.port)