497 lines
18 KiB
Python
497 lines
18 KiB
Python
import json
|
||
import os
|
||
from datetime import datetime, timedelta
|
||
from io import StringIO
|
||
from typing import List
|
||
|
||
import requests
|
||
import urllib3
|
||
from lxml import etree
|
||
|
||
import app_config as cfg
|
||
from model import POData, GPVData
|
||
|
||
# Status codes used by the processing functions in this module.
STATUS_SEND_OK = 0  # returned by process_gpv() / process_po() on success
STATUS_ERROR_SEND_TO_TG = 20  # written to the log when the Telegram send does not return 200
STATUS_ERROR_GPV_RESPONSE = 10  # returned by process_gpv() on a non-200 schedule response
STATUS_ERROR_PO_RESPONSE = 30  # returned by process_po() on a non-200 outage-page response
STATUS_EMPTY_CONFIG = 5  # not referenced in the code visible here; presumably used by callers - TODO confirm
STATUS_ERROR_SEND_TO_MX = 40  # written to the log when the Matrix send reports failure

# In-memory lists that the update_*_data() helpers mutate between runs.
PARSED_PO_DATAS: List[POData] = []
PARSED_GPV_DATAS: List[GPVData] = []

# Logger object supplied by the application configuration module.
LOG = cfg.LOG
|
||
|
||
def send_message_to_tg_get(msg: str):
    """Send *msg* to the configured Telegram chat using a GET request.

    The text is handed to ``requests`` through ``params`` so that it is
    URL-encoded. Interpolating it into the URL by hand corrupts the request
    as soon as the message contains ``&``, ``#`` or ``+``.

    Returns:
        The HTTP status code of the Telegram API response. In debug mode
        nothing is sent: the message is printed and ``200`` is returned.
    """
    if cfg.IS_DEBUG:
        print(f"Send to TG GET: {msg}")
        return 200

    url = f"https://api.telegram.org/bot{cfg.TG_TOKEN}/sendMessage"
    params = {'chat_id': cfg.TG_CHAT, 'parse_mode': 'html', 'text': msg}
    # Without a timeout a stalled connection would block the caller forever.
    return requests.get(url, params=params, timeout=30).status_code
|
||
|
||
|
||
def send_message_to_tg_post(msg: str):
    """Send *msg* to the configured Telegram chat using a POST request.

    Returns:
        The HTTP status code of the Telegram API response. In debug mode
        nothing is sent: the message is printed and ``200`` is returned.
    """
    if cfg.IS_DEBUG:
        print(f"Send to TG POST: {msg}")
        return 200

    return requests.post(
        url='https://api.telegram.org/bot{0}/{1}'.format(cfg.TG_TOKEN, 'sendMessage'),
        # Plain values: the previous ``{cfg.TG_CHAT}`` / ``{msg}`` spelling
        # built one-element sets by accident.
        data={'chat_id': cfg.TG_CHAT, 'text': msg, 'parse_mode': 'html'},
        # Without a timeout a stalled connection would block the caller forever.
        timeout=30,
    ).status_code
|
||
|
||
|
||
def send_matrix_notification(message: str) -> bool:
    """Send *message* to the configured Matrix room.

    Returns:
        ``False`` when Matrix is disabled in the configuration, when the
        request raises, or when the server answers with a status other than
        200; ``True`` otherwise. In debug mode the message is only printed
        and ``True`` is returned.
    """
    if not cfg.USE_MATRIX:
        return False

    if cfg.IS_DEBUG:
        print(f"Send to MX: {message}")
        return True

    try:
        url = f"{cfg.MX_SERVER}/_matrix/client/r0/rooms/{cfg.MX_ROOM}/send/m.room.message"

        headers = {
            'Authorization': f'Bearer {cfg.MX_TOKEN}',
            'Content-Type': 'application/json'
        }

        payload = {
            'msgtype': 'm.text',
            'body': message,
            'format': 'org.matrix.custom.html',
            # The HTML body needs explicit line breaks.
            'formatted_body': message.replace('\n', '<br/>')
        }

        # Without a timeout a stalled connection would block the caller forever.
        response = requests.post(url, headers=headers, json=payload, timeout=30)
    except Exception as exc:
        # Sending stays best-effort, but the reason for a failure is recorded
        # instead of being dropped silently.
        LOG.error(f'Error sending Matrix notification: {exc}')
        return False

    return response.status_code == 200
