"""Шаги state machine внутри ветки: сид, чтение, правка (Спринт 6a). Шаги живут в БД (`intent_steps`), сид при старте читает файлы промптов из `prompts/intents/{intent_code}/steps/{step_code}.md`. Список шагов и переходы описаны в словаре `SEED_INTENT_STEPS` ниже — новые state-machine-ветки добавляются сюда + соответствующие файлы. """ import json import logging from datetime import datetime, timezone from pathlib import Path from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from db.models import Intent, IntentStep logger = logging.getLogger(__name__) PROMPTS_INTENTS_DIR = Path(__file__).resolve().parent.parent / "prompts" / "intents" # Стартовая описание шагов для state-machine-веток. Ключ — code ветки; значение — # список шагов в порядке следования. `allowed_next` описывает граф переходов. SEED_INTENT_STEPS: dict[str, list[dict]] = { # Спринт 7.6 (вариант 2): воронка сжата с 6 шагов до 4 — `intro → qualify → book → close`. # Шаги `present` и `offer_time` оставлены в БД как deprecated (на случай отката решения), # но `qualify` теперь ведёт сразу на `book`, и `book` больше не возвращает на `offer_time`. # См. docs/OPTIMIZATION_CONVERSION_v1.md, блок C. "new_booking": [ { "code": "intro", "name": "Приветствие", "allowed_next": ["intro", "qualify"], "guards": {}, }, { "code": "qualify", "name": "Повод и специалист", "allowed_next": ["qualify", "book"], "guards": { "require_legal_rep": { "description": "Для записи ребёнка нужны ФИО и телефон законного представителя", "trigger_slot": "is_child", "trigger_value": True, "required_slots": ["legal_rep_name", "legal_rep_phone"], }, }, }, { "code": "present", "name": "Презентация плана", # DEPRECATED (Спринт 7.6): шаг изолирован. Если модель ошибочно туда попала — # выходим только в `book`, не зацикливаемся. "allowed_next": ["book"], }, { "code": "offer_time", "name": "Удобное время", # DEPRECATED (Спринт 7.6): станет актуален при подключении реального календаря. "allowed_next": ["offer_time", "book"], }, { "code": "book", "name": "Подтверждение записи", "allowed_next": ["book", "qualify", "close"], }, { "code": "close", "name": "Завершение", "allowed_next": ["close"], }, ], } # Старые значения allowed_next до Спринта 7.6 — нужны для безопасной миграции # существующих записей в БД (см. migrate_new_booking_allowed_next_v2 ниже). _PRE_SPRINT_7_6_ALLOWED_NEXT: dict[str, list[str]] = { "intro": ["intro", "qualify"], "qualify": ["qualify", "present"], "present": ["present", "qualify", "offer_time"], "offer_time": ["offer_time", "book"], "book": ["book", "qualify", "offer_time", "close"], "close": ["close"], } def _step_prompt_path(intent_code: str, step_code: str) -> Path: return PROMPTS_INTENTS_DIR / intent_code / "steps" / f"{step_code}.md" def load_seed_step_prompt(intent_code: str, step_code: str) -> str: path = _step_prompt_path(intent_code, step_code) try: return path.read_text(encoding="utf-8").strip() except FileNotFoundError: logger.warning("Seed prompt for step %s/%s not found at %s", intent_code, step_code, path) return "" def has_state_machine(intent_code: str) -> bool: return intent_code in SEED_INTENT_STEPS def parse_allowed_next(step: IntentStep) -> list[str]: try: value = json.loads(step.allowed_next_json) except (json.JSONDecodeError, TypeError): return [] return value if isinstance(value, list) else [] def parse_guards(step: IntentStep) -> dict: try: value = json.loads(step.guards_json) except (json.JSONDecodeError, TypeError): return {} return value if isinstance(value, dict) else {} async def list_steps_for_intent(session: AsyncSession, intent_id: int) -> list[IntentStep]: stmt = select(IntentStep).where(IntentStep.intent_id == intent_id).order_by(IntentStep.order_index, IntentStep.id) return list((await session.execute(stmt)).scalars().all()) async def get_step_by_code( session: AsyncSession, intent_id: int, step_code: str ) -> IntentStep | None: stmt = select(IntentStep).where( IntentStep.intent_id == intent_id, IntentStep.code == step_code ) return (await session.execute(stmt)).scalar_one_or_none() async def get_first_step(session: AsyncSession, intent_id: int) -> IntentStep | None: stmt = ( select(IntentStep) .where(IntentStep.intent_id == intent_id) .order_by(IntentStep.order_index, IntentStep.id) .limit(1) ) return (await session.execute(stmt)).scalar_one_or_none() async def update_step( session: AsyncSession, step: IntentStep, *, name: str | None = None, system_prompt: str | None = None, allowed_next: list[str] | None = None, guards: dict | None = None, ) -> IntentStep: if name is not None: step.name = name if system_prompt is not None: step.system_prompt = system_prompt if allowed_next is not None: step.allowed_next_json = json.dumps(allowed_next, ensure_ascii=False) if guards is not None: step.guards_json = json.dumps(guards, ensure_ascii=False) step.updated_at = datetime.now(timezone.utc) await session.commit() await session.refresh(step) return step async def ensure_seed_steps(session: AsyncSession) -> None: """Досиживает недостающие шаги для state-machine-веток. Существующие не трогаются.""" added = 0 for intent_code, steps_def in SEED_INTENT_STEPS.items(): intent = (await session.execute( select(Intent).where(Intent.code == intent_code) )).scalar_one_or_none() if intent is None: logger.warning("Cannot seed steps for %s: intent not found", intent_code) continue existing = set((await session.execute( select(IntentStep.code).where(IntentStep.intent_id == intent.id) )).scalars().all()) for order, data in enumerate(steps_def): if data["code"] in existing: continue prompt = load_seed_step_prompt(intent_code, data["code"]) session.add(IntentStep( intent_id=intent.id, code=data["code"], name=data["name"], order_index=order, system_prompt=prompt, allowed_next_json=json.dumps(data["allowed_next"], ensure_ascii=False), guards_json="{}", )) added += 1 if added: await session.commit() logger.info("Seeded %d missing intent_steps", added) async def ensure_seed_guards(session: AsyncSession) -> None: """Патчит guards_json для существующих шагов, если они остались пустыми '{}'. Нужно для обратной совместимости: шаги созданы раньше, чем guards появились в SEED_INTENT_STEPS. Вызывается при старте после ensure_seed_steps. """ patched = 0 for intent_code, steps_def in SEED_INTENT_STEPS.items(): intent = (await session.execute( select(Intent).where(Intent.code == intent_code) )).scalar_one_or_none() if intent is None: continue for step_data in steps_def: seed_guards = step_data.get("guards") if not seed_guards: continue step = (await session.execute( select(IntentStep).where( IntentStep.intent_id == intent.id, IntentStep.code == step_data["code"], ) )).scalar_one_or_none() if step is None: continue if step.guards_json in ("{}", "", None): step.guards_json = json.dumps(seed_guards, ensure_ascii=False) patched += 1 if patched: await session.commit() logger.info("Patched guards_json for %d intent_steps", patched) async def migrate_new_booking_allowed_next_v2(session: AsyncSession) -> None: """Одноразовая миграция Спринта 7.6: переключить `allowed_next` шагов `new_booking` на новый граф (intro → qualify → book → close, без present и offer_time). Логика безопасности: для каждого шага сравниваем текущий `allowed_next_json` с дореформенным значением (`_PRE_SPRINT_7_6_ALLOWED_NEXT`). Если совпадает — оператор не правил вручную, обновляем на новое значение из `SEED_INTENT_STEPS`. Если отличается — пропускаем и пишем warning. Идемпотентна: при повторных вызовах второй проход просто никого не находит. """ intent = (await session.execute( select(Intent).where(Intent.code == "new_booking") )).scalar_one_or_none() if intent is None: return seed_by_code = {s["code"]: s for s in SEED_INTENT_STEPS["new_booking"]} updated = 0 skipped: list[str] = [] for step in await list_steps_for_intent(session, intent.id): old_seed = _PRE_SPRINT_7_6_ALLOWED_NEXT.get(step.code) new_seed_step = seed_by_code.get(step.code) if old_seed is None or new_seed_step is None: continue new_allowed = new_seed_step["allowed_next"] try: current = json.loads(step.allowed_next_json) except (json.JSONDecodeError, TypeError): current = None # Уже на новом значении — ничего не делаем (идемпотентность). if current == new_allowed: continue # Совпадает со старым SEED — оператор не правил, безопасно обновить. if current == old_seed: step.allowed_next_json = json.dumps(new_allowed, ensure_ascii=False) updated += 1 continue # Любое другое значение — оператор правил вручную, не трогаем. skipped.append(f"{step.code}={current!r}") if updated: await session.commit() logger.info( "migrate_new_booking_allowed_next_v2: updated %d steps to Спринт 7.6 graph", updated, ) if skipped: logger.warning( "migrate_new_booking_allowed_next_v2: skipped %d steps (operator-modified): %s", len(skipped), ", ".join(skipped), )