140 lines
4.7 KiB
Python
140 lines
4.7 KiB
Python
"""
|
||
Telegram Mini App initData validation
|
||
"""
|
||
import hmac
|
||
import hashlib
|
||
import urllib.parse
|
||
from typing import Optional, Dict, Any
|
||
from config import settings
|
||
|
||
# Maximum accepted age of initData: 7 days (raised for moderation use).
MAX_AUTH_AGE_SECONDS = 60 * 60 * 24 * 7
# Grace period on top of the maximum age: 24 hours (raised to avoid
# spurious "session expired" errors).
AUTH_AGE_TOLERANCE_SECONDS = 60 * 60 * 24


def validate_init_data(init_data_raw: str, bot_token: Optional[str] = None) -> Dict[str, Any]:
    """
    Validate and parse Telegram Mini App initData.

    The signature is verified as HMAC-SHA256 over the sorted ``key=value``
    lines of every received field except ``hash``, keyed with
    HMAC-SHA256("WebAppData", bot token).

    Args:
        init_data_raw: Raw initData query string from Telegram.
        bot_token: Bot token (uses settings.MODERATION_BOT_TOKEN if not provided).

    Returns:
        Dict with keys ``user`` (decoded JSON of the ``user`` field, or None),
        ``auth_date`` (raw string value, or None), ``hash`` (received hash)
        and ``raw`` (all received fields except ``hash``).

    Raises:
        ValueError: If the token or initData is missing, the hash is absent
            or does not match, the data is older than
            MAX_AUTH_AGE_SECONDS + AUTH_AGE_TOLERANCE_SECONDS, or the
            ``user`` field is not valid JSON.
    """
    # Function-scope imports: these modules are only needed here.
    import json
    import time

    token_to_use = bot_token or settings.MODERATION_BOT_TOKEN

    if not token_to_use or not isinstance(token_to_use, str) or not token_to_use.strip():
        # NOTE(review): the message choice looks inverted relative to the
        # fallback above (MODERATION_BOT_TOKEN is used when bot_token is not
        # given) - confirm the intended wording before changing it.
        error_msg = (
            'Bot token модерации не настроен (MODERATION_BOT_TOKEN)'
            if bot_token
            else 'Bot token не настроен (TELEGRAM_BOT_TOKEN)'
        )
        print(f'[Telegram] Token error: {error_msg}')
        raise ValueError(error_msg)

    if not init_data_raw or not isinstance(init_data_raw, str):
        raise ValueError('initData не передан')

    print(f'[Telegram] validate_init_data called: hasInitData={bool(init_data_raw)}, length={len(init_data_raw)}')

    # Keep blank values: the data-check-string must cover every received
    # field, including ones whose value is empty.
    params = urllib.parse.parse_qs(init_data_raw, keep_blank_values=True)

    hash_value = params.get('hash', [None])[0]
    if not hash_value:
        raise ValueError('Отсутствует hash в initData')

    # Every field except the hash itself; the first value wins on duplicates.
    params_without_hash = {k: v[0] for k, v in params.items() if k != 'hash'}

    # Data-check-string: "key=value" lines sorted by key.
    data_check_string = '\n'.join(
        f'{key}={value}' for key, value in sorted(params_without_hash.items())
    )

    # Secret key = HMAC-SHA256(key="WebAppData", msg=bot token).
    secret_key = hmac.new(
        'WebAppData'.encode(),
        token_to_use.encode(),
        hashlib.sha256,
    ).digest()

    signature = hmac.new(
        secret_key,
        data_check_string.encode(),
        hashlib.sha256,
    ).hexdigest()

    # Constant-time comparison. Compare bytes, because compare_digest rejects
    # non-ASCII str and the received hash is untrusted input.
    if not hmac.compare_digest(signature.encode(), hash_value.encode()):
        print(f'[Telegram] Hash mismatch: calculated={signature[:20]}..., received={hash_value[:20]}...')
        raise ValueError('Неверная подпись initData')

    # Freshness check, only when auth_date is present.
    auth_date_str = params_without_hash.get('auth_date')
    if auth_date_str:
        try:
            auth_date = int(auth_date_str)
        except ValueError:
            # Malformed auth_date: best effort, skip the freshness check.
            auth_date = None

        # The expiry error is raised outside the try above so that it is not
        # swallowed by the handler meant for a malformed auth_date.
        if auth_date is not None:
            age = abs(int(time.time()) - auth_date)
            if age > MAX_AUTH_AGE_SECONDS + AUTH_AGE_TOLERANCE_SECONDS:
                raise ValueError(f'Данные авторизации устарели (возраст: {age}с, макс: {MAX_AUTH_AGE_SECONDS}с)')
            if age > MAX_AUTH_AGE_SECONDS:
                # Inside the tolerance window (moderation sessions can be long).
                print(f'⚠️ InitData expired ({age}s), but allowing due to tolerance.')

    # Decode the user payload if present.
    user_str = params_without_hash.get('user')
    user_data = None
    if user_str:
        try:
            user_data = json.loads(user_str)
        except json.JSONDecodeError as err:
            raise ValueError('Неверный формат user данных в initData') from err

    print('[Telegram] initData validation complete')

    return {
        'user': user_data,
        'auth_date': auth_date_str,
        'hash': hash_value,
        'raw': params_without_hash,
    }
|
||
|
||
|
||
def normalize_telegram_user(user_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Normalize Telegram user data to camelCase keys.

    Accepts either snake_case (``first_name``, ``last_name``, ``photo_url``)
    or camelCase (``firstName``, ``lastName``, ``photoUrl``) input keys; the
    snake_case value wins when it is truthy.

    Args:
        user_data: User payload; may be empty or None.

    Returns:
        An empty dict for empty input, otherwise a dict with keys ``id``,
        ``username``, ``firstName``, ``lastName`` and ``photoUrl``. ``id`` is
        converted to ``str``, or is None when absent (rather than the literal
        string ``'None'``). Other missing fields are None.
    """
    if not user_data:
        return {}

    raw_id = user_data.get('id')

    return {
        # Only stringify a real id; str(None) would produce the bogus 'None'.
        'id': None if raw_id is None else str(raw_id),
        'username': user_data.get('username'),
        'firstName': user_data.get('first_name') or user_data.get('firstName'),
        'lastName': user_data.get('last_name') or user_data.get('lastName'),
        'photoUrl': user_data.get('photo_url') or user_data.get('photoUrl'),
    }
|
||
|