You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
217 lines
7.3 KiB
217 lines
7.3 KiB
"""Бизнес-логика логина — порт `backend/src/routes/auth.js` + `utils/auth.js`. |
|
|
|
Поддерживает оба режима: |
|
- Локальный (по умолчанию): bcrypt в `clinic_tests.users.password_hash`. |
|
- HR_AUTH=1: пароль проверяется в `hr_bot_test.users` (Werkzeug scrypt/pbkdf2), |
|
затем находим запись `staff_members` по `web_login` и UPSERT-им в |
|
`clinic_tests.users` по `staff_id`. |
|
|
|
Werkzeug-хеши проверяются через `werkzeug.security.check_password_hash`, |
|
bcrypt-хеши — через пакет `bcrypt`. |
|
""" |
|
from __future__ import annotations |
|
|
|
from dataclasses import dataclass |
|
from typing import Optional |
|
|
|
import bcrypt |
|
from sqlalchemy import text |
|
from werkzeug.security import check_password_hash as _werkzeug_check |
|
|
|
from ..config import ( |
|
HR_MANAGED_PASSWORD_PLACEHOLDER, |
|
is_hr_auth_enabled, |
|
) |
|
from ..db import get_engine, get_hr_engine |
|
from ..messages import RU |
|
from .hr_role import map_hr_role_to_app |
|
|
|
|
|
@dataclass
class AuthUser:
    """User record produced by the login and lookup routines of this module."""

    id: str  # UUID rendered as a string
    login: str
    full_name: str | None
    role: str
    department_id: Optional[str]
    staff_id: Optional[int]

    def to_public_dict(self) -> dict:
        """Return the user as a dict with camelCase keys.

        The ``id`` value is always coerced to ``str``; every other field is
        passed through unchanged.
        """
        return {
            'id': str(self.id),
            'login': self.login,
            'fullName': self.full_name,
            'role': self.role,
            'departmentId': self.department_id,
            'staffId': self.staff_id,
        }
|
|
|
|
|
class AuthError(Exception):
    """Authorization failure carrying an HTTP status code and a user-facing message."""

    def __init__(self, status: int, message: str) -> None:
        """Store ``status`` and ``message``; ``message`` is also the exception text."""
        super().__init__(message)
        self.status, self.message = status, message
|
|
|
|
|
def _verify_password(plain: str, hashed: str | None) -> bool: |
|
"""Универсальная проверка пароля: bcrypt + Werkzeug (scrypt/pbkdf2).""" |
|
if not hashed: |
|
return False |
|
if hashed == HR_MANAGED_PASSWORD_PLACEHOLDER: |
|
return False |
|
if hashed.startswith('scrypt:') or hashed.startswith('pbkdf2:'): |
|
try: |
|
return _werkzeug_check(hashed, plain) |
|
except Exception: |
|
return False |
|
if hashed.startswith('$2'): |
|
try: |
|
return bcrypt.checkpw(plain.encode('utf-8'), hashed.encode('utf-8')) |
|
except Exception: |
|
return False |
|
try: |
|
return _werkzeug_check(hashed, plain) |
|
except Exception: |
|
return False |
|
|
|
|
|
def authenticate_credentials(login: str, password: str) -> AuthUser:
    """Main entry point: return an ``AuthUser`` or raise ``AuthError``.

    ``login`` is stripped of surrounding whitespace; ``password`` is used as
    given. Raises ``AuthError(400)`` when either one is empty. Delegates to
    the HR flow when ``is_hr_auth_enabled()`` is true, otherwise to the
    local flow.
    """
    clean_login = login.strip() if login else ''
    raw_password = password if password else ''
    if not (clean_login and raw_password):
        raise AuthError(400, RU['loginAndPasswordRequired'])

    backend = _authenticate_via_hr if is_hr_auth_enabled() else _authenticate_local
    return backend(clean_login, raw_password)
|
|
|
|
|
# ─── local mode ─────────────────────────────────────────────────────
|
|
|
def _authenticate_local(login: str, password: str) -> AuthUser:
    """Authenticate against the local ``users`` table.

    Only rows with ``is_active = true`` and an exactly matching ``login`` are
    considered.

    Raises:
        AuthError: 401 when no such row exists, when the stored hash is the
            HR-managed placeholder (with the ``useHrLogin`` message), or when
            the password does not verify.
    """
    lookup = text(
        'SELECT id, login, password_hash, full_name, role, department_id, staff_id '
        'FROM users WHERE login = :login AND is_active = true'
    )
    with get_engine().connect() as conn:
        record = conn.execute(lookup, {'login': login}).mappings().first()

    if not record:
        raise AuthError(401, RU['invalidCredentials'])

    stored_hash = record['password_hash']
    # Accounts managed by HR carry a placeholder instead of a real hash;
    # they get a dedicated message rather than the generic one.
    if stored_hash == HR_MANAGED_PASSWORD_PLACEHOLDER:
        raise AuthError(401, RU['useHrLogin'])
    if not _verify_password(password, stored_hash):
        raise AuthError(401, RU['invalidCredentials'])

    return AuthUser(
        id=str(record['id']),
        login=record['login'],
        full_name=record['full_name'],
        role=record['role'],
        department_id=record['department_id'],
        staff_id=record['staff_id'],
    )
