Files
AR 15 M4 bb5e3f5eb3 feat(sprint8b): регрессия ответов веток · general_info + фикс PRAGMA foreign_keys
Параллель к 8a, но проверяем не код intent от роутера, а содержимое ответа
конкретной ветки на одиночную реплику. Старт — general_info, 46 кейсов.

Логика pass/fail (для одного кейса):
- A — RAG-секция: среди retrieved-чанков есть кусок с
  section == expected_doc_section (точное совпадение). Если поле не задано —
  пропускаем.
- B — keywords: обязательные expected_keywords встречаются в predicted_answer
  (case-insensitive). По умолчанию все; поддерживаются keywords_min: N
  и keywords_any: true. Запрещённые expected_must_not — ни одного.
- Pass = A ∧ B. Незаданные поля не проверяются.
- Кэш: (text_hash, branch_config_id) → {answer_text, retrieved_sections}.
  Привязан к версии промпта ветки. Смена версии = пустой кэш = свежий прогон.
  Правка JSONL без изменения text → pass/fail пересчитывается без LLM.

Backend:
- Таблицы eval_branch_runs / eval_branch_run_cases / eval_branch_predictions.
  Миграция m9g1f7e89j56.
- services/eval_branch_run_service.py: загрузка JSONL, фоновый прогон через
  asyncio.create_task, кэш, оценка A+B с поддержкой keywords_min/keywords_any.
- chat_service.run_branch_single_turn — изолированный single-turn без
  роутера и треда (использует существующий config_service + vectorstore + llm).
- API: POST /eval/branch-runs, GET /eval/branch-runs?intent_code=,
  GET /eval/branch-runs/{id}, GET /eval/branch-cases-with-status?intent_code=.

UI (static/regression.html):
- Селектор режима «Роутер / Ветка · general_info». Логика пикера переиспользуется
  (фильтры, диапазон, массовый выбор, счётчик «новые / в кэше»).
- Для режима «Ветка»: фильтр по coverage, колонки секция/coverage, keywords,
  частота, кэш. Drill-down прогона: ожидание, retrieved-секции, причины fail,
  полный ответ ветки.

База кейсов (eval/branch_cases_general_info.jsonl) — от пользователя, 46 кейсов
по схеме {text, intent, coverage, expected_doc_section?, expected_keywords?,
expected_must_not?, keywords_min?, keywords_any?, count?, note?}.

Связанная правка SQLite (нашли при удалении документа в этом спринте):
- db/session.py: connect-listener PRAGMA foreign_keys=ON на каждое подключение.
  Без этого ondelete=CASCADE в SQLite не enforced, и удаление документа
  оставляло подписки в intent_documents висячими (что давало пустой RAG
  и fail регрессии).
- Миграция n0h2g8f9a0k67 — одноразовая чистка существующих висячих подписок.

docs/SPRINTS.md: Спринт 8b →  Закрыт. Diff vs предыдущий прогон для веток
и кнопка «Сбросить кэш регрессии» вынесены в docs/BACKLOG.md.

Также включены обновлённые data/datasets/general_info.md и price_question.md
(рабочий материал оператора), и черновик eval/branch_cases_price_question.jsonl
для следующего захода (8b на price_question).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 01:20:59 +05:00

