You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

539 lines
19 KiB

"""Сервис прохождения теста."""
from __future__ import annotations
import uuid as _uuid
from datetime import datetime, timezone
from sqlalchemy import func
from sqlalchemy.orm import Session, selectinload
from ..db import get_session
from ..models import (
AnswerOption,
Question,
Test,
TestAttempt,
TestVersion,
User,
UserAnswer,
)
from .test_access import is_test_author, user_has_test_access
class HttpError(Exception):
    """Service error that carries an HTTP status code alongside its message.

    Attributes:
        status: HTTP status code supplied by the raiser.
        message: Error text; also used as the exception's ``str()`` value.
    """

    def __init__(self, status: int, message: str):
        self.status = status
        self.message = message
        super().__init__(message)
def _sort_uuid_strings(items) -> list[str]:
    """Return the distinct non-``None`` items as strings, sorted ascending.

    A falsy *items* value (``None``, empty collection) yields an empty list.
    """
    if not items:
        return []
    unique = set()
    for item in items:
        if item is not None:
            unique.add(str(item))
    return sorted(unique)
def _same_selection(selected, correct_ids) -> bool:
    """Tell whether two id collections hold the same set of distinct ids.

    Both arguments go through ``_sort_uuid_strings`` first, so ordering,
    duplicates and ``None`` entries do not affect the result.
    """
    chosen = _sort_uuid_strings(selected)
    expected = _sort_uuid_strings(correct_ids)
    return chosen == expected
def _to_uuid(val) -> _uuid.UUID | None:
if isinstance(val, _uuid.UUID):
return val
try:
return _uuid.UUID(str(val))
except (ValueError, AttributeError):
return None
# ─── load questions (shared) ─────────────────────────────────────────────────
def load_questions_for_version(session: Session, test_version_id, *, include_correct: bool) -> list[dict]:
    """Load the questions of one test version as plain dictionaries.

    Args:
        session: SQLAlchemy session used for the query.
        test_version_id: Version id (UUID or anything ``_to_uuid`` can parse).
        include_correct: When true, every option dict also carries ``isCorrect``.

    Returns:
        Question dicts ordered by ``question_order``; each holds its options
        sorted by ``option_order``. An unparsable id yields an empty list.
    """
    version_uuid = _to_uuid(test_version_id)
    if version_uuid is None:
        return []

    query = (
        session.query(Question)
        .options(selectinload(Question.options))
        .filter(Question.test_version_id == version_uuid)
        .order_by(Question.question_order)
    )

    def serialize_option(option) -> dict:
        # The correctness flag is only exposed when the caller asks for it.
        data = {'id': str(option.id), 'text': option.text, 'optionOrder': option.option_order}
        if include_correct:
            data['isCorrect'] = bool(option.is_correct)
        return data

    result = []
    for question in query.all():
        ordered_options = sorted(question.options, key=lambda item: item.option_order)
        serialized_options = [serialize_option(option) for option in ordered_options]
        result.append({
            'id': str(question.id),
            'text': question.text,
            'questionOrder': question.question_order,
            'hasMultipleAnswers': bool(question.has_multiple_answers),
            'options': serialized_options,
        })
    return result
# ─── start ───────────────────────────────────────────────────────────────────
def start_attempt(session_or_eng, user_id: str, test_id: str) -> dict:
    """Create a new ``in_progress`` attempt on the active version of a test.

    Args:
        session_or_eng: Kept for backward compatibility with older callers;
            not used here, the session comes from ``get_session()``.
        user_id: Id of the user starting the attempt.
        test_id: Id of the test.

    Returns:
        ``{'attempt': {...}}`` describing the freshly stored attempt.

    Raises:
        HttpError: 404 when ``user_has_test_access`` denies access or the test
            has no active version.
    """
    access = user_has_test_access(user_id, test_id)
    if not access.ok:
        raise HttpError(404, 'Тест не найден.')

    session = get_session()
    test_uuid = _to_uuid(test_id)
    user_uuid = _to_uuid(user_id)

    version_query = session.query(TestVersion).filter(
        TestVersion.test_id == test_uuid,
        TestVersion.is_active.is_(True),
    )
    active_version = version_query.first()
    if not active_version:
        raise HttpError(404, 'Нет активной версии теста.')

    # Highest attempt number this user already has on this version (0 if none).
    last_number_query = session.query(
        func.coalesce(func.max(TestAttempt.attempt_number), 0)
    ).filter(
        TestAttempt.test_version_id == active_version.id,
        TestAttempt.user_id == user_uuid,
    )
    last_number = last_number_query.scalar() or 0

    attempt = TestAttempt(
        test_version_id=active_version.id,
        user_id=user_uuid,
        attempt_number=int(last_number) + 1,
        status='in_progress',
    )
    session.add(attempt)
    session.commit()
    # Reload the row so attributes read below reflect the stored state.
    session.refresh(attempt)

    started_at = None
    if attempt.started_at:
        started_at = attempt.started_at.isoformat()

    payload = {
        'id': str(attempt.id),
        'test_version_id': str(attempt.test_version_id),
        'user_id': str(attempt.user_id),
        'attempt_number': attempt.attempt_number,
        'status': attempt.status,
        'started_at': started_at,
    }
    return {'attempt': payload}
