# Source: Atomizer/optimization_engine/context/feedback_loop.py
"""
Atomizer Feedback Loop - Automated Learning from Execution
Part of the ACE (Agentic Context Engineering) implementation for Atomizer.
Connects optimization outcomes to playbook updates using the principle:
"Leverage natural execution feedback as the learning signal"
The feedback loop:
1. Observes trial outcomes (success/failure)
2. Tracks which playbook items were active during each trial
3. Updates helpful/harmful counts based on outcomes
4. Commits new insights from the reflector
This implements true self-improvement: the system gets better
at optimization over time by learning from its own execution.
"""
from typing import Dict, Any, List, Optional
from pathlib import Path
from datetime import datetime
import json
from .playbook import AtomizerPlaybook, InsightCategory
from .reflector import AtomizerReflector, OptimizationOutcome
class FeedbackLoop:
    """
    Automated feedback loop that learns from optimization runs.

    Key insight from ACE: use execution feedback (success/failure)
    as the learning signal, not labeled data.

    Usage:
        feedback = FeedbackLoop(playbook_path)

        # After each trial
        feedback.process_trial_result(
            trial_number=42,
            success=True,
            objective_value=100.5,
            design_variables={"thickness": 1.5},
            context_items_used=["str-00001", "mis-00003"]
        )

        # After study completion
        result = feedback.finalize_study(study_stats)
        print(f"Added {result['insights_added']} insights")
    """

    def __init__(self, playbook_path: Path):
        """
        Initialize feedback loop with playbook path.

        Args:
            playbook_path: Path to the playbook JSON file
        """
        self.playbook_path = playbook_path
        self.playbook = AtomizerPlaybook.load(playbook_path)
        self.reflector = AtomizerReflector(self.playbook)
        # Maps trial number -> playbook item IDs active during that trial,
        # used to attribute per-item performance in the reports below.
        self._trial_item_usage: Dict[int, List[str]] = {}
        # Every outcome seen so far, kept for batch analysis / reporting.
        self._outcomes: List[OptimizationOutcome] = []
        # Running counters for success-rate statistics.
        self._total_trials_processed = 0
        self._successful_trials = 0
        self._failed_trials = 0

    def _success_rate(self) -> float:
        """Fraction of processed trials that succeeded (0 if none yet)."""
        if self._total_trials_processed == 0:
            return 0
        return self._successful_trials / self._total_trials_processed

    def process_trial_result(
        self,
        trial_number: int,
        success: bool,
        objective_value: float,
        design_variables: Dict[str, float],
        context_items_used: Optional[List[str]] = None,
        errors: Optional[List[str]] = None,
        extractor_used: str = "",
        duration_seconds: float = 0.0
    ) -> Dict[str, Any]:
        """
        Process a trial result and update playbook accordingly.

        This is the core learning mechanism:
        - If trial succeeded with certain playbook items -> increase helpful count
        - If trial failed with certain playbook items -> increase harmful count

        Args:
            trial_number: Trial number
            success: Whether the trial succeeded
            objective_value: Objective function value (0 if failed)
            design_variables: Design variable values used
            context_items_used: List of playbook item IDs in context
            errors: List of error messages (if any)
            extractor_used: Name of extractor used
            duration_seconds: Trial duration

        Returns:
            Dictionary with processing results (items updated, insights extracted)
        """
        context_items_used = context_items_used or []
        errors = errors or []

        # Update running statistics
        self._total_trials_processed += 1
        if success:
            self._successful_trials += 1
        else:
            self._failed_trials += 1

        # Remember which items were active, for later attribution
        self._trial_item_usage[trial_number] = context_items_used

        # Core learning signal: credit or blame every active playbook item.
        # record_outcome returns truthy only when the item was actually updated.
        items_updated = 0
        for item_id in context_items_used:
            if self.playbook.record_outcome(item_id, helpful=success):
                items_updated += 1

        # Build the outcome record; objective is meaningless on failure,
        # so it is stored as None rather than a misleading 0.
        outcome = OptimizationOutcome(
            trial_number=trial_number,
            success=success,
            objective_value=objective_value if success else None,
            constraint_violations=[],
            solver_errors=errors,
            design_variables=design_variables,
            extractor_used=extractor_used,
            duration_seconds=duration_seconds
        )
        self._outcomes.append(outcome)

        # Let the reflector mine the outcome for new (pending) insights
        insights = self.reflector.analyze_trial(outcome)

        return {
            "trial_number": trial_number,
            "success": success,
            "items_updated": items_updated,
            "insights_extracted": len(insights)
        }

    def record_error(
        self,
        trial_number: int,
        error_type: str,
        error_message: str,
        context_items_used: Optional[List[str]] = None
    ) -> None:
        """
        Record an error for a trial.

        Separate from process_trial_result for cases where
        we want to record errors without full trial data.

        Args:
            trial_number: Trial number
            error_type: Classification of error
            error_message: Error details
            context_items_used: Playbook items that were active
        """
        context_items_used = context_items_used or []

        # An error marks every active item as harmful
        for item_id in context_items_used:
            self.playbook.record_outcome(item_id, helpful=False)

        # Queue a MISTAKE insight; committed later by finalize_study.
        # Error text is truncated to keep playbook entries bounded.
        self.reflector.pending_insights.append({
            "category": InsightCategory.MISTAKE,
            "content": f"{error_type}: {error_message[:200]}",
            "helpful": False,
            "trial": trial_number
        })

    def finalize_study(
        self,
        study_stats: Dict[str, Any],
        save_playbook: bool = True
    ) -> Dict[str, Any]:
        """
        Called when study completes. Commits insights and prunes playbook.

        Args:
            study_stats: Dictionary with study statistics:
                - name: Study name
                - total_trials: Total trials run
                - best_value: Best objective achieved
                - convergence_rate: Success rate (0.0-1.0)
                - method: Optimization method used
            save_playbook: Whether to save playbook to disk

        Returns:
            Dictionary with finalization results
        """
        # Analyze study-level patterns
        study_insights = self.reflector.analyze_study_completion(
            study_name=study_stats.get("name", "unknown"),
            total_trials=study_stats.get("total_trials", 0),
            best_value=study_stats.get("best_value", 0),
            convergence_rate=study_stats.get("convergence_rate", 0),
            method=study_stats.get("method", "")
        )

        # Commit all pending insights accumulated during the run
        insights_added = self.reflector.commit_insights()

        # Drop items whose net score is consistently harmful
        items_pruned = self.playbook.prune_harmful(threshold=-3)

        # Persist the updated playbook so learning survives the process
        if save_playbook:
            self.playbook.save(self.playbook_path)

        return {
            "insights_added": insights_added,
            "items_pruned": items_pruned,
            "playbook_size": len(self.playbook.items),
            "playbook_version": self.playbook.version,
            "total_trials_processed": self._total_trials_processed,
            "successful_trials": self._successful_trials,
            "failed_trials": self._failed_trials,
            "success_rate": self._success_rate()
        }

    def get_item_performance(self) -> Dict[str, Dict[str, Any]]:
        """
        Get performance metrics for all playbook items.

        Returns:
            Dictionary mapping item IDs to performance stats
        """
        performance = {}
        for item_id, item in self.playbook.items.items():
            trials_used_in = [
                trial for trial, items in self._trial_item_usage.items()
                if item_id in items
            ]
            performance[item_id] = {
                "helpful_count": item.helpful_count,
                "harmful_count": item.harmful_count,
                "net_score": item.net_score,
                "confidence": item.confidence,
                "trials_used_in": len(trials_used_in),
                "category": item.category.value,
                "content_preview": item.content[:100]
            }
        return performance

    def _ranked_performers(self, n: int, worst: bool = False) -> List[Dict[str, Any]]:
        """Return up to n items ranked by net_score (best first unless worst=True)."""
        performance = self.get_item_performance()
        ranked = sorted(
            performance.items(),
            key=lambda entry: entry[1]["net_score"],
            reverse=not worst
        )
        return [
            {"id": item_id, **stats}
            for item_id, stats in ranked[:n]
        ]

    def get_top_performers(self, n: int = 10) -> List[Dict[str, Any]]:
        """
        Get the top performing playbook items.

        Args:
            n: Number of top items to return

        Returns:
            List of item performance dictionaries
        """
        return self._ranked_performers(n)

    def get_worst_performers(self, n: int = 10) -> List[Dict[str, Any]]:
        """
        Get the worst performing playbook items.

        Args:
            n: Number of worst items to return

        Returns:
            List of item performance dictionaries
        """
        return self._ranked_performers(n, worst=True)

    def get_statistics(self) -> Dict[str, Any]:
        """Get feedback loop statistics."""
        return {
            "total_trials_processed": self._total_trials_processed,
            "successful_trials": self._successful_trials,
            "failed_trials": self._failed_trials,
            "success_rate": self._success_rate(),
            "playbook_items": len(self.playbook.items),
            "pending_insights": self.reflector.get_pending_count(),
            "outcomes_recorded": len(self._outcomes)
        }

    def export_learning_report(self, path: Path) -> None:
        """
        Export a detailed learning report as JSON.

        Args:
            path: Path to save the report (parent directories are created)
        """
        report = {
            "generated_at": datetime.now().isoformat(),
            "statistics": self.get_statistics(),
            "top_performers": self.get_top_performers(20),
            "worst_performers": self.get_worst_performers(10),
            "playbook_stats": self.playbook.get_stats(),
            "outcomes_summary": {
                "total": len(self._outcomes),
                "by_success": {
                    "success": len([o for o in self._outcomes if o.success]),
                    "failure": len([o for o in self._outcomes if not o.success])
                }
            }
        }
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, 'w', encoding='utf-8') as f:
            # ensure_ascii=False keeps non-ASCII insight text readable
            json.dump(report, f, indent=2, ensure_ascii=False)

    def reset(self) -> None:
        """Reset the feedback loop state (keeps playbook)."""
        self._trial_item_usage = {}
        self._outcomes = []
        self._total_trials_processed = 0
        self._successful_trials = 0
        self._failed_trials = 0
        # Fresh reflector: pending insights from the previous run are discarded
        self.reflector = AtomizerReflector(self.playbook)
class FeedbackLoopFactory:
    """Factory helpers for creating feedback loops."""

    @staticmethod
    def create_for_study(study_dir: Path) -> FeedbackLoop:
        """
        Create a feedback loop for a specific study.

        Args:
            study_dir: Path to study directory

        Returns:
            FeedbackLoop backed by the study's playbook file
        """
        playbook_path = study_dir / "3_results" / "playbook.json"
        return FeedbackLoop(playbook_path)

    @staticmethod
    def create_global() -> FeedbackLoop:
        """
        Create a feedback loop using the global playbook.

        Returns:
            FeedbackLoop using global playbook path
        """
        # Path is already imported at module level; the previous
        # function-local re-import was redundant and has been removed.
        playbook_path = Path(__file__).parents[2] / "knowledge_base" / "playbook.json"
        return FeedbackLoop(playbook_path)