import io
import logging
import re
import uuid
from dataclasses import dataclass
from pathlib import Path

import fitz  # pymupdf
from docx import Document as DocxDocument

from config import settings

logger = logging.getLogger(__name__)

# Heading level used for PDF content that precedes the first detected heading
# and for headings whose font size is below the level-1 threshold.
_PDF_DEFAULT_LEVEL = 2

# Heading heuristics for plain-text / markdown wiki pages.
_MD_HEADING_RE = re.compile(r"^(#{1,6})\s+(.+)")
_NUMBERED_HEADING_RE = re.compile(r"^(\d+(?:\.\d+)*\.?)\s+([А-ЯЁA-Z].*)")
_FAQ_QUESTION_RE = re.compile(r"^(В|Вопрос|Q|Question)\s*[:\.]\s*(.+)", re.IGNORECASE)


@dataclass
class ParsedSection:
    """One logical section of a parsed document: a heading plus its body text."""

    heading: str
    heading_level: int
    body: str
    page_number: int = 0  # 1-based for PDFs; stays 0 for formats without pages


@dataclass
class Chunk:
    """A piece of section text produced by ``chunk_sections``."""

    text: str
    section: str = ""
    page_number: int = 0
    chunk_index: int = 0


# --- Parsers ---

def _append_section(
    sections: list[ParsedSection],
    heading: str,
    level: int,
    body_lines: list[str],
    page_number: int = 0,
) -> None:
    """Append a section built from the accumulated state, skipping empty ones.

    A section is empty when it has neither a heading nor any non-whitespace
    body text (e.g. blank lines before the first heading of a text file).
    """
    body = "\n".join(body_lines).strip()
    if not heading and not body:
        return
    sections.append(ParsedSection(
        heading=heading,
        heading_level=level,
        body=body,
        page_number=page_number,
    ))


def parse_pdf(file_bytes: bytes) -> list[ParsedSection]:
    """Split a PDF into sections using font size / boldness as heading cues.

    A text line shorter than 200 characters is treated as a heading when its
    largest span is at least 14pt, or at least 12pt and set in a bold font.
    Headings of 16pt and above get level 1, all others level 2.

    Args:
        file_bytes: Raw content of the PDF file.

    Returns:
        Sections in reading order. ``page_number`` is the 1-based page on which
        the section's heading was found.
    """
    sections: list[ParsedSection] = []
    heading = ""
    level = _PDF_DEFAULT_LEVEL
    body_lines: list[str] = []
    page_number = 0

    doc = fitz.open(stream=file_bytes, filetype="pdf")
    try:
        for page_num, page in enumerate(doc, start=1):
            for block in page.get_text("dict")["blocks"]:
                if "lines" not in block:
                    # Blocks without a "lines" entry carry no text to parse.
                    continue
                for line in block["lines"]:
                    spans = line["spans"]
                    text = "".join(span["text"] for span in spans).strip()
                    if not text:
                        continue
                    max_size = max(span["size"] for span in spans)
                    is_bold = any("bold" in span["font"].lower() for span in spans)
                    looks_like_heading = (
                        (max_size >= 14 or (is_bold and max_size >= 12))
                        and len(text) < 200
                    )

                    if looks_like_heading:
                        # Flush the previous section with ITS OWN level; the
                        # level of the new heading is recorded only afterwards.
                        _append_section(sections, heading, level, body_lines, page_number)
                        heading = text
                        level = 1 if max_size >= 16 else _PDF_DEFAULT_LEVEL
                        body_lines = []
                        page_number = page_num
                    else:
                        body_lines.append(text)
                        if not heading:
                            # Text before the first heading is attributed to
                            # the latest page that contributed to it.
                            page_number = page_num
    finally:
        # Release the document even if a page fails to parse.
        doc.close()

    _append_section(sections, heading, level, body_lines, page_number)
    return sections


def parse_docx(file_bytes: bytes) -> list[ParsedSection]:
    """Split a DOCX document into sections using paragraph style names.

    A paragraph whose style name contains "heading" or "title" starts a new
    section. The level is the first number found in the style name, or 1 when
    the name contains no number.

    Args:
        file_bytes: Raw content of the DOCX file.

    Returns:
        Sections in document order; ``page_number`` is left at its default.
    """
    doc = DocxDocument(io.BytesIO(file_bytes))
    sections: list[ParsedSection] = []
    heading = ""
    level = 0
    body_lines: list[str] = []

    for para in doc.paragraphs:
        text = para.text.strip()
        if not text:
            continue
        style_name = (para.style.name or "").lower()
        if "heading" in style_name or "title" in style_name:
            _append_section(sections, heading, level or 1, body_lines)
            level_match = re.search(r"\d+", style_name)
            level = int(level_match.group()) if level_match else 1
            heading = text
            body_lines = []
        else:
            body_lines.append(text)

    _append_section(sections, heading, level or 1, body_lines)
    return sections


