# NOTE: removed non-Python web-viewer header ("Files", "521 lines", "17 KiB",
# "Python", "Raw Permalink Normal View History") left over from page extraction.
"""
Atomizer Context Compaction - Long-Running Session Management
Part of the ACE (Agentic Context Engineering) implementation for Atomizer.
Based on Google ADK's compaction architecture:
- Trigger compaction when threshold reached
- Summarize older events
- Preserve recent detail
- Never compact error events
This module handles context management for long-running optimizations
that may exceed context window limits.
"""
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class EventType(Enum):
    """Types of events in optimization context."""

    # Trial lifecycle events (emitted per optimization trial).
    TRIAL_START = "trial_start"
    TRIAL_COMPLETE = "trial_complete"
    TRIAL_FAILED = "trial_failed"

    # Diagnostics. ERROR events are marked preserved by CompactionManager
    # (when keep_errors=True) and are never summarized away.
    ERROR = "error"
    WARNING = "warning"

    # MILESTONE events are always preserved; COMPACTION is the synthetic
    # event type used for the summary marker that replaces a compacted region.
    MILESTONE = "milestone"
    COMPACTION = "compaction"

    # Study/session lifecycle and configuration changes.
    STUDY_START = "study_start"
    STUDY_END = "study_end"
    CONFIG_CHANGE = "config_change"