|
||
|
||
|
||
def get_GPV_response_from_oe(account):
    """Request the outage schedule for *account* from the be-svitlo endpoint.

    Args:
        account: value sent as the ``accountNumber`` form field.

    Returns:
        The ``requests`` response object; the caller checks ``status_code``
        and decodes the JSON body.
    """
    url = 'https://be-svitlo.oe.if.ua/schedule-by-search'
    headers = {
        "Host": "be-svitlo.oe.if.ua",
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:144.0) Gecko/20100101 Firefox/144.0",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "en-US,en;q=0.5",
        "Content-Type": "application/x-www-form-urlencoded;charset=utf-8",
        "Referer": "https://svitlo.oe.if.ua/",
        "Origin": "https://svitlo.oe.if.ua",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-site",
        "Connection": "keep-alive",
        "Priority": "u=0",
    }
    data = {
        "accountNumber": account,
        "userSearchChoice": "pob",
        "address": ""
    }
    # NOTE(review): TLS certificate verification is switched off for this call
    # and the resulting urllib3 warnings are silenced; confirm this is still
    # required for the endpoint.
    urllib3.disable_warnings()
    # Without a timeout a stalled connection would block the caller forever.
    return requests.post(url, headers=headers, data=data, verify=False, timeout=30)
|
||
|
||
|
||
def get_PO_response():  # Power Outage
    """Request the shutdowns table page for the configured address.

    The address parts are read from the application configuration and sent
    as query parameters.

    Returns:
        The ``requests`` response object; the caller checks ``status_code``
        and parses the HTML body.
    """
    url = 'https://oe.if.ua/uk/shutdowns_table'
    params = {
        'utf8': '✓',
        'settlement': cfg.SETTLEMENT,
        'street': cfg.STREET,
        'house_number': cfg.HOUSE,
        'building_part_number': cfg.BUILDING_PART_NUMBER,
        'apartment': cfg.APARTMENT,
        'commit': 'Пошук'
    }
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'
    }
    urllib3.disable_warnings()
    # Without a timeout a stalled connection would block the caller forever.
    return requests.get(url=url, headers=headers, params=params, timeout=30)
|
||
|
||
|
||
def has_numbers(in_str):
    """Return True when the string form of *in_str* contains a digit."""
    for symbol in str(in_str):
        if symbol.isdigit():
            return True
    return False
|
||
|
||
|
||
def get_utf_chars(number, is_time=False):
    """Render *number* with every decimal digit followed by U+20E3 (keycap).

    Args:
        number: value to render; it is converted with ``str()`` first, so
            ints, floats and strings are all accepted.
        is_time: when true and the text contains ``:``, only the first two
            ``:``-separated fields (hours and minutes) are kept, and a
            numerically zero minute field is written as ``00``.

    Returns:
        The rendered string. Characters that are not decimal digits are
        copied through unchanged.
    """
    keycap = '\U000020E3'
    value = str(number)

    if is_time:
        fields = value.split(':')
        # A value without a colon is rendered as a plain number.
        if len(fields) > 1:
            hours, minutes = fields[0], fields[1]
            if minutes.isdecimal() and int(minutes) == 0:
                minutes = '00'
            value = hours + ':' + minutes

    # isdecimal() guarantees that int() accepts the character.
    return ''.join(
        str(int(char)) + keycap if char.isdecimal() else char
        for char in value
    )
|
||
|
||
|
||
def get_digit_message(arr, is_time=False):
    """Format the first two items of *arr* as a "from ... to ..." phrase.

    A missing first or second item is replaced by ``0``. Both values are
    rendered through ``get_utf_chars`` with the given *is_time* flag.
    """
    start = arr[0] if len(arr) > 0 else 0
    end = arr[1] if len(arr) > 1 else 0

    return 'з {num0} до {num1}'.format(
        num0=get_utf_chars(start, is_time),
        num1=get_utf_chars(end, is_time),
    )
