Files
AntiMichaell/bot.py

230 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv
from telethon import TelegramClient, events
from telethon.tl.custom.message import Message
BAN_COMMAND = "/вбаннахуй"
UNBAN_COMMAND = "/разбан"
DATA_PATH = Path("data/blocked_users.json")
ANIMATION_FRAMES = [
"Запускаю анти-режим.",
"Запускаю анти-режим..",
"Запускаю анти-режим...",
"Проверяю цель...",
]
class BlocklistStore:
def __init__(self, path: Path) -> None:
self.path = path
self._blocked_user_ids: set[int] = set()
self._lock = asyncio.Lock()
async def load(self) -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)
if not self.path.exists():
self.path.write_text('{"blocked_user_ids":[]}\n', encoding="utf-8")
return
try:
content = self.path.read_text(encoding="utf-8")
parsed = json.loads(content) if content.strip() else {"blocked_user_ids": []}
except json.JSONDecodeError:
logging.warning(
"Некорректный JSON в %s. Сбрасываю список блокировок.", self.path
)
parsed = {"blocked_user_ids": []}
raw_ids = parsed.get("blocked_user_ids", [])
if not isinstance(raw_ids, list):
raw_ids = []
normalized_ids: set[int] = set()
for value in raw_ids:
if isinstance(value, int):
normalized_ids.add(value)
continue
if isinstance(value, str):
try:
normalized_ids.add(int(value))
except ValueError:
continue
self._blocked_user_ids = normalized_ids
await self._save()
async def contains(self, user_id: int) -> bool:
return user_id in self._blocked_user_ids
async def add(self, user_id: int) -> bool:
async with self._lock:
if user_id in self._blocked_user_ids:
return False
self._blocked_user_ids.add(user_id)
await self._save()
return True
async def remove(self, user_id: int) -> bool:
async with self._lock:
if user_id not in self._blocked_user_ids:
return False
self._blocked_user_ids.remove(user_id)
await self._save()
return True
async def _save(self) -> None:
payload = {"blocked_user_ids": sorted(self._blocked_user_ids)}
self.path.write_text(
json.dumps(payload, ensure_ascii=False, indent=2) + "\n",
encoding="utf-8",
)
def normalize_command(text: str) -> Optional[str]:
stripped = text.strip()
if not stripped.startswith("/"):
return None
first_token = stripped.split(maxsplit=1)[0]
command = first_token.split("@", maxsplit=1)[0].lower()
while command.endswith("!"):
command = command[:-1]
return command
async def animate_ban(message: Message, target_user_id: int, is_new: bool) -> None:
for frame in ANIMATION_FRAMES:
try:
await message.edit(frame)
except Exception:
logging.exception("Не удалось обновить кадр анимации.")
break
await asyncio.sleep(0.35)
if is_new:
final_text = (
f"Готово. ID {target_user_id} добавлен в автоудаление.\n"
"Новые сообщения этого пользователя в группах удаляются только у вас."
)
else:
final_text = (
f"ID {target_user_id} уже был в автоудалении.\n"
"Новые сообщения этого пользователя в группах удаляются только у вас."
)
try:
await message.edit(final_text)
except Exception:
logging.exception("Не удалось отправить финальный статус анимации.")
def load_settings() -> tuple[int, str, str]:
load_dotenv()
api_id_raw = os.getenv("API_ID", "").strip()
api_hash = os.getenv("API_HASH", "").strip()
session_name = os.getenv("SESSION_NAME", "antimichaell").strip()
if not api_id_raw or not api_hash:
raise RuntimeError(
"Нужно задать API_ID и API_HASH в .env (или переменных окружения)."
)
try:
api_id = int(api_id_raw)
except ValueError as exc:
raise RuntimeError("API_ID должен быть целым числом.") from exc
return api_id, api_hash, session_name
async def main() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s",
)
api_id, api_hash, session_name = load_settings()
blocklist = BlocklistStore(DATA_PATH)
await blocklist.load()
client = TelegramClient(session_name, api_id, api_hash)
@client.on(events.NewMessage(outgoing=True))
async def handle_commands(event: events.NewMessage.Event) -> None:
command = normalize_command(event.raw_text or "")
if command not in {BAN_COMMAND, UNBAN_COMMAND}:
return
if not event.is_reply:
await event.reply(
"Эта команда работает только как ответ на сообщение нужного пользователя."
)
return
replied_message = await event.get_reply_message()
if replied_message is None or replied_message.sender_id is None:
await event.reply("Не удалось определить автора исходного сообщения.")
return
target_user_id = int(replied_message.sender_id)
if command == BAN_COMMAND:
status_message = await event.reply("Подготовка...")
added = await blocklist.add(target_user_id)
await animate_ban(status_message, target_user_id, added)
return
removed = await blocklist.remove(target_user_id)
if removed:
await event.reply(
f"Готово. ID {target_user_id} удален из автоудаления.\n"
"Новые сообщения этого пользователя больше не скрываются."
)
else:
await event.reply(
f"ID {target_user_id} не был в списке автоудаления."
)
@client.on(events.NewMessage(incoming=True))
async def auto_delete_target_messages(event: events.NewMessage.Event) -> None:
if not event.is_group:
return
sender_id = event.sender_id
if sender_id is None:
return
if not await blocklist.contains(int(sender_id)):
return
try:
await client.delete_messages(event.chat_id, [event.id], revoke=False)
except Exception:
logging.exception(
"Не удалось удалить сообщение %s от пользователя %s в чате %s.",
event.id,
sender_id,
event.chat_id,
)
await client.start()
me = await client.get_me()
logging.info("Юзербот запущен. Аккаунт ID: %s", me.id if me else "unknown")
await client.run_until_disconnected()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Остановка по Ctrl+C.")