import random
from datetime import datetime, timezone
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.database import get_db
from app.models.attempt import AttemptAnswer, TestAttempt
from app.models.test import Question, Test
from app.schemas.attempt import (
    AnswerForTest,
    AnswerResult,
    AttemptListItem,
    AttemptListResponse,
    AttemptResult,
    AttemptStart,
    AttemptStarted,
    AttemptSubmitDto,
    QuestionForTest,
    QuestionResult,
)

# Temporary: until Sprint 6 (authorization) every attempt belongs to the guest.
GUEST_USER_ID = 1
GUEST_USER_NAME = "Гость"

router = APIRouter(prefix="/api/attempts", tags=["attempts"])


def _test_with_questions_query(test_id: int):
    """Build a SELECT for one test with its questions and answers eagerly loaded."""
    return (
        select(Test)
        .options(selectinload(Test.questions).selectinload(Question.answers))
        .where(Test.id == test_id)
    )


def _score_questions(
    questions, selected_by_question: dict[int, set[int]]
) -> tuple[list[QuestionResult], int]:
    """Grade every question against the selected answer ids.

    A question counts as answered correctly only when the set of selected
    answer ids equals the set of correct answer ids exactly (so a question
    with no correct answers is "correct" when nothing was selected).

    Returns:
        A ``(question_results, correct_count)`` pair, where
        ``question_results`` follows the order of ``questions``.
    """
    question_results: list[QuestionResult] = []
    correct_count = 0
    for question in questions:
        correct_ids = {a.id for a in question.answers if a.is_correct}
        selected_ids = selected_by_question.get(question.id, set())
        is_correct = selected_ids == correct_ids
        if is_correct:
            correct_count += 1
        question_results.append(
            QuestionResult(
                id=question.id,
                text=question.text,
                is_answered_correctly=is_correct,
                answers=[
                    AnswerResult(
                        id=a.id,
                        text=a.text,
                        is_correct=a.is_correct,
                        is_selected=a.id in selected_ids,
                    )
                    for a in question.answers
                ],
            )
        )
    return question_results, correct_count


@router.post("", response_model=AttemptStarted, status_code=201)
async def start_attempt(data: AttemptStart, db: AsyncSession = Depends(get_db)):
    """Start an attempt at a test.

    Returns the questions and their answers in random order; the answer
    payload carries only ``id`` and ``text``, never the correctness flag.

    Raises:
        HTTPException: 404 when no active test with ``data.test_id`` exists.
    """
    result = await db.execute(
        _test_with_questions_query(data.test_id).where(
            Test.is_active == True  # noqa: E712 - SQL expression, not a Python bool
        )
    )
    test = result.scalar_one_or_none()
    if not test:
        raise HTTPException(status_code=404, detail="Тест не найден")

    # Shuffle copies so the ORM collections themselves are left untouched.
    questions_shuffled = list(test.questions)
    random.shuffle(questions_shuffled)

    questions_for_test = []
    for q in questions_shuffled:
        correct_count = sum(1 for a in q.answers if a.is_correct)
        answers_shuffled = list(q.answers)
        random.shuffle(answers_shuffled)
        questions_for_test.append(
            QuestionForTest(
                id=q.id,
                text=q.text,
                is_multiple=correct_count > 1,
                answers=[AnswerForTest(id=a.id, text=a.text) for a in answers_shuffled],
            )
        )

    attempt = TestAttempt(test_id=test.id, status="in_progress", user_id=GUEST_USER_ID)
    db.add(attempt)
    # Flush + refresh populate the generated columns (id, started_at) inside
    # the open transaction. The response is built before commit() so that no
    # ORM attribute is read afterwards: with expire_on_commit enabled such a
    # read would trigger implicit IO, which an AsyncSession cannot perform.
    await db.flush()
    await db.refresh(attempt)

    response = AttemptStarted(
        id=attempt.id,
        test_id=test.id,
        test_title=test.title,
        test_description=test.description,
        started_at=attempt.started_at,
        time_limit=test.time_limit,
        allow_navigation_back=test.allow_navigation_back,
        questions=questions_for_test,
    )
    await db.commit()
    return response


