| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- import os
- import sqlite3
- import uuid
- import hashlib
- import threading
- import time
- import secrets
- import string
- import argparse
- from datetime import datetime, timedelta
- from werkzeug.utils import secure_filename
- from flask import Flask, request, jsonify, render_template, send_from_directory, abort, make_response
- from werkzeug.security import generate_password_hash
- import re
- import zipfile
- import tarfile
- try:
- import rarfile
- RARFILE_AVAILABLE = True
- except ImportError:
- RARFILE_AVAILABLE = False
def generate_secret_key(length=32):
    """Return a random secret key made of ``length`` characters.

    Every character is picked independently with ``secrets.choice`` from the
    ASCII letters, digits and punctuation characters.
    """
    pool = string.ascii_letters + string.digits + string.punctuation
    picked = [secrets.choice(pool) for _ in range(length)]
    return ''.join(picked)
def _zip_entry(info):
    """Describe one ``zipfile.ZipInfo`` member as a plain dict."""
    if info.is_dir():
        return {'name': info.filename, 'size': 0, 'type': 'directory'}
    return {
        'name': info.filename,
        'size': info.file_size,
        'compressed_size': info.compress_size,
        'date_time': info.date_time,
        'type': 'file',
    }


def _tar_entry(member):
    """Describe one ``tarfile.TarInfo`` member as a plain dict."""
    if member.isfile():
        kind = 'file'
    elif member.isdir():
        kind = 'directory'
    else:
        kind = 'other'
    return {
        'name': member.name,
        'size': member.size,
        'type': kind,
        'mode': oct(member.mode) if member.mode else None,
        'uid': member.uid,
        'gid': member.gid,
        'mtime': member.mtime,
    }


def _rar_entry(info):
    """Describe one RAR member as a plain dict (every member is reported as a file)."""
    return {
        'name': info.filename,
        'size': info.file_size,
        'compressed_size': info.compress_size,
        'date_time': info.date_time,
        'type': 'file',
    }


def get_archive_contents(file_path, mime_type):
    """List the members of a ZIP, TAR or RAR archive.

    Args:
        file_path: Path of the archive on disk.
        mime_type: MIME type recorded for the file (may be ``None``); used
            only as a hint for RAR detection.

    Returns:
        ``{'type': 'zip' | 'tar' | 'rar', 'files': [...]}`` where each entry
        is a dict describing one member, or ``None`` when the file is not a
        recognised archive or cannot be read.
    """
    try:
        # ZIP and TAR are recognised from the file content itself.
        if zipfile.is_zipfile(file_path):
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                entries = [_zip_entry(info) for info in zip_ref.infolist()]
            return {'type': 'zip', 'files': entries}

        if tarfile.is_tarfile(file_path):
            with tarfile.open(file_path, 'r:*') as tar_ref:
                entries = [_tar_entry(member) for member in tar_ref.getmembers()]
            return {'type': 'tar', 'files': entries}

        # RAR support is optional. Besides the MIME hint, sniff the content so
        # that a .rar uploaded with a generic MIME type is still listed.
        if RARFILE_AVAILABLE:
            mime_says_rar = bool(mime_type) and 'rar' in mime_type.lower()
            if mime_says_rar or rarfile.is_rarfile(file_path):
                with rarfile.RarFile(file_path) as rar_ref:
                    entries = [_rar_entry(info) for info in rar_ref.infolist()]
                return {'type': 'rar', 'files': entries}

        return None
    except Exception as e:
        # Best effort: a broken archive must not break the download page, but
        # the failure is reported instead of being silently swallowed.
        print(f"Error reading archive {file_path}: {e}")
        return None
