Files
RAG_helper/services/chat_service.py
T
AR 15 M4 52b46bc53e feat(sprint6c+sprint7): терминология, сверка примеров с кодом, мульти-RAG (часть A)
Спринт 6c — терминология и сверка документации с реальным кодом:
- Словарь терминов в static/docs.html: «маршрутизатор» вместо «роутер»,
  «защитное условие» вместо «guard», «пошаговая ветка» вместо «многошаговая».
  Разделены концепты «намерение» (intent) и «ветка» (branch) с пометкой,
  что в коде они хранятся как одна сущность 1:1.
- Песочница: «Решение маршрутизатора» виден всегда (зелёный/жёлтый),
  счётчик переключений «N из 3» отдельной плашкой, бейджи под словарь.
- Настройки: «Условия перехода» → «Защитные условия (guards, JSON)».
- GRAPH_ARCHITECTURE_v4.md: имена полей thread_state и слоты приведены
  к реальной БД (db/models/thread_state.py) и таксономии промптов шагов
  (prompts/intents/new_booking/steps/). Ссылки на *_v2 примеры. На v3
  поставлена шапка «устарело».
- 4 примера переписаны как *_v2: реальные current_intent_code/
  current_step_code/slots_json, реальные allowed_next без двойных переходов,
  реальная таксономия слотов name/reason/specialist/preferred_time/confirmed.
  Удалены вымышленные CRM tool calls и слоты, которых нет в коде.
- static/example.html — параметризованная страница с навигацией между
  4 примерами; роут GET /api/docs/examples/{name} в main.py отдаёт
  markdown без дублирования файлов.
- Редактирование документов в Отладке: GET/PUT /documents/{id}/raw,
  textarea с переразметкой и обновлением Chroma при сохранении.

Спринт 7, часть A — мульти-RAG через подписку ветка↔документы:
- Миграция: таблица intent_documents (M:N), модель IntentDocument,
  индекс по document_id для обратного поиска.
- API: GET/PUT /intents/{code}/documents и GET/PUT /documents/{id}/intents
  с PUT-семантикой «полный список», атомарно. Сервис
  services/intent_document_service.py.
- Retrieval-фильтр в chat_service: подтягивает document_ids активной
  ветки и передаёт в vectorstore.query(). Дефолт пустой подписки —
  document_ids=[] (= 0 чанков), не «вся коллекция»: пустая подписка
  означает «ветка не настроена», подмешивать случайное хуже, чем
  ничего. vectorstore.query() различает None (нет фильтра) и [] (0).
- UI Настроек: блок «Документы базы знаний» в правом сайдбаре,
  всегда видим независимо от вкладки, сортировка по имени, счётчик
  «N из M», PUT при сохранении.
- UI Отладки: третья кнопка «привязка» рядом с «удалить» —
  раскрывашка со списком веток (галочки), быстрая привязка прямо
  на странице загрузки.
- Песочница: блок «Срез RAG» с подпиской/найдено, ворнинг при пустой
  подписке. Поле rag_subscription в QueryResponse и ChatResponse.
- Системный промпт страницы Отладки переехал в обычную ветку _debug
  («Страница отладки»). Удалён prompts/system_prompt.md и логика
  DEFAULT_SYSTEM_PROMPT в llm_client. routers/query.py подтягивает
  активный конфиг ветки _debug и её подписки. Дефолт пустой подписки
  для _debug — None (вся коллекция), не [] как для пациентских — чтобы
  Отладка работала «из коробки». На странице Отладки info-bar показывает
  активную версию и счётчик подписок, ссылка → Настройки.
- Тест-блок «Тест-вопрос» в центре Настроек: расширил /query
  параметрами intent_code (default _debug), system_prompt (override
  для теста черновика из textarea), disable_rag (для _router).
  Редактор промпта обёрнут в <details open> — можно свернуть до
  одной строки. Под ним — три колонки результата (RAG / промпт /
  ответ). Для _router показывается подсказка про отсутствие RAG.

