Files
RAG_helper/services/state_machine.py
T
AR 15 M4 85c3ec0222 feat(sprint6b-D): soft-insertion counter + message meta_json
- thread_state.soft_insertion_count: растёт при боковом ответе (soft_insertion=true
  в STATE_JSON без смены шага/слотов), сбрасывается при продвижении или handoff
- При soft_insertion_count >= 3 в системный промпт ветки добавляется SOFT_INSERTION_NUDGE
  — явная инструкция вернуть пациента к вопросу текущего шага
- state_machine.parse_branch_response читает флаг soft_insertion из STATE_JSON
- Новая колонка message.meta_json: {router_intent_code, served_intent_code, step_code, events}
  — хранит снимок маршрутизации каждой реплики ассистента
- «Песочница»: бейджи событий (sticky / soft_insertion / hard_handoff / resumed /
  routing_loop / validation_blocked) над каждым ответом ассистента

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-25 20:24:22 +05:00

155 lines
5.4 KiB
Python

"""Парсер structured-output ветки + валидатор переходов state machine (Спринт 6a).
Формат ответа ветки со state machine:
Текст пациенту, markdown разрешён.
STATE_JSON: {"state_after": "<step_code>", "slots_updated": {"slot": "value"}}
Либо — при exit condition — вместо `STATE_JSON:` строка `[INTENT_CHANGE: <code>]`.
Парсер вырезает служебную часть и возвращает видимый текст + решение модели.
"""
import json
import logging
import re
logger = logging.getLogger(__name__)
_INTENT_CHANGE_RE = re.compile(r"\[INTENT_CHANGE:\s*([a-z_][a-z0-9_]*)\s*\]")
_STATE_JSON_RE = re.compile(r"STATE_JSON\s*:\s*", re.IGNORECASE)
def parse_branch_response(text: str) -> dict:
"""Разобрать ответ ветки на visible_text + intent_change / state_update.
Возвращает:
visible_text: str — текст пациенту (без служебных тегов),
intent_change: str | None — код ветки, если сработал exit condition,
state_update: {'state_after': str, 'slots_updated': dict} | None — при штатном ответе,
parse_error: str | None — если что-то не разобралось, сюда кладётся причина
(visible_text при этом = исходный текст без мусора).
"""
# Exit condition имеет приоритет.
intent_match = _INTENT_CHANGE_RE.search(text)
if intent_match:
visible = text[:intent_match.start()].rstrip()
return {
"visible_text": visible,
"intent_change": intent_match.group(1),
"state_update": None,
"parse_error": None,
}
state_match = _STATE_JSON_RE.search(text)
if not state_match:
# Модель не вернула служебную часть. Возвращаем весь текст и ошибку парсинга.
return {
"visible_text": text.rstrip(),
"intent_change": None,
"state_update": None,
"parse_error": "no STATE_JSON block",
}
raw_json, _ = _consume_json_object(text, state_match.end())
if raw_json is None:
return {
"visible_text": text[:state_match.start()].rstrip(),
"intent_change": None,
"state_update": None,
"parse_error": "STATE_JSON present but no balanced JSON object",
}
try:
data = json.loads(raw_json)
except json.JSONDecodeError as e:
return {
"visible_text": text[:state_match.start()].rstrip(),
"intent_change": None,
"state_update": None,
"parse_error": f"STATE_JSON invalid JSON: {e}",
}
if not isinstance(data, dict):
return {
"visible_text": text[:state_match.start()].rstrip(),
"intent_change": None,
"state_update": None,
"parse_error": "STATE_JSON is not an object",
}
state_after = data.get("state_after")
slots_updated = data.get("slots_updated", {})
soft_insertion = bool(data.get("soft_insertion", False))
if not isinstance(state_after, str) or not state_after:
return {
"visible_text": text[:state_match.start()].rstrip(),
"intent_change": None,
"state_update": None,
"parse_error": "STATE_JSON missing state_after",
}
if not isinstance(slots_updated, dict):
slots_updated = {}
return {
"visible_text": text[:state_match.start()].rstrip(),
"intent_change": None,
"state_update": {
"state_after": state_after,
"slots_updated": slots_updated,
"soft_insertion": soft_insertion,
},
"parse_error": None,
}
def _consume_json_object(text: str, start: int) -> tuple[str | None, int]:
"""Вытянуть сбалансированный JSON-объект из text[start:]. См. парсер в chat_service."""
i = start
n = len(text)
while i < n and text[i].isspace():
i += 1
if i >= n or text[i] != "{":
return None, start
depth = 0
in_str = False
esc = False
j = i
while j < n:
ch = text[j]
if in_str:
if esc:
esc = False
elif ch == "\\":
esc = True
elif ch == '"':
in_str = False
else:
if ch == '"':
in_str = True
elif ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
return text[i:j + 1], j + 1
j += 1
return None, start
def validate_transition(
*, current_step: str, requested_step: str, allowed_next: list[str],
) -> tuple[bool, str]:
"""Разрешён ли переход `current_step → requested_step`.
Остаться на месте (`requested_step == current_step`) разрешено всегда.
Возвращает (ok, reason).
"""
if requested_step == current_step:
return True, "stay"
if requested_step in allowed_next:
return True, "ok"
return (
False,
f"requested {requested_step!r} not in allowed_next {allowed_next!r} of {current_step!r}",
)