nakama/moderation/backend-py/utils/auth.py

187 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Authentication utilities
"""
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import HTTPException, status, Depends, Cookie, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext

from config import settings
from database import users_collection, moderation_admins_collection
# Shared passlib context used by hash_password() / verify_password();
# bcrypt is the only configured scheme.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)

# Bearer-credentials dependency. auto_error is disabled so that
# get_current_user() can fall back to the access-token cookie when no
# Authorization header was supplied.
security = HTTPBearer(
    auto_error=False,
)
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's passlib context."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check ``plain_password`` against ``hashed_password`` via the passlib context."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token for ``user_id``.

    Args:
        user_id: Value stored in the token's ``userId`` claim.
        expires_delta: Token lifetime. When ``None``, the lifetime is
            ``settings.JWT_ACCESS_EXPIRES_IN`` seconds.

    Returns:
        The token encoded with ``settings.JWT_ACCESS_SECRET`` using HS256.
        Its ``type`` claim is ``"access"``.
    """
    if expires_delta is None:
        expires_delta = timedelta(seconds=settings.JWT_ACCESS_EXPIRES_IN)

    # Use a timezone-aware "now": datetime.utcnow() is deprecated since
    # Python 3.12 and yields a naive value that is easy to misinterpret.
    expire = datetime.now(timezone.utc) + expires_delta

    claims = {
        "userId": user_id,
        "exp": expire,
        "type": "access",
    }
    return jwt.encode(claims, settings.JWT_ACCESS_SECRET, algorithm="HS256")
def create_refresh_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT refresh token for ``user_id``.

    Args:
        user_id: Value stored in the token's ``userId`` claim.
        expires_delta: Token lifetime. When ``None``, the lifetime is
            ``settings.JWT_REFRESH_EXPIRES_IN`` seconds.

    Returns:
        The token encoded with ``settings.JWT_REFRESH_SECRET`` using HS256.
        Its ``type`` claim is ``"refresh"``.
    """
    if expires_delta is None:
        expires_delta = timedelta(seconds=settings.JWT_REFRESH_EXPIRES_IN)

    # Use a timezone-aware "now": datetime.utcnow() is deprecated since
    # Python 3.12 and yields a naive value that is easy to misinterpret.
    expire = datetime.now(timezone.utc) + expires_delta

    claims = {
        "userId": user_id,
        "exp": expire,
        "type": "refresh",
    }
    return jwt.encode(claims, settings.JWT_REFRESH_SECRET, algorithm="HS256")
def verify_token(token: str, token_type: str = "access") -> Optional[dict]:
    """Decode ``token`` and return its claims, or ``None`` if it is not acceptable.

    The access secret is used when ``token_type`` is ``"access"``; any other
    value selects the refresh secret. ``None`` is returned when decoding
    raises ``JWTError`` or when the token's ``type`` claim differs from
    ``token_type``.
    """
    if token_type == "access":
        secret = settings.JWT_ACCESS_SECRET
    else:
        secret = settings.JWT_REFRESH_SECRET

    try:
        claims = jwt.decode(token, secret, algorithms=["HS256"])
    except JWTError:
        return None

    # A token of the other kind must not be accepted in place of this one.
    return claims if claims.get("type") == token_type else None
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    access_token: Optional[str] = Cookie(None, alias=settings.JWT_ACCESS_COOKIE_NAME),
):
    """Resolve the authenticated user for the current request.

    The token is taken from the bearer credentials when they are present,
    otherwise from the access-token cookie. It is verified as an access
    token and the user referenced by its ``userId`` claim is loaded from
    ``users_collection()``.

    Returns:
        The user document found in the database.

    Raises:
        HTTPException: 401 when no token is supplied, the token fails
            verification, the ``userId`` claim is missing or is not a valid
            ObjectId, or no matching user exists; 403 when the user
            document has a truthy ``banned`` field.
    """
    from bson import ObjectId
    from bson.errors import InvalidId

    # Bearer credentials take precedence over the cookie.
    token = None
    if credentials:
        token = credentials.credentials
    elif access_token:
        token = access_token

    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Не авторизован"
        )

    payload = verify_token(token, "access")
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Неверный токен"
        )

    user_id = payload.get("userId")
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Неверный токен"
        )

    # A malformed userId claim must be reported as an invalid token rather
    # than escaping as an unhandled exception (HTTP 500).
    try:
        object_id = ObjectId(user_id)
    except (InvalidId, TypeError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Неверный токен"
        ) from None

    user = await users_collection().find_one({"_id": object_id})
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Пользователь не найден"
        )

    if user.get('banned'):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Аккаунт заблокирован"
        )

    return user
async def require_moderator(user: dict = Depends(get_current_user)):
    """Allow the request only for users whose role is moderator or admin.

    Returns the user document unchanged; otherwise raises a 403
    ``HTTPException``.
    """
    allowed_roles = ('moderator', 'admin')
    if user.get('role') in allowed_roles:
        return user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Недостаточно прав для модерации"
    )
async def require_owner(user: dict = Depends(get_current_user)):
    """Allow the request only for owners.

    A user counts as an owner when their role is ``'admin'`` or their
    lower-cased username is listed in ``settings.OWNER_USERNAMES_LIST``.

    Returns:
        The user document unchanged.

    Raises:
        HTTPException: 403 when the user is not an owner.
    """
    # The username may be absent or stored as None; treat anything that is
    # not a string as "no username" instead of crashing on .lower().
    raw_username = user.get('username')
    username = raw_username.lower() if isinstance(raw_username, str) else ''

    is_owner = (
        user.get('role') == 'admin' or
        username in settings.OWNER_USERNAMES_LIST
    )
    if not is_owner:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Недостаточно прав. Требуются права владельца."
        )
    return user
def normalize_username(username: Optional[str]) -> Optional[str]:
    """Return ``username`` trimmed, lower-cased and without one leading ``@``.

    ``None`` is returned for a falsy input or when nothing remains after
    normalization.
    """
    if not username:
        return None

    cleaned = username.strip().lower()
    # Only a single leading '@' is dropped.
    without_at = cleaned[1:] if cleaned.startswith('@') else cleaned
    return without_at or None
async def is_moderation_admin(telegram_id: Optional[str] = None, username: Optional[str] = None) -> bool:
    """Check whether a moderation-admin record matches the given identity.

    Args:
        telegram_id: Matched against the ``telegramId`` field when truthy.
        username: Normalized with ``normalize_username`` and matched against
            the ``username`` field when the normalized value is non-empty.

    Returns:
        ``True`` when ``moderation_admins_collection()`` contains a document
        matching every supplied criterion (both fields go into one filter,
        so both must match when both are given). ``False`` when no document
        matches or when no usable criterion was supplied.
    """
    query = {}
    if telegram_id:
        query['telegramId'] = telegram_id
    if username:
        normalized = normalize_username(username)
        if normalized:
            query['username'] = normalized

    # An empty filter would match any document in the collection, so a
    # username that normalizes to nothing (e.g. "@") must not be looked up.
    if not query:
        return False

    admin = await moderation_admins_collection().find_one(query)
    return admin is not None