Files
Atomizer/knowledge_base/lac.py
Antoine fc123326e5 feat: Integrate Learning Atomizer Core (LAC) and master instructions
Add persistent knowledge system that enables Atomizer to learn from every
session and improve over time.

## New Files
- knowledge_base/lac.py: LAC class with optimization memory, session insights,
  and skill evolution tracking
- knowledge_base/__init__.py: Package initialization
- .claude/skills/modules/learning-atomizer-core.md: Full LAC skill documentation
- docs/07_DEVELOPMENT/ATOMIZER_CLAUDE_CODE_INSTRUCTIONS.md: Master instructions

## Updated Files
- CLAUDE.md: Added LAC section, communication style, AVERVS execution framework,
  error classification, and "Atomizer Claude" identity
- 00_BOOTSTRAP.md: Added session startup/closing checklists with LAC integration
- 01_CHEATSHEET.md: Added LAC CLI and Python API quick reference
- 02_CONTEXT_LOADER.md: Added LAC query section and anti-pattern

## LAC Features
- Query similar past optimizations before starting new ones
- Record insights (failures, success patterns, workarounds)
- Record optimization outcomes for future reference
- Suggest protocol improvements based on discoveries
- Simple JSONL storage (no database required)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 21:55:01 -05:00

815 lines
30 KiB
Python

