Files
RAG_helper/services/eval_run_service.py
AR 15 M4 a8f7e68795 feat(sprint8a): регрессия роутера в UI с выбором кейсов и кэшем
Оператор-настройщик после правки промпта _router нажимает «Прогнать выбранное»
на странице «Регрессия» и видит, что сломалось. Не CLI, не в обход
интерфейса — встроено в верхнюю навигацию рядом с Настройками.

Backend:
- Таблицы eval_runs / eval_run_cases (с is_pass) / eval_router_predictions
  (кэш text_hash + router_config_id → predicted_intent). Миграции
  k7e9d5c67h34 и l8f0e6d78i45.
- services/eval_run_service.py: start_router_run(text_hashes) запускает
  фоновую корутину через asyncio.create_task, фиксирует активную версию
  _router. Кэш привязан к версии: повторный прогон на той же версии —
  мгновенный, на новой — пересчитывается. compute_diff_vs_previous
  сравнивает с предыдущим прогоном на той же версии (новые fail / pass).
- API: POST /eval/runs (фон, body text_hashes), GET /eval/runs,
  GET /eval/runs/{id}, GET /eval/router-cases-with-status (все 1573 кейса
  + кэш на активной версии).

Frontend (static/regression.html — новая страница, ссылка добавлена в
шапки index/sandbox/settings/docs):
- Сворачиваемый блок «Выбор кейсов»: фильтр по intent, ввод диапазона
  (1-50, 200-300), кнопки «Все видимые», «Снять все», «Только без кэша»,
  «Только FAIL в кэше», «Снять кэшированные». Чекбокс в шапке.
- Таблица 1573 кейсов отсортирована по count desc: #, чекбокс, запрос,
  intent, частота, кэш (PASS / FAIL → predicted / —). Цветной фон строки
  по статусу кэша.
- Счётчик «выбрано N (новых: X, в кэше: Y)»; кнопка
  «Прогнать выбранное (X новых + Y из кэша)» — сразу видно реальный
  объём LLM-работы.
- Polling /eval/runs/{id} раз в 2 секунды, прогресс-бар, drill-down:
  все кейсы прогона + фильтр pass/fail + поиск + diff vs предыдущий
  (новые fail / новые pass).

docs/SPRINTS.md: Спринт 8 разбит на 8a ( закрыт), 8b (регрессия ответов
веток, ждёт базу кейсов от пользователя), 8c (handoff/resumable/loop/
guard/rag — позже).

docs/BACKLOG.md: новый файл для идей на потом. Записаны: просмотр
архивного графа без активации (из 7.7), варианты C (LLM-judge) и D
(эталон + embeddings) для регрессии веток в 8b.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 20:39:22 +05:00

288 lines
11 KiB
Python