# ─── play ────────────────────────────────────────────────────────────────────
def get_play_content(session_or_eng, user_id: str, test_id: str, attempt_id: str) -> dict:
    """Return the data needed to play an in-progress attempt.

    Questions are loaded without the ``isCorrect`` flag on their options.

    Args:
        session_or_eng: Not used; the session comes from ``get_session()``.
        user_id: Id of the requesting user; must match the attempt's user.
        test_id: Id of the test the attempt must belong to.
        attempt_id: Id of the attempt.

    Raises:
        HttpError: 404 when the attempt is missing or belongs to another test,
            403 when it belongs to another user, 400 when it is not
            ``in_progress``.
    """
    session = get_session()
    attempt_uuid = _to_uuid(attempt_id)
    user_uuid = _to_uuid(user_id)
    test_uuid = _to_uuid(test_id)

    eager_test = selectinload(TestAttempt.test_version).selectinload(TestVersion.test)
    attempt = (
        session.query(TestAttempt)
        .options(eager_test)
        .filter(TestAttempt.id == attempt_uuid)
        .first()
    )

    # A missing attempt and an attempt of another test give the same 404.
    if not attempt or attempt.test_version.test_id != test_uuid:
        raise HttpError(404, 'Попытка не найдена.')
    if attempt.user_id != user_uuid:
        raise HttpError(403, 'Доступ запрещён.')
    if attempt.status != 'in_progress':
        raise HttpError(400, 'Попытка уже завершена.')

    test = attempt.test_version.test
    questions = load_questions_for_version(
        session, attempt.test_version_id, include_correct=False
    )

    content = {
        'testTitle': test.title,
        'passingThreshold': test.passing_threshold,
        'timeLimit': test.time_limit,
        'hintsEnabled': bool(test.hints_enabled),
        'resultMode': test.result_mode or 'end',
    }
    content['attemptId'] = str(attempt.id)
    content['questions'] = questions
    return content
