You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

429 lines
18 KiB

from PyQt6.QtCore import QThread, QMutex, QWaitCondition, pyqtSignal
import time
import os
import json
import tiktoken
from typing import List, Tuple, Optional
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
from llama_cpp import Llama
import torch
class MedicalRAG:
def __init__(
self,
model_path: str,
corpus_path: str = "rag_corpus.json",
db_path: str = "./chroma_db",
embedding_model_name: str = "cointegrated/rubert-tiny2",
top_k: int = 3,
n_ctx: int = 2048,
n_threads: int = None,
token_multiplier: int = 3,
n_gpu_layers: int = 0,
use_gpu_for_embeddings: bool = False,
low_memory: bool = True
):
self.corpus_path = corpus_path
self.top_k = top_k
self.token_multiplier = token_multiplier
self.low_memory = low_memory
# Автоматическое определение потоков
if n_threads is None:
import multiprocessing
n_threads = max(1, multiprocessing.cpu_count() - 1)
# Проверка доступности GPU
self.has_gpu = torch.cuda.is_available() and not low_memory
if self.has_gpu:
gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024 ** 3
print(f"✅ GPU доступен: {torch.cuda.get_device_name()} ({gpu_memory:.1f} GB)")
if gpu_memory < 4:
n_gpu_layers = min(n_gpu_layers, 10)
elif gpu_memory < 8:
n_gpu_layers = min(n_gpu_layers, 20)
else:
print(" Используется CPU режим")
n_gpu_layers = 0
use_gpu_for_embeddings = False
# Инициализация токенизатора
print("Инициализация токенизатора...")
self.encoding = None
try:
self.encoding = tiktoken.get_encoding("cl100k_base")
except Exception as e:
print(f" Токенизатор не загружен: {e}")
# Эмбеддинги
print("Загрузка эмбеддинг-модели...")
device = "cuda" if (self.has_gpu and use_gpu_for_embeddings) else "cpu"
model_kwargs = {}
if low_memory:
model_kwargs = {
'device': device,
'model_kwargs': {'torch_dtype': torch.float16}
}
self.embedding_function = SentenceTransformerEmbeddingFunction(
model_name=embedding_model_name,
**model_kwargs
)
# ChromaDB
print("Инициализация ChromaDB...")
self.client = chromadb.PersistentClient(path=db_path)
collection_metadata = {"hnsw:space": "cosine"}
if low_memory:
collection_metadata.update({
"hnsw:construction_ef": 100,
"hnsw:M": 16,
})
self.collection = self.client.get_or_create_collection(
name="medical_anamnesis",
embedding_function=self.embedding_function,
metadata=collection_metadata
)
if self.collection.count() == 0:
print("Загрузка данных в коллекцию...")
self._load_corpus()
else:
print(f"Коллекция содержит {self.collection.count()} записей")
# Загрузка LLM
if not os.path.exists(model_path):
raise FileNotFoundError(
f"Модель не найдена: {model_path}\n"
"Для слабых компьютеров рекомендуется использовать меньшие модели."
)
print("Загрузка языковой модели...")
llm_params = {
"model_path": model_path,
"n_ctx": n_ctx,
"n_threads": n_threads,
"verbose": False,
"low_vram": low_memory,
"use_mlock": not low_memory,
}
if self.has_gpu and n_gpu_layers > 0:
llm_params.update({
"n_gpu_layers": n_gpu_layers,
"main_gpu": 0,
"tensor_split": None,
})
print(f"Используется GPU с {n_gpu_layers} слоями")
else:
print("Используется CPU")
try:
self.llm = Llama(**llm_params)
print("✅ Система готова к работе!")
except Exception as e:
print(f"❌ Ошибка загрузки модели: {e}")
raise
def _load_corpus(self):
"""Загрузка корпуса с обработкой ошибок"""
try:
with open(self.corpus_path, "r", encoding="utf-8") as f:
data = json.load(f)
batch_size = 50 if self.low_memory else 100
for i in range(0, len(data), batch_size):
batch = data[i:i + batch_size]
self.collection.add(
documents=[item["full"] for item in batch],
metadatas=[{"short": item["short"]} for item in batch],
ids=[f"id_{i + j}" for j in range(len(batch))]
)
print(f"Загружено {min(i + batch_size, len(data))}/{len(data)} записей")
except Exception as e:
print(f"❌ Ошибка загрузки корпуса: {e}")
raise
def count_tokens(self, text: str) -> int:
"""Подсчет токенов"""
if self.encoding:
return len(self.encoding.encode(text))
else:
return len(text.split())
def build_prompt_with_token_management(self, short_note: str, processing_type: str,
max_context_tokens: int = 1500) -> Tuple[str, int]:
"""Строит промпт с оптимизированным управлением токенами"""
examples = self.retrieve(short_note)
# Разные системные сообщения для жалоб и анамнеза
if processing_type == 'complaints':
system_msg = (
"На основе примеров напиши развёрнуто жалобы пациента, грамотно с медицинской точки зрения. "
"Напиши жалобы в одно предложение, одной строкой. "
"Не пиши вводных слов и фраз. Только жалобы пациента. "
"Неуместно писать диагнозы и план лечения. "
"Расшифруй все сокращения. "
"Отвечай сразу без размышлений."
)
else: # history
system_msg = (
"На основе примеров напиши развёрнутый анамнез заболевания, грамотно с медицинской точки зрения. "
# "Опиши историю развития заболевания: когда началось, как развивалось, какое лечение получал. "
"Неуместно писать диагнозы и план лечения. "
"Используй медицинскую терминологию. "
"Расшифруй все сокращения. "
"Отвечай сразу без размышлений."
)
system_tokens = self.count_tokens(system_msg)
note_tokens = self.count_tokens(short_note)
available_tokens = max_context_tokens - system_tokens - note_tokens - 150
selected_examples = []
current_tokens = 0
for example in examples:
example_tokens = self.count_tokens(example)
if current_tokens + example_tokens <= available_tokens:
selected_examples.append(example)
current_tokens += example_tokens
else:
if self.low_memory:
break
elif len(selected_examples) > 0:
break
context = "\n".join([f"Пример: {ex}" for ex in selected_examples])
user_msg = f"""Примеры:
{context}
{"Жалобы" if processing_type == 'complaints' else "Анамнез"}: "{short_note}"
"""
prompt = (
f"<|im_start|>system\n{system_msg}<|im_end|>\n"
f"<|im_start|>user\n{user_msg}<|im_end|>\n"
"<|im_start|>assistant\n"
)
prompt_tokens = self.count_tokens(prompt)
return prompt, prompt_tokens
def retrieve(self, query: str, n: int = None) -> List[str]:
"""Оптимизированный поиск"""
n = n or min(self.top_k, 3)
try:
results = self.collection.query(
query_texts=[query],
n_results=n
)
return results["documents"][0]
except Exception as e:
print(f" Ошибка поиска: {e}")
return []
def generate(self, short_note: str, processing_type: str) -> str:
"""Генерация с оптимизацией памяти"""
prompt, prompt_tokens = self.build_prompt_with_token_management(short_note, processing_type)
available_tokens = 2048 - prompt_tokens - 30
max_tokens = min(prompt_tokens * self.token_multiplier, available_tokens, 512)
print(f"📊 Токены: промпт={prompt_tokens}, ответ={max_tokens}")
print(f" Устройство: {'GPU' if self.has_gpu else 'CPU'}")
try:
output = self.llm(
prompt,
max_tokens=max_tokens,
temperature=0.1,
stop=["<|im_end|>"],
echo=False,
stream=False
)
result = output["choices"][0]["text"].strip()
return result
except Exception as e:
print(f"❌ Ошибка генерации: {e}")
return ""
def __call__(self, short_note: str, processing_type: str) -> str:
return self.generate(short_note, processing_type)
class AIWorker(QThread):
"""Worker для обработки данных нейросетью"""
text_processed = pyqtSignal(str, str) # text, processing_type
image_processed = pyqtSignal(str)
processing_finished = pyqtSignal(str) # processing_type
def __init__(self):
super().__init__()
self.mutex = QMutex()
self.condition = QWaitCondition()
self.text_to_process = None
self.image_to_process = None
self.processing_type = None # 'complaints', 'history', 'image'
self._active = True
# Инициализация MedicalRAG
self.rag = None
self._init_rag()
def _init_rag(self):
"""Инициализация MedicalRAG системы"""
try:
import psutil
# Автоматическое определение режима низкой памяти
total_memory = psutil.virtual_memory().total / 1024 ** 3
low_memory_mode = total_memory < 8
print(f"💾 Общая память: {total_memory:.1f} GB")
print(f"🔧 Режим низкой памяти: {'Да' if low_memory_mode else 'Нет'}")
# Путь к модели - нужно будет настроить под вашу систему
model_path = "./models/YandexGPT-5-Lite-8B-instruct-Q4_K_M.gguf"
# Если модель не найдена, используем имитацию
if not os.path.exists(model_path):
print(" Модель не найдена, используется имитация LLM")
self.rag = None
return
self.rag = MedicalRAG(
model_path=model_path,
n_ctx=2048,
n_gpu_layers=10 if not low_memory_mode else 0,
use_gpu_for_embeddings=not low_memory_mode,
low_memory=low_memory_mode
)
print("✅ MedicalRAG инициализирован успешно!")
except Exception as e:
print(f"❌ Ошибка инициализации MedicalRAG: {e}")
print(" Будет использоваться имитация LLM")
self.rag = None
def process_text(self, text, processing_type):
"""Запускает обработку текста"""
self.mutex.lock()
self.text_to_process = text
self.image_to_process = None
self.processing_type = processing_type
self.condition.wakeOne()
self.mutex.unlock()
def process_image(self, image_path):
"""Запускает обработку изображения"""
self.mutex.lock()
self.image_to_process = image_path
self.text_to_process = None
self.processing_type = 'image'
self.condition.wakeOne()
self.mutex.unlock()
def stop(self):
"""Останавливает worker"""
self._active = False
self.condition.wakeOne()
self.wait()
def run(self):
"""Основной цикл обработки"""
while self._active:
self.mutex.lock()
if not self.text_to_process and not self.image_to_process:
self.condition.wait(self.mutex)
if not self._active:
self.mutex.unlock()
break
if self.processing_type in ['complaints', 'history'] and self.text_to_process:
text = self.text_to_process
processing_type = self.processing_type
self.text_to_process = None
self.mutex.unlock()
# Обработка текста с использованием LLM
result = self.process_text_with_llm(text, processing_type)
self.text_processed.emit(result, processing_type)
elif self.processing_type == 'image' and self.image_to_process:
image_path = self.image_to_process
self.image_to_process = None
self.mutex.unlock()
# Обработка изображения
result = self.simulate_image_processing(image_path)
self.image_processed.emit(result)
else:
self.mutex.unlock()
self.processing_finished.emit(self.processing_type)
def process_text_with_llm(self, text, processing_type):
"""Обработка текста с использованием LLM"""
if not text.strip():
return ""
# Если LLM не инициализирован, используем имитацию
if self.rag is None:
return self.simulate_text_processing(text, processing_type)
try:
print(f"🔍 Обработка {processing_type} с помощью LLM...")
print(f"📥 Входной текст: {text}")
# Обработка через LLM
result = self.rag(text, processing_type)
# Если LLM вернул пустой результат, используем исходный текст
if not result or result.strip() == "":
print(" LLM вернул пустой ответ, используется исходный текст")
result = self._format_fallback_text(text, processing_type)
else:
print(f"📤 Результат LLM: {result}")
return result
except Exception as e:
print(f"❌ Ошибка при обработке LLM: {e}")
# В случае ошибки возвращаем форматированный исходный текст
return self._format_fallback_text(text, processing_type)
def _format_fallback_text(self, text, processing_type):
"""Форматирует исходный текст как fallback"""
if processing_type == 'complaints':
return f"🤖 Анализ жалоб ИИ:\n\n{text}\n\nВывод: Требуется дополнительное обследование."
else:
return f"🤖 Анализ анамнеза ИИ:\n\n{text}\n\nВывод: Анамнез требует уточнения."
def simulate_text_processing(self, text, processing_type):
"""Имитация обработки текста (fallback)"""
time.sleep(2) # Имитация задержки обработки
if processing_type == 'complaints':
return f"🤖 Анализ жалоб ИИ:\n\n{text}\n\nВывод: Выявлены характерные симптомы требующие дополнительного обследования."
else:
return f"🤖 Анализ анамнеза ИИ:\n\n{text}\n\nВывод: Анамнез указывает на хронический характер заболевания."
def simulate_image_processing(self, image_path):
"""Имитация обработки изображения нейросетью"""
time.sleep(3) # Имитация задержки обработки
filename = os.path.basename(image_path)
return f"🖼 Анализ изображения '{filename}':\n\nОбнаружены патологические изменения. Рекомендуется консультация специалиста."