Files
ATOCore/tests/test_ingestion.py

171 lines
5.8 KiB
Python
Raw Normal View History

"""Tests for the ingestion pipeline."""
from atocore.ingestion.parser import parse_markdown
from atocore.models.database import get_connection, init_db
from atocore.ingestion.pipeline import ingest_file, ingest_folder
def test_parse_markdown(sample_markdown):
    """Parse the sample document and check title, tags, body and headings."""
    document = parse_markdown(sample_markdown)

    # The title asserted here is the one the sample fixture is expected to yield.
    assert document.title == "AtoCore Architecture"

    # Both tags must be present on the parsed result.
    for expected_tag in ("atocore", "architecture"):
        assert expected_tag in document.tags

    # Body and heading collections must be non-empty.
    assert document.body
    assert document.headings
def test_parse_extracts_headings(sample_markdown):
    """Check that the expected heading texts show up in the parsed headings."""
    document = parse_markdown(sample_markdown)

    # Each heading entry is indexable; position 1 is compared against the
    # expected heading text below.
    found_texts = [entry[1] for entry in document.headings]

    for expected_text in ("AtoCore Architecture", "Overview"):
        assert expected_text in found_texts
def test_ingest_file(tmp_data_dir, sample_markdown):
    """Ingest one file and confirm a document row and chunk rows exist for it."""
    init_db()

    outcome = ingest_file(sample_markdown)
    assert outcome["status"] == "ingested"
    assert outcome["chunks"] > 0

    # Rows are looked up by the resolved path of the sample file.
    resolved_path = str(sample_markdown.resolve())
    document_sql = "SELECT COUNT(*) as c FROM source_documents WHERE file_path = ?"
    chunk_sql = (
        "SELECT COUNT(*) as c FROM source_chunks sc "
        "JOIN source_documents sd ON sc.document_id = sd.id "
        "WHERE sd.file_path = ?"
    )

    # Verify the file was stored in DB
    with get_connection() as conn:
        document_row = conn.execute(document_sql, (resolved_path,)).fetchone()
        assert document_row["c"] == 1

        chunk_row = conn.execute(chunk_sql, (resolved_path,)).fetchone()
        assert chunk_row["c"] > 0
def test_ingest_skips_unchanged(tmp_data_dir, sample_markdown):
    """Ingest the same unmodified file twice; the second call must report "skipped"."""
    init_db()

    # First pass: its result is not inspected here.
    ingest_file(sample_markdown)

    # Second pass over the identical file.
    repeat_outcome = ingest_file(sample_markdown)

    assert repeat_outcome["status"] == "skipped"
def test_ingest_updates_changed(tmp_data_dir, sample_markdown):
    """Ingest a file, append a section to it, and expect "ingested" again."""
    init_db()
    ingest_file(sample_markdown)

    # Modify the file by appending a new section to its current content.
    original_text = sample_markdown.read_text(encoding="utf-8")
    updated_text = original_text + "\n\n## New Section\n\nNew content added."
    sample_markdown.write_text(updated_text, encoding="utf-8")

    outcome = ingest_file(sample_markdown)

    assert outcome["status"] == "ingested"
def test_parse_markdown_uses_supplied_text(sample_markdown):
    """Pass explicit ``text`` to the parser and check the result reflects it.

    The asserted title and tag come from the supplied text, not from the
    values the other tests expect for the sample file.
    """
    supplied_text = """---\ntags: parser\n---\n# Parser Title\n\nBody text."""

    document = parse_markdown(sample_markdown, text=supplied_text)

    assert document.title == "Parser Title"
    assert "parser" in document.tags