# ─── submit ──────────────────────────────────────────────────────────────────
def submit_attempt(session_or_eng, user_id: str, test_id: str, attempt_id: str,
                   raw_answers: dict | None) -> dict:
    """Grade an in-progress attempt, store the answers and mark it completed.

    Args:
        session_or_eng: Not used; the session comes from ``get_session()``.
        user_id: Id of the submitting user; must match the attempt's user.
        test_id: Id of the test the attempt must belong to.
        attempt_id: Id of the attempt being submitted.
        raw_answers: Mapping ``question id -> selected option id(s)``. Anything
            that is not a dict is treated as "no answers". A value may be a
            list, a single scalar id, or ``None`` (no answer for the question).

    Returns:
        Dict with ``attemptId``, ``correctCount``, ``totalQuestions``,
        ``percent`` (rounded to one decimal), ``passed``, ``passingThreshold``
        and ``review`` (the result of ``build_review_from_db``).

    Raises:
        HttpError: 404 when the attempt is missing or belongs to another test,
            403 when it belongs to another user, 400 when it is not
            ``in_progress``, the version has no questions, or an answer refers
            to an option that does not belong to its question.
    """
    answers = raw_answers if isinstance(raw_answers, dict) else {}
    session = get_session()
    aid = _to_uuid(attempt_id)
    uid = _to_uuid(user_id)
    tid = _to_uuid(test_id)

    # with_for_update() requests a row lock on the attempt; presumably this
    # serialises concurrent submits of the same attempt.
    # NOTE(review): the HttpError paths below leave the transaction open; this
    # relies on whoever owns the session to roll it back - confirm.
    attempt = (
        session.query(TestAttempt)
        .options(selectinload(TestAttempt.test_version).selectinload(TestVersion.test))
        .filter(TestAttempt.id == aid)
        .with_for_update()
        .first()
    )
    if not attempt:
        raise HttpError(404, 'Попытка не найдена.')
    if attempt.test_version.test_id != tid:
        raise HttpError(404, 'Попытка не найдена.')
    if attempt.user_id != uid:
        raise HttpError(403, 'Доступ запрещён.')
    if attempt.status != 'in_progress':
        raise HttpError(400, 'Попытка уже завершена.')

    test = attempt.test_version.test
    questions = (
        session.query(Question)
        .options(selectinload(Question.options))
        .filter(Question.test_version_id == attempt.test_version_id)
        .all()
    )
    if not questions:
        raise HttpError(400, 'В тесте нет вопросов.')

    # Normalise, validate and grade every answer exactly once; the normalised
    # selection is kept so the persistence step below does not redo the work.
    selections: dict[str, list[str]] = {}
    correct_count = 0
    for q in questions:
        qid = str(q.id)
        option_ids = {str(o.id) for o in q.options}
        correct_ids = [str(o.id) for o in q.options if o.is_correct]
        selected = _normalize_selection(answers.get(qid))
        if any(sid not in option_ids for sid in selected):
            raise HttpError(400, 'Некорректный вариант ответа.')
        if _same_selection(selected, correct_ids):
            correct_count += 1
        selections[qid] = selected

    total = len(questions)
    percent = (correct_count / total) * 100 if total else 0
    threshold = int(test.passing_threshold or 0)
    # Small epsilon so float rounding cannot fail a score sitting on the threshold.
    passed = percent + 1e-9 >= threshold

    # Replace any previously stored answers of this attempt with the new ones.
    session.query(UserAnswer).filter(UserAnswer.attempt_id == aid).delete(synchronize_session='fetch')
    for q in questions:
        selected_uuids = [
            option_uuid
            for option_uuid in map(_to_uuid, selections[str(q.id)])
            if option_uuid is not None
        ]
        session.add(UserAnswer(
            attempt_id=aid,
            question_id=q.id,
            selected_options=selected_uuids,
        ))

    attempt.status = 'completed'
    attempt.completed_at = datetime.now(timezone.utc)
    attempt.correct_count = correct_count
    attempt.total_questions = total
    attempt.passed = passed
    session.commit()

    review = build_review_from_db(session, attempt_id)
    return {
        'attemptId': attempt_id,
        'correctCount': correct_count,
        'totalQuestions': total,
        'percent': round(percent, 1),
        'passed': passed,
        'passingThreshold': threshold,
        'review': review,
    }


def _normalize_selection(raw) -> list[str]:
    """Coerce one raw answer value into a list of option-id strings.

    ``None`` means "nothing selected"; a non-list scalar is treated as a
    single selected id.
    """
    if raw is None:
        return []
    if not isinstance(raw, list):
        raw = [raw]
    return [str(item) for item in raw]
