nakama/moderation/backend-py/websocket_server.py

343 lines
15 KiB
Python
Raw Permalink Normal View History

2025-12-14 23:45:41 +00:00
"""
WebSocket server for moderation chat
"""
import socketio
import logging
2025-12-15 00:37:34 +00:00
from typing import Dict, Set, Optional
from datetime import datetime
from config import settings
2025-12-15 04:38:30 +00:00
from database import moderation_admins_collection, moderation_chat_messages_collection
2025-12-15 00:37:34 +00:00
from utils.auth import normalize_username
2025-12-14 23:45:41 +00:00
logger = logging.getLogger(__name__)
# Create Socket.IO server
sio = socketio.AsyncServer(
async_mode='asgi',
cors_allowed_origins='*',
2025-12-15 03:47:15 +00:00
logger=True, # Включить логирование для отладки
engineio_logger=True, # Включить логирование Engine.IO
ping_timeout=60,
2025-12-15 04:25:03 +00:00
ping_interval=25,
allow_upgrades=True, # Разрешить upgrade с polling на websocket
transports=['polling', 'websocket'] # Поддерживаемые транспорты
2025-12-14 23:45:41 +00:00
)
# Track connected moderators
2025-12-15 00:37:34 +00:00
connected_moderators: Dict[str, dict] = {} # {sid: {username, telegramId, isOwner}}
def broadcast_online():
"""Broadcast list of online moderators"""
unique = {}
for data in connected_moderators.values():
username = data.get('username')
if username:
unique[username] = data
online_list = list(unique.values())
2025-12-15 04:38:30 +00:00
# Broadcast в корневой namespace (так как клиенты подключаются туда)
sio.emit('online', online_list)
# Также broadcast в /mod-chat на случай, если кто-то подключится туда
2025-12-15 00:37:34 +00:00
sio.emit('online', online_list, namespace='/mod-chat')
2025-12-14 23:45:41 +00:00
2025-12-15 04:25:03 +00:00
# Обработчик для корневого namespace (требуется для Socket.IO handshake)
# В python-socketio для ASGI режима нужно использовать декоратор БЕЗ явного namespace='/'
# или использовать namespace=None для корневого namespace
@sio.on('connect')
async def on_connect_root(sid, environ):
"""Handle client connection to root namespace (Socket.IO handshake)"""
print(f"[WebSocket] 🔄 Handshake to ROOT namespace: {sid}")
print(f"[WebSocket] Environ type: {type(environ)}")
if environ:
print(f"[WebSocket] Environ keys: {list(environ.keys()) if isinstance(environ, dict) else 'N/A'}")
logger.info(f"[WebSocket] Handshake to ROOT namespace: {sid}")
2025-12-15 04:17:54 +00:00
# Разрешаем подключение для handshake
return True
2025-12-15 04:25:03 +00:00
@sio.on('disconnect')
async def on_disconnect_root(sid):
"""Handle disconnection from root namespace"""
print(f"[WebSocket] Client disconnected from ROOT namespace: {sid}")
logger.info(f"[WebSocket] Client disconnected from ROOT namespace: {sid}")
2025-12-15 04:28:16 +00:00
# Обработчик auth в корневом namespace - обрабатываем так же, как в /mod-chat
@sio.on('auth')
async def on_auth_root(sid, data):
"""Handle auth in root namespace - process same as /mod-chat"""
print(f"[WebSocket] 📥 Auth запрос в ROOT namespace от {sid}: {data}")
print(f"[WebSocket] ⚠️ Клиент авторизуется в корневом namespace, обрабатываем здесь")
try:
username = normalize_username(data.get('username')) if data.get('username') else None
telegram_id = data.get('telegramId')
print(f"[WebSocket] Обработка auth в ROOT: username={username}, telegramId={telegram_id}")
if not username or not telegram_id:
print(f"[WebSocket] ❌ Auth failed: missing username or telegramId")
await sio.emit('unauthorized', room=sid)
await sio.disconnect(sid)
return
# Check if user is owner
owner_usernames = settings.OWNER_USERNAMES_LIST
is_owner = username.lower() in [u.lower() for u in owner_usernames]
# Check if user is moderation admin
admin = await moderation_admins_collection().find_one({
'telegramId': str(telegram_id)
})
is_admin = admin is not None
if not is_owner and not is_admin:
print(f"[WebSocket] ❌ Access denied: {username} (telegramId: {telegram_id})")
await sio.emit('unauthorized', room=sid)
await sio.disconnect(sid)
return
# Store connection data (используем тот же словарь, что и для /mod-chat)
connected_moderators[sid] = {
'username': username,
'telegramId': telegram_id,
'isOwner': is_owner
}
print(f"[WebSocket] ✅ Auth success в ROOT namespace: {username} (owner: {is_owner}, admin: {is_admin})")
2025-12-15 04:38:30 +00:00
# Загружаем историю сообщений (последние 100)
try:
history = await moderation_chat_messages_collection().find().sort('createdAt', -1).limit(100).to_list(length=100)
# Переворачиваем, чтобы старые сообщения были первыми
history.reverse()
# Форматируем сообщения для отправки
history_messages = []
for msg in history:
history_messages.append({
'id': str(msg.get('_id', '')),
'username': msg.get('username', ''),
'telegramId': msg.get('telegramId', ''),
'text': msg.get('text', ''),
'createdAt': msg.get('createdAt').isoformat() if isinstance(msg.get('createdAt'), datetime) else msg.get('createdAt', '')
})
# Отправляем историю клиенту
await sio.emit('history', history_messages, room=sid)
print(f"[WebSocket] 📜 Sent {len(history_messages)} messages from history to {sid}")
except Exception as e:
print(f"[WebSocket] ⚠️ Failed to load message history: {e}")
logger.error(f"[WebSocket] Failed to load message history: {e}")
2025-12-15 04:28:16 +00:00
await sio.emit('ready', room=sid)
broadcast_online()
except Exception as e:
print(f"[WebSocket] ❌ Error in auth (ROOT): {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
await sio.emit('unauthorized', room=sid)
await sio.disconnect(sid)
2025-12-15 04:38:30 +00:00
# Обработчик сообщений в корневом namespace
@sio.on('message')
async def on_message_root(sid, data):
"""Handle moderation chat message in root namespace"""
print(f"[WebSocket] 📨 Message в ROOT namespace от {sid}: {data}")
try:
if sid not in connected_moderators:
print(f"[WebSocket] ⚠️ Message from unauthorized client: {sid}")
logger.warning(f"[WebSocket] Message from unauthorized client: {sid}")
return
user_data = connected_moderators[sid]
text = (data.get('text') or '').strip()
if not text:
return
message_id = f"{int(datetime.utcnow().timestamp() * 1000)}-{sid[:8]}"
created_at = datetime.utcnow()
message = {
'id': message_id,
'username': user_data['username'],
'telegramId': user_data['telegramId'],
'text': text,
'createdAt': created_at.isoformat()
}
# Сохраняем сообщение в MongoDB
try:
await moderation_chat_messages_collection().insert_one({
'_id': message_id,
'username': user_data['username'],
'telegramId': user_data['telegramId'],
'text': text,
'createdAt': created_at,
'isOwner': user_data.get('isOwner', False)
})
print(f"[WebSocket] 💾 Message saved to DB: {message_id}")
except Exception as e:
print(f"[WebSocket] ⚠️ Failed to save message to DB: {e}")
logger.error(f"[WebSocket] Failed to save message to DB: {e}")
# Broadcast to all in root namespace (без указания room - broadcast всем в namespace)
await sio.emit('message', message)
print(f"[WebSocket] ✅ Message broadcasted в ROOT namespace: {user_data['username']}: {text[:50]}...")
logger.info(f"[WebSocket] Message from {user_data['username']} (ROOT): {text[:50]}...")
except Exception as e:
print(f"[WebSocket] ❌ Error handling message (ROOT): {e}")
logger.error(f"[WebSocket] Error handling message (ROOT): {e}")
import traceback
traceback.print_exc()
2025-12-15 00:42:02 +00:00
# Namespace handlers for /mod-chat
@sio.on('connect', namespace='/mod-chat')
2025-12-15 00:37:34 +00:00
async def on_connect(sid, environ):
"""Handle client connection to /mod-chat namespace"""
2025-12-15 03:40:42 +00:00
print(f"[WebSocket] ✅ Client connected to /mod-chat: {sid}")
2025-12-15 00:37:34 +00:00
logger.info(f"[WebSocket] Client connected to /mod-chat: {sid}")
2025-12-15 03:40:42 +00:00
if environ:
print(f"[WebSocket] Environ keys: {list(environ.keys()) if isinstance(environ, dict) else 'N/A'}")
2025-12-15 03:47:15 +00:00
if isinstance(environ, dict):
print(f"[WebSocket] Query string: {environ.get('QUERY_STRING', 'N/A')}")
print(f"[WebSocket] Path: {environ.get('PATH_INFO', 'N/A')}")
2025-12-15 00:37:34 +00:00
# Don't authorize immediately - wait for 'auth' event
2025-12-15 03:47:15 +00:00
# Возвращаем True, чтобы разрешить подключение
return True
2025-12-14 23:45:41 +00:00
2025-12-15 00:37:34 +00:00
2025-12-15 00:42:02 +00:00
@sio.on('disconnect', namespace='/mod-chat')
2025-12-15 00:37:34 +00:00
async def on_disconnect(sid):
2025-12-14 23:45:41 +00:00
"""Handle client disconnection"""
2025-12-15 00:37:34 +00:00
logger.info(f"[WebSocket] Client disconnected from /mod-chat: {sid}")
2025-12-14 23:45:41 +00:00
2025-12-15 00:37:34 +00:00
if sid in connected_moderators:
del connected_moderators[sid]
broadcast_online()
2025-12-14 23:45:41 +00:00
2025-12-15 00:42:02 +00:00
@sio.on('auth', namespace='/mod-chat')
2025-12-15 00:37:34 +00:00
async def on_auth(sid, data):
"""Handle authentication for moderation chat"""
2025-12-15 03:47:15 +00:00
print(f"[WebSocket] 📥 Auth запрос от {sid}: {data}")
2025-12-14 23:45:41 +00:00
try:
2025-12-15 00:37:34 +00:00
username = normalize_username(data.get('username')) if data.get('username') else None
telegram_id = data.get('telegramId')
2025-12-14 23:45:41 +00:00
2025-12-15 03:47:15 +00:00
print(f"[WebSocket] Обработка auth: username={username}, telegramId={telegram_id}")
2025-12-15 00:37:34 +00:00
if not username or not telegram_id:
2025-12-15 03:47:15 +00:00
print(f"[WebSocket] ❌ Auth failed: missing username or telegramId")
2025-12-15 00:37:34 +00:00
logger.warning(f"[WebSocket] Auth failed: missing username or telegramId")
await sio.emit('unauthorized', namespace='/mod-chat', room=sid)
await sio.disconnect(sid, namespace='/mod-chat')
2025-12-14 23:45:41 +00:00
return
2025-12-15 00:37:34 +00:00
# Check if user is owner
owner_usernames = settings.OWNER_USERNAMES_LIST
2025-12-15 03:47:15 +00:00
print(f"[WebSocket] Owner usernames: {owner_usernames}")
2025-12-15 00:37:34 +00:00
is_owner = username.lower() in [u.lower() for u in owner_usernames]
2025-12-15 03:47:15 +00:00
print(f"[WebSocket] Is owner: {is_owner}")
2025-12-14 23:45:41 +00:00
2025-12-15 00:37:34 +00:00
# Check if user is moderation admin
admin = await moderation_admins_collection().find_one({
'telegramId': str(telegram_id)
})
is_admin = admin is not None
2025-12-15 03:47:15 +00:00
print(f"[WebSocket] Is admin: {is_admin}, admin data: {admin}")
2025-12-14 23:45:41 +00:00
2025-12-15 00:37:34 +00:00
if not is_owner and not is_admin:
2025-12-15 03:47:15 +00:00
print(f"[WebSocket] ❌ Access denied: {username} (telegramId: {telegram_id})")
2025-12-15 00:37:34 +00:00
logger.warning(f"[WebSocket] Access denied: {username} (telegramId: {telegram_id})")
await sio.emit('unauthorized', namespace='/mod-chat', room=sid)
await sio.disconnect(sid, namespace='/mod-chat')
return
2025-12-14 23:45:41 +00:00
2025-12-15 00:37:34 +00:00
# Store connection data
connected_moderators[sid] = {
2025-12-14 23:45:41 +00:00
'username': username,
2025-12-15 00:37:34 +00:00
'telegramId': telegram_id,
'isOwner': is_owner
}
2025-12-14 23:45:41 +00:00
2025-12-15 03:47:15 +00:00
print(f"[WebSocket] ✅ Auth success: {username} (owner: {is_owner}, admin: {is_admin})")
2025-12-15 00:37:34 +00:00
logger.info(f"[WebSocket] Auth success: {username} (owner: {is_owner}, admin: {is_admin})")
await sio.emit('ready', namespace='/mod-chat', room=sid)
broadcast_online()
2025-12-14 23:45:41 +00:00
except Exception as e:
2025-12-15 03:47:15 +00:00
print(f"[WebSocket] ❌ Error in auth: {type(e).__name__}: {e}")
2025-12-15 00:37:34 +00:00
logger.error(f"[WebSocket] Error in auth: {e}")
import traceback
traceback.print_exc()
await sio.emit('unauthorized', namespace='/mod-chat', room=sid)
await sio.disconnect(sid, namespace='/mod-chat')
2025-12-14 23:45:41 +00:00
2025-12-15 00:42:02 +00:00
@sio.on('message', namespace='/mod-chat')
2025-12-15 00:37:34 +00:00
async def on_message(sid, data):
2025-12-14 23:45:41 +00:00
"""Handle moderation chat message"""
try:
2025-12-15 00:37:34 +00:00
if sid not in connected_moderators:
logger.warning(f"[WebSocket] Message from unauthorized client: {sid}")
return
2025-12-14 23:45:41 +00:00
2025-12-15 00:37:34 +00:00
user_data = connected_moderators[sid]
text = (data.get('text') or '').strip()
2025-12-14 23:45:41 +00:00
2025-12-15 00:37:34 +00:00
if not text:
return
2025-12-14 23:45:41 +00:00
2025-12-15 00:37:34 +00:00
message = {
'id': f"{int(datetime.utcnow().timestamp() * 1000)}-{sid[:8]}",
'username': user_data['username'],
'telegramId': user_data['telegramId'],
'text': text,
'createdAt': datetime.utcnow().isoformat()
}
2025-12-14 23:45:41 +00:00
2025-12-15 00:37:34 +00:00
# Broadcast to all in namespace
await sio.emit('message', message, namespace='/mod-chat')
logger.info(f"[WebSocket] Message from {user_data['username']}: {text[:50]}...")
2025-12-14 23:45:41 +00:00
except Exception as e:
2025-12-15 00:37:34 +00:00
logger.error(f"[WebSocket] Error handling message: {e}")
import traceback
traceback.print_exc()
2025-12-14 23:45:41 +00:00
def get_socketio_app():
"""Get Socket.IO ASGI app"""
2025-12-15 04:07:52 +00:00
print("[WebSocket] Инициализация Socket.IO ASGI app...")
try:
# Проверяем зарегистрированные namespaces
if hasattr(sio, 'namespace_handlers'):
2025-12-15 04:14:28 +00:00
namespaces = list(sio.namespace_handlers.keys())
print(f"[WebSocket] Зарегистрированные namespaces: {namespaces}")
if not namespaces:
print("[WebSocket] ⚠️ ВНИМАНИЕ: Нет зарегистрированных namespaces!")
2025-12-15 04:07:52 +00:00
elif hasattr(sio, 'handlers'):
print(f"[WebSocket] Зарегистрированные handlers: {list(sio.handlers.keys())}")
else:
print("[WebSocket] Не удалось определить зарегистрированные namespaces")
2025-12-15 04:14:28 +00:00
# Попробуем проверить через другие атрибуты
print(f"[WebSocket] Доступные атрибуты sio: {[attr for attr in dir(sio) if not attr.startswith('_')]}")
2025-12-15 04:07:52 +00:00
except Exception as e:
print(f"[WebSocket] Ошибка при проверке namespaces: {e}")
2025-12-15 04:14:28 +00:00
import traceback
traceback.print_exc()
2025-12-15 04:07:52 +00:00
2025-12-15 03:40:42 +00:00
# Socket.IO ASGI app должен обернуть FastAPI app для правильной работы
# Но мы делаем это в main.py через SocketIOWrapper
# Здесь просто возвращаем ASGI app для Socket.IO
2025-12-15 03:55:18 +00:00
# socketio_path указывает путь, по которому Socket.IO будет слушать
2025-12-15 04:07:52 +00:00
app = socketio.ASGIApp(sio, socketio_path='/socket.io')
print("[WebSocket] ✅ Socket.IO ASGI app создан")
return app
2025-12-14 23:45:41 +00:00