|
||
|
||
|
||
def get_GPV_message(approved_from, event_date, hours_off, hours_on, outage_queue):
    """Build the HTML notification text for one scheduled-outage day.

    The header names the date and the queue. It is followed either by one
    "from ... to ..." line per outage interval (``hours_off[i]`` paired with
    ``hours_on[i]``) or by a "no outages" line, and finally by
    *approved_from* in bold italics.
    """
    queue = get_utf_chars(outage_queue)
    pieces = [f'\U000026A1 <b>ГПВ на {event_date} для {queue} черги</b>']

    if len(hours_off) > 0:
        pieces.append('\nВимкнення:')
        for idx, start in enumerate(hours_off):
            interval = get_digit_message([start, hours_on[idx]], True)
            pieces.append('\n' + interval)
    else:
        pieces.append('\nВимкнень немає \U0001F44C \U0001F483')

    pieces.append(f'\n<b><i>{approved_from}</i></b>')
    return ''.join(pieces)
|
||
|
||
|
||
def get_address(city, street, house):
    """Join city, street and house into one comma-separated address string."""
    return '{}, {}, {}'.format(city, street, house)
|
||
|
||
|
||
def is_outage_date_before_or_same(stop_date):
    """Return True when *stop_date* is today or a later day.

    Args:
        stop_date: date string in ``YYYY.MM.DD`` format.

    Raises:
        ValueError: if *stop_date* does not match the expected format.

    The comparison is made on whole days. The former ``+ timedelta(hours=1)``
    term was dropped because adding a sub-day timedelta to a ``date`` leaves
    it unchanged, so it never influenced the result.
    """
    outage_end = datetime.strptime(stop_date, "%Y.%m.%d").date()
    return outage_end >= datetime.now().date()
|
||
|
||
|
||
def parse_po_data(root) -> List[POData]:
    """Extract the planned-outage rows from the parsed shutdowns page.

    Every ``div`` with class ``table-row flex`` is one outage. Its cells are
    read with XPath expressions relative to that row, so a cell without text
    in one row cannot shift the values of the other rows, and the document is
    not re-scanned for every row.

    Args:
        root: lxml tree or element of the shutdowns page.

    Returns:
        The distinct rows as ``POData`` objects sorted by ``start_date``.
        ``need_to_send`` is set on rows whose stop date is today or later.
        Rows that lack a start or stop date are logged and skipped.
    """
    res: List[POData] = []
    for row in root.xpath("//div[@class='table-row flex']"):
        fields = {name: _first_text(row, path) for name, path in _PO_FIELD_PATHS}

        # Without both dates the row can neither be ordered nor checked.
        if not (fields['start_date'] and fields['stop_date']):
            LOG.error(f'Outage row skipped, date is missing: {fields}')
            continue

        po_data = POData(**fields)

        if is_outage_date_before_or_same(po_data.stop_date):
            po_data.need_to_send = True

        if po_data not in res:
            res.append(po_data)

    res.sort(key=lambda pd: pd.start_date)

    return res


# POData keyword -> XPath of the text node, relative to one table row.
_PO_FIELD_PATHS = (
    ('city', "./div[@class='city']/text()"),
    ('street', "./div[@class='street']/text()"),
    ('house', "./div[@class='house_number']/text()"),
    ('po_type', "./div[@class='type-shutdown']/text()"),
    ('reason', "./div[@class='reason-shutdown reason-text-container']/div[@class='reason-text-hide']/text()"),
    ('placement_date', "./div[@class='placement_date']/div[@class='date']/text()"),
    ('start_date', "./div[@class='shutdown_date']/div[@class='date']/text()"),
    ('stop_date', "./div[@class='turn_on_date']/div[@class='date']/text()"),
    ('start_time', "./div[@class='shutdown_date']/div[@class='time']/text()"),
    ('stop_time', "./div[@class='turn_on_date']/div[@class='time']/text()"),
)


def _first_text(row, path):
    """Return the first text node matched by *path* under *row*, or ''."""
    found = row.xpath(path)
    return found[0] if found else ''
|
||
|
||
|
||
def load_last_datas(file_name):
    """Return the text content of *file_name*, or '' if it is not a regular file."""
    if not os.path.isfile(file_name):
        return ''

    with open(file_name, 'r') as handle:
        content = handle.read()
    return content
|
||
|
||
def get_parsed_po_data() -> List[POData]:
    """Return the module-level list of planned-outage records (not a copy)."""
    return PARSED_PO_DATAS
|
||
|
||
|
||
def get_parsed_gpv_data() -> List[GPVData]:
    """Return the module-level list of schedule records (not a copy)."""
    return PARSED_GPV_DATAS