# ─── review ──────────────────────────────────────────────────────────────────
def build_review_from_db(session: Session, attempt_id: str) -> dict:
    """Build the review payload of a completed attempt from stored data.

    Args:
        session: SQLAlchemy session used for all queries.
        attempt_id: Id of the attempt to review.

    Returns:
        Dict with attempt totals, attempter details and, per question, every
        option flagged with ``isCorrect`` and ``selected``.

    Raises:
        HttpError: 404 when the attempt does not exist, 400 when its status is
            not ``completed``.
    """
    attempt_uuid = _to_uuid(attempt_id)
    attempt = (
        session.query(TestAttempt)
        .options(
            selectinload(TestAttempt.test_version).selectinload(TestVersion.test),
            selectinload(TestAttempt.user),
            selectinload(TestAttempt.user_answers),
        )
        .filter(TestAttempt.id == attempt_uuid)
        .first()
    )
    if not attempt:
        raise HttpError(404, 'Попытка не найдена.')
    if attempt.status != 'completed':
        raise HttpError(400, 'Попытка не завершена.')

    test = attempt.test_version.test
    questions = load_questions_for_version(session, attempt.test_version_id, include_correct=True)

    # question id -> option ids the user stored for that question
    answers_by_question: dict[str, list[str]] = {}
    for answer in attempt.user_answers:
        stored = answer.selected_options or []
        answers_by_question[str(answer.question_id)] = [str(option_id) for option_id in stored]

    # Fall back to the number of loaded questions when no total was stored.
    total = int(attempt.total_questions or len(questions))
    if total:
        percent = round(((attempt.correct_count or 0) / total) * 100, 1)
    else:
        percent = 0

    reviewed = []
    for question in questions:
        chosen = _sort_uuid_strings(answers_by_question.get(question['id'], []))
        right = _sort_uuid_strings(
            [option['id'] for option in question['options'] if option.get('isCorrect')]
        )
        chosen_lookup = set(chosen)
        option_rows = []
        for option in question['options']:
            option_rows.append({
                'id': option['id'],
                'text': option['text'],
                'isCorrect': option.get('isCorrect', False),
                'selected': option['id'] in chosen_lookup,
            })
        reviewed.append({
            'id': question['id'],
            'text': question['text'],
            'hasMultipleAnswers': question['hasMultipleAnswers'],
            'isUserCorrect': _same_selection(chosen, right),
            'options': option_rows,
        })

    started = attempt.started_at.isoformat() if attempt.started_at else None
    completed = attempt.completed_at.isoformat() if attempt.completed_at else None
    return {
        'attemptId': str(attempt.id),
        'testId': str(test.id),
        'testTitle': test.title,
        'passingThreshold': int(test.passing_threshold or 0),
        'correctCount': int(attempt.correct_count or 0),
        'totalQuestions': total,
        'percent': percent,
        'passed': bool(attempt.passed),
        'startedAt': started,
        'completedAt': completed,
        'attempterUserId': str(attempt.user_id),
        'attempterName': attempt.user.full_name,
        'attempterLogin': attempt.user.login,
        'questions': reviewed,
    }
def get_attempt_review_for_user(session_or_eng, current_user_id: str, test_id: str,
                                attempt_id: str) -> dict:
    """Return an attempt's review if the current user may see it.

    Access is granted to the user who made the attempt and to whoever
    ``is_test_author`` accepts for the test's ``created_by``.

    Args:
        session_or_eng: Not used; the session comes from ``get_session()``.
        current_user_id: Id of the requesting user.
        test_id: Id of the test the attempt must belong to.
        attempt_id: Id of the attempt.

    Returns:
        The payload produced by ``build_review_from_db``.

    Raises:
        HttpError: 404 when the attempt is missing or belongs to another test,
            403 when the user is neither its owner nor the test author; plus
            whatever ``build_review_from_db`` raises.
    """
    session = get_session()
    aid = _to_uuid(attempt_id)
    tid = _to_uuid(test_id)
    attempt = (
        session.query(TestAttempt)
        .options(selectinload(TestAttempt.test_version).selectinload(TestVersion.test))
        .filter(TestAttempt.id == aid)
        .first()
    )
    if not attempt:
        raise HttpError(404, 'Попытка не найдена.')
    if attempt.test_version.test_id != tid:
        raise HttpError(404, 'Попытка не найдена.')

    # Compare ids as UUIDs, like the other attempt checks in this module, so a
    # non-canonical spelling (upper-case, no dashes) still matches its owner.
    # If either side is not a UUID, fall back to plain string comparison.
    viewer_uuid = _to_uuid(current_user_id)
    owner_uuid = _to_uuid(attempt.user_id)
    if viewer_uuid is not None and owner_uuid is not None:
        is_owner = owner_uuid == viewer_uuid
    else:
        is_owner = str(attempt.user_id) == str(current_user_id)

    is_author = is_test_author(attempt.test_version.test.created_by, current_user_id)
    if not is_owner and not is_author:
        raise HttpError(403, 'Доступ запрещён.')
    return build_review_from_db(session, attempt_id)