def is_archive_file(file_path, mime_type, filename):
    """Return True when the file looks like a supported archive.

    Three signals are tried in order; the first match wins:
    the filename extension, the MIME type, and finally the file content.

    Args:
        file_path: Path of the stored file, used for content sniffing.
        mime_type: MIME type recorded for the file (may be ``None``).
        filename: Original filename, used for the extension check.

    Returns:
        ``True`` if any signal identifies a ZIP/TAR (or RAR, when the optional
        ``rarfile`` module is available) archive, otherwise ``False``.
    """
    # Check by file extension
    lower_filename = filename.lower()
    if lower_filename.endswith(('.zip', '.tar', '.tar.gz', '.tar.bz2', '.tar.xz', '.tgz')):
        return True

    if RARFILE_AVAILABLE and lower_filename.endswith('.rar'):
        return True

    # Check by MIME type (substring match, so parameters after ';' are tolerated)
    if mime_type:
        lower_mime = mime_type.lower()
        archive_mimes = (
            'application/zip',
            'application/x-zip-compressed',
            'application/x-tar',
            'application/x-gtar',
            'application/x-compressed-tar',
            'application/vnd.rar',
            'application/x-rar-compressed',
        )
        if any(mime in lower_mime for mime in archive_mimes):
            return True

    # Check by actual file content. A missing or unreadable file simply means
    # "not an archive"; only ordinary errors are suppressed so that
    # KeyboardInterrupt/SystemExit still propagate.
    try:
        return zipfile.is_zipfile(file_path) or tarfile.is_tarfile(file_path)
    except Exception:
        return False
def is_media_file(mime_type, filename):
    """Classify a MIME type as previewable media.

    Returns a ``(is_media, kind)`` tuple where ``kind`` is ``'image'``,
    ``'video'``, ``'audio'`` or ``'pdf'`` for supported types and ``None``
    otherwise. ``filename`` is accepted but not consulted.
    """
    if not mime_type:
        return False, None

    # (MIME prefix, reported kind, exact MIME types accepted for that kind)
    families = (
        ('image/', 'image', ('image/jpeg', 'image/jpg', 'image/png', 'image/gif',
                             'image/webp', 'image/svg+xml', 'image/bmp',
                             'image/heic', 'image/heif')),
        ('video/', 'video', ('video/mp4', 'video/webm', 'video/ogg', 'video/avi',
                             'video/mov', 'video/quicktime')),
        ('audio/', 'audio', ('audio/mpeg', 'audio/mp3', 'audio/wav', 'audio/ogg',
                             'audio/webm', 'audio/aac', 'audio/flac')),
    )
    for prefix, kind, accepted in families:
        if mime_type.startswith(prefix) and mime_type in accepted:
            return True, kind

    # PDF is the only non-prefixed type that can be previewed.
    if mime_type == 'application/pdf':
        return True, 'pdf'

    return False, None
def parse_arguments():
    """Build the command line parser and return the parsed ``sys.argv`` options."""
    # One (flag, add_argument keyword arguments) pair per supported option.
    option_specs = [
        ('--host', {'default': '127.0.0.1',
                    'help': 'Host to bind to (default: 127.0.0.1)'}),
        ('--port', {'type': int, 'default': 5000,
                    'help': 'Port to bind to (default: 5000)'}),
        ('--debug', {'action': 'store_true',
                     'help': 'Enable debug mode'}),
        ('--base-url', {'default': None,
                        'help': 'Base URL for download links (e.g., https://mysite.com)'}),
        ('--network', {'action': 'store_true',
                       'help': 'Bind to all network interfaces (0.0.0.0)'}),
        ('--max-file-size', {'type': int, 'default': 16,
                             'help': 'Maximum file size in MB (default: 16)'}),
    ]

    cli = argparse.ArgumentParser(description='Flask Temporary File Upload Server')
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli.parse_args()
# Parse command line arguments (done at import time; the settings below
# are all derived from them).
args = parse_arguments()

# Set host based on arguments: --network overrides --host and binds to
# every interface.
HOST = '0.0.0.0' if args.network else args.host

# Set base URL for download links: --base-url wins, then the BASE_URL
# environment variable, then the bind address.
# NOTE(review): with --network the fallback becomes http://0.0.0.0:<port>,
# which clients usually cannot open - pass --base-url in that case.
BASE_URL = args.base_url or os.environ.get('BASE_URL', f'http://{HOST}:{args.port}')

