"""FastAPI entry point of the Energy Outage Inform Service.

The module builds two FastAPI applications (``app`` for the HTML info page and
``api`` mounted under ``/api/v1`` for the JSON endpoints), schedules the
periodic ``check_and_notify`` job with APScheduler on startup and exposes
status / health / maintenance endpoints.
"""
import logging
from datetime import datetime, UTC
from typing import Optional, List

import uvicorn
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, BackgroundTasks, Request
from starlette.responses import FileResponse, HTMLResponse
from starlette.routing import Mount
from starlette.templating import Jinja2Templates

import app_config as cfg
from model import POData, JobData
from utils import process_gpv, process_po, get_parsed_po_data, get_parsed_gpv_data, send_message_to_tg_get, \
    send_matrix_notification

DESCRIPTION = '''
Energy Outage Inform Service

It provides:

* getting state/value from some configured sensors
* own health endpoint

Created by -=:dAs:=-
'''

########################
# cfg.init_config()  ToDo: TEST IT!
# Created by the startup hook; stays None until the application has started.
SCHEDULER: Optional[AsyncIOScheduler] = None
LOG = cfg.LOG
TEMPLATES = Jinja2Templates(directory="templates")
########################

TAGS_METADATA = []

app = FastAPI(
    title="Energy Outage Inform Service by -=:dAs:=-",
    description=DESCRIPTION,
    version=cfg.APP_VERSION,
    license_info={
        "name": "Apache 2.0",
        "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
    },
    openapi_tags=TAGS_METADATA
)

# JSON API lives in its own sub-application mounted under /api/v1.
api = FastAPI()
app.mount("/api/v1", api)


########################
def _scheduler_running() -> bool:
    """Return True only when the scheduler exists and reports itself running.

    SCHEDULER is None until the startup hook has run, so endpoints must not
    dereference it blindly.
    """
    return SCHEDULER is not None and SCHEDULER.running


async def check_and_notify():
    """Run the GPV processing followed by the PO processing."""
    await process_gpv()
    await process_po()


@app.on_event("startup")
async def startup_event():
    """Create the scheduler, register the cron jobs and run one initial check.

    One job is registered per (hour, minute) pair, for every hour in
    ``[cfg.PROCESS_START_HOUR, cfg.PROCESS_STOP_HOUR)``. In debug mode the
    configured minutes are replaced by every even minute of the hour.
    """
    global SCHEDULER
    # --- STARTUP LOGIC ---
    LOG.info("--- Starting EO Notifier...")
    SCHEDULER = AsyncIOScheduler()

    # The minute list does not depend on the hour, so compute it once.
    minutes = cfg.PROCESSING_MINUTES if not cfg.IS_DEBUG else list(range(0, 60, 2))
    for h in range(cfg.PROCESS_START_HOUR, cfg.PROCESS_STOP_HOUR):
        for m in minutes:
            SCHEDULER.add_job(
                check_and_notify,
                'cron',
                hour=h,
                minute=m,
                id=f'check_EO_at_{h}h_{m}m'
            )
            LOG.info(f"Scheduler: job to check at {h}:{m}")

    SCHEDULER.start()
    LOG.info("Scheduler started")

    # Run initial check
    await check_and_notify()


@app.on_event("shutdown")
async def shutdown_event():
    """Stop the scheduler (if it was started) and log the shutdown."""
    # --- SHUTDOWN LOGIC ---
    # Shutting down a scheduler that never started raises in APScheduler,
    # so only stop it when it is actually running.
    if _scheduler_running():
        SCHEDULER.shutdown()
        LOG.info('Scheduler stopped')
    LOG.info('--- EO Notifier shutting down')


@app.get("/favicon.ico", include_in_schema=False)
async def get_favicon():
    """Serve the favicon from the static directory."""
    return FileResponse('static/favicon.png')


def list_endpoints():
    """Return the routes of ``api`` whose method set does not contain HEAD.

    Routes without a ``methods`` attribute (or with ``methods`` set to None)
    are kept instead of raising.
    """
    return [
        route for route in api.routes
        if 'HEAD' not in (getattr(route, 'methods', None) or ())
    ]


