Browse Source

send files parallel mode

dev
poturaevpetr 2 weeks ago
parent
commit
5a1e296d41
  1. 26
      autoLoader/loader/loader.py
  2. 135
      autoLoader/loader/recognition_checker.py

26
autoLoader/loader/loader.py

@ -57,15 +57,8 @@ class Loader():
print("❌ База данных не готова. Пропускаем распознавание.")
return
# Получаем файлы без заключения
files_without_conclusion = checker.get_files_without_conclusion(limit=limit)
if not files_without_conclusion:
print("✅ Все файлы уже имеют заключения")
return
# Отправляем только что загруженные файлы
sent_count = 0
# Фильтруем только что загруженные файлы без заключения
files_to_send = []
for audio in self.loaded_files:
# Проверяем, есть ли у файла заключение
with get_db_session() as db:
@ -77,13 +70,18 @@ class Loader():
if existing_conclusion:
print(f" Файл {audio.filename} уже имеет заключение")
continue
else:
files_to_send.append(audio)
if not files_to_send:
print("✅ Все файлы уже имеют заключения")
return
# Отправляем на распознавание
success = checker.send_to_recognition(audio)
if success:
sent_count += 1
# Отправляем файлы параллельно
print(f"🚀 Отправка {len(files_to_send)} файлов параллельно...")
results = checker.send_to_recognition_parallel(files_to_send)
print(f"✅ Отправлено {sent_count} из {len(self.loaded_files)} файлов на распознавание")
print(f"✅ Отправлено {results['sent']} из {results['total']} файлов на распознавание")
def load(self):
# Проверяем БД перед началом работы

135
autoLoader/loader/recognition_checker.py

@ -5,6 +5,8 @@ import requests
from sqlalchemy import inspect
from typing import List, Optional
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from autoLoader.database import get_db_session, Audio, AiConclusion
from autoLoader.config import GIGAAM_API_URL
@ -20,12 +22,13 @@ logger = logging.getLogger(__name__)
class RecognitionChecker:
"""Класс для проверки и отправки файлов на распознавание"""
def __init__(self, api_url: Optional[str] = None):
def __init__(self, api_url: Optional[str] = None, max_workers: int = 5):
"""
Инициализация checker
Args:
api_url: URL API GigaAM для распознавания (если None, берётся из config)
max_workers: Максимальное количество параллельных запросов
"""
# Если api_url не передан, берём из config.py
if api_url is None:
@ -33,8 +36,15 @@ class RecognitionChecker:
self.api_url = f"{api_url}/api/call/process"
self.timeout = 10 # таймаут запроса в секундах
self.max_workers = max_workers # количество параллельных потоков
# Thread-safe счётчики для статистики
self._lock = threading.Lock()
self._sent_count = 0
self._failed_count = 0
logger.info(f"✅ RecognitionChecker инициализирован с URL: {self.api_url}")
logger.info(f"📊 Параллельная отправка: до {max_workers} запросов одновременно")
def check_database(self) -> bool:
"""
@ -120,7 +130,7 @@ class RecognitionChecker:
}
try:
logger.info(f"📤 Отправка файла {audio.filename} на распознавание...")
logger.info(f"📤 [Thread-{threading.current_thread().name}] Отправка файла {audio.filename} на распознавание...")
response = requests.post(
self.api_url,
@ -129,28 +139,131 @@ class RecognitionChecker:
)
if response.status_code == 200 or response.status_code == 202:
logger.info(f"✅ Файл {audio.filename} успешно отправлен на распознавание")
logger.info(f"✅ [Thread-{threading.current_thread().name}] Файл {audio.filename} успешно отправлен")
# Thread-safe обновление счётчиков
with self._lock:
self._sent_count += 1
return True
else:
logger.error(f"❌ Ошибка API {response.status_code}: {response.text}")
logger.error(f"❌ [Thread-{threading.current_thread().name}] Ошибка API {response.status_code}: {response.text}")
# Thread-safe обновление счётчиков
with self._lock:
self._failed_count += 1
return False
except requests.exceptions.Timeout:
logger.error(f"❌ Таймаут при отправке файла {audio.filename}")
logger.error(f"❌ [Thread-{threading.current_thread().name}] Таймаут при отправке файла {audio.filename}")
with self._lock:
self._failed_count += 1
return False
except requests.exceptions.ConnectionError:
logger.error(f"❌ Не удалось подключиться к API {self.api_url}")
logger.error(f"❌ [Thread-{threading.current_thread().name}] Не удалось подключиться к API {self.api_url}")
with self._lock:
self._failed_count += 1
return False
except Exception as e:
logger.error(f"❌ Ошибка при отправке {audio.filename}: {e}")
logger.error(f"❌ [Thread-{threading.current_thread().name}] Ошибка при отправке {audio.filename}: {e}")
with self._lock:
self._failed_count += 1
return False
def process_all_pending(self, limit: Optional[int] = None) -> dict:
def send_to_recognition_parallel(self, audio_list: List[Audio]) -> dict:
"""
Отправляет несколько файлов на распознавание параллельно
Args:
audio_list: Список объектов Audio для распознавания
Returns:
Словарь с результатами отправки
"""
if not audio_list:
logger.info(" Список файлов пуст, нечего отправлять")
return {
"total": 0,
"sent": 0,
"failed": 0,
"files": []
}
# Сбрасываем счётчики
self._sent_count = 0
self._failed_count = 0
logger.info(f"🚀 Начинаем параллельную отправку {len(audio_list)} файлов")
logger.info(f"📊 Количество потоков: {self.max_workers}")
results = {
"total": len(audio_list),
"sent": 0,
"failed": 0,
"files": []
}
# Используем ThreadPoolExecutor для параллельной отправки
with ThreadPoolExecutor(max_workers=self.max_workers, thread_name_prefix="SendReq") as executor:
# Запускаем все задачи
future_to_audio = {
executor.submit(self.send_to_recognition, audio): audio
for audio in audio_list
}
# Обрабатываем результаты по мере завершения
for future in as_completed(future_to_audio):
audio = future_to_audio[future]
try:
success = future.result()
result = {
"filename": audio.filename,
"audio_id": str(audio.id),
"success": success
}
results["files"].append(result)
except Exception as exc:
logger.error(f"❌ Файл {audio.filename} сгенерировал исключение: {exc}")
result = {
"filename": audio.filename,
"audio_id": str(audio.id),
"success": False,
"error": str(exc)
}
results["files"].append(result)
# Получаем итоговую статистику из счётчиков
results["sent"] = self._sent_count
results["failed"] = self._failed_count
# Логирование итогов
logger.info(f"📊 Итого параллельной отправки:")
logger.info(f" - Всего: {results['total']}")
logger.info(f" - Отправлено: {results['sent']}")
logger.info(f" - Ошибок: {results['failed']}")
return results
def process_all_pending(self, limit: Optional[int] = None, parallel: bool = True) -> dict:
"""
Находит и отправляет все файлы без заключения на распознавание
Args:
limit: Максимальное количество файлов для обработки
parallel: Использовать параллельную отправку (по умолчанию True)
Returns:
Словарь с результатами обработки
@ -168,6 +281,12 @@ class RecognitionChecker:
"files": []
}
# Выбираем метод отправки
if parallel:
logger.info("🚀 Используем параллельную отправку")
return self.send_to_recognition_parallel(files_without_conclusion)
else:
logger.info("📤 Используем последовательную отправку")
results = {
"total": len(files_without_conclusion),
"sent": 0,

Loading…
Cancel
Save