def _detect_text_heading(stripped: str) -> tuple[str, int] | None:
    """Return ``(heading_text, level)`` if the stripped line is a heading.

    Heuristics are tried in this order: markdown ``#`` heading, numbered item,
    FAQ question, short ALL-CAPS line. Returns ``None`` for ordinary lines.
    """
    md_match = _MD_HEADING_RE.match(stripped)
    if md_match:
        return md_match.group(2).strip(), len(md_match.group(1))

    num_match = _NUMBERED_HEADING_RE.match(stripped)
    if num_match and len(stripped) < 200:
        # Depth is the number of numeric components, so "1." and "1" are both
        # level 1 and "1.1." / "1.1" are both level 2; the optional trailing
        # dot must not add a level.
        depth = len(num_match.group(1).rstrip(".").split("."))
        return stripped, depth

    faq_match = _FAQ_QUESTION_RE.match(stripped)
    if faq_match and len(stripped) < 300:
        return faq_match.group(2).strip(), 3

    if stripped.isupper() and 3 < len(stripped) < 200:
        return stripped, 1

    return None


def parse_text(file_bytes: bytes, is_markdown: bool = False) -> list[ParsedSection]:
    """Parse wiki-style TXT/MD content into sections.

    Heuristics tuned for operator wiki pages:
    - markdown headings (#, ##, ...);
    - numbered items such as "1.", "1.1.", "1.1.1." (level = numbering depth);
    - FAQ pattern "В:" / "Вопрос:" / "Q:" / "Question:" starts a new section;
    - short ALL-CAPS lines are treated as level-1 headings.

    Args:
        file_bytes: Raw file content; decoded as UTF-8 with undecodable bytes
            replaced.
        is_markdown: Accepted for interface compatibility. The same heuristics
            are applied regardless of its value.

    Returns:
        Sections in document order; ``page_number`` is left at its default.
    """
    text = file_bytes.decode("utf-8", errors="replace")
    # Normalise Windows line endings so body lines do not keep a stray "\r".
    lines = text.replace("\r\n", "\n").split("\n")

    sections: list[ParsedSection] = []
    heading = ""
    level = 0
    body_lines: list[str] = []

    for line in lines:
        detected = _detect_text_heading(line.strip())
        if detected is None:
            # Keep the unstripped line so indentation inside the body survives.
            body_lines.append(line)
            continue
        _append_section(sections, heading, level or 1, body_lines)
        heading, level = detected
        body_lines = []

    _append_section(sections, heading, level or 1, body_lines)
    return sections


# --- Chunker ---

def _split_sentences(text: str) -> list[str]:
    """Split text after ".", "!" or "?" followed by whitespace; drop empty parts."""
    sentences = re.split(r"(?<=[.!?])\s+", text)
    return [s.strip() for s in sentences if s.strip()]


def _split_section(section: ParsedSection, max_size: int) -> list[Chunk]:
    """Turn one section into one or more chunks no larger than roughly ``max_size``.

    A section that fits is returned as a single chunk. A larger one is cut on
    line boundaries of its body, and the heading is repeated at the start of
    every piece. A single line longer than ``max_size`` is not split further.
    """
    heading_prefix = f"{section.heading}\n\n" if section.heading else ""
    heading_only = heading_prefix.strip()

    def make_chunk(text: str) -> Chunk:
        return Chunk(
            text=text,
            section=section.heading,
            page_number=section.page_number,
        )

    full_text = (heading_prefix + section.body).strip()
    if len(heading_prefix + section.body) <= max_size:
        # Skip sections with no text at all instead of emitting an empty chunk.
        return [make_chunk(full_text)] if full_text else []

    pieces: list[Chunk] = []
    current_text = heading_prefix
    for para in section.body.split("\n"):
        would_overflow = len(current_text) + len(para) + 1 > max_size
        has_content = len(current_text) > len(heading_prefix)
        if would_overflow and has_content:
            piece = current_text.strip()
            # Same rule as for the trailing remainder: a piece that is empty
            # or only repeats the heading carries no information.
            if piece and piece != heading_only:
                pieces.append(make_chunk(piece))
            current_text = heading_prefix + para + "\n"
        else:
            current_text += para + "\n"

    remainder = current_text.strip()
    if remainder and remainder != heading_only:
        pieces.append(make_chunk(remainder))
    return pieces


