"""Fetch power-outage information and push notifications to Telegram/Matrix.

Two data sources are polled:

* GPV - the hourly outage schedule, returned as JSON by ``be-svitlo.oe.if.ua``;
* PO  - planned power outages, scraped from the HTML table on ``oe.if.ua``.

Parsed entries are kept in the module-level lists ``PARSED_GPV_DATAS`` and
``PARSED_PO_DATAS`` so that repeated polls do not re-send the same message.
"""
import json
import os
from datetime import datetime, timedelta
from io import StringIO
from typing import List

import requests
import urllib3
from lxml import etree

import app_config as cfg
from model import POData, GPVData

STATUS_SEND_OK = 0
STATUS_ERROR_SEND_TO_TG = 20
STATUS_ERROR_GPV_RESPONSE = 10
STATUS_ERROR_PO_RESPONSE = 30
STATUS_EMPTY_CONFIG = 5
STATUS_ERROR_SEND_TO_MX = 40

PARSED_PO_DATAS: List[POData] = []
PARSED_GPV_DATAS: List[GPVData] = []

LOG = cfg.LOG

# Seconds to wait for any remote HTTP endpoint before giving up; without a
# timeout `requests` may block forever on a stalled connection.
REQUEST_TIMEOUT = 30

# Keycap emoji for the digits 0-9: the digit followed by U+20E3.
_KEYCAP_DIGITS = [f'{digit}\U000020E3' for digit in '0123456789']

# Date format used by the GPV API in `eventDate` (e.g. "15.04.2026").
_GPV_DATE_FORMAT = '%d.%m.%Y'

# XPath of every POData field, relative to one `table-row flex` row element.
_PO_FIELD_PATHS = {
    'city': "./div[@class='city']/text()",
    'street': "./div[@class='street']/text()",
    'house': "./div[@class='house_number']/text()",
    'po_type': "./div[@class='type-shutdown']/text()",
    'reason': "./div[@class='reason-shutdown reason-text-container']"
              "/div[@class='reason-text-hide']/text()",
    'placement_date': "./div[@class='placement_date']/div[@class='date']/text()",
    'start_date': "./div[@class='shutdown_date']/div[@class='date']/text()",
    'stop_date': "./div[@class='turn_on_date']/div[@class='date']/text()",
    'start_time': "./div[@class='shutdown_date']/div[@class='time']/text()",
    'stop_time': "./div[@class='turn_on_date']/div[@class='time']/text()",
}

# Canned GPV API response that is used instead of a network call in debug mode.
_DEBUG_GPV_RESPONSE = """{
  "current": {
    "gav": {
      "message": "Черга спеціальних аварійних відключень (СГАВ):",
      "queue": null
    },
    "hasQueue": "yes",
    "note": "Станом на 21:05 14.01.2026 за вказаним особовим рахунком '30014180' споживач підпадає під чергу '5.1' Графіку погодинного відключення(ГПВ)",
    "queue": 5,
    "sgav": {
      "message": "Черга спеціальних аварійних відключень (СГАВ):",
      "queue": null
    },
    "subQueue": 1
  },
  "schedule": [
    {
      "createdAt": "14.01.2026 19:11",
      "eventDate": "15.04.2026",
      "queues": {
        "5.1": [
          {"from": "07:30", "shutdownHours": "07:30-11:00", "status": 1, "to": "11:00"},
          {"from": "14:30", "shutdownHours": "14:30-20:00", "status": 1, "to": "20:00"},
          {"from": "22:00", "shutdownHours": "22:00-00:00", "status": 1, "to": "00:00"}
        ]
      },
      "scheduleApprovedSince": "14.04.2026 19:11"
    },
    {
      "createdAt": "13.04.2026 20:28",
      "eventDate": "14.04.2026",
      "queues": {
        "5.1": [
          {"from": "03:00", "shutdownHours": "03:00-07:00", "status": 1, "to": "07:00"},
          {"from": "11:00", "shutdownHours": "11:00-17:30", "status": 1, "to": "17:30"}
        ]
      },
      "scheduleApprovedSince": "14.04.2026 09:56"
    }
  ]
}
"""


