"""Маршруты auth: HTML (`/login`, `/logout`) и JSON (`/api/auth/*`).""" from __future__ import annotations import logging from flask import ( Blueprint, flash, jsonify, redirect, render_template, request, session, url_for, ) from ..config import get_dev_fio_password, is_assignment_feature_enabled, is_dev_ui, is_hr_auth_enabled from ..messages import RU from .decorators import login_required, current_user from .services import AuthError, authenticate_credentials log = logging.getLogger(__name__) auth_bp = Blueprint('auth', __name__) def _safe_next(default: str = '/') -> str: """Защита от open-redirect: разрешаем только относительные пути.""" nxt = request.values.get('next') or default if not nxt.startswith('/') or nxt.startswith('//'): return default return nxt def _do_login(login: str, password: str): user = authenticate_credentials(login, password) session.clear() session['user_id'] = user.id session.permanent = True return user # ─── HTML ──────────────────────────────────────────────────────────── @auth_bp.route('/login', methods=['GET']) def login_page(): if current_user() is not None: return redirect(_safe_next('/')) return render_template( 'auth/login.html', next=_safe_next('/'), dev_fio_enabled=bool(get_dev_fio_password()), hr_auth_enabled=is_hr_auth_enabled(), ) @auth_bp.route('/login', methods=['POST']) def login_submit(): login = (request.form.get('login') or '').strip() password = request.form.get('password') or '' try: _do_login(login, password) except AuthError as e: flash(e.message, 'error') return render_template( 'auth/login.html', next=_safe_next('/'), login=login, dev_fio_enabled=bool(get_dev_fio_password()), hr_auth_enabled=is_hr_auth_enabled(), ), e.status except Exception: log.exception('login_submit failed') flash(RU['loginFailed'], 'error') return render_template( 'auth/login.html', next=_safe_next('/'), login=login, dev_fio_enabled=bool(get_dev_fio_password()), hr_auth_enabled=is_hr_auth_enabled(), ), 500 return redirect(_safe_next('/')) 
@auth_bp.route('/logout', methods=['POST', 'GET'])
def logout():
    """Clear the session and redirect to the login page.

    Both GET and POST behave identically.
    """
    # NOTE(review): logging out on GET can be triggered cross-site; GET is
    # kept here so existing links keep working — confirm whether it is needed.
    session.clear()
    return redirect(url_for('auth.login_page'))


# ─── JSON API ────────────────────────────────────────────────────────

@auth_bp.route('/api/auth/login', methods=['POST'])
def api_login():
    """Log in via a JSON body ``{"login": ..., "password": ...}``.

    Returns:
        ``{"user": ...}`` on success; ``{"error": ...}`` with the
        ``AuthError`` status on rejected credentials, or with 500 on an
        unexpected failure.
    """
    data = request.get_json(silent=True)
    # A JSON array/scalar body has no fields; treat it like an empty object
    # instead of crashing on ``.get``.
    if not isinstance(data, dict):
        data = {}
    raw_login = data.get('login')
    raw_password = data.get('password')
    # Non-string values are handled the same way as missing ones.
    login = raw_login.strip() if isinstance(raw_login, str) else ''
    password = raw_password if isinstance(raw_password, str) else ''
    try:
        user = _do_login(login, password)
    except AuthError as e:
        return jsonify(error=e.message), e.status
    except Exception:
        # Route-level boundary: log the traceback, return a generic message.
        log.exception('api_login failed')
        return jsonify(error=RU['loginFailed']), 500
    return jsonify(user=user.to_public_dict())


@auth_bp.route('/api/auth/logout', methods=['POST'])
def api_logout():
    """Clear the session and return a confirmation message."""
    session.clear()
    return jsonify(message=RU['loggedOut'])


@auth_bp.route('/api/auth/dev/assignment-directory', methods=['GET'])
@login_required
def api_assignment_directory():
    """List active employees for test assignment.

    Query parameters:
        q: Case-insensitive substring matched against full name or login.
        department: Exact department name; ignored when unknown.
        clinic: ``with`` / ``without`` filters on whether ``staff_id`` is
            set; any other value (default ``all``) applies no filter.

    Returns:
        JSON with ``people`` (at most 200, ordered by full name) and
        ``departments`` (all department names, ordered by name).
    """
    # NOTE(review): only a logged-in user is required here; confirm whether
    # this endpoint should also be gated by the assignment/dev feature flags.
    # Imported locally as in the original — presumably to avoid an import
    # cycle; verify before moving to module level.
    from ..db import get_session
    from ..models import Department, User as UserModel

    q_str = (request.args.get('q') or '').strip()
    dept_filter = (request.args.get('department') or '').strip()
    clinic_filter = (request.args.get('clinic') or 'all').strip()

    # Named ``db`` so Flask's ``session`` is not shadowed inside this view.
    db = get_session()
    dept_rows = db.query(Department).order_by(Department.name).all()
    dept_by_name = {d.name: d.id for d in dept_rows}
    dept_name_by_id = {d.id: d.name for d in dept_rows}
    departments = [d.name for d in dept_rows]

    query = db.query(UserModel).filter(UserModel.is_active.is_(True))

    if q_str:
        # Escape LIKE metacharacters so '%' and '_' in the search term match
        # literally (logins often contain '_').
        escaped = (
            q_str.replace('/', '//')
            .replace('%', '/%')
            .replace('_', '/_')
        )
        like = f'%{escaped}%'
        query = query.filter(
            UserModel.full_name.ilike(like, escape='/')
            | UserModel.login.ilike(like, escape='/')
        )

    if dept_filter and dept_filter in dept_by_name:
        query = query.filter(UserModel.department_id == dept_by_name[dept_filter])

    if clinic_filter == 'with':
        query = query.filter(UserModel.staff_id.isnot(None))
    elif clinic_filter == 'without':
        query = query.filter(UserModel.staff_id.is_(None))

    users = query.order_by(UserModel.full_name).limit(200).all()

    people = [
        {
            'staffId': str(u.id),
            'fio': u.full_name,
            'webLogin': u.login,
            'role': u.role,
            'department': dept_name_by_id.get(u.department_id) if u.department_id else None,
            'clinicUserId': u.staff_id,
        }
        for u in users
    ]
    return jsonify(people=people, departments=departments)


@auth_bp.route('/api/auth/me', methods=['GET'])
@login_required
def api_me():
    """Return the current user's public data and the UI feature flags."""
    user = current_user()
    return jsonify(
        user=user.to_public_dict() if user else None,
        devUi=is_dev_ui(),
        assignmentUi=is_assignment_feature_enabled(),
    )