"""Heading-aware recursive markdown chunking."""

import re
from dataclasses import dataclass, field

import atocore.config as _config

# Opening/closing marker of a fenced code block: up to three spaces of
# indentation followed by at least three backticks or tildes.
_FENCE_RE = re.compile(r"^ {0,3}(`{3,}|~{3,})")


@dataclass
class Chunk:
    """One piece of a markdown document produced by ``chunk_markdown``."""

    content: str  # chunk text with surrounding whitespace stripped
    chunk_index: int  # 0-based position among the chunks that were kept
    heading_path: str  # "H2", "H2 > H3", or "" when not under a heading
    char_count: int  # len(content)
    metadata: dict = field(default_factory=dict)  # shallow copy of base_metadata


def chunk_markdown(
    body: str,
    base_metadata: dict | None = None,
    max_size: int | None = None,
    overlap: int | None = None,
    min_size: int | None = None,
) -> list[Chunk]:
    """Split a markdown body into chunks using a heading-aware strategy.

    1. Split on H2 boundaries.
    2. If a section is larger than ``max_size``, split it on H3.
    3. If a subsection is still larger, split it on paragraph breaks.
    4. If a single paragraph is still larger, hard split it with overlap.

    Heading lines themselves are not part of the chunk content; their text
    is reported through ``Chunk.heading_path`` instead.

    Args:
        body: Markdown text to split.
        base_metadata: Mapping shallow-copied into every chunk's
            ``metadata``. ``None`` means no metadata.
        max_size: Maximum chunk length in characters. ``None`` or ``0``
            falls back to ``settings.chunk_max_size``.
        overlap: Number of characters shared by consecutive hard-split
            pieces. ``None`` falls back to ``settings.chunk_overlap``; an
            explicit ``0`` disables overlap.
        min_size: Chunks whose stripped content is shorter than this are
            dropped. ``None`` falls back to ``settings.chunk_min_size``; an
            explicit ``0`` keeps every non-empty chunk.

    Returns:
        The kept chunks in document order, with ``chunk_index`` numbered
        consecutively from 0 after filtering.

    Raises:
        ValueError: If the resolved ``max_size`` is not positive.
    """
    # ``0`` keeps its historical "use the default" meaning for max_size,
    # because a zero-width chunk can never hold any text.
    max_size = max_size or _config.settings.chunk_max_size
    if max_size <= 0:
        raise ValueError(f"max_size must be positive, got {max_size}")
    # ``is None`` (rather than ``or``) so that an explicit 0 is honoured
    # instead of being silently replaced by the configured default.
    if overlap is None:
        overlap = _config.settings.chunk_overlap
    if min_size is None:
        min_size = _config.settings.chunk_min_size
    base_metadata = base_metadata or {}

    raw_chunks: list[tuple[str, str]] = []  # (heading_path, content)
    for heading, content in _split_by_heading(body, level=2):
        if len(content) <= max_size:
            raw_chunks.append((heading, content))
            continue

        # The H2 section is too large: try its H3 subsections.
        for sub_heading, sub_content in _split_by_heading(content, level=3):
            if heading and sub_heading:
                full_path = f"{heading} > {sub_heading}"
            else:
                full_path = heading or sub_heading

            if len(sub_content) <= max_size:
                raw_chunks.append((full_path, sub_content))
            else:
                # Still too large: fall back to paragraph / hard splitting.
                raw_chunks.extend(
                    (full_path, piece)
                    for piece in _split_by_paragraphs(
                        sub_content, max_size, overlap
                    )
                )

    # Build the final chunks, dropping empty and too-small ones.
    chunks: list[Chunk] = []
    for heading_path, content in raw_chunks:
        content = content.strip()
        # The emptiness check matters when min_size is 0: whitespace-only
        # sections must never become chunks.
        if not content or len(content) < min_size:
            continue
        chunks.append(
            Chunk(
                content=content,
                chunk_index=len(chunks),
                heading_path=heading_path,
                char_count=len(content),
                metadata={**base_metadata},
            )
        )
    return chunks