def send_message_to_tg_get(msg: str):
    """Send `msg` to the configured Telegram chat with a GET request.

    Returns the HTTP status code of the Telegram response. In debug mode the
    message is only printed and 200 is returned.
    """
    if cfg.IS_DEBUG:
        print(f"Send to TG GET: {msg}")
        return 200
    url = f"https://api.telegram.org/bot{cfg.TG_TOKEN}/sendMessage"
    # Passing the text through `params` lets requests percent-encode it, so
    # characters such as '&', '#' or '+' no longer corrupt the query string.
    # NOTE(review): parse_mode is HTML, so a literal '<' or '&' in scraped
    # text can still be rejected by Telegram - consider escaping upstream.
    params = {'chat_id': cfg.TG_CHAT, 'parse_mode': 'html', 'text': msg}
    return requests.get(url, params=params, timeout=REQUEST_TIMEOUT).status_code


def send_message_to_tg_post(msg: str):
    """Send `msg` to the configured Telegram chat with a POST request.

    Returns the HTTP status code of the Telegram response. In debug mode the
    message is only printed and 200 is returned.
    """
    if cfg.IS_DEBUG:
        print(f"Send to TG POST: {msg}")
        return 200
    return requests.post(
        url='https://api.telegram.org/bot{0}/{1}'.format(cfg.TG_TOKEN, 'sendMessage'),
        # Plain values: the previous `{cfg.TG_CHAT}` / `{msg}` were set literals.
        data={'chat_id': cfg.TG_CHAT, 'text': msg, 'parse_mode': 'html'},
        timeout=REQUEST_TIMEOUT,
    ).status_code


def send_matrix_notification(message: str) -> bool:
    """Send notification to Matrix.

    Returns True when the homeserver answered with HTTP 200 (or, in debug
    mode, after printing the message). Returns False when Matrix is disabled
    in the config or the request failed.
    """
    if not cfg.USE_MATRIX:
        return False
    if cfg.IS_DEBUG:
        print(f"Send to MX: {message}")
        return True
    try:
        url = f"{cfg.MX_SERVER}/_matrix/client/r0/rooms/{cfg.MX_ROOM}/send/m.room.message"
        headers = {
            'Authorization': f'Bearer {cfg.MX_TOKEN}',
            'Content-Type': 'application/json'
        }
        payload = {
            'msgtype': 'm.text',
            'body': message,
            'format': 'org.matrix.custom.html',
            # The HTML rendition needs explicit line-break tags.
            'formatted_body': message.replace('\n', '<br>')
        }
        response = requests.post(url, headers=headers, json=payload,
                                 timeout=REQUEST_TIMEOUT)
        return response.status_code == 200
    except Exception as e:
        # Deliberately best-effort: a Matrix failure must not break the
        # caller, but it should at least be visible in the log.
        LOG.error(f"Error sending Matrix notification: {e}")
        return False


def get_GPV_response_from_oe(account):
    """Request the hourly outage schedule (GPV) for `account`.

    Returns the raw `requests.Response`; the caller checks the status code.
    """
    url = 'https://be-svitlo.oe.if.ua/schedule-by-search'
    headers = {
        "Host": "be-svitlo.oe.if.ua",
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:144.0) Gecko/20100101 Firefox/144.0",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "en-US,en;q=0.5",
        "Content-Type": "application/x-www-form-urlencoded;charset=utf-8",
        "Referer": "https://svitlo.oe.if.ua/",
        "Origin": "https://svitlo.oe.if.ua",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-site",
        "Connection": "keep-alive",
        "Priority": "u=0",
    }
    data = {
        "accountNumber": account,
        "userSearchChoice": "pob",
        "address": ""
    }
    # NOTE(review): certificate verification is switched off for this host
    # and the resulting warnings are silenced - confirm this is still needed.
    urllib3.disable_warnings()
    return requests.post(url, headers=headers, data=data, verify=False,
                         timeout=REQUEST_TIMEOUT)


def get_PO_response():
    """Request the planned power outage (PO) table for the configured address.

    Returns the raw `requests.Response`; the caller checks the status code.
    """
    utf8 = '✓'
    url = 'https://oe.if.ua/uk/shutdowns_table'
    params = {
        'utf8': utf8,
        'settlement': cfg.SETTLEMENT,
        'street': cfg.STREET,
        'house_number': cfg.HOUSE,
        'building_part_number': cfg.BUILDING_PART_NUMBER,
        'apartment': cfg.APARTMENT,
        'commit': 'Пошук'
    }
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'
    }
    urllib3.disable_warnings()
    return requests.get(url=url, headers=headers, params=params,
                        timeout=REQUEST_TIMEOUT)


def has_numbers(in_str):
    """Return True when the string form of `in_str` contains a digit."""
    return any(char.isdigit() for char in str(in_str))


