|
|
from PyQt6.QtCore import QThread, QMutex, QWaitCondition, pyqtSignal |
|
|
import time |
|
|
import os |
|
|
import json |
|
|
import tiktoken |
|
|
from typing import List, Tuple, Optional |
|
|
import chromadb |
|
|
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction |
|
|
from llama_cpp import Llama |
|
|
import torch |
|
|
|
|
|
|
|
|
class MedicalRAG: |
|
|
def __init__( |
|
|
self, |
|
|
model_path: str, |
|
|
corpus_path: str = "rag_corpus.json", |
|
|
db_path: str = "./chroma_db", |
|
|
embedding_model_name: str = "cointegrated/rubert-tiny2", |
|
|
top_k: int = 3, |
|
|
n_ctx: int = 2048, |
|
|
n_threads: int = None, |
|
|
token_multiplier: int = 3, |
|
|
n_gpu_layers: int = 0, |
|
|
use_gpu_for_embeddings: bool = False, |
|
|
low_memory: bool = True |
|
|
): |
|
|
self.corpus_path = corpus_path |
|
|
self.top_k = top_k |
|
|
self.token_multiplier = token_multiplier |
|
|
self.low_memory = low_memory |
|
|
|
|
|
# Автоматическое определение потоков |
|
|
if n_threads is None: |
|
|
import multiprocessing |
|
|
n_threads = max(1, multiprocessing.cpu_count() - 1) |
|
|
|
|
|
# Проверка доступности GPU |
|
|
self.has_gpu = torch.cuda.is_available() and not low_memory |
|
|
if self.has_gpu: |
|
|
gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024 ** 3 |
|
|
print(f"✅ GPU доступен: {torch.cuda.get_device_name()} ({gpu_memory:.1f} GB)") |
|
|
if gpu_memory < 4: |
|
|
n_gpu_layers = min(n_gpu_layers, 10) |
|
|
elif gpu_memory < 8: |
|
|
n_gpu_layers = min(n_gpu_layers, 20) |
|
|
else: |
|
|
print("ℹ️ Используется CPU режим") |
|
|
n_gpu_layers = 0 |
|
|
use_gpu_for_embeddings = False |
|
|
|
|
|
# Инициализация токенизатора |
|
|
print("Инициализация токенизатора...") |
|
|
self.encoding = None |
|
|
try: |
|
|
self.encoding = tiktoken.get_encoding("cl100k_base") |
|
|
except Exception as e: |
|
|
print(f"⚠️ Токенизатор не загружен: {e}") |
|
|
|
|
|
# Эмбеддинги |
|
|
print("Загрузка эмбеддинг-модели...") |
|
|
device = "cuda" if (self.has_gpu and use_gpu_for_embeddings) else "cpu" |
|
|
|
|
|
model_kwargs = {} |
|
|
if low_memory: |
|
|
model_kwargs = { |
|
|
'device': device, |
|
|
'model_kwargs': {'torch_dtype': torch.float16} |
|
|
} |
|
|
|
|
|
self.embedding_function = SentenceTransformerEmbeddingFunction( |
|
|
model_name=embedding_model_name, |
|
|
**model_kwargs |
|
|
) |
|
|
|
|
|
# ChromaDB |
|
|
print("Инициализация ChromaDB...") |
|
|
self.client = chromadb.PersistentClient(path=db_path) |
|
|
|
|
|
collection_metadata = {"hnsw:space": "cosine"} |
|
|
if low_memory: |
|
|
collection_metadata.update({ |
|
|
"hnsw:construction_ef": 100, |
|
|
"hnsw:M": 16, |
|
|
}) |
|
|
|
|
|
self.collection = self.client.get_or_create_collection( |
|
|
name="medical_anamnesis", |
|
|
embedding_function=self.embedding_function, |
|
|
metadata=collection_metadata |
|
|
) |
|
|
|
|
|
if self.collection.count() == 0: |
|
|
print("Загрузка данных в коллекцию...") |
|
|
self._load_corpus() |
|
|
else: |
|
|
print(f"Коллекция содержит {self.collection.count()} записей") |
|
|
|
|
|
# Загрузка LLM |
|
|
if not os.path.exists(model_path): |
|
|
raise FileNotFoundError( |
|
|
f"Модель не найдена: {model_path}\n" |
|
|
"Для слабых компьютеров рекомендуется использовать меньшие модели." |
|
|
) |
|
|
|
|
|
print("Загрузка языковой модели...") |
|
|
|
|
|
llm_params = { |
|
|
"model_path": model_path, |
|
|
"n_ctx": n_ctx, |
|
|
"n_threads": n_threads, |
|
|
"verbose": False, |
|
|
"low_vram": low_memory, |
|
|
"use_mlock": not low_memory, |
|
|
} |
|
|
|
|
|
if self.has_gpu and n_gpu_layers > 0: |
|
|
llm_params.update({ |
|
|
"n_gpu_layers": n_gpu_layers, |
|
|
"main_gpu": 0, |
|
|
"tensor_split": None, |
|
|
}) |
|
|
print(f"Используется GPU с {n_gpu_layers} слоями") |
|
|
else: |
|
|
print("Используется CPU") |
|
|
|
|
|
try: |
|
|
self.llm = Llama(**llm_params) |
|
|
print("✅ Система готова к работе!") |
|
|
except Exception as e: |
|
|
print(f"❌ Ошибка загрузки модели: {e}") |
|
|
raise |
|
|
|
|
|
def _load_corpus(self): |
|
|
"""Загрузка корпуса с обработкой ошибок""" |
|
|
try: |
|
|
with open(self.corpus_path, "r", encoding="utf-8") as f: |
|
|
data = json.load(f) |
|
|
|
|
|
batch_size = 50 if self.low_memory else 100 |
|
|
for i in range(0, len(data), batch_size): |
|
|
batch = data[i:i + batch_size] |
|
|
self.collection.add( |
|
|
documents=[item["full"] for item in batch], |
|
|
metadatas=[{"short": item["short"]} for item in batch], |
|
|
ids=[f"id_{i + j}" for j in range(len(batch))] |
|
|
) |
|
|
print(f"Загружено {min(i + batch_size, len(data))}/{len(data)} записей") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Ошибка загрузки корпуса: {e}") |
|
|
raise |
|
|
|
|
|
def count_tokens(self, text: str) -> int: |
|
|
"""Подсчет токенов""" |
|
|
if self.encoding: |
|
|
return len(self.encoding.encode(text)) |
|
|
else: |
|
|
return len(text.split()) |
|
|
|
|
|
def build_prompt_with_token_management(self, short_note: str, processing_type: str, |
|
|
max_context_tokens: int = 1500) -> Tuple[str, int]: |
|
|
"""Строит промпт с оптимизированным управлением токенами""" |
|
|
|
|
|
examples = self.retrieve(short_note) |
|
|
|
|
|
# Разные системные сообщения для жалоб и анамнеза |
|
|
if processing_type == 'complaints': |
|
|
system_msg = ( |
|
|
"На основе примеров напиши развёрнуто жалобы пациента, грамотно с медицинской точки зрения. " |
|
|
"Напиши жалобы в одно предложение, одной строкой. " |
|
|
"Не пиши вводных слов и фраз. Только жалобы пациента. " |
|
|
"Неуместно писать диагнозы и план лечения. " |
|
|
"Расшифруй все сокращения. " |
|
|
"Отвечай сразу без размышлений." |
|
|
) |
|
|
else: # history |
|
|
system_msg = ( |
|
|
"На основе примеров напиши развёрнутый анамнез заболевания, грамотно с медицинской точки зрения. " |
|
|
# "Опиши историю развития заболевания: когда началось, как развивалось, какое лечение получал. " |
|
|
"Неуместно писать диагнозы и план лечения. " |
|
|
"Используй медицинскую терминологию. " |
|
|
"Расшифруй все сокращения. " |
|
|
"Отвечай сразу без размышлений." |
|
|
) |
|
|
|
|
|
system_tokens = self.count_tokens(system_msg) |
|
|
note_tokens = self.count_tokens(short_note) |
|
|
|
|
|
available_tokens = max_context_tokens - system_tokens - note_tokens - 150 |
|
|
|
|
|
selected_examples = [] |
|
|
current_tokens = 0 |
|
|
|
|
|
for example in examples: |
|
|
example_tokens = self.count_tokens(example) |
|
|
if current_tokens + example_tokens <= available_tokens: |
|
|
selected_examples.append(example) |
|
|
current_tokens += example_tokens |
|
|
else: |
|
|
if self.low_memory: |
|
|
break |
|
|
elif len(selected_examples) > 0: |
|
|
break |
|
|
|
|
|
context = "\n".join([f"Пример: {ex}" for ex in selected_examples]) |
|
|
|
|
|
user_msg = f"""Примеры: |
|
|
{context} |
|
|
|
|
|
{"Жалобы" if processing_type == 'complaints' else "Анамнез"}: "{short_note}" |
|
|
""" |
|
|
|
|
|
prompt = ( |
|
|
f"<|im_start|>system\n{system_msg}<|im_end|>\n" |
|
|
f"<|im_start|>user\n{user_msg}<|im_end|>\n" |
|
|
"<|im_start|>assistant\n" |
|
|
) |
|
|
|
|
|
prompt_tokens = self.count_tokens(prompt) |
|
|
return prompt, prompt_tokens |
|
|
|
|
|
def retrieve(self, query: str, n: int = None) -> List[str]: |
|
|
"""Оптимизированный поиск""" |
|
|
n = n or min(self.top_k, 3) |
|
|
try: |
|
|
results = self.collection.query( |
|
|
query_texts=[query], |
|
|
n_results=n |
|
|
) |
|
|
return results["documents"][0] |
|
|
except Exception as e: |
|
|
print(f"⚠️ Ошибка поиска: {e}") |
|
|
return [] |
|
|
|
|
|
def generate(self, short_note: str, processing_type: str) -> str: |
|
|
"""Генерация с оптимизацией памяти""" |
|
|
prompt, prompt_tokens = self.build_prompt_with_token_management(short_note, processing_type) |
|
|
|
|
|
available_tokens = 2048 - prompt_tokens - 30 |
|
|
max_tokens = min(prompt_tokens * self.token_multiplier, available_tokens, 512) |
|
|
|
|
|
print(f"📊 Токены: промпт={prompt_tokens}, ответ={max_tokens}") |
|
|
print(f"⚡️ Устройство: {'GPU' if self.has_gpu else 'CPU'}") |
|
|
|
|
|
try: |
|
|
output = self.llm( |
|
|
prompt, |
|
|
max_tokens=max_tokens, |
|
|
temperature=0.1, |
|
|
stop=["<|im_end|>"], |
|
|
echo=False, |
|
|
stream=False |
|
|
) |
|
|
|
|
|
result = output["choices"][0]["text"].strip() |
|
|
return result |
|
|
except Exception as e: |
|
|
print(f"❌ Ошибка генерации: {e}") |
|
|
return "" |
|
|
|
|
|
def __call__(self, short_note: str, processing_type: str) -> str: |
|
|
return self.generate(short_note, processing_type) |
|
|
|
|
|
|
|
|
class AIWorker(QThread): |
|
|
"""Worker для обработки данных нейросетью""" |
|
|
text_processed = pyqtSignal(str, str) # text, processing_type |
|
|
image_processed = pyqtSignal(str) |
|
|
processing_finished = pyqtSignal(str) # processing_type |
|
|
|
|
|
def __init__(self): |
|
|
super().__init__() |
|
|
self.mutex = QMutex() |
|
|
self.condition = QWaitCondition() |
|
|
self.text_to_process = None |
|
|
self.image_to_process = None |
|
|
self.processing_type = None # 'complaints', 'history', 'image' |
|
|
self._active = True |
|
|
|
|
|
# Инициализация MedicalRAG |
|
|
self.rag = None |
|
|
self._init_rag() |
|
|
|
|
|
def _init_rag(self): |
|
|
"""Инициализация MedicalRAG системы""" |
|
|
try: |
|
|
import psutil |
|
|
|
|
|
# Автоматическое определение режима низкой памяти |
|
|
total_memory = psutil.virtual_memory().total / 1024 ** 3 |
|
|
low_memory_mode = total_memory < 8 |
|
|
|
|
|
print(f"💾 Общая память: {total_memory:.1f} GB") |
|
|
print(f"🔧 Режим низкой памяти: {'Да' if low_memory_mode else 'Нет'}") |
|
|
|
|
|
# Путь к модели - нужно будет настроить под вашу систему |
|
|
model_path = "./models/YandexGPT-5-Lite-8B-instruct-Q4_K_M.gguf" |
|
|
|
|
|
# Если модель не найдена, используем имитацию |
|
|
if not os.path.exists(model_path): |
|
|
print("⚠️ Модель не найдена, используется имитация LLM") |
|
|
self.rag = None |
|
|
return |
|
|
|
|
|
self.rag = MedicalRAG( |
|
|
model_path=model_path, |
|
|
n_ctx=2048, |
|
|
n_gpu_layers=10 if not low_memory_mode else 0, |
|
|
use_gpu_for_embeddings=not low_memory_mode, |
|
|
low_memory=low_memory_mode |
|
|
) |
|
|
print("✅ MedicalRAG инициализирован успешно!") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Ошибка инициализации MedicalRAG: {e}") |
|
|
print("⚠️ Будет использоваться имитация LLM") |
|
|
self.rag = None |
|
|
|
|
|
def process_text(self, text, processing_type): |
|
|
"""Запускает обработку текста""" |
|
|
self.mutex.lock() |
|
|
self.text_to_process = text |
|
|
self.image_to_process = None |
|
|
self.processing_type = processing_type |
|
|
self.condition.wakeOne() |
|
|
self.mutex.unlock() |
|
|
|
|
|
def process_image(self, image_path): |
|
|
"""Запускает обработку изображения""" |
|
|
self.mutex.lock() |
|
|
self.image_to_process = image_path |
|
|
self.text_to_process = None |
|
|
self.processing_type = 'image' |
|
|
self.condition.wakeOne() |
|
|
self.mutex.unlock() |
|
|
|
|
|
def stop(self): |
|
|
"""Останавливает worker""" |
|
|
self._active = False |
|
|
self.condition.wakeOne() |
|
|
self.wait() |
|
|
|
|
|
def run(self): |
|
|
"""Основной цикл обработки""" |
|
|
while self._active: |
|
|
self.mutex.lock() |
|
|
if not self.text_to_process and not self.image_to_process: |
|
|
self.condition.wait(self.mutex) |
|
|
|
|
|
if not self._active: |
|
|
self.mutex.unlock() |
|
|
break |
|
|
|
|
|
if self.processing_type in ['complaints', 'history'] and self.text_to_process: |
|
|
text = self.text_to_process |
|
|
processing_type = self.processing_type |
|
|
self.text_to_process = None |
|
|
self.mutex.unlock() |
|
|
|
|
|
# Обработка текста с использованием LLM |
|
|
result = self.process_text_with_llm(text, processing_type) |
|
|
self.text_processed.emit(result, processing_type) |
|
|
|
|
|
elif self.processing_type == 'image' and self.image_to_process: |
|
|
image_path = self.image_to_process |
|
|
self.image_to_process = None |
|
|
self.mutex.unlock() |
|
|
|
|
|
# Обработка изображения |
|
|
result = self.simulate_image_processing(image_path) |
|
|
self.image_processed.emit(result) |
|
|
else: |
|
|
self.mutex.unlock() |
|
|
|
|
|
self.processing_finished.emit(self.processing_type) |
|
|
|
|
|
def process_text_with_llm(self, text, processing_type): |
|
|
"""Обработка текста с использованием LLM""" |
|
|
if not text.strip(): |
|
|
return "" |
|
|
|
|
|
# Если LLM не инициализирован, используем имитацию |
|
|
if self.rag is None: |
|
|
return self.simulate_text_processing(text, processing_type) |
|
|
|
|
|
try: |
|
|
print(f"🔍 Обработка {processing_type} с помощью LLM...") |
|
|
print(f"📥 Входной текст: {text}") |
|
|
|
|
|
# Обработка через LLM |
|
|
result = self.rag(text, processing_type) |
|
|
|
|
|
# Если LLM вернул пустой результат, используем исходный текст |
|
|
if not result or result.strip() == "": |
|
|
print("⚠️ LLM вернул пустой ответ, используется исходный текст") |
|
|
result = self._format_fallback_text(text, processing_type) |
|
|
else: |
|
|
print(f"📤 Результат LLM: {result}") |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ Ошибка при обработке LLM: {e}") |
|
|
# В случае ошибки возвращаем форматированный исходный текст |
|
|
return self._format_fallback_text(text, processing_type) |
|
|
|
|
|
def _format_fallback_text(self, text, processing_type): |
|
|
"""Форматирует исходный текст как fallback""" |
|
|
if processing_type == 'complaints': |
|
|
return f"🤖 Анализ жалоб ИИ:\n\n{text}\n\nВывод: Требуется дополнительное обследование." |
|
|
else: |
|
|
return f"🤖 Анализ анамнеза ИИ:\n\n{text}\n\nВывод: Анамнез требует уточнения." |
|
|
|
|
|
def simulate_text_processing(self, text, processing_type): |
|
|
"""Имитация обработки текста (fallback)""" |
|
|
time.sleep(2) # Имитация задержки обработки |
|
|
|
|
|
if processing_type == 'complaints': |
|
|
return f"🤖 Анализ жалоб ИИ:\n\n{text}\n\nВывод: Выявлены характерные симптомы требующие дополнительного обследования." |
|
|
else: |
|
|
return f"🤖 Анализ анамнеза ИИ:\n\n{text}\n\nВывод: Анамнез указывает на хронический характер заболевания." |
|
|
|
|
|
def simulate_image_processing(self, image_path): |
|
|
"""Имитация обработки изображения нейросетью""" |
|
|
time.sleep(3) # Имитация задержки обработки |
|
|
|
|
|
filename = os.path.basename(image_path) |
|
|
return f"🖼️ Анализ изображения '{filename}':\n\nОбнаружены патологические изменения. Рекомендуется консультация специалиста." |