from PyQt6.QtCore import QThread, QMutex, QWaitCondition, pyqtSignal import time import os import json import tiktoken from typing import List, Tuple, Optional import chromadb from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction from llama_cpp import Llama import torch class MedicalRAG: def __init__( self, model_path: str, corpus_path: str = "rag_corpus.json", db_path: str = "./chroma_db", embedding_model_name: str = "cointegrated/rubert-tiny2", top_k: int = 3, n_ctx: int = 2048, n_threads: int = None, token_multiplier: int = 3, n_gpu_layers: int = 0, use_gpu_for_embeddings: bool = False, low_memory: bool = True ): self.corpus_path = corpus_path self.top_k = top_k self.token_multiplier = token_multiplier self.low_memory = low_memory # Автоматическое определение потоков if n_threads is None: import multiprocessing n_threads = max(1, multiprocessing.cpu_count() - 1) # Проверка доступности GPU self.has_gpu = torch.cuda.is_available() and not low_memory if self.has_gpu: gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024 ** 3 print(f"✅ GPU доступен: {torch.cuda.get_device_name()} ({gpu_memory:.1f} GB)") if gpu_memory < 4: n_gpu_layers = min(n_gpu_layers, 10) elif gpu_memory < 8: n_gpu_layers = min(n_gpu_layers, 20) else: print("ℹ️ Используется CPU режим") n_gpu_layers = 0 use_gpu_for_embeddings = False # Инициализация токенизатора print("Инициализация токенизатора...") self.encoding = None try: self.encoding = tiktoken.get_encoding("cl100k_base") except Exception as e: print(f"⚠️ Токенизатор не загружен: {e}") # Эмбеддинги print("Загрузка эмбеддинг-модели...") device = "cuda" if (self.has_gpu and use_gpu_for_embeddings) else "cpu" model_kwargs = {} if low_memory: model_kwargs = { 'device': device, 'model_kwargs': {'torch_dtype': torch.float16} } self.embedding_function = SentenceTransformerEmbeddingFunction( model_name=embedding_model_name, **model_kwargs ) # ChromaDB print("Инициализация ChromaDB...") self.client = chromadb.PersistentClient(path=db_path) collection_metadata = {"hnsw:space": "cosine"} if low_memory: collection_metadata.update({ "hnsw:construction_ef": 100, "hnsw:M": 16, }) self.collection = self.client.get_or_create_collection( name="medical_anamnesis", embedding_function=self.embedding_function, metadata=collection_metadata ) if self.collection.count() == 0: print("Загрузка данных в коллекцию...") self._load_corpus() else: print(f"Коллекция содержит {self.collection.count()} записей") # Загрузка LLM if not os.path.exists(model_path): raise FileNotFoundError( f"Модель не найдена: {model_path}\n" "Для слабых компьютеров рекомендуется использовать меньшие модели." ) print("Загрузка языковой модели...") llm_params = { "model_path": model_path, "n_ctx": n_ctx, "n_threads": n_threads, "verbose": False, "low_vram": low_memory, "use_mlock": not low_memory, } if self.has_gpu and n_gpu_layers > 0: llm_params.update({ "n_gpu_layers": n_gpu_layers, "main_gpu": 0, "tensor_split": None, }) print(f"Используется GPU с {n_gpu_layers} слоями") else: print("Используется CPU") try: self.llm = Llama(**llm_params) print("✅ Система готова к работе!") except Exception as e: print(f"❌ Ошибка загрузки модели: {e}") raise def _load_corpus(self): """Загрузка корпуса с обработкой ошибок""" try: with open(self.corpus_path, "r", encoding="utf-8") as f: data = json.load(f) batch_size = 50 if self.low_memory else 100 for i in range(0, len(data), batch_size): batch = data[i:i + batch_size] self.collection.add( documents=[item["full"] for item in batch], metadatas=[{"short": item["short"]} for item in batch], ids=[f"id_{i + j}" for j in range(len(batch))] ) print(f"Загружено {min(i + batch_size, len(data))}/{len(data)} записей") except Exception as e: print(f"❌ Ошибка загрузки корпуса: {e}") raise def count_tokens(self, text: str) -> int: """Подсчет токенов""" if self.encoding: return len(self.encoding.encode(text)) else: return len(text.split()) def build_prompt_with_token_management(self, short_note: str, processing_type: str, max_context_tokens: int = 1500) -> Tuple[str, int]: """Строит промпт с оптимизированным управлением токенами""" examples = self.retrieve(short_note) # Разные системные сообщения для жалоб и анамнеза if processing_type == 'complaints': system_msg = ( "На основе примеров напиши развёрнуто жалобы пациента, грамотно с медицинской точки зрения. " "Напиши жалобы в одно предложение, одной строкой. " "Не пиши вводных слов и фраз. Только жалобы пациента. " "Неуместно писать диагнозы и план лечения. " "Расшифруй все сокращения. " "Отвечай сразу без размышлений." ) else: # history system_msg = ( "На основе примеров напиши развёрнутый анамнез заболевания, грамотно с медицинской точки зрения. " # "Опиши историю развития заболевания: когда началось, как развивалось, какое лечение получал. " "Неуместно писать диагнозы и план лечения. " "Используй медицинскую терминологию. " "Расшифруй все сокращения. " "Отвечай сразу без размышлений." ) system_tokens = self.count_tokens(system_msg) note_tokens = self.count_tokens(short_note) available_tokens = max_context_tokens - system_tokens - note_tokens - 150 selected_examples = [] current_tokens = 0 for example in examples: example_tokens = self.count_tokens(example) if current_tokens + example_tokens <= available_tokens: selected_examples.append(example) current_tokens += example_tokens else: if self.low_memory: break elif len(selected_examples) > 0: break context = "\n".join([f"Пример: {ex}" for ex in selected_examples]) user_msg = f"""Примеры: {context} {"Жалобы" if processing_type == 'complaints' else "Анамнез"}: "{short_note}" """ prompt = ( f"<|im_start|>system\n{system_msg}<|im_end|>\n" f"<|im_start|>user\n{user_msg}<|im_end|>\n" "<|im_start|>assistant\n" ) prompt_tokens = self.count_tokens(prompt) return prompt, prompt_tokens def retrieve(self, query: str, n: int = None) -> List[str]: """Оптимизированный поиск""" n = n or min(self.top_k, 3) try: results = self.collection.query( query_texts=[query], n_results=n ) return results["documents"][0] except Exception as e: print(f"⚠️ Ошибка поиска: {e}") return [] def generate(self, short_note: str, processing_type: str) -> str: """Генерация с оптимизацией памяти""" prompt, prompt_tokens = self.build_prompt_with_token_management(short_note, processing_type) available_tokens = 2048 - prompt_tokens - 30 max_tokens = min(prompt_tokens * self.token_multiplier, available_tokens, 512) print(f"📊 Токены: промпт={prompt_tokens}, ответ={max_tokens}") print(f"⚡️ Устройство: {'GPU' if self.has_gpu else 'CPU'}") try: output = self.llm( prompt, max_tokens=max_tokens, temperature=0.1, stop=["<|im_end|>"], echo=False, stream=False ) result = output["choices"][0]["text"].strip() return result except Exception as e: print(f"❌ Ошибка генерации: {e}") return "" def __call__(self, short_note: str, processing_type: str) -> str: return self.generate(short_note, processing_type) class AIWorker(QThread): """Worker для обработки данных нейросетью""" text_processed = pyqtSignal(str, str) # text, processing_type image_processed = pyqtSignal(str) processing_finished = pyqtSignal(str) # processing_type def __init__(self): super().__init__() self.mutex = QMutex() self.condition = QWaitCondition() self.text_to_process = None self.image_to_process = None self.processing_type = None # 'complaints', 'history', 'image' self._active = True # Инициализация MedicalRAG self.rag = None self._init_rag() def _init_rag(self): """Инициализация MedicalRAG системы""" try: import psutil # Автоматическое определение режима низкой памяти total_memory = psutil.virtual_memory().total / 1024 ** 3 low_memory_mode = total_memory < 8 print(f"💾 Общая память: {total_memory:.1f} GB") print(f"🔧 Режим низкой памяти: {'Да' if low_memory_mode else 'Нет'}") # Путь к модели - нужно будет настроить под вашу систему model_path = "./models/YandexGPT-5-Lite-8B-instruct-Q4_K_M.gguf" # Если модель не найдена, используем имитацию if not os.path.exists(model_path): print("⚠️ Модель не найдена, используется имитация LLM") self.rag = None return self.rag = MedicalRAG( model_path=model_path, n_ctx=2048, n_gpu_layers=10 if not low_memory_mode else 0, use_gpu_for_embeddings=not low_memory_mode, low_memory=low_memory_mode ) print("✅ MedicalRAG инициализирован успешно!") except Exception as e: print(f"❌ Ошибка инициализации MedicalRAG: {e}") print("⚠️ Будет использоваться имитация LLM") self.rag = None def process_text(self, text, processing_type): """Запускает обработку текста""" self.mutex.lock() self.text_to_process = text self.image_to_process = None self.processing_type = processing_type self.condition.wakeOne() self.mutex.unlock() def process_image(self, image_path): """Запускает обработку изображения""" self.mutex.lock() self.image_to_process = image_path self.text_to_process = None self.processing_type = 'image' self.condition.wakeOne() self.mutex.unlock() def stop(self): """Останавливает worker""" self._active = False self.condition.wakeOne() self.wait() def run(self): """Основной цикл обработки""" while self._active: self.mutex.lock() if not self.text_to_process and not self.image_to_process: self.condition.wait(self.mutex) if not self._active: self.mutex.unlock() break if self.processing_type in ['complaints', 'history'] and self.text_to_process: text = self.text_to_process processing_type = self.processing_type self.text_to_process = None self.mutex.unlock() # Обработка текста с использованием LLM result = self.process_text_with_llm(text, processing_type) self.text_processed.emit(result, processing_type) elif self.processing_type == 'image' and self.image_to_process: image_path = self.image_to_process self.image_to_process = None self.mutex.unlock() # Обработка изображения result = self.simulate_image_processing(image_path) self.image_processed.emit(result) else: self.mutex.unlock() self.processing_finished.emit(self.processing_type) def process_text_with_llm(self, text, processing_type): """Обработка текста с использованием LLM""" if not text.strip(): return "" # Если LLM не инициализирован, используем имитацию if self.rag is None: return self.simulate_text_processing(text, processing_type) try: print(f"🔍 Обработка {processing_type} с помощью LLM...") print(f"📥 Входной текст: {text}") # Обработка через LLM result = self.rag(text, processing_type) # Если LLM вернул пустой результат, используем исходный текст if not result or result.strip() == "": print("⚠️ LLM вернул пустой ответ, используется исходный текст") result = self._format_fallback_text(text, processing_type) else: print(f"📤 Результат LLM: {result}") return result except Exception as e: print(f"❌ Ошибка при обработке LLM: {e}") # В случае ошибки возвращаем форматированный исходный текст return self._format_fallback_text(text, processing_type) def _format_fallback_text(self, text, processing_type): """Форматирует исходный текст как fallback""" if processing_type == 'complaints': return f"🤖 Анализ жалоб ИИ:\n\n{text}\n\nВывод: Требуется дополнительное обследование." else: return f"🤖 Анализ анамнеза ИИ:\n\n{text}\n\nВывод: Анамнез требует уточнения." def simulate_text_processing(self, text, processing_type): """Имитация обработки текста (fallback)""" time.sleep(2) # Имитация задержки обработки if processing_type == 'complaints': return f"🤖 Анализ жалоб ИИ:\n\n{text}\n\nВывод: Выявлены характерные симптомы требующие дополнительного обследования." else: return f"🤖 Анализ анамнеза ИИ:\n\n{text}\n\nВывод: Анамнез указывает на хронический характер заболевания." def simulate_image_processing(self, image_path): """Имитация обработки изображения нейросетью""" time.sleep(3) # Имитация задержки обработки filename = os.path.basename(image_path) return f"🖼️ Анализ изображения '{filename}':\n\nОбнаружены патологические изменения. Рекомендуется консультация специалиста."