Files
ATOCore/tests/test_backfill_chunk_project_ids.py

155 lines
5.4 KiB
Python
Raw Normal View History

"""Tests for explicit chunk project_id metadata backfill."""
import json
import atocore.config as config
from atocore.models.database import get_connection, init_db
from scripts import backfill_chunk_project_ids as backfill
def _write_registry(tmp_path, monkeypatch):
    """Create a one-project registry on disk and point AtoCore settings at it.

    Builds ``vault``, ``drive`` and ``config`` directories under ``tmp_path``,
    writes ``config/project-registry.json`` describing the ``p04-gigabit``
    project (alias ``p04``) whose single ingest root is the vault subpath
    ``incoming/projects/p04-gigabit``, exports the three ``ATOCORE_*``
    environment variables, and then rebuilds ``config.settings``.

    Args:
        tmp_path: Temporary directory for the test (pytest's ``tmp_path``).
        monkeypatch: pytest ``monkeypatch`` fixture. Both the environment
            variables and the replaced ``config.settings`` are registered with
            it, so they are undone together when the test finishes.

    Returns:
        The project's ingest directory inside the vault.
    """
    project_id = "p04-gigabit"
    # One subpath feeds both the directory on disk and the registry entry so
    # the two cannot drift apart.
    ingest_subpath = f"incoming/projects/{project_id}"

    vault_dir = tmp_path / "vault"
    drive_dir = tmp_path / "drive"
    config_dir = tmp_path / "config"
    project_dir = vault_dir / ingest_subpath
    project_dir.mkdir(parents=True)
    drive_dir.mkdir()
    config_dir.mkdir()

    registry = {
        "projects": [
            {
                "id": project_id,
                "aliases": ["p04"],
                "ingest_roots": [
                    {"source": "vault", "subpath": ingest_subpath},
                ],
            },
        ],
    }
    registry_path = config_dir / "project-registry.json"
    registry_path.write_text(json.dumps(registry), encoding="utf-8")

    monkeypatch.setenv("ATOCORE_VAULT_SOURCE_DIR", str(vault_dir))
    monkeypatch.setenv("ATOCORE_DRIVE_SOURCE_DIR", str(drive_dir))
    monkeypatch.setenv("ATOCORE_PROJECT_REGISTRY_PATH", str(registry_path))
    # Settings must be rebuilt only after the environment is in place
    # (presumably Settings reads these variables at construction time).
    # Going through monkeypatch instead of a bare assignment keeps the
    # override from leaking into later tests.
    monkeypatch.setattr(config, "settings", config.Settings())
    return project_dir
