diff --git a/main.py b/main.py index ec16869..2e1373e 100644 --- a/main.py +++ b/main.py @@ -1,25 +1,88 @@ import asyncio from asyncio import Lock import asyncpg -from asyncpg import create_pool -from aiogram import Bot, Dispatcher, types, F -from aiogram.filters import Command -from aiogram.types import ReplyKeyboardMarkup, KeyboardButton, ReplyKeyboardRemove -from dotenv import load_dotenv -import uuid -import hashlib -import ast import os +import uuid import logging +from aiogram import Bot, Dispatcher, F, types +from aiogram.types import ReplyKeyboardMarkup, KeyboardButton, ReplyKeyboardRemove +from aiogram.filters import Command +from dotenv import load_dotenv +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad, unpad +import base64 -logging.basicConfig(level=logging.DEBUG) # DEBUG показывает максимум информации +logging.basicConfig(level=logging.DEBUG) # Загрузка данных из .env load_dotenv() -db_lock = Lock() +# Добавляем импорты для шифрования и дешифрования +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad, unpad +import base64 +ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY") +if not ENCRYPTION_KEY: + logging.error("ENCRYPTION_KEY не задан!") + raise ValueError("ENCRYPTION_KEY обязателен") +# Преобразуем ключ в bytes. Убедитесь, что длина ключа корректна (16, 24 или 32 байта) +ENCRYPTION_KEY = ENCRYPTION_KEY.encode() + +def encrypt_telegram_id(telegram_id: str) -> str: + # Используем AES в режиме ECB для детерминированного шифрования + cipher = AES.new(ENCRYPTION_KEY, AES.MODE_ECB) + ct_bytes = cipher.encrypt(pad(telegram_id.encode(), AES.block_size)) + return base64.b64encode(ct_bytes).decode('utf-8') + +def decrypt_telegram_id(enc_telegram_id: str) -> str: + ct_bytes = base64.b64decode(enc_telegram_id) + cipher = AES.new(ENCRYPTION_KEY, AES.MODE_ECB) + pt = unpad(cipher.decrypt(ct_bytes), AES.block_size) + return pt.decode('utf-8') + +db_lock = Lock() db_pool2 = None +retry_queue = [] + +async def process_retry_queue(): + global retry_queue + while True: + # Копируем очередь для итерирования + for item in retry_queue.copy(): + send_method = item.get('send_method') + kwargs = item.get('kwargs') + try: + await send_method(**kwargs) + retry_queue.remove(item) + logging.info("Сообщение успешно отправлено из очереди.") + except Exception as e: + # Если ошибка говорит о том, что тред не найден - удаляем сообщение из очереди + if "message thread not found" in str(e): + retry_queue.remove(item) + logging.info(f"Удаляем сообщение из очереди, т.к. не найден тред: {e}") + else: + logging.error(f"Не удалось отправить сообщение из очереди: {e}") + await asyncio.sleep(10) + +async def safe_send(send_method, **kwargs): + try: + return await send_method(**kwargs) + except Exception as e: + logging.error(f"Ошибка отправки, сообщение сохранено для повторной отправки: {e}") + retry_queue.append({ + 'send_method': send_method, + 'kwargs': kwargs, + }) + # Пытаемся уведомить пользователя о проблемах, если указан chat_id + if 'chat_id' in kwargs: + try: + await bot.send_message(chat_id=kwargs['chat_id'], + text="Сейчас возникли проблемы с сетью. " + "Ваше сообщение будет отправлено, как только связь восстановится.") + except Exception as inner: + logging.error(f"Не удалось отправить уведомление пользователю: {inner}") + return None # Асинхронная функция для логирования состояния пула @@ -35,7 +98,9 @@ async def log_pool_state(): DATABASE_URL = os.getenv("DATABASE_URL") - +BOT_TOKEN = os.getenv("BOT_TOKEN") +GROUP_ID = os.getenv("GROUP_ID") +TABLE_NAME = os.getenv("TABLE_NAME", "an_users") # Асинхронная функция для создания пула подключения async def get_db_pool2(): @@ -49,26 +114,28 @@ async def get_db_pool2(): raise -BOT_TOKEN = os.getenv("BOT_TOKEN") -GROUP_ID = os.getenv("GROUP_ID") # Telegram ID администратора - -# Инициализация бота и диспетчера +# Создание бота bot = Bot(token=BOT_TOKEN) + +# Создание диспетчера без передачи бота dp = Dispatcher() + # Регистрация пользователя с созданием нового топика -async def register_user(telegram_id): +async def register_user(telegram_id: str): """ Регистрирует пользователя, создавая анонимный ID и топик для взаимодействия. """ try: logging.info(f"Регистрация пользователя с Telegram ID: {telegram_id}") + # Шифруем Telegram ID для использования в базе данных + encrypted_id = encrypt_telegram_id(str(telegram_id)) async with db_pool2.acquire() as conn: # Проверяем, зарегистрирован ли пользователь result = await conn.fetchrow( - "SELECT anon_id, topic_id FROM an_users WHERE telegram_id = $1", - telegram_id, + f"SELECT anon_id, topic_id FROM {TABLE_NAME} WHERE telegram_id = $1", + encrypted_id, ) if result: logging.info(f"Пользователь {telegram_id} уже зарегистрирован.") @@ -84,10 +151,10 @@ async def register_user(telegram_id): ) topic_id = topic_result.message_thread_id - # Сохранение в базе данных + # Сохранение зашифрованного Telegram ID в базе данных await conn.execute( - "INSERT INTO an_users (telegram_id, anon_id, topic_id) VALUES ($1, $2, $3)", - telegram_id, + f"INSERT INTO {TABLE_NAME} (telegram_id, anon_id, topic_id) VALUES ($1, $2, $3)", + encrypted_id, anon_id, topic_id, ) @@ -105,9 +172,9 @@ async def register_user(telegram_id): async def get_telegram_id(anon_id): async with db_pool2.acquire() as conn: result = await conn.fetchrow( - "SELECT telegram_id FROM an_users WHERE anon_id = $1", anon_id + f"SELECT telegram_id FROM {TABLE_NAME} WHERE anon_id = $1", anon_id ) - return result["telegram_id"] if result else None + return decrypt_telegram_id(result["telegram_id"]) if result else None @dp.message(Command("start")) @@ -141,15 +208,15 @@ async def fill_form(message: types.Message): async def contact_volunteer(message: types.Message): try: - # Получаем номер строки в базе + # Получаем номер строки в базе, используя зашифрованный Telegram ID async with db_pool2.acquire() as conn: + encrypted_id = encrypt_telegram_id(str(message.from_user.id)) result = await conn.fetchrow( - "SELECT id FROM an_users WHERE telegram_id = $1", message.from_user.id + f"SELECT id FROM {TABLE_NAME} WHERE telegram_id = $1", encrypted_id ) if result: user_number = result["id"] - topic_name = f"{user_number}" await message.answer( f"Добро пожаловать! Здесь ты можешь написать нам. Ты будешь общаться с администраторами через бота, поэтому ты останешься для них анонимными.", @@ -162,6 +229,9 @@ async def contact_volunteer(message: types.Message): # Обработчик сообщений от пользователя @dp.message(F.chat.type == "private") async def handle_user_message(message: types.Message): + if not message.from_user: + logging.error("Отсутствует информация о пользователе") + return logging.info( f"Сообщение от {message.from_user.id}: {message.text or 'мультимедиа'}" ) @@ -170,61 +240,67 @@ async def handle_user_message(message: types.Message): anon_id, topic_id = await register_user(message.from_user.id) try: - # Формируем идентификатор для анонимности + + # Формируем идентификатор для анонимности user_tag = f"Сообщение от {str(anon_id)[:4]}:" # Отправляем разные типы сообщений if message.text: - forward_message = f"{user_tag}\n{message.text}" - await bot.send_message( - chat_id=GROUP_ID, message_thread_id=topic_id, text=forward_message + forward_message = f"{message.text}" + await safe_send( + bot.send_message, + chat_id=GROUP_ID, + message_thread_id=topic_id, + text=forward_message ) elif message.photo: - await bot.send_photo( + await safe_send( + bot.send_photo, chat_id=GROUP_ID, message_thread_id=topic_id, photo=message.photo[-1].file_id, caption=f"{user_tag}\n{message.caption or ''}", ) elif message.video: - await bot.send_video( + await safe_send( + bot.send_video, chat_id=GROUP_ID, message_thread_id=topic_id, video=message.video.file_id, caption=f"{user_tag}\n{message.caption or ''}", ) elif message.document: - await bot.send_document( + await safe_send( + bot.send_document, chat_id=GROUP_ID, message_thread_id=topic_id, document=message.document.file_id, caption=f"{user_tag}\n{message.caption or ''}", ) elif message.audio: - await bot.send_audio( + await safe_send( + bot.send_audio, chat_id=GROUP_ID, message_thread_id=topic_id, audio=message.audio.file_id, caption=f"{user_tag}\n{message.caption or ''}", ) elif message.voice: - await bot.send_voice( + await safe_send( + bot.send_voice, chat_id=GROUP_ID, message_thread_id=topic_id, voice=message.voice.file_id, caption=user_tag, ) else: - await bot.send_message( + await safe_send( + bot.send_message, chat_id=GROUP_ID, message_thread_id=topic_id, text=f"{user_tag}\nТип сообщения пока не поддерживается.", ) - # Уведомляем пользователя - await message.answer("Сообщение отправлено!") - logging.info(f"Сообщение от {message.from_user.id} успешно переслано в топик.") - except Exception as e: logging.error(f"Ошибка при обработке сообщения от пользователя: {e}") await message.answer("Произошла ошибка при отправке сообщения.") @@ -257,15 +333,13 @@ async def handle_admin_reply(message: types.Message): # Обработка редактирования сообщений администратора @dp.edited_message(F.chat.type.in_(["group", "supergroup"])) async def handle_admin_edited_message(message: types.Message): - """ - Обрабатывает редактированные сообщения от администратора в топиках группы. - """ + topic_id = message.message_thread_id # Проверяем, существует ли topic_id в базе async with db_pool2.acquire() as conn: result = await conn.fetchrow( - "SELECT telegram_id FROM an_users WHERE topic_id = $1", topic_id + f"SELECT telegram_id FROM {TABLE_NAME} WHERE topic_id = $1", topic_id ) if not result: @@ -289,49 +363,61 @@ async def process_admin_message(message: types.Message): # Получаем topic_id из текущего чата topic_id = message.message_thread_id - # Находим telegram_id пользователя, связанного с этим topic_id + # Находим зашифрованный Telegram ID пользователя, связанного с этим topic_id async with db_pool2.acquire() as conn: result = await conn.fetchrow( - "SELECT telegram_id FROM an_users WHERE topic_id = $1", topic_id + f"SELECT telegram_id FROM {TABLE_NAME} WHERE topic_id = $1", topic_id ) - try: - telegram_id = result["telegram_id"] + if not result: + logging.error("Пользователь с данным topic_id не найден") + return + # Дешифруем Telegram ID для последующего использования + telegram_id = decrypt_telegram_id(result["telegram_id"]) + + try: # Пересылаем сообщение пользователю в зависимости от типа контента if message.photo: - await bot.send_photo( + await safe_send( + bot.send_photo, chat_id=telegram_id, photo=message.photo[-1].file_id, caption=message.caption, ) elif message.video: - await bot.send_video( + await safe_send( + bot.send_video, chat_id=telegram_id, video=message.video.file_id, caption=message.caption, ) elif message.document: - await bot.send_document( + await safe_send( + bot.send_document, chat_id=telegram_id, document=message.document.file_id, caption=message.caption, ) elif message.audio: - await bot.send_audio( + await safe_send( + bot.send_audio, chat_id=telegram_id, audio=message.audio.file_id, caption=message.caption, ) elif message.voice: - await bot.send_voice( + await safe_send( + bot.send_voice, chat_id=telegram_id, voice=message.voice.file_id, caption=message.caption, ) elif message.text: - await bot.send_message( - chat_id=telegram_id, text=f"Ответ волонтёр_ки:\n\n{message.text}" + await safe_send( + bot.send_message, + chat_id=telegram_id, + text=f"\n\n{message.text}" ) logging.info(f"Ответ успешно отправлен пользователю с ID {telegram_id}") @@ -354,6 +440,10 @@ async def main(): logging.info("Подключение к базе данных успешно установлено") await log_pool_state() # Логирование состояния пула + + # Запускаем фоновую задачу для повторной отправки сообщений + asyncio.create_task(process_retry_queue()) + await dp.start_polling(bot) except Exception as e: logging.error(f"Ошибка при подключении к базе данных: {e}")