You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

354 lines
14 KiB

from __future__ import annotations
from sqlalchemy import text
from ..services.test_access import is_test_author, user_has_test_access
class HttpError(Exception):
    """Exception that pairs an HTTP status code with an error message.

    The message is also passed to ``Exception`` so ``str(exc)`` returns it.
    """

    def __init__(self, status: int, message: str):
        super().__init__(message)
        # Text of the error, kept as an attribute in addition to ``args``.
        self.message = message
        # Numeric status code supplied by the raiser (e.g. 400, 403, 404).
        self.status = status
def _sort_uuid_strings(items) -> list[str]:
return sorted({str(x) for x in (items or []) if x is not None})
def _same_selection(selected, correct_ids) -> bool:
    """Tell whether two id collections hold the same set of ids.

    Both sides are normalised with ``_sort_uuid_strings`` first, so order,
    duplicates and ``None`` entries do not affect the comparison.
    """
    return _sort_uuid_strings(selected) == _sort_uuid_strings(correct_ids)
def load_questions_for_version(conn, test_version_id, *, include_correct: bool) -> list[dict]:
    """Load the questions of one test version together with their options.

    All answer options of the version are fetched with a single joined query
    and grouped in Python, instead of issuing one query per question.

    Args:
        conn: Connection whose ``execute(...)`` result supports
            ``.mappings().all()``.
        test_version_id: Value matched against ``questions.test_version_id``.
        include_correct: When true, every option dict also carries an
            ``isCorrect`` flag; otherwise that key is omitted.

    Returns:
        One dict per question, ordered by ``question_order``, with keys
        ``id``, ``text``, ``questionOrder``, ``hasMultipleAnswers`` and
        ``options`` (option dicts ordered by ``option_order``).
    """
    question_rows = conn.execute(
        text(
            'SELECT id, text, question_order, has_multiple_answers '
            'FROM questions WHERE test_version_id = :v ORDER BY question_order'
        ),
        {'v': test_version_id},
    ).mappings().all()
    if not question_rows:
        return []

    option_rows = conn.execute(
        text(
            'SELECT ao.id, ao.question_id, ao.text, ao.is_correct, ao.option_order '
            'FROM answer_options ao '
            'INNER JOIN questions q ON q.id = ao.question_id '
            'WHERE q.test_version_id = :v '
            'ORDER BY ao.option_order'
        ),
        {'v': test_version_id},
    ).mappings().all()

    # Rows arrive sorted by option_order, so appending keeps each question's
    # options in that order.
    options_by_question: dict[str, list[dict]] = {}
    for row in option_rows:
        option = {
            'id': str(row['id']),
            'text': row['text'],
            'optionOrder': row['option_order'],
        }
        if include_correct:
            option['isCorrect'] = bool(row['is_correct'])
        options_by_question.setdefault(str(row['question_id']), []).append(option)

    return [
        {
            'id': str(q['id']),
            'text': q['text'],
            'questionOrder': q['question_order'],
            'hasMultipleAnswers': bool(q['has_multiple_answers']),
            # A question without any option rows gets an empty list.
            'options': options_by_question.get(str(q['id']), []),
        }
        for q in question_rows
    ]
