""" Класс для проверки файлов без AI заключения и отправки на распознавание """ import requests from sqlalchemy import inspect from typing import List, Optional import logging from autoLoader.database import get_db_session, Audio, AiConclusion # Настройка логирования logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class RecognitionChecker: """Класс для проверки и отправки файлов на распознавание""" def __init__(self, api_url: str = "http://localhost:5001/api/call/process"): """ Инициализация checker Args: api_url: URL API GigaAM для распознавания """ self.api_url = api_url self.timeout = 10 # таймаут запроса в секундах def check_database(self) -> bool: """ Проверяет существование необходимых таблиц в БД Returns: True если таблицы существуют, иначе False """ from autoLoader.database import engine inspector = inspect(engine) existing_tables = inspector.get_table_names() required_tables = ['audio', 'ai_conclusion'] missing_tables = [t for t in required_tables if t not in existing_tables] if missing_tables: logger.error(f"❌ Отсутствуют таблицы: {missing_tables}") return False return True def get_files_without_conclusion(self, limit: Optional[int] = None) -> List[Audio]: """ Находит все файлы, у которых нет AI заключения Args: limit: Ограничение количества файлов (None = все) Returns: Список объектов Audio без заключения """ if not self.check_database(): logger.error("❌ База данных не готова") return [] try: with get_db_session() as db: # Подзапрос: находим все audio_id, у которых есть заключение from sqlalchemy import distinct audio_with_conclusion = db.query( distinct(AiConclusion.audio_id) ).filter( AiConclusion.audio_id.isnot(None) ).all() # Извлекаем ID из кортежей conclusion_ids = [row[0] for row in audio_with_conclusion] # Находим все audio, у которых нет заключения if conclusion_ids: files_without_conclusion = db.query(Audio).filter( ~Audio.id.in_(conclusion_ids) ).all() else: # Если заключений нет вообще - все файлы без заключения files_without_conclusion = db.query(Audio).all() logger.info(f"📊 Найдено файлов без заключения: {len(files_without_conclusion)}") if limit: return files_without_conclusion[:limit] return files_without_conclusion except Exception as e: logger.error(f"❌ Ошибка при поиске файлов: {e}") return [] def send_to_recognition(self, audio: Audio) -> bool: """ Отправляет файл на распознавание в GigaAM API Args: audio: Объект Audio для распознавания Returns: True если успешно отправлен, иначе False """ payload = { "filename": audio.filename } try: logger.info(f"📤 Отправка файла {audio.filename} на распознавание...") response = requests.post( self.api_url, json=payload, timeout=self.timeout ) if response.status_code == 200 or response.status_code == 202: logger.info(f"✅ Файл {audio.filename} успешно отправлен на распознавание") return True else: logger.error(f"❌ Ошибка API {response.status_code}: {response.text}") return False except requests.exceptions.Timeout: logger.error(f"❌ Таймаут при отправке файла {audio.filename}") return False except requests.exceptions.ConnectionError: logger.error(f"❌ Не удалось подключиться к API {self.api_url}") return False except Exception as e: logger.error(f"❌ Ошибка при отправке {audio.filename}: {e}") return False def process_all_pending(self, limit: Optional[int] = None) -> dict: """ Находит и отправляет все файлы без заключения на распознавание Args: limit: Максимальное количество файлов для обработки Returns: Словарь с результатами обработки """ logger.info("🔍 Поиск файлов без AI заключения...") files_without_conclusion = self.get_files_without_conclusion(limit) if not files_without_conclusion: logger.info("✅ Все файлы обработаны") return { "total": 0, "sent": 0, "failed": 0, "files": [] } results = { "total": len(files_without_conclusion), "sent": 0, "failed": 0, "files": [] } for audio in files_without_conclusion: success = self.send_to_recognition(audio) result = { "filename": audio.filename, "audio_id": str(audio.id), "success": success } results["files"].append(result) if success: results["sent"] += 1 else: results["failed"] += 1 # Логирование итогов logger.info(f"📊 Итого:") logger.info(f" - Всего: {results['total']}") logger.info(f" - Отправлено: {results['sent']}") logger.info(f" - Ошибок: {results['failed']}") return results def check_api_availability(self) -> bool: """ Проверяет доступность GigaAM API Returns: True если API доступен, иначе False """ try: # Проверяем health endpoint или просто подключение response = requests.get( self.api_url.replace("/process", "/status"), # Пробуем /status timeout=5 ) if response.status_code in [200, 404]: # 404 тоже ок - API работает logger.info("✅ GigaAM API доступен") return True except requests.exceptions.ConnectionError: logger.warning("⚠️ GigaAM API недоступен") return False except Exception as e: logger.warning(f"⚠️ Ошибка проверки API: {e}") return False return True # Удобная функция для запуска из командной строки def process_pending_files(api_url: str = "http://localhost:5001/api/call/process", limit: int = None): """ Обрабатывает все файлы без заключения Args: api_url: URL GigaAM API limit: Максимальное количество файлов для обработки Returns: Результаты обработки """ checker = RecognitionChecker(api_url) # Проверяем доступность API if not checker.check_api_availability(): logger.error("❌ GigaAM API недоступен. Проверьте, запущен ли сервис.") return { "total": 0, "sent": 0, "failed": 0, "error": "API unavailable" } # Обрабатываем файлы return checker.process_all_pending(limit) if __name__ == "__main__": # Пример использования import sys api_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:5001/api/call/process" limit = int(sys.argv[2]) if len(sys.argv) > 2 else None results = process_pending_files(api_url, limit) print(f"\n📊 Результаты:") print(f"Всего: {results['total']}") print(f"Отправлено: {results['sent']}") print(f"Ошибок: {results['failed']}")