diff --git a/deploy/dalidou/cron-backup.sh b/deploy/dalidou/cron-backup.sh
index 113bd75..7146d53 100755
--- a/deploy/dalidou/cron-backup.sh
+++ b/deploy/dalidou/cron-backup.sh
@@ -82,6 +82,22 @@ else
     log "Step 3: ATOCORE_BACKUP_RSYNC not set, skipping off-host copy"
 fi
 
+# Step 3a: Pull OpenClaw state from clawdbot (one-way import of
+# SOUL.md, USER.md, MODEL-ROUTING.md, MEMORY.md, recent memory/*.md).
+# Loose coupling: OpenClaw's internals don't need to change.
+# Fail-open: importer failure never blocks the pipeline.
+log "Step 3a: pull OpenClaw state"
+OPENCLAW_IMPORT="${ATOCORE_OPENCLAW_IMPORT:-true}"
+if [[ "$OPENCLAW_IMPORT" == "true" ]]; then
+    python3 "$SCRIPT_DIR/../../scripts/import_openclaw_state.py" \
+        --base-url "$ATOCORE_URL" \
+        2>&1 | while IFS= read -r line; do log " $line"; done || {
+        log " WARN: OpenClaw import failed (non-blocking)"
+    }
+else
+    log " skipped (ATOCORE_OPENCLAW_IMPORT != true)"
+fi
+
 # Step 3b: Auto-refresh vault sources so new PKM files flow in
 # automatically. Fail-open: never blocks the rest of the pipeline.
 log "Step 3b: auto-refresh vault sources"
