171 lines
5.8 KiB
Python
"""Tests for the ingestion pipeline."""
|
|
|
|
from atocore.ingestion.parser import parse_markdown
|
|
from atocore.models.database import get_connection, init_db
|
|
from atocore.ingestion.pipeline import ingest_file, ingest_folder
|
|
|
|
|
|
def test_parse_markdown(sample_markdown):
    """Frontmatter-aware parsing yields title, tags, body, and headings."""
    result = parse_markdown(sample_markdown)

    assert result.title == "AtoCore Architecture"
    for expected_tag in ("atocore", "architecture"):
        assert expected_tag in result.tags
    assert len(result.body) > 0
    assert len(result.headings) > 0
def test_parse_extracts_headings(sample_markdown):
    """The parser should capture heading text from the document."""
    result = parse_markdown(sample_markdown)

    # Headings are (level, text) pairs; compare against the text column only.
    texts = {heading[1] for heading in result.headings}
    assert "AtoCore Architecture" in texts
    assert "Overview" in texts
def test_ingest_file(tmp_data_dir, sample_markdown):
    """Ingesting one file stores a document row plus at least one chunk."""
    init_db()

    outcome = ingest_file(sample_markdown)
    assert outcome["status"] == "ingested"
    assert outcome["chunks"] > 0

    # Documents are keyed by their resolved absolute path.
    path_key = str(sample_markdown.resolve())
    with get_connection() as conn:
        doc_row = conn.execute(
            "SELECT COUNT(*) as c FROM source_documents WHERE file_path = ?",
            (path_key,),
        ).fetchone()
        assert doc_row["c"] == 1

        chunk_row = conn.execute(
            "SELECT COUNT(*) as c FROM source_chunks sc "
            "JOIN source_documents sd ON sc.document_id = sd.id "
            "WHERE sd.file_path = ?",
            (path_key,),
        ).fetchone()
        assert chunk_row["c"] > 0
def test_ingest_skips_unchanged(tmp_data_dir, sample_markdown):
    """A second ingest of an identical file reports a skip, not a re-ingest."""
    init_db()
    ingest_file(sample_markdown)

    rerun = ingest_file(sample_markdown)
    assert rerun["status"] == "skipped"
def test_ingest_updates_changed(tmp_data_dir, sample_markdown):
    """Editing a file after ingestion triggers a fresh ingest."""
    init_db()
    ingest_file(sample_markdown)

    # Append a new section so the stored content no longer matches the file.
    current = sample_markdown.read_text(encoding="utf-8")
    sample_markdown.write_text(
        current + "\n\n## New Section\n\nNew content added.",
        encoding="utf-8",
    )

    outcome = ingest_file(sample_markdown)
    assert outcome["status"] == "ingested"
def test_parse_markdown_uses_supplied_text(sample_markdown):
    """When text is passed in, the parser uses it instead of re-reading the file."""
    supplied = """---\ntags: parser\n---\n# Parser Title\n\nBody text."""

    result = parse_markdown(sample_markdown, text=supplied)

    assert result.title == "Parser Title"
    assert "parser" in result.tags
def test_reingest_empty_replaces_stale_chunks(tmp_data_dir, sample_markdown, monkeypatch):
    """Re-ingesting a file that now yields no chunks must clear stale DB/vector state."""
    init_db()

    class RecordingVectorStore:
        # Stub store that only records which ids get deleted.

        def __init__(self):
            self.deleted_ids = []

        def add(self, ids, documents, metadatas):
            return None

        def delete(self, ids):
            self.deleted_ids.extend(ids)

    store = RecordingVectorStore()
    monkeypatch.setattr("atocore.ingestion.pipeline.get_vector_store", lambda: store)

    initial = ingest_file(sample_markdown)
    assert initial["status"] == "ingested"

    # Rewrite the file, then force the chunker to produce nothing.
    sample_markdown.write_text("# Changed\n\nThis update should now produce no chunks after monkeypatching.", encoding="utf-8")
    monkeypatch.setattr("atocore.ingestion.pipeline.chunk_markdown", lambda *args, **kwargs: [])

    rerun = ingest_file(sample_markdown)
    assert rerun["status"] == "empty"

    with get_connection() as conn:
        remaining = conn.execute("SELECT COUNT(*) AS c FROM source_chunks").fetchone()
        assert remaining["c"] == 0

    # The previous chunks' vector entries must have been deleted, not orphaned.
    assert store.deleted_ids
def test_ingest_folder_includes_markdown_extension(tmp_data_dir, sample_folder, monkeypatch):
    """Folder discovery must pick up .markdown files alongside .md ones."""
    init_db()

    note = sample_folder / "third_note.markdown"
    note.write_text("# Third Note\n\nThis file should be discovered during folder ingestion.", encoding="utf-8")

    class NullVectorStore:
        # Inert store so ingestion never touches a real index.

        def add(self, ids, documents, metadatas):
            return None

        def delete(self, ids):
            return None

        @property
        def count(self):
            return 0

    monkeypatch.setattr("atocore.ingestion.pipeline.get_vector_store", lambda: NullVectorStore())

    outcomes = ingest_folder(sample_folder)
    seen_files = {entry["file"] for entry in outcomes if "file" in entry}
    assert str(note.resolve()) in seen_files
def test_purge_deleted_files_does_not_match_sibling_prefix(tmp_data_dir, sample_folder, monkeypatch):
    """Purging one folder must not delete entries from a sibling sharing its name prefix."""
    init_db()

    class NullVectorStore:
        # Inert store so ingestion never touches a real index.

        def add(self, ids, documents, metadatas):
            return None

        def delete(self, ids):
            return None

        @property
        def count(self):
            return 0

    monkeypatch.setattr("atocore.ingestion.pipeline.get_vector_store", lambda: NullVectorStore())

    # "notes" and "notes-project" share a string prefix but are distinct folders.
    survivor_dir = tmp_data_dir / "notes"
    survivor_dir.mkdir()
    survivor = survivor_dir / "keep.md"
    survivor.write_text("# Keep\n\nThis document should survive purge.", encoding="utf-8")
    ingest_file(survivor)

    doomed_dir = tmp_data_dir / "notes-project"
    doomed_dir.mkdir()
    doomed = doomed_dir / "gone.md"
    doomed.write_text("# Gone\n\nThis document will be purged.", encoding="utf-8")
    ingest_file(doomed)
    doomed.unlink()

    ingest_folder(doomed_dir, purge_deleted=True)

    with get_connection() as conn:
        rows = conn.execute("SELECT file_path FROM source_documents").fetchall()
        surviving_paths = {row["file_path"] for row in rows}
        assert str(survivor.resolve()) in surviving_paths