""" Telegram Mini App initData validation """ import hmac import hashlib import urllib.parse from typing import Optional, Dict, Any from config import settings MAX_AUTH_AGE_SECONDS = 60 * 60 # 1 час AUTH_AGE_TOLERANCE_SECONDS = 300 # 5 минут допуск def validate_init_data(init_data_raw: str, bot_token: Optional[str] = None) -> Dict[str, Any]: """ Validate and parse Telegram Mini App initData Args: init_data_raw: Raw initData string from Telegram bot_token: Bot token (uses MODERATION_BOT_TOKEN if not provided) Returns: Parsed initData payload with user data Raises: ValueError: If validation fails """ token_to_use = bot_token or settings.MODERATION_BOT_TOKEN if not token_to_use or not isinstance(token_to_use, str) or not token_to_use.strip(): error_msg = ( 'Bot token модерации не настроен (MODERATION_BOT_TOKEN)' if bot_token else 'Bot token не настроен (TELEGRAM_BOT_TOKEN)' ) print(f'[Telegram] Token error: {error_msg}') raise ValueError(error_msg) if not init_data_raw or not isinstance(init_data_raw, str): raise ValueError('initData не передан') print(f'[Telegram] validate_init_data called: hasInitData={bool(init_data_raw)}, length={len(init_data_raw)}') # Parse query string params = urllib.parse.parse_qs(init_data_raw) # Get hash hash_value = params.get('hash', [None])[0] if not hash_value: raise ValueError('Отсутствует hash в initData') # Remove hash from params params_without_hash = {k: v[0] for k, v in params.items() if k != 'hash'} # Create data check string (sorted keys) data_check_arr = sorted(params_without_hash.items()) data_check_string = '\n'.join(f'{key}={value}' for key, value in data_check_arr) # Create secret key secret_key = hmac.new( 'WebAppData'.encode(), token_to_use.encode(), hashlib.sha256 ).digest() # Create signature signature = hmac.new( secret_key, data_check_string.encode(), hashlib.sha256 ).hexdigest() # Compare signatures if signature != hash_value: print(f'[Telegram] Hash mismatch: calculated={signature[:20]}..., received={hash_value[:20]}...') raise ValueError('Неверная подпись initData') # Check auth_date if present auth_date_str = params_without_hash.get('auth_date') if auth_date_str: try: auth_date = int(auth_date_str) import time now = int(time.time()) age = abs(now - auth_date) if age > MAX_AUTH_AGE_SECONDS: # Allow small tolerance in development if settings.IS_DEVELOPMENT and age <= MAX_AUTH_AGE_SECONDS + AUTH_AGE_TOLERANCE_SECONDS: print(f'⚠️ InitData expired ({age}s), but allowing due to tolerance in development.') else: raise ValueError(f'Данные авторизации устарели (возраст: {age}с, макс: {MAX_AUTH_AGE_SECONDS}с)') except ValueError: pass # Invalid auth_date, skip check # Parse user data if present user_str = params_without_hash.get('user') user_data = None if user_str: import json try: user_data = json.loads(user_str) except json.JSONDecodeError: raise ValueError('Неверный формат user данных в initData') print('[Telegram] initData validation complete') return { 'user': user_data, 'auth_date': auth_date_str, 'hash': hash_value, 'raw': params_without_hash } def normalize_telegram_user(user_data: Dict[str, Any]) -> Dict[str, Any]: """Normalize Telegram user data (handle both camelCase and snake_case)""" if not user_data: return {} normalized = {} # ID normalized['id'] = str(user_data.get('id') or user_data.get('id')) # Username normalized['username'] = user_data.get('username') or user_data.get('username') # First name normalized['firstName'] = user_data.get('first_name') or user_data.get('firstName') # Last name normalized['lastName'] = user_data.get('last_name') or user_data.get('lastName') # Photo URL normalized['photoUrl'] = user_data.get('photo_url') or user_data.get('photoUrl') return normalized