"""
SpecManager Service

Central service for managing AtomizerSpec v2.0.
All spec modifications flow through this service.

Features:
- Load/save specs with validation
- Atomic writes with conflict detection
- Patch operations with JSONPath support
- Node CRUD operations
- Custom function support
- WebSocket broadcast integration
"""

import asyncio
import hashlib
import json
import logging
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

# Add optimization_engine to path if needed
ATOMIZER_ROOT = Path(__file__).parent.parent.parent.parent.parent
if str(ATOMIZER_ROOT) not in sys.path:
    sys.path.insert(0, str(ATOMIZER_ROOT))

from optimization_engine.config.spec_models import (
    AtomizerSpec,
    DesignVariable,
    Extractor,
    Objective,
    Constraint,
    CanvasPosition,
    CanvasEdge,
    ExtractorType,
    CustomFunction,
    ExtractorOutput,
    ValidationReport,
)
from optimization_engine.config.spec_validator import (
    SpecValidator,
    SpecValidationError,
)

logger = logging.getLogger(__name__)


class SpecManagerError(Exception):
    """Base error for SpecManager operations."""


class SpecNotFoundError(SpecManagerError):
    """Raised when spec file doesn't exist."""


class SpecConflictError(SpecManagerError):
    """Raised when spec has been modified by another client."""

    def __init__(self, message: str, current_hash: str):
        """
        Args:
            message: Human-readable description of the conflict
            current_hash: Hash of the spec currently on disk
        """
        super().__init__(message)
        self.current_hash = current_hash


class WebSocketSubscriber:
    """Interface for WebSocket subscribers.

    Anything passed to ``SpecManager.subscribe`` must provide an awaitable
    ``send_json`` method with this signature.
    """

    async def send_json(self, data: Dict[str, Any]) -> None:
        """Send JSON data to subscriber."""
        raise NotImplementedError


