import os
import secrets
import hashlib
import sys
from datetime import datetime, timedelta
from typing import Optional, List
from pathlib import Path

from fastapi import FastAPI, Depends, HTTPException, status, UploadFile, File, Form, Request, Cookie
from fastapi.security import HTTPBasic, HTTPBasicCredentials, APIKeyHeader
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, ForeignKey, BigInteger, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
from pydantic import BaseModel, Field
import uvicorn

import app_config

# Process exit status used when no database URL can be resolved.
ERROR_STATUS_NO_DB_URL = 10

# Number of PBKDF2 rounds. Must stay in sync with already-stored hashes,
# because the iteration count is not encoded in the stored value.
PBKDF2_ITERATIONS = 100000

app_config.init_config()

# Database Configuration
# The config file wins; the DATABASE_URL environment variable is the fallback.
DATABASE_URL = app_config.DB_URL
if not DATABASE_URL:
    DATABASE_URL = os.getenv("DATABASE_URL", "")
if not DATABASE_URL:
    print(
        'DB URL does not set. DB URL is a mandatory variable.\n'
        'Please provide it in config file or as an environment variable and start the server again',
        file=sys.stderr,
    )
    sys.exit(ERROR_STATUS_NO_DB_URL)

UPLOAD_DIR = Path(app_config.UPLOAD_DIR)
# parents=True so a nested upload directory does not abort the start-up.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Templates
templates = Jinja2Templates(directory="templates")

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Security
security = HTTPBasic()
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

# Session storage (in-memory for demo, use Redis in production)
# Maps session token -> user id.
sessions = {}


# Database Models
class User(Base):
    """Account that owns uploaded files and may hold the admin role."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True, nullable=False)
    # "<salt hex>:<hash hex>" as produced by hash_password().
    password_hash = Column(String(256), nullable=False)
    is_admin = Column(Boolean, default=False)
    is_blocked = Column(Boolean, default=False)
    api_key = Column(String(64), unique=True, index=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Deleting a user also deletes the metadata rows of the user's files.
    files = relationship("FileMetadata", back_populates="owner", cascade="all, delete-orphan")


class FileMetadata(Base):
    """Database record describing one uploaded file."""

    __tablename__ = "files"

    id = Column(Integer, primary_key=True, index=True)
    # Name supplied by the uploader; used as the download file name.
    filename = Column(String(255), nullable=False)
    # Name of the file inside UPLOAD_DIR.
    stored_filename = Column(String(255), unique=True, nullable=False)
    file_size = Column(BigInteger, nullable=False)
    content_type = Column(String(100))
    # Token that grants unauthenticated access through /share/<token>.
    share_token = Column(String(64), unique=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    uploaded_at = Column(DateTime, default=datetime.utcnow)

    owner = relationship("User", back_populates="files")


# Create tables
# This has to run after the model classes above are declared: create_all()
# only emits CREATE TABLE for tables already registered on Base.metadata.
Base.metadata.create_all(bind=engine)


# Pydantic Models
class UserCreate(BaseModel):
    """Request body for creating a user."""

    username: str = Field(..., min_length=3, max_length=50)
    password: str = Field(..., min_length=8)
    is_admin: bool = False


class UserResponse(BaseModel):
    """Public representation of a user (no password hash, no API key)."""

    id: int
    username: str
    is_admin: bool
    is_blocked: bool
    created_at: datetime

    class Config:
        from_attributes = True


class FileInfo(BaseModel):
    """Public representation of an uploaded file."""

    id: int
    filename: str
    file_size: int
    content_type: Optional[str]
    uploaded_at: datetime
    share_link: Optional[str]

    class Config:
        from_attributes = True


# Password Hashing with PBKDF2-SHA256
def hash_password(password: str) -> str:
    """Hash a password using PBKDF2-SHA256 with a fresh random salt.

    Args:
        password: Plain-text password.

    Returns:
        ``"<salt hex>:<hash hex>"``, the format expected by verify_password().
    """
    salt = os.urandom(32)
    pwdhash = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, PBKDF2_ITERATIONS)
    return salt.hex() + ':' + pwdhash.hex()


def verify_password(stored_password: str, provided_password: str) -> bool:
    """Verify a plain-text password against a stored hash.

    Args:
        stored_password: Value produced by hash_password().
        provided_password: Plain-text password to check.

    Returns:
        True when the password matches. False when it does not match or when
        the stored value is malformed.
    """
    try:
        salt_hex, hash_hex = stored_password.split(':')
        expected = bytes.fromhex(hash_hex)
        actual = hashlib.pbkdf2_hmac(
            'sha256',
            provided_password.encode('utf-8'),
            bytes.fromhex(salt_hex),
            PBKDF2_ITERATIONS,
        )
    except (ValueError, TypeError, AttributeError):
        # Wrong number of ':' parts, non-hex data, or a non-string argument.
        return False
    # Constant-time comparison, so response timing does not reveal how many
    # leading bytes of the hash matched.
    return secrets.compare_digest(actual, expected)


def generate_api_key() -> str:
    """Generate a secure API key (64 URL-safe characters)."""
    return secrets.token_urlsafe(48)


def generate_share_token() -> str:
    """Generate a secure share token"""
    return secrets.token_urlsafe(32)


def generate_session_token() -> str:
    """Generate a secure session token"""
    return secrets.token_urlsafe(32)


def format_size(size_bytes: int) -> str:
    """Format a file size in human-readable form.

    Args:
        size_bytes: Size in bytes.

    Returns:
        The size with one decimal place and a unit from B to TB,
        e.g. ``"1.5 KB"``.
    """
    size = size_bytes
    for unit in ['B', 'KB', 'MB', 'GB']:
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} TB"


# Add filter to Jinja2
templates.env.filters['format_size'] = format_size


# Database Dependency
def get_db():
    """Yield a database session and close it when the request is finished."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# Session Management
