- Reinforcement matcher now handles paragraph-length memories via a
dual-mode threshold: short memories keep the 70% overlap rule,
long memories (>15 stems) require 12 absolute overlaps AND 35%
fraction so organic paraphrase can still reinforce. Diagnosis:
every active memory stayed at reference_count=0 because 40-token
project summaries never hit 70% overlap on real responses.
- scripts/atocore_client.py gains batch-extract (fan out
/interactions/{id}/extract over recent interactions) and triage
(interactive promote/reject walker for the candidate queue),
matching the Phase 9 reflection-loop review flow without pulling
extraction into the capture hot path.
- deploy/dalidou/cron-backup.sh adds an optional off-host rsync step
gated on ATOCORE_BACKUP_RSYNC, fail-open when the target is offline
so a laptop being off at 03:00 UTC never reds the local backup.
- docs/next-steps.md records the retrieval-quality sweep: project
state surfaces, chunks are on-topic but broad, active memories
never reach the pack (reflection loop has no retrieval outlet yet).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
631 lines
24 KiB
Python
631 lines
24 KiB
Python
"""Operator-facing API client for live AtoCore instances.
|
|
|
|
This script is intentionally external to the app runtime. It is for admins
and operators who want a convenient way to inspect live project state,
refresh projects, audit retrieval quality, manage trusted project-state
entries, and drive the Phase 9 reflection loop (capture, extract, queue,
promote, reject).

Environment variables
---------------------

ATOCORE_BASE_URL
    Base URL of the AtoCore service (default: ``http://dalidou:8100``).

    When running ON the Dalidou host itself or INSIDE the Dalidou
    container, override this with loopback or the real IP::

        ATOCORE_BASE_URL=http://127.0.0.1:8100 \\
            python scripts/atocore_client.py health

    The default hostname "dalidou" is meant for cases where the
    caller is a remote machine (laptop, T420/OpenClaw, etc.) with
    "dalidou" in its /etc/hosts or resolvable via Tailscale. It does
    NOT reliably resolve on the host itself or inside the container,
    and when it fails the client returns
    ``{"status": "unavailable", "fail_open": true}`` — the right
    diagnosis when that happens is to set ATOCORE_BASE_URL explicitly
    to 127.0.0.1:8100 and retry.

ATOCORE_TIMEOUT_SECONDS
    Request timeout for most operations (default: 30).

ATOCORE_REFRESH_TIMEOUT_SECONDS
    Longer timeout for project refresh operations which can be slow
    (default: 1800).

ATOCORE_FAIL_OPEN
    When "true" (default), network errors return a small fail-open
    envelope instead of raising. Set to "false" for admin operations
    where you need the real error.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from typing import Any
|
|
|
|
|
|
# Runtime configuration, resolved once from the environment at import time.
# Trailing slashes are stripped from the base URL so request paths can be
# appended directly.
BASE_URL = os.getenv("ATOCORE_BASE_URL", "http://dalidou:8100").rstrip("/")
# Timeouts are whole seconds; a non-integer value raises ValueError on import.
TIMEOUT = int(os.getenv("ATOCORE_TIMEOUT_SECONDS", "30"))
REFRESH_TIMEOUT = int(os.getenv("ATOCORE_REFRESH_TIMEOUT_SECONDS", "1800"))
# Only the literal string "true" (any letter case) enables fail-open.
FAIL_OPEN = os.getenv("ATOCORE_FAIL_OPEN", "true").lower() == "true"
|
|
|
|
# Bumped when the subcommand surface or JSON output shapes meaningfully
# change. See docs/architecture/llm-client-integration.md for the
# semver rules. History:
#   0.1.0  initial stable-ops-only client
#   0.2.0  Phase 9 reflection loop added: capture, extract,
#          reinforce-interaction, list-interactions, get-interaction,
#          queue, promote, reject
# NOTE(review): the batch-extract and triage subcommands are registered by
# the parser but are not listed in this history -- confirm whether they
# warrant a version bump.
CLIENT_VERSION = "0.2.0"
|
|
|
|
|
|
def print_json(payload: Any) -> None:
    """Write *payload* to stdout as 2-space-indented, ASCII-only JSON."""
    rendered = json.dumps(payload, indent=2, ensure_ascii=True)
    print(rendered)
|
|
|
|
|
|
def fail_open_payload() -> dict[str, Any]:
    """Return the placeholder envelope reported when the service cannot be reached."""
    return dict(status="unavailable", source="atocore", fail_open=True)
|
|
|
|
|
|
def request(
    method: str,
    path: str,
    data: dict[str, Any] | None = None,
    timeout: int | None = None,
) -> Any:
    """Send one HTTP request to the AtoCore service and return the decoded JSON.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        path: Path (plus optional query string) appended to ``BASE_URL``.
        data: Optional JSON-serializable body. When given, it is sent UTF-8
            encoded with a ``Content-Type: application/json`` header.
        timeout: Per-call timeout in seconds; a falsy value selects ``TIMEOUT``.

    Returns:
        The parsed JSON response, or ``{}`` when the response body is empty
        or whitespace-only.

    Raises:
        SystemExit: With status 22 after an HTTP error response (the error
            body, if any, is printed to stdout first); with status 0 after a
            connection-level failure when ``FAIL_OPEN`` is set (the fail-open
            envelope is printed first).
        OSError: The original connection-level error when ``FAIL_OPEN`` is
            not set.
    """
    if data is None:
        encoded = None
        headers: dict[str, str] = {}
    else:
        headers = {"Content-Type": "application/json"}
        encoded = json.dumps(data).encode("utf-8")

    outgoing = urllib.request.Request(
        f"{BASE_URL}{path}",
        data=encoded,
        headers=headers,
        method=method,
    )
    effective_timeout = timeout or TIMEOUT

    try:
        with urllib.request.urlopen(outgoing, timeout=effective_timeout) as response:
            raw = response.read().decode("utf-8")
    except urllib.error.HTTPError as exc:
        # The server answered, but with an error status: surface its body
        # and stop with a distinctive exit code.
        error_text = exc.read().decode("utf-8")
        if error_text:
            print(error_text)
        raise SystemExit(22) from exc
    except (urllib.error.URLError, TimeoutError, OSError):
        # The server could not be reached at all.
        if not FAIL_OPEN:
            raise
        print_json(fail_open_payload())
        raise SystemExit(0)

    return json.loads(raw) if raw.strip() else {}
|
|
|
|
|
|
def parse_aliases(aliases_csv: str) -> list[str]:
    """Split a comma-separated alias string into trimmed, non-empty names."""
    trimmed = (piece.strip() for piece in aliases_csv.split(","))
    return [name for name in trimmed if name]
|
|
|
|
|
|
def detect_project(prompt: str) -> dict[str, Any]:
    """Guess which registered project *prompt* refers to.

    Fetches ``/projects`` and tests every project id and alias against the
    lower-cased prompt. Among the names that occur in the prompt, the
    longest one wins; on equal length the first one encountered is kept.

    Returns:
        ``{"matched_project": <id or None>, "matched_alias": <name or None>}``.
    """
    listing = request("GET", "/projects")
    haystack = prompt.lower()

    winner_id = None
    winner_alias = None
    winner_length = -1

    for project in listing.get("projects", []):
        names = [project.get("id", "")]
        names.extend(project.get("aliases", []))
        for raw_name in names:
            name = (raw_name or "").strip()
            if not name:
                continue
            needle = name.lower()
            # Whole-token match: the name must not be glued to other
            # letters or digits on either side.
            boundary = rf"(?<![a-z0-9]){re.escape(needle)}(?![a-z0-9])"
            whole_token = re.search(boundary, haystack) is not None
            # NOTE(review): a whole-token hit always implies the plain
            # substring test below also passes, so the substring fallback
            # alone decides acceptance.
            if whole_token or needle in haystack:
                if len(name) > winner_length:
                    winner_id = project.get("id")
                    winner_alias = name
                    winner_length = len(name)

    return {"matched_project": winner_id, "matched_alias": winner_alias}
|
|
|
|
|
|
def classify_result(result: dict[str, Any]) -> dict[str, Any]:
    """Tag one retrieval hit with coarse document-kind labels.

    The hit's ``source_file``, ``heading_path`` and ``title`` are lower-cased
    and joined; each label is assigned when any of its keyword fragments
    occurs as a substring of that text. A hit that matches nothing is
    labelled ``"reference"``.

    Returns:
        A dict echoing ``score``, ``title``, ``heading_path`` and
        ``source_file`` from the hit, plus ``labels`` (in rule order) and
        ``is_noise_risk`` (true when labelled ``"archive_or_history"``).
    """
    # Rule order determines the order of the emitted labels.
    label_rules = (
        ("archive_or_history", ("_archive", "/archive", "archive/", "pre-cleanup", "pre-migration", "history")),
        ("current_status", ("status", "dashboard", "current-state", "current state", "next-steps", "next steps")),
        ("decision", ("decision", "adr", "tradeoff", "selected architecture", "selection")),
        ("requirements", ("requirement", "spec", "constraints", "baseline", "cdr", "sow")),
        ("execution_plan", ("roadmap", "milestone", "plan", "workflow", "calibration", "contract")),
    )

    searchable = " ".join(
        (result.get(field) or "").lower()
        for field in ("source_file", "heading_path", "title")
    )

    labels: list[str] = [
        label
        for label, fragments in label_rules
        if any(fragment in searchable for fragment in fragments)
    ]
    if not labels:
        labels.append("reference")

    summary: dict[str, Any] = {
        field: result.get(field)
        for field in ("score", "title", "heading_path", "source_file")
    }
    summary["labels"] = labels
    summary["is_noise_risk"] = "archive_or_history" in labels
    return summary
|
|
|
|
|
|
def audit_query(prompt: str, top_k: int, project: str | None) -> dict[str, Any]:
    """Run *prompt* through ``/query`` and summarize what kinds of hits surfaced.

    Each returned result is labelled with ``classify_result``; the labels are
    tallied and turned into a list of human-readable recommendations.

    Args:
        prompt: Query text. Prompts of two words or fewer are flagged broad.
        top_k: Number of results requested from the service.
        project: Optional project filter; an empty value is sent as ``None``.

    Returns:
        A dict with the inputs, the per-label hit counts, the classified
        results, and the recommendations.
    """
    query_body = {"prompt": prompt, "top_k": top_k, "project": project or None}
    response = request("POST", "/query", query_body)
    classified = [classify_result(hit) for hit in response.get("results", [])]

    def hits_for(label: str) -> int:
        """Count classified results carrying *label*."""
        return len([entry for entry in classified if label in entry["labels"]])

    is_broad = len(prompt.split()) <= 2
    noise = len([entry for entry in classified if entry["is_noise_risk"]])
    current = hits_for("current_status")
    decisions = hits_for("decision")
    requirements = hits_for("requirements")

    # (condition, advice) pairs, evaluated in the order they are reported.
    advice_rules = [
        (is_broad, "Prompt is broad; prefer a project-specific question with intent, artifact type, or constraint language."),
        (noise > 0, "Archive/history noise is present; prefer current-status, decision, requirements, and baseline docs in the next ingestion/ranking pass."),
        (current == 0, "No current-status docs surfaced in the top results; Wave 2 should ingest or strengthen trusted operational truth."),
        (decisions == 0, "No decision docs surfaced in the top results; add or freeze decision logs for the active project."),
        (requirements == 0, "No requirements/baseline docs surfaced in the top results; prioritize baseline and architecture-freeze material."),
    ]
    recommendations: list[str] = [advice for applies, advice in advice_rules if applies]
    if not recommendations:
        recommendations.append("Ranking looks healthy for this prompt.")

    return {
        "prompt": prompt,
        "project": project,
        "top_k": top_k,
        "broad_prompt": is_broad,
        "noise_hits": noise,
        "current_status_hits": current,
        "decision_hits": decisions,
        "requirements_hits": requirements,
        "results": classified,
        "recommendations": recommendations,
    }
|
|
|
|
|
|
def project_payload(
    project_id: str,
    aliases_csv: str,
    source: str,
    subpath: str,
    description: str,
    label: str,
) -> dict[str, Any]:
    """Assemble the request body used to propose or register a project.

    The comma-separated aliases are split with ``parse_aliases``; *source*,
    *subpath* and *label* form the single entry of ``ingest_roots``.
    """
    ingest_root = {"source": source, "subpath": subpath, "label": label}
    body: dict[str, Any] = {"project_id": project_id}
    body["aliases"] = parse_aliases(aliases_csv)
    body["description"] = description
    body["ingest_roots"] = [ingest_root]
    return body
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with one subparser per client command.

    All command arguments are positional. Optional ones use ``nargs="?"``
    with an explicit default, so trailing arguments may simply be omitted.
    The chosen subcommand is stored in ``args.command``.
    """

    def text(default: str = "") -> dict[str, Any]:
        """Keyword arguments for an optional string positional."""
        return {"nargs": "?", "default": default}

    def number(default: Any, kind: Any = int) -> dict[str, Any]:
        """Keyword arguments for an optional numeric positional."""
        return {"nargs": "?", "type": kind, "default": default}

    required: dict[str, Any] = {}

    # (command names, [(argument name, add_argument keyword arguments)]).
    # Commands are registered, and arguments added, in the order listed.
    layout: list[tuple[list[str], list[tuple[str, dict[str, Any]]]]] = [
        (
            ["health", "sources", "stats", "projects", "project-template", "debug-context", "ingest-sources"],
            [],
        ),
        (["detect-project"], [("prompt", required)]),
        (
            ["auto-context"],
            [("prompt", required), ("budget", number(3000)), ("project", text())],
        ),
        (
            ["propose-project", "register-project"],
            [
                ("project_id", required),
                ("aliases_csv", required),
                ("source", required),
                ("subpath", required),
                ("description", text()),
                ("label", text()),
            ],
        ),
        (
            ["update-project"],
            [("project", required), ("description", required), ("aliases_csv", text())],
        ),
        (["refresh-project"], [("project", required), ("purge_deleted", text("false"))]),
        (["project-state"], [("project", required), ("category", text())]),
        (
            ["project-state-set"],
            [
                ("project", required),
                ("category", required),
                ("key", required),
                ("value", required),
                ("source", text()),
                ("confidence", number(1.0, float)),
            ],
        ),
        (
            ["project-state-invalidate"],
            [("project", required), ("category", required), ("key", required)],
        ),
        (["query"], [("prompt", required), ("top_k", number(5)), ("project", text())]),
        (
            ["context-build"],
            [("prompt", required), ("project", text()), ("budget", number(3000))],
        ),
        (["audit-query"], [("prompt", required), ("top_k", number(5)), ("project", text())]),
        # --- Phase 9 reflection loop surface ---------------------------
        # capture: record one interaction (prompt + response + context
        # used). Mirrors POST /interactions. response is positional so
        # shell callers can pass it via $(cat file.txt) or heredoc.
        # project, client, and session_id are optional positionals with
        # empty defaults, matching the existing script's style.
        (
            ["capture"],
            [
                ("prompt", required),
                ("response", text()),
                ("project", text()),
                ("client", text()),
                ("session_id", text()),
                ("reinforce", text("true")),
            ],
        ),
        # extract: run the Phase 9 C rule-based extractor against an
        # already-captured interaction. persist='true' writes the
        # candidates as status='candidate' memories; default is
        # preview-only.
        (["extract"], [("interaction_id", required), ("persist", text("false"))]),
        # reinforce: backfill reinforcement on an already-captured interaction.
        (["reinforce-interaction"], [("interaction_id", required)]),
        # list-interactions: paginated listing with filters.
        (
            ["list-interactions"],
            [
                ("project", text()),
                ("session_id", text()),
                ("client", text()),
                ("since", text()),
                ("limit", number(50)),
            ],
        ),
        # get-interaction: fetch one by id
        (["get-interaction"], [("interaction_id", required)]),
        # queue: list the candidate review queue
        (["queue"], [("memory_type", text()), ("project", text()), ("limit", number(50))]),
        # promote: candidate -> active
        (["promote"], [("memory_id", required)]),
        # reject: candidate -> invalid
        (["reject"], [("memory_id", required)]),
        # batch-extract: fan out /interactions/{id}/extract?persist=true
        # across recent interactions. Idempotent — the extractor
        # create_memory path silently skips duplicates, so re-running is safe.
        (
            ["batch-extract"],
            [
                ("since", text()),
                ("project", text()),
                ("limit", number(100)),
                ("persist", text("true")),
            ],
        ),
        # triage: interactive candidate review loop. Fetches the queue,
        # shows each candidate, accepts p/r/s (promote / reject / skip) /
        # q (quit).
        (["triage"], [("memory_type", text()), ("project", text()), ("limit", number(50))]),
    ]

    parser = argparse.ArgumentParser(description="AtoCore live API client")
    commands = parser.add_subparsers(dest="command", required=True)
    for names, positionals in layout:
        for name in names:
            command_parser = commands.add_parser(name)
            for arg_name, options in positionals:
                command_parser.add_argument(arg_name, **options)
    return parser
