ATOCore/deploy/dalidou/batch-extract.sh
Anto01 02055e8db3 feat: Phase 6 — Living Taxonomy + Universal Capture
Closes two real-use gaps:
1. "APM tool" gap: work done outside Claude Code (desktop, web, phone,
   other machine) was invisible to AtoCore.
2. Project discovery gap: manual JSON-file edits required to promote
   an emerging theme to a first-class project.

B — atocore_remember MCP tool (scripts/atocore_mcp.py):
- New MCP tool for universal capture from any MCP-aware client
  (Claude Desktop, Code, Cursor, Zed, Windsurf, etc.)
- Accepts content (required) + memory_type/project/confidence/
  valid_until/domain_tags (all optional with sensible defaults)
- Creates a candidate memory, goes through the existing 3-tier triage
  (no bypass — the quality gate catches noise)
- Detailed tool description guides Claude on when to invoke: "remember
  this", "save that for later", "don't lose this fact"
- Total tools exposed by MCP server: 14 → 15

C.1 Emerging-concepts detector (scripts/detect_emerging.py):
- Nightly scan of active + candidate memories for:
  * Unregistered project names with ≥3 memory occurrences
  * Top 20 domain_tags by frequency (emerging categories)
  * Active memories with reference_count ≥ 5 + valid_until set
    (reinforced transients — candidates for extension)
- Writes findings to atocore/proposals/* project state entries
- Emits "warning" alert via Phase 4 framework the FIRST time a new
  project crosses the 5-memory alert threshold (avoids spam)
- Configurable via env vars: ATOCORE_EMERGING_PROJECT_MIN (default 3),
  ATOCORE_EMERGING_ALERT_THRESHOLD (default 5), TOP_TAGS_LIMIT (20)
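The unregistered-project rule above can be sketched in shell. This is a rough illustration and not the logic in detect_emerging.py: it assumes one project name per memory on stdin, names without spaces, and a hypothetical space-separated list of registered names passed as the first argument. Only ATOCORE_EMERGING_PROJECT_MIN and its default of 3 come from the commit message.

```shell
# Sketch only: the input format and the registered-names argument are
# assumptions for illustration, not AtoCore's actual schema.
emerging_projects() {
    # $1: space-separated registered project names (hypothetical input)
    # stdin: one project name per memory, one per line
    min="${ATOCORE_EMERGING_PROJECT_MIN:-3}"
    sort | uniq -c | awk -v min="$min" -v reg="$1" '
        BEGIN {
            n = split(reg, names, " ")
            for (i = 1; i <= n; i++) known[names[i]] = 1
        }
        # $1 is the occurrence count from uniq -c, $2 is the project name
        $1 >= min && !($2 in known) { print $2 " " $1 }
    '
}
```

Feeding it the names `atocore atocore atocore drone drone drone garden` with `atocore` registered would list only `drone`, since `garden` is below the minimum and `atocore` is already known.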

C.2 Registration surface (src/atocore/api/routes.py + wiki.py):
- POST /admin/projects/register-emerging — one-click register with
  sensible defaults (ingest_roots auto-filled with
  vault:incoming/projects/<id>/ convention). Clears the proposal
  from the dashboard list on success.
- Dashboard /admin/dashboard: new "proposals" section with
  unregistered_projects + emerging_categories + reinforced_transients.
- Wiki homepage: "📋 Emerging" section rendering each unregistered
  project as a card with count + 2 sample memory previews + inline
  "📌 Register as project" button that calls the endpoint via fetch,
  reloads the page on success.
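The ingest-root convention mentioned above is simple enough to show directly. The helper name below is hypothetical; only the `vault:incoming/projects/<id>/` pattern comes from the commit message, and the real defaulting happens server-side in the register-emerging endpoint.

```shell
# Hypothetical helper illustrating the default ingest root convention.
default_ingest_root() {
    # $1: project id
    printf 'vault:incoming/projects/%s/\n' "$1"
}
```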

C.3 Transient-to-durable extension
(src/atocore/memory/service.py + API + cron):
- New extend_reinforced_valid_until() function — scans active memories
  with valid_until in the next 30 days and reference_count ≥ 5.
  Extends expiry by 90 days. If reference_count ≥ 10, clears expiry
  entirely (makes permanent). Writes audit rows via the Phase 4
  memory_audit framework with actor="transient-to-durable".
- POST /admin/memory/extend-reinforced — API wrapper for cron.
- Matches the user's intuition: "something transient becomes important
  if you keep coming back to it".
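The extension rule can be read as a small decision table. The sketch below encodes only that decision, with hypothetical inputs (a reference count and the number of days until valid_until); the real extend_reinforced_valid_until() works on stored memories and writes audit rows. Treating an already-expired memory as out of scope is an assumption based on the phrase "in the next 30 days".

```shell
# Sketch of the decision rule only, under the assumptions stated above.
extension_action() {
    # $1: reference_count, $2: days until valid_until (hypothetical inputs)
    refs="$1"
    days_left="$2"
    if [ "$refs" -lt 5 ] || [ "$days_left" -lt 0 ] || [ "$days_left" -gt 30 ]; then
        echo "skip"          # too few references, or not expiring within 30 days
    elif [ "$refs" -ge 10 ]; then
        echo "permanent"     # clear the expiry entirely
    else
        echo "extend+90d"    # push valid_until out by 90 days
    fi
}
```

For instance, `extension_action 5 10` falls in the extend branch, while `extension_action 12 10` falls in the permanent branch.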

Nightly cron (deploy/dalidou/batch-extract.sh):
- Step F2: detect_emerging.py (after F pipeline summary)
- Step F3: /admin/memory/extend-reinforced (before integrity check)
- Both fail-open; errors don't break the pipeline.
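"Fail-open" here means a failing step logs a warning and the pipeline carries on. The script inlines this pattern per step with `|| { log ...; }`; the wrapper below is a hypothetical helper showing the same idea in one place, and is not part of batch-extract.sh.

```shell
# Hypothetical helper: run a command, warn on failure, never fail the caller.
run_fail_open() {
    # $1: step label; remaining arguments: the command to run
    label="$1"
    shift
    if ! "$@"; then
        echo "WARN: $label failed (non-blocking)" >&2
    fi
    return 0
}
```

Because the command runs inside an `if` condition, a non-zero exit does not trip `set -e` in the calling script.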

Tests: 366 → 374 (+8 for Phase 6):
- 6 tests for extend_reinforced_valid_until covering:
  extension path, permanent path, skip far-future, skip low-refs,
  skip permanent memories, audit row write
- 2 smoke tests for the detector (imports cleanly, handles empty DB)
- MCP tool changes don't need new tests — the wrapper is pure passthrough

Design decisions documented in plan file:
- atocore_remember deliberately doesn't bypass triage (quality gate)
- Detector is passive (surfaces proposals) not active (auto-registers)
- Sensible ingest-root defaults ("vault:incoming/projects/<id>/")
  so registration is one-click with no file-path thinking
- Extension adds 90 days rather than clearing expiry outright (gradual
  permanence earned through sustained reinforcement); expiry is cleared
  only once reference_count reaches 10

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-18 08:08:55 -04:00

231 lines
7.2 KiB
Bash

#!/usr/bin/env bash
#
# deploy/dalidou/batch-extract.sh
# --------------------------------
# Host-side LLM batch extraction for Dalidou.
#
# The claude CLI is available on the Dalidou HOST but NOT inside the
# Docker container. This script runs on the host, fetches recent
# interactions from the AtoCore API, runs the LLM extractor locally
# (claude -p sonnet), and posts candidates back to the API.
#
# Intended to be called from cron-backup.sh after backup/cleanup/rsync,
# or manually via:
#
#   bash /srv/storage/atocore/app/deploy/dalidou/batch-extract.sh
#
# Environment variables:
#   ATOCORE_URL            default http://127.0.0.1:8100
#   ATOCORE_EXTRACT_LIMIT  default 50
set -euo pipefail
ATOCORE_URL="${ATOCORE_URL:-http://127.0.0.1:8100}"
LIMIT="${ATOCORE_EXTRACT_LIMIT:-50}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
APP_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)"
TIMESTAMP="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
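# Stamp each log line with the current time so step durations are visible;
# TIMESTAMP above remains the single run identifier persisted in Step F.
log() { printf '[%s] %s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$*"; }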
# The Python script needs the atocore source on PYTHONPATH
export PYTHONPATH="$APP_DIR/src:${PYTHONPATH:-}"
log "=== AtoCore batch extraction + triage starting ==="
log "URL=$ATOCORE_URL LIMIT=$LIMIT"
# --- Pipeline stats accumulator ---
EXTRACT_OUT=""
TRIAGE_OUT=""
HARNESS_OUT=""
# Step A: Extract candidates from recent interactions
log "Step A: LLM extraction"
EXTRACT_OUT=$(python3 "$APP_DIR/scripts/batch_llm_extract_live.py" \
    --base-url "$ATOCORE_URL" \
    --limit "$LIMIT" \
    2>&1) || {
    log "WARN: batch extraction failed (non-blocking)"
}
echo "$EXTRACT_OUT"
# Step B: Auto-triage candidates in the queue
log "Step B: auto-triage"
TRIAGE_OUT=$(python3 "$APP_DIR/scripts/auto_triage.py" \
    --base-url "$ATOCORE_URL" \
    2>&1) || {
    log "WARN: auto-triage failed (non-blocking)"
}
echo "$TRIAGE_OUT"
# Step B2: Auto-promote reinforced candidates + expire stale ones
log "Step B2: auto-promote + expire"
python3 "$APP_DIR/scripts/auto_promote_reinforced.py" \
    2>&1 || {
    log "WARN: auto-promote/expire failed (non-blocking)"
}
# Step C: Daily project synthesis (keeps wiki/mirror pages fresh)
log "Step C: project synthesis (daily)"
python3 "$APP_DIR/scripts/synthesize_projects.py" \
    --base-url "$ATOCORE_URL" \
    2>&1 || {
    log "WARN: synthesis failed (non-blocking)"
}
# Step D: Weekly lint pass (Sundays only: heavier, not needed daily)
if [[ "$(date -u +%u)" == "7" ]]; then
    log "Step D: weekly lint pass"
    python3 "$APP_DIR/scripts/lint_knowledge_base.py" \
        --base-url "$ATOCORE_URL" \
        2>&1 || true
fi
# Step E: Retrieval harness (daily)
log "Step E: retrieval harness"
HARNESS_OUT=$(python3 "$APP_DIR/scripts/retrieval_eval.py" \
    --json \
    --base-url "$ATOCORE_URL" \
    2>&1) || {
    log "WARN: retrieval harness failed (non-blocking)"
}
echo "$HARNESS_OUT"
# Step F: Persist pipeline summary to project state
log "Step F: pipeline summary"
HARNESS_OUT="$HARNESS_OUT" TRIAGE_OUT="$TRIAGE_OUT" python3 -c "
import json, os, re, sys, urllib.request

base = '$ATOCORE_URL'
ts = '$TIMESTAMP'

def post_state(key, value):
    body = json.dumps({
        'project': 'atocore', 'category': 'status',
        'key': key, 'value': value, 'source': 'nightly pipeline',
    }).encode()
    req = urllib.request.Request(
        f'{base}/project/state', data=body,
        headers={'Content-Type': 'application/json'}, method='POST',
    )
    try:
        urllib.request.urlopen(req, timeout=10)
    except Exception as e:
        print(f'WARN: failed to persist {key}: {e}', file=sys.stderr)

# Parse harness JSON. The captured output is read from the environment
# instead of being spliced into this source, so quotes or backslashes
# in it cannot break the Python syntax.
harness = {}
try:
    harness = json.loads(os.environ.get('HARNESS_OUT', ''))
    post_state('retrieval_harness_result', json.dumps({
        'passed': harness.get('passed', 0),
        'total': harness.get('total', 0),
        'failures': [f['name'] for f in harness.get('fixtures', []) if not f.get('ok')],
        'run_at': ts,
    }))
    p, t = harness.get('passed', '?'), harness.get('total', '?')
    print(f'Harness: {p}/{t}')
except Exception:
    print('WARN: could not parse harness output')

# Parse triage counts from stdout
triage_out = os.environ.get('TRIAGE_OUT', '')
promoted = len(re.findall(r'promoted', triage_out, re.IGNORECASE))
rejected = len(re.findall(r'rejected', triage_out, re.IGNORECASE))
needs_human = len(re.findall(r'needs.human', triage_out, re.IGNORECASE))

# Build summary
summary = {
    'run_at': ts,
    'harness_passed': harness.get('passed', -1),
    'harness_total': harness.get('total', -1),
    'triage_promoted': promoted,
    'triage_rejected': rejected,
    'triage_needs_human': needs_human,
}
post_state('pipeline_last_run', ts)
post_state('pipeline_summary', json.dumps(summary))
print(f'Pipeline summary persisted: {json.dumps(summary)}')
" 2>&1 || {
    log "WARN: pipeline summary persistence failed (non-blocking)"
}
# Step F2: Emerging-concepts detector (Phase 6 C.1)
log "Step F2: emerging-concepts detector"
python3 "$APP_DIR/scripts/detect_emerging.py" \
    --base-url "$ATOCORE_URL" \
    2>&1 || {
    log "WARN: emerging detector failed (non-blocking)"
}
# Step F3: Transient-to-durable extension (Phase 6 C.3)
log "Step F3: transient-to-durable extension"
curl -sSf -X POST "$ATOCORE_URL/admin/memory/extend-reinforced" \
    -H 'Content-Type: application/json' \
    2>&1 | tail -5 || {
    log "WARN: extend-reinforced failed (non-blocking)"
}
# Step G: Integrity check (Phase 4 V1)
log "Step G: integrity check"
python3 "$APP_DIR/scripts/integrity_check.py" \
    --base-url "$ATOCORE_URL" \
    2>&1 || {
    log "WARN: integrity check failed (non-blocking)"
}
# Step H: Pipeline-level alerts — detect conditions that warrant attention
log "Step H: pipeline alerts"
python3 -c "
import json, os, sys, urllib.request
sys.path.insert(0, '$APP_DIR/src')
from atocore.observability.alerts import emit_alert

base = '$ATOCORE_URL'

def get_state(project='atocore'):
    try:
        req = urllib.request.Request(f'{base}/project/state/{project}')
        resp = urllib.request.urlopen(req, timeout=10)
        return json.loads(resp.read()).get('entries', [])
    except Exception:
        return []

def get_dashboard():
    try:
        req = urllib.request.Request(f'{base}/admin/dashboard')
        resp = urllib.request.urlopen(req, timeout=10)
        return json.loads(resp.read())
    except Exception:
        return {}

state = {(e['category'], e['key']): e['value'] for e in get_state()}
dash = get_dashboard()

# Harness regression check
harness_raw = state.get(('status', 'retrieval_harness_result'))
if harness_raw:
    try:
        h = json.loads(harness_raw)
        passed, total = h.get('passed', 0), h.get('total', 0)
        if total > 0:
            rate = passed / total
            if rate < 0.85:
                emit_alert('warning', 'Retrieval harness below 85%',
                           f'Only {passed}/{total} fixtures passing ({rate:.0%}). Failures: {h.get(\"failures\", [])[:5]}',
                           context={'pass_rate': rate})
    except Exception:
        pass

# Candidate queue pileup
candidates = dash.get('memories', {}).get('candidates', 0)
if candidates > 200:
    emit_alert('warning', 'Candidate queue not draining',
               f'{candidates} candidates pending. Auto-triage may be stuck or rate-limited.',
               context={'candidates': candidates})

print('pipeline alerts check complete')
" 2>&1 || true
log "=== AtoCore batch extraction + triage complete ==="
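
The 85% harness threshold in Step H can also be checked with integer arithmetic alone, which may be handy for a quick manual check from the shell. The function below is a hypothetical sketch and not part of the script: it compares passed * 100 against total * 85, which is equivalent to passed / total < 0.85 for positive totals.

```shell
# Hypothetical sketch: integer-only version of the Step H pass-rate check.
harness_below_threshold() {
    # $1: passed fixtures, $2: total fixtures
    # Exit status 0 means the pass rate is below 85%; a zero total never alerts.
    [ "$2" -gt 0 ] && [ $(( $1 * 100 )) -lt $(( $2 * 85 )) ]
}
```

With 16 of 20 fixtures passing (80%) the function succeeds, while 17 of 20 (exactly 85%) does not, matching the strict less-than comparison in Step H.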