You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
198 lines
6.6 KiB
198 lines
6.6 KiB
"""Создание/правка теста, fork версии при наличии попыток.""" |
|
from __future__ import annotations |
|
|
|
import uuid as _uuid |
|
from typing import Any |
|
|
|
from sqlalchemy import func |
|
from sqlalchemy.orm import Session |
|
|
|
from ..db import get_session |
|
from ..messages import RU |
|
from ..models import AnswerOption, Question, Test, TestVersion |
|
from .test_access import is_test_author |
|
from .test_chain import has_any_attempt_for_test |
|
|
|
|
|
class HttpError(Exception): |
|
def __init__(self, status: int, message: str): |
|
super().__init__(message) |
|
self.status = status |
|
self.message = message |
|
|
|
|
|
def create_test_with_version(author_id: str, *, title: str, description: str | None) -> dict: |
|
session = get_session() |
|
try: |
|
uid = _uuid.UUID(author_id) |
|
except (ValueError, AttributeError): |
|
raise HttpError(400, 'Некорректный user_id.') |
|
|
|
test = Test( |
|
title=title, |
|
description=description or None, |
|
created_by=uid, |
|
is_active=True, |
|
is_versioned=True, |
|
) |
|
session.add(test) |
|
session.flush() # получаем test.id |
|
|
|
version = TestVersion(test_id=test.id, version=1, is_active=True, parent_id=None) |
|
session.add(version) |
|
session.commit() |
|
return {'testId': str(test.id), 'versionId': str(version.id)} |
|
|
|
|
|
def _get_active_version(session: Session, test_id: _uuid.UUID) -> TestVersion | None: |
|
return ( |
|
session.query(TestVersion) |
|
.filter(TestVersion.test_id == test_id, TestVersion.is_active.is_(True)) |
|
.first() |
|
) |
|
|
|
|
|
def _copy_question_tree(session: Session, from_version_id, to_version_id) -> None: |
|
questions = ( |
|
session.query(Question) |
|
.filter(Question.test_version_id == from_version_id) |
|
.order_by(Question.question_order) |
|
.all() |
|
) |
|
for q in questions: |
|
new_q = Question( |
|
test_version_id=to_version_id, |
|
text=q.text, |
|
question_order=q.question_order, |
|
has_multiple_answers=q.has_multiple_answers, |
|
ai_hint=q.ai_hint, |
|
) |
|
session.add(new_q) |
|
session.flush() |
|
for o in sorted(q.options, key=lambda x: x.option_order): |
|
session.add(AnswerOption( |
|
question_id=new_q.id, |
|
text=o.text, |
|
is_correct=o.is_correct, |
|
option_order=o.option_order, |
|
)) |
|
|
|
|
|
def _fork_new_version(session: Session, test_id: _uuid.UUID) -> TestVersion: |
|
av = _get_active_version(session, test_id) |
|
if not av: |
|
raise HttpError(500, RU['internal'] if 'internal' in RU else 'Внутренняя ошибка.') |
|
|
|
max_ver = ( |
|
session.query(func.coalesce(func.max(TestVersion.version), 0)) |
|
.filter(TestVersion.test_id == test_id) |
|
.scalar() or 0 |
|
) |
|
next_v = int(max_ver) + 1 |
|
|
|
# деактивируем все версии |
|
session.query(TestVersion).filter(TestVersion.test_id == test_id).update( |
|
{TestVersion.is_active: False}, synchronize_session='fetch' |
|
) |
|
|
|
new_version = TestVersion( |
|
test_id=test_id, |
|
version=next_v, |
|
is_active=True, |
|
parent_id=av.id, |
|
) |
|
session.add(new_version) |
|
session.flush() |
|
_copy_question_tree(session, av.id, new_version.id) |
|
return new_version |
|
|
|
|
|
def _replace_version_content(session: Session, version: TestVersion, payload: dict) -> None: |
|
# Снимок ai_hint по тексту вопроса перед удалением |
|
old_hints: dict[str, str] = {} |
|
for q in version.questions: |
|
if q.ai_hint and q.text not in old_hints: |
|
old_hints[q.text] = q.ai_hint |
|
|
|
# удаляем через cascade (answer_options удалятся каскадно через ORM) |
|
for q in list(version.questions): |
|
session.delete(q) |
|
session.flush() |
|
|
|
questions_payload = payload.get('questions') or [] |
|
for i, qp in enumerate(questions_payload): |
|
q_text = (qp.get('text') or '').strip() |
|
new_q = Question( |
|
test_version_id=version.id, |
|
text=q_text, |
|
question_order=qp.get('question_order') or (i + 1), |
|
has_multiple_answers=bool(qp.get('hasMultipleAnswers')), |
|
ai_hint=old_hints.get(q_text), |
|
) |
|
session.add(new_q) |
|
session.flush() |
|
for j, op in enumerate(qp.get('options') or []): |
|
session.add(AnswerOption( |
|
question_id=new_q.id, |
|
text=(op.get('text') or '').strip(), |
|
is_correct=bool(op.get('isCorrect')), |
|
option_order=op.get('option_order') or (j + 1), |
|
)) |
|
|
|
|
|
def save_test_draft(author_id: str, test_id: str, payload: dict) -> dict: |
|
if not isinstance(payload, dict): |
|
payload = {} |
|
session = get_session() |
|
try: |
|
tid = _uuid.UUID(test_id) |
|
except (ValueError, AttributeError): |
|
raise HttpError(404, 'Тест не найден.') |
|
|
|
test = session.get(Test, tid) |
|
if not test: |
|
raise HttpError(404, 'Тест не найден.') |
|
if not is_test_author(test.created_by, author_id): |
|
raise HttpError(403, 'Доступ запрещён.') |
|
|
|
if payload.get('title') is not None: |
|
test.title = payload['title'] |
|
if payload.get('description') is not None: |
|
test.description = payload['description'] or None |
|
|
|
if payload.get('passingThreshold') is not None: |
|
try: |
|
test.passing_threshold = max(0, min(100, round(float(payload['passingThreshold'])))) |
|
except (TypeError, ValueError): |
|
pass |
|
|
|
if 'timeLimit' in payload: |
|
tl = payload.get('timeLimit') |
|
try: |
|
test.time_limit = None if tl in (None, '', 0) else max(0, int(tl)) |
|
except (TypeError, ValueError): |
|
pass |
|
|
|
if 'hintsEnabled' in payload: |
|
test.hints_enabled = bool(payload['hintsEnabled']) |
|
|
|
if 'resultMode' in payload: |
|
rm = (payload.get('resultMode') or '').strip().lower() |
|
if rm in ('immediate', 'end'): |
|
test.result_mode = rm |
|
|
|
has_attempts = has_any_attempt_for_test(session, tid) |
|
active_version = _get_active_version(session, tid) |
|
if not active_version: |
|
raise HttpError(500, 'Нет активной версии теста.') |
|
|
|
forked = False |
|
if has_attempts and 'questions' in payload and payload.get('questions') is not None: |
|
active_version = _fork_new_version(session, tid) |
|
forked = True |
|
|
|
if payload.get('questions') is not None: |
|
_replace_version_content(session, active_version, payload) |
|
|
|
session.commit() |
|
return {'testId': test_id, 'versionId': str(active_version.id), 'forked': forked}
|
|
|