"""Прямой доступ к pgvector через psycopg2. Расширение time-tracker/apps/face-service/database.py для домена «Цифровой рецепции»: новая схема (patient_id, track_id, camera_id, quality, captured_at), cross-camera re-id (top-K в окне T минут с фильтром по камерам), поиск пациента (только эмбеддинги с patient_id IS NOT NULL). """ import os import uuid from datetime import datetime, timedelta from typing import Any import psycopg2 import psycopg2.extras from pgvector.psycopg2 import register_vector from dotenv import load_dotenv load_dotenv() DATABASE_URL = os.getenv( "DATABASE_URL", "postgresql://postgres:postgres@localhost:5434/reception", ) def get_connection(): conn = psycopg2.connect(DATABASE_URL) register_vector(conn) return conn # ---------- WRITE ---------- def save_embedding_with_meta( embedding, track_id: str, camera_id: str, quality: float, captured_at: datetime | None = None, patient_id: str | None = None, ) -> str: """Сохраняет эмбеддинг с привязкой к треку/камере/пациенту. Возвращает id записи. captured_at по умолчанию — now(). """ record_id = str(uuid.uuid4()) captured_at = captured_at or datetime.utcnow() conn = get_connection() try: with conn.cursor() as cur: cur.execute( """ INSERT INTO face_embeddings (id, embedding, track_id, camera_id, quality, patient_id, captured_at, created_at) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW()) """, (record_id, embedding, track_id, camera_id, quality, patient_id, captured_at), ) conn.commit() finally: conn.close() return record_id def attach_track_to_patient(track_id: str, patient_id: str) -> int: """Привязывает все эмбеддинги трека к пациенту. Возвращает кол-во затронутых строк.""" conn = get_connection() try: with conn.cursor() as cur: cur.execute( "UPDATE face_embeddings SET patient_id = %s WHERE track_id = %s", (patient_id, track_id), ) affected = cur.rowcount conn.commit() finally: conn.close() return affected # ---------- READ ---------- def find_topk_in_window( embedding, camera_id: str, window_minutes: int = 5, k: int = 5, exclude_same_camera: bool = True, ) -> list[dict[str, Any]]: """Top-K ближайших эмбеддингов с других камер в окне последних window_minutes минут. Используется для cross-camera re-id: на новой камере появился человек, ищем тот же эмбеддинг с другой камеры в недавнем прошлом → склейка треков. """ since = datetime.utcnow() - timedelta(minutes=window_minutes) conn = get_connection() try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: sql = """ SELECT fe.track_id, fe.camera_id, fe.captured_at, (fe.embedding <=> %s::vector) AS distance FROM face_embeddings fe WHERE fe.captured_at >= %s AND fe.track_id IS NOT NULL """ params: list[Any] = [embedding, since] if exclude_same_camera: sql += " AND fe.camera_id <> %s" params.append(camera_id) sql += " ORDER BY distance ASC LIMIT %s" params.append(k) cur.execute(sql, params) rows = cur.fetchall() finally: conn.close() return [ { "track_id": str(r["track_id"]), "camera_id": str(r["camera_id"]), "captured_at": r["captured_at"].isoformat(), "distance": float(r["distance"]), } for r in rows ] def find_nearest_patient(embedding, threshold: float) -> dict[str, Any] | None: """Узнать пациента по лицу: ищет ближайший эмбеддинг среди записей, у которых patient_id IS NOT NULL (т.е. дано согласие). Возвращает {patient_id, confidence, distance} или None. """ conn = get_connection() try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( """ SELECT fe.patient_id, (fe.embedding <=> %s::vector) AS distance FROM face_embeddings fe WHERE fe.patient_id IS NOT NULL ORDER BY distance ASC LIMIT 1 """, (embedding,), ) row = cur.fetchone() finally: conn.close() if row is None: return None distance = float(row["distance"]) if distance > threshold: return None confidence = round(max(0.0, 1.0 - (distance / threshold)), 3) return { "patient_id": str(row["patient_id"]), "confidence": confidence, "distance": distance, } # ---------- DELETE ---------- def delete_patient_embeddings(patient_id: str) -> int: """Удаляет все эмбеддинги пациента. Возвращает кол-во удалённых. Вызывается при отзыве согласия (через 24 ч). """ conn = get_connection() try: with conn.cursor() as cur: cur.execute( "DELETE FROM face_embeddings WHERE patient_id = %s", (patient_id,), ) deleted = cur.rowcount conn.commit() finally: conn.close() return deleted def count_patient_embeddings(patient_id: str) -> int: conn = get_connection() try: with conn.cursor() as cur: cur.execute( "SELECT COUNT(*) FROM face_embeddings WHERE patient_id = %s", (patient_id,), ) return cur.fetchone()[0] finally: conn.close()