feat: Integrate Learning Atomizer Core (LAC) and master instructions
Add persistent knowledge system that enables Atomizer to learn from every session and improve over time.

## New Files

- knowledge_base/lac.py: LAC class with optimization memory, session insights, and skill evolution tracking
- knowledge_base/__init__.py: Package initialization
- .claude/skills/modules/learning-atomizer-core.md: Full LAC skill documentation
- docs/07_DEVELOPMENT/ATOMIZER_CLAUDE_CODE_INSTRUCTIONS.md: Master instructions

## Updated Files

- CLAUDE.md: Added LAC section, communication style, AVERVS execution framework, error classification, and "Atomizer Claude" identity
- 00_BOOTSTRAP.md: Added session startup/closing checklists with LAC integration
- 01_CHEATSHEET.md: Added LAC CLI and Python API quick reference
- 02_CONTEXT_LOADER.md: Added LAC query section and anti-pattern

## LAC Features

- Query similar past optimizations before starting new ones
- Record insights (failures, success patterns, workarounds)
- Record optimization outcomes for future reference
- Suggest protocol improvements based on discoveries
- Simple JSONL storage (no database required)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
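## Example Usage

A minimal sketch of the before/after-session workflow these features enable, using only the API documented in knowledge_base/lac.py below; the study name, insight text, and numbers are illustrative, not real results:

```python
# Sketch of the LAC session loop (illustrative values only).
from knowledge_base.lac import LearningAtomizerCore

lac = LearningAtomizerCore()

# Before starting: check what worked for similar problems.
similar = lac.query_similar_optimizations(
    "bracket", objectives=["mass"], converged_only=True
)
for opt in similar:
    print(f"{opt['study_name']}: {opt['method']} in {opt['trials']} trials")

# After finishing: record the outcome and any insight gained.
lac.record_optimization_outcome(
    study_name="bracket_v1", geometry_type="bracket", method="TPE",
    objectives=["mass"], design_vars=3, trials=100,
    converged=True, convergence_trial=67,
)
lac.record_insight(
    category="success_pattern",
    context="Bracket optimization",
    insight="TPE with default settings converged by trial 67",
    confidence=0.8,
)
```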
knowledge_base/__init__.py (new file, 17 lines)
@@ -0,0 +1,17 @@
"""
Knowledge Base Package
======================

Provides persistent knowledge storage for Atomizer including:
- LAC (Learning Atomizer Core): Session insights, optimization memory, skill evolution
- Research sessions: Detailed research logs
"""

from .lac import LearningAtomizerCore, get_lac, record_insight, query_insights

__all__ = [
    "LearningAtomizerCore",
    "get_lac",
    "record_insight",
    "query_insights",
]
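Since __init__.py re-exports the convenience helpers, callers can import them at package level. A short sketch (the context and insight strings are placeholders):

```python
# Package-level imports re-exported by knowledge_base/__init__.py.
from knowledge_base import record_insight, query_insights

record_insight(
    category="workaround",
    context="Mesh export",  # placeholder context
    insight="Re-exporting the geometry fixed a unit mismatch",
)
for ins in query_insights("mesh export units"):
    print(ins["insight"])
```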
knowledge_base/lac.py (new file, 814 lines)
@@ -0,0 +1,814 @@
"""
Learning Atomizer Core (LAC)
============================

Persistent knowledge store that grows with every Claude Code session.
Enables Atomizer to learn from experience and improve over time.

Usage:
    from knowledge_base.lac import LearningAtomizerCore

    lac = LearningAtomizerCore()

    # Record what worked
    lac.record_optimization_outcome(
        study_name="bracket_v1",
        geometry_type="bracket",
        method="TPE",
        objectives=["mass"],
        design_vars=3,
        trials=100,
        converged=True,
        convergence_trial=67,
        notes="Good convergence with default settings"
    )

    # Record an insight
    lac.record_insight(
        category="success_pattern",
        context="Bracket optimization",
        insight="TPE with 20 startup trials converges faster",
        confidence=0.85
    )

    # Query before starting new work
    similar = lac.query_similar_optimizations("bracket", ["mass"])
    insights = lac.get_relevant_insights("bracket mass optimization")

Protocol Reference:
    - SYS_12: Extractor Library (for extraction learnings)
    - SYS_15: Method Selector (for algorithm learnings)

Author: Atomizer Claude
Created: 2025-12-11
Version: 1.0
"""

from pathlib import Path
import json
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple
import logging

logger = logging.getLogger(__name__)


class LearningAtomizerCore:
    """
    Simple persistent knowledge store that grows with every session.

    No database needed - just structured JSON Lines files that Claude
    can read and write efficiently.

    Directory Structure:
        knowledge_base/lac/
        ├── optimization_memory/      # What worked for what geometry
        │   ├── bracket.jsonl
        │   ├── beam.jsonl
        │   └── mirror.jsonl
        ├── session_insights/         # Learnings from sessions
        │   ├── failure.jsonl
        │   ├── success_pattern.jsonl
        │   ├── user_preference.jsonl
        │   └── protocol_clarification.jsonl
        └── skill_evolution/          # Protocol improvements
            └── suggested_updates.jsonl

    Attributes:
        root: Base path for LAC storage
        optimization_memory: Path to optimization outcome storage
        session_insights: Path to insight storage
        skill_evolution: Path to skill improvement suggestions
    """

    def __init__(self, root: Optional[Path] = None):
        """
        Initialize LAC with storage directories.

        Args:
            root: Base path for LAC storage. Defaults to knowledge_base/lac/
                relative to the Atomizer project root.
        """
        if root is None:
            # Try to find Atomizer root
            current = Path.cwd()
            while current != current.parent:
                if (current / "CLAUDE.md").exists():
                    root = current / "knowledge_base" / "lac"
                    break
                current = current.parent
            else:
                root = Path("knowledge_base/lac")

        self.root = Path(root)
        self.optimization_memory = self.root / "optimization_memory"
        self.session_insights = self.root / "session_insights"
        self.skill_evolution = self.root / "skill_evolution"

        # Create directories
        for d in [self.optimization_memory, self.session_insights, self.skill_evolution]:
            d.mkdir(parents=True, exist_ok=True)

        logger.info(f"LAC initialized at {self.root}")

    # ══════════════════════════════════════════════════════════════════
    # OPTIMIZATION MEMORY - What works for what
    # ══════════════════════════════════════════════════════════════════

    def record_optimization_outcome(
        self,
        study_name: str,
        geometry_type: str,
        method: str,
        objectives: List[str],
        design_vars: int,
        trials: int,
        converged: bool,
        convergence_trial: Optional[int] = None,
        best_value: Optional[float] = None,
        best_params: Optional[Dict[str, float]] = None,
        notes: str = ""
    ) -> None:
        """
        Record what happened in an optimization for future reference.

        This builds a history of what methods work for what types of
        problems, enabling better algorithm selection in the future.

        Args:
            study_name: Name of the study (e.g., "bracket_v1")
            geometry_type: Type of geometry (e.g., "bracket", "beam", "mirror")
            method: Optimization method used (e.g., "TPE", "CMA-ES", "NSGA-II")
            objectives: List of objective names
            design_vars: Number of design variables
            trials: Total number of trials run
            converged: Whether optimization converged satisfactorily
            convergence_trial: Trial number where convergence was achieved
            best_value: Best objective value found
            best_params: Best parameter values found
            notes: Any additional notes about the optimization

        Example:
            >>> lac.record_optimization_outcome(
            ...     study_name="bracket_mass_opt",
            ...     geometry_type="bracket",
            ...     method="TPE",
            ...     objectives=["mass"],
            ...     design_vars=3,
            ...     trials=100,
            ...     converged=True,
            ...     convergence_trial=67,
            ...     best_value=2.34,
            ...     notes="Good convergence with default settings"
            ... )
        """
        record = {
            "timestamp": datetime.now().isoformat(),
            "study_name": study_name,
            "geometry_type": geometry_type,
            "method": method,
            "objectives": objectives,
            "n_objectives": len(objectives),
            "design_vars": design_vars,
            "trials": trials,
            "converged": converged,
            "convergence_trial": convergence_trial,
            "convergence_ratio": convergence_trial / trials if convergence_trial else None,
            "best_value": best_value,
            "best_params": best_params,
            "notes": notes
        }

        # Append to geometry-type specific file
        file_path = self.optimization_memory / f"{geometry_type.lower()}.jsonl"
        with open(file_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")

        logger.info(f"Recorded optimization outcome for {study_name} -> {file_path}")

    def query_similar_optimizations(
        self,
        geometry_type: str,
        objectives: Optional[List[str]] = None,
        design_var_range: Optional[Tuple[int, int]] = None,
        method: Optional[str] = None,
        converged_only: bool = False,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Find past optimizations similar to what we're about to run.

        Use this before starting a new optimization to learn from
        past experience with similar problems.

        Args:
            geometry_type: Type of geometry to search for
            objectives: Filter by objective names (partial match)
            design_var_range: Filter by design variable count (min, max)
            method: Filter by optimization method
            converged_only: Only return converged optimizations
            limit: Maximum number of results

        Returns:
            List of matching optimization records, sorted by recency

        Example:
            >>> similar = lac.query_similar_optimizations(
            ...     geometry_type="bracket",
            ...     objectives=["mass"],
            ...     converged_only=True
            ... )
            >>> for opt in similar:
            ...     print(f"{opt['study_name']}: {opt['method']} - {opt['trials']} trials")
        """
        results = []
        file_path = self.optimization_memory / f"{geometry_type.lower()}.jsonl"

        if not file_path.exists():
            logger.debug(f"No history for geometry type: {geometry_type}")
            return []

        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                record = json.loads(line)

                # Apply filters
                if converged_only and not record.get("converged"):
                    continue

                if method and record.get("method") != method:
                    continue

                if design_var_range:
                    dv = record.get("design_vars", 0)
                    if not (design_var_range[0] <= dv <= design_var_range[1]):
                        continue

                if objectives:
                    record_objs = set(record.get("objectives", []))
                    if not any(obj in record_objs for obj in objectives):
                        continue

                results.append(record)

        # Sort by timestamp (most recent first) and limit
        results.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
        return results[:limit]

    def get_best_method_for(
        self,
        geometry_type: str,
        n_objectives: int = 1,
        design_vars: Optional[int] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Get the best-performing method for a given problem type.

        Analyzes historical data to recommend the method with the
        best convergence rate for similar problems.

        Args:
            geometry_type: Type of geometry
            n_objectives: Number of objectives (1 for single, 2+ for multi)
            design_vars: Number of design variables (for similarity matching)

        Returns:
            Dict with recommended method and statistics, or None if no data

        Example:
            >>> rec = lac.get_best_method_for("bracket", n_objectives=1)
            >>> print(f"Recommended: {rec['method']} (success rate: {rec['success_rate']:.0%})")
        """
        all_records = self.query_similar_optimizations(
            geometry_type=geometry_type,
            limit=100
        )

        if not all_records:
            return None

        # Filter by n_objectives
        filtered = [r for r in all_records if r.get("n_objectives", 1) == n_objectives]

        if not filtered:
            filtered = all_records  # Fall back to all records

        # Group by method and calculate success rates
        method_stats = {}
        for record in filtered:
            method = record.get("method", "unknown")
            if method not in method_stats:
                method_stats[method] = {"total": 0, "converged": 0, "avg_convergence_ratio": []}

            method_stats[method]["total"] += 1
            if record.get("converged"):
                method_stats[method]["converged"] += 1
            if record.get("convergence_ratio"):
                method_stats[method]["avg_convergence_ratio"].append(record["convergence_ratio"])

        # Find best method
        best_method = None
        best_score = -1

        for method, stats in method_stats.items():
            if stats["total"] < 2:  # Need at least 2 data points
                continue

            success_rate = stats["converged"] / stats["total"]
            avg_ratio = (
                sum(stats["avg_convergence_ratio"]) / len(stats["avg_convergence_ratio"])
                if stats["avg_convergence_ratio"] else 1.0
            )

            # Score = success_rate * (1 - 0.5 * avg_convergence_ratio)
            # Higher success rate and faster convergence = higher score
            score = success_rate * (1 - avg_ratio * 0.5)

            if score > best_score:
                best_score = score
                best_method = {
                    "method": method,
                    "success_rate": success_rate,
                    "avg_convergence_ratio": avg_ratio,
                    "sample_size": stats["total"],
                    "score": score
                }

        return best_method

    # ══════════════════════════════════════════════════════════════════
    # SESSION INSIGHTS - What we learned today
    # ══════════════════════════════════════════════════════════════════

    def record_insight(
        self,
        category: str,
        context: str,
        insight: str,
        confidence: float = 0.7,
        tags: Optional[List[str]] = None
    ) -> None:
        """
        Record something learned during a session.

        Categories:
            - failure: Something that failed and why
            - success_pattern: An approach that worked well
            - user_preference: Something the user prefers
            - protocol_clarification: A protocol that needed clarification
            - performance: Performance-related observation
            - workaround: A workaround for a known issue

        Args:
            category: Type of insight (see above)
            context: Situation where this was learned
            insight: The actual learning
            confidence: How confident we are (0.0-1.0)
            tags: Optional tags for filtering

        Example:
            >>> lac.record_insight(
            ...     category="success_pattern",
            ...     context="Bracket optimization with 5+ design variables",
            ...     insight="CMA-ES outperforms TPE when design_vars > 5",
            ...     confidence=0.85,
            ...     tags=["method_selection", "cma-es", "tpe"]
            ... )
        """
        valid_categories = [
            "failure", "success_pattern", "user_preference",
            "protocol_clarification", "performance", "workaround"
        ]

        if category not in valid_categories:
            logger.warning(f"Unknown category '{category}'. Using 'success_pattern'.")
            category = "success_pattern"

        record = {
            "timestamp": datetime.now().isoformat(),
            "category": category,
            "context": context,
            "insight": insight,
            "confidence": min(max(confidence, 0.0), 1.0),
            "tags": tags or []
        }

        file_path = self.session_insights / f"{category}.jsonl"
        with open(file_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")

        logger.info(f"Recorded {category} insight: {insight[:50]}...")

    def get_relevant_insights(
        self,
        context: str,
        categories: Optional[List[str]] = None,
        min_confidence: float = 0.5,
        limit: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Get insights relevant to current context.

        Uses simple keyword matching to find relevant insights.
        Future versions could use embeddings for better matching.

        Args:
            context: Current context to match against
            categories: Filter by categories (None = all)
            min_confidence: Minimum confidence threshold
            limit: Maximum results to return

        Returns:
            List of relevant insights, sorted by relevance score

        Example:
            >>> insights = lac.get_relevant_insights(
            ...     "bracket stress optimization",
            ...     min_confidence=0.7
            ... )
            >>> for ins in insights:
            ...     print(f"- {ins['insight']}")
        """
        all_insights = []

        # Determine which files to search
        if categories:
            files = [self.session_insights / f"{cat}.jsonl" for cat in categories]
        else:
            files = list(self.session_insights.glob("*.jsonl"))

        # Load all insights
        for file_path in files:
            if not file_path.exists():
                continue

            with open(file_path, "r", encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue
                    record = json.loads(line)
                    if record.get("confidence", 0) >= min_confidence:
                        all_insights.append(record)

        if not all_insights:
            return []

        # Score by context overlap (simple keyword matching)
        context_words = set(context.lower().split())
        scored = []

        for insight in all_insights:
            insight_text = f"{insight.get('context', '')} {insight.get('insight', '')}"
            insight_words = set(insight_text.lower().split())

            # Also check tags
            tags = set(t.lower() for t in insight.get("tags", []))

            # Calculate overlap
            word_overlap = len(context_words & insight_words)
            tag_overlap = len(context_words & tags) * 2  # Tags worth more

            total_score = word_overlap + tag_overlap

            if total_score > 0:
                # Weight by confidence
                weighted_score = total_score * insight.get("confidence", 0.5)
                scored.append((weighted_score, insight))

        # Sort by score and return top results
        scored.sort(reverse=True, key=lambda x: x[0])
        return [s[1] for s in scored[:limit]]

    def get_insights_by_category(
        self,
        category: str,
        limit: int = 20
    ) -> List[Dict[str, Any]]:
        """
        Get all insights of a specific category.

        Args:
            category: Category to retrieve
            limit: Maximum results

        Returns:
            List of insights, most recent first
        """
        file_path = self.session_insights / f"{category}.jsonl"

        if not file_path.exists():
            return []

        insights = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    insights.append(json.loads(line))

        # Sort by timestamp (most recent first)
        insights.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
        return insights[:limit]

    # ══════════════════════════════════════════════════════════════════
    # SKILL EVOLUTION - How protocols should improve
    # ══════════════════════════════════════════════════════════════════

    def suggest_protocol_update(
        self,
        protocol: str,
        section: str,
        current_text: str,
        suggested_text: str,
        reason: str
    ) -> None:
        """
        Suggest an improvement to a protocol based on experience.

        These suggestions are stored for review. A human or admin
        session can review and apply them.

        Args:
            protocol: Protocol identifier (e.g., "OP_01", "SYS_12")
            section: Section within the protocol
            current_text: Current text that needs updating
            suggested_text: Proposed replacement text
            reason: Why this change is suggested

        Example:
            >>> lac.suggest_protocol_update(
            ...     protocol="SYS_15_METHOD_SELECTOR.md",
            ...     section="Modal Optimization",
            ...     current_text="Use TPE or CMA-ES for frequency optimization",
            ...     suggested_text="Use TPE for frequency optimization. CMA-ES struggles with discrete targets.",
            ...     reason="Discovered during bracket_modal study - CMA-ES failed to converge on frequency target"
            ... )
        """
        record = {
            "timestamp": datetime.now().isoformat(),
            "protocol": protocol,
            "section": section,
            "current_text": current_text,
            "suggested_text": suggested_text,
            "reason": reason,
            "status": "pending"  # pending, approved, rejected, applied
        }

        file_path = self.skill_evolution / "suggested_updates.jsonl"
        with open(file_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")

        logger.info(f"Suggested update to {protocol}: {reason[:50]}...")

    def get_pending_updates(self) -> List[Dict[str, Any]]:
        """
        Get all pending protocol updates for review.

        Returns:
            List of pending update suggestions
        """
        file_path = self.skill_evolution / "suggested_updates.jsonl"

        if not file_path.exists():
            return []

        pending = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                record = json.loads(line)
                if record.get("status") == "pending":
                    pending.append(record)

        return pending

    def update_suggestion_status(
        self,
        timestamp: str,
        new_status: str,
        notes: str = ""
    ) -> bool:
        """
        Update the status of a suggestion.

        Args:
            timestamp: Timestamp of the suggestion to update
            new_status: New status (approved, rejected, applied)
            notes: Optional notes about the decision

        Returns:
            True if updated, False if not found
        """
        file_path = self.skill_evolution / "suggested_updates.jsonl"

        if not file_path.exists():
            return False

        # Read all records
        records = []
        found = False
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                record = json.loads(line)
                if record.get("timestamp") == timestamp:
                    record["status"] = new_status
                    record["status_updated"] = datetime.now().isoformat()
                    record["status_notes"] = notes
                    found = True
                records.append(record)

        if not found:
            return False

        # Write back
        with open(file_path, "w", encoding="utf-8") as f:
            for record in records:
                f.write(json.dumps(record) + "\n")

        return True

    # ══════════════════════════════════════════════════════════════════
    # STATISTICS & REPORTING
    # ══════════════════════════════════════════════════════════════════

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get overall LAC statistics.

        Returns:
            Dict with counts and summaries
        """
        stats = {
            "optimization_memory": {},
            "session_insights": {},
            "skill_evolution": {
                "pending_updates": 0,
                "approved": 0,
                "rejected": 0,
                "applied": 0
            }
        }

        # Count optimization records by geometry type
        for file_path in self.optimization_memory.glob("*.jsonl"):
            with open(file_path, encoding="utf-8") as f:
                stats["optimization_memory"][file_path.stem] = sum(1 for line in f if line.strip())

        # Count insights by category
        for file_path in self.session_insights.glob("*.jsonl"):
            with open(file_path, encoding="utf-8") as f:
                stats["session_insights"][file_path.stem] = sum(1 for line in f if line.strip())

        # Count skill evolution by status
        updates_file = self.skill_evolution / "suggested_updates.jsonl"
        if updates_file.exists():
            with open(updates_file, encoding="utf-8") as f:
                for line in f:
                    if line.strip():
                        record = json.loads(line)
                        status = record.get("status", "pending")
                        # "pending" records are reported under the "pending_updates" key
                        key = "pending_updates" if status == "pending" else status
                        if key in stats["skill_evolution"]:
                            stats["skill_evolution"][key] += 1

        return stats

    def generate_report(self) -> str:
        """
        Generate a human-readable LAC report.

        Returns:
            Markdown-formatted report string
        """
        stats = self.get_statistics()

        lines = [
            "# Learning Atomizer Core (LAC) Report",
            f"Generated: {datetime.now().isoformat()}",
            "",
            "## Optimization Memory",
            ""
        ]

        total_opts = 0
        for geom_type, count in stats["optimization_memory"].items():
            lines.append(f"- {geom_type}: {count} records")
            total_opts += count
        lines.append(f"\n**Total**: {total_opts} optimization records")

        lines.extend([
            "",
            "## Session Insights",
            ""
        ])

        total_insights = 0
        for category, count in stats["session_insights"].items():
            lines.append(f"- {category}: {count} insights")
            total_insights += count
        lines.append(f"\n**Total**: {total_insights} insights")

        lines.extend([
            "",
            "## Skill Evolution",
            "",
            f"- Pending updates: {stats['skill_evolution']['pending_updates']}",
            f"- Approved: {stats['skill_evolution']['approved']}",
            f"- Applied: {stats['skill_evolution']['applied']}",
            f"- Rejected: {stats['skill_evolution']['rejected']}",
        ])

        return "\n".join(lines)


# ══════════════════════════════════════════════════════════════════════════════
# CONVENIENCE FUNCTIONS
# ══════════════════════════════════════════════════════════════════════════════

_default_lac: Optional[LearningAtomizerCore] = None


def get_lac() -> LearningAtomizerCore:
    """
    Get the default LAC instance (singleton pattern).

    Returns:
        Default LearningAtomizerCore instance
    """
    global _default_lac
    if _default_lac is None:
        _default_lac = LearningAtomizerCore()
    return _default_lac


def record_insight(category: str, context: str, insight: str, confidence: float = 0.7) -> None:
    """Convenience function to record an insight."""
    get_lac().record_insight(category, context, insight, confidence)


def query_insights(context: str, limit: int = 5) -> List[Dict[str, Any]]:
    """Convenience function to query insights."""
    return get_lac().get_relevant_insights(context, limit=limit)


# ══════════════════════════════════════════════════════════════════════════════
# CLI INTERFACE
# ══════════════════════════════════════════════════════════════════════════════

if __name__ == "__main__":
    import sys

    lac = LearningAtomizerCore()

    if len(sys.argv) < 2:
        print("Usage: python lac.py <command> [args]")
        print("\nCommands:")
        print("  stats    - Show LAC statistics")
        print("  report   - Generate full report")
        print("  pending  - Show pending protocol updates")
        print("  insights - Query insights (requires context arg)")
        sys.exit(1)

    command = sys.argv[1]

    if command == "stats":
        stats = lac.get_statistics()
        print(json.dumps(stats, indent=2))

    elif command == "report":
        print(lac.generate_report())

    elif command == "pending":
        pending = lac.get_pending_updates()
        if pending:
            for p in pending:
                print(f"\n{'='*60}")
                print(f"Protocol: {p['protocol']}")
                print(f"Section: {p['section']}")
                print(f"Reason: {p['reason']}")
                print(f"Suggested: {p['suggested_text'][:100]}...")
        else:
            print("No pending updates.")

    elif command == "insights":
        if len(sys.argv) < 3:
            print("Usage: python lac.py insights <context>")
            sys.exit(1)

        context = " ".join(sys.argv[2:])
        insights = lac.get_relevant_insights(context)

        if insights:
            for ins in insights:
                print(f"\n[{ins['category']}] (confidence: {ins['confidence']:.0%})")
                print(f"Context: {ins['context']}")
                print(f"Insight: {ins['insight']}")
        else:
            print("No relevant insights found.")

    else:
        print(f"Unknown command: {command}")
        sys.exit(1)
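The skill-evolution review loop is two calls: list what is pending, then record a decision against a record's timestamp. A short sketch against the API above; the decision notes are placeholders:

```python
from knowledge_base.lac import LearningAtomizerCore

lac = LearningAtomizerCore()
pending = lac.get_pending_updates()

# List suggestions waiting for review.
for update in pending:
    print(f"{update['protocol']} :: {update['reason']}")

# Approve the first one (valid statuses: approved, rejected, applied).
if pending:
    ok = lac.update_suggestion_status(
        timestamp=pending[0]["timestamp"],
        new_status="approved",
        notes="Reviewed manually",  # placeholder note
    )
    print("Updated:", ok)
```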