def get_path_prefix():
    """Return the path of the first ``Mount`` route of ``app``, or None."""
    return next((route.path for route in app.routes if isinstance(route, Mount)), None)


@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
    """Render the ``info.html`` page with service info, endpoints and events."""
    help_messages = {
        "Service": "Energy Outage Inform Service by -=:dAs:=",
        "version": f"{cfg.APP_VERSION}",
        "status": "running",
        "Scheduler running": _scheduler_running(),
        "timestamp": date_format(datetime.now(UTC))
    }
    return TEMPLATES.TemplateResponse("info.html", {
        "request": request,
        "help_messages": help_messages,
        "title": "Energy Outage Inform Service by -=:dAs:=",
        "EPs": list_endpoints(),
        "path_prefix": get_path_prefix(),
        "PO_events": get_parsed_po_data(),
        "GPV_events": get_parsed_gpv_data()
    })


@api.get("/status",
         summary='Show status of the service',
         description='Showing current status of the service')
async def get_status():
    """Detailed health check.

    Returns a dict with the config-loaded flag, the parsed PO and GPV data,
    the scheduler running flag and the list of scheduled jobs.
    """
    return {
        "status": "healthy",
        "config_loaded": bool(cfg.CONFIG),
        "PO_data": get_parsed_po_data(),
        "GPV_data": get_parsed_gpv_data(),
        "scheduler_running": _scheduler_running(),
        "scheduler_jobs": await get_scheduled_jobs()
    }


@api.get("/update",
         summary='Updates data from source',
         description='Updates data from web site immediately')
async def update():
    """Run the check immediately and return the refreshed status."""
    await check_and_notify()
    return await get_status()


async def get_scheduled_jobs() -> List[JobData]:
    """Return one ``JobData`` per job registered in the scheduler.

    Returns an empty list when the scheduler has not been created yet.
    """
    if SCHEDULER is None:
        return []
    return [
        JobData(name=job.id, trigger=str(job.trigger), next=job.next_run_time)
        for job in SCHEDULER.get_jobs()
    ]


@api.get("/health",
         summary='Health of the service',
         description='Showing current health of the service. Returns OK if it launched')
async def health_check():
    """Liveness probe: always answers with status OK."""
    return {"status": "OK"}


@api.get("/test_message/{messanger}",
         summary='Send message to messanger',
         description='Send test message to messanger according to current configuration')
async def send_test_message(messanger: str = None):
    """Send a test message through the selected messenger.

    ``messanger`` is matched case-insensitively: ``TG`` uses
    ``send_message_to_tg_get`` and ``MX`` uses ``send_matrix_notification``.
    Any other value (including a missing one) yields an error status in the
    response body.
    """
    # The declared default is None; guard it so .upper() cannot fail.
    messanger_up = (messanger or '').upper()
    msg = f'Test message to {messanger_up}'

    senders = {
        "TG": send_message_to_tg_get,
        "MX": send_matrix_notification,
    }
    sender = senders.get(messanger_up)
    if sender is None:
        return {"status": f"Error: Wrong messanger '{messanger_up}'"}

    # NOTE(review): both senders are called without await, as in the original
    # code - confirm in utils that they are synchronous functions.
    sender(msg)
    return {"status": "OK"}


# A doubled single quote is implicit literal concatenation in Python and drops
# the apostrophe, so the description is written with double quotes instead.
@api.get('/logs',
         summary='Show LOGs',
         description="Showing Service's LOGs if it configured")
async def get_logs():
    """Return the log records collected in ``cfg.LOG_LIST``."""
    return {'logs': cfg.LOG_LIST}


def date_format(dt: datetime, fmt: str = '%Y-%m-%d %H:%M:%S') -> str:
    """Format ``dt`` with ``strftime`` using ``fmt``."""
    return dt.strftime(fmt)


def main():
    """Configure logging and run the application with uvicorn."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] - %(name)s: %(funcName)s[%(lineno)d] - %(message)s'
    )
    cfg.LOG.info('Application starting')
    uvicorn.run("main:app", **cfg.get_uvicorn_config())


if __name__ == '__main__':
    main()