Changes from all commits (80 commits)
a2a4f86
implement md-header-splitter and add tests
OGuggenbuehl Jul 11, 2025
4337a5b
rework md-header splitter to rewrite md-header levels
OGuggenbuehl Jul 29, 2025
393cd53
remove deprecated test
OGuggenbuehl Jul 29, 2025
de6b0d9
Merge branch 'main' into feature/md-header-splitter
OGuggenbuehl Aug 11, 2025
0e9f955
Update haystack/components/preprocessors/markdown_header_splitter.py
OGuggenbuehl Sep 9, 2025
fad1ed7
use native types
OGuggenbuehl Sep 9, 2025
8910485
move to haystack logging
OGuggenbuehl Sep 9, 2025
b3114e6
Update haystack/components/preprocessors/markdown_header_splitter.py
OGuggenbuehl Sep 9, 2025
2abec16
docstrings improvements
OGuggenbuehl Sep 9, 2025
3917116
Merge branch 'feature/md-header-splitter' of https://github.com/OGugg…
OGuggenbuehl Sep 9, 2025
7f92dc9
fix CustomDocumentSplitter arguments
OGuggenbuehl Sep 9, 2025
6d75b58
remove header prefix from content
OGuggenbuehl Sep 9, 2025
c1bb05e
rework split_id assignment to avoid collisions
OGuggenbuehl Sep 9, 2025
970ec90
remove unneeded dese methods
OGuggenbuehl Sep 9, 2025
169cb06
cleanup
OGuggenbuehl Sep 9, 2025
3dc0504
cleanup
OGuggenbuehl Sep 9, 2025
bcbbf9a
add tests
OGuggenbuehl Sep 16, 2025
c7a8756
move initialization of secondary-splitter out of run method
OGuggenbuehl Sep 19, 2025
356ca73
move _custom_document_splitter to class method
OGuggenbuehl Sep 19, 2025
5dde973
removed the _CustomDocumentSplitter class. splitting logic is now enc…
OGuggenbuehl Sep 19, 2025
59c81c7
return to standard feed-forward character and add tests for page brea…
OGuggenbuehl Sep 19, 2025
8fc8281
quit exposing splitting_function param since it shouldn't be changed …
OGuggenbuehl Sep 19, 2025
191d98d
remove test section in module
OGuggenbuehl Sep 19, 2025
3e76544
add license header
OGuggenbuehl Sep 19, 2025
ed5dc6f
add release note
OGuggenbuehl Sep 19, 2025
38e04e7
minor refactor for type safety
OGuggenbuehl Sep 23, 2025
6518ce4
Update haystack/components/preprocessors/markdown_header_splitter.py
OGuggenbuehl Sep 23, 2025
21451f1
remove unneeded release notes entries
OGuggenbuehl Sep 23, 2025
a6028a0
improved documentation for methods
OGuggenbuehl Sep 23, 2025
aca4d4c
improve method naming
OGuggenbuehl Sep 23, 2025
7ef16a7
improved page-number assignment & added return in docstring
OGuggenbuehl Sep 23, 2025
876b244
Merge branch 'main' into feature/md-header-splitter
OGuggenbuehl Sep 23, 2025
5203603
unified page-counting
OGuggenbuehl Sep 24, 2025
debe17e
simplify conditional secondary-split initialization and usage
OGuggenbuehl Sep 24, 2025
fc2cc58
Merge branch 'feature/md-header-splitter' of https://github.com/OGugg…
OGuggenbuehl Sep 24, 2025
b74cefc
fix linting error
OGuggenbuehl Sep 24, 2025
e7e9872
clearly specify the use of ATX-style headers (#) only
OGuggenbuehl Sep 24, 2025
f66e77b
reference doc_id when logging no headers found
OGuggenbuehl Sep 24, 2025
445ffe8
initialize md-header pattern as private variable once
OGuggenbuehl Sep 24, 2025
1b2160b
add example to for inferring header levels to docstring
OGuggenbuehl Sep 25, 2025
94218fa
improve empty document handling
OGuggenbuehl Sep 25, 2025
b6e2486
more explicit testing for inferred headers
OGuggenbuehl Sep 25, 2025
530eafa
fix linting issue
OGuggenbuehl Sep 25, 2025
44e0454
improved empty content handling test cases
OGuggenbuehl Sep 26, 2025
47e3b9e
remove all functionality related to inferring md-header levels
OGuggenbuehl Sep 29, 2025
12fbf8b
compile regex-pattern in init for performance gains
OGuggenbuehl Sep 30, 2025
393f13f
Update haystack/components/preprocessors/markdown_header_splitter.py
OGuggenbuehl Oct 13, 2025
85d9553
change all "none" to proper None values
OGuggenbuehl Oct 13, 2025
5aaec38
fix minor
OGuggenbuehl Oct 13, 2025
00799f6
explicitly test doc content
OGuggenbuehl Oct 13, 2025
645ec7f
rename parentheaders to parent_headers
OGuggenbuehl Oct 13, 2025
da16dd9
test split_id, doc length
OGuggenbuehl Oct 13, 2025
4769715
check meta content
OGuggenbuehl Oct 13, 2025
9b68b76
remove unneeded test
OGuggenbuehl Oct 13, 2025
020a2fe
make split_id testing more robust
OGuggenbuehl Oct 13, 2025
26c7825
more realistic overlap test sample
OGuggenbuehl Oct 14, 2025
b40036a
assign split_id globally to all output docs
OGuggenbuehl Oct 14, 2025
fb6ed86
taste page numbers explicitly
OGuggenbuehl Oct 14, 2025
a00f758
cleanup pagebreak test
OGuggenbuehl Oct 14, 2025
186115f
minor
OGuggenbuehl Oct 14, 2025
88a0460
return doc unchunked if no headers have content
OGuggenbuehl Oct 14, 2025
07ff103
add doc-id to logging statement for unsplit documents
OGuggenbuehl Oct 16, 2025
83c7c07
remove unneeded logs
OGuggenbuehl Oct 16, 2025
3bd6176
minor cleanup
OGuggenbuehl Oct 16, 2025
51b093e
simplify page-number tracking method to not return count, just the up…
OGuggenbuehl Oct 16, 2025
e333d12
add dev comment to mypy check for doc.content is None
OGuggenbuehl Oct 16, 2025
6e348f8
Update haystack/components/preprocessors/markdown_header_splitter.py
OGuggenbuehl Oct 16, 2025
5a4c74f
Merge branch 'feature/md-header-splitter' of https://github.com/OGugg…
OGuggenbuehl Oct 16, 2025
a77253c
remove split meta flattening
OGuggenbuehl Oct 16, 2025
489bffd
keep empty meta return consistent
OGuggenbuehl Oct 16, 2025
7ac9338
remove unneeded content is none check
OGuggenbuehl Oct 16, 2025
39c0c17
update tests to reflect empty meta dict for unsplit docs
OGuggenbuehl Oct 16, 2025
2881178
clean up total_page counts
OGuggenbuehl Oct 16, 2025
3fe2882
remove unneeded meta check
OGuggenbuehl Oct 16, 2025
78083d2
Update test/components/preprocessors/test_markdown_header_splitter.py
OGuggenbuehl Oct 16, 2025
29b92a6
implement keep_headers parameter
OGuggenbuehl Oct 17, 2025
18ffc54
remove meta-dict flattening
OGuggenbuehl Oct 17, 2025
abb2b34
Update test/components/preprocessors/test_markdown_header_splitter.py
OGuggenbuehl Oct 21, 2025
2174cc2
add minor sanity checks
OGuggenbuehl Oct 21, 2025
35fc3ab
Merge branch 'feature/md-header-splitter' of https://github.com/OGugg…
OGuggenbuehl Oct 21, 2025
324 changes: 324 additions & 0 deletions haystack/components/preprocessors/markdown_header_splitter.py
@@ -0,0 +1,324 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import re
from typing import Literal, Optional

from haystack import Document, component, logging
from haystack.components.preprocessors import DocumentSplitter

logger = logging.getLogger(__name__)


@component
class MarkdownHeaderSplitter:
    """
    Split documents at ATX-style Markdown headers (#), with optional secondary splitting.

    This component processes text documents by:
    - Splitting them into chunks at Markdown headers (e.g., '#', '##', etc.), preserving header hierarchy as metadata.
    - Optionally applying a secondary split (by word, passage, period, or line) to each chunk
      (using Haystack's DocumentSplitter).
    - Preserving and propagating metadata such as parent headers, page numbers, and split IDs.
    """

    def __init__(
        self,
        *,
        page_break_character: str = "\f",
        keep_headers: bool = True,
        secondary_split: Optional[Literal["word", "passage", "period", "line"]] = None,
        split_length: int = 200,
        split_overlap: int = 0,
        split_threshold: int = 0,
        skip_empty_documents: bool = True,
    ):
        """
        Initialize the MarkdownHeaderSplitter.

        :param page_break_character: Character used to identify page breaks. Defaults to form feed ("\f").
        :param keep_headers: If True, headers are kept in the content. If False, headers are moved to metadata.
            Defaults to True.
Comment on lines +41 to +42
Contributor: Does this mean that if keep_headers is True we don't store them in the metadata?
        :param secondary_split: Optional secondary split condition after header splitting.
            Options are None, "word", "passage", "period", "line". Defaults to None.
        :param split_length: The maximum number of units in each split when using secondary splitting. Defaults to 200.
        :param split_overlap: The number of overlapping units for each split when using secondary splitting.
            Defaults to 0.
        :param split_threshold: The minimum number of units per split when using secondary splitting. Defaults to 0.
        :param skip_empty_documents: If True, skip documents with empty content. If False, process empty documents.
            Defaults to True.
Comment on lines +49 to +50
Contributor: Let's reuse the docstring from DocumentSplitter

Suggested change
        :param skip_empty_documents: If True, skip documents with empty content. If False, process empty documents.
            Defaults to True.
        :param skip_empty_documents: Choose whether to skip documents with empty content. Default is True.
            Set to False when downstream components in the Pipeline (like LLMDocumentContentExtractor) can extract text
            from non-textual documents.
        """
        self.page_break_character = page_break_character
        self.secondary_split = secondary_split
        self.split_length = split_length
        self.split_overlap = split_overlap
        self.split_threshold = split_threshold
        self.skip_empty_documents = skip_empty_documents
        self.keep_headers = keep_headers
        self._header_pattern = re.compile(r"(?m)^(#{1,6}) (.+)$")  # ATX-style .md-headers

        # initialize secondary_splitter only if needed
        if self.secondary_split:
            self.secondary_splitter = DocumentSplitter(
                split_by=self.secondary_split,
                split_length=self.split_length,
                split_overlap=self.split_overlap,
                split_threshold=self.split_threshold,
            )
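Editor's note: the compiled pattern accepts only ATX-style headers (one to six `#` followed by a space). This is easy to verify with plain `re` and no Haystack dependency; the sample text below is invented for illustration:

```python
import re

# Same pattern the component compiles in __init__ (ATX-style headers only).
header_pattern = re.compile(r"(?m)^(#{1,6}) (.+)$")

text = "# Title\n## Section\n####### too deep\n#NoSpace\nSetext\n======\nbody"
matches = [(m.group(1), m.group(2)) for m in header_pattern.finditer(text)]

# Only the two well-formed ATX headers match; seven hashes, a missing space
# after the hashes, and Setext-style underlining are all ignored.
print(matches)  # [('#', 'Title'), ('##', 'Section')]
```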
Comment on lines +63 to +68
Contributor: I just realized we are missing a warm_up method for this component MarkdownHeaderSplitter which calls the warm_up of DocumentSplitter if secondary_split is provided.

This is needed in the case where the secondary_split of sentence is chosen.

So I'd add a warm up method like

    def warm_up(self):
        """
        Warm up the MarkdownHeaderSplitter
        """
        if self.secondary_splitter and not self._is_warmed_up:
            self.secondary_splitter.warm_up()
            self._is_warmed_up = True

then add self._is_warmed_up = False in the init method of MarkdownHeaderSplitter

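Editor's note: the guarded warm-up idiom the reviewer suggests can be sketched standalone. `HeavyModel` and `Splitter` below are stand-ins invented for this example, not Haystack classes; the point is that the flag makes repeated `warm_up()` calls idempotent:

```python
class HeavyModel:
    """Stand-in for an expensive resource (e.g. a sentence-splitting model)."""

    loaded = 0  # counts how many times the expensive load actually ran

    def warm_up(self):
        HeavyModel.loaded += 1  # pretend to load the model


class Splitter:
    """Stand-in component using the guarded warm_up idiom."""

    def __init__(self, secondary_split=None):
        self._is_warmed_up = False
        # only create the inner splitter when a secondary split is configured
        self.secondary_splitter = HeavyModel() if secondary_split else None

    def warm_up(self):
        # delegate once; later calls are no-ops thanks to the flag
        if self.secondary_splitter and not self._is_warmed_up:
            self.secondary_splitter.warm_up()
            self._is_warmed_up = True


s = Splitter(secondary_split="sentence")
s.warm_up()
s.warm_up()  # no-op: the flag prevents re-loading
print(HeavyModel.loaded)  # 1
```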

    def _split_text_by_markdown_headers(self, text: str, doc_id: str) -> list[dict]:
        """Split text by ATX-style headers (#) and create chunks with appropriate metadata."""
        logger.debug("Splitting text by markdown headers")

        # find headers
        matches = list(re.finditer(self._header_pattern, text))

        # return unsplit if no headers found
        if not matches:
            logger.info(
                "No headers found in document {doc_id}; returning full document as single chunk.", doc_id=doc_id
            )
            return [{"content": text, "meta": {}}]

        # process headers and build chunks
        chunks: list[dict] = []
        header_stack: list[Optional[str]] = [None] * 6
        active_parents: list[str] = []  # track active parent headers
        pending_headers: list[str] = []  # store empty headers to prepend to next content
        has_content = False  # flag to track if any header has content

        for i, match in enumerate(matches):
            # extract header info
            header_prefix = match.group(1)
            header_text = match.group(2).strip()
Contributor: Could we avoid using strip() here? This would remove any white space characters from the text we probably want to keep.
            level = len(header_prefix)

            # get content
            start = match.end()
            end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
            content = text[start:end].strip()
Contributor: Same here, let's remove the .strip()

            # update header stack to track nesting
            header_stack[level - 1] = header_text
            for j in range(level, 6):
                header_stack[j] = None

            # prepare header_line if keep_headers
            header_line = f"{header_prefix} {header_text}"
Contributor: If we don't use .strip() anywhere then we should be able to do

Suggested change
            header_line = f"{header_prefix} {header_text}"
            header_line = f"{header_prefix}{header_text}"

I believe

Also, if this is only used when self.keep_headers is True, we could move its construction to inside that if statement.

            # skip splits w/o content
            if not content:
                # add as parent for subsequent headers
                active_parents = [h for h in header_stack[: level - 1] if h is not None]
                active_parents.append(header_text)
                if self.keep_headers:
                    pending_headers.append(header_line)
                continue

            has_content = True  # at least one header has content
            parent_headers = list(active_parents)

            logger.debug(
                "Creating chunk for header '{header_text}' at level {level}", header_text=header_text, level=level
            )

            if self.keep_headers:
                # add pending & current header to content
                chunk_content = ""
                if pending_headers:
                    chunk_content += "\n".join(pending_headers) + "\n"
                chunk_content += f"{header_line}\n{content}"
Comment on lines +129 to +131
Contributor: Also here we shouldn't just add in our own newline characters. If we avoid using strip earlier we should be able to do

Suggested change
                if pending_headers:
                    chunk_content += "\n".join(pending_headers) + "\n"
                chunk_content += f"{header_line}\n{content}"
                if pending_headers:
                    chunk_content += "".join(pending_headers)
                chunk_content += f"{header_line}{content}"
                chunks.append(
                    {
                        "content": chunk_content,
                        "meta": {} if self.keep_headers else {"header": header_text, "parent_headers": parent_headers},
                    }
                )
                pending_headers = []  # reset pending headers
            else:
                chunks.append({"content": content, "meta": {"header": header_text, "parent_headers": parent_headers}})

            # reset active parents
            active_parents = [h for h in header_stack[: level - 1] if h is not None]

        # return doc unchunked if no headers have content
        if not has_content:
            logger.info(
                "Document {doc_id} contains only headers with no content; returning original document.", doc_id=doc_id
            )
            return [{"content": text, "meta": {}}]

        return chunks

    def _apply_secondary_splitting(self, documents: list[Document]) -> list[Document]:
        """
        Apply secondary splitting while preserving header metadata and structure.

        Ensures page counting is maintained across splits.
        """
        result_docs = []

        for doc in documents:
            if doc.content is None:
                result_docs.append(doc)
                continue

            content_for_splitting: str = doc.content

            if not self.keep_headers:  # skip header extraction if keep_headers
                # extract header information
                header_match = re.search(self._header_pattern, doc.content)
                if header_match:
                    content_for_splitting = doc.content[header_match.end() :]

            if not content_for_splitting or not content_for_splitting.strip():  # skip empty content
                result_docs.append(doc)
                continue

            # track page from meta
            current_page = doc.meta.get("page_number", 1)

            secondary_splits = self.secondary_splitter.run(
                documents=[Document(content=content_for_splitting, meta=doc.meta)]
            )["documents"]

            # split processing
            for i, split in enumerate(secondary_splits):
                # calculate page number for this split
                if i > 0 and secondary_splits[i - 1].content:
                    current_page = self._update_page_number_with_breaks(secondary_splits[i - 1].content, current_page)

                # set page number to meta
                split.meta["page_number"] = current_page

                # preserve header metadata if we're not keeping headers in content
                if not self.keep_headers:
                    for key in ["header", "parent_headers"]:
                        if key in doc.meta:
                            split.meta[key] = doc.meta[key]

                result_docs.append(split)

        logger.debug(
            "Secondary splitting complete. Final count: {final_count} documents.", final_count=len(result_docs)
        )
        return result_docs

    def _update_page_number_with_breaks(self, content: str, current_page: int) -> int:
        """
        Update page number based on page breaks in content.

        :param content: Content to check for page breaks
        :param current_page: Current page number
        :return: New current page number
        """
        if not isinstance(content, str):
            return current_page

        page_breaks = content.count(self.page_break_character)
        new_page_number = current_page + page_breaks

        if page_breaks > 0:
            logger.debug(
                "Found {page_breaks} page breaks, page number updated: {old} → {new}",
                page_breaks=page_breaks,
                old=current_page,
                new=new_page_number,
            )

        return new_page_number
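Editor's note: the page bookkeeping above boils down to counting page-break characters in each chunk and carrying a running total forward. A standalone sketch (the helper function mirrors the method's arithmetic but is hypothetical, not imported from Haystack):

```python
def update_page_number_with_breaks(content: str, current_page: int, page_break_character: str = "\f") -> int:
    # Each page-break character encountered advances the running page counter.
    return current_page + content.count(page_break_character)


# Three chunks; "\f" marks where a page break fell inside a chunk.
chunks = ["intro text", "middle\fafter one break", "end\f\fafter two more"]
page, pages_seen = 1, []
for chunk in chunks:
    pages_seen.append(page)  # page on which this chunk *starts*
    page = update_page_number_with_breaks(chunk, page)

print(pages_seen)  # [1, 1, 2]
print(page)        # 4
```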

    def _split_documents_by_markdown_headers(self, documents: list[Document]) -> list[Document]:
        """Split a list of documents by markdown headers, preserving metadata."""

        result_docs = []
        for doc in documents:
            logger.debug("Splitting document with id={doc_id}", doc_id=doc.id)
            # mypy: doc.content is Optional[str], so we must check for None before passing to splitting method
            if doc.content is None:
                continue
            splits = self._split_text_by_markdown_headers(doc.content, doc.id)
            docs = []

            current_page = doc.meta.get("page_number", 1) if doc.meta else 1
            total_pages = doc.content.count(self.page_break_character) + 1
            logger.debug(
                "Processing page number: {current_page} out of {total_pages}",
                current_page=current_page,
                total_pages=total_pages,
            )
            for split in splits:
                meta = {}
                if doc.meta:
                    meta = doc.meta.copy()
                meta.update({"source_id": doc.id, "page_number": current_page})
                if split.get("meta"):
                    meta.update(split["meta"])
                current_page = self._update_page_number_with_breaks(split["content"], current_page)
                docs.append(Document(content=split["content"], meta=meta))
            logger.debug(
                "Split into {num_docs} documents for id={doc_id}, final page: {current_page}",
                num_docs=len(docs),
                doc_id=doc.id,
                current_page=current_page,
            )
            result_docs.extend(docs)
        return result_docs

    @component.output_types(documents=list[Document])
    def run(self, documents: list[Document]) -> dict[str, list[Document]]:
        """
        Run the markdown header splitter with optional secondary splitting.

        :param documents: List of documents to split

        :returns: A dictionary with the following key:
            - `documents`: List of documents with the split texts. Each document includes:
                - A metadata field `source_id` to track the original document.
                - A metadata field `page_number` to track the original page number.
                - A metadata field `split_id` to uniquely identify each split chunk.
                - All other metadata copied from the original document.
        """
        # validate input documents
        for doc in documents:
            if doc.content is None:
                raise ValueError(
                    (
                        "MarkdownHeaderSplitter only works with text documents but content for document ID"
                        f" {doc.id} is None."
                    )
                )
            if not isinstance(doc.content, str):
                raise ValueError("MarkdownHeaderSplitter only works with text documents (str content).")

        processed_documents = []
        for doc in documents:
            # handle empty documents
            if not doc.content or not doc.content.strip():
                if self.skip_empty_documents:
                    logger.warning("Document ID {doc_id} has an empty content. Skipping this document.", doc_id=doc.id)
                    continue
                # keep empty documents
                processed_documents.append(doc)
                logger.warning(
                    "Document ID {doc_id} has an empty content. Keeping this document as per configuration.",
                    doc_id=doc.id,
                )
                continue

            processed_documents.append(doc)

        if not processed_documents:
            return {"documents": []}

        header_split_docs = self._split_documents_by_markdown_headers(processed_documents)
Comment on lines +295 to +315
@sjrl (Contributor, Oct 21, 2025): I think we could refactor this a bit to make the code overall simpler. E.g.

Suggested change
        processed_documents = []
        for doc in documents:
            # handle empty documents
            if not doc.content or not doc.content.strip():
                if self.skip_empty_documents:
                    logger.warning("Document ID {doc_id} has an empty content. Skipping this document.", doc_id=doc.id)
                    continue
                # keep empty documents
                processed_documents.append(doc)
                logger.warning(
                    "Document ID {doc_id} has an empty content. Keeping this document as per configuration.",
                    doc_id=doc.id,
                )
                continue
            processed_documents.append(doc)
        if not processed_documents:
            return {"documents": []}
        header_split_docs = self._split_documents_by_markdown_headers(processed_documents)
        processed_documents = []
        for doc in documents:
            # handle empty documents
            if not doc.content or not doc.content.strip():
                if self.skip_empty_documents:
                    logger.warning("Document ID {doc_id} has an empty content. Skipping this document.", doc_id=doc.id)
                    continue
                # keep empty documents
                processed_documents.append(doc)
                logger.warning(
                    "Document ID {doc_id} has an empty content. Keeping this document as per configuration.",
                    doc_id=doc.id,
                )
                continue
            else:
                header_split_docs = self._split_documents_by_markdown_headers([doc])
                final_docs = self._apply_secondary_splitting(header_split_docs) if self.secondary_split else header_split_docs
                processed_documents.extend(final_docs)

this way we know we will only run the splitting logic on non-empty documents. Which hopefully means we don't need to worry about handling documents with empty contents within the splitting logic (i.e. all checks for empty strings can be removed)


        # secondary splitting if configured
        final_docs = self._apply_secondary_splitting(header_split_docs) if self.secondary_split else header_split_docs

        # assign split_id to all output documents
        for idx, doc in enumerate(final_docs):
@sjrl (Contributor, Oct 15, 2025): This doesn't look correct. The split_id should range from 0 to number of chunks from a single parent document.

From what I understand final_docs contains all chunks produced from all parent documents. And this code would assign the split_id from 0 to number of total chunks which would be incorrect.

Is that right or am I missing something?

Contributor: I think the split_ids would need to be added directly in self._split_documents_by_markdown_headers and then updated in self._apply_secondary_splitting (if secondary splitting is provided)
            doc.meta["split_id"] = idx

        return {"documents": final_docs}
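Editor's note: to make the keep_headers semantics concrete, here is a deliberately simplified standalone re-implementation of the header split for illustration only. It is not the component's actual code: it omits the parent-header stack, pending-header handling, and page tracking, and keeps only the header/content chunking:

```python
import re

# Same ATX-header pattern as the component compiles in __init__.
HEADER = re.compile(r"(?m)^(#{1,6}) (.+)$")


def split_by_headers(text: str, keep_headers: bool = True) -> list[dict]:
    """Simplified sketch: one chunk per header that has content."""
    matches = list(HEADER.finditer(text))
    if not matches:
        return [{"content": text, "meta": {}}]
    chunks = []
    for i, m in enumerate(matches):
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        content = text[m.end():end].strip()
        if not content:  # headers without content are skipped in this sketch
            continue
        if keep_headers:
            # header line stays in the chunk content, meta stays empty
            chunks.append({"content": f"{m.group(1)} {m.group(2)}\n{content}", "meta": {}})
        else:
            # header moves out of the content and into metadata
            chunks.append({"content": content, "meta": {"header": m.group(2)}})
    return chunks


doc = "# Intro\nHello.\n## Details\nMore text."
with_headers = split_by_headers(doc, keep_headers=True)
without = split_by_headers(doc, keep_headers=False)
print([c["content"] for c in with_headers])
# ['# Intro\nHello.', '## Details\nMore text.']
print([c["meta"].get("header") for c in without])
# ['Intro', 'Details']
```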
@@ -0,0 +1,9 @@
---
features:
- |
Introduced the `MarkdownHeaderSplitter` component:
- Splits documents into chunks at Markdown headers (`#`, `##`, etc.), preserving header hierarchy as metadata.
- Optionally infers and rewrites header levels for documents where header structure is ambiguous (e.g. documents parsed using Docling).
- Supports secondary splitting (by word, passage, period, or line) for further chunking after header-based splitting using Haystack's `DocumentSplitter`.
- Preserves and propagates metadata such as parent headers and page numbers.
- Handles edge cases such as documents with no headers, empty content, and non-text documents.