From 4749944a4886d984967233e71a91c7ae7f54f7b9 Mon Sep 17 00:00:00 2001 From: Anto01 Date: Tue, 20 Jan 2026 20:51:25 -0500 Subject: [PATCH] feat: Add extract endpoint to use existing FEA results without re-solving - scan_existing_fea_results() scans study for existing OP2/F06 files - Introspection now returns existing_fea_results with recommended source - New POST /nx/extract endpoint runs extractors on existing OP2 files - Supports: displacement, stress, frequency, mass_bdf, zernike - No NX solve needed - uses PyNastran and Atomizer extractors directly This allows users to test extractors and get physics data from existing simulation results without re-running the FEA solver. --- .../backend/api/routes/optimization.py | 267 +++++++++++++++++- 1 file changed, 266 insertions(+), 1 deletion(-) diff --git a/atomizer-dashboard/backend/api/routes/optimization.py b/atomizer-dashboard/backend/api/routes/optimization.py index 874d3d26..1568b70c 100644 --- a/atomizer-dashboard/backend/api/routes/optimization.py +++ b/atomizer-dashboard/backend/api/routes/optimization.py @@ -4599,6 +4599,101 @@ def scan_nx_file_dependencies(model_dir: Path) -> dict: return result +def scan_existing_fea_results(study_dir: Path) -> dict: + """ + Scan study for existing FEA result files (.op2, .f06, .bdf/.dat). + + Checks: + 1. 1_setup/model/ - results from manual NX runs + 2. 2_iterations/iter0/ or trial_baseline/ - baseline results + 3. 2_iterations/iter{N}/ - latest completed iteration + + Returns dict with paths to existing results that can be used for extraction. + """ + result = { + "has_results": False, + "sources": [], # List of {location, op2, f06, bdf, timestamp} + "recommended": None, # Best source to use + } + + # Check model directory + model_dir = study_dir / "1_setup" / "model" + if model_dir.exists(): + op2_files = list(model_dir.glob("*.op2")) + f06_files = list(model_dir.glob("*.f06")) + dat_files = list(model_dir.glob("*.dat")) + + if op2_files or f06_files: + source = { + "location": "model", + "path": str(model_dir), + "op2": [f.name for f in op2_files], + "f06": [f.name for f in f06_files], + "bdf": [f.name for f in dat_files], + } + if op2_files: + source["timestamp"] = op2_files[0].stat().st_mtime + result["sources"].append(source) + + # Check iterations directory + iter_dir = study_dir / "2_iterations" + if iter_dir.exists(): + # Check for baseline (iter0 or trial_baseline) + for baseline_name in ["iter0", "trial_baseline"]: + baseline_dir = iter_dir / baseline_name + if baseline_dir.exists(): + op2_files = list(baseline_dir.glob("*.op2")) + f06_files = list(baseline_dir.glob("*.f06")) + dat_files = list(baseline_dir.glob("*.dat")) + + if op2_files or f06_files: + source = { + "location": f"iterations/{baseline_name}", + "path": str(baseline_dir), + "op2": [f.name for f in op2_files], + "f06": [f.name for f in f06_files], + "bdf": [f.name for f in dat_files], + } + if op2_files: + source["timestamp"] = op2_files[0].stat().st_mtime + result["sources"].append(source) + + # Find latest iteration with results + iter_folders = sorted( + [d for d in iter_dir.iterdir() if d.is_dir() and d.name.startswith("iter")], + key=lambda d: int(d.name.replace("iter", "")) + if d.name.replace("iter", "").isdigit() + else 0, + reverse=True, + ) + + for iter_folder in iter_folders[:3]: # Check top 3 most recent + op2_files = list(iter_folder.glob("*.op2")) + if op2_files: + f06_files = list(iter_folder.glob("*.f06")) + dat_files = list(iter_folder.glob("*.dat")) + source = { + "location": f"iterations/{iter_folder.name}", + "path": str(iter_folder), + "op2": [f.name for f in op2_files], + "f06": [f.name for f in f06_files], + "bdf": [f.name for f in dat_files], + "timestamp": op2_files[0].stat().st_mtime, + } + result["sources"].append(source) + break # Found latest with results + + # Determine if we have usable results + if result["sources"]: + result["has_results"] = True + # Recommend the most recent source with OP2 + sources_with_op2 = [s for s in result["sources"] if s.get("op2")] + if sources_with_op2: + result["recommended"] = max(sources_with_op2, key=lambda s: s.get("timestamp", 0)) + + return result + + @router.get("/studies/{study_id}/nx/introspect") async def introspect_nx_model(study_id: str, force: bool = False): """ @@ -4671,6 +4766,9 @@ async def introspect_nx_model(study_id: str, force: bool = False): # Scan file dependencies (fast, doesn't need NX) file_deps = scan_nx_file_dependencies(model_dir) + # Scan for existing FEA results (OP2, F06, etc.) - no NX needed! + existing_results = scan_existing_fea_results(study_dir) + # Run introspection try: from optimization_engine.extractors.introspect_part import introspect_part @@ -4680,6 +4778,9 @@ async def introspect_nx_model(study_id: str, force: bool = False): # Add file dependencies to result result["file_dependencies"] = file_deps + # Add existing FEA results info + result["existing_fea_results"] = existing_results + # Cache results with open(cache_file, "w") as f: json.dump(result, f, indent=2) @@ -4687,7 +4788,7 @@ async def introspect_nx_model(study_id: str, force: bool = False): return {"study_id": study_id, "cached": False, "introspection": result} except ImportError: - # If introspect_part not available, return just file dependencies + # If introspect_part not available, return just file dependencies and existing results return { "study_id": study_id, "cached": False, @@ -4695,6 +4796,7 @@ async def introspect_nx_model(study_id: str, force: bool = False): "success": False, "error": "introspect_part module not available", "file_dependencies": file_deps, + "existing_fea_results": existing_results, }, } except Exception as e: @@ -4937,6 +5039,169 @@ async def run_baseline_simulation(study_id: str): raise HTTPException(status_code=500, detail=f"Failed to run baseline: {str(e)}") +class ExtractRequest(BaseModel): + op2_path: Optional[str] = None + extractors: Optional[List[str]] = None + + +@router.post("/studies/{study_id}/nx/extract") +async def extract_from_existing_results( + study_id: str, + request: ExtractRequest = None, +): + """ + Extract physics data from existing FEA result files (OP2/F06). + + NO NX SOLVE NEEDED - uses PyNastran and Atomizer extractors directly + on existing result files. + + Args: + study_id: Study identifier + request.op2_path: Optional specific OP2 file path. If not provided, uses latest available. + request.extractors: List of extractor IDs to run. If not provided, runs defaults. + Options: ["displacement", "stress", "frequency", "mass_bdf", "zernike"] + + Returns: + JSON with extracted physics data from each extractor + """ + from pathlib import Path as P + + # Handle optional request body + op2_path = request.op2_path if request else None + extractors = request.extractors if request else None + + try: + study_dir = resolve_study_path(study_id) + if not study_dir.exists(): + raise HTTPException(status_code=404, detail=f"Study {study_id} not found") + + # Find OP2 file to use + target_op2 = None + target_dir = None + + if op2_path: + # Use specified path + target_op2 = P(op2_path) + if not target_op2.exists(): + raise HTTPException(status_code=404, detail=f"OP2 file not found: {op2_path}") + target_dir = target_op2.parent + else: + # Auto-detect from existing results + existing = scan_existing_fea_results(study_dir) + if not existing["has_results"]: + raise HTTPException( + status_code=404, + detail="No existing FEA results found. Run baseline first or provide op2_path.", + ) + + recommended = existing["recommended"] + if not recommended or not recommended.get("op2"): + raise HTTPException(status_code=404, detail="No OP2 files found in study") + + target_dir = P(recommended["path"]) + target_op2 = target_dir / recommended["op2"][0] + + print(f"[extract] Using OP2: {target_op2}") + + # Find associated BDF/DAT file + bdf_file = None + for ext in [".bdf", ".dat"]: + candidate = target_op2.with_suffix(ext) + if candidate.exists(): + bdf_file = candidate + break + + # Run extractors + results = { + "study_id": study_id, + "op2_file": str(target_op2), + "bdf_file": str(bdf_file) if bdf_file else None, + "extractions": {}, + "errors": [], + } + + # Default extractors if not specified + if not extractors: + extractors = ["displacement", "mass_bdf"] + + # Import extractors as needed + # Note: extractors are modules with same-named functions inside + for ext_id in extractors: + try: + if ext_id == "displacement": + from optimization_engine.extractors.extract_displacement import ( + extract_displacement, + ) + + disp = extract_displacement(str(target_op2)) + results["extractions"]["displacement"] = disp + + elif ext_id == "stress": + from optimization_engine.extractors.extract_solid_stress import ( + extract_solid_stress, + ) + + stress = extract_solid_stress(str(target_op2)) + results["extractions"]["stress"] = stress + + elif ext_id == "frequency": + from optimization_engine.extractors.extract_frequency import extract_frequency + + freq = extract_frequency(str(target_op2)) + results["extractions"]["frequency"] = freq + + elif ext_id == "mass_bdf": + if bdf_file: + from optimization_engine.extractors.extract_mass_from_bdf import ( + extract_mass_from_bdf, + ) + + mass = extract_mass_from_bdf(str(bdf_file)) + results["extractions"]["mass_bdf"] = mass + else: + results["errors"].append("mass_bdf: No BDF file found") + + elif ext_id == "zernike": + from optimization_engine.extractors.extract_zernike_opd import ( + ZernikeOPDExtractor, + ) + + extractor = ZernikeOPDExtractor( + str(target_op2), + bdf_path=str(bdf_file) if bdf_file else None, + n_modes=36, + ) + # Extract for subcase 1 (or first available) + try: + zernike_result = extractor.extract_subcase("1") + results["extractions"]["zernike"] = { + "subcase": "1", + "rms_nm": zernike_result.get("filtered_rms_nm"), + "peak_valley_nm": zernike_result.get("peak_to_valley_nm"), + "coefficients": zernike_result.get("coefficients", [])[:10] + if zernike_result.get("coefficients") + else [], + } + except Exception as ze: + results["errors"].append(f"zernike: {str(ze)}") + + else: + results["errors"].append(f"Unknown extractor: {ext_id}") + + except ImportError as ie: + results["errors"].append(f"{ext_id}: Module not available - {str(ie)}") + except Exception as e: + results["errors"].append(f"{ext_id}: {str(e)}") + + results["success"] = len(results["extractions"]) > 0 + return results + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Extraction failed: {str(e)}") + + @router.get("/studies/{study_id}/nx/expressions") async def get_nx_expressions(study_id: str): """