Документы:
- data/datasets/*.md — наработки по 6 веткам (рабочие материалы оператора).
- docs/BRANCH_MAP_AND_PROMPTS_v1.md, docs/OPTIMIZATION_CONVERSION_v1.md,
  docs/guides/state_machine_and_slots.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 20:00:44 +05:00

842 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import logging
from datetime import datetime, timezone
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from db.models import IntentStep, Message, Thread
from services import config_service, intent_document_service, intent_step_service, thread_state_service
from services.llm_client import LLMClient, LLMUnavailableError
from services.router_client import RouterClient
from services.state_machine import check_guards, parse_branch_response, validate_transition
from services.vectorstore import VectorStoreService
logger = logging.getLogger(__name__)
HISTORY_LIMIT = 20
FALLBACK_INTENT_CODE = "general_info"
ESCALATE_INTENT_CODE = "escalate_human"
MAX_BOUNCES = 1
HANDOFF_CAP = 3 # столько hard-handoff'ов разрешено за диалог; четвёртое — авто-перевод
SOFT_INSERTION_CAP = 3 # столько «боковых вопросов» подряд терпим, потом возвращаем к шагу
ROUTING_LOOP_REPLY = (
"Уточню детали с администратором клиники, свяжемся с вами "
"в течение ближайшего часа."
)
SOFT_INSERTION_NUDGE = (
"[ВОЗВРАТ К СЦЕНАРИЮ]\n"
"Пациент уже несколько реплик подряд задаёт боковые вопросы, не двигая сценарий. "
"На этой реплике уверенно верни его к вопросу текущего шага одной короткой фразой; "
"не давай развернутого ответа на стороннюю тему."
)
def _auto_thread_name(first_user_text: str) -> str:
preview = first_user_text.strip().replace("\n", " ")
if len(preview) > 60:
preview = preview[:60].rstrip() + ""
stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
return f"{preview} · {stamp}"
def _retrieved_to_sources(retrieved: list[dict]) -> list[dict]:
sources = []
for item in retrieved:
meta = item.get("metadata", {})
sources.append({
"document_id": meta.get("document_id", ""),
"document_name": meta.get("document_name", ""),
"chunk_text": item["text"][:500],
"section": meta.get("section", ""),
"page": meta.get("page_number", 0),
"relevance_score": round(item.get("relevance_score", 0), 3),
})
return sources
def _format_state_context(
snapshot: dict,
current_step: IntentStep | None,
router_hint: str | None = None,
soft_nudge: bool = False,
escalation_reason: str | None = None,
) -> str:
"""Блок с текущим состоянием треда для дописывания в системный промпт."""
slots = snapshot.get("slots", {}) or {}
slots_json = json.dumps(slots, ensure_ascii=False)
lines = ["", "[ТЕКУЩЕЕ СОСТОЯНИЕ]"]
if current_step is not None:
allowed = intent_step_service.parse_allowed_next(current_step)
lines.append(f"step_code: {current_step.code} ({current_step.name})")
lines.append(f"allowed_next: {json.dumps(allowed, ensure_ascii=False)}")
else:
lines.append("step_code: —")
lines.append(f"slots: {slots_json}")
if escalation_reason:
lines.append(f"escalation_reason: {escalation_reason}")
if router_hint:
lines.append("")
lines.append("[ПОДСКАЗКА РОУТЕРА]")
lines.append(router_hint)
if soft_nudge:
lines.append("")
lines.append(SOFT_INSERTION_NUDGE)
return "\n" + "\n".join(lines)
def _build_operator_summary(
reason: str | None,
history: list[dict],
slots: dict,
suspended_slots: dict,
) -> dict:
"""Саммари для оператора при передаче диалога."""
combined_slots = {**suspended_slots, **slots}
summary = {
"reason": reason or "explicit_request",
"slots": combined_slots,
"history_tail": [
{"role": m["role"], "text": m["content"][:300]}
for m in history[-8:]
],
}
return summary
async def _resolve_intent_with_fallback(
session: AsyncSession, intent_code: str
) -> tuple[str, object, object]:
pair = await config_service.get_active_config_by_intent_code(session, intent_code)
if pair is None:
logger.warning("Intent %r has no active config, falling back to %s", intent_code, FALLBACK_INTENT_CODE)
pair = await config_service.get_active_config_by_intent_code(session, FALLBACK_INTENT_CODE)
if pair is None:
raise RuntimeError(f"No active config for fallback intent {FALLBACK_INTENT_CODE!r}")
intent, cfg = pair
return FALLBACK_INTENT_CODE, intent, cfg
intent, cfg = pair
return intent_code, intent, cfg
async def _resolve_current_step(
session: AsyncSession, intent_id: int, intent_code: str, step_code: str | None,
) -> IntentStep | None:
"""Найти шаг state machine для текущего состояния. Если кода нет — взять первый шаг ветки."""
if not intent_step_service.has_state_machine(intent_code):
return None
if step_code:
step = await intent_step_service.get_step_by_code(session, intent_id, step_code)
if step is not None:
return step
logger.warning("Step %r not found for intent %s, falling back to first step", step_code, intent_code)
return await intent_step_service.get_first_step(session, intent_id)
def _eval_pending_guard(
current_step: "IntentStep | None",
slots: dict,
) -> "dict | None":
"""Возвращает описание активного guard'а если триггер сработал, но слоты ещё не заполнены."""
if current_step is None:
return None
guards = intent_step_service.parse_guards(current_step)
if not guards:
return None
for guard_name, guard_def in guards.items():
if not isinstance(guard_def, dict):
continue
trigger_slot = guard_def.get("trigger_slot")
trigger_value = guard_def.get("trigger_value")
required_slots: list[str] = guard_def.get("required_slots", [])
slot_val = slots.get(trigger_slot) if trigger_slot else None
if isinstance(slot_val, str) and slot_val.lower() in ("true", "false"):
slot_val = slot_val.lower() == "true"
triggered = (trigger_slot is None) or (slot_val == trigger_value)
if not triggered:
continue
missing = [s for s in required_slots if not slots.get(s)]
if missing:
return {
"guard_name": guard_name,
"description": guard_def.get("description", ""),
"missing_slots": missing,
}
return None
async def send_message(
session: AsyncSession,
vectorstore: VectorStoreService,
llm: LLMClient,
router: RouterClient,
text: str,
thread_id: int | None = None,
top_k: int = 5,
temperature: float | None = None,
max_tokens: int | None = None,
) -> dict:
"""Обработать реплику пациента: роутер → state machine → LLM → ответ.
Важно: коммит транзакции делается только в самом конце. Если LLM упадёт —
rollback в роутере откатит thread + user_msg, чтобы «пустые» диалоги без
ответа ассистента не висели в списке.
"""
if thread_id is None:
thread = Thread(name=_auto_thread_name(text))
session.add(thread)
await session.flush()
else:
thread = await session.get(Thread, thread_id)
if thread is None:
raise LookupError(f"Thread {thread_id} not found")
user_msg = Message(thread_id=thread.id, role="user", text=text)
session.add(user_msg)
await session.flush() # только flush, без commit — чтобы откатить при ошибке LLM
stmt = (
select(Message)
.where(Message.thread_id == thread.id, Message.id != user_msg.id)
.order_by(Message.created_at.desc(), Message.id.desc())
.limit(HISTORY_LIMIT)
)
rows = (await session.execute(stmt)).scalars().all()
history = [{"role": m.role, "content": m.text} for m in reversed(rows)]
# 1a. Снимок состояния — нужен роутеру, чтобы предпочитать текущую ветку.
snapshot = await thread_state_service.load_snapshot(session, thread.id)
# 1b. Роутер — куда направляем.
routing = await router.classify(session=session, history=history, text=text, snapshot=snapshot)
router_code = routing["code"]
router_version = routing.get("version")
escalation_reason: str | None = routing.get("escalation_reason")
router_assembled_prompt: str = routing.get("router_assembled_prompt", "")
# 2. Логика выбора effective_code:
# 2.1. Если есть suspended_intent и роутер вернулся в него — RESUME: восстанавливаем
# прерванный сценарий, очищаем suspended_*, handoff_count=0.
# 2.2. Иначе если диалог идёт по sm-ветке и роутер предлагает другую — sticky:
# НЕ сбрасываем state, передаём LLM [ПОДСКАЗКА РОУТЕРА].
# 2.3. Иначе если prev — не-sm и роутер ведёт в другую ветку — hard-handoff.
prev_intent_code = snapshot["current_intent_code"]
handoff_count = snapshot.get("handoff_count", 0)
soft_insertion_count = snapshot.get("soft_insertion_count", 0)
suspended_intent = snapshot.get("suspended_intent")
resumable_step_code = snapshot.get("resumable_step_code")
resumable_slots = snapshot.get("resumable_slots", {}) or {}
router_hint: str | None = None
effective_code = router_code
routing_loop_triggered = False
resumed_from_suspended = False
if suspended_intent and suspended_intent == router_code and prev_intent_code != suspended_intent:
logger.info(
"Resume from suspended in thread %d: %s (step=%s, %d slots)",
thread.id, suspended_intent, resumable_step_code, len(resumable_slots),
)
snapshot = {
"current_intent_code": suspended_intent,
"current_step": 0,
"current_step_code": resumable_step_code,
"slots": dict(resumable_slots),
"handoff_count": 0,
"soft_insertion_count": 0,
"suspended_intent": None,
"resumable_step_code": None,
"resumable_slots": {},
}
prev_intent_code = suspended_intent
handoff_count = 0
soft_insertion_count = 0
suspended_intent = None
resumable_step_code = None
resumable_slots = {}
effective_code = snapshot["current_intent_code"]
resumed_from_suspended = True
elif prev_intent_code and prev_intent_code != router_code:
if intent_step_service.has_state_machine(prev_intent_code):
logger.info(
"Router suggested %s but thread %d is in sm %s — sticky, hint only",
router_code, thread.id, prev_intent_code,
)
router_hint = (
f"Роутер на этой реплике счёл, что тема — `{router_code}`. "
f"Ты сейчас ведёшь сценарий `{prev_intent_code}`. "
f"Если пациент действительно сменил тему (перенос, цены, острое состояние) — "
f"выдай `[INTENT_CHANGE: {router_code}]`. "
f"Если реплика укладывается в сценарий (повод/жалоба/имя) — "
f"зафиксируй её в соответствующий слот и продолжай по сценарию."
)
effective_code = prev_intent_code
else:
# Реальный hard-handoff: prev — не sm-ветка, роутер ведёт.
logger.info(
"Router switched intent for thread %d: %s%s (state reset)",
thread.id, prev_intent_code, router_code,
)
handoff_count += 1
soft_insertion_count = 0
snapshot = {
"current_intent_code": router_code,
"current_step": 0,
"current_step_code": None,
"slots": {},
"handoff_count": handoff_count,
"soft_insertion_count": 0,
# suspended_* не трогаем — там может лежать прерванная sm-ветка,
# к которой пациент ещё захочет вернуться.
"suspended_intent": suspended_intent,
"resumable_step_code": resumable_step_code,
"resumable_slots": resumable_slots,
}
# 2b. Защита от петли (v2 §4.3): если за диалог накопилось много handoff'ов и
# сейчас ещё одно переключение — забираем диалог в escalate_human с заглушкой,
# без вызова LLM. После авто-эскалации сбрасываем handoff_count и suspended_*
# (диалог переходит к оператору, прерванный сценарий не продолжаем).
if handoff_count > HANDOFF_CAP and effective_code != ESCALATE_INTENT_CODE:
logger.warning(
"Routing loop guard tripped for thread %d (handoff_count=%d), forcing %s",
thread.id, handoff_count, ESCALATE_INTENT_CODE,
)
effective_code = ESCALATE_INTENT_CODE
snapshot = {
"current_intent_code": ESCALATE_INTENT_CODE,
"current_step": 0,
"current_step_code": None,
"slots": {},
"handoff_count": 0,
"soft_insertion_count": 0,
"suspended_intent": None,
"resumable_step_code": None,
"resumable_slots": {},
}
handoff_count = 0
soft_insertion_count = 0
suspended_intent = None
resumable_step_code = None
resumable_slots = {}
routing_loop_triggered = True
escalation_reason = "routing_loop"
# 3. Разрешаем ветку (с fallback) и шаг.
served_code, intent, active_cfg = await _resolve_intent_with_fallback(session, effective_code)
if served_code != effective_code:
snapshot = {
"current_intent_code": served_code,
"current_step": 0,
"current_step_code": None,
"slots": {},
"handoff_count": handoff_count,
"soft_insertion_count": 0,
"suspended_intent": suspended_intent,
"resumable_step_code": resumable_step_code,
"resumable_slots": resumable_slots,
}
soft_insertion_count = 0
router_hint = None
# Финализируем snapshot.current_intent_code на served_code: для не-sm-веток
# (general_info / price_question / ...) state_update от LLM не приходит, и без
# этого snapshot["current_intent_code"] осталось бы None для нового треда —
# тогда на следующей реплике prev_intent_code не определится и handoff_count
# не инкрементится.
snapshot["current_intent_code"] = served_code
# Подписки активной ветки на документы (Спринт 7, часть A — мульти-RAG).
# Дефолт пустой подписки — пустой список, т.е. retrieval вернёт 0 чанков.
# Это сознательный выбор: пустая подписка = ветка не настроена, подмешивать
# случайное хуже, чем не подмешивать ничего. См. SPRINTS.md, Спринт 7.
subscribed_document_ids = await intent_document_service.list_documents_for_intent_code(
session, served_code,
)
retrieved = vectorstore.query(
query_text=text,
top_k=top_k,
document_ids=subscribed_document_ids,
)
sources = _retrieved_to_sources(retrieved)
bounce_log: list[dict] = []
validation_events: list[dict] = [] # illegal transitions для UI-подсветки
last_assembled_prompt = ""
visible_text = ""
parse_error: str | None = None
is_state_machine = False
parsed: dict | None = None # инициализируем заранее: routing_loop guard может пропустить for-цикл
# Если уже сработала защита от петли — не зовём LLM, формируем заглушку.
if routing_loop_triggered:
visible_text = ROUTING_LOOP_REPLY
last_assembled_prompt = (
"[ROUTING LOOP GUARD]\n"
f"handoff_count превысил {HANDOFF_CAP}, диалог автоматически уведён в "
f"{ESCALATE_INTENT_CODE}. LLM не вызывался."
)
else:
for attempt in range(MAX_BOUNCES + 1):
current_step = await _resolve_current_step(
session, intent.id, served_code, snapshot.get("current_step_code"),
)
is_state_machine = current_step is not None
if current_step is not None and snapshot.get("current_step_code") != current_step.code:
snapshot["current_step_code"] = current_step.code
base_prompt = config_service.compose_full_system_prompt(active_cfg)
step_prompt = f"\n\n{current_step.system_prompt}" if current_step else ""
soft_nudge = is_state_machine and soft_insertion_count >= SOFT_INSERTION_CAP
state_context = _format_state_context(
snapshot, current_step, router_hint, soft_nudge,
escalation_reason=escalation_reason if served_code == ESCALATE_INTENT_CODE else None,
)
system_prompt = base_prompt + step_prompt + state_context
llm_result = await llm.chat(
question=text,
sources=retrieved,
history=history,
system_prompt=system_prompt,
temperature=temperature,
max_tokens=max_tokens,
)
last_assembled_prompt = llm_result["assembled_prompt"]
parsed = parse_branch_response(llm_result["text"])
visible_text = parsed["visible_text"] or llm_result["text"]
# STATE_JSON-блок ждём только от state-machine-веток. У остальных
# «no STATE_JSON» — ожидаемое состояние, не ошибка.
parse_error = parsed["parse_error"] if is_state_machine else None
if parsed["intent_change"] and attempt < MAX_BOUNCES:
new_code = parsed["intent_change"]
bounce_log.append({
"from": served_code,
"to": new_code,
"preface": parsed["visible_text"],
})
logger.info(
"Intent bounce in thread %d: %s%s", thread.id, served_code, new_code,
)
# Если уходим из sm-ветки и suspended_* ещё свободно — запоминаем,
# чтобы вернуться к прерванному сценарию, когда роутер увидит,
# что пациент возвращается к теме (см. блок 2.1 в начале send_message).
if (
is_state_machine
and current_step is not None
and not suspended_intent
and new_code != served_code
):
suspended_intent = served_code
resumable_step_code = current_step.code
resumable_slots = dict(snapshot.get("slots", {}))
logger.info(
"Suspending sm scenario for thread %d: %s (step=%s, %d slots)",
thread.id, suspended_intent, resumable_step_code, len(resumable_slots),
)
handoff_count += 1
# Защита от петли работает и здесь — на bouncing'е.
if handoff_count > HANDOFF_CAP and new_code != ESCALATE_INTENT_CODE:
logger.warning(
"Routing loop guard tripped on bounce in thread %d (handoff_count=%d)",
thread.id, handoff_count,
)
served_code, intent, active_cfg = await _resolve_intent_with_fallback(
session, ESCALATE_INTENT_CODE,
)
snapshot = {
"current_intent_code": served_code,
"current_step": 0,
"current_step_code": None,
"slots": {},
"handoff_count": 0,
"soft_insertion_count": 0,
"suspended_intent": None,
"resumable_step_code": None,
"resumable_slots": {},
}
handoff_count = 0
soft_insertion_count = 0
suspended_intent = None
resumable_step_code = None
resumable_slots = {}
visible_text = ROUTING_LOOP_REPLY
last_assembled_prompt = (
"[ROUTING LOOP GUARD]\n"
f"handoff_count превысил {HANDOFF_CAP} на bouncing'е, "
f"диалог автоматически уведён в {ESCALATE_INTENT_CODE}."
)
routing_loop_triggered = True
parse_error = None
is_state_machine = False
parsed = {"visible_text": visible_text, "intent_change": None, "state_update": None, "parse_error": None}
break
served_code, intent, active_cfg = await _resolve_intent_with_fallback(session, new_code)
soft_insertion_count = 0 # новая ветка — счётчик с нуля
snapshot = {
"current_intent_code": served_code,
"current_step": 0,
"current_step_code": None,
"slots": {},
"handoff_count": handoff_count,
"soft_insertion_count": 0,
"suspended_intent": suspended_intent,
"resumable_step_code": resumable_step_code,
"resumable_slots": resumable_slots,
}
router_hint = None # новая ветка — подсказка больше неактуальна
continue
if parsed["state_update"] is not None and current_step is not None:
requested = parsed["state_update"]["state_after"]
soft_insertion_flag = bool(parsed["state_update"].get("soft_insertion", False))
allowed = intent_step_service.parse_allowed_next(current_step)
ok, reason = validate_transition(
current_step=current_step.code,
requested_step=requested,
allowed_next=allowed,
)
slots_updated = parsed["state_update"]["slots_updated"]
merged_slots = {**snapshot.get("slots", {}), **slots_updated}
guard_name: str | None = None
missing_slots: list[str] = []
guard_desc: str | None = None
if ok:
guards = intent_step_service.parse_guards(current_step)
guard_ok, guard_name, missing_slots, guard_desc = check_guards(
current_step_code=current_step.code,
requested_step_code=requested,
slots=merged_slots,
guards=guards,
)
if not guard_ok:
ok = False
reason = (
f"guard {guard_name!r} не пройден: не хватает слотов {missing_slots}"
)
# Решаем, как изменился soft_insertion_count.
stayed_on_step = ok and requested == current_step.code
if soft_insertion_flag and stayed_on_step and not slots_updated:
soft_insertion_count += 1
else:
soft_insertion_count = 0
base_state = {
"current_intent_code": served_code,
"slots": merged_slots,
"handoff_count": handoff_count,
"soft_insertion_count": soft_insertion_count,
"suspended_intent": suspended_intent,
"resumable_step_code": resumable_step_code,
"resumable_slots": resumable_slots,
}
if ok:
snapshot = {
**base_state,
"current_step": snapshot["current_step"] + (1 if requested != current_step.code else 0),
"current_step_code": requested,
}
else:
logger.warning(
"Blocked state_after in thread %d (%s): %s", thread.id, served_code, reason,
)
event: dict = {
"current_step": current_step.code,
"requested_step": requested,
"reason": reason,
}
if guard_name:
event["guard_name"] = guard_name
event["missing_slots"] = missing_slots
event["guard_description"] = guard_desc or ""
validation_events.append(event)
# Слоты всё равно мёржим (информация полезная), шаг не двигаем.
snapshot = {
**base_state,
"current_step": snapshot["current_step"],
"current_step_code": current_step.code,
}
elif parsed["state_update"] is None and current_step is not None and parse_error:
logger.warning(
"State machine branch %s returned no STATE_JSON: %s", served_code, parse_error,
)
break
# 4. Сохраняем: thread_state пишется ПОСЛЕ всей логики, коммит — единой транзакцией.
await thread_state_service.upsert(
session, thread.id,
intent_code=snapshot["current_intent_code"],
step=snapshot["current_step"],
step_code=snapshot.get("current_step_code"),
slots=snapshot["slots"],
handoff_count=snapshot.get("handoff_count", handoff_count),
soft_insertion_count=snapshot.get("soft_insertion_count", soft_insertion_count),
suspended_intent=snapshot.get("suspended_intent"),
resumable_step_code=snapshot.get("resumable_step_code"),
resumable_slots=snapshot.get("resumable_slots"),
)
user_msg.intent_id = intent.id
if thread.agent_config_id is None:
thread.agent_config_id = active_cfg.id
# Собираем мета-снимок реплики: что увидит UI рядом с бейджем ветки.
events: list[str] = []
if routing_loop_triggered:
events.append("routing_loop")
if resumed_from_suspended:
events.append("resumed")
if bounce_log:
events.append("hard_handoff")
if router_hint and not routing_loop_triggered and not bounce_log:
events.append("sticky")
if validation_events:
events.append("validation_blocked")
# soft_insertion: ветка явно пометила ответ боковым (см. парсер state_update).
last_state_update = parsed.get("state_update") if isinstance(parsed, dict) else None
if last_state_update and last_state_update.get("soft_insertion"):
events.append("soft_insertion")
meta = {
"router_intent_code": router_code,
"served_intent_code": served_code,
"step_code": snapshot.get("current_step_code"),
"step_name": current_step.name if current_step else None,
"is_state_machine": is_state_machine,
"events": events,
}
# Саммари для оператора — формируется при передаче в escalate_human.
operator_summary: dict | None = None
if served_code == ESCALATE_INTENT_CODE:
operator_summary = _build_operator_summary(
reason=escalation_reason,
history=history,
slots=snapshot.get("slots", {}),
suspended_slots=snapshot.get("resumable_slots", {}),
)
logger.info(
"Operator summary for thread %d: %s",
thread.id, json.dumps(operator_summary, ensure_ascii=False),
)
assistant_msg = Message(
thread_id=thread.id,
role="assistant",
text=visible_text,
sources_json=json.dumps(sources, ensure_ascii=False),
assembled_prompt=last_assembled_prompt,
intent_id=intent.id,
meta_json=json.dumps(meta, ensure_ascii=False),
escalation_reason=escalation_reason if served_code == ESCALATE_INTENT_CODE else None,
)
session.add(assistant_msg)
thread.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(assistant_msg)
await session.refresh(thread)
logger.info(
"Chat: thread=%d, router=%s, served=%s (v%d), step=%s, slots=%d keys, "
"bounces=%d, validation=%d, handoff=%d, routing_loop=%s",
thread.id, router_code, served_code, active_cfg.version,
snapshot.get("current_step_code") or "-",
len(snapshot["slots"]),
len(bounce_log),
len(validation_events),
snapshot.get("handoff_count", 0),
routing_loop_triggered,
)
return {
"thread_id": thread.id,
"thread_name": thread.name,
"message_id": assistant_msg.id,
"intent_code": intent.code,
"intent_name": intent.name,
"router_intent_code": router_code,
"config_version": active_cfg.version,
"router_version": router_version,
"answer": visible_text,
"sources": sources,
"model_used": llm.model,
"assembled_prompt": last_assembled_prompt,
"thread_state": {
"current_intent_code": snapshot["current_intent_code"],
"current_step": snapshot["current_step"],
"current_step_code": snapshot.get("current_step_code"),
"slots": snapshot["slots"],
"handoff_count": snapshot.get("handoff_count", handoff_count),
"soft_insertion_count": snapshot.get("soft_insertion_count", soft_insertion_count),
"suspended_intent": snapshot.get("suspended_intent"),
"resumable_step_code": snapshot.get("resumable_step_code"),
"resumable_slots": snapshot.get("resumable_slots", {}),
"pending_guard": _eval_pending_guard(current_step, snapshot["slots"]),
},
"bounces": bounce_log,
"validation_events": validation_events,
"parse_error": parse_error,
"routing_loop_triggered": routing_loop_triggered,
"resumed_from_suspended": resumed_from_suspended,
"message_meta": meta,
"escalation_reason": escalation_reason if served_code == ESCALATE_INTENT_CODE else None,
"operator_summary": operator_summary,
"router_assembled_prompt": router_assembled_prompt,
"rag_subscription": {
"subscribed_count": len(subscribed_document_ids),
"found_count": len(retrieved),
},
}
async def list_threads(session: AsyncSession) -> list[dict]:
count_subq = (
select(Message.thread_id, func.count(Message.id).label("cnt"))
.group_by(Message.thread_id)
.subquery()
)
first_msg_subq = (
select(Message.thread_id, func.min(Message.id).label("first_id"))
.where(Message.role == "user")
.group_by(Message.thread_id)
.subquery()
)
stmt = (
select(
Thread,
func.coalesce(count_subq.c.cnt, 0).label("messages_count"),
Message.text.label("first_text"),
)
.outerjoin(count_subq, count_subq.c.thread_id == Thread.id)
.outerjoin(first_msg_subq, first_msg_subq.c.thread_id == Thread.id)
.outerjoin(Message, Message.id == first_msg_subq.c.first_id)
.order_by(Thread.updated_at.desc())
)
rows = (await session.execute(stmt)).all()
result = []
for thread, messages_count, first_text in rows:
preview = (first_text or "").strip().replace("\n", " ")
if len(preview) > 120:
preview = preview[:120].rstrip() + ""
result.append({
"id": thread.id,
"name": thread.name,
"created_at": thread.created_at.isoformat(),
"updated_at": thread.updated_at.isoformat(),
"messages_count": messages_count,
"first_message_preview": preview,
})
return result
async def get_thread_detail(session: AsyncSession, thread_id: int) -> dict | None:
from db.models import Intent
thread = await session.get(Thread, thread_id)
if thread is None:
return None
stmt = (
select(Message, Intent.code, Intent.name)
.outerjoin(Intent, Intent.id == Message.intent_id)
.where(Message.thread_id == thread_id)
.order_by(Message.created_at)
)
rows = (await session.execute(stmt)).all()
# Lookup для обогащения старых meta: (intent_id, step_code) -> step_name
step_rows = (await session.execute(select(IntentStep.intent_id, IntentStep.code, IntentStep.name))).all()
step_name_lookup: dict[tuple, str] = {(iid, sc): sn for iid, sc, sn in step_rows}
messages = []
for m, intent_code, intent_name in rows:
sources = []
if m.sources_json:
try:
sources = json.loads(m.sources_json)
except json.JSONDecodeError:
logger.warning("Bad sources_json for message %d", m.id)
meta = None
if m.meta_json:
try:
meta = json.loads(m.meta_json)
except json.JSONDecodeError:
logger.warning("Bad meta_json for message %d", m.id)
# Обогащаем meta полями, которых не было в старых сообщениях
if meta and meta.get("step_code"):
if "step_name" not in meta and m.intent_id:
meta["step_name"] = step_name_lookup.get((m.intent_id, meta["step_code"]))
if "is_state_machine" not in meta:
meta["is_state_machine"] = True
messages.append({
"id": m.id,
"role": m.role,
"text": m.text,
"created_at": m.created_at.isoformat(),
"sources": sources,
"assembled_prompt": m.assembled_prompt or "",
"intent_code": intent_code or "",
"intent_name": intent_name or "",
"meta": meta,
"escalation_reason": m.escalation_reason,
})
state = await thread_state_service.load_snapshot(session, thread_id)
# Вычисляем pending_guard для текущего состояния треда
pending_guard = None
if state.get("current_step_code") and state.get("current_intent_code"):
from db.models import Intent as _Intent
intent_obj = (await session.execute(
select(_Intent).where(_Intent.code == state["current_intent_code"])
)).scalar_one_or_none()
if intent_obj:
cur_step = await intent_step_service.get_step_by_code(
session, intent_obj.id, state["current_step_code"]
)
pending_guard = _eval_pending_guard(cur_step, state.get("slots", {}))
state["pending_guard"] = pending_guard
return {
"id": thread.id,
"name": thread.name,
"created_at": thread.created_at.isoformat(),
"updated_at": thread.updated_at.isoformat(),
"messages": messages,
"thread_state": state,
}
async def rename_thread(session: AsyncSession, thread_id: int, name: str) -> dict | None:
thread = await session.get(Thread, thread_id)
if thread is None:
return None
thread.name = name
thread.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(thread)
return {
"id": thread.id,
"name": thread.name,
"created_at": thread.created_at.isoformat(),
"updated_at": thread.updated_at.isoformat(),
"messages_count": 0,
"first_message_preview": "",
}
async def delete_thread(session: AsyncSession, thread_id: int) -> int | None:
thread = await session.get(Thread, thread_id)
if thread is None:
return None
count_stmt = select(func.count(Message.id)).where(Message.thread_id == thread_id)
messages_count = (await session.execute(count_stmt)).scalar_one() or 0
await session.execute(delete(Message).where(Message.thread_id == thread_id))
await session.delete(thread)
await session.commit()
return int(messages_count)