From d539eab31dd5ca3f601440537e833dd7f37aa55d Mon Sep 17 00:00:00 2001 From: sadtweenk Date: Mon, 20 Apr 2026 16:31:31 +0300 Subject: [PATCH] feat: add telethon userbot with ban/unban auto-delete --- .env.example | 3 + README.md | 41 +++++++ bot.py | 229 ++++++++++++++++++++++++++++++++++++++++ data/blocked_users.json | 1 + requirements.txt | 2 + 5 files changed, 276 insertions(+) create mode 100644 .env.example create mode 100644 bot.py create mode 100644 data/blocked_users.json create mode 100644 requirements.txt diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..12bfa7e --- /dev/null +++ b/.env.example @@ -0,0 +1,3 @@ +API_ID=123456 +API_HASH=0123456789abcdef0123456789abcdef +SESSION_NAME=antimichaell diff --git a/README.md b/README.md index bfeb8a0..9f983a3 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,43 @@ # AntiMichaell +Юзербот на `Telethon`, который: +- по команде-ответу добавляет пользователя в список автоудаления; +- удаляет его новые сообщения в любых группах только у вас; +- умеет снимать автоудаление (разбан) для конкретного пользователя. + +## Функции +- Команда бана: `/ВБАННАХУЙ` (или `/ВБАННАХУЙ!`, регистр не важен). +- Команда разбана: `/РАЗБАН` (тоже можно с `!`). +- Обе команды работают только как `reply` на сообщение нужного пользователя. +- Список ID хранится в `data/blocked_users.json` и сохраняется между перезапусками. + +## Установка +1. Установите Python 3.10+. +2. Создайте и активируйте виртуальное окружение: + - macOS/Linux: + - `python3 -m venv .venv` + - `source .venv/bin/activate` + - Windows (PowerShell): + - `py -m venv .venv` + - `.venv\Scripts\Activate.ps1` +3. Установите зависимости: + - `pip install -r requirements.txt` +4. Создайте `.env` из примера: + - `cp .env.example .env` +5. Заполните `.env`: + - `API_ID` и `API_HASH` берутся на `https://my.telegram.org` + - `SESSION_NAME` можно оставить по умолчанию. + +## Запуск +- `python bot.py` +- При первом запуске Telethon попросит номер телефона, код и при необходимости 2FA-пароль. + +## Как пользоваться +1. В группе ответьте на сообщение пользователя командой `/ВБАННАХУЙ!`. +2. Юзербот покажет короткую анимацию и добавит ID пользователя в автоудаление. +3. Новые сообщения этого пользователя в группах будут удаляться только у вас. +4. Чтобы отключить автоудаление, ответьте на его сообщение командой `/РАЗБАН`. + +## Ограничения +- Удаление "только у вас" работает в формате `best effort` и зависит от ограничений Telegram API. +- Если Telegram ограничивает удаление в конкретном типе чата, бот залогирует ошибку и продолжит работу. diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..e90e41b --- /dev/null +++ b/bot.py @@ -0,0 +1,229 @@ +import asyncio +import json +import logging +import os +from pathlib import Path +from typing import Optional + +from dotenv import load_dotenv +from telethon import TelegramClient, events +from telethon.tl.custom.message import Message + +BAN_COMMAND = "/вбаннахуй" +UNBAN_COMMAND = "/разбан" +DATA_PATH = Path("data/blocked_users.json") + +ANIMATION_FRAMES = [ + "Запускаю анти-режим.", + "Запускаю анти-режим..", + "Запускаю анти-режим...", + "Проверяю цель...", +] + + +class BlocklistStore: + def __init__(self, path: Path) -> None: + self.path = path + self._blocked_user_ids: set[int] = set() + self._lock = asyncio.Lock() + + async def load(self) -> None: + self.path.parent.mkdir(parents=True, exist_ok=True) + if not self.path.exists(): + self.path.write_text('{"blocked_user_ids":[]}\n', encoding="utf-8") + return + + try: + content = self.path.read_text(encoding="utf-8") + parsed = json.loads(content) if content.strip() else {"blocked_user_ids": []} + except json.JSONDecodeError: + logging.warning( + "Некорректный JSON в %s. Сбрасываю список блокировок.", self.path + ) + parsed = {"blocked_user_ids": []} + + raw_ids = parsed.get("blocked_user_ids", []) + if not isinstance(raw_ids, list): + raw_ids = [] + + normalized_ids: set[int] = set() + for value in raw_ids: + if isinstance(value, int): + normalized_ids.add(value) + continue + if isinstance(value, str): + try: + normalized_ids.add(int(value)) + except ValueError: + continue + + self._blocked_user_ids = normalized_ids + await self._save() + + async def contains(self, user_id: int) -> bool: + return user_id in self._blocked_user_ids + + async def add(self, user_id: int) -> bool: + async with self._lock: + if user_id in self._blocked_user_ids: + return False + self._blocked_user_ids.add(user_id) + await self._save() + return True + + async def remove(self, user_id: int) -> bool: + async with self._lock: + if user_id not in self._blocked_user_ids: + return False + self._blocked_user_ids.remove(user_id) + await self._save() + return True + + async def _save(self) -> None: + payload = {"blocked_user_ids": sorted(self._blocked_user_ids)} + self.path.write_text( + json.dumps(payload, ensure_ascii=False, indent=2) + "\n", + encoding="utf-8", + ) + + +def normalize_command(text: str) -> Optional[str]: + stripped = text.strip() + if not stripped.startswith("/"): + return None + + first_token = stripped.split(maxsplit=1)[0] + command = first_token.split("@", maxsplit=1)[0].lower() + + while command.endswith("!"): + command = command[:-1] + + return command + + +async def animate_ban(message: Message, target_user_id: int, is_new: bool) -> None: + for frame in ANIMATION_FRAMES: + try: + await message.edit(frame) + except Exception: + logging.exception("Не удалось обновить кадр анимации.") + break + await asyncio.sleep(0.35) + + if is_new: + final_text = ( + f"Готово. ID {target_user_id} добавлен в автоудаление.\n" + "Новые сообщения этого пользователя в группах удаляются только у вас." + ) + else: + final_text = ( + f"ID {target_user_id} уже был в автоудалении.\n" + "Новые сообщения этого пользователя в группах удаляются только у вас." + ) + + try: + await message.edit(final_text) + except Exception: + logging.exception("Не удалось отправить финальный статус анимации.") + + +def load_settings() -> tuple[int, str, str]: + load_dotenv() + api_id_raw = os.getenv("API_ID", "").strip() + api_hash = os.getenv("API_HASH", "").strip() + session_name = os.getenv("SESSION_NAME", "antimichaell").strip() + + if not api_id_raw or not api_hash: + raise RuntimeError( + "Нужно задать API_ID и API_HASH в .env (или переменных окружения)." + ) + + try: + api_id = int(api_id_raw) + except ValueError as exc: + raise RuntimeError("API_ID должен быть целым числом.") from exc + + return api_id, api_hash, session_name + + +async def main() -> None: + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s | %(levelname)s | %(message)s", + ) + + api_id, api_hash, session_name = load_settings() + blocklist = BlocklistStore(DATA_PATH) + await blocklist.load() + + client = TelegramClient(session_name, api_id, api_hash) + + @client.on(events.NewMessage(outgoing=True)) + async def handle_commands(event: events.NewMessage.Event) -> None: + command = normalize_command(event.raw_text or "") + if command not in {BAN_COMMAND, UNBAN_COMMAND}: + return + + if not event.is_reply: + await event.reply( + "Эта команда работает только как ответ на сообщение нужного пользователя." + ) + return + + replied_message = await event.get_reply_message() + if replied_message is None or replied_message.sender_id is None: + await event.reply("Не удалось определить автора исходного сообщения.") + return + + target_user_id = int(replied_message.sender_id) + + if command == BAN_COMMAND: + status_message = await event.reply("Подготовка...") + added = await blocklist.add(target_user_id) + await animate_ban(status_message, target_user_id, added) + return + + removed = await blocklist.remove(target_user_id) + if removed: + await event.reply( + f"Готово. ID {target_user_id} удален из автоудаления.\n" + "Новые сообщения этого пользователя больше не скрываются." + ) + else: + await event.reply( + f"ID {target_user_id} не был в списке автоудаления." + ) + + @client.on(events.NewMessage(incoming=True)) + async def auto_delete_target_messages(event: events.NewMessage.Event) -> None: + if not event.is_group: + return + + sender_id = event.sender_id + if sender_id is None: + return + + if not await blocklist.contains(int(sender_id)): + return + + try: + await client.delete_messages(event.chat_id, [event.id], revoke=False) + except Exception: + logging.exception( + "Не удалось удалить сообщение %s от пользователя %s в чате %s.", + event.id, + sender_id, + event.chat_id, + ) + + await client.start() + me = await client.get_me() + logging.info("Юзербот запущен. Аккаунт ID: %s", me.id if me else "unknown") + await client.run_until_disconnected() + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + logging.info("Остановка по Ctrl+C.") diff --git a/data/blocked_users.json b/data/blocked_users.json new file mode 100644 index 0000000..5c3a1d2 --- /dev/null +++ b/data/blocked_users.json @@ -0,0 +1 @@ +{"blocked_user_ids":[]} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..24857bf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +telethon>=1.41.0,<2.0.0 +python-dotenv>=1.0.0,<2.0.0