def get_session_user(session_token: Optional[str] = Cookie(None),
                     db: Session = Depends(get_db)) -> Optional[User]:
    """Get the user that belongs to the ``session_token`` cookie.

    Returns:
        The user, or None when there is no valid session, the user no longer
        exists, or the user is blocked.
    """
    if not session_token:
        return None
    user_id = sessions.get(session_token)
    if user_id is None:
        return None
    user = db.query(User).filter(User.id == user_id).first()
    if not user or user.is_blocked:
        return None
    return user


# Authentication
def get_current_user_http(
    credentials: HTTPBasicCredentials = Depends(security),
    db: Session = Depends(get_db)
) -> User:
    """Authenticate user via HTTP Basic Auth.

    Raises:
        HTTPException: 401 for bad credentials, 403 for a blocked account.
    """
    user = db.query(User).filter(User.username == credentials.username).first()
    if not user or not verify_password(user.password_hash, credentials.password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    if user.is_blocked:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is blocked"
        )
    return user


def get_current_user_api(
    api_key: Optional[str] = Depends(api_key_header),
    db: Session = Depends(get_db)
) -> User:
    """Authenticate user via the ``X-API-Key`` header.

    Raises:
        HTTPException: 401 for a missing or unknown key, 403 for a blocked
            account.
    """
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API Key required"
        )
    user = db.query(User).filter(User.api_key == api_key).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API Key"
        )
    if user.is_blocked:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is blocked"
        )
    return user


def get_current_user(
    session_user: Optional[User] = Depends(get_session_user),
    api_key: Optional[str] = Depends(api_key_header),
    db: Session = Depends(get_db)
) -> User:
    """Get current user from session or API authentication.

    A valid session cookie takes precedence over the API key header.

    Raises:
        HTTPException: 401 when neither is supplied; otherwise whatever
            get_current_user_api() raises for the given key.
    """
    if session_user:
        return session_user
    if api_key:
        return get_current_user_api(api_key, db)
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required"
    )


def require_admin(user: User = Depends(get_current_user)) -> User:
    """Require admin role.

    Raises:
        HTTPException: 403 when the authenticated user is not an admin.
    """
    if not user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin privileges required"
        )
    return user


# FastAPI Application
app = FastAPI(
    title="File Storage Service by -=:dAs:=-",
    description="Upload, save, download, share and remove files with for authenticated users included administrative dashboard and REST API",
    openapi_url="/api/v1/openapi.json" if app_config.IS_DOCS_AVAILABLE else None,
    version=app_config.APP_VERSION
)


# Initialize default admin user
@app.on_event("startup")
async def startup_event():
    """Create the default ``admin`` account if it does not exist yet."""
    db = SessionLocal()
    try:
        admin = db.query(User).filter(User.username == "admin").first()
        if not admin:
            # NOTE(review): the default password is hard-coded and printed
            # together with the API key; change it right after first login.
            admin = User(
                username="admin",
                password_hash=hash_password("admin123"),
                is_admin=True,
                api_key=generate_api_key()
            )
            db.add(admin)
            db.commit()
            print("Default admin created. Username: admin, Password: admin123")
            print(f"Admin API Key: {admin.api_key}")
    finally:
        db.close()