# ─── hints ───────────────────────────────────────────────────────────────────
def count_missing_hints(session_or_eng, test_id: str) -> dict:
    """Count filled questions of the active version and how many lack a hint.

    A question counts as filled when its text is non-blank and at least two of
    its options have non-blank text.

    Args:
        session_or_eng: Not used; the session comes from ``get_session()``.
        test_id: Id of the test.

    Returns:
        ``{'total': <filled questions>, 'missing': <filled without ai_hint>}``;
        both are 0 when the id is unparsable or there is no active version.
    """
    session = get_session()
    test_uuid = _to_uuid(test_id)

    active_version = None
    if test_uuid is not None:
        active_version = (
            session.query(TestVersion)
            .filter(TestVersion.test_id == test_uuid, TestVersion.is_active.is_(True))
            .first()
        )
    if not active_version:
        return {'total': 0, 'missing': 0}

    questions = (
        session.query(Question)
        .options(selectinload(Question.options))
        .filter(Question.test_version_id == active_version.id)
        .all()
    )

    def is_filled(question) -> bool:
        if not (question.text or '').strip():
            return False
        non_blank_options = sum(1 for option in question.options if (option.text or '').strip())
        return non_blank_options >= 2

    filled = [question for question in questions if is_filled(question)]
    without_hint = [question for question in filled if not question.ai_hint]
    return {'total': len(filled), 'missing': len(without_hint)}
def generate_missing_hints_for_test(session_or_eng, author_id: str, test_id: str) -> dict:
    """Generate AI hints for the active version's questions that have none.

    Questions with blank text or fewer than two non-blank options are skipped.

    Args:
        session_or_eng: Not used; the session comes from ``get_session()``.
        author_id: Id of the requesting user; must pass ``is_test_author``.
        test_id: Id of the test.

    Returns:
        ``{'generated', 'failed', 'skipped', 'total'}`` where ``total`` is the
        number of questions without a hint (skipped ones included). All four
        are 0 when the test has no active version.

    Raises:
        HttpError: 404 when the test does not exist, 403 when the user is not
            its author.
    """
    from .ai_editor import generate_question_hint

    session = get_session()
    tid = _to_uuid(test_id)
    test = session.get(Test, tid)
    if not test:
        raise HttpError(404, 'Тест не найден.')
    if not is_test_author(test.created_by, author_id):
        raise HttpError(403, 'Доступ запрещён.')

    active_version = (
        session.query(TestVersion)
        .filter(TestVersion.test_id == tid, TestVersion.is_active.is_(True))
        .first()
    )
    if not active_version:
        # Same keys as the normal return so callers can read 'skipped' safely.
        return {'generated': 0, 'failed': 0, 'skipped': 0, 'total': 0}

    missing_qs = (
        session.query(Question)
        .options(selectinload(Question.options))
        .filter(
            Question.test_version_id == active_version.id,
            Question.ai_hint.is_(None) | (Question.ai_hint == ''),
        )
        .order_by(Question.question_order)
        .all()
    )

    generated = failed = skipped = 0
    for q in missing_qs:
        # A hint is only built for a filled question: non-blank text and at
        # least two options with non-blank text.
        if not (q.text or '').strip():
            skipped += 1
            continue
        valid_opts = [o for o in q.options if (o.text or '').strip()]
        if len(valid_opts) < 2:
            skipped += 1
            continue
        # NOTE(review): the payload is built from all options, blank ones
        # included, not just valid_opts - confirm this is intended.
        opt_payload = [{'text': o.text, 'isCorrect': bool(o.is_correct)} for o in q.options]
        hint = generate_question_hint(question_text=q.text, options=opt_payload)
        if hint:
            q.ai_hint = hint
            generated += 1
        else:
            failed += 1

    session.commit()
    return {'generated': generated, 'failed': failed, 'skipped': skipped, 'total': len(missing_qs)}
