"""Регрессия роутера через UI (Спринт 8a). Один прогон = одна запись в `eval_runs`. Активная версия `_router` фиксируется в `router_config_id`, чтобы можно было сравнивать прогоны между версиями. Сами кейсы живут в JSONL (`eval/router_cases_*.jsonl`); здесь только их прогон, кэш LLM-ответов и расхождения. Поток: 1. `start_router_run(min_count)` — создаёт `EvalRun(status=running)`, фиксирует активную версию роутера, запускает фоновую корутину `_run_router_suite`. 2. `_run_router_suite` — читает кейсы по `min_count`, для каждого: - lookup в `eval_router_predictions` → если есть, cache_hit++, - иначе вызывает `RouterClient.classify(history=[], snapshot=None)` и пишет в кэш, - если `predicted != expected` — пишет в `eval_run_cases`. В конце выставляет `status=done`, `finished_at`. 3. На любой ошибке — `status=error`, `error_text`. Кэш ключ: sha256(text) + router_config_id. Текст хранится как есть в `eval_run_cases` для детального отчёта в UI. """ import asyncio import hashlib import json import logging from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from db.models import EvalRouterPrediction, EvalRun, EvalRunCase from db.session import SessionLocal from services import config_service, intent_service from services.router_client import RouterClient logger = logging.getLogger(__name__) ROUTER_CASES_FILES = ("router_cases_booking.jsonl", "router_cases_other.jsonl") EVAL_DIR = Path(__file__).resolve().parent.parent / "eval" @dataclass class _Case: text: str expected_intent: str count: int def _text_hash(text: str) -> str: return hashlib.sha256(text.encode("utf-8")).hexdigest() def load_all_router_cases() -> list[_Case]: """Все кейсы из JSONL без фильтрации, отсортированы по count desc, затем text. Сортировка стабильна — это важно для индексов в UI («диапазон 1-100»). """ cases: list[_Case] = [] for fname in ROUTER_CASES_FILES: path = EVAL_DIR / fname if not path.exists(): logger.warning("Router cases file not found: %s", path) continue with path.open(encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) except json.JSONDecodeError: logger.warning("Bad JSONL line in %s: %r", fname, line[:120]) continue cases.append(_Case( text=str(obj["text"]), expected_intent=str(obj["expected_intent"]), count=int(obj.get("count", 1)), )) cases.sort(key=lambda c: (-c.count, c.text)) return cases def filter_cases_by_hashes(cases: list[_Case], text_hashes: list[str]) -> list[_Case]: wanted = set(text_hashes) return [c for c in cases if _text_hash(c.text) in wanted] async def cached_predictions( session: AsyncSession, router_config_id: int | None ) -> dict[str, str]: """{ text_hash → predicted_intent } для активной версии роутера.""" rows = (await session.execute( select(EvalRouterPrediction.text_hash, EvalRouterPrediction.predicted_intent) .where(EvalRouterPrediction.router_config_id == router_config_id) )).all() return {th: pi for th, pi in rows} async def _resolve_active_router_config_id(session: AsyncSession) -> int | None: pair = await config_service.get_active_config_by_intent_code( session, intent_service.ROUTER_INTENT_CODE ) if pair is None: return None _, cfg = pair return cfg.id async def start_router_run( session: AsyncSession, text_hashes: list[str] ) -> EvalRun: """Создаёт run в status=running и запускает фоновую корутину прогона. `text_hashes` — выбранные оператором кейсы (см. UI: диапазон / чекбоксы). Пустой список → ValueError (бессмысленный прогон, ловим раньше валидацией). `min_count` оставлен в схеме для обратной совместимости — пишем 0. """ if not text_hashes: raise ValueError("text_hashes is empty") router_config_id = await _resolve_active_router_config_id(session) all_cases = load_all_router_cases() cases = filter_cases_by_hashes(all_cases, text_hashes) run = EvalRun( suite="router", router_config_id=router_config_id, min_count=0, status="running", total=len(cases), ) session.add(run) await session.commit() await session.refresh(run) asyncio.create_task(_run_router_suite(run.id, router_config_id, cases)) return run async def _run_router_suite( run_id: int, router_config_id: int | None, cases: list[_Case] ) -> None: """Фоновый прогон: своя сессия, никаких объектов от вызывающего.""" router = RouterClient() passed = failed = cache_hits = 0 try: async with SessionLocal() as session: run = await session.get(EvalRun, run_id) if run is None: logger.error("eval_run %d disappeared before start", run_id) return for case in cases: predicted, was_cached = await _classify_with_cache( session, router, case.text, router_config_id ) if was_cached: cache_hits += 1 is_pass = predicted == case.expected_intent if is_pass: passed += 1 else: failed += 1 session.add(EvalRunCase( run_id=run_id, text=case.text, expected_intent=case.expected_intent, predicted_intent=predicted, count_weight=case.count, is_pass=is_pass, )) # Промежуточный commit раз в 50 кейсов — чтобы UI видел прогресс. if (passed + failed) % 50 == 0: run.passed = passed run.failed = failed run.cache_hits = cache_hits await session.commit() run.passed = passed run.failed = failed run.cache_hits = cache_hits run.status = "done" run.finished_at = datetime.now(timezone.utc) await session.commit() logger.info( "eval_run %d done: total=%d passed=%d failed=%d cache_hits=%d", run_id, len(cases), passed, failed, cache_hits, ) except Exception as e: logger.exception("eval_run %d failed: %s", run_id, e) try: async with SessionLocal() as session: run = await session.get(EvalRun, run_id) if run is not None: run.status = "error" run.error_text = f"{type(e).__name__}: {e}" run.finished_at = datetime.now(timezone.utc) await session.commit() except Exception: logger.exception("Failed to mark eval_run %d as error", run_id) async def _classify_with_cache( session: AsyncSession, router: RouterClient, text: str, router_config_id: int | None, ) -> tuple[str, bool]: """Возвращает (predicted_intent, was_cached). Кэшируется по (sha256(text), router_config_id).""" text_hash = _text_hash(text) cached = (await session.execute( select(EvalRouterPrediction).where( EvalRouterPrediction.text_hash == text_hash, EvalRouterPrediction.router_config_id == router_config_id, ) )).scalar_one_or_none() if cached is not None: return cached.predicted_intent, True result = await router.classify(session, history=[], text=text, snapshot=None) predicted = result.get("code") or "general_info" session.add(EvalRouterPrediction( text_hash=text_hash, router_config_id=router_config_id, predicted_intent=predicted, )) return predicted, False async def list_runs(session: AsyncSession, limit: int = 50) -> list[EvalRun]: return list((await session.execute( select(EvalRun).order_by(EvalRun.id.desc()).limit(limit) )).scalars().all()) async def get_run(session: AsyncSession, run_id: int) -> EvalRun | None: return await session.get(EvalRun, run_id) async def list_run_cases( session: AsyncSession, run_id: int, *, only_fails: bool = False ) -> list[EvalRunCase]: stmt = select(EvalRunCase).where(EvalRunCase.run_id == run_id) if only_fails: stmt = stmt.where(EvalRunCase.is_pass.is_(False)) stmt = stmt.order_by( EvalRunCase.is_pass, # сначала false (failed), затем true (passed) EvalRunCase.count_weight.desc(), EvalRunCase.id, ) return list((await session.execute(stmt)).scalars().all()) async def list_run_fails(session: AsyncSession, run_id: int) -> list[EvalRunCase]: return await list_run_cases(session, run_id, only_fails=True) @dataclass class RunDiff: """Разница с предыдущим завершённым прогоном того же router_config (если есть).""" prev_run_id: int | None new_fails: list[EvalRunCase] # появились в этом прогоне, не было в предыдущем new_passes: list[EvalRunCase] # были fail в предыдущем, теперь pass — берём из prev async def compute_diff_vs_previous( session: AsyncSession, run: EvalRun ) -> RunDiff: """Сравнение с предыдущим done-прогоном на той же версии роутера.""" if run.router_config_id is None or run.status != "done": return RunDiff(prev_run_id=None, new_fails=[], new_passes=[]) prev = (await session.execute( select(EvalRun) .where( EvalRun.router_config_id == run.router_config_id, EvalRun.status == "done", EvalRun.id < run.id, ) .order_by(EvalRun.id.desc()) .limit(1) )).scalar_one_or_none() if prev is None: return RunDiff(prev_run_id=None, new_fails=[], new_passes=[]) cur_fails = await list_run_fails(session, run.id) prev_fails = await list_run_fails(session, prev.id) cur_keys = {(c.text, c.expected_intent) for c in cur_fails} prev_keys = {(c.text, c.expected_intent) for c in prev_fails} new_fails = [c for c in cur_fails if (c.text, c.expected_intent) not in prev_keys] new_passes = [c for c in prev_fails if (c.text, c.expected_intent) not in cur_keys] return RunDiff(prev_run_id=prev.id, new_fails=new_fails, new_passes=new_passes)