feat(flask): E1.0–E1.3, E1.8 — миграция на Python/Flask + AI v2

Этап 1 миграции TestingWebApp на целевой стек (Python/Flask/Jinja),
БД остаётся clinic_tests.

E1.0 — База Flask-приложения: SQLAlchemy/psycopg2 пул, Flask sessions,
фабрика create_app, blueprint main с / и /health, base.html в стиле
кабинета HR (Tailwind CDN + Manrope + Material Symbols), 404/500.

E1.1 — Auth + /api/me: Flask sessions (signed cookie) вместо JWT,
bcrypt + Werkzeug, опц. HR_AUTH=1 с UPSERT в clinic_tests.users по
staff_id. UI /login, JSON /api/auth/{login,logout,me}, декораторы
@login_required / @require_role.

E1.2 — Тесты: список + редактор. 10 эндпоинтов, сервисы test_draft,
test_access, test_chain, ai_editor, llm_client, draft_validator,
editor_content. UI /tests (каталог + создание) и /tests/<id>/edit
(редактор с AI). Полный мобильный UX (аккордеоны/drag-n-drop) — в E1.7.

E1.3 — Импорт документов: pypdf + python-docx, эндпоинт
POST /api/tests/import/document, кнопка «Импорт документа» в
AI-панели редактора, лимит 16 МБ.

E1.8 — AI v2: страница /settings (статус ENV-ключа + ping),
ai/generate-by-title (без сетки), ai/check (рецензия), ai/improve
(массовое было→стало с чекбоксами). Унифицированный ответ AI-ошибок:
{ error, code, settingsUrl }.

Docker:
- docker-compose.dev.yml: добавлены DATABASE_URL, HR_AUTH/HR_DATABASE_URL,
  DEEPSEEK_API_KEY/OPENAI_API_KEY/LLM_BASE_URL/LLM_MODEL и сеть postgres
  для testing-flask.

Документация:
- docs/migration-final.md — двух-этапный план (Этап 1: унификация
  стека внутри TestingWebApp; Этап 2: слияние с tgFlaskForm).
- docs/migration-final-inventory.md — карта 22 эндпоинтов Express.

