752 lines
22 KiB
Python
752 lines
22 KiB
Python
import os
|
|
import secrets
|
|
import hashlib
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, List
|
|
from pathlib import Path
|
|
|
|
from fastapi import FastAPI, Depends, HTTPException, status, UploadFile, File, Form, Request, Cookie
|
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials, APIKeyHeader
|
|
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.staticfiles import StaticFiles
|
|
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, ForeignKey, BigInteger, func
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import sessionmaker, Session, relationship
|
|
from pydantic import BaseModel, Field
|
|
import uvicorn
|
|
|
|
import app_config
|
|
|
|
# Process exit status used when no database URL is configured.
ERROR_STATUS_NO_DB_URL = 10

app_config.init_config()

# Database Configuration
# The value from app_config wins; the DATABASE_URL environment variable is
# only consulted when the configured value is empty.
DATABASE_URL = app_config.DB_URL or os.getenv("DATABASE_URL", "")
if not DATABASE_URL:
    # Fatal configuration error: report on stderr so it is not mixed with
    # regular program output, then stop with a dedicated exit status.
    print(
        'DB URL does not set. DB URL is a mandatory variable.\nPlease provide it in config file or as an environment variable and start the server again',
        file=sys.stderr,
    )
    sys.exit(ERROR_STATUS_NO_DB_URL)

UPLOAD_DIR = Path(app_config.UPLOAD_DIR)
# parents=True lets a nested upload path be created even when its parent
# directories do not exist yet.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Templates
templates = Jinja2Templates(directory="templates")

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Security
security = HTTPBasic()
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

# Session storage (in-memory for demo, use Redis in production)
# Maps session token -> user id.
sessions = {}

