Files
RAG_helper/services/document_processor.py
T
AR 15 M4 e534a74460 feat(sprint2.5): чистка чанков и переиндексация
Чанкер тащил в базу markdown-мусор: навигационные блоки «Вернуться на:»
со списками ссылок, инлайн-ссылки [текст](url) в теле, служебные
пометки _Источник: .../file.md_, лишние пустые строки. Всё это ело
контекст LLM и засоряло правую панель отладки.

- services/text_cleanup: clean_markdown_text — удаляет навигационные
  строки, строки-только-ссылки (обычно это меню), служебные _Источник:_,
  раскрывает инлайн-ссылки [x](url) → x, сжимает 3+ переносов до 2.
- services/document_processor: process_document теперь возвращает
  (id, raw_text, sections, chunks); чистку применяем к заголовкам и
  телам секций; чанки короче 20 символов выбрасываем с пересчётом
  индексов. Вспомогательная rechunk_raw_text — для переиндексации.

Чтобы переиндексировать без повторной загрузки файла, нужен исходный
текст. Вводим отдельный слой:
- новая таблица SQLite documents (id, name, file_type, raw_text,
  created_at, updated_at) + миграция Alembic 7ee7296ccd6d.
- db/models/Document + регистрация в db.models.__init__.
- services/document_service: save/get/list/delete для raw_text.
- routers/documents.upload: сохраняет raw_text в SQLite перед
  индексацией в Chroma; delete убирает и из SQLite, и из Chroma.
- Новые эндпоинты POST /documents/{id}/reindex и
  POST /documents/reindex-all — берут raw_text из SQLite, пропускают
  через rechunk_raw_text, заменяют чанки в Chroma.

Существующие 4 документа были перезалиты вручную (решение: не делать
одноразовый backfill, проще залить заново). Старая Chroma очищена,
новые чанки прошли через чистку — мусор ушёл.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 11:15:08 +05:00

