You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

217 lines
7.3 KiB

"""Бизнес-логика логина — порт `backend/src/routes/auth.js` + `utils/auth.js`.
Поддерживает оба режима:
- Локальный (по умолчанию): bcrypt в `clinic_tests.users.password_hash`.
- HR_AUTH=1: пароль проверяется в `hr_bot_test.users` (Werkzeug scrypt/pbkdf2),
затем находим запись `staff_members` по `web_login` и UPSERT-им в
`clinic_tests.users` по `staff_id`.
Werkzeug-хеши проверяются через `werkzeug.security.check_password_hash`,
bcrypt-хеши — через пакет `bcrypt`.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import bcrypt
from sqlalchemy import text
from werkzeug.security import check_password_hash as _werkzeug_check
from ..config import (
HR_MANAGED_PASSWORD_PLACEHOLDER,
is_hr_auth_enabled,
)
from ..db import get_engine, get_hr_engine
from ..messages import RU
from .hr_role import map_hr_role_to_app
@dataclass
class AuthUser:
    """Authenticated user record handed to the rest of the application."""

    id: str  # UUID rendered as a string
    login: str
    full_name: str | None
    role: str
    department_id: Optional[str]
    staff_id: Optional[int]

    def to_public_dict(self) -> dict:
        """Return the user as a dict with camelCase keys.

        ``id`` is always coerced to ``str``; every other field is passed
        through unchanged.
        """
        return {
            'id': str(self.id),
            'login': self.login,
            'fullName': self.full_name,
            'role': self.role,
            'departmentId': self.department_id,
            'staffId': self.staff_id,
        }
class AuthError(Exception):
    """Authentication failure carrying an HTTP status and a user-facing message."""

    def __init__(self, status: int, message: str) -> None:
        """Keep ``status`` and ``message`` on the instance.

        ``message`` is also forwarded to ``Exception`` so that ``str(err)``
        and ``err.args`` reflect it.
        """
        self.status = status
        self.message = message
        super().__init__(message)
def _verify_password(plain: str, hashed: str | None) -> bool:
"""Универсальная проверка пароля: bcrypt + Werkzeug (scrypt/pbkdf2)."""
if not hashed:
return False
if hashed == HR_MANAGED_PASSWORD_PLACEHOLDER:
return False
if hashed.startswith('scrypt:') or hashed.startswith('pbkdf2:'):
try:
return _werkzeug_check(hashed, plain)
except Exception:
return False
if hashed.startswith('$2'):
try:
return bcrypt.checkpw(plain.encode('utf-8'), hashed.encode('utf-8'))
except Exception:
return False
try:
return _werkzeug_check(hashed, plain)
except Exception:
return False
def authenticate_credentials(login: str, password: str) -> AuthUser:
    """Authenticate a login/password pair; main entry point of this module.

    The login is stripped of surrounding whitespace and ``None`` values are
    treated as empty strings. Depending on ``is_hr_auth_enabled()`` the check
    is delegated to the HR-backed or the local authenticator.

    Raises:
        AuthError: with status 400 when the login or the password is empty;
            otherwise whatever the selected authenticator raises.
    """
    clean_login = (login or '').strip()
    secret = password or ''
    if not (clean_login and secret):
        raise AuthError(400, RU['loginAndPasswordRequired'])

    authenticate = _authenticate_via_hr if is_hr_auth_enabled() else _authenticate_local
    return authenticate(clean_login, secret)
# ─── локальный режим ────────────────────────────────────────────────
def _authenticate_local(login: str, password: str) -> AuthUser:
    """Authenticate against the application's own ``users`` table.

    Only rows with ``is_active = true`` and an exactly matching ``login`` are
    considered.

    Raises:
        AuthError: 401 when no active row matches, when the stored hash is the
            HR-managed placeholder (message ``useHrLogin``), or when the
            password does not verify.
    """
    query = text(
        'SELECT id, login, password_hash, full_name, role, department_id, staff_id '
        'FROM users WHERE login = :login AND is_active = true'
    )
    with get_engine().connect() as conn:
        record = conn.execute(query, {'login': login}).mappings().first()

    if not record:
        raise AuthError(401, RU['invalidCredentials'])

    stored_hash = record['password_hash']
    # Accounts mirrored from HR carry a placeholder instead of a real hash;
    # they get a dedicated message rather than the generic one.
    if stored_hash == HR_MANAGED_PASSWORD_PLACEHOLDER:
        raise AuthError(401, RU['useHrLogin'])
    if not _verify_password(password, stored_hash):
        raise AuthError(401, RU['invalidCredentials'])

    return AuthUser(
        id=str(record['id']),
        login=record['login'],
        full_name=record['full_name'],
        role=record['role'],
        department_id=record['department_id'],
        staff_id=record['staff_id'],
    )
# ─── HR_AUTH=1 ──────────────────────────────────────────────────────
def _authenticate_via_hr(login: str, password: str) -> AuthUser:
    """Authenticate against the HR database and mirror the user locally.

    Steps:
      1. Find the HR ``users`` row whose trimmed, lower-cased ``username``
         matches ``login`` and verify ``password`` against its hash.
      2. Find the ``staff_members`` row whose ``web_login`` matches ``login``
         in the same case-insensitive way.
      3. Upsert a row into the application's ``users`` table keyed by
         ``staff_id``. The HR-managed placeholder is stored instead of a
         password hash. On conflict ``login``, ``full_name``, ``role`` and
         ``password_hash`` are overwritten; ``department_id`` and
         ``is_active`` of an existing row are left untouched.

    Raises:
        AuthError: 500 when no HR engine is configured; 401 when the HR user
            is missing, has no hash, the password does not verify, or the
            mirrored local account is deactivated; 403 when no staff member
            is linked to the login.
    """
    hr_eng = get_hr_engine()
    if hr_eng is None:
        raise AuthError(500, RU['hrDatabaseUrlMissing'])

    with hr_eng.connect() as hr_conn:
        u = hr_conn.execute(
            text(
                'SELECT id, username, password_hash, role FROM users '
                'WHERE LOWER(TRIM(username)) = LOWER(TRIM(:login))'
            ),
            {'login': login},
        ).mappings().first()
        if not u or not u['password_hash']:
            raise AuthError(401, RU['invalidCredentials'])
        if not _verify_password(password, u['password_hash']):
            raise AuthError(401, RU['invalidCredentials'])

        s = hr_conn.execute(
            text(
                'SELECT id, fio FROM staff_members '
                "WHERE LOWER(TRIM(COALESCE(web_login, ''))) = LOWER(TRIM(:login))"
            ),
            {'login': login},
        ).mappings().first()
        if not s:
            raise AuthError(403, RU['noStaffForLogin'])

        # Pull plain values out while the HR connection is still open.
        staff_id = int(s['id'])
        fio = s['fio'] or login  # fall back to the login when no name is stored
        app_role = map_hr_role_to_app(u['role'])

    eng = get_engine()
    with eng.begin() as conn:
        row = conn.execute(
            text(
                """
                INSERT INTO users (login, password_hash, full_name, role,
                                   department_id, is_active, staff_id)
                VALUES (:login, :ph, :fn, :role, NULL, true, :staff_id)
                ON CONFLICT (staff_id) DO UPDATE SET
                    login = EXCLUDED.login,
                    full_name = EXCLUDED.full_name,
                    role = EXCLUDED.role,
                    password_hash = EXCLUDED.password_hash
                RETURNING id, login, full_name, role, department_id, staff_id,
                          is_active
                """
            ),
            {
                'login': login,
                'ph': HR_MANAGED_PASSWORD_PLACEHOLDER,
                'fn': fio,
                'role': app_role,
                'staff_id': staff_id,
            },
        ).mappings().first()

        # The conflict branch does not touch is_active, so an existing row may
        # be deactivated. The local login path and load_user_by_id both require
        # is_active = true; reject here as well instead of handing out a user
        # that every later lookup would refuse. Raising inside the transaction
        # rolls the upsert back, so a rejected login changes nothing.
        if not row['is_active']:
            raise AuthError(401, RU['invalidCredentials'])

    return AuthUser(
        id=str(row['id']),
        login=row['login'],
        full_name=row['full_name'],
        role=row['role'],
        department_id=row['department_id'],
        staff_id=row['staff_id'],
    )
def load_user_by_id(user_id: str) -> Optional[AuthUser]:
    """Load an active user from `clinic_tests.users` by id (used on every request).

    Returns ``None`` when ``user_id`` is falsy or when no row with
    ``is_active = true`` has that id.
    """
    if not user_id:
        return None

    lookup = text(
        'SELECT id, login, full_name, role, department_id, staff_id '
        'FROM users WHERE id = :id AND is_active = true'
    )
    with get_engine().connect() as conn:
        found = conn.execute(lookup, {'id': user_id}).mappings().first()

    if not found:
        return None

    return AuthUser(
        id=str(found['id']),
        login=found['login'],
        full_name=found['full_name'],
        role=found['role'],
        department_id=found['department_id'],
        staff_id=found['staff_id'],
    )