"""
Learning Atomizer Core (LAC)
============================
Persistent knowledge store that grows with every Claude Code session.
Enables Atomizer to learn from experience and improve over time.
Usage:
from knowledge_base.lac import LearningAtomizerCore
lac = LearningAtomizerCore()
# Record what worked
lac.record_optimization_outcome(
study_name="bracket_v1",
geometry_type="bracket",
method="TPE",
objectives=["mass"],
design_vars=3,
trials=100,
converged=True,
convergence_trial=67,
notes="Good convergence with default settings"
)
# Record an insight
lac.record_insight(
category="success_pattern",
context="Bracket optimization",
insight="TPE with 20 startup trials converges faster",
confidence=0.85
)
# Query before starting new work
similar = lac.query_similar_optimizations("bracket", ["mass"])
insights = lac.get_relevant_insights("bracket mass optimization")
Protocol Reference:
- SYS_12: Extractor Library (for extraction learnings)
- SYS_15: Method Selector (for algorithm learnings)
Author: Atomizer Claude
Created: 2025-12-11
Version: 1.0
"""
from pathlib import Path
import json
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple
import logging
logger = logging.getLogger(__name__)
class LearningAtomizerCore:
    """
    Simple persistent knowledge store that grows with every session.

    No database needed - just structured JSON Lines files that Claude
    can read and write efficiently.

    Directory Structure:
        knowledge_base/lac/
        ├── optimization_memory/   # What worked for what geometry
        │   ├── bracket.jsonl
        │   ├── beam.jsonl
        │   └── mirror.jsonl
        ├── session_insights/      # Learnings from sessions
        │   ├── failure.jsonl
        │   ├── success_pattern.jsonl
        │   ├── user_preference.jsonl
        │   └── protocol_clarification.jsonl
        └── skill_evolution/       # Protocol improvements
            └── suggested_updates.jsonl

    Attributes:
        root: Base path for LAC storage
        optimization_memory: Path to optimization outcome storage
        session_insights: Path to insight storage
        skill_evolution: Path to skill improvement suggestions
    """

    # Same underlying logger object as the module-level `logger` (both come
    # from getLogger(__name__)); held on the class so every method can log
    # without reaching for a module global.
    _log = logging.getLogger(__name__)

    def __init__(self, root: Optional[Path] = None):
        """
        Initialize LAC with storage directories.

        Args:
            root: Base path for LAC storage. Defaults to knowledge_base/lac/
                relative to the Atomizer project root (found by walking up
                from the CWD until a directory containing CLAUDE.md appears).
        """
        if root is None:
            # Walk up from the current directory looking for the Atomizer
            # project root, identified by the presence of CLAUDE.md.
            current = Path.cwd()
            while current != current.parent:
                if (current / "CLAUDE.md").exists():
                    root = current / "knowledge_base" / "lac"
                    break
                current = current.parent
            else:
                # Hit the filesystem root without finding CLAUDE.md:
                # fall back to a path relative to the CWD.
                root = Path("knowledge_base/lac")

        self.root = Path(root)
        self.optimization_memory = self.root / "optimization_memory"
        self.session_insights = self.root / "session_insights"
        self.skill_evolution = self.root / "skill_evolution"

        # Create all storage directories up front so writers never need to.
        for d in (self.optimization_memory, self.session_insights, self.skill_evolution):
            d.mkdir(parents=True, exist_ok=True)

        self._log.info(f"LAC initialized at {self.root}")

    # ══════════════════════════════════════════════════════════════════
    # INTERNAL JSONL HELPERS
    # ══════════════════════════════════════════════════════════════════

    def _append_jsonl(self, file_path: Path, record: Dict[str, Any]) -> None:
        """Append one record to a JSON Lines file (created if absent)."""
        with open(file_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")

    def _read_jsonl(self, file_path: Path) -> List[Dict[str, Any]]:
        """
        Read every record from a JSON Lines file.

        A missing file yields an empty list. A malformed line is skipped
        with a warning instead of raising, so one corrupt record cannot
        make an entire history file unreadable.
        """
        if not file_path.exists():
            return []
        records: List[Dict[str, Any]] = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line_no, line in enumerate(f, start=1):
                if not line.strip():
                    continue
                try:
                    records.append(json.loads(line))
                except json.JSONDecodeError:
                    self._log.warning(f"Skipping malformed JSONL at {file_path}:{line_no}")
        return records

    # ══════════════════════════════════════════════════════════════════
    # OPTIMIZATION MEMORY - What works for what
    # ══════════════════════════════════════════════════════════════════

    def record_optimization_outcome(
        self,
        study_name: str,
        geometry_type: str,
        method: str,
        objectives: List[str],
        design_vars: int,
        trials: int,
        converged: bool,
        convergence_trial: Optional[int] = None,
        best_value: Optional[float] = None,
        best_params: Optional[Dict[str, float]] = None,
        notes: str = ""
    ) -> None:
        """
        Record what happened in an optimization for future reference.

        This builds a history of what methods work for what types of
        problems, enabling better algorithm selection in the future.

        Args:
            study_name: Name of the study (e.g., "bracket_v1")
            geometry_type: Type of geometry (e.g., "bracket", "beam", "mirror")
            method: Optimization method used (e.g., "TPE", "CMA-ES", "NSGA-II")
            objectives: List of objective names
            design_vars: Number of design variables
            trials: Total number of trials run
            converged: Whether optimization converged satisfactorily
            convergence_trial: Trial number where convergence was achieved
            best_value: Best objective value found
            best_params: Best parameter values found
            notes: Any additional notes about the optimization

        Example:
            >>> lac.record_optimization_outcome(
            ...     study_name="bracket_mass_opt",
            ...     geometry_type="bracket",
            ...     method="TPE",
            ...     objectives=["mass"],
            ...     design_vars=3,
            ...     trials=100,
            ...     converged=True,
            ...     convergence_trial=67,
            ...     best_value=2.34,
            ...     notes="Good convergence with default settings"
            ... )
        """
        # Guard both operands: a falsy convergence_trial means "no ratio",
        # and trials == 0 must not raise ZeroDivisionError.
        convergence_ratio = (
            convergence_trial / trials if convergence_trial and trials else None
        )
        record = {
            "timestamp": datetime.now().isoformat(),
            "study_name": study_name,
            "geometry_type": geometry_type,
            "method": method,
            "objectives": objectives,
            "n_objectives": len(objectives),
            "design_vars": design_vars,
            "trials": trials,
            "converged": converged,
            "convergence_trial": convergence_trial,
            "convergence_ratio": convergence_ratio,
            "best_value": best_value,
            "best_params": best_params,
            "notes": notes
        }
        # Append to the geometry-type specific file (lower-cased file name).
        file_path = self.optimization_memory / f"{geometry_type.lower()}.jsonl"
        self._append_jsonl(file_path, record)
        self._log.info(f"Recorded optimization outcome for {study_name} -> {file_path}")

    def query_similar_optimizations(
        self,
        geometry_type: str,
        objectives: Optional[List[str]] = None,
        design_var_range: Optional[Tuple[int, int]] = None,
        method: Optional[str] = None,
        converged_only: bool = False,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Find past optimizations similar to what we're about to run.

        Use this before starting a new optimization to learn from
        past experience with similar problems.

        Args:
            geometry_type: Type of geometry to search for
            objectives: Filter by objective names (any-overlap match)
            design_var_range: Filter by design variable count (min, max), inclusive
            method: Filter by optimization method (exact match)
            converged_only: Only return converged optimizations
            limit: Maximum number of results

        Returns:
            List of matching optimization records, most recent first

        Example:
            >>> similar = lac.query_similar_optimizations(
            ...     geometry_type="bracket",
            ...     objectives=["mass"],
            ...     converged_only=True
            ... )
            >>> for opt in similar:
            ...     print(f"{opt['study_name']}: {opt['method']} - {opt['trials']} trials")
        """
        file_path = self.optimization_memory / f"{geometry_type.lower()}.jsonl"
        if not file_path.exists():
            self._log.debug(f"No history for geometry type: {geometry_type}")
            return []

        results = []
        for record in self._read_jsonl(file_path):
            # Apply filters; each failed filter skips the record.
            if converged_only and not record.get("converged"):
                continue
            if method and record.get("method") != method:
                continue
            if design_var_range:
                dv = record.get("design_vars", 0)
                if not (design_var_range[0] <= dv <= design_var_range[1]):
                    continue
            if objectives:
                record_objs = set(record.get("objectives", []))
                # Any overlap between requested and recorded objectives counts.
                if not any(obj in record_objs for obj in objectives):
                    continue
            results.append(record)

        # Sort by timestamp (most recent first) and limit.
        results.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
        return results[:limit]

    def get_best_method_for(
        self,
        geometry_type: str,
        n_objectives: int = 1,
        design_vars: Optional[int] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Get the best-performing method for a given problem type.

        Analyzes historical data to recommend the method with the
        best convergence rate for similar problems.

        Args:
            geometry_type: Type of geometry
            n_objectives: Number of objectives (1 for single, 2+ for multi)
            design_vars: Number of design variables (reserved for future
                similarity matching; currently unused)

        Returns:
            Dict with recommended method and statistics, or None if no data

        Example:
            >>> rec = lac.get_best_method_for("bracket", n_objectives=1)
            >>> print(f"Recommended: {rec['method']} (success rate: {rec['success_rate']:.0%})")
        """
        all_records = self.query_similar_optimizations(
            geometry_type=geometry_type,
            limit=100
        )
        if not all_records:
            return None

        # Prefer records with the same objective count; fall back to all.
        filtered = [r for r in all_records if r.get("n_objectives", 1) == n_objectives]
        if not filtered:
            filtered = all_records

        # Group by method and tally convergence statistics.
        method_stats: Dict[str, Dict[str, Any]] = {}
        for record in filtered:
            method = record.get("method", "unknown")
            if method not in method_stats:
                method_stats[method] = {"total": 0, "converged": 0, "avg_convergence_ratio": []}
            method_stats[method]["total"] += 1
            if record.get("converged"):
                method_stats[method]["converged"] += 1
            if record.get("convergence_ratio"):
                method_stats[method]["avg_convergence_ratio"].append(record["convergence_ratio"])

        # Pick the method with the highest score.
        best_method = None
        best_score = -1
        for method, stats in method_stats.items():
            if stats["total"] < 2:  # Need at least 2 data points
                continue
            success_rate = stats["converged"] / stats["total"]
            avg_ratio = (
                sum(stats["avg_convergence_ratio"]) / len(stats["avg_convergence_ratio"])
                if stats["avg_convergence_ratio"] else 1.0
            )
            # Score = success_rate * (1 - avg_convergence_ratio * 0.5):
            # higher success rate and faster convergence = higher score.
            score = success_rate * (1 - avg_ratio * 0.5)
            if score > best_score:
                best_score = score
                best_method = {
                    "method": method,
                    "success_rate": success_rate,
                    "avg_convergence_ratio": avg_ratio,
                    "sample_size": stats["total"],
                    "score": score
                }
        return best_method

    # ══════════════════════════════════════════════════════════════════
    # SESSION INSIGHTS - What we learned today
    # ══════════════════════════════════════════════════════════════════

    def record_insight(
        self,
        category: str,
        context: str,
        insight: str,
        confidence: float = 0.7,
        tags: Optional[List[str]] = None
    ) -> None:
        """
        Record something learned during a session.

        Categories:
            - failure: Something that failed and why
            - success_pattern: An approach that worked well
            - user_preference: Something the user prefers
            - protocol_clarification: A protocol that needed clarification
            - performance: Performance-related observation
            - workaround: A workaround for a known issue

        Args:
            category: Type of insight (see above; unknown categories fall
                back to "success_pattern" with a warning)
            context: Situation where this was learned
            insight: The actual learning
            confidence: How confident we are (clamped to 0.0-1.0)
            tags: Optional tags for filtering

        Example:
            >>> lac.record_insight(
            ...     category="success_pattern",
            ...     context="Bracket optimization with 5+ design variables",
            ...     insight="CMA-ES outperforms TPE when design_vars > 5",
            ...     confidence=0.85,
            ...     tags=["method_selection", "cma-es", "tpe"]
            ... )
        """
        valid_categories = [
            "failure", "success_pattern", "user_preference",
            "protocol_clarification", "performance", "workaround"
        ]
        if category not in valid_categories:
            self._log.warning(f"Unknown category '{category}'. Using 'success_pattern'.")
            category = "success_pattern"

        record = {
            "timestamp": datetime.now().isoformat(),
            "category": category,
            "context": context,
            "insight": insight,
            # Clamp into [0, 1] so out-of-range input can't skew scoring.
            "confidence": min(max(confidence, 0.0), 1.0),
            "tags": tags or []
        }
        self._append_jsonl(self.session_insights / f"{category}.jsonl", record)
        self._log.info(f"Recorded {category} insight: {insight[:50]}...")

    def get_relevant_insights(
        self,
        context: str,
        categories: Optional[List[str]] = None,
        min_confidence: float = 0.5,
        limit: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Get insights relevant to current context.

        Uses simple keyword matching to find relevant insights.
        Future versions could use embeddings for better matching.

        Args:
            context: Current context to match against
            categories: Filter by categories (None = all)
            min_confidence: Minimum confidence threshold
            limit: Maximum results to return

        Returns:
            List of relevant insights, sorted by relevance score (descending)

        Example:
            >>> insights = lac.get_relevant_insights(
            ...     "bracket stress optimization",
            ...     min_confidence=0.7
            ... )
            >>> for ins in insights:
            ...     print(f"- {ins['insight']}")
        """
        # Determine which files to search.
        if categories:
            files = [self.session_insights / f"{cat}.jsonl" for cat in categories]
        else:
            files = list(self.session_insights.glob("*.jsonl"))

        # Load every insight that clears the confidence threshold.
        all_insights = [
            record
            for file_path in files
            for record in self._read_jsonl(file_path)
            if record.get("confidence", 0) >= min_confidence
        ]
        if not all_insights:
            return []

        # Score by context overlap (simple keyword matching).
        context_words = set(context.lower().split())
        scored = []
        for insight in all_insights:
            insight_text = f"{insight.get('context', '')} {insight.get('insight', '')}"
            insight_words = set(insight_text.lower().split())
            tags = set(t.lower() for t in insight.get("tags", []))
            word_overlap = len(context_words & insight_words)
            tag_overlap = len(context_words & tags) * 2  # Tags worth more
            total_score = word_overlap + tag_overlap
            if total_score > 0:
                # Weight the raw overlap score by recorded confidence.
                weighted_score = total_score * insight.get("confidence", 0.5)
                scored.append((weighted_score, insight))

        # Sort by score and return the top results.
        scored.sort(reverse=True, key=lambda x: x[0])
        return [s[1] for s in scored[:limit]]

    def get_insights_by_category(
        self,
        category: str,
        limit: int = 20
    ) -> List[Dict[str, Any]]:
        """
        Get all insights of a specific category.

        Args:
            category: Category to retrieve
            limit: Maximum results

        Returns:
            List of insights, most recent first
        """
        insights = self._read_jsonl(self.session_insights / f"{category}.jsonl")
        insights.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
        return insights[:limit]

    # ══════════════════════════════════════════════════════════════════
    # SKILL EVOLUTION - How protocols should improve
    # ══════════════════════════════════════════════════════════════════

    def suggest_protocol_update(
        self,
        protocol: str,
        section: str,
        current_text: str,
        suggested_text: str,
        reason: str
    ) -> None:
        """
        Suggest an improvement to a protocol based on experience.

        These suggestions are stored for review. A human or admin
        session can review and apply them.

        Args:
            protocol: Protocol identifier (e.g., "OP_01", "SYS_12")
            section: Section within the protocol
            current_text: Current text that needs updating
            suggested_text: Proposed replacement text
            reason: Why this change is suggested

        Example:
            >>> lac.suggest_protocol_update(
            ...     protocol="SYS_15_METHOD_SELECTOR.md",
            ...     section="Modal Optimization",
            ...     current_text="Use TPE or CMA-ES for frequency optimization",
            ...     suggested_text="Use TPE for frequency optimization. CMA-ES struggles with discrete targets.",
            ...     reason="Discovered during bracket_modal study - CMA-ES failed to converge on frequency target"
            ... )
        """
        record = {
            "timestamp": datetime.now().isoformat(),
            "protocol": protocol,
            "section": section,
            "current_text": current_text,
            "suggested_text": suggested_text,
            "reason": reason,
            "status": "pending"  # pending, approved, rejected, applied
        }
        self._append_jsonl(self.skill_evolution / "suggested_updates.jsonl", record)
        self._log.info(f"Suggested update to {protocol}: {reason[:50]}...")

    def get_pending_updates(self) -> List[Dict[str, Any]]:
        """
        Get all pending protocol updates for review.

        Returns:
            List of update suggestions whose status is "pending"
        """
        file_path = self.skill_evolution / "suggested_updates.jsonl"
        return [r for r in self._read_jsonl(file_path) if r.get("status") == "pending"]

    def update_suggestion_status(
        self,
        timestamp: str,
        new_status: str,
        notes: str = ""
    ) -> bool:
        """
        Update the status of a suggestion.

        Records are matched by their creation timestamp (which acts as the
        suggestion's identifier); the whole file is rewritten on success.

        Args:
            timestamp: Timestamp of the suggestion to update
            new_status: New status (approved, rejected, applied)
            notes: Optional notes about the decision

        Returns:
            True if updated, False if not found
        """
        file_path = self.skill_evolution / "suggested_updates.jsonl"
        if not file_path.exists():
            return False

        # Read all records, patching the matching one(s) in memory.
        records = self._read_jsonl(file_path)
        found = False
        for record in records:
            if record.get("timestamp") == timestamp:
                record["status"] = new_status
                record["status_updated"] = datetime.now().isoformat()
                record["status_notes"] = notes
                found = True
        if not found:
            return False

        # Write the full file back with the updated record(s).
        with open(file_path, "w", encoding="utf-8") as f:
            for record in records:
                f.write(json.dumps(record) + "\n")
        return True

    # ══════════════════════════════════════════════════════════════════
    # STATISTICS & REPORTING
    # ══════════════════════════════════════════════════════════════════

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get overall LAC statistics.

        Returns:
            Dict with record counts per geometry type, insight counts per
            category, and skill-evolution counts per status
        """
        stats: Dict[str, Any] = {
            "optimization_memory": {},
            "session_insights": {},
            "skill_evolution": {
                "pending_updates": 0,
                "approved": 0,
                "rejected": 0,
                "applied": 0
            }
        }

        # Count optimization records by geometry type.
        for file_path in self.optimization_memory.glob("*.jsonl"):
            stats["optimization_memory"][file_path.stem] = len(self._read_jsonl(file_path))

        # Count insights by category.
        for file_path in self.session_insights.glob("*.jsonl"):
            stats["session_insights"][file_path.stem] = len(self._read_jsonl(file_path))

        # Count skill evolution suggestions by status. The dict key for the
        # "pending" status is "pending_updates" (matching generate_report),
        # so map it explicitly; unknown statuses are ignored.
        for record in self._read_jsonl(self.skill_evolution / "suggested_updates.jsonl"):
            status = record.get("status", "pending")
            key = "pending_updates" if status == "pending" else status
            if key in stats["skill_evolution"]:
                stats["skill_evolution"][key] += 1

        return stats

    def generate_report(self) -> str:
        """
        Generate a human-readable LAC report.

        Returns:
            Markdown-formatted report string
        """
        stats = self.get_statistics()
        lines = [
            "# Learning Atomizer Core (LAC) Report",
            f"Generated: {datetime.now().isoformat()}",
            "",
            "## Optimization Memory",
            ""
        ]

        total_opts = 0
        for geom_type, count in stats["optimization_memory"].items():
            lines.append(f"- {geom_type}: {count} records")
            total_opts += count
        lines.append(f"\n**Total**: {total_opts} optimization records")

        lines.extend([
            "",
            "## Session Insights",
            ""
        ])
        total_insights = 0
        for category, count in stats["session_insights"].items():
            lines.append(f"- {category}: {count} insights")
            total_insights += count
        lines.append(f"\n**Total**: {total_insights} insights")

        lines.extend([
            "",
            "## Skill Evolution",
            "",
            f"- Pending updates: {stats['skill_evolution']['pending_updates']}",
            f"- Approved: {stats['skill_evolution']['approved']}",
            f"- Applied: {stats['skill_evolution']['applied']}",
            f"- Rejected: {stats['skill_evolution']['rejected']}",
        ])
        return "\n".join(lines)
# ══════════════════════════════════════════════════════════════════════════════
# CONVENIENCE FUNCTIONS
# ══════════════════════════════════════════════════════════════════════════════
# Lazily-created process-wide default instance, shared by the module-level
# convenience functions below.
_default_lac: Optional[LearningAtomizerCore] = None


def get_lac() -> LearningAtomizerCore:
    """
    Return the shared default LAC instance (singleton pattern).

    The instance is created on first use with default storage paths and
    reused by every subsequent call.

    Returns:
        Default LearningAtomizerCore instance
    """
    global _default_lac
    lac = _default_lac
    if lac is None:
        lac = _default_lac = LearningAtomizerCore()
    return lac
def record_insight(category: str, context: str, insight: str, confidence: float = 0.7) -> None:
    """Record an insight on the shared default LAC instance."""
    lac = get_lac()
    lac.record_insight(category, context, insight, confidence)
def query_insights(context: str, limit: int = 5) -> List[Dict[str, Any]]:
    """Look up insights relevant to *context* via the shared default LAC."""
    lac = get_lac()
    return lac.get_relevant_insights(context, limit=limit)
# ══════════════════════════════════════════════════════════════════════════════
# CLI INTERFACE
# ══════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
    import sys

    # Minimal CLI over the LAC store: stats / report / pending / insights.
    if len(sys.argv) < 2:
        print("Usage: python lac.py <command> [args]")
        print("\nCommands:")
        print(" stats - Show LAC statistics")
        print(" report - Generate full report")
        print(" pending - Show pending protocol updates")
        print(" insights - Query insights (requires context arg)")
        sys.exit(1)

    command = sys.argv[1]

    # Construct the store only after a command was supplied, so a bad
    # invocation doesn't create the storage directories as a side effect.
    lac = LearningAtomizerCore()

    if command == "stats":
        stats = lac.get_statistics()
        print(json.dumps(stats, indent=2))

    elif command == "report":
        print(lac.generate_report())

    elif command == "pending":
        pending = lac.get_pending_updates()
        if pending:
            for p in pending:
                print(f"\n{'='*60}")
                print(f"Protocol: {p['protocol']}")
                print(f"Section: {p['section']}")
                print(f"Reason: {p['reason']}")
                print(f"Suggested: {p['suggested_text'][:100]}...")
        else:
            print("No pending updates.")

    elif command == "insights":
        if len(sys.argv) < 3:
            print("Usage: python lac.py insights <context>")
            sys.exit(1)
        # Join the remaining args into one free-text context query.
        context = " ".join(sys.argv[2:])
        insights = lac.get_relevant_insights(context)
        if insights:
            for ins in insights:
                print(f"\n[{ins['category']}] (confidence: {ins['confidence']:.0%})")
                print(f"Context: {ins['context']}")
                print(f"Insight: {ins['insight']}")
        else:
            print("No relevant insights found.")

    else:
        print(f"Unknown command: {command}")
        sys.exit(1)