"""Регрессия роутера через UI (Спринт 8a).
Один прогон = одна запись в `eval_runs`. Активная версия `_router` фиксируется в
`router_config_id`, чтобы можно было сравнивать прогоны между версиями. Сами кейсы
живут в JSONL (`eval/router_cases_*.jsonl`); здесь только их прогон, кэш LLM-ответов
и расхождения.
Поток:
1. `start_router_run(min_count)` — создаёт `EvalRun(status=running)`, фиксирует
активную версию роутера, запускает фоновую корутину `_run_router_suite`.
2. `_run_router_suite` — читает кейсы по `min_count`, для каждого:
- lookup в `eval_router_predictions` → если есть, cache_hit++,
- иначе вызывает `RouterClient.classify(history=[], snapshot=None)` и пишет в кэш,
- если `predicted != expected` — пишет в `eval_run_cases`.
В конце выставляет `status=done`, `finished_at`.
3. На любой ошибке — `status=error`, `error_text`.
Кэш ключ: sha256(text) + router_config_id. Текст хранится как есть в `eval_run_cases`
для детального отчёта в UI.
"""
import asyncio
import hashlib
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from db.models import EvalRouterPrediction, EvalRun, EvalRunCase
from db.session import SessionLocal
from services import config_service, intent_service
from services.router_client import RouterClient
logger = logging.getLogger(__name__)
ROUTER_CASES_FILES = ("router_cases_booking.jsonl", "router_cases_other.jsonl")
EVAL_DIR = Path(__file__).resolve().parent.parent / "eval"
@dataclass
class _Case:
text: str
expected_intent: str
count: int
def _text_hash(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
def load_all_router_cases() -> list[_Case]:
"""Все кейсы из JSONL без фильтрации, отсортированы по count desc, затем text.
Сортировка стабильна — это важно для индексов в UI («диапазон 1-100»).
"""
cases: list[_Case] = []
for fname in ROUTER_CASES_FILES:
path = EVAL_DIR / fname
if not path.exists():
logger.warning("Router cases file not found: %s", path)
continue
with path.open(encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except json.JSONDecodeError:
logger.warning("Bad JSONL line in %s: %r", fname, line[:120])
continue
cases.append(_Case(
text=str(obj["text"]),
expected_intent=str(obj["expected_intent"]),
count=int(obj.get("count", 1)),
))
cases.sort(key=lambda c: (-c.count, c.text))
return cases
def filter_cases_by_hashes(cases: list[_Case], text_hashes: list[str]) -> list[_Case]:
wanted = set(text_hashes)
return [c for c in cases if _text_hash(c.text) in wanted]
async def cached_predictions(
session: AsyncSession, router_config_id: int | None
) -> dict[str, str]:
"""{ text_hash → predicted_intent } для активной версии роутера."""
rows = (await session.execute(
select(EvalRouterPrediction.text_hash, EvalRouterPrediction.predicted_intent)
.where(EvalRouterPrediction.router_config_id == router_config_id)
)).all()
return {th: pi for th, pi in rows}
async def _resolve_active_router_config_id(session: AsyncSession) -> int | None:
pair = await config_service.get_active_config_by_intent_code(
session, intent_service.ROUTER_INTENT_CODE
)
if pair is None:
return None
_, cfg = pair
return cfg.id
async def start_router_run(
session: AsyncSession, text_hashes: list[str]
) -> EvalRun:
"""Создаёт run в status=running и запускает фоновую корутину прогона.
`text_hashes` — выбранные оператором кейсы (см. UI: диапазон / чекбоксы).
Пустой список → ValueError (бессмысленный прогон, ловим раньше валидацией).
`min_count` оставлен в схеме для обратной совместимости — пишем 0.
"""
if not text_hashes:
raise ValueError("text_hashes is empty")
router_config_id = await _resolve_active_router_config_id(session)
all_cases = load_all_router_cases()
cases = filter_cases_by_hashes(all_cases, text_hashes)
run = EvalRun(
suite="router",
router_config_id=router_config_id,
min_count=0,
status="running",
total=len(cases),
)
session.add(run)
await session.commit()
await session.refresh(run)
asyncio.create_task(_run_router_suite(run.id, router_config_id, cases))
return run
async def _run_router_suite(
run_id: int, router_config_id: int | None, cases: list[_Case]
) -> None:
"""Фоновый прогон: своя сессия, никаких объектов от вызывающего."""
router = RouterClient()
passed = failed = cache_hits = 0
try:
async with SessionLocal() as session:
run = await session.get(EvalRun, run_id)
if run is None:
logger.error("eval_run %d disappeared before start", run_id)
return
for case in cases:
predicted, was_cached = await _classify_with_cache(
session, router, case.text, router_config_id
)
if was_cached:
cache_hits += 1
is_pass = predicted == case.expected_intent
if is_pass:
passed += 1
else:
failed += 1
session.add(EvalRunCase(
run_id=run_id,
text=case.text,
expected_intent=case.expected_intent,
predicted_intent=predicted,
count_weight=case.count,
is_pass=is_pass,
))
# Промежуточный commit раз в 50 кейсов — чтобы UI видел прогресс.
if (passed + failed) % 50 == 0:
run.passed = passed
run.failed = failed
run.cache_hits = cache_hits
await session.commit()
run.passed = passed
run.failed = failed
run.cache_hits = cache_hits
run.status = "done"
run.finished_at = datetime.now(timezone.utc)
await session.commit()
logger.info(
"eval_run %d done: total=%d passed=%d failed=%d cache_hits=%d",
run_id, len(cases), passed, failed, cache_hits,
)
except Exception as e:
logger.exception("eval_run %d failed: %s", run_id, e)
try:
async with SessionLocal() as session:
run = await session.get(EvalRun, run_id)
if run is not None:
run.status = "error"
run.error_text = f"{type(e).__name__}: {e}"
run.finished_at = datetime.now(timezone.utc)
await session.commit()
except Exception:
logger.exception("Failed to mark eval_run %d as error", run_id)
async def _classify_with_cache(
session: AsyncSession,
router: RouterClient,
text: str,
router_config_id: int | None,
) -> tuple[str, bool]:
"""Возвращает (predicted_intent, was_cached). Кэшируется по (sha256(text), router_config_id)."""
text_hash = _text_hash(text)
cached = (await session.execute(
select(EvalRouterPrediction).where(
EvalRouterPrediction.text_hash == text_hash,
EvalRouterPrediction.router_config_id == router_config_id,
)
)).scalar_one_or_none()
if cached is not None:
return cached.predicted_intent, True
result = await router.classify(session, history=[], text=text, snapshot=None)
predicted = result.get("code") or "general_info"
session.add(EvalRouterPrediction(
text_hash=text_hash,
router_config_id=router_config_id,
predicted_intent=predicted,
))
return predicted, False
async def list_runs(session: AsyncSession, limit: int = 50) -> list[EvalRun]:
return list((await session.execute(
select(EvalRun).order_by(EvalRun.id.desc()).limit(limit)
)).scalars().all())
async def get_run(session: AsyncSession, run_id: int) -> EvalRun | None:
return await session.get(EvalRun, run_id)
async def list_run_cases(
session: AsyncSession, run_id: int, *, only_fails: bool = False
) -> list[EvalRunCase]:
stmt = select(EvalRunCase).where(EvalRunCase.run_id == run_id)
if only_fails:
stmt = stmt.where(EvalRunCase.is_pass.is_(False))
stmt = stmt.order_by(
EvalRunCase.is_pass, # сначала false (failed), затем true (passed)
EvalRunCase.count_weight.desc(),
EvalRunCase.id,
)
return list((await session.execute(stmt)).scalars().all())
async def list_run_fails(session: AsyncSession, run_id: int) -> list[EvalRunCase]:
return await list_run_cases(session, run_id, only_fails=True)
@dataclass
class RunDiff:
"""Разница с предыдущим завершённым прогоном того же router_config (если есть)."""
prev_run_id: int | None
new_fails: list[EvalRunCase] # появились в этом прогоне, не было в предыдущем
new_passes: list[EvalRunCase] # были fail в предыдущем, теперь pass — берём из prev
async def compute_diff_vs_previous(
session: AsyncSession, run: EvalRun
) -> RunDiff:
"""Сравнение с предыдущим done-прогоном на той же версии роутера."""
if run.router_config_id is None or run.status != "done":
return RunDiff(prev_run_id=None, new_fails=[], new_passes=[])
prev = (await session.execute(
select(EvalRun)
.where(
EvalRun.router_config_id == run.router_config_id,
EvalRun.status == "done",
EvalRun.id < run.id,
)
.order_by(EvalRun.id.desc())
.limit(1)
)).scalar_one_or_none()
if prev is None:
return RunDiff(prev_run_id=None, new_fails=[], new_passes=[])
cur_fails = await list_run_fails(session, run.id)
prev_fails = await list_run_fails(session, prev.id)
cur_keys = {(c.text, c.expected_intent) for c in cur_fails}
prev_keys = {(c.text, c.expected_intent) for c in prev_fails}
new_fails = [c for c in cur_fails if (c.text, c.expected_intent) not in prev_keys]
new_passes = [c for c in prev_fails if (c.text, c.expected_intent) not in cur_keys]
return RunDiff(prev_run_id=prev.id, new_fails=new_fails, new_passes=new_passes)