Initial commit

This commit is contained in:
Anry Das 2025-10-05 16:42:27 +03:00
commit d2df6a005b
9 changed files with 906 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/configs/secrets.yaml
/uploads/

56
app_config.py Normal file
View File

@ -0,0 +1,56 @@
import os
from config_file import read_config as read_cfg
APP_VERSION="1.0"
SCRIPT_PATH = os.path.dirname(__file__)
CONFIGS_DIR = SCRIPT_PATH + "/configs"
CONFIG_FILE_NAME = CONFIGS_DIR + "/server_config.yaml"
API_KEY = ''
UPLOAD_DIR = 'uploads'
DB_URL = ''
APP_PORT = 25100
IS_DEBUG = False
IS_DOCS_AVAILABLE = False
HTML_FILE = 'html/main.html'
CSS_FILE = 'css/main.css'
JS_FILE = 'js/main.js'
ALIAS_NUM_LETTERS = 3
ALIAS_NUM_DIGITS = 4
"""
For 3 + 4 defaults there are 1 406 080 000 possible values:
Кількість варіантів для кожної літери (великої або малої) дорівнює 52 (26 великих + 26 малих).
Кількість комбінацій для трьох літер: 52 * 52 * 52 = 140 608.
Кількість варіантів для кожної цифри: 10 (0-9).
Кількість комбінацій для чотирьох цифр: 10 * 10 * 10 * 10 = 10 000.
Загальна кількість комбінацій: 140 608 * 10 000 = 1 406 080 000.
"""
def read_app_config():
j, _ = read_cfg(CONFIG_FILE_NAME)
return j
def get_config_value(cfg, section, key, default):
return cfg[section][key] if section in cfg and key in cfg[section] else default
def parse_config(cfg):
global UPLOAD_DIR, APP_PORT, IS_DEBUG, DB_URL, ALIAS_NUM_LETTERS, ALIAS_NUM_DIGITS\
, IS_DOCS_AVAILABLE, HTML_FILE, CSS_FILE, JS_FILE
UPLOAD_DIR = get_config_value(cfg, 'server', 'directory', UPLOAD_DIR)
APP_PORT = get_config_value(cfg, 'server', 'port', APP_PORT)
DB_URL = get_config_value(cfg, 'server', 'db_url', DB_URL)
IS_DEBUG = get_config_value(cfg, 'server', 'debug', IS_DEBUG)
ALIAS_NUM_LETTERS = get_config_value(cfg, 'server', 'alias_letters', ALIAS_NUM_LETTERS)
ALIAS_NUM_DIGITS = get_config_value(cfg, 'server', 'alias_digits', ALIAS_NUM_DIGITS)
IS_DOCS_AVAILABLE = get_config_value(cfg, 'server', 'docs_available', IS_DOCS_AVAILABLE)
HTML_FILE = get_config_value(cfg, 'interface', 'html', HTML_FILE)
CSS_FILE = get_config_value(cfg, 'interface', 'css', CSS_FILE)
JS_FILE = get_config_value(cfg, 'interface', 'js', JS_FILE)
def init_config():
config = read_app_config()
parse_config(config)
if __name__ == '__main__':
init_config()

71
config_file.py Executable file
View File

