feat(dashboard): Enhanced chat, spec management, and Claude integration
Backend: - spec.py: New AtomizerSpec REST API endpoints - spec_manager.py: SpecManager service for unified config - interview_engine.py: Study creation interview logic - claude.py: Enhanced Claude API with context - optimization.py: Extended optimization endpoints - context_builder.py, session_manager.py: Improved services Frontend: - Chat components: Enhanced message rendering, tool call cards - Hooks: useClaudeCode, useSpecWebSocket, improved useChat - Pages: Updated Dashboard, Analysis, Insights, Setup, Home - Components: ParallelCoordinatesPlot, ParetoPlot improvements - App.tsx: Route updates for canvas/studio Infrastructure: - vite.config.ts: Build configuration updates - start/stop-dashboard.bat: Script improvements
This commit is contained in:
747
atomizer-dashboard/backend/api/services/spec_manager.py
Normal file
747
atomizer-dashboard/backend/api/services/spec_manager.py
Normal file
@@ -0,0 +1,747 @@
|
||||
"""
|
||||
SpecManager Service
|
||||
|
||||
Central service for managing AtomizerSpec v2.0.
|
||||
All spec modifications flow through this service.
|
||||
|
||||
Features:
|
||||
- Load/save specs with validation
|
||||
- Atomic writes with conflict detection
|
||||
- Patch operations with JSONPath support
|
||||
- Node CRUD operations
|
||||
- Custom function support
|
||||
- WebSocket broadcast integration
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
|
||||
|
||||
# Add optimization_engine to path if needed
|
||||
ATOMIZER_ROOT = Path(__file__).parent.parent.parent.parent.parent
|
||||
if str(ATOMIZER_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ATOMIZER_ROOT))
|
||||
|
||||
from optimization_engine.config.spec_models import (
|
||||
AtomizerSpec,
|
||||
DesignVariable,
|
||||
Extractor,
|
||||
Objective,
|
||||
Constraint,
|
||||
CanvasPosition,
|
||||
CanvasEdge,
|
||||
ExtractorType,
|
||||
CustomFunction,
|
||||
ExtractorOutput,
|
||||
ValidationReport,
|
||||
)
|
||||
from optimization_engine.config.spec_validator import (
|
||||
SpecValidator,
|
||||
SpecValidationError,
|
||||
)
|
||||
|
||||
|
||||
class SpecManagerError(Exception):
    """Root exception for all SpecManager failures."""
|
||||
|
||||
|
||||
class SpecNotFoundError(SpecManagerError):
    """The study has no spec file at the expected location."""
|
||||
|
||||
|
||||
class SpecConflictError(SpecManagerError):
    """Another client changed the spec since this client last read it.

    Carries ``current_hash`` so the caller can re-fetch the spec,
    rebase its change, and retry.
    """

    def __init__(self, message: str, current_hash: str):
        # Stash the hash first, then delegate the message to Exception.
        self.current_hash = current_hash
        super().__init__(message)
|
||||
|
||||
|
||||
class WebSocketSubscriber:
    """Duck-typed interface a WebSocket subscriber must implement."""

    async def send_json(self, data: Dict[str, Any]) -> None:
        """Deliver a JSON-serializable payload to the client."""
        raise NotImplementedError
|
||||
|
||||
|
||||
class SpecManager:
|
||||
"""
|
||||
Central service for managing AtomizerSpec.
|
||||
|
||||
All modifications go through this service to ensure:
|
||||
- Validation on every change
|
||||
- Atomic file writes
|
||||
- Conflict detection via hashing
|
||||
- WebSocket broadcast to all clients
|
||||
"""
|
||||
|
||||
SPEC_FILENAME = "atomizer_spec.json"
|
||||
|
||||
def __init__(self, study_path: Union[str, Path]):
|
||||
"""
|
||||
Initialize SpecManager for a study.
|
||||
|
||||
Args:
|
||||
study_path: Path to the study directory
|
||||
"""
|
||||
self.study_path = Path(study_path)
|
||||
self.spec_path = self.study_path / self.SPEC_FILENAME
|
||||
self.validator = SpecValidator()
|
||||
self._subscribers: List[WebSocketSubscriber] = []
|
||||
self._last_hash: Optional[str] = None
|
||||
|
||||
# =========================================================================
|
||||
# Core CRUD Operations
|
||||
# =========================================================================
|
||||
|
||||
def load(self, validate: bool = True) -> AtomizerSpec:
|
||||
"""
|
||||
Load and optionally validate the spec.
|
||||
|
||||
Args:
|
||||
validate: Whether to validate the spec
|
||||
|
||||
Returns:
|
||||
AtomizerSpec instance
|
||||
|
||||
Raises:
|
||||
SpecNotFoundError: If spec file doesn't exist
|
||||
SpecValidationError: If validation fails
|
||||
"""
|
||||
if not self.spec_path.exists():
|
||||
raise SpecNotFoundError(f"Spec not found: {self.spec_path}")
|
||||
|
||||
with open(self.spec_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
if validate:
|
||||
self.validator.validate(data, strict=True)
|
||||
|
||||
spec = AtomizerSpec.model_validate(data)
|
||||
self._last_hash = self._compute_hash(data)
|
||||
return spec
|
||||
|
||||
def load_raw(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Load spec as raw dict without parsing.
|
||||
|
||||
Returns:
|
||||
Raw spec dict
|
||||
|
||||
Raises:
|
||||
SpecNotFoundError: If spec file doesn't exist
|
||||
"""
|
||||
if not self.spec_path.exists():
|
||||
raise SpecNotFoundError(f"Spec not found: {self.spec_path}")
|
||||
|
||||
with open(self.spec_path, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
|
||||
def save(
|
||||
self,
|
||||
spec: Union[AtomizerSpec, Dict[str, Any]],
|
||||
modified_by: str = "api",
|
||||
expected_hash: Optional[str] = None
|
||||
) -> str:
|
||||
"""
|
||||
Save spec with validation and broadcast.
|
||||
|
||||
Args:
|
||||
spec: Spec to save (AtomizerSpec or dict)
|
||||
modified_by: Who/what is making the change
|
||||
expected_hash: If provided, verify current file hash matches
|
||||
|
||||
Returns:
|
||||
New spec hash
|
||||
|
||||
Raises:
|
||||
SpecValidationError: If validation fails
|
||||
SpecConflictError: If expected_hash doesn't match current
|
||||
"""
|
||||
# Convert to dict if needed
|
||||
if isinstance(spec, AtomizerSpec):
|
||||
data = spec.model_dump(mode='json')
|
||||
else:
|
||||
data = spec
|
||||
|
||||
# Check for conflicts if expected_hash provided
|
||||
if expected_hash and self.spec_path.exists():
|
||||
current_hash = self.get_hash()
|
||||
if current_hash != expected_hash:
|
||||
raise SpecConflictError(
|
||||
"Spec was modified by another client",
|
||||
current_hash=current_hash
|
||||
)
|
||||
|
||||
# Update metadata
|
||||
now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
|
||||
data["meta"]["modified"] = now
|
||||
data["meta"]["modified_by"] = modified_by
|
||||
|
||||
# Validate
|
||||
self.validator.validate(data, strict=True)
|
||||
|
||||
# Compute new hash
|
||||
new_hash = self._compute_hash(data)
|
||||
|
||||
# Atomic write (write to temp, then rename)
|
||||
temp_path = self.spec_path.with_suffix('.tmp')
|
||||
with open(temp_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
temp_path.replace(self.spec_path)
|
||||
|
||||
# Update cached hash
|
||||
self._last_hash = new_hash
|
||||
|
||||
# Broadcast to subscribers
|
||||
self._broadcast({
|
||||
"type": "spec_updated",
|
||||
"hash": new_hash,
|
||||
"modified_by": modified_by,
|
||||
"timestamp": now
|
||||
})
|
||||
|
||||
return new_hash
|
||||
|
||||
def exists(self) -> bool:
|
||||
"""Check if spec file exists."""
|
||||
return self.spec_path.exists()
|
||||
|
||||
def get_hash(self) -> str:
|
||||
"""Get current spec hash."""
|
||||
if not self.spec_path.exists():
|
||||
return ""
|
||||
with open(self.spec_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
return self._compute_hash(data)
|
||||
|
||||
def validate_and_report(self) -> ValidationReport:
|
||||
"""
|
||||
Run full validation and return detailed report.
|
||||
|
||||
Returns:
|
||||
ValidationReport with errors, warnings, summary
|
||||
"""
|
||||
if not self.spec_path.exists():
|
||||
raise SpecNotFoundError(f"Spec not found: {self.spec_path}")
|
||||
|
||||
data = self.load_raw()
|
||||
return self.validator.validate(data, strict=False)
|
||||
|
||||
# =========================================================================
|
||||
# Patch Operations
|
||||
# =========================================================================
|
||||
|
||||
def patch(
|
||||
self,
|
||||
path: str,
|
||||
value: Any,
|
||||
modified_by: str = "api"
|
||||
) -> AtomizerSpec:
|
||||
"""
|
||||
Apply a JSONPath-style modification.
|
||||
|
||||
Args:
|
||||
path: JSONPath like "design_variables[0].bounds.max"
|
||||
value: New value to set
|
||||
modified_by: Who/what is making the change
|
||||
|
||||
Returns:
|
||||
Updated AtomizerSpec
|
||||
"""
|
||||
data = self.load_raw()
|
||||
|
||||
# Validate the partial update
|
||||
spec = AtomizerSpec.model_validate(data)
|
||||
is_valid, errors = self.validator.validate_partial(path, value, spec)
|
||||
if not is_valid:
|
||||
raise SpecValidationError(f"Invalid update: {'; '.join(errors)}")
|
||||
|
||||
# Apply the patch
|
||||
self._apply_patch(data, path, value)
|
||||
|
||||
# Save and return
|
||||
self.save(data, modified_by)
|
||||
return self.load(validate=False)
|
||||
|
||||
def _apply_patch(self, data: Dict, path: str, value: Any) -> None:
|
||||
"""
|
||||
Apply a patch to the data dict.
|
||||
|
||||
Supports paths like:
|
||||
- "meta.description"
|
||||
- "design_variables[0].bounds.max"
|
||||
- "objectives[1].weight"
|
||||
"""
|
||||
parts = self._parse_path(path)
|
||||
if not parts:
|
||||
raise ValueError(f"Invalid path: {path}")
|
||||
|
||||
# Navigate to parent
|
||||
current = data
|
||||
for part in parts[:-1]:
|
||||
if isinstance(current, list):
|
||||
idx = int(part)
|
||||
current = current[idx]
|
||||
else:
|
||||
current = current[part]
|
||||
|
||||
# Set final value
|
||||
final_key = parts[-1]
|
||||
if isinstance(current, list):
|
||||
idx = int(final_key)
|
||||
current[idx] = value
|
||||
else:
|
||||
current[final_key] = value
|
||||
|
||||
def _parse_path(self, path: str) -> List[str]:
|
||||
"""Parse JSONPath into parts."""
|
||||
# Handle both dot notation and bracket notation
|
||||
parts = []
|
||||
for part in re.split(r'\.|\[|\]', path):
|
||||
if part:
|
||||
parts.append(part)
|
||||
return parts
|
||||
|
||||
# =========================================================================
|
||||
# Node Operations
|
||||
# =========================================================================
|
||||
|
||||
def add_node(
|
||||
self,
|
||||
node_type: str,
|
||||
node_data: Dict[str, Any],
|
||||
modified_by: str = "canvas"
|
||||
) -> str:
|
||||
"""
|
||||
Add a new node (design var, extractor, objective, constraint).
|
||||
|
||||
Args:
|
||||
node_type: One of 'designVar', 'extractor', 'objective', 'constraint'
|
||||
node_data: Node data without ID
|
||||
modified_by: Who/what is making the change
|
||||
|
||||
Returns:
|
||||
Generated node ID
|
||||
"""
|
||||
data = self.load_raw()
|
||||
|
||||
# Generate ID
|
||||
node_id = self._generate_id(node_type, data)
|
||||
node_data["id"] = node_id
|
||||
|
||||
# Add canvas position if not provided
|
||||
if "canvas_position" not in node_data:
|
||||
node_data["canvas_position"] = self._auto_position(node_type, data)
|
||||
|
||||
# Add to appropriate section
|
||||
section = self._get_section_for_type(node_type)
|
||||
|
||||
if section not in data or data[section] is None:
|
||||
data[section] = []
|
||||
|
||||
data[section].append(node_data)
|
||||
|
||||
self.save(data, modified_by)
|
||||
|
||||
# Broadcast node addition
|
||||
self._broadcast({
|
||||
"type": "node_added",
|
||||
"node_type": node_type,
|
||||
"node_id": node_id,
|
||||
"modified_by": modified_by
|
||||
})
|
||||
|
||||
return node_id
|
||||
|
||||
def update_node(
|
||||
self,
|
||||
node_id: str,
|
||||
updates: Dict[str, Any],
|
||||
modified_by: str = "canvas"
|
||||
) -> None:
|
||||
"""
|
||||
Update an existing node.
|
||||
|
||||
Args:
|
||||
node_id: ID of the node to update
|
||||
updates: Dict of fields to update
|
||||
modified_by: Who/what is making the change
|
||||
"""
|
||||
data = self.load_raw()
|
||||
|
||||
# Find and update the node
|
||||
found = False
|
||||
for section in ["design_variables", "extractors", "objectives", "constraints"]:
|
||||
if section not in data or data[section] is None:
|
||||
continue
|
||||
for node in data[section]:
|
||||
if node.get("id") == node_id:
|
||||
node.update(updates)
|
||||
found = True
|
||||
break
|
||||
if found:
|
||||
break
|
||||
|
||||
if not found:
|
||||
raise SpecManagerError(f"Node not found: {node_id}")
|
||||
|
||||
self.save(data, modified_by)
|
||||
|
||||
def remove_node(
|
||||
self,
|
||||
node_id: str,
|
||||
modified_by: str = "canvas"
|
||||
) -> None:
|
||||
"""
|
||||
Remove a node and all edges referencing it.
|
||||
|
||||
Args:
|
||||
node_id: ID of the node to remove
|
||||
modified_by: Who/what is making the change
|
||||
"""
|
||||
data = self.load_raw()
|
||||
|
||||
# Find and remove node
|
||||
removed = False
|
||||
for section in ["design_variables", "extractors", "objectives", "constraints"]:
|
||||
if section not in data or data[section] is None:
|
||||
continue
|
||||
original_len = len(data[section])
|
||||
data[section] = [n for n in data[section] if n.get("id") != node_id]
|
||||
if len(data[section]) < original_len:
|
||||
removed = True
|
||||
break
|
||||
|
||||
if not removed:
|
||||
raise SpecManagerError(f"Node not found: {node_id}")
|
||||
|
||||
# Remove edges referencing this node
|
||||
if "canvas" in data and data["canvas"] and "edges" in data["canvas"]:
|
||||
data["canvas"]["edges"] = [
|
||||
e for e in data["canvas"]["edges"]
|
||||
if e.get("source") != node_id and e.get("target") != node_id
|
||||
]
|
||||
|
||||
self.save(data, modified_by)
|
||||
|
||||
# Broadcast node removal
|
||||
self._broadcast({
|
||||
"type": "node_removed",
|
||||
"node_id": node_id,
|
||||
"modified_by": modified_by
|
||||
})
|
||||
|
||||
def update_node_position(
|
||||
self,
|
||||
node_id: str,
|
||||
position: Dict[str, float],
|
||||
modified_by: str = "canvas"
|
||||
) -> None:
|
||||
"""
|
||||
Update a node's canvas position.
|
||||
|
||||
Args:
|
||||
node_id: ID of the node
|
||||
position: Dict with x, y coordinates
|
||||
modified_by: Who/what is making the change
|
||||
"""
|
||||
self.update_node(node_id, {"canvas_position": position}, modified_by)
|
||||
|
||||
def add_edge(
|
||||
self,
|
||||
source: str,
|
||||
target: str,
|
||||
modified_by: str = "canvas"
|
||||
) -> None:
|
||||
"""
|
||||
Add a canvas edge between nodes.
|
||||
|
||||
Args:
|
||||
source: Source node ID
|
||||
target: Target node ID
|
||||
modified_by: Who/what is making the change
|
||||
"""
|
||||
data = self.load_raw()
|
||||
|
||||
# Initialize canvas section if needed
|
||||
if "canvas" not in data or data["canvas"] is None:
|
||||
data["canvas"] = {}
|
||||
if "edges" not in data["canvas"] or data["canvas"]["edges"] is None:
|
||||
data["canvas"]["edges"] = []
|
||||
|
||||
# Check for duplicate
|
||||
for edge in data["canvas"]["edges"]:
|
||||
if edge.get("source") == source and edge.get("target") == target:
|
||||
return # Already exists
|
||||
|
||||
data["canvas"]["edges"].append({
|
||||
"source": source,
|
||||
"target": target
|
||||
})
|
||||
|
||||
self.save(data, modified_by)
|
||||
|
||||
def remove_edge(
|
||||
self,
|
||||
source: str,
|
||||
target: str,
|
||||
modified_by: str = "canvas"
|
||||
) -> None:
|
||||
"""
|
||||
Remove a canvas edge.
|
||||
|
||||
Args:
|
||||
source: Source node ID
|
||||
target: Target node ID
|
||||
modified_by: Who/what is making the change
|
||||
"""
|
||||
data = self.load_raw()
|
||||
|
||||
if "canvas" in data and data["canvas"] and "edges" in data["canvas"]:
|
||||
data["canvas"]["edges"] = [
|
||||
e for e in data["canvas"]["edges"]
|
||||
if not (e.get("source") == source and e.get("target") == target)
|
||||
]
|
||||
|
||||
self.save(data, modified_by)
|
||||
|
||||
# =========================================================================
|
||||
# Custom Function Support
|
||||
# =========================================================================
|
||||
|
||||
def add_custom_function(
|
||||
self,
|
||||
name: str,
|
||||
code: str,
|
||||
outputs: List[str],
|
||||
description: Optional[str] = None,
|
||||
modified_by: str = "claude"
|
||||
) -> str:
|
||||
"""
|
||||
Add a custom extractor function.
|
||||
|
||||
Args:
|
||||
name: Function name
|
||||
code: Python source code
|
||||
outputs: List of output names
|
||||
description: Optional description
|
||||
modified_by: Who/what is making the change
|
||||
|
||||
Returns:
|
||||
Generated extractor ID
|
||||
|
||||
Raises:
|
||||
SpecValidationError: If Python syntax is invalid
|
||||
"""
|
||||
# Validate Python syntax
|
||||
try:
|
||||
compile(code, f"<custom:{name}>", "exec")
|
||||
except SyntaxError as e:
|
||||
raise SpecValidationError(
|
||||
f"Invalid Python syntax: {e.msg} at line {e.lineno}"
|
||||
)
|
||||
|
||||
data = self.load_raw()
|
||||
|
||||
# Generate extractor ID
|
||||
ext_id = self._generate_id("extractor", data)
|
||||
|
||||
# Create extractor
|
||||
extractor = {
|
||||
"id": ext_id,
|
||||
"name": description or f"Custom: {name}",
|
||||
"type": "custom_function",
|
||||
"builtin": False,
|
||||
"function": {
|
||||
"name": name,
|
||||
"module": "custom_extractors.dynamic",
|
||||
"source_code": code
|
||||
},
|
||||
"outputs": [{"name": o, "metric": "custom"} for o in outputs],
|
||||
"canvas_position": self._auto_position("extractor", data)
|
||||
}
|
||||
|
||||
data["extractors"].append(extractor)
|
||||
self.save(data, modified_by)
|
||||
|
||||
return ext_id
|
||||
|
||||
def update_custom_function(
|
||||
self,
|
||||
extractor_id: str,
|
||||
code: Optional[str] = None,
|
||||
outputs: Optional[List[str]] = None,
|
||||
modified_by: str = "claude"
|
||||
) -> None:
|
||||
"""
|
||||
Update an existing custom function.
|
||||
|
||||
Args:
|
||||
extractor_id: ID of the custom extractor
|
||||
code: New Python code (optional)
|
||||
outputs: New outputs (optional)
|
||||
modified_by: Who/what is making the change
|
||||
"""
|
||||
data = self.load_raw()
|
||||
|
||||
# Find the extractor
|
||||
extractor = None
|
||||
for ext in data.get("extractors", []):
|
||||
if ext.get("id") == extractor_id:
|
||||
extractor = ext
|
||||
break
|
||||
|
||||
if not extractor:
|
||||
raise SpecManagerError(f"Extractor not found: {extractor_id}")
|
||||
|
||||
if extractor.get("type") != "custom_function":
|
||||
raise SpecManagerError(f"Extractor {extractor_id} is not a custom function")
|
||||
|
||||
# Update code
|
||||
if code is not None:
|
||||
try:
|
||||
compile(code, f"<custom:{extractor_id}>", "exec")
|
||||
except SyntaxError as e:
|
||||
raise SpecValidationError(
|
||||
f"Invalid Python syntax: {e.msg} at line {e.lineno}"
|
||||
)
|
||||
if "function" not in extractor:
|
||||
extractor["function"] = {}
|
||||
extractor["function"]["source_code"] = code
|
||||
|
||||
# Update outputs
|
||||
if outputs is not None:
|
||||
extractor["outputs"] = [{"name": o, "metric": "custom"} for o in outputs]
|
||||
|
||||
self.save(data, modified_by)
|
||||
|
||||
# =========================================================================
|
||||
# WebSocket Subscription
|
||||
# =========================================================================
|
||||
|
||||
def subscribe(self, subscriber: WebSocketSubscriber) -> None:
|
||||
"""Subscribe to spec changes."""
|
||||
if subscriber not in self._subscribers:
|
||||
self._subscribers.append(subscriber)
|
||||
|
||||
def unsubscribe(self, subscriber: WebSocketSubscriber) -> None:
|
||||
"""Unsubscribe from spec changes."""
|
||||
if subscriber in self._subscribers:
|
||||
self._subscribers.remove(subscriber)
|
||||
|
||||
def _broadcast(self, message: Dict[str, Any]) -> None:
|
||||
"""Broadcast message to all subscribers."""
|
||||
import asyncio
|
||||
|
||||
for subscriber in self._subscribers:
|
||||
try:
|
||||
# Handle both sync and async contexts
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
loop.create_task(subscriber.send_json(message))
|
||||
except RuntimeError:
|
||||
# No running loop, try direct call if possible
|
||||
pass
|
||||
except Exception:
|
||||
# Subscriber may have disconnected
|
||||
pass
|
||||
|
||||
# =========================================================================
|
||||
# Helper Methods
|
||||
# =========================================================================
|
||||
|
||||
def _compute_hash(self, data: Dict) -> str:
|
||||
"""Compute hash of spec data for conflict detection."""
|
||||
# Sort keys for consistent hashing
|
||||
json_str = json.dumps(data, sort_keys=True, ensure_ascii=False)
|
||||
return hashlib.sha256(json_str.encode()).hexdigest()[:16]
|
||||
|
||||
def _generate_id(self, node_type: str, data: Dict) -> str:
|
||||
"""Generate unique ID for a node type."""
|
||||
prefix_map = {
|
||||
"designVar": "dv",
|
||||
"design_variable": "dv",
|
||||
"extractor": "ext",
|
||||
"objective": "obj",
|
||||
"constraint": "con"
|
||||
}
|
||||
prefix = prefix_map.get(node_type, node_type[:3])
|
||||
|
||||
# Find existing IDs
|
||||
section = self._get_section_for_type(node_type)
|
||||
existing_ids: Set[str] = set()
|
||||
if section in data and data[section]:
|
||||
existing_ids = {n.get("id", "") for n in data[section]}
|
||||
|
||||
# Generate next available ID
|
||||
for i in range(1, 1000):
|
||||
new_id = f"{prefix}_{i:03d}"
|
||||
if new_id not in existing_ids:
|
||||
return new_id
|
||||
|
||||
raise SpecManagerError(f"Cannot generate ID for {node_type}: too many nodes")
|
||||
|
||||
def _get_section_for_type(self, node_type: str) -> str:
|
||||
"""Map node type to spec section name."""
|
||||
section_map = {
|
||||
"designVar": "design_variables",
|
||||
"design_variable": "design_variables",
|
||||
"extractor": "extractors",
|
||||
"objective": "objectives",
|
||||
"constraint": "constraints"
|
||||
}
|
||||
return section_map.get(node_type, node_type + "s")
|
||||
|
||||
def _auto_position(self, node_type: str, data: Dict) -> Dict[str, float]:
|
||||
"""Calculate auto position for a new node."""
|
||||
# Default x positions by type
|
||||
x_positions = {
|
||||
"designVar": 50,
|
||||
"design_variable": 50,
|
||||
"extractor": 740,
|
||||
"objective": 1020,
|
||||
"constraint": 1020
|
||||
}
|
||||
|
||||
x = x_positions.get(node_type, 400)
|
||||
|
||||
# Find max y position for this type
|
||||
section = self._get_section_for_type(node_type)
|
||||
max_y = 0
|
||||
if section in data and data[section]:
|
||||
for node in data[section]:
|
||||
pos = node.get("canvas_position", {})
|
||||
y = pos.get("y", 0)
|
||||
if y > max_y:
|
||||
max_y = y
|
||||
|
||||
# Place below existing nodes
|
||||
y = max_y + 100 if max_y > 0 else 100
|
||||
|
||||
return {"x": x, "y": y}
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Factory Function
|
||||
# =========================================================================
|
||||
|
||||
def get_spec_manager(study_path: Union[str, Path]) -> SpecManager:
    """
    Build a SpecManager for the given study directory.

    Args:
        study_path: Path to the study directory

    Returns:
        A fresh SpecManager bound to that study
    """
    return SpecManager(study_path)
|
||||
Reference in New Issue
Block a user