From cceaf65576467a0752d7c2357d3591d4b1c9ee1d Mon Sep 17 00:00:00 2001 From: poturaevpetr Date: Fri, 26 Dec 2025 01:21:07 +0500 Subject: [PATCH] fix checker --- autoLoader/loader/loader.py | 8 +- autoLoader/loader/recognition_checker.py | 85 ++-- autoLoader/loader/recognition_checker_old.py | 389 +++++++++++++++++++ 3 files changed, 438 insertions(+), 44 deletions(-) create mode 100644 autoLoader/loader/recognition_checker_old.py diff --git a/autoLoader/loader/loader.py b/autoLoader/loader/loader.py index 482578a..7f872a4 100644 --- a/autoLoader/loader/loader.py +++ b/autoLoader/loader/loader.py @@ -71,7 +71,13 @@ class Loader(): print(f"⏭️ Файл {audio.filename} уже имеет заключение") continue else: - files_to_send.append(audio) + # Конвертируем SQLAlchemy объект в словарь + files_to_send.append({ + 'id': str(audio.id), + 'filename': audio.filename, + 'file_size': audio.file_size, + 'index_date': audio.index_date.isoformat() if audio.index_date else None + }) if not files_to_send: print("✅ Все файлы уже имеют заключения") diff --git a/autoLoader/loader/recognition_checker.py b/autoLoader/loader/recognition_checker.py index 119df55..b2a107e 100644 --- a/autoLoader/loader/recognition_checker.py +++ b/autoLoader/loader/recognition_checker.py @@ -1,12 +1,14 @@ """ -Класс для проверки файлов без AI заключения и отправки на распознавание +Исправленная версия recognition_checker.py +Избегает detached instance error путём использования словарей вместо SQLAlchemy объектов """ + import requests from sqlalchemy import inspect -from typing import List, Optional +from typing import List, Optional, Dict, Any import logging -from concurrent.futures import ThreadPoolExecutor, as_completed import threading +from concurrent.futures import ThreadPoolExecutor, as_completed from autoLoader.database import get_db_session, Audio, AiConclusion from autoLoader.config import GIGAAM_API_URL @@ -67,7 +69,7 @@ class RecognitionChecker: return True - def get_files_without_conclusion(self, limit: Optional[int] = None) -> List[Audio]: + def get_files_without_conclusion(self, limit: Optional[int] = None) -> List[Dict[str, Any]]: """ Находит все файлы, у которых нет AI заключения @@ -75,7 +77,7 @@ class RecognitionChecker: limit: Ограничение количества файлов (None = все) Returns: - Список объектов Audio без заключения + Список словарей с информацией о файлах (избегаем detached instance) """ if not self.check_database(): logger.error("❌ База данных не готова") @@ -97,40 +99,52 @@ class RecognitionChecker: # Находим все audio, у которых нет заключения if conclusion_ids: - files_without_conclusion = db.query(Audio).filter( + audio_objects = db.query(Audio).filter( ~Audio.id.in_(conclusion_ids) ).all() else: # Если заключений нет вообще - все файлы без заключения - files_without_conclusion = db.query(Audio).all() + audio_objects = db.query(Audio).all() - logger.info(f"📊 Найдено файлов без заключения: {len(files_without_conclusion)}") + # Конвертируем в словари внутри сессии БД + files_data = [] + for audio in audio_objects: + files_data.append({ + 'id': str(audio.id), + 'filename': audio.filename, + 'file_size': audio.file_size, + 'index_date': audio.index_date.isoformat() if audio.index_date else None + }) + + logger.info(f"📊 Найдено файлов без заключения: {len(files_data)}") if limit: - return files_without_conclusion[:limit] + return files_data[:limit] - return files_without_conclusion + return files_data except Exception as e: logger.error(f"❌ Ошибка при поиске файлов: {e}") return [] - def send_to_recognition(self, audio: Audio) -> bool: + def send_to_recognition(self, audio_data: Dict[str, Any]) -> bool: """ Отправляет файл на распознавание в GigaAM API Args: - audio: Объект Audio для распознавания + audio_data: Словарь с данными об аудио файле Returns: True если успешно отправлен, иначе False """ + filename = audio_data.get('filename') + payload = { - "filename": audio.filename + "filename": filename } try: - logger.info(f"📤 [Thread-{threading.current_thread().name}] Отправка файла {audio.filename} на распознавание...") + logger.info(f"📤 [Thread-{threading.current_thread().name}] Отправка файла {filename} на распознавание...") response = requests.post( self.api_url, @@ -139,7 +153,7 @@ class RecognitionChecker: ) if response.status_code == 200 or response.status_code == 202: - logger.info(f"✅ [Thread-{threading.current_thread().name}] Файл {audio.filename} успешно отправлен") + logger.info(f"✅ [Thread-{threading.current_thread().name}] Файл {filename} успешно отправлен") # Thread-safe обновление счётчиков with self._lock: @@ -156,7 +170,7 @@ class RecognitionChecker: return False except requests.exceptions.Timeout: - logger.error(f"❌ [Thread-{threading.current_thread().name}] Таймаут при отправке файла {audio.filename}") + logger.error(f"❌ [Thread-{threading.current_thread().name}] Таймаут при отправке файла {filename}") with self._lock: self._failed_count += 1 @@ -170,19 +184,19 @@ class RecognitionChecker: return False except Exception as e: - logger.error(f"❌ [Thread-{threading.current_thread().name}] Ошибка при отправке {audio.filename}: {e}") + logger.error(f"❌ [Thread-{threading.current_thread().name}] Ошибка при отправке {filename}: {e}") with self._lock: self._failed_count += 1 return False - def send_to_recognition_parallel(self, audio_list: List[Audio]) -> dict: + def send_to_recognition_parallel(self, audio_list: List[Dict[str, Any]]) -> Dict[str, Any]: """ Отправляет несколько файлов на распознавание параллельно Args: - audio_list: Список объектов Audio для распознавания + audio_list: Список словарей с данными об аудио файлах Returns: Словарь с результатами отправки @@ -226,19 +240,19 @@ class RecognitionChecker: success = future.result() result = { - "filename": audio.filename, - "audio_id": str(audio.id), + "filename": audio.get('filename'), + "audio_id": audio.get('id'), "success": success } results["files"].append(result) except Exception as exc: - logger.error(f"❌ Файл {audio.filename} сгенерировал исключение: {exc}") + logger.error(f"❌ Файл {audio.get('filename')} сгенерировал исключение: {exc}") result = { - "filename": audio.filename, - "audio_id": str(audio.id), + "filename": audio.get('filename'), + "audio_id": audio.get('id'), "success": False, "error": str(exc) } @@ -257,7 +271,7 @@ class RecognitionChecker: return results - def process_all_pending(self, limit: Optional[int] = None, parallel: bool = True) -> dict: + def process_all_pending(self, limit: Optional[int] = None, parallel: bool = True) -> Dict[str, Any]: """ Находит и отправляет все файлы без заключения на распознавание @@ -298,8 +312,8 @@ class RecognitionChecker: success = self.send_to_recognition(audio) result = { - "filename": audio.filename, - "audio_id": str(audio.id), + "filename": audio.get('filename'), + "audio_id": audio.get('id'), "success": success } @@ -347,7 +361,7 @@ class RecognitionChecker: # Удобная функция для запуска из командной строки -def process_pending_files(api_url: Optional[str] = None, limit: Optional[int] = None): +def process_pending_files(api_url: Optional[str] = None, limit: Optional[int] = None) -> Dict[str, Any]: """ Обрабатывает все файлы без заключения @@ -372,18 +386,3 @@ def process_pending_files(api_url: Optional[str] = None, limit: Optional[int] = # Обрабатываем файлы return checker.process_all_pending(limit) - - -if __name__ == "__main__": - # Пример использования - import sys - - api_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:5001/api/call/process" - limit = int(sys.argv[2]) if len(sys.argv) > 2 else None - - results = process_pending_files(api_url, limit) - - print(f"\n📊 Результаты:") - print(f"Всего: {results['total']}") - print(f"Отправлено: {results['sent']}") - print(f"Ошибок: {results['failed']}") diff --git a/autoLoader/loader/recognition_checker_old.py b/autoLoader/loader/recognition_checker_old.py new file mode 100644 index 0000000..119df55 --- /dev/null +++ b/autoLoader/loader/recognition_checker_old.py @@ -0,0 +1,389 @@ +""" +Класс для проверки файлов без AI заключения и отправки на распознавание +""" +import requests +from sqlalchemy import inspect +from typing import List, Optional +import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +import threading + +from autoLoader.database import get_db_session, Audio, AiConclusion +from autoLoader.config import GIGAAM_API_URL + +# Настройка логирования +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class RecognitionChecker: + """Класс для проверки и отправки файлов на распознавание""" + + def __init__(self, api_url: Optional[str] = None, max_workers: int = 5): + """ + Инициализация checker + + Args: + api_url: URL API GigaAM для распознавания (если None, берётся из config) + max_workers: Максимальное количество параллельных запросов + """ + # Если api_url не передан, берём из config.py + if api_url is None: + api_url = GIGAAM_API_URL + + self.api_url = f"{api_url}/api/call/process" + self.timeout = 10 # таймаут запроса в секундах + self.max_workers = max_workers # количество параллельных потоков + + # Thread-safe счётчики для статистики + self._lock = threading.Lock() + self._sent_count = 0 + self._failed_count = 0 + + logger.info(f"✅ RecognitionChecker инициализирован с URL: {self.api_url}") + logger.info(f"📊 Параллельная отправка: до {max_workers} запросов одновременно") + + def check_database(self) -> bool: + """ + Проверяет существование необходимых таблиц в БД + + Returns: + True если таблицы существуют, иначе False + """ + from autoLoader.database import engine + + inspector = inspect(engine) + existing_tables = inspector.get_table_names() + + required_tables = ['audio', 'ai_conclusion'] + missing_tables = [t for t in required_tables if t not in existing_tables] + + if missing_tables: + logger.error(f"❌ Отсутствуют таблицы: {missing_tables}") + return False + + return True + + def get_files_without_conclusion(self, limit: Optional[int] = None) -> List[Audio]: + """ + Находит все файлы, у которых нет AI заключения + + Args: + limit: Ограничение количества файлов (None = все) + + Returns: + Список объектов Audio без заключения + """ + if not self.check_database(): + logger.error("❌ База данных не готова") + return [] + + try: + with get_db_session() as db: + # Подзапрос: находим все audio_id, у которых есть заключение + from sqlalchemy import distinct + + audio_with_conclusion = db.query( + distinct(AiConclusion.audio_id) + ).filter( + AiConclusion.audio_id.isnot(None) + ).all() + + # Извлекаем ID из кортежей + conclusion_ids = [row[0] for row in audio_with_conclusion] + + # Находим все audio, у которых нет заключения + if conclusion_ids: + files_without_conclusion = db.query(Audio).filter( + ~Audio.id.in_(conclusion_ids) + ).all() + else: + # Если заключений нет вообще - все файлы без заключения + files_without_conclusion = db.query(Audio).all() + + logger.info(f"📊 Найдено файлов без заключения: {len(files_without_conclusion)}") + + if limit: + return files_without_conclusion[:limit] + + return files_without_conclusion + + except Exception as e: + logger.error(f"❌ Ошибка при поиске файлов: {e}") + return [] + + def send_to_recognition(self, audio: Audio) -> bool: + """ + Отправляет файл на распознавание в GigaAM API + + Args: + audio: Объект Audio для распознавания + + Returns: + True если успешно отправлен, иначе False + """ + payload = { + "filename": audio.filename + } + + try: + logger.info(f"📤 [Thread-{threading.current_thread().name}] Отправка файла {audio.filename} на распознавание...") + + response = requests.post( + self.api_url, + json=payload, + timeout=self.timeout + ) + + if response.status_code == 200 or response.status_code == 202: + logger.info(f"✅ [Thread-{threading.current_thread().name}] Файл {audio.filename} успешно отправлен") + + # Thread-safe обновление счётчиков + with self._lock: + self._sent_count += 1 + + return True + else: + logger.error(f"❌ [Thread-{threading.current_thread().name}] Ошибка API {response.status_code}: {response.text}") + + # Thread-safe обновление счётчиков + with self._lock: + self._failed_count += 1 + + return False + + except requests.exceptions.Timeout: + logger.error(f"❌ [Thread-{threading.current_thread().name}] Таймаут при отправке файла {audio.filename}") + + with self._lock: + self._failed_count += 1 + + return False + except requests.exceptions.ConnectionError: + logger.error(f"❌ [Thread-{threading.current_thread().name}] Не удалось подключиться к API {self.api_url}") + + with self._lock: + self._failed_count += 1 + + return False + except Exception as e: + logger.error(f"❌ [Thread-{threading.current_thread().name}] Ошибка при отправке {audio.filename}: {e}") + + with self._lock: + self._failed_count += 1 + + return False + + def send_to_recognition_parallel(self, audio_list: List[Audio]) -> dict: + """ + Отправляет несколько файлов на распознавание параллельно + + Args: + audio_list: Список объектов Audio для распознавания + + Returns: + Словарь с результатами отправки + """ + if not audio_list: + logger.info("⏭️ Список файлов пуст, нечего отправлять") + return { + "total": 0, + "sent": 0, + "failed": 0, + "files": [] + } + + # Сбрасываем счётчики + self._sent_count = 0 + self._failed_count = 0 + + logger.info(f"🚀 Начинаем параллельную отправку {len(audio_list)} файлов") + logger.info(f"📊 Количество потоков: {self.max_workers}") + + results = { + "total": len(audio_list), + "sent": 0, + "failed": 0, + "files": [] + } + + # Используем ThreadPoolExecutor для параллельной отправки + with ThreadPoolExecutor(max_workers=self.max_workers, thread_name_prefix="SendReq") as executor: + # Запускаем все задачи + future_to_audio = { + executor.submit(self.send_to_recognition, audio): audio + for audio in audio_list + } + + # Обрабатываем результаты по мере завершения + for future in as_completed(future_to_audio): + audio = future_to_audio[future] + + try: + success = future.result() + + result = { + "filename": audio.filename, + "audio_id": str(audio.id), + "success": success + } + + results["files"].append(result) + + except Exception as exc: + logger.error(f"❌ Файл {audio.filename} сгенерировал исключение: {exc}") + + result = { + "filename": audio.filename, + "audio_id": str(audio.id), + "success": False, + "error": str(exc) + } + + results["files"].append(result) + + # Получаем итоговую статистику из счётчиков + results["sent"] = self._sent_count + results["failed"] = self._failed_count + + # Логирование итогов + logger.info(f"📊 Итого параллельной отправки:") + logger.info(f" - Всего: {results['total']}") + logger.info(f" - Отправлено: {results['sent']}") + logger.info(f" - Ошибок: {results['failed']}") + + return results + + def process_all_pending(self, limit: Optional[int] = None, parallel: bool = True) -> dict: + """ + Находит и отправляет все файлы без заключения на распознавание + + Args: + limit: Максимальное количество файлов для обработки + parallel: Использовать параллельную отправку (по умолчанию True) + + Returns: + Словарь с результатами обработки + """ + logger.info("🔍 Поиск файлов без AI заключения...") + + files_without_conclusion = self.get_files_without_conclusion(limit) + + if not files_without_conclusion: + logger.info("✅ Все файлы обработаны") + return { + "total": 0, + "sent": 0, + "failed": 0, + "files": [] + } + + # Выбираем метод отправки + if parallel: + logger.info("🚀 Используем параллельную отправку") + return self.send_to_recognition_parallel(files_without_conclusion) + else: + logger.info("📤 Используем последовательную отправку") + results = { + "total": len(files_without_conclusion), + "sent": 0, + "failed": 0, + "files": [] + } + + for audio in files_without_conclusion: + success = self.send_to_recognition(audio) + + result = { + "filename": audio.filename, + "audio_id": str(audio.id), + "success": success + } + + results["files"].append(result) + + if success: + results["sent"] += 1 + else: + results["failed"] += 1 + + # Логирование итогов + logger.info(f"📊 Итого:") + logger.info(f" - Всего: {results['total']}") + logger.info(f" - Отправлено: {results['sent']}") + logger.info(f" - Ошибок: {results['failed']}") + + return results + + def check_api_availability(self) -> bool: + """ + Проверяет доступность GigaAM API + + Returns: + True если API доступен, иначе False + """ + try: + # Проверяем health endpoint или просто подключение + response = requests.get( + self.api_url.replace("/process", "/status"), # Пробуем /status + timeout=5 + ) + + if response.status_code in [200, 404]: # 404 тоже ок - API работает + logger.info("✅ GigaAM API доступен") + return True + + except requests.exceptions.ConnectionError: + logger.warning("⚠️ GigaAM API недоступен") + return False + except Exception as e: + logger.warning(f"⚠️ Ошибка проверки API: {e}") + return False + + return True + + +# Удобная функция для запуска из командной строки +def process_pending_files(api_url: Optional[str] = None, limit: Optional[int] = None): + """ + Обрабатывает все файлы без заключения + + Args: + api_url: URL GigaAM API (если None, берётся из config.py) + limit: Максимальное количество файлов для обработки + + Returns: + Результаты обработки + """ + checker = RecognitionChecker(api_url) + + # Проверяем доступность API + if not checker.check_api_availability(): + logger.error("❌ GigaAM API недоступен. Проверьте, запущен ли сервис.") + return { + "total": 0, + "sent": 0, + "failed": 0, + "error": "API unavailable" + } + + # Обрабатываем файлы + return checker.process_all_pending(limit) + + +if __name__ == "__main__": + # Пример использования + import sys + + api_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:5001/api/call/process" + limit = int(sys.argv[2]) if len(sys.argv) > 2 else None + + results = process_pending_files(api_url, limit) + + print(f"\n📊 Результаты:") + print(f"Всего: {results['total']}") + print(f"Отправлено: {results['sent']}") + print(f"Ошибок: {results['failed']}")