"""
Atomizer Cache Monitor - KV-Cache Optimization

Part of the ACE (Agentic Context Engineering) implementation for Atomizer.

Monitors and optimizes KV-cache hit rates for cost reduction.
Based on the principle that cached tokens cost ~10x less than uncached.

The cache monitor tracks:
- Stable prefix length (should stay constant for cache hits)
- Cache hit rate across requests
- Estimated cost savings

Structure for KV-cache optimization:
1. STABLE PREFIX - Never changes (identity, tools, routing)
2. SEMI-STABLE - Changes per session type (protocols, playbook)
3. DYNAMIC - Changes every turn (state, user message)
"""

from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from datetime import datetime
import hashlib
import json
from pathlib import Path

@dataclass
class CacheStats:
    """Counters describing how well the stable prefix is being cached."""

    total_requests: int = 0
    cache_hits: int = 0
    cache_misses: int = 0
    # Size of the stable prefix observed on the most recent request.
    prefix_length_chars: int = 0
    prefix_length_tokens: int = 0  # Estimated

    @property
    def hit_rate(self) -> float:
        """Fraction of requests (0.0-1.0) whose stable prefix was unchanged."""
        if not self.total_requests:
            return 0.0
        return self.cache_hits / self.total_requests

    @property
    def estimated_savings_percent(self) -> float:
        """
        Rough cost-savings estimate attributable to cache hits.

        Based on the ~10x cost difference between cached and uncached tokens:
        cached tokens cost ~10% of uncached, so savings = hit_rate * 90%.
        """
        if not self.total_requests:
            return 0.0
        return self.hit_rate * 90.0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the raw counters plus the derived metrics."""
        return {
            "total_requests": self.total_requests,
            "cache_hits": self.cache_hits,
            "cache_misses": self.cache_misses,
            "hit_rate": self.hit_rate,
            "prefix_length_chars": self.prefix_length_chars,
            "prefix_length_tokens": self.prefix_length_tokens,
            "estimated_savings_percent": self.estimated_savings_percent,
        }

@dataclass
class ContextSection:
    """A named chunk of context tagged with how often it is expected to change."""

    name: str
    content: str
    stability: str  # "stable", "semi_stable", "dynamic"
    # Hash recorded by the previous has_changed() call ("" before the first).
    last_hash: str = ""

    def compute_hash(self) -> str:
        """Return an md5 fingerprint of the content for change detection."""
        # NOTE: md5 here is a fast fingerprint, not a security measure.
        return hashlib.md5(self.content.encode()).hexdigest()

    def has_changed(self) -> bool:
        """
        Report whether the content differs from the last recorded hash.

        Side effect: updates last_hash, so consecutive calls without an
        intervening content change return False.
        """
        fresh = self.compute_hash()
        was_different = fresh != self.last_hash
        self.last_hash = fresh
        return was_different

class ContextCacheOptimizer:
    """
    Tracks and optimizes context for cache efficiency.

    Implements the three-tier context structure:
    1. Stable prefix (cached across all requests)
    2. Semi-stable section (cached per session type)
    3. Dynamic section (changes every turn)

    Usage:
        optimizer = ContextCacheOptimizer()

        # Build context with cache optimization
        context = optimizer.prepare_context(
            stable_prefix=identity_and_tools,
            semi_stable=protocols_and_playbook,
            dynamic=state_and_message
        )

        # Check efficiency
        print(optimizer.get_report())
    """

    # Approximate characters per token, used only for rough estimation.
    CHARS_PER_TOKEN = 4

    # Maximum number of per-request records retained in memory.
    HISTORY_LIMIT = 100

    def __init__(self):
        self.stats = CacheStats()
        self._sections: Dict[str, ContextSection] = {}
        self._last_stable_hash: Optional[str] = None
        self._last_semi_stable_hash: Optional[str] = None
        self._request_history: List[Dict[str, Any]] = []

    def prepare_context(
        self,
        stable_prefix: str,
        semi_stable: str,
        dynamic: str
    ) -> str:
        """
        Assemble context optimized for caching.

        Tracks whether the stable prefix changed since the previous call
        (a change means a cache miss for prefix-cached providers).

        Args:
            stable_prefix: Content that never changes (tools, identity)
            semi_stable: Content that changes per session type
            dynamic: Content that changes every turn

        Returns:
            Assembled context string with clear section boundaries
        """
        # Hash the stable prefix for cheap equality comparison.
        stable_hash = hashlib.md5(stable_prefix.encode()).hexdigest()

        self.stats.total_requests += 1

        # Determine hit status BEFORE updating _last_stable_hash.
        # Bug fix: the history record used to compare against
        # _last_stable_hash AFTER it had been overwritten with the current
        # hash, so every history entry claimed "cache_hit": True.
        cache_hit = stable_hash == self._last_stable_hash
        if cache_hit:
            self.stats.cache_hits += 1
        else:
            self.stats.cache_misses += 1

        self._last_stable_hash = stable_hash
        self.stats.prefix_length_chars = len(stable_prefix)
        self.stats.prefix_length_tokens = len(stable_prefix) // self.CHARS_PER_TOKEN

        # Record request for history
        self._request_history.append({
            "timestamp": datetime.now().isoformat(),
            "cache_hit": cache_hit,
            "stable_length": len(stable_prefix),
            "semi_stable_length": len(semi_stable),
            "dynamic_length": len(dynamic)
        })

        # Keep history bounded
        if len(self._request_history) > self.HISTORY_LIMIT:
            self._request_history = self._request_history[-self.HISTORY_LIMIT:]

        # Assemble with clear boundaries
        # (markdown horizontal rules as section separators)
        return f"""{stable_prefix}