def get_utf_chars(number, is_time=False):
    """Render `number` with keycap-emoji digits; other characters are kept.

    With `is_time=True` the value is read as ``H:M``: only the first two
    colon-separated parts are used, and a missing or all-zero minute part is
    rendered as ``00``.
    """
    if is_time:
        # str() so that the integer 0 default used by get_digit_message works.
        parts = str(number).split(':')
        hours = parts[0]
        minutes = parts[1] if len(parts) > 1 else ''
        if not minutes.strip('0'):
            minutes = '00'
        value = f'{hours}:{minutes}'
    else:
        value = str(number)
    # isdecimal() (actually called, unlike the former bare `c.isdigit`)
    # guarantees that int(c) is a valid index from 0 to 9.
    return ''.join(_KEYCAP_DIGITS[int(c)] if c.isdecimal() else c for c in value)


def get_digit_message(arr, is_time=False):
    """Format the first two items of `arr` as a "from ... to ..." phrase.

    Missing items default to 0.
    """
    first = arr[0] if len(arr) > 0 else 0
    last = arr[1] if len(arr) > 1 else 0
    msg = 'з {num0} до {num1}'
    return msg.format(num0=get_utf_chars(first, is_time),
                      num1=get_utf_chars(last, is_time))


def get_GPV_message(approved_from, event_date, hours_off, hours_on, outage_queue):
    """Build the notification text for one day of the hourly schedule.

    `hours_off` and `hours_on` are parallel lists of outage start and end
    times; when `hours_off` is empty the text says there are no outages.
    """
    queue = get_utf_chars(outage_queue)
    lines = [f'\U000026A1 ГПВ на {event_date} для {queue} черги']
    if hours_off:
        lines.append('Вимкнення:')
        lines.extend(get_digit_message([off, on], True)
                     for off, on in zip(hours_off, hours_on))
    else:
        lines.append('Вимкнень немає \U0001F44C \U0001F483')
    lines.append(f'{approved_from}')
    return '\n'.join(lines)


def get_address(city, street, house):
    """Join the address parts into one comma-separated string."""
    return f'{city}, {street}, {house}'


def is_outage_date_before_or_same(stop_date):
    """Return True when `stop_date` (``YYYY.MM.DD``) is today or later.

    Raises ValueError when `stop_date` does not match the expected format.
    """
    parsed_date = datetime.strptime(stop_date, "%Y.%m.%d").date()
    # The former `+ timedelta(hours=1)` had no effect: adding a timedelta to
    # a `date` only uses its whole days.
    return parsed_date >= datetime.now().date()


def parse_po_data(root) -> List[POData]:
    """Extract the planned outages from the parsed outage-table document.

    Every ``table-row flex`` row becomes one POData; duplicates are dropped
    and the result is sorted by `start_date`. Entries whose stop date is
    today or later are flagged with `need_to_send`.

    Raises IndexError when a row lacks one of the expected cells.
    """
    res: List[POData] = []
    for row in root.findall(".//div[@class='table-row flex']"):
        # Row-relative queries keep all fields of one outage together even if
        # another row has an empty cell, and avoid re-scanning the whole
        # document for every row.
        fields = {name: row.xpath(path)[0] for name, path in _PO_FIELD_PATHS.items()}
        po_data = POData(**fields)
        if is_outage_date_before_or_same(po_data.stop_date):
            po_data.need_to_send = True
        if po_data not in res:
            res.append(po_data)
    res.sort(key=lambda pd: pd.start_date)
    return res


def load_last_datas(file_name):
    """Return the content of `file_name`, or '' when it is not a file."""
    if not os.path.isfile(file_name):
        return ''
    with open(file_name, 'r') as f:
        return f.read()


def get_parsed_po_data():
    """Return the module-level list of known planned outages."""
    return PARSED_PO_DATAS


def get_parsed_gpv_data():
    """Return the module-level list of known hourly-schedule entries."""
    return PARSED_GPV_DATAS


def _send_notification(message: str) -> bool:
    """Send `message` to Telegram and, if enabled, Matrix.

    Returns True when Telegram accepted the message; the Matrix result is
    only logged.
    """
    tg_response = send_message_to_tg_get(message)
    delivered = tg_response == 200
    if delivered:
        LOG.debug(f'TG message send {message}')
    else:
        LOG.error(f'TG send error code: {STATUS_ERROR_SEND_TO_TG} (HTTP status {tg_response})')
    if cfg.USE_MATRIX and not send_matrix_notification(message):
        LOG.error(f'MX send error code: {STATUS_ERROR_SEND_TO_MX}')
    return delivered