# Web Routes
@app.get("/", response_class=HTMLResponse)
async def root(session_user: Optional[User] = Depends(get_session_user)):
    """Root page - redirect to login or dashboard"""
    target = "/dashboard" if session_user else "/login"
    return RedirectResponse(url=target, status_code=302)


@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request, error: Optional[str] = None):
    """Login page"""
    return templates.TemplateResponse("login.html", {
        "request": request,
        "error": error
    })


@app.post("/login")
async def login(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db)
):
    """Process login.

    On success a session is stored in ``sessions`` and its token is returned
    in the ``session_token`` cookie. On failure the login page is re-rendered
    with an error message.
    """
    user = db.query(User).filter(User.username == username).first()
    if not user or not verify_password(user.password_hash, password):
        return templates.TemplateResponse("login.html", {
            "request": request,
            "error": "Invalid username or password"
        })
    if user.is_blocked:
        return templates.TemplateResponse("login.html", {
            "request": request,
            "error": "Your account has been blocked"
        })

    # Create session
    session_token = generate_session_token()
    sessions[session_token] = user.id

    response = RedirectResponse(url="/dashboard", status_code=302)
    # httponly keeps the token out of reach of page scripts.
    response.set_cookie(key="session_token", value=session_token, httponly=True)
    return response

@app.get("/logout")
async def logout(session_token: Optional[str] = Cookie(None)):
    """Logout: drop the session and clear the cookie."""
    if session_token:
        sessions.pop(session_token, None)
    response = RedirectResponse(url="/login", status_code=302)
    response.delete_cookie("session_token")
    return response


@app.get("/dashboard", response_class=HTMLResponse)
async def dashboard(
    request: Request,
    user: User = Depends(get_session_user),
    db: Session = Depends(get_db)
):
    """User dashboard: the user's files, newest first."""
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    files = db.query(FileMetadata).filter(FileMetadata.user_id == user.id).order_by(
        FileMetadata.uploaded_at.desc()).all()

    return templates.TemplateResponse("user_files.html", {
        "request": request,
        "username": user.username,
        "is_admin": user.is_admin,
        "files": files,
        "format_size": format_size
    })


# Minimum length for a password set through the settings page.
# Mirrors the min_length the UserCreate schema applies to new accounts.
MIN_PASSWORD_LENGTH = 8


def _render_settings(request: Request, user: User, db: Session, **extra):
    """Render settings.html with the common context plus ``extra`` entries."""
    file_count = db.query(FileMetadata).filter(FileMetadata.user_id == user.id).count()
    context = {
        "request": request,
        "username": user.username,
        "is_admin": user.is_admin,
        "api_key": user.api_key,
        "file_count": file_count,
    }
    context.update(extra)
    return templates.TemplateResponse("settings.html", context)


@app.get("/settings", response_class=HTMLResponse)
async def settings_page(
    request: Request,
    user: User = Depends(get_session_user),
    db: Session = Depends(get_db)
):
    """Settings page"""
    if not user:
        return RedirectResponse(url="/login", status_code=302)
    return _render_settings(request, user, db)


@app.post("/settings/change-password")
async def change_password(
    request: Request,
    current_password: str = Form(...),
    new_password: str = Form(...),
    confirm_password: str = Form(...),
    user: User = Depends(get_session_user),
    db: Session = Depends(get_db)
):
    """Change the password of the logged-in user.

    The settings page is re-rendered with an ``error`` entry when the current
    password is wrong, the new passwords differ, or the new password is too
    short; otherwise with a ``message`` entry.
    """
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    if not verify_password(user.password_hash, current_password):
        return _render_settings(request, user, db, error="Current password is incorrect")

    if new_password != confirm_password:
        return _render_settings(request, user, db, error="New passwords do not match")

    if len(new_password) < MIN_PASSWORD_LENGTH:
        return _render_settings(
            request, user, db,
            error=f"New password must be at least {MIN_PASSWORD_LENGTH} characters long"
        )

    user.password_hash = hash_password(new_password)
    db.commit()
    return _render_settings(request, user, db, message="Password changed successfully")


