"""Tests for the ingestion pipeline."""

import json

from atocore.ingestion.parser import parse_markdown
from atocore.models.database import get_connection, init_db
from atocore.ingestion.pipeline import ingest_file, ingest_folder, ingest_project_folder


class _FakeVectorStore:
    """In-memory stand-in for the pipeline's vector store.

    Records the metadata passed to ``add`` and the ids passed to ``delete``
    so tests can assert on them without touching a real vector backend.
    """

    def __init__(self):
        self.metadatas = []
        self.deleted_ids = []

    def add(self, ids, documents, metadatas):
        self.metadatas.extend(metadatas)

    def delete(self, ids):
        self.deleted_ids.extend(ids)

    @property
    def count(self):
        return 0


def _install_fake_vector_store(monkeypatch):
    """Patch the pipeline's ``get_vector_store`` and return the fake it yields."""
    store = _FakeVectorStore()
    monkeypatch.setattr("atocore.ingestion.pipeline.get_vector_store", lambda: store)
    return store


def test_parse_markdown(sample_markdown):
    """Test markdown parsing with frontmatter."""
    parsed = parse_markdown(sample_markdown)
    assert parsed.title == "AtoCore Architecture"
    assert "atocore" in parsed.tags
    assert "architecture" in parsed.tags
    assert len(parsed.body) > 0
    assert len(parsed.headings) > 0


def test_parse_extracts_headings(sample_markdown):
    """Test that headings are extracted correctly."""
    parsed = parse_markdown(sample_markdown)
    heading_texts = [h[1] for h in parsed.headings]
    assert "AtoCore Architecture" in heading_texts
    assert "Overview" in heading_texts


def test_ingest_file(tmp_data_dir, sample_markdown):
    """Test ingesting a single file."""
    init_db()
    result = ingest_file(sample_markdown)
    assert result["status"] == "ingested"
    assert result["chunks"] > 0

    # Verify the file was stored in DB
    with get_connection() as conn:
        doc = conn.execute(
            "SELECT COUNT(*) as c FROM source_documents WHERE file_path = ?",
            (str(sample_markdown.resolve()),),
        ).fetchone()
        assert doc["c"] == 1

        chunks = conn.execute(
            "SELECT COUNT(*) as c FROM source_chunks sc "
            "JOIN source_documents sd ON sc.document_id = sd.id "
            "WHERE sd.file_path = ?",
            (str(sample_markdown.resolve()),),
        ).fetchone()
        assert chunks["c"] > 0


def test_ingest_skips_unchanged(tmp_data_dir, sample_markdown):
    """Test that re-ingesting unchanged file is skipped."""
    init_db()
    ingest_file(sample_markdown)
    result = ingest_file(sample_markdown)
    assert result["status"] == "skipped"


def test_ingest_updates_changed(tmp_data_dir, sample_markdown):
    """Test that changed files are re-ingested."""
    init_db()
    ingest_file(sample_markdown)

    # Modify the file
    sample_markdown.write_text(
        sample_markdown.read_text(encoding="utf-8")
        + "\n\n## New Section\n\nNew content added.",
        encoding="utf-8",
    )

    result = ingest_file(sample_markdown)
    assert result["status"] == "ingested"


def test_ingest_file_records_project_id_metadata(tmp_data_dir, sample_markdown, monkeypatch):
    """Project-aware ingestion should tag DB and vector metadata exactly."""
    init_db()
    fake_store = _install_fake_vector_store(monkeypatch)

    result = ingest_file(sample_markdown, project_id="p04-gigabit")

    assert result["status"] == "ingested"
    assert fake_store.metadatas
    assert all(meta["project_id"] == "p04-gigabit" for meta in fake_store.metadatas)

    with get_connection() as conn:
        rows = conn.execute("SELECT metadata FROM source_chunks").fetchall()

    assert rows
    assert all(
        json.loads(row["metadata"])["project_id"] == "p04-gigabit" for row in rows
    )


def test_ingest_file_derives_project_id_from_registry_root(tmp_data_dir, tmp_path, monkeypatch):
    """Unscoped ingest should preserve ownership for files under registered roots."""
    import atocore.config as config

    vault_dir = tmp_path / "vault"
    drive_dir = tmp_path / "drive"
    config_dir = tmp_path / "config"
    project_dir = vault_dir / "incoming" / "projects" / "p04-gigabit"
    project_dir.mkdir(parents=True)
    drive_dir.mkdir()
    config_dir.mkdir()

    note = project_dir / "status.md"
    note.write_text(
        "# Status\n\nCurrent project status with enough detail to create "
        "a retrievable chunk for the ingestion pipeline test.",
        encoding="utf-8",
    )

    registry_path = config_dir / "project-registry.json"
    registry_path.write_text(
        json.dumps(
            {
                "projects": [
                    {
                        "id": "p04-gigabit",
                        "aliases": ["p04"],
                        "ingest_roots": [
                            {"source": "vault", "subpath": "incoming/projects/p04-gigabit"}
                        ],
                    }
                ]
            }
        ),
        encoding="utf-8",
    )

    monkeypatch.setenv("ATOCORE_VAULT_SOURCE_DIR", str(vault_dir))
    monkeypatch.setenv("ATOCORE_DRIVE_SOURCE_DIR", str(drive_dir))
    monkeypatch.setenv("ATOCORE_PROJECT_REGISTRY_PATH", str(registry_path))
    # Rebuild settings after the env vars are in place, and go through
    # monkeypatch so the previous settings object is restored at teardown
    # instead of leaking tmp-dir paths into later tests.
    monkeypatch.setattr(config, "settings", config.Settings())
    fake_store = _install_fake_vector_store(monkeypatch)
    init_db()

    result = ingest_file(note)

    assert result["status"] == "ingested"
    assert fake_store.metadatas
    assert all(meta["project_id"] == "p04-gigabit" for meta in fake_store.metadatas)


