import os import json import tiktoken from typing import List, Tuple, Optional import chromadb from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction from llama_cpp import Llama import torch # Отключаем телеметрию Chroma в самом начале os.environ["CHROMA_TELEMETRY"] = "false" class MedicalRAG: def __init__( self, model_path: str, corpus_path: str = "rag_corpus.json", db_path: str = "./chroma_db", embedding_model_name: str = "cointegrated/rubert-tiny2", top_k: int = 3, n_ctx: int = 2048, # Уменьшено для слабых компьютеров n_threads: int = None, # Автоматическое определение token_multiplier: int = 3, # Уменьшено n_gpu_layers: int = 0, # По умолчанию CPU для совместимости use_gpu_for_embeddings: bool = False, # По умолчанию CPU low_memory: bool = True # Режим низкой памяти ): self.corpus_path = corpus_path self.top_k = top_k self.token_multiplier = token_multiplier self.low_memory = low_memory # === Автоматическое определение потоков === if n_threads is None: import multiprocessing n_threads = max(1, multiprocessing.cpu_count() - 1) # === Проверка доступности GPU (с оптимизацией) === self.has_gpu = torch.cuda.is_available() and not low_memory if self.has_gpu: gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024 ** 3 print(f"✅ GPU доступен: {torch.cuda.get_device_name()} ({gpu_memory:.1f} GB)") # Автоматическая настройка слоев GPU в зависимости от памяти if gpu_memory < 4: # Маломощные GPU n_gpu_layers = min(n_gpu_layers, 10) elif gpu_memory < 8: # Средние GPU n_gpu_layers = min(n_gpu_layers, 20) else: print("ℹ️ Используется CPU режим") n_gpu_layers = 0 use_gpu_for_embeddings = False # === Оптимизированная инициализация токенизатора === print("Инициализация токенизатора...") self.encoding = None try: self.encoding = tiktoken.get_encoding("cl100k_base") except Exception as e: print(f"⚠️ Токенизатор не загружен: {e}") # === Эмбеддинги с оптимизацией памяти === print("Загрузка эмбеддинг-модели...") device = "cuda" if (self.has_gpu and use_gpu_for_embeddings) else "cpu" # Параметры для экономии памяти model_kwargs = {} if low_memory: model_kwargs = { 'device': device, 'model_kwargs': {'torch_dtype': torch.float16} # Половина точности } self.embedding_function = SentenceTransformerEmbeddingFunction( model_name=embedding_model_name, **model_kwargs ) # === ChromaDB с оптимизацией === print("Инициализация ChromaDB...") self.client = chromadb.PersistentClient(path=db_path) # Упрощенные настройки для экономии памяти collection_metadata = {"hnsw:space": "cosine"} if low_memory: collection_metadata.update({ "hnsw:construction_ef": 100, # Меньше использование памяти "hnsw:M": 16, # Меньше связей в графе }) self.collection = self.client.get_or_create_collection( name="medical_anamnesis", embedding_function=self.embedding_function, metadata=collection_metadata ) if self.collection.count() == 0: print("Загрузка данных в коллекцию...") self._load_corpus() else: print(f"Коллекция содержит {self.collection.count()} записей") # === Оптимизированная загрузка LLM === if not os.path.exists(model_path): raise FileNotFoundError( f"Модель не найдена: {model_path}\n" "Для слабых компьютеров рекомендуется использовать меньшие модели." ) print("Загрузка языковой модели...") # Базовые параметры для всех устройств llm_params = { "model_path": model_path, "n_ctx": n_ctx, "n_threads": n_threads, "verbose": False, "low_vram": low_memory, # Всегда включаем для совместимости "use_mlock": not low_memory, # Блокировка памяти только если достаточно RAM } # Параметры только для GPU if self.has_gpu and n_gpu_layers > 0: llm_params.update({ "n_gpu_layers": n_gpu_layers, "main_gpu": 0, "tensor_split": None, # Упрощаем для совместимости }) print(f"Используется GPU с {n_gpu_layers} слоями") else: print("Используется CPU") try: self.llm = Llama(**llm_params) print("✅ Система готова к работе!") except Exception as e: print(f"❌ Ошибка загрузки модели: {e}") raise def _load_corpus(self): """Загрузка корпуса с обработкой ошибок""" try: with open(self.corpus_path, "r", encoding="utf-8") as f: data = json.load(f) # Пакетная обработка для больших корпусов batch_size = 50 if self.low_memory else 100 for i in range(0, len(data), batch_size): batch = data[i:i + batch_size] self.collection.add( documents=[item["full"] for item in batch], metadatas=[{"short": item["short"]} for item in batch], ids=[f"id_{i + j}" for j in range(len(batch))] ) print(f"Загружено {min(i + batch_size, len(data))}/{len(data)} записей") except Exception as e: print(f"❌ Ошибка загрузки корпуса: {e}") raise def count_tokens(self, text: str) -> int: """Оптимизированный подсчет токенов""" if self.encoding: return len(self.encoding.encode(text)) else: # Упрощенный подсчет для совместимости return len(text.split()) # Приблизительно по словам def build_prompt_with_token_management(self, short_note: str, max_context_tokens: int = 1500) -> Tuple[str, int]: """Строит промпт с оптимизированным управлением токенами""" examples = self.retrieve(short_note) system_msg = ( "На основе примеров напиши развёрнуто жалобы пациента, грамотно с медицинской точки зрения. " "Напиши жалобы в одно предложение, одной строкой. " "Не пиши вводных слов и фраз. Только жалобы пациента. " "Неуместно писать диагнозы и план лечения. " "Расшифруй все сокращения. " "Отвечай сразу без размышлений." ) system_tokens = self.count_tokens(system_msg) note_tokens = self.count_tokens(short_note) # Более консервативный расчет доступных токенов available_tokens = max_context_tokens - system_tokens - note_tokens - 150 selected_examples = [] current_tokens = 0 for example in examples: example_tokens = self.count_tokens(example) if current_tokens + example_tokens <= available_tokens: selected_examples.append(example) current_tokens += example_tokens else: if self.low_memory: break # Быстрый выход в режиме низкой памяти elif len(selected_examples) > 0: break # Сохраняем хотя бы один пример # Упрощенный контекст context = "\n".join([f"Пример: {ex}" for ex in selected_examples]) user_msg = f"""Примеры: {context} Жалобы: "{short_note}" """ import pprint pprint.pprint(system_msg + " " + user_msg) prompt = ( f"<|im_start|>system\n{system_msg}<|im_end|>\n" f"<|im_start|>user\n{user_msg}<|im_end|>\n" "<|im_start|>assistant\n" ) prompt_tokens = self.count_tokens(prompt) return prompt, prompt_tokens def retrieve(self, query: str, n: int = None) -> List[str]: """Оптимизированный поиск""" n = n or min(self.top_k, 3) # Ограничиваем для слабых компьютеров try: results = self.collection.query( query_texts=[query], n_results=n ) return results["documents"][0] except Exception as e: print(f"⚠️ Ошибка поиска: {e}") return [] def generate(self, short_note: str) -> str: """Генерация с оптимизацией памяти""" prompt, prompt_tokens = self.build_prompt_with_token_management(short_note) # Более консервативный расчет максимальных токенов available_tokens = 2048 - prompt_tokens - 30 # Запас уменьшен max_tokens = min(prompt_tokens * self.token_multiplier, available_tokens, 512) # Жесткий лимит print(f"📊 Токены: промпт={prompt_tokens}, ответ={max_tokens}") print(f"⚡️ Устройство: {'GPU' if self.has_gpu else 'CPU'}") try: output = self.llm( prompt, max_tokens=max_tokens, temperature=0.1, stop=["<|im_end|>"], echo=False, stream=False # Отключаем streaming для стабильности ) result = output["choices"][0]["text"].strip() return result except Exception as e: print(f"❌ Ошибка генерации: {e}") return "" def __call__(self, short_note: str) -> str: return self.generate(short_note) # === Упрощенный запуск === if __name__ == "__main__": import time # Автоматическое определение режима низкой памяти import psutil total_memory = psutil.virtual_memory().total / 1024 ** 3 low_memory_mode = total_memory < 8 # Меньше 8GB RAM print(f"💾 Общая память: {total_memory:.1f} GB") print(f"🔧 Режим низкой памяти: {'Да' if low_memory_mode else 'Нет'}") rag = MedicalRAG( model_path="./models/YandexGPT-5-Lite-8B-instruct-Q4_K_M.gguf", n_ctx=2048, # Уменьшенный контекст n_gpu_layers=10 if not low_memory_mode else 0, # Адаптивное количество слоев use_gpu_for_embeddings=not low_memory_mode, low_memory=low_memory_mode ) # Промты для тестирования test_notes = [ "Кашель сухой, температура 38", "А.д. 140, заложенность ушей", "а.д. 140/80, т.36.6", "снижение слуха 2 года" ] for note in test_notes: print(f"\n📥 Кратко: {note}") start_time = time.time() result = rag(note) elapsed_time = time.time() - start_time print(f"⏱ Время: {elapsed_time:.2f} сек") if result: print(f"📤 Развёрнуто: {result}") else: print("❌ Пустой ответ") print("─" * 50)