# Create tables
# NOTE(review): the model classes are declared further down in this module,
# so no tables are registered on Base.metadata yet when this line runs and the
# call creates nothing. It has to run after the models are defined to be useful.
Base.metadata.create_all(bind=engine)
|
|
|
|
|
|
# Database Models
|
|
class User(Base):
    """ORM model for an account row in the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    # Unique login name, at most 50 characters.
    username = Column(
        String(50),
        unique=True,
        index=True,
        nullable=False,
    )
    # Salted password hash as produced by hash_password().
    password_hash = Column(String(256), nullable=False)
    # Role / status flags; both default to False for new rows.
    is_admin = Column(Boolean, default=False)
    is_blocked = Column(Boolean, default=False)
    # Per-user key accepted by the X-API-Key authentication path.
    api_key = Column(String(64), unique=True, index=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Files owned by the user; their metadata rows are deleted with the user.
    files = relationship(
        "FileMetadata",
        back_populates="owner",
        cascade="all, delete-orphan",
    )
|
|
|
|
|
|
class FileMetadata(Base):
    """ORM model describing one uploaded file (row in the ``files`` table)."""

    __tablename__ = "files"

    id = Column(Integer, primary_key=True, index=True)
    # Name shown to the user and used when the file is downloaded.
    filename = Column(String(255), nullable=False)
    # Unique name of the file inside the upload directory.
    stored_filename = Column(
        String(255),
        unique=True,
        nullable=False,
    )
    file_size = Column(BigInteger, nullable=False)
    content_type = Column(String(100))
    # Token used by the public /share/{share_token} route.
    share_token = Column(String(64), unique=True, index=True)
    user_id = Column(
        Integer,
        ForeignKey("users.id"),
        nullable=False,
    )
    uploaded_at = Column(DateTime, default=datetime.utcnow)

    # Back-reference to the owning User.
    owner = relationship("User", back_populates="files")
|
|
|
|
|
|
# Pydantic Models
|
|
class UserCreate(BaseModel):
    """Request body accepted when an administrator creates a user."""

    # Required, 3 to 50 characters.
    username: str = Field(
        ...,
        min_length=3,
        max_length=50,
    )
    # Required, at least 8 characters.
    password: str = Field(
        ...,
        min_length=8,
    )
    # New accounts are regular users unless stated otherwise.
    is_admin: bool = False
|
|
|
|
|
|
class UserResponse(BaseModel):
    """Public representation of a user returned by the user-management API.

    The password hash and API key are intentionally not part of this schema.
    """

    id: int
    username: str
    is_admin: bool
    is_blocked: bool
    created_at: datetime

    class Config:
        # Allow the model to be populated from ORM objects via attribute access.
        from_attributes = True
|
|
|
|
|
|
class FileInfo(BaseModel):
    """Description of a stored file as returned by the file API."""

    id: int
    filename: str
    # Size in bytes.
    file_size: int
    content_type: Optional[str]
    uploaded_at: datetime
    # Relative URL of the public share route for this file.
    share_link: Optional[str]

    class Config:
        # Allow the model to be populated from ORM objects via attribute access.
        from_attributes = True
|
|
|
|
# Password Hashing with PBKDF2-SHA256
|
|
def hash_password(password: str) -> str:
    """Derive a salted PBKDF2-HMAC-SHA256 hash for *password*.

    A fresh 32-byte random salt is generated on every call and 100000
    iterations are applied.

    Returns:
        ``"<salt hex>:<derived key hex>"``, the format verify_password() parses.
    """
    random_salt = os.urandom(32)
    derived_key = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode('utf-8'),
        random_salt,
        100000,
    )
    return f"{random_salt.hex()}:{derived_key.hex()}"
|
|
|
|
|
|
def verify_password(stored_password: str, provided_password: str) -> bool:
    """Check *provided_password* against a hash created by hash_password().

    Args:
        stored_password: Value in the form ``"<salt hex>:<derived key hex>"``.
        provided_password: Plain-text password supplied by the caller.

    Returns:
        True when the password matches; False when it does not or when the
        stored value is missing or malformed.
    """
    try:
        salt_hex, expected_hex = stored_password.split(':')
        candidate_hex = hashlib.pbkdf2_hmac(
            'sha256',
            provided_password.encode('utf-8'),
            bytes.fromhex(salt_hex),
            100000,
        ).hex()
        # Constant-time comparison so the time taken does not reveal how many
        # leading characters of the digest matched.
        return secrets.compare_digest(candidate_hex, expected_hex)
    except (ValueError, AttributeError, TypeError):
        # ValueError: wrong number of ':' parts or invalid hex in the salt.
        # AttributeError: a None / non-string argument.
        # TypeError: non-ASCII characters in the stored digest.
        return False
|
|
|
|
|
|
def generate_api_key() -> str:
    """Return a new random, URL-safe API key built from 48 random bytes."""
    entropy_bytes = 48
    return secrets.token_urlsafe(entropy_bytes)
|
|
|
|
|
|
def generate_share_token() -> str:
    """Return a new random, URL-safe share token built from 32 random bytes."""
    entropy_bytes = 32
    return secrets.token_urlsafe(entropy_bytes)
|
|
|
|
|
|
def generate_session_token() -> str:
    """Return a new random, URL-safe session token built from 32 random bytes."""
    entropy_bytes = 32
    return secrets.token_urlsafe(entropy_bytes)
|
|
|
|
|
|
def format_size(size_bytes: int) -> str:
    """Render a byte count as text with one decimal place and a unit.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached, e.g. ``1536`` -> ``"1.5 KB"``.
    """
    unit_names = ('B', 'KB', 'MB', 'GB', 'TB')
    largest = len(unit_names) - 1
    scaled = size_bytes
    position = 0
    # Step up one unit at a time; TB is the ceiling and is never exceeded.
    while position < largest and not (scaled < 1024.0):
        scaled = scaled / 1024.0
        position += 1
    return f"{scaled:.1f} {unit_names[position]}"
|
|
|
|
|
|
# Add filter to Jinja2
|
|
# Register format_size under the filter name 'format_size' on the Jinja2
# environment used by `templates`.
templates.env.filters['format_size'] = format_size
|
|
|
|
|
|
# Database Dependency
|
|
def get_db():
    """Yield a new database session and close it when the consumer is done.

    Written as a generator so it can be used as a FastAPI dependency.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Always release the session, even if the consumer raised.
        session.close()
