4977199cd4
- check_guards() в state_machine.py: проверяет guards_json шага при переходе; trigger_slot/trigger_value/required_slots; нормализует "true"/"false"-строки - qualify step: guard require_legal_rep — блокирует переход в present, если is_child=true и не заполнены legal_rep_name / legal_rep_phone - Промпт qualify обновлён: инструкции по is_child, legal_rep, requested_doctor, waitlist_flag, needs_surgologist_first - ensure_seed_guards() патчит guards_json существующих шагов при старте - Sandbox: блок валидации показывает guard_name + missing_slots + description - Settings: обновлён лейбл поля guards с примером формата Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
203 lines
7.2 KiB
Python
203 lines
7.2 KiB
Python
"""Парсер structured-output ветки + валидатор переходов state machine (Спринт 6a).
|
|
|
|
Формат ответа ветки со state machine:
|
|
|
|
Текст пациенту, markdown разрешён.
|
|
|
|
STATE_JSON: {"state_after": "<step_code>", "slots_updated": {"slot": "value"}}
|
|
|
|
Либо — при exit condition — вместо `STATE_JSON:` строка `[INTENT_CHANGE: <code>]`.
|
|
Парсер вырезает служебную часть и возвращает видимый текст + решение модели.
|
|
"""
|
|
import json
|
|
import logging
|
|
import re
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_INTENT_CHANGE_RE = re.compile(r"\[INTENT_CHANGE:\s*([a-z_][a-z0-9_]*)\s*\]")
|
|
_STATE_JSON_RE = re.compile(r"STATE_JSON\s*:\s*", re.IGNORECASE)
|
|
|
|
|
|
def parse_branch_response(text: str) -> dict:
|
|
"""Разобрать ответ ветки на visible_text + intent_change / state_update.
|
|
|
|
Возвращает:
|
|
visible_text: str — текст пациенту (без служебных тегов),
|
|
intent_change: str | None — код ветки, если сработал exit condition,
|
|
state_update: {'state_after': str, 'slots_updated': dict} | None — при штатном ответе,
|
|
parse_error: str | None — если что-то не разобралось, сюда кладётся причина
|
|
(visible_text при этом = исходный текст без мусора).
|
|
"""
|
|
# Exit condition имеет приоритет.
|
|
intent_match = _INTENT_CHANGE_RE.search(text)
|
|
if intent_match:
|
|
visible = text[:intent_match.start()].rstrip()
|
|
return {
|
|
"visible_text": visible,
|
|
"intent_change": intent_match.group(1),
|
|
"state_update": None,
|
|
"parse_error": None,
|
|
}
|
|
|
|
state_match = _STATE_JSON_RE.search(text)
|
|
if not state_match:
|
|
# Модель не вернула служебную часть. Возвращаем весь текст и ошибку парсинга.
|
|
return {
|
|
"visible_text": text.rstrip(),
|
|
"intent_change": None,
|
|
"state_update": None,
|
|
"parse_error": "no STATE_JSON block",
|
|
}
|
|
|
|
raw_json, _ = _consume_json_object(text, state_match.end())
|
|
if raw_json is None:
|
|
return {
|
|
"visible_text": text[:state_match.start()].rstrip(),
|
|
"intent_change": None,
|
|
"state_update": None,
|
|
"parse_error": "STATE_JSON present but no balanced JSON object",
|
|
}
|
|
|
|
try:
|
|
data = json.loads(raw_json)
|
|
except json.JSONDecodeError as e:
|
|
return {
|
|
"visible_text": text[:state_match.start()].rstrip(),
|
|
"intent_change": None,
|
|
"state_update": None,
|
|
"parse_error": f"STATE_JSON invalid JSON: {e}",
|
|
}
|
|
|
|
if not isinstance(data, dict):
|
|
return {
|
|
"visible_text": text[:state_match.start()].rstrip(),
|
|
"intent_change": None,
|
|
"state_update": None,
|
|
"parse_error": "STATE_JSON is not an object",
|
|
}
|
|
|
|
state_after = data.get("state_after")
|
|
slots_updated = data.get("slots_updated", {})
|
|
soft_insertion = bool(data.get("soft_insertion", False))
|
|
if not isinstance(state_after, str) or not state_after:
|
|
return {
|
|
"visible_text": text[:state_match.start()].rstrip(),
|
|
"intent_change": None,
|
|
"state_update": None,
|
|
"parse_error": "STATE_JSON missing state_after",
|
|
}
|
|
if not isinstance(slots_updated, dict):
|
|
slots_updated = {}
|
|
|
|
return {
|
|
"visible_text": text[:state_match.start()].rstrip(),
|
|
"intent_change": None,
|
|
"state_update": {
|
|
"state_after": state_after,
|
|
"slots_updated": slots_updated,
|
|
"soft_insertion": soft_insertion,
|
|
},
|
|
"parse_error": None,
|
|
}
|
|
|
|
|
|
def _consume_json_object(text: str, start: int) -> tuple[str | None, int]:
|
|
"""Вытянуть сбалансированный JSON-объект из text[start:]. См. парсер в chat_service."""
|
|
i = start
|
|
n = len(text)
|
|
while i < n and text[i].isspace():
|
|
i += 1
|
|
if i >= n or text[i] != "{":
|
|
return None, start
|
|
depth = 0
|
|
in_str = False
|
|
esc = False
|
|
j = i
|
|
while j < n:
|
|
ch = text[j]
|
|
if in_str:
|
|
if esc:
|
|
esc = False
|
|
elif ch == "\\":
|
|
esc = True
|
|
elif ch == '"':
|
|
in_str = False
|
|
else:
|
|
if ch == '"':
|
|
in_str = True
|
|
elif ch == "{":
|
|
depth += 1
|
|
elif ch == "}":
|
|
depth -= 1
|
|
if depth == 0:
|
|
return text[i:j + 1], j + 1
|
|
j += 1
|
|
return None, start
|
|
|
|
|
|
def validate_transition(
|
|
*, current_step: str, requested_step: str, allowed_next: list[str],
|
|
) -> tuple[bool, str]:
|
|
"""Разрешён ли переход `current_step → requested_step`.
|
|
|
|
Остаться на месте (`requested_step == current_step`) разрешено всегда.
|
|
Возвращает (ok, reason).
|
|
"""
|
|
if requested_step == current_step:
|
|
return True, "stay"
|
|
if requested_step in allowed_next:
|
|
return True, "ok"
|
|
return (
|
|
False,
|
|
f"requested {requested_step!r} not in allowed_next {allowed_next!r} of {current_step!r}",
|
|
)
|
|
|
|
|
|
def check_guards(
|
|
*,
|
|
current_step_code: str,
|
|
requested_step_code: str,
|
|
slots: dict,
|
|
guards: dict,
|
|
) -> tuple[bool, str | None, list[str], str | None]:
|
|
"""Проверяет guards шага: можно ли перейти с учётом текущих слотов.
|
|
|
|
Guards проверяются только при реальном переходе (requested != current).
|
|
Возвращает (ok, guard_name, missing_slots, guard_description).
|
|
|
|
Формат guards:
|
|
{
|
|
"require_legal_rep": {
|
|
"description": "...",
|
|
"trigger_slot": "is_child",
|
|
"trigger_value": true,
|
|
"required_slots": ["legal_rep_name", "legal_rep_phone"]
|
|
}
|
|
}
|
|
"""
|
|
if requested_step_code == current_step_code or not guards:
|
|
return True, None, [], None
|
|
|
|
for guard_name, guard_def in guards.items():
|
|
if not isinstance(guard_def, dict):
|
|
continue
|
|
trigger_slot = guard_def.get("trigger_slot")
|
|
trigger_value = guard_def.get("trigger_value")
|
|
required_slots: list[str] = guard_def.get("required_slots", [])
|
|
|
|
slot_val = slots.get(trigger_slot) if trigger_slot else None
|
|
# Нормализация: модель может вернуть "true"/"false" как строки.
|
|
if isinstance(slot_val, str) and slot_val.lower() in ("true", "false"):
|
|
slot_val = slot_val.lower() == "true"
|
|
triggered = (trigger_slot is None) or (slot_val == trigger_value)
|
|
if not triggered:
|
|
continue
|
|
|
|
missing = [s for s in required_slots if not slots.get(s)]
|
|
if missing:
|
|
desc = guard_def.get("description", "")
|
|
return False, guard_name, missing, desc
|
|
|
|
return True, None, [], None
|