# Validate max file size
if args.max_file_size <= 0:
    print("❌ Error: Maximum file size must be a positive integer")
    # ``exit`` is an interactive helper installed by the ``site`` module and
    # is not guaranteed to exist (e.g. ``python -S``); raise SystemExit directly.
    raise SystemExit(1)

app = Flask(__name__)
# A fresh random key is generated on every start.
app.config['SECRET_KEY'] = generate_secret_key(32)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = args.max_file_size * 1024 * 1024  # Convert MB to bytes
app.config['BASE_URL'] = BASE_URL

# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
- # Database initialization
def init_db():
    """Create the ``files`` table in ``files.db`` if it does not exist yet.

    Safe to call repeatedly. The connection is closed even when the
    statement or the commit fails.
    """
    conn = sqlite3.connect('files.db')
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS files (
                id TEXT PRIMARY KEY,
                original_filename TEXT NOT NULL,
                sanitized_filename TEXT NOT NULL,
                file_path TEXT NOT NULL,
                user_session TEXT NOT NULL,
                upload_date TEXT NOT NULL,
                expiration_date TEXT NOT NULL,
                duration_hours INTEGER NOT NULL,
                file_size INTEGER NOT NULL,
                mime_type TEXT
            )
        ''')
        conn.commit()
    finally:
        conn.close()
def sanitize_filename(filename):
    """Build a safe, unique on-disk name from a user supplied filename.

    Every character of the stem outside ``[a-zA-Z0-9_-]`` is replaced by
    ``_`` and an 8-character random hex tag is appended to the stem. The
    extension is cleaned as well: characters outside that same set are
    dropped, so spaces, quotes or NUL bytes in the extension never reach the
    filesystem. Ordinary extensions such as ``.txt`` are kept unchanged.

    Args:
        filename: Original (untrusted) filename.

    Returns:
        ``"<stem>_<8 hex chars><.ext>"``; the extension part is omitted when
        the input has none or nothing of it survives cleaning.
    """
    name, ext = os.path.splitext(filename)
    safe_name = re.sub(r'[^a-zA-Z0-9_-]', '_', name)

    # ``ext`` starts with its dot (or is empty); drop unsafe characters and
    # re-add the dot only when something is left.
    safe_ext = re.sub(r'[^a-zA-Z0-9_-]', '', ext)
    if safe_ext:
        safe_ext = '.' + safe_ext

    # Same value as the first 8 characters of str(uuid4()): the leading hex group.
    unique_id = uuid.uuid4().hex[:8]
    return f"{safe_name}_{unique_id}{safe_ext}"
def cleanup_expired_files():
    """Remove expired files from database and filesystem.

    A single cutoff timestamp is used for both the SELECT and the DELETE.
    Taking the time twice would let a row that expires between the two
    statements lose its database record while its file is never removed.
    The connection is closed even if a statement fails.
    """
    cutoff = datetime.now().isoformat()

    conn = sqlite3.connect('files.db')
    try:
        cursor = conn.cursor()

        # Get expired files
        cursor.execute('''
            SELECT file_path FROM files
            WHERE expiration_date < ?
        ''', (cutoff,))
        expired_files = cursor.fetchall()

        # Delete files from filesystem (best effort: a failure is reported
        # and the remaining files are still processed)
        for (file_path,) in expired_files:
            full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
            try:
                if os.path.exists(full_path):
                    os.remove(full_path)
                    print(f"Deleted expired file: {file_path}")
            except OSError as e:
                print(f"Error deleting file {file_path}: {e}")

        # Remove exactly the rows selected above from the database
        cursor.execute('DELETE FROM files WHERE expiration_date < ?', (cutoff,))
        conn.commit()
    finally:
        conn.close()

    print(f"Cleaned up {len(expired_files)} expired files")
def cleanup_worker(interval_seconds=600):
    """Background worker that removes expired files at a fixed interval.

    Loops forever: sleeps ``interval_seconds`` (default 600, i.e. 10 minutes)
    and then runs ``cleanup_expired_files``. A failing run is reported and
    the loop keeps going; without that guard a single error would end the
    thread and no further cleanup would ever happen.

    Args:
        interval_seconds: Pause between cleanup runs, in seconds.
    """
    while True:
        time.sleep(interval_seconds)
        try:
            cleanup_expired_files()
        except Exception as e:
            print(f"Error during scheduled cleanup: {e}")
@app.route('/')
def index():
    """Render the upload page, passing the upload size limit in whole megabytes."""
    bytes_per_mb = 1024 * 1024
    limit_mb = app.config['MAX_CONTENT_LENGTH'] // bytes_per_mb  # Convert bytes to MB
    return render_template('index.html', max_file_size=limit_mb)
@app.route('/upload', methods=['POST'])
def upload_file():
    """Store an uploaded file and register it with an expiration date.

    Form fields: ``file`` (the upload), ``user_session`` (owner token) and
    optional ``duration_hours`` (positive integer, default 24).

    Returns:
        JSON describing the stored file, or a JSON ``error`` with status 400
        when the file, the session or a valid duration is missing.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    file = request.files['file']
    user_session = request.form.get('user_session')

    if not file.filename:
        return jsonify({'error': 'No file selected'}), 400

    if not user_session:
        return jsonify({'error': 'No user session provided'}), 400

    # The duration comes from the client: reject anything that is not a
    # positive integer instead of failing with a server error.
    try:
        duration_hours = int(request.form.get('duration_hours', 24))
    except (TypeError, ValueError):
        return jsonify({'error': 'Invalid duration'}), 400
    if duration_hours <= 0:
        return jsonify({'error': 'Invalid duration'}), 400

    upload_date = datetime.now()
    try:
        expiration_date = upload_date + timedelta(hours=duration_hours)
    except OverflowError:
        # Absurdly large durations overflow timedelta/datetime.
        return jsonify({'error': 'Invalid duration'}), 400

    # Generate file info
    file_id = str(uuid.uuid4())
    original_filename = file.filename
    sanitized_filename = sanitize_filename(original_filename)

    # Save file
    file_path = sanitized_filename
    full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
    file.save(full_path)

    # Get file info
    file_size = os.path.getsize(full_path)
    mime_type = file.content_type

    # Save to database
    try:
        conn = sqlite3.connect('files.db')
        try:
            conn.execute('''
                INSERT INTO files (id, original_filename, sanitized_filename, file_path,
                                   user_session, upload_date, expiration_date, duration_hours,
                                   file_size, mime_type)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (file_id, original_filename, sanitized_filename, file_path, user_session,
                  upload_date.isoformat(), expiration_date.isoformat(), duration_hours,
                  file_size, mime_type))
            conn.commit()
        finally:
            conn.close()
    except sqlite3.Error:
        # Without a database row the stored file could never be downloaded
        # or cleaned up, so remove it before reporting the failure.
        try:
            os.remove(full_path)
        except OSError as e:
            print(f"Error deleting file {file_path}: {e}")
        raise

    return jsonify({
        'success': True,
        'file_id': file_id,
        'original_filename': original_filename,
        'download_url': f'{app.config["BASE_URL"]}/download/{file_id}',
        'expiration_date': expiration_date.isoformat(),
        'file_size': file_size
    })
