feat: tunable ranking, refresh status, chroma backup + admin endpoints

Three small improvements that move the operational baseline forward
without changing the existing trust model.

1. Tunable retrieval ranking weights
   - rank_project_match_boost, rank_query_token_step,
     rank_query_token_cap, rank_path_high_signal_boost,
     rank_path_low_signal_penalty are now Settings fields
   - all overridable via ATOCORE_* env vars
   - retriever no longer hard-codes 2.0 / 1.18 / 0.72 / 0.08 / 1.32
   - lets ranking be tuned per environment as Wave 1 is exercised
     without code changes

2. /projects/{name}/refresh status
   - refresh_registered_project now returns an overall status field
     ("ingested", "partial", "nothing_to_ingest") plus roots_ingested
     and roots_skipped counters
   - ProjectRefreshResponse advertises the new fields so callers can
     rely on them
   - covers the case where every configured root is missing on disk

3. Chroma cold snapshot + admin backup endpoints
   - create_runtime_backup now accepts include_chroma and writes a
     cold directory copy of the chroma persistence path
   - new list_runtime_backups() and validate_backup() helpers
   - new endpoints:
     - POST /admin/backup            create snapshot (optional chroma)
     - GET  /admin/backup            list snapshots
     - GET  /admin/backup/{stamp}/validate  structural validation
   - chroma snapshots are taken under exclusive_ingestion() so a refresh
     or ingest cannot race with the cold copy
   - backup metadata records what was actually included and how big

Tests:
- 8 new tests covering tunable weights, refresh status branches
  (ingested / partial / nothing_to_ingest), chroma snapshot, list,
  validate, and the API endpoints (including the lock-acquisition path)
- existing fake refresh stubs in test_api_storage.py updated for the
  expanded ProjectRefreshResponse model
- full suite: 105 passing (was 97)

next-steps doc updated to reflect that the chroma snapshot + restore
validation gap from current-state.md is now closed in code; only the
operational retention policy remains.
This commit is contained in:
2026-04-06 18:42:19 -04:00
parent 14ab7c8e9f
commit c9b9eede25
10 changed files with 615 additions and 13 deletions

View File

@@ -54,6 +54,9 @@ This working list should be read alongside:
- exercise the new SQLite + registry snapshot path on Dalidou
- Chroma backup or rebuild policy
- retention and restore validation
- admin backup endpoint now supports `include_chroma` cold snapshot
under the ingestion lock and `validate` confirms each snapshot is
openable; remaining work is the operational retention policy
8. Keep deeper automatic runtime integration modest until the organic read-only
model has proven value

View File

