""" Learning Atomizer Core (LAC) ============================ Persistent knowledge store that grows with every Claude Code session. Enables Atomizer to learn from experience and improve over time. Usage: from knowledge_base.lac import LearningAtomizerCore lac = LearningAtomizerCore() # Record what worked lac.record_optimization_outcome( study_name="bracket_v1", geometry_type="bracket", method="TPE", objectives=["mass"], design_vars=3, trials=100, converged=True, convergence_trial=67, notes="Good convergence with default settings" ) # Record an insight lac.record_insight( category="success_pattern", context="Bracket optimization", insight="TPE with 20 startup trials converges faster", confidence=0.85 ) # Query before starting new work similar = lac.query_similar_optimizations("bracket", ["mass"]) insights = lac.get_relevant_insights("bracket mass optimization") Protocol Reference: - SYS_12: Extractor Library (for extraction learnings) - SYS_15: Method Selector (for algorithm learnings) Author: Atomizer Claude Created: 2025-12-11 Version: 1.0 """ from pathlib import Path import json from datetime import datetime from typing import Dict, List, Optional, Any, Tuple import logging logger = logging.getLogger(__name__) class LearningAtomizerCore: """ Simple persistent knowledge store that grows with every session. No database needed - just structured JSON Lines files that Claude can read and write efficiently. Directory Structure: knowledge_base/lac/ ├── optimization_memory/ # What worked for what geometry │ ├── bracket.jsonl │ ├── beam.jsonl │ └── mirror.jsonl ├── session_insights/ # Learnings from sessions │ ├── failure.jsonl │ ├── success_pattern.jsonl │ ├── user_preference.jsonl │ └── protocol_clarification.jsonl └── skill_evolution/ # Protocol improvements └── suggested_updates.jsonl Attributes: root: Base path for LAC storage optimization_memory: Path to optimization outcome storage session_insights: Path to insight storage skill_evolution: Path to skill improvement suggestions """ def __init__(self, root: Optional[Path] = None): """ Initialize LAC with storage directories. Args: root: Base path for LAC storage. Defaults to knowledge_base/lac/ relative to the Atomizer project root. """ if root is None: # Try to find Atomizer root current = Path.cwd() while current != current.parent: if (current / "CLAUDE.md").exists(): root = current / "knowledge_base" / "lac" break current = current.parent else: root = Path("knowledge_base/lac") self.root = Path(root) self.optimization_memory = self.root / "optimization_memory" self.session_insights = self.root / "session_insights" self.skill_evolution = self.root / "skill_evolution" # Create directories for d in [self.optimization_memory, self.session_insights, self.skill_evolution]: d.mkdir(parents=True, exist_ok=True) logger.info(f"LAC initialized at {self.root}") # ══════════════════════════════════════════════════════════════════ # OPTIMIZATION MEMORY - What works for what # ══════════════════════════════════════════════════════════════════ def record_optimization_outcome( self, study_name: str, geometry_type: str, method: str, objectives: List[str], design_vars: int, trials: int, converged: bool, convergence_trial: Optional[int] = None, best_value: Optional[float] = None, best_params: Optional[Dict[str, float]] = None, notes: str = "" ) -> None: """ Record what happened in an optimization for future reference. This builds a history of what methods work for what types of problems, enabling better algorithm selection in the future. Args: study_name: Name of the study (e.g., "bracket_v1") geometry_type: Type of geometry (e.g., "bracket", "beam", "mirror") method: Optimization method used (e.g., "TPE", "CMA-ES", "NSGA-II") objectives: List of objective names design_vars: Number of design variables trials: Total number of trials run converged: Whether optimization converged satisfactorily convergence_trial: Trial number where convergence was achieved best_value: Best objective value found best_params: Best parameter values found notes: Any additional notes about the optimization Example: >>> lac.record_optimization_outcome( ... study_name="bracket_mass_opt", ... geometry_type="bracket", ... method="TPE", ... objectives=["mass"], ... design_vars=3, ... trials=100, ... converged=True, ... convergence_trial=67, ... best_value=2.34, ... notes="Good convergence with default settings" ... ) """ record = { "timestamp": datetime.now().isoformat(), "study_name": study_name, "geometry_type": geometry_type, "method": method, "objectives": objectives, "n_objectives": len(objectives), "design_vars": design_vars, "trials": trials, "converged": converged, "convergence_trial": convergence_trial, "convergence_ratio": convergence_trial / trials if convergence_trial else None, "best_value": best_value, "best_params": best_params, "notes": notes } # Append to geometry-type specific file file_path = self.optimization_memory / f"{geometry_type.lower()}.jsonl" with open(file_path, "a", encoding="utf-8") as f: f.write(json.dumps(record) + "\n") logger.info(f"Recorded optimization outcome for {study_name} -> {file_path}") def query_similar_optimizations( self, geometry_type: str, objectives: Optional[List[str]] = None, design_var_range: Optional[Tuple[int, int]] = None, method: Optional[str] = None, converged_only: bool = False, limit: int = 10 ) -> List[Dict[str, Any]]: """ Find past optimizations similar to what we're about to run. Use this before starting a new optimization to learn from past experience with similar problems. Args: geometry_type: Type of geometry to search for objectives: Filter by objective names (partial match) design_var_range: Filter by design variable count (min, max) method: Filter by optimization method converged_only: Only return converged optimizations limit: Maximum number of results Returns: List of matching optimization records, sorted by recency Example: >>> similar = lac.query_similar_optimizations( ... geometry_type="bracket", ... objectives=["mass"], ... converged_only=True ... ) >>> for opt in similar: ... print(f"{opt['study_name']}: {opt['method']} - {opt['trials']} trials") """ results = [] file_path = self.optimization_memory / f"{geometry_type.lower()}.jsonl" if not file_path.exists(): logger.debug(f"No history for geometry type: {geometry_type}") return [] with open(file_path, "r", encoding="utf-8") as f: for line in f: if not line.strip(): continue record = json.loads(line) # Apply filters if converged_only and not record.get("converged"): continue if method and record.get("method") != method: continue if design_var_range: dv = record.get("design_vars", 0) if not (design_var_range[0] <= dv <= design_var_range[1]): continue if objectives: record_objs = set(record.get("objectives", [])) if not any(obj in record_objs for obj in objectives): continue results.append(record) # Sort by timestamp (most recent first) and limit results.sort(key=lambda x: x.get("timestamp", ""), reverse=True) return results[:limit] def get_best_method_for( self, geometry_type: str, n_objectives: int = 1, design_vars: Optional[int] = None ) -> Optional[Dict[str, Any]]: """ Get the best-performing method for a given problem type. Analyzes historical data to recommend the method with the best convergence rate for similar problems. Args: geometry_type: Type of geometry n_objectives: Number of objectives (1 for single, 2+ for multi) design_vars: Number of design variables (for similarity matching) Returns: Dict with recommended method and statistics, or None if no data Example: >>> rec = lac.get_best_method_for("bracket", n_objectives=1) >>> print(f"Recommended: {rec['method']} (success rate: {rec['success_rate']:.0%})") """ all_records = self.query_similar_optimizations( geometry_type=geometry_type, limit=100 ) if not all_records: return None # Filter by n_objectives filtered = [r for r in all_records if r.get("n_objectives", 1) == n_objectives] if not filtered: filtered = all_records # Fall back to all records # Group by method and calculate success rates method_stats = {} for record in filtered: method = record.get("method", "unknown") if method not in method_stats: method_stats[method] = {"total": 0, "converged": 0, "avg_convergence_ratio": []} method_stats[method]["total"] += 1 if record.get("converged"): method_stats[method]["converged"] += 1 if record.get("convergence_ratio"): method_stats[method]["avg_convergence_ratio"].append(record["convergence_ratio"]) # Find best method best_method = None best_score = -1 for method, stats in method_stats.items(): if stats["total"] < 2: # Need at least 2 data points continue success_rate = stats["converged"] / stats["total"] avg_ratio = ( sum(stats["avg_convergence_ratio"]) / len(stats["avg_convergence_ratio"]) if stats["avg_convergence_ratio"] else 1.0 ) # Score = success_rate * (1 - avg_convergence_ratio) # Higher success rate and faster convergence = higher score score = success_rate * (1 - avg_ratio * 0.5) if score > best_score: best_score = score best_method = { "method": method, "success_rate": success_rate, "avg_convergence_ratio": avg_ratio, "sample_size": stats["total"], "score": score } return best_method # ══════════════════════════════════════════════════════════════════ # SESSION INSIGHTS - What we learned today # ══════════════════════════════════════════════════════════════════ def record_insight( self, category: str, context: str, insight: str, confidence: float = 0.7, tags: Optional[List[str]] = None ) -> None: """ Record something learned during a session. Categories: - failure: Something that failed and why - success_pattern: An approach that worked well - user_preference: Something the user prefers - protocol_clarification: A protocol that needed clarification - performance: Performance-related observation - workaround: A workaround for a known issue Args: category: Type of insight (see above) context: Situation where this was learned insight: The actual learning confidence: How confident we are (0.0-1.0) tags: Optional tags for filtering Example: >>> lac.record_insight( ... category="success_pattern", ... context="Bracket optimization with 5+ design variables", ... insight="CMA-ES outperforms TPE when design_vars > 5", ... confidence=0.85, ... tags=["method_selection", "cma-es", "tpe"] ... ) """ valid_categories = [ "failure", "success_pattern", "user_preference", "protocol_clarification", "performance", "workaround" ] if category not in valid_categories: logger.warning(f"Unknown category '{category}'. Using 'success_pattern'.") category = "success_pattern" record = { "timestamp": datetime.now().isoformat(), "category": category, "context": context, "insight": insight, "confidence": min(max(confidence, 0.0), 1.0), "tags": tags or [] } file_path = self.session_insights / f"{category}.jsonl" with open(file_path, "a", encoding="utf-8") as f: f.write(json.dumps(record) + "\n") logger.info(f"Recorded {category} insight: {insight[:50]}...") def get_relevant_insights( self, context: str, categories: Optional[List[str]] = None, min_confidence: float = 0.5, limit: int = 5 ) -> List[Dict[str, Any]]: """ Get insights relevant to current context. Uses simple keyword matching to find relevant insights. Future versions could use embeddings for better matching. Args: context: Current context to match against categories: Filter by categories (None = all) min_confidence: Minimum confidence threshold limit: Maximum results to return Returns: List of relevant insights, sorted by relevance score Example: >>> insights = lac.get_relevant_insights( ... "bracket stress optimization", ... min_confidence=0.7 ... ) >>> for ins in insights: ... print(f"- {ins['insight']}") """ all_insights = [] # Determine which files to search if categories: files = [self.session_insights / f"{cat}.jsonl" for cat in categories] else: files = list(self.session_insights.glob("*.jsonl")) # Load all insights for file_path in files: if not file_path.exists(): continue with open(file_path, "r", encoding="utf-8") as f: for line in f: if not line.strip(): continue record = json.loads(line) if record.get("confidence", 0) >= min_confidence: all_insights.append(record) if not all_insights: return [] # Score by context overlap (simple keyword matching) context_words = set(context.lower().split()) scored = [] for insight in all_insights: insight_text = f"{insight.get('context', '')} {insight.get('insight', '')}" insight_words = set(insight_text.lower().split()) # Also check tags tags = set(t.lower() for t in insight.get("tags", [])) # Calculate overlap word_overlap = len(context_words & insight_words) tag_overlap = len(context_words & tags) * 2 # Tags worth more total_score = word_overlap + tag_overlap if total_score > 0: # Weight by confidence weighted_score = total_score * insight.get("confidence", 0.5) scored.append((weighted_score, insight)) # Sort by score and return top results scored.sort(reverse=True, key=lambda x: x[0]) return [s[1] for s in scored[:limit]] def get_insights_by_category( self, category: str, limit: int = 20 ) -> List[Dict[str, Any]]: """ Get all insights of a specific category. Args: category: Category to retrieve limit: Maximum results Returns: List of insights, most recent first """ file_path = self.session_insights / f"{category}.jsonl" if not file_path.exists(): return [] insights = [] with open(file_path, "r", encoding="utf-8") as f: for line in f: if line.strip(): insights.append(json.loads(line)) # Sort by timestamp (most recent first) insights.sort(key=lambda x: x.get("timestamp", ""), reverse=True) return insights[:limit] # ══════════════════════════════════════════════════════════════════ # SKILL EVOLUTION - How protocols should improve # ══════════════════════════════════════════════════════════════════ def suggest_protocol_update( self, protocol: str, section: str, current_text: str, suggested_text: str, reason: str ) -> None: """ Suggest an improvement to a protocol based on experience. These suggestions are stored for review. A human or admin session can review and apply them. Args: protocol: Protocol identifier (e.g., "OP_01", "SYS_12") section: Section within the protocol current_text: Current text that needs updating suggested_text: Proposed replacement text reason: Why this change is suggested Example: >>> lac.suggest_protocol_update( ... protocol="SYS_15_METHOD_SELECTOR.md", ... section="Modal Optimization", ... current_text="Use TPE or CMA-ES for frequency optimization", ... suggested_text="Use TPE for frequency optimization. CMA-ES struggles with discrete targets.", ... reason="Discovered during bracket_modal study - CMA-ES failed to converge on frequency target" ... ) """ record = { "timestamp": datetime.now().isoformat(), "protocol": protocol, "section": section, "current_text": current_text, "suggested_text": suggested_text, "reason": reason, "status": "pending" # pending, approved, rejected, applied } file_path = self.skill_evolution / "suggested_updates.jsonl" with open(file_path, "a", encoding="utf-8") as f: f.write(json.dumps(record) + "\n") logger.info(f"Suggested update to {protocol}: {reason[:50]}...") def get_pending_updates(self) -> List[Dict[str, Any]]: """ Get all pending protocol updates for review. Returns: List of pending update suggestions """ file_path = self.skill_evolution / "suggested_updates.jsonl" if not file_path.exists(): return [] pending = [] with open(file_path, "r", encoding="utf-8") as f: for line in f: if not line.strip(): continue record = json.loads(line) if record.get("status") == "pending": pending.append(record) return pending def update_suggestion_status( self, timestamp: str, new_status: str, notes: str = "" ) -> bool: """ Update the status of a suggestion. Args: timestamp: Timestamp of the suggestion to update new_status: New status (approved, rejected, applied) notes: Optional notes about the decision Returns: True if updated, False if not found """ file_path = self.skill_evolution / "suggested_updates.jsonl" if not file_path.exists(): return False # Read all records records = [] found = False with open(file_path, "r", encoding="utf-8") as f: for line in f: if not line.strip(): continue record = json.loads(line) if record.get("timestamp") == timestamp: record["status"] = new_status record["status_updated"] = datetime.now().isoformat() record["status_notes"] = notes found = True records.append(record) if not found: return False # Write back with open(file_path, "w", encoding="utf-8") as f: for record in records: f.write(json.dumps(record) + "\n") return True # ══════════════════════════════════════════════════════════════════ # STATISTICS & REPORTING # ══════════════════════════════════════════════════════════════════ def get_statistics(self) -> Dict[str, Any]: """ Get overall LAC statistics. Returns: Dict with counts and summaries """ stats = { "optimization_memory": {}, "session_insights": {}, "skill_evolution": { "pending_updates": 0, "approved": 0, "rejected": 0, "applied": 0 } } # Count optimization records by geometry type for file_path in self.optimization_memory.glob("*.jsonl"): count = sum(1 for line in open(file_path) if line.strip()) stats["optimization_memory"][file_path.stem] = count # Count insights by category for file_path in self.session_insights.glob("*.jsonl"): count = sum(1 for line in open(file_path) if line.strip()) stats["session_insights"][file_path.stem] = count # Count skill evolution by status updates_file = self.skill_evolution / "suggested_updates.jsonl" if updates_file.exists(): with open(updates_file) as f: for line in f: if line.strip(): record = json.loads(line) status = record.get("status", "pending") if status in stats["skill_evolution"]: stats["skill_evolution"][status] += 1 return stats def generate_report(self) -> str: """ Generate a human-readable LAC report. Returns: Markdown-formatted report string """ stats = self.get_statistics() lines = [ "# Learning Atomizer Core (LAC) Report", f"Generated: {datetime.now().isoformat()}", "", "## Optimization Memory", "" ] total_opts = 0 for geom_type, count in stats["optimization_memory"].items(): lines.append(f"- {geom_type}: {count} records") total_opts += count lines.append(f"\n**Total**: {total_opts} optimization records") lines.extend([ "", "## Session Insights", "" ]) total_insights = 0 for category, count in stats["session_insights"].items(): lines.append(f"- {category}: {count} insights") total_insights += count lines.append(f"\n**Total**: {total_insights} insights") lines.extend([ "", "## Skill Evolution", "", f"- Pending updates: {stats['skill_evolution']['pending_updates']}", f"- Approved: {stats['skill_evolution']['approved']}", f"- Applied: {stats['skill_evolution']['applied']}", f"- Rejected: {stats['skill_evolution']['rejected']}", ]) return "\n".join(lines) # ══════════════════════════════════════════════════════════════════════════════ # CONVENIENCE FUNCTIONS # ══════════════════════════════════════════════════════════════════════════════ _default_lac: Optional[LearningAtomizerCore] = None def get_lac() -> LearningAtomizerCore: """ Get the default LAC instance (singleton pattern). Returns: Default LearningAtomizerCore instance """ global _default_lac if _default_lac is None: _default_lac = LearningAtomizerCore() return _default_lac def record_insight(category: str, context: str, insight: str, confidence: float = 0.7) -> None: """Convenience function to record an insight.""" get_lac().record_insight(category, context, insight, confidence) def query_insights(context: str, limit: int = 5) -> List[Dict[str, Any]]: """Convenience function to query insights.""" return get_lac().get_relevant_insights(context, limit=limit) # ══════════════════════════════════════════════════════════════════════════════ # CLI INTERFACE # ══════════════════════════════════════════════════════════════════════════════ if __name__ == "__main__": import sys lac = LearningAtomizerCore() if len(sys.argv) < 2: print("Usage: python lac.py [args]") print("\nCommands:") print(" stats - Show LAC statistics") print(" report - Generate full report") print(" pending - Show pending protocol updates") print(" insights - Query insights (requires context arg)") sys.exit(1) command = sys.argv[1] if command == "stats": stats = lac.get_statistics() print(json.dumps(stats, indent=2)) elif command == "report": print(lac.generate_report()) elif command == "pending": pending = lac.get_pending_updates() if pending: for p in pending: print(f"\n{'='*60}") print(f"Protocol: {p['protocol']}") print(f"Section: {p['section']}") print(f"Reason: {p['reason']}") print(f"Suggested: {p['suggested_text'][:100]}...") else: print("No pending updates.") elif command == "insights": if len(sys.argv) < 3: print("Usage: python lac.py insights ") sys.exit(1) context = " ".join(sys.argv[2:]) insights = lac.get_relevant_insights(context) if insights: for ins in insights: print(f"\n[{ins['category']}] (confidence: {ins['confidence']:.0%})") print(f"Context: {ins['context']}") print(f"Insight: {ins['insight']}") else: print("No relevant insights found.") else: print(f"Unknown command: {command}") sys.exit(1)