"""Шаги state machine внутри ветки: сид, чтение, правка (Спринт 6a). Шаги живут в БД (`intent_steps`), сид при старте читает файлы промптов из `prompts/intents/{intent_code}/steps/{step_code}.md`. Список шагов и переходы описаны в словаре `SEED_INTENT_STEPS` ниже — новые state-machine-ветки добавляются сюда + соответствующие файлы. """ import json import logging from datetime import datetime, timezone from pathlib import Path from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from db.models import Intent, IntentStep, IntentStepGraph logger = logging.getLogger(__name__) PROMPTS_INTENTS_DIR = Path(__file__).resolve().parent.parent / "prompts" / "intents" # Стартовая описание шагов для state-machine-веток. Ключ — code ветки; значение — # список шагов в порядке следования. `allowed_next` описывает граф переходов. SEED_INTENT_STEPS: dict[str, list[dict]] = { # Спринт 7.6/7.7 (вариант 2): активный граф new_booking — ровно 4 шага # `intro → qualify → book → close`. Старый 6-шаговый сценарий (с `present` # и `offer_time`) сохранён как архивный граф v1, см. # `services/intent_step_graph_service._ARCHIVED_NEW_BOOKING_V1_STEPS`. "new_booking": [ { "code": "intro", "name": "Приветствие", "allowed_next": ["intro", "qualify"], "guards": {}, }, { "code": "qualify", "name": "Повод и специалист", "allowed_next": ["qualify", "book"], "guards": { "require_legal_rep": { "description": "Для записи ребёнка нужны ФИО и телефон законного представителя", "trigger_slot": "is_child", "trigger_value": True, "required_slots": ["legal_rep_name", "legal_rep_phone"], }, }, }, { "code": "book", "name": "Подтверждение записи", "allowed_next": ["book", "qualify", "close"], }, { "code": "close", "name": "Завершение", "allowed_next": ["close"], }, ], } # Коды шагов, которые когда-то были в активном графе new_booking (Спринт 6a), # но в Спринте 7.6 удалены. Список используется для одноразовой чистки активного # графа в `ensure_seed_graphs`. Сами шаги остаются в архивном v1. _NEW_BOOKING_DEPRECATED_STEP_CODES: set[str] = {"present", "offer_time"} # Старые значения allowed_next до Спринта 7.6 — нужны для безопасной миграции # существующих записей в БД (см. migrate_new_booking_allowed_next_v2 ниже). _PRE_SPRINT_7_6_ALLOWED_NEXT: dict[str, list[str]] = { "intro": ["intro", "qualify"], "qualify": ["qualify", "present"], "present": ["present", "qualify", "offer_time"], "offer_time": ["offer_time", "book"], "book": ["book", "qualify", "offer_time", "close"], "close": ["close"], } def _step_prompt_path(intent_code: str, step_code: str) -> Path: return PROMPTS_INTENTS_DIR / intent_code / "steps" / f"{step_code}.md" def load_seed_step_prompt(intent_code: str, step_code: str) -> str: path = _step_prompt_path(intent_code, step_code) try: return path.read_text(encoding="utf-8").strip() except FileNotFoundError: logger.warning("Seed prompt for step %s/%s not found at %s", intent_code, step_code, path) return "" def has_state_machine(intent_code: str) -> bool: return intent_code in SEED_INTENT_STEPS def parse_allowed_next(step: IntentStep) -> list[str]: try: value = json.loads(step.allowed_next_json) except (json.JSONDecodeError, TypeError): return [] return value if isinstance(value, list) else [] def parse_guards(step: IntentStep) -> dict: try: value = json.loads(step.guards_json) except (json.JSONDecodeError, TypeError): return {} return value if isinstance(value, dict) else {} def _active_graph_filter(intent_id: int): """Базовый фильтр: шаги принадлежат активному графу указанного intent. Со Спринта 7.7 у ветки может быть несколько графов; чат и UI «Шаги» работают только с активным. Шаги с graph_id=NULL могут существовать только в окне миграции до первого вызова ensure_seed_graphs — в нормальной работе их нет. """ return ( select(IntentStep) .join(IntentStepGraph, IntentStepGraph.id == IntentStep.graph_id) .where( IntentStepGraph.intent_id == intent_id, IntentStepGraph.is_active.is_(True), ) ) async def list_steps_for_intent(session: AsyncSession, intent_id: int) -> list[IntentStep]: stmt = _active_graph_filter(intent_id).order_by(IntentStep.order_index, IntentStep.id) return list((await session.execute(stmt)).scalars().all()) async def get_step_by_code( session: AsyncSession, intent_id: int, step_code: str ) -> IntentStep | None: stmt = _active_graph_filter(intent_id).where(IntentStep.code == step_code) return (await session.execute(stmt)).scalar_one_or_none() async def get_first_step(session: AsyncSession, intent_id: int) -> IntentStep | None: stmt = ( _active_graph_filter(intent_id) .order_by(IntentStep.order_index, IntentStep.id) .limit(1) ) return (await session.execute(stmt)).scalar_one_or_none() async def update_step( session: AsyncSession, step: IntentStep, *, name: str | None = None, system_prompt: str | None = None, allowed_next: list[str] | None = None, guards: dict | None = None, ) -> IntentStep: if name is not None: step.name = name if system_prompt is not None: step.system_prompt = system_prompt if allowed_next is not None: step.allowed_next_json = json.dumps(allowed_next, ensure_ascii=False) if guards is not None: step.guards_json = json.dumps(guards, ensure_ascii=False) step.updated_at = datetime.now(timezone.utc) await session.commit() await session.refresh(step) return step async def ensure_seed_steps(session: AsyncSession) -> None: """Досиживает недостающие шаги для state-machine-веток. Существующие не трогаются.""" added = 0 for intent_code, steps_def in SEED_INTENT_STEPS.items(): intent = (await session.execute( select(Intent).where(Intent.code == intent_code) )).scalar_one_or_none() if intent is None: logger.warning("Cannot seed steps for %s: intent not found", intent_code) continue existing = set((await session.execute( select(IntentStep.code).where(IntentStep.intent_id == intent.id) )).scalars().all()) for order, data in enumerate(steps_def): if data["code"] in existing: continue prompt = load_seed_step_prompt(intent_code, data["code"]) session.add(IntentStep( intent_id=intent.id, code=data["code"], name=data["name"], order_index=order, system_prompt=prompt, allowed_next_json=json.dumps(data["allowed_next"], ensure_ascii=False), guards_json="{}", )) added += 1 if added: await session.commit() logger.info("Seeded %d missing intent_steps", added) async def ensure_seed_guards(session: AsyncSession) -> None: """Патчит guards_json для существующих шагов, если они остались пустыми '{}'. Нужно для обратной совместимости: шаги созданы раньше, чем guards появились в SEED_INTENT_STEPS. Вызывается при старте после ensure_seed_steps. """ patched = 0 for intent_code, steps_def in SEED_INTENT_STEPS.items(): intent = (await session.execute( select(Intent).where(Intent.code == intent_code) )).scalar_one_or_none() if intent is None: continue for step_data in steps_def: seed_guards = step_data.get("guards") if not seed_guards: continue # Сидинг guards применяется только к шагам активного графа: архивные # графы (Спринт 7.7) могут иметь свою историю guards и трогать их нельзя. # Если активного графа ещё нет (первый запуск до ensure_seed_graphs) — # ищем по graph_id IS NULL. step = (await session.execute( _active_graph_filter(intent.id).where(IntentStep.code == step_data["code"]) )).scalar_one_or_none() if step is None: step = (await session.execute( select(IntentStep).where( IntentStep.intent_id == intent.id, IntentStep.code == step_data["code"], IntentStep.graph_id.is_(None), ) )).scalar_one_or_none() if step is None: continue if step.guards_json in ("{}", "", None): step.guards_json = json.dumps(seed_guards, ensure_ascii=False) patched += 1 if patched: await session.commit() logger.info("Patched guards_json for %d intent_steps", patched) async def migrate_new_booking_allowed_next_v2(session: AsyncSession) -> None: """Одноразовая миграция Спринта 7.6: переключить `allowed_next` шагов `new_booking` на новый граф (intro → qualify → book → close, без present и offer_time). Логика безопасности: для каждого шага сравниваем текущий `allowed_next_json` с дореформенным значением (`_PRE_SPRINT_7_6_ALLOWED_NEXT`). Если совпадает — оператор не правил вручную, обновляем на новое значение из `SEED_INTENT_STEPS`. Если отличается — пропускаем и пишем warning. Идемпотентна: при повторных вызовах второй проход просто никого не находит. """ intent = (await session.execute( select(Intent).where(Intent.code == "new_booking") )).scalar_one_or_none() if intent is None: return seed_by_code = {s["code"]: s for s in SEED_INTENT_STEPS["new_booking"]} updated = 0 skipped: list[str] = [] # Берём шаги активного графа + «бесхозные» (graph_id IS NULL — первый запуск # до ensure_seed_graphs). Архивные графы трогать нельзя — их allowed_next # должен остаться дореформенным (Спринт 7.7). candidates_stmt = ( select(IntentStep) .outerjoin(IntentStepGraph, IntentStepGraph.id == IntentStep.graph_id) .where( IntentStep.intent_id == intent.id, (IntentStep.graph_id.is_(None)) | (IntentStepGraph.is_active.is_(True)), ) .order_by(IntentStep.order_index, IntentStep.id) ) for step in (await session.execute(candidates_stmt)).scalars().all(): old_seed = _PRE_SPRINT_7_6_ALLOWED_NEXT.get(step.code) new_seed_step = seed_by_code.get(step.code) if old_seed is None or new_seed_step is None: continue new_allowed = new_seed_step["allowed_next"] try: current = json.loads(step.allowed_next_json) except (json.JSONDecodeError, TypeError): current = None # Уже на новом значении — ничего не делаем (идемпотентность). if current == new_allowed: continue # Совпадает со старым SEED — оператор не правил, безопасно обновить. if current == old_seed: step.allowed_next_json = json.dumps(new_allowed, ensure_ascii=False) updated += 1 continue # Любое другое значение — оператор правил вручную, не трогаем. skipped.append(f"{step.code}={current!r}") if updated: await session.commit() logger.info( "migrate_new_booking_allowed_next_v2: updated %d steps to Спринт 7.6 graph", updated, ) if skipped: logger.warning( "migrate_new_booking_allowed_next_v2: skipped %d steps (operator-modified): %s", len(skipped), ", ".join(skipped), )