You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

171 lines
5.4 KiB

"""Маршруты auth: HTML (`/login`, `/logout`) и JSON (`/api/auth/*`)."""
from __future__ import annotations
import logging
from flask import (
Blueprint,
flash,
jsonify,
redirect,
render_template,
request,
session,
url_for,
)
from ..config import get_dev_fio_password, is_assignment_feature_enabled, is_dev_ui
from ..messages import RU
from .decorators import login_required, current_user
from .services import AuthError, authenticate_credentials
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Blueprint on which every route in this file is registered.
auth_bp = Blueprint('auth', __name__)
def _safe_next(default: str = '/') -> str:
    """Return a safe redirect target taken from the request's ``next`` value.

    Open-redirect protection: the value read from ``request.values`` is
    accepted only when it is a local absolute path.  It is rejected (and
    *default* returned instead) when it:

    * does not start with ``/`` (absolute URLs, bare host names, ...);
    * starts with ``//`` (scheme-relative URL pointing at another host);
    * contains a backslash, because browsers treat a backslash like a
      forward slash, turning "slash, backslash, host" into a
      scheme-relative URL;
    * contains an ASCII control character, because browsers strip
      tab/CR/LF from URLs, so "slash, TAB, slash, host" collapses into a
      scheme-relative URL.

    Args:
        default: Path returned when ``next`` is missing, empty or unsafe.

    Returns:
        The validated ``next`` path, or *default*.
    """
    nxt = request.values.get('next') or default
    if not nxt.startswith('/') or nxt.startswith('//'):
        return default
    # Backslashes and control characters can be normalised by the browser
    # into a '//host' form that the prefix checks above would not catch.
    if '\\' in nxt or any(ch < ' ' or ch == '\x7f' for ch in nxt):
        return default
    return nxt
def _do_login(login: str, password: str):
    """Authenticate the credentials and start a fresh session for the user.

    Any exception raised by ``authenticate_credentials`` propagates to the
    caller unchanged; the session is only touched after it returns.

    Args:
        login: Login name supplied by the client.
        password: Password supplied by the client.

    Returns:
        The user object returned by ``authenticate_credentials``.
    """
    authenticated = authenticate_credentials(login, password)

    # Discard whatever the previous session held before storing the new id.
    session.clear()
    session['user_id'] = authenticated.id
    session.permanent = True

    return authenticated
# ─── HTML ────────────────────────────────────────────────────────────
@auth_bp.route('/login', methods=['GET'])
def login_page():
    """Show the login form, or redirect onward when a user is already signed in."""
    if current_user() is None:
        form_context = {
            'next': _safe_next('/'),
            'dev_fio_enabled': bool(get_dev_fio_password()),
        }
        return render_template('auth/login.html', **form_context)

    # Already authenticated: skip the form and go to the validated target.
    return redirect(_safe_next('/'))
@auth_bp.route('/login', methods=['POST'])
def login_submit():
    """Process the HTML login form.

    On success the browser is redirected to the validated ``next`` target.
    On ``AuthError`` the form is re-rendered with the error's message and
    HTTP status; on any other exception the failure is logged and the form
    is re-rendered with a generic message and status 500.
    """
    submitted_login = (request.form.get('login') or '').strip()
    submitted_password = request.form.get('password') or ''

    def render_form():
        # Re-render the login page, keeping the login the user typed.
        return render_template(
            'auth/login.html',
            next=_safe_next('/'),
            login=submitted_login,
            dev_fio_enabled=bool(get_dev_fio_password()),
        )

    try:
        _do_login(submitted_login, submitted_password)
    except AuthError as err:
        flash(err.message, 'error')
        return render_form(), err.status
    except Exception:
        log.exception('login_submit failed')
        flash(RU['loginFailed'], 'error')
        return render_form(), 500
    else:
        return redirect(_safe_next('/'))
@auth_bp.route('/logout', methods=['POST', 'GET'])
def logout():
    """Clear the session and redirect to the login page.

    GET and POST requests are handled identically.
    """
    # NOTE(review): logout over GET can be triggered by a cross-site link or
    # image tag; consider restricting to POST once all callers are confirmed.
    session.clear()
    login_url = url_for('auth.login_page')
    return redirect(login_url)
