"""Версионирование графа шагов state machine (Спринт 7.7). Каждая state-machine-ветка хранит один или несколько графов в `intent_step_graphs`. Активен ровно один (is_active=True): чат и Песочница используют только его. Остальные графы — резерв на случай отката или A/B-сравнения. Шаги (`intent_steps`) ссылаются на граф через `graph_id`. Один шаг принадлежит одному графу; чтобы шаг работал в разных версиях, он дублируется при создании копии графа. Здесь: - `ensure_seed_graphs` — идемпотентная data-migration. Создаёт активный граф для каждой ветки и привязывает к нему существующие шаги (graph_id=NULL). Для `new_booking` дополнительно восстанавливает архивный 6-шаговый граф v1 из `_archived_v1/*.md` и `_PRE_SPRINT_7_6_ALLOWED_NEXT`. - `get_active_graph` — выдаёт активный граф ветки (для фильтра шагов). """ import json import logging from pathlib import Path from sqlalchemy import delete, select, update from sqlalchemy.ext.asyncio import AsyncSession from db.models import Intent, IntentStep, IntentStepGraph from services.intent_step_service import ( SEED_INTENT_STEPS, _NEW_BOOKING_DEPRECATED_STEP_CODES, _PRE_SPRINT_7_6_ALLOWED_NEXT, PROMPTS_INTENTS_DIR, ) logger = logging.getLogger(__name__) # Имена активных графов по веткам. Для new_booking — это вариант 2 Спринта 7.6. _ACTIVE_GRAPH_NAMES: dict[str, str] = { "new_booking": "v2 (4 шага, Спринт 7.6)", } _DEFAULT_ACTIVE_GRAPH_NAME = "v1 (исходный)" # Архивный 6-шаговый граф new_booking — снапшот до Спринта 7.6. _ARCHIVED_NEW_BOOKING_V1_NAME = "v1 (6 шагов, до Спринта 7.6)" _ARCHIVED_NEW_BOOKING_V1_STEPS: list[dict] = [ {"code": "intro", "name": "Приветствие", "guards": {}}, { "code": "qualify", "name": "Повод и специалист", "guards": { "require_legal_rep": { "description": "Для записи ребёнка нужны ФИО и телефон законного представителя", "trigger_slot": "is_child", "trigger_value": True, "required_slots": ["legal_rep_name", "legal_rep_phone"], }, }, }, {"code": "present", "name": "Презентация плана", "guards": {}}, {"code": "offer_time", "name": "Удобное время", "guards": {}}, {"code": "book", "name": "Подтверждение записи", "guards": {}}, {"code": "close", "name": "Завершение", "guards": {}}, ] def _archived_prompt_path(intent_code: str, step_code: str) -> Path: return PROMPTS_INTENTS_DIR / intent_code / "steps" / "_archived_v1" / f"{step_code}.md" def _load_archived_prompt(intent_code: str, step_code: str) -> str: path = _archived_prompt_path(intent_code, step_code) try: return path.read_text(encoding="utf-8").strip() except FileNotFoundError: logger.warning("Archived prompt %s/%s not found at %s", intent_code, step_code, path) return "" async def get_active_graph(session: AsyncSession, intent_id: int) -> IntentStepGraph | None: stmt = ( select(IntentStepGraph) .where(IntentStepGraph.intent_id == intent_id, IntentStepGraph.is_active.is_(True)) .limit(1) ) return (await session.execute(stmt)).scalar_one_or_none() async def list_graphs(session: AsyncSession, intent_id: int) -> list[tuple[IntentStepGraph, int]]: """Все графы ветки + кол-во шагов в каждом, в порядке: активный первым, затем по version.""" graphs = list((await session.execute( select(IntentStepGraph) .where(IntentStepGraph.intent_id == intent_id) .order_by(IntentStepGraph.is_active.desc(), IntentStepGraph.version) )).scalars().all()) counts: dict[int, int] = {} if graphs: from sqlalchemy import func rows = (await session.execute( select(IntentStep.graph_id, func.count(IntentStep.id)) .where(IntentStep.graph_id.in_([g.id for g in graphs])) .group_by(IntentStep.graph_id) )).all() counts = {gid: cnt for gid, cnt in rows} return [(g, counts.get(g.id, 0)) for g in graphs] async def set_active_graph( session: AsyncSession, intent_id: int, graph_id: int ) -> IntentStepGraph | None: """Сделать `graph_id` активным графом ветки, остальные — неактивными.""" target = (await session.execute( select(IntentStepGraph).where( IntentStepGraph.intent_id == intent_id, IntentStepGraph.id == graph_id, ) )).scalar_one_or_none() if target is None: return None await session.execute( update(IntentStepGraph) .where(IntentStepGraph.intent_id == intent_id) .values(is_active=False) ) target.is_active = True await session.commit() await session.refresh(target) return target async def ensure_seed_graphs(session: AsyncSession) -> None: """Создаёт активный граф для каждой state-machine-ветки и привязывает существующие шаги. Для `new_booking` восстанавливает резервный v1-граф. Идемпотентность: при повторном вызове ничего не делает, если активный граф уже создан, а шагов с graph_id=NULL не осталось. """ created_graphs = 0 bound_steps = 0 archived_v1_created = False for intent_code, _steps_def in SEED_INTENT_STEPS.items(): intent = (await session.execute( select(Intent).where(Intent.code == intent_code) )).scalar_one_or_none() if intent is None: logger.warning("Cannot seed graph for %s: intent not found", intent_code) continue active = await get_active_graph(session, intent.id) if active is None: active = IntentStepGraph( intent_id=intent.id, version=1, name=_ACTIVE_GRAPH_NAMES.get(intent_code, _DEFAULT_ACTIVE_GRAPH_NAME), is_active=True, ) session.add(active) await session.flush() created_graphs += 1 logger.info("Created active graph %r for intent %s", active.name, intent_code) # Привязываем «бесхозные» шаги (graph_id IS NULL) — это существующие записи # из intent_steps, которые жили до миграции версии j6d8c4b56g23. result = await session.execute( update(IntentStep) .where(IntentStep.intent_id == intent.id, IntentStep.graph_id.is_(None)) .values(graph_id=active.id) ) bound_steps += result.rowcount or 0 # Спринт 7.7: создать архивный 6-шаговый граф new_booking. Один раз. nb_intent = (await session.execute( select(Intent).where(Intent.code == "new_booking") )).scalar_one_or_none() nb_active_graph: IntentStepGraph | None = None if nb_intent is not None: nb_active_graph = await get_active_graph(session, nb_intent.id) existing_v1 = (await session.execute( select(IntentStepGraph).where( IntentStepGraph.intent_id == nb_intent.id, IntentStepGraph.name == _ARCHIVED_NEW_BOOKING_V1_NAME, ) )).scalar_one_or_none() if existing_v1 is None: v1_graph = IntentStepGraph( intent_id=nb_intent.id, version=2, name=_ARCHIVED_NEW_BOOKING_V1_NAME, is_active=False, ) session.add(v1_graph) await session.flush() for order, data in enumerate(_ARCHIVED_NEW_BOOKING_V1_STEPS): allowed_next = _PRE_SPRINT_7_6_ALLOWED_NEXT[data["code"]] prompt = _load_archived_prompt("new_booking", data["code"]) session.add(IntentStep( intent_id=nb_intent.id, graph_id=v1_graph.id, code=data["code"], name=data["name"], order_index=order, system_prompt=prompt, allowed_next_json=json.dumps(allowed_next, ensure_ascii=False), guards_json=json.dumps(data["guards"], ensure_ascii=False), )) archived_v1_created = True logger.info( "Created archived graph %r for new_booking (6 steps)", _ARCHIVED_NEW_BOOKING_V1_NAME, ) # Спринт 7.7: чистим активный граф new_booking от deprecated шагов (present, # offer_time). Они должны жить только в архивном v1. Идемпотентно. deprecated_removed = 0 if nb_intent is not None and nb_active_graph is not None: result = await session.execute( delete(IntentStep).where( IntentStep.graph_id == nb_active_graph.id, IntentStep.code.in_(_NEW_BOOKING_DEPRECATED_STEP_CODES), ) ) deprecated_removed = result.rowcount or 0 if deprecated_removed: logger.info( "Removed %d deprecated steps from active new_booking graph: %s", deprecated_removed, sorted(_NEW_BOOKING_DEPRECATED_STEP_CODES), ) if created_graphs or bound_steps or archived_v1_created or deprecated_removed: await session.commit() logger.info( "ensure_seed_graphs: graphs=%d, bound_steps=%d, archived_v1=%s", created_graphs, bound_steps, archived_v1_created, )