|
|
|
|
|
|
def main() -> int:
    """Parse the command line, run the selected subcommand, return an exit status.

    Every subcommand except ``triage`` produces one result that is printed to
    stdout as JSON. ``triage`` delegates to ``run_triage`` and returns its
    status.

    Returns:
        0 on success, or 1 if the command is not recognised (the parser is
        expected to reject unknown commands before this point).
    """
    args = build_parser().parse_args()
    cmd = args.command

    def truthy(text: str) -> bool:
        """Interpret a command-line flag string as a boolean."""
        return text.lower() in {"1", "true", "yes", "y"}

    def segment(value: str) -> str:
        """Percent-encode *value* for use as a single URL path segment."""
        return urllib.parse.quote(value, safe="")

    def listing_query(leading: list[str], filters: list[tuple[str, str]], limit: int) -> str:
        """Build ``?a=b&...&limit=N`` from the non-empty *filters*."""
        parts = list(leading)
        parts += [f"{key}={urllib.parse.quote(value)}" for key, value in filters if value]
        parts.append(f"limit={int(limit)}")
        return "?" + "&".join(parts)

    # Commands that are a bare GET of a fixed path.
    plain_gets = {
        "health": "/health",
        "sources": "/sources",
        "stats": "/stats",
        "projects": "/projects",
        "project-template": "/projects/template",
        "debug-context": "/debug/context",
    }

    # triage drives its own interactive output and exit status.
    if cmd == "triage":
        return run_triage(args.memory_type, args.project, args.limit)

    output: Any
    if cmd in plain_gets:
        output = request("GET", plain_gets[cmd])
    elif cmd == "ingest-sources":
        output = request("POST", "/ingest/sources", {})
    elif cmd == "detect-project":
        output = detect_project(args.prompt)
    elif cmd == "auto-context":
        # Fall back to alias detection when no project was given explicitly.
        project = args.project or detect_project(args.prompt).get("matched_project") or ""
        if project:
            output = request(
                "POST",
                "/context/build",
                {"prompt": args.prompt, "project": project, "budget": args.budget},
            )
        else:
            output = {"status": "no_project_match", "source": "atocore", "mode": "auto-context"}
    elif cmd in {"propose-project", "register-project"}:
        endpoint = "/projects/proposal" if cmd == "propose-project" else "/projects/register"
        definition = project_payload(
            args.project_id,
            args.aliases_csv,
            args.source,
            args.subpath,
            args.description,
            args.label,
        )
        output = request("POST", endpoint, definition)
    elif cmd == "update-project":
        changes: dict[str, Any] = {"description": args.description}
        # Aliases are only sent when the caller actually supplied some.
        if args.aliases_csv.strip():
            changes["aliases"] = parse_aliases(args.aliases_csv)
        output = request("PUT", f"/projects/{urllib.parse.quote(args.project)}", changes)
    elif cmd == "refresh-project":
        purge_flag = str(truthy(args.purge_deleted)).lower()
        endpoint = f"/projects/{urllib.parse.quote(args.project)}/refresh?purge_deleted={purge_flag}"
        # Refreshes use the longer, refresh-specific timeout.
        output = request("POST", endpoint, {}, timeout=REFRESH_TIMEOUT)
    elif cmd == "project-state":
        category_filter = ""
        if args.category:
            category_filter = f"?category={urllib.parse.quote(args.category)}"
        output = request("GET", f"/project/state/{urllib.parse.quote(args.project)}{category_filter}")
    elif cmd == "project-state-set":
        entry = {
            "project": args.project,
            "category": args.category,
            "key": args.key,
            "value": args.value,
            "source": args.source,
            "confidence": args.confidence,
        }
        output = request("POST", "/project/state", entry)
    elif cmd == "project-state-invalidate":
        target = {"project": args.project, "category": args.category, "key": args.key}
        output = request("DELETE", "/project/state", target)
    elif cmd == "query":
        output = request(
            "POST",
            "/query",
            {"prompt": args.prompt, "top_k": args.top_k, "project": args.project or None},
        )
    elif cmd == "context-build":
        output = request(
            "POST",
            "/context/build",
            {"prompt": args.prompt, "project": args.project or None, "budget": args.budget},
        )
    elif cmd == "audit-query":
        output = audit_query(args.prompt, args.top_k, args.project or None)
    # --- Phase 9 reflection loop surface ------------------------------
    elif cmd == "capture":
        interaction = {
            "prompt": args.prompt,
            "response": args.response,
            "project": args.project,
            "client": args.client or "atocore-client",
            "session_id": args.session_id,
            "reinforce": truthy(args.reinforce),
        }
        output = request("POST", "/interactions", interaction)
    elif cmd == "extract":
        output = request(
            "POST",
            f"/interactions/{segment(args.interaction_id)}/extract",
            {"persist": truthy(args.persist)},
        )
    elif cmd == "reinforce-interaction":
        output = request("POST", f"/interactions/{segment(args.interaction_id)}/reinforce", {})
    elif cmd == "list-interactions":
        filters = [
            ("project", args.project),
            ("session_id", args.session_id),
            ("client", args.client),
            ("since", args.since),
        ]
        output = request("GET", f"/interactions{listing_query([], filters, args.limit)}")
    elif cmd == "get-interaction":
        output = request("GET", f"/interactions/{segment(args.interaction_id)}")
    elif cmd == "queue":
        filters = [("memory_type", args.memory_type), ("project", args.project)]
        output = request("GET", f"/memory{listing_query(['status=candidate'], filters, args.limit)}")
    elif cmd in {"promote", "reject"}:
        # The endpoint's final segment is the command name itself.
        output = request("POST", f"/memory/{segment(args.memory_id)}/{cmd}", {})
    elif cmd == "batch-extract":
        output = run_batch_extract(args.since, args.project, args.limit, args.persist)
    else:
        return 1

    print_json(output)
    return 0
