"""Парсер structured-output ветки + валидатор переходов state machine (Спринт 6a). Формат ответа ветки со state machine: Текст пациенту, markdown разрешён. STATE_JSON: {"state_after": "", "slots_updated": {"slot": "value"}} Либо — при exit condition — вместо `STATE_JSON:` строка `[INTENT_CHANGE: ]`. Парсер вырезает служебную часть и возвращает видимый текст + решение модели. """ import json import logging import re logger = logging.getLogger(__name__) _INTENT_CHANGE_RE = re.compile(r"\[INTENT_CHANGE:\s*([a-z_][a-z0-9_]*)\s*\]") _STATE_JSON_RE = re.compile(r"STATE_JSON\s*:\s*", re.IGNORECASE) def parse_branch_response(text: str) -> dict: """Разобрать ответ ветки на visible_text + intent_change / state_update. Возвращает: visible_text: str — текст пациенту (без служебных тегов), intent_change: str | None — код ветки, если сработал exit condition, state_update: {'state_after': str, 'slots_updated': dict} | None — при штатном ответе, parse_error: str | None — если что-то не разобралось, сюда кладётся причина (visible_text при этом = исходный текст без мусора). """ # Exit condition имеет приоритет. intent_match = _INTENT_CHANGE_RE.search(text) if intent_match: visible = text[:intent_match.start()].rstrip() return { "visible_text": visible, "intent_change": intent_match.group(1), "state_update": None, "parse_error": None, } state_match = _STATE_JSON_RE.search(text) if not state_match: # Модель не вернула служебную часть. Возвращаем весь текст и ошибку парсинга. return { "visible_text": text.rstrip(), "intent_change": None, "state_update": None, "parse_error": "no STATE_JSON block", } raw_json, _ = _consume_json_object(text, state_match.end()) if raw_json is None: return { "visible_text": text[:state_match.start()].rstrip(), "intent_change": None, "state_update": None, "parse_error": "STATE_JSON present but no balanced JSON object", } try: data = json.loads(raw_json) except json.JSONDecodeError as e: return { "visible_text": text[:state_match.start()].rstrip(), "intent_change": None, "state_update": None, "parse_error": f"STATE_JSON invalid JSON: {e}", } if not isinstance(data, dict): return { "visible_text": text[:state_match.start()].rstrip(), "intent_change": None, "state_update": None, "parse_error": "STATE_JSON is not an object", } state_after = data.get("state_after") slots_updated = data.get("slots_updated", {}) if not isinstance(state_after, str) or not state_after: return { "visible_text": text[:state_match.start()].rstrip(), "intent_change": None, "state_update": None, "parse_error": "STATE_JSON missing state_after", } if not isinstance(slots_updated, dict): slots_updated = {} return { "visible_text": text[:state_match.start()].rstrip(), "intent_change": None, "state_update": {"state_after": state_after, "slots_updated": slots_updated}, "parse_error": None, } def _consume_json_object(text: str, start: int) -> tuple[str | None, int]: """Вытянуть сбалансированный JSON-объект из text[start:]. См. парсер в chat_service.""" i = start n = len(text) while i < n and text[i].isspace(): i += 1 if i >= n or text[i] != "{": return None, start depth = 0 in_str = False esc = False j = i while j < n: ch = text[j] if in_str: if esc: esc = False elif ch == "\\": esc = True elif ch == '"': in_str = False else: if ch == '"': in_str = True elif ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: return text[i:j + 1], j + 1 j += 1 return None, start def validate_transition( *, current_step: str, requested_step: str, allowed_next: list[str], ) -> tuple[bool, str]: """Разрешён ли переход `current_step → requested_step`. Остаться на месте (`requested_step == current_step`) разрешено всегда. Возвращает (ok, reason). """ if requested_step == current_step: return True, "stay" if requested_step in allowed_next: return True, "ok" return ( False, f"requested {requested_step!r} not in allowed_next {allowed_next!r} of {current_step!r}", )