@ -1,5 +1,5 @@
from fastapi import APIRouter , Depends , HTTPException
from sqlalchemy import delete , func , select
from sqlalchemy import delete , func , select , update
from sqlalchemy . ext . asyncio import AsyncSession
from sqlalchemy . orm import selectinload
@ -13,13 +13,11 @@ router = APIRouter(prefix="/api/tests", tags=["tests"])
@router . get ( " " , response_model = list [ TestListItem ] )
async def list_tests ( db : AsyncSession = Depends ( get_db ) ) :
# Показываем только последние версии: те, чей id не упоминается как parent_id
parent_ids_subq = select ( Test . parent_id ) . where ( Test . parent_id . is_not ( None ) )
# Показываем только активные версии (is_active = True)
result = await db . execute (
select ( Test )
. options ( selectinload ( Test . questions ) )
. where ( Test . is_active == True , Test . id . not_in ( parent_ids_subq ) )
. where ( Test . is_active == True )
. order_by ( Test . created_at . desc ( ) )
)
tests = result . scalars ( ) . all ( )
@ -35,10 +33,11 @@ async def list_tests(db: AsyncSession = Depends(get_db)):
@router . get ( " / {test_id} " , response_model = TestOut )
async def get_test ( test_id : int , db : AsyncSession = Depends ( get_db ) ) :
# Загружаем любую версию по id (без фильтра is_active — нужно для просмотра истории)
result = await db . execute (
select ( Test )
. options ( selectinload ( Test . questions ) . selectinload ( Question . answers ) )
. where ( Test . id == test_id , Test . is_active == True )
. where ( Test . id == test_id )
)
test = result . scalar_one_or_none ( )
if not test :
@ -83,7 +82,6 @@ async def create_test(data: TestCreate, db: AsyncSession = Depends(get_db)):
@router . get ( " / {test_id} /versions " , response_model = list [ TestListItem ] )
async def get_test_versions ( test_id : int , db : AsyncSession = Depends ( get_db ) ) :
""" Возвращает все версии теста (от первой к последней). """
# Загружаем текущий тест
result = await db . execute ( select ( Test ) . where ( Test . id == test_id ) )
test = result . scalar_one_or_none ( )
if not test :
@ -118,23 +116,54 @@ async def get_test_versions(test_id: int, db: AsyncSession = Depends(get_db)):
return items
@router . put ( " / {test_id} " , response_model = TestUpdateResponse )
async def update_test ( test_id : int , data : TestCreate , db : AsyncSession = Depends ( get_db ) ) :
@router . post ( " / {test_id} /activate " , response_model = TestOut )
async def activate_test_version ( test_id : int , db : AsyncSession = Depends ( get_db ) ) :
""" Делает указанную версию активной, деактивирует все остальные в цепочке. """
result = await db . execute ( select ( Test ) . where ( Test . id == test_id ) )
test = result . scalar_one_or_none ( )
if not test :
raise HTTPException ( status_code = 404 , detail = " Тест не найден " )
# Идём вверх до корневой версии
root = test
while root . parent_id is not None :
result = await db . execute ( select ( Test ) . where ( Test . id == root . parent_id ) )
root = result . scalar_one ( )
# Собираем все id версий в цепочке
all_ids : list [ int ] = [ ]
current = root
while current is not None :
all_ids . append ( current . id )
result = await db . execute ( select ( Test ) . where ( Test . parent_id == current . id ) )
current = result . scalar_one_or_none ( )
# Деактивируем все, активируем нужную
await db . execute ( update ( Test ) . where ( Test . id . in_ ( all_ids ) ) . values ( is_active = False ) )
await db . execute ( update ( Test ) . where ( Test . id == test_id ) . values ( is_active = True ) )
await db . commit ( )
result = await db . execute (
select ( Test )
. where ( Test . id == test_id , Test . is_active == True )
. options ( selectinload ( Test . questions ) . selectinload ( Question . answers ) )
. where ( Test . id == test_id )
)
return result . scalar_one ( )
@router . put ( " / {test_id} " , response_model = TestUpdateResponse )
async def update_test ( test_id : int , data : TestCreate , db : AsyncSession = Depends ( get_db ) ) :
result = await db . execute ( select ( Test ) . where ( Test . id == test_id ) )
test = result . scalar_one_or_none ( )
if not test :
raise HTTPException ( status_code = 404 , detail = " Тест не найден " )
# Проверяем, есть ли уже попытки прохождения
attempts_count = await db . scalar (
select ( func . count ( ) ) . select_from ( TestAttempt ) . where ( TestAttempt . test_id == test_id )
)
if attempts_count == 0 :
# Редактируем на месте: обновляем поля, пересоздаём вопросы
# Редактируем на месте
test . title = data . title
test . description = data . description
test . passing_score = data . passing_score
@ -165,7 +194,9 @@ async def update_test(test_id: int, data: TestCreate, db: AsyncSession = Depends
return { " test " : result . scalar_one ( ) , " is_new_version " : False }
else :
# Есть попытки — создаём новую версию
# Есть попытки — создаём новую версию, деактивируем текущую
test . is_active = False
new_test = Test (
title = data . title ,
description = data . description ,
@ -174,6 +205,7 @@ async def update_test(test_id: int, data: TestCreate, db: AsyncSession = Depends
allow_navigation_back = data . allow_navigation_back ,
version = test . version + 1 ,
parent_id = test . id ,
is_active = True ,
)
db . add ( new_test )
await db . flush ( )