"""Бизнес-логика логина (ORM-версия). Поддерживает оба режима: - Локальный (по умолчанию): bcrypt / Werkzeug в `clinic_tests.users.password_hash`. - HR_AUTH=1: пароль проверяется в `hr_bot_test.users` (raw SQL, внешняя схема), затем UPSERT в `clinic_tests.users` по `staff_id`. """ from __future__ import annotations import re from dataclasses import dataclass from typing import Optional import bcrypt from sqlalchemy import text from werkzeug.security import check_password_hash as _werkzeug_check from ..config import ( HR_MANAGED_PASSWORD_PLACEHOLDER, get_dev_fio_password, is_hr_auth_enabled, ) from ..db import get_engine, get_session from ..messages import RU from ..models import User from .hr_role import map_hr_role_to_app @dataclass class AuthUser: id: str login: str full_name: str | None role: str department_id: Optional[str] staff_id: Optional[int] def to_public_dict(self) -> dict: return { 'id': str(self.id), 'login': self.login, 'fullName': self.full_name, 'role': self.role, 'departmentId': str(self.department_id) if self.department_id else None, 'staffId': self.staff_id, } class AuthError(Exception): def __init__(self, status: int, message: str) -> None: super().__init__(message) self.status = status self.message = message def _verify_password(plain: str, hashed: str | None) -> bool: if not hashed: return False if hashed == HR_MANAGED_PASSWORD_PLACEHOLDER: return False if hashed.startswith('scrypt:') or hashed.startswith('pbkdf2:'): try: return _werkzeug_check(hashed, plain) except Exception: return False if hashed.startswith('$2'): try: return bcrypt.checkpw(plain.encode('utf-8'), hashed.encode('utf-8')) except Exception: return False try: return _werkzeug_check(hashed, plain) except Exception: return False def _user_to_auth(user: User) -> AuthUser: return AuthUser( id=str(user.id), login=user.login, full_name=user.full_name, role=user.role, department_id=str(user.department_id) if user.department_id else None, staff_id=user.staff_id, ) def _normalize_fio(s: str) -> str: """Приводим ФИО к одному виду: trim, пробелы, неразрывный пробел, много переводов строк.""" t = str(s or '') t = t.replace('\u00a0', ' ') return re.sub(r'\s+', ' ', t).strip() def authenticate_credentials(login: str, password: str) -> AuthUser: login = (login or '').strip() password = password or '' if not login or not password: raise AuthError(400, RU['loginAndPasswordRequired']) # Dev-режим: вход по ФИО из HR с общим паролем dev_pw = get_dev_fio_password() if dev_pw and password.strip() == dev_pw.strip(): try: return _authenticate_dev_fio(login) except AuthError: pass # если ФИО не найдено — продолжаем обычную проверку if is_hr_auth_enabled(): return _authenticate_via_hr(login, password) return _authenticate_local(login, password) def _authenticate_dev_fio(fio_raw: str) -> AuthUser: """Dev-режим: найти сотрудника по ФИО в HR-системе и создать/обновить локального пользователя.""" from ..db import get_hr_engine hr_eng = get_hr_engine() if hr_eng is None: raise AuthError(500, RU['hrDatabaseUrlMissing']) fio_needle = _normalize_fio(fio_raw) if not fio_needle: raise AuthError(401, RU['invalidCredentials']) needle = fio_needle.lower() with hr_eng.connect() as hr_conn: rows = hr_conn.execute( text( """ SELECT id, fio, web_login FROM staff_members WHERE fio IS NOT NULL AND lower( regexp_replace( trim(replace(replace(coalesce(fio, ''), chr(160), ' '), E'\\t', ' ')), '[[:space:]]+', ' ', 'g' ) ) = :needle """ ), {'needle': needle}, ).mappings().all() if len(rows) != 1: raise AuthError(401, RU['invalidCredentials']) s = rows[0] web_login = (s['web_login'] or '').strip() or f'fio_{s["id"]}' staff_id = int(s['id']) full_name = s['fio'] or fio_raw session = get_session() user = session.query(User).filter(User.staff_id == staff_id).first() if not user: # Не переиспользовать строку другого человека только по совпадению логина HR occupied = session.query(User).filter(User.login == web_login).first() login_val = web_login if occupied and occupied.staff_id != staff_id: login_val = f'hr_staff_{staff_id}' user = User( login=login_val, password_hash=HR_MANAGED_PASSWORD_PLACEHOLDER, full_name=full_name, role='employee', is_active=True, staff_id=staff_id, ) session.add(user) else: user.full_name = full_name user.password_hash = HR_MANAGED_PASSWORD_PLACEHOLDER user.is_active = True # Обновить логин с HR только если конфликта нет if web_login != user.login: taken = session.query(User).filter( User.login == web_login, User.id != user.id, ).first() if not taken: user.login = web_login session.commit() session.refresh(user) return _user_to_auth(user) def _authenticate_local(login: str, password: str) -> AuthUser: session = get_session() user = ( session.query(User) .filter(User.login == login, User.is_active.is_(True)) .first() ) if not user: raise AuthError(401, RU['invalidCredentials']) if user.password_hash == HR_MANAGED_PASSWORD_PLACEHOLDER: raise AuthError(401, RU['useHrLogin']) if not _verify_password(password, user.password_hash): raise AuthError(401, RU['invalidCredentials']) return _user_to_auth(user) def _authenticate_via_hr(login: str, password: str) -> AuthUser: from ..db import get_hr_engine hr_eng = get_hr_engine() if hr_eng is None: raise AuthError(500, RU['hrDatabaseUrlMissing']) with hr_eng.connect() as hr_conn: u = hr_conn.execute( text( 'SELECT id, username, password_hash, role FROM users ' 'WHERE LOWER(TRIM(username)) = LOWER(TRIM(:login))' ), {'login': login}, ).mappings().first() if not u or not u['password_hash']: raise AuthError(401, RU['invalidCredentials']) if not _verify_password(password, u['password_hash']): raise AuthError(401, RU['invalidCredentials']) s = hr_conn.execute( text( 'SELECT id, fio FROM staff_members ' "WHERE LOWER(TRIM(COALESCE(web_login, ''))) = LOWER(TRIM(:login))" ), {'login': login}, ).mappings().first() if not s: raise AuthError(403, RU['noStaffForLogin']) staff_id = int(s['id']) fio = s['fio'] or login app_role = map_hr_role_to_app(u['role']) # UPSERT через ORM: ищем по staff_id, затем по login, затем создаём session = get_session() user = session.query(User).filter(User.staff_id == staff_id).first() if not user: # при первом входе staff_id ещё не проставлен — ищем по login user = session.query(User).filter(User.login == login).first() if user: user.login = login user.full_name = fio user.role = app_role user.password_hash = HR_MANAGED_PASSWORD_PLACEHOLDER user.staff_id = staff_id user.is_active = True else: user = User( login=login, password_hash=HR_MANAGED_PASSWORD_PLACEHOLDER, full_name=fio, role=app_role, is_active=True, staff_id=staff_id, ) session.add(user) session.commit() session.refresh(user) return _user_to_auth(user) def load_user_by_id(user_id: str) -> Optional[AuthUser]: if not user_id: return None session = get_session() try: import uuid as _uuid uid = _uuid.UUID(user_id) except (ValueError, AttributeError): return None user = ( session.query(User) .filter(User.id == uid, User.is_active.is_(True)) .first() ) if not user: return None return _user_to_auth(user)