Приложение для тестирования сотрудников клиники методом один вопрос - до пяти ответов один из которых правильный. Сотрудник должен выбрать правильный вариант ответа
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

236 lines
8.6 KiB

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import delete, func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.attempt import TestAttempt
from app.models.test import Answer, Question, Test
from app.schemas.test import TestCreate, TestListItem, TestOut, TestUpdateResponse
router = APIRouter(prefix="/api/tests", tags=["tests"])
@router.get("", response_model=list[TestListItem])
async def list_tests(db: AsyncSession = Depends(get_db)):
# Показываем только активные версии (is_active = True)
result = await db.execute(
select(Test)
.options(selectinload(Test.questions))
.where(Test.is_active == True)
.order_by(Test.created_at.desc())
)
tests = result.scalars().all()
items = []
for test in tests:
item = TestListItem.model_validate(test)
item.questions_count = len(test.questions)
items.append(item)
return items
@router.get("/{test_id}", response_model=TestOut)
async def get_test(test_id: int, db: AsyncSession = Depends(get_db)):
# Загружаем любую версию по id (без фильтра is_active — нужно для просмотра истории)
result = await db.execute(
select(Test)
.options(selectinload(Test.questions).selectinload(Question.answers))
.where(Test.id == test_id)
)
test = result.scalar_one_or_none()
if not test:
raise HTTPException(status_code=404, detail="Тест не найден")
return test
@router.post("", response_model=TestOut, status_code=201)
async def create_test(data: TestCreate, db: AsyncSession = Depends(get_db)):
test = Test(
title=data.title,
description=data.description,
passing_score=data.passing_score,
time_limit=data.time_limit,
allow_navigation_back=data.allow_navigation_back,
)
db.add(test)
await db.flush()
for order, q_data in enumerate(data.questions):
question = Question(test_id=test.id, text=q_data.text, order=order)
db.add(question)
await db.flush()
for a_data in q_data.answers:
db.add(Answer(
question_id=question.id,
text=a_data.text,
is_correct=a_data.is_correct,
))
await db.commit()
result = await db.execute(
select(Test)
.options(selectinload(Test.questions).selectinload(Question.answers))
.where(Test.id == test.id)
)
return result.scalar_one()
@router.get("/{test_id}/versions", response_model=list[TestListItem])
async def get_test_versions(test_id: int, db: AsyncSession = Depends(get_db)):
"""Возвращает все версии теста (от первой к последней)."""
result = await db.execute(select(Test).where(Test.id == test_id))
test = result.scalar_one_or_none()
if not test:
raise HTTPException(status_code=404, detail="Тест не найден")
# Идём вверх до корневой версии
root = test
while root.parent_id is not None:
result = await db.execute(select(Test).where(Test.id == root.parent_id))
root = result.scalar_one()
# Идём вниз от корня, собирая цепочку
versions: list[Test] = []
current = root
while current is not None:
result = await db.execute(
select(Test)
.options(selectinload(Test.questions))
.where(Test.id == current.id)
)
current_with_qs = result.scalar_one()
versions.append(current_with_qs)
result = await db.execute(select(Test).where(Test.parent_id == current.id))
current = result.scalar_one_or_none()
items = []
for v in versions:
item = TestListItem.model_validate(v)
item.questions_count = len(v.questions)
items.append(item)
return items
@router.post("/{test_id}/activate", response_model=TestOut)
async def activate_test_version(test_id: int, db: AsyncSession = Depends(get_db)):
"""Делает указанную версию активной, деактивирует все остальные в цепочке."""
result = await db.execute(select(Test).where(Test.id == test_id))
test = result.scalar_one_or_none()
if not test:
raise HTTPException(status_code=404, detail="Тест не найден")
# Идём вверх до корневой версии
root = test
while root.parent_id is not None:
result = await db.execute(select(Test).where(Test.id == root.parent_id))
root = result.scalar_one()
# Собираем все id версий в цепочке
all_ids: list[int] = []
current = root
while current is not None:
all_ids.append(current.id)
result = await db.execute(select(Test).where(Test.parent_id == current.id))
current = result.scalar_one_or_none()
# Деактивируем все, активируем нужную
await db.execute(update(Test).where(Test.id.in_(all_ids)).values(is_active=False))
await db.execute(update(Test).where(Test.id == test_id).values(is_active=True))
await db.commit()
result = await db.execute(
select(Test)
.options(selectinload(Test.questions).selectinload(Question.answers))
.where(Test.id == test_id)
)
return result.scalar_one()
@router.put("/{test_id}", response_model=TestUpdateResponse)
async def update_test(test_id: int, data: TestCreate, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Test).where(Test.id == test_id))
test = result.scalar_one_or_none()
if not test:
raise HTTPException(status_code=404, detail="Тест не найден")
attempts_count = await db.scalar(
select(func.count()).select_from(TestAttempt).where(TestAttempt.test_id == test_id)
)
if attempts_count == 0:
# Редактируем на месте
test.title = data.title
test.description = data.description
test.passing_score = data.passing_score
test.time_limit = data.time_limit
test.allow_navigation_back = data.allow_navigation_back
# Сначала удаляем ответы (FK: answers.question_id → questions.id)
q_ids_result = await db.execute(select(Question.id).where(Question.test_id == test_id))
q_ids = [row[0] for row in q_ids_result.fetchall()]
if q_ids:
await db.execute(delete(Answer).where(Answer.question_id.in_(q_ids)))
await db.execute(delete(Question).where(Question.test_id == test_id))
await db.flush()
for order, q_data in enumerate(data.questions):
question = Question(test_id=test.id, text=q_data.text, order=order)
db.add(question)
await db.flush()
for a_data in q_data.answers:
db.add(Answer(
question_id=question.id,
text=a_data.text,
is_correct=a_data.is_correct,
))
await db.commit()
result = await db.execute(
select(Test)
.options(selectinload(Test.questions).selectinload(Question.answers))
.where(Test.id == test.id)
)
return {"test": result.scalar_one(), "is_new_version": False}
else:
# Есть попытки — создаём новую версию, деактивируем текущую
test.is_active = False
new_test = Test(
title=data.title,
description=data.description,
passing_score=data.passing_score,
time_limit=data.time_limit,
allow_navigation_back=data.allow_navigation_back,
version=test.version + 1,
parent_id=test.id,
is_active=True,
)
db.add(new_test)
await db.flush()
for order, q_data in enumerate(data.questions):
question = Question(test_id=new_test.id, text=q_data.text, order=order)
db.add(question)
await db.flush()
for a_data in q_data.answers:
db.add(Answer(
question_id=question.id,
text=a_data.text,
is_correct=a_data.is_correct,
))
await db.commit()
result = await db.execute(
select(Test)
.options(selectinload(Test.questions).selectinload(Question.answers))
.where(Test.id == new_test.id)
)
return {"test": result.scalar_one(), "is_new_version": True}