|
||
|
||
|
||
async def process_gpv():
    """Fetch the hourly outage schedule, store it and send notifications.

    In debug mode a built-in sample payload replaces the network request.
    Every schedule entry that lists the account's queue is converted to a
    ``GPVData`` and merged into ``PARSED_GPV_DATAS``; entries are handled in
    chronological order of their ``eventDate``. Afterwards pending
    notifications are sent.

    Returns:
        ``STATUS_ERROR_GPV_RESPONSE`` when the endpoint answers with a status
        other than 200, when the body is not valid JSON, or when the
        ``current``/``schedule`` part of the payload is missing;
        ``STATUS_SEND_OK`` otherwise.
    """
    if cfg.IS_DEBUG:
        payload = json.loads(_DEBUG_GPV_PAYLOAD)
    else:
        response = get_GPV_response_from_oe(cfg.ACCOUNT)
        if response.status_code != 200:
            print("code=" + str(response.status_code))
            return STATUS_ERROR_GPV_RESPONSE

        try:
            payload = response.json()
        except ValueError as exc:
            # A 200 answer with a non-JSON body is still a failed response.
            LOG.error(f'GPV response is not valid JSON: {exc}')
            return STATUS_ERROR_GPV_RESPONSE

    try:
        current = payload["current"]
        outage_queue = str(current["queue"])
        if current["subQueue"]:
            outage_queue += '.' + str(current["subQueue"])
        schedule = payload["schedule"] or []
    except (KeyError, TypeError) as exc:
        LOG.error(f'Unexpected GPV response structure: {exc!r}')
        return STATUS_ERROR_GPV_RESPONSE

    # Only entries that carry a date and a queue table can be processed.
    entries = [
        entry for entry in schedule
        if isinstance(entry, dict)
        and 'eventDate' in entry
        and isinstance(entry.get('queues'), dict)
    ]

    for entry in sorted(entries, key=_gpv_event_date_key):
        event_date = entry['eventDate']
        hours_list = entry['queues'].get(outage_queue)
        if hours_list is None:
            # The queue is absent for this day: nothing reliable to report.
            LOG.error(f'Queue {outage_queue} is missing in the schedule for {event_date}')
            continue

        approved_since = entry.get('scheduleApprovedSince')
        if approved_since:
            approved_from = 'Запроваджено ' + approved_since
        else:
            approved_from = 'Hемає даних про дату запровадження'

        if cfg.IS_DEBUG:
            print(f'hours_list{hours_list}, approved_from={approved_from}, event_date={event_date}')

        # Only intervals with status 1 are treated as outages.
        active = [slot for slot in hours_list if slot["status"] == 1]
        hours_off = [slot["from"] for slot in active]
        hours_on = [slot["to"] for slot in active]

        if cfg.IS_DEBUG:
            print(f'hours_off = {hours_off}')
            print(f'hours_on = {hours_on}')

        update_gpv_data(
            PARSED_GPV_DATAS,
            GPVData(approved_from=approved_from, event_date=event_date, hours_off=hours_off,
                    hours_on=hours_on, outage_queue=outage_queue),
        )

    check_and_send_gpv_message()
    return STATUS_SEND_OK


def _gpv_event_date_key(entry):
    """Sort key: the entry's ``DD.MM.YYYY`` event date as a datetime.

    Comparing the raw strings would order by day of month first. Dates that
    cannot be parsed sort last.
    """
    try:
        return datetime.strptime(entry['eventDate'], '%d.%m.%Y')
    except (TypeError, ValueError):
        return datetime.max


