294 lines
9.7 KiB
Python
294 lines
9.7 KiB
Python
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
from dotenv import load_dotenv
|
||
from telethon import TelegramClient, events
|
||
from telethon.errors import RPCError
|
||
from telethon.tl.custom.message import Message
|
||
|
||
BAN_COMMAND = "/вбаннахуй"
|
||
UNBAN_COMMAND = "/разбан"
|
||
DATA_PATH = Path("data/blocked_users.json")
|
||
RECENT_CLEANUP_LIMIT = 50
|
||
|
||
ANIMATION_FRAMES = [
|
||
"Запускаю анти-режим.",
|
||
"Запускаю анти-режим..",
|
||
"Запускаю анти-режим...",
|
||
"Проверяю цель...",
|
||
]
|
||
|
||
|
||
class BlocklistStore:
|
||
def __init__(self, path: Path) -> None:
|
||
self.path = path
|
||
self._blocked_user_ids: set[int] = set()
|
||
self._lock = asyncio.Lock()
|
||
|
||
async def load(self) -> None:
|
||
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||
if not self.path.exists():
|
||
self.path.write_text('{"blocked_user_ids":[]}\n', encoding="utf-8")
|
||
return
|
||
|
||
try:
|
||
content = self.path.read_text(encoding="utf-8")
|
||
parsed = json.loads(content) if content.strip() else {"blocked_user_ids": []}
|
||
except json.JSONDecodeError:
|
||
logging.warning(
|
||
"Некорректный JSON в %s. Сбрасываю список блокировок.", self.path
|
||
)
|
||
parsed = {"blocked_user_ids": []}
|
||
|
||
raw_ids = parsed.get("blocked_user_ids", [])
|
||
if not isinstance(raw_ids, list):
|
||
raw_ids = []
|
||
|
||
normalized_ids: set[int] = set()
|
||
for value in raw_ids:
|
||
if isinstance(value, int):
|
||
normalized_ids.add(value)
|
||
continue
|
||
if isinstance(value, str):
|
||
try:
|
||
normalized_ids.add(int(value))
|
||
except ValueError:
|
||
continue
|
||
|
||
self._blocked_user_ids = normalized_ids
|
||
await self._save()
|
||
|
||
async def contains(self, user_id: int) -> bool:
|
||
return user_id in self._blocked_user_ids
|
||
|
||
async def add(self, user_id: int) -> bool:
|
||
async with self._lock:
|
||
if user_id in self._blocked_user_ids:
|
||
return False
|
||
self._blocked_user_ids.add(user_id)
|
||
await self._save()
|
||
return True
|
||
|
||
async def remove(self, user_id: int) -> bool:
|
||
async with self._lock:
|
||
if user_id not in self._blocked_user_ids:
|
||
return False
|
||
self._blocked_user_ids.remove(user_id)
|
||
await self._save()
|
||
return True
|
||
|
||
async def _save(self) -> None:
|
||
payload = {"blocked_user_ids": sorted(self._blocked_user_ids)}
|
||
self.path.write_text(
|
||
json.dumps(payload, ensure_ascii=False, indent=2) + "\n",
|
||
encoding="utf-8",
|
||
)
|
||
|
||
|
||
def normalize_command(text: str) -> Optional[str]:
|
||
stripped = text.strip()
|
||
if not stripped.startswith("/"):
|
||
return None
|
||
|
||
first_token = stripped.split(maxsplit=1)[0]
|
||
command = first_token.split("@", maxsplit=1)[0].lower()
|
||
|
||
while command.endswith("!"):
|
||
command = command[:-1]
|
||
|
||
return command
|
||
|
||
|
||
async def animate_ban(message: Message, target_user_id: int, is_new: bool) -> None:
|
||
for frame in ANIMATION_FRAMES:
|
||
try:
|
||
await message.edit(frame)
|
||
except Exception:
|
||
logging.exception("Не удалось обновить кадр анимации.")
|
||
break
|
||
await asyncio.sleep(0.35)
|
||
|
||
if is_new:
|
||
final_text = (
|
||
f"Готово. ID {target_user_id} добавлен в автоудаление.\n"
|
||
"Новые сообщения этого пользователя в группах удаляются только у вас."
|
||
)
|
||
else:
|
||
final_text = (
|
||
f"ID {target_user_id} уже был в автоудалении.\n"
|
||
"Новые сообщения этого пользователя в группах удаляются только у вас."
|
||
)
|
||
|
||
try:
|
||
await message.edit(final_text)
|
||
except Exception:
|
||
logging.exception("Не удалось отправить финальный статус анимации.")
|
||
|
||
|
||
async def delete_message_best_effort(
|
||
client: TelegramClient, message: Message
|
||
) -> bool:
|
||
try:
|
||
await message.delete(revoke=False)
|
||
return True
|
||
except Exception as exc:
|
||
logging.debug(
|
||
"Удаление через message.delete(revoke=False) не сработало: %s", exc
|
||
)
|
||
|
||
try:
|
||
target = message.input_chat if message.input_chat else message.chat_id
|
||
await client.delete_messages(target, [message.id], revoke=False)
|
||
return True
|
||
except Exception as exc:
|
||
logging.debug(
|
||
"Удаление через client.delete_messages(..., revoke=False) не сработало: %s",
|
||
exc,
|
||
)
|
||
|
||
try:
|
||
await message.delete()
|
||
return True
|
||
except RPCError as exc:
|
||
logging.warning(
|
||
"Не удалось удалить сообщение %s в чате %s: %s",
|
||
message.id,
|
||
message.chat_id,
|
||
exc,
|
||
)
|
||
except Exception:
|
||
logging.exception(
|
||
"Неожиданная ошибка удаления сообщения %s в чате %s.",
|
||
message.id,
|
||
message.chat_id,
|
||
)
|
||
|
||
return False
|
||
|
||
|
||
async def cleanup_recent_messages_from_user(
|
||
client: TelegramClient, chat_id: int, user_id: int, limit: int = RECENT_CLEANUP_LIMIT
|
||
) -> int:
|
||
deleted_count = 0
|
||
async for message in client.iter_messages(chat_id, from_user=user_id, limit=limit):
|
||
if message.id is None:
|
||
continue
|
||
if await delete_message_best_effort(client, message):
|
||
deleted_count += 1
|
||
|
||
return deleted_count
|
||
|
||
|
||
def load_settings() -> tuple[int, str, str]:
|
||
load_dotenv()
|
||
api_id_raw = os.getenv("API_ID", "").strip()
|
||
api_hash = os.getenv("API_HASH", "").strip()
|
||
session_name = os.getenv("SESSION_NAME", "antimichaell").strip()
|
||
|
||
if not api_id_raw or not api_hash:
|
||
raise RuntimeError(
|
||
"Нужно задать API_ID и API_HASH в .env (или переменных окружения)."
|
||
)
|
||
|
||
try:
|
||
api_id = int(api_id_raw)
|
||
except ValueError as exc:
|
||
raise RuntimeError("API_ID должен быть целым числом.") from exc
|
||
|
||
return api_id, api_hash, session_name
|
||
|
||
|
||
async def main() -> None:
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s | %(levelname)s | %(message)s",
|
||
)
|
||
|
||
api_id, api_hash, session_name = load_settings()
|
||
blocklist = BlocklistStore(DATA_PATH)
|
||
await blocklist.load()
|
||
|
||
client = TelegramClient(session_name, api_id, api_hash)
|
||
|
||
@client.on(events.NewMessage(outgoing=True))
|
||
async def handle_commands(event: events.NewMessage.Event) -> None:
|
||
command = normalize_command(event.raw_text or "")
|
||
if command not in {BAN_COMMAND, UNBAN_COMMAND}:
|
||
return
|
||
|
||
if not event.is_reply:
|
||
await event.reply(
|
||
"Эта команда работает только как ответ на сообщение нужного пользователя."
|
||
)
|
||
return
|
||
|
||
replied_message = await event.get_reply_message()
|
||
if replied_message is None or replied_message.sender_id is None:
|
||
await event.reply("Не удалось определить автора исходного сообщения.")
|
||
return
|
||
|
||
target_user_id = int(replied_message.sender_id)
|
||
|
||
if command == BAN_COMMAND:
|
||
status_message = await event.reply("Подготовка...")
|
||
added = await blocklist.add(target_user_id)
|
||
await animate_ban(status_message, target_user_id, added)
|
||
cleaned = await cleanup_recent_messages_from_user(
|
||
client,
|
||
event.chat_id,
|
||
target_user_id,
|
||
)
|
||
if cleaned > 0:
|
||
await event.reply(
|
||
f"Дополнительно очищено последних сообщений в этом чате: {cleaned}."
|
||
)
|
||
return
|
||
|
||
removed = await blocklist.remove(target_user_id)
|
||
if removed:
|
||
await event.reply(
|
||
f"Готово. ID {target_user_id} удален из автоудаления.\n"
|
||
"Новые сообщения этого пользователя больше не скрываются."
|
||
)
|
||
else:
|
||
await event.reply(
|
||
f"ID {target_user_id} не был в списке автоудаления."
|
||
)
|
||
|
||
@client.on(events.NewMessage(incoming=True))
|
||
async def auto_delete_target_messages(event: events.NewMessage.Event) -> None:
|
||
if event.is_private:
|
||
return
|
||
|
||
sender_id = event.sender_id
|
||
if sender_id is None:
|
||
return
|
||
|
||
if not await blocklist.contains(int(sender_id)):
|
||
return
|
||
|
||
deleted = await delete_message_best_effort(client, event.message)
|
||
if not deleted:
|
||
logging.warning(
|
||
"Пропуск удаления: сообщение %s от пользователя %s в чате %s.",
|
||
event.id,
|
||
sender_id,
|
||
event.chat_id,
|
||
)
|
||
|
||
await client.start()
|
||
me = await client.get_me()
|
||
logging.info("Юзербот запущен. Аккаунт ID: %s", me.id if me else "unknown")
|
||
await client.run_until_disconnected()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
asyncio.run(main())
|
||
except KeyboardInterrupt:
|
||
logging.info("Остановка по Ctrl+C.")
|