fix: detector uses HTTP-only (host lacks atocore deps)
Same pattern as integrity_check.py — the host-side Python doesn't have pydantic_settings. Refactor detect_emerging.py to talk to the container via HTTP instead of importing atocore.memory.service. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,19 +1,19 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Phase 6 C.1 — Emerging-concepts detector.
|
||||
"""Phase 6 C.1 — Emerging-concepts detector (HTTP-only).
|
||||
|
||||
Scans active + candidate memories to surface:
|
||||
Scans active + candidate memories via the HTTP API to surface:
|
||||
1. Unregistered projects — project strings appearing on 3+ memories
|
||||
that aren't in the project registry. Surface for one-click
|
||||
registration.
|
||||
2. Emerging categories — top 20 domain_tags by frequency, for
|
||||
"what themes are emerging in my work?" intelligence.
|
||||
3. Reinforced transients — active memories with reference_count >= 5
|
||||
AND valid_until set. These "were temporary but now durable";
|
||||
candidates for valid_until extension (handled by a sibling script).
|
||||
AND valid_until set. These "were temporary but now durable"; a
|
||||
sibling endpoint (/admin/memory/extend-reinforced) actually
|
||||
performs the extension.
|
||||
|
||||
Writes results to project_state under atocore/proposals/*. Emits a
|
||||
warning alert the FIRST time a project crosses the 5-memory threshold
|
||||
(so the user gets notified without being spammed on every run).
|
||||
Writes results to project_state under atocore/proposals/* via the API.
|
||||
Runs host-side (cron calls it) so uses stdlib only — no atocore deps.
|
||||
|
||||
Usage:
|
||||
python3 scripts/detect_emerging.py [--base-url URL] [--dry-run]
|
||||
@@ -25,47 +25,106 @@ import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from collections import Counter, defaultdict
|
||||
|
||||
# src/ importable so we can reuse service helpers
|
||||
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
_SRC_DIR = os.path.abspath(os.path.join(_SCRIPT_DIR, "..", "src"))
|
||||
if _SRC_DIR not in sys.path:
|
||||
sys.path.insert(0, _SRC_DIR)
|
||||
|
||||
|
||||
# Detection tunables — overridable via environment so ops can adjust
# thresholds without editing the script.
PROJECT_MIN_MEMORIES = int(os.environ.get("ATOCORE_EMERGING_PROJECT_MIN", "3"))
PROJECT_ALERT_THRESHOLD = int(os.environ.get("ATOCORE_EMERGING_ALERT_THRESHOLD", "5"))
TOP_TAGS_LIMIT = int(os.environ.get("ATOCORE_EMERGING_TOP_TAGS", "20"))
|
||||
|
||||
|
||||
def api_get(base_url: str, path: str, timeout: int = 30) -> dict:
    """GET ``base_url + path`` and return the decoded JSON response body."""
    request = urllib.request.Request(f"{base_url}{path}")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw = response.read().decode("utf-8")
    return json.loads(raw)
|
||||
|
||||
|
||||
def api_post(base_url: str, path: str, body: dict, timeout: int = 10) -> dict:
    """POST *body* as JSON to ``base_url + path``; return the decoded JSON reply."""
    encoded = json.dumps(body).encode("utf-8")
    request = urllib.request.Request(
        f"{base_url}{path}",
        data=encoded,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
|
||||
|
||||
|
||||
def fetch_registered_project_names(base_url: str) -> set[str]:
    """Set of all registered project ids + aliases, lowercased."""
    try:
        payload = api_get(base_url, "/projects")
    except Exception as exc:  # best-effort: an unreachable registry means "none known"
        print(f"WARN: could not load project registry: {exc}", file=sys.stderr)
        return set()

    names: set[str] = set()
    for project in payload.get("projects", []):
        # Accept whichever id-ish key the API response uses.
        project_id = (
            project.get("project_id") or project.get("id") or project.get("name") or ""
        ).strip()
        if project_id:
            names.add(project_id.lower())
        for alias in project.get("aliases", []) or []:
            if isinstance(alias, str) and alias.strip():
                names.add(alias.strip().lower())
    return names
|
||||
|
||||
|
||||
def fetch_memories(base_url: str, status: str, limit: int = 500) -> list[dict]:
    """Fetch up to *limit* memories with the given *status* via the HTTP API.

    ``status == "active"`` maps to the API's ``active_only=true`` flag; any
    other value is passed through as a ``status=`` filter.  Returns an empty
    list on any error (best-effort, matching the other fetch helpers).
    """
    # Local stdlib import: keeps the top-of-file import block untouched.
    import urllib.parse

    try:
        query = {"limit": str(limit)}
        if status == "active":
            query["active_only"] = "true"
        else:
            query["status"] = status
        # urlencode() percent-escapes values, so an unusual status string
        # can't mangle the query (the original concatenated it raw).
        params = urllib.parse.urlencode(query)
        result = api_get(base_url, f"/memory?{params}")
        return result.get("memories", [])
    except Exception as e:
        print(f"WARN: could not fetch {status} memories: {e}", file=sys.stderr)
        return []
|
||||
|
||||
|
||||
def fetch_previous_proposals(base_url: str) -> list[dict]:
    """Read last run's unregistered_projects to diff against this run."""
    try:
        state = api_get(base_url, "/project/state/atocore")
        entries = state.get("entries", state.get("state", []))
        for entry in entries:
            is_match = (
                entry.get("category") == "proposals"
                and entry.get("key") == "unregistered_projects_prev"
            )
            if not is_match:
                continue
            try:
                return json.loads(entry.get("value") or "[]")
            except Exception:
                # Stored value is unparseable — treat as "no previous run".
                return []
    except Exception:
        # API unreachable or response malformed: fall through to empty.
        pass
    return []
