"""Endpoints for listing, reading, creating, updating and versioning tests."""

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import delete, func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.database import get_db
from app.models.attempt import TestAttempt
from app.models.test import Answer, Question, Test
from app.schemas.test import TestCreate, TestListItem, TestOut, TestUpdateResponse

router = APIRouter(prefix="/api/tests", tags=["tests"])


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------


async def _get_test_or_404(db: AsyncSession, test_id: int) -> Test:
    """Fetch a test row by id (no relationships loaded) or raise HTTP 404."""
    result = await db.execute(select(Test).where(Test.id == test_id))
    test = result.scalar_one_or_none()
    if test is None:
        raise HTTPException(status_code=404, detail="Тест не найден")
    return test


async def _load_full_test(db: AsyncSession, test_id: int) -> Test:
    """Fetch a test with its questions and their answers, or raise HTTP 404."""
    result = await db.execute(
        select(Test)
        .options(selectinload(Test.questions).selectinload(Question.answers))
        .where(Test.id == test_id)
    )
    test = result.scalar_one_or_none()
    if test is None:
        raise HTTPException(status_code=404, detail="Тест не найден")
    return test


async def _add_questions(db: AsyncSession, test_id: int, questions) -> None:
    """Insert ``questions`` (and their answers) for the test ``test_id``.

    Each question's ``order`` is its position in the incoming sequence.
    Nothing is committed here; the caller owns the transaction.
    """
    for order, q_data in enumerate(questions):
        question = Question(test_id=test_id, text=q_data.text, order=order)
        db.add(question)
        # Flush so the question has a primary key before answers reference it.
        await db.flush()
        db.add_all(
            Answer(
                question_id=question.id,
                text=a_data.text,
                is_correct=a_data.is_correct,
            )
            for a_data in q_data.answers
        )


async def _collect_version_family(
    db: AsyncSession, test: Test, *, with_questions: bool = False
) -> list[Test]:
    """Return every version that shares a root with ``test``, root first.

    The walk goes up via ``parent_id`` to the root and then down level by
    level. For a linear chain the result is ordered from the first version to
    the last. A version may have several children (editing an older version
    creates a sibling branch); children of one level are ordered by id.

    Args:
        db: Active session.
        test: Any version belonging to the family.
        with_questions: Eager-load ``Test.questions`` on the returned rows.
    """
    # Walk up to the root. ``visited`` stops the loop if parent links are
    # corrupt and form a cycle.
    root = test
    visited = {root.id}
    while root.parent_id is not None and root.parent_id not in visited:
        result = await db.execute(select(Test).where(Test.id == root.parent_id))
        root = result.scalar_one()
        visited.add(root.id)

    base_query = select(Test)
    if with_questions:
        base_query = base_query.options(selectinload(Test.questions))

    result = await db.execute(base_query.where(Test.id == root.id))
    versions: list[Test] = [result.scalar_one()]

    # Walk down one level per query instead of one query per version.
    seen = {root.id}
    frontier = [root.id]
    while frontier:
        result = await db.execute(
            base_query.where(Test.parent_id.in_(frontier)).order_by(Test.id)
        )
        children = [child for child in result.scalars().all() if child.id not in seen]
        seen.update(child.id for child in children)
        versions.extend(children)
        frontier = [child.id for child in children]
    return versions


def _to_list_items(tests) -> list[TestListItem]:
    """Build list items from tests whose ``questions`` are already loaded."""
    items = []
    for test in tests:
        item = TestListItem.model_validate(test)
        item.questions_count = len(test.questions)
        items.append(item)
    return items


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------


@router.get("", response_model=list[TestListItem])
async def list_tests(db: AsyncSession = Depends(get_db)):
    """List active test versions, newest first, with their question counts."""
    result = await db.execute(
        select(Test)
        .options(selectinload(Test.questions))
        # ``== True`` is intentional: it builds a SQL expression.
        .where(Test.is_active == True)  # noqa: E712
        .order_by(Test.created_at.desc())
    )
    return _to_list_items(result.scalars().all())


@router.get("/{test_id}", response_model=TestOut)
async def get_test(test_id: int, db: AsyncSession = Depends(get_db)):
    """Return one test with its questions and answers.

    Any version can be fetched by id; there is deliberately no ``is_active``
    filter so that historical versions stay viewable.

    Raises:
        HTTPException: 404 if no test has this id.
    """
    return await _load_full_test(db, test_id)


