You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

134 lines
4.4 KiB

"""Кто видит тест: автор + назначенные пользователи."""
from __future__ import annotations
from dataclasses import dataclass
from sqlalchemy import exists, select
from ..db import get_session
from ..models import Test, TestAssignment, TestAssignmentTarget, TestAttempt, TestVersion, User
def is_test_author(created_by, user_id) -> bool:
    """Tell whether ``user_id`` identifies the author stored in ``created_by``.

    The two values are compared by their string form, so an identifier
    object and its string representation are treated as equal. A missing
    value (``None``) on either side never matches.
    """
    both_present = created_by is not None and user_id is not None
    return both_present and str(created_by) == str(user_id)
@dataclass
class AccessResult:
    """Outcome of checking whether a user may access a test."""

    # True when the user is allowed to access the test.
    ok: bool
    # True when access was granted because the user is the test's author.
    is_author: bool
    # True when an identifier could not be parsed or no such test exists.
    not_found: bool
def user_has_test_access(user_id: str, test_id: str) -> AccessResult:
    """Decide whether a user may access a test.

    The checks run in this order:

    1. If either identifier is missing or is not a valid UUID, or the test
       does not exist, the result is ``not_found=True``.
    2. If the user is the test's author, access is granted with
       ``is_author=True``.
    3. If the test is active, access is granted to anyone.
    4. Otherwise access is granted only when a ``'user'``-type assignment
       target for this user exists on any version of the test.

    Args:
        user_id: UUID of the requesting user, as a string.
        test_id: UUID of the test, as a string.

    Returns:
        An ``AccessResult`` describing the decision.
    """
    import uuid as _uuid

    # Validate the identifiers before touching the database so malformed
    # input never acquires a session.
    try:
        tid = _uuid.UUID(test_id)
        uid = _uuid.UUID(user_id)
    except (ValueError, AttributeError, TypeError):
        # ValueError: malformed string; TypeError: ``None``;
        # AttributeError: any other non-string value.
        return AccessResult(ok=False, is_author=False, not_found=True)

    session = get_session()
    test = session.get(Test, tid)
    if test is None:
        return AccessResult(ok=False, is_author=False, not_found=True)

    if is_test_author(test.created_by, uid):
        return AccessResult(ok=True, is_author=True, not_found=False)

    # Global access policy: any active test is available to every employee.
    if bool(test.is_active):
        return AccessResult(ok=True, is_author=False, not_found=False)

    # Inactive test: only users it was explicitly assigned to keep access.
    is_assigned = session.query(
        exists().where(
            TestAssignmentTarget.target_type == 'user',
            TestAssignmentTarget.target_id == uid,
            TestAssignmentTarget.assignment_id == TestAssignment.id,
            TestAssignment.test_version_id == TestVersion.id,
            TestVersion.test_id == tid,
        )
    ).scalar()
    return AccessResult(ok=bool(is_assigned), is_author=False, not_found=False)
def list_visible_tests(user_id: str) -> list[dict]:
    """Return every active test together with its active version.

    No assignment filtering is applied: all active tests that have an
    active version are listed, whoever the caller is. Rows are ordered by
    ``updated_at`` descending (NULLs last), then ``created_at`` descending.

    Args:
        user_id: UUID of the requesting user, as a string. It is used only
            to compute ``has_in_progress_attempt``; when it is missing or
            is not a valid UUID, that flag is ``False`` for every test.

    Returns:
        One dict per test with the keys ``id``, ``title``, ``description``,
        ``chain_active``, ``created_at``, ``updated_at``,
        ``active_version_id``, ``version``, ``created_by``,
        ``author_full_name`` and ``has_in_progress_attempt``.
    """
    import uuid as _uuid

    try:
        uid = _uuid.UUID(user_id)
    except (ValueError, AttributeError, TypeError):
        # TypeError covers ``None``; the listing is still returned, just
        # without per-user attempt information.
        uid = None

    session = get_session()
    rows = (
        session.query(Test, TestVersion, User)
        .join(
            TestVersion,
            (TestVersion.test_id == Test.id) & TestVersion.is_active.is_(True),
        )
        # Outer join: a test whose author row is missing is still listed.
        .outerjoin(User, User.id == Test.created_by)
        .filter(Test.is_active.is_(True))
        .order_by(Test.updated_at.desc().nullslast(), Test.created_at.desc())
        .all()
    )

    # Collect, in a single query, the ids of all tests for which the user
    # has an in-progress attempt on any version, instead of issuing one
    # EXISTS query per returned row.
    in_progress_test_ids = set()
    if uid is not None and rows:
        in_progress_test_ids = {
            test_id
            for (test_id,) in session.query(TestVersion.test_id)
            .join(TestAttempt, TestAttempt.test_version_id == TestVersion.id)
            .filter(
                TestAttempt.user_id == uid,
                TestAttempt.status == 'in_progress',
            )
            .distinct()
        }

    return [
        {
            'id': str(t.id),
            'title': t.title,
            'description': t.description,
            'chain_active': t.is_active,
            'created_at': t.created_at,
            'updated_at': t.updated_at,
            'active_version_id': str(tv.id),
            'version': tv.version,
            'created_by': str(t.created_by) if t.created_by else None,
            'author_full_name': u.full_name if u else '',
            'has_in_progress_attempt': t.id in in_progress_test_ids,
        }
        for t, tv, u in rows
    ]
def list_hidden_by_author(user_id: str) -> list[dict]:
    """Return the inactive tests created by the given user.

    Only tests that have an active version and an existing author row are
    included. Rows are ordered by ``updated_at`` descending (NULLs last),
    then ``created_at`` descending.

    Args:
        user_id: UUID of the author, as a string.

    Returns:
        One dict per test with the keys ``id``, ``title``, ``description``,
        ``chain_active``, ``created_at``, ``updated_at``,
        ``active_version_id``, ``version``, ``created_by`` and
        ``author_full_name``. An empty list is returned when ``user_id``
        is missing or is not a valid UUID.
    """
    import uuid as _uuid

    # Validate first so invalid input never acquires a session.
    try:
        uid = _uuid.UUID(user_id)
    except (ValueError, AttributeError, TypeError):
        # TypeError covers ``None``; AttributeError other non-string values.
        return []

    session = get_session()
    rows = (
        session.query(Test, TestVersion, User)
        .join(
            TestVersion,
            (TestVersion.test_id == Test.id) & TestVersion.is_active.is_(True),
        )
        # Inner join: the filter below pins ``created_by`` to ``uid``, so
        # the author row is expected to exist.
        .join(User, User.id == Test.created_by)
        .filter(Test.is_active.is_(False), Test.created_by == uid)
        .order_by(Test.updated_at.desc().nullslast(), Test.created_at.desc())
        .all()
    )

    return [
        {
            'id': str(t.id),
            'title': t.title,
            'description': t.description,
            'chain_active': t.is_active,
            'created_at': t.created_at,
            'updated_at': t.updated_at,
            'active_version_id': str(tv.id),
            'version': tv.version,
            'created_by': str(t.created_by),
            'author_full_name': u.full_name,
        }
        for t, tv, u in rows
    ]