import asyncio import json import logging import os from pathlib import Path from typing import Optional from dotenv import load_dotenv from telethon import TelegramClient, events from telethon.errors import RPCError from telethon.tl import functions, types from telethon.tl.custom.message import Message BAN_COMMAND = "/вбаннахуй" UNBAN_COMMAND = "/разбан" DATA_PATH = Path("data/blocked_users.json") RECENT_CLEANUP_LIMIT = 50 POOP_REACTION_EMOJI = "💩" ANIMATION_FRAMES = [ "Запускаю анти-режим.", "Запускаю анти-режим..", "Запускаю анти-режим...", "Проверяю цель...", ] class BlocklistStore: def __init__(self, path: Path) -> None: self.path = path self._blocked_user_ids: set[int] = set() self._lock = asyncio.Lock() async def load(self) -> None: self.path.parent.mkdir(parents=True, exist_ok=True) if not self.path.exists(): self.path.write_text('{"blocked_user_ids":[]}\n', encoding="utf-8") return try: content = self.path.read_text(encoding="utf-8") parsed = json.loads(content) if content.strip() else {"blocked_user_ids": []} except json.JSONDecodeError: logging.warning( "Некорректный JSON в %s. Сбрасываю список блокировок.", self.path ) parsed = {"blocked_user_ids": []} raw_ids = parsed.get("blocked_user_ids", []) if not isinstance(raw_ids, list): raw_ids = [] normalized_ids: set[int] = set() for value in raw_ids: if isinstance(value, int): normalized_ids.add(value) continue if isinstance(value, str): try: normalized_ids.add(int(value)) except ValueError: continue self._blocked_user_ids = normalized_ids await self._save() async def contains(self, user_id: int) -> bool: return user_id in self._blocked_user_ids async def add(self, user_id: int) -> bool: async with self._lock: if user_id in self._blocked_user_ids: return False self._blocked_user_ids.add(user_id) await self._save() return True async def remove(self, user_id: int) -> bool: async with self._lock: if user_id not in self._blocked_user_ids: return False self._blocked_user_ids.remove(user_id) await self._save() return True async def _save(self) -> None: payload = {"blocked_user_ids": sorted(self._blocked_user_ids)} self.path.write_text( json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8", ) def normalize_command(text: str) -> Optional[str]: stripped = text.strip() if not stripped.startswith("/"): return None first_token = stripped.split(maxsplit=1)[0] command = first_token.split("@", maxsplit=1)[0].lower() while command.endswith("!"): command = command[:-1] return command async def animate_ban(message: Message, target_user_id: int, is_new: bool) -> None: for frame in ANIMATION_FRAMES: try: await message.edit(frame) except Exception: logging.exception("Не удалось обновить кадр анимации.") break await asyncio.sleep(0.35) if is_new: final_text = ( f"Готово. ID {target_user_id} добавлен в автоудаление.\n" "Новые сообщения этого пользователя в группах удаляются только у вас." ) else: final_text = ( f"ID {target_user_id} уже был в автоудалении.\n" "Новые сообщения этого пользователя в группах удаляются только у вас." ) try: await message.edit(final_text) except Exception: logging.exception("Не удалось отправить финальный статус анимации.") async def delete_message_best_effort( client: TelegramClient, message: Message ) -> tuple[bool, Optional[str]]: last_error: Optional[str] = None try: await message.delete(revoke=False) return True, None except Exception as exc: last_error = f"{type(exc).__name__}: {exc}" logging.debug( "Удаление через message.delete(revoke=False) не сработало: %s", exc ) try: target = message.input_chat if message.input_chat else message.chat_id await client.delete_messages(target, [message.id], revoke=False) return True, None except Exception as exc: last_error = f"{type(exc).__name__}: {exc}" logging.debug( "Удаление через client.delete_messages(..., revoke=False) не сработало: %s", exc, ) try: await message.delete() return True, None except RPCError as exc: last_error = f"{type(exc).__name__}: {exc}" logging.warning( "Не удалось удалить сообщение %s в чате %s: %s", message.id, message.chat_id, exc, ) except Exception: last_error = "UnexpectedError" logging.exception( "Неожиданная ошибка удаления сообщения %s в чате %s.", message.id, message.chat_id, ) return False, last_error async def react_with_poop_best_effort(client: TelegramClient, message: Message) -> None: if message.id is None: return try: peer = message.input_chat if message.input_chat else message.chat_id await client( functions.messages.SendReactionRequest( peer=peer, msg_id=message.id, big=False, add_to_recent=False, reaction=[types.ReactionEmoji(emoticon=POOP_REACTION_EMOJI)], ) ) except Exception as exc: logging.debug( "Не удалось поставить реакцию %s на сообщение %s: %s", POOP_REACTION_EMOJI, message.id, exc, ) async def delete_message_with_retries( client: TelegramClient, message: Message, retries: tuple[float, ...] = (0.0, 0.7, 2.0), ) -> tuple[bool, Optional[str]]: await react_with_poop_best_effort(client, message) last_error: Optional[str] = None for delay in retries: if delay > 0: await asyncio.sleep(delay) deleted, error = await delete_message_best_effort(client, message) if deleted: return True, None last_error = error return False, last_error async def cleanup_recent_messages_from_user( client: TelegramClient, chat_id: int, user_id: int, limit: int = RECENT_CLEANUP_LIMIT ) -> int: deleted_count = 0 async for message in client.iter_messages(chat_id, from_user=user_id, limit=limit): if message.id is None: continue deleted, _ = await delete_message_with_retries(client, message) if deleted: deleted_count += 1 return deleted_count def is_permission_like_delete_error(error_text: Optional[str]) -> bool: if not error_text: return False normalized = error_text.upper() markers = ( "CHATADMINREQUIRED", "CHAT_ADMIN_REQUIRED", "MESSAGEDELETEFORBIDDEN", "MESSAGE_DELETE_FORBIDDEN", "RIGHT_FORBIDDEN", ) return any(marker in normalized for marker in markers) def load_settings() -> tuple[int, str, str]: load_dotenv() api_id_raw = os.getenv("API_ID", "").strip() api_hash = os.getenv("API_HASH", "").strip() session_name = os.getenv("SESSION_NAME", "antimichaell").strip() if not api_id_raw or not api_hash: raise RuntimeError( "Нужно задать API_ID и API_HASH в .env (или переменных окружения)." ) try: api_id = int(api_id_raw) except ValueError as exc: raise RuntimeError("API_ID должен быть целым числом.") from exc return api_id, api_hash, session_name async def main() -> None: logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s", ) api_id, api_hash, session_name = load_settings() blocklist = BlocklistStore(DATA_PATH) await blocklist.load() notified_delete_failures: set[tuple[int, int]] = set() client = TelegramClient(session_name, api_id, api_hash) @client.on(events.NewMessage(outgoing=True)) async def handle_commands(event: events.NewMessage.Event) -> None: command = normalize_command(event.raw_text or "") if command not in {BAN_COMMAND, UNBAN_COMMAND}: return if not event.is_reply: await event.reply( "Эта команда работает только как ответ на сообщение нужного пользователя." ) return replied_message = await event.get_reply_message() if replied_message is None or replied_message.sender_id is None: await event.reply("Не удалось определить автора исходного сообщения.") return target_user_id = int(replied_message.sender_id) if command == BAN_COMMAND: status_message = await event.reply("Подготовка...") added = await blocklist.add(target_user_id) await animate_ban(status_message, target_user_id, added) chat = await event.get_chat() if isinstance(chat, types.Channel) and getattr(chat, "megagroup", False): await event.reply( "Важно: это супергруппа. В Telegram API локальное удаление чужих " "сообщений может быть недоступно без прав администратора." ) cleaned = await cleanup_recent_messages_from_user( client, event.chat_id, target_user_id, ) if cleaned > 0: await event.reply( f"Дополнительно очищено последних сообщений в этом чате: {cleaned}." ) return removed = await blocklist.remove(target_user_id) if removed: await event.reply( f"Готово. ID {target_user_id} удален из автоудаления.\n" "Новые сообщения этого пользователя больше не скрываются." ) else: await event.reply( f"ID {target_user_id} не был в списке автоудаления." ) @client.on(events.NewMessage(incoming=True)) async def auto_delete_target_messages(event: events.NewMessage.Event) -> None: if event.is_private: return if event.is_channel and not event.is_group: return sender_id = event.sender_id if sender_id is None: return if not await blocklist.contains(int(sender_id)): return deleted, error = await delete_message_with_retries(client, event.message) if not deleted: logging.warning( "Пропуск удаления: сообщение %s от пользователя %s в чате %s.", event.id, sender_id, event.chat_id, ) if is_permission_like_delete_error(error): key = (int(event.chat_id), int(sender_id)) if key not in notified_delete_failures: notified_delete_failures.add(key) try: await client.send_message( "me", "Не удалось удалить сообщение по автофильтру.\n" f"Чат: `{event.chat_id}`\n" f"Пользователь: `{sender_id}`\n" f"Причина: `{error}`\n" "Обычно это ограничение прав/типа чата в Telegram.", ) except Exception: logging.exception("Не удалось отправить уведомление в Избранное.") await client.start() me = await client.get_me() logging.info("Юзербот запущен. Аккаунт ID: %s", me.id if me else "unknown") await client.run_until_disconnected() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logging.info("Остановка по Ctrl+C.")