"""Шаги state machine внутри ветки: сид, чтение, правка (Спринт 6a). Шаги живут в БД (`intent_steps`), сид при старте читает файлы промптов из `prompts/intents/{intent_code}/steps/{step_code}.md`. Список шагов и переходы описаны в словаре `SEED_INTENT_STEPS` ниже — новые state-machine-ветки добавляются сюда + соответствующие файлы. """ import json import logging from datetime import datetime, timezone from pathlib import Path from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from db.models import Intent, IntentStep logger = logging.getLogger(__name__) PROMPTS_INTENTS_DIR = Path(__file__).resolve().parent.parent / "prompts" / "intents" # Стартовая описание шагов для state-machine-веток. Ключ — code ветки; значение — # список шагов в порядке следования. `allowed_next` описывает граф переходов. SEED_INTENT_STEPS: dict[str, list[dict]] = { "new_booking": [ { "code": "intro", "name": "Приветствие", "allowed_next": ["intro", "qualify"], }, { "code": "qualify", "name": "Повод и специалист", "allowed_next": ["qualify", "present"], }, { "code": "present", "name": "Презентация плана", "allowed_next": ["present", "qualify", "offer_time"], }, { "code": "offer_time", "name": "Удобное время", "allowed_next": ["offer_time", "book"], }, { "code": "book", "name": "Подтверждение записи", "allowed_next": ["book", "qualify", "offer_time", "close"], }, { "code": "close", "name": "Завершение", "allowed_next": ["close"], }, ], } def _step_prompt_path(intent_code: str, step_code: str) -> Path: return PROMPTS_INTENTS_DIR / intent_code / "steps" / f"{step_code}.md" def load_seed_step_prompt(intent_code: str, step_code: str) -> str: path = _step_prompt_path(intent_code, step_code) try: return path.read_text(encoding="utf-8").strip() except FileNotFoundError: logger.warning("Seed prompt for step %s/%s not found at %s", intent_code, step_code, path) return "" def has_state_machine(intent_code: str) -> bool: return intent_code in SEED_INTENT_STEPS def parse_allowed_next(step: IntentStep) -> list[str]: try: value = json.loads(step.allowed_next_json) except (json.JSONDecodeError, TypeError): return [] return value if isinstance(value, list) else [] def parse_guards(step: IntentStep) -> dict: try: value = json.loads(step.guards_json) except (json.JSONDecodeError, TypeError): return {} return value if isinstance(value, dict) else {} async def list_steps_for_intent(session: AsyncSession, intent_id: int) -> list[IntentStep]: stmt = select(IntentStep).where(IntentStep.intent_id == intent_id).order_by(IntentStep.order_index, IntentStep.id) return list((await session.execute(stmt)).scalars().all()) async def get_step_by_code( session: AsyncSession, intent_id: int, step_code: str ) -> IntentStep | None: stmt = select(IntentStep).where( IntentStep.intent_id == intent_id, IntentStep.code == step_code ) return (await session.execute(stmt)).scalar_one_or_none() async def get_first_step(session: AsyncSession, intent_id: int) -> IntentStep | None: stmt = ( select(IntentStep) .where(IntentStep.intent_id == intent_id) .order_by(IntentStep.order_index, IntentStep.id) .limit(1) ) return (await session.execute(stmt)).scalar_one_or_none() async def update_step( session: AsyncSession, step: IntentStep, *, name: str | None = None, system_prompt: str | None = None, allowed_next: list[str] | None = None, guards: dict | None = None, ) -> IntentStep: if name is not None: step.name = name if system_prompt is not None: step.system_prompt = system_prompt if allowed_next is not None: step.allowed_next_json = json.dumps(allowed_next, ensure_ascii=False) if guards is not None: step.guards_json = json.dumps(guards, ensure_ascii=False) step.updated_at = datetime.now(timezone.utc) await session.commit() await session.refresh(step) return step async def ensure_seed_steps(session: AsyncSession) -> None: """Досиживает недостающие шаги для state-machine-веток. Существующие не трогаются.""" added = 0 for intent_code, steps_def in SEED_INTENT_STEPS.items(): intent = (await session.execute( select(Intent).where(Intent.code == intent_code) )).scalar_one_or_none() if intent is None: logger.warning("Cannot seed steps for %s: intent not found", intent_code) continue existing = set((await session.execute( select(IntentStep.code).where(IntentStep.intent_id == intent.id) )).scalars().all()) for order, data in enumerate(steps_def): if data["code"] in existing: continue prompt = load_seed_step_prompt(intent_code, data["code"]) session.add(IntentStep( intent_id=intent.id, code=data["code"], name=data["name"], order_index=order, system_prompt=prompt, allowed_next_json=json.dumps(data["allowed_next"], ensure_ascii=False), guards_json="{}", )) added += 1 if added: await session.commit() logger.info("Seeded %d missing intent_steps", added)