You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
282 lines
9.5 KiB
282 lines
9.5 KiB
import random |
|
from datetime import datetime, timezone |
|
from typing import Optional |
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query |
|
from sqlalchemy import func, select |
|
from sqlalchemy.ext.asyncio import AsyncSession |
|
from sqlalchemy.orm import selectinload |
|
|
|
from app.database import get_db |
|
from app.models.attempt import AttemptAnswer, TestAttempt |
|
from app.models.test import Question, Test |
|
from app.schemas.attempt import ( |
|
AnswerForTest, |
|
AnswerResult, |
|
AttemptListItem, |
|
AttemptListResponse, |
|
AttemptResult, |
|
AttemptStart, |
|
AttemptStarted, |
|
AttemptSubmitDto, |
|
QuestionForTest, |
|
QuestionResult, |
|
) |
|
|
|
# Temporary: until Sprint 6 (authorization) every attempt belongs to the guest user.
GUEST_USER_ID = 1
GUEST_USER_NAME = "Гость"

# Router for the attempt endpoints; every route below is mounted under /api/attempts.
router = APIRouter(prefix="/api/attempts", tags=["attempts"])
|
|
|
|
|
@router.post("", response_model=AttemptStarted, status_code=201)
async def start_attempt(data: AttemptStart, db: AsyncSession = Depends(get_db)):
    """Start an attempt at a test.

    Looks up the active test identified by ``data.test_id``, stores a new
    ``in_progress`` attempt owned by the guest user, and returns the test's
    questions and answers in random order without the correctness flags.

    Raises:
        HTTPException: 404 when no active test has the requested id.
    """
    test_query = (
        select(Test)
        .options(selectinload(Test.questions).selectinload(Question.answers))
        # "== True" builds a SQL expression here, it is not a Python comparison.
        .where(Test.id == data.test_id, Test.is_active == True)  # noqa: E712
    )
    test = (await db.execute(test_query)).scalar_one_or_none()
    if not test:
        raise HTTPException(status_code=404, detail="Тест не найден")

    new_attempt = TestAttempt(test_id=test.id, status="in_progress", user_id=GUEST_USER_ID)
    db.add(new_attempt)
    await db.commit()
    # Reload the row so values filled in on insert are available for the response.
    await db.refresh(new_attempt)

    def as_payload(question):
        # A question is multiple-choice when more than one answer is flagged correct.
        multiple = sum(1 for option in question.answers if option.is_correct) > 1
        options = list(question.answers)
        random.shuffle(options)
        return QuestionForTest(
            id=question.id,
            text=question.text,
            is_multiple=multiple,
            answers=[AnswerForTest(id=option.id, text=option.text) for option in options],
        )

    # Shuffle the questions first, then each question's answers in that order.
    ordered_questions = list(test.questions)
    random.shuffle(ordered_questions)
    payload = [as_payload(question) for question in ordered_questions]

    return AttemptStarted(
        id=new_attempt.id,
        test_id=test.id,
        test_title=test.title,
        test_description=test.description,
        started_at=new_attempt.started_at,
        time_limit=test.time_limit,
        allow_navigation_back=test.allow_navigation_back,
        questions=payload,
    )
|
|
|
|
|
def _grade_questions(questions, selected_by_question):
    """Build per-question results and count the questions answered correctly.

    A question counts as correct only when the set of selected answer ids
    equals the set of answers flagged ``is_correct``.

    Returns:
        A ``(question_results, correct_count)`` tuple.
    """
    question_results = []
    correct_count = 0
    for question in questions:
        correct_ids = {a.id for a in question.answers if a.is_correct}
        selected_ids = selected_by_question.get(question.id, set())
        is_correct = selected_ids == correct_ids
        if is_correct:
            correct_count += 1
        question_results.append(
            QuestionResult(
                id=question.id,
                text=question.text,
                is_answered_correctly=is_correct,
                answers=[
                    AnswerResult(
                        id=a.id,
                        text=a.text,
                        is_correct=a.is_correct,
                        is_selected=a.id in selected_ids,
                    )
                    for a in question.answers
                ],
            )
        )
    return question_results, correct_count


@router.post("/{attempt_id}/submit", response_model=AttemptResult)
async def submit_attempt(
    attempt_id: int, data: AttemptSubmitDto, db: AsyncSession = Depends(get_db)
):
    """Finish an attempt: store the selected answers and compute the result.

    The score is the percentage of questions answered exactly right, rounded
    to one decimal place; the attempt passes when the score reaches the
    test's ``passing_score``.

    Raises:
        HTTPException: 404 when no ``in_progress`` attempt has this id;
            400 when a submitted answer id does not belong to the question
            it was submitted for.
    """
    result = await db.execute(
        select(TestAttempt)
        .where(TestAttempt.id == attempt_id, TestAttempt.status == "in_progress")
    )
    attempt = result.scalar_one_or_none()
    if attempt is None:
        raise HTTPException(status_code=404, detail="Попытка не найдена или уже завершена")

    # Load the test together with its questions and answers.
    test_result = await db.execute(
        select(Test)
        .options(selectinload(Test.questions).selectinload(Question.answers))
        .where(Test.id == attempt.test_id)
    )
    test = test_result.scalar_one()

    # Merge repeated entries for the same question and drop duplicate answer
    # ids, so that what is scored here is exactly what gets persisted (and
    # therefore what a later read of the stored answers reconstructs).
    submitted: dict[int, set[int]] = {}
    for qa in data.answers:
        submitted.setdefault(qa.question_id, set()).update(qa.answer_ids)

    # Refuse answer ids that do not belong to the question they were sent for,
    # instead of persisting rows that point outside this test.
    valid_answer_ids = {q.id: {a.id for a in q.answers} for q in test.questions}
    for question_id, answer_ids in submitted.items():
        if not answer_ids <= valid_answer_ids.get(question_id, set()):
            raise HTTPException(
                status_code=400,
                detail="Переданы ответы, не относящиеся к этому тесту",
            )

    # Persist the selected answers (sorted only to keep insert order stable).
    for question_id, answer_ids in submitted.items():
        for answer_id in sorted(answer_ids):
            db.add(AttemptAnswer(
                attempt_id=attempt.id,
                question_id=question_id,
                answer_id=answer_id,
            ))

    # Compute the result.
    question_results, correct_count = _grade_questions(test.questions, submitted)

    total = len(test.questions)
    score = round((correct_count / total) * 100, 1) if total > 0 else 0.0
    passed = score >= test.passing_score
    finished_at = datetime.now(timezone.utc)

    attempt.finished_at = finished_at
    attempt.score = score
    attempt.passed = passed
    attempt.correct_count = correct_count
    attempt.total_count = total
    attempt.status = "finished"

    await db.commit()

    return AttemptResult(
        id=attempt.id,
        test_id=test.id,
        test_title=test.title,
        started_at=attempt.started_at,
        finished_at=finished_at,
        score=score,
        passed=passed,
        passing_score=test.passing_score,
        correct_count=correct_count,
        total_count=total,
        questions=question_results,
    )
