# nakama/moderation/backend-py/routes/moderation_auth.py
# Repository viewer metadata: 434 lines, 15 KiB, Python (Raw / Normal View / History).
# Last change shown by the viewer: 2025-12-14 23:45:41 +00:00
"""
Moderation authentication routes
"""
import secrets
from datetime import datetime, timedelta
from typing import Optional
# viewer blame timestamp: 2025-12-14 23:56:36 +00:00
from fastapi import APIRouter, HTTPException, status, Response, Cookie, Depends, Request
# viewer blame timestamp: 2025-12-14 23:45:41 +00:00
from fastapi.responses import JSONResponse
from bson import ObjectId
from models import (
SendCodeRequest, RegisterRequest, LoginRequest, TelegramWidgetAuth,
UserResponse
)
from database import (
users_collection, email_verification_codes_collection,
moderation_admins_collection
)
from utils.auth import (
hash_password, verify_password,
create_access_token, create_refresh_token,
get_current_user, normalize_username, is_moderation_admin
)
from utils.email_service import send_verification_code
from config import settings
# Router on which the moderation authentication endpoints in this module are registered.
router = APIRouter()
def set_auth_cookies(response: Response, access_token: str, refresh_token: str):
    """Attach the access and refresh tokens to ``response`` as cookies.

    Both cookies are HTTP-only and use ``SameSite=Lax``; the ``Secure`` flag
    follows ``settings.IS_PRODUCTION``. Cookie names and max-age values are
    read from ``settings``.
    """
    # (cookie name, token value, max-age) - access cookie first, then refresh.
    cookie_specs = (
        (
            settings.JWT_ACCESS_COOKIE_NAME,
            access_token,
            settings.JWT_ACCESS_EXPIRES_IN,
        ),
        (
            settings.JWT_REFRESH_COOKIE_NAME,
            refresh_token,
            settings.JWT_REFRESH_EXPIRES_IN,
        ),
    )
    for cookie_name, token, lifetime in cookie_specs:
        response.set_cookie(
            key=cookie_name,
            value=token,
            max_age=lifetime,
            httponly=True,
            secure=settings.IS_PRODUCTION,
            samesite='lax',
        )
def clear_auth_cookies(response: Response):
    """Delete the access cookie and then the refresh cookie from ``response``."""
    cookie_names = (
        settings.JWT_ACCESS_COOKIE_NAME,
        settings.JWT_REFRESH_COOKIE_NAME,
    )
    for cookie_name in cookie_names:
        response.delete_cookie(cookie_name)
@router.post("/send-code")
async def send_code(request: SendCodeRequest, http_request: Request = None):
    """Create a registration verification code and e-mail it.

    The e-mail is lower-cased and stripped, any previous ``registration``
    codes for it are deleted, and a fresh 6-digit code valid for 15 minutes is
    stored before being sent. If sending fails, the stored code is removed.

    ``http_request`` is accepted but not used by this handler.

    Returns:
        ``{"success": True, "message": ...}`` once the e-mail has been sent.

    Raises:
        HTTPException: 403 when the e-mail belongs to a user whose role is
            not moderator/admin; 500 when sending the e-mail fails or any
            other unexpected error occurs.
    """
    try:
        email_lower = request.email.lower().strip()

        # Only accounts that already carry a moderator/admin role count as a match.
        existing_user = await users_collection().find_one({
            'email': email_lower,
            'role': {'$in': ['moderator', 'admin']}
        })

        found = existing_user is not None
        has_password = bool(existing_user.get('passwordHash')) if existing_user else False
        role = existing_user.get('role') if existing_user else None
        print(f"[ModerationAuth] Проверка пользователя для email {email_lower}: "
              f"{{found: {found}, hasPassword: {has_password}, role: {role}}}")

        if not existing_user:
            # Distinguish "known e-mail with the wrong role" from "unknown e-mail".
            user_by_email = await users_collection().find_one({'email': email_lower})
            if user_by_email:
                print(f"[ModerationAuth] Пользователь найден, но роль не moderator/admin: {user_by_email.get('role')}")
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Регистрация недоступна. Обратитесь к администратору для получения доступа."
                )
            # NOTE(review): unknown e-mails still receive a code ("allow for
            # debugging" in the original) - confirm this is intended outside dev.
            print(f"[ModerationAuth] Пользователь не найден, но отправка кода разрешена для {email_lower}")
        else:
            print(f"[ModerationAuth] Пользователь найден, отправка кода разрешена")

        # Random 6-digit code in the range 100000..999999.
        code = str(secrets.randbelow(900000) + 100000)

        # Drop earlier registration codes so only the newest one can match.
        await email_verification_codes_collection().delete_many({
            'email': email_lower,
            'purpose': 'registration'
        })

        # Persist the new code; it expires 15 minutes from now.
        await email_verification_codes_collection().insert_one({
            'email': email_lower,
            'code': code,
            'purpose': 'registration',
            'verified': False,
            'expiresAt': datetime.utcnow() + timedelta(minutes=15),
            'createdAt': datetime.utcnow()
        })

        try:
            await send_verification_code(email_lower, code)
        except Exception as email_error:
            # The code never reached the user, so remove it again.
            await email_verification_codes_collection().delete_many({
                'email': email_lower,
                'code': code
            })
            # ValueError text is passed through verbatim; anything else is
            # wrapped in a generic prefix.
            # NOTE(review): both variants expose the raw error text to the client.
            if isinstance(email_error, ValueError):
                failure_detail = str(email_error)
            else:
                failure_detail = f"Не удалось отправить код на email: {str(email_error)}"
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=failure_detail
            )

        return {"success": True, "message": "Код подтверждения отправлен на email"}
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as err:
        print(f"[ModerationAuth] Ошибка отправки кода: {err}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Ошибка сервера"
        )
