import json
import logging
from datetime import datetime, timezone

from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from db.models import Intent, IntentStep, IntentStepGraph, Message, Thread
from services import config_service, intent_document_service, intent_step_service, thread_state_service
from services.llm_client import LLMClient, LLMUnavailableError
from services.router_client import RouterClient
from services.state_machine import check_guards, parse_branch_response, validate_transition
from services.vectorstore import VectorStoreService

logger = logging.getLogger(__name__)

HISTORY_LIMIT = 20
FALLBACK_INTENT_CODE = "general_info"
ESCALATE_INTENT_CODE = "escalate_human"
MAX_BOUNCES = 1
HANDOFF_CAP = 3  # hard handoffs allowed per dialog; the next one (count > cap) forces auto-escalation
SOFT_INSERTION_CAP = 3  # consecutive "side questions" tolerated before the LLM is nudged back to the step

ROUTING_LOOP_REPLY = (
    "Уточню детали с администратором клиники, свяжемся с вами "
    "в течение ближайшего часа."
)

SOFT_INSERTION_NUDGE = (
    "[ВОЗВРАТ К СЦЕНАРИЮ]\n"
    "Пациент уже несколько реплик подряд задаёт боковые вопросы, не двигая сценарий. "
    "На этой реплике уверенно верни его к вопросу текущего шага одной короткой фразой; "
    "не давай развернутого ответа на стороннюю тему."
)


def _auto_thread_name(first_user_text: str) -> str:
    """Build a default thread name: a 60-char preview of the first message plus a UTC timestamp."""
    preview = first_user_text.strip().replace("\n", " ")
    if len(preview) > 60:
        preview = preview[:60].rstrip() + "…"
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
    return f"{preview} · {stamp}"


def _retrieved_to_sources(retrieved: list[dict]) -> list[dict]:
    """Convert vector-store hits into the source dicts returned to the UI.

    Missing metadata (absent or ``None``) falls back to empty defaults; chunk text is
    truncated to 500 characters and the relevance score is rounded to 3 decimals.
    """
    sources = []
    for item in retrieved:
        # `or {}` also covers an explicit None value, which `.get(..., {})` would not.
        meta = item.get("metadata") or {}
        sources.append({
            "document_id": meta.get("document_id", ""),
            "document_name": meta.get("document_name", ""),
            "chunk_text": item["text"][:500],
            "section": meta.get("section", ""),
            "page": meta.get("page_number", 0),
            "relevance_score": round(item.get("relevance_score", 0), 3),
        })
    return sources


def _format_state_context(
    snapshot: dict,
    current_step: IntentStep | None,
    router_hint: str | None = None,
    soft_nudge: bool = False,
    escalation_reason: str | None = None,
) -> str:
    """Render the thread's current state as a text block appended to the system prompt.

    Includes the current step code/name and its allowed transitions (when in a state
    machine), the slots as JSON, an optional escalation reason, an optional router
    hint section and, when ``soft_nudge`` is set, the SOFT_INSERTION_NUDGE instruction.
    """
    slots = snapshot.get("slots", {}) or {}
    slots_json = json.dumps(slots, ensure_ascii=False)
    lines = ["", "[ТЕКУЩЕЕ СОСТОЯНИЕ]"]
    if current_step is not None:
        allowed = intent_step_service.parse_allowed_next(current_step)
        lines.append(f"step_code: {current_step.code} ({current_step.name})")
        lines.append(f"allowed_next: {json.dumps(allowed, ensure_ascii=False)}")
    else:
        lines.append("step_code: —")
    lines.append(f"slots: {slots_json}")
    if escalation_reason:
        lines.append(f"escalation_reason: {escalation_reason}")
    if router_hint:
        lines.append("")
        lines.append("[ПОДСКАЗКА РОУТЕРА]")
        lines.append(router_hint)
    if soft_nudge:
        lines.append("")
        lines.append(SOFT_INSERTION_NUDGE)
    return "\n" + "\n".join(lines)


def _build_operator_summary(
    reason: str | None,
    history: list[dict],
    slots: dict,
    suspended_slots: dict,
) -> dict:
    """Build the summary shown to a human operator when the dialog is handed off.

    Current slots override suspended (resumable) slots on key clashes. The history
    tail holds the last 8 messages, each truncated to 300 characters. A missing
    reason is reported as ``"explicit_request"``.
    """
    combined_slots = {**suspended_slots, **slots}
    summary = {
        "reason": reason or "explicit_request",
        "slots": combined_slots,
        "history_tail": [
            {"role": m["role"], "text": m["content"][:300]}
            for m in history[-8:]
        ],
    }
    return summary


