You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

198 lines
6.6 KiB

"""Создание/правка теста, fork версии при наличии попыток."""
from __future__ import annotations
import uuid as _uuid
from typing import Any
from sqlalchemy import func
from sqlalchemy.orm import Session
from ..db import get_session
from ..messages import RU
from ..models import AnswerOption, Question, Test, TestVersion
from .test_access import is_test_author
from .test_chain import has_any_attempt_for_test
class HttpError(Exception):
    """Exception that carries an HTTP status code alongside its message.

    Attributes:
        status: the status code passed to the constructor.
        message: the message passed to the constructor; it is also the
            exception's only positional argument, so ``str(err)`` returns it.
    """

    def __init__(self, status: int, message: str):
        self.status, self.message = status, message
        super().__init__(message)
def create_test_with_version(author_id: str, *, title: str, description: str | None) -> dict:
    """Create a new test together with its first (active) version.

    Args:
        author_id: textual UUID of the user who will own the test.
        title: title stored on the new test.
        description: optional description; an empty value is stored as None.

    Returns:
        ``{'testId': <str>, 'versionId': <str>}`` with the ids of the created
        test and of its version number 1.

    Raises:
        HttpError: 400 when ``author_id`` cannot be parsed as a UUID.
    """
    # Validate before touching the database. uuid.UUID raises TypeError for
    # None, AttributeError for other non-strings and ValueError for malformed
    # text; all of them mean "bad user id" for the caller.
    try:
        uid = _uuid.UUID(author_id)
    except (ValueError, AttributeError, TypeError):
        raise HttpError(400, 'Некорректный user_id.') from None

    session = get_session()
    try:
        test = Test(
            title=title,
            description=description or None,
            created_by=uid,
            is_active=True,
            is_versioned=True,
        )
        session.add(test)
        session.flush()  # flush so that test.id is available below
        version = TestVersion(test_id=test.id, version=1, is_active=True, parent_id=None)
        session.add(version)
        session.commit()
    except Exception:
        # Do not leave the session inside a failed transaction.
        session.rollback()
        raise
    return {'testId': str(test.id), 'versionId': str(version.id)}
def _get_active_version(session: Session, test_id: _uuid.UUID) -> TestVersion | None:
    """Return the first version of the test flagged as active, or None if there is none."""
    belongs_to_test = TestVersion.test_id == test_id
    flagged_active = TestVersion.is_active.is_(True)
    candidates = session.query(TestVersion).filter(belongs_to_test, flagged_active)
    return candidates.first()
def _copy_question_tree(session: Session, from_version_id, to_version_id) -> None:
    """Duplicate the questions of one test version, with their options, into another.

    Questions are read in ``question_order`` and options in ``option_order``.
    The copies are added to the session (with a flush per question); nothing
    is committed here.
    """
    source_questions = session.query(Question).filter(
        Question.test_version_id == from_version_id
    )
    for original in source_questions.order_by(Question.question_order).all():
        clone = Question(
            test_version_id=to_version_id,
            text=original.text,
            question_order=original.question_order,
            has_multiple_answers=original.has_multiple_answers,
            ai_hint=original.ai_hint,
        )
        session.add(clone)
        # Flush before clone.id is read for the option copies below.
        session.flush()

        ordered_options = sorted(original.options, key=lambda opt: opt.option_order)
        for source_option in ordered_options:
            option_copy = AnswerOption(
                question_id=clone.id,
                text=source_option.text,
                is_correct=source_option.is_correct,
                option_order=source_option.option_order,
            )
            session.add(option_copy)
def _fork_new_version(session: Session, test_id: _uuid.UUID) -> TestVersion:
    """Create the next version of a test as a copy of its currently active one.

    Every existing version of the test is marked inactive, then a new active
    version numbered ``max(version) + 1`` is added with the previously active
    version as its parent, and that parent's questions are copied into it.
    The session is flushed but not committed.

    Raises:
        HttpError: 500 when the test has no active version.
    """
    parent = _get_active_version(session, test_id)
    if not parent:
        if 'internal' in RU:
            message = RU['internal']
        else:
            message = 'Внутренняя ошибка.'
        raise HttpError(500, message)

    highest_query = session.query(
        func.coalesce(func.max(TestVersion.version), 0)
    ).filter(TestVersion.test_id == test_id)
    next_number = int(highest_query.scalar() or 0) + 1

    # Deactivate every version of this test before adding the new one.
    versions_of_test = session.query(TestVersion).filter(TestVersion.test_id == test_id)
    versions_of_test.update({TestVersion.is_active: False}, synchronize_session='fetch')

    fork = TestVersion(
        test_id=test_id,
        version=next_number,
        is_active=True,
        parent_id=parent.id,
    )
    session.add(fork)
    session.flush()  # fork.id is needed as the copy target

    _copy_question_tree(session, parent.id, fork.id)
    return fork