def test_ingest_file_logs_and_fails_open_when_project_derivation_fails(
    tmp_data_dir,
    sample_markdown,
    monkeypatch,
):
    """A broken registry should be visible but should not block ingestion."""
    init_db()
    captured_warnings = []

    def raise_registry_error(path):
        raise ValueError("registry broken")

    fake_store = _install_fake_vector_store(monkeypatch)
    monkeypatch.setattr(
        "atocore.projects.registry.derive_project_id_for_path",
        raise_registry_error,
    )
    monkeypatch.setattr(
        "atocore.ingestion.pipeline.log.warning",
        lambda event, **kwargs: captured_warnings.append((event, kwargs)),
    )

    result = ingest_file(sample_markdown)

    assert result["status"] == "ingested"
    assert fake_store.metadatas
    assert all(meta["project_id"] == "" for meta in fake_store.metadatas)
    # Fail with a readable assertion rather than an IndexError when the
    # pipeline logged nothing at all.
    assert captured_warnings
    event, fields = captured_warnings[0]
    assert event == "project_id_derivation_failed"
    assert "registry broken" in fields["error"]


def test_ingest_project_folder_passes_project_id_to_files(tmp_data_dir, sample_folder, monkeypatch):
    """Project folder ingestion should forward its project_id to every file."""
    seen = []

    def fake_ingest_file(path, project_id=""):
        seen.append((path.name, project_id))
        return {"file": str(path), "status": "ingested"}

    monkeypatch.setattr("atocore.ingestion.pipeline.ingest_file", fake_ingest_file)
    monkeypatch.setattr(
        "atocore.ingestion.pipeline._purge_deleted_files",
        lambda *args, **kwargs: 0,
    )

    ingest_project_folder(sample_folder, project_id="p05-interferometer")

    assert seen
    assert {project_id for _, project_id in seen} == {"p05-interferometer"}


def test_parse_markdown_uses_supplied_text(sample_markdown):
    """Parsing should be able to reuse pre-read content from ingestion."""
    latin_text = """---\ntags: parser\n---\n# Parser Title\n\nBody text."""

    parsed = parse_markdown(sample_markdown, text=latin_text)

    assert parsed.title == "Parser Title"
    assert "parser" in parsed.tags


def test_reingest_empty_replaces_stale_chunks(tmp_data_dir, sample_markdown, monkeypatch):
    """Re-ingesting a file with no chunks should clear stale DB/vector state."""
    init_db()
    fake_store = _install_fake_vector_store(monkeypatch)

    first = ingest_file(sample_markdown)
    assert first["status"] == "ingested"

    sample_markdown.write_text(
        "# Changed\n\nThis update should now produce no chunks after monkeypatching.",
        encoding="utf-8",
    )
    # Force the chunker to yield nothing so the second ingest hits the
    # "empty" path for a document that already has stored chunks.
    monkeypatch.setattr(
        "atocore.ingestion.pipeline.chunk_markdown",
        lambda *args, **kwargs: [],
    )

    second = ingest_file(sample_markdown)
    assert second["status"] == "empty"

    with get_connection() as conn:
        chunk_count = conn.execute("SELECT COUNT(*) AS c FROM source_chunks").fetchone()

    assert chunk_count["c"] == 0
    assert fake_store.deleted_ids


def test_ingest_folder_includes_markdown_extension(tmp_data_dir, sample_folder, monkeypatch):
    """Folder ingestion should include both .md and .markdown files."""
    init_db()
    markdown_file = sample_folder / "third_note.markdown"
    markdown_file.write_text(
        "# Third Note\n\nThis file should be discovered during folder ingestion.",
        encoding="utf-8",
    )
    _install_fake_vector_store(monkeypatch)

    results = ingest_folder(sample_folder)

    files = {result["file"] for result in results if "file" in result}
    assert str(markdown_file.resolve()) in files


def test_purge_deleted_files_does_not_match_sibling_prefix(tmp_data_dir, sample_folder, monkeypatch):
    """Purging one folder should not delete entries from a sibling folder with the same prefix."""
    init_db()
    _install_fake_vector_store(monkeypatch)

    # "notes" is a strict string prefix of "notes-project"; a naive
    # startswith() match on paths would confuse the two folders.
    kept_folder = tmp_data_dir / "notes"
    kept_folder.mkdir()
    kept_file = kept_folder / "keep.md"
    kept_file.write_text("# Keep\n\nThis document should survive purge.", encoding="utf-8")
    ingest_file(kept_file)

    purge_folder = tmp_data_dir / "notes-project"
    purge_folder.mkdir()
    purge_file = purge_folder / "gone.md"
    purge_file.write_text("# Gone\n\nThis document will be purged.", encoding="utf-8")
    ingest_file(purge_file)
    purge_file.unlink()

    ingest_folder(purge_folder, purge_deleted=True)

    with get_connection() as conn:
        rows = conn.execute("SELECT file_path FROM source_documents").fetchall()

    remaining_paths = {row["file_path"] for row in rows}
    assert str(kept_file.resolve()) in remaining_paths
    # NOTE(review): nothing here checks that gone.md was actually removed, so
    # the test also passes if the purge is a no-op — consider asserting that
    # too once the purge behaviour for an emptied folder is confirmed.