@app.route('/files/<user_session>')
def get_user_files(user_session):
    """Return, as JSON, the non-expired files of a session, newest upload first."""
    db = sqlite3.connect('files.db')
    cur = db.cursor()
    cur.execute('''
        SELECT id, original_filename, upload_date, expiration_date,
               duration_hours, file_size, mime_type
        FROM files
        WHERE user_session = ? AND expiration_date > ?
        ORDER BY upload_date DESC
    ''', (user_session, datetime.now().isoformat()))
    rows = cur.fetchall()
    db.close()

    # Column names in the order they were selected above.
    columns = ('id', 'original_filename', 'upload_date', 'expiration_date',
               'duration_hours', 'file_size', 'mime_type')

    listing = []
    for row in rows:
        entry = dict(zip(columns, row))
        entry['download_url'] = f'{app.config["BASE_URL"]}/download/{row[0]}'
        listing.append(entry)

    return jsonify(listing)
@app.route('/download/<file_id>')
def download_page(file_id):
    """Render the download page for a file, with a preview where possible.

    Responds with the "file not found" page and status 404 when the record or
    the file on disk is missing, and status 410 when the file has expired.
    """
    db = sqlite3.connect('files.db')
    cur = db.cursor()
    cur.execute('''
        SELECT original_filename, file_path, expiration_date, file_size, mime_type
        FROM files
        WHERE id = ?
    ''', (file_id,))
    record = cur.fetchone()
    db.close()

    if not record:
        return make_response(render_template('file_not_found.html'), 404)

    original_filename, file_path, expiration_date, file_size, mime_type = record

    # Expired files are reported as gone.
    if datetime.fromisoformat(expiration_date) < datetime.now():
        return make_response(render_template('file_not_found.html'), 410)

    # The record may outlive the file on disk.
    full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
    if not os.path.exists(full_path):
        return make_response(render_template('file_not_found.html'), 404)

    # Text preview: text/* and JSON files are read as UTF-8.
    is_text_file = mime_type and (mime_type == 'application/json' or mime_type.startswith('text/'))
    file_content = None
    if is_text_file:
        try:
            with open(full_path, 'r', encoding='utf-8') as handle:
                file_content = handle.read()
        except Exception:
            # Anything that cannot be read as text is treated as binary.
            is_text_file = False

    # Archive preview: list the members when the file is an archive.
    archive_flag = is_archive_file(full_path, mime_type, original_filename)
    archive_contents = get_archive_contents(full_path, mime_type) if archive_flag else None

    # Media preview: image / video / audio / pdf.
    media_flag, media_type = is_media_file(mime_type, original_filename)

    context = {
        'file_id': file_id,
        'filename': original_filename,
        'file_size': file_size,
        'mime_type': mime_type,
        'expiration_date': expiration_date,
        'is_text_file': is_text_file,
        'file_content': file_content,
        'is_archive_file': archive_flag,
        'archive_contents': archive_contents,
        'is_media_file': media_flag,
        'media_type': media_type,
    }
    return render_template('download.html', **context)