def start_attempt(eng, user_id: str, test_id: str) -> dict:
    """Create a new ``in_progress`` attempt on the active version of a test.

    Args:
        eng: Engine providing ``begin()`` as a transactional context manager.
        user_id: Id of the user starting the attempt.
        test_id: Id of the test to attempt.

    Returns:
        ``{'attempt': {...}}`` where the inner dict holds the columns returned
        by the INSERT (id, test_version_id, user_id, attempt_number, status,
        started_at).

    Raises:
        HttpError: 404 when ``user_has_test_access`` reports no access, or
            when the test has no active version.
    """
    access = user_has_test_access(user_id, test_id)
    if not access.ok:
        raise HttpError(404, 'Тест не найден.')

    active_version_sql = text(
        'SELECT id AS test_version_id FROM test_versions '
        'WHERE test_id = :id AND is_active = true LIMIT 1'
    )
    last_number_sql = text(
        'SELECT COALESCE(MAX(attempt_number), 0) AS n FROM test_attempts '
        'WHERE test_version_id = :v AND user_id = :u'
    )
    insert_sql = text(
        "INSERT INTO test_attempts (test_version_id, user_id, attempt_number, status) "
        "VALUES (:v, :u, :n, 'in_progress') "
        'RETURNING id, test_version_id, user_id, attempt_number, status, started_at'
    )

    with eng.begin() as conn:
        version_row = conn.execute(active_version_sql, {'id': test_id}).mappings().first()
        if not version_row:
            raise HttpError(404, 'Нет активной версии теста.')
        version_id = version_row['test_version_id']

        # NOTE(review): the number is read and then inserted without a lock;
        # confirm a unique constraint guards against concurrent duplicates.
        last_row = conn.execute(
            last_number_sql, {'v': version_id, 'u': user_id}
        ).mappings().first()
        attempt_number = int(last_row['n'] or 0) + 1

        created = conn.execute(
            insert_sql, {'v': version_id, 'u': user_id, 'n': attempt_number}
        ).mappings().first()

    return {'attempt': dict(created)}
def get_play_content(eng, user_id: str, test_id: str, attempt_id: str) -> dict:
    """Return the questions of an in-progress attempt, without correct flags.

    Args:
        eng: Engine providing ``connect()`` as a context manager.
        user_id: Id of the requesting user; must match the attempt's user.
        test_id: Id of the test the attempt is expected to belong to.
        attempt_id: Id of the attempt to play.

    Returns:
        Dict with ``testTitle``, ``passingThreshold``, ``attemptId`` and
        ``questions`` (loaded with ``include_correct=False``).

    Raises:
        HttpError: 404 when the attempt does not exist or belongs to another
            test, 403 when it belongs to another user, 400 when its status is
            not ``in_progress``.
    """
    attempt_sql = text(
        'SELECT ta.id, ta.user_id, ta.status, ta.test_version_id, tv.test_id, '
        't.title, t.passing_threshold '
        'FROM test_attempts ta '
        'INNER JOIN test_versions tv ON tv.id = ta.test_version_id '
        'INNER JOIN tests t ON t.id = tv.test_id '
        'WHERE ta.id = :a'
    )
    with eng.connect() as conn:
        attempt = conn.execute(attempt_sql, {'a': attempt_id}).mappings().first()
        # A missing attempt and an attempt of a different test both answer 404.
        if not attempt or str(attempt['test_id']) != str(test_id):
            raise HttpError(404, 'Попытка не найдена.')
        if str(attempt['user_id']) != str(user_id):
            raise HttpError(403, 'Доступ запрещён.')
        if attempt['status'] != 'in_progress':
            raise HttpError(400, 'Попытка уже завершена.')
        questions = load_questions_for_version(
            conn, attempt['test_version_id'], include_correct=False
        )

    return {
        'testTitle': attempt['title'],
        'passingThreshold': attempt['passing_threshold'],
        'attemptId': str(attempt['id']),
        'questions': questions,
    }
def _normalize_selection(raw) -> list[str]:
    """Coerce one submitted answer value into a list of option-id strings."""
    if raw is None:
        # A null answer means "nothing selected", not the literal id 'None'.
        return []
    values = raw if isinstance(raw, list) else [raw]
    return [str(v) for v in values if v is not None]