|
||||
|
||||
|
||||
def set_state(base_url: str, category: str, key: str, value: str, source: str = "emerging detector") -> None:
    """Write one project_state entry for the 'atocore' project via the API."""
    payload = {
        "project": "atocore",
        "category": category,
        "key": key,
        "value": value,
        "source": source,
    }
    api_post(base_url, "/project/state", payload)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Detect emerging projects + categories")
|
||||
parser.add_argument("--base-url", default=os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100"))
|
||||
parser.add_argument("--dry-run", action="store_true", help="Report without writing to project state")
|
||||
args = parser.parse_args()
|
||||
|
||||
from atocore.memory.service import get_memories
|
||||
from atocore.projects.registry import load_project_registry
|
||||
from atocore.context.project_state import set_state, get_state
|
||||
base = args.base_url.rstrip("/")
|
||||
|
||||
# Registered project ids (including aliases — a memory tagged 'p04' should
|
||||
# NOT be flagged as emerging since 'p04' is a registered alias for p04-gigabit)
|
||||
registered = set()
|
||||
for p in load_project_registry():
|
||||
registered.add(p.project_id.lower())
|
||||
for alias in p.aliases:
|
||||
registered.add(alias.lower())
|
||||
|
||||
# Pull active + candidate memories (give ourselves a broad view)
|
||||
active = get_memories(active_only=True, limit=500)
|
||||
candidates = get_memories(status="candidate", limit=500)
|
||||
all_mems = list(active) + list(candidates)
|
||||
registered = fetch_registered_project_names(base)
|
||||
active = fetch_memories(base, "active")
|
||||
candidates = fetch_memories(base, "candidate")
|
||||
all_mems = active + candidates
|
||||
|
||||
# --- Unregistered projects ---
|
||||
project_mems: dict[str, list] = defaultdict(list)
|
||||
for m in all_mems:
|
||||
proj = (m.project or "").strip().lower()
|
||||
proj = (m.get("project") or "").strip().lower()
|
||||
if not proj or proj in registered:
|
||||
continue
|
||||
project_mems[proj].append(m)
|
||||
@@ -77,37 +136,32 @@ def main() -> None:
|
||||
unregistered.append({
|
||||
"project": proj,
|
||||
"count": len(mems),
|
||||
"sample_memory_ids": [m.id for m in mems[:3]],
|
||||
"sample_contents": [(m.content or "")[:150] for m in mems[:3]],
|
||||
"sample_memory_ids": [m.get("id") for m in mems[:3]],
|
||||
"sample_contents": [(m.get("content") or "")[:150] for m in mems[:3]],
|
||||
})
|
||||
|
||||
# --- Emerging domain_tags (only active memories — candidates might be noise) ---
|
||||
tag_counter = Counter()
|
||||
# --- Emerging domain_tags (active only) ---
|
||||
tag_counter: Counter = Counter()
|
||||
for m in active:
|
||||
for t in (m.domain_tags or []):
|
||||
for t in (m.get("domain_tags") or []):
|
||||
if isinstance(t, str) and t.strip():
|
||||
tag_counter[t.strip().lower()] += 1
|
||||
emerging_tags = [{"tag": tag, "count": cnt} for tag, cnt in tag_counter.most_common(TOP_TAGS_LIMIT)]
|
||||
|
||||
emerging_tags = [
|
||||
{"tag": tag, "count": cnt}
|
||||
for tag, cnt in tag_counter.most_common(TOP_TAGS_LIMIT)
|
||||
]
|
||||
|
||||
# --- Reinforced transients ---
|
||||
# --- Reinforced transients (active, high refs, has expiry) ---
|
||||
reinforced = []
|
||||
for m in active:
|
||||
ref_count = getattr(m, "reference_count", 0) or 0
|
||||
vu = (getattr(m, "valid_until", "") or "").strip()
|
||||
ref_count = int(m.get("reference_count") or 0)
|
||||
vu = (m.get("valid_until") or "").strip()
|
||||
if ref_count >= 5 and vu:
|
||||
reinforced.append({
|
||||
"memory_id": m.id,
|
||||
"memory_id": m.get("id"),
|
||||
"reference_count": ref_count,
|
||||
"valid_until": vu,
|
||||
"content_preview": (m.content or "")[:150],
|
||||
"project": m.project or "",
|
||||
"content_preview": (m.get("content") or "")[:150],
|
||||
"project": m.get("project") or "",
|
||||
})
|
||||
|
||||
# --- Output ---
|
||||
result = {
|
||||
"unregistered_projects": unregistered,
|
||||
"emerging_categories": emerging_tags,
|
||||
@@ -126,72 +180,41 @@ def main() -> None:
|
||||
if args.dry_run:
|
||||
return
|
||||
|
||||
# --- Persist to project state ---
|
||||
# --- Persist to project state via HTTP ---
|
||||
try:
|
||||
set_state(
|
||||
project_name="atocore",
|
||||
category="proposals",
|
||||
key="unregistered_projects",
|
||||
value=json.dumps(unregistered),
|
||||
source="emerging detector",
|
||||
)
|
||||
set_state(
|
||||
project_name="atocore",
|
||||
category="proposals",
|
||||
key="emerging_categories",
|
||||
value=json.dumps(emerging_tags),
|
||||
source="emerging detector",
|
||||
)
|
||||
set_state(
|
||||
project_name="atocore",
|
||||
category="proposals",
|
||||
key="reinforced_transients",
|
||||
value=json.dumps(reinforced),
|
||||
source="emerging detector",
|
||||
)
|
||||
set_state(base, "proposals", "unregistered_projects", json.dumps(unregistered))
|
||||
set_state(base, "proposals", "emerging_categories", json.dumps(emerging_tags))
|
||||
set_state(base, "proposals", "reinforced_transients", json.dumps(reinforced))
|
||||
except Exception as e:
|
||||
print(f"WARN: failed to persist to project state: {e}", file=sys.stderr)
|
||||
print(f"WARN: failed to persist proposals: {e}", file=sys.stderr)
|
||||
|
||||
# --- Alert on NEW projects crossing alert threshold ---
|
||||
# --- Alert on NEW projects crossing the threshold ---
|
||||
try:
|
||||
# Read previous run's projects to detect "new" ones
|
||||
prev_unregistered: list = []
|
||||
for e in get_state("atocore"):
|
||||
if e.category == "proposals" and e.key == "unregistered_projects_prev":
|
||||
try:
|
||||
prev_unregistered = json.loads(e.value)
|
||||
except Exception:
|
||||
pass
|
||||
prev_names = {p.get("project") for p in prev_unregistered if isinstance(p, dict)}
|
||||
|
||||
prev = fetch_previous_proposals(base)
|
||||
prev_names = {p.get("project") for p in prev if isinstance(p, dict)}
|
||||
newly_crossed = [
|
||||
p for p in unregistered
|
||||
if p["count"] >= PROJECT_ALERT_THRESHOLD
|
||||
and p["project"] not in prev_names
|
||||
]
|
||||
if newly_crossed:
|
||||
from atocore.observability.alerts import emit_alert
|
||||
names = ", ".join(p["project"] for p in newly_crossed)
|
||||
emit_alert(
|
||||
severity="warning",
|
||||
title=f"Emerging project(s) detected: {names}",
|
||||
message=(
|
||||
f"{len(newly_crossed)} unregistered project(s) have crossed "
|
||||
f"the {PROJECT_ALERT_THRESHOLD}-memory threshold and may "
|
||||
f"warrant registration: {names}. Review at /wiki or "
|
||||
f"/admin/dashboard."
|
||||
),
|
||||
context={"projects": [p["project"] for p in newly_crossed]},
|
||||
)
|
||||
# Use existing alert mechanism via state (Phase 4 infra)
|
||||
try:
|
||||
set_state(base, "alert", "last_warning", json.dumps({
|
||||
"title": f"Emerging project(s) detected: {names}",
|
||||
"message": (
|
||||
f"{len(newly_crossed)} unregistered project(s) crossed "
|
||||
f"the {PROJECT_ALERT_THRESHOLD}-memory threshold. "
|
||||
f"Review at /wiki or /admin/dashboard."
|
||||
),
|
||||
"timestamp": "",
|
||||
}))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Persist this run's list for next-run comparison
|
||||
set_state(
|
||||
project_name="atocore",
|
||||
category="proposals",
|
||||
key="unregistered_projects_prev",
|
||||
value=json.dumps(unregistered),
|
||||
source="emerging detector",
|
||||
)
|
||||
# Snapshot for next run's diff
|
||||
set_state(base, "proposals", "unregistered_projects_prev", json.dumps(unregistered))
|
||||
except Exception as e:
|
||||
print(f"WARN: alert/state write failed: {e}", file=sys.stderr)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user