@app.get("/admin", response_class=HTMLResponse)
async def admin_panel(
    request: Request,
    user: User = Depends(get_session_user),
    db: Session = Depends(get_db)
):
    """Admin panel: user list with per-user file counts and global statistics."""
    from types import SimpleNamespace

    if not user:
        return RedirectResponse(url="/login", status_code=302)
    if not user.is_admin:
        return RedirectResponse(url="/dashboard", status_code=302)

    # Get all users with file counts
    rows = db.query(
        User,
        func.count(FileMetadata.id).label('file_count')
    ).outerjoin(FileMetadata).group_by(User.id).order_by(User.created_at.desc()).all()

    # Plain attribute objects, so the template can use dotted access for
    # both the user columns and the computed file_count.
    users_list = [
        SimpleNamespace(
            id=u.id,
            username=u.username,
            is_admin=u.is_admin,
            is_blocked=u.is_blocked,
            created_at=u.created_at,
            file_count=file_count,
        )
        for u, file_count in rows
    ]

    # Statistics
    stats = {
        'total_users': db.query(User).count(),
        'active_users': db.query(User).filter(User.is_blocked == False).count(),  # noqa: E712
        'blocked_users': db.query(User).filter(User.is_blocked == True).count(),  # noqa: E712
        'total_files': db.query(FileMetadata).count()
    }

    return templates.TemplateResponse("admin.html", {
        "request": request,
        "username": user.username,
        "is_admin": user.is_admin,
        "users": users_list,
        "stats": stats
    })


# API Endpoints

# User Management (Admin Only)
@app.post("/api/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    user_data: UserCreate,
    db: Session = Depends(get_db),
    admin: User = Depends(require_admin)
):
    """Create a new user (Admin only).

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    existing = db.query(User).filter(User.username == user_data.username).first()
    if existing:
        raise HTTPException(status_code=400, detail="Username already exists")

    new_user = User(
        username=user_data.username,
        password_hash=hash_password(user_data.password),
        is_admin=user_data.is_admin,
        api_key=generate_api_key()
    )
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return new_user


@app.get("/api/users", response_model=List[UserResponse])
async def list_users(
    db: Session = Depends(get_db),
    admin: User = Depends(require_admin)
):
    """List all users (Admin only)"""
    return db.query(User).all()


def _remove_stored_file(stored_filename: str) -> None:
    """Delete a file from UPLOAD_DIR; a file that is already gone is not an error."""
    try:
        (UPLOAD_DIR / stored_filename).unlink()
    except FileNotFoundError:
        pass


@app.delete("/api/users/{user_id}")
async def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(require_admin)
):
    """Delete a user together with the user's files and sessions (Admin only).

    Raises:
        HTTPException: 400 when an admin targets their own account, 404 when
            the user does not exist.
    """
    if user_id == admin.id:
        raise HTTPException(status_code=400, detail="Cannot delete yourself")

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Collect the names first: the cascade removes the metadata rows.
    stored_names = [meta.stored_filename for meta in user.files]

    db.delete(user)
    db.commit()

    # The rows are gone, so the stored files would otherwise be orphaned.
    # Best effort: a file that cannot be removed must not fail a deletion
    # that has already been committed.
    for stored_name in stored_names:
        try:
            _remove_stored_file(stored_name)
        except OSError as exc:
            print(f"Could not remove stored file {stored_name}: {exc}", file=sys.stderr)

    # Drop the user's sessions so a stale token can never resolve to
    # another account if the id is reused.
    stale_tokens = [token for token, uid in sessions.items() if uid == user_id]
    for token in stale_tokens:
        sessions.pop(token, None)

    return {"message": "User deleted successfully"}


@app.patch("/api/users/{user_id}/block")
async def toggle_block_user(
    user_id: int,
    block: bool,
    db: Session = Depends(get_db),
    admin: User = Depends(require_admin)
):
    """Block or unblock a user (Admin only).

    Raises:
        HTTPException: 400 when an admin targets their own account, 404 when
            the user does not exist.
    """
    if user_id == admin.id:
        raise HTTPException(status_code=400, detail="Cannot block yourself")

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    user.is_blocked = block
    db.commit()
    return {"message": f"User {'blocked' if block else 'unblocked'} successfully"}


@app.get("/api/users/me/api-key")
async def get_my_api_key(user: User = Depends(get_current_user)):
    """Get your API key"""
    return {"api_key": user.api_key}


@app.post("/api/users/me/api-key/regenerate")
async def regenerate_api_key(
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Regenerate your API key; the previous key stops working."""
    user.api_key = generate_api_key()
    db.commit()
    return {"api_key": user.api_key}


# File Management

# Longest file name accepted; matches the String(255) metadata columns.
MAX_FILENAME_LENGTH = 255
# Uploads are copied to disk in pieces of this size instead of being read
# into memory in one go.
UPLOAD_CHUNK_SIZE = 1024 * 1024


