| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- import os
- import sqlite3
- import uuid
- import hashlib
- import threading
- import time
- import secrets
- import string
- import argparse
- from datetime import datetime, timedelta
- from werkzeug.utils import secure_filename
- from flask import Flask, request, jsonify, render_template, send_from_directory, abort
- from werkzeug.security import generate_password_hash
- import re
def generate_secret_key(length=32):
    """Return a random key made of ``length`` characters.

    Every character is drawn independently with ``secrets.choice`` from the
    ASCII letters, digits and punctuation characters.
    """
    pool = string.ascii_letters + string.digits + string.punctuation
    picked = [secrets.choice(pool) for _ in range(length)]
    return ''.join(picked)
def parse_arguments(argv=None):
    """Parse command line arguments.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``, so
            existing zero-argument callers behave exactly as before.

    Returns:
        An ``argparse.Namespace`` with the attributes ``host``, ``port``,
        ``debug``, ``base_url``, ``network`` and ``max_file_size``.
    """
    parser = argparse.ArgumentParser(description='Flask Temporary File Upload Server')
    parser.add_argument('--host', default='127.0.0.1', help='Host to bind to (default: 127.0.0.1)')
    parser.add_argument('--port', type=int, default=5000, help='Port to bind to (default: 5000)')
    parser.add_argument('--debug', action='store_true', help='Enable debug mode')
    parser.add_argument('--base-url', default=None, help='Base URL for download links (e.g., https://mysite.com)')
    parser.add_argument('--network', action='store_true', help='Bind to all network interfaces (0.0.0.0)')
    parser.add_argument('--max-file-size', type=int, default=16, help='Maximum file size in MB (default: 16)')
    return parser.parse_args(argv)
# Parse command line arguments
args = parse_arguments()

# Set host based on arguments: --network overrides --host and binds to all
# interfaces.
HOST = '0.0.0.0' if args.network else args.host

# Set base URL for download links. Precedence: --base-url, then the BASE_URL
# environment variable, then a URL built from the bind host and port.
# NOTE(review): with --network the fallback becomes http://0.0.0.0:<port>,
# which clients generally cannot use -- pass --base-url in that case.
BASE_URL = args.base_url or os.environ.get('BASE_URL', f'http://{HOST}:{args.port}')

# Validate max file size
if args.max_file_size <= 0:
    print("❌ Error: Maximum file size must be a positive integer")
    # The bare `exit()` helper is injected by the `site` module and is not
    # guaranteed to exist (e.g. `python -S`); raising SystemExit always works.
    raise SystemExit(1)

app = Flask(__name__)
app.config['SECRET_KEY'] = generate_secret_key(32)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = args.max_file_size * 1024 * 1024  # Convert MB to bytes
app.config['BASE_URL'] = BASE_URL

# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# Database initialization
def init_db():
    """Create the ``files`` table in ``files.db`` if it does not exist yet.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so calling this
    function repeatedly is harmless. The connection is closed in a
    ``finally`` clause so it is released even when the statement fails.

    Raises:
        sqlite3.Error: If the database cannot be opened or the table cannot
            be created.
    """
    conn = sqlite3.connect('files.db')
    try:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS files (
                id TEXT PRIMARY KEY,
                original_filename TEXT NOT NULL,
                sanitized_filename TEXT NOT NULL,
                file_path TEXT NOT NULL,
                user_session TEXT NOT NULL,
                upload_date TEXT NOT NULL,
                expiration_date TEXT NOT NULL,
                duration_hours INTEGER NOT NULL,
                file_size INTEGER NOT NULL,
                mime_type TEXT
            )
        ''')
        conn.commit()
    finally:
        conn.close()
def sanitize_filename(filename):
    """Build a storage-safe, unique file name from a client-supplied name.

    The stem keeps only ASCII letters, digits, ``_`` and ``-``; every other
    character (including path separators and dots) becomes ``_``. The
    extension is sanitized as well -- it comes from the client too, and
    previously passed through untouched, so spaces, quotes or NUL bytes
    could end up in the stored file name. An 8-character random hex id is
    appended to the stem so repeated uploads of the same name do not collide.

    Args:
        filename: The original file name as sent by the client.

    Returns:
        A string of the form ``<stem>_<8 hex chars><.ext>``.
    """
    name, ext = os.path.splitext(filename)
    safe_name = re.sub(r'[^a-zA-Z0-9_-]', '_', name)
    # `ext` is either empty or starts with the dot found by splitext; keep
    # that dot and clean only the characters after it.
    safe_ext = '.' + re.sub(r'[^a-zA-Z0-9]', '_', ext[1:]) if ext else ''
    # Generate unique identifier
    unique_id = str(uuid.uuid4())[:8]
    return f"{safe_name}_{unique_id}{safe_ext}"
def cleanup_expired_files():
    """Remove expired files from database and filesystem.

    A single cutoff timestamp is taken once and used for both the SELECT and
    the DELETE. Previously ``datetime.now()`` was evaluated twice, so a file
    that expired between the two statements had its row deleted while the
    file itself stayed on disk with nothing left to reference it.

    Failures to delete an individual file are printed and do not stop the
    run. The database connection is closed even if a statement raises.
    """
    conn = sqlite3.connect('files.db')
    try:
        cursor = conn.cursor()
        cutoff = datetime.now().isoformat()

        # Get expired files
        cursor.execute('''
            SELECT file_path FROM files
            WHERE expiration_date < ?
        ''', (cutoff,))
        expired_files = cursor.fetchall()

        # Delete files from filesystem
        for (file_path,) in expired_files:
            full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
            try:
                if os.path.exists(full_path):
                    os.remove(full_path)
                    print(f"Deleted expired file: {file_path}")
            except Exception as e:
                # Best effort: one undeletable file must not block the rest.
                print(f"Error deleting file {file_path}: {e}")

        # Remove from database (same cutoff as the SELECT above)
        cursor.execute('DELETE FROM files WHERE expiration_date < ?', (cutoff,))
        conn.commit()
    finally:
        conn.close()

    print(f"Cleaned up {len(expired_files)} expired files")
def cleanup_worker():
    """Background worker to cleanup expired files every 10 minutes.

    Runs forever: sleeps 600 seconds, then calls ``cleanup_expired_files``.
    An exception from a single cleanup pass is printed and the loop
    continues; without this, one failure would end the thread and no further
    cleanup would run.
    """
    while True:
        time.sleep(600)  # 10 minutes
        try:
            cleanup_expired_files()
        except Exception as e:
            # Top-level boundary of the worker thread: report and keep going.
            print(f"Error during scheduled cleanup: {e}")
@app.route('/')
def index():
    """Render ``index.html``, passing the upload size limit in whole MB."""
    bytes_per_mb = 1024 * 1024
    limit_mb = app.config['MAX_CONTENT_LENGTH'] // bytes_per_mb  # bytes -> MB
    return render_template('index.html', max_file_size=limit_mb)
@app.route('/upload', methods=['POST'])
def upload_file():
    """Store an uploaded file and record it in the ``files`` table.

    Reads the multipart field ``file`` and the form fields ``user_session``
    and ``duration_hours`` (default 24).

    Returns:
        A JSON object with ``success``, ``file_id``, ``original_filename``,
        ``download_url``, ``expiration_date`` and ``file_size`` on success,
        or ``({'error': ...}, 400)`` when the file, the session or a valid
        ``duration_hours`` is missing.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    file = request.files['file']
    user_session = request.form.get('user_session')

    # Covers both an empty and a missing filename, so the function can no
    # longer fall off the end and return None.
    if not file.filename:
        return jsonify({'error': 'No file selected'}), 400

    if not user_session:
        return jsonify({'error': 'No user session provided'}), 400

    # duration_hours is client input: a non-numeric value used to raise an
    # unhandled ValueError (HTTP 500).
    try:
        duration_hours = int(request.form.get('duration_hours', 24))
    except (TypeError, ValueError):
        return jsonify({'error': 'duration_hours must be an integer'}), 400
    if duration_hours <= 0:
        return jsonify({'error': 'duration_hours must be a positive integer'}), 400

    # Generate file info
    file_id = str(uuid.uuid4())
    original_filename = file.filename
    sanitized_filename = sanitize_filename(original_filename)
    upload_date = datetime.now()
    try:
        expiration_date = upload_date + timedelta(hours=duration_hours)
    except OverflowError:
        # Durations beyond what timedelta/datetime can represent.
        return jsonify({'error': 'duration_hours is too large'}), 400

    # Save file
    file_path = sanitized_filename
    full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
    file.save(full_path)

    # Get file info
    file_size = os.path.getsize(full_path)
    mime_type = file.content_type

    # Save to database
    conn = sqlite3.connect('files.db')
    try:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO files (id, original_filename, sanitized_filename, file_path,
                               user_session, upload_date, expiration_date, duration_hours,
                               file_size, mime_type)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (file_id, original_filename, sanitized_filename, file_path, user_session,
              upload_date.isoformat(), expiration_date.isoformat(), duration_hours,
              file_size, mime_type))
        conn.commit()
    except sqlite3.Error:
        # Without a row the file could never be downloaded or cleaned up, so
        # remove it before propagating the database error.
        try:
            os.remove(full_path)
        except OSError as e:
            print(f"Error deleting file {file_path}: {e}")
        raise
    finally:
        conn.close()

    return jsonify({
        'success': True,
        'file_id': file_id,
        'original_filename': original_filename,
        'download_url': f'{app.config["BASE_URL"]}/download/{file_id}',
        'expiration_date': expiration_date.isoformat(),
        'file_size': file_size
    })
@app.route('/files/<user_session>')
def get_user_files(user_session):
    """Return the unexpired files of ``user_session`` as a JSON list.

    Entries are ordered newest upload first and each carries the stored
    metadata plus a ``download_url`` built from the configured base URL.
    """
    db = sqlite3.connect('files.db')
    cur = db.cursor()
    cur.execute('''
        SELECT id, original_filename, upload_date, expiration_date,
               duration_hours, file_size, mime_type
        FROM files
        WHERE user_session = ? AND expiration_date > ?
        ORDER BY upload_date DESC
    ''', (user_session, datetime.now().isoformat()))
    rows = cur.fetchall()
    db.close()

    # One dict per row; the column order matches the SELECT list above.
    listing = [
        {
            'id': fid,
            'original_filename': name,
            'upload_date': uploaded,
            'expiration_date': expires,
            'duration_hours': hours,
            'file_size': size,
            'mime_type': mime,
            'download_url': f'{app.config["BASE_URL"]}/download/{fid}',
        }
        for fid, name, uploaded, expires, hours, size, mime in rows
    ]
    return jsonify(listing)
@app.route('/download/<file_id>')
def download_file(file_id):
    """Send the stored file for ``file_id`` as an attachment.

    Aborts with 404 when the id is unknown or the file is missing on disk,
    and with 410 when the recorded expiration date has passed. The download
    is offered under the original file name.
    """
    db = sqlite3.connect('files.db')
    cur = db.cursor()
    cur.execute('''
        SELECT original_filename, file_path, expiration_date
        FROM files
        WHERE id = ?
    ''', (file_id,))
    row = cur.fetchone()
    db.close()

    if row is None:
        abort(404)

    download_name, stored_name, expires_iso = row

    # Expired entries answer 410 Gone
    expires_at = datetime.fromisoformat(expires_iso)
    if datetime.now() > expires_at:
        abort(410)

    # The row may outlive the file on disk
    upload_dir = app.config['UPLOAD_FOLDER']
    if not os.path.exists(os.path.join(upload_dir, stored_name)):
        abort(404)

    return send_from_directory(
        upload_dir,
        stored_name,
        as_attachment=True,
        download_name=download_name,
    )
@app.route('/delete/<file_id>', methods=['DELETE'])
def delete_file(file_id):
    """Delete a file owned by the requesting session.

    Expects a JSON object body containing ``user_session``. The row is only
    matched when both ``file_id`` and ``user_session`` agree, which serves
    as the ownership check.

    Returns:
        ``{'success': True}`` on success, ``({'error': ...}, 400)`` when no
        session is supplied, or ``({'error': ...}, 404)`` when no matching
        row exists.
    """
    # `request.json.get(...)` raised when the body was missing, not JSON, or
    # JSON that is not an object (e.g. `null` or a list). Parse leniently and
    # treat all of those as "no session provided".
    payload = request.get_json(silent=True)
    user_session = payload.get('user_session') if isinstance(payload, dict) else None

    if not user_session:
        return jsonify({'error': 'No user session provided'}), 400

    conn = sqlite3.connect('files.db')
    try:
        cursor = conn.cursor()

        # Get file info and verify ownership
        cursor.execute('''
            SELECT file_path FROM files
            WHERE id = ? AND user_session = ?
        ''', (file_id, user_session))
        file_data = cursor.fetchone()

        if not file_data:
            return jsonify({'error': 'File not found or unauthorized'}), 404

        file_path = file_data[0]

        # Delete from filesystem
        full_path = os.path.join(app.config['UPLOAD_FOLDER'], file_path)
        try:
            if os.path.exists(full_path):
                os.remove(full_path)
        except Exception as e:
            # Best effort: the row is still removed below.
            print(f"Error deleting file {file_path}: {e}")

        # Delete from database
        cursor.execute('DELETE FROM files WHERE id = ? AND user_session = ?',
                       (file_id, user_session))
        conn.commit()
    finally:
        # Runs on every exit path, including the early 404 return.
        conn.close()

    return jsonify({'success': True})
if __name__ == '__main__':
    # Print configuration info
    print("🚀 Starting Flask Temporary File Upload Server")
    print(f"📁 Upload folder: {app.config['UPLOAD_FOLDER']}")
    # Only the key length is reported; echoing part of the key would put
    # secret material into console output and logs.
    print(f"🔑 Secret key: {len(app.config['SECRET_KEY'])} chars (value hidden)")
    print(f"🌐 Base URL: {app.config['BASE_URL']}")
    print(f"🖥️ Server: http://{HOST}:{args.port}")
    print(f"🔒 Max file size: {args.max_file_size}MB")
    print("⏰ Cleanup interval: 10 minutes")
    print(f"🐛 Debug mode: {'ON' if args.debug else 'OFF'}")
    print("-" * 50)

    init_db()

    # Start cleanup worker in background; daemon=True so it does not keep
    # the process alive on shutdown.
    cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
    cleanup_thread.start()

    # Initial cleanup
    cleanup_expired_files()

    app.run(debug=args.debug, host=HOST, port=args.port)
|