"""Authentication utilities: password hashing, JWT handling and FastAPI auth dependencies."""
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, status, Depends, Cookie, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from config import settings
from database import users_collection, moderation_admins_collection

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# JWT bearer (auto_error=False so a missing header falls through to the cookie)
security = HTTPBearer(auto_error=False)


def hash_password(password: str) -> str:
    """Return the bcrypt hash of ``password``."""
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True when ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def _encode_token(user_id: str, token_type: str, secret: str, expires_delta: timedelta) -> str:
    """Build and sign an HS256 JWT carrying ``userId``, ``exp`` and ``type`` claims."""
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a naive
    # value; the resulting "exp" epoch timestamp is the same.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {
        "userId": user_id,
        "exp": expire,
        "type": token_type,
    }
    return jwt.encode(to_encode, secret, algorithm="HS256")


def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token for ``user_id``.

    Args:
        user_id: Value stored in the ``userId`` claim.
        expires_delta: Token lifetime; defaults to
            ``settings.JWT_ACCESS_EXPIRES_IN`` seconds.

    Returns:
        The encoded token, signed with ``settings.JWT_ACCESS_SECRET``.
    """
    if expires_delta is None:
        expires_delta = timedelta(seconds=settings.JWT_ACCESS_EXPIRES_IN)
    return _encode_token(user_id, "access", settings.JWT_ACCESS_SECRET, expires_delta)


def create_refresh_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT refresh token for ``user_id``.

    Args:
        user_id: Value stored in the ``userId`` claim.
        expires_delta: Token lifetime; defaults to
            ``settings.JWT_REFRESH_EXPIRES_IN`` seconds.

    Returns:
        The encoded token, signed with ``settings.JWT_REFRESH_SECRET``.
    """
    if expires_delta is None:
        expires_delta = timedelta(seconds=settings.JWT_REFRESH_EXPIRES_IN)
    return _encode_token(user_id, "refresh", settings.JWT_REFRESH_SECRET, expires_delta)


def verify_token(token: str, token_type: str = "access") -> Optional[dict]:
    """Decode ``token`` and check that its ``type`` claim equals ``token_type``.

    Args:
        token: Encoded JWT.
        token_type: ``"access"`` selects the access secret; any other value
            selects the refresh secret.

    Returns:
        The decoded payload, or None when decoding fails or the ``type``
        claim does not match.
    """
    if token_type == "access":
        secret = settings.JWT_ACCESS_SECRET
    else:
        secret = settings.JWT_REFRESH_SECRET

    try:
        payload = jwt.decode(token, secret, algorithms=["HS256"])
    except JWTError:
        return None

    # Reject a token of the other kind even if it decoded successfully.
    if payload.get("type") != token_type:
        return None
    return payload


async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    access_token: Optional[str] = Cookie(None, alias=settings.JWT_ACCESS_COOKIE_NAME)
):
    """Resolve the authenticated user from a Bearer header or the access cookie.

    The Authorization header takes precedence over the cookie.

    Returns:
        The user document loaded from ``users_collection()``.

    Raises:
        HTTPException: 401 when no token is supplied, the token is invalid,
            its ``userId`` claim is missing or malformed, or no such user
            exists; 403 when the user document has a truthy ``banned`` field.
    """
    from bson import ObjectId
    from bson.errors import InvalidId

    token = None

    # Try Bearer token from header
    if credentials:
        token = credentials.credentials
    # Try cookie
    elif access_token:
        token = access_token

    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Не авторизован"
        )

    # Verify token
    payload = verify_token(token, "access")
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Неверный токен"
        )

    # Get user from database
    user_id = payload.get("userId")
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Неверный токен"
        )

    # A userId claim that is not a valid ObjectId must yield 401, not an
    # unhandled exception (HTTP 500).
    try:
        object_id = ObjectId(user_id)
    except (InvalidId, TypeError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Неверный токен"
        ) from None

    user = await users_collection().find_one({"_id": object_id})

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Пользователь не найден"
        )

    if user.get('banned'):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Аккаунт заблокирован"
        )

    return user


async def require_moderator(user: dict = Depends(get_current_user)):
    """Allow only users whose ``role`` is ``moderator`` or ``admin``.

    Returns:
        The authenticated user document.

    Raises:
        HTTPException: 403 when the user has any other role.
    """
    if user.get('role') not in ['moderator', 'admin']:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Недостаточно прав для модерации"
        )
    return user


async def require_owner(user: dict = Depends(get_current_user)):
    """Allow only owners.

    A user passes when their ``role`` is ``admin`` or their lower-cased
    username is in ``settings.OWNER_USERNAMES_LIST``.

    Returns:
        The authenticated user document.

    Raises:
        HTTPException: 403 when neither condition holds.
    """
    # ``or ''`` also covers a username stored explicitly as None, which
    # ``.get('username', '')`` would pass through to ``.lower()``.
    username = (user.get('username') or '').lower()
    is_owner = (
        user.get('role') == 'admin' or
        username in settings.OWNER_USERNAMES_LIST
    )

    if not is_owner:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Недостаточно прав. Требуются права владельца."
        )
    return user


def normalize_username(username: Optional[str]) -> Optional[str]:
    """Normalize a username: strip whitespace, lowercase, drop one leading ``@``.

    Returns:
        The normalized username, or None when the input is empty or nothing
        remains after normalization.
    """
    if not username:
        return None
    username = username.strip().lower()
    if username.startswith('@'):
        username = username[1:]
    return username if username else None


async def is_moderation_admin(telegram_id: Optional[str] = None, username: Optional[str] = None) -> bool:
    """Check whether a matching document exists in the moderation admins collection.

    Args:
        telegram_id: Matched against the ``telegramId`` field when truthy.
        username: Normalized with ``normalize_username`` and matched against
            the ``username`` field when non-empty after normalization.

    Returns:
        True when a document matches every supplied criterion, False
        otherwise, including when no usable criterion was supplied.
    """
    if not telegram_id and not username:
        return False

    # NOTE(review): when both criteria are given they go into one filter, so
    # a document must match both - confirm this is intended rather than
    # "either".
    query = {}
    if telegram_id:
        query['telegramId'] = telegram_id
    if username:
        normalized = normalize_username(username)
        if normalized:
            query['username'] = normalized

    # A username such as "@" or "   " normalizes to None. Without this guard
    # the filter would be empty and find_one({}) would match any admin.
    if not query:
        return False

    admin = await moderation_admins_collection().find_one(query)
    return admin is not None