nakama/moderation/backend-py/routes/moderation_auth.py

561 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Moderation authentication routes
"""
import secrets
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, HTTPException, status, Response, Cookie, Depends, Request
from fastapi.responses import JSONResponse
from bson import ObjectId
from models import (
SendCodeRequest, RegisterRequest, LoginRequest, TelegramWidgetAuth,
UserResponse
)
from database import (
users_collection, email_verification_codes_collection,
moderation_admins_collection
)
from utils.auth import (
hash_password, verify_password,
create_access_token, create_refresh_token,
get_current_user, normalize_username, is_moderation_admin
)
from utils.email_service import send_verification_code
from utils.telegram_widget import validate_telegram_widget
from config import settings
router = APIRouter()
def set_auth_cookies(response: Response, access_token: str, refresh_token: str):
"""Set authentication cookies"""
# Access token cookie (short-lived)
response.set_cookie(
key=settings.JWT_ACCESS_COOKIE_NAME,
value=access_token,
max_age=settings.JWT_ACCESS_EXPIRES_IN,
httponly=True,
secure=settings.IS_PRODUCTION,
samesite='lax'
)
# Refresh token cookie (long-lived)
response.set_cookie(
key=settings.JWT_REFRESH_COOKIE_NAME,
value=refresh_token,
max_age=settings.JWT_REFRESH_EXPIRES_IN,
httponly=True,
secure=settings.IS_PRODUCTION,
samesite='lax'
)
def clear_auth_cookies(response: Response):
"""Clear authentication cookies"""
response.delete_cookie(settings.JWT_ACCESS_COOKIE_NAME)
response.delete_cookie(settings.JWT_REFRESH_COOKIE_NAME)
@router.post("/send-code")
async def send_code(request: SendCodeRequest, http_request: Request = None):
"""Send verification code to email"""
try:
email_lower = request.email.lower().strip()
# Check if user exists with moderator/admin role
existing_user = await users_collection().find_one({
'email': email_lower,
'role': {'$in': ['moderator', 'admin']}
})
print(f"[ModerationAuth] Проверка пользователя для email {email_lower}: "
f"{{found: {existing_user is not None}, "
f"hasPassword: {bool(existing_user.get('passwordHash')) if existing_user else False}, "
f"role: {existing_user.get('role') if existing_user else None}}}")
# Allow sending code if user exists
if existing_user:
print(f"[ModerationAuth] Пользователь найден, отправка кода разрешена")
else:
# Check if user exists but without proper role
user_by_email = await users_collection().find_one({'email': email_lower})
if user_by_email:
print(f"[ModerationAuth] Пользователь найден, но роль не moderator/admin: {user_by_email.get('role')}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Регистрация недоступна. Обратитесь к администратору для получения доступа."
)
# No user found - allow for debugging
print(f"[ModerationAuth] Пользователь не найден, но отправка кода разрешена для {email_lower}")
# Generate 6-digit code
code = str(secrets.randbelow(900000) + 100000)
# Delete old codes
await email_verification_codes_collection().delete_many({
'email': email_lower,
'purpose': 'registration'
})
# Save new code (valid for 15 minutes)
await email_verification_codes_collection().insert_one({
'email': email_lower,
'code': code,
'purpose': 'registration',
'verified': False,
'expiresAt': datetime.utcnow() + timedelta(minutes=15),
'createdAt': datetime.utcnow()
})
# Send code via email
try:
print(f"[ModerationAuth] 📧 Попытка отправить код на {email_lower}")
print(f"[ModerationAuth] 📧 Код: {code}")
print(f"[ModerationAuth] 📧 EMAIL_PROVIDER из settings: '{settings.EMAIL_PROVIDER}'")
print(f"[ModerationAuth] 📧 EMAIL_FROM: '{settings.EMAIL_FROM}'")
await send_verification_code(email_lower, code)
print(f"[ModerationAuth] ✅ Код успешно отправлен на {email_lower}")
return {"success": True, "message": "Код подтверждения отправлен на email"}
except ValueError as email_error:
print(f"[ModerationAuth] ❌ ValueError при отправке email: {email_error}")
import traceback
traceback.print_exc()
# Delete code if email failed
await email_verification_codes_collection().delete_many({
'email': email_lower,
'code': code
})
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(email_error)
)
except Exception as email_error:
print(f"[ModerationAuth] ❌ Exception при отправке email: {type(email_error).__name__}: {email_error}")
import traceback
traceback.print_exc()
await email_verification_codes_collection().delete_many({
'email': email_lower,
'code': code
})
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Не удалось отправить код на email: {str(email_error)}"
)
except HTTPException:
raise
except Exception as e:
print(f"[ModerationAuth] Ошибка отправки кода: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Ошибка сервера"
)
@router.post("/register")
async def register(request: RegisterRequest, response: Response):
"""Register with email verification code"""
try:
email_lower = request.email.lower().strip()
# Find verification code
verification_code = await email_verification_codes_collection().find_one({
'email': email_lower,
'code': request.code,
'purpose': 'registration',
'verified': False
})
if not verification_code:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Неверный или истекший код"
)
# Check expiration
if datetime.utcnow() > verification_code['expiresAt']:
await email_verification_codes_collection().delete_one({'_id': verification_code['_id']})
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Код истек. Запросите новый."
)
# Find user (must be created by administrator)
user = await users_collection().find_one({
'email': email_lower,
'role': {'$in': ['moderator', 'admin']}
})
if not user:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Регистрация недоступна. Обратитесь к администратору."
)
# Check if user already has password
if user.get('passwordHash'):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Аккаунт уже зарегистрирован. Используйте вход по паролю."
)
# Check password strength
if len(request.password) < 6:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пароль должен содержать минимум 6 символов"
)
# Hash password
password_hash = hash_password(request.password)
# Update user
await users_collection().update_one(
{'_id': user['_id']},
{
'$set': {
'passwordHash': password_hash,
'emailVerified': True,
'username': request.username or user.get('username')
}
}
)
# Mark code as verified
await email_verification_codes_collection().update_one(
{'_id': verification_code['_id']},
{'$set': {'verified': True}}
)
# Generate tokens
user_id_str = str(user['_id'])
access_token = create_access_token(user_id_str)
refresh_token = create_refresh_token(user_id_str)
# Set cookies
set_auth_cookies(response, access_token, refresh_token)
return {
"success": True,
"user": {
"id": user_id_str,
"username": request.username or user.get('username'),
"role": user.get('role')
},
"accessToken": access_token
}
except HTTPException:
raise
except Exception as e:
print(f"[ModerationAuth] Ошибка регистрации: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Ошибка сервера"
)
@router.post("/login")
async def login(request: LoginRequest, response: Response):
"""Login with email and password"""
try:
email_lower = request.email.lower().strip()
# Find user with password
user = await users_collection().find_one({
'email': email_lower,
'passwordHash': {'$exists': True, '$ne': None},
'role': {'$in': ['moderator', 'admin']}
})
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный email или пароль"
)
if user.get('banned'):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Аккаунт заблокирован"
)
# Verify password
if not verify_password(request.password, user['passwordHash']):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный email или пароль"
)
# Update last active
await users_collection().update_one(
{'_id': user['_id']},
{'$set': {'lastActiveAt': datetime.utcnow()}}
)
# Generate tokens
user_id_str = str(user['_id'])
access_token = create_access_token(user_id_str)
refresh_token = create_refresh_token(user_id_str)
# Set cookies
set_auth_cookies(response, access_token, refresh_token)
return {
"success": True,
"user": {
"id": user_id_str,
"username": user.get('username'),
"role": user.get('role'),
"telegramId": user.get('telegramId')
},
"accessToken": access_token
}
except HTTPException:
raise
except Exception as e:
print(f"[ModerationAuth] Ошибка авторизации: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Ошибка сервера"
)
@router.post("/telegram-widget")
async def telegram_widget_auth(request: TelegramWidgetAuth, response: Response):
"""Authenticate via Telegram Login Widget"""
print(f"[ModerationAuth] 🔍 Запрос авторизации через Telegram виджет: id={request.id}, username={request.username}")
try:
# Validate Telegram widget data if hash is provided
if request.hash and request.auth_date:
auth_data = {
'id': request.id,
'first_name': request.first_name,
'last_name': request.last_name,
'username': request.username,
'photo_url': request.photo_url,
'auth_date': request.auth_date,
'hash': request.hash
}
try:
is_valid = validate_telegram_widget(auth_data, settings.MODERATION_BOT_TOKEN)
if not is_valid:
print(f"[ModerationAuth] ❌ Неверная подпись Telegram виджета")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверная подпись авторизации"
)
print(f"[ModerationAuth] ✅ Подпись Telegram виджета валидна")
except ValueError as e:
print(f"[ModerationAuth] ❌ Ошибка валидации виджета: {e}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=str(e)
)
# Find user by telegramId
print(f"[ModerationAuth] Поиск пользователя с telegramId={request.id}")
user = await users_collection().find_one({'telegramId': str(request.id)})
if not user:
print(f"[ModerationAuth] ❌ Пользователь не найден с telegramId={request.id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Пользователь не найден. Сначала зарегистрируйтесь через бота."
)
print(f"[ModerationAuth] ✅ Пользователь найден: username={user.get('username')}, role={user.get('role')}")
if user.get('role') not in ['moderator', 'admin']:
print(f"[ModerationAuth] ❌ Недостаточно прав: role={user.get('role')}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Доступ запрещен. У вас нет прав модератора."
)
if user.get('banned'):
print(f"[ModerationAuth] ❌ Аккаунт заблокирован")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Аккаунт заблокирован"
)
# Update user data from widget
update_fields = {}
if request.username and not user.get('username'):
update_fields['username'] = request.username
if request.first_name and not user.get('firstName'):
update_fields['firstName'] = request.first_name
if request.last_name and not user.get('lastName'):
update_fields['lastName'] = request.last_name
if request.photo_url and not user.get('photoUrl'):
update_fields['photoUrl'] = request.photo_url
update_fields['lastActiveAt'] = datetime.utcnow()
if update_fields:
await users_collection().update_one(
{'_id': user['_id']},
{'$set': update_fields}
)
# Generate tokens
user_id_str = str(user['_id'])
access_token = create_access_token(user_id_str)
refresh_token = create_refresh_token(user_id_str)
# Set cookies
set_auth_cookies(response, access_token, refresh_token)
print(f"[ModerationAuth] Успешная авторизация через виджет: {user.get('username')}")
return {
"success": True,
"user": {
"id": user_id_str,
"username": user.get('username'),
"role": user.get('role'),
"telegramId": user.get('telegramId')
},
"accessToken": access_token
}
except HTTPException as e:
print(f"[ModerationAuth] ❌ HTTP ошибка при авторизации через виджет: {e.status_code} - {e.detail}")
raise
except Exception as e:
print(f"[ModerationAuth] ❌ Ошибка авторизации через виджет: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Ошибка сервера"
)
@router.post("/logout")
async def logout(response: Response):
"""Logout and clear cookies"""
clear_auth_cookies(response)
return {"success": True}
@router.get("/config")
async def get_config():
"""Get configuration for frontend"""
bot_username = settings.MODERATION_BOT_USERNAME
# If not set, try to get from Bot API (simplified - can add full implementation)
if not bot_username:
bot_username = "moderation_bot"
return {"botUsername": bot_username}
@router.get("/me")
async def get_current_user_info(user: dict = Depends(get_current_user)):
"""Get current authenticated user info"""
if user.get('role') not in ['moderator', 'admin']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Доступ запрещен"
)
return {
"success": True,
"user": {
"id": str(user['_id']),
"username": user.get('username'),
"role": user.get('role'),
"telegramId": user.get('telegramId')
}
}
@router.post("/telegram")
async def telegram_miniapp_auth(request: Request, response: Response):
"""Authenticate via Telegram Mini App initData"""
print(f"[ModerationAuth] 🔍 Запрос авторизации через Telegram Mini App (initData)")
from telegram_middleware.telegram_auth import authenticate_telegram_moderation
from utils.auth import require_moderator, require_owner
try:
# Authenticate via initData
user = await authenticate_telegram_moderation(request)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Отсутствует initData. Используйте официальный клиент."
)
print(f"[ModerationAuth] ✅ Пользователь найден: username={user.get('username')}, role={user.get('role')}")
# Check moderation access
username = user.get('username', '').lower()
is_owner = username in settings.OWNER_USERNAMES_LIST
is_admin_by_role = user.get('role') in ['moderator', 'admin']
# Check if user is moderation admin in DB
from database import moderation_admins_collection
is_admin_by_db = await moderation_admins_collection().find_one({
'$or': [
{'telegramId': user.get('telegramId')},
{'username': username}
]
}) is not None
if not is_owner and not is_admin_by_role and not is_admin_by_db:
print(f"[ModerationAuth] ❌ Недостаточно прав: role={user.get('role')}, isOwner={is_owner}, isAdminByDB={is_admin_by_db}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Доступ запрещен. У вас нет прав модератора. Обратитесь к администратору."
)
# Update last active
from datetime import datetime
await users_collection().update_one(
{'_id': user['_id']},
{'$set': {'lastActiveAt': datetime.utcnow()}}
)
# Generate tokens
user_id_str = str(user['_id'])
access_token = create_access_token(user_id_str)
refresh_token = create_refresh_token(user_id_str)
# Set cookies
set_auth_cookies(response, access_token, refresh_token)
print(f"[ModerationAuth] ✅ Успешная авторизация через Mini App: {user.get('username')}")
return {
"success": True,
"user": {
"id": user_id_str,
"username": user.get('username'),
"role": user.get('role'),
"telegramId": user.get('telegramId')
},
"accessToken": access_token
}
except HTTPException:
raise
except Exception as e:
print(f"[ModerationAuth] ❌ Ошибка авторизации через Mini App: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Ошибка сервера"
)