@app.route('/preview/<file_id>')
def preview_file(file_id):
    """Serve media files for preview.

    Aborts with 404 when the record or the file on disk is missing, 410 when
    the file has expired and 403 when the file is not a supported media type.

    The content is user supplied and served inline, so the response tells
    the browser not to sniff a different content type. SVG images, which can
    carry scripts, are additionally served with a restrictive
    Content-Security-Policy.
    """
    conn = sqlite3.connect('files.db')
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT original_filename, file_path, expiration_date, mime_type
            FROM files
            WHERE id = ?
        ''', (file_id,))
        file_data = cursor.fetchone()
    finally:
        conn.close()

    if not file_data:
        abort(404)

    original_filename, file_path, expiration_date, mime_type = file_data

    # Check if file has expired
    if datetime.fromisoformat(expiration_date) < datetime.now():
        abort(410)

    # Check if file exists
    full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
    if not os.path.exists(full_path):
        abort(404)

    # Only allow media files for preview
    is_media_file_flag, media_type = is_media_file(mime_type, original_filename)
    if not is_media_file_flag:
        abort(403)

    response = send_from_directory(app.config['UPLOAD_FOLDER'], file_path, mimetype=mime_type)
    # The stored MIME type came from the uploader; never let the browser
    # second-guess it.
    response.headers['X-Content-Type-Options'] = 'nosniff'
    if mime_type == 'image/svg+xml':
        # An SVG opened directly is a document that may run scripts in this
        # site's origin; sandbox it. Inline styles stay allowed so ordinary
        # images still render.
        response.headers['Content-Security-Policy'] = (
            "default-src 'none'; style-src 'unsafe-inline'; sandbox"
        )
    return response
@app.route('/download/<file_id>/direct')
def download_file_direct(file_id):
    """Send the stored file as an attachment under its original filename.

    Responds with the "file not found" page and status 404 when the record or
    the file on disk is missing, and status 410 when the file has expired.
    """
    db = sqlite3.connect('files.db')
    cur = db.cursor()
    cur.execute('''
        SELECT original_filename, file_path, expiration_date
        FROM files
        WHERE id = ?
    ''', (file_id,))
    record = cur.fetchone()
    db.close()

    if not record:
        return make_response(render_template('file_not_found.html'), 404)

    original_filename, file_path, expiration_date = record

    # Expired files are reported as gone.
    expires_at = datetime.fromisoformat(expiration_date)
    if expires_at < datetime.now():
        return make_response(render_template('file_not_found.html'), 410)

    # The record may outlive the file on disk.
    upload_dir = app.config['UPLOAD_FOLDER']
    if not os.path.exists(os.path.join(upload_dir, file_path)):
        return make_response(render_template('file_not_found.html'), 404)

    return send_from_directory(
        upload_dir,
        file_path,
        as_attachment=True,
        download_name=original_filename,
    )
@app.route('/delete/<file_id>', methods=['DELETE'])
def delete_file(file_id):
    """Delete a file on behalf of the session that uploaded it.

    Expects a JSON object body containing ``user_session``.

    Returns:
        ``{'success': True}`` on success, status 400 when no session is
        supplied (including a missing, malformed or non-object JSON body),
        and status 404 when the file does not exist or belongs to another
        session.
    """
    # silent=True yields None instead of raising on a missing or invalid
    # JSON body; the body may also be valid JSON that is not an object.
    payload = request.get_json(silent=True)
    user_session = payload.get('user_session') if isinstance(payload, dict) else None

    if not user_session:
        return jsonify({'error': 'No user session provided'}), 400

    conn = sqlite3.connect('files.db')
    try:
        cursor = conn.cursor()

        # Get file info and verify ownership
        cursor.execute('''
            SELECT file_path FROM files
            WHERE id = ? AND user_session = ?
        ''', (file_id, user_session))
        file_data = cursor.fetchone()

        if not file_data:
            return jsonify({'error': 'File not found or unauthorized'}), 404

        file_path = file_data[0]

        # Delete from filesystem (best effort: the record is removed even
        # when the file cannot be deleted)
        full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
        try:
            if os.path.exists(full_path):
                os.remove(full_path)
        except OSError as e:
            print(f"Error deleting file {file_path}: {e}")

        # Delete from database
        cursor.execute('DELETE FROM files WHERE id = ? AND user_session = ?',
                       (file_id, user_session))
        conn.commit()
    finally:
        conn.close()

    return jsonify({'success': True})
if __name__ == '__main__':
    # Print configuration info
    print("🚀 Starting Flask Temporary File Upload Server")
    print(f"📁 Upload folder: {app.config['UPLOAD_FOLDER']}")
    # Report only that a key exists: printing part of it would leak secret
    # material into the console and any captured logs.
    print(f"🔑 Secret key: generated ({len(app.config['SECRET_KEY'])} chars, not shown)")
    print(f"🌐 Base URL: {app.config['BASE_URL']}")
    print(f"🖥️ Server: http://{HOST}:{args.port}")
    print(f"🔒 Max file size: {args.max_file_size}MB")
    print("⏰ Cleanup interval: 10 minutes")
    print(f"🐛 Debug mode: {'ON' if args.debug else 'OFF'}")
    print("-" * 50)

    init_db()

    # Start cleanup worker in background; as a daemon thread it does not
    # keep the process alive on shutdown.
    cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
    cleanup_thread.start()

    # Initial cleanup (the worker sleeps before its first run)
    cleanup_expired_files()

    app.run(debug=args.debug, host=HOST, port=args.port)
|