Files
RAG_helper/services/intent_step_service.py
T
AR 15 M4 4977199cd4 feat(sprint6b-F): guards в new_booking — require_legal_rep
- check_guards() в state_machine.py: проверяет guards_json шага при переходе;
  trigger_slot/trigger_value/required_slots; нормализует "true"/"false"-строки
- qualify step: guard require_legal_rep — блокирует переход в present, если
  is_child=true и не заполнены legal_rep_name / legal_rep_phone
- Промпт qualify обновлён: инструкции по is_child, legal_rep, requested_doctor,
  waitlist_flag, needs_surgologist_first
- ensure_seed_guards() патчит guards_json существующих шагов при старте
- Sandbox: блок валидации показывает guard_name + missing_slots + description
- Settings: обновлён лейбл поля guards с примером формата

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-26 18:27:10 +05:00

218 lines
7.8 KiB
Python

"""Шаги state machine внутри ветки: сид, чтение, правка (Спринт 6a).
Шаги живут в БД (`intent_steps`), сид при старте читает файлы промптов из
`prompts/intents/{intent_code}/steps/{step_code}.md`. Список шагов и переходы
описаны в словаре `SEED_INTENT_STEPS` ниже — новые state-machine-ветки
добавляются сюда + соответствующие файлы.
"""
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from db.models import Intent, IntentStep
logger = logging.getLogger(__name__)
PROMPTS_INTENTS_DIR = Path(__file__).resolve().parent.parent / "prompts" / "intents"
# Стартовая описание шагов для state-machine-веток. Ключ — code ветки; значение —
# список шагов в порядке следования. `allowed_next` описывает граф переходов.
SEED_INTENT_STEPS: dict[str, list[dict]] = {
"new_booking": [
{
"code": "intro",
"name": "Приветствие",
"allowed_next": ["intro", "qualify"],
"guards": {},
},
{
"code": "qualify",
"name": "Повод и специалист",
"allowed_next": ["qualify", "present"],
"guards": {
"require_legal_rep": {
"description": "Для записи ребёнка нужны ФИО и телефон законного представителя",
"trigger_slot": "is_child",
"trigger_value": True,
"required_slots": ["legal_rep_name", "legal_rep_phone"],
},
},
},
{
"code": "present",
"name": "Презентация плана",
"allowed_next": ["present", "qualify", "offer_time"],
},
{
"code": "offer_time",
"name": "Удобное время",
"allowed_next": ["offer_time", "book"],
},
{
"code": "book",
"name": "Подтверждение записи",
"allowed_next": ["book", "qualify", "offer_time", "close"],
},
{
"code": "close",
"name": "Завершение",
"allowed_next": ["close"],
},
],
}
def _step_prompt_path(intent_code: str, step_code: str) -> Path:
return PROMPTS_INTENTS_DIR / intent_code / "steps" / f"{step_code}.md"
def load_seed_step_prompt(intent_code: str, step_code: str) -> str:
path = _step_prompt_path(intent_code, step_code)
try:
return path.read_text(encoding="utf-8").strip()
except FileNotFoundError:
logger.warning("Seed prompt for step %s/%s not found at %s", intent_code, step_code, path)
return ""
def has_state_machine(intent_code: str) -> bool:
return intent_code in SEED_INTENT_STEPS
def parse_allowed_next(step: IntentStep) -> list[str]:
try:
value = json.loads(step.allowed_next_json)
except (json.JSONDecodeError, TypeError):
return []
return value if isinstance(value, list) else []
def parse_guards(step: IntentStep) -> dict:
try:
value = json.loads(step.guards_json)
except (json.JSONDecodeError, TypeError):
return {}
return value if isinstance(value, dict) else {}
async def list_steps_for_intent(session: AsyncSession, intent_id: int) -> list[IntentStep]:
stmt = select(IntentStep).where(IntentStep.intent_id == intent_id).order_by(IntentStep.order_index, IntentStep.id)
return list((await session.execute(stmt)).scalars().all())
async def get_step_by_code(
session: AsyncSession, intent_id: int, step_code: str
) -> IntentStep | None:
stmt = select(IntentStep).where(
IntentStep.intent_id == intent_id, IntentStep.code == step_code
)
return (await session.execute(stmt)).scalar_one_or_none()
async def get_first_step(session: AsyncSession, intent_id: int) -> IntentStep | None:
stmt = (
select(IntentStep)
.where(IntentStep.intent_id == intent_id)
.order_by(IntentStep.order_index, IntentStep.id)
.limit(1)
)
return (await session.execute(stmt)).scalar_one_or_none()
async def update_step(
session: AsyncSession,
step: IntentStep,
*,
name: str | None = None,
system_prompt: str | None = None,
allowed_next: list[str] | None = None,
guards: dict | None = None,
) -> IntentStep:
if name is not None:
step.name = name
if system_prompt is not None:
step.system_prompt = system_prompt
if allowed_next is not None:
step.allowed_next_json = json.dumps(allowed_next, ensure_ascii=False)
if guards is not None:
step.guards_json = json.dumps(guards, ensure_ascii=False)
step.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(step)
return step
async def ensure_seed_steps(session: AsyncSession) -> None:
"""Досиживает недостающие шаги для state-machine-веток. Существующие не трогаются."""
added = 0
for intent_code, steps_def in SEED_INTENT_STEPS.items():
intent = (await session.execute(
select(Intent).where(Intent.code == intent_code)
)).scalar_one_or_none()
if intent is None:
logger.warning("Cannot seed steps for %s: intent not found", intent_code)
continue
existing = set((await session.execute(
select(IntentStep.code).where(IntentStep.intent_id == intent.id)
)).scalars().all())
for order, data in enumerate(steps_def):
if data["code"] in existing:
continue
prompt = load_seed_step_prompt(intent_code, data["code"])
session.add(IntentStep(
intent_id=intent.id,
code=data["code"],
name=data["name"],
order_index=order,
system_prompt=prompt,
allowed_next_json=json.dumps(data["allowed_next"], ensure_ascii=False),
guards_json="{}",
))
added += 1
if added:
await session.commit()
logger.info("Seeded %d missing intent_steps", added)
async def ensure_seed_guards(session: AsyncSession) -> None:
"""Патчит guards_json для существующих шагов, если они остались пустыми '{}'.
Нужно для обратной совместимости: шаги созданы раньше, чем guards появились
в SEED_INTENT_STEPS. Вызывается при старте после ensure_seed_steps.
"""
patched = 0
for intent_code, steps_def in SEED_INTENT_STEPS.items():
intent = (await session.execute(
select(Intent).where(Intent.code == intent_code)
)).scalar_one_or_none()
if intent is None:
continue
for step_data in steps_def:
seed_guards = step_data.get("guards")
if not seed_guards:
continue
step = (await session.execute(
select(IntentStep).where(
IntentStep.intent_id == intent.id,
IntentStep.code == step_data["code"],
)
)).scalar_one_or_none()
if step is None:
continue
if step.guards_json in ("{}", "", None):
step.guards_json = json.dumps(seed_guards, ensure_ascii=False)
patched += 1
if patched:
await session.commit()
logger.info("Patched guards_json for %d intent_steps", patched)