@router.post("/register")
async def register(request: RegisterRequest, response: Response):
    """Finish registration of a pre-provisioned moderator/admin account.

    Validates the e-mailed verification code, stores the password hash on the
    existing user document, marks the code as used, and signs the user in by
    setting the auth cookies.

    Returns:
        ``{"success": True, "user": {...}, "accessToken": ...}``.

    Raises:
        HTTPException: 400 for an unknown/expired code, an already registered
            account, or a password shorter than 6 characters; 403 when no
            moderator/admin user exists for the e-mail or the account is
            banned; 500 on any unexpected error.
    """
    try:
        email_lower = request.email.lower().strip()

        # Look up an unused registration code for this e-mail.
        verification_code = await email_verification_codes_collection().find_one({
            'email': email_lower,
            'code': request.code,
            'purpose': 'registration',
            'verified': False
        })
        if not verification_code:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Неверный или истекший код"
            )

        # Expired codes are deleted so they cannot be retried.
        if datetime.utcnow() > verification_code['expiresAt']:
            await email_verification_codes_collection().delete_one({'_id': verification_code['_id']})
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Код истек. Запросите новый."
            )

        # The user document must already exist with a moderator/admin role.
        user = await users_collection().find_one({
            'email': email_lower,
            'role': {'$in': ['moderator', 'admin']}
        })
        if not user:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Регистрация недоступна. Обратитесь к администратору."
            )

        # Reject banned accounts the same way /login and /telegram-widget do;
        # without this check a banned, not-yet-registered account could set a
        # password and walk away with valid tokens.
        if user.get('banned'):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Аккаунт заблокирован"
            )

        # An account that already has a password must use /login instead.
        if user.get('passwordHash'):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Аккаунт уже зарегистрирован. Используйте вход по паролю."
            )

        # Minimal password policy: at least 6 characters.
        if len(request.password) < 6:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Пароль должен содержать минимум 6 символов"
            )

        password_hash = hash_password(request.password)
        # Keep the existing username when the request does not supply one.
        username = request.username or user.get('username')

        await users_collection().update_one(
            {'_id': user['_id']},
            {
                '$set': {
                    'passwordHash': password_hash,
                    'emailVerified': True,
                    'username': username
                }
            }
        )

        # Mark the code as used so it no longer matches the lookup above.
        await email_verification_codes_collection().update_one(
            {'_id': verification_code['_id']},
            {'$set': {'verified': True}}
        )

        # Sign the user in right away.
        user_id_str = str(user['_id'])
        access_token = create_access_token(user_id_str)
        refresh_token = create_refresh_token(user_id_str)
        set_auth_cookies(response, access_token, refresh_token)

        return {
            "success": True,
            "user": {
                "id": user_id_str,
                "username": username,
                "role": user.get('role')
            },
            "accessToken": access_token
        }
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        print(f"[ModerationAuth] Ошибка регистрации: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Ошибка сервера"
        )
@router.post("/login")
async def login(request: LoginRequest, response: Response):
    """Sign in a moderator/admin with e-mail and password.

    On success the user's ``lastActiveAt`` is refreshed, the auth cookies are
    set on ``response`` and the access token is also returned in the body.

    Returns:
        ``{"success": True, "user": {...}, "accessToken": ...}``.

    Raises:
        HTTPException: 401 when the e-mail/password pair does not match a
            moderator/admin account with a password; 403 when the credentials
            are correct but the account is banned; 500 on any unexpected
            error.
    """
    try:
        email_lower = request.email.lower().strip()

        # Only moderator/admin accounts that already have a password can log in.
        user = await users_collection().find_one({
            'email': email_lower,
            'passwordHash': {'$exists': True, '$ne': None},
            'role': {'$in': ['moderator', 'admin']}
        })
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Неверный email или пароль"
            )

        # Check the password BEFORE revealing anything about the account.
        # Reporting "banned" first would tell an unauthenticated caller that
        # the e-mail belongs to an existing (banned) moderator account.
        if not verify_password(request.password, user['passwordHash']):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Неверный email или пароль"
            )

        # Credentials are valid; only now disclose the ban.
        if user.get('banned'):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Аккаунт заблокирован"
            )

        # Record the successful login time.
        await users_collection().update_one(
            {'_id': user['_id']},
            {'$set': {'lastActiveAt': datetime.utcnow()}}
        )

        user_id_str = str(user['_id'])
        access_token = create_access_token(user_id_str)
        refresh_token = create_refresh_token(user_id_str)
        set_auth_cookies(response, access_token, refresh_token)

        return {
            "success": True,
            "user": {
                "id": user_id_str,
                "username": user.get('username'),
                "role": user.get('role'),
                "telegramId": user.get('telegramId')
            },
            "accessToken": access_token
        }
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        print(f"[ModerationAuth] Ошибка авторизации: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Ошибка сервера"
        )