Made-with: Cursor
This commit is contained in:
Константин Лебединский
2026-04-27 23:29:26 +05:00
parent 31b51b7768
commit 4b0d56ff0e
48 changed files with 4170 additions and 203 deletions
View File
+352
View File
@@ -0,0 +1,352 @@
"""AI-генерация теста/вопроса в редакторе (порт `services/aiEditorService.js`)."""
from __future__ import annotations
from typing import Any
from .draft_validator import (
assert_draft_matches_shape,
parse_json_from_llm_text,
validate_and_normalize_draft,
)
from .llm_client import LlmError, chat_completion_text_content, get_llm_config
class HttpError(Exception):
def __init__(self, status: int, message: str):
super().__init__(message)
self.status = status
self.message = message
def parse_and_validate_shape(s: Any) -> list[dict]:
if not isinstance(s, list) or not s:
raise HttpError(400, 'Передайте непустой массив shape: [{ optionsCount, hasMultipleAnswers }, ...].')
if len(s) > 40:
raise HttpError(400, 'Не более 40 вопросов за раз.')
out = []
for i, row in enumerate(s):
if not isinstance(row, dict):
raise HttpError(400, f'shape[{i}]: ожидается объект.')
try:
n = int(float(row.get('optionsCount')))
except (TypeError, ValueError):
raise HttpError(400, f'shape[{i}]: optionsCount от 2 до 12.')
if n < 2 or n > 12:
raise HttpError(400, f'shape[{i}]: optionsCount от 2 до 12.')
out.append({'optionsCount': n, 'hasMultipleAnswers': bool(row.get('hasMultipleAnswers'))})
return out
def _require_cfg():
cfg = get_llm_config()
if cfg is None:
raise HttpError(503, 'Задайте DEEPSEEK_API_KEY или OPENAI_API_KEY на сервере.')
return cfg
def generate_full_test_by_shape(test_title: str, test_description: str, shape: list[dict]) -> dict:
cfg = _require_cfg()
title = (test_title or '').strip() or 'Тест'
desc = (test_description or '').strip()
lines = []
for i, sh in enumerate(shape):
if sh['hasMultipleAnswers']:
tail = 'несколько вариантов помечены как верные (hasMultipleAnswers: true).'
else:
tail = 'ровно один верный вариант (hasMultipleAnswers: false).'
lines.append(f'Вопрос {i + 1}: ровно {sh["optionsCount"]} вариантов ответа; {tail}')
system = (
'Ты составитель учебных тестов. Отвечай ТОЛЬКО одним JSON-объектом на русском. '
'Схема: {"title": string, "description": string (может быть пустой строкой), '
'"questions": array}. Каждый вопрос: {"text", "hasMultipleAnswers", '
'"options": [{ "text", "isCorrect" }]}.'
)
user = (
'Составь тест по теме.\n\n'
f'Название (можно уточнить, но смысл сохранить): {title}\n'
f'Краткое описание / контекст темы: '
f'{desc or "не указано; придумай согласованную тему с названием."}\n\n'
f'Соблюди СТРОГО число вопросов и вариантов (не больше и не меньше):\n'
+ '\n'.join(lines)
+ '\n\nПравила: варианты — осмысленные, по теме; отметь isCorrect согласно '
'hasMultipleAnswers; для одного правильного — ровна одна true.'
)
raw = chat_completion_text_content(cfg, system, user, 0.35)
parsed = parse_json_from_llm_text(raw)
draft = validate_and_normalize_draft(parsed)
assert_draft_matches_shape({'questions': draft['questions']}, shape)
return {
'title': draft['title'],
'description': draft['description'],
'questions': draft['questions'],
}
# ─── E1.8: AI v2 ──────────────────────────────────────────────────────
def generate_test_by_title(
test_title: str,
test_description: str = '',
questions_count: int = 10,
options_count: int = 4,
has_multiple_answers: bool = False,
) -> dict:
"""Генерация теста ТОЛЬКО по названию: AI сам предлагает вопросы.
Сетка не задаётся жёстко: пользователю даётся подсказка о желаемом числе
вопросов и вариантов, но мы валидируем мягко (не assert_draft_matches_shape).
"""
cfg = _require_cfg()
title = (test_title or '').strip()
if not title:
raise HttpError(400, 'Укажите название теста.')
desc = (test_description or '').strip()
n_q = max(3, min(40, int(questions_count or 10)))
n_opt = max(2, min(12, int(options_count or 4)))
system = (
'Ты опытный методист, составляешь учебные тесты. Отвечай ТОЛЬКО одним '
'JSON-объектом на русском. Схема: {"title", "description", "questions": ['
'{"text", "hasMultipleAnswers": boolean, "options": [{"text", "isCorrect"}]}'
']}. Минимум 2 варианта, отметь хотя бы один isCorrect: true.'
)
user = (
'Составь учебный тест по этой теме.\n\n'
f'Название теста: {title}\n'
f'Описание/контекст: {desc or "не указано — определи по названию."}\n\n'
f'Подсказка по сетке: примерно {n_q} вопросов, в каждом по {n_opt} вариантов '
f'ответа; '
f'тип ответа — {"несколько правильных" if has_multiple_answers else "один правильный"} '
f'(но если по смыслу нужно отступить — отступи). '
'Покрой ключевые подтемы. Дистракторы делай правдоподобными, не очевидно '
'неверными. Текст — короткий, понятный.'
)
raw = chat_completion_text_content(cfg, system, user, 0.45)
parsed = parse_json_from_llm_text(raw)
draft = validate_and_normalize_draft(parsed)
return {
'title': draft['title'],
'description': draft['description'],
'questions': draft['questions'],
}
def check_test_quality(test_title: str, test_description: str, questions: list[dict]) -> dict:
"""AI-рецензия теста: общий вердикт + список рекомендаций по разделам."""
cfg = _require_cfg()
title = (test_title or '').strip() or 'Тест'
desc = (test_description or '').strip()
qs = questions or []
if not qs:
raise HttpError(400, 'В тесте нет вопросов — нечего проверять.')
system = (
'Ты ревьюер учебных тестов. Отвечай ТОЛЬКО JSON: {"verdict": "ok"|"warn"|"bad", '
'"summary": string (1-2 предложения), '
'"sections": [{"title": string, "items": [string, ...]}]}. '
'Разделы рекомендаций: «Чёткость формулировок», «Качество дистракторов», '
'"Охват темы», «Сбалансированность сложности». Пропусти раздел, если '
'претензий нет. Вердикт: ok — годен; warn — есть замечания; bad — серьёзные '
'проблемы. Все тексты — на русском, короткие и предметные.'
)
test_dump = {
'title': title,
'description': desc,
'questions': [
{
'text': q.get('text', ''),
'hasMultipleAnswers': bool(q.get('hasMultipleAnswers')),
'options': [
{'text': o.get('text', ''), 'isCorrect': bool(o.get('isCorrect'))}
for o in (q.get('options') or [])
],
}
for q in qs
],
}
import json as _json
user = 'Проверь качество теста и дай рекомендации:\n\n' + _json.dumps(
test_dump, ensure_ascii=False
)
raw = chat_completion_text_content(cfg, system, user, 0.25)
parsed = parse_json_from_llm_text(raw)
if not isinstance(parsed, dict):
raise LlmError('Неверный формат ответа модели.', code='llm_shape')
verdict = str(parsed.get('verdict') or '').strip().lower()
if verdict not in ('ok', 'warn', 'bad'):
verdict = 'warn'
summary = str(parsed.get('summary') or '').strip()
raw_sections = parsed.get('sections') or []
sections: list[dict] = []
if isinstance(raw_sections, list):
for s in raw_sections:
if not isinstance(s, dict):
continue
t = str(s.get('title') or '').strip()
items = s.get('items') or []
if not t or not isinstance(items, list) or not items:
continue
clean_items = [str(x).strip() for x in items if str(x).strip()]
if clean_items:
sections.append({'title': t, 'items': clean_items})
return {'verdict': verdict, 'summary': summary, 'sections': sections}
def improve_test_full(test_title: str, test_description: str, questions: list[dict]) -> dict:
"""AI-улучшение всего теста. Возвращает 'было → стало' для каждого вопроса.
Для каждого вопроса: original + suggested, флаги textChanged/optionsChanged.
UI решает, что применить (чекбоксы).
"""
cfg = _require_cfg()
title = (test_title or '').strip() or 'Тест'
desc = (test_description or '').strip()
qs = questions or []
if not qs:
raise HttpError(400, 'В тесте нет вопросов — нечего улучшать.')
system = (
'Ты редактор учебных тестов. Получаешь массив вопросов и предлагаешь '
'улучшения: чёткие формулировки, лучшие дистракторы, корректную разметку '
'isCorrect. Сохраняй исходную сетку: число вопросов, число вариантов и '
'значение hasMultipleAnswers НЕ меняй — иначе клиент отклонит ответ. '
'Отвечай ТОЛЬКО JSON: {"questions": [{"text", "hasMultipleAnswers", '
'"options": [{"text", "isCorrect"}]}, ...]}. Тексты — на русском, короткие.'
)
test_dump = {
'title': title,
'description': desc,
'questions': [
{
'text': q.get('text', ''),
'hasMultipleAnswers': bool(q.get('hasMultipleAnswers')),
'options': [
{'text': o.get('text', ''), 'isCorrect': bool(o.get('isCorrect'))}
for o in (q.get('options') or [])
],
}
for q in qs
],
}
import json as _json
user = 'Улучши тест без изменения сетки:\n\n' + _json.dumps(
test_dump, ensure_ascii=False
)
raw = chat_completion_text_content(cfg, system, user, 0.3)
parsed = parse_json_from_llm_text(raw)
shape = [
{
'optionsCount': len(q.get('options') or []),
'hasMultipleAnswers': bool(q.get('hasMultipleAnswers')),
}
for q in qs
]
assert_draft_matches_shape(parsed, shape)
draft = validate_and_normalize_draft(
{'title': title, 'questions': parsed.get('questions') or []}
)
suggested_qs = draft['questions']
items = []
for i, (orig, sug) in enumerate(zip(qs, suggested_qs)):
orig_opts = [
{'text': str(o.get('text', '')).strip(), 'isCorrect': bool(o.get('isCorrect'))}
for o in (orig.get('options') or [])
]
sug_opts = sug['options']
text_changed = (str(orig.get('text', '')).strip() != sug['text'])
options_changed = (
len(orig_opts) != len(sug_opts)
or any(
a['text'] != b['text'] or a['isCorrect'] != b['isCorrect']
for a, b in zip(orig_opts, sug_opts)
)
)
items.append(
{
'index': i,
'original': {
'text': str(orig.get('text', '')).strip(),
'hasMultipleAnswers': bool(orig.get('hasMultipleAnswers')),
'options': orig_opts,
},
'suggested': {
'text': sug['text'],
'hasMultipleAnswers': sug['hasMultipleAnswers'],
'options': sug_opts,
},
'textChanged': text_changed,
'optionsChanged': options_changed,
'changed': text_changed or options_changed,
}
)
return {'items': items}
def generate_or_rephrase_question(
test_title: str,
test_description: str,
question_text: str,
options_count: Any,
has_multiple_answers: bool,
) -> dict:
cfg = _require_cfg()
try:
n = int(float(options_count))
except (TypeError, ValueError):
raise HttpError(400, 'optionsCount: от 2 до 12.')
if n < 2 or n > 12:
raise HttpError(400, 'optionsCount: от 2 до 12.')
topic = (((test_title or '').strip() or 'Тест') + '. ' + (test_description or '').strip()).strip()
qt = (question_text or '').strip()
if qt:
system = (
'Ты редактор учебных материалов. Отвечай ТОЛЬКО JSON: {"text": string} — '
'чёткая формулировка вопроса на русском, 1–3 полных предложения в зависимости '
'от сложности исходного черновика, без вариантов ответа.'
)
user = (
f'Тема теста: {topic}\n\n'
f'Исходный черновик вопроса (улучши формулировку, не меняй смысл без нужды):\n{qt}'
)
raw = chat_completion_text_content(cfg, system, user, 0.3)
parsed = parse_json_from_llm_text(raw)
text = str((parsed or {}).get('text') or '').strip()
if not text:
raise LlmError('Пустой text в ответе модели.', code='llm_shape')
return {'mode': 'rephrase', 'text': text}
system = (
'Ты составитель тестов. Отвечай ТОЛЬКО JSON: {"text", "hasMultipleAnswers", '
'"options": [{ "text", "isCorrect" }]}. Все на русском.'
)
multi_clause = (
'true (несколько верных, минимум 2 isCorrect: true, остальные false).'
if has_multiple_answers
else 'false (ровно один isCorrect: true).'
)
user = (
f'Тема теста: {topic}\n\n'
f'Сформулируй ОДИН вопрос по этой теме с ровно {n} вариантами ответа. '
f'hasMultipleAnswers = {multi_clause}'
)
raw = chat_completion_text_content(cfg, system, user, 0.35)
parsed = parse_json_from_llm_text(raw)
shape = [{'optionsCount': n, 'hasMultipleAnswers': bool(has_multiple_answers)}]
assert_draft_matches_shape({'questions': [parsed]}, shape)
draft = validate_and_normalize_draft({'title': 'временно', 'questions': [parsed]})
return {
'mode': 'full',
'text': draft['questions'][0]['text'],
'hasMultipleAnswers': draft['questions'][0]['hasMultipleAnswers'],
'options': draft['questions'][0]['options'],
}
@@ -0,0 +1,89 @@
"""Извлечение текста из PDF/DOCX/TXT/MD (порт `services/documentExtractService.js`)."""
from __future__ import annotations
from io import BytesIO
from typing import Optional
class HttpError(Exception):
def __init__(self, status: int, message: str):
super().__init__(message)
self.status = status
self.message = message
SUPPORTED_MIME = {
'application/pdf': 'pdf',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
'text/plain': 'text',
'text/markdown': 'text',
}
SUPPORTED_EXT = {
'.pdf': 'pdf',
'.docx': 'docx',
'.txt': 'text',
'.md': 'text',
}
def resolve_document_kind(mimetype: str | None, original_name: str | None = '') -> Optional[str]:
m = (mimetype or '').lower()
n = (original_name or '').lower()
if m in SUPPORTED_MIME:
return SUPPORTED_MIME[m]
for ext, kind in SUPPORTED_EXT.items():
if n.endswith(ext):
return kind
return None
def extract_text_from_buffer(kind: str, buf: bytes) -> str:
if kind == 'text':
try:
return buf.decode('utf-8')
except UnicodeDecodeError:
return buf.decode('utf-8', errors='replace')
if kind == 'docx':
try:
from docx import Document
except ImportError:
raise HttpError(500, 'python-docx не установлен (см. requirements.txt).')
doc = Document(BytesIO(buf))
parts = []
for p in doc.paragraphs:
if p.text:
parts.append(p.text)
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
if cell.text:
parts.append(cell.text)
return '\n'.join(parts).replace('\r\n', '\n').strip()
if kind == 'pdf':
try:
from pypdf import PdfReader
except ImportError:
raise HttpError(500, 'pypdf не установлен (см. requirements.txt).')
reader = PdfReader(BytesIO(buf))
parts = []
for page in reader.pages:
try:
t = page.extract_text() or ''
except Exception:
t = ''
if t:
parts.append(t)
return '\n'.join(parts).replace('\r\n', '\n').strip()
return ''
def extract_text_from_file(mimetype: str | None, file_storage, original_name: str | None) -> str:
"""`file_storage` — werkzeug FileStorage. Читает целиком в память (≤16 МБ)."""
kind = resolve_document_kind(mimetype, original_name)
if not kind:
raise HttpError(400, 'Неподдерживаемый формат. Допустимы: PDF, DOCX, TXT, MD.')
buf = file_storage.read()
return extract_text_from_buffer(kind, buf)
+72
View File
@@ -0,0 +1,72 @@
"""Генерация черновика теста из извлечённого текста (порт части `documentGenService.js`)."""
from __future__ import annotations
from .draft_validator import (
parse_json_from_llm_text,
validate_and_normalize_draft,
)
from .llm_client import LlmError, chat_completion_text_content, get_llm_config
MAX_EXTRACT_CHARS = 14000
def generation_for_import_document(extracted_text: str) -> dict:
text = (extracted_text or '').strip()
if not text:
return {
'available': False,
'message': 'Нет извлечённого текста — нечего передавать в модель.',
}
cfg = get_llm_config()
if cfg is None:
return {
'available': False,
'message': (
'Автогенерация выключена: задайте DEEPSEEK_API_KEY или OPENAI_API_KEY '
'в .env. Превью текста ниже — можно вставить вручную.'
),
'textPreview': text[:4000],
}
if len(text) > MAX_EXTRACT_CHARS:
slice_ = text[:MAX_EXTRACT_CHARS] + '\n\n[…фрагмент обрезан для API]'
else:
slice_ = text
try:
system = (
'Ты помощник для составления тестов. Отвечай ТОЛЬКО одним JSON-объектом '
'без пояснений. Схема: {"title": string, "description"?: string, '
'"questions": array}. Каждый вопрос: {"text", "hasMultipleAnswers": boolean, '
'"options": [{"text", "isCorrect": boolean}, ...]}. Минимум 2 варианта. '
'Для одиночного выбора ровно один isCorrect: true. '
'Текст и формулировки — на русском, по содержанию входного материала.'
)
user = (
'Составь тест с вопросами с одним или несколькими правильными ответами '
'на основе текста:\n\n' + slice_
)
raw = chat_completion_text_content(cfg, system, user, 0.25)
parsed = parse_json_from_llm_text(raw)
draft = validate_and_normalize_draft(parsed)
return {
'available': True,
'message': (
f'Сгенерировано: «{draft["title"]}», вопросов: '
f'{len(draft["questions"])}. Нажмите «Применить сгенерированный черновик».'
),
'draft': draft,
}
except LlmError as e:
return {
'available': False,
'message': f'Генерация не удалась: {e}',
'errorCode': e.code,
'textPreview': text[:4000],
}
except Exception as e:
return {
'available': False,
'message': f'Генерация не удалась: {e}',
'errorCode': 'unknown',
'textPreview': text[:4000],
}
+105
View File
@@ -0,0 +1,105 @@
"""Парсер JSON от LLM и валидатор draft (порт частей `documentGenService.js`)."""
from __future__ import annotations
import json as _json
import re
from typing import Any
from .llm_client import LlmError
_FENCE_RE = re.compile(r'^```(?:json)?\s*([\s\S]*?)```$', re.MULTILINE)
def parse_json_from_llm_text(text: str) -> Any:
if not isinstance(text, str) or not text.strip():
raise LlmError('Пустой ответ модели.', code='llm_empty')
t = text.strip()
if m := _FENCE_RE.match(t):
t = m.group(1).strip()
try:
return _json.loads(t)
except _json.JSONDecodeError:
raise LlmError('Ответ модели не является корректным JSON.', code='llm_json_parse')
def validate_and_normalize_draft(o: Any) -> dict:
if not isinstance(o, dict):
raise LlmError('JSON не содержит объекта с данными.', code='llm_shape')
title = str(o.get('title') or '').strip()
if not title:
raise LlmError('В ответе нет поля title.', code='llm_shape')
desc = o.get('description')
description = str(desc).strip() if desc and str(desc).strip() else None
raw_qs = o.get('questions')
if not isinstance(raw_qs, list) or not raw_qs:
raise LlmError('В ответе нет вопросов (questions).', code='llm_shape')
if len(raw_qs) > 40:
raise LlmError('Слишком много вопросов в ответе (макс. 40).', code='llm_shape')
questions = []
for i, q in enumerate(raw_qs):
if not isinstance(q, dict):
raise LlmError(f'Вопрос {i + 1}: неверный формат.', code='llm_shape')
text = str(q.get('text') or '').strip()
if not text:
raise LlmError(f'Вопрос {i + 1}: пустой текст.', code='llm_shape')
has_multi = bool(q.get('hasMultipleAnswers'))
raw_opts = q.get('options')
if not isinstance(raw_opts, list) or len(raw_opts) < 2:
raise LlmError(f'Вопрос {i + 1}: нужны минимум 2 варианта ответа.', code='llm_shape')
if len(raw_opts) > 12:
raise LlmError(f'Вопрос {i + 1}: слишком много вариантов (макс. 12).', code='llm_shape')
options = []
for j, op in enumerate(raw_opts):
if not isinstance(op, dict):
raise LlmError(f'Вопрос {i + 1}, вариант {j + 1}: неверный формат.', code='llm_shape')
options.append(
{
'text': (str(op.get('text') or '').strip() or f'Вариант {j + 1}'),
'isCorrect': bool(op.get('isCorrect')),
}
)
correct_n = sum(1 for x in options if x['isCorrect'])
if correct_n == 0:
raise LlmError(
f'Вопрос {i + 1}: отметьте минимум один правильный вариант.',
code='llm_shape',
)
if not has_multi and correct_n > 1:
raise LlmError(
f'Вопрос {i + 1}: с одним правильным ответом должен быть один вариант '
f'isCorrect, либо укажите hasMultipleAnswers: true.',
code='llm_shape',
)
questions.append({'text': text, 'hasMultipleAnswers': has_multi, 'options': options})
return {'title': title, 'description': description, 'questions': questions}
def assert_draft_matches_shape(o: dict, shape: list[dict]) -> None:
"""Проверяет, что число вопросов и вариантов = ровно как в shape."""
qs = o.get('questions') if isinstance(o, dict) else None
if not isinstance(qs, list):
raise LlmError('В ответе нет questions.', code='llm_shape')
if len(qs) != len(shape):
raise LlmError(
f'Ожидалось вопросов: {len(shape)}, в ответе: {len(qs)}.',
code='llm_shape',
)
for i, (q, sh) in enumerate(zip(qs, shape)):
opts = q.get('options') if isinstance(q, dict) else None
if not isinstance(opts, list):
raise LlmError(f'Вопрос {i + 1}: нет options.', code='llm_shape')
if len(opts) != sh['optionsCount']:
raise LlmError(
f'Вопрос {i + 1}: ожидалось вариантов {sh["optionsCount"]}, в ответе: {len(opts)}.',
code='llm_shape',
)
if bool(q.get('hasMultipleAnswers')) != sh['hasMultipleAnswers']:
raise LlmError(
f'Вопрос {i + 1}: hasMultipleAnswers должен быть {sh["hasMultipleAnswers"]}.',
code='llm_shape',
)
+95
View File
@@ -0,0 +1,95 @@
"""Контент редактора: тест + активная версия + дерево вопросов с правильными вариантами.
Порт `getEditorContent` + `loadQuestionsForVersion` (только includeCorrect=true вариант)
из `services/testAttemptService.js`.
"""
from __future__ import annotations
from sqlalchemy import text
from ..db import get_engine
from ..messages import RU
from .test_access import is_test_author
class HttpError(Exception):
def __init__(self, status: int, message: str):
super().__init__(message)
self.status = status
self.message = message
def load_questions_for_version(conn, test_version_id, *, include_correct: bool) -> list[dict]:
qrows = conn.execute(
text(
'SELECT id, text, question_order, has_multiple_answers '
'FROM questions WHERE test_version_id = :v ORDER BY question_order'
),
{'v': test_version_id},
).mappings().all()
out = []
for r in qrows:
orows = conn.execute(
text(
'SELECT id, text, is_correct, option_order '
'FROM answer_options WHERE question_id = :q ORDER BY option_order'
),
{'q': r['id']},
).mappings().all()
options = []
for o in orows:
base = {
'id': str(o['id']),
'text': o['text'],
'optionOrder': o['option_order'],
}
if include_correct:
base['isCorrect'] = bool(o['is_correct'])
options.append(base)
out.append(
{
'id': str(r['id']),
'text': r['text'],
'questionOrder': r['question_order'],
'hasMultipleAnswers': bool(r['has_multiple_answers']),
'options': options,
}
)
return out
def get_editor_content(user_id: str, test_id: str) -> dict:
eng = get_engine()
with eng.connect() as conn:
tr = conn.execute(
text(
'SELECT id, title, description, passing_threshold, created_by '
'FROM tests WHERE id = :id'
),
{'id': test_id},
).mappings().first()
if not tr:
raise HttpError(404, 'Тест не найден.')
if not is_test_author(tr['created_by'], user_id):
raise HttpError(403, 'Доступ запрещён.')
tv = conn.execute(
text(
'SELECT id FROM test_versions WHERE test_id = :id AND is_active = true LIMIT 1'
),
{'id': test_id},
).mappings().first()
if not tv:
raise HttpError(400, 'Нет активной версии теста.')
version_id = tv['id']
questions = load_questions_for_version(conn, version_id, include_correct=True)
return {
'test': {
'id': str(tr['id']),
'title': tr['title'],
'description': tr['description'],
'passingThreshold': tr['passing_threshold'],
},
'activeVersionId': str(version_id),
'questions': questions,
}
+156
View File
@@ -0,0 +1,156 @@
"""OpenAI-совместимый клиент Chat Completions (порт `services/llmClient.js`)."""
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Optional
import urllib.request
import urllib.error
import json as _json
class LlmError(Exception):
"""Ошибка работы с LLM API."""
def __init__(self, message: str, code: str = 'llm_error', status: int | None = None):
super().__init__(message)
self.code = code
self.status = status
@dataclass
class LlmConfig:
provider: str
api_key: str
base_url: str
model: str
def get_llm_config() -> Optional[LlmConfig]:
if k := os.environ.get('DEEPSEEK_API_KEY'):
return LlmConfig(
provider='deepseek',
api_key=k,
base_url=(os.environ.get('LLM_BASE_URL') or 'https://api.deepseek.com/v1').rstrip('/'),
model=os.environ.get('LLM_MODEL') or 'deepseek-chat',
)
if k := os.environ.get('OPENAI_API_KEY'):
return LlmConfig(
provider='openai',
api_key=k,
base_url=(os.environ.get('LLM_BASE_URL') or 'https://api.openai.com/v1').rstrip('/'),
model=os.environ.get('LLM_MODEL') or 'gpt-4o-mini',
)
return None
def chat_completion_text_content(
cfg: LlmConfig,
system: str,
user: str,
temperature: float = 0.25,
timeout: int = 120,
) -> str:
"""Возвращает `assistant.message.content` (строку)."""
body: dict = {
'model': cfg.model,
'messages': [
{'role': 'system', 'content': system},
{'role': 'user', 'content': user},
],
'temperature': temperature,
}
if (os.environ.get('LLM_NO_JSON') or '').strip() != '1':
body['response_format'] = {'type': 'json_object'}
req = urllib.request.Request(
f'{cfg.base_url}/chat/completions',
data=_json.dumps(body).encode('utf-8'),
headers={
'Content-Type': 'application/json',
'Authorization': f'Bearer {cfg.api_key}',
},
method='POST',
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
data = _json.loads(resp.read().decode('utf-8'))
except urllib.error.HTTPError as e:
text = ''
try:
text = e.read().decode('utf-8', errors='replace')
except Exception:
pass
raise LlmError(
f'LLM {e.code}: {(text or "").replace(chr(10), " ")[:280]}',
code='llm_http',
status=e.code,
)
except (urllib.error.URLError, TimeoutError) as e:
msg = str(getattr(e, 'reason', '') or e)
if 'timed out' in msg.lower():
raise LlmError('Превышен таймаут ожидания ответа LLM (120 с).', code='llm_timeout')
raise LlmError(f'Сбой сети при обращении к LLM: {msg}', code='llm_network')
try:
content = data['choices'][0]['message']['content']
except (KeyError, IndexError, TypeError):
content = None
if not isinstance(content, str) or not content.strip():
raise LlmError('Пустой content в ответе API.', code='llm_empty')
return content
def ping_llm(timeout: int = 30) -> dict:
"""Smoke-проверка подключения к LLM. Не бросает исключений — всё в результате.
Возвращает: {'ok': bool, 'provider', 'model', 'error'?, 'latencyMs'?, 'sample'?}
"""
import time
cfg = get_llm_config()
if cfg is None:
return {
'ok': False,
'configured': False,
'error': 'Ключ не задан. Задайте DEEPSEEK_API_KEY или OPENAI_API_KEY в .env.',
}
started = time.monotonic()
try:
raw = chat_completion_text_content(
cfg,
'Отвечай ТОЛЬКО JSON: {"ok": true}.',
'ping',
temperature=0.0,
timeout=timeout,
)
ms = int((time.monotonic() - started) * 1000)
return {
'ok': True,
'configured': True,
'provider': cfg.provider,
'model': cfg.model,
'latencyMs': ms,
'sample': raw[:120],
}
except LlmError as e:
ms = int((time.monotonic() - started) * 1000)
return {
'ok': False,
'configured': True,
'provider': cfg.provider,
'model': cfg.model,
'latencyMs': ms,
'error': str(e),
'code': e.code,
}
except Exception as e:
return {
'ok': False,
'configured': True,
'provider': cfg.provider,
'model': cfg.model,
'error': f'{type(e).__name__}: {e}',
'code': 'unknown',
}
+108
View File
@@ -0,0 +1,108 @@
"""Кто видит тест: автор + назначенные пользователи (порт `services/testAccessService.js`)."""
from __future__ import annotations
from dataclasses import dataclass
from sqlalchemy import text
from ..db import get_engine
def is_test_author(created_by, user_id) -> bool:
"""`tests.created_by` — UUID. Сравниваем по строковому представлению."""
if created_by is None or user_id is None:
return False
return str(created_by) == str(user_id)
@dataclass
class AccessResult:
ok: bool
is_author: bool
not_found: bool
def user_has_test_access(user_id: str, test_id: str) -> AccessResult:
eng = get_engine()
with eng.connect() as conn:
row = conn.execute(
text('SELECT created_by FROM tests WHERE id = :id'),
{'id': test_id},
).mappings().first()
if not row:
return AccessResult(ok=False, is_author=False, not_found=True)
if is_test_author(row['created_by'], user_id):
return AccessResult(ok=True, is_author=True, not_found=False)
ar = conn.execute(
text(
"""
SELECT 1
FROM test_assignments ta
INNER JOIN test_versions tv_a ON tv_a.id = ta.test_version_id
INNER JOIN test_assignment_targets tat ON tat.assignment_id = ta.id
WHERE tv_a.test_id = :test_id
AND tat.target_type = 'user'
AND tat.target_id = :user_id
LIMIT 1
"""
),
{'test_id': test_id, 'user_id': user_id},
).first()
return AccessResult(ok=ar is not None, is_author=False, not_found=False)
def list_visible_tests(user_id: str) -> list[dict]:
"""Каталог: только активная цепочка + (автор OR назначен)."""
eng = get_engine()
with eng.connect() as conn:
rows = conn.execute(
text(
"""
SELECT DISTINCT t.id, t.title, t.description, t.is_active AS chain_active,
t.created_at, t.updated_at,
tv.id AS active_version_id, tv.version,
t.created_by, u.full_name AS author_full_name
FROM tests t
INNER JOIN test_versions tv ON tv.test_id = t.id AND tv.is_active = true
INNER JOIN users u ON u.id = t.created_by
WHERE t.is_active = true
AND (
t.created_by = :uid
OR EXISTS (
SELECT 1
FROM test_assignments ta
INNER JOIN test_versions tv2 ON tv2.id = ta.test_version_id
INNER JOIN test_assignment_targets tat ON tat.assignment_id = ta.id
WHERE tv2.test_id = t.id
AND tat.target_type = 'user'
AND tat.target_id = :uid
)
)
ORDER BY t.updated_at DESC NULLS LAST, t.created_at DESC
"""
),
{'uid': user_id},
).mappings().all()
return [dict(r) for r in rows]
def list_hidden_by_author(user_id: str) -> list[dict]:
"""Скрытые автором цепочки (`is_active = false`) — видны только автору."""
eng = get_engine()
with eng.connect() as conn:
rows = conn.execute(
text(
"""
SELECT t.id, t.title, t.description, t.is_active AS chain_active,
t.created_at, t.updated_at, tv.id AS active_version_id, tv.version,
t.created_by, u.full_name AS author_full_name
FROM tests t
INNER JOIN test_versions tv ON tv.test_id = t.id AND tv.is_active = true
INNER JOIN users u ON u.id = t.created_by
WHERE t.is_active = false AND t.created_by = :uid
ORDER BY t.updated_at DESC NULLS LAST, t.created_at DESC
"""
),
{'uid': user_id},
).mappings().all()
return [dict(r) for r in rows]
+22
View File
@@ -0,0 +1,22 @@
"""Утилиты по цепочке теста (попытки/версии)."""
from __future__ import annotations
from sqlalchemy import text
def has_any_attempt_for_test(conn, test_id: str) -> bool:
"""`conn` может быть Connection или Engine — обе поддерживают .execute()."""
row = conn.execute(
text(
"""
SELECT EXISTS (
SELECT 1
FROM test_attempts ta
INNER JOIN test_versions tv ON ta.test_version_id = tv.id
WHERE tv.test_id = :test_id
) AS has_any
"""
),
{'test_id': test_id},
).first()
return bool(row[0])
+234
View File
@@ -0,0 +1,234 @@
"""Создание/правка теста, fork версии при наличии попыток (порт `testDraftService.js`)."""
from __future__ import annotations
from typing import Any
from sqlalchemy import text
from ..db import get_engine
from ..messages import RU
from .test_access import is_test_author
from .test_chain import has_any_attempt_for_test
class HttpError(Exception):
def __init__(self, status: int, message: str):
super().__init__(message)
self.status = status
self.message = message
def create_test_with_version(author_id: str, *, title: str, description: str | None) -> dict:
eng = get_engine()
with eng.begin() as conn:
t = conn.execute(
text(
"""
INSERT INTO tests (title, description, created_by, is_active, is_versioned)
VALUES (:title, :desc, :uid, true, true) RETURNING id
"""
),
{'title': title, 'desc': description or None, 'uid': author_id},
).mappings().first()
test_id = t['id']
v = conn.execute(
text(
"""
INSERT INTO test_versions (test_id, version, is_active, parent_id)
VALUES (:tid, 1, true, NULL) RETURNING id
"""
),
{'tid': test_id},
).mappings().first()
return {'testId': str(test_id), 'versionId': str(v['id'])}
def _get_active_version_row(conn, test_id: str) -> dict | None:
row = conn.execute(
text(
'SELECT * FROM test_versions WHERE test_id = :id AND is_active = true LIMIT 1'
),
{'id': test_id},
).mappings().first()
return dict(row) if row else None
def _copy_question_tree(conn, from_version_id, to_version_id) -> None:
questions = conn.execute(
text(
'SELECT id, text, question_order, has_multiple_answers '
'FROM questions WHERE test_version_id = :v ORDER BY question_order'
),
{'v': from_version_id},
).mappings().all()
for q in questions:
new_q = conn.execute(
text(
"""
INSERT INTO questions (test_version_id, text, question_order, has_multiple_answers)
VALUES (:v, :text, :ord, :multi) RETURNING id
"""
),
{
'v': to_version_id,
'text': q['text'],
'ord': q['question_order'],
'multi': q['has_multiple_answers'],
},
).mappings().first()
nqid = new_q['id']
opts = conn.execute(
text(
'SELECT text, is_correct, option_order FROM answer_options '
'WHERE question_id = :q ORDER BY option_order'
),
{'q': q['id']},
).mappings().all()
for o in opts:
conn.execute(
text(
"""
INSERT INTO answer_options (question_id, text, is_correct, option_order)
VALUES (:q, :text, :ic, :ord)
"""
),
{'q': nqid, 'text': o['text'], 'ic': o['is_correct'], 'ord': o['option_order']},
)
def _replace_version_content(conn, test_version_id, payload: dict) -> None:
conn.execute(
text(
"""
DELETE FROM answer_options WHERE question_id IN (
SELECT id FROM questions WHERE test_version_id = :v
)
"""
),
{'v': test_version_id},
)
conn.execute(
text('DELETE FROM questions WHERE test_version_id = :v'),
{'v': test_version_id},
)
questions = payload.get('questions') or []
for i, q in enumerate(questions):
ins_q = conn.execute(
text(
"""
INSERT INTO questions (test_version_id, text, question_order, has_multiple_answers)
VALUES (:v, :text, :ord, :multi) RETURNING id
"""
),
{
'v': test_version_id,
'text': q.get('text'),
'ord': q.get('question_order') or (i + 1),
'multi': bool(q.get('hasMultipleAnswers')),
},
).mappings().first()
qid = ins_q['id']
opts = q.get('options') or []
for j, o in enumerate(opts):
conn.execute(
text(
"""
INSERT INTO answer_options (question_id, text, is_correct, option_order)
VALUES (:q, :text, :ic, :ord)
"""
),
{
'q': qid,
'text': o.get('text'),
'ic': bool(o.get('isCorrect')),
'ord': o.get('option_order') or (j + 1),
},
)
def _fork_new_version(conn, test_id: str) -> dict:
av = _get_active_version_row(conn, test_id)
if not av:
raise HttpError(500, RU['internal']) # invariant: должна быть активная версия
mx = conn.execute(
text(
'SELECT COALESCE(MAX(version), 0) AS v FROM test_versions WHERE test_id = :t'
),
{'t': test_id},
).mappings().first()
next_v = (mx['v'] or 0) + 1
conn.execute(
text('UPDATE test_versions SET is_active = false WHERE test_id = :t'),
{'t': test_id},
)
nv = conn.execute(
text(
"""
INSERT INTO test_versions (test_id, version, is_active, parent_id)
VALUES (:t, :ver, true, :parent) RETURNING *
"""
),
{'t': test_id, 'ver': next_v, 'parent': av['id']},
).mappings().first()
_copy_question_tree(conn, av['id'], nv['id'])
return dict(nv)
def save_test_draft(author_id: str, test_id: str, payload: dict) -> dict:
if not isinstance(payload, dict):
payload = {}
eng = get_engine()
with eng.begin() as conn:
t = conn.execute(
text('SELECT id, created_by FROM tests WHERE id = :id'),
{'id': test_id},
).mappings().first()
if not t:
raise HttpError(404, RU['testNotFound'] if 'testNotFound' in RU else 'Тест не найден.')
if not is_test_author(t['created_by'], author_id):
raise HttpError(403, 'Доступ запрещён.')
if payload.get('title') is not None or payload.get('description') is not None:
conn.execute(
text(
"""
UPDATE tests
SET title = COALESCE(:title, title),
description = COALESCE(:desc, description),
updated_at = CURRENT_TIMESTAMP
WHERE id = :id
"""
),
{
'title': payload.get('title'),
'desc': payload.get('description'),
'id': test_id,
},
)
if payload.get('passingThreshold') is not None:
try:
raw = float(payload['passingThreshold'])
pt = max(0, min(100, round(raw)))
conn.execute(
text(
'UPDATE tests SET passing_threshold = :pt, updated_at = CURRENT_TIMESTAMP WHERE id = :id'
),
{'pt': pt, 'id': test_id},
)
except (TypeError, ValueError):
pass
has_attempts = has_any_attempt_for_test(conn, test_id)
version_row = _get_active_version_row(conn, test_id)
if not version_row:
raise HttpError(500, 'Нет активной версии теста.')
forked = False
if has_attempts and 'questions' in payload and payload.get('questions') is not None:
version_row = _fork_new_version(conn, test_id)
forked = True
if payload.get('questions') is not None:
_replace_version_content(conn, version_row['id'], payload)
return {'testId': test_id, 'versionId': str(version_row['id']), 'forked': forked}