Files
RAG_helper/services/document_processor.py
T
AR 15 M4 4aac59313d feat(sprint8.5+8.6): чанкер v2 (иерархия H1/H2/H3) + регрессия 4 веток в UI
Sprint 8.5 — чанкер v2 (services/document_processor.py):
- markdown-it-py для md-входа: каждый H2 открывает свою секцию, H3 идёт в тело
- множественные H1 — штатный кейс (new_booking.md = 8 H1, шаги воронки + группы);
  H1 без H2 → секция heading=H1; преамбула H1 (тело до первого H2) игнорируется
- YAML frontmatter (--- ... ---) отрезается, в индекс не попадает
- breadcrumb «## {H2}» как первая строка каждого subchunk'а
- merge коротких хвостов и sentence-overlap — только внутри одной H2-секции
- excluded_section_headings в config.py
- 17 unit-тестов на stdlib unittest (tests/test_document_processor_v2.py),
  включая smoke по реальным general_info.md (тимпанометрия → правильная секция)
  и new_booking.md (защита от регрессии множественных H1)
- ТЗ: docs/CHUNKER_v2_TZ.md

Sprint 8.6 — регрессия остальных 4 веток (static/regression.html):
- 4 опции в селекторе режима: branch:price_question (40 кейсов),
  branch:medical_question (29), branch:escalate_human (14), branch:reschedule (16)
- бэкенд из 8b уже параметрический — правок в сервисе не потребовалось
- new_booking вне скоупа — state-machine, под него отдельный 8c (multi-turn)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 09:59:12 +05:00

