"""Парсер structured-output ветки + валидатор переходов state machine (Спринт 6a). Формат ответа ветки со state machine: Текст пациенту, markdown разрешён. STATE_JSON: {"state_after": "", "slots_updated": {"slot": "value"}} Либо — при exit condition — вместо `STATE_JSON:` строка `[INTENT_CHANGE: ]`. Парсер вырезает служебную часть и возвращает видимый текст + решение модели. """ import json import logging import re logger = logging.getLogger(__name__) _INTENT_CHANGE_RE = re.compile(r"\[INTENT_CHANGE:\s*([a-z_][a-z0-9_]*)\s*\]") _STATE_JSON_RE = re.compile(r"STATE_JSON\s*:\s*", re.IGNORECASE) def parse_branch_response(text: str) -> dict: """Разобрать ответ ветки на visible_text + intent_change / state_update. Возвращает: visible_text: str — текст пациенту (без служебных тегов), intent_change: str | None — код ветки, если сработал exit condition, state_update: {'state_after': str, 'slots_updated': dict} | None — при штатном ответе, parse_error: str | None — если что-то не разобралось, сюда кладётся причина (visible_text при этом = исходный текст без мусора). """ # Exit condition имеет приоритет. intent_match = _INTENT_CHANGE_RE.search(text) if intent_match: visible = text[:intent_match.start()].rstrip() return { "visible_text": visible, "intent_change": intent_match.group(1), "state_update": None, "parse_error": None, } state_match = _STATE_JSON_RE.search(text) if not state_match: # Модель не вернула служебную часть. Возвращаем весь текст и ошибку парсинга. return { "visible_text": text.rstrip(), "intent_change": None, "state_update": None, "parse_error": "no STATE_JSON block", } raw_json, _ = _consume_json_object(text, state_match.end()) if raw_json is None: return { "visible_text": text[:state_match.start()].rstrip(), "intent_change": None, "state_update": None, "parse_error": "STATE_JSON present but no balanced JSON object", } try: data = json.loads(raw_json) except json.JSONDecodeError as e: return { "visible_text": text[:state_match.start()].rstrip(), "intent_change": None, "state_update": None, "parse_error": f"STATE_JSON invalid JSON: {e}", } if not isinstance(data, dict): return { "visible_text": text[:state_match.start()].rstrip(), "intent_change": None, "state_update": None, "parse_error": "STATE_JSON is not an object", } state_after = data.get("state_after") slots_updated = data.get("slots_updated", {}) soft_insertion = bool(data.get("soft_insertion", False)) if not isinstance(state_after, str) or not state_after: return { "visible_text": text[:state_match.start()].rstrip(), "intent_change": None, "state_update": None, "parse_error": "STATE_JSON missing state_after", } if not isinstance(slots_updated, dict): slots_updated = {} return { "visible_text": text[:state_match.start()].rstrip(), "intent_change": None, "state_update": { "state_after": state_after, "slots_updated": slots_updated, "soft_insertion": soft_insertion, }, "parse_error": None, } def _consume_json_object(text: str, start: int) -> tuple[str | None, int]: """Вытянуть сбалансированный JSON-объект из text[start:]. См. парсер в chat_service.""" i = start n = len(text) while i < n and text[i].isspace(): i += 1 if i >= n or text[i] != "{": return None, start depth = 0 in_str = False esc = False j = i while j < n: ch = text[j] if in_str: if esc: esc = False elif ch == "\\": esc = True elif ch == '"': in_str = False else: if ch == '"': in_str = True elif ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: return text[i:j + 1], j + 1 j += 1 return None, start def validate_transition( *, current_step: str, requested_step: str, allowed_next: list[str], ) -> tuple[bool, str]: """Разрешён ли переход `current_step → requested_step`. Остаться на месте (`requested_step == current_step`) разрешено всегда. Возвращает (ok, reason). """ if requested_step == current_step: return True, "stay" if requested_step in allowed_next: return True, "ok" return ( False, f"requested {requested_step!r} not in allowed_next {allowed_next!r} of {current_step!r}", ) def check_guards( *, current_step_code: str, requested_step_code: str, slots: dict, guards: dict, ) -> tuple[bool, str | None, list[str], str | None]: """Проверяет guards шага: можно ли перейти с учётом текущих слотов. Guards проверяются только при реальном переходе (requested != current). Возвращает (ok, guard_name, missing_slots, guard_description). Формат guards: { "require_legal_rep": { "description": "...", "trigger_slot": "is_child", "trigger_value": true, "required_slots": ["legal_rep_name", "legal_rep_phone"] } } """ if requested_step_code == current_step_code or not guards: return True, None, [], None for guard_name, guard_def in guards.items(): if not isinstance(guard_def, dict): continue trigger_slot = guard_def.get("trigger_slot") trigger_value = guard_def.get("trigger_value") required_slots: list[str] = guard_def.get("required_slots", []) slot_val = slots.get(trigger_slot) if trigger_slot else None # Нормализация: модель может вернуть "true"/"false" как строки. if isinstance(slot_val, str) and slot_val.lower() in ("true", "false"): slot_val = slot_val.lower() == "true" triggered = (trigger_slot is None) or (slot_val == trigger_value) if not triggered: continue missing = [s for s in required_slots if not slots.get(s)] if missing: desc = guard_def.get("description", "") return False, guard_name, missing, desc return True, None, [], None