class SpecManager:
    """
    Central service for managing AtomizerSpec.

    All modifications go through this service to ensure:
    - Validation on every change
    - Atomic file writes
    - Conflict detection via hashing
    - WebSocket broadcast to all clients
    """

    SPEC_FILENAME = "atomizer_spec.json"

    # Spec sections that hold nodes, in the order they are searched.
    _NODE_SECTIONS: Tuple[str, ...] = (
        "design_variables",
        "extractors",
        "objectives",
        "constraints",
    )

    # Node type -> ID prefix used by _generate_id.
    _ID_PREFIXES: Dict[str, str] = {
        "designVar": "dv",
        "design_variable": "dv",
        "extractor": "ext",
        "objective": "obj",
        "constraint": "con",
    }

    # Node type -> spec section name used by _get_section_for_type.
    _SECTION_NAMES: Dict[str, str] = {
        "designVar": "design_variables",
        "design_variable": "design_variables",
        "extractor": "extractors",
        "objective": "objectives",
        "constraint": "constraints",
    }

    # Node type -> default canvas x coordinate used by _auto_position.
    _DEFAULT_X: Dict[str, float] = {
        "designVar": 50,
        "design_variable": 50,
        "extractor": 740,
        "objective": 1020,
        "constraint": 1020,
    }

    def __init__(self, study_path: Union[str, Path]):
        """
        Initialize SpecManager for a study.

        Args:
            study_path: Path to the study directory
        """
        self.study_path = Path(study_path)
        self.spec_path = self.study_path / self.SPEC_FILENAME
        self.validator = SpecValidator()
        self._subscribers: List[WebSocketSubscriber] = []
        self._last_hash: Optional[str] = None
        # Strong references to in-flight broadcast tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._pending_broadcasts: Set["asyncio.Task"] = set()

    # =========================================================================
    # Core CRUD Operations
    # =========================================================================

    def _read_json(self) -> Dict[str, Any]:
        """Read and parse the spec file.

        Raises:
            SpecNotFoundError: If spec file doesn't exist
        """
        # EAFP: open directly instead of exists()+open(), which is racy.
        try:
            with open(self.spec_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            raise SpecNotFoundError(f"Spec not found: {self.spec_path}") from None

    def load(self, validate: bool = True) -> AtomizerSpec:
        """
        Load and optionally validate the spec.

        Also refreshes the cached hash of the last spec seen by this manager.

        Args:
            validate: Whether to run the validator (strict mode) before parsing

        Returns:
            AtomizerSpec instance

        Raises:
            SpecNotFoundError: If spec file doesn't exist
            SpecValidationError: If validation fails
        """
        data = self._read_json()

        if validate:
            self.validator.validate(data, strict=True)

        spec = AtomizerSpec.model_validate(data)
        self._last_hash = self._compute_hash(data)
        return spec

    def load_raw(self) -> Dict[str, Any]:
        """
        Load spec as raw dict without parsing.

        Returns:
            Raw spec dict

        Raises:
            SpecNotFoundError: If spec file doesn't exist
        """
        return self._read_json()

    def save(
        self,
        spec: Union[AtomizerSpec, Dict[str, Any]],
        modified_by: str = "api",
        expected_hash: Optional[str] = None
    ) -> str:
        """
        Save spec with validation and broadcast.

        When ``spec`` is a dict it is updated in place: ``meta.modified`` and
        ``meta.modified_by`` are written into it before validation.

        Args:
            spec: Spec to save (AtomizerSpec or dict)
            modified_by: Who/what is making the change
            expected_hash: If provided, verify current file hash matches

        Returns:
            New spec hash

        Raises:
            SpecValidationError: If validation fails
            SpecConflictError: If expected_hash doesn't match current
        """
        # Convert to dict if needed
        if isinstance(spec, AtomizerSpec):
            data = spec.model_dump(mode='json')
        else:
            data = spec

        # Check for conflicts if expected_hash provided. get_hash() returns ""
        # when the file does not exist, in which case there is nothing to
        # conflict with.
        if expected_hash:
            current_hash = self.get_hash()
            if current_hash and current_hash != expected_hash:
                raise SpecConflictError(
                    "Spec was modified by another client",
                    current_hash=current_hash
                )

        # Update metadata. A missing/null "meta" is replaced by an empty dict
        # so the validator (not a bare KeyError) reports what is missing.
        now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        meta = data.get("meta")
        if meta is None:
            meta = {}
            data["meta"] = meta
        meta["modified"] = now
        meta["modified_by"] = modified_by

        # Validate
        self.validator.validate(data, strict=True)

        # Compute new hash
        new_hash = self._compute_hash(data)

        # Atomic write (write to temp, then rename)
        temp_path = self.spec_path.with_suffix('.tmp')
        try:
            with open(temp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            temp_path.replace(self.spec_path)
        except BaseException:
            # Do not leave a partial temp file behind; the real spec file is
            # untouched because the rename never happened.
            try:
                temp_path.unlink()
            except OSError:
                pass
            raise

        # Update cached hash
        self._last_hash = new_hash

        # Broadcast to subscribers
        self._broadcast({
            "type": "spec_updated",
            "hash": new_hash,
            "modified_by": modified_by,
            "timestamp": now
        })

        return new_hash

    def exists(self) -> bool:
        """Check if spec file exists."""
        return self.spec_path.exists()

    def get_hash(self) -> str:
        """Get current spec hash.

        Returns:
            Hash of the spec on disk, or "" if the spec file doesn't exist
        """
        try:
            data = self._read_json()
        except SpecNotFoundError:
            return ""
        return self._compute_hash(data)

    def validate_and_report(self) -> ValidationReport:
        """
        Run full validation and return detailed report.

        Returns:
            Result of the validator run in non-strict mode

        Raises:
            SpecNotFoundError: If spec file doesn't exist
        """
        data = self.load_raw()
        return self.validator.validate(data, strict=False)

    # =========================================================================
    # Patch Operations
    # =========================================================================

    def patch(
        self,
        path: str,
        value: Any,
        modified_by: str = "api"
    ) -> AtomizerSpec:
        """
        Apply a JSONPath-style modification.

        Args:
            path: JSONPath like "design_variables[0].bounds.max"
            value: New value to set
            modified_by: Who/what is making the change

        Returns:
            Updated AtomizerSpec

        Raises:
            SpecNotFoundError: If spec file doesn't exist
            SpecValidationError: If the partial update is rejected
            ValueError: If the path contains no components
        """
        data = self.load_raw()

        # Validate the partial update
        spec = AtomizerSpec.model_validate(data)
        is_valid, errors = self.validator.validate_partial(path, value, spec)
        if not is_valid:
            raise SpecValidationError(f"Invalid update: {'; '.join(errors)}")

        # Apply the patch
        self._apply_patch(data, path, value)

        # Save and return
        self.save(data, modified_by)
        return self.load(validate=False)

    def _apply_patch(self, data: Dict, path: str, value: Any) -> None:
        """
        Apply a patch to the data dict (in place).

        Supports paths like:
        - "meta.description"
        - "design_variables[0].bounds.max"
        - "objectives[1].weight"

        Raises:
            ValueError: If the path contains no components
        """
        parts = self._parse_path(path)
        if not parts:
            raise ValueError(f"Invalid path: {path}")

        *parent_keys, final_key = parts

        # Navigate to parent; path components index lists by integer position
        # and dicts by key.
        current = data
        for part in parent_keys:
            if isinstance(current, list):
                current = current[int(part)]
            else:
                current = current[part]

        # Set final value
        if isinstance(current, list):
            current[int(final_key)] = value
        else:
            current[final_key] = value

    def _parse_path(self, path: str) -> List[str]:
        """Parse JSONPath into parts.

        Splits on ".", "[" and "]" and drops the empty fragments, so both dot
        notation and bracket notation are accepted.
        """
        return [part for part in re.split(r'\.|\[|\]', path) if part]

    # =========================================================================
    # Node Operations
    # =========================================================================

    def _find_node(
        self, data: Dict[str, Any], node_id: str
    ) -> Optional[Dict[str, Any]]:
        """Return the first node with the given ID across all node sections."""
        for section in self._NODE_SECTIONS:
            for node in data.get(section) or []:
                if node.get("id") == node_id:
                    return node
        return None

    def add_node(
        self,
        node_type: str,
        node_data: Dict[str, Any],
        modified_by: str = "canvas"
    ) -> str:
        """
        Add a new node (design var, extractor, objective, constraint).

        Args:
            node_type: One of 'designVar', 'extractor', 'objective', 'constraint'
            node_data: Node data without ID (not modified by this call)
            modified_by: Who/what is making the change

        Returns:
            Generated node ID
        """
        data = self.load_raw()

        # Work on a shallow copy so the caller's dict is not mutated.
        node = dict(node_data)

        # Generate ID
        node_id = self._generate_id(node_type, data)
        node["id"] = node_id

        # Add canvas position if not provided
        if "canvas_position" not in node:
            node["canvas_position"] = self._auto_position(node_type, data)

        # Add to appropriate section
        section = self._get_section_for_type(node_type)
        if data.get(section) is None:
            data[section] = []
        data[section].append(node)

        self.save(data, modified_by)

        # Broadcast node addition
        self._broadcast({
            "type": "node_added",
            "node_type": node_type,
            "node_id": node_id,
            "modified_by": modified_by
        })

        return node_id

    def update_node(
        self,
        node_id: str,
        updates: Dict[str, Any],
        modified_by: str = "canvas"
    ) -> None:
        """
        Update an existing node.

        Args:
            node_id: ID of the node to update
            updates: Dict of fields to update
            modified_by: Who/what is making the change

        Raises:
            SpecManagerError: If no node with this ID exists
        """
        data = self.load_raw()

        node = self._find_node(data, node_id)
        if node is None:
            raise SpecManagerError(f"Node not found: {node_id}")

        node.update(updates)
        self.save(data, modified_by)

    def remove_node(
        self,
        node_id: str,
        modified_by: str = "canvas"
    ) -> None:
        """
        Remove a node and all edges referencing it.

        Args:
            node_id: ID of the node to remove
            modified_by: Who/what is making the change

        Raises:
            SpecManagerError: If no node with this ID exists
        """
        data = self.load_raw()

        # Find and remove node; only the first section containing it is changed
        removed = False
        for section in self._NODE_SECTIONS:
            nodes = data.get(section)
            if nodes is None:
                continue
            remaining = [n for n in nodes if n.get("id") != node_id]
            if len(remaining) < len(nodes):
                data[section] = remaining
                removed = True
                break

        if not removed:
            raise SpecManagerError(f"Node not found: {node_id}")

        # Remove edges referencing this node (null/missing edges are skipped)
        canvas = data.get("canvas")
        if canvas and canvas.get("edges"):
            canvas["edges"] = [
                e for e in canvas["edges"]
                if e.get("source") != node_id and e.get("target") != node_id
            ]

        self.save(data, modified_by)

        # Broadcast node removal
        self._broadcast({
            "type": "node_removed",
            "node_id": node_id,
            "modified_by": modified_by
        })

    def update_node_position(
        self,
        node_id: str,
        position: Dict[str, float],
        modified_by: str = "canvas"
    ) -> None:
        """
        Update a node's canvas position.

        Args:
            node_id: ID of the node
            position: Dict with x, y coordinates
            modified_by: Who/what is making the change

        Raises:
            SpecManagerError: If no node with this ID exists
        """
        self.update_node(node_id, {"canvas_position": position}, modified_by)

    def add_edge(
        self,
        source: str,
        target: str,
        modified_by: str = "canvas"
    ) -> None:
        """
        Add a canvas edge between nodes.

        Does nothing (and does not save) if the edge already exists.

        Args:
            source: Source node ID
            target: Target node ID
            modified_by: Who/what is making the change
        """
        data = self.load_raw()

        # Initialize canvas section if needed
        if data.get("canvas") is None:
            data["canvas"] = {}
        canvas = data["canvas"]
        if canvas.get("edges") is None:
            canvas["edges"] = []
        edges = canvas["edges"]

        # Check for duplicate
        for edge in edges:
            if edge.get("source") == source and edge.get("target") == target:
                return  # Already exists

        edges.append({
            "source": source,
            "target": target
        })

        self.save(data, modified_by)

    def remove_edge(
        self,
        source: str,
        target: str,
        modified_by: str = "canvas"
    ) -> None:
        """
        Remove a canvas edge.

        The spec is saved even if no matching edge was present.

        Args:
            source: Source node ID
            target: Target node ID
            modified_by: Who/what is making the change
        """
        data = self.load_raw()

        canvas = data.get("canvas")
        if canvas and canvas.get("edges"):
            canvas["edges"] = [
                e for e in canvas["edges"]
                if not (e.get("source") == source and e.get("target") == target)
            ]

        self.save(data, modified_by)

    # =========================================================================
    # Custom Function Support
    # =========================================================================

    @staticmethod
    def _check_python_syntax(code: str) -> None:
        """Compile (without executing) ``code`` to verify it parses.

        Raises:
            SpecValidationError: If the source cannot be compiled
        """
        try:
            compile(code, "<custom_function>", "exec")
        except SyntaxError as e:
            raise SpecValidationError(
                f"Invalid Python syntax: {e.msg} at line {e.lineno}"
            ) from e
        except ValueError as e:
            # Older interpreters raise ValueError (not SyntaxError) for source
            # containing NUL bytes.
            raise SpecValidationError(f"Invalid Python source: {e}") from e

    def add_custom_function(
        self,
        name: str,
        code: str,
        outputs: List[str],
        description: Optional[str] = None,
        modified_by: str = "claude"
    ) -> str:
        """
        Add a custom extractor function.

        The code is only syntax-checked here; it is stored in the spec and not
        executed by this method.

        Args:
            name: Function name
            code: Python source code
            outputs: List of output names
            description: Optional description (used as the extractor name)
            modified_by: Who/what is making the change

        Returns:
            Generated extractor ID

        Raises:
            SpecValidationError: If Python syntax is invalid
        """
        # Validate Python syntax
        self._check_python_syntax(code)

        data = self.load_raw()

        # Generate extractor ID
        ext_id = self._generate_id("extractor", data)

        # Create extractor
        extractor = {
            "id": ext_id,
            "name": description or f"Custom: {name}",
            "type": "custom_function",
            "builtin": False,
            "function": {
                "name": name,
                "module": "custom_extractors.dynamic",
                "source_code": code
            },
            "outputs": [{"name": o, "metric": "custom"} for o in outputs],
            "canvas_position": self._auto_position("extractor", data)
        }

        # Create the section when missing/null, as add_node does.
        if data.get("extractors") is None:
            data["extractors"] = []
        data["extractors"].append(extractor)
        self.save(data, modified_by)

        return ext_id

    def update_custom_function(
        self,
        extractor_id: str,
        code: Optional[str] = None,
        outputs: Optional[List[str]] = None,
        modified_by: str = "claude"
    ) -> None:
        """
        Update an existing custom function.

        Args:
            extractor_id: ID of the custom extractor
            code: New Python code (optional)
            outputs: New outputs (optional)
            modified_by: Who/what is making the change

        Raises:
            SpecManagerError: If the extractor doesn't exist or is not a
                custom function
            SpecValidationError: If Python syntax is invalid
        """
        data = self.load_raw()

        # Find the extractor
        extractor = next(
            (
                ext for ext in data.get("extractors") or []
                if ext.get("id") == extractor_id
            ),
            None,
        )

        if extractor is None:
            raise SpecManagerError(f"Extractor not found: {extractor_id}")

        if extractor.get("type") != "custom_function":
            raise SpecManagerError(f"Extractor {extractor_id} is not a custom function")

        # Update code
        if code is not None:
            self._check_python_syntax(code)
            if extractor.get("function") is None:
                extractor["function"] = {}
            extractor["function"]["source_code"] = code

        # Update outputs
        if outputs is not None:
            extractor["outputs"] = [{"name": o, "metric": "custom"} for o in outputs]

        self.save(data, modified_by)

    # =========================================================================
    # WebSocket Subscription
    # =========================================================================

    def subscribe(self, subscriber: WebSocketSubscriber) -> None:
        """Subscribe to spec changes."""
        if subscriber not in self._subscribers:
            self._subscribers.append(subscriber)

    def unsubscribe(self, subscriber: WebSocketSubscriber) -> None:
        """Unsubscribe from spec changes."""
        if subscriber in self._subscribers:
            self._subscribers.remove(subscriber)

    def _broadcast(self, message: Dict[str, Any]) -> None:
        """Broadcast message to all subscribers.

        Sends are scheduled as tasks on the running event loop and are not
        awaited. When called outside a running loop the message is dropped.
        Failures are logged and never propagate to the caller.
        """
        if not self._subscribers:
            return

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: nothing can be scheduled from a sync context.
            logger.debug(
                "No running event loop; dropping %r broadcast",
                message.get("type"),
            )
            return

        # Iterate over a snapshot so (un)subscribe calls cannot disturb the loop.
        for subscriber in list(self._subscribers):
            try:
                task = loop.create_task(subscriber.send_json(message))
            except Exception:
                # Best-effort: a broken subscriber must not block the others.
                logger.warning(
                    "Failed to schedule broadcast to %r",
                    subscriber,
                    exc_info=True,
                )
                continue
            self._pending_broadcasts.add(task)
            task.add_done_callback(self._on_broadcast_done)

    def _on_broadcast_done(self, task: "asyncio.Task") -> None:
        """Release a finished broadcast task and log its failure, if any."""
        self._pending_broadcasts.discard(task)
        if task.cancelled():
            return
        # Retrieving the exception avoids "exception was never retrieved"
        # warnings; the subscriber may simply have disconnected.
        exc = task.exception()
        if exc is not None:
            logger.debug("Broadcast to subscriber failed: %r", exc)

    # =========================================================================
    # Helper Methods
    # =========================================================================

    def _compute_hash(self, data: Dict) -> str:
        """Compute hash of spec data for conflict detection.

        Returns:
            First 16 hex characters of the SHA-256 of the key-sorted JSON dump
        """
        # Sort keys for consistent hashing
        json_str = json.dumps(data, sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(json_str.encode("utf-8")).hexdigest()[:16]

    def _generate_id(self, node_type: str, data: Dict) -> str:
        """Generate unique ID for a node type.

        IDs look like "<prefix>_001" .. "<prefix>_999"; the lowest number not
        already used in the node type's section is returned.

        Raises:
            SpecManagerError: If all 999 IDs are taken
        """
        # Unknown node types fall back to their first three characters.
        prefix = self._ID_PREFIXES.get(node_type, node_type[:3])

        # Find existing IDs
        section = self._get_section_for_type(node_type)
        existing_ids: Set[str] = {
            n.get("id", "") for n in data.get(section) or []
        }

        # Generate next available ID
        for i in range(1, 1000):
            new_id = f"{prefix}_{i:03d}"
            if new_id not in existing_ids:
                return new_id

        raise SpecManagerError(f"Cannot generate ID for {node_type}: too many nodes")

    def _get_section_for_type(self, node_type: str) -> str:
        """Map node type to spec section name.

        Unknown node types map to the type name with an "s" appended.
        """
        return self._SECTION_NAMES.get(node_type, node_type + "s")

    def _auto_position(self, node_type: str, data: Dict) -> Dict[str, float]:
        """Calculate auto position for a new node.

        Returns:
            Dict with the default x for the node type (400 if unknown) and a y
            100 units below the lowest existing node of that type (100 when
            there is none)
        """
        x = self._DEFAULT_X.get(node_type, 400)

        # Find max y position for this type; null positions count as 0.
        section = self._get_section_for_type(node_type)
        max_y = 0
        for node in data.get(section) or []:
            pos = node.get("canvas_position") or {}
            y = pos.get("y") or 0
            if y > max_y:
                max_y = y

        # Place below existing nodes
        return {"x": x, "y": max_y + 100}


# =========================================================================
# Factory Function
# =========================================================================

def get_spec_manager(study_path: Union[str, Path]) -> SpecManager:
    """
    Get a SpecManager instance for a study.

    Args:
        study_path: Path to the study directory

    Returns:
        SpecManager instance
    """
    return SpecManager(study_path)