You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

309 lines
13 KiB

import os
import json
import tiktoken
from typing import List, Tuple, Optional
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
from llama_cpp import Llama
import torch
# Отключаем телеметрию Chroma в самом начале
os.environ["CHROMA_TELEMETRY"] = "false"
class MedicalRAG:
def __init__(
self,
model_path: str,
corpus_path: str = "rag_corpus.json",
db_path: str = "./chroma_db",
embedding_model_name: str = "cointegrated/rubert-tiny2",
top_k: int = 3,
n_ctx: int = 2048, # Уменьшено для слабых компьютеров
n_threads: int = None, # Автоматическое определение
token_multiplier: int = 3, # Уменьшено
n_gpu_layers: int = 0, # По умолчанию CPU для совместимости
use_gpu_for_embeddings: bool = False, # По умолчанию CPU
low_memory: bool = True # Режим низкой памяти
):
self.corpus_path = corpus_path
self.top_k = top_k
self.token_multiplier = token_multiplier
self.low_memory = low_memory
# === Автоматическое определение потоков ===
if n_threads is None:
import multiprocessing
n_threads = max(1, multiprocessing.cpu_count() - 1)
# === Проверка доступности GPU (с оптимизацией) ===
self.has_gpu = torch.cuda.is_available() and not low_memory
if self.has_gpu:
gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024 ** 3
print(f"✅ GPU доступен: {torch.cuda.get_device_name()} ({gpu_memory:.1f} GB)")
# Автоматическая настройка слоев GPU в зависимости от памяти
if gpu_memory < 4: # Маломощные GPU
n_gpu_layers = min(n_gpu_layers, 10)
elif gpu_memory < 8: # Средние GPU
n_gpu_layers = min(n_gpu_layers, 20)
else:
print(" Используется CPU режим")
n_gpu_layers = 0
use_gpu_for_embeddings = False
# === Оптимизированная инициализация токенизатора ===
print("Инициализация токенизатора...")
self.encoding = None
try:
self.encoding = tiktoken.get_encoding("cl100k_base")
except Exception as e:
print(f" Токенизатор не загружен: {e}")
# === Эмбеддинги с оптимизацией памяти ===
print("Загрузка эмбеддинг-модели...")
device = "cuda" if (self.has_gpu and use_gpu_for_embeddings) else "cpu"
# Параметры для экономии памяти
model_kwargs = {}
if low_memory:
model_kwargs = {
'device': device,
'model_kwargs': {'torch_dtype': torch.float16} # Половина точности
}
self.embedding_function = SentenceTransformerEmbeddingFunction(
model_name=embedding_model_name,
**model_kwargs
)
# === ChromaDB с оптимизацией ===
print("Инициализация ChromaDB...")
self.client = chromadb.PersistentClient(path=db_path)
# Упрощенные настройки для экономии памяти
collection_metadata = {"hnsw:space": "cosine"}
if low_memory:
collection_metadata.update({
"hnsw:construction_ef": 100, # Меньше использование памяти
"hnsw:M": 16, # Меньше связей в графе
})
self.collection = self.client.get_or_create_collection(
name="medical_anamnesis",
embedding_function=self.embedding_function,
metadata=collection_metadata
)
if self.collection.count() == 0:
print("Загрузка данных в коллекцию...")
self._load_corpus()
else:
print(f"Коллекция содержит {self.collection.count()} записей")
# === Оптимизированная загрузка LLM ===
if not os.path.exists(model_path):
raise FileNotFoundError(
f"Модель не найдена: {model_path}\n"
"Для слабых компьютеров рекомендуется использовать меньшие модели."
)
print("Загрузка языковой модели...")
# Базовые параметры для всех устройств
llm_params = {
"model_path": model_path,
"n_ctx": n_ctx,
"n_threads": n_threads,
"verbose": False,
"low_vram": low_memory, # Всегда включаем для совместимости
"use_mlock": not low_memory, # Блокировка памяти только если достаточно RAM
}
# Параметры только для GPU
if self.has_gpu and n_gpu_layers > 0:
llm_params.update({
"n_gpu_layers": n_gpu_layers,
"main_gpu": 0,
"tensor_split": None, # Упрощаем для совместимости
})
print(f"Используется GPU с {n_gpu_layers} слоями")
else:
print("Используется CPU")
try:
self.llm = Llama(**llm_params)
print("✅ Система готова к работе!")
except Exception as e:
print(f"❌ Ошибка загрузки модели: {e}")
raise
def _load_corpus(self):
"""Загрузка корпуса с обработкой ошибок"""
try:
with open(self.corpus_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Пакетная обработка для больших корпусов
batch_size = 50 if self.low_memory else 100
for i in range(0, len(data), batch_size):
batch = data[i:i + batch_size]
self.collection.add(
documents=[item["full"] for item in batch],
metadatas=[{"short": item["short"]} for item in batch],
ids=[f"id_{i + j}" for j in range(len(batch))]
)
print(f"Загружено {min(i + batch_size, len(data))}/{len(data)} записей")
except Exception as e:
print(f"❌ Ошибка загрузки корпуса: {e}")
raise
def count_tokens(self, text: str) -> int:
"""Оптимизированный подсчет токенов"""
if self.encoding:
return len(self.encoding.encode(text))
else:
# Упрощенный подсчет для совместимости
return len(text.split()) # Приблизительно по словам
def build_prompt_with_token_management(self, short_note: str, max_context_tokens: int = 1500) -> Tuple[str, int]:
"""Строит промпт с оптимизированным управлением токенами"""
examples = self.retrieve(short_note)
system_msg = (
"На основе примеров напиши развёрнуто жалобы пациента, грамотно с медицинской точки зрения. "
"Напиши жалобы в одно предложение, одной строкой. "
"Не пиши вводных слов и фраз. Только жалобы пациента. "
"Неуместно писать диагнозы и план лечения. "
"Расшифруй все сокращения. "
"Отвечай сразу без размышлений."
)
system_tokens = self.count_tokens(system_msg)
note_tokens = self.count_tokens(short_note)
# Более консервативный расчет доступных токенов
available_tokens = max_context_tokens - system_tokens - note_tokens - 150
selected_examples = []
current_tokens = 0
for example in examples:
example_tokens = self.count_tokens(example)
if current_tokens + example_tokens <= available_tokens:
selected_examples.append(example)
current_tokens += example_tokens
else:
if self.low_memory:
break # Быстрый выход в режиме низкой памяти
elif len(selected_examples) > 0:
break # Сохраняем хотя бы один пример
# Упрощенный контекст
context = "\n".join([f"Пример: {ex}" for ex in selected_examples])
user_msg = f"""Примеры:
{context}
Жалобы: "{short_note}"
"""
import pprint
pprint.pprint(system_msg + " " + user_msg)
prompt = (
f"<|im_start|>system\n{system_msg}<|im_end|>\n"
f"<|im_start|>user\n{user_msg}<|im_end|>\n"
"<|im_start|>assistant\n"
)
prompt_tokens = self.count_tokens(prompt)
return prompt, prompt_tokens
def retrieve(self, query: str, n: int = None) -> List[str]:
"""Оптимизированный поиск"""
n = n or min(self.top_k, 3) # Ограничиваем для слабых компьютеров
try:
results = self.collection.query(
query_texts=[query],
n_results=n
)
return results["documents"][0]
except Exception as e:
print(f" Ошибка поиска: {e}")
return []
def generate(self, short_note: str) -> str:
"""Генерация с оптимизацией памяти"""
prompt, prompt_tokens = self.build_prompt_with_token_management(short_note)
# Более консервативный расчет максимальных токенов
available_tokens = 2048 - prompt_tokens - 30 # Запас уменьшен
max_tokens = min(prompt_tokens * self.token_multiplier, available_tokens, 512) # Жесткий лимит
print(f"📊 Токены: промпт={prompt_tokens}, ответ={max_tokens}")
print(f" Устройство: {'GPU' if self.has_gpu else 'CPU'}")
try:
output = self.llm(
prompt,
max_tokens=max_tokens,
temperature=0.1,
stop=["<|im_end|>"],
echo=False,
stream=False # Отключаем streaming для стабильности
)
result = output["choices"][0]["text"].strip()
return result
except Exception as e:
print(f"❌ Ошибка генерации: {e}")
return ""
def __call__(self, short_note: str) -> str:
return self.generate(short_note)
# === Упрощенный запуск ===
if __name__ == "__main__":
import time
# Автоматическое определение режима низкой памяти
import psutil
total_memory = psutil.virtual_memory().total / 1024 ** 3
low_memory_mode = total_memory < 8 # Меньше 8GB RAM
print(f"💾 Общая память: {total_memory:.1f} GB")
print(f"🔧 Режим низкой памяти: {'Да' if low_memory_mode else 'Нет'}")
rag = MedicalRAG(
model_path="./models/YandexGPT-5-Lite-8B-instruct-Q4_K_M.gguf",
n_ctx=2048, # Уменьшенный контекст
n_gpu_layers=10 if not low_memory_mode else 0, # Адаптивное количество слоев
use_gpu_for_embeddings=not low_memory_mode,
low_memory=low_memory_mode
)
# Промты для тестирования
test_notes = [
"Кашель сухой, температура 38",
"А.д. 140, заложенность ушей",
"а.д. 140/80, т.36.6",
"снижение слуха 2 года"
]
for note in test_notes:
print(f"\n📥 Кратко: {note}")
start_time = time.time()
result = rag(note)
elapsed_time = time.time() - start_time
print(f"⏱ Время: {elapsed_time:.2f} сек")
if result:
print(f"📤 Развёрнуто: {result}")
else:
print("❌ Пустой ответ")
print("" * 50)