2025-12-14 23:45:41 +00:00
|
|
|
|
"""
|
|
|
|
|
|
Authentication utilities
|
|
|
|
|
|
"""
|
|
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
from typing import Optional
|
|
|
|
|
|
from jose import JWTError, jwt
|
|
|
|
|
|
from passlib.context import CryptContext
|
2025-12-15 01:06:58 +00:00
|
|
|
|
import bcrypt
|
2025-12-14 23:45:41 +00:00
|
|
|
|
from fastapi import HTTPException, status, Depends, Cookie, Header
|
|
|
|
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
|
|
|
|
|
|
|
|
|
|
from config import settings
|
|
|
|
|
|
from database import users_collection, moderation_admins_collection
|
|
|
|
|
|
|
|
|
|
|
|
# Password hashing
|
2025-12-15 01:06:58 +00:00
|
|
|
|
# Используем bcrypt с явной обрезкой паролей до 72 байт
|
|
|
|
|
|
pwd_context = CryptContext(
|
|
|
|
|
|
schemes=["bcrypt"],
|
|
|
|
|
|
deprecated="auto",
|
|
|
|
|
|
bcrypt__ident="2b" # Используем современный формат bcrypt
|
|
|
|
|
|
)
|
2025-12-14 23:45:41 +00:00
|
|
|
|
|
|
|
|
|
|
# JWT bearer
|
|
|
|
|
|
security = HTTPBearer(auto_error=False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def hash_password(password: str) -> str:
|
2025-12-15 01:01:39 +00:00
|
|
|
|
"""
|
|
|
|
|
|
Hash password with bcrypt
|
|
|
|
|
|
|
|
|
|
|
|
Bcrypt имеет ограничение: пароль не может быть длиннее 72 байт.
|
|
|
|
|
|
Если пароль превышает 72 байта, он будет автоматически обрезан до первых 72 байт.
|
|
|
|
|
|
|
|
|
|
|
|
ВАЖНО: Если пароль обрезан при регистрации, то при входе нужно ввести
|
|
|
|
|
|
тот же пароль (который также будет обрезан до 72 байт), иначе проверка не сработает.
|
|
|
|
|
|
|
|
|
|
|
|
Для избежания проблем рекомендуется использовать пароли до 24 символов,
|
|
|
|
|
|
что обычно не превышает 72 байта даже для UTF-8 символов.
|
|
|
|
|
|
"""
|
|
|
|
|
|
# Bcrypt has a maximum password length of 72 bytes
|
|
|
|
|
|
# Truncate if necessary (encode to bytes first to get accurate byte length)
|
|
|
|
|
|
print(f"[Auth] 🔍 hash_password вызван:")
|
|
|
|
|
|
print(f"[Auth] Пароль (repr): {repr(password)}")
|
|
|
|
|
|
print(f"[Auth] Длина в символах: {len(password)}")
|
|
|
|
|
|
|
|
|
|
|
|
password_bytes = password.encode('utf-8')
|
|
|
|
|
|
original_length = len(password_bytes)
|
|
|
|
|
|
print(f"[Auth] Длина в байтах: {original_length}")
|
|
|
|
|
|
print(f"[Auth] Байты (hex): {password_bytes.hex()}")
|
|
|
|
|
|
|
|
|
|
|
|
if original_length > 72:
|
|
|
|
|
|
# Обрезаем до первых 72 байт
|
|
|
|
|
|
password = password_bytes[:72].decode('utf-8', errors='ignore')
|
|
|
|
|
|
print(f"[Auth] ⚠️ Пароль обрезан до 72 байт (было {original_length} байт)")
|
|
|
|
|
|
print(f"[Auth] ⚠️ ВНИМАНИЕ: При входе используйте тот же пароль, он также будет обрезан")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"[Auth] ✅ Пароль в пределах лимита (72 байта)")
|
|
|
|
|
|
|
2025-12-15 01:06:58 +00:00
|
|
|
|
# ВАЖНО: passlib/bcrypt может проверять длину пароля ВНУТРИ библиотеки
|
|
|
|
|
|
# Обрезаем пароль до 72 байт ПЕРЕД передачей в pwd_context.hash()
|
|
|
|
|
|
# Это гарантирует, что пароль никогда не превысит лимит
|
|
|
|
|
|
password_bytes_final = password_bytes[:72]
|
|
|
|
|
|
password_final = password_bytes_final.decode('utf-8', errors='strict')
|
|
|
|
|
|
|
|
|
|
|
|
print(f"[Auth] Финальная длина в байтах: {len(password_bytes_final)}")
|
|
|
|
|
|
print(f"[Auth] Финальный пароль (repr): {repr(password_final)}")
|
|
|
|
|
|
print(f"[Auth] Финальный пароль (len): {len(password_final)} символов")
|
|
|
|
|
|
|
|
|
|
|
|
# Дополнительная проверка: убедимся, что после декодирования длина в байтах не изменилась
|
|
|
|
|
|
password_final_bytes_check = password_final.encode('utf-8')
|
|
|
|
|
|
if len(password_final_bytes_check) > 72:
|
|
|
|
|
|
print(f"[Auth] ⚠️ После декодирования длина стала {len(password_final_bytes_check)} байт, обрезаем еще раз")
|
|
|
|
|
|
password_final = password_final_bytes_check[:72].decode('utf-8', errors='ignore')
|
|
|
|
|
|
password_final_bytes_check = password_final.encode('utf-8')
|
|
|
|
|
|
|
|
|
|
|
|
print(f"[Auth] Финальная проверка: {len(password_final_bytes_check)} байт")
|
|
|
|
|
|
|
2025-12-15 01:01:39 +00:00
|
|
|
|
try:
|
2025-12-15 01:06:58 +00:00
|
|
|
|
# Используем прямой вызов bcrypt для обхода проблемы с passlib
|
|
|
|
|
|
# Обрезаем пароль до 72 байт и передаем как bytes
|
|
|
|
|
|
password_bytes_for_bcrypt = password_final_bytes_check[:72]
|
|
|
|
|
|
|
|
|
|
|
|
# Генерируем соль и хешируем пароль
|
|
|
|
|
|
salt = bcrypt.gensalt(rounds=10)
|
|
|
|
|
|
hashed = bcrypt.hashpw(password_bytes_for_bcrypt, salt)
|
|
|
|
|
|
|
|
|
|
|
|
# Конвертируем bytes в строку для хранения
|
|
|
|
|
|
result = hashed.decode('utf-8')
|
|
|
|
|
|
print(f"[Auth] ✅ Пароль успешно захеширован через прямой bcrypt")
|
2025-12-15 01:01:39 +00:00
|
|
|
|
return result
|
2025-12-15 01:06:58 +00:00
|
|
|
|
except ValueError as e:
|
|
|
|
|
|
# Если bcrypt все равно выдает ошибку, попробуем обрезать до 71 байта
|
|
|
|
|
|
error_msg = str(e).lower()
|
|
|
|
|
|
if "72 bytes" in error_msg or "longer than" in error_msg or "truncate" in error_msg:
|
|
|
|
|
|
print(f"[Auth] ⚠️ Bcrypt все равно жалуется на длину, обрезаем до 71 байта")
|
|
|
|
|
|
password_bytes_safe = password_bytes[:71]
|
|
|
|
|
|
salt = bcrypt.gensalt(rounds=10)
|
|
|
|
|
|
hashed = bcrypt.hashpw(password_bytes_safe, salt)
|
|
|
|
|
|
result = hashed.decode('utf-8')
|
|
|
|
|
|
print(f"[Auth] ✅ Пароль успешно захеширован после обрезки до 71 байта")
|
|
|
|
|
|
return result
|
|
|
|
|
|
print(f"[Auth] ❌ ValueError: {e}")
|
|
|
|
|
|
raise
|
2025-12-15 01:01:39 +00:00
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[Auth] ❌ Ошибка при хешировании: {type(e).__name__}: {e}")
|
|
|
|
|
|
import traceback
|
|
|
|
|
|
traceback.print_exc()
|
|
|
|
|
|
raise
|
2025-12-14 23:45:41 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
2025-12-15 01:01:39 +00:00
|
|
|
|
"""
|
|
|
|
|
|
Verify password against hash
|
|
|
|
|
|
|
|
|
|
|
|
Bcrypt имеет ограничение: пароль не может быть длиннее 72 байт.
|
|
|
|
|
|
Если пароль превышает 72 байта, он будет автоматически обрезан до первых 72 байт.
|
|
|
|
|
|
|
|
|
|
|
|
Это гарантирует, что проверка сработает даже если пароль был обрезан при регистрации.
|
|
|
|
|
|
"""
|
|
|
|
|
|
# Bcrypt has a maximum password length of 72 bytes
|
|
|
|
|
|
# Truncate if necessary (encode to bytes first to get accurate byte length)
|
|
|
|
|
|
password_bytes = plain_password.encode('utf-8')
|
2025-12-15 01:06:58 +00:00
|
|
|
|
password_bytes_final = password_bytes[:72]
|
|
|
|
|
|
|
|
|
|
|
|
# Используем прямой вызов bcrypt для проверки
|
|
|
|
|
|
try:
|
|
|
|
|
|
hashed_bytes = hashed_password.encode('utf-8')
|
|
|
|
|
|
return bcrypt.checkpw(password_bytes_final, hashed_bytes)
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"[Auth] ❌ Ошибка при проверке пароля: {type(e).__name__}: {e}")
|
|
|
|
|
|
# Fallback на passlib
|
|
|
|
|
|
plain_password_final = password_bytes_final.decode('utf-8', errors='ignore')
|
|
|
|
|
|
return pwd_context.verify(plain_password_final, hashed_password)
|
2025-12-14 23:45:41 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
|
|
|
|
|
|
"""Create JWT access token"""
|
|
|
|
|
|
if expires_delta is None:
|
|
|
|
|
|
expires_delta = timedelta(seconds=settings.JWT_ACCESS_EXPIRES_IN)
|
|
|
|
|
|
|
|
|
|
|
|
expire = datetime.utcnow() + expires_delta
|
|
|
|
|
|
to_encode = {
|
|
|
|
|
|
"userId": user_id,
|
|
|
|
|
|
"exp": expire,
|
|
|
|
|
|
"type": "access"
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
encoded_jwt = jwt.encode(to_encode, settings.JWT_ACCESS_SECRET, algorithm="HS256")
|
|
|
|
|
|
return encoded_jwt
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_refresh_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
|
|
|
|
|
|
"""Create JWT refresh token"""
|
|
|
|
|
|
if expires_delta is None:
|
|
|
|
|
|
expires_delta = timedelta(seconds=settings.JWT_REFRESH_EXPIRES_IN)
|
|
|
|
|
|
|
|
|
|
|
|
expire = datetime.utcnow() + expires_delta
|
|
|
|
|
|
to_encode = {
|
|
|
|
|
|
"userId": user_id,
|
|
|
|
|
|
"exp": expire,
|
|
|
|
|
|
"type": "refresh"
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
encoded_jwt = jwt.encode(to_encode, settings.JWT_REFRESH_SECRET, algorithm="HS256")
|
|
|
|
|
|
return encoded_jwt
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def verify_token(token: str, token_type: str = "access") -> Optional[dict]:
|
|
|
|
|
|
"""Verify JWT token"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
secret = settings.JWT_ACCESS_SECRET if token_type == "access" else settings.JWT_REFRESH_SECRET
|
|
|
|
|
|
payload = jwt.decode(token, secret, algorithms=["HS256"])
|
|
|
|
|
|
|
|
|
|
|
|
if payload.get("type") != token_type:
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
return payload
|
|
|
|
|
|
except JWTError:
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def get_current_user(
|
|
|
|
|
|
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
|
|
|
|
|
access_token: Optional[str] = Cookie(None, alias=settings.JWT_ACCESS_COOKIE_NAME)
|
|
|
|
|
|
):
|
|
|
|
|
|
"""Get current authenticated user"""
|
|
|
|
|
|
token = None
|
|
|
|
|
|
|
|
|
|
|
|
# Try Bearer token from header
|
|
|
|
|
|
if credentials:
|
|
|
|
|
|
token = credentials.credentials
|
|
|
|
|
|
# Try cookie
|
|
|
|
|
|
elif access_token:
|
|
|
|
|
|
token = access_token
|
|
|
|
|
|
|
|
|
|
|
|
if not token:
|
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
|
|
|
|
detail="Не авторизован"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# Verify token
|
|
|
|
|
|
payload = verify_token(token, "access")
|
|
|
|
|
|
if not payload:
|
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
|
|
|
|
detail="Неверный токен"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# Get user from database
|
|
|
|
|
|
user_id = payload.get("userId")
|
|
|
|
|
|
if not user_id:
|
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
|
|
|
|
detail="Неверный токен"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
from bson import ObjectId
|
|
|
|
|
|
user = await users_collection().find_one({"_id": ObjectId(user_id)})
|
|
|
|
|
|
|
|
|
|
|
|
if not user:
|
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
|
|
|
|
detail="Пользователь не найден"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
if user.get('banned'):
|
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
|
|
|
|
detail="Аккаунт заблокирован"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
return user
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def require_moderator(user: dict = Depends(get_current_user)):
|
|
|
|
|
|
"""Require moderator or admin role"""
|
|
|
|
|
|
if user.get('role') not in ['moderator', 'admin']:
|
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
|
|
|
|
detail="Недостаточно прав для модерации"
|
|
|
|
|
|
)
|
|
|
|
|
|
return user
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def require_owner(user: dict = Depends(get_current_user)):
|
|
|
|
|
|
"""Require owner role"""
|
|
|
|
|
|
username = user.get('username', '').lower()
|
|
|
|
|
|
is_owner = (
|
|
|
|
|
|
user.get('role') == 'admin' or
|
|
|
|
|
|
username in settings.OWNER_USERNAMES_LIST
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
if not is_owner:
|
|
|
|
|
|
raise HTTPException(
|
|
|
|
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
|
|
|
|
detail="Недостаточно прав. Требуются права владельца."
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
return user
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def normalize_username(username: Optional[str]) -> Optional[str]:
|
|
|
|
|
|
"""Normalize username (remove @ and lowercase)"""
|
|
|
|
|
|
if not username:
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
username = username.strip().lower()
|
|
|
|
|
|
if username.startswith('@'):
|
|
|
|
|
|
username = username[1:]
|
|
|
|
|
|
|
|
|
|
|
|
return username if username else None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def is_moderation_admin(telegram_id: Optional[str] = None, username: Optional[str] = None) -> bool:
|
|
|
|
|
|
"""Check if user is moderation admin"""
|
|
|
|
|
|
if not telegram_id and not username:
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
query = {}
|
|
|
|
|
|
if telegram_id:
|
|
|
|
|
|
query['telegramId'] = telegram_id
|
|
|
|
|
|
|
|
|
|
|
|
if username:
|
|
|
|
|
|
normalized = normalize_username(username)
|
|
|
|
|
|
if normalized:
|
|
|
|
|
|
query['username'] = normalized
|
|
|
|
|
|
|
|
|
|
|
|
admin = await moderation_admins_collection().find_one(query)
|
|
|
|
|
|
return admin is not None
|
|
|
|
|
|
|