def check_question_for_attempt(session_or_eng, user_id: str, test_id: str, attempt_id: str,
                               question_id: str, selected_option_ids: list[str]) -> dict:
    """Check one question's answer during an in-progress attempt.

    Args:
        session_or_eng: Not used; the session comes from ``get_session()``.
        user_id: Id of the requesting user; must match the attempt's user.
        test_id: Id of the test the attempt must belong to.
        attempt_id: Id of the attempt.
        question_id: Id of the question; must belong to the attempt's version.
        selected_option_ids: Option ids chosen by the user. A single bare id
            (string or UUID) is accepted and treated as a one-element list.

    Returns:
        Dict with ``questionId``, ``isCorrect``, ``correctOptionIds``,
        ``correctOptionTexts`` and ``explanation``. The explanation is empty
        unless the test has hints enabled; then it is the stored ``ai_hint``
        or, failing that, a best-effort result of ``explain_answer``.

    Raises:
        HttpError: 404 when the attempt or question is missing or belongs to
            another test/version, 403 when the attempt belongs to another
            user, 400 when the attempt is not ``in_progress``.
    """
    session = get_session()
    aid = _to_uuid(attempt_id)
    uid = _to_uuid(user_id)
    tid = _to_uuid(test_id)
    qid = _to_uuid(question_id)

    attempt = (
        session.query(TestAttempt)
        .options(
            selectinload(TestAttempt.test_version).selectinload(TestVersion.test)
        )
        .filter(TestAttempt.id == aid)
        .first()
    )
    if not attempt:
        raise HttpError(404, 'Попытка не найдена.')
    if attempt.test_version.test_id != tid:
        raise HttpError(404, 'Попытка не найдена.')
    if attempt.user_id != uid:
        raise HttpError(403, 'Доступ запрещён.')
    if attempt.status != 'in_progress':
        raise HttpError(400, 'Попытка уже завершена.')

    question = (
        session.query(Question)
        .options(selectinload(Question.options))
        .filter(
            Question.id == qid,
            Question.test_version_id == attempt.test_version_id,
        )
        .first()
    )
    if not question:
        raise HttpError(404, 'Вопрос не найден.')

    # A bare id would otherwise be iterated character by character below and
    # could never match; wrap it so it is compared as one id.
    if selected_option_ids and isinstance(selected_option_ids, (str, _uuid.UUID)):
        selected_ids = [selected_option_ids]
    else:
        selected_ids = selected_option_ids

    correct_ids = [str(o.id) for o in question.options if o.is_correct]
    is_correct = _same_selection(selected_ids, correct_ids)
    selected_set = {str(x) for x in (selected_ids or [])}
    selected_texts = [o.text for o in question.options if str(o.id) in selected_set]
    correct_texts = [o.text for o in question.options if o.is_correct]

    test = attempt.test_version.test
    explanation = ''
    if test.hints_enabled:
        if question.ai_hint:
            explanation = question.ai_hint
        else:
            try:
                from .ai_editor import explain_answer
                explanation = explain_answer(
                    question_text=question.text,
                    options=[{'text': o.text, 'isCorrect': bool(o.is_correct)} for o in question.options],
                    selected_texts=selected_texts,
                    is_correct=is_correct,
                ) or ''
            except Exception:
                # Best-effort: the check result is still returned without an
                # explanation, but the failure is logged instead of vanishing.
                import logging
                logging.getLogger(__name__).exception(
                    'Failed to generate an answer explanation for question %s', question.id
                )
                explanation = ''

    return {
        'questionId': str(question.id),
        'isCorrect': is_correct,
        'correctOptionIds': correct_ids,
        'correctOptionTexts': correct_texts,
        'explanation': explanation,
    }
def list_test_attempts_for_author(session_or_eng, author_id: str, test_id: str) -> list[dict]:
    """List up to 200 attempts across all versions of a test, for its author.

    Rows are ordered by ``started_at`` descending, with missing start times last.

    Args:
        session_or_eng: Not used; the session comes from ``get_session()``.
        author_id: Id of the requesting user; must pass ``is_test_author``.
        test_id: Id of the test.

    Returns:
        One dict per attempt with attempt fields, the version number and the
        attempter's name and login. ``started_at`` / ``completed_at`` are
        passed through as stored on the attempt row (not converted to strings).

    Raises:
        HttpError: 404 when the test does not exist, 403 when the user is not
            its author.
    """
    session = get_session()
    test_uuid = _to_uuid(test_id)

    test = session.get(Test, test_uuid)
    if not test:
        raise HttpError(404, 'Тест не найден.')
    if not is_test_author(test.created_by, author_id):
        raise HttpError(403, 'Доступ запрещён.')

    query = (
        session.query(TestAttempt, TestVersion, User)
        .join(TestVersion, TestAttempt.test_version_id == TestVersion.id)
        .join(User, TestAttempt.user_id == User.id)
        .filter(TestVersion.test_id == test_uuid)
        .order_by(TestAttempt.started_at.desc().nullslast())
        .limit(200)
    )

    listing = []
    for attempt, version, user in query.all():
        listing.append({
            'id': str(attempt.id),
            'user_id': str(attempt.user_id),
            'status': attempt.status,
            'attempt_number': attempt.attempt_number,
            'started_at': attempt.started_at,
            'completed_at': attempt.completed_at,
            'correct_count': attempt.correct_count,
            'total_questions': attempt.total_questions,
            'passed': attempt.passed,
            'test_version': version.version,
            'attempter_name': user.full_name,
            'attempter_login': user.login,
        })
    return listing