def _safe_filename(raw_name: Optional[str]) -> str:
    """Reduce a client-supplied file name to a plain, bounded base name.

    Directory parts (with either slash style) and NUL bytes are removed, so
    the result can be joined to UPLOAD_DIR without leaving it.
    """
    name = (raw_name or "").replace("\\", "/").rsplit("/", 1)[-1]
    name = name.replace("\x00", "").strip()
    if name in ("", ".", ".."):
        name = "unnamed"
    return name[:MAX_FILENAME_LENGTH]


def _to_file_info(file_metadata: FileMetadata) -> FileInfo:
    """Build the API representation of a file, including its share link."""
    return FileInfo(
        id=file_metadata.id,
        filename=file_metadata.filename,
        file_size=file_metadata.file_size,
        content_type=file_metadata.content_type,
        uploaded_at=file_metadata.uploaded_at,
        share_link=f"/share/{file_metadata.share_token}"
    )


def _file_response(file_metadata: FileMetadata) -> FileResponse:
    """Return a download response for a stored file.

    Raises:
        HTTPException: 404 when the file is missing from UPLOAD_DIR.
    """
    file_path = UPLOAD_DIR / file_metadata.stored_filename
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="File not found on disk")
    return FileResponse(
        path=str(file_path),
        filename=file_metadata.filename,
        media_type=file_metadata.content_type
    )


@app.post("/api/files/upload", response_model=FileInfo)
async def upload_file(
    file: UploadFile = File(...),
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Upload a file and register its metadata.

    The client-supplied name is sanitized before it is stored or used in a
    path. If writing the file or the database insert fails, the partially
    written file is removed again.
    """
    original_name = _safe_filename(file.filename)

    # Generate unique stored filename
    stored_filename = f"{secrets.token_urlsafe(16)}_{original_name}"[:MAX_FILENAME_LENGTH]
    file_path = UPLOAD_DIR / stored_filename

    try:
        # Save file
        file_size = 0
        with open(file_path, "wb") as out:
            while True:
                chunk = await file.read(UPLOAD_CHUNK_SIZE)
                if not chunk:
                    break
                out.write(chunk)
                file_size += len(chunk)

        # Create metadata
        file_metadata = FileMetadata(
            filename=original_name,
            stored_filename=stored_filename,
            file_size=file_size,
            content_type=file.content_type,
            share_token=generate_share_token(),
            user_id=user.id
        )
        db.add(file_metadata)
        db.commit()
        db.refresh(file_metadata)
    except Exception:
        # Do not leave a file on disk without a metadata row.
        db.rollback()
        _remove_stored_file(stored_filename)
        raise

    return _to_file_info(file_metadata)


@app.get("/api/files", response_model=List[FileInfo])
async def list_files(
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """List user's files"""
    files = db.query(FileMetadata).filter(FileMetadata.user_id == user.id).all()
    return [_to_file_info(meta) for meta in files]


@app.get("/api/files/{file_id}/download")
async def download_file(
    file_id: int,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Download one of the current user's files.

    Raises:
        HTTPException: 404 when the file does not exist, belongs to another
            user, or is missing on disk.
    """
    file_metadata = db.query(FileMetadata).filter(
        FileMetadata.id == file_id,
        FileMetadata.user_id == user.id
    ).first()
    if not file_metadata:
        raise HTTPException(status_code=404, detail="File not found")
    return _file_response(file_metadata)


@app.delete("/api/files/{file_id}")
async def delete_file(
    file_id: int,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Delete one of the current user's files (data and metadata).

    Raises:
        HTTPException: 404 when the file does not exist or belongs to
            another user.
    """
    file_metadata = db.query(FileMetadata).filter(
        FileMetadata.id == file_id,
        FileMetadata.user_id == user.id
    ).first()
    if not file_metadata:
        raise HTTPException(status_code=404, detail="File not found")

    # Delete physical file
    _remove_stored_file(file_metadata.stored_filename)

    # Delete metadata
    db.delete(file_metadata)
    db.commit()
    return {"message": "File deleted successfully"}


# Public Share Endpoint
@app.get("/share/{share_token}")
async def download_shared_file(share_token: str, db: Session = Depends(get_db)):
    """Download a shared file (no authentication required).

    Raises:
        HTTPException: 404 when the token is unknown or the file is missing
            on disk.
    """
    file_metadata = db.query(FileMetadata).filter(
        FileMetadata.share_token == share_token
    ).first()
    if not file_metadata:
        raise HTTPException(status_code=404, detail="File not found")
    return _file_response(file_metadata)


# Health Check
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy"}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=app_config.APP_PORT)