def test_reingest_empty_replaces_stale_chunks(tmp_data_dir, sample_markdown, monkeypatch):
    """Re-ingest a changed file that chunks to nothing and check stale state is gone.

    After the second ingestion the chunk table must be empty and the fake
    vector store must have received at least one id to delete.
    """

    class RecordingVectorStore:
        """Vector store stand-in that remembers every id it was asked to delete."""

        def __init__(self):
            self.deleted_ids = []

        def add(self, ids, documents, metadatas):
            # Additions are accepted and ignored.
            return None

        def delete(self, ids):
            self.deleted_ids.extend(ids)

    init_db()

    recorder = RecordingVectorStore()

    def _shared_store():
        # Always hand back the same instance so deletions can be inspected.
        return recorder

    monkeypatch.setattr("atocore.ingestion.pipeline.get_vector_store", _shared_store)

    initial = ingest_file(sample_markdown)
    assert initial["status"] == "ingested"

    # Change the file so it is processed again, then force chunking to
    # return an empty list.
    sample_markdown.write_text(
        "# Changed\n\nThis update should now produce no chunks after monkeypatching.",
        encoding="utf-8",
    )

    def _no_chunks(*args, **kwargs):
        return []

    monkeypatch.setattr("atocore.ingestion.pipeline.chunk_markdown", _no_chunks)

    followup = ingest_file(sample_markdown)
    assert followup["status"] == "empty"

    with get_connection() as conn:
        remaining = conn.execute("SELECT COUNT(*) AS c FROM source_chunks").fetchone()

    assert remaining["c"] == 0
    assert recorder.deleted_ids
def test_ingest_folder_includes_markdown_extension(tmp_data_dir, sample_folder, monkeypatch):
    """Add a ``.markdown`` file to the folder and expect it in the ingestion results."""

    class FakeVectorStore:
        """No-op vector store stand-in reporting a count of zero."""

        def add(self, ids, documents, metadatas):
            return None

        def delete(self, ids):
            return None

        @property
        def count(self):
            return 0

    def _new_fake_store():
        # A fresh fake is returned on every lookup.
        return FakeVectorStore()

    init_db()

    extra_note = sample_folder / "third_note.markdown"
    extra_note.write_text(
        "# Third Note\n\nThis file should be discovered during folder ingestion.",
        encoding="utf-8",
    )

    monkeypatch.setattr("atocore.ingestion.pipeline.get_vector_store", _new_fake_store)

    # Collect the "file" value of every result entry that carries one.
    reported_files = set()
    for entry in ingest_folder(sample_folder):
        if "file" in entry:
            reported_files.add(entry["file"])

    assert str(extra_note.resolve()) in reported_files
def test_purge_deleted_files_does_not_match_sibling_prefix(tmp_data_dir, sample_folder, monkeypatch):
    """Purge ``notes-project`` and confirm the document under ``notes`` is still recorded.

    The two folder names share the prefix ``notes``; the purge runs on the
    longer one and the document in the shorter one must remain.
    """

    class FakeVectorStore:
        """No-op vector store stand-in reporting a count of zero."""

        def add(self, ids, documents, metadatas):
            return None

        def delete(self, ids):
            return None

        @property
        def count(self):
            return 0

    def _new_fake_store():
        # A fresh fake is returned on every lookup.
        return FakeVectorStore()

    init_db()
    monkeypatch.setattr("atocore.ingestion.pipeline.get_vector_store", _new_fake_store)

    # Folder whose document must survive.
    surviving_dir = tmp_data_dir / "notes"
    surviving_dir.mkdir()
    surviving_doc = surviving_dir / "keep.md"
    surviving_doc.write_text("# Keep\n\nThis document should survive purge.", encoding="utf-8")
    ingest_file(surviving_doc)

    # Sibling folder whose name starts with the same prefix.
    doomed_dir = tmp_data_dir / "notes-project"
    doomed_dir.mkdir()
    doomed_doc = doomed_dir / "gone.md"
    doomed_doc.write_text("# Gone\n\nThis document will be purged.", encoding="utf-8")
    ingest_file(doomed_doc)

    # Remove the file from disk, then re-ingest its folder with purging on.
    doomed_doc.unlink()
    ingest_folder(doomed_dir, purge_deleted=True)

    with get_connection() as conn:
        stored_rows = conn.execute("SELECT file_path FROM source_documents").fetchall()

    stored_paths = set()
    for stored_row in stored_rows:
        stored_paths.add(stored_row["file_path"])

    assert str(surviving_doc.resolve()) in stored_paths