"""LLM-роутер: по последней реплике + короткой истории определяет ветку. Отдельный класс от LLMClient сознательно — роутер зовётся часто (каждую реплику), имеет смысл в будущем перевести на более дешёвую модель (gpt-4o-mini, локальная Qwen). Сейчас оба используют DeepSeek. Системный промпт роутера лежит в БД как активный конфиг ветки `_router` (версионируется, редактируется из UI «Настройки»). Если БД недоступна или ветки нет — используем fallback из prompts/intents/_router.md. """ import asyncio import logging import re from pathlib import Path import httpx from sqlalchemy.ext.asyncio import AsyncSession from config import settings from services import config_service, intent_service logger = logging.getLogger(__name__) _FALLBACK_PROMPT_PATH = Path(__file__).resolve().parent.parent / "prompts" / "intents" / "_router.md" def _load_fallback_prompt() -> str: try: return _FALLBACK_PROMPT_PATH.read_text(encoding="utf-8").strip() except FileNotFoundError: logger.warning("Router fallback prompt not found at %s", _FALLBACK_PROMPT_PATH) return "" FALLBACK_SYSTEM_PROMPT = _load_fallback_prompt() VALID_CODES = { "new_booking", "reschedule", "price_question", "medical_question", "general_info", "escalate_human", } ESCALATION_REASONS = {"acute_pain", "surgery", "angry", "explicit_request", "routing_loop"} CODE_RE = re.compile(r"\b(new_booking|reschedule|price_question|medical_question|general_info|escalate_human)\b") REASON_RE = re.compile(r"escalate_human\|([a-z_]+)") class RouterClient: def __init__( self, api_key: str | None = None, model: str | None = None, base_url: str | None = None, ): self.api_key = api_key or settings.deepseek_api_key self.model = model or settings.deepseek_model self.base_url = (base_url or settings.deepseek_base_url).rstrip("/") def _format_history(self, history: list[dict], last_n: int = 4) -> str: """Короткая история последних реплик — для контекста классификации.""" if not history: return "(предыдущих реплик нет)" tail = history[-last_n:] lines = [] for m in tail: role_ru = "Пациент" if m["role"] == "user" else "Ассистент" content = m["content"].replace("\n", " ")[:300] lines.append(f"{role_ru}: {content}") return "\n".join(lines) def _format_state_hint(self, snapshot: dict | None) -> str: """Блок [ТЕКУЩИЙ СЦЕНАРИЙ] для промпта роутера.""" if not snapshot: return "" intent = snapshot.get("current_intent_code") if not intent: return "" step = snapshot.get("current_step_code") slots = snapshot.get("slots") or {} lines = [ "", "[ТЕКУЩИЙ СЦЕНАРИЙ]", f"Сейчас в диалоге активна ветка: {intent}" + (f", шаг: {step}" if step else ""), ] if slots: import json as _json lines.append(f"Собранные слоты: {_json.dumps(slots, ensure_ascii=False)}") lines.append( "Если новая реплика пациента логично продолжает этот сценарий " "или относится к нему косвенно — предпочитай ту же ветку." ) return "\n".join(lines) async def _get_system_prompt(self, session: AsyncSession) -> tuple[str, int | None]: """Активный промпт роутера из БД (ветка _router). Возвращает (prompt, version_or_None).""" pair = await config_service.get_active_config_by_intent_code( session, intent_service.ROUTER_INTENT_CODE ) if pair is None: return FALLBACK_SYSTEM_PROMPT, None _, cfg = pair return config_service.compose_full_system_prompt(cfg), cfg.version async def classify( self, session: AsyncSession, history: list[dict], text: str, snapshot: dict | None = None, ) -> dict: """Классифицировать реплику. Возвращает {code, version} — версия роутера для отладки. При сомнении или парсинг-ошибке — general_info (безопасный fallback). """ system_prompt, version = await self._get_system_prompt(session) state_hint = self._format_state_hint(snapshot) user_message = ( f"История последних реплик:\n{self._format_history(history)}" f"{state_hint}\n\n" f"Новая реплика пациента:\n{text}\n\n" f"Код ветки:" ) url = f"{self.base_url}/chat/completions" payload = { "model": self.model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_message}, ], "temperature": 0.0, "max_tokens": 20, } data: dict | None = None last_error: Exception | None = None # Один ретрай: DeepSeek иногда отвечает 5xx / пустым исключением. for attempt in range(2): try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( url, json=payload, headers={ "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", }, ) response.raise_for_status() data = response.json() break except Exception as e: last_error = e logger.warning( "Router LLM call failed (attempt %d, %s: %s)", attempt + 1, type(e).__name__, e, ) if attempt < 1: await asyncio.sleep(0.5) if data is None: logger.warning( "Router LLM failed after retries (%s), falling back to general_info", last_error, ) return {"code": "general_info", "version": version} raw = (data["choices"][0]["message"]["content"] or "").strip() match = CODE_RE.search(raw) if match: code = match.group(1) escalation_reason: str | None = None if code == "escalate_human": reason_match = REASON_RE.search(raw) if reason_match and reason_match.group(1) in ESCALATION_REASONS: escalation_reason = reason_match.group(1) else: escalation_reason = "explicit_request" logger.info("Router v%s: %r → %s%s", version, text[:80], code, f"|{escalation_reason}" if escalation_reason else "") return { "code": code, "version": version, "escalation_reason": escalation_reason, "router_assembled_prompt": f"[system]\n{system_prompt}\n\n[user]\n{user_message}", } logger.warning("Router returned unrecognized response %r, falling back to general_info", raw) return {"code": "general_info", "version": version, "escalation_reason": None, "router_assembled_prompt": ""}