|
|
|
|
|
|
# Session Management
|
|
def get_session_user(session_token: Optional[str] = Cookie(None), db: Session = Depends(get_db)) -> Optional[User]:
    """Resolve the ``session_token`` cookie to a user.

    Returns:
        The matching User, or None when the cookie is missing, the token is
        unknown, the user no longer exists, or the user is blocked.
    """
    if session_token and session_token in sessions:
        owner_id = sessions[session_token]
        account = db.query(User).filter(User.id == owner_id).first()
        if account and not account.is_blocked:
            return account
    return None
|
|
|
|
|
|
# Authentication
|
|
def get_current_user_http(
    credentials: HTTPBasicCredentials = Depends(security),
    db: Session = Depends(get_db)
) -> User:
    """Authenticate a request using HTTP Basic credentials.

    Raises:
        HTTPException: 401 when the username is unknown or the password is
            wrong; 403 when the account is blocked.
    """
    account = (
        db.query(User)
        .filter(User.username == credentials.username)
        .first()
    )

    # Unknown user and wrong password produce the same response.
    if not (account and verify_password(account.password_hash, credentials.password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )

    if account.is_blocked:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is blocked"
        )

    return account
|
|
|
|
|
|
def get_current_user_api(
    api_key: Optional[str] = Depends(api_key_header),
    db: Session = Depends(get_db)
) -> User:
    """Authenticate a request using the ``X-API-Key`` header value.

    Raises:
        HTTPException: 401 when the key is missing or matches no user;
            403 when the matching account is blocked.
    """
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API Key required"
        )

    key_owner = (
        db.query(User)
        .filter(User.api_key == api_key)
        .first()
    )

    if not key_owner:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API Key"
        )

    if key_owner.is_blocked:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is blocked"
        )

    return key_owner
|
|
|
|
|
|
def get_current_user(
    session_user: Optional[User] = Depends(get_session_user),
    api_key: Optional[str] = Depends(api_key_header),
    db: Session = Depends(get_db)
) -> User:
    """Return the authenticated user for the current request.

    A valid browser session takes precedence; otherwise the ``X-API-Key``
    header is checked via get_current_user_api().

    Raises:
        HTTPException: 401 when neither a session nor an API key is present,
            plus whatever get_current_user_api() raises for a bad key.
    """
    if session_user:
        return session_user

    if api_key:
        return get_current_user_api(api_key, db)

    # No supported credentials were supplied. HTTP Basic is not attempted here.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required"
    )
|
|
|
|
|
|
def require_admin(user: User = Depends(get_current_user)) -> User:
    """Dependency that lets only administrators through.

    Returns:
        The authenticated user when ``is_admin`` is set.

    Raises:
        HTTPException: 403 when the user is not an administrator.
    """
    if user.is_admin:
        return user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin privileges required"
    )
|
|
|
|
|
|
# FastAPI Application
|
|
# The OpenAPI schema is served only when app_config.IS_DOCS_AVAILABLE is set;
# passing None as openapi_url turns it off.
app = FastAPI(
    version=app_config.APP_VERSION,
    openapi_url="/api/v1/openapi.json" if app_config.IS_DOCS_AVAILABLE else None,
    title="File Storage Service by -=:dAs:=-",
    description="Upload, save, download, share and remove files with for authenticated users included administrative dashboard and REST API",
)
|
|
|
|
|
|
# Initialize default admin user
|
|
@app.on_event("startup")
async def startup_event():
    """Prepare the database when the application starts.

    Creates any missing tables and, if no user named ``admin`` exists yet,
    creates a default administrator and prints its credentials and API key.
    """
    # The module-level create_all() call runs before the model classes are
    # declared, so it has nothing to create. By the time this hook runs the
    # models are registered; create_all() skips tables that already exist.
    Base.metadata.create_all(bind=engine)

    db = SessionLocal()
    try:
        admin = db.query(User).filter(User.username == "admin").first()
        if not admin:
            # NOTE(review): well-known default password; it should be changed
            # right after the first login.
            admin = User(
                username="admin",
                password_hash=hash_password("admin123"),
                is_admin=True,
                api_key=generate_api_key()
            )
            db.add(admin)
            db.commit()
            print("Default admin created. Username: admin, Password: admin123")
            print(f"Admin API Key: {admin.api_key}")
    finally:
        db.close()