@ -0,0 +1,71 @@
import json
import os
import yaml
def read_config(name):
if not os.path.isfile(name):
raise Exception(f"File {name} doesn't exists")
filename, ext = os.path.splitext(name)
if 'json' in ext:
return read_json(name), os.path.getmtime(name)
elif 'properties' in ext:
return read_prop(name), os.path.getmtime(name)
elif 'yaml' in ext or 'yml' in ext:
return read_yaml(name), os.path.getmtime(name)
else:
raise Exception("Wrong file type")
def read_json(name):
with open(name, 'r', encoding='utf-8') as f:
j_conf = json.load(f)
conf = {}
for key, value in j_conf.items():
conf[key] = value
return conf
def read_prop(filepath, sep='=', comment_char='#'):
conf = {}
with open(filepath, "rt", encoding='utf-8') as f:
for line in f:
l = line.strip()
if l and not l.startswith(comment_char):
key_value = l.split(sep)
key = key_value[0].strip()
value = sep.join(key_value[1:]).strip().strip('"')
conf[key] = value
return conf
def read_yaml(name, secrets_file='secrets.yaml'):
# Load secrets first
secrets = {}
secrets_path = os.path.join(os.path.dirname(name), secrets_file)
if os.path.exists(secrets_path):
with open(secrets_path, 'r', encoding='utf-8') as f:
secrets = yaml.safe_load(f) or {}
# Define a custom constructor for !secret tag
def secret_constructor(loader, node):
secret_key = loader.construct_scalar(node)
if secret_key not in secrets:
raise ValueError(f"Secret '{secret_key}' not found in {secrets_file}")
return secrets[secret_key]
# Register the custom constructor
yaml.add_constructor('!secret', secret_constructor, Loader=yaml.SafeLoader)
# Load the main configuration
conf = {}
with open(name, 'r', encoding='utf-8') as f:
y_conf = yaml.safe_load(f)
if y_conf:
for key, value in y_conf.items():
conf[key] = value
return conf
def main():
pass
if __name__ == "__main__":
main()

View File

@ -0,0 +1,13 @@
server:
port: !secret port
directory: 'uploads'
db_url: !secret db_url
alias_letters: 3
alias_digits: 4
debug: false
docs_available: false
interface:
html: 'html/main.html'
css: 'css/main.css'
script: 'js/main.js'

64
css/main.css Normal file
View File