504 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import io
import logging
import re
import uuid
from dataclasses import dataclass, field
from pathlib import Path
import fitz # pymupdf
import yaml
from docx import Document as DocxDocument
from markdown_it import MarkdownIt
from config import settings
from services.text_cleanup import clean_markdown_text
MIN_CHUNK_TEXT_LENGTH = 20 # чанки короче — выбрасываем (обычно это хвосты после чистки)
logger = logging.getLogger(__name__)
@dataclass
class ParsedSection:
heading: str
heading_level: int
body: str
page_number: int = 0
@dataclass
class Chunk:
text: str
section: str = ""
page_number: int = 0
chunk_index: int = 0
@dataclass
class ParsedMarkdown:
"""Результат парсинга md: frontmatter (если был) + список H2-секций."""
frontmatter: dict = field(default_factory=dict)
sections: list[ParsedSection] = field(default_factory=list)
# --- Parsers ---
def parse_pdf(file_bytes: bytes) -> list[ParsedSection]:
doc = fitz.open(stream=file_bytes, filetype="pdf")
sections: list[ParsedSection] = []
current_heading = ""
current_body_lines: list[str] = []
current_page = 0
for page_num in range(len(doc)):
page = doc[page_num]
blocks = page.get_text("dict")["blocks"]
for block in blocks:
if "lines" not in block:
continue
for line in block["lines"]:
text = "".join(span["text"] for span in line["spans"]).strip()
if not text:
continue
max_size = max(span["size"] for span in line["spans"])
is_bold = any("bold" in span["font"].lower() for span in line["spans"])
if (max_size >= 14 or (is_bold and max_size >= 12)) and len(text) < 200:
if current_heading or current_body_lines:
sections.append(ParsedSection(
heading=current_heading,
heading_level=1 if max_size >= 16 else 2,
body="\n".join(current_body_lines).strip(),
page_number=current_page,
))
current_heading = text
current_body_lines = []
current_page = page_num + 1
else:
current_body_lines.append(text)
if not current_heading:
current_page = page_num + 1
if current_heading or current_body_lines:
sections.append(ParsedSection(
heading=current_heading,
heading_level=2,
body="\n".join(current_body_lines).strip(),
page_number=current_page,
))
doc.close()
return sections
def parse_docx(file_bytes: bytes) -> list[ParsedSection]:
doc = DocxDocument(io.BytesIO(file_bytes))
sections: list[ParsedSection] = []
current_heading = ""
current_level = 0
current_body_lines: list[str] = []
for para in doc.paragraphs:
text = para.text.strip()
if not text:
continue
style_name = (para.style.name or "").lower()
if "heading" in style_name or "title" in style_name:
if current_heading or current_body_lines:
sections.append(ParsedSection(
heading=current_heading,
heading_level=current_level or 1,
body="\n".join(current_body_lines).strip(),
))
level_match = re.search(r"\d+", style_name)
current_level = int(level_match.group()) if level_match else 1
current_heading = text
current_body_lines = []
else:
current_body_lines.append(text)
if current_heading or current_body_lines:
sections.append(ParsedSection(
heading=current_heading,
heading_level=current_level or 1,
body="\n".join(current_body_lines).strip(),
))
return sections
def parse_text(file_bytes: bytes, is_markdown: bool = False) -> list[ParsedSection]:
"""Parse wiki-style TXT/MD.
Эвристики под wiki операторов:
- markdown-заголовки (#, ##, ...)
- нумерованные пункты «1.», «1.1.», «1.1.1.»
- FAQ-паттерн «В:» / «Вопрос:» — воспринимаем как начало новой секции
- ALL-CAPS строки (короткие) — заголовок
"""
text = file_bytes.decode("utf-8", errors="replace")
lines = text.split("\n")
sections: list[ParsedSection] = []
current_heading = ""
current_level = 0
current_body_lines: list[str] = []
md_heading_re = re.compile(r"^(#{1,6})\s+(.+)")
numbered_heading_re = re.compile(r"^(\d+(?:\.\d+)*\.?)\s+([А-ЯЁA-Z].*)")
faq_question_re = re.compile(r"^(В|Вопрос|Q|Question)\s*[:\.]\s*(.+)", re.IGNORECASE)
for line in lines:
stripped = line.strip()
heading_text = None
heading_level = 0
md_match = md_heading_re.match(stripped)
if md_match:
heading_level = len(md_match.group(1))
heading_text = md_match.group(2).strip()
if not heading_text:
num_match = numbered_heading_re.match(stripped)
if num_match and len(stripped) < 200:
dots = num_match.group(1).count(".")
heading_level = max(1, dots + 1)
heading_text = stripped
if not heading_text:
faq_match = faq_question_re.match(stripped)
if faq_match and len(stripped) < 300:
heading_text = faq_match.group(2).strip()
heading_level = 3
if not heading_text and stripped.isupper() and 3 < len(stripped) < 200:
heading_text = stripped
heading_level = 1
if heading_text:
if current_heading or current_body_lines:
sections.append(ParsedSection(
heading=current_heading,
heading_level=current_level or 1,
body="\n".join(current_body_lines).strip(),
))
current_heading = heading_text
current_level = heading_level
current_body_lines = []
else:
current_body_lines.append(line)
if current_heading or current_body_lines:
sections.append(ParsedSection(
heading=current_heading,
heading_level=current_level or 1,
body="\n".join(current_body_lines).strip(),
))
return sections
# --- Markdown parser v2 (иерархия H1/H2/H3, frontmatter, второй H1 → cut) ---
def _split_frontmatter(text: str) -> tuple[dict, str]:
"""Если файл начинается со строки `---`, отрезает YAML-frontmatter и парсит его.
Возвращает (frontmatter_dict, body_text). Если frontmatter не найден или невалиден —
словарь пустой, body_text == исходный text.
"""
if not text.startswith("---"):
return {}, text
lines = text.split("\n")
if not lines or lines[0].strip() != "---":
return {}, text
end = -1
for i in range(1, len(lines)):
if lines[i].strip() == "---":
end = i
break
if end == -1:
return {}, text
fm_text = "\n".join(lines[1:end])
body = "\n".join(lines[end + 1:]).lstrip("\n")
try:
fm = yaml.safe_load(fm_text) or {}
except yaml.YAMLError as exc:
logger.warning("Failed to parse YAML frontmatter: %s", exc)
return {}, text
if not isinstance(fm, dict):
return {}, body
return fm, body
def parse_markdown(text: str, source_label: str = "") -> ParsedMarkdown:
"""Парсер markdown с иерархией H1/H2/H3.
Правила (см. docs/CHUNKER_v2_TZ.md):
- YAML frontmatter (`--- ... ---` в самом начале) — отрезается, в текст не идёт.
- Каждый H1 — это «корневая секция» документа. Множественные H1 поддерживаются
штатно (например, `new_booking.md` имеет 8 H1 — шаги воронки).
- Внутри H1 каждый H2 открывает свою секцию (heading H2 → ParsedSection.heading).
Преамбула H1 (тело между H1 и его первым H2) игнорируется — обычно это служебка
или вступление, дублирующее заголовок.
- Если внутри H1 нет ни одного H2 — H1 сам становится одной секцией с heading H1
и body = всё его содержимое.
- H3 и ниже не открывают секций — идут в тело текущей H2 как есть (`### {текст}`).
"""
frontmatter, body_text = _split_frontmatter(text)
_ = source_label # пока используется только потенциально для будущих логов
md = MarkdownIt("commonmark")
tokens = md.parse(body_text)
lines = body_text.split("\n")
total_lines = len(lines)
# (level, start_line, body_start_line, heading_text)
headings: list[tuple[int, int, int, str]] = []
i = 0
while i < len(tokens):
t = tokens[i]
if t.type == "heading_open" and t.map is not None:
level = int(t.tag[1])
inline = tokens[i + 1] if i + 1 < len(tokens) else None
heading_text = inline.content.strip() if inline is not None else ""
headings.append((level, t.map[0], t.map[1], heading_text))
i += 1
h1_positions = [idx for idx, h in enumerate(headings) if h[0] == 1]
sections: list[ParsedSection] = []
# Файл без H1 — старый кейс (только H2). Обрабатываем как «один виртуальный H1»
# с диапазоном на весь файл, чтобы не ветвиться.
if not h1_positions:
h1_groups = [(0, total_lines, "", -1)] # (h1_body_start, h1_end_line, h1_heading, h1_idx)
else:
h1_groups = []
for pos_idx, h_idx in enumerate(h1_positions):
_, _, h1_body_start, h1_text = headings[h_idx]
if pos_idx + 1 < len(h1_positions):
h1_end = headings[h1_positions[pos_idx + 1]][1]
else:
h1_end = total_lines
h1_groups.append((h1_body_start, h1_end, h1_text, h_idx))
for h1_body_start, h1_end, h1_heading, h1_idx in h1_groups:
# H2-индексы внутри текущего H1-диапазона.
h2_indices_in_group = [
idx for idx, h in enumerate(headings)
if h[0] == 2 and h1_body_start <= h[1] < h1_end
]
if h2_indices_in_group:
for pos, idx in enumerate(h2_indices_in_group):
_, _, body_start, heading_text = headings[idx]
if pos + 1 < len(h2_indices_in_group):
body_end = headings[h2_indices_in_group[pos + 1]][1]
else:
body_end = h1_end
body = "\n".join(lines[body_start:body_end]).strip()
sections.append(ParsedSection(
heading=heading_text,
heading_level=2,
body=body,
))
else:
# H1 без H2 — body H1 идёт одной секцией с heading=H1.
body = "\n".join(lines[h1_body_start:h1_end]).strip()
if body or h1_heading:
sections.append(ParsedSection(
heading=h1_heading,
heading_level=1 if h1_heading else 2,
body=body,
))
return ParsedMarkdown(frontmatter=frontmatter, sections=sections)
# --- Chunker ---
def _split_sentences(text: str) -> list[str]:
sentences = re.split(r"(?<=[.!?])\s+", text)
return [s.strip() for s in sentences if s.strip()]
def chunk_sections(
sections: list[ParsedSection],
max_chunk_size: int | None = None,
min_chunk_size: int | None = None,
overlap_sentences: int | None = None,
) -> list[Chunk]:
"""Чанкинг секций с инвариантом «один чанк ⊆ одна H2-секция».
Ключевые правила (см. docs/CHUNKER_v2_TZ.md):
- Внутри секции разрезаем тело по абзацам (`\\n\\n`).
- В каждом subchunk-е первая строка — breadcrumb `## {heading H2}`.
- Merge коротких хвостов и sentence-overlap работают только внутри одной секции.
- Секции с heading из `excluded_section_headings` пропускаются.
- Секции с пустым heading (PDF/DOCX без заголовка) индексируются без breadcrumb,
чтобы не терять контент при reindex наследия. Для md-входа таких не бывает.
"""
max_size = max_chunk_size or settings.max_chunk_size
min_size = min_chunk_size or settings.min_chunk_size
overlap = overlap_sentences or settings.overlap_sentences
excluded = set(settings.excluded_section_headings or [])
final: list[Chunk] = []
for section in sections:
if section.heading and section.heading in excluded:
continue
body = section.body.strip()
if not body and not section.heading:
continue
breadcrumb = f"## {section.heading}" if section.heading else ""
if breadcrumb:
full_text = f"{breadcrumb}\n\n{body}" if body else breadcrumb
else:
full_text = body
if len(full_text) <= max_size:
section_chunks = [full_text]
else:
paragraphs = [p.strip() for p in re.split(r"\n{2,}", body) if p.strip()]
section_chunks = []
current = breadcrumb
for para in paragraphs:
# Стоимость склейки: текущий + "\n\n" + para.
projected = (current + "\n\n" + para) if current else para
if len(projected) > max_size and current and current != breadcrumb:
section_chunks.append(current)
current = f"{breadcrumb}\n\n{para}" if breadcrumb else para
else:
current = projected
if current and current != breadcrumb:
section_chunks.append(current)
# Merge коротких хвостов — только внутри одной секции.
merged: list[str] = []
for ch in section_chunks:
if merged and len(merged[-1]) < min_size:
merged[-1] = merged[-1] + "\n\n" + ch
else:
merged.append(ch)
# Sentence-overlap — только между subchunk'ами одной секции.
if overlap > 0 and len(merged) > 1:
with_overlap = [merged[0]]
for i in range(1, len(merged)):
prev_sentences = _split_sentences(merged[i - 1])
overlap_text = " ".join(prev_sentences[-overlap:])
if not overlap_text or overlap_text in merged[i]:
with_overlap.append(merged[i])
continue
cur = merged[i]
# Вставляем overlap после breadcrumb-строки, чтобы заголовок остался первой строкой.
if breadcrumb and cur.startswith(breadcrumb + "\n\n"):
rest = cur[len(breadcrumb) + 2:]
new_text = f"{breadcrumb}\n\n{overlap_text}\n\n{rest}"
else:
new_text = f"{overlap_text}\n\n{cur}"
with_overlap.append(new_text)
merged = with_overlap
for ch_text in merged:
final.append(Chunk(
text=ch_text.strip(),
section=section.heading,
page_number=section.page_number,
))
for i, c in enumerate(final):
c.chunk_index = i
return final
# --- Main processor ---
def _sections_to_markdown(sections: list[ParsedSection]) -> str:
"""Собрать секции в markdown-подобный текст для повторной нарезки.
Все секции пишем как H2 — это нормализует выгрузки PDF/DOCX, где `heading_level`
может быть 1 или 2. Иначе reindex через `parse_markdown` потерял бы контент:
одиночный H1 трактуется как корень документа, второй H1 → WARN-обрыв.
"""
parts = []
for s in sections:
if s.heading:
parts.append(f"## {s.heading}")
if s.body:
parts.append(s.body)
return "\n\n".join(parts).strip()
def process_document(
file_bytes: bytes, filename: str
) -> tuple[str, str, list[ParsedSection], list[Chunk]]:
"""Парсит документ, чистит markdown-мусор, режет на чанки.
Returns: (document_id, raw_text, sections, chunks)
raw_text — очищенный текст, пригодный для переиндексации с новыми правилами.
"""
document_id = str(uuid.uuid4())
ext = Path(filename).suffix.lower()
if ext == ".pdf":
sections = parse_pdf(file_bytes)
raw_text = _sections_to_markdown(sections)
elif ext in (".docx", ".doc"):
sections = parse_docx(file_bytes)
raw_text = _sections_to_markdown(sections)
elif ext == ".md":
raw_text = file_bytes.decode("utf-8", errors="replace")
_, body_text = _split_frontmatter(raw_text)
cleaned = clean_markdown_text(body_text)
sections = parse_markdown(cleaned, source_label=filename).sections
elif ext == ".txt":
raw_text = file_bytes.decode("utf-8", errors="replace")
sections = parse_text(raw_text.encode("utf-8"), is_markdown=False)
else:
raise ValueError(f"Unsupported file format: {ext}")
# Страховка — чистим секции, даже если в исходнике уже очищали.
for s in sections:
s.heading = clean_markdown_text(s.heading) if s.heading else ""
s.body = clean_markdown_text(s.body)
sections = [s for s in sections if s.heading or s.body.strip()]
if not sections:
logger.warning("No sections found in %s", filename)
return document_id, raw_text, [], []
chunks = chunk_sections(sections)
# Отбрасываем пустые и совсем мелкие хвосты; переиндексируем.
chunks = [c for c in chunks if len(c.text.strip()) >= MIN_CHUNK_TEXT_LENGTH]
for i, c in enumerate(chunks):
c.chunk_index = i
logger.info("Processed '%s': %d sections → %d chunks (cleaned)", filename, len(sections), len(chunks))
return document_id, raw_text, sections, chunks
def rechunk_raw_text(raw_text: str) -> list[Chunk]:
"""Для переиндексации: режем сохранённый текст с актуальными правилами чистки."""
_, body_text = _split_frontmatter(raw_text)
cleaned = clean_markdown_text(body_text)
sections = parse_markdown(cleaned).sections
for s in sections:
s.heading = clean_markdown_text(s.heading) if s.heading else ""
s.body = clean_markdown_text(s.body)
sections = [s for s in sections if s.heading or s.body.strip()]
chunks = chunk_sections(sections)
chunks = [c for c in chunks if len(c.text.strip()) >= MIN_CHUNK_TEXT_LENGTH]
for i, c in enumerate(chunks):
c.chunk_index = i
return chunks