71 lines
2.5 KiB
Python
71 lines
2.5 KiB
Python
"""Tests for configured source registration and readiness."""
|
|
|
|
import atocore.config as config
|
|
from atocore.ingestion.pipeline import get_source_status, ingest_configured_sources
|
|
|
|
|
|
def test_get_source_status_reports_read_only_inputs(tmp_path, monkeypatch):
    """Check the status entries reported for a missing vault and a disabled drive.

    Both source directories are pointed at paths under ``tmp_path`` that this
    test never creates, and the drive source is switched off through
    ``ATOCORE_SOURCE_DRIVE_ENABLED``.
    """
    monkeypatch.setenv("ATOCORE_VAULT_SOURCE_DIR", str(tmp_path / "vault"))
    monkeypatch.setenv("ATOCORE_DRIVE_SOURCE_DIR", str(tmp_path / "drive"))
    monkeypatch.setenv("ATOCORE_SOURCE_DRIVE_ENABLED", "false")

    # Build the Settings object only after the environment is patched.
    # monkeypatch restores the original module attribute at teardown, so no
    # manual save / try / finally bookkeeping is needed.
    monkeypatch.setattr(config, "settings", config.Settings())

    status = get_source_status()

    vault_entry = status[0]
    assert vault_entry["name"] == "vault"
    assert vault_entry["enabled"] is True
    assert vault_entry["read_only"] is True
    # The vault directory was never created under tmp_path.
    assert vault_entry["exists"] is False

    drive_entry = status[1]
    assert drive_entry["name"] == "drive"
    assert drive_entry["enabled"] is False
|
|
|
|
|
|
def test_ingest_configured_sources_reports_missing_and_disabled(tmp_path, monkeypatch):
    """Check the per-source result for a missing vault and a disabled drive.

    Neither source directory is created under ``tmp_path``, and the drive
    source is switched off through ``ATOCORE_SOURCE_DRIVE_ENABLED``. The vault
    is expected to be reported as ``"missing"`` and the drive as ``"disabled"``.
    """
    monkeypatch.setenv("ATOCORE_VAULT_SOURCE_DIR", str(tmp_path / "vault"))
    monkeypatch.setenv("ATOCORE_DRIVE_SOURCE_DIR", str(tmp_path / "drive"))
    monkeypatch.setenv("ATOCORE_SOURCE_DRIVE_ENABLED", "false")

    # Build the Settings object only after the environment is patched.
    # monkeypatch restores the original module attribute at teardown, so no
    # manual save / try / finally bookkeeping is needed.
    monkeypatch.setattr(config, "settings", config.Settings())

    results = ingest_configured_sources()

    vault_result = results[0]
    assert vault_result["source"] == "vault"
    assert vault_result["status"] == "missing"

    drive_result = results[1]
    assert drive_result["source"] == "drive"
    assert drive_result["status"] == "disabled"
|
|
|
|
|
|
def test_ingest_configured_sources_uses_ingest_folder(tmp_path, monkeypatch):
    """Check that each existing source is ingested with ``purge_deleted=False``.

    Both source directories are created under ``tmp_path`` and
    ``atocore.ingestion.pipeline.ingest_folder`` is replaced with a recording
    stub, so the test observes how it is called without ingesting anything.
    """
    vault_dir = tmp_path / "vault"
    drive_dir = tmp_path / "drive"
    vault_dir.mkdir()
    drive_dir.mkdir()
    monkeypatch.setenv("ATOCORE_VAULT_SOURCE_DIR", str(vault_dir))
    monkeypatch.setenv("ATOCORE_DRIVE_SOURCE_DIR", str(drive_dir))
    # Pin the drive flag explicitly: the assertions below expect both sources
    # to be ingested, and an ambient "false" value would disable the drive.
    monkeypatch.setenv("ATOCORE_SOURCE_DRIVE_ENABLED", "true")

    calls = []

    def fake_ingest_folder(path, purge_deleted=True):
        # Record the arguments and return a single canned per-file result.
        calls.append((str(path), purge_deleted))
        return [{"file": str(path / "note.md"), "status": "ingested"}]

    # Build the Settings object only after the environment is patched.
    # monkeypatch restores both patched attributes at teardown, so no manual
    # save / try / finally bookkeeping is needed.
    monkeypatch.setattr(config, "settings", config.Settings())
    monkeypatch.setattr("atocore.ingestion.pipeline.ingest_folder", fake_ingest_folder)

    results = ingest_configured_sources()

    # One call per configured source, each with purging explicitly disabled
    # (the stub's own default is True, so False must have been passed).
    assert len(calls) == 2
    assert all(purge_deleted is False for _, purge_deleted in calls)
    assert results[0]["status"] == "ingested"
    assert "results" in results[0]
|