"""Tests for the ingestion pipeline.""" from atocore.ingestion.parser import parse_markdown from atocore.models.database import get_connection, init_db from atocore.ingestion.pipeline import ingest_file, ingest_folder def test_parse_markdown(sample_markdown): """Test markdown parsing with frontmatter.""" parsed = parse_markdown(sample_markdown) assert parsed.title == "AtoCore Architecture" assert "atocore" in parsed.tags assert "architecture" in parsed.tags assert len(parsed.body) > 0 assert len(parsed.headings) > 0 def test_parse_extracts_headings(sample_markdown): """Test that headings are extracted correctly.""" parsed = parse_markdown(sample_markdown) heading_texts = [h[1] for h in parsed.headings] assert "AtoCore Architecture" in heading_texts assert "Overview" in heading_texts def test_ingest_file(tmp_data_dir, sample_markdown): """Test ingesting a single file.""" init_db() result = ingest_file(sample_markdown) assert result["status"] == "ingested" assert result["chunks"] > 0 # Verify the file was stored in DB with get_connection() as conn: doc = conn.execute( "SELECT COUNT(*) as c FROM source_documents WHERE file_path = ?", (str(sample_markdown.resolve()),), ).fetchone() assert doc["c"] == 1 chunks = conn.execute( "SELECT COUNT(*) as c FROM source_chunks sc " "JOIN source_documents sd ON sc.document_id = sd.id " "WHERE sd.file_path = ?", (str(sample_markdown.resolve()),), ).fetchone() assert chunks["c"] > 0 def test_ingest_skips_unchanged(tmp_data_dir, sample_markdown): """Test that re-ingesting unchanged file is skipped.""" init_db() ingest_file(sample_markdown) result = ingest_file(sample_markdown) assert result["status"] == "skipped" def test_ingest_updates_changed(tmp_data_dir, sample_markdown): """Test that changed files are re-ingested.""" init_db() ingest_file(sample_markdown) # Modify the file sample_markdown.write_text( sample_markdown.read_text(encoding="utf-8") + "\n\n## New Section\n\nNew content added.", encoding="utf-8", ) result = ingest_file(sample_markdown) assert result["status"] == "ingested" def test_parse_markdown_uses_supplied_text(sample_markdown): """Parsing should be able to reuse pre-read content from ingestion.""" latin_text = """---\ntags: parser\n---\n# Parser Title\n\nBody text.""" parsed = parse_markdown(sample_markdown, text=latin_text) assert parsed.title == "Parser Title" assert "parser" in parsed.tags def test_reingest_empty_replaces_stale_chunks(tmp_data_dir, sample_markdown, monkeypatch): """Re-ingesting a file with no chunks should clear stale DB/vector state.""" init_db() class FakeVectorStore: def __init__(self): self.deleted_ids = [] def add(self, ids, documents, metadatas): return None def delete(self, ids): self.deleted_ids.extend(ids) fake_store = FakeVectorStore() monkeypatch.setattr("atocore.ingestion.pipeline.get_vector_store", lambda: fake_store) first = ingest_file(sample_markdown) assert first["status"] == "ingested" sample_markdown.write_text("# Changed\n\nThis update should now produce no chunks after monkeypatching.", encoding="utf-8") monkeypatch.setattr("atocore.ingestion.pipeline.chunk_markdown", lambda *args, **kwargs: []) second = ingest_file(sample_markdown) assert second["status"] == "empty" with get_connection() as conn: chunk_count = conn.execute("SELECT COUNT(*) AS c FROM source_chunks").fetchone() assert chunk_count["c"] == 0 assert fake_store.deleted_ids def test_ingest_folder_includes_markdown_extension(tmp_data_dir, sample_folder, monkeypatch): """Folder ingestion should include both .md and .markdown files.""" init_db() markdown_file = sample_folder / "third_note.markdown" markdown_file.write_text("# Third Note\n\nThis file should be discovered during folder ingestion.", encoding="utf-8") class FakeVectorStore: def add(self, ids, documents, metadatas): return None def delete(self, ids): return None @property def count(self): return 0 monkeypatch.setattr("atocore.ingestion.pipeline.get_vector_store", lambda: FakeVectorStore()) results = ingest_folder(sample_folder) files = {result["file"] for result in results if "file" in result} assert str(markdown_file.resolve()) in files def test_purge_deleted_files_does_not_match_sibling_prefix(tmp_data_dir, sample_folder, monkeypatch): """Purging one folder should not delete entries from a sibling folder with the same prefix.""" init_db() class FakeVectorStore: def add(self, ids, documents, metadatas): return None def delete(self, ids): return None @property def count(self): return 0 monkeypatch.setattr("atocore.ingestion.pipeline.get_vector_store", lambda: FakeVectorStore()) kept_folder = tmp_data_dir / "notes" kept_folder.mkdir() kept_file = kept_folder / "keep.md" kept_file.write_text("# Keep\n\nThis document should survive purge.", encoding="utf-8") ingest_file(kept_file) purge_folder = tmp_data_dir / "notes-project" purge_folder.mkdir() purge_file = purge_folder / "gone.md" purge_file.write_text("# Gone\n\nThis document will be purged.", encoding="utf-8") ingest_file(purge_file) purge_file.unlink() ingest_folder(purge_folder, purge_deleted=True) with get_connection() as conn: rows = conn.execute("SELECT file_path FROM source_documents").fetchall() remaining_paths = {row["file_path"] for row in rows} assert str(kept_file.resolve()) in remaining_paths