def submit_attempt(eng, user_id: str, test_id: str, attempt_id: str, raw_answers: dict | None) -> dict:
    """Grade an in-progress attempt, store the answers and mark it completed.

    Args:
        eng: Engine providing ``begin()`` as a transactional context manager.
        user_id: Id of the submitting user; must match the attempt's user.
        test_id: Id of the test the attempt is expected to belong to.
        attempt_id: Id of the attempt being submitted.
        raw_answers: Mapping of question id (string) to a selected option id
            or a list of them. Anything that is not a dict is treated as an
            empty submission. ``None`` values and ``None`` list entries count
            as "no selection".

    Returns:
        Dict with ``attemptId``, ``correctCount``, ``totalQuestions``,
        ``percent`` (rounded to one decimal), ``passed``,
        ``passingThreshold`` and ``review`` (from ``build_review_from_db``).

    Raises:
        HttpError: 404 when the attempt or its test cannot be found or the
            attempt belongs to another test, 403 when it belongs to another
            user, 400 when it is not ``in_progress``, the version has no
            questions, or a selected option does not belong to its question.
    """
    answers = raw_answers if isinstance(raw_answers, dict) else {}

    with eng.begin() as conn:
        # The attempt row is selected FOR UPDATE before its status is checked.
        attempt = conn.execute(
            text(
                'SELECT id, user_id, status, test_version_id '
                'FROM test_attempts WHERE id = :a FOR UPDATE'
            ),
            {'a': attempt_id},
        ).mappings().first()
        if not attempt:
            raise HttpError(404, 'Попытка не найдена.')

        version_id = attempt['test_version_id']
        link = conn.execute(
            text(
                'SELECT t.passing_threshold, tv.test_id '
                'FROM test_versions tv '
                'INNER JOIN tests t ON t.id = tv.test_id '
                'WHERE tv.id = :v'
            ),
            {'v': version_id},
        ).mappings().first()
        if not link:
            raise HttpError(404, 'Тест не найден.')
        if str(link['test_id']) != str(test_id):
            raise HttpError(404, 'Попытка не найдена.')
        if str(attempt['user_id']) != str(user_id):
            raise HttpError(403, 'Доступ запрещён.')
        if attempt['status'] != 'in_progress':
            raise HttpError(400, 'Попытка уже завершена.')

        question_rows = conn.execute(
            text('SELECT id FROM questions WHERE test_version_id = :v'),
            {'v': version_id},
        ).mappings().all()
        if not question_rows:
            raise HttpError(400, 'В тесте нет вопросов.')

        option_rows = conn.execute(
            text(
                'SELECT a.id, a.question_id, a.is_correct '
                'FROM answer_options a '
                'INNER JOIN questions q ON q.id = a.question_id '
                'WHERE q.test_version_id = :v'
            ),
            {'v': version_id},
        ).mappings().all()

        # Per question: every known option id and the subset marked correct.
        by_question: dict[str, dict] = {}
        for opt in option_rows:
            bucket = by_question.setdefault(
                str(opt['question_id']), {'all': set(), 'correct': []}
            )
            option_id = str(opt['id'])
            bucket['all'].add(option_id)
            if opt['is_correct']:
                bucket['correct'].append(option_id)

        # Normalise each answer once; the result is reused for grading and
        # for the rows written to user_answers.
        selections: dict[str, list[str]] = {}
        correct_count = 0
        for q in question_rows:
            qid = str(q['id'])
            selected = _normalize_selection(answers.get(qid))
            known = by_question.get(qid, {'all': set(), 'correct': []})
            if any(sid not in known['all'] for sid in selected):
                raise HttpError(400, 'Некорректный вариант ответа.')
            if _same_selection(selected, known['correct']):
                correct_count += 1
            selections[qid] = selected

        total = len(question_rows)
        percent = (correct_count / total) * 100 if total else 0
        threshold = int(link['passing_threshold'] or 0)
        # Small epsilon so float rounding cannot fail an exact-threshold score.
        passed = percent + 1e-9 >= threshold

        conn.execute(
            text('DELETE FROM user_answers WHERE attempt_id = :a'),
            {'a': attempt_id},
        )
        # CAST(...) is used instead of ':s::uuid[]' because text() does not
        # treat ':s' as a bind parameter when another colon follows it.
        insert_answer = text(
            'INSERT INTO user_answers (attempt_id, question_id, selected_options) '
            'VALUES (:a, :q, CAST(:s AS uuid[]))'
        )
        for q in question_rows:
            conn.execute(
                insert_answer,
                {'a': attempt_id, 'q': q['id'], 's': selections[str(q['id'])]},
            )

        conn.execute(
            text(
                "UPDATE test_attempts SET status = 'completed', completed_at = CURRENT_TIMESTAMP, "
                'correct_count = :c, total_questions = :t, passed = :p WHERE id = :a'
            ),
            {'a': attempt_id, 'c': correct_count, 't': total, 'p': passed},
        )

    # Built after the transaction block: build_review_from_db reads through
    # its own connection and requires the attempt status to be 'completed'.
    review = build_review_from_db(eng, attempt_id)
    return {
        'attemptId': attempt_id,
        'correctCount': correct_count,
        'totalQuestions': total,
        'percent': round(percent, 1),
        'passed': passed,
        'passingThreshold': threshold,
        'review': review,
    }
