Three additive upgrades borrowed from Karpathy's LLM Wiki pattern: 1. CONTRADICTION DETECTION: auto-triage now has a fourth verdict — "contradicts". When a candidate conflicts with an existing memory (not duplicates, genuine disagreement like "Option A selected" vs "Option B selected"), the triage model flags it and leaves it in the queue for human review instead of silently rejecting or double-storing. Preserves source tension rather than suppressing it. 2. WEEKLY LINT PASS: scripts/lint_knowledge_base.py checks for: - Orphan memories (active but zero references after 14 days) - Stale candidates (>7 days unreviewed) - Unused entities (no relationships) - Empty-state projects - Unregistered projects auto-detected in memories Runs on Sundays via cron. Outputs a report. 3. WEEKLY SYNTHESIS: scripts/synthesize_projects.py uses Sonnet to generate a 3-5 sentence "current state" paragraph per project from state + memories + entities. Cached in project_state under status/synthesis_cache. Wiki project pages now show this at the top under "Current State (auto-synthesis)". Falls back to a deterministic summary if no cache exists. deploy/dalidou/batch-extract.sh: added Step C (synthesis) and Step D (lint), gated to Sundays via a date check. All additive — nothing existing changes behavior. The database remains the source of truth; these operations just produce better synthesized views and catch rot. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
171 lines
6.0 KiB
Python
"""Weekly lint pass — health check for the AtoCore knowledge base.
|
|
|
|
Inspired by Karpathy's LLM Wiki pattern (the 'lint' operation).
|
|
Checks for orphans, stale claims, contradictions, and gaps.
|
|
Outputs a report that can be posted to the wiki as needs_review.
|
|
|
|
Usage:
|
|
python3 scripts/lint_knowledge_base.py --base-url http://dalidou:8100
|
|
|
|
Run weekly via cron, or on-demand when the knowledge base feels stale.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import urllib.request
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
# Base URL of the AtoCore API; overridable via the ATOCORE_BASE_URL env var
# or the --base-url CLI flag.
DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")

# An active memory with zero references and no update for this many days is
# flagged as an orphan.
ORPHAN_AGE_DAYS = 14
|
|
|
|
|
|
def api_get(base_url: str, path: str, timeout: float = 15):
    """GET ``{base_url}{path}`` and return the decoded JSON body.

    Args:
        base_url: API root, e.g. ``http://dalidou:8100`` (no trailing slash).
        path: Absolute path including any query string, e.g. ``/memory?limit=500``.
        timeout: Socket timeout in seconds; defaults to the previous
            hard-coded 15s but is now tunable by callers.

    Raises:
        urllib.error.URLError / HTTPError on connection or HTTP failure,
        json.JSONDecodeError on a non-JSON body. Callers that want
        best-effort behavior wrap this themselves.
    """
    with urllib.request.urlopen(f"{base_url}{path}", timeout=timeout) as resp:
        return json.loads(resp.read())
|
|
|
|
|
|
def parse_ts(ts: str) -> datetime | None:
|
|
if not ts:
|
|
return None
|
|
try:
|
|
return datetime.strptime(ts[:19], "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _print_report(findings: dict) -> None:
    """Print the human-readable lint report, one section per check."""
    print(f"## Orphan memories (active, no reinforcement, >{ORPHAN_AGE_DAYS} days old)")
    if findings["orphan_memories"]:
        print(f" Found: {len(findings['orphan_memories'])}")
        for o in findings["orphan_memories"][:10]:
            print(f" - [{o['type']}] {o['project']} ({o['age_days']}d): {o['content']}")
    else:
        print(" (none)")

    print("\n## Stale candidates (>7 days in queue)")
    if findings["stale_candidates"]:
        print(f" Found: {len(findings['stale_candidates'])}")
        for s in findings["stale_candidates"][:10]:
            print(f" - ({s['age_days']}d): {s['content']}")
    else:
        print(" (none)")

    print("\n## Unused entities (no relationships)")
    if findings["unused_entities"]:
        print(f" Found: {len(findings['unused_entities'])}")
        for u in findings["unused_entities"][:10]:
            print(f" - [{u['type']}] {u['project']}: {u['name']}")
    else:
        print(" (none)")

    print("\n## Empty-state projects")
    if findings["empty_state_projects"]:
        print(f" Found: {len(findings['empty_state_projects'])}")
        for p in findings["empty_state_projects"]:
            print(f" - {p}")
    else:
        print(" (none)")

    print("\n## Unregistered projects detected in memories")
    if findings["unregistered_projects"]:
        print(f" Found: {len(findings['unregistered_projects'])}")
        print(" These were auto-detected by extraction — consider registering them:")
        for p in findings["unregistered_projects"]:
            print(f" - {p}")
    else:
        print(" (none)")


def main():
    """Run all lint checks against the AtoCore API and print a report.

    Checks, in order:
      1. Orphan memories   — active, zero references, untouched for ORPHAN_AGE_DAYS.
      2. Stale candidates  — in the triage queue for more than 7 days.
      3. Unused entities   — no relationships in either direction.
      4. Empty-state projects — registered but with no state entries.
      5. Unregistered projects — project tags in memories with no registration.

    Returns:
        0 when the knowledge base is clean, 1 when any findings exist
        (suitable as a cron/CI exit code).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    args = parser.parse_args()
    b = args.base_url
    now = datetime.now(timezone.utc)
    orphan_threshold = now - timedelta(days=ORPHAN_AGE_DAYS)

    print(f"=== AtoCore Lint — {now.strftime('%Y-%m-%d %H:%M UTC')} ===\n")

    findings = {
        "orphan_memories": [],
        "stale_candidates": [],
        "unused_entities": [],
        "empty_state_projects": [],
        "unregistered_projects": [],
    }

    # 1. Orphan memories: active but never reinforced after N days
    memories = api_get(b, "/memory?active_only=true&limit=500").get("memories", [])
    for m in memories:
        updated = parse_ts(m.get("updated_at", ""))
        if m.get("reference_count", 0) == 0 and updated and updated < orphan_threshold:
            findings["orphan_memories"].append({
                "id": m["id"],
                "type": m["memory_type"],
                "project": m.get("project") or "(none)",
                "age_days": (now - updated).days,
                "content": m["content"][:120],
            })

    # 2. Stale candidates: been in queue > 7 days without triage
    candidates = api_get(b, "/memory?status=candidate&limit=500").get("memories", [])
    stale_threshold = now - timedelta(days=7)
    for c in candidates:
        updated = parse_ts(c.get("updated_at", ""))
        if updated and updated < stale_threshold:
            findings["stale_candidates"].append({
                "id": c["id"],
                "age_days": (now - updated).days,
                "content": c["content"][:120],
            })

    # 3. Unused entities: no relationships in either direction.
    # Best-effort per entity: a failed detail fetch (or unexpected shape)
    # skips that entity rather than aborting the whole lint pass.
    entities = api_get(b, "/entities?limit=500").get("entities", [])
    for e in entities:
        try:
            detail = api_get(b, f"/entities/{e['id']}")
            if not detail.get("relationships"):
                findings["unused_entities"].append({
                    "id": e["id"],
                    "type": e["entity_type"],
                    "name": e["name"],
                    "project": e.get("project") or "(none)",
                })
        except Exception:
            continue

    # 4. Registered projects with no state entries.
    # BUG FIX: `projects` was previously assigned only inside this try block,
    # so a failed /projects call made step 5 below crash with NameError
    # instead of degrading gracefully. Default to an empty list first.
    projects = []
    try:
        projects = api_get(b, "/projects").get("projects", [])
        for p in projects:
            state = api_get(b, f"/project/state/{p['id']}").get("entries", [])
            if not state:
                findings["empty_state_projects"].append(p["id"])
    except Exception:
        pass

    # 5. Memories tagged to unregistered projects (auto-detection candidates).
    # A project counts as registered if it matches an id or any alias.
    registered_ids = {p["id"] for p in projects} | {
        a for p in projects for a in p.get("aliases", [])
    }
    all_mems = api_get(b, "/memory?limit=500").get("memories", [])
    for m in all_mems:
        proj = m.get("project", "")
        if proj and proj not in registered_ids and proj != "(none)":
            if proj not in findings["unregistered_projects"]:
                findings["unregistered_projects"].append(proj)

    _print_report(findings)

    # Every findings value is a list, so a plain len() sum suffices.
    total_findings = sum(len(v) for v in findings.values())
    print(f"\n=== Total findings: {total_findings} ===")

    # Return exit code based on findings count (for CI)
    return 0 if total_findings == 0 else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|