""" Authentication utilities """ from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext import bcrypt from fastapi import HTTPException, status, Depends, Cookie, Header from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from config import settings from database import users_collection, moderation_admins_collection # Password hashing # Используем bcrypt с явной обрезкой паролей до 72 байт pwd_context = CryptContext( schemes=["bcrypt"], deprecated="auto", bcrypt__ident="2b" # Используем современный формат bcrypt ) # JWT bearer security = HTTPBearer(auto_error=False) def hash_password(password: str) -> str: """ Hash password with bcrypt Bcrypt имеет ограничение: пароль не может быть длиннее 72 байт. Если пароль превышает 72 байта, он будет автоматически обрезан до первых 72 байт. ВАЖНО: Если пароль обрезан при регистрации, то при входе нужно ввести тот же пароль (который также будет обрезан до 72 байт), иначе проверка не сработает. Для избежания проблем рекомендуется использовать пароли до 24 символов, что обычно не превышает 72 байта даже для UTF-8 символов. """ # Bcrypt has a maximum password length of 72 bytes # Truncate if necessary (encode to bytes first to get accurate byte length) print(f"[Auth] 🔍 hash_password вызван:") print(f"[Auth] Пароль (repr): {repr(password)}") print(f"[Auth] Длина в символах: {len(password)}") password_bytes = password.encode('utf-8') original_length = len(password_bytes) print(f"[Auth] Длина в байтах: {original_length}") print(f"[Auth] Байты (hex): {password_bytes.hex()}") if original_length > 72: # Обрезаем до первых 72 байт password = password_bytes[:72].decode('utf-8', errors='ignore') print(f"[Auth] ⚠️ Пароль обрезан до 72 байт (было {original_length} байт)") print(f"[Auth] ⚠️ ВНИМАНИЕ: При входе используйте тот же пароль, он также будет обрезан") else: print(f"[Auth] ✅ Пароль в пределах лимита (72 байта)") # ВАЖНО: passlib/bcrypt может проверять длину пароля ВНУТРИ библиотеки # Обрезаем пароль до 72 байт ПЕРЕД передачей в pwd_context.hash() # Это гарантирует, что пароль никогда не превысит лимит password_bytes_final = password_bytes[:72] password_final = password_bytes_final.decode('utf-8', errors='strict') print(f"[Auth] Финальная длина в байтах: {len(password_bytes_final)}") print(f"[Auth] Финальный пароль (repr): {repr(password_final)}") print(f"[Auth] Финальный пароль (len): {len(password_final)} символов") # Дополнительная проверка: убедимся, что после декодирования длина в байтах не изменилась password_final_bytes_check = password_final.encode('utf-8') if len(password_final_bytes_check) > 72: print(f"[Auth] ⚠️ После декодирования длина стала {len(password_final_bytes_check)} байт, обрезаем еще раз") password_final = password_final_bytes_check[:72].decode('utf-8', errors='ignore') password_final_bytes_check = password_final.encode('utf-8') print(f"[Auth] Финальная проверка: {len(password_final_bytes_check)} байт") try: # Используем прямой вызов bcrypt для обхода проблемы с passlib # Обрезаем пароль до 72 байт и передаем как bytes password_bytes_for_bcrypt = password_final_bytes_check[:72] # Генерируем соль и хешируем пароль salt = bcrypt.gensalt(rounds=10) hashed = bcrypt.hashpw(password_bytes_for_bcrypt, salt) # Конвертируем bytes в строку для хранения result = hashed.decode('utf-8') print(f"[Auth] ✅ Пароль успешно захеширован через прямой bcrypt") return result except ValueError as e: # Если bcrypt все равно выдает ошибку, попробуем обрезать до 71 байта error_msg = str(e).lower() if "72 bytes" in error_msg or "longer than" in error_msg or "truncate" in error_msg: print(f"[Auth] ⚠️ Bcrypt все равно жалуется на длину, обрезаем до 71 байта") password_bytes_safe = password_bytes[:71] salt = bcrypt.gensalt(rounds=10) hashed = bcrypt.hashpw(password_bytes_safe, salt) result = hashed.decode('utf-8') print(f"[Auth] ✅ Пароль успешно захеширован после обрезки до 71 байта") return result print(f"[Auth] ❌ ValueError: {e}") raise except Exception as e: print(f"[Auth] ❌ Ошибка при хешировании: {type(e).__name__}: {e}") import traceback traceback.print_exc() raise def verify_password(plain_password: str, hashed_password: str) -> bool: """ Verify password against hash Bcrypt имеет ограничение: пароль не может быть длиннее 72 байт. Если пароль превышает 72 байта, он будет автоматически обрезан до первых 72 байт. Это гарантирует, что проверка сработает даже если пароль был обрезан при регистрации. """ # Bcrypt has a maximum password length of 72 bytes # Truncate if necessary (encode to bytes first to get accurate byte length) password_bytes = plain_password.encode('utf-8') password_bytes_final = password_bytes[:72] # Используем прямой вызов bcrypt для проверки try: hashed_bytes = hashed_password.encode('utf-8') return bcrypt.checkpw(password_bytes_final, hashed_bytes) except Exception as e: print(f"[Auth] ❌ Ошибка при проверке пароля: {type(e).__name__}: {e}") # Fallback на passlib plain_password_final = password_bytes_final.decode('utf-8', errors='ignore') return pwd_context.verify(plain_password_final, hashed_password) def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str: """Create JWT access token""" if expires_delta is None: expires_delta = timedelta(seconds=settings.JWT_ACCESS_EXPIRES_IN) expire = datetime.utcnow() + expires_delta to_encode = { "userId": user_id, "exp": expire, "type": "access" } encoded_jwt = jwt.encode(to_encode, settings.JWT_ACCESS_SECRET, algorithm="HS256") return encoded_jwt def create_refresh_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str: """Create JWT refresh token""" if expires_delta is None: expires_delta = timedelta(seconds=settings.JWT_REFRESH_EXPIRES_IN) expire = datetime.utcnow() + expires_delta to_encode = { "userId": user_id, "exp": expire, "type": "refresh" } encoded_jwt = jwt.encode(to_encode, settings.JWT_REFRESH_SECRET, algorithm="HS256") return encoded_jwt def verify_token(token: str, token_type: str = "access") -> Optional[dict]: """Verify JWT token""" try: secret = settings.JWT_ACCESS_SECRET if token_type == "access" else settings.JWT_REFRESH_SECRET payload = jwt.decode(token, secret, algorithms=["HS256"]) if payload.get("type") != token_type: return None return payload except JWTError: return None async def get_current_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), access_token: Optional[str] = Cookie(None, alias=settings.JWT_ACCESS_COOKIE_NAME) ): """Get current authenticated user""" token = None # Try Bearer token from header if credentials: token = credentials.credentials # Try cookie elif access_token: token = access_token if not token: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Не авторизован" ) # Verify token payload = verify_token(token, "access") if not payload: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Неверный токен" ) # Get user from database user_id = payload.get("userId") if not user_id: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Неверный токен" ) from bson import ObjectId user = await users_collection().find_one({"_id": ObjectId(user_id)}) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Пользователь не найден" ) if user.get('banned'): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Аккаунт заблокирован" ) return user async def require_moderator(user: dict = Depends(get_current_user)): """Require moderator or admin role""" if user.get('role') not in ['moderator', 'admin']: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Недостаточно прав для модерации" ) return user async def require_owner(user: dict = Depends(get_current_user)): """Require owner role""" username = user.get('username', '').lower() is_owner = ( user.get('role') == 'admin' or username in settings.OWNER_USERNAMES_LIST ) if not is_owner: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Недостаточно прав. Требуются права владельца." ) return user def normalize_username(username: Optional[str]) -> Optional[str]: """Normalize username (remove @ and lowercase)""" if not username: return None username = username.strip().lower() if username.startswith('@'): username = username[1:] return username if username else None async def is_moderation_admin(telegram_id: Optional[str] = None, username: Optional[str] = None) -> bool: """Check if user is moderation admin""" if not telegram_id and not username: return False query = {} if telegram_id: query['telegramId'] = telegram_id if username: normalized = normalize_username(username) if normalized: query['username'] = normalized admin = await moderation_admins_collection().find_one(query) return admin is not None