# Sample payload used by process_gpv() in debug mode instead of a request.
_DEBUG_GPV_PAYLOAD = """
{
    "current": {
        "gav": {
            "message": "Черга спеціальних аварійних відключень (СГАВ):",
            "queue": null
        },
        "hasQueue": "yes",
        "note": "Станом на 21:05 14.01.2026 за вказаним особовим рахунком '30014180' споживач підпадає під чергу '5.1' Графіку погодинного відключення(ГПВ)",
        "queue": 5,
        "sgav": {
            "message": "Черга спеціальних аварійних відключень (СГАВ):",
            "queue": null
        },
        "subQueue": 1
    },
    "schedule": [
        {
            "createdAt": "14.01.2026 19:11",
            "eventDate": "15.04.2026",
            "queues": {
                "5.1": [
                    {"from": "07:30", "shutdownHours": "07:30-11:00", "status": 1, "to": "11:00"},
                    {"from": "14:30", "shutdownHours": "14:30-20:00", "status": 1, "to": "20:00"},
                    {"from": "22:00", "shutdownHours": "22:00-00:00", "status": 1, "to": "00:00"}
                ]
            },
            "scheduleApprovedSince": "14.04.2026 19:11"
        },
        {
            "createdAt": "13.04.2026 20:28",
            "eventDate": "14.04.2026",
            "queues": {
                "5.1": [
                    {"from": "03:00", "shutdownHours": "03:00-07:00", "status": 1, "to": "07:00"},
                    {"from": "11:00", "shutdownHours": "11:00-17:30", "status": 1, "to": "17:30"}
                ]
            },
            "scheduleApprovedSince": "14.04.2026 09:56"
        }
    ]
}
"""
|
||
|
||
|
||
def check_and_send_gpv_message():
    """Send every pending schedule notification to Telegram (and Matrix).

    A record is sent when its ``need_to_send`` flag is set and its last
    ``start_ts`` value is at most one hour in the past. The record is marked
    as sent only after Telegram answers with 200. Matrix delivery is
    attempted regardless of the Telegram result, but not in debug mode.
    """
    for record in PARSED_GPV_DATAS:
        text = get_GPV_message(record.approved_from, record.event_date, record.hours_off,
                               record.hours_on, record.outage_queue)

        if not record.need_to_send:
            continue
        # NOTE(review): start_ts[-1] raises IndexError for an empty start_ts;
        # update_gpv_data() guards against an empty value - confirm whether
        # it can be empty here.
        if not (record.start_ts[-1] + 3600.0 >= datetime.now().timestamp()):
            continue

        status = send_message_to_tg_get(text)
        if status == 200:
            record.set_send()
            LOG.debug(f'TG message send {text}')
        else:
            LOG.error(f'TG send error code: {STATUS_ERROR_SEND_TO_TG}')

        if cfg.USE_MATRIX and not cfg.IS_DEBUG:
            delivered = send_matrix_notification(text)
            if not delivered:
                LOG.error(f'MX send error code: {STATUS_ERROR_SEND_TO_MX}')
|
||
|
||
|
||
def update_gpv_data(data_list: List[GPVData], new_data: GPVData):
    """Merge *new_data* into *data_list* and drop expired records.

    Records are matched on ``start_ts``. An unmatched record is appended. A
    matched record whose notification is still pending is replaced. After
    the merge, every record whose last ``start_ts`` value lies more than
    1800 seconds in the past is removed.
    """
    match = next((item for item in data_list if item.start_ts == new_data.start_ts), None)

    if not match:
        data_list.append(new_data)
        LOG.debug(f'New element appended to list {new_data}')
    else:
        position = data_list.index(match)
        old_data = data_list[position]
        if old_data.need_to_send:
            # Notification still pending: take the fresh record.
            data_list[position] = new_data
        elif old_data.start_ts != new_data.start_ts:
            # NOTE(review): the match above was made on equal start_ts, so
            # this comparison cannot be true for ordinary values and an
            # already-notified record is never replaced - confirm which
            # field was meant to be compared.
            data_list[position] = new_data
            LOG.debug(f'Updated element {old_data} in list with new data {new_data}')
        else:
            LOG.debug(f'Element {old_data} was not updated due to the same')

    # Iterate over a copy because records are removed from the list itself.
    now_ts = datetime.now().timestamp()
    for item in list(data_list):
        if item.start_ts and (item.start_ts[-1] + 1800) < now_ts:
            LOG.debug(f'Old element removed from list {item}')
            data_list.remove(item)
|
||
|
||
|
||
def get_message_with_po(el: POData):
    """Build the HTML notification text for one planned-outage record.

    Records without the ``need_to_send`` flag produce a short "no planned
    outages" line; flagged records produce the full warning with address,
    type, reason and the start/end date and time.
    """
    address = get_address(el.city, el.street, el.house)

    if not el.need_to_send:
        return f'На {el.start_date} Планових вимкнень немає \U0001F44C \U0001F483'

    return (
        f'\U000026A0 <b>Увага!</b>\nНа {el.start_date} за адресою {address} заплановано <b>{el.po_type}</b> відключення (заявка від {el.placement_date}).\n'
        f'Причина відключення: {el.reason}\n'
        f'Початок вимкнення <b>{el.start_date} об {el.start_time}</b>\n'
        f'Кінець вимкнення <b>{el.stop_date} об {el.stop_time}</b>')
