Browse Source

fix checker

dev
poturaevpetr 2 weeks ago
parent
commit
cceaf65576
  1. 8
      autoLoader/loader/loader.py
  2. 85
      autoLoader/loader/recognition_checker.py
  3. 389
      autoLoader/loader/recognition_checker_old.py

8
autoLoader/loader/loader.py

@ -71,7 +71,13 @@ class Loader():
print(f" Файл {audio.filename} уже имеет заключение")
continue
else:
files_to_send.append(audio)
# Конвертируем SQLAlchemy объект в словарь
files_to_send.append({
'id': str(audio.id),
'filename': audio.filename,
'file_size': audio.file_size,
'index_date': audio.index_date.isoformat() if audio.index_date else None
})
if not files_to_send:
print("✅ Все файлы уже имеют заключения")

85
autoLoader/loader/recognition_checker.py

@ -1,12 +1,14 @@
"""
Класс для проверки файлов без AI заключения и отправки на распознавание
Исправленная версия recognition_checker.py
Избегает detached instance error путём использования словарей вместо SQLAlchemy объектов
"""
import requests
from sqlalchemy import inspect
from typing import List, Optional
from typing import List, Optional, Dict, Any
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from autoLoader.database import get_db_session, Audio, AiConclusion
from autoLoader.config import GIGAAM_API_URL
@ -67,7 +69,7 @@ class RecognitionChecker:
return True
def get_files_without_conclusion(self, limit: Optional[int] = None) -> List[Audio]:
def get_files_without_conclusion(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""
Находит все файлы, у которых нет AI заключения
@ -75,7 +77,7 @@ class RecognitionChecker:
limit: Ограничение количества файлов (None = все)
Returns:
Список объектов Audio без заключения
Список словарей с информацией о файлах (избегаем detached instance)
"""
if not self.check_database():
logger.error("❌ База данных не готова")
@ -97,40 +99,52 @@ class RecognitionChecker:
# Находим все audio, у которых нет заключения
if conclusion_ids:
files_without_conclusion = db.query(Audio).filter(
audio_objects = db.query(Audio).filter(
~Audio.id.in_(conclusion_ids)
).all()
else:
# Если заключений нет вообще - все файлы без заключения
files_without_conclusion = db.query(Audio).all()
audio_objects = db.query(Audio).all()
logger.info(f"📊 Найдено файлов без заключения: {len(files_without_conclusion)}")
# Конвертируем в словари внутри сессии БД
files_data = []
for audio in audio_objects:
files_data.append({
'id': str(audio.id),
'filename': audio.filename,
'file_size': audio.file_size,
'index_date': audio.index_date.isoformat() if audio.index_date else None
})
logger.info(f"📊 Найдено файлов без заключения: {len(files_data)}")
if limit:
return files_without_conclusion[:limit]
return files_data[:limit]
return files_without_conclusion
return files_data
except Exception as e:
logger.error(f"❌ Ошибка при поиске файлов: {e}")
return []
def send_to_recognition(self, audio: Audio) -> bool:
def send_to_recognition(self, audio_data: Dict[str, Any]) -> bool:
"""
Отправляет файл на распознавание в GigaAM API
Args:
audio: Объект Audio для распознавания
audio_data: Словарь с данными об аудио файле
Returns:
True если успешно отправлен, иначе False
"""
filename = audio_data.get('filename')
payload = {
"filename": audio.filename
"filename": filename
}
try:
logger.info(f"📤 [Thread-{threading.current_thread().name}] Отправка файла {audio.filename} на распознавание...")
logger.info(f"📤 [Thread-{threading.current_thread().name}] Отправка файла {filename} на распознавание...")
response = requests.post(
self.api_url,
@ -139,7 +153,7 @@ class RecognitionChecker:
)
if response.status_code == 200 or response.status_code == 202:
logger.info(f"✅ [Thread-{threading.current_thread().name}] Файл {audio.filename} успешно отправлен")
logger.info(f"✅ [Thread-{threading.current_thread().name}] Файл {filename} успешно отправлен")
# Thread-safe обновление счётчиков
with self._lock:
@ -156,7 +170,7 @@ class RecognitionChecker:
return False
except requests.exceptions.Timeout:
logger.error(f"❌ [Thread-{threading.current_thread().name}] Таймаут при отправке файла {audio.filename}")
logger.error(f"❌ [Thread-{threading.current_thread().name}] Таймаут при отправке файла {filename}")
with self._lock:
self._failed_count += 1
@ -170,19 +184,19 @@ class RecognitionChecker:
return False
except Exception as e:
logger.error(f"❌ [Thread-{threading.current_thread().name}] Ошибка при отправке {audio.filename}: {e}")
logger.error(f"❌ [Thread-{threading.current_thread().name}] Ошибка при отправке {filename}: {e}")
with self._lock:
self._failed_count += 1
return False
def send_to_recognition_parallel(self, audio_list: List[Audio]) -> dict:
def send_to_recognition_parallel(self, audio_list: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Отправляет несколько файлов на распознавание параллельно
Args:
audio_list: Список объектов Audio для распознавания
audio_list: Список словарей с данными об аудио файлах
Returns:
Словарь с результатами отправки
@ -226,19 +240,19 @@ class RecognitionChecker:
success = future.result()
result = {
"filename": audio.filename,
"audio_id": str(audio.id),
"filename": audio.get('filename'),
"audio_id": audio.get('id'),
"success": success
}
results["files"].append(result)
except Exception as exc:
logger.error(f"❌ Файл {audio.filename} сгенерировал исключение: {exc}")
logger.error(f"❌ Файл {audio.get('filename')} сгенерировал исключение: {exc}")
result = {
"filename": audio.filename,
"audio_id": str(audio.id),
"filename": audio.get('filename'),
"audio_id": audio.get('id'),
"success": False,
"error": str(exc)
}
@ -257,7 +271,7 @@ class RecognitionChecker:
return results
def process_all_pending(self, limit: Optional[int] = None, parallel: bool = True) -> dict:
def process_all_pending(self, limit: Optional[int] = None, parallel: bool = True) -> Dict[str, Any]:
"""
Находит и отправляет все файлы без заключения на распознавание
@ -298,8 +312,8 @@ class RecognitionChecker:
success = self.send_to_recognition(audio)
result = {
"filename": audio.filename,
"audio_id": str(audio.id),
"filename": audio.get('filename'),
"audio_id": audio.get('id'),
"success": success
}
@ -347,7 +361,7 @@ class RecognitionChecker:
# Удобная функция для запуска из командной строки
def process_pending_files(api_url: Optional[str] = None, limit: Optional[int] = None):
def process_pending_files(api_url: Optional[str] = None, limit: Optional[int] = None) -> Dict[str, Any]:
"""
Обрабатывает все файлы без заключения
@ -372,18 +386,3 @@ def process_pending_files(api_url: Optional[str] = None, limit: Optional[int] =
# Обрабатываем файлы
return checker.process_all_pending(limit)
if __name__ == "__main__":
# Пример использования
import sys
api_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:5001/api/call/process"
limit = int(sys.argv[2]) if len(sys.argv) > 2 else None
results = process_pending_files(api_url, limit)
print(f"\n📊 Результаты:")
print(f"Всего: {results['total']}")
print(f"Отправлено: {results['sent']}")
print(f"Ошибок: {results['failed']}")

389
autoLoader/loader/recognition_checker_old.py

@ -0,0 +1,389 @@
"""
Класс для проверки файлов без AI заключения и отправки на распознавание
"""
import requests
from sqlalchemy import inspect
from typing import List, Optional
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from autoLoader.database import get_db_session, Audio, AiConclusion
from autoLoader.config import GIGAAM_API_URL
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class RecognitionChecker:
"""Класс для проверки и отправки файлов на распознавание"""
def __init__(self, api_url: Optional[str] = None, max_workers: int = 5):
"""
Инициализация checker
Args:
api_url: URL API GigaAM для распознавания (если None, берётся из config)
max_workers: Максимальное количество параллельных запросов
"""
# Если api_url не передан, берём из config.py
if api_url is None:
api_url = GIGAAM_API_URL
self.api_url = f"{api_url}/api/call/process"
self.timeout = 10 # таймаут запроса в секундах
self.max_workers = max_workers # количество параллельных потоков
# Thread-safe счётчики для статистики
self._lock = threading.Lock()
self._sent_count = 0
self._failed_count = 0
logger.info(f"✅ RecognitionChecker инициализирован с URL: {self.api_url}")
logger.info(f"📊 Параллельная отправка: до {max_workers} запросов одновременно")
def check_database(self) -> bool:
"""
Проверяет существование необходимых таблиц в БД
Returns:
True если таблицы существуют, иначе False
"""
from autoLoader.database import engine
inspector = inspect(engine)
existing_tables = inspector.get_table_names()
required_tables = ['audio', 'ai_conclusion']
missing_tables = [t for t in required_tables if t not in existing_tables]
if missing_tables:
logger.error(f"❌ Отсутствуют таблицы: {missing_tables}")
return False
return True
def get_files_without_conclusion(self, limit: Optional[int] = None) -> List[Audio]:
"""
Находит все файлы, у которых нет AI заключения
Args:
limit: Ограничение количества файлов (None = все)
Returns:
Список объектов Audio без заключения
"""
if not self.check_database():
logger.error("❌ База данных не готова")
return []
try:
with get_db_session() as db:
# Подзапрос: находим все audio_id, у которых есть заключение
from sqlalchemy import distinct
audio_with_conclusion = db.query(
distinct(AiConclusion.audio_id)
).filter(
AiConclusion.audio_id.isnot(None)
).all()
# Извлекаем ID из кортежей
conclusion_ids = [row[0] for row in audio_with_conclusion]
# Находим все audio, у которых нет заключения
if conclusion_ids:
files_without_conclusion = db.query(Audio).filter(
~Audio.id.in_(conclusion_ids)
).all()
else:
# Если заключений нет вообще - все файлы без заключения
files_without_conclusion = db.query(Audio).all()
logger.info(f"📊 Найдено файлов без заключения: {len(files_without_conclusion)}")
if limit:
return files_without_conclusion[:limit]
return files_without_conclusion
except Exception as e:
logger.error(f"❌ Ошибка при поиске файлов: {e}")
return []
def send_to_recognition(self, audio: Audio) -> bool:
"""
Отправляет файл на распознавание в GigaAM API
Args:
audio: Объект Audio для распознавания
Returns:
True если успешно отправлен, иначе False
"""
payload = {
"filename": audio.filename
}
try:
logger.info(f"📤 [Thread-{threading.current_thread().name}] Отправка файла {audio.filename} на распознавание...")
response = requests.post(
self.api_url,
json=payload,
timeout=self.timeout
)
if response.status_code == 200 or response.status_code == 202:
logger.info(f"✅ [Thread-{threading.current_thread().name}] Файл {audio.filename} успешно отправлен")
# Thread-safe обновление счётчиков
with self._lock:
self._sent_count += 1
return True
else:
logger.error(f"❌ [Thread-{threading.current_thread().name}] Ошибка API {response.status_code}: {response.text}")
# Thread-safe обновление счётчиков
with self._lock:
self._failed_count += 1
return False
except requests.exceptions.Timeout:
logger.error(f"❌ [Thread-{threading.current_thread().name}] Таймаут при отправке файла {audio.filename}")
with self._lock:
self._failed_count += 1
return False
except requests.exceptions.ConnectionError:
logger.error(f"❌ [Thread-{threading.current_thread().name}] Не удалось подключиться к API {self.api_url}")
with self._lock:
self._failed_count += 1
return False
except Exception as e:
logger.error(f"❌ [Thread-{threading.current_thread().name}] Ошибка при отправке {audio.filename}: {e}")
with self._lock:
self._failed_count += 1
return False
def send_to_recognition_parallel(self, audio_list: List[Audio]) -> dict:
"""
Отправляет несколько файлов на распознавание параллельно
Args:
audio_list: Список объектов Audio для распознавания
Returns:
Словарь с результатами отправки
"""
if not audio_list:
logger.info(" Список файлов пуст, нечего отправлять")
return {
"total": 0,
"sent": 0,
"failed": 0,
"files": []
}
# Сбрасываем счётчики
self._sent_count = 0
self._failed_count = 0
logger.info(f"🚀 Начинаем параллельную отправку {len(audio_list)} файлов")
logger.info(f"📊 Количество потоков: {self.max_workers}")
results = {
"total": len(audio_list),
"sent": 0,
"failed": 0,
"files": []
}
# Используем ThreadPoolExecutor для параллельной отправки
with ThreadPoolExecutor(max_workers=self.max_workers, thread_name_prefix="SendReq") as executor:
# Запускаем все задачи
future_to_audio = {
executor.submit(self.send_to_recognition, audio): audio
for audio in audio_list
}
# Обрабатываем результаты по мере завершения
for future in as_completed(future_to_audio):
audio = future_to_audio[future]
try:
success = future.result()
result = {
"filename": audio.filename,
"audio_id": str(audio.id),
"success": success
}
results["files"].append(result)
except Exception as exc:
logger.error(f"❌ Файл {audio.filename} сгенерировал исключение: {exc}")
result = {
"filename": audio.filename,
"audio_id": str(audio.id),
"success": False,
"error": str(exc)
}
results["files"].append(result)
# Получаем итоговую статистику из счётчиков
results["sent"] = self._sent_count
results["failed"] = self._failed_count
# Логирование итогов
logger.info(f"📊 Итого параллельной отправки:")
logger.info(f" - Всего: {results['total']}")
logger.info(f" - Отправлено: {results['sent']}")
logger.info(f" - Ошибок: {results['failed']}")
return results
def process_all_pending(self, limit: Optional[int] = None, parallel: bool = True) -> dict:
"""
Находит и отправляет все файлы без заключения на распознавание
Args:
limit: Максимальное количество файлов для обработки
parallel: Использовать параллельную отправку (по умолчанию True)
Returns:
Словарь с результатами обработки
"""
logger.info("🔍 Поиск файлов без AI заключения...")
files_without_conclusion = self.get_files_without_conclusion(limit)
if not files_without_conclusion:
logger.info("✅ Все файлы обработаны")
return {
"total": 0,
"sent": 0,
"failed": 0,
"files": []
}
# Выбираем метод отправки
if parallel:
logger.info("🚀 Используем параллельную отправку")
return self.send_to_recognition_parallel(files_without_conclusion)
else:
logger.info("📤 Используем последовательную отправку")
results = {
"total": len(files_without_conclusion),
"sent": 0,
"failed": 0,
"files": []
}
for audio in files_without_conclusion:
success = self.send_to_recognition(audio)
result = {
"filename": audio.filename,
"audio_id": str(audio.id),
"success": success
}
results["files"].append(result)
if success:
results["sent"] += 1
else:
results["failed"] += 1
# Логирование итогов
logger.info(f"📊 Итого:")
logger.info(f" - Всего: {results['total']}")
logger.info(f" - Отправлено: {results['sent']}")
logger.info(f" - Ошибок: {results['failed']}")
return results
def check_api_availability(self) -> bool:
"""
Проверяет доступность GigaAM API
Returns:
True если API доступен, иначе False
"""
try:
# Проверяем health endpoint или просто подключение
response = requests.get(
self.api_url.replace("/process", "/status"), # Пробуем /status
timeout=5
)
if response.status_code in [200, 404]: # 404 тоже ок - API работает
logger.info("✅ GigaAM API доступен")
return True
except requests.exceptions.ConnectionError:
logger.warning(" GigaAM API недоступен")
return False
except Exception as e:
logger.warning(f" Ошибка проверки API: {e}")
return False
return True
# Удобная функция для запуска из командной строки
def process_pending_files(api_url: Optional[str] = None, limit: Optional[int] = None):
"""
Обрабатывает все файлы без заключения
Args:
api_url: URL GigaAM API (если None, берётся из config.py)
limit: Максимальное количество файлов для обработки
Returns:
Результаты обработки
"""
checker = RecognitionChecker(api_url)
# Проверяем доступность API
if not checker.check_api_availability():
logger.error("❌ GigaAM API недоступен. Проверьте, запущен ли сервис.")
return {
"total": 0,
"sent": 0,
"failed": 0,
"error": "API unavailable"
}
# Обрабатываем файлы
return checker.process_all_pending(limit)
if __name__ == "__main__":
# Пример использования
import sys
api_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:5001/api/call/process"
limit = int(sys.argv[2]) if len(sys.argv) > 2 else None
results = process_pending_files(api_url, limit)
print(f"\n📊 Результаты:")
print(f"Всего: {results['total']}")
print(f"Отправлено: {results['sent']}")
print(f"Ошибок: {results['failed']}")
Loading…
Cancel
Save