---

{semi_stable}

---

{dynamic}"""

    def register_section(
        self,
        name: str,
        content: str,
        stability: str = "dynamic"
    ) -> None:
        """
        Register a context section for change tracking.

        Args:
            name: Section identifier
            content: Section content
            stability: One of "stable", "semi_stable", "dynamic"
        """
        section = ContextSection(
            name=name,
            content=content,
            stability=stability
        )
        # Seed the hash so the first check_section_changes() reports
        # "unchanged" unless content is modified after registration.
        section.last_hash = section.compute_hash()
        self._sections[name] = section

    def check_section_changes(self) -> Dict[str, bool]:
        """
        Check which registered sections have changed since the last check.

        Returns:
            Dictionary mapping section names to change status
        """
        return {
            name: section.has_changed()
            for name, section in self._sections.items()
        }

    def get_stable_sections(self) -> List[str]:
        """Get names of sections marked as stable."""
        return [
            name for name, section in self._sections.items()
            if section.stability == "stable"
        ]

    def get_report(self) -> str:
        """Generate human-readable cache efficiency report."""
        return f"""
Cache Efficiency Report
=======================
Requests: {self.stats.total_requests}
Cache Hits: {self.stats.cache_hits}
Cache Misses: {self.stats.cache_misses}
Hit Rate: {self.stats.hit_rate:.1%}

Stable Prefix:
- Characters: {self.stats.prefix_length_chars:,}
- Estimated Tokens: {self.stats.prefix_length_tokens:,}

Cost Impact:
- Estimated Savings: {self.stats.estimated_savings_percent:.0f}%
- (Based on 10x cost difference for cached tokens)