def build_review_from_db(eng, attempt_id: str) -> dict:
    """Assemble the review of a completed attempt from stored data.

    Args:
        eng: Engine providing ``connect()`` as a context manager.
        attempt_id: Id of the attempt to review.

    Returns:
        Dict with attempt metadata (ids, title, threshold, counts, percent,
        passed flag, ISO timestamps, attempter name/login) and ``questions``:
        one entry per question with ``isUserCorrect`` and, per option,
        ``isCorrect`` and ``selected``.

    Raises:
        HttpError: 404 when the attempt does not exist, 400 when its status
            is not ``completed``.
    """
    with eng.connect() as conn:
        attempt = conn.execute(
            text(
                'SELECT ta.id, ta.status, ta.test_version_id, ta.user_id, ta.correct_count, ta.total_questions, '
                'ta.passed, ta.started_at, ta.completed_at, '
                't.id AS test_id, t.title, t.passing_threshold, '
                'u.full_name AS attempter_name, u.login AS attempter_login '
                'FROM test_attempts ta '
                'INNER JOIN test_versions tv ON tv.id = ta.test_version_id '
                'INNER JOIN tests t ON t.id = tv.test_id '
                'INNER JOIN users u ON u.id = ta.user_id '
                'WHERE ta.id = :a'
            ),
            {'a': attempt_id},
        ).mappings().first()
        if not attempt:
            raise HttpError(404, 'Попытка не найдена.')
        if attempt['status'] != 'completed':
            raise HttpError(400, 'Попытка не завершена.')

        questions = load_questions_for_version(
            conn, attempt['test_version_id'], include_correct=True
        )
        answer_rows = conn.execute(
            text('SELECT question_id, selected_options FROM user_answers WHERE attempt_id = :a'),
            {'a': attempt_id},
        ).mappings().all()

        # Stored selections keyed by question id (all ids as strings).
        chosen_by_question = {}
        for row in answer_rows:
            stored = row['selected_options'] or []
            chosen_by_question[str(row['question_id'])] = [str(x) for x in stored]

        # Fall back to the current question count when none was stored.
        total = int(attempt['total_questions'] or len(questions))
        if total:
            percent = round(((attempt['correct_count'] or 0) / total) * 100, 1)
        else:
            percent = 0

        reviewed = []
        for question in questions:
            chosen = _sort_uuid_strings(chosen_by_question.get(str(question['id']), []))
            expected = _sort_uuid_strings(
                [o['id'] for o in question['options'] if o.get('isCorrect')]
            )
            chosen_ids = set(chosen)
            option_views = []
            for o in question['options']:
                option_views.append(
                    {
                        'id': o['id'],
                        'text': o['text'],
                        'isCorrect': o.get('isCorrect', False),
                        'selected': o['id'] in chosen_ids,
                    }
                )
            reviewed.append(
                {
                    'id': question['id'],
                    'text': question['text'],
                    'hasMultipleAnswers': question['hasMultipleAnswers'],
                    'isUserCorrect': _same_selection(chosen, expected),
                    'options': option_views,
                }
            )

        started_at = attempt['started_at']
        completed_at = attempt['completed_at']
        return {
            'attemptId': str(attempt['id']),
            'testId': str(attempt['test_id']),
            'testTitle': attempt['title'],
            'passingThreshold': int(attempt['passing_threshold'] or 0),
            'correctCount': int(attempt['correct_count'] or 0),
            'totalQuestions': total,
            'percent': percent,
            'passed': bool(attempt['passed']),
            'startedAt': started_at.isoformat() if started_at else None,
            'completedAt': completed_at.isoformat() if completed_at else None,
            'attempterUserId': str(attempt['user_id']),
            'attempterName': attempt['attempter_name'],
            'attempterLogin': attempt['attempter_login'],
            'questions': reviewed,
        }