|
|
|
|
|
|
# Web Routes
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def root(session_user: Optional[User] = Depends(get_session_user)):
    """Send visitors to the dashboard when logged in, otherwise to the login page."""
    destination = "/dashboard" if session_user else "/login"
    return RedirectResponse(url=destination, status_code=302)
|
|
|
|
|
|
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request, error: Optional[str] = None):
    """Render the login form, optionally showing the ``error`` query value."""
    context = {
        "request": request,
        "error": error
    }
    return templates.TemplateResponse("login.html", context)
|
|
|
|
|
|
@app.post("/login")
async def login(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db)
):
    """Handle the login form.

    On success a new session token is stored in ``sessions``, sent back as an
    HTTP-only ``session_token`` cookie, and the browser is redirected to
    ``/dashboard``. On failure the login page is rendered again with an error.
    """
    def render_failure(reason: str):
        # Show the login form again with the given error text.
        return templates.TemplateResponse("login.html", {
            "request": request,
            "error": reason
        })

    account = db.query(User).filter(User.username == username).first()

    # Unknown user and wrong password share one message.
    if not (account and verify_password(account.password_hash, password)):
        return render_failure("Invalid username or password")

    if account.is_blocked:
        return render_failure("Your account has been blocked")

    # Create session
    token = generate_session_token()
    sessions[token] = account.id

    redirect = RedirectResponse(url="/dashboard", status_code=302)
    redirect.set_cookie(key="session_token", value=token, httponly=True)
    return redirect
|
|
|
|
|
|
@app.get("/logout")
async def logout(session_token: Optional[str] = Cookie(None)):
    """End the current session and redirect to the login page."""
    if session_token:
        # Forget the token if it is known; unknown tokens are ignored.
        sessions.pop(session_token, None)

    redirect = RedirectResponse(url="/login", status_code=302)
    redirect.delete_cookie("session_token")
    return redirect
|
|
|
|
|
|
@app.get("/dashboard", response_class=HTMLResponse)
async def dashboard(
    request: Request,
    user: User = Depends(get_session_user),
    db: Session = Depends(get_db)
):
    """Render the logged-in user's file list, newest upload first.

    Visitors without a valid session are redirected to ``/login``.
    """
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    own_files_query = db.query(FileMetadata).filter(FileMetadata.user_id == user.id)
    newest_first = own_files_query.order_by(FileMetadata.uploaded_at.desc())
    files = newest_first.all()

    context = {
        "request": request,
        "username": user.username,
        "is_admin": user.is_admin,
        "files": files,
        "format_size": format_size
    }
    return templates.TemplateResponse("user_files.html", context)
|
|
|
|
|
|
@app.get("/settings", response_class=HTMLResponse)
async def settings_page(
    request: Request,
    user: User = Depends(get_session_user),
    db: Session = Depends(get_db)
):
    """Render the settings page with the user's API key and file count.

    Visitors without a valid session are redirected to ``/login``.
    """
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    owned_files = db.query(FileMetadata).filter(FileMetadata.user_id == user.id)

    context = {
        "request": request,
        "username": user.username,
        "is_admin": user.is_admin,
        "api_key": user.api_key,
        "file_count": owned_files.count()
    }
    return templates.TemplateResponse("settings.html", context)
