
# Atomizer Context Engineering Implementation Plan
## Claude Code Enhancement Strategy Using State-of-the-Art Context Engineering
**Version**: 1.1
**Date**: December 2025
**Updated**: December 28, 2025
**Author**: Antoine (with Claude)
**Purpose**: Transform Atomizer's LLM integration using cutting-edge context engineering patterns
**Prerequisite**: OPTIMIZATION_ENGINE_MIGRATION_PLAN.md (must complete BEFORE this plan)
---
## Prerequisite: Complete Migration First
**IMPORTANT**: This plan assumes the optimization_engine reorganization has been completed.
Before starting Context Engineering:
1. Complete all phases in `.claude/skills/modules/OPTIMIZATION_ENGINE_MIGRATION_PLAN.md`
2. Verify `optimization_engine/core/runner.py` exists (not `optimization_engine/runner.py`)
3. All imports use new paths (e.g., `from optimization_engine.core.runner import ...`)
4. Test suite passes with new structure
If migration is NOT complete, go do it first. Context Engineering builds on top of the reorganized structure.
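The path check in steps 2-3 can be scripted as a preflight. A hedged sketch: `migration_complete` is a hypothetical helper, and `optimization_engine.core.runner` is the plan's assumed post-migration path.

```python
import importlib.util


def migration_complete(module_path: str = "optimization_engine.core.runner") -> bool:
    """True if the post-migration module path resolves to an importable module."""
    try:
        return importlib.util.find_spec(module_path) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (e.g. optimization_engine) is missing entirely
        return False
```

A `False` result means the migration plan still needs to be executed before starting Phase 1.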
---
## Executive Summary
This plan transforms Atomizer from a traditional LLM-assisted tool into a **self-improving, context-aware optimization platform** by implementing state-of-the-art context engineering techniques. The core innovation is treating the Learning Atomizer Core (LAC) as an **evolving playbook** that accumulates institutional knowledge through structured generation, reflection, and curation cycles.
**Expected Outcomes**:
- 10-15% improvement in optimization task success rates
- 80%+ reduction in repeated mistakes across sessions
- Substantial cost reduction (up to ~10x on cached tokens) through KV-cache optimization
- True institutional memory that compounds over time
---
## Part 1: Architecture Mapping
### Current Atomizer Architecture → ACE Framework Alignment
| Atomizer Component | ACE Role | Enhancement |
|-------------------|----------|-------------|
| Optimization Runner | **Generator** | Produces optimization trajectories with success/failure signals |
| Post-run Analysis | **Reflector** | Extracts insights from optimization outcomes |
| Learning Atomizer Core (LAC) | **Curator** | Integrates insights into persistent playbook |
| Protocol Operating System (POS) | **Context Loader** | Selects relevant context per task type |
| Claude Code Sessions | **Agent** | Executes tasks using curated context |
### New Component: AtomizerPlaybook
```
┌─────────────────────────────────────────────────────────────────┐
│                     AtomizerPlaybook System                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐            │
│  │  Generator  │──▶│  Reflector  │──▶│   Curator   │            │
│  │ (Opt Runs)  │   │ (Analysis)  │   │(LAC Update) │            │
│  └─────────────┘   └─────────────┘   └─────────────┘            │
│         │                 │                 │                   │
│         ▼                 ▼                 ▼                   │
│  ┌──────────────────────────────────────────────────────┐       │
│  │              Structured Playbook Store               │       │
│  ├──────────────────────────────────────────────────────┤       │
│  │  [str-00001] helpful=8 harmful=0 ::                  │       │
│  │    "For thin-walled structures, start with shell     │       │
│  │     elements before trying solid mesh"               │       │
│  │                                                      │       │
│  │  [cal-00002] helpful=12 harmful=1 ::                 │       │
│  │    "Safety factor = yield_stress / max_von_mises"    │       │
│  │                                                      │       │
│  │  [mis-00003] helpful=0 harmful=6 ::                  │       │
│  │    "Never set convergence < 1e-8 for SOL 106"        │       │
│  └──────────────────────────────────────────────────────┘       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
---
## Part 2: Implementation Phases
### Phase 1: Structured Playbook System (Week 1-2)
**Goal**: Convert LAC from unstructured memory to ACE-style itemized playbook
#### 1.1 Create Playbook Data Structure
**File**: `optimization_engine/context/playbook.py`
```python
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
import json
from pathlib import Path
from datetime import datetime
import hashlib


class InsightCategory(Enum):
    STRATEGY = "str"     # Optimization strategies
    CALCULATION = "cal"  # Formulas and calculations
    MISTAKE = "mis"      # Common mistakes to avoid
    TOOL = "tool"        # Tool usage patterns
    DOMAIN = "dom"       # Domain-specific knowledge (FEA, NX)
    WORKFLOW = "wf"      # Workflow patterns


@dataclass
class PlaybookItem:
    """Single insight in the playbook with helpful/harmful tracking."""
    id: str
    category: InsightCategory
    content: str
    helpful_count: int = 0
    harmful_count: int = 0
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    last_used: Optional[str] = None
    source_trials: List[int] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)

    @property
    def net_score(self) -> int:
        return self.helpful_count - self.harmful_count

    @property
    def confidence(self) -> float:
        total = self.helpful_count + self.harmful_count
        if total == 0:
            return 0.5
        return self.helpful_count / total

    def to_context_string(self) -> str:
        """Format for injection into LLM context."""
        return f"[{self.id}] helpful={self.helpful_count} harmful={self.harmful_count} :: {self.content}"


@dataclass
class AtomizerPlaybook:
    """
    Evolving playbook that accumulates optimization knowledge.

    Based on ACE framework principles:
    - Incremental delta updates (never rewrite wholesale)
    - Helpful/harmful tracking for each insight
    - Semantic deduplication
    - Category-based organization
    """
    items: Dict[str, PlaybookItem] = field(default_factory=dict)
    version: int = 1
    last_updated: str = field(default_factory=lambda: datetime.now().isoformat())

    def _generate_id(self, category: InsightCategory) -> str:
        """Generate unique ID (highest existing number + 1, so IDs stay unique after pruning)."""
        existing = [
            int(key.rsplit("-", 1)[1])
            for key in self.items
            if key.startswith(f"{category.value}-")
        ]
        next_num = max(existing, default=0) + 1
        return f"{category.value}-{next_num:05d}"

    def add_insight(
        self,
        category: InsightCategory,
        content: str,
        source_trial: Optional[int] = None,
        tags: Optional[List[str]] = None
    ) -> PlaybookItem:
        """
        Add new insight with delta update (ACE principle).
        Checks for semantic duplicates before adding.
        """
        # Check for near-duplicates (simple implementation: case-insensitive match via hash)
        content_hash = hashlib.md5(content.lower().encode()).hexdigest()[:8]
        for item in self.items.values():
            existing_hash = hashlib.md5(item.content.lower().encode()).hexdigest()[:8]
            if content_hash == existing_hash:
                # Update existing instead of adding duplicate
                item.helpful_count += 1
                if source_trial is not None:  # trial numbers start at 0, so test against None
                    item.source_trials.append(source_trial)
                return item
        # Create new item
        item_id = self._generate_id(category)
        item = PlaybookItem(
            id=item_id,
            category=category,
            content=content,
            source_trials=[source_trial] if source_trial is not None else [],
            tags=tags or []
        )
        self.items[item_id] = item
        self.last_updated = datetime.now().isoformat()
        self.version += 1
        return item

    def record_outcome(self, item_id: str, helpful: bool):
        """Record whether using this insight was helpful or harmful."""
        if item_id in self.items:
            if helpful:
                self.items[item_id].helpful_count += 1
            else:
                self.items[item_id].harmful_count += 1
            self.items[item_id].last_used = datetime.now().isoformat()

    def get_context_for_task(
        self,
        task_type: str,
        max_items: int = 20,
        min_confidence: float = 0.5
    ) -> str:
        """
        Generate context string for LLM consumption.
        Filters by confidence, sorted by net score.
        (`task_type` is reserved for tag-based relevance filtering.)
        """
        relevant_items = [
            item for item in self.items.values()
            if item.confidence >= min_confidence
        ]
        # Sort by net score (most helpful first)
        relevant_items.sort(key=lambda x: x.net_score, reverse=True)
        # Group by category
        sections = {}
        for item in relevant_items[:max_items]:
            cat_name = item.category.name
            if cat_name not in sections:
                sections[cat_name] = []
            sections[cat_name].append(item.to_context_string())
        # Build context string
        lines = ["## Atomizer Knowledge Playbook\n"]
        for cat_name, items in sections.items():
            lines.append(f"### {cat_name}")
            lines.extend(items)
            lines.append("")
        return "\n".join(lines)

    def prune_harmful(self, threshold: int = -3):
        """Remove items that have proven consistently harmful."""
        to_remove = [
            item_id for item_id, item in self.items.items()
            if item.net_score <= threshold
        ]
        for item_id in to_remove:
            del self.items[item_id]

    def save(self, path: Path):
        """Persist playbook to JSON."""
        data = {
            "version": self.version,
            "last_updated": self.last_updated,
            "items": {
                k: {
                    "id": v.id,
                    "category": v.category.value,
                    "content": v.content,
                    "helpful_count": v.helpful_count,
                    "harmful_count": v.harmful_count,
                    "created_at": v.created_at,
                    "last_used": v.last_used,
                    "source_trials": v.source_trials,
                    "tags": v.tags
                }
                for k, v in self.items.items()
            }
        }
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, 'w') as f:
            json.dump(data, f, indent=2)

    @classmethod
    def load(cls, path: Path) -> "AtomizerPlaybook":
        """Load playbook from JSON."""
        if not path.exists():
            return cls()
        with open(path) as f:
            data = json.load(f)
        playbook = cls(
            version=data.get("version", 1),
            last_updated=data.get("last_updated", datetime.now().isoformat())
        )
        for item_data in data.get("items", {}).values():
            item = PlaybookItem(
                id=item_data["id"],
                category=InsightCategory(item_data["category"]),
                content=item_data["content"],
                helpful_count=item_data.get("helpful_count", 0),
                harmful_count=item_data.get("harmful_count", 0),
                created_at=item_data.get("created_at", ""),
                last_used=item_data.get("last_used"),
                source_trials=item_data.get("source_trials", []),
                tags=item_data.get("tags", [])
            )
            playbook.items[item.id] = item
        return playbook
```
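The helpful/harmful counters are the entire selection signal: `net_score` ranks items and `confidence` gates them. A minimal standalone sketch of that arithmetic, applied to the example items from the diagram in Part 1:

```python
def net_score(helpful: int, harmful: int) -> int:
    """Ranking signal: most-corroborated items surface first."""
    return helpful - harmful


def confidence(helpful: int, harmful: int) -> float:
    """Gating signal: unproven items start neutral at 0.5."""
    total = helpful + harmful
    return 0.5 if total == 0 else helpful / total


print(net_score(12, 1))   # cal-00002: strongly positive, always selected
print(confidence(0, 6))   # mis-00003: 0.0, filtered out and a prune candidate
```

With the default `min_confidence=0.5`, an item must be at least break-even before it re-enters context, and `prune_harmful(threshold=-3)` removes it once failures outnumber successes by three.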
#### 1.2 Create Reflector Component
**File**: `optimization_engine/context/reflector.py`
```python
"""
Reflector: Analyzes optimization outcomes to extract insights.
Part of the ACE (Agentic Context Engineering) implementation for Atomizer.
"""
from typing import Dict, Any, List, Optional
from dataclasses import dataclass

from .playbook import AtomizerPlaybook, InsightCategory


@dataclass
class OptimizationOutcome:
    """Captured outcome from an optimization run."""
    trial_number: int
    success: bool
    objective_value: Optional[float]
    constraint_violations: List[str]
    solver_errors: List[str]
    design_variables: Dict[str, float]
    extractor_used: str
    duration_seconds: float
    notes: str = ""


class AtomizerReflector:
    """
    Analyzes optimization outcomes and extracts actionable insights.

    Implements the Reflector role from ACE framework:
    - Examines successful and failed trials
    - Extracts patterns that led to success/failure
    - Formats insights for Curator integration
    """

    def __init__(self, playbook: AtomizerPlaybook):
        self.playbook = playbook
        self.pending_insights: List[Dict[str, Any]] = []

    def analyze_trial(self, outcome: OptimizationOutcome) -> List[Dict[str, Any]]:
        """
        Analyze a single trial outcome and extract insights.
        Returns list of insight candidates (not yet added to playbook).
        """
        insights = []
        # Analyze solver errors
        for error in outcome.solver_errors:
            if "convergence" in error.lower():
                insights.append({
                    "category": InsightCategory.MISTAKE,
                    "content": f"Convergence failure with config: {self._summarize_config(outcome)}",
                    "helpful": False,
                    "trial": outcome.trial_number
                })
            elif "mesh" in error.lower():
                insights.append({
                    "category": InsightCategory.MISTAKE,
                    "content": f"Mesh-related error: {error[:100]}",
                    "helpful": False,
                    "trial": outcome.trial_number
                })
        # Analyze successful patterns
        if outcome.success and outcome.objective_value is not None:
            # Record successful design variable ranges
            insights.append({
                "category": InsightCategory.STRATEGY,
                "content": f"Successful design: {self._summarize_design(outcome)}",
                "helpful": True,
                "trial": outcome.trial_number
            })
        # Analyze constraint violations
        for violation in outcome.constraint_violations:
            insights.append({
                "category": InsightCategory.MISTAKE,
                "content": f"Constraint violation: {violation}",
                "helpful": False,
                "trial": outcome.trial_number
            })
        self.pending_insights.extend(insights)
        return insights

    def analyze_study_completion(
        self,
        study_name: str,
        total_trials: int,
        best_value: float,
        convergence_rate: float
    ) -> List[Dict[str, Any]]:
        """
        Analyze completed study and extract high-level insights.
        """
        insights = []
        if convergence_rate > 0.9:
            insights.append({
                "category": InsightCategory.STRATEGY,
                "content": f"Study '{study_name}' achieved {convergence_rate:.0%} convergence - configuration is robust",
                "helpful": True,
                "trial": None
            })
        elif convergence_rate < 0.5:
            insights.append({
                "category": InsightCategory.MISTAKE,
                "content": f"Study '{study_name}' had only {convergence_rate:.0%} convergence - review mesh and solver settings",
                "helpful": False,
                "trial": None
            })
        # Queue study-level insights so commit_insights() picks them up
        self.pending_insights.extend(insights)
        return insights

    def commit_insights(self) -> int:
        """
        Commit pending insights to playbook (Curator handoff).
        Returns number of insights added.
        """
        count = 0
        for insight in self.pending_insights:
            item = self.playbook.add_insight(
                category=insight["category"],
                content=insight["content"],
                source_trial=insight.get("trial")
            )
            if not insight.get("helpful", True):
                self.playbook.record_outcome(item.id, helpful=False)
            count += 1
        self.pending_insights = []
        return count

    def _summarize_config(self, outcome: OptimizationOutcome) -> str:
        """Create brief config summary."""
        return f"extractor={outcome.extractor_used}, vars={len(outcome.design_variables)}"

    def _summarize_design(self, outcome: OptimizationOutcome) -> str:
        """Create brief design summary."""
        vars_summary = ", ".join(
            f"{k}={v:.3g}" for k, v in list(outcome.design_variables.items())[:3]
        )
        return f"obj={outcome.objective_value:.4g}, {vars_summary}"
```
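The error-keyword routing in `analyze_trial` is a small string-to-dict transform. A self-contained sketch (`extract_error_insights` is a hypothetical name, with the category code inlined as the string `"mis"`) of what two solver errors become:

```python
def extract_error_insights(trial_number: int, solver_errors: list) -> list:
    """Sketch of analyze_trial's error branch: solver errors -> mistake candidates."""
    insights = []
    for error in solver_errors:
        lowered = error.lower()
        if "convergence" in lowered:
            content = "Convergence failure"
        elif "mesh" in lowered:
            content = f"Mesh-related error: {error[:100]}"
        else:
            continue  # other errors are left for classify_error-style handling
        insights.append({"category": "mis", "content": content,
                         "helpful": False, "trial": trial_number})
    return insights


candidates = extract_error_insights(7, ["CONVERGENCE NOT ACHIEVED",
                                        "Distorted mesh in region 3"])
# -> two mistake candidates, both tagged trial=7 and helpful=False
```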
#### 1.3 Integration with OptimizationRunner
**File**: `optimization_engine/core/runner.py` (modifications - POST-MIGRATION PATH)
```python
# Add to imports
from optimization_engine.context.playbook import AtomizerPlaybook
from optimization_engine.context.reflector import AtomizerReflector, OptimizationOutcome


class OptimizationRunner:
    def __init__(self, ...):
        # ... existing init ...

        # Initialize context engineering components
        self.playbook = AtomizerPlaybook.load(
            self.output_dir / "playbook.json"
        )
        self.reflector = AtomizerReflector(self.playbook)

    def _objective(self, trial: optuna.Trial) -> float:
        # ... existing trial logic ...

        # After trial completion, capture outcome for reflection
        outcome = OptimizationOutcome(
            trial_number=trial.number,
            success=not failed,
            objective_value=objective_value if not failed else None,
            constraint_violations=constraint_violations,
            solver_errors=solver_errors,
            design_variables=design_vars,
            extractor_used=self.config.get("extractor", "unknown"),
            duration_seconds=trial_duration
        )
        self.reflector.analyze_trial(outcome)
        return objective_value

    def run(self, n_trials: int) -> Dict[str, Any]:
        # ... existing run logic ...

        # After study completion
        self.reflector.analyze_study_completion(
            study_name=self.study.study_name,
            total_trials=len(self.study.trials),
            best_value=self.study.best_value,
            convergence_rate=successful_trials / total_trials
        )
        # Commit insights and save playbook
        insights_added = self.reflector.commit_insights()
        self.playbook.save(self.output_dir / "playbook.json")
        print(f"Added {insights_added} insights to playbook")
```
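One detail worth pinning down in the `run` hook above: `convergence_rate` divides by the trial count, which is zero if a study aborts before any trial finishes. A small guarded helper (name assumed) avoids the edge case:

```python
def convergence_rate(successful_trials: int, total_trials: int) -> float:
    """Fraction of trials that converged; 0.0 for an empty study."""
    return successful_trials / total_trials if total_trials else 0.0
```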
---
### Phase 2: Context Isolation & Loading (Week 2-3)
**Goal**: Implement Write-Select-Compress-Isolate pattern for POS
#### 2.1 Session State Schema
**File**: `optimization_engine/context/session_state.py`
```python
"""
Session state management with context isolation.

Implements the "Isolate" pattern from context engineering:
- Exposed fields are sent to LLM
- Isolated fields are accessed selectively
"""
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
from datetime import datetime
from enum import Enum


class TaskType(Enum):
    CREATE_STUDY = "create_study"
    RUN_OPTIMIZATION = "run_optimization"
    MONITOR_PROGRESS = "monitor_progress"
    ANALYZE_RESULTS = "analyze_results"
    DEBUG_ERROR = "debug_error"
    CONFIGURE_SETTINGS = "configure_settings"


class ExposedState(BaseModel):
    """State exposed to LLM at every turn."""
    # Current task context
    task_type: Optional[TaskType] = None
    current_objective: str = ""
    # Recent history (compressed)
    recent_actions: List[str] = Field(default_factory=list, max_items=10)
    recent_errors: List[str] = Field(default_factory=list, max_items=5)
    # Active study summary
    study_name: Optional[str] = None
    study_status: str = "unknown"
    trials_completed: int = 0
    best_value: Optional[float] = None
    # Playbook excerpt (most relevant items)
    active_playbook_items: List[str] = Field(default_factory=list, max_items=15)


class IsolatedState(BaseModel):
    """State isolated from LLM - accessed selectively."""
    # Full optimization history (can be large)
    full_trial_history: List[Dict[str, Any]] = Field(default_factory=list)
    # NX session state (heavy, complex)
    nx_model_path: Optional[str] = None
    nx_expressions: Dict[str, Any] = Field(default_factory=dict)
    # Neural network cache
    neural_predictions: Dict[str, float] = Field(default_factory=dict)
    # Full playbook (loaded on demand)
    full_playbook_path: Optional[str] = None
    # Debug information
    last_solver_output: str = ""
    last_f06_content: str = ""


class AtomizerSessionState(BaseModel):
    """
    Complete session state with exposure control.

    The exposed state is automatically injected into every LLM context.
    The isolated state is accessed only when explicitly needed.
    """
    session_id: str
    created_at: datetime = Field(default_factory=datetime.now)
    last_updated: datetime = Field(default_factory=datetime.now)
    exposed: ExposedState = Field(default_factory=ExposedState)
    isolated: IsolatedState = Field(default_factory=IsolatedState)

    def get_llm_context(self) -> str:
        """Generate context string for LLM consumption."""
        lines = [
            "## Current Session State",
            "",
            f"**Task**: {self.exposed.task_type.value if self.exposed.task_type else 'Not set'}",
            f"**Objective**: {self.exposed.current_objective}",
            "",
        ]
        if self.exposed.study_name:
            lines.extend([
                f"### Active Study: {self.exposed.study_name}",
                f"- Status: {self.exposed.study_status}",
                f"- Trials: {self.exposed.trials_completed}",
                f"- Best: {self.exposed.best_value}",
                "",
            ])
        if self.exposed.recent_actions:
            lines.append("### Recent Actions")
            for action in self.exposed.recent_actions[-5:]:
                lines.append(f"- {action}")
            lines.append("")
        if self.exposed.recent_errors:
            lines.append("### Recent Errors (address these)")
            for error in self.exposed.recent_errors:
                lines.append(f"- ⚠️ {error}")
            lines.append("")
        if self.exposed.active_playbook_items:
            lines.append("### Relevant Knowledge")
            for item in self.exposed.active_playbook_items:
                lines.append(f"- {item}")
            lines.append("")
        return "\n".join(lines)

    def add_action(self, action: str):
        """Record an action (auto-compresses old actions)."""
        self.exposed.recent_actions.append(action)
        if len(self.exposed.recent_actions) > 10:
            # Compress: keep first, last 5, summarize middle
            self.exposed.recent_actions = (
                [self.exposed.recent_actions[0]] +
                ["... (earlier actions summarized)"] +
                self.exposed.recent_actions[-5:]
            )
        self.last_updated = datetime.now()

    def add_error(self, error: str):
        """Record an error for LLM attention."""
        self.exposed.recent_errors.append(error)
        self.exposed.recent_errors = self.exposed.recent_errors[-5:]
        self.last_updated = datetime.now()

    def load_isolated_data(self, key: str) -> Any:
        """Explicitly load isolated data when needed."""
        return getattr(self.isolated, key, None)
```
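The trim rule in `add_action` is easy to check in isolation; this standalone mirror (`compress_actions` is a hypothetical name) shows twelve actions collapsing to seven entries:

```python
def compress_actions(actions: list, limit: int = 10, keep_tail: int = 5) -> list:
    """Mirror of add_action's compression: first entry + marker + last keep_tail."""
    if len(actions) <= limit:
        return list(actions)
    return [actions[0], "... (earlier actions summarized)"] + list(actions[-keep_tail:])


history = [f"action {i}" for i in range(12)]
compressed = compress_actions(history)
# -> ['action 0', '... (earlier actions summarized)', 'action 7', ..., 'action 11']
```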
#### 2.2 Context Loader Enhancement
**File**: `.claude/skills/02_CONTEXT_LOADER.md` (updated)
```markdown
# Context Loader - Enhanced with Playbook Integration
## Loading Rules by Task Type
### CREATE_STUDY
**Always Load**:
- `core/study-creation-core.md`
- `SYS_12_EXTRACTOR_LIBRARY.md`
- **Playbook**: Filter by tags=['study_creation', 'design_variables']
**Load If**:
- `modules/zernike-optimization.md`: if "telescope" or "mirror" in query
- `modules/neural-acceleration.md`: if trials > 50
### RUN_OPTIMIZATION
**Always Load**:
- `OP_02_RUN_OPTIMIZATION.md`
- **Playbook**: Filter by tags=['solver', 'convergence', 'mesh']
**Load If**:
- Recent errors exist → Include mistake items from playbook
### DEBUG_ERROR
**Always Load**:
- `OP_06_TROUBLESHOOT.md`
- **Playbook**: Filter by category=MISTAKE, min_confidence=0.3
- Session state recent_errors
**Load If**:
- "convergence" in error → Load solver-specific playbook items
- "mesh" in error → Load mesh-specific playbook items
## Playbook Integration Pattern
```python
# In context loader
def load_context_for_task(task_type: TaskType, session: AtomizerSessionState):
    context_parts = []
    # 1. Load protocol docs (existing behavior)
    context_parts.append(load_protocol(task_type))
    # 2. Load session state (exposed only)
    context_parts.append(session.get_llm_context())
    # 3. Load relevant playbook items
    playbook = AtomizerPlaybook.load(PLAYBOOK_PATH)
    playbook_context = playbook.get_context_for_task(
        task_type=task_type.value,
        max_items=15,
        min_confidence=0.6
    )
    context_parts.append(playbook_context)
    # 4. Add error-specific items if debugging
    if task_type == TaskType.DEBUG_ERROR:
        for error in session.exposed.recent_errors:
            relevant = playbook.search_by_content(error, category=InsightCategory.MISTAKE)
            context_parts.extend([item.to_context_string() for item in relevant])
    return "\n\n---\n\n".join(context_parts)
```
```
---
### Phase 3: KV-Cache Optimization (Week 3)
**Goal**: Maximize cache hits for 10x cost reduction
#### 3.1 Stable Prefix Architecture
**File**: `.claude/skills/SYSTEM_PROMPT_TEMPLATE.md`
```markdown
# Atomizer System Prompt Template
## Structure for KV-Cache Optimization
The system prompt is structured to maximize KV-cache hits:
```
[SECTION 1: STABLE - Never changes]
- Atomizer identity and capabilities
- Core principles (LLM-driven optimization)
- Tool schemas and definitions
- Base protocol routing table
[SECTION 2: SEMI-STABLE - Changes per session type]
- Active protocol definition
- Task-specific instructions
- Relevant playbook items (top 10 by score)
[SECTION 3: DYNAMIC - Changes every turn]
- Current session state
- Recent actions/errors
- User's latest message
```
## Implementation
### Stable Prefix (Cache This)
```
You are assisting with **Atomizer**, an LLM-first FEA optimization framework.
## Core Capabilities
- Natural language → optimization configuration
- NX Nastran integration via journals
- Multi-strategy optimization (TPE, CMA-ES, NSGA-II)
- Real-time progress monitoring
- Neural acceleration (600-1000x speedup)
## Principles
1. LLM-driven - users describe goals in plain language
2. Never modify master models - work on copies
3. Always validate before running
4. Document everything
## Available Tools
[... tool schemas - NEVER CHANGE ORDER ...]
## Protocol Routing
| Intent | Protocol | Priority |
|--------|----------|----------|
| Create study | OP_01 | 1 |
| Run optimization | OP_02 | 1 |
| Monitor progress | OP_03 | 2 |
| Analyze results | OP_04 | 2 |
| Debug errors | OP_06 | 1 |
```
### Semi-Stable Section (Per Session)
```
## Active Task: {task_type}
### Loaded Protocol: {protocol_name}
{protocol_content}
### Relevant Knowledge (from {playbook_version})
{playbook_items}
```
### Dynamic Section (Per Turn)
```
## Current State
{session_state.get_llm_context()}
## User Message
{user_message}
```
```
#### 3.2 Cache Monitoring
**File**: `optimization_engine/context/cache_monitor.py`
```python
"""
Monitor and optimize KV-cache hit rates.
"""
from dataclasses import dataclass
from typing import Optional
import hashlib


@dataclass
class CacheStats:
    total_requests: int = 0
    cache_hits: int = 0
    prefix_length: int = 0

    @property
    def hit_rate(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.cache_hits / self.total_requests


class ContextCacheOptimizer:
    """
    Tracks and optimizes context for cache efficiency.
    """

    def __init__(self):
        self.stats = CacheStats()
        self._last_prefix_hash: Optional[str] = None

    def prepare_context(
        self,
        stable_prefix: str,
        semi_stable: str,
        dynamic: str
    ) -> str:
        """
        Assemble context optimized for caching.
        Tracks whether prefix changed (cache miss).
        """
        # Hash the stable prefix
        prefix_hash = hashlib.md5(stable_prefix.encode()).hexdigest()
        self.stats.total_requests += 1
        if prefix_hash == self._last_prefix_hash:
            self.stats.cache_hits += 1
        self._last_prefix_hash = prefix_hash
        self.stats.prefix_length = len(stable_prefix)
        # Assemble with clear boundaries
        return f"""{stable_prefix}
---
{semi_stable}
---
{dynamic}"""

    def get_report(self) -> str:
        """Generate cache efficiency report."""
        return f"""
Cache Statistics:
- Requests: {self.stats.total_requests}
- Cache Hits: {self.stats.cache_hits}
- Hit Rate: {self.stats.hit_rate:.1%}
- Stable Prefix: {self.stats.prefix_length} chars

Estimated Cost Savings: {self.stats.hit_rate * 90:.0f}%
(Based on 10x cost difference between cached/uncached tokens)
"""
```
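The bookkeeping in `prepare_context` amounts to hashing the stable prefix and comparing against the previous request. A quick simulation over an assumed request sequence shows the accounting:

```python
import hashlib


def count_hits(prefixes: list) -> int:
    """Count requests whose stable-prefix hash matched the previous request's."""
    hits, last = 0, None
    for prefix in prefixes:
        digest = hashlib.md5(prefix.encode()).hexdigest()
        if digest == last:
            hits += 1
        last = digest
    return hits


# Four requests on one stable prefix, one edit, then two more requests:
requests = ["STABLE v1"] * 4 + ["STABLE v2"] * 3
hits = count_hits(requests)  # 5 of 7: every request after an unchanged prefix
```

Even a one-character edit to the stable section invalidates the prefix for that request, which is why the template insists tool schemas never change order.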
---
### Phase 4: Error Recovery & Learning (Week 4)
**Goal**: Leave wrong turns in context, learn from failures
#### 4.1 Error Preservation Hook
**File**: `optimization_engine/plugins/post_solve/error_tracker.py`
```python
"""
Error Tracker Hook
Preserves solver errors and failures in context for learning.
Based on Manus insight: "leave the wrong turns in the context"
"""
from pathlib import Path
from datetime import datetime
import json
def track_error(context: dict) -> dict:
"""
Hook that preserves errors for context learning.
Called at post_solve when solver fails.
"""
trial_number = context.get('trial_number', -1)
output_dir = Path(context.get('output_dir', '.'))
# Capture error details
error_info = {
"trial": trial_number,
"timestamp": datetime.now().isoformat(),
"solver_returncode": context.get('solver_returncode'),
"error_type": classify_error(context),
"design_variables": context.get('design_variables', {}),
"error_message": context.get('error_message', ''),
"f06_snippet": extract_f06_error(context.get('f06_path'))
}
# Append to error log (never overwrite - accumulate)
error_log_path = output_dir / "error_history.jsonl"
with open(error_log_path, 'a') as f:
f.write(json.dumps(error_info) + "\n")
# Update session state for LLM context
if 'session_state' in context:
context['session_state'].add_error(
f"Trial {trial_number}: {error_info['error_type']} - {error_info['error_message'][:100]}"
)
return {"error_tracked": True, "error_type": error_info['error_type']}
def classify_error(context: dict) -> str:
"""Classify error type for playbook categorization."""
error_msg = context.get('error_message', '').lower()
if 'convergence' in error_msg:
return "convergence_failure"
elif 'mesh' in error_msg or 'element' in error_msg:
return "mesh_error"
elif 'singular' in error_msg or 'matrix' in error_msg:
return "singularity"
elif 'memory' in error_msg or 'allocation' in error_msg:
return "memory_error"
elif 'license' in error_msg:
return "license_error"
else:
return "unknown_error"
def extract_f06_error(f06_path: str) -> str:
"""Extract error section from F06 file."""
if not f06_path or not Path(f06_path).exists():
return ""
try:
with open(f06_path) as f:
content = f.read()
# Look for error indicators
error_markers = ["*** USER FATAL", "*** SYSTEM FATAL", "*** USER WARNING"]
for marker in error_markers:
if marker in content:
idx = content.index(marker)
return content[idx:idx+500]
return ""
except Exception:
return ""
# Hook registration
HOOK_CONFIG = {
"name": "error_tracker",
"hook_point": "post_solve",
"priority": 100, # Run early to capture before cleanup
"enabled": True,
"description": "Preserves solver errors for context learning"
}
```
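The append-only JSONL log above ("never overwrite - accumulate") round-trips as below; the temp directory is a stand-in for the real `output_dir`:

```python
import json
import tempfile
from pathlib import Path


def append_error(log_path: Path, record: dict) -> None:
    """Append one record as a JSON line; earlier lines are never rewritten."""
    with open(log_path, "a") as f:
        f.write(json.dumps(record) + "\n")


def read_errors(log_path: Path) -> list:
    """Load the full accumulated error history, in write order."""
    with open(log_path) as f:
        return [json.loads(line) for line in f if line.strip()]


with tempfile.TemporaryDirectory() as d:
    log = Path(d) / "error_history.jsonl"
    append_error(log, {"trial": 1, "error_type": "mesh_error"})
    append_error(log, {"trial": 2, "error_type": "convergence_failure"})
    history = read_errors(log)  # both records survive, in order
```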
#### 4.2 Feedback Loop Integration
**File**: `optimization_engine/context/feedback_loop.py`
```python
"""
Feedback Loop: Connects optimization outcomes to playbook updates.
Implements ACE's "leverage natural execution feedback" principle.
"""
from typing import Dict, Any, List, Optional
from pathlib import Path

from .playbook import AtomizerPlaybook
from .reflector import AtomizerReflector, OptimizationOutcome


class FeedbackLoop:
    """
    Automated feedback loop that learns from optimization runs.

    Key insight from ACE: Use execution feedback (success/failure)
    as the learning signal, not labeled data.
    """

    def __init__(self, playbook_path: Path):
        self.playbook = AtomizerPlaybook.load(playbook_path)
        self.reflector = AtomizerReflector(self.playbook)
        self.playbook_path = playbook_path

    def process_trial_result(
        self,
        trial_number: int,
        success: bool,
        objective_value: float,
        design_variables: Dict[str, float],
        context_items_used: List[str],  # Which playbook items were in context
        errors: Optional[List[str]] = None
    ):
        """
        Process a trial result and update playbook accordingly.

        This is the core learning mechanism:
        - If trial succeeded with certain playbook items → increase helpful count
        - If trial failed with certain playbook items → increase harmful count
        """
        # Update playbook item scores based on outcome
        for item_id in context_items_used:
            self.playbook.record_outcome(item_id, helpful=success)
        # Create outcome for reflection
        outcome = OptimizationOutcome(
            trial_number=trial_number,
            success=success,
            objective_value=objective_value if success else None,
            constraint_violations=[],
            solver_errors=errors or [],
            design_variables=design_variables,
            extractor_used="",
            duration_seconds=0
        )
        # Reflect on outcome
        self.reflector.analyze_trial(outcome)

    def finalize_study(self, study_stats: Dict[str, Any]):
        """
        Called when study completes. Commits insights and prunes playbook.
        """
        # Analyze study-level patterns
        self.reflector.analyze_study_completion(
            study_name=study_stats.get("name", "unknown"),
            total_trials=study_stats.get("total_trials", 0),
            best_value=study_stats.get("best_value", 0),
            convergence_rate=study_stats.get("convergence_rate", 0)
        )
        # Commit all pending insights
        insights_added = self.reflector.commit_insights()
        # Prune consistently harmful items
        self.playbook.prune_harmful(threshold=-3)
        # Save updated playbook
        self.playbook.save(self.playbook_path)
        return {
            "insights_added": insights_added,
            "playbook_size": len(self.playbook.items),
            "playbook_version": self.playbook.version
        }
```
---
### Phase 5: Context Compaction (Week 4-5)
**Goal**: Handle long-running optimization sessions without context overflow
#### 5.1 Compaction Manager
**File**: `optimization_engine/context/compaction.py`
```python
"""
Context Compaction for Long-Running Optimizations

Based on Google ADK's compaction architecture:
- Trigger compaction when threshold reached
- Summarize older events
- Preserve recent detail
"""
from typing import List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class ContextEvent:
    """Single event in optimization context."""
    timestamp: datetime
    event_type: str  # trial_start, trial_complete, error, milestone
    summary: str
    details: Dict[str, Any] = field(default_factory=dict)
    compacted: bool = False


class CompactionManager:
    """
    Manages context compaction for long optimization sessions.

    Strategy:
    - Keep last N events in full detail
    - Summarize older events into milestone markers
    - Preserve error events (never compact errors)
    """

    def __init__(
        self,
        compaction_threshold: int = 50,
        keep_recent: int = 20,
        keep_errors: bool = True
    ):
        self.events: List[ContextEvent] = []
        self.compaction_threshold = compaction_threshold
        self.keep_recent = keep_recent
        self.keep_errors = keep_errors
        self.compaction_count = 0

    def add_event(self, event: ContextEvent):
        """Add event and trigger compaction if needed."""
        self.events.append(event)
        if len(self.events) > self.compaction_threshold:
            self._compact()

    def _compact(self):
        """
        Compact older events into summaries.

        Preserves:
        - All error events
        - Last `keep_recent` events
        - Milestone summaries of compacted regions
        """
        if len(self.events) <= self.keep_recent:
            return
        # Split into old and recent
        old_events = self.events[:-self.keep_recent]
        recent_events = self.events[-self.keep_recent:]
        # Separate errors from old events
        error_events = [e for e in old_events if e.event_type == "error"]
        non_error_events = [e for e in old_events if e.event_type != "error"]
        # Summarize non-error old events
        if non_error_events:
            summary = self._create_summary(non_error_events)
            compaction_event = ContextEvent(
                timestamp=non_error_events[0].timestamp,
                event_type="compaction",
                summary=summary,
                details={
                    "events_compacted": len(non_error_events),
                    "compaction_number": self.compaction_count
                },
                compacted=True
            )
            self.compaction_count += 1
            # Rebuild events list
            self.events = [compaction_event] + error_events + recent_events
        else:
            self.events = error_events + recent_events

    def _create_summary(self, events: List[ContextEvent]) -> str:
        """Create summary of compacted events."""
        trial_events = [e for e in events if "trial" in e.event_type]
        if not trial_events:
            return f"[{len(events)} events compacted]"
        # Extract trial statistics
        trial_numbers = []
        objectives = []
        for e in trial_events:
            if "trial_number" in e.details:
                trial_numbers.append(e.details["trial_number"])
            if "objective" in e.details:
                objectives.append(e.details["objective"])
        if trial_numbers and objectives:
            return (
                f"Trials {min(trial_numbers)}-{max(trial_numbers)}: "
                f"Best={min(objectives):.4g}, "
                f"Avg={sum(objectives)/len(objectives):.4g}"
            )
        elif trial_numbers:
            return f"Trials {min(trial_numbers)}-{max(trial_numbers)} completed"
        else:
            return f"[{len(events)} events compacted]"
def get_context_string(self) -> str:
"""Generate context string from events."""
lines = ["## Optimization History", ""]
for event in self.events:
if event.compacted:
lines.append(f"📦 {event.summary}")
elif event.event_type == "error":
lines.append(f"⚠️ {event.summary}")
else:
lines.append(f"- {event.summary}")
return "\n".join(lines)
def get_stats(self) -> Dict[str, Any]:
"""Get compaction statistics."""
return {
"total_events": len(self.events),
"compaction_count": self.compaction_count,
"error_events": len([e for e in self.events if e.event_type == "error"]),
"compacted_events": len([e for e in self.events if e.compacted])
}
```
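For a compacted region spanning many trials, `_create_summary` collapses everything into a single milestone line. A standalone sketch of the same formatting, using synthetic trial data for illustration:

```python
# Thirty synthetic trial records with a decreasing objective (minimization).
trials = [{"trial_number": n, "objective": 100 - 2 * n} for n in range(1, 31)]

numbers = [t["trial_number"] for t in trials]
objectives = [t["objective"] for t in trials]

# Same format string as CompactionManager._create_summary above.
summary = (
    f"Trials {min(numbers)}-{max(numbers)}: "
    f"Best={min(objectives):.4g}, "
    f"Avg={sum(objectives)/len(objectives):.4g}"
)
print(summary)  # Trials 1-30: Best=40, Avg=69
```

Thirty detailed events thus shrink to one line in the context string, while the best and average objectives of the compacted region remain visible.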
---
## Part 3: Claude Code Session Integration
### 3.1 Enhanced Bootstrap for Claude Code
**File**: `.claude/skills/00_BOOTSTRAP_V2.md`
```markdown
# Atomizer Bootstrap v2.0 - Context-Aware Sessions
## Session Initialization
On session start, perform these steps:
### Step 1: Load Playbook
```bash
# Check for existing playbook
cat optimization_engine/context/playbook.json 2>/dev/null | head -20
```
If playbook exists, extract top insights:
- Filter by task type (inferred from user's first message)
- Include top 10 by net_score
- Always include recent mistakes (last 5)
### Step 2: Initialize Session State
```python
from optimization_engine.context.session_state import AtomizerSessionState, TaskType
session = AtomizerSessionState(session_id="current")
session.exposed.task_type = TaskType.CREATE_STUDY # Update based on intent
```
### Step 3: Load Task-Specific Context
Based on detected task type, load protocols per `02_CONTEXT_LOADER.md`
### Step 4: Inject Playbook Items
Add relevant playbook items to `session.exposed.active_playbook_items`
---
## Error Handling Protocol
When ANY error occurs:
1. **Preserve the error** - Add to session state
2. **Check playbook** - Look for matching mistake patterns
3. **Learn from it** - If novel error, queue for playbook addition
4. **Show to user** - Include error context in response
```python
# On error
session.add_error(f"{error_type}: {error_message}")
# Check playbook for similar errors
similar = playbook.search_by_content(error_message, category=InsightCategory.MISTAKE)
if similar:
print(f"Known issue: {similar[0].content}")
else:
reflector.queue_insight(InsightCategory.MISTAKE, error_message)
```
---
## Context Budget Management
Total context budget: ~100K tokens
Allocation:
- **Stable prefix**: 5K tokens (cached)
- **Protocols**: 10K tokens
- **Playbook items**: 5K tokens
- **Session state**: 2K tokens
- **Conversation history**: 30K tokens
- **Working space**: 48K tokens
If approaching limit:
1. Trigger compaction of old events
2. Reduce playbook items to top 5
3. Summarize conversation history
```
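Steps 1 and 4 above select which playbook items enter the session: the top items by `net_score` for the detected task type, plus recent mistakes. A standalone sketch of that selection rule; the dict items and the `select_bootstrap_items` helper are illustrative, not part of the playbook API:

```python
# Illustrative playbook items; category "mis" marks mistakes.
items = [
    {"id": "str-00001", "category": "str", "net_score": 5, "content": "Use shell elements"},
    {"id": "str-00002", "category": "str", "net_score": 2, "content": "Warm-start sampler"},
    {"id": "mis-00001", "category": "mis", "net_score": 1, "content": "Avoid zero thickness"},
    {"id": "mis-00002", "category": "mis", "net_score": 0, "content": "Check solver license"},
]

def select_bootstrap_items(items, top_n=10, recent_mistakes=5):
    """Top-N by net_score, then append recent mistakes not already selected."""
    by_score = sorted(items, key=lambda i: i["net_score"], reverse=True)[:top_n]
    chosen_ids = {i["id"] for i in by_score}
    mistakes = [i for i in items if i["category"] == "mis" and i["id"] not in chosen_ids]
    return by_score + mistakes[-recent_mistakes:]

selected = select_bootstrap_items(items, top_n=2)
print([i["id"] for i in selected])  # ['str-00001', 'str-00002', 'mis-00001', 'mis-00002']
```

Mistakes are appended even when their score would not place them in the top N, so recently learned failure patterns always reach the session context.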
### 3.2 Dashboard Integration
**File**: `atomizer-dashboard/backend/api/routes/context.py`
```python
"""
Context Engineering API Routes
Provides endpoints for:
- Viewing playbook contents
- Managing session state
- Triggering compaction
- Monitoring cache efficiency
"""
from fastapi import APIRouter, HTTPException
from pathlib import Path
from typing import Optional
router = APIRouter(prefix="/context", tags=["context"])
ATOMIZER_ROOT = Path(__file__).parents[4]
PLAYBOOK_PATH = ATOMIZER_ROOT / "optimization_engine" / "context" / "playbook.json"
@router.get("/playbook")
async def get_playbook(
category: Optional[str] = None,
min_score: int = 0,
limit: int = 50
):
"""Get playbook items with optional filtering."""
from optimization_engine.context.playbook import AtomizerPlaybook, InsightCategory
playbook = AtomizerPlaybook.load(PLAYBOOK_PATH)
items = list(playbook.items.values())
# Filter by category
if category:
try:
cat = InsightCategory(category)
items = [i for i in items if i.category == cat]
except ValueError:
raise HTTPException(400, f"Invalid category: {category}")
# Filter by score
items = [i for i in items if i.net_score >= min_score]
# Sort by score
items.sort(key=lambda x: x.net_score, reverse=True)
return {
"total": len(playbook.items),
"filtered": len(items),
"items": [
{
"id": i.id,
"category": i.category.value,
"content": i.content,
"helpful": i.helpful_count,
"harmful": i.harmful_count,
"score": i.net_score,
"confidence": i.confidence
}
for i in items[:limit]
]
}
@router.post("/playbook/feedback")
async def record_feedback(item_id: str, helpful: bool):
"""Record feedback on a playbook item."""
from optimization_engine.context.playbook import AtomizerPlaybook
playbook = AtomizerPlaybook.load(PLAYBOOK_PATH)
if item_id not in playbook.items:
raise HTTPException(404, f"Item not found: {item_id}")
playbook.record_outcome(item_id, helpful=helpful)
playbook.save(PLAYBOOK_PATH)
item = playbook.items[item_id]
return {
"id": item_id,
"new_score": item.net_score,
"confidence": item.confidence
}
@router.get("/session/{session_id}")
async def get_session_state(session_id: str):
"""Get current session state."""
# Implementation depends on session storage
pass
@router.get("/cache/stats")
async def get_cache_stats():
"""Get KV-cache efficiency statistics."""
from optimization_engine.context.cache_monitor import ContextCacheOptimizer
# Would need to access singleton cache optimizer
return {
"message": "Cache stats endpoint - implement with actual cache monitor"
}
```
---
## Part 4: Testing & Validation
### 4.1 Test Suite
**File**: `tests/test_context_engineering.py`
```python
"""
Test suite for context engineering components.
"""
import pytest
from datetime import datetime
from optimization_engine.context.playbook import (
AtomizerPlaybook,
PlaybookItem,
InsightCategory
)
from optimization_engine.context.reflector import (
AtomizerReflector,
OptimizationOutcome
)
from optimization_engine.context.session_state import (
AtomizerSessionState,
TaskType
)
from optimization_engine.context.compaction import (
CompactionManager,
ContextEvent
)
class TestAtomizerPlaybook:
"""Tests for the playbook system."""
def test_add_insight(self):
"""Test adding insights to playbook."""
playbook = AtomizerPlaybook()
item = playbook.add_insight(
category=InsightCategory.STRATEGY,
content="Use shell elements for thin walls",
source_trial=1
)
assert item.id == "str-00001"
assert item.helpful_count == 0
assert item.harmful_count == 0
assert len(playbook.items) == 1
def test_deduplication(self):
"""Test that duplicate insights are merged."""
playbook = AtomizerPlaybook()
playbook.add_insight(InsightCategory.STRATEGY, "Use shell elements")
playbook.add_insight(InsightCategory.STRATEGY, "Use shell elements")
assert len(playbook.items) == 1
assert playbook.items["str-00001"].helpful_count == 1
def test_outcome_tracking(self):
"""Test helpful/harmful tracking."""
playbook = AtomizerPlaybook()
item = playbook.add_insight(InsightCategory.STRATEGY, "Test insight")
playbook.record_outcome(item.id, helpful=True)
playbook.record_outcome(item.id, helpful=True)
playbook.record_outcome(item.id, helpful=False)
assert item.helpful_count == 2
assert item.harmful_count == 1
assert item.net_score == 1
def test_persistence(self, tmp_path):
"""Test save/load cycle."""
playbook = AtomizerPlaybook()
playbook.add_insight(InsightCategory.MISTAKE, "Don't do this")
save_path = tmp_path / "playbook.json"
playbook.save(save_path)
loaded = AtomizerPlaybook.load(save_path)
assert len(loaded.items) == 1
assert "mis-00001" in loaded.items
def test_pruning(self):
"""Test harmful item pruning."""
playbook = AtomizerPlaybook()
item = playbook.add_insight(InsightCategory.STRATEGY, "Bad advice")
# Record many harmful outcomes
for _ in range(5):
playbook.record_outcome(item.id, helpful=False)
playbook.prune_harmful(threshold=-3)
assert len(playbook.items) == 0
class TestAtomizerReflector:
"""Tests for the reflector component."""
def test_analyze_failed_trial(self):
"""Test analysis of failed trial."""
playbook = AtomizerPlaybook()
reflector = AtomizerReflector(playbook)
outcome = OptimizationOutcome(
trial_number=1,
success=False,
objective_value=None,
constraint_violations=["stress > 250 MPa"],
solver_errors=["convergence failure at iteration 50"],
design_variables={"thickness": 0.5},
extractor_used="stress_extractor",
duration_seconds=120
)
insights = reflector.analyze_trial(outcome)
assert len(insights) >= 2 # At least error + constraint
assert any(i["category"] == InsightCategory.MISTAKE for i in insights)
def test_commit_insights(self):
"""Test committing insights to playbook."""
playbook = AtomizerPlaybook()
reflector = AtomizerReflector(playbook)
outcome = OptimizationOutcome(
trial_number=1,
success=True,
objective_value=100.0,
constraint_violations=[],
solver_errors=[],
design_variables={"thickness": 1.0},
extractor_used="mass_extractor",
duration_seconds=60
)
reflector.analyze_trial(outcome)
count = reflector.commit_insights()
assert count > 0
assert len(playbook.items) > 0
class TestSessionState:
"""Tests for session state management."""
def test_exposed_state_context(self):
"""Test LLM context generation."""
session = AtomizerSessionState(session_id="test")
session.exposed.task_type = TaskType.CREATE_STUDY
session.exposed.study_name = "bracket_opt"
session.exposed.trials_completed = 25
session.exposed.best_value = 0.5
context = session.get_llm_context()
assert "bracket_opt" in context
assert "25" in context
assert "0.5" in context
def test_action_compression(self):
"""Test automatic action compression."""
session = AtomizerSessionState(session_id="test")
for i in range(15):
session.add_action(f"Action {i}")
# Should be compressed
assert len(session.exposed.recent_actions) <= 12
assert "summarized" in session.exposed.recent_actions[1].lower()
class TestCompactionManager:
"""Tests for context compaction."""
def test_compaction_trigger(self):
"""Test that compaction triggers at threshold."""
manager = CompactionManager(compaction_threshold=10, keep_recent=5)
for i in range(15):
manager.add_event(ContextEvent(
timestamp=datetime.now(),
event_type="trial_complete",
summary=f"Trial {i} complete",
details={"trial_number": i, "objective": i * 0.1}
))
assert manager.compaction_count > 0
assert len(manager.events) <= 10
def test_error_preservation(self):
"""Test that errors are never compacted."""
manager = CompactionManager(compaction_threshold=10, keep_recent=3)
# Add error early
manager.add_event(ContextEvent(
timestamp=datetime.now(),
event_type="error",
summary="Critical solver failure"
))
# Add many regular events
for i in range(20):
manager.add_event(ContextEvent(
timestamp=datetime.now(),
event_type="trial_complete",
summary=f"Trial {i}"
))
# Error should still be present
errors = [e for e in manager.events if e.event_type == "error"]
assert len(errors) == 1
```
### 4.2 Integration Test
**File**: `tests/test_context_integration.py`
```python
"""
Integration test for full context engineering pipeline.
"""
import pytest
from pathlib import Path
import tempfile
def test_full_optimization_with_context_engineering():
"""
End-to-end test of optimization with context engineering.
Simulates:
1. Starting fresh session
2. Running optimization with failures
3. Verifying playbook learns from failures
4. Running second optimization
5. Verifying improved performance
"""
from optimization_engine.context.playbook import AtomizerPlaybook
from optimization_engine.context.feedback_loop import FeedbackLoop
with tempfile.TemporaryDirectory() as tmp_dir:
playbook_path = Path(tmp_dir) / "playbook.json"
# Initialize feedback loop
feedback = FeedbackLoop(playbook_path)
# Simulate first study with failures
for i in range(10):
success = i % 3 != 0 # Every 3rd trial fails
feedback.process_trial_result(
trial_number=i,
success=success,
objective_value=100 - i if success else 0,
design_variables={"thickness": 0.5 + i * 0.1},
context_items_used=[],
errors=["convergence failure"] if not success else []
)
# Finalize and check learning
result = feedback.finalize_study({
"name": "test_study",
"total_trials": 10,
"best_value": 92,  # best successful objective: trial 8 gives 100 - 8
"convergence_rate": 0.6  # 6 of the 10 simulated trials succeed
})
assert result["insights_added"] > 0
# Load playbook and verify content
playbook = AtomizerPlaybook.load(playbook_path)
# Should have learned about convergence failures
mistakes = [
item for item in playbook.items.values()
if item.category.value == "mis"
]
assert len(mistakes) > 0
```
---
## Part 5: Rollout Plan
### Week 1-2: Foundation
- [ ] Implement `AtomizerPlaybook` class
- [ ] Implement `AtomizerReflector` class
- [ ] Add playbook persistence (JSON)
- [ ] Write unit tests
- [ ] Integrate with existing LAC concepts
### Week 3: Context Management
- [ ] Implement `AtomizerSessionState`
- [ ] Update `02_CONTEXT_LOADER.md` with playbook integration
- [ ] Create stable prefix template
- [ ] Implement cache monitoring
### Week 4: Learning Loop
- [ ] Implement `FeedbackLoop`
- [ ] Create error tracker hook
- [ ] Implement compaction manager
- [ ] Integration testing
### Week 5: Claude Code Integration
- [ ] Add `.claude/skills/00_BOOTSTRAP_V2.md` (context-aware bootstrap, Part 3.1)
- [ ] Add dashboard API routes
- [ ] Create playbook visualization component
- [ ] End-to-end testing with real optimizations
### Week 6: Polish & Documentation
- [ ] Performance benchmarking
- [ ] Cost analysis (cache hit rates)
- [ ] Documentation updates
- [ ] Team training materials
---
## Success Metrics
| Metric | Baseline | Target | Measurement |
|--------|----------|--------|-------------|
| Task success rate | ~70% | 80-85% | Track via feedback loop |
| Repeated mistakes | N/A | <20% recurrence | Playbook harmful counts |
| Cache hit rate | 0% | >70% | Cache monitor stats |
| Cost per session | $X | 0.3X | API billing analysis |
| Playbook growth | 0 | 100+ items/month | Playbook stats |
---
## References
1. **ACE Framework**: Zhang et al., "Agentic Context Engineering: Evolving Contexts for Self-Improving Language Models", arXiv:2510.04618, Oct 2025
2. **Manus Blog**: "Context Engineering for AI Agents: Lessons from Building Manus"
3. **Anthropic**: "Effective context engineering for AI agents"
4. **LangChain**: "Context Engineering for Agents"
5. **Google ADK**: "Architecting efficient context-aware multi-agent framework"
---
*Document generated: December 2025*
*For Claude Code implementation sessions*