|
|
|
|
|
# ─── HR_AUTH=1 ────────────────────────────────────────────────────── |
|
|
|
def _authenticate_via_hr(login: str, password: str) -> AuthUser:
    """Authenticate against the HR database and mirror the user locally.

    Steps:
      1. Find the HR ``users`` row by username (trimmed, case-insensitive)
         and verify the password against its ``password_hash``.
      2. Find the ``staff_members`` row whose ``web_login`` matches the login
         (trimmed, case-insensitive).
      3. Upsert a row into the local ``users`` table keyed on ``staff_id``,
         storing ``HR_MANAGED_PASSWORD_PLACEHOLDER`` as the password hash and
         the role returned by ``map_hr_role_to_app``.

    Raises:
        AuthError: 500 when no HR engine is available, 401 on unknown user,
            missing hash or wrong password, 403 when no staff member matches
            the login.
    """
    hr_engine = get_hr_engine()
    if hr_engine is None:
        raise AuthError(500, RU['hrDatabaseUrlMissing'])

    hr_user_sql = text(
        'SELECT id, username, password_hash, role FROM users '
        'WHERE LOWER(TRIM(username)) = LOWER(TRIM(:login))'
    )
    staff_sql = text(
        'SELECT id, fio FROM staff_members '
        "WHERE LOWER(TRIM(COALESCE(web_login, ''))) = LOWER(TRIM(:login))"
    )

    with hr_engine.connect() as hr_conn:
        hr_user = hr_conn.execute(hr_user_sql, {'login': login}).mappings().first()
        if not hr_user or not hr_user['password_hash']:
            raise AuthError(401, RU['invalidCredentials'])
        if not _verify_password(password, hr_user['password_hash']):
            raise AuthError(401, RU['invalidCredentials'])

        staff = hr_conn.execute(staff_sql, {'login': login}).mappings().first()
        if not staff:
            raise AuthError(403, RU['noStaffForLogin'])

    # Fall back to the login itself when the staff record has no full name.
    upsert_params = {
        'login': login,
        'ph': HR_MANAGED_PASSWORD_PLACEHOLDER,
        'fn': staff['fio'] or login,
        'role': map_hr_role_to_app(hr_user['role']),
        'staff_id': int(staff['id']),
    }

    # NOTE(review): on conflict the upsert leaves is_active untouched and the
    # returned row is not checked for it, whereas _authenticate_local and
    # load_user_by_id only accept is_active = true rows — confirm that a
    # locally deactivated account is meant to pass here.
    upsert_sql = text(
        """
        INSERT INTO users (login, password_hash, full_name, role,
        department_id, is_active, staff_id)
        VALUES (:login, :ph, :fn, :role, NULL, true, :staff_id)
        ON CONFLICT (staff_id) DO UPDATE SET
        login = EXCLUDED.login,
        full_name = EXCLUDED.full_name,
        role = EXCLUDED.role,
        password_hash = EXCLUDED.password_hash
        RETURNING id, login, full_name, role, department_id, staff_id
        """
    )

    # begin() wraps the upsert in a transaction that commits on normal exit.
    with get_engine().begin() as conn:
        saved = conn.execute(upsert_sql, upsert_params).mappings().first()

    return AuthUser(
        id=str(saved['id']),
        login=saved['login'],
        full_name=saved['full_name'],
        role=saved['role'],
        department_id=saved['department_id'],
        staff_id=saved['staff_id'],
    )
|
|
|
|
|
def load_user_by_id(user_id: str) -> Optional[AuthUser]:
    """Load an active user from the local ``users`` table by id.

    Per the original note, this is used on every request. Returns ``None``
    when ``user_id`` is empty or when no row with ``is_active = true``
    matches it.
    """
    if not user_id:
        return None

    lookup = text(
        'SELECT id, login, full_name, role, department_id, staff_id '
        'FROM users WHERE id = :id AND is_active = true'
    )
    with get_engine().connect() as conn:
        record = conn.execute(lookup, {'id': user_id}).mappings().first()

    if not record:
        return None

    return AuthUser(
        id=str(record['id']),
        login=record['login'],
        full_name=record['full_name'],
        role=record['role'],
        department_id=record['department_id'],
        staff_id=record['staff_id'],
    )
|
|
|