You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
236 lines
8.6 KiB
236 lines
8.6 KiB
from fastapi import APIRouter, Depends, HTTPException |
|
from sqlalchemy import delete, func, select, update |
|
from sqlalchemy.ext.asyncio import AsyncSession |
|
from sqlalchemy.orm import selectinload |
|
|
|
from app.database import get_db |
|
from app.models.attempt import TestAttempt |
|
from app.models.test import Answer, Question, Test |
|
from app.schemas.test import TestCreate, TestListItem, TestOut, TestUpdateResponse |
|
|
|
router = APIRouter(prefix="/api/tests", tags=["tests"]) |
|
|
|
|
|
@router.get("", response_model=list[TestListItem]) |
|
async def list_tests(db: AsyncSession = Depends(get_db)): |
|
# Показываем только активные версии (is_active = True) |
|
result = await db.execute( |
|
select(Test) |
|
.options(selectinload(Test.questions)) |
|
.where(Test.is_active == True) |
|
.order_by(Test.created_at.desc()) |
|
) |
|
tests = result.scalars().all() |
|
|
|
items = [] |
|
for test in tests: |
|
item = TestListItem.model_validate(test) |
|
item.questions_count = len(test.questions) |
|
items.append(item) |
|
|
|
return items |
|
|
|
|
|
@router.get("/{test_id}", response_model=TestOut) |
|
async def get_test(test_id: int, db: AsyncSession = Depends(get_db)): |
|
# Загружаем любую версию по id (без фильтра is_active — нужно для просмотра истории) |
|
result = await db.execute( |
|
select(Test) |
|
.options(selectinload(Test.questions).selectinload(Question.answers)) |
|
.where(Test.id == test_id) |
|
) |
|
test = result.scalar_one_or_none() |
|
if not test: |
|
raise HTTPException(status_code=404, detail="Тест не найден") |
|
return test |
|
|
|
|
|
@router.post("", response_model=TestOut, status_code=201) |
|
async def create_test(data: TestCreate, db: AsyncSession = Depends(get_db)): |
|
test = Test( |
|
title=data.title, |
|
description=data.description, |
|
passing_score=data.passing_score, |
|
time_limit=data.time_limit, |
|
allow_navigation_back=data.allow_navigation_back, |
|
) |
|
db.add(test) |
|
await db.flush() |
|
|
|
for order, q_data in enumerate(data.questions): |
|
question = Question(test_id=test.id, text=q_data.text, order=order) |
|
db.add(question) |
|
await db.flush() |
|
|
|
for a_data in q_data.answers: |
|
db.add(Answer( |
|
question_id=question.id, |
|
text=a_data.text, |
|
is_correct=a_data.is_correct, |
|
)) |
|
|
|
await db.commit() |
|
|
|
result = await db.execute( |
|
select(Test) |
|
.options(selectinload(Test.questions).selectinload(Question.answers)) |
|
.where(Test.id == test.id) |
|
) |
|
return result.scalar_one() |
|
|
|
|
|
@router.get("/{test_id}/versions", response_model=list[TestListItem]) |
|
async def get_test_versions(test_id: int, db: AsyncSession = Depends(get_db)): |
|
"""Возвращает все версии теста (от первой к последней).""" |
|
result = await db.execute(select(Test).where(Test.id == test_id)) |
|
test = result.scalar_one_or_none() |
|
if not test: |
|
raise HTTPException(status_code=404, detail="Тест не найден") |
|
|
|
# Идём вверх до корневой версии |
|
root = test |
|
while root.parent_id is not None: |
|
result = await db.execute(select(Test).where(Test.id == root.parent_id)) |
|
root = result.scalar_one() |
|
|
|
# Идём вниз от корня, собирая цепочку |
|
versions: list[Test] = [] |
|
current = root |
|
while current is not None: |
|
result = await db.execute( |
|
select(Test) |
|
.options(selectinload(Test.questions)) |
|
.where(Test.id == current.id) |
|
) |
|
current_with_qs = result.scalar_one() |
|
versions.append(current_with_qs) |
|
|
|
result = await db.execute(select(Test).where(Test.parent_id == current.id)) |
|
current = result.scalar_one_or_none() |
|
|
|
items = [] |
|
for v in versions: |
|
item = TestListItem.model_validate(v) |
|
item.questions_count = len(v.questions) |
|
items.append(item) |
|
return items |
|
|
|
|
|
@router.post("/{test_id}/activate", response_model=TestOut) |
|
async def activate_test_version(test_id: int, db: AsyncSession = Depends(get_db)): |
|
"""Делает указанную версию активной, деактивирует все остальные в цепочке.""" |
|
result = await db.execute(select(Test).where(Test.id == test_id)) |
|
test = result.scalar_one_or_none() |
|
if not test: |
|
raise HTTPException(status_code=404, detail="Тест не найден") |
|
|
|
# Идём вверх до корневой версии |
|
root = test |
|
while root.parent_id is not None: |
|
result = await db.execute(select(Test).where(Test.id == root.parent_id)) |
|
root = result.scalar_one() |
|
|
|
# Собираем все id версий в цепочке |
|
all_ids: list[int] = [] |
|
current = root |
|
while current is not None: |
|
all_ids.append(current.id) |
|
result = await db.execute(select(Test).where(Test.parent_id == current.id)) |
|
current = result.scalar_one_or_none() |
|
|
|
# Деактивируем все, активируем нужную |
|
await db.execute(update(Test).where(Test.id.in_(all_ids)).values(is_active=False)) |
|
await db.execute(update(Test).where(Test.id == test_id).values(is_active=True)) |
|
await db.commit() |
|
|
|
result = await db.execute( |
|
select(Test) |
|
.options(selectinload(Test.questions).selectinload(Question.answers)) |
|
.where(Test.id == test_id) |
|
) |
|
return result.scalar_one() |
|
|
|
|
|
@router.put("/{test_id}", response_model=TestUpdateResponse) |
|
async def update_test(test_id: int, data: TestCreate, db: AsyncSession = Depends(get_db)): |
|
result = await db.execute(select(Test).where(Test.id == test_id)) |
|
test = result.scalar_one_or_none() |
|
if not test: |
|
raise HTTPException(status_code=404, detail="Тест не найден") |
|
|
|
attempts_count = await db.scalar( |
|
select(func.count()).select_from(TestAttempt).where(TestAttempt.test_id == test_id) |
|
) |
|
|
|
if attempts_count == 0: |
|
# Редактируем на месте |
|
test.title = data.title |
|
test.description = data.description |
|
test.passing_score = data.passing_score |
|
test.time_limit = data.time_limit |
|
test.allow_navigation_back = data.allow_navigation_back |
|
|
|
# Сначала удаляем ответы (FK: answers.question_id → questions.id) |
|
q_ids_result = await db.execute(select(Question.id).where(Question.test_id == test_id)) |
|
q_ids = [row[0] for row in q_ids_result.fetchall()] |
|
if q_ids: |
|
await db.execute(delete(Answer).where(Answer.question_id.in_(q_ids))) |
|
await db.execute(delete(Question).where(Question.test_id == test_id)) |
|
await db.flush() |
|
|
|
for order, q_data in enumerate(data.questions): |
|
question = Question(test_id=test.id, text=q_data.text, order=order) |
|
db.add(question) |
|
await db.flush() |
|
for a_data in q_data.answers: |
|
db.add(Answer( |
|
question_id=question.id, |
|
text=a_data.text, |
|
is_correct=a_data.is_correct, |
|
)) |
|
|
|
await db.commit() |
|
|
|
result = await db.execute( |
|
select(Test) |
|
.options(selectinload(Test.questions).selectinload(Question.answers)) |
|
.where(Test.id == test.id) |
|
) |
|
return {"test": result.scalar_one(), "is_new_version": False} |
|
|
|
else: |
|
# Есть попытки — создаём новую версию, деактивируем текущую |
|
test.is_active = False |
|
|
|
new_test = Test( |
|
title=data.title, |
|
description=data.description, |
|
passing_score=data.passing_score, |
|
time_limit=data.time_limit, |
|
allow_navigation_back=data.allow_navigation_back, |
|
version=test.version + 1, |
|
parent_id=test.id, |
|
is_active=True, |
|
) |
|
db.add(new_test) |
|
await db.flush() |
|
|
|
for order, q_data in enumerate(data.questions): |
|
question = Question(test_id=new_test.id, text=q_data.text, order=order) |
|
db.add(question) |
|
await db.flush() |
|
for a_data in q_data.answers: |
|
db.add(Answer( |
|
question_id=question.id, |
|
text=a_data.text, |
|
is_correct=a_data.is_correct, |
|
)) |
|
|
|
await db.commit() |
|
|
|
result = await db.execute( |
|
select(Test) |
|
.options(selectinload(Test.questions).selectinload(Question.answers)) |
|
.where(Test.id == new_test.id) |
|
) |
|
return {"test": result.scalar_one(), "is_new_version": True}
|
|
|