def _split_by_heading(text: str, level: int) -> list[tuple[str, str]]:
    """Split text at headings of exactly ``level`` ``#`` characters.

    Lines inside fenced code blocks (``` or ~~~) are never treated as
    headings, so a shell comment such as ``## note`` inside a code sample
    stays part of the section content. An unclosed fence extends to the end
    of ``text``.

    Args:
        text: Markdown text to split.
        level: Heading level to split on (2 for ``##``, 3 for ``###``).

    Returns:
        ``(heading_text, section_content)`` pairs in document order. Text
        before the first heading is returned with an empty heading. Heading
        lines are not included in the content, and a heading that is
        immediately followed by another heading (or by the end of the text)
        yields no pair.
    """
    heading_re = re.compile(rf"^({'#' * level})\s+(.+)$")
    parts: list[tuple[str, str]] = []
    current_heading = ""
    current_lines: list[str] = []
    open_fence = ""  # marker that opened the current code fence, "" if none

    for line in text.split("\n"):
        fence = _FENCE_RE.match(line)
        if fence:
            marker = fence.group(1)
            if not open_fence:
                open_fence = marker
            elif (
                marker[0] == open_fence[0]
                and len(marker) >= len(open_fence)
                and not line[fence.end():].strip()
            ):
                # A closing fence uses the same character, is at least as
                # long as the opener, and carries no trailing text.
                open_fence = ""

        match = None if open_fence else heading_re.match(line)
        if match is None:
            current_lines.append(line)
            continue

        # A new heading starts: save the section collected so far.
        if current_lines:
            parts.append((current_heading, "\n".join(current_lines)))
        current_heading = match.group(2).strip()
        current_lines = []

    # Save the last section.
    if current_lines:
        parts.append((current_heading, "\n".join(current_lines)))
    return parts


def _split_by_paragraphs(
    text: str, max_size: int, overlap: int
) -> list[str]:
    """Pack paragraphs into chunks of at most ``max_size`` characters.

    Paragraphs are separated by one or more blank lines. Consecutive
    paragraphs are joined with a blank line while they fit; a single
    paragraph longer than ``max_size`` is passed to ``_hard_split``.
    ``overlap`` is only used for those hard-split paragraphs.

    Returns:
        The chunk texts in document order.
    """
    chunks: list[str] = []
    current = ""

    for para in re.split(r"\n\n+", text):
        para = para.strip()
        if not para:
            continue

        # The "\n\n" joiner only costs characters when there is something
        # to join onto.
        separator_len = 2 if current else 0
        if len(current) + separator_len + len(para) <= max_size:
            current = f"{current}\n\n{para}" if current else para
            continue

        # The paragraph does not fit: flush what has been accumulated.
        if current:
            chunks.append(current)

        if len(para) > max_size:
            # A single paragraph exceeds the limit on its own.
            chunks.extend(_hard_split(para, max_size, overlap))
            current = ""
        else:
            current = para

    if current:
        chunks.append(current)
    return chunks


def _hard_split(text: str, max_size: int, overlap: int) -> list[str]:
    """Cut text into windows of ``max_size`` characters with overlap.

    Each window starts ``max_size - overlap`` characters after the previous
    one, so consecutive windows share ``overlap`` characters. Splitting
    stops as soon as a window reaches the end of the text, so no trailing
    piece made only of already-emitted overlap is produced.

    Args:
        text: Text to split.
        max_size: Window length in characters; must be positive.
        overlap: Characters shared by consecutive windows. Values greater
            than or equal to ``max_size`` are replaced by ``max_size // 4``
            and negative values are treated as 0.

    Returns:
        The windows in order; an empty list for empty text.

    Raises:
        ValueError: If ``max_size`` is not positive.
    """
    if max_size <= 0:
        raise ValueError(f"max_size must be positive, got {max_size}")
    # Guarantee forward progress: the overlap must be smaller than the
    # window, and a negative overlap would skip text between windows.
    if overlap >= max_size:
        overlap = max_size // 4
    overlap = max(overlap, 0)

    chunks: list[str] = []
    start = 0
    while start < len(text):
        end = start + max_size
        chunks.append(text[start:end])
        if end >= len(text):
            # The window already covers the tail of the text.
            break
        start = end - overlap
    return chunks