353 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import io
import logging
import re
import uuid
from dataclasses import dataclass
from pathlib import Path
import fitz # pymupdf
from docx import Document as DocxDocument
from config import settings
from services.text_cleanup import clean_markdown_text
MIN_CHUNK_TEXT_LENGTH = 20 # чанки короче — выбрасываем (обычно это хвосты после чистки)
logger = logging.getLogger(__name__)
@dataclass
class ParsedSection:
heading: str
heading_level: int
body: str
page_number: int = 0
@dataclass
class Chunk:
text: str
section: str = ""
page_number: int = 0
chunk_index: int = 0
# --- Parsers ---
def parse_pdf(file_bytes: bytes) -> list[ParsedSection]:
doc = fitz.open(stream=file_bytes, filetype="pdf")
sections: list[ParsedSection] = []
current_heading = ""
current_body_lines: list[str] = []
current_page = 0
for page_num in range(len(doc)):
page = doc[page_num]
blocks = page.get_text("dict")["blocks"]
for block in blocks:
if "lines" not in block:
continue
for line in block["lines"]:
text = "".join(span["text"] for span in line["spans"]).strip()
if not text:
continue
max_size = max(span["size"] for span in line["spans"])
is_bold = any("bold" in span["font"].lower() for span in line["spans"])
if (max_size >= 14 or (is_bold and max_size >= 12)) and len(text) < 200:
if current_heading or current_body_lines:
sections.append(ParsedSection(
heading=current_heading,
heading_level=1 if max_size >= 16 else 2,
body="\n".join(current_body_lines).strip(),
page_number=current_page,
))
current_heading = text
current_body_lines = []
current_page = page_num + 1
else:
current_body_lines.append(text)
if not current_heading:
current_page = page_num + 1
if current_heading or current_body_lines:
sections.append(ParsedSection(
heading=current_heading,
heading_level=2,
body="\n".join(current_body_lines).strip(),
page_number=current_page,
))
doc.close()
return sections
def parse_docx(file_bytes: bytes) -> list[ParsedSection]:
doc = DocxDocument(io.BytesIO(file_bytes))
sections: list[ParsedSection] = []
current_heading = ""
current_level = 0
current_body_lines: list[str] = []
for para in doc.paragraphs:
text = para.text.strip()
if not text:
continue
style_name = (para.style.name or "").lower()
if "heading" in style_name or "title" in style_name:
if current_heading or current_body_lines:
sections.append(ParsedSection(
heading=current_heading,
heading_level=current_level or 1,
body="\n".join(current_body_lines).strip(),
))
level_match = re.search(r"\d+", style_name)
current_level = int(level_match.group()) if level_match else 1
current_heading = text
current_body_lines = []
else:
current_body_lines.append(text)
if current_heading or current_body_lines:
sections.append(ParsedSection(
heading=current_heading,
heading_level=current_level or 1,
body="\n".join(current_body_lines).strip(),
))
return sections
def parse_text(file_bytes: bytes, is_markdown: bool = False) -> list[ParsedSection]:
"""Parse wiki-style TXT/MD.
Эвристики под wiki операторов:
- markdown-заголовки (#, ##, ...)
- нумерованные пункты «1.», «1.1.», «1.1.1.»
- FAQ-паттерн «В:» / «Вопрос:» — воспринимаем как начало новой секции
- ALL-CAPS строки (короткие) — заголовок
"""
text = file_bytes.decode("utf-8", errors="replace")
lines = text.split("\n")
sections: list[ParsedSection] = []
current_heading = ""
current_level = 0
current_body_lines: list[str] = []
md_heading_re = re.compile(r"^(#{1,6})\s+(.+)")
numbered_heading_re = re.compile(r"^(\d+(?:\.\d+)*\.?)\s+([А-ЯЁA-Z].*)")
faq_question_re = re.compile(r"^(В|Вопрос|Q|Question)\s*[:\.]\s*(.+)", re.IGNORECASE)
for line in lines:
stripped = line.strip()
heading_text = None
heading_level = 0
md_match = md_heading_re.match(stripped)
if md_match:
heading_level = len(md_match.group(1))
heading_text = md_match.group(2).strip()
if not heading_text:
num_match = numbered_heading_re.match(stripped)
if num_match and len(stripped) < 200:
dots = num_match.group(1).count(".")
heading_level = max(1, dots + 1)
heading_text = stripped
if not heading_text:
faq_match = faq_question_re.match(stripped)
if faq_match and len(stripped) < 300:
heading_text = faq_match.group(2).strip()
heading_level = 3
if not heading_text and stripped.isupper() and 3 < len(stripped) < 200:
heading_text = stripped
heading_level = 1
if heading_text:
if current_heading or current_body_lines:
sections.append(ParsedSection(
heading=current_heading,
heading_level=current_level or 1,
body="\n".join(current_body_lines).strip(),
))
current_heading = heading_text
current_level = heading_level
current_body_lines = []
else:
current_body_lines.append(line)
if current_heading or current_body_lines:
sections.append(ParsedSection(
heading=current_heading,
heading_level=current_level or 1,
body="\n".join(current_body_lines).strip(),
))
return sections
# --- Chunker ---
def _split_sentences(text: str) -> list[str]:
sentences = re.split(r"(?<=[.!?])\s+", text)
return [s.strip() for s in sentences if s.strip()]
def chunk_sections(
sections: list[ParsedSection],
max_chunk_size: int | None = None,
min_chunk_size: int | None = None,
overlap_sentences: int | None = None,
) -> list[Chunk]:
"""Чанкинг wiki-секций.
- Малые секции (FAQ-ответы) держим целиком — один чанк = одна тема.
- Большие секции (регламенты) режем по абзацам, с overlap последних N предложений.
- Мелкие соседние секции склеиваем, чтобы не плодить огрызки.
"""
max_size = max_chunk_size or settings.max_chunk_size
min_size = min_chunk_size or settings.min_chunk_size
overlap = overlap_sentences or settings.overlap_sentences
raw_chunks: list[Chunk] = []
for section in sections:
heading_prefix = f"{section.heading}\n\n" if section.heading else ""
full_text = heading_prefix + section.body
if len(full_text) <= max_size:
raw_chunks.append(Chunk(
text=full_text.strip(),
section=section.heading,
page_number=section.page_number,
))
else:
paragraphs = section.body.split("\n")
current_text = heading_prefix
for para in paragraphs:
if len(current_text) + len(para) + 1 > max_size and len(current_text) > len(heading_prefix):
raw_chunks.append(Chunk(
text=current_text.strip(),
section=section.heading,
page_number=section.page_number,
))
current_text = heading_prefix + para + "\n"
else:
current_text += para + "\n"
if current_text.strip() and current_text.strip() != heading_prefix.strip():
raw_chunks.append(Chunk(
text=current_text.strip(),
section=section.heading,
page_number=section.page_number,
))
merged: list[Chunk] = []
for chunk in raw_chunks:
if merged and len(merged[-1].text) < min_size:
merged[-1].text += "\n\n" + chunk.text
if not merged[-1].section:
merged[-1].section = chunk.section
else:
merged.append(Chunk(
text=chunk.text,
section=chunk.section,
page_number=chunk.page_number,
))
final: list[Chunk] = []
for i, chunk in enumerate(merged):
if i > 0 and overlap > 0:
prev_sentences = _split_sentences(merged[i - 1].text)
overlap_text = " ".join(prev_sentences[-overlap:])
if overlap_text and overlap_text not in chunk.text:
chunk.text = overlap_text + "\n\n" + chunk.text
chunk.chunk_index = i
final.append(chunk)
return final
# --- Main processor ---
def _sections_to_markdown(sections: list[ParsedSection]) -> str:
"""Собрать секции в markdown-подобный текст — используется как raw_text для PDF/DOCX,
чтобы при переиндексации можно было снова пропустить через parse_text."""
parts = []
for s in sections:
if s.heading:
parts.append(f"{'#' * max(1, s.heading_level)} {s.heading}")
if s.body:
parts.append(s.body)
return "\n\n".join(parts).strip()
def process_document(
file_bytes: bytes, filename: str
) -> tuple[str, str, list[ParsedSection], list[Chunk]]:
"""Парсит документ, чистит markdown-мусор, режет на чанки.
Returns: (document_id, raw_text, sections, chunks)
raw_text — очищенный текст, пригодный для переиндексации с новыми правилами.
"""
document_id = str(uuid.uuid4())
ext = Path(filename).suffix.lower()
if ext == ".pdf":
sections = parse_pdf(file_bytes)
raw_text = _sections_to_markdown(sections)
elif ext in (".docx", ".doc"):
sections = parse_docx(file_bytes)
raw_text = _sections_to_markdown(sections)
elif ext == ".md":
raw_text = file_bytes.decode("utf-8", errors="replace")
cleaned = clean_markdown_text(raw_text)
sections = parse_text(cleaned.encode("utf-8"), is_markdown=True)
elif ext == ".txt":
raw_text = file_bytes.decode("utf-8", errors="replace")
sections = parse_text(raw_text.encode("utf-8"), is_markdown=False)
else:
raise ValueError(f"Unsupported file format: {ext}")
# Страховка — чистим секции, даже если в исходнике уже очищали.
for s in sections:
s.heading = clean_markdown_text(s.heading) if s.heading else ""
s.body = clean_markdown_text(s.body)
sections = [s for s in sections if s.heading or s.body.strip()]
if not sections:
logger.warning("No sections found in %s", filename)
return document_id, raw_text, [], []
chunks = chunk_sections(sections)
# Отбрасываем пустые и совсем мелкие хвосты; переиндексируем.
chunks = [c for c in chunks if len(c.text.strip()) >= MIN_CHUNK_TEXT_LENGTH]
for i, c in enumerate(chunks):
c.chunk_index = i
logger.info("Processed '%s': %d sections → %d chunks (cleaned)", filename, len(sections), len(chunks))
return document_id, raw_text, sections, chunks
def rechunk_raw_text(raw_text: str) -> list[Chunk]:
"""Для переиндексации: режем сохранённый текст с актуальными правилами чистки."""
cleaned = clean_markdown_text(raw_text)
sections = parse_text(cleaned.encode("utf-8"), is_markdown=True)
for s in sections:
s.heading = clean_markdown_text(s.heading) if s.heading else ""
s.body = clean_markdown_text(s.body)
sections = [s for s in sections if s.heading or s.body.strip()]
chunks = chunk_sections(sections)
chunks = [c for c in chunks if len(c.text.strip()) >= MIN_CHUNK_TEXT_LENGTH]
for i, c in enumerate(chunks):
c.chunk_index = i
return chunks