# Source: Atomizer/optimization_engine/config/migrator.py
"""
AtomizerSpec v2.0 Migrator
Converts legacy optimization_config.json files to AtomizerSpec v2.0 format.
Supports migration from:
- Mirror/Zernike configs (extraction_method, zernike_settings)
- Structural/Bracket configs (optimization_settings, simulation_settings)
- Canvas Intent format (simplified canvas output)
Migration Rules:
- bounds: [min, max] -> bounds: {min, max}
- parameter -> expression_name
- goal/type: "minimize"/"maximize" -> direction: "minimize"/"maximize"
- Infers extractors from objectives and extraction settings
- Generates canvas edges automatically
"""
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union
import json
import re
class MigrationError(Exception):
    """Raised when a legacy config cannot be converted to AtomizerSpec v2.0."""
class SpecMigrator:
    """
    Migrate old optimization_config.json to AtomizerSpec v2.0.

    Handles multiple legacy formats (mirror/Zernike, structural/bracket,
    canvas intent) and infers missing information such as extractor types
    and canvas wiring.
    """

    # Extractor type inference based on objective names.  Each key is a regex
    # applied with re.search to the lowercased objective name; the first match
    # (in insertion order) wins.
    EXTRACTOR_INFERENCE = {
        # Zernike patterns
        r"wfe|zernike|opd": "zernike_opd",
        r"mfg|manufacturing": "zernike_opd",
        r"rms": "zernike_opd",
        # Structural patterns
        r"displacement|deflection|deform": "displacement",
        r"stress|von.?mises": "stress",
        r"frequency|modal|eigen": "frequency",
        r"mass|weight": "mass",
        r"stiffness": "displacement",  # Stiffness computed from displacement
        r"temperature|thermal": "temperature",
    }

    def __init__(self, study_path: Optional[Path] = None):
        """
        Initialize migrator.

        Args:
            study_path: Path to study directory (for inferring sim/fem paths).
                If omitted, migrate_file() will infer it from the config path.
        """
        self.study_path = Path(study_path) if study_path else None
        # Monotonic counters used to mint stable node ids (ext_001, obj_001, ...).
        self._extractor_counter = 0
        self._objective_counter = 0
        self._constraint_counter = 0
        self._dv_counter = 0

    def migrate(
        self,
        old_config: Dict[str, Any],
        study_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Convert old config to AtomizerSpec v2.0.

        Args:
            old_config: Legacy config dict
            study_name: Override study name (defaults to config value)

        Returns:
            AtomizerSpec v2.0 dict
        """
        # Reset counters so repeated migrate() calls yield identical ids.
        self._extractor_counter = 0
        self._objective_counter = 0
        self._constraint_counter = 0
        self._dv_counter = 0
        # Detect config type
        config_type = self._detect_config_type(old_config)
        # Build spec skeleton
        spec = {
            "meta": self._migrate_meta(old_config, study_name),
            "model": self._migrate_model(old_config, config_type),
            "design_variables": self._migrate_design_variables(old_config),
            "extractors": [],
            "objectives": [],
            "constraints": [],
            "optimization": self._migrate_optimization(old_config, config_type),
            "canvas": {"edges": [], "layout_version": "2.0"}
        }
        # Migrate extractors and objectives together (they're linked)
        extractors, objectives = self._migrate_extractors_and_objectives(old_config, config_type)
        spec["extractors"] = extractors
        spec["objectives"] = objectives
        # Migrate constraints; this may append extra extractors (e.g. mass)
        # to spec["extractors"] in place.
        spec["constraints"] = self._migrate_constraints(old_config, spec["extractors"])
        # Generate canvas edges
        spec["canvas"]["edges"] = self._generate_edges(spec)
        # Add workflow if SAT/turbo settings present
        if self._has_sat_settings(old_config):
            spec["workflow"] = self._migrate_workflow(old_config)
        return spec

    def migrate_file(
        self,
        config_path: Union[str, Path],
        output_path: Optional[Union[str, Path]] = None
    ) -> Dict[str, Any]:
        """
        Migrate a config file and optionally save the result.

        Args:
            config_path: Path to old config file
            output_path: Path to save new spec (optional)

        Returns:
            AtomizerSpec v2.0 dict

        Raises:
            MigrationError: If the config file does not exist.
        """
        config_path = Path(config_path)
        if not config_path.exists():
            raise MigrationError(f"Config file not found: {config_path}")
        with open(config_path, 'r', encoding='utf-8') as f:
            old_config = json.load(f)
        # Infer study path from config location
        if self.study_path is None:
            # Config is typically in study_dir/1_setup/ or study_dir/
            if config_path.parent.name == "1_setup":
                self.study_path = config_path.parent.parent
            else:
                self.study_path = config_path.parent
        spec = self.migrate(old_config)
        if output_path:
            output_path = Path(output_path)
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(spec, f, indent=2, ensure_ascii=False)
        return spec

    # =========================================================================
    # Detection
    # =========================================================================

    def _detect_config_type(self, config: Dict) -> str:
        """Detect the legacy format: mirror / structural / canvas_intent / generic."""
        if "extraction_method" in config or "zernike_settings" in config:
            return "mirror"
        elif "simulation_settings" in config or "extraction_settings" in config:
            return "structural"
        elif "optimization_settings" in config:
            return "structural"
        elif "extractors" in config:
            # Already partially in new format (canvas intent)
            return "canvas_intent"
        else:
            # Generic/minimal format
            return "generic"

    def _has_sat_settings(self, config: Dict) -> bool:
        """Check if config has SAT/turbo settings."""
        return (
            "sat_settings" in config or
            config.get("optimization", {}).get("algorithm") in ["SAT_v3", "SAT", "turbo"]
        )

    # =========================================================================
    # Meta Migration
    # =========================================================================

    def _migrate_meta(self, config: Dict, study_name: Optional[str]) -> Dict:
        """Migrate metadata section (study name, description, tags)."""
        now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        name = study_name or config.get("study_name", "migrated_study")
        # Ensure snake_case: lowercase, non [a-z0-9_] -> '_', collapse, trim.
        name = re.sub(r'[^a-z0-9_]', '_', name.lower())
        name = re.sub(r'_+', '_', name).strip('_')
        meta = {
            "version": "2.0",
            "created": now,
            "modified": now,
            "created_by": "migration",
            "modified_by": "migration",
            "study_name": name,
            "description": config.get("description", ""),
            "tags": []
        }
        # Extract tags from various sources
        if "study_tag" in config:
            meta["tags"].append(config["study_tag"])
        if "business_context" in config:
            meta["engineering_context"] = config["business_context"].get("purpose", "")
        # Infer tags from config type
        if "zernike_settings" in config:
            meta["tags"].extend(["mirror", "zernike"])
        if "extraction_method" in config:
            if config["extraction_method"].get("type") == "zernike_opd":
                meta["tags"].append("opd")
        return meta

    # =========================================================================
    # Model Migration
    # =========================================================================

    def _migrate_model(self, config: Dict, config_type: str) -> Dict:
        """Migrate model section (sim/fem/prt paths and solver settings)."""
        model = {
            "sim": {
                "path": "",
                "solver": "nastran"
            }
        }
        # Extract from nx_settings (mirror format)
        if "nx_settings" in config:
            nx = config["nx_settings"]
            model["sim"]["path"] = nx.get("sim_file", "")
            if "nx_install_path" in nx:
                model["nx_settings"] = {
                    "nx_install_path": nx["nx_install_path"],
                    "simulation_timeout_s": nx.get("simulation_timeout_s", 600)
                }
        # Extract from simulation_settings (structural format)
        elif "simulation_settings" in config:
            sim = config["simulation_settings"]
            model["sim"]["path"] = sim.get("sim_file", "")
            solver = sim.get("solver", "nastran").lower()
            # Normalize solver name - valid values: nastran, NX_Nastran, abaqus.
            # `solver` is lowercased above, so only lowercase spellings can
            # survive un-mapped; anything unrecognized falls back to nastran.
            solver_map = {"nx": "nastran", "nx_nastran": "NX_Nastran", "nxnastran": "NX_Nastran"}
            model["sim"]["solver"] = solver_map.get(
                solver, solver if solver in ("nastran", "abaqus") else "nastran"
            )
            if sim.get("solution_type"):
                model["sim"]["solution_type"] = sim["solution_type"]
            if sim.get("model_file"):
                model["nx_part"] = {"path": sim["model_file"]}
            if sim.get("fem_file"):
                model["fem"] = {"path": sim["fem_file"]}
        # Try to infer the .sim file from the study directory layout
        if self.study_path and not model["sim"]["path"]:
            setup_dir = self.study_path / "1_setup" / "model"
            if setup_dir.exists():
                for f in setup_dir.glob("*.sim"):
                    model["sim"]["path"] = str(f.relative_to(self.study_path))
                    break
        return model

    # =========================================================================
    # Design Variables Migration
    # =========================================================================

    def _migrate_design_variables(self, config: Dict) -> List[Dict]:
        """Migrate design variables (bounds list -> dict, parameter -> expression_name)."""
        dvs = []
        for dv in config.get("design_variables", []):
            self._dv_counter += 1
            # Handle different bound formats
            if "bounds" in dv:
                if isinstance(dv["bounds"], list):
                    bounds = {"min": dv["bounds"][0], "max": dv["bounds"][1]}
                else:
                    bounds = dv["bounds"]
            else:
                bounds = {"min": dv.get("min", 0), "max": dv.get("max", 1)}
            # Ensure min < max (fix degenerate cases)
            if bounds["min"] >= bounds["max"]:
                # Expand bounds slightly around the value.  Use |val| for the
                # margin so the ordering stays correct for negative values
                # (val*0.99 / val*1.01 would invert min/max below zero).
                val = bounds["min"]
                if val == 0:
                    bounds = {"min": -0.001, "max": 0.001}
                else:
                    delta = abs(val) * 0.01
                    bounds = {"min": val - delta, "max": val + delta}
            # Determine type
            dv_type = dv.get("type", "continuous")
            if dv_type not in ["continuous", "integer", "categorical"]:
                dv_type = "continuous"
            new_dv = {
                "id": f"dv_{self._dv_counter:03d}",
                "name": dv.get("name", f"param_{self._dv_counter}"),
                "expression_name": dv.get("expression_name", dv.get("parameter", dv.get("name", ""))),
                "type": dv_type,
                "bounds": bounds,
                "baseline": dv.get("baseline", dv.get("initial")),
                "units": dv.get("units", dv.get("unit", "")),
                "enabled": dv.get("enabled", True),
                "description": dv.get("description", dv.get("notes", "")),
                "canvas_position": {"x": 50, "y": 100 + (self._dv_counter - 1) * 80}
            }
            dvs.append(new_dv)
        return dvs

    # =========================================================================
    # Extractors and Objectives Migration
    # =========================================================================

    def _migrate_extractors_and_objectives(
        self,
        config: Dict,
        config_type: str
    ) -> Tuple[List[Dict], List[Dict]]:
        """
        Migrate extractors and objectives together.

        Returns tuple of (extractors, objectives).
        """
        extractors = []
        objectives = []
        # Handle mirror/zernike configs
        if config_type == "mirror" and "zernike_settings" in config:
            extractor = self._create_zernike_extractor(config)
            extractors.append(extractor)
            # Create objectives from config
            for obj in config.get("objectives", []):
                self._objective_counter += 1
                objectives.append(self._create_objective(obj, extractor["id"]))
        # Handle structural configs
        elif config_type == "structural":
            # Create extractors based on extraction_settings
            if "extraction_settings" in config:
                extractor = self._create_structural_extractor(config)
                extractors.append(extractor)
                ext_id = extractor["id"]
            else:
                # Infer extractors from objectives
                ext_id = None
            for obj in config.get("objectives", []):
                self._objective_counter += 1
                # Infer extractor if not yet created.  NOTE(review): only the
                # first objective drives inference; later objectives reuse the
                # same extractor id — confirm this single-extractor assumption.
                if ext_id is None:
                    inferred_type = self._infer_extractor_type(obj.get("name", ""))
                    ext_id = self._get_or_create_extractor(extractors, inferred_type, obj.get("name", ""))
                objectives.append(self._create_objective(obj, ext_id))
        # Handle canvas intent or generic
        else:
            # Pass through existing extractors if present
            for ext in config.get("extractors", []):
                self._extractor_counter += 1
                ext_copy = dict(ext)
                if "id" not in ext_copy:
                    ext_copy["id"] = f"ext_{self._extractor_counter:03d}"
                extractors.append(ext_copy)
            # Create objectives
            for obj in config.get("objectives", []):
                self._objective_counter += 1
                # Find or create extractor
                ext_id = None
                if extractors:
                    ext_id = extractors[0]["id"]
                else:
                    inferred_type = self._infer_extractor_type(obj.get("name", ""))
                    ext_id = self._get_or_create_extractor(extractors, inferred_type, obj.get("name", ""))
                objectives.append(self._create_objective(obj, ext_id))
        return extractors, objectives

    def _create_zernike_extractor(self, config: Dict) -> Dict:
        """Create a Zernike OPD extractor from config."""
        self._extractor_counter += 1
        zs = config.get("zernike_settings", {})
        em = config.get("extraction_method", {})
        # Collect all output names from objectives
        outputs = []
        for obj in config.get("objectives", []):
            obj_name = obj.get("name", "")
            outputs.append({
                "name": obj_name,
                "metric": "filtered_rms_nm"
            })
        # Get outer radius with sensible default for telescope mirrors
        outer_radius = em.get("outer_radius", zs.get("outer_radius"))
        if outer_radius is None:
            # Default to typical M1 mirror outer radius
            outer_radius = 500.0
        extractor = {
            "id": f"ext_{self._extractor_counter:03d}",
            "name": "Zernike WFE Extractor",
            "type": "zernike_opd",
            "builtin": True,
            "config": {
                "inner_radius_mm": em.get("inner_radius", zs.get("inner_radius", 0)),
                "outer_radius_mm": outer_radius,
                "n_modes": zs.get("n_modes", 40),
                "filter_low_orders": zs.get("filter_low_orders", 4),
                "displacement_unit": zs.get("displacement_unit", "mm"),
                "reference_subcase": int(zs.get("reference_subcase", 1))
            },
            "outputs": outputs,
            "canvas_position": {"x": 740, "y": 100}
        }
        return extractor

    def _create_structural_extractor(self, config: Dict) -> Dict:
        """Create extractor from extraction_settings."""
        self._extractor_counter += 1
        es = config.get("extraction_settings", {})
        # Infer type from extractor class name
        extractor_class = es.get("extractor_class", "")
        if "stiffness" in extractor_class.lower():
            ext_type = "displacement"
        elif "stress" in extractor_class.lower():
            ext_type = "stress"
        elif "frequency" in extractor_class.lower():
            ext_type = "frequency"
        else:
            ext_type = "displacement"
        # Create outputs from objectives
        outputs = []
        for obj in config.get("objectives", []):
            outputs.append({
                "name": obj.get("name", "output"),
                "metric": es.get("displacement_aggregation", "max")
            })
        extractor = {
            "id": f"ext_{self._extractor_counter:03d}",
            "name": f"{extractor_class or 'Results'} Extractor",
            "type": ext_type,
            "builtin": True,
            "config": {
                "result_type": es.get("displacement_component", "z"),
                "metric": es.get("displacement_aggregation", "max")
            },
            "outputs": outputs,
            "canvas_position": {"x": 740, "y": 100}
        }
        return extractor

    def _infer_extractor_type(self, objective_name: str) -> str:
        """Infer extractor type from objective name via EXTRACTOR_INFERENCE."""
        name_lower = objective_name.lower()
        for pattern, ext_type in self.EXTRACTOR_INFERENCE.items():
            if re.search(pattern, name_lower):
                return ext_type
        return "displacement"  # Default

    def _get_or_create_extractor(
        self,
        extractors: List[Dict],
        ext_type: str,
        output_name: str
    ) -> str:
        """Get existing extractor of type or create new one; returns its id."""
        # Look for existing
        for ext in extractors:
            if ext.get("type") == ext_type:
                # Add output if not present.  setdefault guards pass-through
                # extractors that arrived without an "outputs" list.
                output_names = {o["name"] for o in ext.get("outputs", [])}
                if output_name not in output_names:
                    ext.setdefault("outputs", []).append({"name": output_name, "metric": "total"})
                return ext["id"]
        # Create new
        self._extractor_counter += 1
        ext_id = f"ext_{self._extractor_counter:03d}"
        extractor = {
            "id": ext_id,
            "name": f"{ext_type.title()} Extractor",
            "type": ext_type,
            "builtin": True,
            "outputs": [{"name": output_name, "metric": "total"}],
            "canvas_position": {"x": 740, "y": 100 + (len(extractors)) * 150}
        }
        extractors.append(extractor)
        return ext_id

    def _create_objective(self, obj: Dict, extractor_id: str) -> Dict:
        """Create objective from old format (goal/type -> direction)."""
        # Normalize direction
        direction = obj.get("direction", obj.get("type", obj.get("goal", "minimize")))
        if direction not in ["minimize", "maximize"]:
            direction = "minimize" if "min" in direction.lower() else "maximize"
        obj_name = obj.get("name", f"objective_{self._objective_counter}")
        return {
            "id": f"obj_{self._objective_counter:03d}",
            "name": obj.get("description", obj_name),
            "direction": direction,
            "weight": obj.get("weight", 1.0),
            "source": {
                "extractor_id": extractor_id,
                "output_name": obj_name
            },
            "target": obj.get("target"),
            "units": obj.get("units", ""),
            "canvas_position": {"x": 1020, "y": 100 + (self._objective_counter - 1) * 100}
        }

    # =========================================================================
    # Constraints Migration
    # =========================================================================

    def _migrate_constraints(self, config: Dict, extractors: List[Dict]) -> List[Dict]:
        """
        Migrate constraints.

        May append a new extractor to `extractors` in place (e.g. a mass
        extractor for mass constraints with no matching output).
        """
        constraints = []
        for con in config.get("constraints", []):
            self._constraint_counter += 1
            # Determine constraint type
            con_type = con.get("type", "hard")
            if con_type not in ["hard", "soft"]:
                # Relational legacy types map to hard constraints
                if con_type in ["less_than", "greater_than", "less_equal", "greater_equal"]:
                    con_type = "hard"
            # Determine operator from the legacy type name
            operator = con.get("operator", "<=")
            old_type = con.get("type", "")
            if "less" in old_type:
                operator = "<=" if "equal" in old_type else "<"
            elif "greater" in old_type:
                operator = ">=" if "equal" in old_type else ">"
            # Try to parse expression for threshold
            threshold = con.get("threshold", con.get("value"))
            if threshold is None and "expression" in con:
                # Parse from expression like "mass_kg <= 120.0"; the optional
                # leading '-' keeps negative thresholds from being dropped.
                match = re.search(r'([<>=!]+)\s*(-?[\d.]+)', con["expression"])
                if match:
                    operator = match.group(1)
                    threshold = float(match.group(2))
            # Find or create extractor for constraint
            con_name = con.get("name", "constraint")
            extractor_id = None
            output_name = con_name
            # Check if name matches existing objective output (share extractor)
            for ext in extractors:
                for out in ext.get("outputs", []):
                    if con_name.replace("_max", "").replace("_min", "") in out["name"]:
                        extractor_id = ext["id"]
                        output_name = out["name"]
                        break
                if extractor_id:
                    break
            # If no match, use first extractor or create mass extractor for mass constraints
            if extractor_id is None:
                if "mass" in con_name.lower():
                    # Check if mass extractor exists
                    for ext in extractors:
                        if ext.get("type") == "mass":
                            extractor_id = ext["id"]
                            break
                    if extractor_id is None:
                        # Create mass extractor.  NOTE(review): id is derived
                        # from len(extractors)+1, which matches the counter for
                        # migrated configs but could collide with custom
                        # pass-through ids — confirm against canvas-intent input.
                        ext_id = f"ext_{len(extractors) + 1:03d}"
                        extractors.append({
                            "id": ext_id,
                            "name": "Mass Extractor",
                            "type": "mass",
                            "builtin": True,
                            "outputs": [{"name": "mass_kg", "metric": "total"}],
                            "canvas_position": {"x": 740, "y": 100 + len(extractors) * 150}
                        })
                        extractor_id = ext_id
                    output_name = "mass_kg"
                elif extractors:
                    extractor_id = extractors[0]["id"]
                    output_name = extractors[0]["outputs"][0]["name"] if extractors[0].get("outputs") else con_name
            constraint = {
                "id": f"con_{self._constraint_counter:03d}",
                "name": con.get("description", con_name),
                "type": con_type if con_type in ["hard", "soft"] else "hard",
                "operator": operator,
                # Explicit None check: a genuine 0 threshold must survive.
                "threshold": threshold if threshold is not None else 0,
                "source": {
                    "extractor_id": extractor_id or "ext_001",
                    "output_name": output_name
                },
                "penalty_config": {
                    "method": "quadratic",
                    "weight": con.get("penalty_weight", 1000.0)
                },
                "canvas_position": {"x": 1020, "y": 400 + (self._constraint_counter - 1) * 100}
            }
            constraints.append(constraint)
        return constraints

    # =========================================================================
    # Optimization Migration
    # =========================================================================

    def _migrate_optimization(self, config: Dict, config_type: str) -> Dict:
        """Migrate optimization settings (algorithm, budget, surrogate)."""
        # Extract from different locations
        if "optimization" in config:
            opt = config["optimization"]
        elif "optimization_settings" in config:
            opt = config["optimization_settings"]
        else:
            opt = {}
        # Normalize algorithm name
        algo = opt.get("algorithm", opt.get("sampler", "TPE"))
        algo_map = {
            "tpe": "TPE",
            "tpesampler": "TPE",
            "cma-es": "CMA-ES",
            "cmaes": "CMA-ES",
            "nsga-ii": "NSGA-II",
            "nsgaii": "NSGA-II",
            "nsga2": "NSGA-II",
            "random": "RandomSearch",
            "randomsampler": "RandomSearch",
            "randomsearch": "RandomSearch",
            "sat": "SAT_v3",
            "sat_v3": "SAT_v3",
            "turbo": "SAT_v3",
            "gp": "GP-BO",
            "gp-bo": "GP-BO",
            "gpbo": "GP-BO",
            "bo": "GP-BO",
            "bayesian": "GP-BO"
        }
        # Valid algorithm types for schema
        valid_algorithms = {"TPE", "CMA-ES", "NSGA-II", "RandomSearch", "SAT_v3", "GP-BO"}
        algo = algo_map.get(algo.lower(), algo)
        # Fallback to TPE if still invalid
        if algo not in valid_algorithms:
            algo = "TPE"
        optimization = {
            "algorithm": {
                "type": algo,
                "config": {}
            },
            "budget": {
                "max_trials": opt.get("n_trials", 100)
            },
            "canvas_position": {"x": 1300, "y": 150}
        }
        # Algorithm-specific config
        if algo == "CMA-ES":
            optimization["algorithm"]["config"]["sigma0"] = opt.get("sigma0", 0.3)
        elif algo == "NSGA-II":
            optimization["algorithm"]["config"]["population_size"] = opt.get("population_size", 50)
        elif algo == "TPE":
            optimization["algorithm"]["config"]["n_startup_trials"] = opt.get("n_startup_trials", 10)
        # Seed
        if "seed" in opt:
            optimization["algorithm"]["config"]["seed"] = opt["seed"]
        # Timeout (legacy seconds -> hours)
        if opt.get("timeout"):
            optimization["budget"]["max_time_hours"] = opt["timeout"] / 3600
        # SAT/surrogate settings
        if "sat_settings" in config:
            sat = config["sat_settings"]
            optimization["surrogate"] = {
                "enabled": True,
                "type": "ensemble",
                "config": {
                    "n_models": sat.get("n_ensemble_models", 10),
                    "architecture": sat.get("hidden_dims", [256, 128]),
                    "train_every_n_trials": sat.get("retrain_frequency", 20),
                    "min_training_samples": sat.get("min_samples", 30)
                }
            }
        return optimization

    # =========================================================================
    # Workflow Migration
    # =========================================================================

    def _migrate_workflow(self, config: Dict) -> Dict:
        """Migrate SAT/turbo workflow settings into a two-stage workflow."""
        sat = config.get("sat_settings", {})
        exploration_trials = sat.get("min_samples", 30)
        total_trials = config.get("optimization", {}).get("n_trials", 100)
        # Clamp at 0: legacy configs may request more exploration samples
        # than total trials, which would otherwise yield a negative budget.
        optimization_trials = max(total_trials - exploration_trials, 0)
        return {
            "stages": [
                {
                    "id": "stage_exploration",
                    "name": "Design Space Exploration",
                    "algorithm": "RandomSearch",
                    "trials": exploration_trials,
                    "purpose": "Build initial training data for surrogate"
                },
                {
                    "id": "stage_optimization",
                    "name": "Surrogate-Assisted Optimization",
                    "algorithm": "SAT_v3",
                    "trials": optimization_trials,
                    "purpose": "Neural-accelerated optimization"
                }
            ],
            "transitions": [
                {
                    "from": "stage_exploration",
                    "to": "stage_optimization",
                    "condition": f"trial_count >= {exploration_trials}"
                }
            ]
        }

    # =========================================================================
    # Canvas Edge Generation
    # =========================================================================

    def _generate_edges(self, spec: Dict) -> List[Dict]:
        """Generate canvas edges connecting nodes (DVs -> model -> solver -> ...)."""
        edges = []
        # DVs -> model
        for dv in spec.get("design_variables", []):
            edges.append({"source": dv["id"], "target": "model"})
        # model -> solver
        edges.append({"source": "model", "target": "solver"})
        # solver -> extractors
        for ext in spec.get("extractors", []):
            edges.append({"source": "solver", "target": ext["id"]})
        # extractors -> objectives
        for obj in spec.get("objectives", []):
            ext_id = obj.get("source", {}).get("extractor_id")
            if ext_id:
                edges.append({"source": ext_id, "target": obj["id"]})
        # extractors -> constraints
        for con in spec.get("constraints", []):
            ext_id = con.get("source", {}).get("extractor_id")
            if ext_id:
                edges.append({"source": ext_id, "target": con["id"]})
        # objectives -> optimization
        for obj in spec.get("objectives", []):
            edges.append({"source": obj["id"], "target": "optimization"})
        # constraints -> optimization
        for con in spec.get("constraints", []):
            edges.append({"source": con["id"], "target": "optimization"})
        return edges
# ============================================================================
# Convenience Functions
# ============================================================================
def migrate_config(
    old_config: Dict[str, Any],
    study_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Migrate old config dict to AtomizerSpec v2.0.

    Args:
        old_config: Legacy config dict
        study_name: Override study name

    Returns:
        AtomizerSpec v2.0 dict
    """
    return SpecMigrator().migrate(old_config, study_name)
def migrate_config_file(
    config_path: Union[str, Path],
    output_path: Optional[Union[str, Path]] = None
) -> Dict[str, Any]:
    """
    Migrate a config file to AtomizerSpec v2.0.

    Args:
        config_path: Path to old config file
        output_path: Path to save new spec (optional)

    Returns:
        AtomizerSpec v2.0 dict
    """
    return SpecMigrator().migrate_file(config_path, output_path)