You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

141 lines
4.4 KiB

"""Подключение к PostgreSQL — тот же паттерн, что в HR_TG_Bot/tgFlaskForm/db/session.py.
В Этапе 1 работаем с БД `clinic_tests` (схема не меняется). Опционально доступна
вторая БД `hr_bot_test` для HR-аутентификации (см. .env.example, флаг HR_AUTH).
"""
from __future__ import annotations
import os
import threading
from typing import Optional
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
_lock = threading.Lock()
_engine: Optional[Engine] = None
_session_local: Optional[sessionmaker] = None
_hr_engine: Optional[Engine] = None
_hr_session_local: Optional[sessionmaker] = None
def get_database_url() -> str:
"""URL основной БД (`clinic_tests`).
Приоритет: DATABASE_URL → отдельные DB_*-переменные.
"""
if db_url := os.environ.get('DATABASE_URL'):
return db_url.strip()
db_host = os.environ.get('DB_HOST', 'localhost')
db_port = os.environ.get('DB_PORT', '5432')
db_name = os.environ.get('DB_NAME', 'clinic_tests')
db_user = os.environ.get('DB_USER', 'hr_bot_user')
db_password = os.environ.get('DB_PASSWORD', 'hrbot123')
return f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}'
def get_hr_database_url() -> Optional[str]:
"""URL БД HR (`hr_bot_test`) — только если включён HR_AUTH."""
if not _hr_auth_enabled():
return None
if url := os.environ.get('HR_DATABASE_URL'):
return url.strip()
return None
def _hr_auth_enabled() -> bool:
val = (os.environ.get('HR_AUTH') or '').strip().lower()
return val in ('1', 'true', 'yes', 'on')
def get_engine() -> Engine:
"""Возвращает общий engine основной БД (singleton на процесс)."""
global _engine
if _engine is not None:
return _engine
with _lock:
if _engine is not None:
return _engine
_engine = create_engine(
get_database_url(),
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_pre_ping=True,
)
return _engine
def get_session():
"""Создаёт новую ORM-сессию поверх общего engine."""
global _session_local
if _session_local is None:
with _lock:
if _session_local is None:
_session_local = sessionmaker(bind=get_engine())
return _session_local()
def get_hr_engine() -> Optional[Engine]:
"""Engine для HR-БД. Возвращает None, если HR_AUTH не включён."""
if not _hr_auth_enabled():
return None
global _hr_engine
if _hr_engine is not None:
return _hr_engine
url = get_hr_database_url()
if not url:
return None
with _lock:
if _hr_engine is not None:
return _hr_engine
_hr_engine = create_engine(
url,
poolclass=QueuePool,
pool_size=3,
max_overflow=5,
pool_pre_ping=True,
)
return _hr_engine
def get_hr_session():
"""Сессия для HR-БД (или None при выключенном HR_AUTH)."""
eng = get_hr_engine()
if eng is None:
return None
global _hr_session_local
if _hr_session_local is None:
with _lock:
if _hr_session_local is None:
_hr_session_local = sessionmaker(bind=eng)
return _hr_session_local()
def ping() -> dict:
"""Smoke-проверка подключения к БД (используется в /health)."""
out: dict = {'main': 'unknown'}
try:
with get_engine().connect() as conn:
conn.exec_driver_sql('SELECT 1')
out['main'] = 'ok'
except Exception as e:
out['main'] = f'error: {type(e).__name__}: {e}'
if _hr_auth_enabled():
out['hr'] = 'unknown'
try:
eng = get_hr_engine()
if eng is None:
out['hr'] = 'disabled (HR_DATABASE_URL not set)'
else:
with eng.connect() as conn:
conn.exec_driver_sql('SELECT 1')
out['hr'] = 'ok'
except Exception as e:
out['hr'] = f'error: {type(e).__name__}: {e}'
return out