You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
161 lines
5.6 KiB
161 lines
5.6 KiB
from fastapi import APIRouter, Depends, HTTPException |
|
from sqlalchemy import delete, func, select |
|
from sqlalchemy.ext.asyncio import AsyncSession |
|
from sqlalchemy.orm import selectinload |
|
|
|
from app.database import get_db |
|
from app.models.attempt import TestAttempt |
|
from app.models.test import Answer, Question, Test |
|
from app.schemas.test import TestCreate, TestListItem, TestOut, TestUpdateResponse |
|
|
|
router = APIRouter(prefix="/api/tests", tags=["tests"]) |
|
|
|
|
|
@router.get("", response_model=list[TestListItem]) |
|
async def list_tests(db: AsyncSession = Depends(get_db)): |
|
# Показываем только последние версии: те, чей id не упоминается как parent_id |
|
parent_ids_subq = select(Test.parent_id).where(Test.parent_id.is_not(None)) |
|
|
|
result = await db.execute( |
|
select(Test) |
|
.options(selectinload(Test.questions)) |
|
.where(Test.is_active == True, Test.id.not_in(parent_ids_subq)) |
|
.order_by(Test.created_at.desc()) |
|
) |
|
tests = result.scalars().all() |
|
|
|
items = [] |
|
for test in tests: |
|
item = TestListItem.model_validate(test) |
|
item.questions_count = len(test.questions) |
|
items.append(item) |
|
|
|
return items |
|
|
|
|
|
@router.get("/{test_id}", response_model=TestOut) |
|
async def get_test(test_id: int, db: AsyncSession = Depends(get_db)): |
|
result = await db.execute( |
|
select(Test) |
|
.options(selectinload(Test.questions).selectinload(Question.answers)) |
|
.where(Test.id == test_id, Test.is_active == True) |
|
) |
|
test = result.scalar_one_or_none() |
|
if not test: |
|
raise HTTPException(status_code=404, detail="Тест не найден") |
|
return test |
|
|
|
|
|
@router.post("", response_model=TestOut, status_code=201) |
|
async def create_test(data: TestCreate, db: AsyncSession = Depends(get_db)): |
|
test = Test( |
|
title=data.title, |
|
description=data.description, |
|
passing_score=data.passing_score, |
|
time_limit=data.time_limit, |
|
allow_navigation_back=data.allow_navigation_back, |
|
) |
|
db.add(test) |
|
await db.flush() |
|
|
|
for order, q_data in enumerate(data.questions): |
|
question = Question(test_id=test.id, text=q_data.text, order=order) |
|
db.add(question) |
|
await db.flush() |
|
|
|
for a_data in q_data.answers: |
|
db.add(Answer( |
|
question_id=question.id, |
|
text=a_data.text, |
|
is_correct=a_data.is_correct, |
|
)) |
|
|
|
await db.commit() |
|
|
|
result = await db.execute( |
|
select(Test) |
|
.options(selectinload(Test.questions).selectinload(Question.answers)) |
|
.where(Test.id == test.id) |
|
) |
|
return result.scalar_one() |
|
|
|
|
|
@router.put("/{test_id}", response_model=TestUpdateResponse) |
|
async def update_test(test_id: int, data: TestCreate, db: AsyncSession = Depends(get_db)): |
|
result = await db.execute( |
|
select(Test) |
|
.where(Test.id == test_id, Test.is_active == True) |
|
) |
|
test = result.scalar_one_or_none() |
|
if not test: |
|
raise HTTPException(status_code=404, detail="Тест не найден") |
|
|
|
# Проверяем, есть ли уже попытки прохождения |
|
attempts_count = await db.scalar( |
|
select(func.count()).select_from(TestAttempt).where(TestAttempt.test_id == test_id) |
|
) |
|
|
|
if attempts_count == 0: |
|
# Редактируем на месте: обновляем поля, пересоздаём вопросы |
|
test.title = data.title |
|
test.description = data.description |
|
test.passing_score = data.passing_score |
|
test.time_limit = data.time_limit |
|
test.allow_navigation_back = data.allow_navigation_back |
|
|
|
await db.execute(delete(Question).where(Question.test_id == test_id)) |
|
await db.flush() |
|
|
|
for order, q_data in enumerate(data.questions): |
|
question = Question(test_id=test.id, text=q_data.text, order=order) |
|
db.add(question) |
|
await db.flush() |
|
for a_data in q_data.answers: |
|
db.add(Answer( |
|
question_id=question.id, |
|
text=a_data.text, |
|
is_correct=a_data.is_correct, |
|
)) |
|
|
|
await db.commit() |
|
|
|
result = await db.execute( |
|
select(Test) |
|
.options(selectinload(Test.questions).selectinload(Question.answers)) |
|
.where(Test.id == test.id) |
|
) |
|
return {"test": result.scalar_one(), "is_new_version": False} |
|
|
|
else: |
|
# Есть попытки — создаём новую версию |
|
new_test = Test( |
|
title=data.title, |
|
description=data.description, |
|
passing_score=data.passing_score, |
|
time_limit=data.time_limit, |
|
allow_navigation_back=data.allow_navigation_back, |
|
version=test.version + 1, |
|
parent_id=test.id, |
|
) |
|
db.add(new_test) |
|
await db.flush() |
|
|
|
for order, q_data in enumerate(data.questions): |
|
question = Question(test_id=new_test.id, text=q_data.text, order=order) |
|
db.add(question) |
|
await db.flush() |
|
for a_data in q_data.answers: |
|
db.add(Answer( |
|
question_id=question.id, |
|
text=a_data.text, |
|
is_correct=a_data.is_correct, |
|
)) |
|
|
|
await db.commit() |
|
|
|
result = await db.execute( |
|
select(Test) |
|
.options(selectinload(Test.questions).selectinload(Question.answers)) |
|
.where(Test.id == new_test.id) |
|
) |
|
return {"test": result.scalar_one(), "is_new_version": True}
|
|
|