@router.post("/telegram-widget")
async def telegram_widget_auth(request: TelegramWidgetAuth, response: Response):
    """Sign in a moderator/admin using Telegram Login Widget data.

    The user is looked up by ``telegramId`` (the widget ``id`` as a string).
    Profile fields that are still empty on the stored user are filled from the
    widget data, ``lastActiveAt`` is refreshed, and the auth cookies are set.

    Returns:
        ``{"success": True, "user": {...}, "accessToken": ...}``; ``username``
        reflects a value backfilled during this call.

    Raises:
        HTTPException: 404 when no user has this Telegram id; 403 when the
            user is not a moderator/admin or is banned; 500 on any unexpected
            error.
    """
    try:
        # NOTE(review): this handler trusts ``request.id`` as-is. No check of
        # the widget ``hash`` is visible here - confirm that
        # ``TelegramWidgetAuth`` (or middleware) verifies the signature,
        # otherwise anyone who knows a moderator's Telegram id can sign in.
        user = await users_collection().find_one({'telegramId': str(request.id)})
        if not user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Пользователь не найден. Сначала зарегистрируйтесь через бота."
            )
        if user.get('role') not in ['moderator', 'admin']:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Доступ запрещен. У вас нет прав модератора."
            )
        if user.get('banned'):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Аккаунт заблокирован"
            )

        # Backfill only the profile fields that are empty on the stored user;
        # existing values are never overwritten by widget data.
        widget_profile = (
            ('username', request.username),
            ('firstName', request.first_name),
            ('lastName', request.last_name),
            ('photoUrl', request.photo_url),
        )
        update_fields = {
            field: value
            for field, value in widget_profile
            if value and not user.get(field)
        }
        # Always present, so the update below is unconditional.
        update_fields['lastActiveAt'] = datetime.utcnow()
        await users_collection().update_one(
            {'_id': user['_id']},
            {'$set': update_fields}
        )

        # Report the username as it is stored after the update, not the
        # stale value from the document fetched before it.
        username = update_fields.get('username', user.get('username'))

        user_id_str = str(user['_id'])
        access_token = create_access_token(user_id_str)
        refresh_token = create_refresh_token(user_id_str)
        set_auth_cookies(response, access_token, refresh_token)

        print(f"[ModerationAuth] Успешная авторизация через виджет: {username}")
        return {
            "success": True,
            "user": {
                "id": user_id_str,
                "username": username,
                "role": user.get('role'),
                "telegramId": user.get('telegramId')
            },
            "accessToken": access_token
        }
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        print(f"[ModerationAuth] Ошибка авторизации через виджет: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Ошибка сервера"
        )
@router.post("/logout")
async def logout(response: Response):
    """Log the caller out by clearing both authentication cookies.

    The handler does not inspect any token; it only removes the cookies and
    reports success.
    """
    result = {"success": True}
    clear_auth_cookies(response)
    return result
@router.get("/config")
async def get_config():
    """Return frontend configuration: the moderation bot username.

    Uses ``settings.MODERATION_BOT_USERNAME`` and falls back to the literal
    ``"moderation_bot"`` when that setting is falsy.
    """
    # The original comment mentions resolving the name through the Bot API;
    # that lookup is not implemented, only the static fallback is.
    return {"botUsername": settings.MODERATION_BOT_USERNAME or "moderation_bot"}
@router.get("/me")
async def get_current_user_info(user: dict = Depends(get_current_user)):
    """Return the profile of the authenticated moderator/admin.

    ``user`` is supplied by the ``get_current_user`` dependency.

    Raises:
        HTTPException: 403 when the user's role is neither ``moderator`` nor
            ``admin``.
    """
    role = user.get('role')
    if role not in ('moderator', 'admin'):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Доступ запрещен"
        )

    profile = {
        "id": str(user['_id']),
        "username": user.get('username'),
        "role": role,
        "telegramId": user.get('telegramId'),
    }
    return {"success": True, "user": profile}
# viewer blame timestamp: 2025-12-14 23:56:36 +00:00
@router.post("/telegram")
async def telegram_auth_alias(request: TelegramWidgetAuth, response: Response):
    """Serve ``/telegram`` by delegating to the ``/telegram-widget`` handler.

    Kept as a compatibility alias for the frontend; the result of
    ``telegram_widget_auth`` is returned unchanged.
    """
    auth_result = await telegram_widget_auth(request, response)
    return auth_result