|
|
|
|
|
|
def _render_settings_with(request: Request, user: User, db: Session, **extra):
    """Render settings.html for *user*, adding *extra* keys (error/message) to the context."""
    file_count = db.query(FileMetadata).filter(FileMetadata.user_id == user.id).count()
    context = {
        "request": request,
        "username": user.username,
        "is_admin": user.is_admin,
        "api_key": user.api_key,
        "file_count": file_count,
    }
    context.update(extra)
    return templates.TemplateResponse("settings.html", context)


@app.post("/settings/change-password")
async def change_password(
    request: Request,
    current_password: str = Form(...),
    new_password: str = Form(...),
    confirm_password: str = Form(...),
    user: User = Depends(get_session_user),
    db: Session = Depends(get_db)
):
    """Change the logged-in user's password.

    The settings page is rendered again in every case: with ``error`` when the
    current password is wrong, the two new values differ or the new password
    is too short, and with ``message`` after a successful change. Visitors
    without a valid session are redirected to ``/login``.
    """
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    if not verify_password(user.password_hash, current_password):
        return _render_settings_with(request, user, db, error="Current password is incorrect")

    if new_password != confirm_password:
        return _render_settings_with(request, user, db, error="New passwords do not match")

    # Same minimum length that UserCreate enforces when an account is created,
    # so a password cannot be weakened after the fact.
    min_password_length = 8
    if len(new_password) < min_password_length:
        return _render_settings_with(
            request, user, db,
            error=f"New password must be at least {min_password_length} characters long",
        )

    user.password_hash = hash_password(new_password)
    db.commit()

    return _render_settings_with(request, user, db, message="Password changed successfully")
|
|
|
|
|
|
@app.get("/admin", response_class=HTMLResponse)
async def admin_panel(
    request: Request,
    user: User = Depends(get_session_user),
    db: Session = Depends(get_db)
):
    """Render the admin panel with every user, their file counts and totals.

    Visitors without a session are redirected to ``/login``; logged-in
    non-administrators are redirected to ``/dashboard``.
    """
    # Function-scope import keeps this block self-contained.
    from types import SimpleNamespace

    if not user:
        return RedirectResponse(url="/login", status_code=302)

    if not user.is_admin:
        return RedirectResponse(url="/dashboard", status_code=302)

    # Get all users with file counts. The outer join keeps users that have no
    # files; count() ignores the NULL ids those rows produce, giving 0.
    rows = db.query(
        User,
        func.count(FileMetadata.id).label('file_count')
    ).outerjoin(FileMetadata).group_by(User.id).order_by(User.created_at.desc()).all()

    # Plain attribute bags for the template: the ORM fields plus file_count.
    users_list = [
        SimpleNamespace(
            id=account.id,
            username=account.username,
            is_admin=account.is_admin,
            is_blocked=account.is_blocked,
            created_at=account.created_at,
            file_count=file_count,
        )
        for account, file_count in rows
    ]

    # Statistics
    stats = {
        'total_users': db.query(User).count(),
        'active_users': db.query(User).filter(User.is_blocked == False).count(),
        'blocked_users': db.query(User).filter(User.is_blocked == True).count(),
        'total_files': db.query(FileMetadata).count()
    }

    return templates.TemplateResponse("admin.html", {
        "request": request,
        "username": user.username,
        "is_admin": user.is_admin,
        "users": users_list,
        "stats": stats
    })
|
|
|
|
|
|
# API Endpoints
|
|
|
|
# User Management (Admin Only)
|
|
@app.post("/api/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    user_data: UserCreate,
    db: Session = Depends(get_db),
    admin: User = Depends(require_admin)
):
    """Create a user account (admin only).

    The password is stored hashed and a fresh API key is assigned.

    Raises:
        HTTPException: 400 when the username is already in use.
    """
    name_owner = (
        db.query(User)
        .filter(User.username == user_data.username)
        .first()
    )
    if name_owner:
        raise HTTPException(status_code=400, detail="Username already exists")

    account = User(
        username=user_data.username,
        password_hash=hash_password(user_data.password),
        is_admin=user_data.is_admin,
        api_key=generate_api_key()
    )

    db.add(account)
    db.commit()
    # Reload so database-generated values (id, created_at) are available.
    db.refresh(account)

    return account