def _replace_version_content(session: Session, version: TestVersion, payload: dict) -> None:
    """Replace all questions of *version* with the ones described in *payload*.

    Existing questions are deleted and new ones are built from
    ``payload['questions']``. An ``ai_hint`` is carried over to a new question
    whose stripped text equals the text of an old question that had a hint.
    Missing ``question_order`` / ``option_order`` values fall back to the
    1-based position in the payload. Nothing is committed here.
    """
    # Snapshot ai_hint by question text before deletion; the first question
    # with a given text wins.
    hints_by_text: dict[str, str] = {}
    for existing in version.questions:
        if existing.ai_hint:
            hints_by_text.setdefault(existing.text, existing.ai_hint)

    # Delete via the ORM (the original note says answer options are removed
    # by cascade); iterate over a copy of the collection.
    stale_questions = list(version.questions)
    for stale in stale_questions:
        session.delete(stale)
    session.flush()

    for q_position, q_data in enumerate(payload.get('questions') or [], start=1):
        text = (q_data.get('text') or '').strip()
        question = Question(
            test_version_id=version.id,
            text=text,
            question_order=q_data.get('question_order') or q_position,
            has_multiple_answers=bool(q_data.get('hasMultipleAnswers')),
            ai_hint=hints_by_text.get(text),
        )
        session.add(question)
        session.flush()  # question.id is read for the options below

        for o_position, o_data in enumerate(q_data.get('options') or [], start=1):
            option = AnswerOption(
                question_id=question.id,
                text=(o_data.get('text') or '').strip(),
                is_correct=bool(o_data.get('isCorrect')),
                option_order=o_data.get('option_order') or o_position,
            )
            session.add(option)
def _apply_test_settings(test: Test, payload: dict) -> None:
    """Copy the optional test-level settings from *payload* onto *test*.

    Absent keys leave the attribute untouched. Values that cannot be parsed
    are ignored instead of failing the whole save.
    """
    if payload.get('title') is not None:
        test.title = payload['title']
    if payload.get('description') is not None:
        # An empty description is stored as None.
        test.description = payload['description'] or None

    if payload.get('passingThreshold') is not None:
        try:
            threshold = round(float(payload['passingThreshold']))
        except (TypeError, ValueError, OverflowError):
            # round() raises ValueError for NaN and OverflowError for +/-inf.
            pass
        else:
            test.passing_threshold = max(0, min(100, threshold))

    if 'timeLimit' in payload:
        raw_limit = payload.get('timeLimit')
        try:
            limit = 0 if raw_limit in (None, '') else int(raw_limit)
        except (TypeError, ValueError, OverflowError):
            # int() raises OverflowError for an infinite float.
            pass
        else:
            # Any non-positive limit means "no limit", whether it arrived as
            # 0, "0", 0.5 or a negative number.
            test.time_limit = limit if limit > 0 else None

    if 'hintsEnabled' in payload:
        test.hints_enabled = bool(payload['hintsEnabled'])

    if 'resultMode' in payload:
        mode = payload.get('resultMode')
        # Only strings can be normalised; any other type is ignored like an
        # unknown mode.
        if isinstance(mode, str):
            mode = mode.strip().lower()
            if mode in ('immediate', 'end'):
                test.result_mode = mode


def save_test_draft(author_id: str, test_id: str, payload: dict) -> dict:
    """Save edits to a test and, when questions are supplied, to its content.

    Test-level settings present in *payload* are applied to the test. If
    ``payload['questions']`` is given, the active version's questions are
    replaced; when the test already has attempts, a new version is forked
    first and the replacement is applied to that fork.

    Args:
        author_id: id of the user performing the edit; must be the author.
        test_id: textual UUID of the test.
        payload: draft data; a non-dict payload is treated as empty.

    Returns:
        ``{'testId': test_id, 'versionId': <str>, 'forked': <bool>}`` where
        ``versionId`` is the version that received (or would receive) content.

    Raises:
        HttpError: 404 for an unknown or malformed test id, 403 when the
            user is not the author, 500 when the test has no active version.
    """
    if not isinstance(payload, dict):
        payload = {}

    # uuid.UUID raises TypeError for None, AttributeError for other
    # non-strings and ValueError for malformed text.
    try:
        tid = _uuid.UUID(test_id)
    except (ValueError, AttributeError, TypeError):
        raise HttpError(404, 'Тест не найден.') from None

    session = get_session()
    try:
        test = session.get(Test, tid)
        if not test:
            raise HttpError(404, 'Тест не найден.')
        if not is_test_author(test.created_by, author_id):
            raise HttpError(403, 'Доступ запрещён.')

        # Check for an active version before mutating anything.
        active_version = _get_active_version(session, tid)
        if not active_version:
            raise HttpError(500, 'Нет активной версии теста.')

        _apply_test_settings(test, payload)

        forked = False
        if payload.get('questions') is not None:
            # Attempts only matter when the content is about to change.
            if has_any_attempt_for_test(session, tid):
                active_version = _fork_new_version(session, tid)
                forked = True
            _replace_version_content(session, active_version, payload)

        session.commit()
    except Exception:
        # Discard partial changes so the session is not left dirty or inside
        # a failed transaction.
        session.rollback()
        raise
    return {'testId': test_id, 'versionId': str(active_version.id), 'forked': forked}