@dataclass
class ContextEvent:
    """
    A single event in the optimization context history.

    Events are the atomic units of context history: each one may later be
    compacted (summarized away) or kept verbatim depending on its importance.
    """
    timestamp: datetime
    event_type: EventType
    summary: str
    details: Dict[str, Any] = field(default_factory=dict)
    compacted: bool = False
    preserve: bool = False  # If True, never compact this event

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this event to a JSON-friendly dictionary."""
        payload: Dict[str, Any] = {}
        payload["timestamp"] = self.timestamp.isoformat()
        payload["event_type"] = self.event_type.value
        payload["summary"] = self.summary
        payload["details"] = self.details
        payload["compacted"] = self.compacted
        payload["preserve"] = self.preserve
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ContextEvent":
        """Rebuild an event from a dictionary produced by to_dict()."""
        stamp = datetime.fromisoformat(data["timestamp"])
        kind = EventType(data["event_type"])
        return cls(
            timestamp=stamp,
            event_type=kind,
            summary=data["summary"],
            details=data.get("details", {}),
            compacted=data.get("compacted", False),
            preserve=data.get("preserve", False),
        )
class CompactionManager:
    """
    Manages context compaction for long optimization sessions.

    Strategy:
    - Keep the last `keep_recent` events in full detail
    - Summarize older events into a single compaction marker
    - Preserve error events (never compact errors)
    - Track statistics for optimization insights

    Usage:
        manager = CompactionManager(compaction_threshold=50, keep_recent=20)

        # Add events as they occur
        manager.add_event(ContextEvent(
            timestamp=datetime.now(),
            event_type=EventType.TRIAL_COMPLETE,
            summary="Trial 42 complete: obj=100.5",
            details={"trial_number": 42, "objective": 100.5}
        ))

        # Get context string for LLM
        context = manager.get_context_string()

        # Check if compaction occurred
        print(f"Compactions: {manager.compaction_count}")
    """

    def __init__(
        self,
        compaction_threshold: int = 50,
        keep_recent: int = 20,
        keep_errors: bool = True
    ):
        """
        Initialize compaction manager.

        Args:
            compaction_threshold: Trigger compaction when events exceed this
            keep_recent: Number of recent events to always keep in detail
            keep_errors: Whether to preserve all error events
        """
        self.events: List[ContextEvent] = []
        self.compaction_threshold = compaction_threshold
        self.keep_recent = keep_recent
        self.keep_errors = keep_errors
        self.compaction_count = 0
        # Per-compaction statistics (number, size, summary) for get_stats().
        self._compaction_stats: List[Dict[str, Any]] = []

    def add_event(self, event: ContextEvent) -> bool:
        """
        Add event and trigger compaction if needed.

        Args:
            event: The event to add

        Returns:
            True if compaction was triggered
        """
        # Flag errors as preserved so _compact() never summarizes them away.
        if event.event_type == EventType.ERROR and self.keep_errors:
            event.preserve = True
        self.events.append(event)
        # Compact once the history grows past the configured threshold.
        if len(self.events) > self.compaction_threshold:
            self._compact()
            return True
        return False

    def add_trial_event(
        self,
        trial_number: int,
        success: bool,
        objective: Optional[float] = None,
        duration: Optional[float] = None
    ) -> None:
        """
        Convenience method to add a trial completion event.

        Args:
            trial_number: Trial number
            success: Whether trial succeeded
            objective: Objective value (if successful)
            duration: Trial duration in seconds
        """
        event_type = EventType.TRIAL_COMPLETE if success else EventType.TRIAL_FAILED
        summary_parts = [f"Trial {trial_number}"]
        if success and objective is not None:
            summary_parts.append(f"obj={objective:.4g}")
        elif not success:
            summary_parts.append("FAILED")
        if duration is not None:
            summary_parts.append(f"{duration:.1f}s")
        self.add_event(ContextEvent(
            timestamp=datetime.now(),
            event_type=event_type,
            summary=" | ".join(summary_parts),
            details={
                "trial_number": trial_number,
                "success": success,
                "objective": objective,
                "duration": duration
            }
        ))

    def add_error_event(self, error_message: str, error_type: str = "") -> None:
        """
        Add an error event (always preserved).

        Args:
            error_message: Error description
            error_type: Optional error classification
        """
        summary = f"[{error_type}] {error_message}" if error_type else error_message
        self.add_event(ContextEvent(
            timestamp=datetime.now(),
            event_type=EventType.ERROR,
            summary=summary,
            details={"error_type": error_type, "message": error_message},
            preserve=True
        ))

    def add_milestone(self, description: str, details: Optional[Dict[str, Any]] = None) -> None:
        """
        Add a milestone event (preserved).

        Args:
            description: Milestone description
            details: Optional additional details
        """
        self.add_event(ContextEvent(
            timestamp=datetime.now(),
            event_type=EventType.MILESTONE,
            summary=description,
            details=details or {},
            preserve=True
        ))

    def _compact(self) -> None:
        """
        Compact older events into summaries.

        Preserves:
        - All error events (if keep_errors=True)
        - Events marked with preserve=True
        - Last `keep_recent` events
        - Milestone summaries of compacted regions
        """
        if len(self.events) <= self.keep_recent:
            return

        # Split into old (candidates for compaction) and recent (kept verbatim).
        old_events = self.events[:-self.keep_recent]
        recent_events = self.events[-self.keep_recent:]

        # Separate preserved from compactable.
        preserved_events = [e for e in old_events if e.preserve]
        compactable_events = [e for e in old_events if not e.preserve]

        # Summarize compactable events into a single synthetic marker event.
        if compactable_events:
            summary = self._create_summary(compactable_events)
            # BUGFIX: use one 1-based compaction number consistently. The
            # previous code recorded the pre-increment counter in the event
            # details but the post-increment counter in _compaction_stats,
            # so the same compaction was numbered differently in each place.
            compaction_number = self.compaction_count + 1
            compaction_event = ContextEvent(
                # Timestamp of the first summarized event, so the marker
                # sorts chronologically with the region it replaces.
                timestamp=compactable_events[0].timestamp,
                event_type=EventType.COMPACTION,
                summary=summary,
                details={
                    "events_compacted": len(compactable_events),
                    "compaction_number": compaction_number,
                    "time_range": {
                        "start": compactable_events[0].timestamp.isoformat(),
                        "end": compactable_events[-1].timestamp.isoformat()
                    }
                },
                compacted=True
            )
            self.compaction_count = compaction_number
            # Store compaction statistics for get_stats().
            self._compaction_stats.append({
                "compaction_number": compaction_number,
                "events_compacted": len(compactable_events),
                "summary": summary
            })
            # Rebuild events list: marker first, then preserved old events,
            # then the untouched recent tail.
            self.events = [compaction_event] + preserved_events + recent_events
        else:
            self.events = preserved_events + recent_events

    def _create_summary(self, events: List[ContextEvent]) -> str:
        """
        Create summary of compacted events.

        Args:
            events: List of events to summarize

        Returns:
            Summary string
        """
        # Collect trial statistics from trial-completion/failure events only.
        trial_events = [
            e for e in events
            if e.event_type in (EventType.TRIAL_COMPLETE, EventType.TRIAL_FAILED)
        ]
        if not trial_events:
            return f"[{len(events)} events compacted]"

        # Extract trial statistics.
        trial_numbers = []
        objectives = []
        failures = 0
        for e in trial_events:
            if "trial_number" in e.details:
                trial_numbers.append(e.details["trial_number"])
            if "objective" in e.details and e.details["objective"] is not None:
                objectives.append(e.details["objective"])
            if e.event_type == EventType.TRIAL_FAILED:
                failures += 1

        if trial_numbers and objectives:
            # NOTE(review): "Best" is min(objectives) — assumes a
            # minimization objective; confirm against the optimizer's sense.
            return (
                f"Trials {min(trial_numbers)}-{max(trial_numbers)}: "
                f"Best={min(objectives):.4g}, "
                f"Avg={sum(objectives)/len(objectives):.4g}, "
                f"Failures={failures}"
            )
        elif trial_numbers:
            return f"Trials {min(trial_numbers)}-{max(trial_numbers)} ({failures} failures)"
        else:
            return f"[{len(events)} events compacted]"

    def get_context_string(self, include_timestamps: bool = False) -> str:
        """
        Generate context string from events.

        Args:
            include_timestamps: Whether to include timestamps

        Returns:
            Formatted context string for LLM
        """
        lines = ["## Optimization History", ""]
        for event in self.events:
            timestamp = ""
            if include_timestamps:
                timestamp = f"[{event.timestamp.strftime('%H:%M:%S')}] "
            if event.compacted:
                lines.append(f"📦 {timestamp}{event.summary}")
            elif event.event_type == EventType.ERROR:
                # NOTE(review): this branch (and the two TRIAL_* branches
                # below) emit no prefix glyph while sibling branches do —
                # possibly prefixes were lost in transit; confirm intent.
                lines.append(f"{timestamp}{event.summary}")
            elif event.event_type == EventType.WARNING:
                lines.append(f"⚠️ {timestamp}{event.summary}")
            elif event.event_type == EventType.MILESTONE:
                lines.append(f"🎯 {timestamp}{event.summary}")
            elif event.event_type == EventType.TRIAL_FAILED:
                lines.append(f"{timestamp}{event.summary}")
            elif event.event_type == EventType.TRIAL_COMPLETE:
                lines.append(f"{timestamp}{event.summary}")
            else:
                lines.append(f"- {timestamp}{event.summary}")
        return "\n".join(lines)

    def get_stats(self) -> Dict[str, Any]:
        """Get compaction statistics."""
        event_counts = {}
        for event in self.events:
            etype = event.event_type.value
            event_counts[etype] = event_counts.get(etype, 0) + 1
        return {
            "total_events": len(self.events),
            "compaction_count": self.compaction_count,
            "events_by_type": event_counts,
            "error_events": event_counts.get("error", 0),
            "compacted_events": len([e for e in self.events if e.compacted]),
            "preserved_events": len([e for e in self.events if e.preserve]),
            "compaction_history": self._compaction_stats[-5:]  # Last 5
        }

    def get_recent_events(self, n: int = 10) -> List[ContextEvent]:
        """Get the n most recent events."""
        return self.events[-n:]

    def get_errors(self) -> List[ContextEvent]:
        """Get all error events."""
        return [e for e in self.events if e.event_type == EventType.ERROR]

    def clear(self) -> None:
        """Clear all events and reset state."""
        self.events = []
        self.compaction_count = 0
        self._compaction_stats = []

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            "events": [e.to_dict() for e in self.events],
            "compaction_threshold": self.compaction_threshold,
            "keep_recent": self.keep_recent,
            "keep_errors": self.keep_errors,
            "compaction_count": self.compaction_count,
            "compaction_stats": self._compaction_stats
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "CompactionManager":
        """Create from dictionary."""
        manager = cls(
            compaction_threshold=data.get("compaction_threshold", 50),
            keep_recent=data.get("keep_recent", 20),
            keep_errors=data.get("keep_errors", True)
        )
        manager.events = [ContextEvent.from_dict(e) for e in data.get("events", [])]
        manager.compaction_count = data.get("compaction_count", 0)
        manager._compaction_stats = data.get("compaction_stats", [])
        return manager
class ContextBudgetManager:
    """
    Manages overall context budget across sessions.

    Tracks:
    - Token estimates for each context section
    - Recommendations for context reduction
    - Budget allocation warnings
    """

    # Approximate characters per token (rough heuristic; not a real tokenizer).
    CHARS_PER_TOKEN = 4

    # Default budget allocation (tokens). Section budgets sum to "total".
    DEFAULT_BUDGET = {
        "stable_prefix": 5000,
        "protocols": 10000,
        "playbook": 5000,
        "session_state": 2000,
        "conversation": 30000,
        "working_space": 48000,
        "total": 100000
    }

    def __init__(self, budget: Optional[Dict[str, int]] = None):
        """
        Initialize budget manager.

        Args:
            budget: Custom budget allocation (uses defaults if not provided)
        """
        self.budget = budget or self.DEFAULT_BUDGET.copy()
        # Usage starts at zero for every budgeted section (including "total",
        # which is only expected to be updated via its component sections).
        self._current_usage: Dict[str, int] = {k: 0 for k in self.budget.keys()}

    def estimate_tokens(self, text: str) -> int:
        """Estimate token count for text (character-count heuristic)."""
        return len(text) // self.CHARS_PER_TOKEN

    def update_usage(self, section: str, text: str) -> Dict[str, Any]:
        """
        Update usage for a section.

        Args:
            section: Budget section name
            text: Content of the section

        Returns:
            Usage status with warnings if over budget
        """
        tokens = self.estimate_tokens(text)
        self._current_usage[section] = tokens
        result = {
            "section": section,
            "tokens": tokens,
            "budget": self.budget.get(section, 0),
            # Unknown sections have an effectively unlimited budget here.
            "over_budget": tokens > self.budget.get(section, float('inf'))
        }
        if result["over_budget"]:
            # over_budget implies the section exists in self.budget,
            # so the direct indexing below is safe.
            result["warning"] = f"{section} exceeds budget by {tokens - self.budget[section]} tokens"
        return result

    def get_total_usage(self) -> int:
        """Get total token usage across all sections."""
        return sum(self._current_usage.values())

    def get_status(self) -> Dict[str, Any]:
        """Get overall budget status."""
        total_used = self.get_total_usage()
        total_budget = self.budget.get("total", 100000)
        # BUGFIX: guard against ZeroDivisionError when a caller supplies a
        # budget with "total": 0; report 0.0 utilization in that case,
        # matching the per-section convention below.
        utilization = total_used / total_budget if total_budget > 0 else 0.0
        return {
            "total_used": total_used,
            "total_budget": total_budget,
            "utilization": utilization,
            "by_section": {
                section: {
                    "used": self._current_usage.get(section, 0),
                    "budget": self.budget.get(section, 0),
                    "utilization": (
                        self._current_usage.get(section, 0) / self.budget.get(section, 1)
                        if self.budget.get(section, 0) > 0 else 0
                    )
                }
                for section in self.budget.keys()
                if section != "total"
            },
            "recommendations": self._get_recommendations()
        }

    def _get_recommendations(self) -> List[str]:
        """Generate budget recommendations."""
        recommendations = []
        total_used = self.get_total_usage()
        total_budget = self.budget.get("total", 100000)
        # Warn early when approaching the overall ceiling.
        if total_used > total_budget * 0.9:
            recommendations.append("Context usage > 90%. Consider triggering compaction.")
        # Flag each individual section that is over its allocation.
        for section, used in self._current_usage.items():
            budget = self.budget.get(section, 0)
            if budget > 0 and used > budget:
                recommendations.append(
                    f"{section}: {used - budget} tokens over budget. Reduce content."
                )
        if not recommendations:
            recommendations.append("Budget healthy.")
        return recommendations