@ -0,0 +1,64 @@
body {
font-family: Arial, sans-serif;
max-width: 800px;
margin: 50px auto;
padding: 20px;
}
h1 { color: #333; }
.section {
background: #f5f5f5;
padding: 20px;
margin: 20px 0;
border-radius: 8px;
}
input[type="file"], input[type="text"] {
margin: 10px 0;
padding: 8px;
width: 100%;
max-width: 400px;
}
button {
background: #007bff;
color: white;
padding: 10px 20px;
border: none;
border-radius: 4px;
cursor: pointer;
margin: 5px;
}
button:hover {
background: #0056b3;
}
.file-list {
list-style: none;
padding: 0;
}
.file-item {
background: white;
padding: 10px;
margin: 5px 0;
border-radius: 4px;
display: flex;
justify-content: space-between;
align-items: center;
}
.message {
padding: 10px;
margin: 10px 0;
border-radius: 4px;
}
.success { background: #d4edda; color: #155724; }
.error { background: #f8d7da; color: #721c24; }
.info { background: #d1ecf1; color: #0c5460; }
.hidden { display: none; }
.api-key-display {
background: white;
padding: 10px;
border-radius: 4px;
font-family: monospace;
word-break: break-all;
}
.file-date {
font-size: 0.85em;
color: #676;
}

51
html/main.html Normal file
View File

@ -0,0 +1,51 @@
<!DOCTYPE html>
<html>
<head>
<title>Das File Server</title>
<!--<link rel="stylesheet" href="/main.css">-->
<style>
#$STYLE$#
</style>
</head>
<body>
<h1>🔐 Secure File Server by -=:dAs:=-</h1>
<div class="section" id="authSection">
<h2>API Key Management</h2>
<div id="noKeySection">
<!-- Remove it section in prod!
<button onclick="generateKey()">Generate New API Key</button>
<p>or</p>
-->
<input type="text" id="existingKey" placeholder="Enter your existing API key">
<button onclick="useExistingKey()">Use This Key</button>
</div>
<div id="hasKeySection" class="hidden">
<div class="message info">
<strong>Your API Key:</strong>
<div class="api-key-display" id="apiKeyDisplay"></div>
<small>⚠️ Save this key securely! You won't be able to see it again.</small>
</div>
<button onclick="clearKey()" style="background: #dc3545;">Logout</button>
</div>
<div id="authMessage"></div>
</div>
<div class="section hidden" id="uploadSection">
<h2>Upload Files</h2>
<input type="file" id="fileInput" multiple>
<button onclick="uploadFiles()">Upload</button>
<div id="uploadMessage"></div>
</div>
<div class="section hidden" id="filesSection">
<h2>Your Files</h2>
<button onclick="loadFiles()">Refresh List</button>
<ul class="file-list" id="fileList"></ul>
</div>
<script>
#$SCRIPT$#
</script>
</body>
</html>

171
js/main.js Normal file
View File

@ -0,0 +1,171 @@
let apiKey = localStorage.getItem('apiKey');
function updateUI() {
const hasKey = !!apiKey;
document.getElementById('noKeySection').classList.toggle('hidden', hasKey);
document.getElementById('hasKeySection').classList.toggle('hidden', !hasKey);
document.getElementById('uploadSection').classList.toggle('hidden', !hasKey);
document.getElementById('filesSection').classList.toggle('hidden', !hasKey);
if (hasKey) {
document.getElementById('apiKeyDisplay').textContent = apiKey;
loadFiles();
}
}
async function generateKey() {
const messageDiv = document.getElementById('authMessage');
try {
const response = await fetch('/add-user', {
method: 'POST'
});
const result = await response.json();
if (response.ok) {
apiKey = result.api_key;
localStorage.setItem('apiKey', apiKey);
messageDiv.innerHTML = '<div class="message success">API key generated successfully!</div>';
updateUI();
} else {
messageDiv.innerHTML = `<div class="message error">${result.detail}</div>`;
}
} catch (error) {
messageDiv.innerHTML = '<div class="message error">Failed to generate key</div>';
}
}
function useExistingKey() {
const key = document.getElementById('existingKey').value.trim();
const messageDiv = document.getElementById('authMessage');
if (!key) {
messageDiv.innerHTML = '<div class="message error">Please enter an API key</div>';
return;
}
apiKey = key;
localStorage.setItem('apiKey', apiKey);
messageDiv.innerHTML = '<div class="message success">API key set!</div>';
document.getElementById('existingKey').value = '';
updateUI();
}
function clearKey() {
apiKey = null;
localStorage.removeItem('apiKey');
document.getElementById('authMessage').innerHTML = '';
updateUI();
}
async function uploadFiles() {
const input = document.getElementById('fileInput');
const files = input.files;
const messageDiv = document.getElementById('uploadMessage');
if (files.length === 0) {
messageDiv.innerHTML = '<div class="message error">Please select files</div>';
return;
}
const formData = new FormData();
for (let file of files) {
formData.append('files', file);
}
try {
const response = await fetch('/upload', {
method: 'POST',
headers: {
'X-API-Key': apiKey
},
body: formData
});
const result = await response.json();
if (response.ok) {
messageDiv.innerHTML = '<div class="message success">Files uploaded successfully!</div>';
input.value = '';
loadFiles();
} else {
messageDiv.innerHTML = `<div class="message error">${result.detail}</div>`;
}
} catch (error) {
messageDiv.innerHTML = '<div class="message error">Upload failed</div>';
}
}
async function loadFiles() {
if (!apiKey) return;
try {
const response = await fetch('/files', {
headers: {
'X-API-Key': apiKey
}
});
const files = await response.json();
const fileList = document.getElementById('fileList');
if (response.status === 401) {
document.getElementById('authMessage').innerHTML = '<div class="message error">Invalid API key</div>';
clearKey();
return;
}
if (files.length === 0) {
fileList.innerHTML = '<li>No files available</li>';
return;
}
fileList.innerHTML = files.map(file => `
<li class="file-item">
<div>
<div><strong>${file.original_name}</strong> (${formatBytes(file.size)})&nbsp;<a href="/dn/${file.file_id}">Direct link to file</a></div>
<div class="file-date">Uploaded: ${new Date(file.uploaded_at).toLocaleString()}</div>
</div>
<div>
<button onclick="downloadFile('${file.file_id}', '${file.original_name}')">Download</button>
<button onclick="deleteFile('${file.file_id}')" style="background: #dc3545;">Delete</button>
</div>
</li>
`).join('');
} catch (error) {
console.error('Failed to load files:', error);
}
}
async function downloadFile(fileId, filename) {
window.location.href = `/download/${fileId}?api_key=${apiKey}`;
}
async function deleteFile(fileId) {
if (!confirm('Are you sure you want to delete this file?')) return;
try {
const response = await fetch(`/files/${fileId}`, {
method: 'DELETE',
headers: {
'X-API-Key': apiKey
}
});
if (response.ok) {
loadFiles();
} else {
alert('Failed to delete file');
}
} catch (error) {
alert('Failed to delete file');
}
}
function formatBytes(bytes) {
if (bytes === 0) return '0 Bytes';
const k = 1024;
const sizes = ['Bytes', 'KB', 'MB', 'GB'];
const i = Math.floor(Math.log(bytes) / Math.log(k));
return Math.round(bytes / Math.pow(k, i) * 100) / 100 + ' ' + sizes[i];
}
// Initialize UI
updateUI();

472
main.py Normal file
View File

@ -0,0 +1,472 @@
'''
🗄 PostgreSQL Database Integration
Database Tables:
users - Stores user_id, api_key, and creation timestamp
files - Stores file metadata with foreign key to users
Automatic cascading delete when user is removed
Indexed columns for fast lookups
Stored Information:
API keys and user IDs
File metadata (name, size, path, upload date)
User-file relationships
📁 File Organization
Directory Structure:
uploads/
user-id-1/
file-id-1_document.pdf
file-id-2_image.jpg
user-id-2/
file-id-3_data.csv
Each user gets their own subdirectory for better organization and isolation.
Setup Instructions
Install dependencies:
bashpip install fastapi uvicorn python-multipart asyncpg
Set up PostgreSQL database:
bash# Create database
createdb fileserver
# Set DATABASE_URL environment variable
export DATABASE_URL="postgresql://username:password@localhost:5432/fileserver"
Run the server:
bashpython script_name.py
The database tables will be created automatically on startup!
🔑 Key Features
All data persists in PostgreSQL
Files organized in user-specific subdirectories
Automatic database initialization
Connection pooling for better performance
Proper foreign key relationships with cascade delete
Timestamps for all uploads
The DATABASE_URL can be configured via environment variable for different environments (development, production, etc.).
'''
import random
import string
import sys
from fastapi import FastAPI, File, UploadFile, HTTPException, Header, Depends
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.security import APIKeyHeader
from pathlib import Path
import shutil
import os
from typing import List, Optional
import uuid
import secrets
from datetime import datetime
import asyncpg
from contextlib import asynccontextmanager
import app_config
ERROR_STATUS_NO_DB_URL = 10
STYLE_MACRO = "#$STYLE$#"
SCRIPT_MACRO = "#$SCRIPT$#"
# Database configuration
app_config.init_config()
DATABASE_URL = app_config.DB_URL
# Global database pool
db_pool: asyncpg.Pool | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Checking DB URL"""
global DATABASE_URL
if not DATABASE_URL:
DATABASE_URL = os.getenv("DATABASE_URL", "")
if not DATABASE_URL:
print('DB URL does not set. DB URL is a mandatory variable.\nPlease provide it in config file or as a environment variable and restart the server')
sys.exit(ERROR_STATUS_NO_DB_URL)
"""Manage application lifespan - startup and shutdown"""
global db_pool
# Startup: Create database pool and initialize tables
db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10)
await init_database()
print("✅ Database connected and initialized")
yield
# Shutdown: Close database pool
await db_pool.close()
print("👋 Database connection closed")
app = FastAPI(
title="Das File Server",
description="Upload, save, and download files with API key authentication",
lifespan=lifespan,
openapi_url="/api/v1/openapi.json" if app_config.IS_DOCS_AVAILABLE else None
)
# Configure upload directory
UPLOAD_DIR = Path(app_config.UPLOAD_DIR)
UPLOAD_DIR.mkdir(exist_ok=True)
# API Key header
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def init_database():
"""Initialize database tables"""
async with db_pool.acquire() as conn:
# Create users table
await conn.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id UUID PRIMARY KEY,
api_key VARCHAR(255) UNIQUE NOT NULL,
alias VARCHAR(32) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
)
""")
# Create files table
await conn.execute("""
CREATE TABLE IF NOT EXISTS files (
file_id UUID PRIMARY KEY,
user_id UUID NOT NULL REFERENCES users(user_id) ON DELETE CASCADE,
original_name VARCHAR(500) NOT NULL,
stored_name VARCHAR(500) NOT NULL,
file_path TEXT NOT NULL,
size BIGINT NOT NULL,
uploaded_at TIMESTAMP DEFAULT NOW(),
CONSTRAINT fk_user FOREIGN KEY (user_id) REFERENCES users(user_id)
)
""")
# Create index for faster lookups
await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_files_user_id ON files(user_id)
""")
await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_users_api_key ON users(api_key)
""")
async def get_user_id(api_key: Optional[str] = Depends(api_key_header)) -> str:
"""Validate API key and return user ID"""
if not api_key:
raise HTTPException(status_code=401, detail="Missing API key")
async with db_pool.acquire() as conn:
user = await conn.fetchrow(
"SELECT user_id FROM users WHERE api_key = $1",
api_key
)
if not user:
raise HTTPException(status_code=401, detail="Invalid API key")
return str(user['user_id'])
def get_user_directory(user_id: str) -> Path:
"""Get or create user's subdirectory"""
user_dir = UPLOAD_DIR / user_id
user_dir.mkdir(exist_ok=True)
return user_dir
def get_file_content(file_name):
try:
with open(file_name, 'r') as file:
file_content = file.read()
return file_content
except FileNotFoundError:
print(f"Error: The file '{file_name}' was not found.")
except Exception as e:
print(f"An error occurred: {e}")
@app.get("/", response_class=HTMLResponse)
async def home():
"""Serve the web interface"""
css = get_file_content(app_config.CSS_FILE)
js = get_file_content(app_config.JS_FILE)
return (get_file_content(app_config.HTML_FILE)
.replace(STYLE_MACRO, css)
.replace(SCRIPT_MACRO, js))
@app.post("/new-key")
@app.get("/new-key")
async def generate_key():
"""Generate a new API key for a user"""
api_key = secrets.token_urlsafe(32)
try:
return {
"api_key": api_key,
"message": "New API key generated successfully."
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to generate API key: {str(e)}")
@app.post("/new-alias")
@app.get("/new-alias")
async def generate_alias():
"""Generate a new alias"""
return {
"alias": await get_alias(),
"message": "New Alias generated successfully."
}
async def get_all_aliases():
async with db_pool.acquire() as conn:
aliases = await conn.fetch("""
SELECT alias FROM users
""")
return aliases
async def get_alias():
"""Getting the alias"""
existing_aliases = await get_all_aliases()
alias = existing_aliases[0] if existing_aliases else ""
while alias in existing_aliases or alias == "":
num_letters = app_config.ALIAS_NUM_LETTERS
num_digits = app_config.ALIAS_NUM_DIGITS
letters = [random.choice(string.ascii_letters) for _ in range(num_letters)]
digits = [random.choice(string.digits) for _ in range(num_digits)]
res_list = letters + digits
#random.shuffle(res_list)
alias = ''.join(res_list)
return alias
@app.post("/add-user")
async def add_new_user():
"""Add new user with new API key"""
# Generate secure random API key
api_key = secrets.token_urlsafe(32)
user_id = uuid.uuid4()
user_alias = get_alias()
async with db_pool.acquire() as conn:
try:
# Store API key in database
await conn.execute(
"INSERT INTO users (user_id, api_key) VALUES ($1, $2, $3)",
user_id, api_key, user_alias
)
# Create user's directory
get_user_directory(str(user_id))
return {
"api_key": api_key,
"user_id": str(user_id),
"message": "API key generated successfully. Keep it secure!"
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to generate API key: {str(e)}")
@app.post("/upload")
async def upload_files(
files: List[UploadFile] = File(...),
user_id: str = Depends(get_user_id)
):
"""Upload one or multiple files (requires API key)"""
uploaded_files = []
user_dir = get_user_directory(user_id)
async with db_pool.acquire() as conn:
for file in files:
# Generate unique ID for file
file_id = uuid.uuid4()
stored_name = f"{file_id}_{file.filename}"
file_path = user_dir / stored_name
# Save file
try:
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
file_size = os.path.getsize(file_path)
# Store metadata in database
await conn.execute("""
INSERT INTO files (file_id, user_id, original_name, stored_name, file_path, size)
VALUES ($1, $2, $3, $4, $5, $6)
""", file_id, uuid.UUID(user_id), file.filename, stored_name, str(file_path), file_size)
uploaded_files.append({
"id": str(file_id),
"filename": file.filename,
"size": file_size
})
except Exception as e:
# Clean up file if database insert fails
if file_path.exists():
os.remove(file_path)
raise HTTPException(status_code=500, detail=f"Failed to upload {file.filename}: {str(e)}")
return {"message": "Files uploaded successfully", "files": uploaded_files}
@app.get("/files")
async def list_files(user_id: str = Depends(get_user_id)):
"""List all files for the authenticated user"""
async with db_pool.acquire() as conn:
files = await conn.fetch("""
SELECT file_id, original_name, size, uploaded_at
FROM files
WHERE user_id = $1
ORDER BY uploaded_at DESC
""", uuid.UUID(user_id))
return [
{
"file_id": str(file['file_id']),
"original_name": file['original_name'],
"size": file['size'],
"uploaded_at": file['uploaded_at'].isoformat()
}
for file in files
]
@app.get("/download/{file_id}")
async def download_file(
file_id: str,
api_key: Optional[str] = None,
x_api_key: Optional[str] = Header(None)
):
"""Download a file by ID (requires API key)"""
# Accept API key from query param or header
key = api_key or x_api_key
if not key:
raise HTTPException(status_code=401, detail="Missing API key")
async with db_pool.acquire() as conn:
# Verify API key and get user
user = await conn.fetchrow(
"SELECT user_id FROM users WHERE api_key = $1",
key
)
if not user:
raise HTTPException(status_code=401, detail="Invalid API key")
# Get file info and verify ownership
file_info = await conn.fetchrow("""
SELECT file_path, original_name
FROM files
WHERE file_id = $1 AND user_id = $2
""", uuid.UUID(file_id), user['user_id'])
if not file_info:
raise HTTPException(status_code=404, detail="File not found or access denied")
file_path = Path(file_info['file_path'])
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found on disk")
return FileResponse(
path=file_path,
filename=file_info['original_name'],
media_type="application/octet-stream"
)
@app.get("/dn/{file_id}")
async def download_link(
file_id: str
):
"""Download a file by LINK (without API key)"""
async with db_pool.acquire() as conn:
# Get file info and verify ownership
file_info = await conn.fetchrow("""
SELECT file_path, original_name
FROM files
WHERE file_id = $1
""", uuid.UUID(file_id))
if not file_info:
raise HTTPException(status_code=404, detail="File not found or access denied")
file_path = Path(file_info['file_path'])
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found on disk")
return FileResponse(
path=file_path,
filename=file_info['original_name'],
media_type="application/octet-stream"
)
@app.delete("/files/{file_id}")
async def delete_file(
file_id: str,
user_id: str = Depends(get_user_id)
):
"""Delete a file by ID (only owner can delete)"""
async with db_pool.acquire() as conn:
# Get file info and verify ownership
file_info = await conn.fetchrow("""
SELECT file_path
FROM files
WHERE file_id = $1 AND user_id = $2
""", uuid.UUID(file_id), uuid.UUID(user_id))
if not file_info:
raise HTTPException(status_code=404, detail="File not found or access denied")
file_path = Path(file_info['file_path'])
# Delete file from disk
if file_path.exists():
os.remove(file_path)
# Remove from database
await conn.execute(
"DELETE FROM files WHERE file_id = $1",
uuid.UUID(file_id)
)
return {"message": "File deleted successfully"}
@app.get("/files/{file_id}/info")
async def get_file_info(
file_id: str,
user_id: str = Depends(get_user_id)
):
"""Get file information by ID (only owner can view)"""
async with db_pool.acquire() as conn:
file_info = await conn.fetchrow("""
SELECT file_id, original_name, stored_name, size, uploaded_at
FROM files
WHERE file_id = $1 AND user_id = $2
""", uuid.UUID(file_id), uuid.UUID(user_id))
if not file_info:
raise HTTPException(status_code=404, detail="File not found or access denied")
return {
"file_id": str(file_info['file_id']),
"original_name": file_info['original_name'],
"stored_name": file_info['stored_name'],
"size": file_info['size'],
"uploaded_at": file_info['uploaded_at'].isoformat()
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=app_config.APP_PORT)

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
fastapi~=0.118.0
uvicorn~=0.37.0
python-multipart
fastapi[standard]
asyncpg~=0.30.0
PyYAML~=6.0.3