Приложение для тестирования сотрудников клиники методом один вопрос - до пяти ответов один из которых правильный. Сотрудник должен выбрать правильный вариант ответа
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

199 lines
7.0 KiB

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.attempt import TestAttempt
from app.models.test import Answer, Question, Test
from app.schemas.test import TestCreate, TestListItem, TestOut, TestUpdateResponse
router = APIRouter(prefix="/api/tests", tags=["tests"])
@router.get("", response_model=list[TestListItem])
async def list_tests(db: AsyncSession = Depends(get_db)):
# Показываем только последние версии: те, чей id не упоминается как parent_id
parent_ids_subq = select(Test.parent_id).where(Test.parent_id.is_not(None))
result = await db.execute(
select(Test)
.options(selectinload(Test.questions))
.where(Test.is_active == True, Test.id.not_in(parent_ids_subq))
.order_by(Test.created_at.desc())
)
tests = result.scalars().all()
items = []
for test in tests:
item = TestListItem.model_validate(test)
item.questions_count = len(test.questions)
items.append(item)
return items
@router.get("/{test_id}", response_model=TestOut)
async def get_test(test_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Test)
.options(selectinload(Test.questions).selectinload(Question.answers))
.where(Test.id == test_id, Test.is_active == True)
)
test = result.scalar_one_or_none()
if not test:
raise HTTPException(status_code=404, detail="Тест не найден")
return test
@router.post("", response_model=TestOut, status_code=201)
async def create_test(data: TestCreate, db: AsyncSession = Depends(get_db)):
test = Test(
title=data.title,
description=data.description,
passing_score=data.passing_score,
time_limit=data.time_limit,
allow_navigation_back=data.allow_navigation_back,
)
db.add(test)
await db.flush()
for order, q_data in enumerate(data.questions):
question = Question(test_id=test.id, text=q_data.text, order=order)
db.add(question)
await db.flush()
for a_data in q_data.answers:
db.add(Answer(
question_id=question.id,
text=a_data.text,
is_correct=a_data.is_correct,
))
await db.commit()
result = await db.execute(
select(Test)
.options(selectinload(Test.questions).selectinload(Question.answers))
.where(Test.id == test.id)
)
return result.scalar_one()
@router.get("/{test_id}/versions", response_model=list[TestListItem])
async def get_test_versions(test_id: int, db: AsyncSession = Depends(get_db)):
"""Возвращает все версии теста (от первой к последней)."""
# Загружаем текущий тест
result = await db.execute(select(Test).where(Test.id == test_id))
test = result.scalar_one_or_none()
if not test:
raise HTTPException(status_code=404, detail="Тест не найден")
# Идём вверх до корневой версии
root = test
while root.parent_id is not None:
result = await db.execute(select(Test).where(Test.id == root.parent_id))
root = result.scalar_one()
# Идём вниз от корня, собирая цепочку
versions: list[Test] = []
current = root
while current is not None:
result = await db.execute(
select(Test)
.options(selectinload(Test.questions))
.where(Test.id == current.id)
)
current_with_qs = result.scalar_one()
versions.append(current_with_qs)
result = await db.execute(select(Test).where(Test.parent_id == current.id))
current = result.scalar_one_or_none()
items = []
for v in versions:
item = TestListItem.model_validate(v)
item.questions_count = len(v.questions)
items.append(item)
return items
@router.put("/{test_id}", response_model=TestUpdateResponse)
async def update_test(test_id: int, data: TestCreate, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Test)
.where(Test.id == test_id, Test.is_active == True)
)
test = result.scalar_one_or_none()
if not test:
raise HTTPException(status_code=404, detail="Тест не найден")
# Проверяем, есть ли уже попытки прохождения
attempts_count = await db.scalar(
select(func.count()).select_from(TestAttempt).where(TestAttempt.test_id == test_id)
)
if attempts_count == 0:
# Редактируем на месте: обновляем поля, пересоздаём вопросы
test.title = data.title
test.description = data.description
test.passing_score = data.passing_score
test.time_limit = data.time_limit
test.allow_navigation_back = data.allow_navigation_back
await db.execute(delete(Question).where(Question.test_id == test_id))
await db.flush()
for order, q_data in enumerate(data.questions):
question = Question(test_id=test.id, text=q_data.text, order=order)
db.add(question)
await db.flush()
for a_data in q_data.answers:
db.add(Answer(
question_id=question.id,
text=a_data.text,
is_correct=a_data.is_correct,
))
await db.commit()
result = await db.execute(
select(Test)
.options(selectinload(Test.questions).selectinload(Question.answers))
.where(Test.id == test.id)
)
return {"test": result.scalar_one(), "is_new_version": False}
else:
# Есть попытки — создаём новую версию
new_test = Test(
title=data.title,
description=data.description,
passing_score=data.passing_score,
time_limit=data.time_limit,
allow_navigation_back=data.allow_navigation_back,
version=test.version + 1,
parent_id=test.id,
)
db.add(new_test)
await db.flush()
for order, q_data in enumerate(data.questions):
question = Question(test_id=new_test.id, text=q_data.text, order=order)
db.add(question)
await db.flush()
for a_data in q_data.answers:
db.add(Answer(
question_id=question.id,
text=a_data.text,
is_correct=a_data.is_correct,
))
await db.commit()
result = await db.execute(
select(Test)
.options(selectinload(Test.questions).selectinload(Question.answers))
.where(Test.id == new_test.id)
)
return {"test": result.scalar_one(), "is_new_version": True}