def _index_by_start_ts(data_list, start_ts):
    """Return the index of the first entry with this `start_ts`, or None."""
    return next((i for i, el in enumerate(data_list) if el.start_ts == start_ts), None)


def _drop_expired(data_list, is_expired) -> None:
    """Remove, in place, every entry for which `is_expired(entry)` is true."""
    kept = []
    for el in data_list:
        if is_expired(el):
            LOG.debug(f'Old element removed from list {el}')
        else:
            kept.append(el)
    # Slice assignment keeps the identity of the shared module-level list.
    data_list[:] = kept


def _event_date_key(entry) -> datetime:
    """Sort key for schedule entries: the parsed `eventDate`.

    Entries without a parseable date sort last.
    """
    try:
        return datetime.strptime(entry.get('eventDate', ''), _GPV_DATE_FORMAT)
    except (TypeError, ValueError):
        return datetime.max


async def process_gpv():
    """Fetch the hourly schedule, store it and send pending notifications.

    Returns STATUS_ERROR_GPV_RESPONSE when the API does not answer with
    HTTP 200, otherwise STATUS_SEND_OK. In debug mode a canned response is
    used instead of the network call.
    """
    if cfg.IS_DEBUG:
        x = json.loads(_DEBUG_GPV_RESPONSE)
    else:
        response = get_GPV_response_from_oe(cfg.ACCOUNT)
        if response.status_code != 200:
            print("code=" + str(response.status_code))
            return STATUS_ERROR_GPV_RESPONSE
        x = response.json()

    outage_queue = str(x["current"]["queue"])
    if x["current"]["subQueue"]:
        outage_queue += '.' + str(x["current"]["subQueue"])

    datas = []
    # Same guard as before: only process the schedule when it mentions queues.
    if 'queues' in str(x["schedule"]):
        # Dates look like "15.04.2026", so they must be parsed to sort
        # chronologically; plain string order breaks across months and years.
        datas = sorted(x["schedule"], key=_event_date_key)

    for entry in datas:
        if len(entry) > 0:
            approved_from = 'Запроваджено ' + entry['scheduleApprovedSince']
            event_date = entry['eventDate']
            hours_list = entry['queues'].get(outage_queue)
            if hours_list is None:
                # No data for this queue on that day: skip the entry rather
                # than abort the whole run or announce "no outages".
                LOG.warning(f'Queue {outage_queue} is missing in schedule for {event_date}')
                continue
        else:
            hours_list = []
            approved_from = 'Немає даних про дату запровадження'
            # Same format as the API uses for `eventDate`.
            event_date = datetime.today().strftime(_GPV_DATE_FORMAT)
        if cfg.IS_DEBUG:
            print(f'hours_list{hours_list}, approved_from={approved_from}, event_date={event_date}')

        # Only intervals with status 1 are collected as outages.
        active = [h for h in hours_list if h["status"] == 1]
        hours_off = [h["from"] for h in active]
        hours_on = [h["to"] for h in active]
        if cfg.IS_DEBUG:
            print(f'hours_off = {hours_off}')
            print(f'hours_on = {hours_on}')

        update_gpv_data(PARSED_GPV_DATAS,
                        GPVData(approved_from=approved_from, event_date=event_date,
                                hours_off=hours_off, hours_on=hours_on,
                                outage_queue=outage_queue))

    check_and_send_gpv_message()
    return STATUS_SEND_OK


def check_and_send_gpv_message():
    """Send a notification for every stored schedule entry that needs one.

    An entry is sent when it is flagged `need_to_send` and its last outage
    start is not more than one hour in the past. It is marked as sent only
    when Telegram accepted the message.
    """
    now_ts = datetime.now().timestamp()
    for el in PARSED_GPV_DATAS:
        if not el.need_to_send:
            continue
        # NOTE(review): assumes an empty `start_ts` means "no outage times",
        # which can never be stale - confirm against GPVData.
        if el.start_ts and el.start_ts[-1] + 3600.0 < now_ts:
            continue
        message = get_GPV_message(el.approved_from, el.event_date, el.hours_off,
                                  el.hours_on, el.outage_queue)
        if _send_notification(message):
            el.set_send()