|
|
|
|
|
|
def run_batch_extract(since: str, project: str, limit: int, persist_flag: str) -> dict:
    """Fetch recent interactions and run the extractor against each one.

    Lists ``/interactions`` (optionally filtered by *project* and *since*,
    capped at *limit*) and POSTs ``/interactions/{id}/extract`` for every
    entry that has an id. A failure on one interaction is recorded in the
    returned ``errors`` list and the batch continues with the next one.

    Args:
        since: Optional lower bound passed through as the ``since`` filter.
        project: Optional project filter.
        limit: Maximum number of interactions to list.
        persist_flag: ``"1"``/``"true"``/``"yes"``/``"y"`` (any case) asks the
            extractor to persist candidates; anything else is preview-only.

    Returns:
        An aggregated summary: ``processed``, ``total_candidates``,
        ``total_persisted``, ``persist``, ``errors`` and
        ``interactions_with_candidates`` (only interactions that yielded at
        least one candidate).

    Raises:
        SystemExit: Propagated with status 0 when ``request`` takes its
            fail-open exit (service unreachable); retrying the remaining
            interactions would only repeat the same failure.
    """
    persist = persist_flag.lower() in {"1", "true", "yes", "y"}
    query_parts: list[str] = []
    if project:
        query_parts.append(f"project={urllib.parse.quote(project)}")
    if since:
        query_parts.append(f"since={urllib.parse.quote(since)}")
    query_parts.append(f"limit={int(limit)}")
    query = "?" + "&".join(query_parts)

    listing = request("GET", f"/interactions{query}")
    interactions = listing.get("interactions", []) if isinstance(listing, dict) else []

    processed = 0
    total_candidates = 0
    total_persisted = 0
    errors: list[dict] = []
    per_interaction: list[dict] = []

    for item in interactions:
        iid = item.get("id") or ""
        if not iid:
            continue
        try:
            result = request(
                "POST",
                f"/interactions/{urllib.parse.quote(iid, safe='')}/extract",
                {"persist": persist},
            )
        except SystemExit as exc:
            # request() reports an HTTP error response by exiting with a
            # non-zero status, which `except Exception` would not catch.
            # Record it so one bad interaction does not abort the batch.
            if not exc.code:
                # Status 0 is the fail-open exit: the service is unreachable.
                raise
            errors.append(
                {"interaction_id": iid, "error": f"request exited with status {exc.code}"}
            )
            continue
        except Exception as exc:  # pragma: no cover - network errors land here
            errors.append({"interaction_id": iid, "error": str(exc)})
            continue
        if not isinstance(result, dict):
            # The counters below need a JSON object; anything else is an error.
            errors.append(
                {
                    "interaction_id": iid,
                    "error": f"unexpected response type: {type(result).__name__}",
                }
            )
            continue
        processed += 1
        count = int(result.get("candidate_count", 0) or 0)
        persisted_ids = result.get("persisted_ids") or []
        total_candidates += count
        total_persisted += len(persisted_ids)
        if count:
            per_interaction.append(
                {
                    "interaction_id": iid,
                    "candidate_count": count,
                    "persisted_count": len(persisted_ids),
                    "project": item.get("project") or "",
                }
            )

    return {
        "processed": processed,
        "total_candidates": total_candidates,
        "total_persisted": total_persisted,
        "persist": persist,
        "errors": errors,
        "interactions_with_candidates": per_interaction,
    }