Recommendations:
{self._get_recommendations()}
"""

    def _get_recommendations(self) -> str:
        """Generate optimization recommendations from current stats."""
        recommendations = []

        # Only flag hit rate once there is a meaningful sample size.
        if self.stats.hit_rate < 0.5 and self.stats.total_requests > 5:
            recommendations.append(
                "- Low cache hit rate: Check if stable prefix is actually stable"
            )

        if self.stats.prefix_length_tokens > 5000:
            recommendations.append(
                "- Large stable prefix: Consider moving less-stable content to semi-stable"
            )

        if self.stats.prefix_length_tokens < 1000:
            recommendations.append(
                "- Small stable prefix: Consider moving more content to stable section"
            )

        if not recommendations:
            recommendations.append("- Cache performance looks good!")

        return "\n".join(recommendations)

    def get_stats_dict(self) -> Dict[str, Any]:
        """Get statistics as dictionary."""
        return self.stats.to_dict()

    def reset_stats(self) -> None:
        """Reset all statistics and clear the request history."""
        self.stats = CacheStats()
        self._request_history = []

    def save_stats(self, path: Path) -> None:
        """Save statistics, recent history, and section metadata to a JSON file."""
        data = {
            "stats": self.stats.to_dict(),
            "request_history": self._request_history[-50:],  # Last 50
            "sections": {
                name: {
                    "stability": s.stability,
                    "content_length": len(s.content)
                }
                for name, s in self._sections.items()
            }
        }
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)

    @classmethod
    def load_stats(cls, path: Path) -> "ContextCacheOptimizer":
        """
        Load statistics from a JSON file written by save_stats().

        Returns a fresh optimizer if the file does not exist. Section
        registrations are not restored (only their lengths were persisted).
        """
        optimizer = cls()

        if not path.exists():
            return optimizer

        with open(path, encoding='utf-8') as f:
            data = json.load(f)

        stats = data.get("stats", {})
        optimizer.stats.total_requests = stats.get("total_requests", 0)
        optimizer.stats.cache_hits = stats.get("cache_hits", 0)
        optimizer.stats.cache_misses = stats.get("cache_misses", 0)
        optimizer.stats.prefix_length_chars = stats.get("prefix_length_chars", 0)
        optimizer.stats.prefix_length_tokens = stats.get("prefix_length_tokens", 0)

        optimizer._request_history = data.get("request_history", [])

        return optimizer

class StablePrefixBuilder:
    """
    Helper for building stable prefix content.

    Ensures consistent ordering and formatting of stable content
    to maximize cache hits.
    """

    def __init__(self):
        # Each entry is (order, name, content); sorted by order at build time.
        self._sections: List[tuple] = []

    def add_section(self, name: str, content: str, order: int = 50) -> "StablePrefixBuilder":
        """
        Queue a section for inclusion in the stable prefix.

        Args:
            name: Section name (for documentation)
            content: Section content
            order: Sort order (lower = earlier)

        Returns:
            Self for chaining
        """
        self._sections.append((order, name, content))
        return self

    def add_identity(self, identity: str) -> "StablePrefixBuilder":
        """Add identity section (order 10)."""
        return self.add_section("identity", identity, order=10)

    def add_capabilities(self, capabilities: str) -> "StablePrefixBuilder":
        """Add capabilities section (order 20)."""
        return self.add_section("capabilities", capabilities, order=20)

    def add_tools(self, tools: str) -> "StablePrefixBuilder":
        """Add tools section (order 30)."""
        return self.add_section("tools", tools, order=30)

    def add_routing(self, routing: str) -> "StablePrefixBuilder":
        """Add routing section (order 40)."""
        return self.add_section("routing", routing, order=40)

    def build(self) -> str:
        """
        Assemble the stable prefix string.

        Sections are sorted by their order value (ties keep insertion
        order, since Python's sort is stable) to ensure consistency.

        Returns:
            Assembled stable prefix
        """
        parts: List[str] = []
        for _, label, body in sorted(self._sections, key=lambda entry: entry[0]):
            # HTML comment marks the section boundary; blank line after each.
            parts.extend((f"<!-- {label} -->", body.strip(), ""))
        return "\n".join(parts)

# Global cache optimizer instance (lazily created singleton).
_global_optimizer: Optional[ContextCacheOptimizer] = None


def get_cache_optimizer() -> ContextCacheOptimizer:
    """Return the process-wide cache optimizer, creating it on first use."""
    global _global_optimizer
    if _global_optimizer is None:
        _global_optimizer = ContextCacheOptimizer()
    return _global_optimizer