@router.post("", response_model=TestOut, status_code=201)
async def create_test(data: TestCreate, db: AsyncSession = Depends(get_db)):
    """Create a test together with its questions and answers."""
    test = Test(
        title=data.title,
        description=data.description,
        passing_score=data.passing_score,
        time_limit=data.time_limit,
        allow_navigation_back=data.allow_navigation_back,
    )
    db.add(test)
    # Flush so the test has a primary key before questions reference it.
    await db.flush()

    await _add_questions(db, test.id, data.questions)
    await db.commit()

    return await _load_full_test(db, test.id)


@router.get("/{test_id}/versions", response_model=list[TestListItem])
async def get_test_versions(test_id: int, db: AsyncSession = Depends(get_db)):
    """Return all versions of a test, from the first to the latest.

    Raises:
        HTTPException: 404 if no test has this id.
    """
    test = await _get_test_or_404(db, test_id)
    versions = await _collect_version_family(db, test, with_questions=True)
    return _to_list_items(versions)


@router.post("/{test_id}/activate", response_model=TestOut)
async def activate_test_version(test_id: int, db: AsyncSession = Depends(get_db)):
    """Make the given version active and deactivate every other version.

    Raises:
        HTTPException: 404 if no test has this id.
    """
    test = await _get_test_or_404(db, test_id)
    family_ids = [version.id for version in await _collect_version_family(db, test)]

    # Deactivate the whole family first, then activate the requested version.
    await db.execute(
        update(Test).where(Test.id.in_(family_ids)).values(is_active=False)
    )
    await db.execute(update(Test).where(Test.id == test_id).values(is_active=True))
    await db.commit()

    return await _load_full_test(db, test_id)


@router.put("/{test_id}", response_model=TestUpdateResponse)
async def update_test(test_id: int, data: TestCreate, db: AsyncSession = Depends(get_db)):
    """Update a test in place, or create a new version if it has attempts.

    * No attempts reference the test: its fields are overwritten and its
      questions/answers are replaced; ``is_new_version`` is ``False``.
    * Attempts exist: the stored test is left untouched, a new active version
      is created with ``parent_id`` pointing at it, and every other version in
      the family is deactivated; ``is_new_version`` is ``True``.

    Raises:
        HTTPException: 404 if no test has this id.
    """
    test = await _get_test_or_404(db, test_id)

    attempts_count = await db.scalar(
        select(func.count())
        .select_from(TestAttempt)
        .where(TestAttempt.test_id == test_id)
    )

    if not attempts_count:
        # Edit in place.
        test.title = data.title
        test.description = data.description
        test.passing_score = data.passing_score
        test.time_limit = data.time_limit
        test.allow_navigation_back = data.allow_navigation_back

        # Delete answers first (FK: answers.question_id -> questions.id).
        q_ids_result = await db.execute(
            select(Question.id).where(Question.test_id == test_id)
        )
        q_ids = q_ids_result.scalars().all()
        if q_ids:
            await db.execute(delete(Answer).where(Answer.question_id.in_(q_ids)))
        await db.execute(delete(Question).where(Question.test_id == test_id))
        await db.flush()

        await _add_questions(db, test.id, data.questions)
        await db.commit()

        return {"test": await _load_full_test(db, test.id), "is_new_version": False}

    # Attempts exist: create a new version instead of rewriting history.
    family = await _collect_version_family(db, test)

    # Deactivate the whole family, not just the edited row: the edited version
    # may itself be an inactive historical one, and only the new version
    # should end up active.
    for version in family:
        version.is_active = False

    # Number after the highest existing version so that editing an older
    # version does not produce a duplicate version number.
    known_versions = [v.version for v in family if v.version is not None]
    next_version = max(known_versions, default=0) + 1

    new_test = Test(
        title=data.title,
        description=data.description,
        passing_score=data.passing_score,
        time_limit=data.time_limit,
        allow_navigation_back=data.allow_navigation_back,
        version=next_version,
        parent_id=test.id,
        is_active=True,
    )
    db.add(new_test)
    await db.flush()

    await _add_questions(db, new_test.id, data.questions)
    await db.commit()

    return {"test": await _load_full_test(db, new_test.id), "is_new_version": True}