Версия цифровой рецепции с резализованным механизмом отслеживания трека пациента по зонам
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

200 lines
6.3 KiB

"""Прямой доступ к pgvector через psycopg2.
Расширение time-tracker/apps/face-service/database.py для домена «Цифровой
рецепции»: новая схема (patient_id, track_id, camera_id, quality, captured_at),
cross-camera re-id (top-K в окне T минут с фильтром по камерам),
поиск пациента (только эмбеддинги с patient_id IS NOT NULL).
"""
import os
import uuid
from datetime import datetime, timedelta
from typing import Any
import psycopg2
import psycopg2.extras
from pgvector.psycopg2 import register_vector
from dotenv import load_dotenv
load_dotenv()
DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql://postgres:postgres@localhost:5434/reception",
)
def get_connection():
conn = psycopg2.connect(DATABASE_URL)
register_vector(conn)
return conn
# ---------- WRITE ----------
def save_embedding_with_meta(
embedding,
track_id: str,
camera_id: str,
quality: float,
captured_at: datetime | None = None,
patient_id: str | None = None,
) -> str:
"""Сохраняет эмбеддинг с привязкой к треку/камере/пациенту.
Возвращает id записи. captured_at по умолчанию — now().
"""
record_id = str(uuid.uuid4())
captured_at = captured_at or datetime.utcnow()
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO face_embeddings
(id, embedding, track_id, camera_id, quality, patient_id, captured_at, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())
""",
(record_id, embedding, track_id, camera_id, quality, patient_id, captured_at),
)
conn.commit()
finally:
conn.close()
return record_id
def attach_track_to_patient(track_id: str, patient_id: str) -> int:
"""Привязывает все эмбеддинги трека к пациенту. Возвращает кол-во затронутых строк."""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"UPDATE face_embeddings SET patient_id = %s WHERE track_id = %s",
(patient_id, track_id),
)
affected = cur.rowcount
conn.commit()
finally:
conn.close()
return affected
# ---------- READ ----------
def find_topk_in_window(
embedding,
camera_id: str,
window_minutes: int = 5,
k: int = 5,
exclude_same_camera: bool = True,
) -> list[dict[str, Any]]:
"""Top-K ближайших эмбеддингов с других камер в окне последних window_minutes минут.
Используется для cross-camera re-id: на новой камере появился человек,
ищем тот же эмбеддинг с другой камеры в недавнем прошлом → склейка треков.
"""
since = datetime.utcnow() - timedelta(minutes=window_minutes)
conn = get_connection()
try:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
sql = """
SELECT fe.track_id, fe.camera_id, fe.captured_at,
(fe.embedding <=> %s::vector) AS distance
FROM face_embeddings fe
WHERE fe.captured_at >= %s
AND fe.track_id IS NOT NULL
"""
params: list[Any] = [embedding, since]
if exclude_same_camera:
sql += " AND fe.camera_id <> %s"
params.append(camera_id)
sql += " ORDER BY distance ASC LIMIT %s"
params.append(k)
cur.execute(sql, params)
rows = cur.fetchall()
finally:
conn.close()
return [
{
"track_id": str(r["track_id"]),
"camera_id": str(r["camera_id"]),
"captured_at": r["captured_at"].isoformat(),
"distance": float(r["distance"]),
}
for r in rows
]
def find_nearest_patient(embedding, threshold: float) -> dict[str, Any] | None:
"""Узнать пациента по лицу: ищет ближайший эмбеддинг среди записей,
у которых patient_id IS NOT NULL (т.е. дано согласие).
Возвращает {patient_id, confidence, distance} или None.
"""
conn = get_connection()
try:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
"""
SELECT fe.patient_id, (fe.embedding <=> %s::vector) AS distance
FROM face_embeddings fe
WHERE fe.patient_id IS NOT NULL
ORDER BY distance ASC
LIMIT 1
""",
(embedding,),
)
row = cur.fetchone()
finally:
conn.close()
if row is None:
return None
distance = float(row["distance"])
if distance > threshold:
return None
confidence = round(max(0.0, 1.0 - (distance / threshold)), 3)
return {
"patient_id": str(row["patient_id"]),
"confidence": confidence,
"distance": distance,
}
# ---------- DELETE ----------
def delete_patient_embeddings(patient_id: str) -> int:
"""Удаляет все эмбеддинги пациента. Возвращает кол-во удалённых.
Вызывается при отзыве согласия (через 24 ч).
"""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM face_embeddings WHERE patient_id = %s",
(patient_id,),
)
deleted = cur.rowcount
conn.commit()
finally:
conn.close()
return deleted
def count_patient_embeddings(patient_id: str) -> int:
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT COUNT(*) FROM face_embeddings WHERE patient_id = %s",
(patient_id,),
)
return cur.fetchone()[0]
finally:
conn.close()