Initial commit

This commit is contained in:
Anry Das
2026-04-26 10:08:54 +03:00
commit e3367c6a2e
13 changed files with 2338 additions and 0 deletions

496
utils.py Normal file
View File

@@ -0,0 +1,496 @@
import json
import os
from datetime import datetime, timedelta
from io import StringIO
from typing import List
import requests
import urllib3
from lxml import etree
import app_config as cfg
from model import POData, GPVData
STATUS_SEND_OK = 0
STATUS_ERROR_SEND_TO_TG = 20
STATUS_ERROR_GPV_RESPONSE = 10
STATUS_ERROR_PO_RESPONSE = 30
STATUS_EMPTY_CONFIG = 5
STATUS_ERROR_SEND_TO_MX = 40
PARSED_PO_DATAS: List[POData] = []
PARSED_GPV_DATAS: List[GPVData] = []
LOG = cfg.LOG
def send_message_to_tg_get(msg: str):
url = f"https://api.telegram.org/bot{cfg.TG_TOKEN}/sendMessage?chat_id={cfg.TG_CHAT}&parse_mode=html&text={msg}"
if not cfg.IS_DEBUG:
return requests.get(url).status_code
else:
print(f"Send to TG GET: {msg}")
return 200
def send_message_to_tg_post(msg: str):
if not cfg.IS_DEBUG:
return requests.post(
url='https://api.telegram.org/bot{0}/{1}'.format(cfg.TG_TOKEN, 'sendMessage'),
data={'chat_id': {cfg.TG_CHAT}, 'text': {msg}, 'parse_mode': 'html'}
).status_code
else:
print(f"Send to TG POST: {msg}")
return 200
def send_matrix_notification(message: str) -> bool:
"""Send notification to Matrix"""
if not cfg.USE_MATRIX:
return False
if not cfg.IS_DEBUG:
try:
url = f"{cfg.MX_SERVER}/_matrix/client/r0/rooms/{cfg.MX_ROOM}/send/m.room.message"
headers = {
'Authorization': f'Bearer {cfg.MX_TOKEN}',
'Content-Type': 'application/json'
}
payload = {
'msgtype': 'm.text',
'body': message,
'format': 'org.matrix.custom.html',
'formatted_body': message.replace('\n', '<br/>')
}
response = requests.post(url, headers=headers, json=payload)
return response.status_code == 200
except Exception as e:
# logger.error(f"Error sending Matrix notification: {e}")
return False
else:
print(f"Send to MX: {message}")
return True
def get_GPV_response_from_oe(account):
url = 'https://be-svitlo.oe.if.ua/schedule-by-search'
headers = {
"Host": "be-svitlo.oe.if.ua",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:144.0) Gecko/20100101 Firefox/144.0",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "en-US,en;q=0.5",
"Content-Type": "application/x-www-form-urlencoded;charset=utf-8",
"Referer": "https://svitlo.oe.if.ua/",
"Origin": "https://svitlo.oe.if.ua",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-site",
"Connection": "keep-alive",
"Priority": "u=0",
}
data = {
"accountNumber": account,
"userSearchChoice": "pob",
"address": ""
}
urllib3.disable_warnings()
return requests.post(url, headers=headers, data=data, verify=False)
def get_PO_response(): # Power Outage
utf8 = ''
url = 'https://oe.if.ua/uk/shutdowns_table'
params = {
'utf8': utf8,
'settlement': cfg.SETTLEMENT,
'street': cfg.STREET,
'house_number': cfg.HOUSE,
'building_part_number': cfg.BUILDING_PART_NUMBER,
'apartment': cfg.APARTMENT,
'commit': 'Пошук'
}
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'
}
urllib3.disable_warnings()
return requests.get(url=url, headers=headers, params=params)
def has_numbers(in_str):
return any(char.isdigit() for char in str(in_str))
def get_utf_chars(number, is_time=False):
digit_chars = ['\U00000030\U000020E3', '\U00000031\U000020E3', '\U00000032\U000020E3', '\U00000033\U000020E3',
'\U00000034\U000020E3', '\U00000035\U000020E3', '\U00000036\U000020E3', '\U00000037\U000020E3',
'\U00000038\U000020E3', '\U00000039\U000020E3']
res = ''
if is_time:
h = number.split(':')[0]
m = number.split(':')[1]
if m != 0:
value = str(h) + ':' + str(m)
else:
value = str(h) + ':00'
else:
value = str(number)
for c in value:
if c.isdigit and c not in ['.', ':']:
res += digit_chars[int(c)]
else:
res += c
return res
def get_digit_message(arr, is_time=False):
first = 0
if len(arr) > 0:
first = arr[0]
last = 0
if len(arr) > 1:
last = arr[1]
msg = 'з {num0} до {num1}'
return msg.format(num0=get_utf_chars(first, is_time), num1=get_utf_chars(last, is_time))
def get_GPV_message(approved_from, event_date, hours_off, hours_on, outage_queue):
queue = get_utf_chars(outage_queue)
message = f'\U000026A1 <b>ГПВ на {event_date} для {queue} черги</b>'
if len(hours_off) > 0:
message += '\nВимкнення:'
for i in range(len(hours_off)):
arr = [hours_off[i], hours_on[i]]
msg = get_digit_message(arr, True)
message += '\n' + msg
# message += (f'\n{MENTIONS}')
else:
message += '\nВимкнень немає \U0001F44C \U0001F483'
message += (f'\n<b><i>{approved_from}</i></b>')
return message
def get_address(city, street, house):
return f'{city}, {street}, {house}'
def is_outage_date_before_or_same(stop_date):
parsed_date = datetime.strptime(stop_date, "%Y.%m.%d").date()
current_date = datetime.now().date()
return parsed_date + timedelta(hours=1) >= current_date
def parse_po_data(root) -> List[POData]:
res: List[POData] = []
for i in range(len(root.findall(".//div[@class='table-row flex']"))):
po_data = POData(
city=root.xpath("//div[@class='table-row flex']/div[@class='city']/text()")[i],
street=root.xpath("//div[@class='table-row flex']/div[@class='street']/text()")[i],
house=root.xpath("//div[@class='table-row flex']/div[@class='house_number']/text()")[i],
po_type=root.xpath("//div[@class='table-row flex']/div[@class='type-shutdown']/text()")[i],
reason=root.xpath("//div[@class='table-row flex']/div[@class='reason-shutdown reason-text-container']/div[@class='reason-text-hide']/text()")[i],
placement_date=root.xpath("//div[@class='table-row flex']/div[@class='placement_date']/div[@class='date']/text()")[i],
start_date=root.xpath("//div[@class='table-row flex']/div[@class='shutdown_date']/div[@class='date']/text()")[i],
stop_date=root.xpath("//div[@class='table-row flex']/div[@class='turn_on_date']/div[@class='date']/text()")[i],
start_time=root.xpath("//div[@class='table-row flex']/div[@class='shutdown_date']/div[@class='time']/text()")[i],
stop_time=root.xpath("//div[@class='table-row flex']/div[@class='turn_on_date']/div[@class='time']/text()")[i])
if is_outage_date_before_or_same(po_data.stop_date):
po_data.need_to_send = True
if po_data not in res:
res.append(po_data)
res.sort(key=lambda pd: pd.start_date)
return res
def load_last_datas(file_name):
if os.path.isfile(file_name):
with open(file_name, 'r') as f:
return f.read()
else:
return ''
def get_parsed_po_data():
return PARSED_PO_DATAS
def get_parsed_gpv_data():
return PARSED_GPV_DATAS
async def process_gpv():
if cfg.IS_DEBUG:
x = json.loads(
"""{
"current":
{
"gav":
{
"message": "Черга спеціальних аварійних відключень (СГАВ):",
"queue": null
},
"hasQueue": "yes",
"note": "Станом на 21:05 14.01.2026 за вказаним особовим рахунком '30014180' споживач підпадає під чергу '5.1' Графіку погодинного відключення(ГПВ)",
"queue": 5,
"sgav":
{
"message": "Черга спеціальних аварійних відключень (СГАВ):",
"queue": null
},
"subQueue": 1
},
"schedule":
[
{
"createdAt": "14.01.2026 19:11",
"eventDate": "15.04.2026",
"queues":
{
"5.1":
[
{
"from": "07:30",
"shutdownHours": "07:30-11:00",
"status": 1,
"to": "11:00"
},
{
"from": "14:30",
"shutdownHours": "14:30-20:00",
"status": 1,
"to": "20:00"
},
{
"from": "22:00",
"shutdownHours": "22:00-00:00",
"status": 1,
"to": "00:00"
}
]
},
"scheduleApprovedSince": "14.04.2026 19:11"
},
{
"createdAt": "13.04.2026 20:28",
"eventDate": "14.04.2026",
"queues":
{
"5.1":
[
{
"from": "03:00",
"shutdownHours": "03:00-07:00",
"status": 1,
"to": "07:00"
},
{
"from": "11:00",
"shutdownHours": "11:00-17:30",
"status": 1,
"to": "17:30"
}
]
},
"scheduleApprovedSince": "14.04.2026 09:56"
}
]
}
"""
)
else:
response = get_GPV_response_from_oe(cfg.ACCOUNT)
if response.status_code != 200:
print("code=" + str(response.status_code))
return STATUS_ERROR_GPV_RESPONSE
x = response.json()
outage_queue = str(x["current"]["queue"])
if x["current"]["subQueue"]:
outage_queue += '.' + str(x["current"]["subQueue"])
datas = ''
if 'queues' in str(x["schedule"]):
datas = sorted(x["schedule"], key=lambda item: item['eventDate'])
for entry in datas:
hours_off = []
hours_on = []
if len(entry) > 0:
approved_from = 'Запроваджено ' + entry['scheduleApprovedSince']
event_date = entry['eventDate']
hours_list = entry['queues'][outage_queue]
else:
hours_list = []
approved_from = 'Hемає даних про дату запровадження'
event_date = datetime.today().strftime('%Y-%m-%d')
if cfg.IS_DEBUG:
print(f'hours_list{hours_list}, approved_from={approved_from}, event_date={event_date}')
for h in hours_list:
if h["status"] == 1:
hours_off.append(h["from"])
hours_on.append(h["to"])
if cfg.IS_DEBUG:
print(f'hours_off = {hours_off}')
print(f'hours_on = {hours_on}')
update_gpv_data(PARSED_GPV_DATAS, GPVData(approved_from=approved_from, event_date=event_date, hours_off=hours_off, hours_on=hours_on, outage_queue=outage_queue))
check_and_send_gpv_message()
return STATUS_SEND_OK
def check_and_send_gpv_message():
for el in PARSED_GPV_DATAS:
message = get_GPV_message(el.approved_from, el.event_date, el.hours_off, el.hours_on, el.outage_queue)
if el.need_to_send and (el.start_ts[-1] + 3600.0 >= datetime.now().timestamp()):
tg_response = send_message_to_tg_get(message)
if tg_response == 200:
el.set_send()
LOG.debug(f'TG message send {message}')
else:
LOG.error(f'TG send error code: {STATUS_ERROR_SEND_TO_TG}')
if cfg.USE_MATRIX and not cfg.IS_DEBUG:
res = send_matrix_notification(message)
if not res:
LOG.error(f'MX send error code: {STATUS_ERROR_SEND_TO_MX}')
def update_gpv_data(data_list: List[GPVData], new_data: GPVData):
# find by TS
obj = next((obj for obj in data_list if obj.start_ts == new_data.start_ts), None)
if obj:
# if exists - update:
old_data = data_list[data_list.index(obj)]
if not old_data.need_to_send: # notification already send
if old_data.start_ts != new_data.start_ts:
data_list[data_list.index(obj)] = new_data
LOG.debug(f'Updated element {old_data} in list with new data {new_data}')
else:
LOG.debug(f'Element {old_data} was not updated due to the same')
else:
data_list[data_list.index(obj)] = new_data
else:
# if not exists - add
data_list.append(new_data)
LOG.debug(f'New element appended to list {new_data}')
# data_list SORT
# remove OLD entries
ts = datetime.now().timestamp()
for el in data_list[:]:
if el.start_ts and (el.start_ts[-1] + 1800) < ts:
LOG.debug(f'Old element removed from list {el}')
data_list.remove(el)
def get_message_with_po(el: POData):
address = get_address(el.city, el.street, el.house)
if el.need_to_send:
message = (
f'\U000026A0 <b>Увага!</b>\nНа {el.start_date} за адресою {address} заплановано <b>{el.po_type}</b> відключення (заявка від {el.placement_date}).\n'
f'Причина відключення: {el.reason}\n'
f'Початок вимкнення <b>{el.start_date} об {el.start_time}</b>\n'
f'Кінець вимкнення <b>{el.stop_date} об {el.stop_time}</b>')
else:
message = f'На {el.start_date} Планових вимкнень немає \U0001F44C \U0001F483'
return message
async def process_po():
if not cfg.IS_DEBUG:
response = get_PO_response()
if response.status_code != 200:
return STATUS_ERROR_PO_RESPONSE
html = response.text
parser = etree.HTMLParser()
root = etree.parse(StringIO(html), parser)
else:
# html = "<html><body><div id='shutdowns-table'><div class='table'><div class='flex table-header'><div class='region'>Район</div><div class='city'>Населений пункт</div><div class='street'>Вулиця</div><div class='house_number'>№ буд</div>" \
# "<div class='building_part_number'>Корпус</div><div class='dispatch_name'>Диспетчерська назва об'єкту</div><div class='type-shutdown'>Вид робіт</div><div class='reason-shutdown'>Причина робіт</div><div class='placement_date'>Дата розміщення</div>" \
# "<div class='shutdown_date'>Дата і час вимкнення</div><div class='turn_on_date'>Дата і час включення</div><div class='next_date'>Наступна дата робіт</div></div></div><div class='shutdowns'><div><div><div class='table-row flex'><div class='region'>Піднянський район</div>" \
# "<div class='city'>Гірна</div><div class='street'>Тевського</div><div class='house_number'>9</div><div class='building_part_number'/><div class='dispatch_name'>ДСП 110/Пр.&quot;ЗБВIК&quot;/30-413/Л-6</div><div class='type-shutdown'>Планове</div>" \
# "<div class='reason-shutdown reason-text-container'><div class='reason-text-hide'>Поточний ремонт</div></div><div class='placement_date'><div class='date'>2025.03.24</div><div class='time'>11:50</div></div><div class='shutdown_date'><div class='date'>2026.04.10</div>" \
# "<div class='time'>09:00</div></div><div class='turn_on_date'><div class='date'>2026.04.10</div><div class='time'>18:00</div></div><div class='next_date'><div class='reason-text-hide'/></div></div></div></div></div></div></body></html>"
with open('test/data_po_2days_02.html', 'r') as file:
html = file.read()
root = etree.fromstring(html)
global PARSED_PO_DATAS
new_data = parse_po_data(root)
for el in new_data:
await update_po_data(PARSED_PO_DATAS, el)
check_and_send_po_message()
return STATUS_SEND_OK
def check_and_send_po_message():
for el in PARSED_PO_DATAS:
if el.need_to_send:
message = get_message_with_po(el)
tg_response = send_message_to_tg_get(message)
if tg_response == 200:
el.set_sent()
el.set_notified_now()
LOG.debug(f'TG message send {message}')
else:
LOG.error(f'TG send error code: {STATUS_ERROR_SEND_TO_TG}')
if cfg.USE_MATRIX:
res = send_matrix_notification(message)
if not res:
LOG.error(f'MX send error code: {STATUS_ERROR_SEND_TO_MX}')
async def update_po_data(data_list:List[POData], new_data: POData):
# find by TS
obj = next((obj for obj in data_list if obj.start_ts == new_data.start_ts), None)
if obj:
# if exists - update
old_data = data_list[data_list.index(obj)]
if not old_data.need_to_send: # notification already send
if old_data != new_data:
# update 'need_to_send' to re-send notification every day
if ((not old_data.notified_at + timedelta(days=1) <= datetime.now()) or
# or if current hour - is the start data processing hour with firs minute in cfg.PROCESSING_MINUTES list
(cfg.PROCESS_START_HOUR and datetime.now().strftime('%H') == cfg.PROCESS_START_HOUR and datetime.now().strftime('%M') == cfg.PROCESSING_MINUTES[0])):
new_data.set_sent()
new_data.update_notified_at(old_data)
data_list[data_list.index(obj)] = new_data
LOG.debug(f'Updated element {old_data} in list with new data {new_data}')
else:
LOG.debug(f'Element {old_data} was not updated due to the same')
else:
data_list[data_list.index(obj)] = new_data
else:
# if not exists - add
data_list.append(new_data)
LOG.debug(f'New element appended to list {new_data}')
# data_list SORT
# remove OLD entries
ts = datetime.now().timestamp()
for el in data_list[:]:
if el.start_ts and (el.start_ts + 3600) <= ts:
LOG.debug(f'Old element removed from list {el}')
data_list.remove(el)
if __name__ == '__main__':
pass