diff --git a/optimization_engine/extractors/extract_zernike_trajectory.py b/optimization_engine/extractors/extract_zernike_trajectory.py index 87d7e549..5dffa24f 100644 --- a/optimization_engine/extractors/extract_zernike_trajectory.py +++ b/optimization_engine/extractors/extract_zernike_trajectory.py @@ -275,9 +275,30 @@ class ZernikeTrajectoryExtractor: print(f"[ZernikeTrajectory] Loading geometry: {self.bdf_path}") self.node_coords = read_node_geometry(self.bdf_path) - # Get available subcases + # Get available subcases and build ID -> angle mapping self.available_subcases = list(self.op2.displacements.keys()) + + # Try to extract angle labels from subcase metadata + self.subcase_to_angle = {} + self.subcase_label_map = {} # Maps angle (as string) to subcase ID + for sc_id in self.available_subcases: + disp = self.op2.displacements[sc_id] + if hasattr(disp, 'label') and disp.label: + # Label format: "90 SUBCASE 1" - extract first number + label_str = disp.label.strip().split()[0] + try: + angle = float(label_str) + self.subcase_to_angle[sc_id] = angle + self.subcase_label_map[label_str] = sc_id + except ValueError: + pass + + # Load displacements using the standard function (returns dict keyed by label) + self.displacements = extract_displacements_by_subcase(self.op2_path) + print(f"[ZernikeTrajectory] Available subcases: {self.available_subcases}") + print(f"[ZernikeTrajectory] Subcase->Angle mapping: {self.subcase_to_angle}") + print(f"[ZernikeTrajectory] Displacement labels: {list(self.displacements.keys())}") def extract_subcase(self, subcase_id: int) -> Dict[str, Any]: """ @@ -289,23 +310,33 @@ class ZernikeTrajectoryExtractor: Returns: Dict with coefficients and metrics """ - # Get displacements - displacements = extract_displacements_by_subcase( - self.op2, subcase_id, self.node_coords - ) + # Get the label for this subcase (angle as string) + angle = self.subcase_to_angle.get(subcase_id) + if angle is not None: + label = str(int(angle)) # e.g., "20", "40", etc. + else: + label = str(subcase_id) + + if label not in self.displacements: + raise ValueError(f"No displacements found for subcase {subcase_id} (label '{label}'). Available: {list(self.displacements.keys())}") + + data = self.displacements[label] + node_ids = data['node_ids'] + disp = data['disp'] # Shape: (n_nodes, 6) - dx, dy, dz, rx, ry, rz - if displacements is None or len(displacements) == 0: - raise ValueError(f"No displacements found for subcase {subcase_id}") + # Filter to nodes we have geometry for + valid_mask = np.array([int(nid) in self.node_coords for nid in node_ids]) + node_ids = node_ids[valid_mask] + disp = disp[valid_mask] + + # Extract coordinates + x = np.array([self.node_coords[int(nid)][0] for nid in node_ids]) + y = np.array([self.node_coords[int(nid)][1] for nid in node_ids]) + z = np.array([self.node_coords[int(nid)][2] for nid in node_ids]) - # Extract coordinates and displacements - node_ids = list(displacements.keys()) - x = np.array([self.node_coords[nid][0] for nid in node_ids]) - y = np.array([self.node_coords[nid][1] for nid in node_ids]) - z = np.array([self.node_coords[nid][2] for nid in node_ids]) - - dx = np.array([displacements[nid][0] for nid in node_ids]) - dy = np.array([displacements[nid][1] for nid in node_ids]) - dz = np.array([displacements[nid][2] for nid in node_ids]) + dx = disp[:, 0] + dy = disp[:, 1] + dz = disp[:, 2] # Compute WFE (surface error * 2 for reflective surface) # For now, use Z-displacement (can add OPD correction if focal_length provided) @@ -324,27 +355,29 @@ class ZernikeTrajectoryExtractor: # Convert to WFE in nm wfe = surface_error * 2.0 * self.nm_scale - # Fit Zernike coefficients (excluding first 4 modes by convention) - coeffs = compute_zernike_coefficients( + # Fit Zernike coefficients (include first 4 modes for fitting, we'll filter later) + coeffs, R_max = compute_zernike_coefficients( x + dx if self.focal_length else x, # Use deformed coords if OPD y + dy if self.focal_length else y, wfe, n_modes=self.n_modes + 4, # Include modes 1-4 for fitting - filter_orders=0 # Don't filter, we'll handle it ) # Return coefficients starting from mode 5 (index 4) + # Modes 1-4 (piston, tip, tilt, defocus) are excluded from optimization return { 'coefficients': coeffs[4:self.n_modes + 4], # Modes 5 to n_modes+4 'raw_coefficients': coeffs, 'n_nodes': len(node_ids), 'wfe_array': wfe, + 'R_max': R_max, } def extract_trajectory( self, subcases: Optional[List[int]] = None, angles: Optional[List[float]] = None, + exclude_angles: Optional[List[float]] = None, ) -> Dict[str, Any]: """ Extract full trajectory analysis across multiple subcases. @@ -352,18 +385,33 @@ class ZernikeTrajectoryExtractor: Args: subcases: List of subcase IDs. If None, auto-detect from OP2. angles: Corresponding elevation angles in degrees. - If None, assumes subcase IDs are angles (e.g., 20, 40, 60). + If None, uses auto-detected labels from OP2. + exclude_angles: Angles to exclude (e.g., [90] to skip manufacturing). Returns: Dict with trajectory analysis results """ + exclude_angles = exclude_angles or [90.0] # Default: exclude 90° manufacturing + # Determine subcases and angles - if subcases is None: - subcases = sorted(self.available_subcases) - - if angles is None: - # Assume subcase IDs are angles - angles = [float(sc) for sc in subcases] + if subcases is None and angles is None: + # Auto-detect from OP2 labels + if self.subcase_to_angle: + # Filter out excluded angles and sort by angle + valid_pairs = [ + (sc, ang) for sc, ang in self.subcase_to_angle.items() + if ang not in exclude_angles + ] + valid_pairs.sort(key=lambda x: x[1]) # Sort by angle + subcases = [p[0] for p in valid_pairs] + angles = [p[1] for p in valid_pairs] + else: + # Fallback: use subcase IDs as angles + subcases = sorted(self.available_subcases) + angles = [float(sc) for sc in subcases] + elif subcases is not None and angles is None: + # Use provided subcases, try to get angles from mapping + angles = [self.subcase_to_angle.get(sc, float(sc)) for sc in subcases] if len(subcases) != len(angles): raise ValueError("subcases and angles must have same length")