Browse Source

перенос SFTP мониторинг в сервис Calls_WEB_Client_main

dev
poturaevpetr 2 weeks ago
parent
commit
6efd32f368
  1. 6
      apiApp/config.py
  2. 332
      apiApp/routers/audio_management_router.py
  3. 2
      main.py

6
apiApp/config.py

@ -6,6 +6,9 @@ BASE_DIR = Path(__file__).resolve().parent.parent
UPLOAD_FOLDER = BASE_DIR / "uploads"
UPLOAD_FOLDER.mkdir(exist_ok=True)
# Audio files path (shared with Calls_WEB_Client_main)
AUDIOFILES_PATH = os.getenv("AUDIOFILES_PATH", "/app/audiofiles")
# Database
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./speech_analytics.db")
@ -21,6 +24,9 @@ APP_VERSION = "1.0.0"
PORT = int(os.getenv("PORT", "8000"))
HOST = os.getenv("HOST", "localhost")
# GigaAM API Configuration
GIGAAM_API_URL = os.getenv("GIGAAM_API_URL", "http://gigaam_api:5001")
# Calls_WEB_Client_main Webhook Configuration
CALLS_WEB_CLIENT_URL = os.getenv(
"CALLS_WEB_CLIENT_URL",

332
apiApp/routers/audio_management_router.py

@ -0,0 +1,332 @@
"""
API endpoints для управления аудиофайлами (регистрация и пакетная обработка)
Используется Calls_WEB_Client_main для оркестрации процесса распознавания
"""
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from sqlalchemy.orm import Session
from pydantic import BaseModel
from typing import Optional, List
import os
import logging
from datetime import datetime
from apiApp.database import get_db
from apiApp.database.Audio import Audio
from apiApp.config import AUDIOFILES_PATH
logger = logging.getLogger(__name__)
audio_management_router = APIRouter()
class AudioRegisterRequest(BaseModel):
"""Запрос на регистрацию аудиофайла"""
filename: str
file_path: str # Полный путь к файлу в общей папке audiofiles
class AudioProcessAllRequest(BaseModel):
"""Запрос на пакетное распознавание"""
limit: int = 100
class AudioRegisterResponse(BaseModel):
"""Ответ на регистрацию аудиофайла"""
id: str
filename: str
file_size: int
created_at: datetime
@audio_management_router.post("/audio/register", response_model=AudioRegisterResponse, status_code=201)
async def register_audio_file(
request: AudioRegisterRequest,
db: Session = Depends(get_db)
):
"""
Регистрация аудиофайла в БД (без копирования файла)
Создаёт запись в таблице Audio для файла, который уже находится
в общей папке audiofiles. НЕ копирует файл, только создаёт запись в БД.
Args:
request: {filename: "in-xxx.wav", file_path: "/app/audiofiles/in-xxx.wav"}
Returns:
201 Created + информация о созданной записи
400 Bad Request если файл уже зарегистрирован
404 Not Found если файл не существует на диске
"""
try:
filename = request.filename
file_path = request.file_path
logger.info(f"📝 Регистрация файла: {filename}")
# Проверяем, что файл уже существует в общей папке
if not os.path.exists(file_path):
logger.error(f"❌ Файл не найден: {file_path}")
raise HTTPException(
status_code=404,
detail=f'Файл не найден на диске: {file_path}'
)
# Проверяем, что файл не был уже зарегистрирован
existing_audio = db.query(Audio).filter(Audio.filename == filename).first()
if existing_audio:
logger.warning(f" Файл уже зарегистрирован: {filename}")
raise HTTPException(
status_code=400,
detail=f'Файл {filename} уже зарегистрирован в БД'
)
# Получаем размер файла
file_size = os.path.getsize(file_path)
# Создаём запись в БД
audio = Audio()
audio.filename = filename
audio.file_size = file_size
audio.index_date = datetime.utcnow()
db.add(audio)
db.commit()
db.refresh(audio)
logger.info(f"✅ Файл зарегистрирован: {filename} (audio_id={audio.id})")
return AudioRegisterResponse(
id=str(audio.id),
filename=audio.filename,
file_size=audio.file_size,
created_at=audio.index_date
)
except HTTPException:
raise
except Exception as e:
db.rollback()
logger.error(f"❌ Ошибка при регистрации файла: {e}")
raise HTTPException(
status_code=500,
detail=f'Ошибка при регистрации: {str(e)}'
)
def process_audio_file(audio_id: str, db: Session):
"""
Фоновая обработка одного аудиофайла
Отправляет запрос в GigaAM API для распознавания
"""
try:
audio = db.query(Audio).filter(Audio.id == audio_id).first()
if not audio:
logger.error(f"❌ Audio {audio_id} не найден")
return
logger.info(f"🎵 Запуск распознавания для {audio.filename}")
# Формируем запрос в GigaAM API
from apiApp.config import GIGAAM_API_URL
api_url = f"{GIGAAM_API_URL}/api/call/process"
payload = {
"filename": audio.filename
}
# Отправляем запрос в GigaAM API
import requests
response = requests.post(api_url, json=payload, timeout=10)
if response.status_code == 200 or response.status_code == 202:
logger.info(f"✅ Запущено распознавание для {audio.filename}")
else:
logger.error(f"❌ Ошибка запуска распознавания для {audio.filename}: {response.status_code} - {response.text}")
except Exception as e:
logger.error(f"❌ Ошибка при обработке {audio_id}: {e}")
@audio_management_router.post("/audio/process-all")
async def process_all_pending_audio(
request: AudioProcessAllRequest,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db)
):
"""
Запуск распознавания для всех Audio без AiConclusion
Находит все записи Audio, у которых нет AiConclusion, и запускает
распознавание для них (до указанного лимита).
Args:
request: {limit: 100} - максимум файлов для обработки
Returns:
200 OK + {
"started_count": 15,
"pending_files": ["file1.wav", "file2.wav", ...],
"total_pending": 50
}
"""
try:
limit = request.limit
logger.info(f"🚀 Поиск Audio без AiConclusion (limit={limit})")
# Находим все Audio без AiConclusion
pending_audio = db.query(Audio).filter(
Audio.AiConclusion == None
).order_by(Audio.index_date.asc()).limit(limit).all()
total_pending = db.query(Audio).filter(Audio.AiConclusion == None).count()
if not pending_audio:
logger.info(" Нет файлов для распознавания")
return {
"started_count": 0,
"pending_files": [],
"total_pending": 0,
"message": "Нет файлов без AiConclusion"
}
logger.info(f"📋 Найдено файлов для обработки: {len(pending_audio)} из {total_pending}")
# Добавляем задачи в фон
started_count = 0
pending_files = []
for audio in pending_audio:
# Проверяем, что файл существует
file_path = os.path.join(AUDIOFILES_PATH, audio.filename)
if not os.path.exists(file_path):
logger.warning(f" Файл не найден на диске: {audio.filename}")
continue
# Добавляем в фон (асинхронно)
# В FastAPI используем BackgroundTasks
# Но нужно создавать новую сессию для каждого таска
pending_files.append(audio.filename)
started_count += 1
# Запускаем обработку в фоне
# Используем lambda для захвата audio_id
background_tasks.add_task(
process_single_audio,
str(audio.id)
)
logger.info(f"✅ Запущено распознавание для {started_count} файлов")
return {
"started_count": started_count,
"pending_files": pending_files,
"total_pending": total_pending
}
except Exception as e:
logger.error(f"❌ Ошибка при запуске пакетного распознавания: {e}")
raise HTTPException(
status_code=500,
detail=f'Ошибка: {str(e)}'
)
def process_single_audio(audio_id: str):
"""
Обработка одного аудиофайла в фоне
Создаёт новую DB сессию для обработки
"""
from apiApp.database import SessionLocal
db = SessionLocal()
try:
process_audio_file(audio_id, db)
finally:
db.close()
@audio_management_router.get("/audio/pending")
async def get_pending_audio(
db: Session = Depends(get_db),
limit: int = 100
):
"""
Получить список Audio без AiConclusion
Query Parameters:
limit: Максимум файлов (default: 100)
Returns:
Список файлов, ожидающих распознавания
"""
try:
pending_audio = db.query(Audio).filter(
Audio.AiConclusion == None
).order_by(Audio.index_date.asc()).limit(limit).all()
files_info = []
for audio in pending_audio:
file_path = os.path.join(AUDIOFILES_PATH, audio.filename)
exists = os.path.exists(file_path)
files_info.append({
"audio_id": str(audio.id),
"filename": audio.filename,
"file_size": audio.file_size,
"created_at": audio.index_date.isoformat() if audio.index_date else None,
"exists_on_disk": exists
})
total_pending = db.query(Audio).filter(Audio.AiConclusion == None).count()
return {
"total_pending": total_pending,
"count": len(files_info),
"files": files_info
}
except Exception as e:
logger.error(f"❌ Ошибка при получении списка: {e}")
raise HTTPException(
status_code=500,
detail=str(e)
)
@audio_management_router.get("/audio/stats")
async def get_audio_stats(db: Session = Depends(get_db)):
"""
Получить статистику по аудиофайлам
Returns:
Статистика по Audio записям
"""
try:
total_audio = db.query(Audio).count()
with_conclusion = db.query(Audio).filter(Audio.AiConclusion != None).count()
without_conclusion = db.query(Audio).filter(Audio.AiConclusion == None).count()
# Проверяем существование файлов на диске
all_audio = db.query(Audio).all()
existing_count = 0
for audio in all_audio:
file_path = os.path.join(AUDIOFILES_PATH, audio.filename)
if os.path.exists(file_path):
existing_count += 1
return {
"total_audio": total_audio,
"with_conclusion": with_conclusion,
"without_conclusion": without_conclusion,
"existing_on_disk": existing_count,
"missing_on_disk": total_audio - existing_count
}
except Exception as e:
logger.error(f"❌ Ошибка при получении статистики: {e}")
raise HTTPException(
status_code=500,
detail=str(e)
)

2
main.py

@ -10,6 +10,7 @@ from apiApp.database import engine, Base
from apiApp.routers import audio_router, recognition_router
from apiApp.routers.ai_conclusion_router import ai_conclusion_router
from apiApp.routers.audio_files_router import audio_files_router
from apiApp.routers.audio_management_router import audio_management_router
# Настройка логирования
logging.basicConfig(level=logging.INFO)
@ -60,6 +61,7 @@ app.include_router(audio_router, prefix=API_V1_PREFIX, tags=["audio"])
app.include_router(recognition_router, prefix=API_V1_PREFIX, tags=["recognition"])
app.include_router(ai_conclusion_router, prefix=API_V1_PREFIX, tags=["ai_conclusion"])
app.include_router(audio_files_router, prefix=API_V1_PREFIX, tags=["audio_files"])
app.include_router(audio_management_router, prefix=API_V1_PREFIX, tags=["audio_management"])
# Статические файлы (для загрузки аудио)
app.mount("/uploads", StaticFiles(directory=str(UPLOAD_FOLDER)), name="uploads")

Loading…
Cancel
Save