diff --git a/scripts/detect_emerging.py b/scripts/detect_emerging.py index 8793c74..e08ca58 100644 --- a/scripts/detect_emerging.py +++ b/scripts/detect_emerging.py @@ -1,19 +1,19 @@ #!/usr/bin/env python3 -"""Phase 6 C.1 — Emerging-concepts detector. +"""Phase 6 C.1 — Emerging-concepts detector (HTTP-only). -Scans active + candidate memories to surface: +Scans active + candidate memories via the HTTP API to surface: 1. Unregistered projects — project strings appearing on 3+ memories that aren't in the project registry. Surface for one-click registration. 2. Emerging categories — top 20 domain_tags by frequency, for "what themes are emerging in my work?" intelligence. 3. Reinforced transients — active memories with reference_count >= 5 - AND valid_until set. These "were temporary but now durable"; - candidates for valid_until extension (handled by a sibling script). + AND valid_until set. These "were temporary but now durable"; a + sibling endpoint (/admin/memory/extend-reinforced) actually + performs the extension. -Writes results to project_state under atocore/proposals/*. Emits a -warning alert the FIRST time a project crosses the 5-memory threshold -(so the user gets notified without being spammed on every run). +Writes results to project_state under atocore/proposals/* via the API. +Runs host-side (cron calls it), so it uses only the stdlib — no atocore deps. 
Usage: python3 scripts/detect_emerging.py [--base-url URL] [--dry-run] @@ -25,47 +25,106 @@ import argparse import json import os import sys +import urllib.error +import urllib.request from collections import Counter, defaultdict -# src/ importable so we can reuse service helpers -_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) -_SRC_DIR = os.path.abspath(os.path.join(_SCRIPT_DIR, "..", "src")) -if _SRC_DIR not in sys.path: - sys.path.insert(0, _SRC_DIR) - - PROJECT_MIN_MEMORIES = int(os.environ.get("ATOCORE_EMERGING_PROJECT_MIN", "3")) PROJECT_ALERT_THRESHOLD = int(os.environ.get("ATOCORE_EMERGING_ALERT_THRESHOLD", "5")) TOP_TAGS_LIMIT = int(os.environ.get("ATOCORE_EMERGING_TOP_TAGS", "20")) +def api_get(base_url: str, path: str, timeout: int = 30) -> dict: + req = urllib.request.Request(f"{base_url}{path}") + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def api_post(base_url: str, path: str, body: dict, timeout: int = 10) -> dict: + data = json.dumps(body).encode("utf-8") + req = urllib.request.Request( + f"{base_url}{path}", method="POST", + headers={"Content-Type": "application/json"}, data=data, + ) + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def fetch_registered_project_names(base_url: str) -> set[str]: + """Set of all registered project ids + aliases, lowercased.""" + try: + result = api_get(base_url, "/projects") + except Exception as e: + print(f"WARN: could not load project registry: {e}", file=sys.stderr) + return set() + registered = set() + for p in result.get("projects", []): + pid = (p.get("project_id") or p.get("id") or p.get("name") or "").strip() + if pid: + registered.add(pid.lower()) + for alias in p.get("aliases", []) or []: + if isinstance(alias, str) and alias.strip(): + registered.add(alias.strip().lower()) + return registered + + +def fetch_memories(base_url: str, status: str, limit: int = 500) 
-> list[dict]: + try: + params = f"limit={limit}" + if status == "active": + params += "&active_only=true" + else: + params += f"&status={status}" + result = api_get(base_url, f"/memory?{params}") + return result.get("memories", []) + except Exception as e: + print(f"WARN: could not fetch {status} memories: {e}", file=sys.stderr) + return [] + + +def fetch_previous_proposals(base_url: str) -> list[dict]: + """Read last run's unregistered_projects to diff against this run.""" + try: + result = api_get(base_url, "/project/state/atocore") + entries = result.get("entries", result.get("state", [])) + for e in entries: + if e.get("category") == "proposals" and e.get("key") == "unregistered_projects_prev": + try: + return json.loads(e.get("value") or "[]") + except Exception: + return [] + except Exception: + pass + return [] + + +def set_state(base_url: str, category: str, key: str, value: str, source: str = "emerging detector") -> None: + api_post(base_url, "/project/state", { + "project": "atocore", + "category": category, + "key": key, + "value": value, + "source": source, + }) + + def main() -> None: parser = argparse.ArgumentParser(description="Detect emerging projects + categories") parser.add_argument("--base-url", default=os.environ.get("ATOCORE_BASE_URL", "http://127.0.0.1:8100")) parser.add_argument("--dry-run", action="store_true", help="Report without writing to project state") args = parser.parse_args() - from atocore.memory.service import get_memories - from atocore.projects.registry import load_project_registry - from atocore.context.project_state import set_state, get_state + base = args.base_url.rstrip("/") - # Registered project ids (including aliases — a memory tagged 'p04' should - # NOT be flagged as emerging since 'p04' is a registered alias for p04-gigabit) - registered = set() - for p in load_project_registry(): - registered.add(p.project_id.lower()) - for alias in p.aliases: - registered.add(alias.lower()) - - # Pull active + candidate memories 
(give ourselves a broad view) - active = get_memories(active_only=True, limit=500) - candidates = get_memories(status="candidate", limit=500) - all_mems = list(active) + list(candidates) + registered = fetch_registered_project_names(base) + active = fetch_memories(base, "active") + candidates = fetch_memories(base, "candidate") + all_mems = active + candidates # --- Unregistered projects --- project_mems: dict[str, list] = defaultdict(list) for m in all_mems: - proj = (m.project or "").strip().lower() + proj = (m.get("project") or "").strip().lower() if not proj or proj in registered: continue project_mems[proj].append(m) @@ -77,37 +136,32 @@ def main() -> None: unregistered.append({ "project": proj, "count": len(mems), - "sample_memory_ids": [m.id for m in mems[:3]], - "sample_contents": [(m.content or "")[:150] for m in mems[:3]], + "sample_memory_ids": [m.get("id") for m in mems[:3]], + "sample_contents": [(m.get("content") or "")[:150] for m in mems[:3]], }) - # --- Emerging domain_tags (only active memories — candidates might be noise) --- - tag_counter = Counter() + # --- Emerging domain_tags (active only) --- + tag_counter: Counter = Counter() for m in active: - for t in (m.domain_tags or []): + for t in (m.get("domain_tags") or []): if isinstance(t, str) and t.strip(): tag_counter[t.strip().lower()] += 1 + emerging_tags = [{"tag": tag, "count": cnt} for tag, cnt in tag_counter.most_common(TOP_TAGS_LIMIT)] - emerging_tags = [ - {"tag": tag, "count": cnt} - for tag, cnt in tag_counter.most_common(TOP_TAGS_LIMIT) - ] - - # --- Reinforced transients --- + # --- Reinforced transients (active, high refs, has expiry) --- reinforced = [] for m in active: - ref_count = getattr(m, "reference_count", 0) or 0 - vu = (getattr(m, "valid_until", "") or "").strip() + ref_count = int(m.get("reference_count") or 0) + vu = (m.get("valid_until") or "").strip() if ref_count >= 5 and vu: reinforced.append({ - "memory_id": m.id, + "memory_id": m.get("id"), "reference_count": 
ref_count, "valid_until": vu, - "content_preview": (m.content or "")[:150], - "project": m.project or "", + "content_preview": (m.get("content") or "")[:150], + "project": m.get("project") or "", }) - # --- Output --- result = { "unregistered_projects": unregistered, "emerging_categories": emerging_tags, @@ -126,72 +180,41 @@ def main() -> None: if args.dry_run: return - # --- Persist to project state --- + # --- Persist to project state via HTTP --- try: - set_state( - project_name="atocore", - category="proposals", - key="unregistered_projects", - value=json.dumps(unregistered), - source="emerging detector", - ) - set_state( - project_name="atocore", - category="proposals", - key="emerging_categories", - value=json.dumps(emerging_tags), - source="emerging detector", - ) - set_state( - project_name="atocore", - category="proposals", - key="reinforced_transients", - value=json.dumps(reinforced), - source="emerging detector", - ) + set_state(base, "proposals", "unregistered_projects", json.dumps(unregistered)) + set_state(base, "proposals", "emerging_categories", json.dumps(emerging_tags)) + set_state(base, "proposals", "reinforced_transients", json.dumps(reinforced)) except Exception as e: - print(f"WARN: failed to persist to project state: {e}", file=sys.stderr) + print(f"WARN: failed to persist proposals: {e}", file=sys.stderr) - # --- Alert on NEW projects crossing alert threshold --- + # --- Alert on NEW projects crossing the threshold --- try: - # Read previous run's projects to detect "new" ones - prev_unregistered: list = [] - for e in get_state("atocore"): - if e.category == "proposals" and e.key == "unregistered_projects_prev": - try: - prev_unregistered = json.loads(e.value) - except Exception: - pass - prev_names = {p.get("project") for p in prev_unregistered if isinstance(p, dict)} - + prev = fetch_previous_proposals(base) + prev_names = {p.get("project") for p in prev if isinstance(p, dict)} newly_crossed = [ p for p in unregistered if p["count"] >= 
PROJECT_ALERT_THRESHOLD and p["project"] not in prev_names ] if newly_crossed: - from atocore.observability.alerts import emit_alert names = ", ".join(p["project"] for p in newly_crossed) - emit_alert( - severity="warning", - title=f"Emerging project(s) detected: {names}", - message=( - f"{len(newly_crossed)} unregistered project(s) have crossed " - f"the {PROJECT_ALERT_THRESHOLD}-memory threshold and may " - f"warrant registration: {names}. Review at /wiki or " - f"/admin/dashboard." - ), - context={"projects": [p["project"] for p in newly_crossed]}, - ) + # Use existing alert mechanism via state (Phase 4 infra) + try: + set_state(base, "alert", "last_warning", json.dumps({ + "title": f"Emerging project(s) detected: {names}", + "message": ( + f"{len(newly_crossed)} unregistered project(s) crossed " + f"the {PROJECT_ALERT_THRESHOLD}-memory threshold. " + f"Review at /wiki or /admin/dashboard." + ), + "timestamp": "", + })) + except Exception: + pass - # Persist this run's list for next-run comparison - set_state( - project_name="atocore", - category="proposals", - key="unregistered_projects_prev", - value=json.dumps(unregistered), - source="emerging detector", - ) + # Snapshot for next run's diff + set_state(base, "proposals", "unregistered_projects_prev", json.dumps(unregistered)) except Exception as e: print(f"WARN: alert/state write failed: {e}", file=sys.stderr)