"""Подписка ветки на документы базы знаний (Спринт 7, часть A — мульти-RAG, подход A). Связь M:N через таблицу `intent_documents`. Сервис умеет: - читать список document_id для ветки и обратно — список intent_code для документа; - атомарно перезаписывать любой из этих списков (PUT-семантика). При retrieval в `chat_service` для активной ветки берём `list_documents_for_intent_code` и передаём в `vectorstore.query(document_ids=...)`. Дефолт пустой подписки — пустой список (= 0 чанков), это сознательное решение Спринта 7. """ import logging from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession from db.models import Document, Intent, IntentDocument logger = logging.getLogger(__name__) async def list_documents_for_intent(session: AsyncSession, intent_id: int) -> list[str]: """Список document_id, на которые подписана ветка.""" stmt = ( select(IntentDocument.document_id) .where(IntentDocument.intent_id == intent_id) .order_by(IntentDocument.created_at) ) return list((await session.execute(stmt)).scalars().all()) async def list_documents_for_intent_code(session: AsyncSession, intent_code: str) -> list[str]: """То же, но по коду ветки. Если ветки нет — пустой список.""" stmt = ( select(IntentDocument.document_id) .join(Intent, Intent.id == IntentDocument.intent_id) .where(Intent.code == intent_code) .order_by(IntentDocument.created_at) ) return list((await session.execute(stmt)).scalars().all()) async def list_intents_for_document(session: AsyncSession, document_id: str) -> list[str]: """Список кодов веток, в которых используется документ.""" stmt = ( select(Intent.code) .join(IntentDocument, IntentDocument.intent_id == Intent.id) .where(IntentDocument.document_id == document_id) .order_by(Intent.order_index, Intent.id) ) return list((await session.execute(stmt)).scalars().all()) async def set_documents_for_intent( session: AsyncSession, intent_id: int, document_ids: list[str], ) -> list[str]: """Перезаписать список подписок ветки целиком. Удаляем строки, которых нет в новом списке; добавляем недостающие. Возвращаем актуальный список после обновления. """ # Существующие подписки этой ветки. existing = set(await list_documents_for_intent(session, intent_id)) desired = set(document_ids) to_delete = existing - desired to_add = desired - existing if to_delete: await session.execute( delete(IntentDocument) .where(IntentDocument.intent_id == intent_id) .where(IntentDocument.document_id.in_(to_delete)) ) if to_add: # Проверяем, что документы существуют — иначе FK-ошибка на коммите будет # неинформативной. Документы, которых нет, тихо пропускаем (это типичная # гонка: оператор удалил документ в одном табе, в другом нажал «сохранить»). existing_doc_ids = set((await session.execute( select(Document.id).where(Document.id.in_(to_add)) )).scalars().all()) missing = to_add - existing_doc_ids if missing: logger.warning( "set_documents_for_intent: skipping unknown document_ids=%s for intent_id=%d", sorted(missing), intent_id, ) for document_id in to_add & existing_doc_ids: session.add(IntentDocument(intent_id=intent_id, document_id=document_id)) await session.commit() return await list_documents_for_intent(session, intent_id) async def set_intents_for_document( session: AsyncSession, document_id: str, intent_codes: list[str], ) -> list[str]: """Перезаписать список веток, к которым привязан документ. Возвращаем актуальный список кодов после обновления. """ # Текущий набор intent_id для документа. current_stmt = ( select(IntentDocument.intent_id) .where(IntentDocument.document_id == document_id) ) existing_ids = set((await session.execute(current_stmt)).scalars().all()) # Желаемый набор intent_id (по кодам). Неизвестные коды — пропускаем. desired_stmt = select(Intent.id, Intent.code).where(Intent.code.in_(intent_codes)) code_to_id: dict[str, int] = {} for row in (await session.execute(desired_stmt)).all(): code_to_id[row.code] = row.id desired_ids = set(code_to_id.values()) missing_codes = set(intent_codes) - set(code_to_id.keys()) if missing_codes: logger.warning( "set_intents_for_document: skipping unknown intent_codes=%s for document_id=%s", sorted(missing_codes), document_id, ) to_delete = existing_ids - desired_ids to_add = desired_ids - existing_ids if to_delete: await session.execute( delete(IntentDocument) .where(IntentDocument.document_id == document_id) .where(IntentDocument.intent_id.in_(to_delete)) ) for intent_id in to_add: session.add(IntentDocument(intent_id=intent_id, document_id=document_id)) await session.commit() return await list_intents_for_document(session, document_id)