""" Authentication utilities """ from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import HTTPException, status, Depends, Cookie, Header from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from config import settings from database import users_collection, moderation_admins_collection # Password hashing pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # JWT bearer security = HTTPBearer(auto_error=False) def hash_password(password: str) -> str: """ Hash password with bcrypt Bcrypt имеет ограничение: пароль не может быть длиннее 72 байт. Если пароль превышает 72 байта, он будет автоматически обрезан до первых 72 байт. ВАЖНО: Если пароль обрезан при регистрации, то при входе нужно ввести тот же пароль (который также будет обрезан до 72 байт), иначе проверка не сработает. Для избежания проблем рекомендуется использовать пароли до 24 символов, что обычно не превышает 72 байта даже для UTF-8 символов. """ # Bcrypt has a maximum password length of 72 bytes # Truncate if necessary (encode to bytes first to get accurate byte length) print(f"[Auth] 🔍 hash_password вызван:") print(f"[Auth] Пароль (repr): {repr(password)}") print(f"[Auth] Длина в символах: {len(password)}") password_bytes = password.encode('utf-8') original_length = len(password_bytes) print(f"[Auth] Длина в байтах: {original_length}") print(f"[Auth] Байты (hex): {password_bytes.hex()}") if original_length > 72: # Обрезаем до первых 72 байт password = password_bytes[:72].decode('utf-8', errors='ignore') print(f"[Auth] ⚠️ Пароль обрезан до 72 байт (было {original_length} байт)") print(f"[Auth] ⚠️ ВНИМАНИЕ: При входе используйте тот же пароль, он также будет обрезан") else: print(f"[Auth] ✅ Пароль в пределах лимита (72 байта)") try: result = pwd_context.hash(password) print(f"[Auth] ✅ Пароль успешно захеширован") return result except Exception as e: print(f"[Auth] ❌ Ошибка при хешировании: {type(e).__name__}: {e}") import traceback traceback.print_exc() raise def verify_password(plain_password: str, hashed_password: str) -> bool: """ Verify password against hash Bcrypt имеет ограничение: пароль не может быть длиннее 72 байт. Если пароль превышает 72 байта, он будет автоматически обрезан до первых 72 байт. Это гарантирует, что проверка сработает даже если пароль был обрезан при регистрации. """ # Bcrypt has a maximum password length of 72 bytes # Truncate if necessary (encode to bytes first to get accurate byte length) password_bytes = plain_password.encode('utf-8') if len(password_bytes) > 72: # Обрезаем до первых 72 байт (так же, как при хешировании) plain_password = password_bytes[:72].decode('utf-8', errors='ignore') print(f"[Auth] ⚠️ Пароль при проверке обрезан до 72 байт (было {len(password_bytes)} байт)") return pwd_context.verify(plain_password, hashed_password) def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str: """Create JWT access token""" if expires_delta is None: expires_delta = timedelta(seconds=settings.JWT_ACCESS_EXPIRES_IN) expire = datetime.utcnow() + expires_delta to_encode = { "userId": user_id, "exp": expire, "type": "access" } encoded_jwt = jwt.encode(to_encode, settings.JWT_ACCESS_SECRET, algorithm="HS256") return encoded_jwt def create_refresh_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str: """Create JWT refresh token""" if expires_delta is None: expires_delta = timedelta(seconds=settings.JWT_REFRESH_EXPIRES_IN) expire = datetime.utcnow() + expires_delta to_encode = { "userId": user_id, "exp": expire, "type": "refresh" } encoded_jwt = jwt.encode(to_encode, settings.JWT_REFRESH_SECRET, algorithm="HS256") return encoded_jwt def verify_token(token: str, token_type: str = "access") -> Optional[dict]: """Verify JWT token""" try: secret = settings.JWT_ACCESS_SECRET if token_type == "access" else settings.JWT_REFRESH_SECRET payload = jwt.decode(token, secret, algorithms=["HS256"]) if payload.get("type") != token_type: return None return payload except JWTError: return None async def get_current_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), access_token: Optional[str] = Cookie(None, alias=settings.JWT_ACCESS_COOKIE_NAME) ): """Get current authenticated user""" token = None # Try Bearer token from header if credentials: token = credentials.credentials # Try cookie elif access_token: token = access_token if not token: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Не авторизован" ) # Verify token payload = verify_token(token, "access") if not payload: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Неверный токен" ) # Get user from database user_id = payload.get("userId") if not user_id: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Неверный токен" ) from bson import ObjectId user = await users_collection().find_one({"_id": ObjectId(user_id)}) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Пользователь не найден" ) if user.get('banned'): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Аккаунт заблокирован" ) return user async def require_moderator(user: dict = Depends(get_current_user)): """Require moderator or admin role""" if user.get('role') not in ['moderator', 'admin']: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Недостаточно прав для модерации" ) return user async def require_owner(user: dict = Depends(get_current_user)): """Require owner role""" username = user.get('username', '').lower() is_owner = ( user.get('role') == 'admin' or username in settings.OWNER_USERNAMES_LIST ) if not is_owner: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Недостаточно прав. Требуются права владельца." ) return user def normalize_username(username: Optional[str]) -> Optional[str]: """Normalize username (remove @ and lowercase)""" if not username: return None username = username.strip().lower() if username.startswith('@'): username = username[1:] return username if username else None async def is_moderation_admin(telegram_id: Optional[str] = None, username: Optional[str] = None) -> bool: """Check if user is moderation admin""" if not telegram_id and not username: return False query = {} if telegram_id: query['telegramId'] = telegram_id if username: normalized = normalize_username(username) if normalized: query['username'] = normalized admin = await moderation_admins_collection().find_one(query) return admin is not None