Files
AntiMichaell/bot.py
2026-04-20 16:53:38 +03:00

385 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv
from telethon import TelegramClient, events
from telethon.errors import RPCError
from telethon.tl import functions, types
from telethon.tl.custom.message import Message
BAN_COMMAND = "/вбаннахуй"
UNBAN_COMMAND = "/разбан"
DATA_PATH = Path("data/blocked_users.json")
RECENT_CLEANUP_LIMIT = 50
POOP_REACTION_EMOJI = "💩"
ANIMATION_FRAMES = [
"Запускаю анти-режим.",
"Запускаю анти-режим..",
"Запускаю анти-режим...",
"Проверяю цель...",
]
class BlocklistStore:
def __init__(self, path: Path) -> None:
self.path = path
self._blocked_user_ids: set[int] = set()
self._lock = asyncio.Lock()
async def load(self) -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)
if not self.path.exists():
self.path.write_text('{"blocked_user_ids":[]}\n', encoding="utf-8")
return
try:
content = self.path.read_text(encoding="utf-8")
parsed = json.loads(content) if content.strip() else {"blocked_user_ids": []}
except json.JSONDecodeError:
logging.warning(
"Некорректный JSON в %s. Сбрасываю список блокировок.", self.path
)
parsed = {"blocked_user_ids": []}
raw_ids = parsed.get("blocked_user_ids", [])
if not isinstance(raw_ids, list):
raw_ids = []
normalized_ids: set[int] = set()
for value in raw_ids:
if isinstance(value, int):
normalized_ids.add(value)
continue
if isinstance(value, str):
try:
normalized_ids.add(int(value))
except ValueError:
continue
self._blocked_user_ids = normalized_ids
await self._save()
async def contains(self, user_id: int) -> bool:
return user_id in self._blocked_user_ids
async def add(self, user_id: int) -> bool:
async with self._lock:
if user_id in self._blocked_user_ids:
return False
self._blocked_user_ids.add(user_id)
await self._save()
return True
async def remove(self, user_id: int) -> bool:
async with self._lock:
if user_id not in self._blocked_user_ids:
return False
self._blocked_user_ids.remove(user_id)
await self._save()
return True
async def _save(self) -> None:
payload = {"blocked_user_ids": sorted(self._blocked_user_ids)}
self.path.write_text(
json.dumps(payload, ensure_ascii=False, indent=2) + "\n",
encoding="utf-8",
)
def normalize_command(text: str) -> Optional[str]:
stripped = text.strip()
if not stripped.startswith("/"):
return None
first_token = stripped.split(maxsplit=1)[0]
command = first_token.split("@", maxsplit=1)[0].lower()
while command.endswith("!"):
command = command[:-1]
return command
async def animate_ban(message: Message, target_user_id: int, is_new: bool) -> None:
for frame in ANIMATION_FRAMES:
try:
await message.edit(frame)
except Exception:
logging.exception("Не удалось обновить кадр анимации.")
break
await asyncio.sleep(0.35)
if is_new:
final_text = (
f"Готово. ID {target_user_id} добавлен в автоудаление.\n"
"Новые сообщения этого пользователя в группах удаляются только у вас."
)
else:
final_text = (
f"ID {target_user_id} уже был в автоудалении.\n"
"Новые сообщения этого пользователя в группах удаляются только у вас."
)
try:
await message.edit(final_text)
except Exception:
logging.exception("Не удалось отправить финальный статус анимации.")
async def delete_message_best_effort(
client: TelegramClient, message: Message
) -> tuple[bool, Optional[str]]:
last_error: Optional[str] = None
try:
await message.delete(revoke=False)
return True, None
except Exception as exc:
last_error = f"{type(exc).__name__}: {exc}"
logging.debug(
"Удаление через message.delete(revoke=False) не сработало: %s", exc
)
try:
target = message.input_chat if message.input_chat else message.chat_id
await client.delete_messages(target, [message.id], revoke=False)
return True, None
except Exception as exc:
last_error = f"{type(exc).__name__}: {exc}"
logging.debug(
"Удаление через client.delete_messages(..., revoke=False) не сработало: %s",
exc,
)
try:
await message.delete()
return True, None
except RPCError as exc:
last_error = f"{type(exc).__name__}: {exc}"
logging.warning(
"Не удалось удалить сообщение %s в чате %s: %s",
message.id,
message.chat_id,
exc,
)
except Exception:
last_error = "UnexpectedError"
logging.exception(
"Неожиданная ошибка удаления сообщения %s в чате %s.",
message.id,
message.chat_id,
)
return False, last_error
async def react_with_poop_best_effort(client: TelegramClient, message: Message) -> None:
if message.id is None:
return
try:
peer = message.input_chat if message.input_chat else message.chat_id
await client(
functions.messages.SendReactionRequest(
peer=peer,
msg_id=message.id,
big=False,
add_to_recent=False,
reaction=[types.ReactionEmoji(emoticon=POOP_REACTION_EMOJI)],
)
)
except Exception as exc:
logging.debug(
"Не удалось поставить реакцию %s на сообщение %s: %s",
POOP_REACTION_EMOJI,
message.id,
exc,
)
async def delete_message_with_retries(
client: TelegramClient,
message: Message,
retries: tuple[float, ...] = (0.0, 0.7, 2.0),
) -> tuple[bool, Optional[str]]:
await react_with_poop_best_effort(client, message)
last_error: Optional[str] = None
for delay in retries:
if delay > 0:
await asyncio.sleep(delay)
deleted, error = await delete_message_best_effort(client, message)
if deleted:
return True, None
last_error = error
return False, last_error
async def cleanup_recent_messages_from_user(
client: TelegramClient, chat_id: int, user_id: int, limit: int = RECENT_CLEANUP_LIMIT
) -> int:
deleted_count = 0
async for message in client.iter_messages(chat_id, from_user=user_id, limit=limit):
if message.id is None:
continue
deleted, _ = await delete_message_with_retries(client, message)
if deleted:
deleted_count += 1
return deleted_count
def is_permission_like_delete_error(error_text: Optional[str]) -> bool:
if not error_text:
return False
normalized = error_text.upper()
markers = (
"CHATADMINREQUIRED",
"CHAT_ADMIN_REQUIRED",
"MESSAGEDELETEFORBIDDEN",
"MESSAGE_DELETE_FORBIDDEN",
"RIGHT_FORBIDDEN",
)
return any(marker in normalized for marker in markers)
def load_settings() -> tuple[int, str, str]:
load_dotenv()
api_id_raw = os.getenv("API_ID", "").strip()
api_hash = os.getenv("API_HASH", "").strip()
session_name = os.getenv("SESSION_NAME", "antimichaell").strip()
if not api_id_raw or not api_hash:
raise RuntimeError(
"Нужно задать API_ID и API_HASH в .env (или переменных окружения)."
)
try:
api_id = int(api_id_raw)
except ValueError as exc:
raise RuntimeError("API_ID должен быть целым числом.") from exc
return api_id, api_hash, session_name
async def main() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s",
)
api_id, api_hash, session_name = load_settings()
blocklist = BlocklistStore(DATA_PATH)
await blocklist.load()
notified_delete_failures: set[tuple[int, int]] = set()
client = TelegramClient(session_name, api_id, api_hash)
@client.on(events.NewMessage(outgoing=True))
async def handle_commands(event: events.NewMessage.Event) -> None:
command = normalize_command(event.raw_text or "")
if command not in {BAN_COMMAND, UNBAN_COMMAND}:
return
if not event.is_reply:
await event.reply(
"Эта команда работает только как ответ на сообщение нужного пользователя."
)
return
replied_message = await event.get_reply_message()
if replied_message is None or replied_message.sender_id is None:
await event.reply("Не удалось определить автора исходного сообщения.")
return
target_user_id = int(replied_message.sender_id)
if command == BAN_COMMAND:
status_message = await event.reply("Подготовка...")
added = await blocklist.add(target_user_id)
await animate_ban(status_message, target_user_id, added)
chat = await event.get_chat()
if isinstance(chat, types.Channel) and getattr(chat, "megagroup", False):
await event.reply(
"Важно: это супергруппа. В Telegram API локальное удаление чужих "
"сообщений может быть недоступно без прав администратора."
)
cleaned = await cleanup_recent_messages_from_user(
client,
event.chat_id,
target_user_id,
)
if cleaned > 0:
await event.reply(
f"Дополнительно очищено последних сообщений в этом чате: {cleaned}."
)
return
removed = await blocklist.remove(target_user_id)
if removed:
await event.reply(
f"Готово. ID {target_user_id} удален из автоудаления.\n"
"Новые сообщения этого пользователя больше не скрываются."
)
else:
await event.reply(
f"ID {target_user_id} не был в списке автоудаления."
)
@client.on(events.NewMessage(incoming=True))
async def auto_delete_target_messages(event: events.NewMessage.Event) -> None:
if event.is_private:
return
if event.is_channel and not event.is_group:
return
sender_id = event.sender_id
if sender_id is None:
return
if not await blocklist.contains(int(sender_id)):
return
deleted, error = await delete_message_with_retries(client, event.message)
if not deleted:
logging.warning(
"Пропуск удаления: сообщение %s от пользователя %s в чате %s.",
event.id,
sender_id,
event.chat_id,
)
if is_permission_like_delete_error(error):
key = (int(event.chat_id), int(sender_id))
if key not in notified_delete_failures:
notified_delete_failures.add(key)
try:
await client.send_message(
"me",
"Не удалось удалить сообщение по автофильтру.\n"
f"Чат: `{event.chat_id}`\n"
f"Пользователь: `{sender_id}`\n"
f"Причина: `{error}`\n"
"Обычно это ограничение прав/типа чата в Telegram.",
)
except Exception:
logging.exception("Не удалось отправить уведомление в Избранное.")
await client.start()
me = await client.get_me()
logging.info("Юзербот запущен. Аккаунт ID: %s", me.id if me else "unknown")
await client.run_until_disconnected()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Остановка по Ctrl+C.")