"""
AtomizerSpec v2.0 Migrator

Converts legacy optimization_config.json files to AtomizerSpec v2.0 format.

Supports migration from:
- Mirror/Zernike configs (extraction_method, zernike_settings)
- Structural/Bracket configs (optimization_settings, simulation_settings)
- Canvas Intent format (simplified canvas output)

Migration Rules:
- bounds: [min, max] -> bounds: {min, max}
- parameter -> expression_name
- goal/type: "minimize"/"maximize" -> direction: "minimize"/"maximize"
- Infers extractors from objectives and extraction settings
- Generates canvas edges automatically
"""

from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union
import json
import re


class MigrationError(Exception):
    """Raised when migration fails."""
    pass


class SpecMigrator:
    """
    Migrate old optimization_config.json to AtomizerSpec v2.0.

    Handles multiple legacy formats and infers missing information.
    """

    # Extractor type inference based on objective names.
    # Patterns are tried in insertion order; the first match wins.
    EXTRACTOR_INFERENCE = {
        # Zernike patterns
        r"wfe|zernike|opd": "zernike_opd",
        r"mfg|manufacturing": "zernike_opd",
        r"rms": "zernike_opd",
        # Structural patterns
        r"displacement|deflection|deform": "displacement",
        r"stress|von.?mises": "stress",
        r"frequency|modal|eigen": "frequency",
        r"mass|weight": "mass",
        r"stiffness": "displacement",  # Stiffness computed from displacement
        r"temperature|thermal": "temperature",
    }

    # Lower-cased legacy solver names -> solver value written to the spec.
    # Anything not listed falls back to "nastran".
    _SOLVER_ALIASES = {
        "nastran": "nastran",
        "nx": "nastran",
        "nx_nastran": "NX_Nastran",
        "nxnastran": "NX_Nastran",
        "abaqus": "abaqus",
    }

    # Lower-cased algorithm names that _migrate_optimization maps to "SAT_v3".
    _SAT_ALGORITHMS = frozenset({"sat", "sat_v3", "turbo"})

    # Comparison operator followed by a (possibly signed / scientific) number,
    # e.g. "mass_kg <= 120.0" or "tip_disp >= -0.5".
    _CONSTRAINT_EXPR_RE = re.compile(
        r"(<=|>=|==|!=|<|>|=)\s*([-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)"
    )

    def __init__(self, study_path: Optional[Path] = None):
        """
        Initialize migrator.

        Args:
            study_path: Path to study directory (for inferring sim/fem paths)
        """
        self.study_path = Path(study_path) if study_path else None
        self._extractor_counter = 0
        self._objective_counter = 0
        self._constraint_counter = 0
        self._dv_counter = 0

    def migrate(
        self,
        old_config: Dict[str, Any],
        study_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Convert old config to AtomizerSpec v2.0.

        Args:
            old_config: Legacy config dict
            study_name: Override study name (defaults to config value)

        Returns:
            AtomizerSpec v2.0 dict

        Raises:
            MigrationError: If old_config is not a dict or contains
                malformed design-variable bounds.
        """
        if not isinstance(old_config, dict):
            raise MigrationError(
                f"Legacy config must be a JSON object, got {type(old_config).__name__}"
            )

        # Reset counters so ids restart at 001 for every migration
        self._extractor_counter = 0
        self._objective_counter = 0
        self._constraint_counter = 0
        self._dv_counter = 0

        # Detect config type
        config_type = self._detect_config_type(old_config)

        # Build spec
        spec = {
            "meta": self._migrate_meta(old_config, study_name),
            "model": self._migrate_model(old_config, config_type),
            "design_variables": self._migrate_design_variables(old_config),
            "extractors": [],
            "objectives": [],
            "constraints": [],
            "optimization": self._migrate_optimization(old_config, config_type),
            "canvas": {"edges": [], "layout_version": "2.0"}
        }

        # Migrate extractors and objectives together (they're linked)
        extractors, objectives = self._migrate_extractors_and_objectives(old_config, config_type)
        spec["extractors"] = extractors
        spec["objectives"] = objectives

        # Migrate constraints (may append extractors to spec["extractors"])
        spec["constraints"] = self._migrate_constraints(old_config, spec["extractors"])

        # Generate canvas edges
        spec["canvas"]["edges"] = self._generate_edges(spec)

        # Add workflow if SAT/turbo settings present
        if self._has_sat_settings(old_config):
            spec["workflow"] = self._migrate_workflow(old_config)

        return spec

    def migrate_file(
        self,
        config_path: Union[str, Path],
        output_path: Optional[Union[str, Path]] = None
    ) -> Dict[str, Any]:
        """
        Migrate a config file and optionally save the result.

        Args:
            config_path: Path to old config file
            output_path: Path to save new spec (optional)

        Returns:
            AtomizerSpec v2.0 dict

        Raises:
            MigrationError: If the config file does not exist or does not
                contain a JSON object.
        """
        config_path = Path(config_path)

        if not config_path.exists():
            raise MigrationError(f"Config file not found: {config_path}")

        with open(config_path, 'r', encoding='utf-8') as f:
            old_config = json.load(f)

        # Infer study path from config location, but only for this call:
        # a reused migrator must not carry one study's directory over to the
        # next file it migrates.
        configured_study_path = self.study_path
        if self.study_path is None:
            # Config is typically in study_dir/1_setup/ or study_dir/
            if config_path.parent.name == "1_setup":
                self.study_path = config_path.parent.parent
            else:
                self.study_path = config_path.parent

        try:
            spec = self.migrate(old_config)
        finally:
            self.study_path = configured_study_path

        if output_path:
            output_path = Path(output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(spec, f, indent=2, ensure_ascii=False)

        return spec

    # =========================================================================
    # Detection
    # =========================================================================

    def _detect_config_type(self, config: Dict) -> str:
        """Detect the type of config format from its top-level keys."""
        if "extraction_method" in config or "zernike_settings" in config:
            return "mirror"
        elif "simulation_settings" in config or "extraction_settings" in config:
            return "structural"
        elif "optimization_settings" in config:
            return "structural"
        elif "extractors" in config:
            # Already partially in new format (canvas intent)
            return "canvas_intent"
        else:
            # Generic/minimal format
            return "generic"

    @staticmethod
    def _optimization_section(config: Dict) -> Dict:
        """Return the legacy optimization settings dict ("optimization" first,
        then "optimization_settings"), or an empty dict if neither is usable."""
        for key in ("optimization", "optimization_settings"):
            if key in config:
                section = config[key]
                if isinstance(section, dict):
                    return section
        return {}

    def _has_sat_settings(self, config: Dict) -> bool:
        """Check if config has SAT/turbo settings.

        Uses the same lookup and case-insensitive matching as
        _migrate_optimization, so a config whose algorithm is migrated to
        "SAT_v3" always gets a workflow section as well.
        """
        if "sat_settings" in config:
            return True
        opt = self._optimization_section(config)
        algo = opt.get("algorithm", opt.get("sampler"))
        return isinstance(algo, str) and algo.lower() in self._SAT_ALGORITHMS

    # =========================================================================
    # Meta Migration
    # =========================================================================

    def _migrate_meta(self, config: Dict, study_name: Optional[str]) -> Dict:
        """Migrate metadata section.

        The study name is forced to snake_case; if nothing usable remains
        after sanitising, "migrated_study" is used.
        """
        now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')

        raw_name = study_name or config.get("study_name") or "migrated_study"
        # Ensure snake_case
        name = re.sub(r'[^a-z0-9_]', '_', str(raw_name).lower())
        name = re.sub(r'_+', '_', name).strip('_') or "migrated_study"

        meta = {
            "version": "2.0",
            "created": now,
            "modified": now,
            "created_by": "migration",
            "modified_by": "migration",
            "study_name": name,
            "description": config.get("description", ""),
            "tags": []
        }

        # Extract tags from various sources
        if "study_tag" in config:
            meta["tags"].append(config["study_tag"])

        if "business_context" in config:
            context = config["business_context"]
            if isinstance(context, dict):
                meta["engineering_context"] = context.get("purpose", "")
            elif isinstance(context, str):
                meta["engineering_context"] = context

        # Infer tags from config type
        if "zernike_settings" in config:
            meta["tags"].extend(["mirror", "zernike"])
        if "extraction_method" in config:
            method = config["extraction_method"]
            method_type = method.get("type") if isinstance(method, dict) else method
            if method_type == "zernike_opd":
                meta["tags"].append("opd")

        return meta

    # =========================================================================
    # Model Migration
    # =========================================================================

    def _migrate_model(self, config: Dict, config_type: str) -> Dict:
        """Migrate model section (sim/fem/prt paths)."""
        model = {
            "sim": {
                "path": "",
                "solver": "nastran"
            }
        }

        # Extract from nx_settings (mirror format)
        if "nx_settings" in config:
            nx = config["nx_settings"]
            model["sim"]["path"] = nx.get("sim_file", "")
            if "nx_install_path" in nx:
                model["nx_settings"] = {
                    "nx_install_path": nx["nx_install_path"],
                    "simulation_timeout_s": nx.get("simulation_timeout_s", 600)
                }

        # Extract from simulation_settings (structural format)
        elif "simulation_settings" in config:
            sim = config["simulation_settings"]
            model["sim"]["path"] = sim.get("sim_file", "")
            solver = sim.get("solver", "nastran")
            solver_key = solver.lower() if isinstance(solver, str) else ""
            # Normalize solver name - valid values: nastran, NX_Nastran, abaqus
            model["sim"]["solver"] = self._SOLVER_ALIASES.get(solver_key, "nastran")
            if sim.get("solution_type"):
                model["sim"]["solution_type"] = sim["solution_type"]

            if sim.get("model_file"):
                model["nx_part"] = {"path": sim["model_file"]}
            if sim.get("fem_file"):
                model["fem"] = {"path": sim["fem_file"]}

        # Try to infer from study path
        if self.study_path and not model["sim"]["path"]:
            setup_dir = self.study_path / "1_setup" / "model"
            if setup_dir.exists():
                # Sorted so the chosen file does not depend on directory order
                sim_files = sorted(setup_dir.glob("*.sim"))
                if sim_files:
                    model["sim"]["path"] = str(sim_files[0].relative_to(self.study_path))

        return model

    # =========================================================================
    # Design Variables Migration
    # =========================================================================

    def _normalize_bounds(self, dv: Dict, label: str) -> Dict:
        """Return a {"min", "max"} dict with min < max for one design variable.

        Accepts [min, max] sequences, {"min", "max"} dicts, or top-level
        "min"/"max" keys (defaulting to 0 and 1).

        Raises:
            MigrationError: If the bounds are malformed or not comparable.
        """
        raw = dv.get("bounds")
        if raw is None:
            bounds = {"min": dv.get("min", 0), "max": dv.get("max", 1)}
        elif isinstance(raw, (list, tuple)):
            if len(raw) < 2:
                raise MigrationError(
                    f"Design variable '{label}': bounds must be [min, max], got {raw!r}"
                )
            bounds = {"min": raw[0], "max": raw[1]}
        elif isinstance(raw, dict):
            if "min" not in raw or "max" not in raw:
                raise MigrationError(
                    f"Design variable '{label}': bounds must define 'min' and 'max'"
                )
            bounds = dict(raw)  # copy so the caller's config is not mutated
        else:
            raise MigrationError(
                f"Design variable '{label}': unsupported bounds {raw!r}"
            )

        low, high = bounds["min"], bounds["max"]
        try:
            if low > high:
                # Reversed pair: the author clearly meant the same interval
                bounds["min"], bounds["max"] = high, low
            elif low == high:
                # Degenerate interval: widen slightly around the value.
                # min()/max() keeps the result ordered for negative values too.
                if low == 0:
                    bounds["min"], bounds["max"] = -0.001, 0.001
                else:
                    edge_a, edge_b = low * 0.99, low * 1.01
                    bounds["min"] = min(edge_a, edge_b)
                    bounds["max"] = max(edge_a, edge_b)
        except TypeError as err:
            raise MigrationError(
                f"Design variable '{label}': bounds are not comparable numbers"
            ) from err

        return bounds

    def _migrate_design_variables(self, config: Dict) -> List[Dict]:
        """Migrate design variables.

        Raises:
            MigrationError: If a design variable has malformed bounds.
        """
        dvs = []
        for dv in config.get("design_variables", []):
            self._dv_counter += 1

            dv_name = dv.get("name", f"param_{self._dv_counter}")
            bounds = self._normalize_bounds(dv, dv_name)

            # Determine type
            dv_type = dv.get("type", "continuous")
            if dv_type not in ["continuous", "integer", "categorical"]:
                dv_type = "continuous"

            new_dv = {
                "id": f"dv_{self._dv_counter:03d}",
                "name": dv_name,
                "expression_name": dv.get("expression_name", dv.get("parameter", dv.get("name", ""))),
                "type": dv_type,
                "bounds": bounds,
                "baseline": dv.get("baseline", dv.get("initial")),
                "units": dv.get("units", dv.get("unit", "")),
                "enabled": dv.get("enabled", True),
                "description": dv.get("description", dv.get("notes", "")),
                "canvas_position": {"x": 50, "y": 100 + (self._dv_counter - 1) * 80}
            }

            dvs.append(new_dv)

        return dvs

    # =========================================================================
    # Extractors and Objectives Migration
    # =========================================================================

    def _migrate_extractors_and_objectives(
        self,
        config: Dict,
        config_type: str
    ) -> Tuple[List[Dict], List[Dict]]:
        """
        Migrate extractors and objectives together.

        When the config defines its extractor (zernike_settings,
        extraction_settings, or pass-through "extractors"), every objective is
        linked to it. Otherwise an extractor is inferred per objective, so
        objectives of different kinds get different extractors and each
        objective is registered as an output.

        Returns tuple of (extractors, objectives).
        """
        extractors = []
        objectives = []
        shared_ext_id = None  # extractor every objective links to, if any

        # Handle mirror/zernike configs
        if config_type == "mirror" and "zernike_settings" in config:
            extractor = self._create_zernike_extractor(config)
            extractors.append(extractor)
            shared_ext_id = extractor["id"]

        # Handle structural configs
        elif config_type == "structural":
            # Create extractors based on extraction_settings
            if "extraction_settings" in config:
                extractor = self._create_structural_extractor(config)
                extractors.append(extractor)
                shared_ext_id = extractor["id"]

        # Handle canvas intent or generic
        else:
            # Pass through existing extractors if present
            for ext in config.get("extractors", []):
                self._extractor_counter += 1
                ext_copy = dict(ext)
                if "id" not in ext_copy:
                    ext_copy["id"] = f"ext_{self._extractor_counter:03d}"
                extractors.append(ext_copy)
            if extractors:
                shared_ext_id = extractors[0]["id"]

        # Create objectives
        for obj in config.get("objectives", []):
            self._objective_counter += 1
            ext_id = shared_ext_id
            if ext_id is None:
                # Same default name _create_objective uses for output_name
                obj_name = obj.get("name", f"objective_{self._objective_counter}")
                inferred_type = self._infer_extractor_type(obj_name)
                ext_id = self._get_or_create_extractor(extractors, inferred_type, obj_name)
            objectives.append(self._create_objective(obj, ext_id))

        return extractors, objectives

    def _objective_output_names(self, config: Dict) -> List[str]:
        """Return the output name each objective will reference, in order.

        Mirrors the default naming in _create_objective so extractor outputs
        and objective sources agree even for unnamed objectives.
        """
        return [
            obj.get("name", f"objective_{self._objective_counter + offset}")
            for offset, obj in enumerate(config.get("objectives", []), start=1)
        ]

    def _create_zernike_extractor(self, config: Dict) -> Dict:
        """Create a Zernike OPD extractor from config."""
        self._extractor_counter += 1

        zs = config.get("zernike_settings", {})
        if not isinstance(zs, dict):
            zs = {}
        em = config.get("extraction_method", {})
        if not isinstance(em, dict):
            em = {}

        # Collect all output names from objectives
        outputs = [
            {"name": output_name, "metric": "filtered_rms_nm"}
            for output_name in self._objective_output_names(config)
        ]

        # Get outer radius with sensible default for telescope mirrors
        outer_radius = em.get("outer_radius", zs.get("outer_radius"))
        if outer_radius is None:
            # Default to typical M1 mirror outer radius
            outer_radius = 500.0

        extractor = {
            "id": f"ext_{self._extractor_counter:03d}",
            "name": "Zernike WFE Extractor",
            "type": "zernike_opd",
            "builtin": True,
            "config": {
                "inner_radius_mm": em.get("inner_radius", zs.get("inner_radius", 0)),
                "outer_radius_mm": outer_radius,
                "n_modes": zs.get("n_modes", 40),
                "filter_low_orders": zs.get("filter_low_orders", 4),
                "displacement_unit": zs.get("displacement_unit", "mm"),
                "reference_subcase": int(zs.get("reference_subcase", 1))
            },
            "outputs": outputs,
            "canvas_position": {"x": 740, "y": 100}
        }

        return extractor

    def _create_structural_extractor(self, config: Dict) -> Dict:
        """Create extractor from extraction_settings."""
        self._extractor_counter += 1

        es = config.get("extraction_settings", {})

        # Infer type from extractor class name
        extractor_class = es.get("extractor_class", "")
        class_lower = extractor_class.lower()

        if "stiffness" in class_lower:
            ext_type = "displacement"
        elif "stress" in class_lower:
            ext_type = "stress"
        elif "frequency" in class_lower:
            ext_type = "frequency"
        else:
            ext_type = "displacement"

        # Create outputs from objectives
        metric = es.get("displacement_aggregation", "max")
        outputs = [
            {"name": output_name, "metric": metric}
            for output_name in self._objective_output_names(config)
        ]

        extractor = {
            "id": f"ext_{self._extractor_counter:03d}",
            "name": f"{extractor_class or 'Results'} Extractor",
            "type": ext_type,
            "builtin": True,
            "config": {
                "result_type": es.get("displacement_component", "z"),
                "metric": metric
            },
            "outputs": outputs,
            "canvas_position": {"x": 740, "y": 100}
        }

        return extractor

    def _infer_extractor_type(self, objective_name: str) -> str:
        """Infer extractor type from objective name."""
        name_lower = objective_name.lower()

        for pattern, ext_type in self.EXTRACTOR_INFERENCE.items():
            if re.search(pattern, name_lower):
                return ext_type

        return "displacement"  # Default

    def _next_extractor_id(self, extractors: List[Dict]) -> str:
        """Advance the extractor counter to an id not used in extractors."""
        taken = {ext.get("id") for ext in extractors}
        while True:
            self._extractor_counter += 1
            candidate = f"ext_{self._extractor_counter:03d}"
            if candidate not in taken:
                return candidate

    def _get_or_create_extractor(
        self,
        extractors: List[Dict],
        ext_type: str,
        output_name: str
    ) -> str:
        """Get existing extractor of type or create new one.

        Ensures the returned extractor exposes output_name; appends a new
        extractor to extractors when none of that type exists.

        Returns:
            The extractor id.
        """
        # Look for existing
        for ext in extractors:
            if ext.get("type") == ext_type:
                # Add output if not present
                outputs = ext.setdefault("outputs", [])
                if all(out.get("name") != output_name for out in outputs):
                    outputs.append({"name": output_name, "metric": "total"})
                return ext["id"]

        # Create new
        ext_id = self._next_extractor_id(extractors)

        extractors.append({
            "id": ext_id,
            "name": f"{ext_type.title()} Extractor",
            "type": ext_type,
            "builtin": True,
            "outputs": [{"name": output_name, "metric": "total"}],
            "canvas_position": {"x": 740, "y": 100 + (len(extractors)) * 150}
        })
        return ext_id

    def _create_objective(self, obj: Dict, extractor_id: str) -> Dict:
        """Create objective from old format."""
        # Normalize direction
        direction = obj.get("direction", obj.get("type", obj.get("goal", "minimize")))
        if direction is None:
            direction = "minimize"
        if direction not in ["minimize", "maximize"]:
            direction = "minimize" if "min" in str(direction).lower() else "maximize"

        obj_name = obj.get("name", f"objective_{self._objective_counter}")

        return {
            "id": f"obj_{self._objective_counter:03d}",
            "name": obj.get("description", obj_name),
            "direction": direction,
            "weight": obj.get("weight", 1.0),
            "source": {
                "extractor_id": extractor_id,
                "output_name": obj_name
            },
            "target": obj.get("target"),
            "units": obj.get("units", ""),
            "canvas_position": {"x": 1020, "y": 100 + (self._objective_counter - 1) * 100}
        }

    # =========================================================================
    # Constraints Migration
    # =========================================================================

    def _resolve_constraint_source(
        self,
        con_name: str,
        extractors: List[Dict]
    ) -> Tuple[str, str]:
        """Pick (extractor_id, output_name) for a constraint.

        Resolution order: an existing output whose name contains the
        constraint name (minus "_max"/"_min"); a mass extractor for mass
        constraints; the first extractor; otherwise a newly inferred
        extractor. May append to extractors.
        """
        # Check if name matches existing objective (share extractor)
        base_name = con_name.replace("_max", "").replace("_min", "")
        if base_name:  # an empty needle would match every output
            for ext in extractors:
                for out in ext.get("outputs", []):
                    out_name = out.get("name", "")
                    if base_name in out_name:
                        return ext["id"], out_name

        if "mass" in con_name.lower():
            # Reuse an existing mass extractor and one of its real outputs
            for ext in extractors:
                if ext.get("type") == "mass":
                    outputs = ext.setdefault("outputs", [])
                    if not outputs:
                        outputs.append({"name": "mass_kg", "metric": "total"})
                    return ext["id"], outputs[0].get("name", "mass_kg")
            return self._get_or_create_extractor(extractors, "mass", "mass_kg"), "mass_kg"

        if extractors:
            first = extractors[0]
            first_outputs = first.get("outputs")
            if first_outputs:
                return first["id"], first_outputs[0].get("name", con_name)
            return first["id"], con_name

        # Nothing to attach to: create an extractor rather than reference a
        # non-existent id.
        inferred_type = self._infer_extractor_type(con_name)
        return self._get_or_create_extractor(extractors, inferred_type, con_name), con_name

    def _migrate_constraints(self, config: Dict, extractors: List[Dict]) -> List[Dict]:
        """Migrate constraints.

        Args:
            config: Legacy config dict
            extractors: Already-migrated extractors; extended in place when a
                constraint needs an extractor that does not exist yet.
        """
        constraints = []

        for con in config.get("constraints", []):
            self._constraint_counter += 1

            # Determine constraint type; legacy comparison types
            # (less_than, greater_equal, ...) are all treated as hard.
            legacy_type = con.get("type", "hard")
            con_type = legacy_type if legacy_type in ("hard", "soft") else "hard"

            # Determine operator
            operator = con.get("operator", "<=")
            type_hint = legacy_type if isinstance(legacy_type, str) else ""
            if "less" in type_hint:
                operator = "<=" if "equal" in type_hint else "<"
            elif "greater" in type_hint:
                operator = ">=" if "equal" in type_hint else ">"

            # Try to parse expression for threshold
            threshold = con.get("threshold", con.get("value"))
            if threshold is None and "expression" in con:
                # Parse from expression like "mass_kg <= 120.0"
                match = self._CONSTRAINT_EXPR_RE.search(str(con["expression"]))
                if match:
                    operator = match.group(1)
                    threshold = float(match.group(2))

            # Find or create extractor for constraint
            con_name = con.get("name", "constraint")
            extractor_id, output_name = self._resolve_constraint_source(con_name, extractors)

            constraint = {
                "id": f"con_{self._constraint_counter:03d}",
                "name": con.get("description", con_name),
                "type": con_type,
                "operator": operator,
                "threshold": threshold or 0,
                "source": {
                    "extractor_id": extractor_id,
                    "output_name": output_name
                },
                "penalty_config": {
                    "method": "quadratic",
                    "weight": con.get("penalty_weight", 1000.0)
                },
                "canvas_position": {"x": 1020, "y": 400 + (self._constraint_counter - 1) * 100}
            }
            constraints.append(constraint)

        return constraints

    # =========================================================================
    # Optimization Migration
    # =========================================================================

    def _migrate_optimization(self, config: Dict, config_type: str) -> Dict:
        """Migrate optimization settings."""
        # Extract from different locations
        opt = self._optimization_section(config)

        # Normalize algorithm name
        algo = opt.get("algorithm", opt.get("sampler", "TPE"))
        algo_map = {
            "tpe": "TPE",
            "tpesampler": "TPE",
            "cma-es": "CMA-ES",
            "cmaes": "CMA-ES",
            "nsga-ii": "NSGA-II",
            "nsgaii": "NSGA-II",
            "nsga2": "NSGA-II",
            "random": "RandomSearch",
            "randomsampler": "RandomSearch",
            "randomsearch": "RandomSearch",
            "sat": "SAT_v3",
            "sat_v3": "SAT_v3",
            "turbo": "SAT_v3",
            "gp": "GP-BO",
            "gp-bo": "GP-BO",
            "gpbo": "GP-BO",
            "bo": "GP-BO",
            "bayesian": "GP-BO"
        }
        # Valid algorithm types for schema
        valid_algorithms = {"TPE", "CMA-ES", "NSGA-II", "RandomSearch", "SAT_v3", "GP-BO"}
        if isinstance(algo, str):
            algo = algo_map.get(algo.lower(), algo)
        # Fallback to TPE if still invalid (including non-string values)
        if algo not in valid_algorithms:
            algo = "TPE"

        optimization = {
            "algorithm": {
                "type": algo,
                "config": {}
            },
            "budget": {
                "max_trials": opt.get("n_trials", 100)
            },
            "canvas_position": {"x": 1300, "y": 150}
        }

        # Algorithm-specific config
        if algo == "CMA-ES":
            optimization["algorithm"]["config"]["sigma0"] = opt.get("sigma0", 0.3)
        elif algo == "NSGA-II":
            optimization["algorithm"]["config"]["population_size"] = opt.get("population_size", 50)
        elif algo == "TPE":
            optimization["algorithm"]["config"]["n_startup_trials"] = opt.get("n_startup_trials", 10)

        # Seed
        if "seed" in opt:
            optimization["algorithm"]["config"]["seed"] = opt["seed"]

        # Timeout/patience (legacy timeout is divided by 3600 to get hours)
        if opt.get("timeout"):
            optimization["budget"]["max_time_hours"] = opt["timeout"] / 3600

        # SAT/surrogate settings
        if "sat_settings" in config:
            sat = config["sat_settings"]
            optimization["surrogate"] = {
                "enabled": True,
                "type": "ensemble",
                "config": {
                    "n_models": sat.get("n_ensemble_models", 10),
                    "architecture": sat.get("hidden_dims", [256, 128]),
                    "train_every_n_trials": sat.get("retrain_frequency", 20),
                    "min_training_samples": sat.get("min_samples", 30)
                }
            }

        return optimization

    # =========================================================================
    # Workflow Migration
    # =========================================================================

    def _migrate_workflow(self, config: Dict) -> Dict:
        """Migrate SAT/turbo workflow settings.

        Splits the trial budget into an exploration stage (sat_settings
        "min_samples", default 30) and a surrogate-assisted stage that gets
        the remainder, never less than zero.
        """
        sat = config.get("sat_settings", {})

        exploration_trials = sat.get("min_samples", 30)
        # Same settings lookup as _migrate_optimization, so the stage split
        # agrees with budget.max_trials.
        total_trials = self._optimization_section(config).get("n_trials", 100)

        return {
            "stages": [
                {
                    "id": "stage_exploration",
                    "name": "Design Space Exploration",
                    "algorithm": "RandomSearch",
                    "trials": exploration_trials,
                    "purpose": "Build initial training data for surrogate"
                },
                {
                    "id": "stage_optimization",
                    "name": "Surrogate-Assisted Optimization",
                    "algorithm": "SAT_v3",
                    "trials": max(total_trials - exploration_trials, 0),
                    "purpose": "Neural-accelerated optimization"
                }
            ],
            "transitions": [
                {
                    "from": "stage_exploration",
                    "to": "stage_optimization",
                    "condition": f"trial_count >= {exploration_trials}"
                }
            ]
        }

    # =========================================================================
    # Canvas Edge Generation
    # =========================================================================

    def _generate_edges(self, spec: Dict) -> List[Dict]:
        """Generate canvas edges connecting nodes."""
        edges = []

        # DVs -> model
        for dv in spec.get("design_variables", []):
            edges.append({"source": dv["id"], "target": "model"})

        # model -> solver
        edges.append({"source": "model", "target": "solver"})

        # solver -> extractors
        for ext in spec.get("extractors", []):
            edges.append({"source": "solver", "target": ext["id"]})

        # extractors -> objectives
        for obj in spec.get("objectives", []):
            ext_id = obj.get("source", {}).get("extractor_id")
            if ext_id:
                edges.append({"source": ext_id, "target": obj["id"]})

        # extractors -> constraints
        for con in spec.get("constraints", []):
            ext_id = con.get("source", {}).get("extractor_id")
            if ext_id:
                edges.append({"source": ext_id, "target": con["id"]})

        # objectives -> optimization
        for obj in spec.get("objectives", []):
            edges.append({"source": obj["id"], "target": "optimization"})

        # constraints -> optimization
        for con in spec.get("constraints", []):
            edges.append({"source": con["id"], "target": "optimization"})

        return edges


# ============================================================================
# Convenience Functions
# ============================================================================

def migrate_config(
    old_config: Dict[str, Any],
    study_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Migrate old config dict to AtomizerSpec v2.0.

    Args:
        old_config: Legacy config dict
        study_name: Override study name

    Returns:
        AtomizerSpec v2.0 dict
    """
    migrator = SpecMigrator()
    return migrator.migrate(old_config, study_name)


def migrate_config_file(
    config_path: Union[str, Path],
    output_path: Optional[Union[str, Path]] = None
) -> Dict[str, Any]:
    """
    Migrate a config file to AtomizerSpec v2.0.

    Args:
        config_path: Path to old config file
        output_path: Path to save new spec (optional)

    Returns:
        AtomizerSpec v2.0 dict
    """
    migrator = SpecMigrator()
    return migrator.migrate_file(config_path, output_path)