nakama/moderation/backend-py/utils/auth.py

292 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Authentication utilities
"""
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
import bcrypt
from fastapi import HTTPException, status, Depends, Cookie, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from config import settings
from database import users_collection, moderation_admins_collection
# Password hashing
# Используем bcrypt с явной обрезкой паролей до 72 байт
pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__ident="2b" # Используем современный формат bcrypt
)
# JWT bearer
security = HTTPBearer(auto_error=False)
def hash_password(password: str) -> str:
"""
Hash password with bcrypt
Bcrypt имеет ограничение: пароль не может быть длиннее 72 байт.
Если пароль превышает 72 байта, он будет автоматически обрезан до первых 72 байт.
ВАЖНО: Если пароль обрезан при регистрации, то при входе нужно ввести
тот же пароль (который также будет обрезан до 72 байт), иначе проверка не сработает.
Для избежания проблем рекомендуется использовать пароли до 24 символов,
что обычно не превышает 72 байта даже для UTF-8 символов.
"""
# Bcrypt has a maximum password length of 72 bytes
# Truncate if necessary (encode to bytes first to get accurate byte length)
print(f"[Auth] 🔍 hash_password вызван:")
print(f"[Auth] Пароль (repr): {repr(password)}")
print(f"[Auth] Длина в символах: {len(password)}")
password_bytes = password.encode('utf-8')
original_length = len(password_bytes)
print(f"[Auth] Длина в байтах: {original_length}")
print(f"[Auth] Байты (hex): {password_bytes.hex()}")
if original_length > 72:
# Обрезаем до первых 72 байт
password = password_bytes[:72].decode('utf-8', errors='ignore')
print(f"[Auth] ⚠️ Пароль обрезан до 72 байт (было {original_length} байт)")
print(f"[Auth] ⚠️ ВНИМАНИЕ: При входе используйте тот же пароль, он также будет обрезан")
else:
print(f"[Auth] ✅ Пароль в пределах лимита (72 байта)")
# ВАЖНО: passlib/bcrypt может проверять длину пароля ВНУТРИ библиотеки
# Обрезаем пароль до 72 байт ПЕРЕД передачей в pwd_context.hash()
# Это гарантирует, что пароль никогда не превысит лимит
password_bytes_final = password_bytes[:72]
password_final = password_bytes_final.decode('utf-8', errors='strict')
print(f"[Auth] Финальная длина в байтах: {len(password_bytes_final)}")
print(f"[Auth] Финальный пароль (repr): {repr(password_final)}")
print(f"[Auth] Финальный пароль (len): {len(password_final)} символов")
# Дополнительная проверка: убедимся, что после декодирования длина в байтах не изменилась
password_final_bytes_check = password_final.encode('utf-8')
if len(password_final_bytes_check) > 72:
print(f"[Auth] ⚠️ После декодирования длина стала {len(password_final_bytes_check)} байт, обрезаем еще раз")
password_final = password_final_bytes_check[:72].decode('utf-8', errors='ignore')
password_final_bytes_check = password_final.encode('utf-8')
print(f"[Auth] Финальная проверка: {len(password_final_bytes_check)} байт")
try:
# Используем прямой вызов bcrypt для обхода проблемы с passlib
# Обрезаем пароль до 72 байт и передаем как bytes
password_bytes_for_bcrypt = password_final_bytes_check[:72]
# Генерируем соль и хешируем пароль
salt = bcrypt.gensalt(rounds=10)
hashed = bcrypt.hashpw(password_bytes_for_bcrypt, salt)
# Конвертируем bytes в строку для хранения
result = hashed.decode('utf-8')
print(f"[Auth] ✅ Пароль успешно захеширован через прямой bcrypt")
return result
except ValueError as e:
# Если bcrypt все равно выдает ошибку, попробуем обрезать до 71 байта
error_msg = str(e).lower()
if "72 bytes" in error_msg or "longer than" in error_msg or "truncate" in error_msg:
print(f"[Auth] ⚠️ Bcrypt все равно жалуется на длину, обрезаем до 71 байта")
password_bytes_safe = password_bytes[:71]
salt = bcrypt.gensalt(rounds=10)
hashed = bcrypt.hashpw(password_bytes_safe, salt)
result = hashed.decode('utf-8')
print(f"[Auth] ✅ Пароль успешно захеширован после обрезки до 71 байта")
return result
print(f"[Auth] ❌ ValueError: {e}")
raise
except Exception as e:
print(f"[Auth] ❌ Ошибка при хешировании: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
raise
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verify password against hash
Bcrypt имеет ограничение: пароль не может быть длиннее 72 байт.
Если пароль превышает 72 байта, он будет автоматически обрезан до первых 72 байт.
Это гарантирует, что проверка сработает даже если пароль был обрезан при регистрации.
"""
# Bcrypt has a maximum password length of 72 bytes
# Truncate if necessary (encode to bytes first to get accurate byte length)
password_bytes = plain_password.encode('utf-8')
password_bytes_final = password_bytes[:72]
# Используем прямой вызов bcrypt для проверки
try:
hashed_bytes = hashed_password.encode('utf-8')
return bcrypt.checkpw(password_bytes_final, hashed_bytes)
except Exception as e:
print(f"[Auth] ❌ Ошибка при проверке пароля: {type(e).__name__}: {e}")
# Fallback на passlib
plain_password_final = password_bytes_final.decode('utf-8', errors='ignore')
return pwd_context.verify(plain_password_final, hashed_password)
def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT access token"""
if expires_delta is None:
expires_delta = timedelta(seconds=settings.JWT_ACCESS_EXPIRES_IN)
expire = datetime.utcnow() + expires_delta
to_encode = {
"userId": user_id,
"exp": expire,
"type": "access"
}
encoded_jwt = jwt.encode(to_encode, settings.JWT_ACCESS_SECRET, algorithm="HS256")
return encoded_jwt
def create_refresh_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT refresh token"""
if expires_delta is None:
expires_delta = timedelta(seconds=settings.JWT_REFRESH_EXPIRES_IN)
expire = datetime.utcnow() + expires_delta
to_encode = {
"userId": user_id,
"exp": expire,
"type": "refresh"
}
encoded_jwt = jwt.encode(to_encode, settings.JWT_REFRESH_SECRET, algorithm="HS256")
return encoded_jwt
def verify_token(token: str, token_type: str = "access") -> Optional[dict]:
"""Verify JWT token"""
try:
secret = settings.JWT_ACCESS_SECRET if token_type == "access" else settings.JWT_REFRESH_SECRET
payload = jwt.decode(token, secret, algorithms=["HS256"])
if payload.get("type") != token_type:
return None
return payload
except JWTError:
return None
async def get_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
access_token: Optional[str] = Cookie(None, alias=settings.JWT_ACCESS_COOKIE_NAME)
):
"""Get current authenticated user"""
token = None
# Try Bearer token from header
if credentials:
token = credentials.credentials
# Try cookie
elif access_token:
token = access_token
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Не авторизован"
)
# Verify token
payload = verify_token(token, "access")
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный токен"
)
# Get user from database
user_id = payload.get("userId")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный токен"
)
from bson import ObjectId
user = await users_collection().find_one({"_id": ObjectId(user_id)})
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Пользователь не найден"
)
if user.get('banned'):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Аккаунт заблокирован"
)
return user
async def require_moderator(user: dict = Depends(get_current_user)):
"""Require moderator or admin role"""
if user.get('role') not in ['moderator', 'admin']:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав для модерации"
)
return user
async def require_owner(user: dict = Depends(get_current_user)):
"""Require owner role"""
username = user.get('username', '').lower()
is_owner = (
user.get('role') == 'admin' or
username in settings.OWNER_USERNAMES_LIST
)
if not is_owner:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав. Требуются права владельца."
)
return user
def normalize_username(username: Optional[str]) -> Optional[str]:
"""Normalize username (remove @ and lowercase)"""
if not username:
return None
username = username.strip().lower()
if username.startswith('@'):
username = username[1:]
return username if username else None
async def is_moderation_admin(telegram_id: Optional[str] = None, username: Optional[str] = None) -> bool:
"""Check if user is moderation admin"""
if not telegram_id and not username:
return False
query = {}
if telegram_id:
query['telegramId'] = telegram_id
if username:
normalized = normalize_username(username)
if normalized:
query['username'] = normalized
admin = await moderation_admins_collection().find_one(query)
return admin is not None