def get_attempt_review_for_user(eng, current_user_id: str, test_id: str, attempt_id: str) -> dict:
    """Return an attempt review if the caller owns the attempt or the test.

    Args:
        eng: Engine providing ``connect()`` as a context manager.
        current_user_id: Id of the user requesting the review.
        test_id: Id of the test the attempt is expected to belong to.
        attempt_id: Id of the attempt to review.

    Returns:
        The dict produced by ``build_review_from_db``.

    Raises:
        HttpError: 404 when the attempt does not exist or belongs to another
            test, 403 when the caller is neither the attempt's user nor the
            test author (per ``is_test_author``).
    """
    ownership_sql = text(
        'SELECT ta.user_id, t.created_by, tv.test_id '
        'FROM test_attempts ta '
        'INNER JOIN test_versions tv ON tv.id = ta.test_version_id '
        'INNER JOIN tests t ON t.id = tv.test_id '
        'WHERE ta.id = :a'
    )
    with eng.connect() as conn:
        found = conn.execute(ownership_sql, {'a': attempt_id}).mappings().first()

    # A missing attempt and an attempt of a different test both answer 404.
    if not found or str(found['test_id']) != str(test_id):
        raise HttpError(404, 'Попытка не найдена.')

    owns_attempt = str(found['user_id']) == str(current_user_id)
    authored_test = is_test_author(found['created_by'], current_user_id)
    if not (owns_attempt or authored_test):
        raise HttpError(403, 'Доступ запрещён.')

    return build_review_from_db(eng, attempt_id)
def list_test_attempts_for_author(eng, author_id: str, test_id: str) -> list[dict]:
    """List up to 200 attempts of a test, newest first, for its author.

    Args:
        eng: Engine providing ``connect()`` as a context manager.
        author_id: Id of the requesting user; must pass ``is_test_author``
            against the test's ``created_by``.
        test_id: Id of the test whose attempts are listed.

    Returns:
        One plain dict per attempt row with the selected columns (attempt
        fields, ``test_version``, ``attempter_name``, ``attempter_login``).

    Raises:
        HttpError: 404 when the test does not exist, 403 when the caller is
            not its author.
    """
    test_sql = text('SELECT id, created_by FROM tests WHERE id = :id')
    attempts_sql = text(
        'SELECT ta.id, ta.user_id, ta.status, ta.attempt_number, ta.started_at, ta.completed_at, '
        'ta.correct_count, ta.total_questions, ta.passed, tv.version AS test_version, '
        'u.full_name AS attempter_name, u.login AS attempter_login '
        'FROM test_attempts ta '
        'INNER JOIN test_versions tv ON tv.id = ta.test_version_id '
        'INNER JOIN users u ON u.id = ta.user_id '
        'WHERE tv.test_id = :id '
        'ORDER BY ta.started_at DESC NULLS LAST LIMIT 200'
    )
    with eng.connect() as conn:
        test_row = conn.execute(test_sql, {'id': test_id}).mappings().first()
        if not test_row:
            raise HttpError(404, 'Тест не найден.')
        if not is_test_author(test_row['created_by'], author_id):
            raise HttpError(403, 'Доступ запрещён.')
        attempt_rows = conn.execute(attempts_sql, {'id': test_id}).mappings().all()
        return list(map(dict, attempt_rows))