@@ -34,6 +34,11 @@ from atocore.memory.service import (
update_memory,
)
from atocore.observability.logger import get_logger
from atocore.ops.backup import (
create_runtime_backup,
list_runtime_backups,
validate_backup,
)
from atocore.projects.registry import (
build_project_registration_proposal,
get_project_registry_template,
@@ -69,6 +74,9 @@ class ProjectRefreshResponse(BaseModel):
aliases: list[str]
description: str
purge_deleted: bool
status: str
roots_ingested: int
roots_skipped: int
roots: list[dict]
@@ -438,6 +446,49 @@ def api_invalidate_project_state(req: ProjectStateInvalidateRequest) -> dict:
return {"status": "invalidated", "project": req.project, "category": req.category, "key": req.key}
class BackupCreateRequest(BaseModel):
    """Request body for ``POST /admin/backup``."""

    # When true the endpoint also takes a cold directory copy of the Chroma
    # persistence path, holding the ingestion lock for the duration.
    include_chroma: bool = False
@router.post("/admin/backup")
def api_create_backup(req: BackupCreateRequest | None = None) -> dict:
    """Create a runtime backup snapshot.

    When ``include_chroma`` is true the call holds the ingestion lock so a
    safe cold copy of the vector store can be taken without racing against
    refresh or ingest endpoints.

    Returns the metadata dict produced by ``create_runtime_backup``.
    Raises HTTP 500 when the backup fails for any reason.
    """
    # Explicit None check: relying on model truthiness would be fragile.
    payload = req if req is not None else BackupCreateRequest()
    try:
        if payload.include_chroma:
            # Hold the lock for the entire copy so a concurrent refresh or
            # ingest cannot mutate the chroma directory mid-snapshot.
            with exclusive_ingestion():
                metadata = create_runtime_backup(include_chroma=True)
        else:
            metadata = create_runtime_backup(include_chroma=False)
    except Exception as e:
        log.error("admin_backup_failed", error=str(e))
        # Chain the cause so server tracebacks keep the root failure.
        raise HTTPException(status_code=500, detail=f"Backup failed: {e}") from e
    return metadata
@router.get("/admin/backup")
def api_list_backups() -> dict:
    """Return the configured backup directory and every snapshot under it."""
    backups = list_runtime_backups()
    backup_dir = _config.settings.resolved_backup_dir
    return {"backup_dir": str(backup_dir), "backups": backups}
@router.get("/admin/backup/{stamp}/validate")
def api_validate_backup(stamp: str) -> dict:
    """Validate that a previously created backup is structurally usable."""
    report = validate_backup(stamp)
    if report.get("exists", False):
        return report
    raise HTTPException(status_code=404, detail=f"Backup not found: {stamp}")
@router.get("/health")
def api_health() -> dict:
"""Health check."""

View File

@@ -40,6 +40,15 @@ class Settings(BaseSettings):
context_budget: int = 3000
context_top_k: int = 15
# Retrieval ranking weights (tunable per environment).
# All multipliers default to the values used since Wave 1; tighten or
# loosen them via ATOCORE_* env vars without touching code.
rank_project_match_boost: float = 2.0
rank_query_token_step: float = 0.08
rank_query_token_cap: float = 1.32
rank_path_high_signal_boost: float = 1.18
rank_path_low_signal_penalty: float = 0.72
model_config = {"env_prefix": "ATOCORE_"}
@property

View File

@@ -1,8 +1,24 @@
"""Create safe runtime backups for the AtoCore machine store."""
"""Create safe runtime backups for the AtoCore machine store.
This module is intentionally conservative:
- The SQLite snapshot uses the online ``conn.backup()`` API and is safe to
call while the database is in use.
- The project registry snapshot is a simple file copy of the canonical
registry JSON.
- The Chroma snapshot is a *cold* directory copy. To stay safe it must be
taken while no ingestion is running. The recommended pattern from the API
layer is to acquire ``exclusive_ingestion()`` for the duration of the
backup so refreshes and ingestions cannot run concurrently with the copy.
The backup metadata file records what was actually included so restore
tooling does not have to guess.
"""
from __future__ import annotations
import json
import shutil
import sqlite3
from datetime import datetime, UTC
from pathlib import Path
@@ -14,8 +30,17 @@ from atocore.observability.logger import get_logger
log = get_logger("backup")
def create_runtime_backup(timestamp: datetime | None = None) -> dict:
"""Create a hot backup of the SQLite DB plus registry/config metadata."""
def create_runtime_backup(
timestamp: datetime | None = None,
include_chroma: bool = False,
) -> dict:
"""Create a hot SQLite backup plus registry/config metadata.
When ``include_chroma`` is true the Chroma persistence directory is also
snapshotted as a cold directory copy. The caller is responsible for
ensuring no ingestion is running concurrently. The HTTP layer enforces
this by holding ``exclusive_ingestion()`` around the call.
"""
init_db()
now = timestamp or datetime.now(UTC)
stamp = now.strftime("%Y%m%dT%H%M%SZ")
@@ -23,6 +48,7 @@ def create_runtime_backup(timestamp: datetime | None = None) -> dict:
backup_root = _config.settings.resolved_backup_dir / "snapshots" / stamp
db_backup_dir = backup_root / "db"
config_backup_dir = backup_root / "config"
chroma_backup_dir = backup_root / "chroma"
metadata_path = backup_root / "backup-metadata.json"
db_backup_dir.mkdir(parents=True, exist_ok=True)
@@ -35,7 +61,26 @@ def create_runtime_backup(timestamp: datetime | None = None) -> dict:
registry_path = _config.settings.resolved_project_registry_path
if registry_path.exists():
registry_snapshot = config_backup_dir / registry_path.name
registry_snapshot.write_text(registry_path.read_text(encoding="utf-8"), encoding="utf-8")
registry_snapshot.write_text(
registry_path.read_text(encoding="utf-8"), encoding="utf-8"
)
chroma_snapshot_path = ""
chroma_files_copied = 0
chroma_bytes_copied = 0
if include_chroma:
source_chroma = _config.settings.chroma_path
if source_chroma.exists() and source_chroma.is_dir():
chroma_backup_dir.mkdir(parents=True, exist_ok=True)
chroma_files_copied, chroma_bytes_copied = _copy_directory_tree(
source_chroma, chroma_backup_dir
)
chroma_snapshot_path = str(chroma_backup_dir)
else:
log.info(
"chroma_snapshot_skipped_missing",
path=str(source_chroma),
)
metadata = {
"created_at": now.isoformat(),
@@ -43,14 +88,134 @@ def create_runtime_backup(timestamp: datetime | None = None) -> dict:
"db_snapshot_path": str(db_snapshot_path),
"db_size_bytes": db_snapshot_path.stat().st_size,
"registry_snapshot_path": str(registry_snapshot) if registry_snapshot else "",
"vector_store_note": "Chroma hot backup is not included in this script; use a cold snapshot or rebuild/export workflow.",
"chroma_snapshot_path": chroma_snapshot_path,
"chroma_snapshot_bytes": chroma_bytes_copied,
"chroma_snapshot_files": chroma_files_copied,
"chroma_snapshot_included": include_chroma,
"vector_store_note": (
"Chroma snapshot included as cold directory copy."
if include_chroma and chroma_snapshot_path
else "Chroma hot backup is not included; rerun with include_chroma=True under exclusive_ingestion()."
),
}
metadata_path.write_text(json.dumps(metadata, indent=2, ensure_ascii=True) + "\n", encoding="utf-8")
metadata_path.write_text(
json.dumps(metadata, indent=2, ensure_ascii=True) + "\n",
encoding="utf-8",
)
log.info("runtime_backup_created", backup_root=str(backup_root), db_snapshot=str(db_snapshot_path))
log.info(
"runtime_backup_created",
backup_root=str(backup_root),
db_snapshot=str(db_snapshot_path),
chroma_included=include_chroma,
chroma_bytes=chroma_bytes_copied,
)
return metadata
def list_runtime_backups() -> list[dict]:
    """List all runtime backups under the configured backup directory.

    Returns one entry per snapshot directory, sorted by stamp. Each entry
    carries the parsed metadata when ``backup-metadata.json`` exists and is
    valid JSON; otherwise ``metadata`` is ``None`` with a ``metadata_error``
    marker so callers can surface corrupt snapshots.
    """
    snapshots_root = _config.settings.resolved_backup_dir / "snapshots"
    # is_dir() is False for a missing path, so one check covers both cases.
    if not snapshots_root.is_dir():
        return []

    entries: list[dict] = []
    for snapshot_dir in sorted(snapshots_root.iterdir()):
        if not snapshot_dir.is_dir():
            continue
        entry: dict = {
            "stamp": snapshot_dir.name,
            "path": str(snapshot_dir),
            "has_metadata": False,
        }
        metadata_path = snapshot_dir / "backup-metadata.json"
        # Read once (EAFP) instead of stat-ing twice: avoids the race where
        # the file disappears between exists() and read_text().
        try:
            raw = metadata_path.read_text(encoding="utf-8")
        except FileNotFoundError:
            pass
        else:
            entry["has_metadata"] = True
            try:
                entry["metadata"] = json.loads(raw)
            except json.JSONDecodeError:
                entry["metadata"] = None
                entry["metadata_error"] = "invalid_json"
        entries.append(entry)
    return entries
def validate_backup(stamp: str) -> dict:
    """Validate that a previously created backup is structurally usable.

    Checks:
    - the snapshot directory exists
    - the SQLite snapshot is openable and ``PRAGMA integrity_check`` returns ok
    - the registry snapshot, if recorded, parses as JSON
    - the chroma snapshot directory, if recorded, exists

    Returns a report dict with per-component ``*_ok`` flags, an ``errors``
    list, and an overall ``valid`` flag (present once metadata has parsed).
    """
    snapshot_dir = _config.settings.resolved_backup_dir / "snapshots" / stamp
    result: dict = {
        "stamp": stamp,
        "path": str(snapshot_dir),
        "exists": snapshot_dir.exists(),
        "db_ok": False,
        "registry_ok": None,
        "chroma_ok": None,
        "errors": [],
    }
    if not snapshot_dir.exists():
        result["errors"].append("snapshot_directory_missing")
        return result

    metadata_path = snapshot_dir / "backup-metadata.json"
    if not metadata_path.exists():
        result["errors"].append("metadata_missing")
        return result
    try:
        metadata = json.loads(metadata_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        result["errors"].append(f"metadata_invalid_json: {exc}")
        return result
    result["metadata"] = metadata

    # Guard against an empty recorded path: Path("") is ".", which exists,
    # and would previously have masked the missing snapshot as an open error.
    db_snapshot_value = metadata.get("db_snapshot_path", "")
    db_path = Path(db_snapshot_value) if db_snapshot_value else None
    if db_path is None or not db_path.exists():
        result["errors"].append("db_snapshot_missing")
    else:
        try:
            # sqlite3's context manager only manages transactions, not the
            # connection lifetime — close explicitly so the snapshot file is
            # not left open after validation.
            conn = sqlite3.connect(str(db_path))
            try:
                row = conn.execute("PRAGMA integrity_check").fetchone()
            finally:
                conn.close()
            result["db_ok"] = bool(row and row[0] == "ok")
            if not result["db_ok"]:
                result["errors"].append(
                    f"db_integrity_check_failed: {row[0] if row else 'no_row'}"
                )
        except sqlite3.DatabaseError as exc:
            result["errors"].append(f"db_open_failed: {exc}")

    registry_snapshot_path = metadata.get("registry_snapshot_path", "")
    if registry_snapshot_path:
        registry_path = Path(registry_snapshot_path)
        if not registry_path.exists():
            result["registry_ok"] = False
            result["errors"].append("registry_snapshot_missing")
        else:
            try:
                json.loads(registry_path.read_text(encoding="utf-8"))
                result["registry_ok"] = True
            except json.JSONDecodeError as exc:
                result["registry_ok"] = False
                result["errors"].append(f"registry_invalid_json: {exc}")

    chroma_snapshot_path = metadata.get("chroma_snapshot_path", "")
    if chroma_snapshot_path:
        chroma_dir = Path(chroma_snapshot_path)
        if chroma_dir.exists() and chroma_dir.is_dir():
            result["chroma_ok"] = True
        else:
            result["chroma_ok"] = False
            result["errors"].append("chroma_snapshot_missing")

    result["valid"] = not result["errors"]
    return result
def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None:
source_conn = sqlite3.connect(str(source_path))
dest_conn = sqlite3.connect(str(dest_path))
@@ -61,6 +226,21 @@ def _backup_sqlite_db(source_path: Path, dest_path: Path) -> None:
source_conn.close()
def _copy_directory_tree(source: Path, dest: Path) -> tuple[int, int]:
"""Copy a directory tree and return (file_count, total_bytes)."""
if dest.exists():
shutil.rmtree(dest)
shutil.copytree(source, dest)
file_count = 0
total_bytes = 0
for path in dest.rglob("*"):
if path.is_file():
file_count += 1
total_bytes += path.stat().st_size
return file_count, total_bytes
def main() -> None:
    """CLI entry point: create a default backup and print its metadata."""
    metadata = create_runtime_backup()
    print(json.dumps(metadata, indent=2, ensure_ascii=True))

View File

@@ -255,12 +255,23 @@ def get_registered_project(project_name: str) -> RegisteredProject | None:
def refresh_registered_project(project_name: str, purge_deleted: bool = False) -> dict:
"""Ingest all configured source roots for a registered project."""
"""Ingest all configured source roots for a registered project.
The returned dict carries an overall ``status`` so callers can tell at a
glance whether the refresh was fully successful, partial, or did nothing
at all because every configured root was missing or not a directory:
- ``ingested``: every root was a real directory and was ingested
- ``partial``: at least one root ingested and at least one was unusable
- ``nothing_to_ingest``: no roots were usable
"""
project = get_registered_project(project_name)
if project is None:
raise ValueError(f"Unknown project: {project_name}")
roots = []
ingested_count = 0
skipped_count = 0
for source_ref in project.ingest_roots:
resolved = _resolve_ingest_root(source_ref)
root_result = {
@@ -271,9 +282,11 @@ def refresh_registered_project(project_name: str, purge_deleted: bool = False) -
}
if not resolved.exists():
roots.append({**root_result, "status": "missing"})
skipped_count += 1
continue
if not resolved.is_dir():
roots.append({**root_result, "status": "not_directory"})
skipped_count += 1
continue
roots.append(
@@ -283,12 +296,23 @@ def refresh_registered_project(project_name: str, purge_deleted: bool = False) -
"results": ingest_folder(resolved, purge_deleted=purge_deleted),
}
)
ingested_count += 1
if ingested_count == 0:
overall_status = "nothing_to_ingest"
elif skipped_count == 0:
overall_status = "ingested"
else:
overall_status = "partial"
return {
"project": project.project_id,
"aliases": list(project.aliases),
"description": project.description,
"purge_deleted": purge_deleted,
"status": overall_status,
"roots_ingested": ingested_count,
"roots_skipped": skipped_count,
"roots": roots,
}

View File

@@ -173,7 +173,7 @@ def _project_match_boost(project_hint: str, metadata: dict) -> float:
for candidate in candidate_names:
if candidate and candidate in searchable:
return 2.0
return _config.settings.rank_project_match_boost
return 1.0
@@ -198,7 +198,10 @@ def _query_match_boost(query: str, metadata: dict) -> float:
matches = sum(1 for token in set(tokens) if token in searchable)
if matches <= 0:
return 1.0
return min(1.0 + matches * 0.08, 1.32)
return min(
1.0 + matches * _config.settings.rank_query_token_step,
_config.settings.rank_query_token_cap,
)
def _path_signal_boost(metadata: dict) -> float:
@@ -213,9 +216,9 @@ def _path_signal_boost(metadata: dict) -> float:
multiplier = 1.0
if any(hint in searchable for hint in _LOW_SIGNAL_HINTS):
multiplier *= 0.72
multiplier *= _config.settings.rank_path_low_signal_penalty
if any(hint in searchable for hint in _HIGH_SIGNAL_HINTS):
multiplier *= 1.18
multiplier *= _config.settings.rank_path_high_signal_boost
return multiplier

View File

@@ -129,6 +129,9 @@ def test_project_refresh_endpoint_uses_registered_roots(tmp_data_dir, monkeypatc
"aliases": ["p05"],
"description": "P05 docs",
"purge_deleted": purge_deleted,
"status": "ingested",
"roots_ingested": 1,
"roots_skipped": 0,
"roots": [
{
"source": "vault",
@@ -173,6 +176,9 @@ def test_project_refresh_endpoint_serializes_ingestion(tmp_data_dir, monkeypatch
"aliases": ["p05"],
"description": "P05 docs",
"purge_deleted": purge_deleted,
"status": "nothing_to_ingest",
"roots_ingested": 0,
"roots_skipped": 0,
"roots": [],
}
@@ -429,6 +435,125 @@ def test_project_update_endpoint_rejects_collisions(tmp_data_dir, monkeypatch):
assert "collisions" in response.json()["detail"]
def test_admin_backup_create_without_chroma(tmp_data_dir, monkeypatch):
    """POST /admin/backup with an empty body defaults to include_chroma=False."""
    config.settings = config.Settings()

    captured = {}

    def fake_create_runtime_backup(timestamp=None, include_chroma=False):
        # Record what the endpoint forwarded so we can assert on it below.
        captured["include_chroma"] = include_chroma
        return {
            "created_at": "2026-04-06T23:00:00+00:00",
            "backup_root": "/tmp/fake",
            "db_snapshot_path": "/tmp/fake/db/atocore.db",
            "db_size_bytes": 0,
            "registry_snapshot_path": "",
            "chroma_snapshot_path": "",
            "chroma_snapshot_bytes": 0,
            "chroma_snapshot_files": 0,
            "chroma_snapshot_included": False,
            "vector_store_note": "skipped",
        }

    monkeypatch.setattr("atocore.api.routes.create_runtime_backup", fake_create_runtime_backup)

    client = TestClient(app)
    response = client.post("/admin/backup", json={})

    assert response.status_code == 200
    assert captured == {"include_chroma": False}
    body = response.json()
    assert body["chroma_snapshot_included"] is False
def test_admin_backup_create_with_chroma_holds_lock(tmp_data_dir, monkeypatch):
    """include_chroma=True must run the backup inside exclusive_ingestion()."""
    config.settings = config.Settings()

    events = []

    @contextmanager
    def fake_lock():
        # Record lock enter/exit so we can assert the backup ran in between.
        events.append("enter")
        try:
            yield
        finally:
            events.append("exit")

    def fake_create_runtime_backup(timestamp=None, include_chroma=False):
        events.append(("backup", include_chroma))
        return {
            "created_at": "2026-04-06T23:30:00+00:00",
            "backup_root": "/tmp/fake",
            "db_snapshot_path": "/tmp/fake/db/atocore.db",
            "db_size_bytes": 0,
            "registry_snapshot_path": "",
            "chroma_snapshot_path": "/tmp/fake/chroma",
            "chroma_snapshot_bytes": 4,
            "chroma_snapshot_files": 1,
            "chroma_snapshot_included": True,
            "vector_store_note": "included",
        }

    monkeypatch.setattr("atocore.api.routes.exclusive_ingestion", fake_lock)
    monkeypatch.setattr("atocore.api.routes.create_runtime_backup", fake_create_runtime_backup)

    client = TestClient(app)
    response = client.post("/admin/backup", json={"include_chroma": True})

    assert response.status_code == 200
    # Ordering proves the cold copy happened strictly inside the lock.
    assert events == ["enter", ("backup", True), "exit"]
    assert response.json()["chroma_snapshot_included"] is True
def test_admin_backup_list_and_validate_endpoints(tmp_data_dir, monkeypatch):
    """GET /admin/backup lists snapshots; the validate route 404s on unknown stamps."""
    config.settings = config.Settings()

    def fake_list_runtime_backups():
        return [
            {
                "stamp": "20260406T220000Z",
                "path": "/tmp/fake/snapshots/20260406T220000Z",
                "has_metadata": True,
                "metadata": {"db_snapshot_path": "/tmp/fake/snapshots/20260406T220000Z/db/atocore.db"},
            }
        ]

    def fake_validate_backup(stamp):
        # "missing" simulates a stamp with no snapshot directory on disk.
        if stamp == "missing":
            return {
                "stamp": stamp,
                "path": f"/tmp/fake/snapshots/{stamp}",
                "exists": False,
                "errors": ["snapshot_directory_missing"],
            }
        return {
            "stamp": stamp,
            "path": f"/tmp/fake/snapshots/{stamp}",
            "exists": True,
            "db_ok": True,
            "registry_ok": True,
            "chroma_ok": None,
            "valid": True,
            "errors": [],
        }

    monkeypatch.setattr("atocore.api.routes.list_runtime_backups", fake_list_runtime_backups)
    monkeypatch.setattr("atocore.api.routes.validate_backup", fake_validate_backup)

    client = TestClient(app)

    listing = client.get("/admin/backup")
    assert listing.status_code == 200
    listing_body = listing.json()
    assert "backup_dir" in listing_body
    assert listing_body["backups"][0]["stamp"] == "20260406T220000Z"

    valid = client.get("/admin/backup/20260406T220000Z/validate")
    assert valid.status_code == 200
    assert valid.json()["valid"] is True

    missing = client.get("/admin/backup/missing/validate")
    assert missing.status_code == 404
def test_query_endpoint_accepts_project_hint(monkeypatch):
def fake_retrieve(prompt, top_k=10, filter_tags=None, project_hint=None):
assert prompt == "architecture"

View File

@@ -6,7 +6,11 @@ from datetime import UTC, datetime
import atocore.config as config
from atocore.models.database import init_db
from atocore.ops.backup import create_runtime_backup
from atocore.ops.backup import (
create_runtime_backup,
list_runtime_backups,
validate_backup,
)
def test_create_runtime_backup_copies_db_and_registry(tmp_path, monkeypatch):
@@ -53,6 +57,89 @@ def test_create_runtime_backup_copies_db_and_registry(tmp_path, monkeypatch):
assert metadata["registry_snapshot_path"] == str(registry_snapshot)
def test_create_runtime_backup_includes_chroma_when_requested(tmp_path, monkeypatch):
    """include_chroma=True copies the chroma tree into the snapshot and records sizes."""
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    original_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()

        # Create a fake chroma directory tree with a couple of files.
        chroma_dir = config.settings.chroma_path
        (chroma_dir / "collection-a").mkdir(parents=True, exist_ok=True)
        (chroma_dir / "collection-a" / "data.bin").write_bytes(b"\x00\x01\x02\x03")
        (chroma_dir / "metadata.json").write_text('{"ok":true}', encoding="utf-8")

        result = create_runtime_backup(
            datetime(2026, 4, 6, 20, 0, 0, tzinfo=UTC),
            include_chroma=True,
        )
    finally:
        # Restore the module-global settings even if the backup raised.
        config.settings = original_settings

    chroma_snapshot_root = (
        tmp_path / "backups" / "snapshots" / "20260406T200000Z" / "chroma"
    )
    assert result["chroma_snapshot_included"] is True
    assert result["chroma_snapshot_path"] == str(chroma_snapshot_root)
    assert result["chroma_snapshot_files"] >= 2
    assert result["chroma_snapshot_bytes"] > 0
    assert (chroma_snapshot_root / "collection-a" / "data.bin").exists()
    assert (chroma_snapshot_root / "metadata.json").exists()
def test_list_and_validate_runtime_backups(tmp_path, monkeypatch):
    """Two snapshots list correctly and validate; an unknown stamp reports missing."""
    monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))
    monkeypatch.setenv(
        "ATOCORE_PROJECT_REGISTRY_PATH", str(tmp_path / "config" / "project-registry.json")
    )
    original_settings = config.settings
    try:
        config.settings = config.Settings()
        init_db()
        # Two snapshots with distinct fixed timestamps give distinct stamps.
        first = create_runtime_backup(datetime(2026, 4, 6, 21, 0, 0, tzinfo=UTC))
        second = create_runtime_backup(datetime(2026, 4, 6, 22, 0, 0, tzinfo=UTC))
        listing = list_runtime_backups()
        first_validation = validate_backup("20260406T210000Z")
        second_validation = validate_backup("20260406T220000Z")
        # A stamp that was never created should validate as missing.
        missing_validation = validate_backup("20260101T000000Z")
    finally:
        config.settings = original_settings

    assert len(listing) == 2
    assert {entry["stamp"] for entry in listing} == {
        "20260406T210000Z",
        "20260406T220000Z",
    }
    for entry in listing:
        assert entry["has_metadata"] is True
        assert entry["metadata"]["db_snapshot_path"]

    assert first_validation["valid"] is True
    assert first_validation["db_ok"] is True
    assert first_validation["errors"] == []
    assert second_validation["valid"] is True
    assert missing_validation["exists"] is False
    assert "snapshot_directory_missing" in missing_validation["errors"]

    # both metadata paths are reachable on disk
    assert json.loads(
        (tmp_path / "backups" / "snapshots" / "20260406T210000Z" / "backup-metadata.json")
        .read_text(encoding="utf-8")
    )["db_snapshot_path"] == first["db_snapshot_path"]
    assert second["db_snapshot_path"].endswith("atocore.db")
def test_create_runtime_backup_handles_missing_registry(tmp_path, monkeypatch):
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
monkeypatch.setenv("ATOCORE_BACKUP_DIR", str(tmp_path / "backups"))

View File

@@ -44,6 +44,22 @@ def test_settings_keep_legacy_db_path_when_present(tmp_path, monkeypatch):
assert settings.db_path == legacy_db.resolve()
def test_ranking_weights_are_tunable_via_env(monkeypatch):
    """Every ranking weight field is overridable through its ATOCORE_* variable."""
    overrides = {
        "ATOCORE_RANK_PROJECT_MATCH_BOOST": "3.5",
        "ATOCORE_RANK_QUERY_TOKEN_STEP": "0.12",
        "ATOCORE_RANK_QUERY_TOKEN_CAP": "1.5",
        "ATOCORE_RANK_PATH_HIGH_SIGNAL_BOOST": "1.25",
        "ATOCORE_RANK_PATH_LOW_SIGNAL_PENALTY": "0.5",
    }
    for variable, raw_value in overrides.items():
        monkeypatch.setenv(variable, raw_value)

    settings = config.Settings()

    expected = {
        "rank_project_match_boost": 3.5,
        "rank_query_token_step": 0.12,
        "rank_query_token_cap": 1.5,
        "rank_path_high_signal_boost": 1.25,
        "rank_path_low_signal_penalty": 0.5,
    }
    for field_name, value in expected.items():
        assert getattr(settings, field_name) == value
def test_ensure_runtime_dirs_creates_machine_dirs_only(tmp_path, monkeypatch):
monkeypatch.setenv("ATOCORE_DATA_DIR", str(tmp_path / "data"))
monkeypatch.setenv("ATOCORE_VAULT_SOURCE_DIR", str(tmp_path / "vault-source"))

View File

@@ -154,6 +154,110 @@ def test_refresh_registered_project_ingests_registered_roots(tmp_path, monkeypat
assert calls[0][0].endswith("p06-polisher")
assert calls[0][1] is False
assert result["roots"][0]["status"] == "ingested"
assert result["status"] == "ingested"
assert result["roots_ingested"] == 1
assert result["roots_skipped"] == 0
def test_refresh_registered_project_reports_nothing_to_ingest_when_all_missing(
    tmp_path, monkeypatch
):
    """When every configured root is absent, status is nothing_to_ingest and
    ingest_folder is never called."""
    vault_dir = tmp_path / "vault"
    drive_dir = tmp_path / "drive"
    config_dir = tmp_path / "config"
    vault_dir.mkdir()
    drive_dir.mkdir()
    config_dir.mkdir()

    # The registered root points below vault_dir but is never created on disk.
    registry_path = config_dir / "project-registry.json"
    registry_path.write_text(
        json.dumps(
            {
                "projects": [
                    {
                        "id": "p07-ghost",
                        "aliases": ["ghost"],
                        "description": "Project whose roots do not exist on disk",
                        "ingest_roots": [
                            {"source": "vault", "subpath": "incoming/projects/p07-ghost"}
                        ],
                    }
                ]
            }
        ),
        encoding="utf-8",
    )

    def fail_ingest_folder(path, purge_deleted=True):
        # Guard: refresh must skip missing roots without attempting ingestion.
        raise AssertionError(f"ingest_folder should not be called for missing root: {path}")

    monkeypatch.setenv("ATOCORE_VAULT_SOURCE_DIR", str(vault_dir))
    monkeypatch.setenv("ATOCORE_DRIVE_SOURCE_DIR", str(drive_dir))
    monkeypatch.setenv("ATOCORE_PROJECT_REGISTRY_PATH", str(registry_path))
    original_settings = config.settings
    try:
        config.settings = config.Settings()
        monkeypatch.setattr("atocore.projects.registry.ingest_folder", fail_ingest_folder)
        result = refresh_registered_project("ghost")
    finally:
        config.settings = original_settings

    assert result["status"] == "nothing_to_ingest"
    assert result["roots_ingested"] == 0
    assert result["roots_skipped"] == 1
    assert result["roots"][0]["status"] == "missing"
def test_refresh_registered_project_reports_partial_status(tmp_path, monkeypatch):
    """One real root plus one missing root yields status 'partial' with counters."""
    vault_dir = tmp_path / "vault"
    drive_dir = tmp_path / "drive"
    config_dir = tmp_path / "config"
    # Only the first registered root actually exists on disk.
    real_root = vault_dir / "incoming" / "projects" / "p08-mixed"
    real_root.mkdir(parents=True)
    drive_dir.mkdir()
    config_dir.mkdir()

    registry_path = config_dir / "project-registry.json"
    registry_path.write_text(
        json.dumps(
            {
                "projects": [
                    {
                        "id": "p08-mixed",
                        "aliases": ["mixed"],
                        "description": "One root present, one missing",
                        "ingest_roots": [
                            {"source": "vault", "subpath": "incoming/projects/p08-mixed"},
                            {"source": "vault", "subpath": "incoming/projects/p08-mixed-missing"},
                        ],
                    }
                ]
            }
        ),
        encoding="utf-8",
    )

    def fake_ingest_folder(path, purge_deleted=True):
        # Stub ingestion: succeed for whichever root is actually passed in.
        return [{"file": str(path / "README.md"), "status": "ingested"}]

    monkeypatch.setenv("ATOCORE_VAULT_SOURCE_DIR", str(vault_dir))
    monkeypatch.setenv("ATOCORE_DRIVE_SOURCE_DIR", str(drive_dir))
    monkeypatch.setenv("ATOCORE_PROJECT_REGISTRY_PATH", str(registry_path))
    original_settings = config.settings
    try:
        config.settings = config.Settings()
        monkeypatch.setattr("atocore.projects.registry.ingest_folder", fake_ingest_folder)
        result = refresh_registered_project("mixed")
    finally:
        config.settings = original_settings

    assert result["status"] == "partial"
    assert result["roots_ingested"] == 1
    assert result["roots_skipped"] == 1
    statuses = sorted(root["status"] for root in result["roots"])
    assert statuses == ["ingested", "missing"]
def test_project_registry_template_has_expected_shape():