907 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import logging
from datetime import datetime, timezone
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from db.models import IntentStep, Message, Thread
from services import config_service, intent_document_service, intent_step_service, thread_state_service
from services.llm_client import LLMClient, LLMUnavailableError
from services.router_client import RouterClient
from services.state_machine import check_guards, parse_branch_response, validate_transition
from services.vectorstore import VectorStoreService
logger = logging.getLogger(__name__)
HISTORY_LIMIT = 20
FALLBACK_INTENT_CODE = "general_info"
ESCALATE_INTENT_CODE = "escalate_human"
MAX_BOUNCES = 1
HANDOFF_CAP = 3 # столько hard-handoff'ов разрешено за диалог; четвёртое — авто-перевод
SOFT_INSERTION_CAP = 3 # столько «боковых вопросов» подряд терпим, потом возвращаем к шагу
ROUTING_LOOP_REPLY = (
"Уточню детали с администратором клиники, свяжемся с вами "
"в течение ближайшего часа."
)
SOFT_INSERTION_NUDGE = (
"[ВОЗВРАТ К СЦЕНАРИЮ]\n"
"Пациент уже несколько реплик подряд задаёт боковые вопросы, не двигая сценарий. "
"На этой реплике уверенно верни его к вопросу текущего шага одной короткой фразой; "
"не давай развернутого ответа на стороннюю тему."
)
def _auto_thread_name(first_user_text: str) -> str:
preview = first_user_text.strip().replace("\n", " ")
if len(preview) > 60:
preview = preview[:60].rstrip() + ""
stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
return f"{preview} · {stamp}"
def _retrieved_to_sources(retrieved: list[dict]) -> list[dict]:
sources = []
for item in retrieved:
meta = item.get("metadata", {})
sources.append({
"document_id": meta.get("document_id", ""),
"document_name": meta.get("document_name", ""),
"chunk_text": item["text"][:500],
"section": meta.get("section", ""),
"page": meta.get("page_number", 0),
"relevance_score": round(item.get("relevance_score", 0), 3),
})
return sources
def _format_state_context(
snapshot: dict,
current_step: IntentStep | None,
router_hint: str | None = None,
soft_nudge: bool = False,
escalation_reason: str | None = None,
) -> str:
"""Блок с текущим состоянием треда для дописывания в системный промпт."""
slots = snapshot.get("slots", {}) or {}
slots_json = json.dumps(slots, ensure_ascii=False)
lines = ["", "[ТЕКУЩЕЕ СОСТОЯНИЕ]"]
if current_step is not None:
allowed = intent_step_service.parse_allowed_next(current_step)
lines.append(f"step_code: {current_step.code} ({current_step.name})")
lines.append(f"allowed_next: {json.dumps(allowed, ensure_ascii=False)}")
else:
lines.append("step_code: —")
lines.append(f"slots: {slots_json}")
if escalation_reason:
lines.append(f"escalation_reason: {escalation_reason}")
if router_hint:
lines.append("")
lines.append("[ПОДСКАЗКА РОУТЕРА]")
lines.append(router_hint)
if soft_nudge:
lines.append("")
lines.append(SOFT_INSERTION_NUDGE)
return "\n" + "\n".join(lines)
def _build_operator_summary(
reason: str | None,
history: list[dict],
slots: dict,
suspended_slots: dict,
) -> dict:
"""Саммари для оператора при передаче диалога."""
combined_slots = {**suspended_slots, **slots}
summary = {
"reason": reason or "explicit_request",
"slots": combined_slots,
"history_tail": [
{"role": m["role"], "text": m["content"][:300]}
for m in history[-8:]
],
}
return summary
async def _resolve_intent_with_fallback(
session: AsyncSession, intent_code: str
) -> tuple[str, object, object]:
pair = await config_service.get_active_config_by_intent_code(session, intent_code)
if pair is None:
logger.warning("Intent %r has no active config, falling back to %s", intent_code, FALLBACK_INTENT_CODE)
pair = await config_service.get_active_config_by_intent_code(session, FALLBACK_INTENT_CODE)
if pair is None:
raise RuntimeError(f"No active config for fallback intent {FALLBACK_INTENT_CODE!r}")
intent, cfg = pair
return FALLBACK_INTENT_CODE, intent, cfg
intent, cfg = pair
return intent_code, intent, cfg
async def _resolve_current_step(
session: AsyncSession, intent_id: int, intent_code: str, step_code: str | None,
) -> IntentStep | None:
"""Найти шаг state machine для текущего состояния. Если кода нет — взять первый шаг ветки."""
if not intent_step_service.has_state_machine(intent_code):
return None
if step_code:
step = await intent_step_service.get_step_by_code(session, intent_id, step_code)
if step is not None:
return step
logger.warning("Step %r not found for intent %s, falling back to first step", step_code, intent_code)
return await intent_step_service.get_first_step(session, intent_id)
def _eval_pending_guard(
current_step: "IntentStep | None",
slots: dict,
) -> "dict | None":
"""Возвращает описание активного guard'а если триггер сработал, но слоты ещё не заполнены."""
if current_step is None:
return None
guards = intent_step_service.parse_guards(current_step)
if not guards:
return None
for guard_name, guard_def in guards.items():
if not isinstance(guard_def, dict):
continue
trigger_slot = guard_def.get("trigger_slot")
trigger_value = guard_def.get("trigger_value")
required_slots: list[str] = guard_def.get("required_slots", [])
slot_val = slots.get(trigger_slot) if trigger_slot else None
if isinstance(slot_val, str) and slot_val.lower() in ("true", "false"):
slot_val = slot_val.lower() == "true"
triggered = (trigger_slot is None) or (slot_val == trigger_value)
if not triggered:
continue
missing = [s for s in required_slots if not slots.get(s)]
if missing:
return {
"guard_name": guard_name,
"description": guard_def.get("description", ""),
"missing_slots": missing,
}
return None
async def run_branch_single_turn(
session: AsyncSession,
vectorstore: VectorStoreService,
llm: LLMClient,
intent_code: str,
text: str,
*,
top_k: int = 5,
temperature: float = 0.0,
) -> dict:
"""Single-turn запрос к ветке для регрессии (Спринт 8b).
Изолированно от обычного `send_message`: без роутера, без треда, без
state machine. Просто берём активный промпт ветки + RAG-чанки по
подпискам + LLM. Возвращаем `{answer_text, retrieved, branch_config_id,
branch_config_version, retrieved_sections}`.
"""
pair = await config_service.get_active_config_by_intent_code(session, intent_code)
if pair is None:
raise RuntimeError(f"No active config for intent {intent_code!r}")
intent, active_cfg = pair
subscribed_document_ids = await intent_document_service.list_documents_for_intent_code(
session, intent_code,
)
retrieved = vectorstore.query(
query_text=text,
top_k=top_k,
document_ids=subscribed_document_ids,
)
base_prompt = config_service.compose_full_system_prompt(active_cfg)
llm_result = await llm.chat(
question=text,
sources=retrieved,
history=[],
system_prompt=base_prompt,
temperature=temperature,
)
parsed = parse_branch_response(llm_result["text"])
answer_text = parsed["visible_text"] or llm_result["text"]
retrieved_sections = []
for r in retrieved or []:
meta = r.get("metadata") or {}
section = meta.get("section") or ""
document_name = meta.get("document_name") or ""
retrieved_sections.append({"section": section, "document_name": document_name})
return {
"answer_text": answer_text,
"retrieved": retrieved or [],
"retrieved_sections": retrieved_sections,
"branch_config_id": active_cfg.id,
"branch_config_version": active_cfg.version,
}
async def send_message(
session: AsyncSession,
vectorstore: VectorStoreService,
llm: LLMClient,
router: RouterClient,
text: str,
thread_id: int | None = None,
top_k: int = 5,
temperature: float | None = None,
max_tokens: int | None = None,
) -> dict:
"""Обработать реплику пациента: роутер → state machine → LLM → ответ.
Важно: коммит транзакции делается только в самом конце. Если LLM упадёт —
rollback в роутере откатит thread + user_msg, чтобы «пустые» диалоги без
ответа ассистента не висели в списке.
"""
if thread_id is None:
thread = Thread(name=_auto_thread_name(text))
session.add(thread)
await session.flush()
else:
thread = await session.get(Thread, thread_id)
if thread is None:
raise LookupError(f"Thread {thread_id} not found")
user_msg = Message(thread_id=thread.id, role="user", text=text)
session.add(user_msg)
await session.flush() # только flush, без commit — чтобы откатить при ошибке LLM
stmt = (
select(Message)
.where(Message.thread_id == thread.id, Message.id != user_msg.id)
.order_by(Message.created_at.desc(), Message.id.desc())
.limit(HISTORY_LIMIT)
)
rows = (await session.execute(stmt)).scalars().all()
history = [{"role": m.role, "content": m.text} for m in reversed(rows)]
# 1a. Снимок состояния — нужен роутеру, чтобы предпочитать текущую ветку.
snapshot = await thread_state_service.load_snapshot(session, thread.id)
# 1b. Роутер — куда направляем.
routing = await router.classify(session=session, history=history, text=text, snapshot=snapshot)
router_code = routing["code"]
router_version = routing.get("version")
escalation_reason: str | None = routing.get("escalation_reason")
router_assembled_prompt: str = routing.get("router_assembled_prompt", "")
# 2. Логика выбора effective_code:
# 2.1. Если есть suspended_intent и роутер вернулся в него — RESUME: восстанавливаем
# прерванный сценарий, очищаем suspended_*, handoff_count=0.
# 2.2. Иначе если диалог идёт по sm-ветке и роутер предлагает другую — sticky:
# НЕ сбрасываем state, передаём LLM [ПОДСКАЗКА РОУТЕРА].
# 2.3. Иначе если prev — не-sm и роутер ведёт в другую ветку — hard-handoff.
prev_intent_code = snapshot["current_intent_code"]
handoff_count = snapshot.get("handoff_count", 0)
soft_insertion_count = snapshot.get("soft_insertion_count", 0)
suspended_intent = snapshot.get("suspended_intent")
resumable_step_code = snapshot.get("resumable_step_code")
resumable_slots = snapshot.get("resumable_slots", {}) or {}
router_hint: str | None = None
effective_code = router_code
routing_loop_triggered = False
resumed_from_suspended = False
if suspended_intent and suspended_intent == router_code and prev_intent_code != suspended_intent:
logger.info(
"Resume from suspended in thread %d: %s (step=%s, %d slots)",
thread.id, suspended_intent, resumable_step_code, len(resumable_slots),
)
snapshot = {
"current_intent_code": suspended_intent,
"current_step": 0,
"current_step_code": resumable_step_code,
"slots": dict(resumable_slots),
"handoff_count": 0,
"soft_insertion_count": 0,
"suspended_intent": None,
"resumable_step_code": None,
"resumable_slots": {},
}
prev_intent_code = suspended_intent
handoff_count = 0
soft_insertion_count = 0
suspended_intent = None
resumable_step_code = None
resumable_slots = {}
effective_code = snapshot["current_intent_code"]
resumed_from_suspended = True
elif prev_intent_code and prev_intent_code != router_code:
if intent_step_service.has_state_machine(prev_intent_code):
logger.info(
"Router suggested %s but thread %d is in sm %s — sticky, hint only",
router_code, thread.id, prev_intent_code,
)
router_hint = (
f"Роутер на этой реплике счёл, что тема — `{router_code}`. "
f"Ты сейчас ведёшь сценарий `{prev_intent_code}`. "
f"Если пациент действительно сменил тему (перенос, цены, острое состояние) — "
f"выдай `[INTENT_CHANGE: {router_code}]`. "
f"Если реплика укладывается в сценарий (повод/жалоба/имя) — "
f"зафиксируй её в соответствующий слот и продолжай по сценарию."
)
effective_code = prev_intent_code
else:
# Реальный hard-handoff: prev — не sm-ветка, роутер ведёт.
logger.info(
"Router switched intent for thread %d: %s%s (state reset)",
thread.id, prev_intent_code, router_code,
)
handoff_count += 1
soft_insertion_count = 0
snapshot = {
"current_intent_code": router_code,
"current_step": 0,
"current_step_code": None,
"slots": {},
"handoff_count": handoff_count,
"soft_insertion_count": 0,
# suspended_* не трогаем — там может лежать прерванная sm-ветка,
# к которой пациент ещё захочет вернуться.
"suspended_intent": suspended_intent,
"resumable_step_code": resumable_step_code,
"resumable_slots": resumable_slots,
}
# 2b. Защита от петли (v2 §4.3): если за диалог накопилось много handoff'ов и
# сейчас ещё одно переключение — забираем диалог в escalate_human с заглушкой,
# без вызова LLM. После авто-эскалации сбрасываем handoff_count и suspended_*
# (диалог переходит к оператору, прерванный сценарий не продолжаем).
if handoff_count > HANDOFF_CAP and effective_code != ESCALATE_INTENT_CODE:
logger.warning(
"Routing loop guard tripped for thread %d (handoff_count=%d), forcing %s",
thread.id, handoff_count, ESCALATE_INTENT_CODE,
)
effective_code = ESCALATE_INTENT_CODE
snapshot = {
"current_intent_code": ESCALATE_INTENT_CODE,
"current_step": 0,
"current_step_code": None,
"slots": {},
"handoff_count": 0,
"soft_insertion_count": 0,
"suspended_intent": None,
"resumable_step_code": None,
"resumable_slots": {},
}
handoff_count = 0
soft_insertion_count = 0
suspended_intent = None
resumable_step_code = None
resumable_slots = {}
routing_loop_triggered = True
escalation_reason = "routing_loop"
# 3. Разрешаем ветку (с fallback) и шаг.
served_code, intent, active_cfg = await _resolve_intent_with_fallback(session, effective_code)
if served_code != effective_code:
snapshot = {
"current_intent_code": served_code,
"current_step": 0,
"current_step_code": None,
"slots": {},
"handoff_count": handoff_count,
"soft_insertion_count": 0,
"suspended_intent": suspended_intent,
"resumable_step_code": resumable_step_code,
"resumable_slots": resumable_slots,
}
soft_insertion_count = 0
router_hint = None
# Финализируем snapshot.current_intent_code на served_code: для не-sm-веток
# (general_info / price_question / ...) state_update от LLM не приходит, и без
# этого snapshot["current_intent_code"] осталось бы None для нового треда —
# тогда на следующей реплике prev_intent_code не определится и handoff_count
# не инкрементится.
snapshot["current_intent_code"] = served_code
# Подписки активной ветки на документы (Спринт 7, часть A — мульти-RAG).
# Дефолт пустой подписки — пустой список, т.е. retrieval вернёт 0 чанков.
# Это сознательный выбор: пустая подписка = ветка не настроена, подмешивать
# случайное хуже, чем не подмешивать ничего. См. SPRINTS.md, Спринт 7.
subscribed_document_ids = await intent_document_service.list_documents_for_intent_code(
session, served_code,
)
retrieved = vectorstore.query(
query_text=text,
top_k=top_k,
document_ids=subscribed_document_ids,
)
sources = _retrieved_to_sources(retrieved)
bounce_log: list[dict] = []
validation_events: list[dict] = [] # illegal transitions для UI-подсветки
last_assembled_prompt = ""
visible_text = ""
parse_error: str | None = None
is_state_machine = False
parsed: dict | None = None # инициализируем заранее: routing_loop guard может пропустить for-цикл
# Если уже сработала защита от петли — не зовём LLM, формируем заглушку.
if routing_loop_triggered:
visible_text = ROUTING_LOOP_REPLY
last_assembled_prompt = (
"[ROUTING LOOP GUARD]\n"
f"handoff_count превысил {HANDOFF_CAP}, диалог автоматически уведён в "
f"{ESCALATE_INTENT_CODE}. LLM не вызывался."
)
else:
for attempt in range(MAX_BOUNCES + 1):
current_step = await _resolve_current_step(
session, intent.id, served_code, snapshot.get("current_step_code"),
)
is_state_machine = current_step is not None
if current_step is not None and snapshot.get("current_step_code") != current_step.code:
snapshot["current_step_code"] = current_step.code
base_prompt = config_service.compose_full_system_prompt(active_cfg)
step_prompt = f"\n\n{current_step.system_prompt}" if current_step else ""
soft_nudge = is_state_machine and soft_insertion_count >= SOFT_INSERTION_CAP
state_context = _format_state_context(
snapshot, current_step, router_hint, soft_nudge,
escalation_reason=escalation_reason if served_code == ESCALATE_INTENT_CODE else None,
)
system_prompt = base_prompt + step_prompt + state_context
llm_result = await llm.chat(
question=text,
sources=retrieved,
history=history,
system_prompt=system_prompt,
temperature=temperature,
max_tokens=max_tokens,
)
last_assembled_prompt = llm_result["assembled_prompt"]
parsed = parse_branch_response(llm_result["text"])
visible_text = parsed["visible_text"] or llm_result["text"]
# STATE_JSON-блок ждём только от state-machine-веток. У остальных
# «no STATE_JSON» — ожидаемое состояние, не ошибка.
parse_error = parsed["parse_error"] if is_state_machine else None
if parsed["intent_change"] and attempt < MAX_BOUNCES:
new_code = parsed["intent_change"]
bounce_log.append({
"from": served_code,
"to": new_code,
"preface": parsed["visible_text"],
})
logger.info(
"Intent bounce in thread %d: %s%s", thread.id, served_code, new_code,
)
# Если уходим из sm-ветки и suspended_* ещё свободно — запоминаем,
# чтобы вернуться к прерванному сценарию, когда роутер увидит,
# что пациент возвращается к теме (см. блок 2.1 в начале send_message).
if (
is_state_machine
and current_step is not None
and not suspended_intent
and new_code != served_code
):
suspended_intent = served_code
resumable_step_code = current_step.code
resumable_slots = dict(snapshot.get("slots", {}))
logger.info(
"Suspending sm scenario for thread %d: %s (step=%s, %d slots)",
thread.id, suspended_intent, resumable_step_code, len(resumable_slots),
)
handoff_count += 1
# Защита от петли работает и здесь — на bouncing'е.
if handoff_count > HANDOFF_CAP and new_code != ESCALATE_INTENT_CODE:
logger.warning(
"Routing loop guard tripped on bounce in thread %d (handoff_count=%d)",
thread.id, handoff_count,
)
served_code, intent, active_cfg = await _resolve_intent_with_fallback(
session, ESCALATE_INTENT_CODE,
)
snapshot = {
"current_intent_code": served_code,
"current_step": 0,
"current_step_code": None,
"slots": {},
"handoff_count": 0,
"soft_insertion_count": 0,
"suspended_intent": None,
"resumable_step_code": None,
"resumable_slots": {},
}
handoff_count = 0
soft_insertion_count = 0
suspended_intent = None
resumable_step_code = None
resumable_slots = {}
visible_text = ROUTING_LOOP_REPLY
last_assembled_prompt = (
"[ROUTING LOOP GUARD]\n"
f"handoff_count превысил {HANDOFF_CAP} на bouncing'е, "
f"диалог автоматически уведён в {ESCALATE_INTENT_CODE}."
)
routing_loop_triggered = True
parse_error = None
is_state_machine = False
parsed = {"visible_text": visible_text, "intent_change": None, "state_update": None, "parse_error": None}
break
served_code, intent, active_cfg = await _resolve_intent_with_fallback(session, new_code)
soft_insertion_count = 0 # новая ветка — счётчик с нуля
snapshot = {
"current_intent_code": served_code,
"current_step": 0,
"current_step_code": None,
"slots": {},
"handoff_count": handoff_count,
"soft_insertion_count": 0,
"suspended_intent": suspended_intent,
"resumable_step_code": resumable_step_code,
"resumable_slots": resumable_slots,
}
router_hint = None # новая ветка — подсказка больше неактуальна
continue
if parsed["state_update"] is not None and current_step is not None:
requested = parsed["state_update"]["state_after"]
soft_insertion_flag = bool(parsed["state_update"].get("soft_insertion", False))
allowed = intent_step_service.parse_allowed_next(current_step)
ok, reason = validate_transition(
current_step=current_step.code,
requested_step=requested,
allowed_next=allowed,
)
slots_updated = parsed["state_update"]["slots_updated"]
merged_slots = {**snapshot.get("slots", {}), **slots_updated}
guard_name: str | None = None
missing_slots: list[str] = []
guard_desc: str | None = None
if ok:
guards = intent_step_service.parse_guards(current_step)
guard_ok, guard_name, missing_slots, guard_desc = check_guards(
current_step_code=current_step.code,
requested_step_code=requested,
slots=merged_slots,
guards=guards,
)
if not guard_ok:
ok = False
reason = (
f"guard {guard_name!r} не пройден: не хватает слотов {missing_slots}"
)
# Решаем, как изменился soft_insertion_count.
stayed_on_step = ok and requested == current_step.code
if soft_insertion_flag and stayed_on_step and not slots_updated:
soft_insertion_count += 1
else:
soft_insertion_count = 0
base_state = {
"current_intent_code": served_code,
"slots": merged_slots,
"handoff_count": handoff_count,
"soft_insertion_count": soft_insertion_count,
"suspended_intent": suspended_intent,
"resumable_step_code": resumable_step_code,
"resumable_slots": resumable_slots,
}
if ok:
snapshot = {
**base_state,
"current_step": snapshot["current_step"] + (1 if requested != current_step.code else 0),
"current_step_code": requested,
}
else:
logger.warning(
"Blocked state_after in thread %d (%s): %s", thread.id, served_code, reason,
)
event: dict = {
"current_step": current_step.code,
"requested_step": requested,
"reason": reason,
}
if guard_name:
event["guard_name"] = guard_name
event["missing_slots"] = missing_slots
event["guard_description"] = guard_desc or ""
validation_events.append(event)
# Слоты всё равно мёржим (информация полезная), шаг не двигаем.
snapshot = {
**base_state,
"current_step": snapshot["current_step"],
"current_step_code": current_step.code,
}
elif parsed["state_update"] is None and current_step is not None and parse_error:
logger.warning(
"State machine branch %s returned no STATE_JSON: %s", served_code, parse_error,
)
break
# 4. Сохраняем: thread_state пишется ПОСЛЕ всей логики, коммит — единой транзакцией.
await thread_state_service.upsert(
session, thread.id,
intent_code=snapshot["current_intent_code"],
step=snapshot["current_step"],
step_code=snapshot.get("current_step_code"),
slots=snapshot["slots"],
handoff_count=snapshot.get("handoff_count", handoff_count),
soft_insertion_count=snapshot.get("soft_insertion_count", soft_insertion_count),
suspended_intent=snapshot.get("suspended_intent"),
resumable_step_code=snapshot.get("resumable_step_code"),
resumable_slots=snapshot.get("resumable_slots"),
)
user_msg.intent_id = intent.id
if thread.agent_config_id is None:
thread.agent_config_id = active_cfg.id
# Собираем мета-снимок реплики: что увидит UI рядом с бейджем ветки.
events: list[str] = []
if routing_loop_triggered:
events.append("routing_loop")
if resumed_from_suspended:
events.append("resumed")
if bounce_log:
events.append("hard_handoff")
if router_hint and not routing_loop_triggered and not bounce_log:
events.append("sticky")
if validation_events:
events.append("validation_blocked")
# soft_insertion: ветка явно пометила ответ боковым (см. парсер state_update).
last_state_update = parsed.get("state_update") if isinstance(parsed, dict) else None
if last_state_update and last_state_update.get("soft_insertion"):
events.append("soft_insertion")
meta = {
"router_intent_code": router_code,
"served_intent_code": served_code,
"step_code": snapshot.get("current_step_code"),
"step_name": current_step.name if current_step else None,
"is_state_machine": is_state_machine,
"events": events,
}
# Саммари для оператора — формируется при передаче в escalate_human.
operator_summary: dict | None = None
if served_code == ESCALATE_INTENT_CODE:
operator_summary = _build_operator_summary(
reason=escalation_reason,
history=history,
slots=snapshot.get("slots", {}),
suspended_slots=snapshot.get("resumable_slots", {}),
)
logger.info(
"Operator summary for thread %d: %s",
thread.id, json.dumps(operator_summary, ensure_ascii=False),
)
assistant_msg = Message(
thread_id=thread.id,
role="assistant",
text=visible_text,
sources_json=json.dumps(sources, ensure_ascii=False),
assembled_prompt=last_assembled_prompt,
intent_id=intent.id,
meta_json=json.dumps(meta, ensure_ascii=False),
escalation_reason=escalation_reason if served_code == ESCALATE_INTENT_CODE else None,
)
session.add(assistant_msg)
thread.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(assistant_msg)
await session.refresh(thread)
logger.info(
"Chat: thread=%d, router=%s, served=%s (v%d), step=%s, slots=%d keys, "
"bounces=%d, validation=%d, handoff=%d, routing_loop=%s",
thread.id, router_code, served_code, active_cfg.version,
snapshot.get("current_step_code") or "-",
len(snapshot["slots"]),
len(bounce_log),
len(validation_events),
snapshot.get("handoff_count", 0),
routing_loop_triggered,
)
return {
"thread_id": thread.id,
"thread_name": thread.name,
"message_id": assistant_msg.id,
"intent_code": intent.code,
"intent_name": intent.name,
"router_intent_code": router_code,
"config_version": active_cfg.version,
"router_version": router_version,
"answer": visible_text,
"sources": sources,
"model_used": llm.model,
"assembled_prompt": last_assembled_prompt,
"thread_state": {
"current_intent_code": snapshot["current_intent_code"],
"current_step": snapshot["current_step"],
"current_step_code": snapshot.get("current_step_code"),
"slots": snapshot["slots"],
"handoff_count": snapshot.get("handoff_count", handoff_count),
"soft_insertion_count": snapshot.get("soft_insertion_count", soft_insertion_count),
"suspended_intent": snapshot.get("suspended_intent"),
"resumable_step_code": snapshot.get("resumable_step_code"),
"resumable_slots": snapshot.get("resumable_slots", {}),
"pending_guard": _eval_pending_guard(current_step, snapshot["slots"]),
},
"bounces": bounce_log,
"validation_events": validation_events,
"parse_error": parse_error,
"routing_loop_triggered": routing_loop_triggered,
"resumed_from_suspended": resumed_from_suspended,
"message_meta": meta,
"escalation_reason": escalation_reason if served_code == ESCALATE_INTENT_CODE else None,
"operator_summary": operator_summary,
"router_assembled_prompt": router_assembled_prompt,
"rag_subscription": {
"subscribed_count": len(subscribed_document_ids),
"found_count": len(retrieved),
},
}
async def list_threads(session: AsyncSession) -> list[dict]:
count_subq = (
select(Message.thread_id, func.count(Message.id).label("cnt"))
.group_by(Message.thread_id)
.subquery()
)
first_msg_subq = (
select(Message.thread_id, func.min(Message.id).label("first_id"))
.where(Message.role == "user")
.group_by(Message.thread_id)
.subquery()
)
stmt = (
select(
Thread,
func.coalesce(count_subq.c.cnt, 0).label("messages_count"),
Message.text.label("first_text"),
)
.outerjoin(count_subq, count_subq.c.thread_id == Thread.id)
.outerjoin(first_msg_subq, first_msg_subq.c.thread_id == Thread.id)
.outerjoin(Message, Message.id == first_msg_subq.c.first_id)
.order_by(Thread.updated_at.desc())
)
rows = (await session.execute(stmt)).all()
result = []
for thread, messages_count, first_text in rows:
preview = (first_text or "").strip().replace("\n", " ")
if len(preview) > 120:
preview = preview[:120].rstrip() + ""
result.append({
"id": thread.id,
"name": thread.name,
"created_at": thread.created_at.isoformat(),
"updated_at": thread.updated_at.isoformat(),
"messages_count": messages_count,
"first_message_preview": preview,
})
return result
async def get_thread_detail(session: AsyncSession, thread_id: int) -> dict | None:
from db.models import Intent
thread = await session.get(Thread, thread_id)
if thread is None:
return None
stmt = (
select(Message, Intent.code, Intent.name)
.outerjoin(Intent, Intent.id == Message.intent_id)
.where(Message.thread_id == thread_id)
.order_by(Message.created_at)
)
rows = (await session.execute(stmt)).all()
# Lookup для обогащения старых meta: (intent_id, step_code) -> step_name
# Имена шагов берём только из активного графа: на исторических сообщениях
# отображается текущая версия имени, архивные графы (Спринт 7.7) не считаем.
from db.models import IntentStepGraph
step_rows = (await session.execute(
select(IntentStep.intent_id, IntentStep.code, IntentStep.name)
.join(IntentStepGraph, IntentStepGraph.id == IntentStep.graph_id)
.where(IntentStepGraph.is_active.is_(True))
)).all()
step_name_lookup: dict[tuple, str] = {(iid, sc): sn for iid, sc, sn in step_rows}
messages = []
for m, intent_code, intent_name in rows:
sources = []
if m.sources_json:
try:
sources = json.loads(m.sources_json)
except json.JSONDecodeError:
logger.warning("Bad sources_json for message %d", m.id)
meta = None
if m.meta_json:
try:
meta = json.loads(m.meta_json)
except json.JSONDecodeError:
logger.warning("Bad meta_json for message %d", m.id)
# Обогащаем meta полями, которых не было в старых сообщениях
if meta and meta.get("step_code"):
if "step_name" not in meta and m.intent_id:
meta["step_name"] = step_name_lookup.get((m.intent_id, meta["step_code"]))
if "is_state_machine" not in meta:
meta["is_state_machine"] = True
messages.append({
"id": m.id,
"role": m.role,
"text": m.text,
"created_at": m.created_at.isoformat(),
"sources": sources,
"assembled_prompt": m.assembled_prompt or "",
"intent_code": intent_code or "",
"intent_name": intent_name or "",
"meta": meta,
"escalation_reason": m.escalation_reason,
})
state = await thread_state_service.load_snapshot(session, thread_id)
# Вычисляем pending_guard для текущего состояния треда
pending_guard = None
if state.get("current_step_code") and state.get("current_intent_code"):
from db.models import Intent as _Intent
intent_obj = (await session.execute(
select(_Intent).where(_Intent.code == state["current_intent_code"])
)).scalar_one_or_none()
if intent_obj:
cur_step = await intent_step_service.get_step_by_code(
session, intent_obj.id, state["current_step_code"]
)
pending_guard = _eval_pending_guard(cur_step, state.get("slots", {}))
state["pending_guard"] = pending_guard
return {
"id": thread.id,
"name": thread.name,
"created_at": thread.created_at.isoformat(),
"updated_at": thread.updated_at.isoformat(),
"messages": messages,
"thread_state": state,
}
async def rename_thread(session: AsyncSession, thread_id: int, name: str) -> dict | None:
thread = await session.get(Thread, thread_id)
if thread is None:
return None
thread.name = name
thread.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(thread)
return {
"id": thread.id,
"name": thread.name,
"created_at": thread.created_at.isoformat(),
"updated_at": thread.updated_at.isoformat(),
"messages_count": 0,
"first_message_preview": "",
}
async def delete_thread(session: AsyncSession, thread_id: int) -> int | None:
thread = await session.get(Thread, thread_id)
if thread is None:
return None
count_stmt = select(func.count(Message.id)).where(Message.thread_id == thread_id)
messages_count = (await session.execute(count_stmt)).scalar_one() or 0
await session.execute(delete(Message).where(Message.thread_id == thread_id))
await session.delete(thread)
await session.commit()
return int(messages_count)