"""Сервис прохождения теста.""" from __future__ import annotations import uuid as _uuid from datetime import datetime, timezone from sqlalchemy import func from sqlalchemy.orm import Session, selectinload from ..db import get_session from ..models import ( AnswerOption, Question, Test, TestAttempt, TestVersion, User, UserAnswer, ) from .test_access import is_test_author, is_test_edit_open, user_has_test_access class HttpError(Exception): def __init__(self, status: int, message: str): super().__init__(message) self.status = status self.message = message def _sort_uuid_strings(items) -> list[str]: return sorted({str(x) for x in (items or []) if x is not None}) def _same_selection(selected, correct_ids) -> bool: return _sort_uuid_strings(selected) == _sort_uuid_strings(correct_ids) def _to_uuid(val) -> _uuid.UUID | None: if isinstance(val, _uuid.UUID): return val try: return _uuid.UUID(str(val)) except (ValueError, AttributeError): return None # ─── load questions (shared) ───────────────────────────────────────────────── def load_questions_for_version( session: Session, test_version_id, *, include_correct: bool, include_hints: bool = False, ) -> list[dict]: vid = _to_uuid(test_version_id) if vid is None: return [] questions = ( session.query(Question) .options(selectinload(Question.options)) .filter(Question.test_version_id == vid) .order_by(Question.question_order) .all() ) out = [] for q in questions: options = [] for o in sorted(q.options, key=lambda x: x.option_order): base = {'id': str(o.id), 'text': o.text, 'optionOrder': o.option_order} if include_correct: base['isCorrect'] = bool(o.is_correct) options.append(base) item = { 'id': str(q.id), 'text': q.text, 'questionOrder': q.question_order, 'hasMultipleAnswers': bool(q.has_multiple_answers), 'options': options, } if include_hints: hint = (q.ai_hint or '').strip() item['aiHint'] = hint or None out.append(item) return out # ─── start ─────────────────────────────────────────────────────────────────── def start_attempt(session_or_eng, user_id: str, test_id: str) -> dict: """Принимает engine (legacy) или session — для обратной совместимости.""" acc = user_has_test_access(user_id, test_id) if not acc.ok: raise HttpError(404, 'Тест не найден.') session = get_session() tid = _to_uuid(test_id) uid = _to_uuid(user_id) active_version = ( session.query(TestVersion) .filter(TestVersion.test_id == tid, TestVersion.is_active.is_(True)) .first() ) if not active_version: raise HttpError(404, 'Нет активной версии теста.') max_n = ( session.query(func.coalesce(func.max(TestAttempt.attempt_number), 0)) .filter( TestAttempt.test_version_id == active_version.id, TestAttempt.user_id == uid, ) .scalar() or 0 ) attempt = TestAttempt( test_version_id=active_version.id, user_id=uid, attempt_number=int(max_n) + 1, status='in_progress', ) session.add(attempt) session.commit() session.refresh(attempt) return { 'attempt': { 'id': str(attempt.id), 'test_version_id': str(attempt.test_version_id), 'user_id': str(attempt.user_id), 'attempt_number': attempt.attempt_number, 'status': attempt.status, 'started_at': attempt.started_at.isoformat() if attempt.started_at else None, } } # ─── play ──────────────────────────────────────────────────────────────────── def get_play_content(session_or_eng, user_id: str, test_id: str, attempt_id: str) -> dict: session = get_session() aid = _to_uuid(attempt_id) uid = _to_uuid(user_id) tid = _to_uuid(test_id) attempt = ( session.query(TestAttempt) .options( selectinload(TestAttempt.test_version).selectinload(TestVersion.test) ) .filter(TestAttempt.id == aid) .first() ) if not attempt: raise HttpError(404, 'Попытка не найдена.') if attempt.test_version.test_id != tid: raise HttpError(404, 'Попытка не найдена.') if attempt.user_id != uid: raise HttpError(403, 'Доступ запрещён.') if attempt.status != 'in_progress': raise HttpError(400, 'Попытка уже завершена.') test = attempt.test_version.test qs = load_questions_for_version(session, attempt.test_version_id, include_correct=False) return { 'testTitle': test.title, 'passingThreshold': test.passing_threshold, 'timeLimit': test.time_limit, 'hintsEnabled': bool(test.hints_enabled), 'resultMode': test.result_mode or 'end', 'attemptId': str(attempt.id), 'questions': qs, } # ─── submit ────────────────────────────────────────────────────────────────── def submit_attempt(session_or_eng, user_id: str, test_id: str, attempt_id: str, raw_answers: dict | None) -> dict: answers = raw_answers if isinstance(raw_answers, dict) else {} session = get_session() aid = _to_uuid(attempt_id) uid = _to_uuid(user_id) tid = _to_uuid(test_id) attempt = ( session.query(TestAttempt) .options(selectinload(TestAttempt.test_version).selectinload(TestVersion.test)) .filter(TestAttempt.id == aid) .with_for_update() .first() ) if not attempt: raise HttpError(404, 'Попытка не найдена.') if attempt.test_version.test_id != tid: raise HttpError(404, 'Попытка не найдена.') if attempt.user_id != uid: raise HttpError(403, 'Доступ запрещён.') if attempt.status != 'in_progress': raise HttpError(400, 'Попытка уже завершена.') test = attempt.test_version.test questions = ( session.query(Question) .options(selectinload(Question.options)) .filter(Question.test_version_id == attempt.test_version_id) .all() ) if not questions: raise HttpError(400, 'В тесте нет вопросов.') by_q: dict[str, dict] = {} for q in questions: qid = str(q.id) by_q[qid] = {'all': {str(o.id) for o in q.options}, 'correct': [str(o.id) for o in q.options if o.is_correct]} correct_count = 0 for q in questions: qid = str(q.id) selected = answers.get(qid, []) if not isinstance(selected, list): selected = [str(selected)] selected = [str(x) for x in selected] g = by_q[qid] for sid in selected: if sid not in g['all']: raise HttpError(400, 'Некорректный вариант ответа.') if _same_selection(selected, g['correct']): correct_count += 1 total = len(questions) percent = (correct_count / total) * 100 if total else 0 threshold = int(test.passing_threshold or 0) passed = percent + 1e-9 >= threshold # удаляем старые ответы и записываем новые session.query(UserAnswer).filter(UserAnswer.attempt_id == aid).delete(synchronize_session='fetch') for q in questions: qid = str(q.id) selected = answers.get(qid, []) if not isinstance(selected, list): selected = [str(selected)] selected_uuids = [_to_uuid(x) for x in selected if _to_uuid(x) is not None] session.add(UserAnswer( attempt_id=aid, question_id=q.id, selected_options=selected_uuids, )) attempt.status = 'completed' attempt.completed_at = datetime.now(timezone.utc) attempt.correct_count = correct_count attempt.total_questions = total attempt.passed = passed session.commit() review = build_review_from_db(session, attempt_id) return { 'attemptId': attempt_id, 'correctCount': correct_count, 'totalQuestions': total, 'percent': round(percent, 1), 'passed': passed, 'passingThreshold': threshold, 'review': review, } # ─── review ────────────────────────────────────────────────────────────────── def build_review_from_db(session: Session, attempt_id: str) -> dict: aid = _to_uuid(attempt_id) attempt = ( session.query(TestAttempt) .options( selectinload(TestAttempt.test_version).selectinload(TestVersion.test), selectinload(TestAttempt.user), selectinload(TestAttempt.user_answers), ) .filter(TestAttempt.id == aid) .first() ) if not attempt: raise HttpError(404, 'Попытка не найдена.') if attempt.status != 'completed': raise HttpError(400, 'Попытка не завершена.') test = attempt.test_version.test questions = load_questions_for_version( session, attempt.test_version_id, include_correct=True, include_hints=True ) sel_by_q: dict[str, list[str]] = { str(ua.question_id): [str(x) for x in (ua.selected_options or [])] for ua in attempt.user_answers } total = int(attempt.total_questions or len(questions)) percent = round(((attempt.correct_count or 0) / total) * 100, 1) if total else 0 q_out = [] for q in questions: selected = _sort_uuid_strings(sel_by_q.get(q['id'], [])) correct = _sort_uuid_strings([o['id'] for o in q['options'] if o.get('isCorrect')]) selected_set = set(selected) q_out.append({ 'id': q['id'], 'text': q['text'], 'hasMultipleAnswers': q['hasMultipleAnswers'], 'aiHint': q.get('aiHint'), 'isUserCorrect': _same_selection(selected, correct), 'options': [ { 'id': o['id'], 'text': o['text'], 'isCorrect': o.get('isCorrect', False), 'selected': o['id'] in selected_set, } for o in q['options'] ], }) return { 'attemptId': str(attempt.id), 'testId': str(test.id), 'testTitle': test.title, 'passingThreshold': int(test.passing_threshold or 0), 'timeLimit': test.time_limit, 'resultMode': test.result_mode or 'end', 'hintsEnabled': bool(test.hints_enabled), 'correctCount': int(attempt.correct_count or 0), 'totalQuestions': total, 'percent': percent, 'passed': bool(attempt.passed), 'startedAt': attempt.started_at.isoformat() if attempt.started_at else None, 'completedAt': attempt.completed_at.isoformat() if attempt.completed_at else None, 'attempterUserId': str(attempt.user_id), 'attempterName': attempt.user.full_name, 'attempterLogin': attempt.user.login, 'questions': q_out, } def get_attempt_review_for_user(session_or_eng, current_user_id: str, test_id: str, attempt_id: str) -> dict: session = get_session() aid = _to_uuid(attempt_id) tid = _to_uuid(test_id) attempt = ( session.query(TestAttempt) .options(selectinload(TestAttempt.test_version).selectinload(TestVersion.test)) .filter(TestAttempt.id == aid) .first() ) if not attempt: raise HttpError(404, 'Попытка не найдена.') if attempt.test_version.test_id != tid: raise HttpError(404, 'Попытка не найдена.') is_owner = str(attempt.user_id) == str(current_user_id) is_author = is_test_author(attempt.test_version.test.created_by, current_user_id) if not is_owner and not is_author and not is_test_edit_open(): raise HttpError(403, 'Доступ запрещён.') return build_review_from_db(session, attempt_id) # ─── hints ─────────────────────────────────────────────────────────────────── def count_missing_hints(session_or_eng, test_id: str) -> dict: session = get_session() tid = _to_uuid(test_id) if tid is None: return {'total': 0, 'missing': 0} active_version = ( session.query(TestVersion) .filter(TestVersion.test_id == tid, TestVersion.is_active.is_(True)) .first() ) if not active_version: return {'total': 0, 'missing': 0} all_qs = session.query(Question).options(selectinload(Question.options)).filter( Question.test_version_id == active_version.id ).all() filled_qs = [] for q in all_qs: if not (q.text or '').strip(): continue if len([o for o in q.options if (o.text or '').strip()]) < 2: continue filled_qs.append(q) total = len(filled_qs) missing = sum(1 for q in filled_qs if not q.ai_hint) return {'total': total, 'missing': missing} def generate_missing_hints_for_test(session_or_eng, author_id: str, test_id: str) -> dict: from .ai_editor import generate_question_hint session = get_session() tid = _to_uuid(test_id) test = session.get(Test, tid) if not test: raise HttpError(404, 'Тест не найден.') if not is_test_author(test.created_by, author_id) and not is_test_edit_open(): raise HttpError(403, 'Доступ запрещён.') active_version = ( session.query(TestVersion) .filter(TestVersion.test_id == tid, TestVersion.is_active.is_(True)) .first() ) if not active_version: return {'generated': 0, 'failed': 0, 'total': 0} missing_qs = ( session.query(Question) .options(selectinload(Question.options)) .filter( Question.test_version_id == active_version.id, (Question.ai_hint == None) | (Question.ai_hint == ''), # noqa: E711 ) .order_by(Question.question_order) .all() ) generated = failed = skipped = 0 for q in missing_qs: # Подсказку строим только по заполненному вопросу (есть текст и >=2 непустых варианта). if not (q.text or '').strip(): skipped += 1 continue valid_opts = [o for o in q.options if (o.text or '').strip()] if len(valid_opts) < 2: skipped += 1 continue opt_payload = [{'text': o.text, 'isCorrect': bool(o.is_correct)} for o in q.options] hint = generate_question_hint(question_text=q.text, options=opt_payload) if hint: q.ai_hint = hint generated += 1 else: failed += 1 session.commit() return {'generated': generated, 'failed': failed, 'skipped': skipped, 'total': len(missing_qs)} def generate_next_missing_hint_for_test(session_or_eng, author_id: str, test_id: str) -> dict: """Генерирует одну недостающую подсказку (для отображения прогресса в UI).""" from .ai_editor import generate_question_hint session = get_session() tid = _to_uuid(test_id) test = session.get(Test, tid) if not test: raise HttpError(404, 'Тест не найден.') if not is_test_author(test.created_by, author_id) and not is_test_edit_open(): raise HttpError(403, 'Доступ запрещён.') active_version = ( session.query(TestVersion) .filter(TestVersion.test_id == tid, TestVersion.is_active.is_(True)) .first() ) if not active_version: return {'generated': 0, 'remaining': 0, 'done': True, 'totalMissing': 0} before = count_missing_hints(None, test_id).get('missing') or 0 missing_qs = ( session.query(Question) .options(selectinload(Question.options)) .filter( Question.test_version_id == active_version.id, (Question.ai_hint == None) | (Question.ai_hint == ''), # noqa: E711 ) .order_by(Question.question_order) .all() ) for q in missing_qs: if not (q.text or '').strip(): continue valid_opts = [o for o in q.options if (o.text or '').strip()] if len(valid_opts) < 2: continue opt_payload = [{'text': o.text, 'isCorrect': bool(o.is_correct)} for o in q.options] hint = generate_question_hint(question_text=q.text, options=opt_payload) if hint: q.ai_hint = hint session.commit() after = count_missing_hints(None, test_id).get('missing') or 0 return { 'generated': 1, 'remaining': after, 'done': after == 0, 'totalMissing': before, 'failed': 0, } after = count_missing_hints(None, test_id).get('missing') or 0 return { 'generated': 0, 'remaining': after, 'done': after == 0, 'totalMissing': before, 'failed': 0, } def check_question_for_attempt(session_or_eng, user_id: str, test_id: str, attempt_id: str, question_id: str, selected_option_ids: list[str]) -> dict: session = get_session() aid = _to_uuid(attempt_id) uid = _to_uuid(user_id) tid = _to_uuid(test_id) qid = _to_uuid(question_id) attempt = ( session.query(TestAttempt) .options( selectinload(TestAttempt.test_version).selectinload(TestVersion.test) ) .filter(TestAttempt.id == aid) .first() ) if not attempt: raise HttpError(404, 'Попытка не найдена.') if attempt.test_version.test_id != tid: raise HttpError(404, 'Попытка не найдена.') if attempt.user_id != uid: raise HttpError(403, 'Доступ запрещён.') if attempt.status != 'in_progress': raise HttpError(400, 'Попытка уже завершена.') question = ( session.query(Question) .options(selectinload(Question.options)) .filter( Question.id == qid, Question.test_version_id == attempt.test_version_id, ) .first() ) if not question: raise HttpError(404, 'Вопрос не найден.') correct_ids = [str(o.id) for o in question.options if o.is_correct] is_correct = _same_selection(selected_option_ids, correct_ids) selected_set = {str(x) for x in (selected_option_ids or [])} selected_texts = [o.text for o in question.options if str(o.id) in selected_set] correct_texts = [o.text for o in question.options if o.is_correct] test = attempt.test_version.test explanation = '' if test.hints_enabled: if question.ai_hint: explanation = question.ai_hint else: try: from .ai_editor import explain_answer explanation = explain_answer( question_text=question.text, options=[{'text': o.text, 'isCorrect': bool(o.is_correct)} for o in question.options], selected_texts=selected_texts, is_correct=is_correct, ) except Exception: explanation = '' return { 'questionId': str(question.id), 'isCorrect': is_correct, 'correctOptionIds': correct_ids, 'correctOptionTexts': correct_texts, 'explanation': explanation, } def list_test_attempts_for_author(session_or_eng, author_id: str, test_id: str) -> list[dict]: session = get_session() tid = _to_uuid(test_id) test = session.get(Test, tid) if not test: raise HttpError(404, 'Тест не найден.') if not is_test_author(test.created_by, author_id) and not is_test_edit_open(): raise HttpError(403, 'Доступ запрещён.') rows = ( session.query(TestAttempt, TestVersion, User) .join(TestVersion, TestAttempt.test_version_id == TestVersion.id) .join(User, TestAttempt.user_id == User.id) .filter(TestVersion.test_id == tid) .order_by(TestAttempt.started_at.desc().nullslast()) .limit(200) .all() ) return [ { 'id': str(a.id), 'user_id': str(a.user_id), 'status': a.status, 'attempt_number': a.attempt_number, 'started_at': a.started_at, 'completed_at': a.completed_at, 'correct_count': a.correct_count, 'total_questions': a.total_questions, 'passed': a.passed, 'test_version': tv.version, 'attempter_name': u.full_name, 'attempter_login': u.login, } for a, tv, u in rows ]