|
|
|
|
|
|
@app.get("/api/users", response_model=List[UserResponse])
async def list_users(
    db: Session = Depends(get_db),
    admin: User = Depends(require_admin)
):
    """Return every user account (admin only)."""
    return db.query(User).all()
|
|
|
|
|
|
@app.delete("/api/users/{user_id}")
async def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    admin: User = Depends(require_admin)
):
    """Delete a user together with their files and sessions (admin only).

    Raises:
        HTTPException: 400 when an admin tries to delete their own account;
            404 when no user has the given id.
    """
    if user_id == admin.id:
        raise HTTPException(status_code=400, detail="Cannot delete yourself")

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Collect the on-disk names first: the relationship cascade removes the
    # metadata rows with the user, after which the names would be lost and the
    # uploaded files would stay in the upload directory forever.
    stored_names = [owned.stored_filename for owned in user.files]

    db.delete(user)
    db.commit()

    # Drop the deleted user's sessions so a stale token can never resolve to a
    # different account should the database hand out the same id again.
    stale_tokens = [token for token, owner_id in sessions.items() if owner_id == user_id]
    for token in stale_tokens:
        sessions.pop(token, None)

    # Remove the physical files only after the database change is committed.
    for stored_name in stored_names:
        try:
            (UPLOAD_DIR / stored_name).unlink()
        except FileNotFoundError:
            # Already missing from disk; nothing left to clean up.
            pass

    return {"message": "User deleted successfully"}
|
|
|
|
|
|
@app.patch("/api/users/{user_id}/block")
async def toggle_block_user(
    user_id: int,
    block: bool,
    db: Session = Depends(get_db),
    admin: User = Depends(require_admin)
):
    """Set or clear the blocked flag of a user (admin only).

    Raises:
        HTTPException: 400 when an admin targets their own account;
            404 when no user has the given id.
    """
    if user_id == admin.id:
        raise HTTPException(status_code=400, detail="Cannot block yourself")

    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    target.is_blocked = block
    db.commit()

    outcome = 'blocked' if block else 'unblocked'
    return {"message": f"User {outcome} successfully"}
|
|
|
|
|
|
@app.get("/api/users/me/api-key")
async def get_my_api_key(user: User = Depends(get_current_user)):
    """Return the authenticated user's API key."""
    payload = {"api_key": user.api_key}
    return payload
