''' 🗄️ PostgreSQL Database Integration Database Tables: users - Stores user_id, api_key, and creation timestamp files - Stores file metadata with foreign key to users Automatic cascading delete when user is removed Indexed columns for fast lookups Stored Information: API keys and user IDs File metadata (name, size, path, upload date) User-file relationships 📁 File Organization Directory Structure: uploads/ ├── user-id-1/ │ ├── file-id-1_document.pdf │ └── file-id-2_image.jpg └── user-id-2/ └── file-id-3_data.csv Each user gets their own subdirectory for better organization and isolation. ⚙️ Setup Instructions Install dependencies: bashpip install fastapi uvicorn python-multipart asyncpg Set up PostgreSQL database: bash# Create database createdb fileserver # Set DATABASE_URL environment variable export DATABASE_URL="postgresql://username:password@localhost:5432/fileserver" Run the server: bashpython script_name.py The database tables will be created automatically on startup! 🔑 Key Features ✅ All data persists in PostgreSQL ✅ Files organized in user-specific subdirectories ✅ Automatic database initialization ✅ Connection pooling for better performance ✅ Proper foreign key relationships with cascade delete ✅ Timestamps for all uploads The DATABASE_URL can be configured via environment variable for different environments (development, production, etc.). ''' import random import string import sys from fastapi import FastAPI, File, UploadFile, HTTPException, Header, Depends from fastapi.responses import FileResponse, HTMLResponse from fastapi.security import APIKeyHeader from pathlib import Path import shutil import os from typing import List, Optional import uuid import secrets from datetime import datetime import asyncpg from contextlib import asynccontextmanager import app_config ERROR_STATUS_NO_DB_URL = 10 STYLE_MACRO = "#$STYLE$#" SCRIPT_MACRO = "#$SCRIPT$#" # Database configuration app_config.init_config() DATABASE_URL = app_config.DB_URL # Global database pool db_pool: asyncpg.Pool | None = None @asynccontextmanager async def lifespan(app: FastAPI): """Checking DB URL""" global DATABASE_URL if not DATABASE_URL: DATABASE_URL = os.getenv("DATABASE_URL", "") if not DATABASE_URL: print('DB URL does not set. DB URL is a mandatory variable.\nPlease provide it in config file or as a environment variable and restart the server') sys.exit(ERROR_STATUS_NO_DB_URL) """Manage application lifespan - startup and shutdown""" global db_pool # Startup: Create database pool and initialize tables db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10) await init_database() print("✅ Database connected and initialized") yield # Shutdown: Close database pool await db_pool.close() print("👋 Database connection closed") app = FastAPI( title="Das File Server", description="Upload, save, and download files with API key authentication", lifespan=lifespan, openapi_url="/api/v1/openapi.json" if app_config.IS_DOCS_AVAILABLE else None ) # Configure upload directory UPLOAD_DIR = Path(app_config.UPLOAD_DIR) UPLOAD_DIR.mkdir(exist_ok=True) # API Key header api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) async def init_database(): """Initialize database tables""" async with db_pool.acquire() as conn: # Create users table await conn.execute(""" CREATE TABLE IF NOT EXISTS users ( user_id UUID PRIMARY KEY, api_key VARCHAR(255) UNIQUE NOT NULL, alias VARCHAR(32) UNIQUE NOT NULL, created_at TIMESTAMP DEFAULT NOW() ) """) # Create files table await conn.execute(""" CREATE TABLE IF NOT EXISTS files ( file_id UUID PRIMARY KEY, user_id UUID NOT NULL REFERENCES users(user_id) ON DELETE CASCADE, original_name VARCHAR(500) NOT NULL, stored_name VARCHAR(500) NOT NULL, file_path TEXT NOT NULL, size BIGINT NOT NULL, uploaded_at TIMESTAMP DEFAULT NOW(), CONSTRAINT fk_user FOREIGN KEY (user_id) REFERENCES users(user_id) ) """) # Create index for faster lookups await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_files_user_id ON files(user_id) """) await conn.execute(""" CREATE INDEX IF NOT EXISTS idx_users_api_key ON users(api_key) """) async def get_user_id(api_key: Optional[str] = Depends(api_key_header)) -> str: """Validate API key and return user ID""" if not api_key: raise HTTPException(status_code=401, detail="Missing API key") async with db_pool.acquire() as conn: user = await conn.fetchrow( "SELECT user_id FROM users WHERE api_key = $1", api_key ) if not user: raise HTTPException(status_code=401, detail="Invalid API key") return str(user['user_id']) def get_user_directory(user_id: str) -> Path: """Get or create user's subdirectory""" user_dir = UPLOAD_DIR / user_id user_dir.mkdir(exist_ok=True) return user_dir def get_file_content(file_name): try: with open(file_name, 'r') as file: file_content = file.read() return file_content except FileNotFoundError: print(f"Error: The file '{file_name}' was not found.") except Exception as e: print(f"An error occurred: {e}") @app.get("/", response_class=HTMLResponse) async def home(): """Serve the web interface""" css = get_file_content(app_config.CSS_FILE) js = get_file_content(app_config.JS_FILE) return (get_file_content(app_config.HTML_FILE) .replace(STYLE_MACRO, css) .replace(SCRIPT_MACRO, js)) @app.post("/new-key") @app.get("/new-key") async def generate_key(): """Generate a new API key for a user""" api_key = secrets.token_urlsafe(32) try: return { "api_key": api_key, "message": "New API key generated successfully." } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to generate API key: {str(e)}") @app.post("/new-alias") @app.get("/new-alias") async def generate_alias(): """Generate a new alias""" return { "alias": await get_alias(), "message": "New Alias generated successfully." } async def get_all_aliases(): async with db_pool.acquire() as conn: aliases = await conn.fetch(""" SELECT alias FROM users """) return aliases async def get_alias(): """Getting the alias""" existing_aliases = await get_all_aliases() alias = existing_aliases[0] if existing_aliases else "" while alias in existing_aliases or alias == "": num_letters = app_config.ALIAS_NUM_LETTERS num_digits = app_config.ALIAS_NUM_DIGITS letters = [random.choice(string.ascii_letters) for _ in range(num_letters)] digits = [random.choice(string.digits) for _ in range(num_digits)] res_list = letters + digits #random.shuffle(res_list) alias = ''.join(res_list) return alias @app.post("/add-user") async def add_new_user(): """Add new user with new API key""" # Generate secure random API key api_key = secrets.token_urlsafe(32) user_id = uuid.uuid4() user_alias = get_alias() async with db_pool.acquire() as conn: try: # Store API key in database await conn.execute( "INSERT INTO users (user_id, api_key) VALUES ($1, $2, $3)", user_id, api_key, user_alias ) # Create user's directory get_user_directory(str(user_id)) return { "api_key": api_key, "user_id": str(user_id), "message": "API key generated successfully. Keep it secure!" } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to generate API key: {str(e)}") @app.post("/upload") async def upload_files( files: List[UploadFile] = File(...), user_id: str = Depends(get_user_id) ): """Upload one or multiple files (requires API key)""" uploaded_files = [] user_dir = get_user_directory(user_id) async with db_pool.acquire() as conn: for file in files: # Generate unique ID for file file_id = uuid.uuid4() stored_name = f"{file_id}_{file.filename}" file_path = user_dir / stored_name # Save file try: with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) file_size = os.path.getsize(file_path) # Store metadata in database await conn.execute(""" INSERT INTO files (file_id, user_id, original_name, stored_name, file_path, size) VALUES ($1, $2, $3, $4, $5, $6) """, file_id, uuid.UUID(user_id), file.filename, stored_name, str(file_path), file_size) uploaded_files.append({ "id": str(file_id), "filename": file.filename, "size": file_size }) except Exception as e: # Clean up file if database insert fails if file_path.exists(): os.remove(file_path) raise HTTPException(status_code=500, detail=f"Failed to upload {file.filename}: {str(e)}") return {"message": "Files uploaded successfully", "files": uploaded_files} @app.get("/files") async def list_files(user_id: str = Depends(get_user_id)): """List all files for the authenticated user""" async with db_pool.acquire() as conn: files = await conn.fetch(""" SELECT file_id, original_name, size, uploaded_at FROM files WHERE user_id = $1 ORDER BY uploaded_at DESC """, uuid.UUID(user_id)) return [ { "file_id": str(file['file_id']), "original_name": file['original_name'], "size": file['size'], "uploaded_at": file['uploaded_at'].isoformat() } for file in files ] @app.get("/download/{file_id}") async def download_file( file_id: str, api_key: Optional[str] = None, x_api_key: Optional[str] = Header(None) ): """Download a file by ID (requires API key)""" # Accept API key from query param or header key = api_key or x_api_key if not key: raise HTTPException(status_code=401, detail="Missing API key") async with db_pool.acquire() as conn: # Verify API key and get user user = await conn.fetchrow( "SELECT user_id FROM users WHERE api_key = $1", key ) if not user: raise HTTPException(status_code=401, detail="Invalid API key") # Get file info and verify ownership file_info = await conn.fetchrow(""" SELECT file_path, original_name FROM files WHERE file_id = $1 AND user_id = $2 """, uuid.UUID(file_id), user['user_id']) if not file_info: raise HTTPException(status_code=404, detail="File not found or access denied") file_path = Path(file_info['file_path']) if not file_path.exists(): raise HTTPException(status_code=404, detail="File not found on disk") return FileResponse( path=file_path, filename=file_info['original_name'], media_type="application/octet-stream" ) @app.get("/dn/{file_id}") async def download_link( file_id: str ): """Download a file by DIRECT LINK (without API key)""" async with db_pool.acquire() as conn: # Get file info and verify ownership file_info = await conn.fetchrow(""" SELECT file_path, original_name FROM files WHERE file_id = $1 """, uuid.UUID(file_id)) if not file_info: raise HTTPException(status_code=404, detail="File not found or access denied") file_path = Path(file_info['file_path']) if not file_path.exists(): raise HTTPException(status_code=404, detail="File not found on disk") return FileResponse( path=file_path, filename=file_info['original_name'], media_type="application/octet-stream" ) @app.delete("/files/{file_id}") async def delete_file( file_id: str, user_id: str = Depends(get_user_id) ): """Delete a file by ID (only owner can delete)""" async with db_pool.acquire() as conn: # Get file info and verify ownership file_info = await conn.fetchrow(""" SELECT file_path FROM files WHERE file_id = $1 AND user_id = $2 """, uuid.UUID(file_id), uuid.UUID(user_id)) if not file_info: raise HTTPException(status_code=404, detail="File not found or access denied") file_path = Path(file_info['file_path']) # Delete file from disk if file_path.exists(): os.remove(file_path) # Remove from database await conn.execute( "DELETE FROM files WHERE file_id = $1", uuid.UUID(file_id) ) return {"message": "File deleted successfully"} @app.get("/files/{file_id}/info") async def get_file_info( file_id: str, user_id: str = Depends(get_user_id) ): """Get file information by ID (only owner can view)""" async with db_pool.acquire() as conn: file_info = await conn.fetchrow(""" SELECT file_id, original_name, stored_name, size, uploaded_at FROM files WHERE file_id = $1 AND user_id = $2 """, uuid.UUID(file_id), uuid.UUID(user_id)) if not file_info: raise HTTPException(status_code=404, detail="File not found or access denied") return { "file_id": str(file_info['file_id']), "original_name": file_info['original_name'], "stored_name": file_info['stored_name'], "size": file_info['size'], "uploaded_at": file_info['uploaded_at'].isoformat() } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=app_config.APP_PORT)