"""Кто видит тест: автор + назначенные пользователи.""" from __future__ import annotations import os from dataclasses import dataclass from sqlalchemy import exists, func from ..db import get_session from ..models import Question, Test, TestAssignment, TestAssignmentTarget, TestAttempt, TestVersion, User def _truthy_env(val: str | None) -> bool: return (val or '').strip().lower() in ('1', 'true', 'yes', 'on') def is_test_edit_open() -> bool: """Пока без RBAC: любой залогиненный пользователь может править любой тест. Задайте ``CLINIC_TESTS_RESTRICT_EDIT_TO_AUTHOR=1``, чтобы снова требовать роль автора для редактирования, подсказок, списка попыток и разбора чужих попыток. """ return not _truthy_env(os.environ.get('CLINIC_TESTS_RESTRICT_EDIT_TO_AUTHOR')) def is_test_author(created_by, user_id) -> bool: if created_by is None or user_id is None: return False return str(created_by) == str(user_id) @dataclass class AccessResult: ok: bool is_author: bool not_found: bool def user_has_test_access(user_id: str, test_id: str) -> AccessResult: import uuid as _uuid session = get_session() try: tid = _uuid.UUID(test_id) uid = _uuid.UUID(user_id) except (ValueError, AttributeError): return AccessResult(ok=False, is_author=False, not_found=True) test = session.get(Test, tid) if not test: return AccessResult(ok=False, is_author=False, not_found=True) if is_test_author(test.created_by, uid): return AccessResult(ok=True, is_author=True, not_found=False) # Глобальная политика доступа: любой активный тест доступен всем сотрудникам. if bool(test.is_active): return AccessResult(ok=True, is_author=False, not_found=False) assigned = session.query( exists().where( TestAssignmentTarget.target_type == 'user', TestAssignmentTarget.target_id == uid, TestAssignmentTarget.assignment_id == TestAssignment.id, TestAssignment.test_version_id == TestVersion.id, TestVersion.test_id == tid, ) ).scalar() return AccessResult(ok=bool(assigned), is_author=False, not_found=False) def list_visible_tests(user_id: str) -> list[dict]: """В dev-режиме возвращает все активные тесты независимо от назначения.""" import uuid as _uuid session = get_session() try: uid = _uuid.UUID(user_id) except (ValueError, AttributeError): uid = None qcount_sq = ( session.query( Question.test_version_id.label('tv_id'), func.count(Question.id).label('qc'), ) .group_by(Question.test_version_id) .subquery() ) rows = ( session.query(Test, TestVersion, User, qcount_sq.c.qc) .join(TestVersion, (TestVersion.test_id == Test.id) & TestVersion.is_active.is_(True)) .outerjoin(User, User.id == Test.created_by) .outerjoin(qcount_sq, qcount_sq.c.tv_id == TestVersion.id) .filter(Test.is_active.is_(True)) .order_by(Test.updated_at.desc().nullslast(), Test.created_at.desc()) .all() ) return [ { 'id': str(t.id), 'title': t.title, 'description': t.description, 'chain_active': t.is_active, 'created_at': t.created_at, 'updated_at': t.updated_at, 'active_version_id': str(tv.id), 'version': tv.version, 'created_by': str(t.created_by) if t.created_by else None, 'author_full_name': u.full_name if u else '—', 'passing_threshold': int(t.passing_threshold or 0), 'time_limit': t.time_limit, 'result_mode': (t.result_mode or 'end'), 'hints_enabled': bool(t.hints_enabled), 'questions_count': int(qc or 0), 'has_in_progress_attempt': bool( uid and session.query( exists().where( TestAttempt.user_id == uid, TestAttempt.status == 'in_progress', TestAttempt.test_version_id == TestVersion.id, TestVersion.test_id == t.id, ) ).scalar() ), } for t, tv, u, qc in rows ] def list_hidden_by_author(user_id: str) -> list[dict]: import uuid as _uuid session = get_session() try: uid = _uuid.UUID(user_id) except (ValueError, AttributeError): return [] qcount_sq = ( session.query( Question.test_version_id.label('tv_id'), func.count(Question.id).label('qc'), ) .group_by(Question.test_version_id) .subquery() ) rows = ( session.query(Test, TestVersion, User, qcount_sq.c.qc) .join(TestVersion, (TestVersion.test_id == Test.id) & TestVersion.is_active.is_(True)) .join(User, User.id == Test.created_by) .outerjoin(qcount_sq, qcount_sq.c.tv_id == TestVersion.id) .filter(Test.is_active.is_(False), Test.created_by == uid) .order_by(Test.updated_at.desc().nullslast(), Test.created_at.desc()) .all() ) return [ { 'id': str(t.id), 'title': t.title, 'description': t.description, 'chain_active': t.is_active, 'created_at': t.created_at, 'updated_at': t.updated_at, 'active_version_id': str(tv.id), 'version': tv.version, 'created_by': str(t.created_by), 'author_full_name': u.full_name, 'passing_threshold': int(t.passing_threshold or 0), 'time_limit': t.time_limit, 'result_mode': (t.result_mode or 'end'), 'hints_enabled': bool(t.hints_enabled), 'questions_count': int(qc or 0), 'has_in_progress_attempt': bool( session.query( exists().where( TestAttempt.user_id == uid, TestAttempt.status == 'in_progress', TestAttempt.test_version_id == TestVersion.id, TestVersion.test_id == t.id, ) ).scalar() ), } for t, tv, u, qc in rows ]