"""
Atomizer Context Compaction - Long-Running Session Management

Part of the ACE (Agentic Context Engineering) implementation for Atomizer.

Based on Google ADK's compaction architecture:
- Trigger compaction when threshold reached
- Summarize older events
- Preserve recent detail
- Never compact error events

This module handles context management for long-running optimizations
that may exceed context window limits.
"""

from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum


class EventType(Enum):
    """Types of events in optimization context."""

    TRIAL_START = "trial_start"
    TRIAL_COMPLETE = "trial_complete"
    TRIAL_FAILED = "trial_failed"
    ERROR = "error"
    WARNING = "warning"
    MILESTONE = "milestone"
    COMPACTION = "compaction"
    STUDY_START = "study_start"
    STUDY_END = "study_end"
    CONFIG_CHANGE = "config_change"


@dataclass
class ContextEvent:
    """
    Single event in optimization context.

    Events are the atomic units of context history. They can be
    compacted (summarized) or preserved based on importance.
    """

    # When the event happened.
    timestamp: datetime
    # Category of the event; drives rendering and summarization.
    event_type: EventType
    # One-line human-readable description.
    summary: str
    # Free-form structured payload (e.g. trial_number, objective).
    details: Dict[str, Any] = field(default_factory=dict)
    # True for synthetic events that stand in for a summarized region.
    compacted: bool = False
    # If True, never compact this event.
    preserve: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary (timestamp as ISO string, type as value)."""
        return {
            "timestamp": self.timestamp.isoformat(),
            "event_type": self.event_type.value,
            "summary": self.summary,
            "details": self.details,
            "compacted": self.compacted,
            "preserve": self.preserve,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ContextEvent":
        """
        Create from a dictionary produced by `to_dict`.

        `timestamp`, `event_type` and `summary` are required keys; the
        remaining fields fall back to their dataclass defaults.
        """
        return cls(
            timestamp=datetime.fromisoformat(data["timestamp"]),
            event_type=EventType(data["event_type"]),
            summary=data["summary"],
            details=data.get("details", {}),
            compacted=data.get("compacted", False),
            preserve=data.get("preserve", False),
        )


class CompactionManager:
    """
    Manages context compaction for long optimization sessions.

    Strategy:
    - Keep last N events in full detail
    - Summarize older events into milestone markers
    - Preserve error events (never compact errors)
    - Track statistics for optimization insights

    Usage:
        manager = CompactionManager(compaction_threshold=50, keep_recent=20)

        # Add events as they occur
        manager.add_event(ContextEvent(
            timestamp=datetime.now(),
            event_type=EventType.TRIAL_COMPLETE,
            summary="Trial 42 complete: obj=100.5",
            details={"trial_number": 42, "objective": 100.5}
        ))

        # Get context string for LLM
        context = manager.get_context_string()

        # Check if compaction occurred
        print(f"Compactions: {manager.compaction_count}")
    """

    # Line prefix per event type used by get_context_string(); compacted
    # events and any type not listed here are handled separately.
    _EVENT_PREFIXES = {
        EventType.ERROR: "❌",
        EventType.WARNING: "⚠️",
        EventType.MILESTONE: "🎯",
        EventType.TRIAL_FAILED: "✗",
        EventType.TRIAL_COMPLETE: "✓",
    }

    def __init__(
        self,
        compaction_threshold: int = 50,
        keep_recent: int = 20,
        keep_errors: bool = True
    ):
        """
        Initialize compaction manager.

        Args:
            compaction_threshold: Trigger compaction when events exceed this
            keep_recent: Number of recent events to always keep in detail
            keep_errors: Whether to preserve all error events
        """
        self.events: List[ContextEvent] = []
        self.compaction_threshold = compaction_threshold
        self.keep_recent = keep_recent
        self.keep_errors = keep_errors
        self.compaction_count = 0

        # Statistics for compacted regions
        self._compaction_stats: List[Dict[str, Any]] = []

    def add_event(self, event: ContextEvent) -> bool:
        """
        Add event and trigger compaction if needed.

        Args:
            event: The event to add

        Returns:
            True if compaction was triggered
        """
        # Mark errors as preserved
        if event.event_type == EventType.ERROR and self.keep_errors:
            event.preserve = True

        self.events.append(event)

        # Check if compaction needed
        if len(self.events) > self.compaction_threshold:
            self._compact()
            return True

        return False

    def add_trial_event(
        self,
        trial_number: int,
        success: bool,
        objective: Optional[float] = None,
        duration: Optional[float] = None
    ) -> None:
        """
        Convenience method to add a trial completion event.

        Args:
            trial_number: Trial number
            success: Whether trial succeeded
            objective: Objective value (if successful)
            duration: Trial duration in seconds
        """
        event_type = EventType.TRIAL_COMPLETE if success else EventType.TRIAL_FAILED

        summary_parts = [f"Trial {trial_number}"]
        if success and objective is not None:
            summary_parts.append(f"obj={objective:.4g}")
        elif not success:
            summary_parts.append("FAILED")
        if duration is not None:
            summary_parts.append(f"{duration:.1f}s")

        self.add_event(ContextEvent(
            timestamp=datetime.now(),
            event_type=event_type,
            summary=" | ".join(summary_parts),
            details={
                "trial_number": trial_number,
                "success": success,
                "objective": objective,
                "duration": duration,
            },
        ))

    def add_error_event(self, error_message: str, error_type: str = "") -> None:
        """
        Add an error event (always preserved).

        The event is created with preserve=True regardless of `keep_errors`.

        Args:
            error_message: Error description
            error_type: Optional error classification
        """
        summary = f"[{error_type}] {error_message}" if error_type else error_message

        self.add_event(ContextEvent(
            timestamp=datetime.now(),
            event_type=EventType.ERROR,
            summary=summary,
            details={"error_type": error_type, "message": error_message},
            preserve=True,
        ))

    def add_milestone(self, description: str, details: Optional[Dict[str, Any]] = None) -> None:
        """
        Add a milestone event (preserved).

        Args:
            description: Milestone description
            details: Optional additional details
        """
        self.add_event(ContextEvent(
            timestamp=datetime.now(),
            event_type=EventType.MILESTONE,
            summary=description,
            details=details or {},
            preserve=True,
        ))

    def _compact(self) -> None:
        """
        Compact older events into summaries.

        Preserves:
        - All error events (if keep_errors=True)
        - Events marked with preserve=True
        - Last `keep_recent` events
        - A single COMPACTION event summarizing the compacted region

        NOTE: a COMPACTION event from an earlier pass is not marked
        `preserve`, so once it falls out of the recent window it is
        re-summarized together with the other old events; its summary
        text is not carried into the new summary.
        """
        keep = max(self.keep_recent, 0)
        if len(self.events) <= keep:
            return

        # Split on an explicit index: `events[:-keep]` / `events[-keep:]`
        # silently misbehave when keep == 0 because -0 == 0.
        split = len(self.events) - keep
        old_events = self.events[:split]
        recent_events = self.events[split:]

        # Separate preserved from compactable
        preserved_events = [e for e in old_events if e.preserve]
        compactable_events = [e for e in old_events if not e.preserve]

        if not compactable_events:
            self.events = preserved_events + recent_events
            return

        summary = self._create_summary(compactable_events)

        # Increment first so the event and the stats entry carry the same
        # (1-based) compaction number.
        self.compaction_count += 1

        compaction_event = ContextEvent(
            timestamp=compactable_events[0].timestamp,
            event_type=EventType.COMPACTION,
            summary=summary,
            details={
                "events_compacted": len(compactable_events),
                "compaction_number": self.compaction_count,
                "time_range": {
                    "start": compactable_events[0].timestamp.isoformat(),
                    "end": compactable_events[-1].timestamp.isoformat(),
                },
            },
            compacted=True,
        )

        # Store compaction statistics
        self._compaction_stats.append({
            "compaction_number": self.compaction_count,
            "events_compacted": len(compactable_events),
            "summary": summary,
        })

        # Rebuild events list
        self.events = [compaction_event] + preserved_events + recent_events

    def _create_summary(self, events: List[ContextEvent]) -> str:
        """
        Create summary of compacted events.

        Only TRIAL_COMPLETE / TRIAL_FAILED events contribute statistics,
        read from their `trial_number` and `objective` detail keys.
        "Best" is reported as the minimum objective.

        Args:
            events: List of events to summarize

        Returns:
            Summary string
        """
        generic = f"[{len(events)} events compacted]"

        # Collect trial statistics
        trial_events = [
            e for e in events
            if e.event_type in (EventType.TRIAL_COMPLETE, EventType.TRIAL_FAILED)
        ]
        if not trial_events:
            return generic

        trial_numbers = [
            e.details["trial_number"]
            for e in trial_events
            if "trial_number" in e.details
        ]
        objectives = [
            e.details["objective"]
            for e in trial_events
            if e.details.get("objective") is not None
        ]
        failures = sum(
            1 for e in trial_events if e.event_type == EventType.TRIAL_FAILED
        )

        if not trial_numbers:
            return generic

        trial_range = f"Trials {min(trial_numbers)}-{max(trial_numbers)}"
        if objectives:
            return (
                f"{trial_range}: "
                f"Best={min(objectives):.4g}, "
                f"Avg={sum(objectives)/len(objectives):.4g}, "
                f"Failures={failures}"
            )
        return f"{trial_range} ({failures} failures)"

    def get_context_string(self, include_timestamps: bool = False) -> str:
        """
        Generate context string from events.

        Args:
            include_timestamps: Whether to include timestamps

        Returns:
            Formatted context string for LLM
        """
        lines = ["## Optimization History", ""]

        for event in self.events:
            timestamp = ""
            if include_timestamps:
                timestamp = f"[{event.timestamp.strftime('%H:%M:%S')}] "

            # Compacted summaries take precedence over the event type.
            if event.compacted:
                prefix = "📦"
            else:
                prefix = self._EVENT_PREFIXES.get(event.event_type, "-")

            lines.append(f"{prefix} {timestamp}{event.summary}")

        return "\n".join(lines)

    def get_stats(self) -> Dict[str, Any]:
        """Get compaction statistics for the events currently held."""
        event_counts: Dict[str, int] = {}
        for event in self.events:
            etype = event.event_type.value
            event_counts[etype] = event_counts.get(etype, 0) + 1

        return {
            "total_events": len(self.events),
            "compaction_count": self.compaction_count,
            "events_by_type": event_counts,
            "error_events": event_counts.get("error", 0),
            "compacted_events": sum(1 for e in self.events if e.compacted),
            "preserved_events": sum(1 for e in self.events if e.preserve),
            "compaction_history": self._compaction_stats[-5:],  # Last 5
        }

    def get_recent_events(self, n: int = 10) -> List[ContextEvent]:
        """Get the n most recent events (empty list when n <= 0)."""
        # `self.events[-0:]` would return the whole list, so guard explicitly.
        if n <= 0:
            return []
        return self.events[-n:]

    def get_errors(self) -> List[ContextEvent]:
        """Get all error events."""
        return [e for e in self.events if e.event_type == EventType.ERROR]

    def clear(self) -> None:
        """Clear all events and reset state."""
        self.events = []
        self.compaction_count = 0
        self._compaction_stats = []

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            "events": [e.to_dict() for e in self.events],
            "compaction_threshold": self.compaction_threshold,
            "keep_recent": self.keep_recent,
            "keep_errors": self.keep_errors,
            "compaction_count": self.compaction_count,
            # Shallow copy so later compactions do not grow the caller's list.
            "compaction_stats": list(self._compaction_stats),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "CompactionManager":
        """Create from a dictionary produced by `to_dict`; missing keys use defaults."""
        manager = cls(
            compaction_threshold=data.get("compaction_threshold", 50),
            keep_recent=data.get("keep_recent", 20),
            keep_errors=data.get("keep_errors", True),
        )
        manager.events = [ContextEvent.from_dict(e) for e in data.get("events", [])]
        manager.compaction_count = data.get("compaction_count", 0)
        # Copy so appending stats here does not mutate the input dictionary.
        manager._compaction_stats = list(data.get("compaction_stats", []))
        return manager


class ContextBudgetManager:
    """
    Manages overall context budget across sessions.

    Tracks:
    - Token estimates for each context section
    - Recommendations for context reduction
    - Budget allocation warnings
    """

    # Approximate characters per token
    CHARS_PER_TOKEN = 4

    # Default budget allocation (tokens)
    DEFAULT_BUDGET = {
        "stable_prefix": 5000,
        "protocols": 10000,
        "playbook": 5000,
        "session_state": 2000,
        "conversation": 30000,
        "working_space": 48000,
        "total": 100000,
    }

    def __init__(self, budget: Optional[Dict[str, int]] = None):
        """
        Initialize budget manager.

        Args:
            budget: Custom budget allocation (uses defaults if not provided)
        """
        self.budget = budget or self.DEFAULT_BUDGET.copy()
        self._current_usage: Dict[str, int] = {k: 0 for k in self.budget.keys()}

    def estimate_tokens(self, text: str) -> int:
        """Estimate token count for text as len(text) // CHARS_PER_TOKEN."""
        return len(text) // self.CHARS_PER_TOKEN

    def update_usage(self, section: str, text: str) -> Dict[str, Any]:
        """
        Update usage for a section.

        Args:
            section: Budget section name
            text: Content of the section

        Returns:
            Usage status with warnings if over budget
        """
        tokens = self.estimate_tokens(text)
        self._current_usage[section] = tokens

        # A section without a configured budget is reported with budget 0
        # but is never flagged as over budget.
        result = {
            "section": section,
            "tokens": tokens,
            "budget": self.budget.get(section, 0),
            "over_budget": tokens > self.budget.get(section, float('inf')),
        }

        if result["over_budget"]:
            result["warning"] = f"{section} exceeds budget by {tokens - self.budget[section]} tokens"

        return result

    def get_total_usage(self) -> int:
        """Get total token usage across all sections."""
        return sum(self._current_usage.values())

    def get_status(self) -> Dict[str, Any]:
        """
        Get overall budget status.

        Utilization ratios are reported as 0 when the corresponding budget
        is not positive, so a zero budget never raises ZeroDivisionError.
        """
        total_used = self.get_total_usage()
        total_budget = self.budget.get("total", 100000)

        by_section = {}
        for section in self.budget.keys():
            if section == "total":
                continue
            used = self._current_usage.get(section, 0)
            section_budget = self.budget.get(section, 0)
            by_section[section] = {
                "used": used,
                "budget": section_budget,
                "utilization": used / section_budget if section_budget > 0 else 0,
            }

        return {
            "total_used": total_used,
            "total_budget": total_budget,
            "utilization": total_used / total_budget if total_budget > 0 else 0,
            "by_section": by_section,
            "recommendations": self._get_recommendations(),
        }

    def _get_recommendations(self) -> List[str]:
        """Generate budget recommendations."""
        recommendations = []

        total_used = self.get_total_usage()
        total_budget = self.budget.get("total", 100000)

        if total_used > total_budget * 0.9:
            recommendations.append("Context usage > 90%. Consider triggering compaction.")

        for section, used in self._current_usage.items():
            budget = self.budget.get(section, 0)
            if budget > 0 and used > budget:
                recommendations.append(
                    f"{section}: {used - budget} tokens over budget. Reduce content."
                )

        if not recommendations:
            recommendations.append("Budget healthy.")

        return recommendations