async def _resolve_intent_with_fallback(
    session: AsyncSession, intent_code: str
) -> tuple[str, object, object]:
    """Return ``(served_code, intent, config)`` for the intent's active config.

    If ``intent_code`` has no active config, fall back to FALLBACK_INTENT_CODE.

    Raises:
        RuntimeError: if the fallback intent has no active config either.
    """
    pair = await config_service.get_active_config_by_intent_code(session, intent_code)
    if pair is None:
        logger.warning("Intent %r has no active config, falling back to %s", intent_code, FALLBACK_INTENT_CODE)
        pair = await config_service.get_active_config_by_intent_code(session, FALLBACK_INTENT_CODE)
        if pair is None:
            raise RuntimeError(f"No active config for fallback intent {FALLBACK_INTENT_CODE!r}")
        intent, cfg = pair
        return FALLBACK_INTENT_CODE, intent, cfg
    intent, cfg = pair
    return intent_code, intent, cfg


async def _resolve_current_step(
    session: AsyncSession,
    intent_id: int,
    intent_code: str,
    step_code: str | None,
) -> IntentStep | None:
    """Find the state-machine step for the current state.

    Returns None for intents without a state machine. If ``step_code`` is empty or
    unknown, the first step of the branch is returned instead.
    """
    if not intent_step_service.has_state_machine(intent_code):
        return None
    if step_code:
        step = await intent_step_service.get_step_by_code(session, intent_id, step_code)
        if step is not None:
            return step
        logger.warning("Step %r not found for intent %s, falling back to first step", step_code, intent_code)
    return await intent_step_service.get_first_step(session, intent_id)


def _eval_pending_guard(
    current_step: "IntentStep | None",
    slots: dict,
) -> "dict | None":
    """Describe the first guard whose trigger fired but whose required slots are still empty.

    A guard is triggered when it has no ``trigger_slot`` or when the slot value equals
    ``trigger_value``. String slot values ``"true"``/``"false"`` (any case) are compared
    as booleans. Returns ``{"guard_name", "description", "missing_slots"}``, or None if
    no guard is pending.
    """
    if current_step is None:
        return None
    guards = intent_step_service.parse_guards(current_step)
    if not guards:
        return None
    for guard_name, guard_def in guards.items():
        if not isinstance(guard_def, dict):
            continue
        trigger_slot = guard_def.get("trigger_slot")
        trigger_value = guard_def.get("trigger_value")
        # `or []` also tolerates an explicit null in the guard definition.
        required_slots: list[str] = guard_def.get("required_slots") or []
        slot_val = slots.get(trigger_slot) if trigger_slot else None
        if isinstance(slot_val, str) and slot_val.lower() in ("true", "false"):
            slot_val = slot_val.lower() == "true"
        triggered = (trigger_slot is None) or (slot_val == trigger_value)
        if not triggered:
            continue
        missing = [s for s in required_slots if not slots.get(s)]
        if missing:
            return {
                "guard_name": guard_name,
                "description": guard_def.get("description", ""),
                "missing_slots": missing,
            }
    return None


