Compare commits

...

11 Commits

Author SHA1 Message Date
poturaevpetr 2a61e8422d Merge remote-tracking branch 'origin/dev' into dev 2025-12-25 22:38:29 +05:00
poturaevpetr f58c95708f add post file to gigaam service 2025-12-25 22:38:09 +05:00
PoturaevPetr efa4d775e4 fix type field audio 2025-12-25 22:22:44 +05:00
poturaevpetr ebfb48a2b8 create tables 2025-12-25 22:14:58 +05:00
poturaevpetr 577dc277e0 fix model import 2025-12-25 22:08:58 +05:00
poturaevpetr c2289bfc45 fix model 2025-12-25 22:03:53 +05:00
poturaevpetr fa4c46173f add write audio to db 2025-12-25 21:57:30 +05:00
PoturaevPetr 711d7b6b1e fix field table 2025-12-25 21:46:57 +05:00
PoturaevPetr 8b471c771a add autoLoader 2025-12-25 21:46:19 +05:00
PoturaevPetr d4db29f710 update requirements.txt 2025-12-25 15:01:59 +05:00
poturaevpetr cfcc84eac9 add env.test 2025-12-25 14:24:57 +05:00
19 changed files with 638 additions and 11 deletions
+24
View File
@@ -0,0 +1,24 @@
# Database
# DATABASE_URL=sqlite:///./speech_analytics.db
# Для PostgreSQL:
DATABASE_URL=postgresql://postgres_test:test_user@postgres_test:5432/audiofiles_db
# API Settings
API_V1_PREFIX=/api/v1
MAX_UPLOAD_SIZE=104857600 # 100MB in bytes
# Application
APP_TITLE=Speech Analytics API
APP_VERSION=1.0.0
# Server
HOST=0.0.0.0
PORT=5056
RELOAD=True
#SFTP
SFPT_HOSTNAME = 192.168.1.150
SFPT_USERNAME = monitor
SFPT_PASSWORD = Audio4analy6!6
FILESAPTH = audiofiles
+3
View File
@@ -17,3 +17,6 @@ ALLOWED_AUDIO_EXTENSIONS = {".mp3", ".wav", ".ogg", ".flac", ".m4a", ".aac"}
# Application # Application
APP_TITLE = "Speech Analytics API" APP_TITLE = "Speech Analytics API"
APP_VERSION = "1.0.0" APP_VERSION = "1.0.0"
PORT = int(os.getenv("PORT", "8000"))
HOST = os.getenv("HOST", "localhost")
+1
View File
@@ -19,3 +19,4 @@ class AiConclusion(Base):
end_date = Column(DateTime) end_date = Column(DateTime)
audio = relationship("Audio", back_populates="ai_conclusion") audio = relationship("Audio", back_populates="ai_conclusion")
versions = relationship("ConclusionVersion", back_populates="ai_conclusion")
+3 -3
View File
@@ -1,4 +1,4 @@
from sqlalchemy import Column, String, DateTime, UUID, ForeignKey, Float, Integer from sqlalchemy import Column, String, DateTime, Text, UUID, ForeignKey, Float, Integer
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from apiApp.database import Base from apiApp.database import Base
import uuid import uuid
@@ -9,9 +9,9 @@ class Audio(Base):
__tablename__ = "audio" __tablename__ = "audio"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
filename = Column(String(255), nullable=False) filename = Column(Text, nullable=False)
index_date = Column(DateTime, default=datetime.utcnow) index_date = Column(DateTime, default=datetime.utcnow)
file_path = Column(String(500)) file_path = Column(Text)
duration = Column(Float) duration = Column(Float)
file_size = Column(Integer) file_size = Column(Integer)
+4 -1
View File
@@ -1,4 +1,5 @@
from sqlalchemy import Column, UUID, ForeignKey, Integer, Text from sqlalchemy import Column, UUID, ForeignKey, Integer, Text
from sqlalchemy.orm import relationship
from apiApp.database import Base from apiApp.database import Base
import uuid import uuid
@@ -7,6 +8,8 @@ class ConclusionVersion(Base):
__tablename__ = "conclusion_version" __tablename__ = "conclusion_version"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
conclusion_id = Column(UUID(as_uuid=True), ForeignKey("conclusion.id")) conclusion_id = Column(UUID(as_uuid=True), ForeignKey("ai_conclusion.id"))
version = Column(Integer) version = Column(Integer)
content = Column(Text) content = Column(Text)
ai_conclusion = relationship("AiConclusion", back_populates="versions")
+2 -2
View File
@@ -11,6 +11,6 @@ class Operator(Base):
fio = Column(String(100)) fio = Column(String(100))
num = Column(Integer) num = Column(Integer)
calls = relationship("Call", back_populates="operator") # TODO: Добавить relationship когда будет создана модель Call
# calls = relationship("Call", back_populates="operator")
+3
View File
@@ -29,3 +29,6 @@ from apiApp.database.Operator import Operator
from apiApp.database.Audio import Audio from apiApp.database.Audio import Audio
from apiApp.database.AiConclusion import AiConclusion from apiApp.database.AiConclusion import AiConclusion
from apiApp.database.ConclusionVersion import ConclusionVersion from apiApp.database.ConclusionVersion import ConclusionVersion
# Все модели должны быть импортированы здесь для правильной работы SQLAlchemy metadata
# Это гарантирует, что все relationship будут работать корректно
+3
View File
@@ -0,0 +1,3 @@
from autoLoader.database import *
from autoLoader.loader import Loader
loader = Loader()
+7
View File
@@ -0,0 +1,7 @@
import os
SFTP_HOSTNAME = os.getenv("SFPT_HOSTNAME", "192.168.1.150")
SFTP_USERNAME = os.getenv("SFPT_USERNAME", "monitor")
SFTP_PASSWORD = os.getenv("SFPT_PASSWORD", "Audio4analy6!6")
FILESAPTH = os.getenv("FILESAPTH", "audiofiles")
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./speech_analytics.db")
+41
View File
@@ -0,0 +1,41 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from autoLoader.config import DATABASE_URL
from contextlib import contextmanager
# Создаём engine, но используем Base из apiApp.database
from apiApp.database import Base, engine
# SessionLocal (используем тот же engine)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Зависимость для получения сессии БД (для FastAPI)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Контекстный менеджер для использования в loader
@contextmanager
def get_db_session():
"""Контекстный менеджер для работы с БД в loader"""
db = SessionLocal()
try:
yield db
db.commit()
except Exception as e:
db.rollback()
raise e
finally:
db.close()
# Импортируем ТОЛЬКО нужные модели из apiApp.database
# НЕ импортируем из autoLoader.database, чтобы избежать дублирования таблиц
from apiApp.database.Audio import Audio
from apiApp.database.AiConclusion import AiConclusion
+5
View File
@@ -0,0 +1,5 @@
from .loader import Loader
from .connector import ConnectorSFTP
from .recognition_checker import RecognitionChecker, process_pending_files
__all__ = ['Loader', 'ConnectorSFTP', 'RecognitionChecker', 'process_pending_files']
+53
View File
@@ -0,0 +1,53 @@
from autoLoader.config import SFTP_HOSTNAME, SFTP_USERNAME, SFTP_PASSWORD
import socket
def check_connection():
"""Проверка доступности сервера"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
try:
result = sock.connect_ex((SFTP_HOSTNAME, 22))
if result == 0:
print("Порт 22 доступен")
else:
print(f"Порт 22 недоступен. Код ошибки: {result}")
except Exception as e:
print(f"Ошибка проверки соединения: {e}")
finally:
sock.close()
# Перед подключением вызовите проверку
# check_connection()
import paramiko
class ConnectorSFTP():
def __init__(self):
self.sftp = None
self.ssh = paramiko.SSHClient()
def connect(self, remote_path: str):
try:
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh.connect(
hostname=SFTP_HOSTNAME,
username=SFTP_USERNAME,
password=SFTP_PASSWORD,
)
self.sftp = self.ssh.open_sftp()
remote_path = remote_path.lstrip('/') # Удаляем начальный слэш, если есть
self.sftp.chdir(remote_path) # Переходим в директори
except paramiko.AuthenticationException:
print("Ошибка аутентификации. Проверьте имя пользователя и пароль.")
return None, None
except paramiko.SSHException as e:
print(f"Ошибка SSH: {e}")
return None, None
except FileNotFoundError:
print(f"Директория {remote_path} не найдена на сервере.")
return None, None
except IOError as e:
print(f"Ошибка ввода-вывода: {e}")
return None, None
+106
View File
@@ -0,0 +1,106 @@
from autoLoader.config import FILESAPTH
from autoLoader.loader import ConnectorSFTP
import datetime, os
from autoLoader.database import Audio, get_db_session
from sqlalchemy import inspect
local_path = os.path.join(os.getcwd(), FILESAPTH)
class Loader():
def __init__(self):
self.call_types = ['in']
pass
def filter_call(self, filename: str):
if filename.split("-")[0] in self.call_types:
return True
return False
def check_database(self):
"""Проверяет существование таблиц в БД"""
from autoLoader.database import engine
inspector = inspect(engine)
existing_tables = inspector.get_table_names()
if 'audio' not in existing_tables:
print("❌ Таблица 'audio' не существует в базе данных!")
print("💡 Запустите 'python init_db.py' для создания таблиц")
return False
return True
def load(self):
# Проверяем БД перед началом работы
if not self.check_database():
exit(1)
date_now = datetime.datetime.now()# - datetime.timedelta(days=1)
remote_path = f"/{date_now.strftime('%Y/%m/%d')}"
connector = ConnectorSFTP()
connector.connect(remote_path=remote_path)
sftp_client = connector.sftp
ssh_client = connector.ssh
if sftp_client is None or ssh_client is None:
print("Не удалось подключиться к SFTP. Завершение работы.")
exit(1)
try:
listdir = sftp_client.listdir()
os.makedirs(local_path, exist_ok = True)
for file in listdir:
if self.filter_call(filename=file):
remote_file = f"{file}".lstrip('/')
filepath = os.path.join(local_path, file)
# Проверяем, существует ли файл локально
if os.path.exists(filepath):
print(f"Файл уже существует локально: {file}")
continue
try:
# Скачиваем файл
sftp_client.get(remote_file, filepath)
print(f"📥 Скачан файл: {remote_file}")
# Получаем размер файла
file_size = os.path.getsize(filepath)
# Сохраняем в БД через контекстный менеджер
with get_db_session() as db:
# Проверяем, есть ли уже такой файл в БД
existing_audio = db.query(Audio).filter(Audio.filename == file).first()
if existing_audio:
print(f"⏭️ Файл {file} уже есть в БД, пропускаем")
continue
# Создаём новую запись
audio = Audio()
audio.index_date = datetime.datetime.now()
audio.filename = file
audio.file_size = file_size
db.add(audio)
# commit произойдёт автоматически при выходе из контекста
print(f"✅ Файл {file} сохранён в БД")
except Exception as e:
print(f"❌ Ошибка при обработке файла {remote_file}: {e}")
# Если файл скачался, но ошибка в БД - удаляем файл
if os.path.exists(filepath):
try:
os.remove(filepath)
print(f"🗑️ Файл {file} удалён из-за ошибки")
except:
pass
finally:
# Закрываем соединения
sftp_client.close()
ssh_client.close()
+263
View File
@@ -0,0 +1,263 @@
"""
Класс для проверки файлов без AI заключения и отправки на распознавание
"""
import requests
from sqlalchemy import inspect
from typing import List, Optional
import logging
from autoLoader.database import get_db_session, Audio, AiConclusion
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class RecognitionChecker:
"""Класс для проверки и отправки файлов на распознавание"""
def __init__(self, api_url: str = "http://localhost:5001/api/call/process"):
"""
Инициализация checker
Args:
api_url: URL API GigaAM для распознавания
"""
self.api_url = api_url
self.timeout = 10 # таймаут запроса в секундах
def check_database(self) -> bool:
"""
Проверяет существование необходимых таблиц в БД
Returns:
True если таблицы существуют, иначе False
"""
from autoLoader.database import engine
inspector = inspect(engine)
existing_tables = inspector.get_table_names()
required_tables = ['audio', 'ai_conclusion']
missing_tables = [t for t in required_tables if t not in existing_tables]
if missing_tables:
logger.error(f"❌ Отсутствуют таблицы: {missing_tables}")
return False
return True
def get_files_without_conclusion(self, limit: Optional[int] = None) -> List[Audio]:
"""
Находит все файлы, у которых нет AI заключения
Args:
limit: Ограничение количества файлов (None = все)
Returns:
Список объектов Audio без заключения
"""
if not self.check_database():
logger.error("❌ База данных не готова")
return []
try:
with get_db_session() as db:
# Подзапрос: находим все audio_id, у которых есть заключение
from sqlalchemy import distinct
audio_with_conclusion = db.query(
distinct(AiConclusion.audio_id)
).filter(
AiConclusion.audio_id.isnot(None)
).all()
# Извлекаем ID из кортежей
conclusion_ids = [row[0] for row in audio_with_conclusion]
# Находим все audio, у которых нет заключения
if conclusion_ids:
files_without_conclusion = db.query(Audio).filter(
~Audio.id.in_(conclusion_ids)
).all()
else:
# Если заключений нет вообще - все файлы без заключения
files_without_conclusion = db.query(Audio).all()
logger.info(f"📊 Найдено файлов без заключения: {len(files_without_conclusion)}")
if limit:
return files_without_conclusion[:limit]
return files_without_conclusion
except Exception as e:
logger.error(f"❌ Ошибка при поиске файлов: {e}")
return []
def send_to_recognition(self, audio: Audio) -> bool:
"""
Отправляет файл на распознавание в GigaAM API
Args:
audio: Объект Audio для распознавания
Returns:
True если успешно отправлен, иначе False
"""
payload = {
"filename": audio.filename
}
try:
logger.info(f"📤 Отправка файла {audio.filename} на распознавание...")
response = requests.post(
self.api_url,
json=payload,
timeout=self.timeout
)
if response.status_code == 200 or response.status_code == 202:
logger.info(f"✅ Файл {audio.filename} успешно отправлен на распознавание")
return True
else:
logger.error(f"❌ Ошибка API {response.status_code}: {response.text}")
return False
except requests.exceptions.Timeout:
logger.error(f"❌ Таймаут при отправке файла {audio.filename}")
return False
except requests.exceptions.ConnectionError:
logger.error(f"❌ Не удалось подключиться к API {self.api_url}")
return False
except Exception as e:
logger.error(f"❌ Ошибка при отправке {audio.filename}: {e}")
return False
def process_all_pending(self, limit: Optional[int] = None) -> dict:
"""
Находит и отправляет все файлы без заключения на распознавание
Args:
limit: Максимальное количество файлов для обработки
Returns:
Словарь с результатами обработки
"""
logger.info("🔍 Поиск файлов без AI заключения...")
files_without_conclusion = self.get_files_without_conclusion(limit)
if not files_without_conclusion:
logger.info("✅ Все файлы обработаны")
return {
"total": 0,
"sent": 0,
"failed": 0,
"files": []
}
results = {
"total": len(files_without_conclusion),
"sent": 0,
"failed": 0,
"files": []
}
for audio in files_without_conclusion:
success = self.send_to_recognition(audio)
result = {
"filename": audio.filename,
"audio_id": str(audio.id),
"success": success
}
results["files"].append(result)
if success:
results["sent"] += 1
else:
results["failed"] += 1
# Логирование итогов
logger.info(f"📊 Итого:")
logger.info(f" - Всего: {results['total']}")
logger.info(f" - Отправлено: {results['sent']}")
logger.info(f" - Ошибок: {results['failed']}")
return results
def check_api_availability(self) -> bool:
"""
Проверяет доступность GigaAM API
Returns:
True если API доступен, иначе False
"""
try:
# Проверяем health endpoint или просто подключение
response = requests.get(
self.api_url.replace("/process", "/status"), # Пробуем /status
timeout=5
)
if response.status_code in [200, 404]: # 404 тоже ок - API работает
logger.info("✅ GigaAM API доступен")
return True
except requests.exceptions.ConnectionError:
logger.warning("⚠️ GigaAM API недоступен")
return False
except Exception as e:
logger.warning(f"⚠️ Ошибка проверки API: {e}")
return False
return True
# Удобная функция для запуска из командной строки
def process_pending_files(api_url: str = "http://localhost:5001/api/call/process", limit: int = None):
"""
Обрабатывает все файлы без заключения
Args:
api_url: URL GigaAM API
limit: Максимальное количество файлов для обработки
Returns:
Результаты обработки
"""
checker = RecognitionChecker(api_url)
# Проверяем доступность API
if not checker.check_api_availability():
logger.error("❌ GigaAM API недоступен. Проверьте, запущен ли сервис.")
return {
"total": 0,
"sent": 0,
"failed": 0,
"error": "API unavailable"
}
# Обрабатываем файлы
return checker.process_all_pending(limit)
if __name__ == "__main__":
# Пример использования
import sys
api_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:5001/api/call/process"
limit = int(sys.argv[2]) if len(sys.argv) > 2 else None
results = process_pending_files(api_url, limit)
print(f"\n📊 Результаты:")
print(f"Всего: {results['total']}")
print(f"Отправлено: {results['sent']}")
print(f"Ошибок: {results['failed']}")
Executable
+39
View File
@@ -0,0 +1,39 @@
#!/usr/bin/env python3
"""
Скрипт для инициализации базы данных
Создаёт все необходимые таблицы
"""
from apiApp.database import Base, engine
from sqlalchemy import inspect
def init_database():
"""Создаёт все таблицы в базе данных"""
print("🔧 Инициализация базы данных...")
# Проверяем существующие таблицы
inspector = inspect(engine)
existing_tables = inspector.get_table_names()
if existing_tables:
print(f"📋 Существующие таблицы: {', '.join(existing_tables)}")
# Создаём все таблицы
Base.metadata.create_all(bind=engine)
# Проверяем результат
inspector = inspect(engine)
all_tables = inspector.get_table_names()
print(f"✅ Создано таблиц: {len(all_tables)}")
for table in all_tables:
print(f" - {table}")
return all_tables
if __name__ == "__main__":
try:
tables = init_database()
print(f"\n🎉 База данных готова! Создано таблиц: {len(tables)}")
except Exception as e:
print(f"\n❌ Ошибка при создании таблиц: {e}")
exit(1)
+2 -2
View File
@@ -5,7 +5,7 @@ from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
import logging import logging
from apiApp.config import APP_TITLE, APP_VERSION, API_V1_PREFIX, UPLOAD_FOLDER, DATABASE_URL from apiApp.config import APP_TITLE, APP_VERSION, API_V1_PREFIX, UPLOAD_FOLDER, DATABASE_URL, PORT, HOST
from apiApp.database import engine, Base from apiApp.database import engine, Base
from apiApp.routers import audio_router, recognition_router from apiApp.routers import audio_router, recognition_router
@@ -77,4 +77,4 @@ async def health_check():
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000) uvicorn.run(app, host=HOST, port=PORT)
+2
View File
@@ -4,3 +4,5 @@ sqlalchemy==2.0.35
pydantic==2.9.2 pydantic==2.9.2
python-multipart==0.0.12 python-multipart==0.0.12
aiofiles==24.1.0 aiofiles==24.1.0
psycopg2-binary
paramiko
Executable
+62
View File
@@ -0,0 +1,62 @@
#!/usr/bin/env python3
"""
Скрипт для проверки и отправки файлов на распознавание
"""
from autoLoader import process_pending_files
import sys
import logging
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# URL GigaAM API по умолчанию
DEFAULT_API_URL = "http://localhost:5001/api/call/process"
def main():
"""Главная функция"""
# Получаем параметры из командной строки
api_url = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_API_URL
limit = int(sys.argv[2]) if len(sys.argv) > 2 else None
print(f"🚀 Запуск проверки файлов на распознавание")
print(f"📡 API URL: {api_url}")
if limit:
print(f"📊 Лимит: {limit} файлов")
print()
# Запускаем проверку
results = process_pending_files(api_url=api_url, limit=limit)
# Итоги
print(f"\n{'='*60}")
print(f"📊 ИТОГИ ОБРАБОТКИ:")
print(f"{'='*60}")
print(f"Всего файлов без заключения: {results['total']}")
print(f"✅ Отправлено на распознавание: {results['sent']}")
print(f"❌ Ошибок при отправке: {results['failed']}")
if 'error' in results:
print(f"\n⚠️ {results['error']}")
# Детали по каждому файлу
if results.get('files'):
print(f"\n📋 Детали:")
for file_result in results['files']:
status = "" if file_result['success'] else ""
print(f" {status} {file_result['filename']}")
print(f"{'='*60}\n")
# Код выхода
if results['failed'] > 0:
sys.exit(1)
else:
sys.exit(0)
if __name__ == "__main__":
main()
+12
View File
@@ -0,0 +1,12 @@
from autoLoader import loader
from apiApp.database import Base, engine
# Создаём таблицы, если они не существуют
print("🔧 Создание таблиц базы данных...")
Base.metadata.create_all(bind=engine)
print("✅ Таблицы созданы")
# Запускаем загрузчик
print("\n🚀 Запуск AutoLoader...")
loader.load()
print("✅ AutoLoader завершил работу")