nakama/moderation/backend-py/utils/telegram_initdata.py

140 lines
4.6 KiB
Python
Raw Normal View History

2025-12-15 00:04:03 +00:00
"""
Telegram Mini App initData validation
"""
import hmac
import hashlib
import urllib.parse
from typing import Optional, Dict, Any
from config import settings
2025-12-15 00:15:53 +00:00
MAX_AUTH_AGE_SECONDS = 60 * 60 * 24 # 24 часа (увеличено для модерации)
AUTH_AGE_TOLERANCE_SECONDS = 60 * 60 # 1 час допуск
2025-12-15 00:04:03 +00:00
def validate_init_data(init_data_raw: str, bot_token: Optional[str] = None) -> Dict[str, Any]:
"""
Validate and parse Telegram Mini App initData
Args:
init_data_raw: Raw initData string from Telegram
bot_token: Bot token (uses MODERATION_BOT_TOKEN if not provided)
Returns:
Parsed initData payload with user data
Raises:
ValueError: If validation fails
"""
token_to_use = bot_token or settings.MODERATION_BOT_TOKEN
if not token_to_use or not isinstance(token_to_use, str) or not token_to_use.strip():
error_msg = (
'Bot token модерации не настроен (MODERATION_BOT_TOKEN)'
if bot_token
else 'Bot token не настроен (TELEGRAM_BOT_TOKEN)'
)
print(f'[Telegram] Token error: {error_msg}')
raise ValueError(error_msg)
if not init_data_raw or not isinstance(init_data_raw, str):
raise ValueError('initData не передан')
print(f'[Telegram] validate_init_data called: hasInitData={bool(init_data_raw)}, length={len(init_data_raw)}')
# Parse query string
params = urllib.parse.parse_qs(init_data_raw)
# Get hash
hash_value = params.get('hash', [None])[0]
if not hash_value:
raise ValueError('Отсутствует hash в initData')
# Remove hash from params
params_without_hash = {k: v[0] for k, v in params.items() if k != 'hash'}
# Create data check string (sorted keys)
data_check_arr = sorted(params_without_hash.items())
data_check_string = '\n'.join(f'{key}={value}' for key, value in data_check_arr)
# Create secret key
secret_key = hmac.new(
'WebAppData'.encode(),
token_to_use.encode(),
hashlib.sha256
).digest()
# Create signature
signature = hmac.new(
secret_key,
data_check_string.encode(),
hashlib.sha256
).hexdigest()
# Compare signatures
if signature != hash_value:
print(f'[Telegram] Hash mismatch: calculated={signature[:20]}..., received={hash_value[:20]}...')
raise ValueError('Неверная подпись initData')
# Check auth_date if present
auth_date_str = params_without_hash.get('auth_date')
if auth_date_str:
try:
auth_date = int(auth_date_str)
import time
now = int(time.time())
age = abs(now - auth_date)
if age > MAX_AUTH_AGE_SECONDS:
2025-12-15 00:15:53 +00:00
# Allow tolerance (особенно для модерации, где сессии могут быть длинными)
if age <= MAX_AUTH_AGE_SECONDS + AUTH_AGE_TOLERANCE_SECONDS:
print(f'⚠️ InitData expired ({age}s), but allowing due to tolerance.')
2025-12-15 00:04:03 +00:00
else:
raise ValueError(f'Данные авторизации устарели (возраст: {age}с, макс: {MAX_AUTH_AGE_SECONDS}с)')
except ValueError:
pass # Invalid auth_date, skip check
# Parse user data if present
user_str = params_without_hash.get('user')
user_data = None
if user_str:
import json
try:
user_data = json.loads(user_str)
except json.JSONDecodeError:
raise ValueError('Неверный формат user данных в initData')
print('[Telegram] initData validation complete')
return {
'user': user_data,
'auth_date': auth_date_str,
'hash': hash_value,
'raw': params_without_hash
}
def normalize_telegram_user(user_data: Dict[str, Any]) -> Dict[str, Any]:
"""Normalize Telegram user data (handle both camelCase and snake_case)"""
if not user_data:
return {}
normalized = {}
# ID
normalized['id'] = str(user_data.get('id') or user_data.get('id'))
# Username
normalized['username'] = user_data.get('username') or user_data.get('username')
# First name
normalized['firstName'] = user_data.get('first_name') or user_data.get('firstName')
# Last name
normalized['lastName'] = user_data.get('last_name') or user_data.get('lastName')
# Photo URL
normalized['photoUrl'] = user_data.get('photo_url') or user_data.get('photoUrl')
return normalized