diff --git a/scripts/import_openclaw_state.py b/scripts/import_openclaw_state.py
new file mode 100644
index 0000000..34fcdb1
--- /dev/null
+++ b/scripts/import_openclaw_state.py
@@ -0,0 +1,254 @@
+"""OpenClaw state importer — one-way pull from clawdbot into AtoCore.
+
+Reads OpenClaw's file continuity layer (SOUL.md, USER.md, MODEL-ROUTING.md,
+MEMORY.md, memory/YYYY-MM-DD.md) from the T420 via SSH and imports them
+into AtoCore as candidate memories. Hash-based delta detection — only
+re-imports files that changed since the last run.
+
+Classification per codex's integration proposal:
+- SOUL.md -> identity candidates
+- USER.md -> identity + preference candidates
+- MODEL-ROUTING.md -> adaptation candidates (routing rules)
+- MEMORY.md -> long-term memory candidates (type varies)
+- memory/YYYY-MM-DD.md -> episodic memory candidates (daily logs)
+- heartbeat-state.json -> skipped (ops metadata only)
+
+All candidates land as status=candidate. Auto-triage filters noise.
+This importer is conservative: it doesn't promote directly, it just
+feeds signal. The triage pipeline decides what graduates to active.
+
+Usage:
+    python3 scripts/import_openclaw_state.py \
+        --base-url http://localhost:8100 \
+        --openclaw-host papa@192.168.86.39 \
+        --openclaw-path /home/papa/openclaw-workspace
+
+Runs nightly via cron (added as Step 3a in cron-backup.sh).
+"""
+
+from __future__ import annotations
+
+import argparse
+import hashlib
+import json
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import urllib.error
+import urllib.request
+from pathlib import Path
+
+DEFAULT_BASE_URL = os.environ.get("ATOCORE_BASE_URL", "http://localhost:8100")
+DEFAULT_OPENCLAW_HOST = os.environ.get("ATOCORE_OPENCLAW_HOST", "papa@192.168.86.39")
+DEFAULT_OPENCLAW_PATH = os.environ.get("ATOCORE_OPENCLAW_PATH", "/home/papa/openclaw-workspace")
+
+# Files to pull and how to classify them
+DURABLE_FILES = [
+    ("SOUL.md", "identity"),
+    ("USER.md", "identity"),
+    ("MODEL-ROUTING.md", "adaptation"),
+    ("MEMORY.md", "memory"),  # type parsed from entries
+]
+DAILY_MEMORY_GLOB = "memory/*.md"
+HASH_STATE_KEY = "openclaw_import_hashes"
+
+
+def api_get(base_url, path):
+    try:
+        with urllib.request.urlopen(f"{base_url}{path}", timeout=15) as r:
+            return json.loads(r.read())
+    except Exception:
+        return None
+
+
+def api_post(base_url, path, body):
+    data = json.dumps(body).encode("utf-8")
+    req = urllib.request.Request(
+        f"{base_url}{path}", method="POST",
+        headers={"Content-Type": "application/json"},
+        data=data,
+    )
+    try:
+        with urllib.request.urlopen(req, timeout=15) as r:
+            return json.loads(r.read())
+    except urllib.error.HTTPError as exc:
+        if exc.code == 400:
+            return {"skipped": True}
+        raise
+
+
+def ssh_cat(host, remote_path):
+    """Cat a remote file via SSH. Returns content or None if missing."""
+    try:
+        result = subprocess.run(
+            ["ssh", "-o", "ConnectTimeout=5", "-o", "BatchMode=yes",
+             host, f"cat {remote_path}"],
+            capture_output=True, text=True, timeout=30,
+            encoding="utf-8", errors="replace",
+        )
+        if result.returncode == 0:
+            return result.stdout
+    except Exception:
+        pass
+    return None
+
+
+def ssh_ls(host, remote_glob):
+    """List files matching a glob on the remote host."""
+    try:
+        result = subprocess.run(
+            ["ssh", "-o", "ConnectTimeout=5", "-o", "BatchMode=yes",
+             host, f"ls -1 {remote_glob} 2>/dev/null"],
+            capture_output=True, text=True, timeout=10,
+            encoding="utf-8", errors="replace",
+        )
+        if result.returncode == 0:
+            return [line.strip() for line in result.stdout.splitlines() if line.strip()]
+    except Exception:
+        pass
+    return []
+
+
+def content_hash(text):
+    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
+
+
+def load_hash_state(base_url):
+    """Load the hash state from project_state so we know what's changed."""
+    state = api_get(base_url, "/project/state/atocore?category=status")
+    if not state:
+        return {}
+    for entry in state.get("entries", []):
+        if entry.get("key") == HASH_STATE_KEY:
+            try:
+                return json.loads(entry["value"])
+            except Exception:
+                return {}
+    return {}
+
+
+def save_hash_state(base_url, hashes):
+    api_post(base_url, "/project/state", {
+        "project": "atocore",
+        "category": "status",
+        "key": HASH_STATE_KEY,
+        "value": json.dumps(hashes),
+        "source": "import_openclaw_state.py",
+    })
+
+
+def import_file_as_memory(base_url, filename, content, memory_type, source_tag):
+    """Import a file's content as a single candidate memory for triage."""
+    # Trim to reasonable size — auto-triage can handle long content but
+    # we don't want single mega-memories dominating the queue
+    trimmed = content[:2000]
+    if len(content) > 2000:
+        trimmed += f"\n\n[...truncated from {len(content)} chars]"
+
+    body = {
+        "memory_type": memory_type,
+        "content": f"From OpenClaw/{filename}: {trimmed}",
+        "project": "",  # global/identity, not project-scoped
+        "confidence": 0.5,
+        "status": "candidate",
+    }
+    return api_post(base_url, "/memory", body)
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
+    parser.add_argument("--openclaw-host", default=DEFAULT_OPENCLAW_HOST)
+    parser.add_argument("--openclaw-path", default=DEFAULT_OPENCLAW_PATH)
+    parser.add_argument("--dry-run", action="store_true")
+    args = parser.parse_args()
+
+    print(f"openclaw_host={args.openclaw_host} openclaw_path={args.openclaw_path}")
+    print(f"dry_run={args.dry_run}")
+
+    # Check SSH connectivity first
+    test = ssh_cat(args.openclaw_host, f"{args.openclaw_path}/SOUL.md")
+    if test is None:
+        print("ERROR: cannot reach OpenClaw workspace via SSH or SOUL.md not found")
+        print("Check: ssh key installed? path correct? workspace exists?")
+        return 1
+
+    hashes = load_hash_state(args.base_url)
+    imported = skipped = errors = 0
+
+    # 1. Durable files
+    for filename, mem_type in DURABLE_FILES:
+        remote = f"{args.openclaw_path}/{filename}"
+        content = ssh_cat(args.openclaw_host, remote)
+        if content is None or not content.strip():
+            print(f" - {filename}: not found or empty")
+            continue
+
+        h = content_hash(content)
+        if hashes.get(filename) == h:
+            print(f" = {filename}: unchanged (hash {h})")
+            skipped += 1
+            continue
+
+        print(f" + {filename}: changed (hash {h}, {len(content)}ch)")
+        if not args.dry_run:
+            try:
+                result = import_file_as_memory(
+                    args.base_url, filename, content, mem_type,
+                    source_tag="openclaw-durable",
+                )
+                if result.get("skipped"):
+                    print(f" (duplicate content, skipped)")
+                else:
+                    print(f" -> candidate {result.get('id', '?')[:8]}")
+                    imported += 1
+                hashes[filename] = h
+            except Exception as e:
+                print(f" ! error: {e}")
+                errors += 1
+
+    # 2. Daily memory logs (memory/YYYY-MM-DD.md)
+    daily_glob = f"{args.openclaw_path}/{DAILY_MEMORY_GLOB}"
+    daily_files = ssh_ls(args.openclaw_host, daily_glob)
+    print(f"\ndaily memory files: {len(daily_files)}")
+
+    # Only process the most recent 7 daily files to avoid flooding
+    for remote_path in sorted(daily_files)[-7:]:
+        filename = Path(remote_path).name
+        content = ssh_cat(args.openclaw_host, remote_path)
+        if content is None or not content.strip():
+            continue
+
+        h = content_hash(content)
+        key = f"daily/{filename}"
+        if hashes.get(key) == h:
+            print(f" = {filename}: unchanged")
+            skipped += 1
+            continue
+
+        print(f" + {filename}: changed ({len(content)}ch)")
+        if not args.dry_run:
+            try:
+                result = import_file_as_memory(
+                    args.base_url, filename, content, "episodic",
+                    source_tag="openclaw-daily",
+                )
+                if not result.get("skipped"):
+                    print(f" -> candidate {result.get('id', '?')[:8]}")
+                    imported += 1
+                hashes[key] = h
+            except Exception as e:
+                print(f" ! error: {e}")
+                errors += 1
+
+    # Save hash state
+    if not args.dry_run and imported > 0:
+        save_hash_state(args.base_url, hashes)
+
+    print(f"\nimported={imported} skipped={skipped} errors={errors}")
+    print("Candidates queued — auto-triage will filter them on next run.")
+
+
+if __name__ == "__main__":
+    raise SystemExit(main() or 0)