def _merge_small_chunks(raw_chunks: list[Chunk], min_size: int) -> list[Chunk]:
    """Glue each chunk onto its predecessor while the predecessor is below ``min_size``."""
    merged: list[Chunk] = []
    for chunk in raw_chunks:
        if merged and len(merged[-1].text) < min_size:
            tail = merged[-1]
            tail.text += "\n\n" + chunk.text
            if not tail.section:
                tail.section = chunk.section
        else:
            # Copy so that merging never mutates the caller-visible raw chunk.
            merged.append(Chunk(
                text=chunk.text,
                section=chunk.section,
                page_number=chunk.page_number,
            ))
    return merged


def _apply_overlap(chunks: list[Chunk], overlap: int) -> list[Chunk]:
    """Number the chunks and prepend the previous chunk's last sentences to each."""
    for index, chunk in enumerate(chunks):
        chunk.chunk_index = index

    if overlap <= 0:
        return chunks

    # Snapshot the texts first: the overlap must come from the previous chunk's
    # own content, not from text that was itself prepended as overlap.
    original_texts = [chunk.text for chunk in chunks]
    for previous_text, chunk in zip(original_texts, chunks[1:]):
        overlap_text = " ".join(_split_sentences(previous_text)[-overlap:])
        if overlap_text and overlap_text not in chunk.text:
            chunk.text = overlap_text + "\n\n" + chunk.text
    return chunks


def chunk_sections(
    sections: list[ParsedSection],
    max_chunk_size: int | None = None,
    min_chunk_size: int | None = None,
    overlap_sentences: int | None = None,
) -> list[Chunk]:
    """Chunk wiki sections.

    - Small sections (FAQ answers) are kept whole: one chunk = one topic.
    - Large sections (regulations) are cut on line boundaries of the body,
      with the heading repeated in every piece.
    - A chunk shorter than the minimum size absorbs the chunks that follow it
      until it reaches that size.
    - Each chunk except the first is prefixed with the last N sentences of the
      previous chunk.

    Args:
        sections: Parsed sections in document order.
        max_chunk_size: Maximum chunk length in characters. ``None`` (or any
            falsy value, since a zero maximum is meaningless) falls back to
            ``settings.max_chunk_size``.
        min_chunk_size: Chunks shorter than this are merged with the following
            ones. ``None`` falls back to ``settings.min_chunk_size``; an
            explicit ``0`` disables merging.
        overlap_sentences: Number of trailing sentences of the previous chunk
            to prepend. ``None`` falls back to ``settings.overlap_sentences``;
            an explicit ``0`` disables the overlap.

    Returns:
        Chunks in document order with ``chunk_index`` set to their position.
    """
    max_size = max_chunk_size or settings.max_chunk_size
    min_size = settings.min_chunk_size if min_chunk_size is None else min_chunk_size
    overlap = settings.overlap_sentences if overlap_sentences is None else overlap_sentences

    raw_chunks: list[Chunk] = []
    for section in sections:
        raw_chunks.extend(_split_section(section, max_size))

    merged = _merge_small_chunks(raw_chunks, min_size)
    return _apply_overlap(merged, overlap)


# --- Main processor ---

def process_document(file_bytes: bytes, filename: str) -> tuple[str, list[ParsedSection], list[Chunk]]:
    """Parse a document and split it into chunks.

    The parser is chosen by the (case-insensitive) file extension.

    Args:
        file_bytes: Raw file content.
        filename: Original file name; only its extension is inspected.

    Returns:
        ``(document_id, sections, chunks)`` where ``document_id`` is a freshly
        generated UUID4 string. Both lists are empty when nothing was parsed.

    Raises:
        ValueError: If the extension is not one of .pdf, .docx, .doc, .md, .txt.
    """
    document_id = str(uuid.uuid4())
    ext = Path(filename).suffix.lower()

    if ext == ".pdf":
        sections = parse_pdf(file_bytes)
    elif ext in (".docx", ".doc"):
        # NOTE(review): ".doc" is routed to the DOCX parser; a legacy binary
        # Word file is presumably not readable by it - confirm and reject or
        # convert such files upstream if so.
        sections = parse_docx(file_bytes)
    elif ext == ".md":
        sections = parse_text(file_bytes, is_markdown=True)
    elif ext == ".txt":
        sections = parse_text(file_bytes, is_markdown=False)
    else:
        raise ValueError(f"Unsupported file format: {ext}")

    if not sections:
        logger.warning("No sections found in %s", filename)
        return document_id, [], []

    chunks = chunk_sections(sections)
    logger.info("Processed '%s': %d sections → %d chunks", filename, len(sections), len(chunks))
    return document_id, sections, chunks