@router.post("/{attempt_id}/submit", response_model=AttemptResult)
async def submit_attempt(
    attempt_id: int, data: AttemptSubmitDto, db: AsyncSession = Depends(get_db)
):
    """Finish an attempt: store the chosen answers and compute the result.

    Raises:
        HTTPException: 404 when the attempt does not exist or is not
            ``in_progress``; 400 when a submitted question or answer id does
            not belong to the attempt's test.
    """
    result = await db.execute(
        select(TestAttempt).where(
            TestAttempt.id == attempt_id, TestAttempt.status == "in_progress"
        )
    )
    attempt = result.scalar_one_or_none()
    if not attempt:
        raise HTTPException(
            status_code=404, detail="Попытка не найдена или уже завершена"
        )

    # Load the test together with its questions and answers.
    test_result = await db.execute(_test_with_questions_query(attempt.test_id))
    test = test_result.scalar_one()

    # Normalise the payload into one set of answer ids per question. Repeated
    # question entries are merged and repeated answer ids collapse, so what is
    # scored here is exactly what get_result later rebuilds from stored rows.
    answer_ids_by_question = {
        q.id: {a.id for a in q.answers} for q in test.questions
    }
    submitted: dict[int, set[int]] = {}
    for qa in data.answers:
        allowed_ids = answer_ids_by_question.get(qa.question_id)
        chosen_ids = set(qa.answer_ids)
        if allowed_ids is None or not chosen_ids <= allowed_ids:
            raise HTTPException(
                status_code=400,
                detail="Вопрос или ответ не относится к этому тесту",
            )
        submitted.setdefault(qa.question_id, set()).update(chosen_ids)

    # Persist the selected answers, one row per (question, answer) pair.
    db.add_all(
        AttemptAnswer(
            attempt_id=attempt.id,
            question_id=question_id,
            answer_id=answer_id,
        )
        for question_id, answer_ids in submitted.items()
        for answer_id in sorted(answer_ids)
    )

    # Compute the result.
    question_results, correct_count = _score_questions(test.questions, submitted)
    total = len(test.questions)
    score = round((correct_count / total) * 100, 1) if total > 0 else 0.0
    passed = score >= test.passing_score
    finished_at = datetime.now(timezone.utc)

    attempt.finished_at = finished_at
    attempt.score = score
    attempt.passed = passed
    attempt.correct_count = correct_count
    attempt.total_count = total
    attempt.status = "finished"

    # Build the response before commit() so no ORM attribute is read after
    # the commit (see the same note in start_attempt).
    response = AttemptResult(
        id=attempt.id,
        test_id=test.id,
        test_title=test.title,
        started_at=attempt.started_at,
        finished_at=finished_at,
        score=score,
        passed=passed,
        passing_score=test.passing_score,
        correct_count=correct_count,
        total_count=total,
        questions=question_results,
    )
    await db.commit()
    return response


@router.get("", response_model=AttemptListResponse)
async def list_attempts(
    test_id: Optional[int] = Query(None),
    date_from: Optional[datetime] = Query(None),
    date_to: Optional[datetime] = Query(None),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
):
    """Attempt tracker: finished attempts with filtering and pagination.

    ``date_from`` / ``date_to`` bound ``started_at`` inclusively. Results are
    ordered newest first.
    """
    filters = [TestAttempt.status == "finished"]
    if test_id is not None:
        filters.append(TestAttempt.test_id == test_id)
    if date_from is not None:
        filters.append(TestAttempt.started_at >= date_from)
    if date_to is not None:
        filters.append(TestAttempt.started_at <= date_to)

    total = (
        await db.execute(
            select(func.count()).select_from(TestAttempt).where(*filters)
        )
    ).scalar_one()

    rows_result = await db.execute(
        select(TestAttempt)
        .options(selectinload(TestAttempt.test))
        .where(*filters)
        # id breaks ties between equal started_at values so that rows cannot
        # move between pages from one request to the next.
        .order_by(TestAttempt.started_at.desc(), TestAttempt.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    attempts_list = rows_result.scalars().all()

    items = [
        AttemptListItem(
            id=a.id,
            test_id=a.test_id,
            test_title=a.test.title,
            test_version=a.test.version,
            user_id=a.user_id,
            # TODO Sprint 6: replace with a JOIN on the users table.
            user_name=GUEST_USER_NAME,
            started_at=a.started_at,
            finished_at=a.finished_at,
            score=a.score,
            correct_count=a.correct_count,
            total_count=a.total_count,
            passed=a.passed,
        )
        for a in attempts_list
    ]
    return AttemptListResponse(items=items, total=total, page=page, page_size=page_size)


@router.get("/{attempt_id}/result", response_model=AttemptResult)
async def get_result(attempt_id: int, db: AsyncSession = Depends(get_db)):
    """Return the result of a finished attempt.

    The per-question breakdown is rebuilt from the stored answers; score,
    pass flag and counters are the values saved on the attempt.

    Raises:
        HTTPException: 404 when no finished attempt with this id exists.
    """
    result = await db.execute(
        select(TestAttempt)
        # Only question_id / answer_id of each stored answer are read below,
        # so the related answer objects are not loaded.
        .options(selectinload(TestAttempt.answers))
        .where(TestAttempt.id == attempt_id, TestAttempt.status == "finished")
    )
    attempt = result.scalar_one_or_none()
    if not attempt:
        raise HTTPException(status_code=404, detail="Результат не найден")

    test_result = await db.execute(_test_with_questions_query(attempt.test_id))
    test = test_result.scalar_one()

    # Rebuild the selected answers per question from the database rows.
    selected_by_question: dict[int, set[int]] = {}
    for aa in attempt.answers:
        selected_by_question.setdefault(aa.question_id, set()).add(aa.answer_id)

    question_results, _ = _score_questions(test.questions, selected_by_question)

    return AttemptResult(
        id=attempt.id,
        test_id=test.id,
        test_title=test.title,
        started_at=attempt.started_at,
        finished_at=attempt.finished_at,
        score=attempt.score,
        passed=attempt.passed,
        passing_score=test.passing_score,
        correct_count=attempt.correct_count,
        total_count=attempt.total_count,
        questions=question_results,
    )