|
|
import os |
|
|
import json |
|
|
import tiktoken |
|
|
from typing import List, Tuple, Optional |
|
|
import chromadb |
|
|
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction |
|
|
from llama_cpp import Llama |
|
|
import torch |
|
|
|
|
|
# Отключаем телеметрию Chroma в самом начале |
|
|
os.environ["CHROMA_TELEMETRY"] = "false" |
|
|
|
|
|
|
|
|
class MedicalRAG: |
|
|
def __init__( |
|
|
self, |
|
|
model_path: str, |
|
|
corpus_path: str = "rag_corpus.json", |
|
|
db_path: str = "./chroma_db", |
|
|
embedding_model_name: str = "cointegrated/rubert-tiny2", |
|
|
top_k: int = 3, |
|
|
n_ctx: int = 2048, # Уменьшено для слабых компьютеров |
|
|
n_threads: int = None, # Автоматическое определение |
|
|
token_multiplier: int = 3, # Уменьшено |
|
|
n_gpu_layers: int = 0, # По умолчанию CPU для совместимости |
|
|
use_gpu_for_embeddings: bool = False, # По умолчанию CPU |
|
|
low_memory: bool = True # Режим низкой памяти |
|
|
): |
|
|
self.corpus_path = corpus_path |
|
|
self.top_k = top_k |
|
|
self.token_multiplier = token_multiplier |
|
|
self.low_memory = low_memory |
|
|
|
|
|
# === Автоматическое определение потоков === |
|
|
if n_threads is None: |
|
|
import multiprocessing |
|
|
n_threads = max(1, multiprocessing.cpu_count() - 1) |
|
|
|
|
|
# === Проверка доступности GPU (с оптимизацией) === |
|
|
self.has_gpu = torch.cuda.is_available() and not low_memory |
|
|
if self.has_gpu: |
|
|
gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024 ** 3 |
|
|
print(f"✅ GPU доступен: {torch.cuda.get_device_name()} ({gpu_memory:.1f} GB)") |
|
|
|
|
|
# Автоматическая настройка слоев GPU в зависимости от памяти |
|
|
if gpu_memory < 4: # Маломощные GPU |
|
|
n_gpu_layers = min(n_gpu_layers, 10) |
|
|
elif gpu_memory < 8: # Средние GPU |
|
|
n_gpu_layers = min(n_gpu_layers, 20) |
|
|
else: |
|
|
print("ℹ️ Используется CPU режим") |
|
|
n_gpu_layers = 0 |
|
|
use_gpu_for_embeddings = False |
|
|
|
|
|
# === Оптимизированная инициализация токенизатора === |
|
|
print("Инициализация токенизатора...") |
|
|
self.encoding = None |
|
|
try: |
|
|
self.encoding = tiktoken.get_encoding("cl100k_base") |
|
|
except Exception as e: |
|
|
print(f"⚠️ Токенизатор не загружен: {e}") |
|
|
|
|
|
# === Эмбеддинги с оптимизацией памяти === |
|
|
print("Загрузка эмбеддинг-модели...") |
|
|
device = "cuda" if (self.has_gpu and use_gpu_for_embeddings) else "cpu" |
|
|
|
|
|
# Параметры для экономии памяти |
|
|
model_kwargs = {} |
|
|
if low_memory: |
|
|
model_kwargs = { |
|
|
'device': device, |
|
|
'model_kwargs': {'torch_dtype': torch.float16} # Половина точности |
|
|
} |
|
|
|
|
|
self.embedding_function = SentenceTransformerEmbeddingFunction( |
|
|
model_name=embedding_model_name, |
|
|
**model_kwargs |
|
|
) |
|
|
|
|
|
# === ChromaDB с оптимизацией === |
|
|
print("Инициализация ChromaDB...") |
|
|
self.client = chromadb.PersistentClient(path=db_path) |
|
|
|
|
|
# Упрощенные настройки для экономии памяти |
|
|
collection_metadata = {"hnsw:space": "cosine"} |
|
|
if low_memory: |
|
|
collection_metadata.update({ |
|
|
"hnsw:construction_ef": 100, # Меньше использование памяти |
|
|
"hnsw:M": 16, # Меньше связей в графе |
|
|
}) |
|
|
|
|
|
self.collection = self.client.get_or_create_collection( |
|
|
name="medical_anamnesis", |
|
|
embedding_function=self.embedding_function, |
|
|
metadata=collection_metadata |
|
|
) |
|
|
|
|
|
if self.collection.count() == 0: |
|
|
print("Загрузка данных в коллекцию...") |
|
|
self._load_corpus() |
|
|
else: |
|
|
print(f"Коллекция содержит {self.collection.count()} записей") |
|
|
|
|
|
# === Оптимизированная загрузка LLM === |
|
|
if not os.path.exists(model_path): |
|
|
raise FileNotFoundError( |
|
|
f"Модель не найдена: {model_path}\n" |
|
|
"Для слабых компьютеров рекомендуется использовать меньшие модели." |
|
|
) |
|
|
|
|
|
print("Загрузка языковой модели...") |
|
|
|
|
|
# Базовые параметры для всех устройств |
|
|
llm_params = { |
|
|
"model_path": model_path, |
|
|
"n_ctx": n_ctx, |
|
|
"n_threads": n_threads, |
|
|
"verbose": False, |
|
|
"low_vram": low_memory, # Всегда включаем для совместимости |
|
|
"use_mlock": not low_memory, # Блокировка памяти только если достаточно RAM |
|
|
} |
|
|
|
|
|
# Параметры только для GPU |
|
|
if self.has_gpu and n_gpu_layers > 0: |
|
|
llm_params.update({ |
|
|
"n_gpu_layers": n_gpu_layers, |
|
|
"main_gpu": 0, |
|
|
"tensor_split": None, # Упрощаем для совместимости |
|
|
}) |
|
|
print(f"Используется GPU с {n_gpu_layers} слоями") |
|
|
else: |
|
|
print("Используется CPU") |
|
|
|
|
|
try: |
|
|
self.llm = Llama(**llm_params) |
|
|
print("✅ Система готова к работе!") |
|
|
except Exception as e: |
|
|
print(f"❌ Ошибка загрузки модели: {e}") |
|
|
raise |
|
|
|
|
|
def _load_corpus(self): |
|
|
"""Загрузка корпуса с обработкой ошибок""" |
|
|
try: |
|
|
with open(self.corpus_path, "r", encoding="utf-8") as f: |
|
|
data = json.load(f) |
|
|
|
|
|
# Пакетная обработка для больших корпусов |
|
|
batch_size = 50 if self.low_memory else 100 |
|
|
for i in range(0, len(data), batch_size): |
|
|
batch = data[i:i + batch_size] |
|
|
self.collection.add( |
|
|
documents=[item["full"] for item in batch], |
|
|
metadatas=[{"short": item["short"]} for item in batch], |
|
|
ids=[f"id_{i + j}" for j in range(len(batch))] |
|
|
) |
|
|
print(f"Загружено {min(i + batch_size, len(data))}/{len(data)} записей") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Ошибка загрузки корпуса: {e}") |
|
|
raise |
|
|
|
|
|
def count_tokens(self, text: str) -> int: |
|
|
"""Оптимизированный подсчет токенов""" |
|
|
if self.encoding: |
|
|
return len(self.encoding.encode(text)) |
|
|
else: |
|
|
# Упрощенный подсчет для совместимости |
|
|
return len(text.split()) # Приблизительно по словам |
|
|
|
|
|
def build_prompt_with_token_management(self, short_note: str, max_context_tokens: int = 1500) -> Tuple[str, int]: |
|
|
"""Строит промпт с оптимизированным управлением токенами""" |
|
|
|
|
|
examples = self.retrieve(short_note) |
|
|
|
|
|
system_msg = ( |
|
|
"На основе примеров напиши развёрнуто жалобы пациента, грамотно с медицинской точки зрения. " |
|
|
"Напиши жалобы в одно предложение, одной строкой. " |
|
|
"Не пиши вводных слов и фраз. Только жалобы пациента. " |
|
|
"Неуместно писать диагнозы и план лечения. " |
|
|
"Расшифруй все сокращения. " |
|
|
"Отвечай сразу без размышлений." |
|
|
) |
|
|
|
|
|
system_tokens = self.count_tokens(system_msg) |
|
|
note_tokens = self.count_tokens(short_note) |
|
|
|
|
|
# Более консервативный расчет доступных токенов |
|
|
available_tokens = max_context_tokens - system_tokens - note_tokens - 150 |
|
|
|
|
|
selected_examples = [] |
|
|
current_tokens = 0 |
|
|
|
|
|
for example in examples: |
|
|
example_tokens = self.count_tokens(example) |
|
|
if current_tokens + example_tokens <= available_tokens: |
|
|
selected_examples.append(example) |
|
|
current_tokens += example_tokens |
|
|
else: |
|
|
if self.low_memory: |
|
|
break # Быстрый выход в режиме низкой памяти |
|
|
elif len(selected_examples) > 0: |
|
|
break # Сохраняем хотя бы один пример |
|
|
|
|
|
# Упрощенный контекст |
|
|
context = "\n".join([f"Пример: {ex}" for ex in selected_examples]) |
|
|
|
|
|
user_msg = f"""Примеры: |
|
|
{context} |
|
|
|
|
|
Жалобы: "{short_note}" |
|
|
""" |
|
|
|
|
|
import pprint |
|
|
pprint.pprint(system_msg + " " + user_msg) |
|
|
|
|
|
prompt = ( |
|
|
f"<|im_start|>system\n{system_msg}<|im_end|>\n" |
|
|
f"<|im_start|>user\n{user_msg}<|im_end|>\n" |
|
|
"<|im_start|>assistant\n" |
|
|
) |
|
|
|
|
|
prompt_tokens = self.count_tokens(prompt) |
|
|
return prompt, prompt_tokens |
|
|
|
|
|
def retrieve(self, query: str, n: int = None) -> List[str]: |
|
|
"""Оптимизированный поиск""" |
|
|
n = n or min(self.top_k, 3) # Ограничиваем для слабых компьютеров |
|
|
try: |
|
|
results = self.collection.query( |
|
|
query_texts=[query], |
|
|
n_results=n |
|
|
) |
|
|
return results["documents"][0] |
|
|
except Exception as e: |
|
|
print(f"⚠️ Ошибка поиска: {e}") |
|
|
return [] |
|
|
|
|
|
def generate(self, short_note: str) -> str: |
|
|
"""Генерация с оптимизацией памяти""" |
|
|
prompt, prompt_tokens = self.build_prompt_with_token_management(short_note) |
|
|
|
|
|
# Более консервативный расчет максимальных токенов |
|
|
available_tokens = 2048 - prompt_tokens - 30 # Запас уменьшен |
|
|
max_tokens = min(prompt_tokens * self.token_multiplier, available_tokens, 512) # Жесткий лимит |
|
|
|
|
|
print(f"📊 Токены: промпт={prompt_tokens}, ответ={max_tokens}") |
|
|
print(f"⚡️ Устройство: {'GPU' if self.has_gpu else 'CPU'}") |
|
|
|
|
|
try: |
|
|
output = self.llm( |
|
|
prompt, |
|
|
max_tokens=max_tokens, |
|
|
temperature=0.1, |
|
|
stop=["<|im_end|>"], |
|
|
echo=False, |
|
|
stream=False # Отключаем streaming для стабильности |
|
|
) |
|
|
|
|
|
result = output["choices"][0]["text"].strip() |
|
|
return result |
|
|
except Exception as e: |
|
|
print(f"❌ Ошибка генерации: {e}") |
|
|
return "" |
|
|
|
|
|
def __call__(self, short_note: str) -> str: |
|
|
return self.generate(short_note) |
|
|
|
|
|
|
|
|
# === Упрощенный запуск === |
|
|
if __name__ == "__main__": |
|
|
import time |
|
|
|
|
|
# Автоматическое определение режима низкой памяти |
|
|
import psutil |
|
|
|
|
|
total_memory = psutil.virtual_memory().total / 1024 ** 3 |
|
|
low_memory_mode = total_memory < 8 # Меньше 8GB RAM |
|
|
|
|
|
print(f"💾 Общая память: {total_memory:.1f} GB") |
|
|
print(f"🔧 Режим низкой памяти: {'Да' if low_memory_mode else 'Нет'}") |
|
|
|
|
|
rag = MedicalRAG( |
|
|
model_path="./models/YandexGPT-5-Lite-8B-instruct-Q4_K_M.gguf", |
|
|
n_ctx=2048, # Уменьшенный контекст |
|
|
n_gpu_layers=10 if not low_memory_mode else 0, # Адаптивное количество слоев |
|
|
use_gpu_for_embeddings=not low_memory_mode, |
|
|
low_memory=low_memory_mode |
|
|
) |
|
|
|
|
|
# Промты для тестирования |
|
|
test_notes = [ |
|
|
"Кашель сухой, температура 38", |
|
|
"А.д. 140, заложенность ушей", |
|
|
"а.д. 140/80, т.36.6", |
|
|
"снижение слуха 2 года" |
|
|
] |
|
|
|
|
|
for note in test_notes: |
|
|
print(f"\n📥 Кратко: {note}") |
|
|
start_time = time.time() |
|
|
result = rag(note) |
|
|
elapsed_time = time.time() - start_time |
|
|
|
|
|
print(f"⏱ Время: {elapsed_time:.2f} сек") |
|
|
if result: |
|
|
print(f"📤 Развёрнуто: {result}") |
|
|
else: |
|
|
print("❌ Пустой ответ") |
|
|
print("─" * 50) |