def update_gpv_data(data_list: List[GPVData], new_data: GPVData):
    """Merge `new_data` into `data_list` and drop expired entries.

    Entries are matched by `start_ts`. A matching entry that still waits to
    be sent is replaced; one that was already sent is kept, so the same
    schedule is not announced twice. Entries whose last outage start is more
    than 30 minutes in the past are removed.
    """
    idx = _index_by_start_ts(data_list, new_data.start_ts)
    if idx is None:
        data_list.append(new_data)
        LOG.debug(f'New element appended to list {new_data}')
    else:
        old_data = data_list[idx]
        if old_data.need_to_send:
            data_list[idx] = new_data
        else:
            # Already notified. The lookup matched on `start_ts`, so the old
            # `start_ts != start_ts` re-check could never be true.
            # NOTE(review): a change of end times only is therefore not
            # re-announced - confirm that this is intended.
            LOG.debug(f'Element {old_data} was not updated due to the same')

    ts = datetime.now().timestamp()
    _drop_expired(data_list,
                  lambda el: bool(el.start_ts) and (el.start_ts[-1] + 1800) < ts)


def get_message_with_po(el: POData):
    """Build the notification text for one planned outage."""
    if not el.need_to_send:
        return f'На {el.start_date} Планових вимкнень немає \U0001F44C \U0001F483'
    address = get_address(el.city, el.street, el.house)
    return (
        f'\U000026A0 Увага!\nНа {el.start_date} за адресою {address} заплановано {el.po_type} відключення (заявка від {el.placement_date}).\n'
        f'Причина відключення: {el.reason}\n'
        f'Початок вимкнення {el.start_date} об {el.start_time}\n'
        f'Кінець вимкнення {el.stop_date} об {el.stop_time}')


async def process_po():
    """Fetch the planned outages, store them and send pending notifications.

    Returns STATUS_ERROR_PO_RESPONSE when the site does not answer with
    HTTP 200, otherwise STATUS_SEND_OK. In debug mode the page is read from
    a local fixture file instead of the live site.
    """
    if cfg.IS_DEBUG:
        with open('test/data_po_2days_02.html', 'r', encoding='utf-8') as file:
            page = file.read()
    else:
        response = get_PO_response()
        if response.status_code != 200:
            return STATUS_ERROR_PO_RESPONSE
        page = response.text

    # One lenient HTML parser for both paths; the strict XML parser used for
    # the fixture before rejects anything that is not well-formed XML.
    root = etree.parse(StringIO(page), etree.HTMLParser())

    for el in parse_po_data(root):
        await update_po_data(PARSED_PO_DATAS, el)
    check_and_send_po_message()
    return STATUS_SEND_OK


def check_and_send_po_message():
    """Send a notification for every stored planned outage that needs one.

    An entry is marked as sent, and its notification time recorded, only
    when Telegram accepted the message.
    """
    for el in PARSED_PO_DATAS:
        if not el.need_to_send:
            continue
        if _send_notification(get_message_with_po(el)):
            el.set_sent()
            el.set_notified_now()


async def update_po_data(data_list: List[POData], new_data: POData):
    """Merge `new_data` into `data_list` and drop expired entries.

    Entries are matched by `start_ts`. A matching entry that still waits to
    be sent is replaced. One that was already sent is replaced only when the
    new data differs from it. Entries that started an hour or more ago are
    removed.
    """
    idx = _index_by_start_ts(data_list, new_data.start_ts)
    if idx is None:
        data_list.append(new_data)
        LOG.debug(f'New element appended to list {new_data}')
    else:
        old_data = data_list[idx]
        if old_data.need_to_send:
            data_list[idx] = new_data
        elif old_data != new_data:
            now = datetime.now()
            # Keep the replacement marked as sent while the last notification
            # is less than a day old, so the outage is re-announced at most
            # daily.
            # NOTE(review): the start-hour clause also suppresses the re-send,
            # which looks contrary to the original comment ("re-send every
            # day"), and the nesting of this block was reconstructed from
            # flattened source - verify both.
            if (old_data.notified_at + timedelta(days=1) > now or
                    (cfg.PROCESS_START_HOUR
                     and now.strftime('%H') == cfg.PROCESS_START_HOUR
                     and now.strftime('%M') == cfg.PROCESSING_MINUTES[0])):
                new_data.set_sent()
                new_data.update_notified_at(old_data)
            data_list[idx] = new_data
            LOG.debug(f'Updated element {old_data} in list with new data {new_data}')
        else:
            LOG.debug(f'Element {old_data} was not updated due to the same')

    ts = datetime.now().timestamp()
    _drop_expired(data_list,
                  lambda el: bool(el.start_ts) and (el.start_ts + 3600) <= ts)


if __name__ == '__main__':
    pass