|
|
|
|
|
|
@app.post("/api/users/me/api-key/regenerate")
async def regenerate_api_key(
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Replace the authenticated user's API key with a new one and return it."""
    replacement_key = generate_api_key()
    user.api_key = replacement_key
    db.commit()
    return {"api_key": user.api_key}
|
|
|
|
|
|
# File Management
|
|
@app.post("/api/files/upload", response_model=FileInfo)
async def upload_file(
    file: UploadFile = File(...),
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Store an uploaded file on disk and record its metadata.

    Returns:
        FileInfo describing the new file, including its share link.
    """
    # The filename is supplied by the client: it may be missing or contain
    # directory parts ("a/b.txt", "..\\x"). Keep only the final component so
    # the stored file always lands directly inside UPLOAD_DIR.
    client_name = file.filename or "upload"
    safe_name = client_name.replace("\\", "/").rsplit("/", 1)[-1] or "upload"

    # Generate unique stored filename
    stored_filename = f"{secrets.token_urlsafe(16)}_{safe_name}"
    file_path = UPLOAD_DIR / stored_filename

    # Save file
    content = await file.read()
    with open(file_path, "wb") as f:
        f.write(content)

    # Create metadata
    file_metadata = FileMetadata(
        filename=safe_name,
        stored_filename=stored_filename,
        file_size=len(content),
        content_type=file.content_type,
        share_token=generate_share_token(),
        user_id=user.id
    )

    db.add(file_metadata)
    try:
        db.commit()
    except Exception:
        # Without a metadata row nothing references the file, so remove it
        # again instead of leaving an orphan on disk, then report the failure.
        db.rollback()
        if file_path.exists():
            file_path.unlink()
        raise
    db.refresh(file_metadata)

    return FileInfo(
        id=file_metadata.id,
        filename=file_metadata.filename,
        file_size=file_metadata.file_size,
        content_type=file_metadata.content_type,
        uploaded_at=file_metadata.uploaded_at,
        share_link=f"/share/{file_metadata.share_token}"
    )
|
|
|
|
|
|
@app.get("/api/files", response_model=List[FileInfo])
async def list_files(
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return the authenticated user's files, each with its share link."""
    owned = db.query(FileMetadata).filter(FileMetadata.user_id == user.id).all()

    listing = []
    for record in owned:
        listing.append(
            FileInfo(
                id=record.id,
                filename=record.filename,
                file_size=record.file_size,
                content_type=record.content_type,
                uploaded_at=record.uploaded_at,
                share_link=f"/share/{record.share_token}"
            )
        )
    return listing
|
|
|
|
|
|
@app.get("/api/files/{file_id}/download")
async def download_file(
    file_id: int,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Send one of the authenticated user's files as a download.

    Raises:
        HTTPException: 404 when the user owns no file with this id, or when
            the stored file is missing from the upload directory.
    """
    # Filtering on user_id as well means other users' files look non-existent.
    file_metadata = db.query(FileMetadata).filter(
        FileMetadata.id == file_id,
        FileMetadata.user_id == user.id
    ).first()

    if not file_metadata:
        raise HTTPException(status_code=404, detail="File not found")

    file_path = UPLOAD_DIR / file_metadata.stored_filename
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="File not found on disk")

    # FileResponse is already imported at module level; no local import needed.
    return FileResponse(
        path=str(file_path),
        filename=file_metadata.filename,
        media_type=file_metadata.content_type
    )
|
|
|
|
|
|
@app.delete("/api/files/{file_id}")
async def delete_file(
    file_id: int,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Delete one of the authenticated user's files (metadata and content).

    Raises:
        HTTPException: 404 when the user owns no file with this id.
    """
    file_metadata = db.query(FileMetadata).filter(
        FileMetadata.id == file_id,
        FileMetadata.user_id == user.id
    ).first()

    if not file_metadata:
        raise HTTPException(status_code=404, detail="File not found")

    # Resolve the path while the metadata object is still usable.
    file_path = UPLOAD_DIR / file_metadata.stored_filename

    # Delete metadata first: if the commit fails the file is still on disk and
    # the record stays valid, instead of pointing at content already removed.
    db.delete(file_metadata)
    db.commit()

    # Delete physical file
    try:
        file_path.unlink()
    except FileNotFoundError:
        # Already missing from disk; the metadata is gone, so we are done.
        pass

    return {"message": "File deleted successfully"}
|
|
|
|
|
|
# Public Share Endpoint
|
|
@app.get("/share/{share_token}")
async def download_shared_file(share_token: str, db: Session = Depends(get_db)):
    """Send the file identified by a share token; no authentication required.

    Raises:
        HTTPException: 404 when the token matches no file, or when the stored
            file is missing from the upload directory.
    """
    file_metadata = db.query(FileMetadata).filter(
        FileMetadata.share_token == share_token
    ).first()

    if not file_metadata:
        raise HTTPException(status_code=404, detail="File not found")

    file_path = UPLOAD_DIR / file_metadata.stored_filename
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="File not found on disk")

    # FileResponse is already imported at module level; no local import needed.
    return FileResponse(
        path=str(file_path),
        filename=file_metadata.filename,
        media_type=file_metadata.content_type
    )
|
|
|
|
|
|
# Health Check
|
|
@app.get("/health")
async def health_check():
    """Liveness probe: always answers with a fixed "healthy" status."""
    return dict(status="healthy")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the app directly with uvicorn, listening on all interfaces on the
    # port taken from app_config.
    uvicorn.run(
        app,
        port=app_config.APP_PORT,
        host="0.0.0.0",
    )