Atomizer/docs/plans/ATOMIZER_CONTEXT_ENGINEERING_PLAN.md
Anto01 82f36689b7 feat: Pre-migration checkpoint - updated docs and utilities
Updates before optimization_engine migration:
- Updated migration plan to v2.1 with complete file inventory
- Added OP_07 disk optimization protocol
- Added SYS_16 self-aware turbo protocol
- Added study archiver and cleanup utilities
- Added ensemble surrogate module
- Updated NX solver and session manager
- Updated zernike HTML generator
- Added context engineering plan
- LAC session insights updates

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 10:22:45 -05:00


Atomizer Context Engineering Implementation Plan

Claude Code Enhancement Strategy Using State-of-the-Art Context Engineering

Version: 1.1
Date: December 2025
Updated: December 28, 2025
Author: Antoine (with Claude)
Purpose: Transform Atomizer's LLM integration using cutting-edge context engineering patterns
Prerequisite: OPTIMIZATION_ENGINE_MIGRATION_PLAN.md (must complete BEFORE this plan)


Prerequisite: Complete Migration First

IMPORTANT: This plan assumes the optimization_engine reorganization has been completed.

Before starting Context Engineering:

  1. Complete all phases in .claude/skills/modules/OPTIMIZATION_ENGINE_MIGRATION_PLAN.md
  2. Verify optimization_engine/core/runner.py exists (not optimization_engine/runner.py)
  3. Confirm all imports use the new paths (e.g., from optimization_engine.core.runner import ...)
  4. Confirm the test suite passes with the new structure

If migration is NOT complete, go do it first. Context Engineering builds on top of the reorganized structure.
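
Checklist items 2 and 3 can be checked mechanically. The following is a minimal sketch, not part of the migration plan itself: `migration_problems` is a hypothetical helper, and it assumes the only legacy import to detect is `optimization_engine.runner`. It does not run the test suite (item 4).

```python
import re
import tempfile
from pathlib import Path
from typing import List

# Matches imports of the pre-migration module path (optimization_engine.runner).
OLD_IMPORT = re.compile(
    r"^\s*(?:from|import)\s+optimization_engine\.runner\b", re.MULTILINE
)

def migration_problems(repo_root: Path) -> List[str]:
    """Return human-readable problems; an empty list means the checks passed."""
    problems: List[str] = []
    package = repo_root / "optimization_engine"
    if not (package / "core" / "runner.py").exists():
        problems.append("missing optimization_engine/core/runner.py")
    if (package / "runner.py").exists():
        problems.append("legacy optimization_engine/runner.py still present")
    for py_file in sorted(repo_root.rglob("*.py")):
        text = py_file.read_text(encoding="utf-8", errors="ignore")
        if OLD_IMPORT.search(text):
            relative = py_file.relative_to(repo_root).as_posix()
            problems.append(f"old import path in {relative}")
    return problems

# Demo on a throwaway directory tree (file names are hypothetical).
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "optimization_engine" / "core").mkdir(parents=True)
    (root / "optimization_engine" / "core" / "runner.py").write_text("")
    clean_result = migration_problems(root)
    (root / "stale_script.py").write_text(
        "from optimization_engine.runner import OptimizationRunner\n"
    )
    stale_result = migration_problems(root)
```

An empty list from `migration_problems` is a necessary condition for starting this plan, not a sufficient one; the test suite still has to pass.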


Executive Summary

This plan transforms Atomizer from a traditional LLM-assisted tool into a self-improving, context-aware optimization platform by implementing state-of-the-art context engineering techniques. The core innovation is treating the Learning Atomizer Core (LAC) as an evolving playbook that accumulates institutional knowledge through structured generation, reflection, and curation cycles.

Expected Outcomes:

  • 10-15% improvement in optimization task success rates
  • 80%+ reduction in repeated mistakes across sessions
  • Lower inference cost through KV-cache optimization (Phase 3 assumes cached tokens cost about 10x less than uncached ones)
  • True institutional memory that compounds over time

Part 1: Architecture Mapping

Current Atomizer Architecture → ACE (Agentic Context Engineering) Framework Alignment

| Atomizer Component | ACE Role | Enhancement |
|--------------------|----------|-------------|
| Optimization Runner | Generator | Produces optimization trajectories with success/failure signals |
| Post-run Analysis | Reflector | Extracts insights from optimization outcomes |
| Learning Atomizer Core (LAC) | Curator | Integrates insights into persistent playbook |
| Protocol Operating System (POS) | Context Loader | Selects relevant context per task type |
| Claude Code Sessions | Agent | Executes tasks using curated context |

New Component: AtomizerPlaybook

┌─────────────────────────────────────────────────────────────────┐
│                     AtomizerPlaybook System                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐           │
│  │  Generator  │──▶│  Reflector  │──▶│   Curator   │           │
│  │(Opt Runs)   │   │(Analysis)   │   │(LAC Update) │           │
│  └─────────────┘   └─────────────┘   └─────────────┘           │
│        │                 │                  │                   │
│        ▼                 ▼                  ▼                   │
│  ┌──────────────────────────────────────────────────────┐      │
│  │              Structured Playbook Store               │      │
│  ├──────────────────────────────────────────────────────┤      │
│  │ [str-00001] helpful=8 harmful=0 ::                   │      │
│  │   "For thin-walled structures, start with shell      │      │
│  │    elements before trying solid mesh"                │      │
│  │                                                      │      │
│  │ [cal-00002] helpful=12 harmful=1 ::                  │      │
│  │   "Safety factor = yield_stress / max_von_mises"     │      │
│  │                                                      │      │
│  │ [mis-00003] helpful=0 harmful=6 ::                   │      │
│  │   "Never set convergence < 1e-8 for SOL 106"         │      │
│  └──────────────────────────────────────────────────────┘      │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
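
Each stored entry follows the single-line form `[id] helpful=N harmful=M :: content`, which is what `to_context_string` in Phase 1 emits. A minimal sketch of reading such a line back, for example to recover an item ID that the LLM cites in a response. `ITEM_LINE`, `ParsedItem`, and `parse_item_line` are hypothetical names, and the pattern assumes IDs of the form `<category>-<five digits>`.

```python
import re
from typing import NamedTuple, Optional

# Pattern for "[<category>-<5 digits>] helpful=<n> harmful=<m> :: <content>"
ITEM_LINE = re.compile(
    r"^\[(?P<id>[a-z]+-\d{5})\] helpful=(?P<helpful>\d+) "
    r"harmful=(?P<harmful>\d+) :: (?P<content>.+)$"
)

class ParsedItem(NamedTuple):
    id: str
    helpful: int
    harmful: int
    content: str

def parse_item_line(line: str) -> Optional[ParsedItem]:
    """Parse one playbook line; return None if it does not match the format."""
    match = ITEM_LINE.match(line.strip())
    if match is None:
        return None
    return ParsedItem(
        id=match["id"],
        helpful=int(match["helpful"]),
        harmful=int(match["harmful"]),
        content=match["content"],
    )

parsed = parse_item_line(
    "[cal-00002] helpful=12 harmful=1 :: Safety factor = yield_stress / max_von_mises"
)
rejected = parse_item_line("not a playbook line")
```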

Part 2: Implementation Phases

Phase 1: Structured Playbook System (Week 1-2)

Goal: Convert LAC from unstructured memory to ACE-style itemized playbook

1.1 Create Playbook Data Structure

File: optimization_engine/context/playbook.py

from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
import json
from pathlib import Path
from datetime import datetime
import hashlib

class InsightCategory(Enum):
    STRATEGY = "str"      # Optimization strategies
    CALCULATION = "cal"   # Formulas and calculations
    MISTAKE = "mis"       # Common mistakes to avoid
    TOOL = "tool"         # Tool usage patterns
    DOMAIN = "dom"        # Domain-specific knowledge (FEA, NX)
    WORKFLOW = "wf"       # Workflow patterns

@dataclass
class PlaybookItem:
    """Single insight in the playbook with helpful/harmful tracking."""
    id: str
    category: InsightCategory
    content: str
    helpful_count: int = 0
    harmful_count: int = 0
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    last_used: Optional[str] = None
    source_trials: List[int] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)
    
    @property
    def net_score(self) -> int:
        return self.helpful_count - self.harmful_count
    
    @property
    def confidence(self) -> float:
        total = self.helpful_count + self.harmful_count
        if total == 0:
            return 0.5
        return self.helpful_count / total
    
    def to_context_string(self) -> str:
        """Format for injection into LLM context."""
        return f"[{self.id}] helpful={self.helpful_count} harmful={self.harmful_count} :: {self.content}"

@dataclass
class AtomizerPlaybook:
    """
    Evolving playbook that accumulates optimization knowledge.
    
    Based on ACE framework principles:
    - Incremental delta updates (never rewrite wholesale)
    - Helpful/harmful tracking for each insight
    - Semantic deduplication
    - Category-based organization
    """
    items: Dict[str, PlaybookItem] = field(default_factory=dict)
    version: int = 1
    last_updated: str = field(default_factory=lambda: datetime.now().isoformat())
    
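    def _generate_id(self, category: InsightCategory) -> str:
        """Generate unique ID for new item.

        Uses the highest existing number rather than the item count, so a new
        ID never collides with a surviving item after prune_harmful() runs.
        """
        prefix = f"{category.value}-"
        numbers = [
            int(key[len(prefix):])
            for key in self.items
            if key.startswith(prefix)
        ]
        next_num = max(numbers, default=0) + 1
        return f"{category.value}-{next_num:05d}"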
    
    def add_insight(
        self, 
        category: InsightCategory, 
        content: str,
        source_trial: Optional[int] = None,
        tags: Optional[List[str]] = None
    ) -> PlaybookItem:
        """
        Add new insight with delta update (ACE principle).
        
        Checks for exact duplicates (same text, ignoring case) before adding.
        This is a placeholder for true semantic deduplication.
        """
        # Check for duplicates by hashing the lowercased content
        content_hash = hashlib.md5(content.lower().encode()).hexdigest()[:8]
        
        for item in self.items.values():
            existing_hash = hashlib.md5(item.content.lower().encode()).hexdigest()[:8]
            if content_hash == existing_hash:
                # Update existing instead of adding duplicate
                item.helpful_count += 1
                # Compare against None: trial numbers start at 0, which is falsy
                if source_trial is not None:
                    item.source_trials.append(source_trial)
                self.last_updated = datetime.now().isoformat()
                return item
        
        # Create new item
        item_id = self._generate_id(category)
        item = PlaybookItem(
            id=item_id,
            category=category,
            content=content,
            source_trials=[source_trial] if source_trial is not None else [],
            tags=tags or []
        )
        self.items[item_id] = item
        self.last_updated = datetime.now().isoformat()
        self.version += 1
        return item
    
    def record_outcome(self, item_id: str, helpful: bool):
        """Record whether using this insight was helpful or harmful."""
        if item_id in self.items:
            if helpful:
                self.items[item_id].helpful_count += 1
            else:
                self.items[item_id].harmful_count += 1
            self.items[item_id].last_used = datetime.now().isoformat()
    
    def get_context_for_task(
        self, 
        task_type: str,
        max_items: int = 20,
        min_confidence: float = 0.5
    ) -> str:
        """
        Generate context string for LLM consumption.
        
        Filters by confidence and sorts by net score. The task_type argument
        is accepted for future relevance filtering but is not used yet.
        """
        relevant_items = [
            item for item in self.items.values()
            if item.confidence >= min_confidence
        ]
        
        # Sort by net score (most helpful first)
        relevant_items.sort(key=lambda x: x.net_score, reverse=True)
        
        # Group by category
        sections = {}
        for item in relevant_items[:max_items]:
            cat_name = item.category.name
            if cat_name not in sections:
                sections[cat_name] = []
            sections[cat_name].append(item.to_context_string())
        
        # Build context string
        lines = ["## Atomizer Knowledge Playbook\n"]
        for cat_name, items in sections.items():
            lines.append(f"### {cat_name}")
            lines.extend(items)
            lines.append("")
        
        return "\n".join(lines)
    
    def prune_harmful(self, threshold: int = -3):
        """Remove items that have proven consistently harmful."""
        to_remove = [
            item_id for item_id, item in self.items.items()
            if item.net_score <= threshold
        ]
        for item_id in to_remove:
            del self.items[item_id]
    
    def save(self, path: Path):
        """Persist playbook to JSON."""
        data = {
            "version": self.version,
            "last_updated": self.last_updated,
            "items": {
                k: {
                    "id": v.id,
                    "category": v.category.value,
                    "content": v.content,
                    "helpful_count": v.helpful_count,
                    "harmful_count": v.harmful_count,
                    "created_at": v.created_at,
                    "last_used": v.last_used,
                    "source_trials": v.source_trials,
                    "tags": v.tags
                }
                for k, v in self.items.items()
            }
        }
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, 'w') as f:
            json.dump(data, f, indent=2)
    
    @classmethod
    def load(cls, path: Path) -> "AtomizerPlaybook":
        """Load playbook from JSON."""
        if not path.exists():
            return cls()
        
        with open(path) as f:
            data = json.load(f)
        
        playbook = cls(
            version=data.get("version", 1),
            last_updated=data.get("last_updated", datetime.now().isoformat())
        )
        
        for item_data in data.get("items", {}).values():
            item = PlaybookItem(
                id=item_data["id"],
                category=InsightCategory(item_data["category"]),
                content=item_data["content"],
                helpful_count=item_data.get("helpful_count", 0),
                harmful_count=item_data.get("harmful_count", 0),
                created_at=item_data.get("created_at", ""),
                last_used=item_data.get("last_used"),
                source_trials=item_data.get("source_trials", []),
                tags=item_data.get("tags", [])
            )
            playbook.items[item.id] = item
        
        return playbook
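
`add_insight` merges only exact duplicates (same text, ignoring case), while the class docstring lists semantic deduplication as a goal. Fuzzy string matching is one possible intermediate step. The sketch below is an assumption rather than part of the plan: it uses `difflib.SequenceMatcher` from the standard library, and `normalize`, `find_near_duplicate`, and the 0.9 threshold are hypothetical.

```python
from difflib import SequenceMatcher
from typing import Iterable, Optional

def normalize(text: str) -> str:
    """Lowercase and collapse whitespace so trivial variations compare equal."""
    return " ".join(text.lower().split())

def find_near_duplicate(
    content: str,
    existing: Iterable[str],
    threshold: float = 0.9,  # hypothetical cutoff; tune on real playbook data
) -> Optional[str]:
    """Return the first existing string whose similarity meets the threshold."""
    candidate = normalize(content)
    for other in existing:
        ratio = SequenceMatcher(None, candidate, normalize(other)).ratio()
        if ratio >= threshold:
            return other
    return None

known = ["Never set convergence < 1e-8 for SOL 106"]
match = find_near_duplicate("never set  convergence < 1e-8 for SOL 106.", known)
unrelated = find_near_duplicate("Start with shell elements for thin walls", known)
```

Character-level similarity catches reworded punctuation and spacing but not paraphrases; embedding-based matching would be needed for that and is outside this sketch.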

1.2 Create Reflector Component

File: optimization_engine/context/reflector.py

"""
Reflector: Analyzes optimization outcomes to extract insights.

Part of the ACE (Agentic Context Engineering) implementation for Atomizer.
"""

from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from pathlib import Path
import json

from .playbook import AtomizerPlaybook, InsightCategory

@dataclass
class OptimizationOutcome:
    """Captured outcome from an optimization run."""
    trial_number: int
    success: bool
    objective_value: Optional[float]
    constraint_violations: List[str]
    solver_errors: List[str]
    design_variables: Dict[str, float]
    extractor_used: str
    duration_seconds: float
    notes: str = ""

class AtomizerReflector:
    """
    Analyzes optimization outcomes and extracts actionable insights.
    
    Implements the Reflector role from ACE framework:
    - Examines successful and failed trials
    - Extracts patterns that led to success/failure
    - Formats insights for Curator integration
    """
    
    def __init__(self, playbook: AtomizerPlaybook):
        self.playbook = playbook
        self.pending_insights: List[Dict[str, Any]] = []
    
    def analyze_trial(self, outcome: OptimizationOutcome) -> List[Dict[str, Any]]:
        """
        Analyze a single trial outcome and extract insights.
        
        Returns list of insight candidates (not yet added to playbook).
        """
        insights = []
        
        # Analyze solver errors
        for error in outcome.solver_errors:
            if "convergence" in error.lower():
                insights.append({
                    "category": InsightCategory.MISTAKE,
                    "content": f"Convergence failure with config: {self._summarize_config(outcome)}",
                    "helpful": False,
                    "trial": outcome.trial_number
                })
            elif "mesh" in error.lower():
                insights.append({
                    "category": InsightCategory.MISTAKE,
                    "content": f"Mesh-related error: {error[:100]}",
                    "helpful": False,
                    "trial": outcome.trial_number
                })
        
        # Analyze successful patterns
        if outcome.success and outcome.objective_value is not None:
            # Record successful design variable ranges
            insights.append({
                "category": InsightCategory.STRATEGY,
                "content": f"Successful design: {self._summarize_design(outcome)}",
                "helpful": True,
                "trial": outcome.trial_number
            })
        
        # Analyze constraint violations
        for violation in outcome.constraint_violations:
            insights.append({
                "category": InsightCategory.MISTAKE,
                "content": f"Constraint violation: {violation}",
                "helpful": False,
                "trial": outcome.trial_number
            })
        
        self.pending_insights.extend(insights)
        return insights
    
    def analyze_study_completion(
        self, 
        study_name: str,
        total_trials: int,
        best_value: float,
        convergence_rate: float
    ) -> List[Dict[str, Any]]:
        """
        Analyze completed study and extract high-level insights.
        """
        insights = []
        
        if convergence_rate > 0.9:
            insights.append({
                "category": InsightCategory.STRATEGY,
                "content": f"Study '{study_name}' achieved {convergence_rate:.0%} convergence - configuration is robust",
                "helpful": True,
                "trial": None
            })
        elif convergence_rate < 0.5:
            insights.append({
                "category": InsightCategory.MISTAKE,
                "content": f"Study '{study_name}' had only {convergence_rate:.0%} convergence - review mesh and solver settings",
                "helpful": False,
                "trial": None
            })
        
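        # Queue study-level insights so commit_insights() persists them too
        self.pending_insights.extend(insights)
        return insights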
    
    def commit_insights(self) -> int:
        """
        Commit pending insights to playbook (Curator handoff).
        
        Returns number of insights added.
        """
        count = 0
        for insight in self.pending_insights:
            item = self.playbook.add_insight(
                category=insight["category"],
                content=insight["content"],
                source_trial=insight.get("trial")
            )
            if not insight.get("helpful", True):
                self.playbook.record_outcome(item.id, helpful=False)
            count += 1
        
        self.pending_insights = []
        return count
    
    def _summarize_config(self, outcome: OptimizationOutcome) -> str:
        """Create brief config summary."""
        return f"extractor={outcome.extractor_used}, vars={len(outcome.design_variables)}"
    
    def _summarize_design(self, outcome: OptimizationOutcome) -> str:
        """Create brief design summary."""
        vars_summary = ", ".join(
            f"{k}={v:.3g}" for k, v in list(outcome.design_variables.items())[:3]
        )
        return f"obj={outcome.objective_value:.4g}, {vars_summary}"

1.3 Integration with OptimizationRunner

File: optimization_engine/core/runner.py (modifications - POST-MIGRATION PATH)

# Add to imports
from optimization_engine.context.playbook import AtomizerPlaybook
from optimization_engine.context.reflector import AtomizerReflector, OptimizationOutcome

class OptimizationRunner:
    def __init__(self, ...):
        # ... existing init ...
        
        # Initialize context engineering components
        self.playbook = AtomizerPlaybook.load(
            self.output_dir / "playbook.json"
        )
        self.reflector = AtomizerReflector(self.playbook)
    
    def _objective(self, trial: optuna.Trial) -> float:
        # ... existing trial logic ...
        
        # After trial completion, capture outcome for reflection
        outcome = OptimizationOutcome(
            trial_number=trial.number,
            success=not failed,
            objective_value=objective_value if not failed else None,
            constraint_violations=constraint_violations,
            solver_errors=solver_errors,
            design_variables=design_vars,
            extractor_used=self.config.get("extractor", "unknown"),
            duration_seconds=trial_duration
        )
        self.reflector.analyze_trial(outcome)
        
        return objective_value
    
    def run(self, n_trials: int) -> Dict[str, Any]:
        # ... existing run logic ...
        
        # After study completion
        self.reflector.analyze_study_completion(
            study_name=self.study.study_name,
            total_trials=len(self.study.trials),
            best_value=self.study.best_value,
            convergence_rate=successful_trials / total_trials
        )
        
        # Commit insights and save playbook
        insights_added = self.reflector.commit_insights()
        self.playbook.save(self.output_dir / "playbook.json")
        
        print(f"Added {insights_added} insights to playbook")

Phase 2: Context Isolation & Loading (Week 2-3)

Goal: Implement Write-Select-Compress-Isolate pattern for POS

2.1 Session State Schema

File: optimization_engine/context/session_state.py

"""
Session state management with context isolation.

Implements the "Isolate" pattern from context engineering:
- Exposed fields are sent to LLM
- Isolated fields are accessed selectively
"""

from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
from datetime import datetime
from enum import Enum

class TaskType(Enum):
    CREATE_STUDY = "create_study"
    RUN_OPTIMIZATION = "run_optimization"
    MONITOR_PROGRESS = "monitor_progress"
    ANALYZE_RESULTS = "analyze_results"
    DEBUG_ERROR = "debug_error"
    CONFIGURE_SETTINGS = "configure_settings"

class ExposedState(BaseModel):
    """State exposed to LLM at every turn."""
    
    # Current task context
    task_type: Optional[TaskType] = None
    current_objective: str = ""
    
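    # Recent history (compressed)
    # Note: max_items is the Pydantic v1 keyword; Pydantic v2 uses max_length.
    # Neither is re-checked when the list is mutated in place with append().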
    recent_actions: List[str] = Field(default_factory=list, max_items=10)
    recent_errors: List[str] = Field(default_factory=list, max_items=5)
    
    # Active study summary
    study_name: Optional[str] = None
    study_status: str = "unknown"
    trials_completed: int = 0
    best_value: Optional[float] = None
    
    # Playbook excerpt (most relevant items)
    active_playbook_items: List[str] = Field(default_factory=list, max_items=15)

class IsolatedState(BaseModel):
    """State isolated from LLM - accessed selectively."""
    
    # Full optimization history (can be large)
    full_trial_history: List[Dict[str, Any]] = Field(default_factory=list)
    
    # NX session state (heavy, complex)
    nx_model_path: Optional[str] = None
    nx_expressions: Dict[str, Any] = Field(default_factory=dict)
    
    # Neural network cache
    neural_predictions: Dict[str, float] = Field(default_factory=dict)
    
    # Full playbook (loaded on demand)
    full_playbook_path: Optional[str] = None
    
    # Debug information
    last_solver_output: str = ""
    last_f06_content: str = ""

class AtomizerSessionState(BaseModel):
    """
    Complete session state with exposure control.
    
    The exposed state is automatically injected into every LLM context.
    The isolated state is accessed only when explicitly needed.
    """
    
    session_id: str
    created_at: datetime = Field(default_factory=datetime.now)
    last_updated: datetime = Field(default_factory=datetime.now)
    
    exposed: ExposedState = Field(default_factory=ExposedState)
    isolated: IsolatedState = Field(default_factory=IsolatedState)
    
    def get_llm_context(self) -> str:
        """Generate context string for LLM consumption."""
        lines = [
            "## Current Session State",
            "",
            f"**Task**: {self.exposed.task_type.value if self.exposed.task_type else 'Not set'}",
            f"**Objective**: {self.exposed.current_objective}",
            "",
        ]
        
        if self.exposed.study_name:
            lines.extend([
                f"### Active Study: {self.exposed.study_name}",
                f"- Status: {self.exposed.study_status}",
                f"- Trials: {self.exposed.trials_completed}",
                f"- Best: {self.exposed.best_value}",
                "",
            ])
        
        if self.exposed.recent_actions:
            lines.append("### Recent Actions")
            for action in self.exposed.recent_actions[-5:]:
                lines.append(f"- {action}")
            lines.append("")
        
        if self.exposed.recent_errors:
            lines.append("### Recent Errors (address these)")
            for error in self.exposed.recent_errors:
                lines.append(f"- ⚠️ {error}")
            lines.append("")
        
        if self.exposed.active_playbook_items:
            lines.append("### Relevant Knowledge")
            for item in self.exposed.active_playbook_items:
                lines.append(f"- {item}")
            lines.append("")
        
        return "\n".join(lines)
    
    def add_action(self, action: str):
        """Record an action (auto-compresses old actions)."""
        self.exposed.recent_actions.append(action)
        if len(self.exposed.recent_actions) > 10:
            # Compress: keep first, last 5, summarize middle
            self.exposed.recent_actions = (
                [self.exposed.recent_actions[0]] +
                ["... (earlier actions summarized)"] +
                self.exposed.recent_actions[-5:]
            )
        self.last_updated = datetime.now()
    
    def add_error(self, error: str):
        """Record an error for LLM attention."""
        self.exposed.recent_errors.append(error)
        self.exposed.recent_errors = self.exposed.recent_errors[-5:]
        self.last_updated = datetime.now()
    
    def load_isolated_data(self, key: str) -> Any:
        """Explicitly load isolated data when needed."""
        return getattr(self.isolated, key, None)
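
`add_action` keeps the exposed history short by collapsing older entries once the list grows past ten. The same rule as a standalone function, for illustration only: `compress_actions` and its parameters are hypothetical names, and the marker string is the one used in `add_action`.

```python
from typing import List

MARKER = "... (earlier actions summarized)"

def compress_actions(
    actions: List[str], limit: int = 10, keep_tail: int = 5
) -> List[str]:
    """Keep the first action, a marker, and the most recent keep_tail actions."""
    if len(actions) <= limit:
        return list(actions)
    return [actions[0], MARKER] + actions[-keep_tail:]

# Simulate twelve actions arriving one at a time.
history: List[str] = []
for step in range(1, 13):
    history = compress_actions(history + [f"action {step}"])
```

After the eleventh action the list collapses to seven entries, so the LLM always sees the session's first action and its most recent ones, with everything in between replaced by one marker line.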

2.2 Context Loader Enhancement

File: .claude/skills/02_CONTEXT_LOADER.md (updated)

# Context Loader - Enhanced with Playbook Integration

## Loading Rules by Task Type

### CREATE_STUDY
**Always Load**:
- `core/study-creation-core.md`
- `SYS_12_EXTRACTOR_LIBRARY.md`
- **Playbook**: Filter by tags=['study_creation', 'design_variables']

**Load If**:
- `modules/zernike-optimization.md`: if "telescope" or "mirror" in query
- `modules/neural-acceleration.md`: if trials > 50

### RUN_OPTIMIZATION
**Always Load**:
- `OP_02_RUN_OPTIMIZATION.md`
- **Playbook**: Filter by tags=['solver', 'convergence', 'mesh']

**Load If**:
- Recent errors exist → Include mistake items from playbook

### DEBUG_ERROR
**Always Load**:
- `OP_06_TROUBLESHOOT.md`
- **Playbook**: Filter by category=MISTAKE, min_confidence=0.3
- Session state recent_errors

**Load If**:
- "convergence" in error → Load solver-specific playbook items
- "mesh" in error → Load mesh-specific playbook items
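
The DEBUG_ERROR rules select playbook items by category and by keywords in the error text. A minimal sketch of that selection follows. It assumes a keyword-overlap ranking; `Item` is a hypothetical stand-in for `PlaybookItem`, the example contents are invented, and `search_by_content` is written here as a free function because the Phase 1 playbook class does not define one.

```python
import re
from dataclasses import dataclass, field
from typing import List, Optional, Set

@dataclass
class Item:  # hypothetical stand-in for PlaybookItem
    id: str
    category: str
    content: str
    tags: List[str] = field(default_factory=list)

def _words(text: str) -> Set[str]:
    """Lowercased alphanumeric tokens of at least three characters."""
    return {w for w in re.findall(r"[a-z0-9]+", text.lower()) if len(w) >= 3}

def search_by_content(
    items: List[Item],
    query: str,
    category: Optional[str] = None,
    limit: int = 5,
) -> List[Item]:
    """Rank items by how many query words appear in their content or tags."""
    query_words = _words(query)
    scored = []
    for item in items:
        if category is not None and item.category != category:
            continue
        item_words = _words(item.content) | {t.lower() for t in item.tags}
        overlap = len(query_words & item_words)
        if overlap > 0:
            scored.append((overlap, item))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [item for _, item in scored[:limit]]

items = [
    Item("mis-00001", "mis", "Convergence failure when load increments are too large",
         ["solver", "convergence"]),
    Item("mis-00002", "mis", "Mesh-related error from distorted elements", ["mesh"]),
    Item("str-00001", "str", "Start with shell elements for thin-walled structures"),
]
hits = search_by_content(items, "Trial 7: convergence not achieved", category="mis")
```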

## Playbook Integration Pattern

```python
# In context loader
def load_context_for_task(task_type: TaskType, session: AtomizerSessionState):
    context_parts = []
    
    # 1. Load protocol docs (existing behavior)
    context_parts.append(load_protocol(task_type))
    
    # 2. Load session state (exposed only)
    context_parts.append(session.get_llm_context())
    
    # 3. Load relevant playbook items
    playbook = AtomizerPlaybook.load(PLAYBOOK_PATH)
    playbook_context = playbook.get_context_for_task(
        task_type=task_type.value,
        max_items=15,
        min_confidence=0.6
    )
    context_parts.append(playbook_context)
    
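    # 4. Add error-specific items if debugging
    #    (search_by_content is not defined on AtomizerPlaybook in Phase 1
    #    and must be added before this runs)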
    if task_type == TaskType.DEBUG_ERROR:
        for error in session.exposed.recent_errors:
            relevant = playbook.search_by_content(error, category=InsightCategory.MISTAKE)
            context_parts.extend([item.to_context_string() for item in relevant])
    
    return "\n\n---\n\n".join(context_parts)

```

Phase 3: KV-Cache Optimization (Week 3)

Goal: Maximize cache hits for 10x cost reduction

3.1 Stable Prefix Architecture

File: .claude/skills/SYSTEM_PROMPT_TEMPLATE.md

# Atomizer System Prompt Template

## Structure for KV-Cache Optimization

The system prompt is structured to maximize KV-cache hits:

[SECTION 1: STABLE - Never changes]

  • Atomizer identity and capabilities
  • Core principles (talk don't click)
  • Tool schemas and definitions
  • Base protocol routing table

[SECTION 2: SEMI-STABLE - Changes per session type]

  • Active protocol definition
  • Task-specific instructions
  • Relevant playbook items (top 10 by score)

[SECTION 3: DYNAMIC - Changes every turn]

  • Current session state
  • Recent actions/errors
  • User's latest message

## Implementation

### Stable Prefix (Cache This)

You are assisting with Atomizer, an LLM-first FEA optimization framework.

Core Capabilities

  • Natural language → optimization configuration
  • NX Nastran integration via journals
  • Multi-strategy optimization (TPE, CMA-ES, NSGA-II)
  • Real-time progress monitoring
  • Neural acceleration (600-1000x speedup)

Principles

  1. Talk, don't click - users describe goals in plain language
  2. Never modify master models - work on copies
  3. Always validate before running
  4. Document everything

Available Tools

[... tool schemas - NEVER CHANGE ORDER ...]

Protocol Routing

| Intent | Protocol | Priority |
|--------|----------|----------|
| Create study | OP_01 | 1 |
| Run optimization | OP_02 | 1 |
| Monitor progress | OP_03 | 2 |
| Analyze results | OP_04 | 2 |
| Debug errors | OP_06 | 1 |

### Semi-Stable Section (Per Session)

Active Task: {task_type}

Loaded Protocol: {protocol_name}

{protocol_content}

Relevant Knowledge (from {playbook_version})

{playbook_items}


### Dynamic Section (Per Turn)

Current State

{session_state.get_llm_context()}

User Message

{user_message}
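
The stable section only helps caching if it is byte-identical from one request to the next, which is why the template insists that tool schemas never change order. One way to enforce that is to serialize the schemas canonically instead of relying on registration order. This is a sketch under assumptions: the tool names and schema fields are hypothetical, and `render_tool_schemas` and `fingerprint` are illustrative helpers.

```python
import hashlib
import json
from typing import Any, Dict, List

def render_tool_schemas(tools: List[Dict[str, Any]]) -> str:
    """Serialize tool schemas deterministically: sorted by name, sorted keys."""
    ordered = sorted(tools, key=lambda tool: tool["name"])
    return json.dumps(ordered, sort_keys=True, separators=(",", ":"))

def fingerprint(text: str) -> str:
    """Hash of the rendered text, used to detect any change in the prefix."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

# Hypothetical tool schemas, registered in two different orders.
run_a = [
    {"name": "run_optimization", "parameters": {"n_trials": "integer"}},
    {"name": "create_study", "parameters": {"study_name": "string"}},
]
run_b = [
    {"parameters": {"study_name": "string"}, "name": "create_study"},
    {"parameters": {"n_trials": "integer"}, "name": "run_optimization"},
]
rendered = render_tool_schemas(run_a)
prefix_is_stable = fingerprint(rendered) == fingerprint(render_tool_schemas(run_b))
```

Anything that varies per request, such as timestamps or session IDs, belongs in the dynamic section; a single changed byte in the stable section changes the fingerprint.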

3.2 Cache Monitoring

File: optimization_engine/context/cache_monitor.py

"""
Monitor and optimize KV-cache hit rates.
"""

from dataclasses import dataclass
from typing import Optional
import hashlib

@dataclass
class CacheStats:
    total_requests: int = 0
    cache_hits: int = 0
    prefix_length: int = 0
    
    @property
    def hit_rate(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.cache_hits / self.total_requests

class ContextCacheOptimizer:
    """
    Tracks and optimizes context for cache efficiency.
    """
    
    def __init__(self):
        self.stats = CacheStats()
        self._last_prefix_hash: Optional[str] = None
    
    def prepare_context(
        self,
        stable_prefix: str,
        semi_stable: str,
        dynamic: str
    ) -> str:
        """
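        Assemble context optimized for caching.
        
        Counts a "hit" when the stable prefix is byte-identical to the one
        in the previous request. This is a local proxy for cache reuse,
        not a hit count reported by the model provider.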
        """
        # Hash the stable prefix
        prefix_hash = hashlib.md5(stable_prefix.encode()).hexdigest()
        
        self.stats.total_requests += 1
        if prefix_hash == self._last_prefix_hash:
            self.stats.cache_hits += 1
        
        self._last_prefix_hash = prefix_hash
        self.stats.prefix_length = len(stable_prefix)
        
        # Assemble with clear boundaries
        return f"""{stable_prefix}

---
{semi_stable}

---
{dynamic}"""
    
    def get_report(self) -> str:
        """Generate cache efficiency report."""
        return f"""
Cache Statistics:
- Requests: {self.stats.total_requests}
- Cache Hits: {self.stats.cache_hits}
- Hit Rate: {self.stats.hit_rate:.1%}
- Stable Prefix: {self.stats.prefix_length} chars

Estimated Cost Savings: {self.stats.hit_rate * 90:.0f}% of stable-prefix token cost
(Assumes cached tokens cost 10x less than uncached; applies to prefix tokens only)
"""

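The report multiplies the hit rate by 90 because a cached token is assumed to cost one tenth of an uncached one. That saving applies only to tokens inside the cached prefix, so the saving on a whole prompt also depends on how much of the prompt the prefix covers. A worked sketch, with hypothetical token counts and a hypothetical helper name:

```python
def estimated_prompt_savings(
    hit_rate: float,
    prefix_tokens: int,
    total_tokens: int,
    cached_cost_ratio: float = 0.1,  # document's assumption: cached = 1/10 cost
) -> float:
    """Fraction of input-token cost saved across the whole prompt."""
    if total_tokens <= 0:
        return 0.0
    prefix_share = prefix_tokens / total_tokens
    return hit_rate * (1.0 - cached_cost_ratio) * prefix_share

# 80% hit rate, 6000 of 8000 prompt tokens in the stable prefix.
savings = estimated_prompt_savings(hit_rate=0.8, prefix_tokens=6000, total_tokens=8000)
# Upper bound: every request hits and the entire prompt is cached.
upper_bound = estimated_prompt_savings(hit_rate=1.0, prefix_tokens=8000, total_tokens=8000)
```

With these numbers the saving is about 54% of input-token cost rather than the 72% that `hit_rate * 90` alone would suggest.
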
Phase 4: Error Recovery & Learning (Week 4)

Goal: Leave wrong turns in context, learn from failures

4.1 Error Preservation Hook

File: optimization_engine/plugins/post_solve/error_tracker.py

"""
Error Tracker Hook

Preserves solver errors and failures in context for learning.
Based on Manus insight: "leave the wrong turns in the context"
"""

from pathlib import Path
from datetime import datetime
import json

def track_error(context: dict) -> dict:
    """
    Hook that preserves errors for context learning.
    
    Called at post_solve when solver fails.
    """
    trial_number = context.get('trial_number', -1)
    output_dir = Path(context.get('output_dir', '.'))
    
    # Capture error details
    error_info = {
        "trial": trial_number,
        "timestamp": datetime.now().isoformat(),
        "solver_returncode": context.get('solver_returncode'),
        "error_type": classify_error(context),
        "design_variables": context.get('design_variables', {}),
        "error_message": context.get('error_message', ''),
        "f06_snippet": extract_f06_error(context.get('f06_path'))
    }
    
    # Append to error log (never overwrite - accumulate)
    error_log_path = output_dir / "error_history.jsonl"
    with open(error_log_path, 'a') as f:
        f.write(json.dumps(error_info) + "\n")
    
    # Update session state for LLM context
    if 'session_state' in context:
        context['session_state'].add_error(
            f"Trial {trial_number}: {error_info['error_type']} - {error_info['error_message'][:100]}"
        )
    
    return {"error_tracked": True, "error_type": error_info['error_type']}

def classify_error(context: dict) -> str:
    """Classify error type for playbook categorization."""
    # Treat a missing or None message as empty text
    error_msg = (context.get('error_message') or '').lower()
    
    if 'convergence' in error_msg:
        return "convergence_failure"
    elif 'mesh' in error_msg or 'element' in error_msg:
        return "mesh_error"
    elif 'singular' in error_msg or 'matrix' in error_msg:
        return "singularity"
    elif 'memory' in error_msg or 'allocation' in error_msg:
        return "memory_error"
    elif 'license' in error_msg:
        return "license_error"
    else:
        return "unknown_error"

def extract_f06_error(f06_path: str) -> str:
    """Extract error section from F06 file."""
    if not f06_path or not Path(f06_path).exists():
        return ""
    
    try:
        with open(f06_path) as f:
            content = f.read()
        
        # Look for error indicators
        error_markers = ["*** USER FATAL", "*** SYSTEM FATAL", "*** USER WARNING"]
        for marker in error_markers:
            if marker in content:
                idx = content.index(marker)
                return content[idx:idx+500]
        
        return ""
    except Exception:
        return ""

# Hook registration
HOOK_CONFIG = {
    "name": "error_tracker",
    "hook_point": "post_solve",
    "priority": 100,  # Run early to capture before cleanup
    "enabled": True,
    "description": "Preserves solver errors for context learning"
}
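
Because the hook appends one JSON object per line, the accumulated `error_history.jsonl` can be read back cheaply when building LLM context. The following is a minimal sketch of such a reader, not part of the plan itself: `summarize_error_history` is a hypothetical helper, and it only assumes the `error_type` field written by `track_error` above.

```python
import json
from collections import Counter
from pathlib import Path
from typing import List, Tuple


def summarize_error_history(log_path: Path, top_n: int = 3) -> List[Tuple[str, int]]:
    """Count error types in a JSONL error log, most frequent first."""
    if not log_path.exists():
        return []

    counts: Counter = Counter()
    with open(log_path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip a partially written line
            counts[record.get("error_type", "unknown_error")] += 1

    return counts.most_common(top_n)
```

A caller might format the result as one short line per error type and place it near the top of the session context, so that recurring failure modes stay visible even after older events are compacted.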

4.2 Feedback Loop Integration

File: optimization_engine/context/feedback_loop.py

"""
Feedback Loop: Connects optimization outcomes to playbook updates.

Implements ACE's "leverage natural execution feedback" principle.
"""

from typing import Dict, Any, List
from pathlib import Path

from .playbook import AtomizerPlaybook, InsightCategory
from .reflector import AtomizerReflector, OptimizationOutcome

class FeedbackLoop:
    """
    Automated feedback loop that learns from optimization runs.
    
    Key insight from ACE: Use execution feedback (success/failure)
    as the learning signal, not labeled data.
    """
    
    def __init__(self, playbook_path: Path):
        self.playbook = AtomizerPlaybook.load(playbook_path)
        self.reflector = AtomizerReflector(self.playbook)
        self.playbook_path = playbook_path
    
    def process_trial_result(
        self,
        trial_number: int,
        success: bool,
        objective_value: float,
        design_variables: Dict[str, float],
        context_items_used: List[str],  # Which playbook items were in context
        errors: List[str] = None
    ):
        """
        Process a trial result and update playbook accordingly.
        
        This is the core learning mechanism:
        - If trial succeeded with certain playbook items → increase helpful count
        - If trial failed with certain playbook items → increase harmful count
        """
        # Update playbook item scores based on outcome
        for item_id in context_items_used:
            self.playbook.record_outcome(item_id, helpful=success)
        
        # Create outcome for reflection
        outcome = OptimizationOutcome(
            trial_number=trial_number,
            success=success,
            objective_value=objective_value if success else None,
            constraint_violations=[],
            solver_errors=errors or [],
            design_variables=design_variables,
            extractor_used="",
            duration_seconds=0
        )
        
        # Reflect on outcome
        self.reflector.analyze_trial(outcome)
    
    def finalize_study(self, study_stats: Dict[str, Any]):
        """
        Called when study completes. Commits insights and prunes playbook.
        """
        # Analyze study-level patterns
        self.reflector.analyze_study_completion(
            study_name=study_stats.get("name", "unknown"),
            total_trials=study_stats.get("total_trials", 0),
            best_value=study_stats.get("best_value", 0),
            convergence_rate=study_stats.get("convergence_rate", 0)
        )
        
        # Commit all pending insights
        insights_added = self.reflector.commit_insights()
        
        # Prune consistently harmful items
        self.playbook.prune_harmful(threshold=-3)
        
        # Save updated playbook
        self.playbook.save(self.playbook_path)
        
        return {
            "insights_added": insights_added,
            "playbook_size": len(self.playbook.items),
            "playbook_version": self.playbook.version
        }
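
The scoring that `process_trial_result` and `finalize_study` rely on can be illustrated with a toy stand-in. This is a sketch under assumptions, not the real `AtomizerPlaybook`: `ToyItem` and `ToyPlaybook` are hypothetical, `net_score` is taken to be helpful minus harmful, and pruning is assumed to be inclusive of the threshold.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class ToyItem:
    """Hypothetical stand-in for a playbook item."""
    content: str
    helpful_count: int = 0
    harmful_count: int = 0

    @property
    def net_score(self) -> int:
        return self.helpful_count - self.harmful_count


class ToyPlaybook:
    """Hypothetical stand-in for AtomizerPlaybook (scoring and pruning only)."""

    def __init__(self) -> None:
        self.items: Dict[str, ToyItem] = {}

    def record_outcome(self, item_id: str, helpful: bool) -> None:
        item = self.items[item_id]
        if helpful:
            item.helpful_count += 1
        else:
            item.harmful_count += 1

    def prune_harmful(self, threshold: int = -3) -> int:
        """Drop items whose net score is at or below threshold (assumed inclusive)."""
        doomed = [k for k, v in self.items.items() if v.net_score <= threshold]
        for k in doomed:
            del self.items[k]
        return len(doomed)


playbook = ToyPlaybook()
playbook.items["str-00001"] = ToyItem("Use shell elements for thin walls")
playbook.items["str-00002"] = ToyItem("Bad advice")

# (trial succeeded?, playbook items that were in context for that trial)
trials = [
    (True, ["str-00001"]),
    (False, ["str-00002"]),
    (True, ["str-00001"]),
    (False, ["str-00002"]),
    (False, ["str-00002"]),
]
for success, used in trials:
    for item_id in used:
        playbook.record_outcome(item_id, helpful=success)

removed = playbook.prune_harmful(threshold=-3)
print(removed, sorted(playbook.items))
```

Note that credit is assigned per trial: every item that was in context for a failed trial is penalized, whether or not it contributed to the failure. Passing an accurate `context_items_used` list therefore matters for the quality of the learning signal.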

Phase 5: Context Compaction (Week 4-5)

Goal: Handle long-running optimization sessions without context overflow

5.1 Compaction Manager

File: optimization_engine/context/compaction.py

"""
Context Compaction for Long-Running Optimizations

Based on Google ADK's compaction architecture:
- Trigger compaction when threshold reached
- Summarize older events
- Preserve recent detail
"""

from typing import List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class ContextEvent:
    """Single event in optimization context."""
    timestamp: datetime
    event_type: str  # trial_start, trial_complete, error, milestone
    summary: str
    details: Dict[str, Any] = field(default_factory=dict)
    compacted: bool = False

class CompactionManager:
    """
    Manages context compaction for long optimization sessions.
    
    Strategy:
    - Keep last N events in full detail
    - Summarize older events into milestone markers
    - Preserve error events (never compact errors)
    """
    
    def __init__(
        self,
        compaction_threshold: int = 50,
        keep_recent: int = 20,
        keep_errors: bool = True
    ):
        self.events: List[ContextEvent] = []
        self.compaction_threshold = compaction_threshold
        self.keep_recent = keep_recent
        self.keep_errors = keep_errors
        self.compaction_count = 0
    
    def add_event(self, event: ContextEvent):
        """Add event and trigger compaction if needed."""
        self.events.append(event)
        
        if len(self.events) > self.compaction_threshold:
            self._compact()
    
    def _compact(self):
        """
        Compact older events into summaries.
        
        Preserves:
        - All error events
        - Last `keep_recent` events
        - Milestone summaries of compacted regions
        """
        if len(self.events) <= self.keep_recent:
            return
        
        # Split into old and recent
        old_events = self.events[:-self.keep_recent]
        recent_events = self.events[-self.keep_recent:]
        
        # Separate errors from old events
        error_events = [e for e in old_events if e.event_type == "error"]
        non_error_events = [e for e in old_events if e.event_type != "error"]
        
        # Summarize non-error old events
        if non_error_events:
            summary = self._create_summary(non_error_events)
            compaction_event = ContextEvent(
                timestamp=non_error_events[0].timestamp,
                event_type="compaction",
                summary=summary,
                details={
                    "events_compacted": len(non_error_events),
                    "compaction_number": self.compaction_count
                },
                compacted=True
            )
            self.compaction_count += 1
            
            # Rebuild events list
            self.events = [compaction_event] + error_events + recent_events
        else:
            self.events = error_events + recent_events
    
    def _create_summary(self, events: List[ContextEvent]) -> str:
        """Create summary of compacted events."""
        trial_events = [e for e in events if "trial" in e.event_type]
        
        if not trial_events:
            return f"[{len(events)} events compacted]"
        
        # Extract trial statistics
        trial_numbers = []
        objectives = []
        
        for e in trial_events:
            if "trial_number" in e.details:
                trial_numbers.append(e.details["trial_number"])
            if "objective" in e.details:
                objectives.append(e.details["objective"])
        
        if trial_numbers and objectives:
            return (
                f"Trials {min(trial_numbers)}-{max(trial_numbers)}: "
                f"Best={min(objectives):.4g}, "
                f"Avg={sum(objectives)/len(objectives):.4g}"
            )
        elif trial_numbers:
            return f"Trials {min(trial_numbers)}-{max(trial_numbers)} completed"
        else:
            return f"[{len(events)} events compacted]"
    
    def get_context_string(self) -> str:
        """Generate context string from events."""
        lines = ["## Optimization History", ""]
        
        for event in self.events:
            if event.compacted:
                lines.append(f"📦 {event.summary}")
            elif event.event_type == "error":
                lines.append(f"⚠️ {event.summary}")
            else:
                lines.append(f"- {event.summary}")
        
        return "\n".join(lines)
    
    def get_stats(self) -> Dict[str, Any]:
        """Get compaction statistics."""
        return {
            "total_events": len(self.events),
            "compaction_count": self.compaction_count,
            "error_events": len([e for e in self.events if e.event_type == "error"]),
            "compacted_events": len([e for e in self.events if e.compacted])
        }
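
To see how the threshold and `keep_recent` interact, the history length can be traced without the full classes. The sketch below is a simplified model that assumes no error events: each compaction then leaves one summary event plus the `keep_recent` most recent events. `simulate_history_length` is a hypothetical helper written for this illustration.

```python
from typing import Tuple


def simulate_history_length(
    n_events: int,
    compaction_threshold: int = 50,
    keep_recent: int = 20,
) -> Tuple[int, int]:
    """Return (final_length, compaction_count) after n_events non-error events."""
    length = 0
    compactions = 0
    for _ in range(n_events):
        length += 1
        # Mirrors add_event/_compact: compaction only happens once the list
        # exceeds the threshold and holds more than keep_recent events.
        if length > compaction_threshold and length > keep_recent:
            # Everything older than the last keep_recent events collapses
            # into a single summary event.
            length = 1 + keep_recent
            compactions += 1
    return length, compactions


print(simulate_history_length(15, compaction_threshold=10, keep_recent=5))
print(simulate_history_length(200))
```

With the defaults, the first compaction happens at event 51 and then once every 30 events. Preserved error events are not modeled here; since they are never compacted, a session with many errors compacts more often, because the retained errors count toward the threshold.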

Part 3: Claude Code Session Integration

3.1 Enhanced Bootstrap for Claude Code

File: .claude/skills/00_BOOTSTRAP_V2.md

# Atomizer Bootstrap v2.0 - Context-Aware Sessions

## Session Initialization

On session start, perform these steps:

### Step 1: Load Playbook
```bash
# Check for existing playbook
cat optimization_engine/context/playbook.json 2>/dev/null | head -20
```

If playbook exists, extract top insights:

  • Filter by task type (inferred from user's first message)
  • Include top 10 by net_score
  • Always include recent mistakes (last 5)
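
The three selection rules above can be sketched as a single function. Everything here is an illustrative assumption rather than the playbook's actual API: `Insight`, its `task_types` and `created_order` fields, and `select_for_context` are hypothetical, and the category codes `"str"` and `"mis"` are inferred from the item ID prefixes used elsewhere in this plan.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class Insight:
    """Hypothetical stand-in for a playbook item."""
    id: str
    category: str        # assumed codes: "str" (strategy), "mis" (mistake)
    net_score: int
    created_order: int   # larger means more recent
    task_types: Set[str] = field(default_factory=set)  # empty = applies to all


def select_for_context(
    items: List[Insight],
    task_type: str,
    top_n: int = 10,
    recent_mistakes: int = 5,
) -> List[Insight]:
    # Rule 1: keep items tagged for this task type (or untagged).
    relevant = [i for i in items if not i.task_types or task_type in i.task_types]

    # Rule 2: top N by net score.
    top = sorted(relevant, key=lambda i: i.net_score, reverse=True)[:top_n]

    # Rule 3: always include the most recent mistakes, regardless of score.
    mistakes = sorted(
        (i for i in items if i.category == "mis"),
        key=lambda i: i.created_order,
        reverse=True,
    )[:recent_mistakes]

    # Merge while preserving order and dropping duplicates.
    selected: List[Insight] = []
    seen: Set[str] = set()
    for item in top + mistakes:
        if item.id not in seen:
            seen.add(item.id)
            selected.append(item)
    return selected
```

Including recent mistakes unconditionally means a low-scoring but fresh failure still reaches the model, which is the point of preserving errors in context.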

Step 2: Initialize Session State

from optimization_engine.context.session_state import AtomizerSessionState, TaskType

session = AtomizerSessionState(session_id="current")
session.exposed.task_type = TaskType.CREATE_STUDY  # Update based on intent

Step 3: Load Task-Specific Context

Based on detected task type, load protocols per 02_CONTEXT_LOADER.md

Step 4: Inject Playbook Items

Add relevant playbook items to session.exposed.active_playbook_items


Error Handling Protocol

When ANY error occurs:

  1. Preserve the error - Add to session state
  2. Check playbook - Look for matching mistake patterns
  3. Learn from it - If novel error, queue for playbook addition
  4. Show to user - Include error context in response

# On error
session.add_error(f"{error_type}: {error_message}")

# Check playbook for similar errors
similar = playbook.search_by_content(error_message, category=InsightCategory.MISTAKE)
if similar:
    print(f"Known issue: {similar[0].content}")
else:
    reflector.queue_insight(InsightCategory.MISTAKE, error_message)

Context Budget Management

Total context budget: ~100K tokens

Allocation:

  • Stable prefix: 5K tokens (cached)
  • Protocols: 10K tokens
  • Playbook items: 5K tokens
  • Session state: 2K tokens
  • Conversation history: 30K tokens
  • Working space: 48K tokens

If approaching limit:

  1. Trigger compaction of old events
  2. Reduce playbook items to top 5
  3. Summarize conversation history
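
The allocation above sums to exactly 100K tokens, which makes it easy to check mechanically. The following sketch shows one way to do that. The section keys, the helper names, and the 90% "approaching limit" cutoff are assumptions made for illustration; the plan does not define when the limit counts as being approached.

```python
from typing import Dict

# Per-section allocation from the budget above, in tokens.
BUDGET_TOKENS: Dict[str, int] = {
    "stable_prefix": 5_000,
    "protocols": 10_000,
    "playbook_items": 5_000,
    "session_state": 2_000,
    "conversation_history": 30_000,
    "working_space": 48_000,
}
TOTAL_BUDGET = sum(BUDGET_TOKENS.values())


def over_allocation(usage: Dict[str, int]) -> Dict[str, int]:
    """Return {section: tokens over its share} for sections that exceed it."""
    return {
        section: usage.get(section, 0) - limit
        for section, limit in BUDGET_TOKENS.items()
        if usage.get(section, 0) > limit
    }


def approaching_limit(usage: Dict[str, int], fraction: float = 0.9) -> bool:
    """True once total usage reaches the assumed fraction of the budget."""
    return sum(usage.values()) >= fraction * TOTAL_BUDGET


usage = {"stable_prefix": 5_000, "protocols": 9_000, "conversation_history": 35_000}
print(over_allocation(usage), approaching_limit(usage))
```

A session manager could call `approaching_limit` after each turn and, when it returns true, apply the three mitigation steps in the order listed.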

3.2 Dashboard Integration

File: atomizer-dashboard/backend/api/routes/context.py

"""
Context Engineering API Routes

Provides endpoints for:
- Viewing playbook contents
- Managing session state
- Triggering compaction
- Monitoring cache efficiency
"""

from fastapi import APIRouter, HTTPException
from pathlib import Path
from typing import Optional

router = APIRouter(prefix="/context", tags=["context"])

ATOMIZER_ROOT = Path(__file__).parents[4]
PLAYBOOK_PATH = ATOMIZER_ROOT / "optimization_engine" / "context" / "playbook.json"

@router.get("/playbook")
async def get_playbook(
    category: Optional[str] = None,
    min_score: int = 0,
    limit: int = 50
):
    """Get playbook items with optional filtering."""
    from optimization_engine.context.playbook import AtomizerPlaybook, InsightCategory
    
    playbook = AtomizerPlaybook.load(PLAYBOOK_PATH)
    
    items = list(playbook.items.values())
    
    # Filter by category
    if category:
        try:
            cat = InsightCategory(category)
            items = [i for i in items if i.category == cat]
        except ValueError:
            raise HTTPException(400, f"Invalid category: {category}")
    
    # Filter by score
    items = [i for i in items if i.net_score >= min_score]
    
    # Sort by score
    items.sort(key=lambda x: x.net_score, reverse=True)
    
    return {
        "total": len(playbook.items),
        "filtered": len(items),
        "items": [
            {
                "id": i.id,
                "category": i.category.value,
                "content": i.content,
                "helpful": i.helpful_count,
                "harmful": i.harmful_count,
                "score": i.net_score,
                "confidence": i.confidence
            }
            for i in items[:limit]
        ]
    }

@router.post("/playbook/feedback")
async def record_feedback(item_id: str, helpful: bool):
    """Record feedback on a playbook item."""
    from optimization_engine.context.playbook import AtomizerPlaybook
    
    playbook = AtomizerPlaybook.load(PLAYBOOK_PATH)
    
    if item_id not in playbook.items:
        raise HTTPException(404, f"Item not found: {item_id}")
    
    playbook.record_outcome(item_id, helpful=helpful)
    playbook.save(PLAYBOOK_PATH)
    
    item = playbook.items[item_id]
    return {
        "id": item_id,
        "new_score": item.net_score,
        "confidence": item.confidence
    }

@router.get("/session/{session_id}")
async def get_session_state(session_id: str):
    """Get current session state."""
    # Implementation depends on session storage
    pass

@router.get("/cache/stats")
async def get_cache_stats():
    """Get KV-cache efficiency statistics."""
    from optimization_engine.context.cache_monitor import ContextCacheOptimizer
    
    # Would need to access singleton cache optimizer
    return {
        "message": "Cache stats endpoint - implement with actual cache monitor"
    }

Part 4: Testing & Validation

4.1 Test Suite

File: tests/test_context_engineering.py

"""
Test suite for context engineering components.
"""

import pytest
from pathlib import Path
from datetime import datetime  # needed by the ContextEvent timestamps below
import tempfile
import json

from optimization_engine.context.playbook import (
    AtomizerPlaybook, 
    PlaybookItem, 
    InsightCategory
)
from optimization_engine.context.reflector import (
    AtomizerReflector,
    OptimizationOutcome
)
from optimization_engine.context.session_state import (
    AtomizerSessionState,
    TaskType
)
from optimization_engine.context.compaction import (
    CompactionManager,
    ContextEvent
)


class TestAtomizerPlaybook:
    """Tests for the playbook system."""
    
    def test_add_insight(self):
        """Test adding insights to playbook."""
        playbook = AtomizerPlaybook()
        
        item = playbook.add_insight(
            category=InsightCategory.STRATEGY,
            content="Use shell elements for thin walls",
            source_trial=1
        )
        
        assert item.id == "str-00001"
        assert item.helpful_count == 0
        assert item.harmful_count == 0
        assert len(playbook.items) == 1
    
    def test_deduplication(self):
        """Test that duplicate insights are merged."""
        playbook = AtomizerPlaybook()
        
        playbook.add_insight(InsightCategory.STRATEGY, "Use shell elements")
        playbook.add_insight(InsightCategory.STRATEGY, "Use shell elements")
        
        assert len(playbook.items) == 1
        assert playbook.items["str-00001"].helpful_count == 1
    
    def test_outcome_tracking(self):
        """Test helpful/harmful tracking."""
        playbook = AtomizerPlaybook()
        item = playbook.add_insight(InsightCategory.STRATEGY, "Test insight")
        
        playbook.record_outcome(item.id, helpful=True)
        playbook.record_outcome(item.id, helpful=True)
        playbook.record_outcome(item.id, helpful=False)
        
        assert item.helpful_count == 2
        assert item.harmful_count == 1
        assert item.net_score == 1
    
    def test_persistence(self, tmp_path):
        """Test save/load cycle."""
        playbook = AtomizerPlaybook()
        playbook.add_insight(InsightCategory.MISTAKE, "Don't do this")
        
        save_path = tmp_path / "playbook.json"
        playbook.save(save_path)
        
        loaded = AtomizerPlaybook.load(save_path)
        assert len(loaded.items) == 1
        assert "mis-00001" in loaded.items
    
    def test_pruning(self):
        """Test harmful item pruning."""
        playbook = AtomizerPlaybook()
        item = playbook.add_insight(InsightCategory.STRATEGY, "Bad advice")
        
        # Record many harmful outcomes
        for _ in range(5):
            playbook.record_outcome(item.id, helpful=False)
        
        playbook.prune_harmful(threshold=-3)
        assert len(playbook.items) == 0


class TestAtomizerReflector:
    """Tests for the reflector component."""
    
    def test_analyze_failed_trial(self):
        """Test analysis of failed trial."""
        playbook = AtomizerPlaybook()
        reflector = AtomizerReflector(playbook)
        
        outcome = OptimizationOutcome(
            trial_number=1,
            success=False,
            objective_value=None,
            constraint_violations=["stress > 250 MPa"],
            solver_errors=["convergence failure at iteration 50"],
            design_variables={"thickness": 0.5},
            extractor_used="stress_extractor",
            duration_seconds=120
        )
        
        insights = reflector.analyze_trial(outcome)
        
        assert len(insights) >= 2  # At least error + constraint
        assert any(i["category"] == InsightCategory.MISTAKE for i in insights)
    
    def test_commit_insights(self):
        """Test committing insights to playbook."""
        playbook = AtomizerPlaybook()
        reflector = AtomizerReflector(playbook)
        
        outcome = OptimizationOutcome(
            trial_number=1,
            success=True,
            objective_value=100.0,
            constraint_violations=[],
            solver_errors=[],
            design_variables={"thickness": 1.0},
            extractor_used="mass_extractor",
            duration_seconds=60
        )
        
        reflector.analyze_trial(outcome)
        count = reflector.commit_insights()
        
        assert count > 0
        assert len(playbook.items) > 0


class TestSessionState:
    """Tests for session state management."""
    
    def test_exposed_state_context(self):
        """Test LLM context generation."""
        session = AtomizerSessionState(session_id="test")
        session.exposed.task_type = TaskType.CREATE_STUDY
        session.exposed.study_name = "bracket_opt"
        session.exposed.trials_completed = 25
        session.exposed.best_value = 0.5
        
        context = session.get_llm_context()
        
        assert "bracket_opt" in context
        assert "25" in context
        assert "0.5" in context
    
    def test_action_compression(self):
        """Test automatic action compression."""
        session = AtomizerSessionState(session_id="test")
        
        for i in range(15):
            session.add_action(f"Action {i}")
        
        # Should be compressed
        assert len(session.exposed.recent_actions) <= 12
        assert "summarized" in session.exposed.recent_actions[1].lower()


class TestCompactionManager:
    """Tests for context compaction."""
    
    def test_compaction_trigger(self):
        """Test that compaction triggers at threshold."""
        manager = CompactionManager(compaction_threshold=10, keep_recent=5)
        
        for i in range(15):
            manager.add_event(ContextEvent(
                timestamp=datetime.now(),
                event_type="trial_complete",
                summary=f"Trial {i} complete",
                details={"trial_number": i, "objective": i * 0.1}
            ))
        
        assert manager.compaction_count > 0
        assert len(manager.events) <= 10
    
    def test_error_preservation(self):
        """Test that errors are never compacted."""
        manager = CompactionManager(compaction_threshold=10, keep_recent=3)
        
        # Add error early
        manager.add_event(ContextEvent(
            timestamp=datetime.now(),
            event_type="error",
            summary="Critical solver failure"
        ))
        
        # Add many regular events
        for i in range(20):
            manager.add_event(ContextEvent(
                timestamp=datetime.now(),
                event_type="trial_complete",
                summary=f"Trial {i}"
            ))
        
        # Error should still be present
        errors = [e for e in manager.events if e.event_type == "error"]
        assert len(errors) == 1

4.2 Integration Test

File: tests/test_context_integration.py

"""
Integration test for full context engineering pipeline.
"""

import pytest
from pathlib import Path
import tempfile

def test_full_optimization_with_context_engineering():
    """
    End-to-end test of optimization with context engineering.
    
    Simulates:
    1. Starting fresh session
    2. Running optimization with failures
    3. Verifying playbook learns from failures
    4. Running second optimization
    5. Verifying improved performance
    """
    from optimization_engine.context.playbook import AtomizerPlaybook
    from optimization_engine.context.feedback_loop import FeedbackLoop
    
    with tempfile.TemporaryDirectory() as tmp_dir:
        playbook_path = Path(tmp_dir) / "playbook.json"
        
        # Initialize feedback loop
        feedback = FeedbackLoop(playbook_path)
        
        # Simulate first study with failures
        for i in range(10):
            success = i % 3 != 0  # Every 3rd trial fails
            feedback.process_trial_result(
                trial_number=i,
                success=success,
                objective_value=100 - i if success else 0,
                design_variables={"thickness": 0.5 + i * 0.1},
                context_items_used=[],
                errors=["convergence failure"] if not success else []
            )
        
        # Finalize and check learning
        result = feedback.finalize_study({
            "name": "test_study",
            "total_trials": 10,
            "best_value": 92,  # lowest objective among successful trials (i=8)
            "convergence_rate": 0.6  # 6 of 10 trials succeed (i=0,3,6,9 fail)
        })
        
        assert result["insights_added"] > 0
        
        # Load playbook and verify content
        playbook = AtomizerPlaybook.load(playbook_path)
        
        # Should have learned about convergence failures
        mistakes = [
            item for item in playbook.items.values()
            if item.category.value == "mis"
        ]
        assert len(mistakes) > 0

Part 5: Rollout Plan

Week 1-2: Foundation

  • Implement AtomizerPlaybook class
  • Implement AtomizerReflector class
  • Add playbook persistence (JSON)
  • Write unit tests
  • Integrate with existing LAC concepts

Week 3: Context Management

  • Implement AtomizerSessionState
  • Update 02_CONTEXT_LOADER.md with playbook integration
  • Create stable prefix template
  • Implement cache monitoring

Week 4: Learning Loop

  • Implement FeedbackLoop
  • Create error tracker hook
  • Implement compaction manager
  • Integration testing

Week 5: Claude Code Integration

  • Update 00_BOOTSTRAP.md to v2
  • Add dashboard API routes
  • Create playbook visualization component
  • End-to-end testing with real optimizations

Week 6: Polish & Documentation

  • Performance benchmarking
  • Cost analysis (cache hit rates)
  • Documentation updates
  • Team training materials

Success Metrics

| Metric | Baseline | Target | Measurement |
|---|---|---|---|
| Task success rate | ~70% | 80-85% | Track via feedback loop |
| Repeated mistakes | N/A | <20% recurrence | Playbook harmful counts |
| Cache hit rate | 0% | >70% | Cache monitor stats |
| Cost per session | $X | 0.3X | API billing analysis |
| Playbook growth | 0 | 100+ items/month | Playbook stats |

References

  1. ACE Framework: Zhang et al., "Agentic Context Engineering: Evolving Contexts for Self-Improving Language Models", arXiv:2510.04618, Oct 2025
  2. Manus Blog: "Context Engineering for AI Agents: Lessons from Building Manus"
  3. Anthropic: "Effective context engineering for AI agents"
  4. LangChain: "Context Engineering for Agents"
  5. Google ADK: "Architecting efficient context-aware multi-agent framework"

Document generated: December 2025

For Claude Code implementation sessions