Приложение для тестирования сотрудников клиники методом один вопрос - до пяти ответов один из которых правильный. Сотрудник должен выбрать правильный вариант ответа
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

223 lines
7.4 KiB

import random
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.attempt import AttemptAnswer, TestAttempt
from app.models.test import Question, Test
from app.schemas.attempt import (
AttemptResult,
AttemptStart,
AttemptStarted,
AttemptSubmitDto,
AnswerForTest,
AnswerResult,
QuestionForTest,
QuestionResult,
)
router = APIRouter(prefix="/api/attempts", tags=["attempts"])
@router.post("", response_model=AttemptStarted, status_code=201)
async def start_attempt(data: AttemptStart, db: AsyncSession = Depends(get_db)):
"""Начать попытку прохождения теста. Возвращает вопросы в случайном порядке без правильных ответов."""
result = await db.execute(
select(Test)
.options(selectinload(Test.questions).selectinload(Question.answers))
.where(Test.id == data.test_id, Test.is_active == True)
)
test = result.scalar_one_or_none()
if not test:
raise HTTPException(status_code=404, detail="Тест не найден")
attempt = TestAttempt(test_id=test.id, status="in_progress")
db.add(attempt)
await db.commit()
await db.refresh(attempt)
# Перемешиваем вопросы и ответы случайно
questions_shuffled = list(test.questions)
random.shuffle(questions_shuffled)
questions_for_test = []
for q in questions_shuffled:
correct_count = sum(1 for a in q.answers if a.is_correct)
answers_shuffled = list(q.answers)
random.shuffle(answers_shuffled)
questions_for_test.append(
QuestionForTest(
id=q.id,
text=q.text,
is_multiple=correct_count > 1,
answers=[AnswerForTest(id=a.id, text=a.text) for a in answers_shuffled],
)
)
return AttemptStarted(
id=attempt.id,
test_id=test.id,
test_title=test.title,
test_description=test.description,
started_at=attempt.started_at,
time_limit=test.time_limit,
allow_navigation_back=test.allow_navigation_back,
questions=questions_for_test,
)
@router.post("/{attempt_id}/submit", response_model=AttemptResult)
async def submit_attempt(
attempt_id: int, data: AttemptSubmitDto, db: AsyncSession = Depends(get_db)
):
"""Завершить попытку: сохранить ответы, подсчитать результат."""
result = await db.execute(
select(TestAttempt)
.where(TestAttempt.id == attempt_id, TestAttempt.status == "in_progress")
)
attempt = result.scalar_one_or_none()
if not attempt:
raise HTTPException(status_code=404, detail="Попытка не найдена или уже завершена")
# Загружаем тест с вопросами и ответами
test_result = await db.execute(
select(Test)
.options(selectinload(Test.questions).selectinload(Question.answers))
.where(Test.id == attempt.test_id)
)
test = test_result.scalar_one()
# Сохраняем выбранные ответы
submitted = {qa.question_id: set(qa.answer_ids) for qa in data.answers}
for qa in data.answers:
for answer_id in qa.answer_ids:
db.add(AttemptAnswer(
attempt_id=attempt.id,
question_id=qa.question_id,
answer_id=answer_id,
))
# Подсчёт результата
correct_count = 0
question_results = []
for question in test.questions:
correct_ids = {a.id for a in question.answers if a.is_correct}
selected_ids = submitted.get(question.id, set())
is_correct = selected_ids == correct_ids
if is_correct:
correct_count += 1
question_results.append(
QuestionResult(
id=question.id,
text=question.text,
is_answered_correctly=is_correct,
answers=[
AnswerResult(
id=a.id,
text=a.text,
is_correct=a.is_correct,
is_selected=a.id in selected_ids,
)
for a in question.answers
],
)
)
total = len(test.questions)
score = round((correct_count / total) * 100, 1) if total > 0 else 0.0
passed = score >= test.passing_score
finished_at = datetime.now(timezone.utc)
attempt.finished_at = finished_at
attempt.score = score
attempt.passed = passed
attempt.correct_count = correct_count
attempt.total_count = total
attempt.status = "finished"
await db.commit()
return AttemptResult(
id=attempt.id,
test_id=test.id,
test_title=test.title,
started_at=attempt.started_at,
finished_at=finished_at,
score=score,
passed=passed,
passing_score=test.passing_score,
correct_count=correct_count,
total_count=total,
questions=question_results,
)
@router.get("/{attempt_id}/result", response_model=AttemptResult)
async def get_result(attempt_id: int, db: AsyncSession = Depends(get_db)):
"""Получить результат завершённой попытки."""
result = await db.execute(
select(TestAttempt)
.options(selectinload(TestAttempt.answers).selectinload(AttemptAnswer.answer))
.where(TestAttempt.id == attempt_id, TestAttempt.status == "finished")
)
attempt = result.scalar_one_or_none()
if not attempt:
raise HTTPException(status_code=404, detail="Результат не найден")
test_result = await db.execute(
select(Test)
.options(selectinload(Test.questions).selectinload(Question.answers))
.where(Test.id == attempt.test_id)
)
test = test_result.scalar_one()
# Восстанавливаем выбранные ответы из БД
selected_by_question: dict[int, set[int]] = {}
for aa in attempt.answers:
selected_by_question.setdefault(aa.question_id, set()).add(aa.answer_id)
question_results = []
for question in test.questions:
correct_ids = {a.id for a in question.answers if a.is_correct}
selected_ids = selected_by_question.get(question.id, set())
question_results.append(
QuestionResult(
id=question.id,
text=question.text,
is_answered_correctly=selected_ids == correct_ids,
answers=[
AnswerResult(
id=a.id,
text=a.text,
is_correct=a.is_correct,
is_selected=a.id in selected_ids,
)
for a in question.answers
],
)
)
return AttemptResult(
id=attempt.id,
test_id=test.id,
test_title=test.title,
started_at=attempt.started_at,
finished_at=attempt.finished_at,
score=attempt.score,
passed=attempt.passed,
passing_score=test.passing_score,
correct_count=attempt.correct_count,
total_count=attempt.total_count,
questions=question_results,
)