def _insert_chunk(file_path, metadata=None, chunk_id="chunk-1"):
    """Insert document ``doc-1`` plus a single chunk that references it.

    Args:
        file_path: Stored, stringified, as the document's ``file_path``.
        metadata: Value JSON-encoded into the chunk's ``metadata`` column;
            ``None`` is stored as an empty JSON object.
        chunk_id: Value written to ``source_chunks.id``.
    """
    with get_connection() as db:
        document_values = (
            "doc-1",
            str(file_path),
            "hash",
            "Title",
            "markdown",
            "[]",
        )
        db.execute(
            """
            INSERT INTO source_documents (id, file_path, file_hash, title, doc_type, tags)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            document_values,
        )

        # Explicit None check: falsy-but-valid metadata such as [] must be
        # stored as given rather than replaced by {}.
        stored_metadata = {} if metadata is None else metadata
        chunk_values = (
            chunk_id,
            "doc-1",
            0,
            "content",
            "Overview",
            7,
            json.dumps(stored_metadata),
        )
        db.execute(
            """
            INSERT INTO source_chunks
            (id, document_id, chunk_index, content, heading_path, char_count, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            chunk_values,
        )
class FakeVectorStore:
def __init__(self, metadatas):
self.metadatas = dict(metadatas)
self.updated = []
def get_metadatas(self, ids):
returned_ids = [chunk_id for chunk_id in ids if chunk_id in self.metadatas]
return {
"ids": returned_ids,
"metadatas": [self.metadatas[chunk_id] for chunk_id in returned_ids],
}
def update_metadatas(self, ids, metadatas):
self.updated.append((list(ids), list(metadatas)))
for chunk_id, metadata in zip(ids, metadatas, strict=True):
self.metadatas[chunk_id] = metadata
def test_backfill_dry_run_is_non_mutating(tmp_data_dir, tmp_path, monkeypatch):
    """A dry run reports one pending update but leaves the SQL row untouched."""
    init_db()
    chunk_file = _write_registry(tmp_path, monkeypatch) / "status.md"
    _insert_chunk(chunk_file)

    summary = backfill.backfill(apply=False)

    assert summary["updates"] == 1
    with get_connection() as db:
        stored = db.execute(
            "SELECT metadata FROM source_chunks WHERE id = ?",
            ("chunk-1",),
        ).fetchone()
    # The chunk was inserted with empty metadata and must still have it.
    assert json.loads(stored["metadata"]) == {}
def test_backfill_apply_updates_chroma_then_sql(tmp_data_dir, tmp_path, monkeypatch):
    """Applying the backfill writes ``project_id`` to the vector store and SQL."""
    init_db()
    chunk_file = _write_registry(tmp_path, monkeypatch) / "status.md"
    existing_metadata = {"source_file": "status.md"}
    _insert_chunk(chunk_file, metadata=existing_metadata)
    # Seed the fake with an equal but separate dict for the same chunk.
    vector_store = FakeVectorStore({"chunk-1": dict(existing_metadata)})
    monkeypatch.setattr(backfill, "get_vector_store", lambda: vector_store)

    summary = backfill.backfill(apply=True, require_chroma_snapshot=True)

    assert summary["applied_updates"] == 1
    assert vector_store.metadatas["chunk-1"]["project_id"] == "p04-gigabit"
    with get_connection() as db:
        stored = db.execute(
            "SELECT metadata FROM source_chunks WHERE id = ?",
            ("chunk-1",),
        ).fetchone()
    sql_metadata = json.loads(stored["metadata"])
    assert sql_metadata["project_id"] == "p04-gigabit"
def test_backfill_apply_requires_snapshot_confirmation(tmp_data_dir, tmp_path, monkeypatch):
    """``apply=True`` without snapshot confirmation raises a ``ValueError``.

    The error message is expected to mention "Chroma backup".
    """
    init_db()
    chunk_file = _write_registry(tmp_path, monkeypatch) / "status.md"
    _insert_chunk(chunk_file)

    raised = None
    try:
        backfill.backfill(apply=True)
    except ValueError as exc:
        raised = exc

    # Fail explicitly when the call returned normally instead of raising.
    if raised is None:
        raise AssertionError("Expected snapshot confirmation requirement")
    assert "Chroma backup" in str(raised)
def test_backfill_missing_vector_skips_sql_update(tmp_data_dir, tmp_path, monkeypatch):
    """A chunk absent from the vector store is counted but not updated in SQL."""
    init_db()
    chunk_file = _write_registry(tmp_path, monkeypatch) / "status.md"
    _insert_chunk(chunk_file)
    # An empty store means the chunk has no vector-side entry at all.
    empty_store = FakeVectorStore({})
    monkeypatch.setattr(backfill, "get_vector_store", lambda: empty_store)

    summary = backfill.backfill(apply=True, require_chroma_snapshot=True)

    assert summary["updates"] == 1
    assert summary["applied_updates"] == 0
    assert summary["missing_vectors"] == 1
    with get_connection() as db:
        stored = db.execute(
            "SELECT metadata FROM source_chunks WHERE id = ?",
            ("chunk-1",),
        ).fetchone()
    assert json.loads(stored["metadata"]) == {}
def test_backfill_skips_malformed_metadata(tmp_data_dir, tmp_path, monkeypatch):
    """Chunk metadata stored as a JSON list is reported as malformed, not updated."""
    init_db()
    chunk_file = _write_registry(tmp_path, monkeypatch) / "status.md"
    # A list is valid JSON but is not the empty object used by the other tests.
    _insert_chunk(chunk_file, metadata=[])

    summary = backfill.backfill(apply=False)

    assert summary["updates"] == 0
    assert summary["malformed_metadata"] == 1