# ─── JSON API ────────────────────────────────────────────────────────
@auth_bp.route('/api/auth/login', methods=['POST'])
def api_login():
    """Log in via JSON and return the public representation of the user.

    Expects a JSON object with string fields ``login`` and ``password``.
    A body that is missing, unparsable, not a JSON object, or whose fields
    are not strings is treated the same as empty credentials, so the
    request is rejected by the normal authentication path instead of
    crashing with an unhandled error.

    Returns:
        ``{"user": ...}`` on success; ``{"error": ...}`` with the
        ``AuthError`` status on authentication failure; ``{"error": ...}``
        with status 500 on any other exception (which is logged).
    """
    payload = request.get_json(silent=True)
    # get_json may return any JSON value (list, string, number, ...);
    # only an object can carry the credential fields.
    if not isinstance(payload, dict):
        payload = {}

    raw_login = payload.get('login')
    raw_password = payload.get('password')
    # Non-string values would break .strip() / the credential check.
    login = raw_login.strip() if isinstance(raw_login, str) else ''
    password = raw_password if isinstance(raw_password, str) else ''

    try:
        user = _do_login(login, password)
    except AuthError as e:
        return jsonify(error=e.message), e.status
    except Exception:
        log.exception('api_login failed')
        return jsonify(error=RU['loginFailed']), 500
    return jsonify(user=user.to_public_dict())
@auth_bp.route('/api/auth/logout', methods=['POST'])
def api_logout():
    """Clear the session and return a JSON confirmation message."""
    session.clear()
    confirmation = RU['loggedOut']
    return jsonify(message=confirmation)
@auth_bp.route('/api/auth/dev/assignment-directory', methods=['GET'])
@login_required
def api_assignment_directory():
    """List active employees for test assignment, with optional filters.

    Query parameters:
        q: Text matched case-insensitively as a literal substring of the
            user's full name or login.
        department: Exact department name; ignored when no department
            with that name exists.
        clinic: ``with`` keeps users that have a ``staff_id``, ``without``
            keeps users that have none; any other value (default ``all``)
            applies no filter.

    Returns:
        JSON with ``people`` (at most 200 active users ordered by full
        name) and ``departments`` (all department names ordered by name).
    """
    # NOTE(review): imports are function-local in the original; presumably
    # to avoid a circular import - confirm before moving them to file level.
    from ..db import get_session
    from ..models import Department, User as UserModel

    q_str = (request.args.get('q') or '').strip()
    dept_filter = (request.args.get('department') or '').strip()
    clinic_filter = (request.args.get('clinic') or 'all').strip()

    # Named ``db`` so it does not shadow the Flask ``session`` imported at
    # file level.
    db = get_session()

    dept_rows = db.query(Department).order_by(Department.name).all()
    dept_by_name = {d.name: d.id for d in dept_rows}
    dept_name_by_id = {d.id: d.name for d in dept_rows}
    departments = [d.name for d in dept_rows]

    query = db.query(UserModel).filter(UserModel.is_active.is_(True))

    if q_str:
        # Escape LIKE wildcards so '%' and '_' typed by the user are matched
        # literally instead of acting as "any string" / "any character".
        escaped = (
            q_str.replace('/', '//').replace('%', '/%').replace('_', '/_')
        )
        like = f'%{escaped}%'
        query = query.filter(
            UserModel.full_name.ilike(like, escape='/')
            | UserModel.login.ilike(like, escape='/')
        )

    if dept_filter and dept_filter in dept_by_name:
        query = query.filter(
            UserModel.department_id == dept_by_name[dept_filter]
        )

    if clinic_filter == 'with':
        query = query.filter(UserModel.staff_id.isnot(None))
    elif clinic_filter == 'without':
        query = query.filter(UserModel.staff_id.is_(None))

    users = query.order_by(UserModel.full_name).limit(200).all()

    people = [
        {
            'staffId': str(u.id),
            'fio': u.full_name,
            'webLogin': u.login,
            'role': u.role,
            'department': (
                dept_name_by_id.get(u.department_id)
                if u.department_id
                else None
            ),
            'clinicUserId': u.staff_id,
        }
        for u in users
    ]
    return jsonify(people=people, departments=departments)
@auth_bp.route('/api/auth/me', methods=['GET'])
@login_required
def api_me():
    """Return the current user's public data together with UI feature flags."""
    me = current_user()
    if me:
        public_user = me.to_public_dict()
    else:
        public_user = None

    return jsonify(
        user=public_user,
        devUi=is_dev_ui(),
        assignmentUi=is_assignment_feature_enabled(),
    )