|
||
|
||
|
||
async def process_po():
    """Fetch the planned-outage page, store its rows and send notifications.

    In debug mode the page is read from a local test fixture instead of the
    network. Every parsed row is merged into ``PARSED_PO_DATAS`` before the
    pending notifications are sent.

    Returns:
        ``STATUS_ERROR_PO_RESPONSE`` when the page request answers with a
        status other than 200, ``STATUS_SEND_OK`` otherwise.
    """
    if cfg.IS_DEBUG:
        with open('test/data_po_2days_02.html', 'r') as file:
            html = file.read()
        root = etree.fromstring(html)
    else:
        response = get_PO_response()
        if response.status_code != 200:
            return STATUS_ERROR_PO_RESPONSE
        html = response.text
        root = etree.parse(StringIO(html), etree.HTMLParser())

    for record in parse_po_data(root):
        await update_po_data(PARSED_PO_DATAS, record)

    check_and_send_po_message()

    return STATUS_SEND_OK
|
||
|
||
|
||
def check_and_send_po_message():
    """Send every pending planned-outage notification.

    Records without the ``need_to_send`` flag are skipped. A record is
    marked as sent, and its notification time updated, only after Telegram
    answers with 200. Matrix delivery is attempted regardless of the
    Telegram result when Matrix is enabled.
    """
    for record in PARSED_PO_DATAS:
        if not record.need_to_send:
            continue

        text = get_message_with_po(record)
        status = send_message_to_tg_get(text)
        if status == 200:
            record.set_sent()
            record.set_notified_now()
            LOG.debug(f'TG message send {text}')
        else:
            LOG.error(f'TG send error code: {STATUS_ERROR_SEND_TO_TG}')

        if cfg.USE_MATRIX:
            delivered = send_matrix_notification(text)
            if not delivered:
                LOG.error(f'MX send error code: {STATUS_ERROR_SEND_TO_MX}')
|
||
|
||
|
||
async def update_po_data(data_list: List[POData], new_data: POData):
    """Merge *new_data* into *data_list* and drop expired records.

    Records are matched on ``start_ts``. An unmatched record is appended. A
    matched record whose notification is still pending is replaced. A
    matched, already notified record is replaced only when it differs from
    *new_data*. After the merge, every record whose ``start_ts`` lies at
    least 3600 seconds in the past is removed.
    """
    match = next((item for item in data_list if item.start_ts == new_data.start_ts), None)

    if not match:
        data_list.append(new_data)
        LOG.debug(f'New element appended to list {new_data}')
    else:
        position = data_list.index(match)
        old_data = data_list[position]
        if old_data.need_to_send:
            # Notification still pending: take the fresh record.
            data_list[position] = new_data
        elif old_data != new_data:
            # The replacement is flagged as already sent while the previous
            # notification is less than a day old, or when the current time
            # is the configured start hour and first processing minute.
            # NOTE(review): the start-hour clause suppresses the re-send as
            # well - confirm that this is intended.
            keep_silent = (
                not (old_data.notified_at + timedelta(days=1) <= datetime.now())
                or (cfg.PROCESS_START_HOUR
                    and datetime.now().strftime('%H') == cfg.PROCESS_START_HOUR
                    and datetime.now().strftime('%M') == cfg.PROCESSING_MINUTES[0])
            )
            if keep_silent:
                new_data.set_sent()
                new_data.update_notified_at(old_data)
            data_list[position] = new_data
            LOG.debug(f'Updated element {old_data} in list with new data {new_data}')
        else:
            LOG.debug(f'Element {old_data} was not updated due to the same')

    # Iterate over a copy because records are removed from the list itself.
    now_ts = datetime.now().timestamp()
    for item in list(data_list):
        if item.start_ts and (item.start_ts + 3600) <= now_ts:
            LOG.debug(f'Old element removed from list {item}')
            data_list.remove(item)
|
||
|
||
|
||
# Running this module directly is a no-op: the guard body does nothing.
if __name__ == '__main__':
    pass
|