|
|
|
|
|
|
def run_triage(memory_type: str, project: str, limit: int) -> int:
    """Interactively review candidate memories.

    Loads the candidate queue once, walks through the entries, and prompts
    for (p)romote / (r)eject / (s)kip / (q)uit. Any other answer counts as a
    skip. Stateless between runs: re-running picks up whatever is still
    status=candidate.

    All interactive text, including the prompt, goes to stderr so that
    stdout carries only the final JSON summary.

    Args:
        memory_type: Optional ``memory_type`` filter for the queue listing.
        project: Optional ``project`` filter for the queue listing.
        limit: Maximum number of candidates to load.

    Returns:
        Always 0. The summary printed to stdout reports ``reviewed``,
        ``promoted``, ``rejected``, ``skipped``, ``stopped_early`` and
        ``remaining_in_queue`` (loaded candidates that were not reviewed).
    """
    query_parts = ["status=candidate"]
    if memory_type:
        query_parts.append(f"memory_type={urllib.parse.quote(memory_type)}")
    if project:
        query_parts.append(f"project={urllib.parse.quote(project)}")
    query_parts.append(f"limit={int(limit)}")
    listing = request("GET", "/memory?" + "&".join(query_parts))
    memories = listing.get("memories", []) if isinstance(listing, dict) else []

    if not memories:
        print_json({"status": "empty_queue", "count": 0})
        return 0

    total = len(memories)
    promoted = 0
    rejected = 0
    skipped = 0
    stopped_early = False

    print(f"Triage queue: {total} candidate(s)\n", file=sys.stderr)
    for idx, mem in enumerate(memories, 1):
        mid = mem.get("id", "")
        print(f"[{idx}/{total}] {mem.get('memory_type','?')} project={mem.get('project','')} conf={mem.get('confidence','?')}", file=sys.stderr)
        print(f" id: {mid}", file=sys.stderr)
        print(f" {mem.get('content','')}", file=sys.stderr)
        # input(prompt) would write the prompt to stdout and corrupt the
        # JSON summary for anyone piping it, so emit the prompt on stderr.
        print(" (p)romote / (r)eject / (s)kip / (q)uit > ", end="", file=sys.stderr, flush=True)
        try:
            choice = input().strip().lower()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C both end the session but still
            # report what was done so far.
            stopped_early = True
            break
        if choice in {"q", "quit"}:
            stopped_early = True
            break
        if choice in {"p", "promote", "r", "reject"} and not mid:
            # Without an id there is no endpoint to call.
            skipped += 1
            print(" -> skipped (candidate has no id)", file=sys.stderr)
            continue
        if choice in {"p", "promote"}:
            request("POST", f"/memory/{urllib.parse.quote(mid, safe='')}/promote", {})
            promoted += 1
            print(" -> promoted", file=sys.stderr)
        elif choice in {"r", "reject"}:
            request("POST", f"/memory/{urllib.parse.quote(mid, safe='')}/reject", {})
            rejected += 1
            print(" -> rejected", file=sys.stderr)
        else:
            skipped += 1
            print(" -> skipped", file=sys.stderr)

    reviewed = promoted + rejected + skipped
    print_json(
        {
            "reviewed": reviewed,
            "promoted": promoted,
            "rejected": rejected,
            "skipped": skipped,
            "stopped_early": stopped_early,
            # The candidate shown when the operator stopped was not
            # reviewed, so it still counts as remaining.
            "remaining_in_queue": total - reviewed,
        }
    )
    return 0
|
|
|
|
|
|
# Script entry point: the integer returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|