|
|
|
|
|
@router.get("", response_model=AttemptListResponse)
async def list_attempts(
    test_id: Optional[int] = Query(None),
    date_from: Optional[datetime] = Query(None),
    date_to: Optional[datetime] = Query(None),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
):
    """Attempt tracker: finished attempts with filtering and pagination.

    Args:
        test_id: When given, only attempts at this test are returned.
        date_from: When given, only attempts with ``started_at >= date_from``.
        date_to: When given, only attempts with ``started_at <= date_to``.
        page: 1-based page number.
        page_size: Number of items per page (1 to 100).

    Returns:
        The requested page of attempts, newest first, together with the total
        number of attempts matching the filters.
    """
    # One filter list shared by the count query and the page query.
    filters = [TestAttempt.status == "finished"]
    if test_id is not None:
        filters.append(TestAttempt.test_id == test_id)
    if date_from is not None:
        filters.append(TestAttempt.started_at >= date_from)
    if date_to is not None:
        filters.append(TestAttempt.started_at <= date_to)

    count_query = select(func.count()).select_from(TestAttempt).where(*filters)
    total = (await db.execute(count_query)).scalar_one()

    page_query = (
        select(TestAttempt)
        .options(selectinload(TestAttempt.test))
        .where(*filters)
        # started_at alone is not unique; the id tie-breaker keeps OFFSET/LIMIT
        # pages stable so rows with equal timestamps are not repeated or skipped.
        .order_by(TestAttempt.started_at.desc(), TestAttempt.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    attempts_list = (await db.execute(page_query)).scalars().all()

    items = [
        AttemptListItem(
            id=a.id,
            test_id=a.test_id,
            test_title=a.test.title,
            test_version=a.test.version,
            user_id=a.user_id,
            user_name=GUEST_USER_NAME,  # TODO Sprint 6: replace with a JOIN on the users table
            started_at=a.started_at,
            finished_at=a.finished_at,
            score=a.score,
            correct_count=a.correct_count,
            total_count=a.total_count,
            passed=a.passed,
        )
        for a in attempts_list
    ]

    return AttemptListResponse(items=items, total=total, page=page, page_size=page_size)
|
|
|
|
|
@router.get("/{attempt_id}/result", response_model=AttemptResult)
async def get_result(attempt_id: int, db: AsyncSession = Depends(get_db)):
    """Return the result of a finished attempt.

    Score, pass flag and counters are read from the stored attempt; the
    per-question breakdown is rebuilt from the stored selected answers and
    the test's current questions and answers.

    Raises:
        HTTPException: 404 when no ``finished`` attempt has this id.
    """
    result = await db.execute(
        select(TestAttempt)
        # Only the question_id / answer_id columns of the stored rows are read
        # below, so the related Answer objects are not eager-loaded.
        .options(selectinload(TestAttempt.answers))
        .where(TestAttempt.id == attempt_id, TestAttempt.status == "finished")
    )
    attempt = result.scalar_one_or_none()
    if attempt is None:
        raise HTTPException(status_code=404, detail="Результат не найден")

    test_result = await db.execute(
        select(Test)
        .options(selectinload(Test.questions).selectinload(Question.answers))
        .where(Test.id == attempt.test_id)
    )
    test = test_result.scalar_one()

    # Rebuild the selected answers per question from the stored rows.
    selected_by_question: dict[int, set[int]] = {}
    for aa in attempt.answers:
        selected_by_question.setdefault(aa.question_id, set()).add(aa.answer_id)

    question_results = []
    for question in test.questions:
        correct_ids = {a.id for a in question.answers if a.is_correct}
        selected_ids = selected_by_question.get(question.id, set())
        question_results.append(
            QuestionResult(
                id=question.id,
                text=question.text,
                # Correct only when the selection matches the correct set exactly.
                is_answered_correctly=selected_ids == correct_ids,
                answers=[
                    AnswerResult(
                        id=a.id,
                        text=a.text,
                        is_correct=a.is_correct,
                        is_selected=a.id in selected_ids,
                    )
                    for a in question.answers
                ],
            )
        )

    return AttemptResult(
        id=attempt.id,
        test_id=test.id,
        test_title=test.title,
        started_at=attempt.started_at,
        finished_at=attempt.finished_at,
        score=attempt.score,
        passed=attempt.passed,
        passing_score=test.passing_score,
        correct_count=attempt.correct_count,
        total_count=attempt.total_count,
        questions=question_results,
    )
|
|
|