async def send_message(
    session: AsyncSession,
    vectorstore: VectorStoreService,
    llm: LLMClient,
    router: RouterClient,
    text: str,
    thread_id: int | None = None,
    top_k: int = 5,
    temperature: float | None = None,
    max_tokens: int | None = None,
) -> dict:
    """Process a patient's turn: router → state machine → LLM → reply.

    Important: the transaction is committed only at the very end. If the LLM fails,
    the rollback in the router undoes the thread and user message, so "empty" dialogs
    without an assistant reply do not linger in the thread list.

    Raises:
        LookupError: if ``thread_id`` is given but no such thread exists.
        RuntimeError: if neither the target intent nor the fallback intent has an
            active config (raised by ``_resolve_intent_with_fallback``).
    """
    if thread_id is None:
        thread = Thread(name=_auto_thread_name(text))
        session.add(thread)
        await session.flush()
    else:
        thread = await session.get(Thread, thread_id)
        if thread is None:
            raise LookupError(f"Thread {thread_id} not found")

    user_msg = Message(thread_id=thread.id, role="user", text=text)
    session.add(user_msg)
    await session.flush()  # flush only, no commit — so the turn can be rolled back if the LLM fails

    stmt = (
        select(Message)
        .where(Message.thread_id == thread.id, Message.id != user_msg.id)
        .order_by(Message.created_at.desc(), Message.id.desc())
        .limit(HISTORY_LIMIT)
    )
    rows = (await session.execute(stmt)).scalars().all()
    history = [{"role": m.role, "content": m.text} for m in reversed(rows)]

    # 1a. State snapshot — the router needs it to prefer the current branch.
    snapshot = await thread_state_service.load_snapshot(session, thread.id)

    # 1b. Router — decides where this turn goes.
    routing = await router.classify(session=session, history=history, text=text, snapshot=snapshot)
    router_code = routing["code"]
    router_version = routing.get("version")
    escalation_reason: str | None = routing.get("escalation_reason")
    router_assembled_prompt: str = routing.get("router_assembled_prompt", "")

    # 2. Choosing effective_code:
    # 2.1. If there is a suspended_intent and the router returned to it — RESUME: restore
    #      the interrupted scenario, clear suspended_*, handoff_count=0.
    # 2.2. Otherwise, if the dialog is in a state-machine branch and the router proposes a
    #      different one — sticky: do NOT reset state, give the LLM a router-hint section.
    # 2.3. Otherwise, if prev is a non-sm branch and the router leads elsewhere — hard handoff.
    prev_intent_code = snapshot["current_intent_code"]
    handoff_count = snapshot.get("handoff_count", 0)
    soft_insertion_count = snapshot.get("soft_insertion_count", 0)
    suspended_intent = snapshot.get("suspended_intent")
    resumable_step_code = snapshot.get("resumable_step_code")
    resumable_slots = snapshot.get("resumable_slots", {}) or {}

    router_hint: str | None = None
    effective_code = router_code
    routing_loop_triggered = False
    resumed_from_suspended = False

    if suspended_intent and suspended_intent == router_code and prev_intent_code != suspended_intent:
        logger.info(
            "Resume from suspended in thread %d: %s (step=%s, %d slots)",
            thread.id, suspended_intent, resumable_step_code, len(resumable_slots),
        )
        snapshot = {
            "current_intent_code": suspended_intent,
            "current_step": 0,
            "current_step_code": resumable_step_code,
            "slots": dict(resumable_slots),
            "handoff_count": 0,
            "soft_insertion_count": 0,
            "suspended_intent": None,
            "resumable_step_code": None,
            "resumable_slots": {},
        }
        prev_intent_code = suspended_intent
        handoff_count = 0
        soft_insertion_count = 0
        suspended_intent = None
        resumable_step_code = None
        resumable_slots = {}
        effective_code = snapshot["current_intent_code"]
        resumed_from_suspended = True
    elif prev_intent_code and prev_intent_code != router_code:
        if intent_step_service.has_state_machine(prev_intent_code):
            logger.info(
                "Router suggested %s but thread %d is in sm %s — sticky, hint only",
                router_code, thread.id, prev_intent_code,
            )
            router_hint = (
                f"Роутер на этой реплике счёл, что тема — `{router_code}`. "
                f"Ты сейчас ведёшь сценарий `{prev_intent_code}`. "
                f"Если пациент действительно сменил тему (перенос, цены, острое состояние) — "
                f"выдай `[INTENT_CHANGE: {router_code}]`. "
                f"Если реплика укладывается в сценарий (повод/жалоба/имя) — "
                f"зафиксируй её в соответствующий слот и продолжай по сценарию."
            )
            effective_code = prev_intent_code
        else:
            # Real hard handoff: prev is not a state-machine branch, the router leads.
            logger.info(
                "Router switched intent for thread %d: %s → %s (state reset)",
                thread.id, prev_intent_code, router_code,
            )
            handoff_count += 1
            soft_insertion_count = 0
            snapshot = {
                "current_intent_code": router_code,
                "current_step": 0,
                "current_step_code": None,
                "slots": {},
                "handoff_count": handoff_count,
                "soft_insertion_count": 0,
                # suspended_* is left untouched — it may hold an interrupted sm branch
                # the patient will want to return to.
                "suspended_intent": suspended_intent,
                "resumable_step_code": resumable_step_code,
                "resumable_slots": resumable_slots,
            }

    # 2b. Loop protection (v2 §4.3): once the handoff count exceeds HANDOFF_CAP, move the
    # dialog into escalate_human with a canned reply, without calling the LLM. After
    # auto-escalation handoff_count and suspended_* are reset (the dialog goes to an
    # operator; the interrupted scenario is not resumed).
    if handoff_count > HANDOFF_CAP and effective_code != ESCALATE_INTENT_CODE:
        logger.warning(
            "Routing loop guard tripped for thread %d (handoff_count=%d), forcing %s",
            thread.id, handoff_count, ESCALATE_INTENT_CODE,
        )
        effective_code = ESCALATE_INTENT_CODE
        snapshot = {
            "current_intent_code": ESCALATE_INTENT_CODE,
            "current_step": 0,
            "current_step_code": None,
            "slots": {},
            "handoff_count": 0,
            "soft_insertion_count": 0,
            "suspended_intent": None,
            "resumable_step_code": None,
            "resumable_slots": {},
        }
        handoff_count = 0
        soft_insertion_count = 0
        suspended_intent = None
        resumable_step_code = None
        resumable_slots = {}
        routing_loop_triggered = True
        escalation_reason = "routing_loop"

    # 3. Resolve the branch (with fallback) and the step.
    served_code, intent, active_cfg = await _resolve_intent_with_fallback(session, effective_code)
    if served_code != effective_code:
        snapshot = {
            "current_intent_code": served_code,
            "current_step": 0,
            "current_step_code": None,
            "slots": {},
            "handoff_count": handoff_count,
            "soft_insertion_count": 0,
            "suspended_intent": suspended_intent,
            "resumable_step_code": resumable_step_code,
            "resumable_slots": resumable_slots,
        }
        soft_insertion_count = 0
        router_hint = None

    # Pin snapshot.current_intent_code to served_code. Non-sm branches
    # (general_info / price_question / ...) get no state_update from the LLM, so
    # without this it would stay None for a new thread. Then prev_intent_code
    # could not be determined on the next turn and handoff_count would not increment.
    snapshot["current_intent_code"] = served_code

    # Document subscriptions of the active branch (Sprint 7, part A — multi-RAG).
    # An empty subscription yields an empty list, i.e. retrieval returns 0 chunks.
    # This is deliberate: an empty subscription means the branch is not configured,
    # and mixing in random content is worse than mixing in nothing. See SPRINTS.md, Sprint 7.
    subscribed_document_ids = await intent_document_service.list_documents_for_intent_code(
        session, served_code,
    )
    retrieved = vectorstore.query(
        query_text=text,
        top_k=top_k,
        document_ids=subscribed_document_ids,
    )
    sources = _retrieved_to_sources(retrieved)

    bounce_log: list[dict] = []
    validation_events: list[dict] = []  # illegal transitions, for UI highlighting
    last_assembled_prompt = ""
    visible_text = ""
    parse_error: str | None = None
    is_state_machine = False
    # Initialised up front: the routing-loop guard may skip the loop below entirely,
    # and both values are read after it (meta, pending_guard, events).
    parsed: dict | None = None
    current_step: IntentStep | None = None

    # If the loop guard already fired, do not call the LLM — reply with the canned text.
    if routing_loop_triggered:
        visible_text = ROUTING_LOOP_REPLY
        last_assembled_prompt = (
            "[ROUTING LOOP GUARD]\n"
            f"handoff_count превысил {HANDOFF_CAP}, диалог автоматически уведён в "
            f"{ESCALATE_INTENT_CODE}. LLM не вызывался."
        )
    else:
        for attempt in range(MAX_BOUNCES + 1):
            current_step = await _resolve_current_step(
                session, intent.id, served_code, snapshot.get("current_step_code"),
            )
            is_state_machine = current_step is not None
            if current_step is not None and snapshot.get("current_step_code") != current_step.code:
                snapshot["current_step_code"] = current_step.code

            base_prompt = config_service.compose_full_system_prompt(active_cfg)
            step_prompt = f"\n\n{current_step.system_prompt}" if current_step else ""
            soft_nudge = is_state_machine and soft_insertion_count >= SOFT_INSERTION_CAP
            state_context = _format_state_context(
                snapshot, current_step, router_hint, soft_nudge,
                escalation_reason=escalation_reason if served_code == ESCALATE_INTENT_CODE else None,
            )
            system_prompt = base_prompt + step_prompt + state_context

            llm_result = await llm.chat(
                question=text,
                sources=retrieved,
                history=history,
                system_prompt=system_prompt,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            last_assembled_prompt = llm_result["assembled_prompt"]
            parsed = parse_branch_response(llm_result["text"])
            visible_text = parsed["visible_text"] or llm_result["text"]
            # A STATE_JSON block is expected only from state-machine branches. For the
            # others, "no STATE_JSON" is the normal case, not an error.
            parse_error = parsed["parse_error"] if is_state_machine else None

            if parsed["intent_change"] and attempt < MAX_BOUNCES:
                new_code = parsed["intent_change"]
                bounce_log.append({
                    "from": served_code,
                    "to": new_code,
                    "preface": parsed["visible_text"],
                })
                logger.info(
                    "Intent bounce in thread %d: %s → %s",
                    thread.id, served_code, new_code,
                )
                # When leaving an sm branch and suspended_* is still free, remember it,
                # so the interrupted scenario can be resumed once the router sees the
                # patient coming back to the topic (see step 2.1 above).
                if (
                    is_state_machine
                    and current_step is not None
                    and not suspended_intent
                    and new_code != served_code
                ):
                    suspended_intent = served_code
                    resumable_step_code = current_step.code
                    resumable_slots = dict(snapshot.get("slots", {}))
                    logger.info(
                        "Suspending sm scenario for thread %d: %s (step=%s, %d slots)",
                        thread.id, suspended_intent, resumable_step_code, len(resumable_slots),
                    )
                handoff_count += 1
                # The loop guard also applies to bounces.
                if handoff_count > HANDOFF_CAP and new_code != ESCALATE_INTENT_CODE:
                    logger.warning(
                        "Routing loop guard tripped on bounce in thread %d (handoff_count=%d)",
                        thread.id, handoff_count,
                    )
                    served_code, intent, active_cfg = await _resolve_intent_with_fallback(
                        session, ESCALATE_INTENT_CODE,
                    )
                    snapshot = {
                        "current_intent_code": served_code,
                        "current_step": 0,
                        "current_step_code": None,
                        "slots": {},
                        "handoff_count": 0,
                        "soft_insertion_count": 0,
                        "suspended_intent": None,
                        "resumable_step_code": None,
                        "resumable_slots": {},
                    }
                    handoff_count = 0
                    soft_insertion_count = 0
                    suspended_intent = None
                    resumable_step_code = None
                    resumable_slots = {}
                    visible_text = ROUTING_LOOP_REPLY
                    last_assembled_prompt = (
                        "[ROUTING LOOP GUARD]\n"
                        f"handoff_count превысил {HANDOFF_CAP} на bouncing'е, "
                        f"диалог автоматически уведён в {ESCALATE_INTENT_CODE}."
                    )
                    routing_loop_triggered = True
                    # Same reason as the pre-loop guard, so the operator summary and
                    # the stored message report why the dialog was escalated.
                    escalation_reason = "routing_loop"
                    # The snapshot no longer has a step; don't leak the previous branch's
                    # step into meta.step_name / pending_guard.
                    current_step = None
                    parse_error = None
                    is_state_machine = False
                    parsed = {"visible_text": visible_text, "intent_change": None, "state_update": None, "parse_error": None}
                    break
                served_code, intent, active_cfg = await _resolve_intent_with_fallback(session, new_code)
                soft_insertion_count = 0  # new branch — counter starts from zero
                snapshot = {
                    "current_intent_code": served_code,
                    "current_step": 0,
                    "current_step_code": None,
                    "slots": {},
                    "handoff_count": handoff_count,
                    "soft_insertion_count": 0,
                    "suspended_intent": suspended_intent,
                    "resumable_step_code": resumable_step_code,
                    "resumable_slots": resumable_slots,
                }
                router_hint = None  # new branch — the router hint no longer applies
                continue

            if parsed["state_update"] is not None and current_step is not None:
                requested = parsed["state_update"]["state_after"]
                soft_insertion_flag = bool(parsed["state_update"].get("soft_insertion", False))
                allowed = intent_step_service.parse_allowed_next(current_step)
                ok, reason = validate_transition(
                    current_step=current_step.code,
                    requested_step=requested,
                    allowed_next=allowed,
                )
                slots_updated = parsed["state_update"]["slots_updated"]
                merged_slots = {**snapshot.get("slots", {}), **slots_updated}
                guard_name: str | None = None
                missing_slots: list[str] = []
                guard_desc: str | None = None
                if ok:
                    guards = intent_step_service.parse_guards(current_step)
                    guard_ok, guard_name, missing_slots, guard_desc = check_guards(
                        current_step_code=current_step.code,
                        requested_step_code=requested,
                        slots=merged_slots,
                        guards=guards,
                    )
                    if not guard_ok:
                        ok = False
                        reason = (
                            f"guard {guard_name!r} не пройден: не хватает слотов {missing_slots}"
                        )
                # Decide how soft_insertion_count changes: it grows only for a side
                # answer that stayed on the step and filled no slots.
                stayed_on_step = ok and requested == current_step.code
                if soft_insertion_flag and stayed_on_step and not slots_updated:
                    soft_insertion_count += 1
                else:
                    soft_insertion_count = 0
                base_state = {
                    "current_intent_code": served_code,
                    "slots": merged_slots,
                    "handoff_count": handoff_count,
                    "soft_insertion_count": soft_insertion_count,
                    "suspended_intent": suspended_intent,
                    "resumable_step_code": resumable_step_code,
                    "resumable_slots": resumable_slots,
                }
                if ok:
                    snapshot = {
                        **base_state,
                        "current_step": snapshot["current_step"] + (1 if requested != current_step.code else 0),
                        "current_step_code": requested,
                    }
                else:
                    logger.warning(
                        "Blocked state_after in thread %d (%s): %s",
                        thread.id, served_code, reason,
                    )
                    event: dict = {
                        "current_step": current_step.code,
                        "requested_step": requested,
                        "reason": reason,
                    }
                    if guard_name:
                        event["guard_name"] = guard_name
                        event["missing_slots"] = missing_slots
                        event["guard_description"] = guard_desc or ""
                    validation_events.append(event)
                    # Slots are merged anyway (the information is useful); the step does not move.
                    snapshot = {
                        **base_state,
                        "current_step": snapshot["current_step"],
                        "current_step_code": current_step.code,
                    }
            elif parsed["state_update"] is None and current_step is not None and parse_error:
                logger.warning(
                    "State machine branch %s returned no STATE_JSON: %s",
                    served_code, parse_error,
                )
            break

    # 4. Persist: thread_state is written AFTER all the logic; commit as one transaction.
    await thread_state_service.upsert(
        session,
        thread.id,
        intent_code=snapshot["current_intent_code"],
        step=snapshot["current_step"],
        step_code=snapshot.get("current_step_code"),
        slots=snapshot["slots"],
        handoff_count=snapshot.get("handoff_count", handoff_count),
        soft_insertion_count=snapshot.get("soft_insertion_count", soft_insertion_count),
        suspended_intent=snapshot.get("suspended_intent"),
        resumable_step_code=snapshot.get("resumable_step_code"),
        resumable_slots=snapshot.get("resumable_slots"),
    )

    user_msg.intent_id = intent.id
    if thread.agent_config_id is None:
        thread.agent_config_id = active_cfg.id

    # Per-turn meta snapshot: what the UI shows next to the branch badge.
    events: list[str] = []
    if routing_loop_triggered:
        events.append("routing_loop")
    if resumed_from_suspended:
        events.append("resumed")
    if bounce_log:
        events.append("hard_handoff")
    if router_hint and not routing_loop_triggered and not bounce_log:
        events.append("sticky")
    if validation_events:
        events.append("validation_blocked")
    # soft_insertion: the branch explicitly marked the reply as a side answer
    # (see the state_update parser).
    last_state_update = parsed.get("state_update") if isinstance(parsed, dict) else None
    if last_state_update and last_state_update.get("soft_insertion"):
        events.append("soft_insertion")

    meta = {
        "router_intent_code": router_code,
        "served_intent_code": served_code,
        "step_code": snapshot.get("current_step_code"),
        "step_name": current_step.name if current_step else None,
        "is_state_machine": is_state_machine,
        "events": events,
    }

    # Operator summary — built when the dialog is handed to escalate_human.
    operator_summary: dict | None = None
    if served_code == ESCALATE_INTENT_CODE:
        operator_summary = _build_operator_summary(
            reason=escalation_reason,
            history=history,
            slots=snapshot.get("slots", {}),
            suspended_slots=snapshot.get("resumable_slots", {}),
        )
        logger.info(
            "Operator summary for thread %d: %s",
            thread.id, json.dumps(operator_summary, ensure_ascii=False),
        )

    assistant_msg = Message(
        thread_id=thread.id,
        role="assistant",
        text=visible_text,
        sources_json=json.dumps(sources, ensure_ascii=False),
        assembled_prompt=last_assembled_prompt,
        intent_id=intent.id,
        meta_json=json.dumps(meta, ensure_ascii=False),
        escalation_reason=escalation_reason if served_code == ESCALATE_INTENT_CODE else None,
    )
    session.add(assistant_msg)
    thread.updated_at = datetime.now(timezone.utc)
    await session.commit()
    await session.refresh(assistant_msg)
    await session.refresh(thread)

    logger.info(
        "Chat: thread=%d, router=%s, served=%s (v%d), step=%s, slots=%d keys, "
        "bounces=%d, validation=%d, handoff=%d, routing_loop=%s",
        thread.id, router_code, served_code, active_cfg.version,
        snapshot.get("current_step_code") or "-",
        len(snapshot["slots"]), len(bounce_log), len(validation_events),
        snapshot.get("handoff_count", 0), routing_loop_triggered,
    )

    return {
        "thread_id": thread.id,
        "thread_name": thread.name,
        "message_id": assistant_msg.id,
        "intent_code": intent.code,
        "intent_name": intent.name,
        "router_intent_code": router_code,
        "config_version": active_cfg.version,
        "router_version": router_version,
        "answer": visible_text,
        "sources": sources,
        "model_used": llm.model,
        "assembled_prompt": last_assembled_prompt,
        "thread_state": {
            "current_intent_code": snapshot["current_intent_code"],
            "current_step": snapshot["current_step"],
            "current_step_code": snapshot.get("current_step_code"),
            "slots": snapshot["slots"],
            "handoff_count": snapshot.get("handoff_count", handoff_count),
            "soft_insertion_count": snapshot.get("soft_insertion_count", soft_insertion_count),
            "suspended_intent": snapshot.get("suspended_intent"),
            "resumable_step_code": snapshot.get("resumable_step_code"),
            "resumable_slots": snapshot.get("resumable_slots", {}),
            "pending_guard": _eval_pending_guard(current_step, snapshot["slots"]),
        },
        "bounces": bounce_log,
        "validation_events": validation_events,
        "parse_error": parse_error,
        "routing_loop_triggered": routing_loop_triggered,
        "resumed_from_suspended": resumed_from_suspended,
        "message_meta": meta,
        "escalation_reason": escalation_reason if served_code == ESCALATE_INTENT_CODE else None,
        "operator_summary": operator_summary,
        "router_assembled_prompt": router_assembled_prompt,
        "rag_subscription": {
            "subscribed_count": len(subscribed_document_ids),
            "found_count": len(retrieved),
        },
    }


async def list_threads(session: AsyncSession) -> list[dict]:
    """List all threads, most recently updated first.

    Each entry includes the message count and a 120-char preview of the thread's
    first user message (the user message with the lowest id).
    """
    count_subq = (
        select(Message.thread_id, func.count(Message.id).label("cnt"))
        .group_by(Message.thread_id)
        .subquery()
    )
    first_msg_subq = (
        select(Message.thread_id, func.min(Message.id).label("first_id"))
        .where(Message.role == "user")
        .group_by(Message.thread_id)
        .subquery()
    )
    stmt = (
        select(
            Thread,
            func.coalesce(count_subq.c.cnt, 0).label("messages_count"),
            Message.text.label("first_text"),
        )
        .outerjoin(count_subq, count_subq.c.thread_id == Thread.id)
        .outerjoin(first_msg_subq, first_msg_subq.c.thread_id == Thread.id)
        .outerjoin(Message, Message.id == first_msg_subq.c.first_id)
        .order_by(Thread.updated_at.desc())
    )
    rows = (await session.execute(stmt)).all()
    result = []
    for thread, messages_count, first_text in rows:
        preview = (first_text or "").strip().replace("\n", " ")
        if len(preview) > 120:
            preview = preview[:120].rstrip() + "…"
        result.append({
            "id": thread.id,
            "name": thread.name,
            "created_at": thread.created_at.isoformat(),
            "updated_at": thread.updated_at.isoformat(),
            "messages_count": messages_count,
            "first_message_preview": preview,
        })
    return result


async def get_thread_detail(session: AsyncSession, thread_id: int) -> dict | None:
    """Return a thread with all its messages and current state, or None if it doesn't exist.

    Message meta from older messages is enriched with ``step_name`` and
    ``is_state_machine``. The state includes a computed ``pending_guard``.
    """
    thread = await session.get(Thread, thread_id)
    if thread is None:
        return None
    stmt = (
        select(Message, Intent.code, Intent.name)
        .outerjoin(Intent, Intent.id == Message.intent_id)
        .where(Message.thread_id == thread_id)
        # created_at can tie for messages written in the same transaction (user +
        # assistant of one turn); id keeps the order deterministic, matching the
        # history query in send_message.
        .order_by(Message.created_at, Message.id)
    )
    rows = (await session.execute(stmt)).all()

    # Lookup for enriching old meta: (intent_id, step_code) -> step_name.
    # Step names come only from the active graph: historical messages show the current
    # version of the name; archived graphs (Sprint 7.7) are not considered.
    step_rows = (await session.execute(
        select(IntentStep.intent_id, IntentStep.code, IntentStep.name)
        .join(IntentStepGraph, IntentStepGraph.id == IntentStep.graph_id)
        .where(IntentStepGraph.is_active.is_(True))
    )).all()
    step_name_lookup: dict[tuple, str] = {(iid, sc): sn for iid, sc, sn in step_rows}

    messages = []
    for m, intent_code, intent_name in rows:
        sources = []
        if m.sources_json:
            try:
                sources = json.loads(m.sources_json)
            except json.JSONDecodeError:
                logger.warning("Bad sources_json for message %d", m.id)
        meta = None
        if m.meta_json:
            try:
                meta = json.loads(m.meta_json)
            except json.JSONDecodeError:
                logger.warning("Bad meta_json for message %d", m.id)
        # Fill in fields that older messages' meta did not have.
        if meta and meta.get("step_code"):
            if "step_name" not in meta and m.intent_id:
                meta["step_name"] = step_name_lookup.get((m.intent_id, meta["step_code"]))
            if "is_state_machine" not in meta:
                meta["is_state_machine"] = True
        messages.append({
            "id": m.id,
            "role": m.role,
            "text": m.text,
            "created_at": m.created_at.isoformat(),
            "sources": sources,
            "assembled_prompt": m.assembled_prompt or "",
            "intent_code": intent_code or "",
            "intent_name": intent_name or "",
            "meta": meta,
            "escalation_reason": m.escalation_reason,
        })

    state = await thread_state_service.load_snapshot(session, thread_id)

    # Compute pending_guard for the thread's current state.
    pending_guard = None
    if state.get("current_step_code") and state.get("current_intent_code"):
        intent_obj = (await session.execute(
            select(Intent).where(Intent.code == state["current_intent_code"])
        )).scalar_one_or_none()
        if intent_obj:
            cur_step = await intent_step_service.get_step_by_code(
                session, intent_obj.id, state["current_step_code"]
            )
            pending_guard = _eval_pending_guard(cur_step, state.get("slots") or {})
    state["pending_guard"] = pending_guard

    return {
        "id": thread.id,
        "name": thread.name,
        "created_at": thread.created_at.isoformat(),
        "updated_at": thread.updated_at.isoformat(),
        "messages": messages,
        "thread_state": state,
    }


async def rename_thread(session: AsyncSession, thread_id: int, name: str) -> dict | None:
    """Rename a thread and bump its updated_at; return None if the thread doesn't exist.

    The returned summary always reports ``messages_count=0`` and an empty preview;
    it does not query the messages.
    """
    thread = await session.get(Thread, thread_id)
    if thread is None:
        return None
    thread.name = name
    thread.updated_at = datetime.now(timezone.utc)
    await session.commit()
    await session.refresh(thread)
    return {
        "id": thread.id,
        "name": thread.name,
        "created_at": thread.created_at.isoformat(),
        "updated_at": thread.updated_at.isoformat(),
        "messages_count": 0,
        "first_message_preview": "",
    }


async def delete_thread(session: AsyncSession, thread_id: int) -> int | None:
    """Delete a thread and its messages.

    Returns the number of deleted messages, or None if the thread doesn't exist.
    """
    thread = await session.get(Thread, thread_id)
    if thread is None:
        return None
    count_stmt = select(func.count(Message.id)).where(Message.thread_id == thread_id)
    messages_count = (await session.execute(count_stmt)).scalar_one() or 0
    await session.execute(delete(Message).where(Message.thread_id == thread_id))
    await session.delete(thread)
    await session.commit()
    return int(messages_count)