From 5a1e296d4174d40a7bdfbf208f646f5b7875e033 Mon Sep 17 00:00:00 2001 From: poturaevpetr Date: Thu, 25 Dec 2025 23:34:56 +0500 Subject: [PATCH] send files parallel mode --- autoLoader/loader/loader.py | 26 ++-- autoLoader/loader/recognition_checker.py | 181 +++++++++++++++++++---- 2 files changed, 162 insertions(+), 45 deletions(-) diff --git a/autoLoader/loader/loader.py b/autoLoader/loader/loader.py index cc74b4a..455bd69 100644 --- a/autoLoader/loader/loader.py +++ b/autoLoader/loader/loader.py @@ -57,15 +57,8 @@ class Loader(): print("❌ База данных не готова. Пропускаем распознавание.") return - # Получаем файлы без заключения - files_without_conclusion = checker.get_files_without_conclusion(limit=limit) - - if not files_without_conclusion: - print("✅ Все файлы уже имеют заключения") - return - - # Отправляем только что загруженные файлы - sent_count = 0 + # Фильтруем только что загруженные файлы без заключения + files_to_send = [] for audio in self.loaded_files: # Проверяем, есть ли у файла заключение with get_db_session() as db: @@ -77,13 +70,18 @@ class Loader(): if existing_conclusion: print(f"⏭️ Файл {audio.filename} уже имеет заключение") continue + else: + files_to_send.append(audio) + + if not files_to_send: + print("✅ Все файлы уже имеют заключения") + return - # Отправляем на распознавание - success = checker.send_to_recognition(audio) - if success: - sent_count += 1 + # Отправляем файлы параллельно + print(f"🚀 Отправка {len(files_to_send)} файлов параллельно...") + results = checker.send_to_recognition_parallel(files_to_send) - print(f"✅ Отправлено {sent_count} из {len(self.loaded_files)} файлов на распознавание") + print(f"✅ Отправлено {results['sent']} из {results['total']} файлов на распознавание") def load(self): # Проверяем БД перед началом работы diff --git a/autoLoader/loader/recognition_checker.py b/autoLoader/loader/recognition_checker.py index 84831a2..119df55 100644 --- a/autoLoader/loader/recognition_checker.py +++ b/autoLoader/loader/recognition_checker.py @@ -5,6 +5,8 @@ import requests from sqlalchemy import inspect from typing import List, Optional import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +import threading from autoLoader.database import get_db_session, Audio, AiConclusion from autoLoader.config import GIGAAM_API_URL @@ -20,12 +22,13 @@ logger = logging.getLogger(__name__) class RecognitionChecker: """Класс для проверки и отправки файлов на распознавание""" - def __init__(self, api_url: Optional[str] = None): + def __init__(self, api_url: Optional[str] = None, max_workers: int = 5): """ Инициализация checker Args: api_url: URL API GigaAM для распознавания (если None, берётся из config) + max_workers: Максимальное количество параллельных запросов """ # Если api_url не передан, берём из config.py if api_url is None: @@ -33,8 +36,15 @@ class RecognitionChecker: self.api_url = f"{api_url}/api/call/process" self.timeout = 10 # таймаут запроса в секундах + self.max_workers = max_workers # количество параллельных потоков + + # Thread-safe счётчики для статистики + self._lock = threading.Lock() + self._sent_count = 0 + self._failed_count = 0 logger.info(f"✅ RecognitionChecker инициализирован с URL: {self.api_url}") + logger.info(f"📊 Параллельная отправка: до {max_workers} запросов одновременно") def check_database(self) -> bool: """ @@ -120,7 +130,7 @@ class RecognitionChecker: } try: - logger.info(f"📤 Отправка файла {audio.filename} на распознавание...") + logger.info(f"📤 [Thread-{threading.current_thread().name}] Отправка файла {audio.filename} на распознавание...") response = requests.post( self.api_url, @@ -129,38 +139,56 @@ class RecognitionChecker: ) if response.status_code == 200 or response.status_code == 202: - logger.info(f"✅ Файл {audio.filename} успешно отправлен на распознавание") + logger.info(f"✅ [Thread-{threading.current_thread().name}] Файл {audio.filename} успешно отправлен") + + # Thread-safe обновление счётчиков + with self._lock: + self._sent_count += 1 + return True else: - logger.error(f"❌ Ошибка API {response.status_code}: {response.text}") + logger.error(f"❌ [Thread-{threading.current_thread().name}] Ошибка API {response.status_code}: {response.text}") + + # Thread-safe обновление счётчиков + with self._lock: + self._failed_count += 1 + return False except requests.exceptions.Timeout: - logger.error(f"❌ Таймаут при отправке файла {audio.filename}") + logger.error(f"❌ [Thread-{threading.current_thread().name}] Таймаут при отправке файла {audio.filename}") + + with self._lock: + self._failed_count += 1 + return False except requests.exceptions.ConnectionError: - logger.error(f"❌ Не удалось подключиться к API {self.api_url}") + logger.error(f"❌ [Thread-{threading.current_thread().name}] Не удалось подключиться к API {self.api_url}") + + with self._lock: + self._failed_count += 1 + return False except Exception as e: - logger.error(f"❌ Ошибка при отправке {audio.filename}: {e}") + logger.error(f"❌ [Thread-{threading.current_thread().name}] Ошибка при отправке {audio.filename}: {e}") + + with self._lock: + self._failed_count += 1 + return False - def process_all_pending(self, limit: Optional[int] = None) -> dict: + def send_to_recognition_parallel(self, audio_list: List[Audio]) -> dict: """ - Находит и отправляет все файлы без заключения на распознавание + Отправляет несколько файлов на распознавание параллельно Args: - limit: Максимальное количество файлов для обработки + audio_list: Список объектов Audio для распознавания Returns: - Словарь с результатами обработки + Словарь с результатами отправки """ - logger.info("🔍 Поиск файлов без AI заключения...") - - files_without_conclusion = self.get_files_without_conclusion(limit) - - if not files_without_conclusion: - logger.info("✅ Все файлы обработаны") + if not audio_list: + logger.info("⏭️ Список файлов пуст, нечего отправлять") return { "total": 0, "sent": 0, @@ -168,37 +196,128 @@ class RecognitionChecker: "files": [] } + # Сбрасываем счётчики + self._sent_count = 0 + self._failed_count = 0 + + logger.info(f"🚀 Начинаем параллельную отправку {len(audio_list)} файлов") + logger.info(f"📊 Количество потоков: {self.max_workers}") + results = { - "total": len(files_without_conclusion), + "total": len(audio_list), "sent": 0, "failed": 0, "files": [] } - for audio in files_without_conclusion: - success = self.send_to_recognition(audio) - - result = { - "filename": audio.filename, - "audio_id": str(audio.id), - "success": success + # Используем ThreadPoolExecutor для параллельной отправки + with ThreadPoolExecutor(max_workers=self.max_workers, thread_name_prefix="SendReq") as executor: + # Запускаем все задачи + future_to_audio = { + executor.submit(self.send_to_recognition, audio): audio + for audio in audio_list } - results["files"].append(result) + # Обрабатываем результаты по мере завершения + for future in as_completed(future_to_audio): + audio = future_to_audio[future] - if success: - results["sent"] += 1 - else: - results["failed"] += 1 + try: + success = future.result() + + result = { + "filename": audio.filename, + "audio_id": str(audio.id), + "success": success + } + + results["files"].append(result) + + except Exception as exc: + logger.error(f"❌ Файл {audio.filename} сгенерировал исключение: {exc}") + + result = { + "filename": audio.filename, + "audio_id": str(audio.id), + "success": False, + "error": str(exc) + } + + results["files"].append(result) + + # Получаем итоговую статистику из счётчиков + results["sent"] = self._sent_count + results["failed"] = self._failed_count # Логирование итогов - logger.info(f"📊 Итого:") + logger.info(f"📊 Итого параллельной отправки:") logger.info(f" - Всего: {results['total']}") logger.info(f" - Отправлено: {results['sent']}") logger.info(f" - Ошибок: {results['failed']}") return results + def process_all_pending(self, limit: Optional[int] = None, parallel: bool = True) -> dict: + """ + Находит и отправляет все файлы без заключения на распознавание + + Args: + limit: Максимальное количество файлов для обработки + parallel: Использовать параллельную отправку (по умолчанию True) + + Returns: + Словарь с результатами обработки + """ + logger.info("🔍 Поиск файлов без AI заключения...") + + files_without_conclusion = self.get_files_without_conclusion(limit) + + if not files_without_conclusion: + logger.info("✅ Все файлы обработаны") + return { + "total": 0, + "sent": 0, + "failed": 0, + "files": [] + } + + # Выбираем метод отправки + if parallel: + logger.info("🚀 Используем параллельную отправку") + return self.send_to_recognition_parallel(files_without_conclusion) + else: + logger.info("📤 Используем последовательную отправку") + results = { + "total": len(files_without_conclusion), + "sent": 0, + "failed": 0, + "files": [] + } + + for audio in files_without_conclusion: + success = self.send_to_recognition(audio) + + result = { + "filename": audio.filename, + "audio_id": str(audio.id), + "success": success + } + + results["files"].append(result) + + if success: + results["sent"] += 1 + else: + results["failed"] += 1 + + # Логирование итогов + logger.info(f"📊 Итого:") + logger.info(f" - Всего: {results['total']}") + logger.info(f" - Отправлено: {results['sent']}") + logger.info(f" - Ошибок: {results['failed']}") + + return results + def check_api_availability(self) -> bool: """ Проверяет доступность GigaAM API