Phase 1 - Panel Management System: - Create usePanelStore.ts for centralized panel state management - Add PanelContainer.tsx for draggable floating panels - Create FloatingIntrospectionPanel.tsx (persistent, doesn't disappear on node click) - Create ResultsPanel.tsx for trial result details - Refactor NodeConfigPanelV2 to use panel store for introspection - Integrate PanelContainer into CanvasView Phase 2 - Pre-run Validation: - Create specValidator.ts with comprehensive validation rules - Add ValidationPanel (enhanced version with error navigation) - Add Validate button to SpecRenderer with status indicator - Block run if validation fails - Check for: design vars, objectives, extractors, bounds, connections Phase 3 - Error Handling & Recovery: - Create ErrorPanel.tsx for displaying optimization errors - Add error classification (nx_crash, solver_fail, extractor_error, etc.) - Add recovery suggestions based on error type - Update status endpoint to return error info - Add _get_study_error_info helper to check error_status.json and DB - Integrate error detection into status polling Documentation: - Add CANVAS_ROBUSTNESS_PLAN.md with full implementation plan
5836 lines
215 KiB
Python
5836 lines
215 KiB
Python
"""
|
|
Optimization API endpoints
|
|
Handles study status, history retrieval, and control operations
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, UploadFile, File, Form
|
|
from fastapi.responses import JSONResponse, FileResponse
|
|
from pydantic import BaseModel
|
|
from pathlib import Path
|
|
from typing import List, Dict, Optional
|
|
import json
|
|
import sys
|
|
import sqlite3
|
|
import shutil
|
|
import subprocess
|
|
import psutil
|
|
import signal
|
|
import time
|
|
from datetime import datetime
|
|
|
|
# Add project root to path
|
|
sys.path.append(str(Path(__file__).parent.parent.parent.parent.parent))
|
|
|
|
router = APIRouter()
|
|
|
|
# Base studies directory
|
|
STUDIES_DIR = Path(__file__).parent.parent.parent.parent.parent / "studies"
|
|
|
|
|
|
def get_results_dir(study_dir: Path) -> Path:
    """Get the results directory for a study, supporting both 2_results and 3_results."""
    # Newer studies use 2_results/; fall back to 3_results/ for older layouts
    # (the fallback path is returned even if it does not exist yet).
    primary = study_dir / "2_results"
    return primary if primary.exists() else study_dir / "3_results"
|
|
|
|
|
|
def resolve_study_path(study_id: str) -> Path:
    """Find study folder by scanning all topic directories.

    Supports nested folder structure: studies/Topic/study_name/
    Study ID can be:
    - Short name (e.g., 'm1_mirror_adaptive_V14') - scans all topic folders
    - Full nested path (e.g., 'M1_Mirror/m1_mirror_cost_reduction_lateral')

    Returns the full path to the study directory.
    Raises HTTPException 404 if not found.
    """
    # Security guard: study_id arrives straight from the request URL. Refuse
    # ids containing "..", empty/"." segments, or absolute paths, which could
    # otherwise resolve outside STUDIES_DIR (path traversal).
    normalized = study_id.replace("\\", "/")
    if Path(study_id).is_absolute() or any(
        seg in ("", ".", "..") for seg in normalized.split("/")
    ):
        raise HTTPException(status_code=404, detail=f"Study not found: {study_id}")

    # Handle nested path format (e.g., "M1_Mirror/m1_mirror_cost_reduction_lateral"),
    # trying both forward-slash and backslash (Windows) separators.
    if "/" in study_id:
        for candidate_id in (study_id, study_id.replace("/", "\\")):
            nested_path = STUDIES_DIR / candidate_id
            if nested_path.is_dir() and _is_valid_study_dir(nested_path):
                return nested_path

    # Direct path (backwards compatibility for flat structure): studies/<id>/
    direct_path = STUDIES_DIR / study_id
    if direct_path.is_dir() and _is_valid_study_dir(direct_path):
        return direct_path

    # Nested structure: scan each topic folder for studies/<topic>/<id>/
    for topic_dir in STUDIES_DIR.iterdir():
        if topic_dir.is_dir() and not topic_dir.name.startswith("."):
            study_dir = topic_dir / study_id
            if study_dir.is_dir() and _is_valid_study_dir(study_dir):
                return study_dir

    raise HTTPException(status_code=404, detail=f"Study not found: {study_id}")
|
|
|
|
|
|
def _is_valid_study_dir(study_dir: Path) -> bool:
|
|
"""Check if a directory is a valid study directory."""
|
|
return (
|
|
(study_dir / "1_setup").exists()
|
|
or (study_dir / "optimization_config.json").exists()
|
|
or (study_dir / "atomizer_spec.json").exists()
|
|
)
|
|
|
|
|
|
def get_study_topic(study_dir: Path) -> Optional[str]:
    """Get the topic folder name for a study, or None if in root."""
    parent = study_dir.parent
    # A topic folder sits exactly one level below the root studies directory;
    # studies placed directly in the root have no topic.
    if parent != STUDIES_DIR and parent.parent == STUDIES_DIR:
        return parent.name
    return None
|
|
|
|
|
|
def is_optimization_running(study_id: str) -> bool:
    """Check if an optimization process is currently running for a study.

    Looks for Python processes running run_optimization.py with the study_id in the command line.
    """
    try:
        study_dir = resolve_study_path(study_id)
    except HTTPException:
        # Unknown study id -> nothing can be running for it.
        return False

    study_path_str = str(study_dir)
    for proc in psutil.process_iter(["pid", "name", "cmdline", "cwd"]):
        try:
            cmdline_str = " ".join(proc.info.get("cmdline") or [])
            # Must be a Python process executing the optimization runner...
            if "python" not in cmdline_str.lower():
                continue
            if "run_optimization" not in cmdline_str:
                continue
            # ...and mention this study by id or by directory path.
            if study_id in cmdline_str or study_path_str in cmdline_str:
                return True
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Process vanished or is inaccessible; skip it.
            continue

    return False
|
|
|
|
|
|
def get_accurate_study_status(
    study_id: str, trial_count: int, total_trials: int, has_db: bool
) -> str:
    """Determine accurate study status based on multiple factors.

    Status can be:
    - not_started: No database or 0 trials
    - running: Active process found
    - paused: Has trials but no active process and not completed
    - completed: Reached trial target
    - failed: Has error indicators (future enhancement)

    Args:
        study_id: The study identifier
        trial_count: Number of completed trials
        total_trials: Target number of trials from config
        has_db: Whether the study database exists

    Returns:
        Status string: "not_started", "running", "paused", or "completed"
    """
    # Nothing recorded yet.
    if not has_db or trial_count == 0:
        return "not_started"

    # Target reached.
    if trial_count >= total_trials:
        return "completed"

    # Mid-run: distinguish an active process from a stalled/paused study.
    return "running" if is_optimization_running(study_id) else "paused"
|
|
|
|
|
|
def _get_study_error_info(study_dir: Path, results_dir: Path) -> dict:
|
|
"""Get error information from study if any errors occurred.
|
|
|
|
Checks for:
|
|
1. error_status.json file (written by optimization process on error)
|
|
2. Failed trials in database
|
|
3. Error logs
|
|
|
|
Returns:
|
|
dict with keys: error, error_details, error_timestamp, current_trial, status_override
|
|
"""
|
|
error_info = {}
|
|
|
|
# Check for error_status.json (written by optimization process)
|
|
error_file = results_dir / "error_status.json"
|
|
if error_file.exists():
|
|
try:
|
|
with open(error_file) as f:
|
|
error_data = json.load(f)
|
|
error_info["error"] = error_data.get("error", "Unknown error")
|
|
error_info["error_details"] = error_data.get("details")
|
|
error_info["error_timestamp"] = error_data.get("timestamp")
|
|
error_info["current_trial"] = error_data.get("trial")
|
|
|
|
# If error is recent (within last 5 minutes), set status to failed
|
|
if error_data.get("timestamp"):
|
|
error_age = time.time() - error_data["timestamp"]
|
|
if error_age < 300: # 5 minutes
|
|
error_info["status_override"] = "failed"
|
|
except Exception:
|
|
pass
|
|
|
|
# Check for failed trials in database
|
|
study_db = results_dir / "study.db"
|
|
if study_db.exists() and "error" not in error_info:
|
|
try:
|
|
conn = sqlite3.connect(str(study_db), timeout=2.0)
|
|
cursor = conn.cursor()
|
|
|
|
# Check for FAIL state trials (Optuna uses 'FAIL' not 'FAILED')
|
|
cursor.execute("""
|
|
SELECT number, datetime_complete
|
|
FROM trials
|
|
WHERE state = 'FAIL'
|
|
ORDER BY datetime_complete DESC
|
|
LIMIT 1
|
|
""")
|
|
failed = cursor.fetchone()
|
|
|
|
if failed:
|
|
trial_number, fail_time = failed
|
|
error_info["error"] = f"Trial {trial_number} failed"
|
|
error_info["current_trial"] = trial_number
|
|
# Parse datetime to timestamp if available
|
|
if fail_time:
|
|
try:
|
|
from datetime import datetime
|
|
|
|
dt = datetime.fromisoformat(fail_time)
|
|
error_info["error_timestamp"] = dt.timestamp()
|
|
except Exception:
|
|
error_info["error_timestamp"] = int(time.time())
|
|
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
|
|
# Check optimization log for errors
|
|
log_file = results_dir / "optimization.log"
|
|
if log_file.exists() and "error" not in error_info:
|
|
try:
|
|
# Read last 50 lines of log
|
|
with open(log_file, "r") as f:
|
|
lines = f.readlines()[-50:]
|
|
|
|
for line in reversed(lines):
|
|
line_lower = line.lower()
|
|
if "error" in line_lower or "failed" in line_lower or "exception" in line_lower:
|
|
error_info["error"] = line.strip()[:200] # Truncate long messages
|
|
error_info["error_timestamp"] = int(log_file.stat().st_mtime)
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
return error_info
|
|
|
|
|
|
def _load_study_info(study_dir: Path, topic: Optional[str] = None) -> Optional[dict]:
    """Load study info from a study directory. Returns None if not a valid study.

    Args:
        study_dir: The study directory (flat or nested layout).
        topic: Topic folder name when the study is nested under one, else None.

    Returns:
        Summary dict for the dashboard listing, or None when the directory has
        no optimization_config.json.
    """
    # Look for optimization config (check multiple locations).
    config_file = study_dir / "optimization_config.json"
    if not config_file.exists():
        config_file = study_dir / "1_setup" / "optimization_config.json"
        if not config_file.exists():
            return None

    # Load config
    with open(config_file) as f:
        config = json.load(f)

    # Check if results directory exists (support both 2_results and 3_results).
    results_dir = study_dir / "2_results"
    if not results_dir.exists():
        results_dir = study_dir / "3_results"

    # Check for Optuna database (Protocol 10) or JSON history (other protocols).
    study_db = results_dir / "study.db"
    history_file = results_dir / "optimization_history_incremental.json"

    trial_count = 0
    best_value = None
    has_db = False

    # Protocol 10: Read from Optuna SQLite database.
    if study_db.exists():
        has_db = True
        try:
            # Use timeout to avoid blocking on locked databases; close in
            # finally so a failed query cannot leak the connection.
            conn = sqlite3.connect(str(study_db), timeout=2.0)
            try:
                cursor = conn.cursor()

                # Get trial count and status
                cursor.execute("SELECT COUNT(*) FROM trials WHERE state = 'COMPLETE'")
                trial_count = cursor.fetchone()[0]

                # Get best trial (for single-objective, or first objective for
                # multi-objective): lowest value among completed trials.
                if trial_count > 0:
                    cursor.execute("""
                        SELECT value FROM trial_values
                        WHERE trial_id IN (
                            SELECT trial_id FROM trials WHERE state = 'COMPLETE'
                        )
                        ORDER BY value ASC
                        LIMIT 1
                    """)
                    result = cursor.fetchone()
                    if result:
                        best_value = result[0]
            finally:
                conn.close()
        except Exception as e:
            print(f"Warning: Failed to read Optuna database for {study_dir.name}: {e}")

    # Legacy: Read from JSON history.
    elif history_file.exists():
        has_db = True
        with open(history_file) as f:
            history = json.load(f)
        trial_count = len(history)
        if history:
            # Find best trial (lowest objective).
            best_trial = min(history, key=lambda x: x["objective"])
            best_value = best_trial["objective"]

    # Get total trials from config (supports both formats).
    total_trials = (
        config.get("optimization_settings", {}).get("n_trials")
        or config.get("optimization", {}).get("n_trials")
        or config.get("trials", {}).get("n_trials", 50)
    )

    # Get accurate status using process detection.
    status = get_accurate_study_status(study_dir.name, trial_count, total_trials, has_db)

    # Approximate creation date. NOTE: these are file *modification* times
    # (st_mtime) used as a best available proxy, not true creation dates.
    created_at = None
    try:
        if study_db.exists():
            created_at = datetime.fromtimestamp(study_db.stat().st_mtime).isoformat()
        elif config_file.exists():
            created_at = datetime.fromtimestamp(config_file.stat().st_mtime).isoformat()
        else:
            created_at = datetime.fromtimestamp(study_dir.stat().st_ctime).isoformat()
    except OSError:
        created_at = None

    # Get last modified time.
    last_modified = None
    try:
        if study_db.exists():
            last_modified = datetime.fromtimestamp(study_db.stat().st_mtime).isoformat()
        elif history_file.exists():
            last_modified = datetime.fromtimestamp(history_file.stat().st_mtime).isoformat()
    except OSError:
        last_modified = None

    return {
        "id": study_dir.name,
        "name": study_dir.name.replace("_", " ").title(),
        "topic": topic,  # NEW: topic field for grouping
        "status": status,
        "progress": {"current": trial_count, "total": total_trials},
        "best_value": best_value,
        "target": config.get("target", {}).get("value"),
        "path": str(study_dir),
        "created_at": created_at,
        "last_modified": last_modified,
    }
|
|
|
|
|
|
@router.get("/studies")
async def list_studies():
    """List all available optimization studies.

    Supports both flat and nested folder structures:
    - Flat: studies/study_name/
    - Nested: studies/Topic/study_name/

    Returns studies with 'topic' field for frontend grouping.
    """
    try:
        if not STUDIES_DIR.exists():
            return {"studies": []}

        def _looks_like_study(path: Path) -> bool:
            # A study folder has either a 1_setup/ subfolder or a root config.
            return (path / "1_setup").exists() or (path / "optimization_config.json").exists()

        studies = []
        for entry in STUDIES_DIR.iterdir():
            if not entry.is_dir() or entry.name.startswith("."):
                continue

            if _looks_like_study(entry):
                # Flat structure: study directly in studies/.
                info = _load_study_info(entry, topic=None)
                if info:
                    studies.append(info)
                continue

            # Nested structure: treat this folder as a topic and scan its
            # children for study directories.
            for child in entry.iterdir():
                if not child.is_dir() or child.name.startswith("."):
                    continue
                if _looks_like_study(child):
                    info = _load_study_info(child, topic=entry.name)
                    if info:
                        studies.append(info)

        return {"studies": studies}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list studies: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/status")
async def get_study_status(study_id: str):
    """Get detailed status of a specific study.

    Reads the Optuna SQLite database when present (Protocol 10), otherwise
    falls back to the legacy incremental JSON history. The response includes
    progress, best trial, pruning count, config, and any detected error info.
    """
    try:
        study_dir = resolve_study_path(study_id)
        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Load config (check multiple locations).
        config_file = study_dir / "optimization_config.json"
        if not config_file.exists():
            config_file = study_dir / "1_setup" / "optimization_config.json"

        with open(config_file) as f:
            config = json.load(f)

        # Check for results (support both 2_results and 3_results).
        results_dir = get_results_dir(study_dir)
        study_db = results_dir / "study.db"
        history_file = results_dir / "optimization_history_incremental.json"

        # Protocol 10: Read from Optuna database.
        if study_db.exists():
            # Timeout so a locked DB doesn't hang the endpoint; close in
            # finally so an exception cannot leak the connection.
            conn = sqlite3.connect(str(study_db), timeout=2.0)
            try:
                cursor = conn.cursor()

                # Get trial counts by state.
                cursor.execute("SELECT COUNT(*) FROM trials WHERE state = 'COMPLETE'")
                trial_count = cursor.fetchone()[0]

                cursor.execute("SELECT COUNT(*) FROM trials WHERE state = 'PRUNED'")
                pruned_count = cursor.fetchone()[0]

                # Get best trial (first objective for multi-objective):
                # lowest value among completed trials.
                best_trial = None
                if trial_count > 0:
                    cursor.execute("""
                        SELECT t.trial_id, t.number, tv.value
                        FROM trials t
                        JOIN trial_values tv ON t.trial_id = tv.trial_id
                        WHERE t.state = 'COMPLETE'
                        ORDER BY tv.value ASC
                        LIMIT 1
                    """)
                    result = cursor.fetchone()
                    if result:
                        trial_id, trial_number, best_value = result

                        # Get parameters for this trial.
                        cursor.execute(
                            """
                            SELECT param_name, param_value
                            FROM trial_params
                            WHERE trial_id = ?
                            """,
                            (trial_id,),
                        )
                        params = {row[0]: row[1] for row in cursor.fetchall()}

                        best_trial = {
                            "trial_number": trial_number,
                            "objective": best_value,
                            "design_variables": params,
                            "results": {"first_frequency": best_value},
                        }
            finally:
                conn.close()

            total_trials = config.get("optimization_settings", {}).get("n_trials", 50)
            status = get_accurate_study_status(study_id, trial_count, total_trials, True)

            # Check for error status.
            error_info = _get_study_error_info(study_dir, results_dir)

            return {
                "study_id": study_id,
                "status": error_info.get("status_override") or status,
                "progress": {
                    "current": trial_count,
                    "total": total_trials,
                    "percentage": (trial_count / total_trials * 100) if total_trials > 0 else 0,
                },
                "best_trial": best_trial,
                "pruned_trials": pruned_count,
                "config": config,
                "error": error_info.get("error"),
                "error_details": error_info.get("error_details"),
                "error_timestamp": error_info.get("error_timestamp"),
                "current_trial": error_info.get("current_trial"),
            }

        # Legacy: Read from JSON history.
        if not history_file.exists():
            return {
                "study_id": study_id,
                "status": "not_started",
                "progress": {"current": 0, "total": config.get("trials", {}).get("n_trials", 50)},
                "config": config,
            }

        with open(history_file) as f:
            history = json.load(f)

        trial_count = len(history)
        total_trials = config.get("trials", {}).get("n_trials", 50)

        # Find best trial (lowest objective).
        best_trial = min(history, key=lambda x: x["objective"]) if history else None

        # Legacy pruning diagnostics live in a separate JSON file.
        pruning_file = results_dir / "pruning_history.json"
        pruned_count = 0
        if pruning_file.exists():
            with open(pruning_file) as f:
                pruned_count = len(json.load(f))

        status = "completed" if trial_count >= total_trials else "running"

        # Check for error status.
        error_info = _get_study_error_info(study_dir, results_dir)

        return {
            "study_id": study_id,
            "status": error_info.get("status_override") or status,
            "progress": {
                "current": trial_count,
                "total": total_trials,
                "percentage": (trial_count / total_trials * 100) if total_trials > 0 else 0,
            },
            "best_trial": best_trial,
            "pruned_trials": pruned_count,
            "config": config,
            "error": error_info.get("error"),
            "error_details": error_info.get("error_details"),
            "error_timestamp": error_info.get("error_timestamp"),
            "current_trial": error_info.get("current_trial"),
        }

    except HTTPException:
        # Re-raise as-is so explicit 404s are not converted into 500s below.
        raise
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Study {study_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get study status: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/history")
async def get_optimization_history(study_id: str, limit: Optional[int] = None):
    """Get optimization history (all trials).

    Protocol 10 reads trials from the Optuna SQLite database — across ALL
    Optuna studies stored in it, to cover adaptive optimizations that create
    multiple studies (e.g. v11_fea for FEA trials, v11_iter1_nn for NN trials).
    Legacy studies read from the incremental JSON history.

    Args:
        study_id: Study identifier (short name or Topic/name path).
        limit: Optional cap on the number of most-recent trials returned.
    """
    try:
        study_dir = resolve_study_path(study_id)
        results_dir = get_results_dir(study_dir)
        study_db = results_dir / "study.db"
        history_file = results_dir / "optimization_history_incremental.json"

        # Protocol 10: Read from Optuna database.
        if study_db.exists():
            # Timeout avoids blocking on a locked DB; close in finally so an
            # exception cannot leak the connection.
            conn = sqlite3.connect(str(study_db), timeout=2.0)
            try:
                trials = _read_trials_from_db(conn, limit)
            finally:
                conn.close()
            return {"trials": trials}

        # Legacy: Read from JSON history.
        if not history_file.exists():
            return {"trials": []}

        with open(history_file) as f:
            history = json.load(f)

        # Apply limit if specified (most recent trials).
        if limit:
            history = history[-limit:]

        return {"trials": history}

    except HTTPException:
        # Re-raise as-is so the 404 from resolve_study_path is not turned
        # into a 500 by the generic handler below.
        raise
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Study {study_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get history: {str(e)}")


def _read_trials_from_db(conn, limit: Optional[int]) -> list:
    """Assemble trial dicts from an Optuna SQLite connection (all studies)."""
    cursor = conn.cursor()

    # Bind LIMIT as a parameter instead of interpolating it into the SQL.
    base_query = """
        SELECT t.trial_id, t.number, t.datetime_start, t.datetime_complete, s.study_name
        FROM trials t
        JOIN studies s ON t.study_id = s.study_id
        WHERE t.state = 'COMPLETE'
        ORDER BY t.datetime_start DESC
    """
    if limit:
        cursor.execute(base_query + " LIMIT ?", (limit,))
    else:
        cursor.execute(base_query)
    trial_rows = cursor.fetchall()

    trials = []
    for trial_id, trial_num, start_time, end_time, study_name in trial_rows:
        # Objectives for this trial, ordered by objective index.
        cursor.execute(
            """
            SELECT value
            FROM trial_values
            WHERE trial_id = ?
            ORDER BY objective
            """,
            (trial_id,),
        )
        values = [row[0] for row in cursor.fetchall()]

        # Sampled parameters, coerced to float where possible.
        cursor.execute(
            """
            SELECT param_name, param_value
            FROM trial_params
            WHERE trial_id = ?
            """,
            (trial_id,),
        )
        params = {}
        for param_name, param_value in cursor.fetchall():
            try:
                params[param_name] = float(param_value) if param_value is not None else None
            except (ValueError, TypeError):
                params[param_name] = param_value

        # User attributes (extracted results: mass, frequency, stress, displacement, etc.)
        cursor.execute(
            """
            SELECT key, value_json
            FROM trial_user_attributes
            WHERE trial_id = ?
            """,
            (trial_id,),
        )
        user_attrs = {}
        for key, value_json in cursor.fetchall():
            try:
                user_attrs[key] = json.loads(value_json)
            except (ValueError, TypeError):
                user_attrs[key] = value_json

        results = _extract_results(user_attrs, values)

        # CRITICAL: design_vars may be stored as a user attribute
        # (trial.set_user_attr("design_vars", ...)); prefer it over params.
        design_vars_from_attrs = user_attrs.get("design_vars", {})
        final_design_vars = (
            {**params, **design_vars_from_attrs} if design_vars_from_attrs else params
        )

        # Source differentiates FEA vs NN trials; legacy studies default to FEA.
        source = user_attrs.get("source", "FEA")

        # iter_num (if present) is the actual iteration folder number; trial_id
        # is unique across all studies in the database and serves as fallback.
        iter_num = user_attrs.get("iter_num", None)
        unique_trial_num = iter_num if iter_num is not None else trial_id

        trials.append(
            {
                "trial_number": unique_trial_num,
                "trial_id": trial_id,  # Keep original for debugging
                "optuna_trial_num": trial_num,  # Keep original Optuna trial number
                "objective": values[0] if len(values) > 0 else None,  # Primary objective
                "objectives": values if len(values) > 1 else None,  # Multi-objective
                "design_variables": final_design_vars,
                "results": results,
                "user_attrs": user_attrs,
                "source": source,  # FEA or NN
                "start_time": start_time,
                "end_time": end_time,
                "study_name": study_name,  # Include for debugging
            }
        )
    return trials


def _extract_results(user_attrs: dict, values: list) -> dict:
    """Pull every numeric metric out of user_attrs for the results payload.

    Ensures multi-objective studies expose all Zernike metrics, RMS values,
    etc.; falls back to the first objective as first_frequency.
    """
    results = {}
    excluded_keys = {"design_vars", "constraint_satisfied", "constraint_violations"}
    for key, val in user_attrs.items():
        if key in excluded_keys:
            continue
        # Include numeric values and lists of numbers.
        if isinstance(val, (int, float)):
            results[key] = val
        elif isinstance(val, list) and len(val) > 0 and isinstance(val[0], (int, float)):
            # For lists, store as-is (e.g., Zernike coefficients).
            results[key] = val
        elif key == "objectives" and isinstance(val, dict):
            # Extract nested objectives dict (Zernike multi-objective studies).
            for obj_key, obj_val in val.items():
                if isinstance(obj_val, (int, float)):
                    results[obj_key] = obj_val
    if not results and len(values) > 0:
        results["first_frequency"] = values[0]
    return results
|
|
|
|
|
|
@router.get("/studies/{study_id}/pruning")
async def get_pruning_history(study_id: str):
    """Get pruning diagnostics from Optuna database or legacy JSON file"""
    try:
        study_dir = resolve_study_path(study_id)
        results_dir = get_results_dir(study_dir)
        study_db = results_dir / "study.db"
        pruning_file = results_dir / "pruning_history.json"

        # Protocol 10+: Read from Optuna database.
        if study_db.exists():
            # Timeout avoids blocking on a locked DB; close in finally so an
            # exception cannot leak the connection.
            conn = sqlite3.connect(str(study_db), timeout=2.0)
            try:
                cursor = conn.cursor()

                # Get all pruned trials from Optuna database.
                cursor.execute("""
                    SELECT t.trial_id, t.number, t.datetime_start, t.datetime_complete
                    FROM trials t
                    WHERE t.state = 'PRUNED'
                    ORDER BY t.number DESC
                """)
                pruned_rows = cursor.fetchall()

                pruned_trials = []
                for trial_id, trial_num, start_time, end_time in pruned_rows:
                    # Get parameters for this trial.
                    cursor.execute(
                        """
                        SELECT param_name, param_value
                        FROM trial_params
                        WHERE trial_id = ?
                        """,
                        (trial_id,),
                    )
                    params = {row[0]: row[1] for row in cursor.fetchall()}

                    # Get user attributes (may contain pruning cause).
                    cursor.execute(
                        """
                        SELECT key, value_json
                        FROM trial_user_attributes
                        WHERE trial_id = ?
                        """,
                        (trial_id,),
                    )
                    user_attrs = {}
                    for key, value_json in cursor.fetchall():
                        try:
                            user_attrs[key] = json.loads(value_json)
                        except (ValueError, TypeError):
                            user_attrs[key] = value_json

                    pruned_trials.append(
                        {
                            "trial_number": trial_num,
                            "params": params,
                            "pruning_cause": user_attrs.get("pruning_cause", "Unknown"),
                            "start_time": start_time,
                            "end_time": end_time,
                        }
                    )
            finally:
                conn.close()

            return {"pruned_trials": pruned_trials, "count": len(pruned_trials)}

        # Legacy: Read from JSON history.
        if not pruning_file.exists():
            return {"pruned_trials": [], "count": 0}

        with open(pruning_file) as f:
            pruning_history = json.load(f)

        return {"pruned_trials": pruning_history, "count": len(pruning_history)}

    except HTTPException:
        # Re-raise as-is so the 404 from resolve_study_path is not converted
        # into a 500 by the generic handler below.
        raise
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Study {study_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get pruning history: {str(e)}")
|
|
|
|
|
|
def _infer_objective_unit(objective: Dict) -> str:
|
|
"""Infer unit from objective name and description"""
|
|
name = objective.get("name", "").lower()
|
|
desc = objective.get("description", "").lower()
|
|
|
|
# Common unit patterns
|
|
if "frequency" in name or "hz" in desc:
|
|
return "Hz"
|
|
elif "stiffness" in name or "n/mm" in desc:
|
|
return "N/mm"
|
|
elif "mass" in name or "kg" in desc:
|
|
return "kg"
|
|
elif "stress" in name or "mpa" in desc or "pa" in desc:
|
|
return "MPa"
|
|
elif "displacement" in name or "mm" in desc:
|
|
return "mm"
|
|
elif "force" in name or "newton" in desc:
|
|
return "N"
|
|
elif "%" in desc or "percent" in desc:
|
|
return "%"
|
|
|
|
# Check if unit is explicitly mentioned in description (e.g., "(N/mm)")
|
|
import re
|
|
|
|
unit_match = re.search(r"\(([^)]+)\)", desc)
|
|
if unit_match:
|
|
return unit_match.group(1)
|
|
|
|
return "" # No unit found
|
|
|
|
|
|
@router.get("/studies/{study_id}/metadata")
async def get_study_metadata(study_id: str):
    """Read optimization_config.json for objectives, design vars, units (Protocol 13)"""
    try:
        study_dir = resolve_study_path(study_id)
        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Load config (check multiple locations).
        config_file = study_dir / "optimization_config.json"
        if not config_file.exists():
            config_file = study_dir / "1_setup" / "optimization_config.json"

        if not config_file.exists():
            raise HTTPException(
                status_code=404, detail=f"Config file not found for study {study_id}"
            )

        with open(config_file) as f:
            config = json.load(f)

        # Enhance objectives with inferred units if not present.
        objectives = config.get("objectives", [])
        for obj in objectives:
            if "unit" not in obj or not obj["unit"]:
                obj["unit"] = _infer_objective_unit(obj)

        # Get sampler/algorithm info.
        optimization = config.get("optimization", {})
        algorithm = optimization.get("algorithm", "TPE")

        # Map algorithm names to Optuna sampler names for frontend display.
        # Lookup is case-insensitive so every spelling variant is covered
        # (the previous per-case map missed "nsga-iii" and "random");
        # unknown algorithms pass through unchanged.
        sampler_map = {
            "cma-es": "CmaEsSampler",
            "cmaes": "CmaEsSampler",
            "tpe": "TPESampler",
            "nsga-ii": "NSGAIISampler",
            "nsga-iii": "NSGAIIISampler",
            "random": "RandomSampler",
        }
        sampler = sampler_map.get(str(algorithm).lower(), algorithm)

        return {
            "objectives": objectives,
            "design_variables": config.get("design_variables", []),
            "constraints": config.get("constraints", []),
            "study_name": config.get("study_name", study_id),
            "description": config.get("description", ""),
            "sampler": sampler,
            "algorithm": algorithm,
            "n_trials": optimization.get("n_trials", 100),
        }

    except HTTPException:
        # Re-raise as-is so the explicit 404s above are not converted into
        # 500s by the generic handler below.
        raise
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Study {study_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get study metadata: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/optimizer-state")
async def get_optimizer_state(study_id: str):
    """
    Read realtime optimizer state from multiple sources:
    1. dashboard_state.json (written by optimization scripts)
    2. intelligent_optimizer/optimizer_state.json (Protocol 13)
    3. Computed state from database (fallback)

    Returns a dict with at least "available"; when available, "source"
    identifies which of the three providers supplied the payload.
    """
    try:
        study_dir = resolve_study_path(study_id)
        results_dir = get_results_dir(study_dir)

        # Priority 1: Check for dashboard_state.json (written by optimization script)
        dashboard_state_file = results_dir / "dashboard_state.json"
        if dashboard_state_file.exists():
            try:
                with open(dashboard_state_file) as f:
                    state = json.load(f)
                return {"available": True, "source": "dashboard_state", **state}
            except Exception:
                # Corrupt/partially-written state file: fall through to next source
                pass

        # Priority 2: Check intelligent_optimizer state
        imso_state_file = results_dir / "intelligent_optimizer" / "optimizer_state.json"
        if imso_state_file.exists():
            try:
                with open(imso_state_file) as f:
                    state = json.load(f)
                return {"available": True, "source": "intelligent_optimizer", **state}
            except Exception:
                # Same best-effort behavior as above
                pass

        # Priority 3: Compute state from config and database
        config_file = study_dir / "optimization_config.json"
        if not config_file.exists():
            config_file = study_dir / "1_setup" / "optimization_config.json"

        if config_file.exists():
            with open(config_file) as f:
                config = json.load(f)

            # Extract sampler info
            optimizer_config = config.get("optimizer", {})
            sampler_name = optimizer_config.get("sampler", "TPESampler")
            n_trials = optimizer_config.get("n_trials", config.get("n_trials", 100))

            # Get trial count from database
            study_db = results_dir / "study.db"
            completed_trials = 0
            best_value = None

            if study_db.exists():
                try:
                    # Short timeout so a write-locked DB doesn't stall this endpoint
                    conn = sqlite3.connect(str(study_db), timeout=2.0)
                    cursor = conn.cursor()
                    cursor.execute("SELECT COUNT(*) FROM trials WHERE state = 'COMPLETE'")
                    completed_trials = cursor.fetchone()[0]

                    # Get best value
                    # NOTE(review): MIN() assumes a minimization objective; for a
                    # maximize study this picks the worst value — confirm intent.
                    cursor.execute("""
                        SELECT MIN(value) FROM trial_values
                        WHERE trial_id IN (SELECT trial_id FROM trials WHERE state = 'COMPLETE')
                    """)
                    result = cursor.fetchone()
                    if result and result[0] is not None:
                        best_value = result[0]

                    conn.close()
                except Exception:
                    # Best-effort: leave completed_trials/best_value at their defaults
                    pass

            # Determine phase (heuristic thresholds keyed on completed trial count)
            if completed_trials == 0:
                phase = "initializing"
                phase_description = "Waiting for first trial"
                phase_progress = 0.0
            elif completed_trials < 10:
                phase = "exploration"
                phase_description = "Initial random sampling"
                phase_progress = completed_trials / 10
            elif completed_trials < n_trials * 0.5:
                phase = "exploitation"
                phase_description = "Focusing on promising regions"
                phase_progress = (completed_trials - 10) / (n_trials * 0.5 - 10)
            elif completed_trials < n_trials * 0.9:
                phase = "refinement"
                phase_description = "Fine-tuning best solutions"
                phase_progress = (completed_trials - n_trials * 0.5) / (n_trials * 0.4)
            else:
                phase = "convergence"
                phase_description = "Near completion"
                phase_progress = (completed_trials - n_trials * 0.9) / (n_trials * 0.1)

            # Build objectives list from config
            objectives = []
            if "objectives" in config:
                for obj in config["objectives"]:
                    objectives.append(
                        {
                            "name": obj.get("name", "objective"),
                            "direction": obj.get("direction", "minimize"),
                            # Only the FIRST objective carries the DB best value
                            "current_best": best_value if len(objectives) == 0 else None,
                            "unit": obj.get("units", ""),
                        }
                    )

            # Sampler descriptions
            sampler_descriptions = {
                "TPESampler": "Tree-structured Parzen Estimator - Bayesian optimization using kernel density estimation",
                "NSGAIISampler": "NSGA-II - Multi-objective genetic algorithm with non-dominated sorting",
                "NSGAIIISampler": "NSGA-III - Reference-point based multi-objective optimization",
                "CmaEsSampler": "CMA-ES - Covariance Matrix Adaptation Evolution Strategy",
                "RandomSampler": "Random sampling - Uniform random search across parameter space",
                "QMCSampler": "Quasi-Monte Carlo - Low-discrepancy sequence sampling",
            }

            return {
                "available": True,
                "source": "computed",
                "phase": phase,
                "phase_description": phase_description,
                # Clamp progress to [0, 1] against rounding past the thresholds
                "phase_progress": min(1.0, max(0.0, phase_progress)),
                "current_strategy": f"{sampler_name} sampling",
                "sampler": {
                    "name": sampler_name,
                    "description": sampler_descriptions.get(
                        sampler_name, f"{sampler_name} optimization algorithm"
                    ),
                },
                "objectives": objectives,
                "plan": {
                    "total_phases": 4,
                    "current_phase": {
                        "initializing": 0,
                        "exploration": 1,
                        "exploitation": 2,
                        "refinement": 3,
                        "convergence": 4,
                    }.get(phase, 0),
                    "phases": ["Exploration", "Exploitation", "Refinement", "Convergence"],
                },
                "completed_trials": completed_trials,
                "total_trials": n_trials,
            }

        # No state file and no config: nothing to report
        return {"available": False}

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get optimizer state: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/pareto-front")
async def get_pareto_front(study_id: str):
    """Get Pareto-optimal solutions for multi-objective studies (Protocol 13)"""
    try:
        study_dir = resolve_study_path(study_id)
        study_db = get_results_dir(study_dir) / "study.db"

        # No Optuna database yet: nothing to report
        if not study_db.exists():
            return {"is_multi_objective": False, "pareto_front": []}

        # Deferred import keeps optuna out of endpoints that don't need it
        import optuna

        study = optuna.load_study(
            study_name=study_id,
            storage=optuna.storages.RDBStorage(f"sqlite:///{study_db}"),
        )

        # A single-direction study has no Pareto front
        if len(study.directions) == 1:
            return {"is_multi_objective": False, "pareto_front": []}

        # study.best_trials is the non-dominated set for multi-objective studies
        front = [
            {
                "trial_number": trial.number,
                "values": trial.values,
                "params": trial.params,
                "user_attrs": dict(trial.user_attrs),
                "constraint_satisfied": trial.user_attrs.get("constraint_satisfied", True),
            }
            for trial in study.best_trials
        ]

        return {"is_multi_objective": True, "pareto_front": front}

    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Study {study_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get Pareto front: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/nn-pareto-front")
async def get_nn_pareto_front(study_id: str):
    """Get NN surrogate Pareto front from nn_pareto_front.json"""
    try:
        study_dir = resolve_study_path(study_id)
        nn_pareto_file = get_results_dir(study_dir) / "nn_pareto_front.json"

        if not nn_pareto_file.exists():
            return {"has_nn_results": False, "pareto_front": []}

        with open(nn_pareto_file) as f:
            raw_entries = json.load(f)

        # Reshape each NN entry so it mirrors the Trial interface used elsewhere
        transformed = [
            {
                "trial_number": entry.get("trial_number"),
                "values": [entry.get("mass"), entry.get("frequency")],
                "params": entry.get("params", {}),
                "user_attrs": {
                    "source": "NN",
                    "feasible": entry.get("feasible", False),
                    "predicted_stress": entry.get("predicted_stress"),
                    "predicted_displacement": entry.get("predicted_displacement"),
                    "mass": entry.get("mass"),
                    "frequency": entry.get("frequency"),
                },
                "constraint_satisfied": entry.get("feasible", False),
                "source": "NN",
            }
            for entry in raw_entries
        ]

        return {"has_nn_results": True, "pareto_front": transformed, "count": len(transformed)}

    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Study {study_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get NN Pareto front: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/nn-state")
async def get_nn_optimization_state(study_id: str):
    """Get NN optimization state/summary from nn_optimization_state.json"""
    try:
        study_dir = resolve_study_path(study_id)
        nn_state_file = get_results_dir(study_dir) / "nn_optimization_state.json"

        if not nn_state_file.exists():
            return {"has_nn_state": False}

        with open(nn_state_file) as f:
            state = json.load(f)

        # Copy the counters with their defaults; optional fields fall back to None
        response = {"has_nn_state": True}
        for key, default in (
            ("total_fea_count", 0),
            ("total_nn_count", 0),
            ("pareto_front_size", 0),
            ("best_mass", None),
            ("best_frequency", None),
            ("timestamp", None),
        ):
            response[key] = state.get(key, default)
        return response

    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Study {study_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get NN state: {str(e)}")
|
|
|
|
|
|
@router.post("/studies")
async def create_study(
    config: str = Form(...),
    prt_file: Optional[UploadFile] = File(None),
    sim_file: Optional[UploadFile] = File(None),
    fem_file: Optional[UploadFile] = File(None),
):
    """
    Create a new optimization study

    Accepts:
    - config: JSON string with study configuration (must contain "name")
    - prt_file: NX part file (optional if using existing study)
    - sim_file: NX simulation file (optional)
    - fem_file: NX FEM file (optional)

    Raises:
        HTTPException 400: invalid JSON, missing name, or study already exists
        HTTPException 500: unexpected failure (directories created by this
            request are removed; pre-existing studies are never touched)
    """
    study_dir = None
    created_study_dir = False  # Only clean up a directory we created ourselves
    try:
        # Parse config
        config_data = json.loads(config)
        study_name = config_data.get("name")  # Frontend sends "name", not "study_name"

        if not study_name:
            raise HTTPException(status_code=400, detail="name is required in config")

        study_dir = STUDIES_DIR / study_name
        if study_dir.exists():
            raise HTTPException(status_code=400, detail=f"Study {study_name} already exists")

        # Create study directory structure
        setup_dir = study_dir / "1_setup"
        model_dir = setup_dir / "model"
        results_dir = study_dir / "2_results"
        for directory in (setup_dir, model_dir, results_dir):
            directory.mkdir(parents=True, exist_ok=True)
        created_study_dir = True

        # Save config file
        config_file = setup_dir / "optimization_config.json"
        with open(config_file, "w") as f:
            json.dump(config_data, f, indent=2)

        # Save uploaded files. Only the basename of the client-supplied
        # filename is used so a crafted name cannot escape the model dir.
        files_saved = {}
        for field, upload in (
            ("prt_file", prt_file),
            ("sim_file", sim_file),
            ("fem_file", fem_file),
        ):
            if upload is None:
                continue
            dest_path = model_dir / Path(upload.filename).name
            with open(dest_path, "wb") as f:
                f.write(await upload.read())
            files_saved[field] = str(dest_path)

        return JSONResponse(
            status_code=201,
            content={
                "status": "created",
                "study_id": study_name,
                "study_path": str(study_dir),
                "config_path": str(config_file),
                "files_saved": files_saved,
                "message": f"Study {study_name} created successfully. Ready to run optimization.",
            },
        )

    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON in config: {str(e)}")
    except HTTPException:
        # Propagate intended client errors unchanged. Do NOT delete study_dir
        # here: the "already exists" branch refers to a PRE-EXISTING study.
        raise
    except Exception as e:
        # Clean up on error, but only a directory this request created
        if created_study_dir and study_dir is not None and study_dir.exists():
            shutil.rmtree(study_dir)
        raise HTTPException(status_code=500, detail=f"Failed to create study: {str(e)}")
|
|
|
|
|
|
@router.post("/studies/{study_id}/convert-mesh")
async def convert_study_mesh(study_id: str):
    """
    Convert study mesh to GLTF for 3D visualization
    Creates a web-viewable 3D model with FEA results as vertex colors

    Raises:
        HTTPException 404: study does not exist
        HTTPException 500: mesh conversion failed
    """
    try:
        study_dir = resolve_study_path(study_id)
        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Import mesh converter lazily; alias it so the imported helper does not
        # shadow this endpoint function (both were named convert_study_mesh).
        sys.path.append(str(Path(__file__).parent.parent.parent.parent.parent))
        from optimization_engine.nx.mesh_converter import (
            convert_study_mesh as run_mesh_conversion,
        )

        # Convert mesh
        output_path = run_mesh_conversion(study_dir)

        if output_path and output_path.exists():
            return {
                "status": "success",
                "gltf_path": str(output_path),
                "gltf_url": f"/api/optimization/studies/{study_id}/mesh/model.gltf",
                "metadata_url": f"/api/optimization/studies/{study_id}/mesh/model.json",
                "message": "Mesh converted successfully",
            }
        else:
            raise HTTPException(status_code=500, detail="Mesh conversion failed")

    except HTTPException:
        raise  # Preserve the intended 404/500 instead of re-wrapping as generic 500
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Study {study_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to convert mesh: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/mesh/{filename}")
async def get_mesh_file(study_id: str, filename: str):
    """
    Serve GLTF mesh files and metadata from the study's 3_visualization folder
    Supports .gltf, .bin, .glb and .json files

    Raises:
        HTTPException 400: filename contains path separators or ".."
        HTTPException 404: study or file not found
    """
    try:
        # Validate filename to prevent directory traversal
        if ".." in filename or "/" in filename or "\\" in filename:
            raise HTTPException(status_code=400, detail="Invalid filename")

        study_dir = resolve_study_path(study_id)
        file_path = study_dir / "3_visualization" / filename

        if not file_path.exists():
            raise HTTPException(status_code=404, detail=f"File {filename} not found")

        # Map extension to MIME type; unknown extensions fall back to octet-stream
        content_types = {
            ".gltf": "model/gltf+json",
            ".bin": "application/octet-stream",
            ".json": "application/json",
            ".glb": "model/gltf-binary",
        }
        content_type = content_types.get(file_path.suffix.lower(), "application/octet-stream")

        return FileResponse(path=str(file_path), media_type=content_type, filename=filename)

    except HTTPException:
        raise  # Keep the intended 400/404 instead of wrapping them as 500
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="File not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to serve mesh file: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/optuna-url")
async def get_optuna_dashboard_url(study_id: str):
    """
    Get the Optuna dashboard URL for a specific study.
    Returns the URL to access the study in Optuna dashboard.

    The Optuna dashboard should be started with a relative path from the Atomizer root:
    sqlite:///studies/{study_id}/2_results/study.db

    Raises:
        HTTPException 404: study, database, or Optuna study record not found
    """
    try:
        study_dir = resolve_study_path(study_id)
        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        results_dir = get_results_dir(study_dir)
        study_db = results_dir / "study.db"

        if not study_db.exists():
            raise HTTPException(
                status_code=404, detail=f"No Optuna database found for study {study_id}"
            )

        # Get the study name from the database (may differ from folder name)
        import optuna

        storage = optuna.storages.RDBStorage(f"sqlite:///{study_db}")
        studies = storage.get_all_studies()

        if not studies:
            raise HTTPException(
                status_code=404, detail=f"No Optuna study found in database for {study_id}"
            )

        # Use the actual study name from the database
        optuna_study_name = studies[0].study_name

        # Return URL info for the frontend
        # The dashboard should be running on port 8081 with the correct database.
        # NOTE(review): _study_id is a private Optuna attribute — confirm it is
        # stable across the Optuna versions this project supports.
        return {
            "study_id": study_id,
            "optuna_study_name": optuna_study_name,
            "database_path": f"studies/{study_id}/2_results/study.db",
            "dashboard_url": f"http://localhost:8081/dashboard/studies/{studies[0]._study_id}",
            "dashboard_base": "http://localhost:8081",
            "note": "Optuna dashboard must be started with: sqlite:///studies/{study_id}/2_results/study.db",
        }

    except HTTPException:
        raise  # Fix: previously the 404s above were re-wrapped as 500 below
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Study {study_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get Optuna URL: {str(e)}")
|
|
|
|
|
|
@router.post("/studies/{study_id}/generate-report")
async def generate_report(
    study_id: str, format: str = "markdown", include_llm_summary: bool = False
):
    """
    Generate an optimization report in the specified format

    Args:
        study_id: Study identifier
        format: Report format ('markdown', 'md', 'html', or 'pdf')
        include_llm_summary: Whether to include LLM-generated executive summary

    Returns:
        Information about the generated report including download URL

    Raises:
        HTTPException 400: unsupported format
        HTTPException 404: study not found
        HTTPException 500: report generation failed
    """
    try:
        study_dir = resolve_study_path(study_id)
        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Validate format before doing any work
        valid_formats = ["markdown", "md", "html", "pdf"]
        if format.lower() not in valid_formats:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid format. Must be one of: {', '.join(valid_formats)}",
            )

        # Import report generator lazily
        sys.path.append(str(Path(__file__).parent.parent.parent.parent.parent))
        from optimization_engine.future.report_generator import generate_study_report

        # Generate report
        output_path = generate_study_report(
            study_dir=study_dir,
            output_format=format.lower(),
            include_llm_summary=include_llm_summary,
        )

        if output_path and output_path.exists():
            return {
                "status": "success",
                "format": format,
                "file_path": str(output_path),
                "download_url": f"/api/optimization/studies/{study_id}/reports/{output_path.name}",
                "file_size": output_path.stat().st_size,
                "message": f"Report generated successfully in {format} format",
            }
        else:
            raise HTTPException(status_code=500, detail="Report generation failed")

    except HTTPException:
        raise  # Fix: keep the intended 400/404/500 instead of re-wrapping as 500
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Study {study_id} not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to generate report: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/reports/{filename}")
async def download_report(study_id: str, filename: str):
    """
    Download a generated report file

    Args:
        study_id: Study identifier
        filename: Report filename (no path separators allowed)

    Returns:
        Report file for download (as an attachment)

    Raises:
        HTTPException 400: filename contains path separators or ".."
        HTTPException 404: study or report file not found
    """
    try:
        # Validate filename to prevent directory traversal
        if ".." in filename or "/" in filename or "\\" in filename:
            raise HTTPException(status_code=400, detail="Invalid filename")

        study_dir = resolve_study_path(study_id)
        file_path = get_results_dir(study_dir) / filename

        if not file_path.exists():
            raise HTTPException(status_code=404, detail=f"Report file {filename} not found")

        # Determine content type from the extension
        content_types = {
            ".md": "text/markdown",
            ".html": "text/html",
            ".pdf": "application/pdf",
            ".json": "application/json",
        }
        content_type = content_types.get(file_path.suffix.lower(), "application/octet-stream")

        return FileResponse(
            path=str(file_path),
            media_type=content_type,
            filename=filename,
            # Force a download; the disposition now carries the real filename
            headers={"Content-Disposition": f"attachment; filename={filename}"},
        )

    except HTTPException:
        raise  # Preserve the intended 400/404 responses
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Report file not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to download report: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/console")
async def get_console_output(study_id: str, lines: int = 200):
    """
    Get the latest console output/logs from the optimization run

    Args:
        study_id: Study identifier
        lines: Number of lines to return (default: 200)

    Returns:
        JSON with console output lines
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Probe known log locations in priority order
        candidates = (
            study_dir / "optimization.log",
            study_dir / "2_results" / "optimization.log",
            study_dir / "3_results" / "optimization.log",
            study_dir / "run.log",
        )
        log_path_used = next((p for p in candidates if p.exists()), None)

        if log_path_used is None:
            return {
                "lines": [],
                "total_lines": 0,
                "log_file": None,
                "message": "No log file found. Optimization may not have started yet.",
            }

        # Read the whole file so total_lines can be reported, then take the tail
        with open(log_path_used, "r", encoding="utf-8", errors="replace") as f:
            all_lines = f.readlines()

        # Tail plus newline stripping in one pass
        last_lines = [entry.rstrip("\n\r") for entry in all_lines[-lines:]]

        return {
            "lines": last_lines,
            "total_lines": len(all_lines),
            "displayed_lines": len(last_lines),
            "log_file": str(log_path_used),
            "timestamp": datetime.now().isoformat(),
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to read console output: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/report")
async def get_study_report(study_id: str):
    """
    Get the STUDY_REPORT.md file content for a study

    Args:
        study_id: Study identifier

    Returns:
        JSON with the markdown content
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # The report lives at the study root
        report_path = study_dir / "STUDY_REPORT.md"
        if not report_path.exists():
            raise HTTPException(status_code=404, detail="No STUDY_REPORT.md found for this study")

        content = report_path.read_text(encoding="utf-8")

        return {"content": content, "path": str(report_path), "study_id": study_id}

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to read study report: {str(e)}")
|
|
|
|
|
|
# ============================================================================
|
|
# Study README and Config Endpoints
|
|
# ============================================================================
|
|
|
|
|
|
@router.get("/studies/{study_id}/readme")
async def get_study_readme(study_id: str):
    """
    Get the README.md file content for a study (from 1_setup folder)

    Args:
        study_id: Study identifier

    Returns:
        JSON with the markdown content. When no README exists, a minimal
        document is synthesized from optimization_config.json (path is None).
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Look for README.md in various locations (first hit wins)
        readme_paths = [
            study_dir / "README.md",
            study_dir / "1_setup" / "README.md",
            study_dir / "readme.md",
        ]

        readme_content = None
        readme_path = None

        for path in readme_paths:
            if path.exists():
                readme_path = path
                with open(path, "r", encoding="utf-8") as f:
                    readme_content = f.read()
                break

        if readme_content is None:
            # Generate a basic README from config if none exists
            config_file = study_dir / "1_setup" / "optimization_config.json"
            if not config_file.exists():
                config_file = study_dir / "optimization_config.json"

            if config_file.exists():
                with open(config_file) as f:
                    config = json.load(f)

                # Synthesized markdown: title, description, then bullet lists of
                # design variables and objectives pulled from the config
                readme_content = f"""# {config.get("study_name", study_id)}

{config.get("description", "No description available.")}

## Design Variables
{chr(10).join([f"- **{dv['name']}**: {dv.get('min', '?')} - {dv.get('max', '?')} {dv.get('units', '')}" for dv in config.get("design_variables", [])])}

## Objectives
{chr(10).join([f"- **{obj['name']}**: {obj.get('description', '')} ({obj.get('direction', 'minimize')})" for obj in config.get("objectives", [])])}
"""
            else:
                readme_content = f"# {study_id}\n\nNo README or configuration found for this study."

        return {
            "content": readme_content,
            # path is None for synthesized content (no file on disk)
            "path": str(readme_path) if readme_path else None,
            "study_id": study_id,
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to read README: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/image/{image_path:path}")
async def get_study_image(study_id: str, image_path: str):
    """
    Serve images from a study directory.

    Supports images in:
    - study_dir/image.png
    - study_dir/1_setup/image.png
    - study_dir/3_results/image.png
    - study_dir/2_results/image.png
    - study_dir/assets/image.png

    Args:
        study_id: Study identifier
        image_path: Relative path to the image within the study

    Returns:
        FileResponse with the image

    Raises:
        HTTPException 404: study or image not found (including any path that
            would resolve outside the study directory)
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Strip leading separators; traversal is enforced below by resolving
        # each candidate and requiring it to stay inside the study directory.
        # (The previous replace("..", "") scrub was bypassable and also mangled
        # legitimate filenames containing "..".)
        image_path = image_path.lstrip("/\\")

        base_dir = study_dir.resolve()

        # Try multiple locations for the image
        possible_paths = [
            study_dir / image_path,
            study_dir / "1_setup" / image_path,
            study_dir / "3_results" / image_path,
            study_dir / "2_results" / image_path,
            study_dir / "assets" / image_path,
        ]

        image_file = None
        for path in possible_paths:
            try:
                candidate = path.resolve()
                # Reject anything that escapes the study directory
                candidate.relative_to(base_dir)
            except (OSError, ValueError):
                continue
            if candidate.exists() and candidate.is_file():
                image_file = candidate
                break

        if image_file is None:
            raise HTTPException(status_code=404, detail=f"Image not found: {image_path}")

        # Determine media type from the extension
        media_types = {
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".gif": "image/gif",
            ".svg": "image/svg+xml",
            ".webp": "image/webp",
        }
        media_type = media_types.get(image_file.suffix.lower(), "application/octet-stream")

        return FileResponse(image_file, media_type=media_type)

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to serve image: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id:path}/config")
async def get_study_config(study_id: str):
    """
    Get the study configuration - reads from atomizer_spec.json (v2.0) first,
    falls back to legacy optimization_config.json if not found.

    Args:
        study_id: Study identifier

    Returns:
        JSON with the complete configuration in a unified format
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Priority 1: atomizer_spec.json (v2.0 unified format)
        spec_file = study_dir / "atomizer_spec.json"
        if spec_file.exists():
            with open(spec_file) as f:
                spec = json.load(f)
            # Normalize the spec into the legacy-shaped config the UI expects
            return {
                "config": _transform_spec_to_config(spec, study_id),
                "path": str(spec_file),
                "study_id": study_id,
                "source": "atomizer_spec",
            }

        # Priority 2: Legacy optimization_config.json (1_setup preferred)
        legacy_candidates = (
            study_dir / "1_setup" / "optimization_config.json",
            study_dir / "optimization_config.json",
        )
        config_file = next((p for p in legacy_candidates if p.exists()), None)

        if config_file is None:
            raise HTTPException(
                status_code=404, detail=f"Config file not found for study {study_id}"
            )

        with open(config_file) as f:
            config = json.load(f)

        return {
            "config": config,
            "path": str(config_file),
            "study_id": study_id,
            "source": "legacy_config",
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to read config: {str(e)}")
|
|
|
|
|
|
def _transform_spec_to_config(spec: dict, study_id: str) -> dict:
|
|
"""Transform AtomizerSpec v2.0 format to legacy config format for backwards compatibility."""
|
|
meta = spec.get("meta", {})
|
|
model = spec.get("model", {})
|
|
optimization = spec.get("optimization", {})
|
|
|
|
# Transform design variables
|
|
design_variables = []
|
|
for dv in spec.get("design_variables", []):
|
|
bounds = dv.get("bounds", {})
|
|
design_variables.append(
|
|
{
|
|
"name": dv.get("name"),
|
|
"expression_name": dv.get("expression_name"),
|
|
"type": "float" if dv.get("type") == "continuous" else dv.get("type", "float"),
|
|
"min": bounds.get("min"),
|
|
"max": bounds.get("max"),
|
|
"low": bounds.get("min"), # Alias for compatibility
|
|
"high": bounds.get("max"), # Alias for compatibility
|
|
"baseline": dv.get("baseline"),
|
|
"unit": dv.get("units"),
|
|
"units": dv.get("units"),
|
|
"enabled": dv.get("enabled", True),
|
|
}
|
|
)
|
|
|
|
# Transform objectives
|
|
objectives = []
|
|
for obj in spec.get("objectives", []):
|
|
source = obj.get("source", {})
|
|
objectives.append(
|
|
{
|
|
"name": obj.get("name"),
|
|
"direction": obj.get("direction", "minimize"),
|
|
"weight": obj.get("weight", 1.0),
|
|
"target": obj.get("target"),
|
|
"unit": obj.get("units"),
|
|
"units": obj.get("units"),
|
|
"extractor_id": source.get("extractor_id"),
|
|
"output_key": source.get("output_key"),
|
|
}
|
|
)
|
|
|
|
# Transform constraints
|
|
constraints = []
|
|
for con in spec.get("constraints", []):
|
|
constraints.append(
|
|
{
|
|
"name": con.get("name"),
|
|
"type": _operator_to_type(con.get("operator", "<=")),
|
|
"operator": con.get("operator"),
|
|
"max_value": con.get("threshold") if con.get("operator") in ["<=", "<"] else None,
|
|
"min_value": con.get("threshold") if con.get("operator") in [">=", ">"] else None,
|
|
"bound": con.get("threshold"),
|
|
"unit": con.get("units"),
|
|
"units": con.get("units"),
|
|
}
|
|
)
|
|
|
|
# Transform extractors
|
|
extractors = []
|
|
for ext in spec.get("extractors", []):
|
|
extractors.append(
|
|
{
|
|
"name": ext.get("name"),
|
|
"type": ext.get("type"),
|
|
"builtin": ext.get("builtin", True),
|
|
"config": ext.get("config", {}),
|
|
"outputs": ext.get("outputs", []),
|
|
}
|
|
)
|
|
|
|
# Get algorithm info
|
|
algorithm = optimization.get("algorithm", {})
|
|
budget = optimization.get("budget", {})
|
|
|
|
# Build the config in legacy format
|
|
config = {
|
|
"study_name": meta.get("study_name", study_id),
|
|
"description": meta.get("description", ""),
|
|
"version": meta.get("version", "2.0"),
|
|
"design_variables": design_variables,
|
|
"objectives": objectives,
|
|
"constraints": constraints,
|
|
"extractors": extractors,
|
|
"optimization": {
|
|
"algorithm": algorithm.get("type", "TPE"),
|
|
"n_trials": budget.get("max_trials", 100),
|
|
"max_time_hours": budget.get("max_time_hours"),
|
|
"convergence_patience": budget.get("convergence_patience"),
|
|
},
|
|
"optimization_settings": {
|
|
"sampler": algorithm.get("type", "TPE"),
|
|
"n_trials": budget.get("max_trials", 100),
|
|
},
|
|
"algorithm": {
|
|
"name": "Optuna",
|
|
"sampler": algorithm.get("type", "TPE"),
|
|
"n_trials": budget.get("max_trials", 100),
|
|
},
|
|
"model": model,
|
|
"sim_file": model.get("sim", {}).get("path")
|
|
if isinstance(model.get("sim"), dict)
|
|
else None,
|
|
}
|
|
|
|
return config
|
|
|
|
|
|
def _operator_to_type(operator: str) -> str:
|
|
"""Convert constraint operator to legacy type string."""
|
|
mapping = {"<=": "le", "<": "le", ">=": "ge", ">": "ge", "==": "eq", "=": "eq"}
|
|
return mapping.get(operator, "le")
|
|
|
|
|
|
# ============================================================================
# Process Control Endpoints
# ============================================================================

# Track running processes by study_id.
# Maps study_id -> PID of the optimization subprocess launched (or discovered)
# for that study. Entries are pruned lazily when the PID is found dead.
_running_processes: Dict[str, int] = {}

# Track paused processes by study_id.
# Maps study_id -> True while the study's process tree is suspended via
# the /pause endpoint; cleared by /resume.
_paused_processes: Dict[str, bool] = {}
|
|
|
|
|
|
def _find_optimization_process(study_id: str) -> Optional[psutil.Process]:
    """Find a running optimization process for a given study.

    Uses multiple detection strategies:
    1. Check tracked processes dictionary (most reliable)
    2. Check process working directory (Windows-friendly)
    3. Check cmdline for script names

    Args:
        study_id: Study identifier passed to ``resolve_study_path``.

    Returns:
        The matching live ``psutil.Process``, or ``None`` if no
        optimization process is found for this study.
    """
    study_dir = resolve_study_path(study_id)
    # Normalize to lowercase, backslash-separated paths so comparisons are
    # insensitive to case and separator style (relevant on Windows).
    study_dir_str = str(study_dir).lower()
    study_dir_normalized = study_dir_str.replace("/", "\\")

    # Strategy 1: Check tracked processes first (most reliable)
    if study_id in _running_processes:
        tracked_pid = _running_processes[study_id]
        try:
            proc = psutil.Process(tracked_pid)
            if proc.is_running() and proc.status() != psutil.STATUS_ZOMBIE:
                return proc
            else:
                # Process died, clean up tracking
                del _running_processes[study_id]
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            del _running_processes[study_id]

    # Strategy 2 & 3: Scan all Python processes
    for proc in psutil.process_iter(["pid", "name", "cmdline", "cwd"]):
        try:
            # Only check Python processes
            proc_name = (proc.info.get("name") or "").lower()
            if "python" not in proc_name:
                continue

            cmdline = proc.info.get("cmdline") or []
            cmdline_str = " ".join(cmdline) if cmdline else ""
            cmdline_lower = cmdline_str.lower()

            # Check if this is an optimization script (run_optimization.py or run_sat_optimization.py)
            is_opt_script = (
                "run_optimization" in cmdline_lower
                or "run_sat_optimization" in cmdline_lower
                or "run_imso" in cmdline_lower
            )

            if not is_opt_script:
                continue

            # Strategy 2: Check CWD (most reliable on Windows)
            try:
                proc_cwd = proc.cwd().lower().replace("/", "\\")
                if proc_cwd == study_dir_normalized or study_dir_normalized in proc_cwd:
                    # Track this process for future lookups
                    _running_processes[study_id] = proc.pid
                    return proc
            except (psutil.NoSuchProcess, psutil.AccessDenied, OSError):
                pass

            # Strategy 3: Check cmdline for study path or study_id
            cmdline_normalized = cmdline_lower.replace("/", "\\")
            if study_dir_normalized in cmdline_normalized:
                _running_processes[study_id] = proc.pid
                return proc

            # Also check if study_id appears in cmdline (e.g., as part of path)
            # NOTE(review): substring match could mis-attribute a process when
            # one study_id is a prefix of another — confirm IDs are distinctive.
            if study_id.lower() in cmdline_lower:
                _running_processes[study_id] = proc.pid
                return proc

        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # Process vanished or is inaccessible mid-scan: skip it.
            continue

    return None
|
|
|
|
|
|
@router.get("/studies/{study_id}/process")
async def get_process_status(study_id: str):
    """
    Get the process status for a study's optimization run.

    Combines live process detection with trial counts and an ETA estimate
    derived from recent trial completion timestamps in ``study.db``.

    Args:
        study_id: Study identifier

    Returns:
        JSON with process status: is_running/is_paused, pid, iteration,
        fea_count/nn_count, total/completed trials, time-per-trial,
        ETA (seconds and formatted), rate per hour, and start time.

    Raises:
        HTTPException: 404 if the study does not exist, 500 on other errors.
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Check if process is running
        proc = _find_optimization_process(study_id)
        is_running = proc is not None
        pid = proc.pid if proc else None

        # Get iteration counts from database
        results_dir = get_results_dir(study_dir)
        study_db = results_dir / "study.db"

        # Defaults used when the DB/config is missing or unreadable.
        fea_count = 0
        nn_count = 0
        iteration = None
        total_trials = None
        time_per_trial = None
        eta_seconds = None
        eta_formatted = None
        rate_per_hour = None
        start_time = None

        # Try to get total_trials from config (root first, then 1_setup/)
        config_file = study_dir / "optimization_config.json"
        if not config_file.exists():
            config_file = study_dir / "1_setup" / "optimization_config.json"
        if config_file.exists():
            try:
                with open(config_file) as f:
                    config = json.load(f)
                total_trials = config.get("optimizer", {}).get("n_trials")
                if not total_trials:
                    total_trials = config.get("n_trials")
            except Exception:
                # Config is optional; missing total just disables the ETA.
                pass

        if study_db.exists():
            try:
                conn = sqlite3.connect(str(study_db))
                cursor = conn.cursor()

                # Count FEA trials (from main study or studies with "_fea" suffix)
                cursor.execute("""
                    SELECT COUNT(*) FROM trials t
                    JOIN studies s ON t.study_id = s.study_id
                    WHERE t.state = 'COMPLETE'
                    AND (s.study_name LIKE '%_fea' OR s.study_name NOT LIKE '%_nn%')
                """)
                fea_count = cursor.fetchone()[0]

                # Count NN trials
                cursor.execute("""
                    SELECT COUNT(*) FROM trials t
                    JOIN studies s ON t.study_id = s.study_id
                    WHERE t.state = 'COMPLETE'
                    AND s.study_name LIKE '%_nn%'
                """)
                nn_count = cursor.fetchone()[0]

                # Try to get current iteration from study names (e.g. "..._iter7")
                cursor.execute("""
                    SELECT study_name FROM studies
                    WHERE study_name LIKE '%_iter%'
                    ORDER BY study_name DESC LIMIT 1
                """)
                result = cursor.fetchone()
                if result:
                    import re

                    match = re.search(r"iter(\d+)", result[0])
                    if match:
                        iteration = int(match.group(1))

                # Calculate ETA from trial timestamps
                # Get timestamps from trial_user_attributes or trial datetime_complete
                cursor.execute("""
                    SELECT datetime_complete FROM trials
                    WHERE state = 'COMPLETE' AND datetime_complete IS NOT NULL
                    ORDER BY datetime_complete DESC
                    LIMIT 20
                """)
                timestamps = cursor.fetchall()

                if len(timestamps) >= 2:
                    from datetime import datetime as dt

                    try:
                        # Parse timestamps and calculate average time between trials
                        times = []
                        for ts in timestamps:
                            if ts[0]:
                                # Handle different timestamp formats: ISO with
                                # optional "Z", then space-separated with/without
                                # fractional seconds.
                                ts_str = ts[0]
                                try:
                                    times.append(dt.fromisoformat(ts_str.replace("Z", "+00:00")))
                                except ValueError:
                                    try:
                                        times.append(dt.strptime(ts_str, "%Y-%m-%d %H:%M:%S.%f"))
                                    except ValueError:
                                        times.append(dt.strptime(ts_str, "%Y-%m-%d %H:%M:%S"))

                        if len(times) >= 2:
                            # Sort ascending and calculate time differences
                            times.sort()
                            diffs = [
                                (times[i + 1] - times[i]).total_seconds()
                                for i in range(len(times) - 1)
                            ]
                            # Filter out outliers (e.g., breaks in optimization)
                            diffs = [
                                d for d in diffs if d > 0 and d < 7200
                            ]  # Max 2 hours between trials
                            if diffs:
                                time_per_trial = sum(diffs) / len(diffs)
                                rate_per_hour = 3600 / time_per_trial if time_per_trial > 0 else 0

                                # Calculate ETA
                                completed = fea_count + nn_count
                                if total_trials and completed < total_trials:
                                    remaining = total_trials - completed
                                    eta_seconds = time_per_trial * remaining

                                    # Format ETA nicely (s / m / h m)
                                    if eta_seconds < 60:
                                        eta_formatted = f"{int(eta_seconds)}s"
                                    elif eta_seconds < 3600:
                                        eta_formatted = f"{int(eta_seconds / 60)}m"
                                    else:
                                        hours = int(eta_seconds / 3600)
                                        minutes = int((eta_seconds % 3600) / 60)
                                        eta_formatted = f"{hours}h {minutes}m"

                        # Get start time (earliest of the sampled trials)
                        if times:
                            start_time = times[0].isoformat()
                    except Exception as e:
                        # ETA is best-effort; log and continue with partial data.
                        print(f"Warning: Failed to calculate ETA: {e}")

                conn.close()
            except Exception as e:
                print(f"Warning: Failed to read database for process status: {e}")

        # Check if paused (only meaningful while the process is alive)
        is_paused = _paused_processes.get(study_id, False) if is_running else False

        return {
            "is_running": is_running,
            "is_paused": is_paused,
            "pid": pid,
            "iteration": iteration,
            "fea_count": fea_count,
            "nn_count": nn_count,
            "total_trials": total_trials,
            "completed_trials": fea_count + nn_count,
            "time_per_trial_seconds": round(time_per_trial, 1) if time_per_trial else None,
            "eta_seconds": round(eta_seconds) if eta_seconds else None,
            "eta_formatted": eta_formatted,
            "rate_per_hour": round(rate_per_hour, 2) if rate_per_hour else None,
            "start_time": start_time,
            "study_id": study_id,
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get process status: {str(e)}")
|
|
|
|
|
|
class StartOptimizationRequest(BaseModel):
    """Options accepted by the /start endpoint.

    Field names are camelCase to match the frontend payload.
    """

    freshStart: bool = False  # Start from scratch instead of resuming
    maxIterations: int = 100  # NOTE(review): not read when building the command — confirm intent
    trials: int = 300  # For SAT scripts (passed as --trials)
    feaBatchSize: int = 5  # IMSO/generic scripts: passed as --fea-batch
    tuneTrials: int = 30  # IMSO/generic scripts: passed as --tune-trials
    ensembleSize: int = 3  # IMSO/generic scripts: passed as --ensemble-size
    patience: int = 5  # IMSO/generic scripts: passed as --patience
|
|
|
|
|
|
def _detect_script_type(script_path: Path) -> str:
|
|
"""Detect the type of optimization script (SAT, IMSO, etc.)"""
|
|
try:
|
|
content = script_path.read_text(encoding="utf-8", errors="ignore")
|
|
if "run_sat_optimization" in content or "SAT" in content or "Self-Aware Turbo" in content:
|
|
return "sat"
|
|
elif "IMSO" in content or "intelligent_optimizer" in content:
|
|
return "imso"
|
|
else:
|
|
return "generic"
|
|
except:
|
|
return "generic"
|
|
|
|
|
|
@router.post("/studies/{study_id}/start")
async def start_optimization(study_id: str, request: StartOptimizationRequest = None):
    """
    Start the optimization process for a study.

    Locates the study's run script (SAT script preferred, then
    run_optimization.py, then a repo-level generic runner driven by
    atomizer_spec.json), builds script-type-specific CLI arguments, and
    spawns the run as a detached background subprocess.

    Args:
        study_id: Study identifier
        request: Optional start options

    Returns:
        JSON with process info: success flag, message, pid, full command
        line, and the detected script type.

    Raises:
        HTTPException: 404 if the study or any run script is missing,
            500 on other failures.
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Check if already running — never spawn a second run for a study.
        existing_proc = _find_optimization_process(study_id)
        if existing_proc:
            return {
                "success": False,
                "message": f"Optimization already running (PID: {existing_proc.pid})",
                "pid": existing_proc.pid,
            }

        # Find run_optimization.py or run_sat_optimization.py
        run_script = study_dir / "run_sat_optimization.py"
        if not run_script.exists():
            run_script = study_dir / "run_optimization.py"

        # Fallback to generic runner if no study script found but spec exists
        is_generic_runner = False
        if not run_script.exists() and (study_dir / "atomizer_spec.json").exists():
            generic_runner = (
                Path(__file__).parent.parent.parent.parent.parent
                / "optimization_engine"
                / "run_optimization.py"
            )
            if generic_runner.exists():
                run_script = generic_runner
                is_generic_runner = True

        if not run_script.exists():
            raise HTTPException(
                status_code=404, detail=f"No optimization script found for study {study_id}"
            )

        # Detect script type and build appropriate command
        script_type = (
            _detect_script_type(run_script) if not is_generic_runner else "generic_universal"
        )
        python_exe = sys.executable
        cmd = [python_exe, str(run_script)]

        if is_generic_runner:
            # Configure generic runner arguments
            cmd.extend(["--config", str(study_dir / "atomizer_spec.json")])

            # Auto-detect PRT and SIM files
            model_dir = study_dir / "1_setup" / "model"
            if not model_dir.exists():
                model_dir = study_dir / "model"

            prt_files = list(model_dir.glob("*.prt"))
            sim_files = list(model_dir.glob("*.sim"))

            # Only the first matching file of each kind is passed through.
            if prt_files:
                cmd.extend(["--prt", str(prt_files[0])])
            if sim_files:
                cmd.extend(["--sim", str(sim_files[0])])

            if request and request.trials:
                cmd.extend(["--trials", str(request.trials)])
        elif request:
            if script_type == "sat":
                # SAT scripts use --trials
                cmd.extend(["--trials", str(request.trials)])
                if request.freshStart:
                    # SAT scripts might support --resume, so fresh = no --resume
                    pass  # Default is fresh start for SAT
            else:
                # IMSO/generic scripts use different flags
                cmd.append("--start")
                if request.freshStart:
                    cmd.append("--fresh")
                cmd.extend(["--fea-batch", str(request.feaBatchSize)])
                cmd.extend(["--tune-trials", str(request.tuneTrials)])
                cmd.extend(["--ensemble-size", str(request.ensembleSize)])
                cmd.extend(["--patience", str(request.patience)])

        # Start process in background, in its own session so it is not tied
        # to the API server's process group.
        proc = subprocess.Popen(
            cmd,
            cwd=str(study_dir),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            start_new_session=True,
        )

        # Record the PID so _find_optimization_process can use fast lookup.
        _running_processes[study_id] = proc.pid

        return {
            "success": True,
            "message": f"Optimization started successfully ({script_type} script)",
            "pid": proc.pid,
            "command": " ".join(cmd),
            "script_type": script_type,
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to start optimization: {str(e)}")
|
|
|
|
|
|
class StopRequest(BaseModel):
    """Options accepted by the /stop endpoint."""

    force: bool = True  # Default to force kill; False tries graceful terminate first
|
|
|
|
|
@router.post("/studies/{study_id}/stop")
async def stop_optimization(study_id: str, request: StopRequest = None):
    """
    Stop the optimization process for a study (hard kill by default).

    Kills the whole process tree: children are enumerated before the parent
    is touched (killing the parent first would orphan them), then killed
    bottom-up.

    Args:
        study_id: Study identifier
        request.force: If True (default), immediately kill. If False, try graceful first.

    Returns:
        JSON with result: success flag, message, the parent pid, and the
        list of PIDs that were actually killed.

    Raises:
        HTTPException: 404 if the study does not exist, 500 on other errors.
    """
    if request is None:
        request = StopRequest()

    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Find running process
        proc = _find_optimization_process(study_id)

        if not proc:
            return {"success": False, "message": "No running optimization process found"}

        pid = proc.pid
        killed_pids = []

        try:
            # FIRST: Get all children BEFORE killing parent
            children = []
            try:
                children = proc.children(recursive=True)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

            if request.force:
                # Hard kill: immediately kill parent and all children
                # Kill children first (bottom-up)
                for child in reversed(children):
                    try:
                        child.kill()  # SIGKILL on Unix, TerminateProcess on Windows
                        killed_pids.append(child.pid)
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        pass

                # Then kill parent
                try:
                    proc.kill()
                    killed_pids.append(pid)
                except psutil.NoSuchProcess:
                    pass
            else:
                # Graceful: try SIGTERM first, then force
                try:
                    proc.terminate()
                    proc.wait(timeout=5)
                except psutil.TimeoutExpired:
                    # Didn't stop gracefully, force kill
                    for child in reversed(children):
                        try:
                            child.kill()
                            killed_pids.append(child.pid)
                        except (psutil.NoSuchProcess, psutil.AccessDenied):
                            pass
                    proc.kill()
                    killed_pids.append(pid)
                except psutil.NoSuchProcess:
                    # Already gone — nothing left to do.
                    pass

            # Clean up tracking
            if study_id in _running_processes:
                del _running_processes[study_id]

            return {
                "success": True,
                "message": f"Optimization killed (PID: {pid}, +{len(children)} children)",
                "pid": pid,
                "killed_pids": killed_pids,
            }

        except psutil.NoSuchProcess:
            # Process died between lookup and kill: treat as success.
            if study_id in _running_processes:
                del _running_processes[study_id]
            return {"success": True, "message": "Process already terminated"}

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to stop optimization: {str(e)}")
|
|
|
|
|
|
@router.post("/studies/{study_id}/pause")
async def pause_optimization(study_id: str):
    """
    Pause the optimization process for a study.

    Uses psutil.Process.suspend() which sends SIGSTOP on Unix or suspends on Windows.
    The whole process tree is suspended (children first, then the parent) and
    the study is flagged in the in-memory _paused_processes map.

    Args:
        study_id: Study identifier

    Returns:
        JSON with result: success flag, message, and the parent pid.

    Raises:
        HTTPException: 404 if the study does not exist, 500 on other errors.
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Find running process
        proc = _find_optimization_process(study_id)

        if not proc:
            return {"success": False, "message": "No running optimization process found"}

        # Check if already paused
        if _paused_processes.get(study_id, False):
            return {"success": False, "message": "Optimization is already paused"}

        pid = proc.pid

        try:
            # Suspend the main process and all children
            children = []
            try:
                children = proc.children(recursive=True)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

            # Suspend children first so no child keeps working while the
            # parent is already stopped
            for child in children:
                try:
                    child.suspend()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass

            # Suspend parent
            proc.suspend()

            # Mark as paused
            _paused_processes[study_id] = True

            return {
                "success": True,
                "message": f"Optimization paused (PID: {pid}, +{len(children)} children)",
                "pid": pid,
            }

        except psutil.NoSuchProcess:
            return {"success": False, "message": "Process no longer exists"}

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to pause optimization: {str(e)}")
|
|
|
|
|
|
@router.post("/studies/{study_id}/resume")
async def resume_optimization(study_id: str):
    """
    Resume a paused optimization process for a study.

    Uses psutil.Process.resume() which sends SIGCONT on Unix or resumes on Windows.
    Resume order is the mirror of pause: parent first, then children.

    Args:
        study_id: Study identifier

    Returns:
        JSON with result: success flag, message, and the parent pid.

    Raises:
        HTTPException: 404 if the study does not exist, 500 on other errors.
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Find running process
        proc = _find_optimization_process(study_id)

        if not proc:
            return {"success": False, "message": "No optimization process found"}

        # Check if actually paused (flag set by /pause)
        if not _paused_processes.get(study_id, False):
            return {"success": False, "message": "Optimization is not paused"}

        pid = proc.pid

        try:
            # Resume the main process and all children
            children = []
            try:
                children = proc.children(recursive=True)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

            # Resume parent first
            proc.resume()

            # Resume children
            for child in children:
                try:
                    child.resume()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass

            # Mark as not paused
            _paused_processes[study_id] = False

            return {
                "success": True,
                "message": f"Optimization resumed (PID: {pid}, +{len(children)} children)",
                "pid": pid,
            }

        except psutil.NoSuchProcess:
            # Process died while paused: clear the stale flag.
            _paused_processes[study_id] = False
            return {"success": False, "message": "Process no longer exists"}

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to resume optimization: {str(e)}")
|
|
|
|
|
|
class ValidateRequest(BaseModel):
    """Options accepted by the /validate endpoint."""

    topN: int = 5  # How many top NN predictions to re-run through FEA (--top)
|
|
|
|
|
@router.post("/studies/{study_id}/validate")
async def validate_optimization(study_id: str, request: ValidateRequest = None):
    """
    Run final FEA validation on top NN predictions.

    Refuses to start while an optimization is running. Prefers a dedicated
    final_validation.py in the study directory; otherwise falls back to
    run_optimization.py with the --validate flag.

    Args:
        study_id: Study identifier
        request: Validation options (topN)

    Returns:
        JSON with process info: success flag, message, pid, and the
        full command line.

    Raises:
        HTTPException: 404 if the study or any validation script is
            missing, 500 on other failures.
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Check if optimization is still running
        existing_proc = _find_optimization_process(study_id)
        if existing_proc:
            return {
                "success": False,
                "message": "Cannot validate while optimization is running. Stop optimization first.",
            }

        # Look for final_validation.py script
        validation_script = study_dir / "final_validation.py"

        if not validation_script.exists():
            # Fall back to run_optimization.py with --validate flag if script doesn't exist
            run_script = study_dir / "run_optimization.py"
            if not run_script.exists():
                raise HTTPException(status_code=404, detail="No validation script found")

            python_exe = sys.executable
            top_n = request.topN if request else 5
            cmd = [python_exe, str(run_script), "--validate", "--top", str(top_n)]
        else:
            python_exe = sys.executable
            top_n = request.topN if request else 5
            cmd = [python_exe, str(validation_script), "--top", str(top_n)]

        # Start validation process detached from the API server's session
        proc = subprocess.Popen(
            cmd,
            cwd=str(study_dir),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            start_new_session=True,
        )

        return {
            "success": True,
            "message": f"Validation started for top {top_n} NN predictions",
            "pid": proc.pid,
            "command": " ".join(cmd),
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to start validation: {str(e)}")
|
|
|
|
|
|
# ============================================================================
# Optuna Dashboard Launch
# ============================================================================

# Dashboard subprocess per study_id. Used to recognize a dashboard this
# server already launched before spawning a new one on the same port.
_optuna_processes: Dict[str, subprocess.Popen] = {}
|
|
@router.post("/studies/{study_id}/optuna-dashboard")
async def launch_optuna_dashboard(study_id: str):
    """
    Launch Optuna dashboard for a specific study.

    Spawns the `optuna-dashboard` CLI against the study's study.db on a
    fixed port (8081), then polls until the port opens or the process dies.

    Args:
        study_id: Study identifier

    Returns:
        JSON with dashboard URL and process info.

    Raises:
        HTTPException: 404 if the study or its study.db is missing, 500 if
            optuna-dashboard cannot be located or launching fails.
    """
    import time
    import socket

    def is_port_in_use(port: int) -> bool:
        """Check if a port is already in use"""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            return s.connect_ex(("localhost", port)) == 0

    def read_proc_stderr(p) -> str:
        """Best-effort read of a dead process's captured stderr.

        Narrowed from the previous bare `except:` clauses, which also
        swallowed KeyboardInterrupt/SystemExit.
        """
        try:
            return p.stderr.read().decode() if p.stderr else ""
        except Exception:
            return ""

    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        results_dir = get_results_dir(study_dir)
        study_db = results_dir / "study.db"

        if not study_db.exists():
            raise HTTPException(
                status_code=404, detail=f"No Optuna database found for study {study_id}"
            )

        port = 8081

        # Check if dashboard is already running on this port
        if is_port_in_use(port):
            # Check if it's our process
            if study_id in _optuna_processes:
                proc = _optuna_processes[study_id]
                if proc.poll() is None:  # Still running
                    return {
                        "success": True,
                        "url": f"http://localhost:{port}",
                        "pid": proc.pid,
                        "message": "Optuna dashboard already running",
                    }
            # Port in use but not by us - still return success since dashboard is available
            return {
                "success": True,
                "url": f"http://localhost:{port}",
                "pid": None,
                "message": "Optuna dashboard already running on port 8081",
            }

        # Launch optuna-dashboard using CLI command (more robust than Python import)
        # Use absolute path with POSIX format for SQLite URL
        abs_db_path = study_db.absolute().as_posix()
        storage_url = f"sqlite:///{abs_db_path}"

        # Find optuna-dashboard executable - use full path for conda environment
        import platform
        import shutil as sh

        optuna_dashboard_cmd = "optuna-dashboard"

        if platform.system() == "Windows":
            # Try to find optuna-dashboard in the conda environment
            # NOTE(review): machine-specific paths; consider deriving from sys.prefix.
            conda_paths = [
                Path(r"C:\Users\antoi\anaconda3\envs\atomizer\Scripts\optuna-dashboard.exe"),
                Path(r"C:\Users\antoi\anaconda3\envs\atomizer\Library\bin\optuna-dashboard.exe"),
            ]
            for conda_path in conda_paths:
                if conda_path.exists():
                    optuna_dashboard_cmd = str(conda_path)
                    break
            else:
                # Fallback: check if it's in PATH
                found_path = sh.which("optuna-dashboard")
                if found_path:
                    optuna_dashboard_cmd = found_path
                else:
                    # Return helpful error message
                    raise HTTPException(
                        status_code=500,
                        detail="optuna-dashboard not found. Install with: pip install optuna-dashboard",
                    )

        cmd = [optuna_dashboard_cmd, storage_url, "--port", str(port), "--host", "0.0.0.0"]

        # On Windows, use CREATE_NEW_PROCESS_GROUP and DETACHED_PROCESS flags
        if platform.system() == "Windows":
            # Windows-specific: create detached process
            DETACHED_PROCESS = 0x00000008
            CREATE_NEW_PROCESS_GROUP = 0x00000200
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP,
            )
        else:
            proc = subprocess.Popen(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True
            )

        _optuna_processes[study_id] = proc

        # Wait for dashboard to start (check port repeatedly)
        max_wait = 5  # seconds
        start_time = time.time()
        while time.time() - start_time < max_wait:
            if is_port_in_use(port):
                return {
                    "success": True,
                    "url": f"http://localhost:{port}",
                    "pid": proc.pid,
                    "message": "Optuna dashboard launched successfully",
                }
            # Check if process died
            if proc.poll() is not None:
                stderr = read_proc_stderr(proc)
                return {"success": False, "message": f"Failed to start Optuna dashboard: {stderr}"}
            time.sleep(0.5)

        # Timeout - process might still be starting
        if proc.poll() is None:
            return {
                "success": True,
                "url": f"http://localhost:{port}",
                "pid": proc.pid,
                "message": "Optuna dashboard starting (may take a moment)",
            }
        else:
            stderr = read_proc_stderr(proc)
            return {"success": False, "message": f"Failed to start Optuna dashboard: {stderr}"}

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to launch Optuna dashboard: {str(e)}")
|
|
|
|
|
|
@router.get("/optuna-dashboard/check")
async def check_optuna_dashboard_available():
    """
    Check if optuna-dashboard is installed and available.

    Returns:
        JSON with availability status and installation instructions if not available.
    """
    import platform
    import shutil

    def _installed(path: str) -> dict:
        # Shared shape for all "found it" responses.
        return {"available": True, "path": path, "message": "optuna-dashboard is installed"}

    # On Windows, prefer the known conda-environment install locations.
    if platform.system() == "Windows":
        candidates = (
            Path(r"C:\Users\antoi\anaconda3\envs\atomizer\Scripts\optuna-dashboard.exe"),
            Path(r"C:\Users\antoi\anaconda3\envs\atomizer\Library\bin\optuna-dashboard.exe"),
        )
        for exe in candidates:
            if exe.exists():
                return _installed(str(exe))

    # Fallback: whatever is on the system PATH.
    located = shutil.which("optuna-dashboard")
    if located:
        return _installed(located)

    return {
        "available": False,
        "path": None,
        "message": "optuna-dashboard not found",
        "install_instructions": "pip install optuna-dashboard",
    }
|
|
|
|
|
|
# ============================================================================
|
|
# Model Files Endpoint
|
|
# ============================================================================
|
|
|
|
|
|
@router.get("/studies/{study_id}/model-files")
async def get_model_files(study_id: str):
    """
    Get list of NX model files (.prt, .sim, .fem, .bdf, .dat, .op2) for a study.

    Scans several candidate directories (most specific first) and collects
    every matching file, sorted by extension (.prt first) then by name.

    Args:
        study_id: Study identifier

    Returns:
        JSON with list of model files and their paths, plus the directory
        the first matching file was found in and a total count.

    Raises:
        HTTPException: 404 if the study does not exist, 500 on other errors.
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Look for model directory (check multiple locations)
        model_dirs = [
            study_dir / "1_setup" / "model",
            study_dir / "model",
            study_dir / "1_setup",
            study_dir,
        ]

        model_files = []
        model_dir_path = None  # Directory where the first matching file was found

        # NX and FEA file extensions to look for
        nx_extensions = {".prt", ".sim", ".fem", ".bdf", ".dat", ".op2", ".f06", ".inp"}

        for model_dir in model_dirs:
            if model_dir.exists() and model_dir.is_dir():
                for file_path in model_dir.iterdir():
                    if file_path.is_file() and file_path.suffix.lower() in nx_extensions:
                        model_files.append(
                            {
                                "name": file_path.name,
                                "path": str(file_path),
                                "extension": file_path.suffix.lower(),
                                "size_bytes": file_path.stat().st_size,
                                "size_display": _format_file_size(file_path.stat().st_size),
                                "modified": datetime.fromtimestamp(
                                    file_path.stat().st_mtime
                                ).isoformat(),
                            }
                        )
                        if model_dir_path is None:
                            model_dir_path = str(model_dir)

        # Sort by extension for better display (prt first, then sim, fem, etc.)
        extension_order = {
            ".prt": 0,
            ".sim": 1,
            ".fem": 2,
            ".bdf": 3,
            ".dat": 4,
            ".op2": 5,
            ".f06": 6,
            ".inp": 7,
        }
        model_files.sort(key=lambda x: (extension_order.get(x["extension"], 99), x["name"]))

        return {
            "study_id": study_id,
            "model_dir": model_dir_path or str(study_dir / "1_setup" / "model"),
            "files": model_files,
            "count": len(model_files),
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get model files: {str(e)}")
|
|
|
|
|
|
def _format_file_size(size_bytes: int) -> str:
|
|
"""Format file size in human-readable form"""
|
|
if size_bytes < 1024:
|
|
return f"{size_bytes} B"
|
|
elif size_bytes < 1024 * 1024:
|
|
return f"{size_bytes / 1024:.1f} KB"
|
|
elif size_bytes < 1024 * 1024 * 1024:
|
|
return f"{size_bytes / (1024 * 1024):.1f} MB"
|
|
else:
|
|
return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB"
|
|
|
|
|
|
@router.post("/studies/{study_id}/open-folder")
async def open_model_folder(study_id: str, folder_type: str = "model"):
    """
    Open the model folder in system file explorer.

    Only meaningful when the API server runs on the user's own machine,
    since it opens a window on the server host.

    Args:
        study_id: Study identifier
        folder_type: Type of folder to open (model, results, setup);
            any other value opens the study root.

    Returns:
        JSON with success status and the path that was opened (or attempted).

    Raises:
        HTTPException: 404 if the study does not exist, 500 on other errors.
    """
    import os
    import platform

    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Determine which folder to open
        if folder_type == "model":
            target_dir = study_dir / "1_setup" / "model"
            if not target_dir.exists():
                target_dir = study_dir / "1_setup"
        elif folder_type == "results":
            target_dir = get_results_dir(study_dir)
        elif folder_type == "setup":
            target_dir = study_dir / "1_setup"
        else:
            target_dir = study_dir

        # Fall back to the study root if the chosen folder is missing
        if not target_dir.exists():
            target_dir = study_dir

        # Open in file explorer based on platform
        system = platform.system()
        try:
            if system == "Windows":
                os.startfile(str(target_dir))
            elif system == "Darwin":  # macOS
                subprocess.Popen(["open", str(target_dir)])
            else:  # Linux
                subprocess.Popen(["xdg-open", str(target_dir)])

            return {"success": True, "message": f"Opened {target_dir}", "path": str(target_dir)}
        except Exception as e:
            # Best-effort: report the failure instead of raising 500 so the
            # client can still show the path.
            return {
                "success": False,
                "message": f"Failed to open folder: {str(e)}",
                "path": str(target_dir),
            }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to open folder: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/best-solution")
async def get_best_solution(study_id: str):
    """Get the best trial(s) for a study with improvement metrics.

    Returns:
        JSON payload with:
            best_trial:   lowest-objective completed trial (params, user attrs, timestamp)
            first_trial:  earliest completed trial (baseline for comparison)
            improvements: relative/absolute change of the objective
            total_trials: number of completed trials
    """
    try:
        study_dir = resolve_study_path(study_id)
        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study '{study_id}' not found")

        results_dir = get_results_dir(study_dir)
        db_path = results_dir / "study.db"

        # No database yet -> optimization has not produced any results.
        if not db_path.exists():
            return {
                "study_id": study_id,
                "best_trial": None,
                "first_trial": None,
                "improvements": {},
                "total_trials": 0,
            }

        conn = sqlite3.connect(str(db_path))
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            # Best trial: lowest first objective among completed trials
            # (minimization assumed). The `tv.objective = 0` filter keeps
            # multi-objective studies from producing duplicate rows, matching
            # the queries used by get_study_runs.
            # FIX: the previous query derived "timestamp" from
            # datetime(tv.value_id, 'unixepoch'); value_id is an autoincrement
            # row id, not a unix time. Use the trial's completion time instead.
            cursor.execute("""
                SELECT t.trial_id, t.number, tv.value as objective,
                       t.datetime_complete as timestamp
                FROM trials t
                JOIN trial_values tv ON t.trial_id = tv.trial_id
                WHERE t.state = 'COMPLETE' AND tv.objective = 0
                ORDER BY tv.value ASC
                LIMIT 1
            """)
            best_row = cursor.fetchone()

            # Earliest completed trial, used as the improvement baseline.
            cursor.execute("""
                SELECT t.trial_id, t.number, tv.value as objective
                FROM trials t
                JOIN trial_values tv ON t.trial_id = tv.trial_id
                WHERE t.state = 'COMPLETE' AND tv.objective = 0
                ORDER BY t.number ASC
                LIMIT 1
            """)
            first_row = cursor.fetchone()

            # Total number of completed trials.
            cursor.execute("SELECT COUNT(*) FROM trials WHERE state = 'COMPLETE'")
            total_trials = cursor.fetchone()[0]

            best_trial = None
            first_trial = None
            improvements = {}

            if best_row:
                best_trial_id = best_row["trial_id"]

                # Design variable values for the best trial.
                cursor.execute(
                    """
                    SELECT param_name, param_value
                    FROM trial_params
                    WHERE trial_id = ?
                    """,
                    (best_trial_id,),
                )
                params = {row["param_name"]: row["param_value"] for row in cursor.fetchall()}

                # User attributes (including extracted results). Values are
                # stored as JSON strings; fall back to the raw string for
                # rows that are not valid JSON.
                cursor.execute(
                    """
                    SELECT key, value_json
                    FROM trial_user_attributes
                    WHERE trial_id = ?
                    """,
                    (best_trial_id,),
                )
                user_attrs = {}
                for row in cursor.fetchall():
                    try:
                        user_attrs[row["key"]] = json.loads(row["value_json"])
                    except (json.JSONDecodeError, TypeError):
                        user_attrs[row["key"]] = row["value_json"]

                best_trial = {
                    "trial_number": best_row["number"],
                    "objective": best_row["objective"],
                    "design_variables": params,
                    "user_attrs": user_attrs,
                    "timestamp": best_row["timestamp"],
                }

            if first_row:
                first_trial_id = first_row["trial_id"]

                cursor.execute(
                    """
                    SELECT param_name, param_value
                    FROM trial_params
                    WHERE trial_id = ?
                    """,
                    (first_trial_id,),
                )
                first_params = {
                    row["param_name"]: row["param_value"] for row in cursor.fetchall()
                }

                first_trial = {
                    "trial_number": first_row["number"],
                    "objective": first_row["objective"],
                    "design_variables": first_params,
                }

                # Relative improvement is undefined for a zero baseline.
                if best_row and first_row["objective"] != 0:
                    improvement_pct = (
                        (first_row["objective"] - best_row["objective"])
                        / abs(first_row["objective"])
                    ) * 100
                    improvements["objective"] = {
                        "initial": first_row["objective"],
                        "final": best_row["objective"],
                        "improvement_pct": round(improvement_pct, 2),
                        "absolute_change": round(
                            first_row["objective"] - best_row["objective"], 6
                        ),
                    }
        finally:
            # Always release the SQLite handle, even if a query raised.
            conn.close()

        return {
            "study_id": study_id,
            "best_trial": best_trial,
            "first_trial": first_trial,
            "improvements": improvements,
            "total_trials": total_trials,
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get best solution: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/runs")
async def get_study_runs(study_id: str):
    """
    Get all optimization runs/studies in the database for comparison.

    Many studies have multiple Optuna studies (e.g., v11_fea, v11_iter1_nn,
    v11_iter2_nn). This endpoint returns metrics for each sub-study.

    Returns:
        JSON with a "runs" list (one entry per Optuna study that has at
        least one completed trial), "total_runs" and the parent "study_id".
    """
    try:
        study_dir = resolve_study_path(study_id)
        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study '{study_id}' not found")

        results_dir = get_results_dir(study_dir)
        db_path = results_dir / "study.db"

        if not db_path.exists():
            return {"runs": [], "total_runs": 0}

        conn = sqlite3.connect(str(db_path))
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            # Enumerate every Optuna study stored in this database.
            cursor.execute("""
                SELECT study_id, study_name
                FROM studies
                ORDER BY study_id
            """)
            studies = cursor.fetchall()

            runs = []
            for study_row in studies:
                optuna_study_id = study_row["study_id"]
                study_name = study_row["study_name"]

                # Completed-trial count; sub-studies with none are skipped.
                cursor.execute(
                    """
                    SELECT COUNT(*) FROM trials
                    WHERE study_id = ? AND state = 'COMPLETE'
                    """,
                    (optuna_study_id,),
                )
                trial_count = cursor.fetchone()[0]

                if trial_count == 0:
                    continue

                # Best and average of the first objective in a single pass
                # (previously two separate queries over the same join).
                cursor.execute(
                    """
                    SELECT MIN(tv.value) as best_value, AVG(tv.value) as avg_value
                    FROM trial_values tv
                    JOIN trials t ON tv.trial_id = t.trial_id
                    WHERE t.study_id = ? AND t.state = 'COMPLETE' AND tv.objective = 0
                    """,
                    (optuna_study_id,),
                )
                agg_result = cursor.fetchone()
                best_value = agg_result["best_value"] if agg_result else None
                avg_value = agg_result["avg_value"] if agg_result else None

                # Wall-clock span of the completed trials.
                cursor.execute(
                    """
                    SELECT MIN(datetime_start) as first_trial, MAX(datetime_complete) as last_trial
                    FROM trials
                    WHERE study_id = ? AND state = 'COMPLETE'
                    """,
                    (optuna_study_id,),
                )
                time_result = cursor.fetchone()

                # Heuristic: surrogate (NN) runs are named with an "_nn" suffix.
                source = "NN" if "_nn" in study_name.lower() else "FEA"

                runs.append(
                    {
                        "run_id": optuna_study_id,
                        "name": study_name,
                        "source": source,
                        "trial_count": trial_count,
                        "best_value": best_value,
                        "avg_value": avg_value,
                        "first_trial": time_result["first_trial"] if time_result else None,
                        "last_trial": time_result["last_trial"] if time_result else None,
                    }
                )
        finally:
            # Always release the SQLite handle, even if a query raised.
            conn.close()

        return {"runs": runs, "total_runs": len(runs), "study_id": study_id}

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get runs: {str(e)}")
|
|
|
|
|
|
class UpdateConfigRequest(BaseModel):
    """Request body for PUT /studies/{study_id}/config.

    Provide either `config` (full optimization_config.json payload, written
    as-is) or `intent` (canvas intent document, converted via
    intent_to_config() and merged with the existing config). If both are
    set, `config` takes precedence; if neither is set, the endpoint
    responds with HTTP 400.
    """

    # Direct optimization_config.json contents; overwrites the existing file.
    config: Optional[dict] = None
    # Canvas intent document; converted and merged with the existing config.
    intent: Optional[dict] = None
|
|
|
|
|
|
def intent_to_config(intent: dict, existing_config: Optional[dict] = None) -> dict:
    """
    Convert canvas intent format to optimization_config.json format.

    Preserves existing config fields that aren't in the intent.

    Args:
        intent: Canvas intent document (keys: model, solver, design_variables,
            extractors, objectives, constraints, optimization, surrogate).
        existing_config: Current optimization_config.json contents, if any;
            fields not covered by the intent are carried over unchanged.

    Returns:
        A new config dict (a shallow copy — the input existing_config object
        itself is not rebound, matching the previous behavior).
    """
    # Start with existing config or empty.
    config = existing_config.copy() if existing_config else {}

    # --- Model / simulation files --------------------------------------
    if intent.get("model", {}).get("path"):
        model_path = Path(intent["model"]["path"]).name
        config.setdefault("simulation", {})
        config["simulation"]["model_file"] = model_path
        # Infer companion FEM/SIM file names from the model name when absent.
        base_name = model_path.replace(".prt", "")
        if not config["simulation"].get("fem_file"):
            config["simulation"]["fem_file"] = f"{base_name}_fem1.fem"
        if not config["simulation"].get("sim_file"):
            config["simulation"]["sim_file"] = f"{base_name}_sim1.sim"

    # --- Solver ---------------------------------------------------------
    if intent.get("solver", {}).get("type"):
        solver_type = intent["solver"]["type"]
        config.setdefault("simulation", {})
        config["simulation"]["solver"] = "nastran"
        # Map Nastran SOL sequences to analysis type lists (default: static).
        sol_to_analysis = {
            "SOL101": ["static"],
            "SOL103": ["modal"],
            "SOL105": ["buckling"],
            "SOL106": ["nonlinear"],
            "SOL111": ["modal", "frequency_response"],
            "SOL112": ["modal", "transient"],
        }
        config["simulation"]["analysis_types"] = sol_to_analysis.get(solver_type, ["static"])

    # --- Design variables ------------------------------------------------
    if intent.get("design_variables"):
        config["design_variables"] = [
            {
                "parameter": dv.get("name", dv.get("expression_name", "")),
                "bounds": [dv.get("min", 0), dv.get("max", 100)],
                "description": dv.get("description", f"Design variable: {dv.get('name', '')}"),
            }
            for dv in intent["design_variables"]
        ]

    # --- Extractors ------------------------------------------------------
    # Indexed by name; referenced from objectives/constraints below.
    # (The previous version also read ext["id"] into an unused local.)
    extractor_map = {ext.get("name", ""): ext for ext in intent.get("extractors") or []}

    def _extraction_cfg(extractor_name):
        """Build the 'extraction' block for the named extractor, or None."""
        ext = extractor_map.get(extractor_name) if extractor_name else None
        if ext is None:
            return None
        return {
            "action": _extractor_id_to_action(ext.get("id", "")),
            "domain": "result_extraction",
            "params": ext.get("config", {}),
        }

    # --- Objectives ------------------------------------------------------
    if intent.get("objectives"):
        config["objectives"] = []
        for obj in intent["objectives"]:
            obj_config = {
                "name": obj.get("name", "objective"),
                "goal": obj.get("direction", "minimize"),
                "weight": obj.get("weight", 1.0),
                "description": obj.get("description", f"Objective: {obj.get('name', '')}"),
            }
            extraction = _extraction_cfg(obj.get("extractor"))
            if extraction is not None:
                obj_config["extraction"] = extraction
            config["objectives"].append(obj_config)

    # --- Constraints ------------------------------------------------------
    if intent.get("constraints"):
        config["constraints"] = []
        for con in intent["constraints"]:
            op = con.get("operator", "<=")
            con_type = "less_than" if "<" in op else "greater_than" if ">" in op else "equal_to"
            con_config = {
                "name": con.get("name", "constraint"),
                "type": con_type,
                "threshold": con.get("value", 0),
                "description": con.get("description", f"Constraint: {con.get('name', '')}"),
            }
            extraction = _extraction_cfg(con.get("extractor"))
            if extraction is not None:
                con_config["extraction"] = extraction
            config["constraints"].append(con_config)

    # --- Optimization settings -------------------------------------------
    if intent.get("optimization"):
        opt = intent["optimization"]
        config.setdefault("optimization_settings", {})
        if opt.get("max_trials"):
            config["optimization_settings"]["n_trials"] = opt["max_trials"]
        if opt.get("method"):
            # Map canvas method names to Optuna sampler class names;
            # unknown names pass through unchanged.
            method_map = {
                "TPE": "TPESampler",
                "CMA-ES": "CmaEsSampler",
                "NSGA-II": "NSGAIISampler",
                "RandomSearch": "RandomSampler",
                "GP-BO": "GPSampler",
            }
            config["optimization_settings"]["sampler"] = method_map.get(
                opt["method"], opt["method"]
            )

    # --- Surrogate --------------------------------------------------------
    if intent.get("surrogate", {}).get("enabled"):
        config["surrogate"] = {
            "type": intent["surrogate"].get("type", "MLP"),
            "min_trials": intent["surrogate"].get("min_trials", 20),
        }

    return config
|
|
|
|
|
|
def _extractor_id_to_action(ext_id: str) -> str:
|
|
"""Map extractor IDs (E1, E2, etc.) to extraction action names."""
|
|
action_map = {
|
|
"E1": "extract_displacement",
|
|
"E2": "extract_frequency",
|
|
"E3": "extract_stress",
|
|
"E4": "extract_mass",
|
|
"E5": "extract_mass",
|
|
"E8": "extract_zernike",
|
|
"E9": "extract_zernike",
|
|
"E10": "extract_zernike",
|
|
"displacement": "extract_displacement",
|
|
"frequency": "extract_frequency",
|
|
"stress": "extract_stress",
|
|
"mass": "extract_mass",
|
|
"mass_bdf": "extract_mass",
|
|
"mass_cad": "extract_mass",
|
|
"zernike": "extract_zernike",
|
|
"zernike_opd": "extract_zernike",
|
|
}
|
|
return action_map.get(ext_id, "extract_displacement")
|
|
|
|
|
|
@router.put("/studies/{study_id}/config")
async def update_study_config(study_id: str, request: UpdateConfigRequest):
    """
    Update the optimization_config.json for a study

    Accepts either:
    - {"config": {...}} - Direct config object (overwrites)
    - {"intent": {...}} - Canvas intent (converted and merged with existing)

    Args:
        study_id: Study identifier
        request: New configuration data (config or intent)

    Returns:
        JSON with success status
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Refuse edits mid-run: the engine reads this file while optimizing.
        if is_optimization_running(study_id):
            raise HTTPException(
                status_code=409,
                detail="Cannot modify config while optimization is running. Stop the optimization first.",
            )

        # The config may live under 1_setup/ or at the study root.
        candidates = [
            study_dir / "1_setup" / "optimization_config.json",
            study_dir / "optimization_config.json",
        ]
        config_file = next((p for p in candidates if p.exists()), None)

        if config_file is None:
            raise HTTPException(
                status_code=404, detail=f"Config file not found for study {study_id}"
            )

        # Keep a one-deep backup of the previous config before writing.
        backup_file = config_file.with_suffix(".json.backup")
        shutil.copy(config_file, backup_file)

        # Direct config payload wins; otherwise convert the canvas intent,
        # merging it with the current on-disk config.
        if request.config is not None:
            new_config = request.config
        elif request.intent is not None:
            existing_config = json.loads(config_file.read_text())
            new_config = intent_to_config(request.intent, existing_config)
        else:
            raise HTTPException(
                status_code=400, detail="Request must include either 'config' or 'intent' field"
            )

        config_file.write_text(json.dumps(new_config, indent=2))

        return {
            "success": True,
            "message": "Configuration updated successfully",
            "path": str(config_file),
            "backup_path": str(backup_file),
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to update config: {str(e)}")
|
|
|
|
|
|
# ============================================================================
|
|
# Zernike Analysis Endpoints
|
|
# ============================================================================
|
|
|
|
|
|
@router.get("/studies/{study_id}/zernike-available")
async def get_zernike_available_trials(study_id: str):
    """
    Get list of trial numbers that have Zernike analysis available (OP2 files).

    Returns:
        JSON with list of trial numbers that have iteration folders with OP2 files
    """
    try:
        study_dir = resolve_study_path(study_id)
        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study '{study_id}' not found")

        iter_base = study_dir / "2_iterations"
        if not iter_base.exists():
            return {"study_id": study_id, "available_trials": [], "count": 0}

        available_trials = []
        for entry in iter_base.iterdir():
            # Only iterN directories that actually contain an OP2 result.
            if not (entry.is_dir() and entry.name.startswith("iter")):
                continue
            if not any(entry.glob("*.op2")):
                continue
            try:
                iter_num = int(entry.name.replace("iter", ""))
            except ValueError:
                continue
            # iterN maps to 0-indexed trial N-1; 9999 is the special test
            # iteration and is reported as-is.
            available_trials.append(9999 if iter_num == 9999 else iter_num - 1)

        available_trials.sort()
        return {
            "study_id": study_id,
            "available_trials": available_trials,
            "count": len(available_trials),
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get available trials: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/trials/{trial_number}/zernike")
|
|
async def get_trial_zernike(study_id: str, trial_number: int):
|
|
"""
|
|
Generate or retrieve Zernike analysis HTML for a specific trial.
|
|
|
|
This endpoint generates interactive Zernike wavefront analysis for mirror
|
|
optimization trials. It produces 3D surface residual plots, RMS metrics,
|
|
and coefficient bar charts for each angle comparison (40_vs_20, 60_vs_20, 90_vs_20).
|
|
|
|
Args:
|
|
study_id: Study identifier
|
|
trial_number: Trial/iteration number
|
|
|
|
Returns:
|
|
JSON with HTML content for each comparison, or error if OP2 not found
|
|
"""
|
|
try:
|
|
study_dir = resolve_study_path(study_id)
|
|
if not study_dir.exists():
|
|
raise HTTPException(status_code=404, detail=f"Study '{study_id}' not found")
|
|
|
|
# Find iteration directory
|
|
# Trial numbers in Optuna DB may differ from iteration folder numbers
|
|
# Common patterns:
|
|
# 1. iter{trial_number} - direct mapping
|
|
# 2. iter{trial_number + 1} - 0-indexed trials vs 1-indexed folders
|
|
# 3. Check for actual folder existence
|
|
|
|
iter_dir = None
|
|
possible_iter_nums = [trial_number, trial_number + 1]
|
|
|
|
for iter_num in possible_iter_nums:
|
|
candidate = study_dir / "2_iterations" / f"iter{iter_num}"
|
|
if candidate.exists():
|
|
iter_dir = candidate
|
|
break
|
|
|
|
if iter_dir is None:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"No FEA results for trial {trial_number}. This trial may have used surrogate model (NN) prediction instead of full FEA simulation. Zernike analysis requires OP2 results from actual FEA runs.",
|
|
)
|
|
|
|
# Check for OP2 file BEFORE doing expensive imports
|
|
op2_files = list(iter_dir.glob("*.op2"))
|
|
if not op2_files:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"No OP2 results file found in {iter_dir.name}. FEA may not have completed.",
|
|
)
|
|
|
|
# Only import heavy dependencies after we know we have an OP2 file
|
|
sys.path.append(str(Path(__file__).parent.parent.parent.parent.parent))
|
|
from optimization_engine.extractors.extract_zernike_figure import ZernikeOPDExtractor
|
|
from optimization_engine.extractors import ZernikeExtractor
|
|
import numpy as np
|
|
from math import factorial
|
|
import plotly.graph_objects as go
|
|
from plotly.subplots import make_subplots
|
|
from matplotlib.tri import Triangulation
|
|
|
|
# Also find BDF/DAT geometry file for OPD extractor
|
|
bdf_files = list(iter_dir.glob("*.dat")) + list(iter_dir.glob("*.bdf"))
|
|
bdf_path = bdf_files[0] if bdf_files else None
|
|
|
|
# Configuration
|
|
N_MODES = 50
|
|
AMP = 0.5 # Reduced deformation scaling (0.5x)
|
|
PANCAKE = 3.0 # Z-axis range multiplier
|
|
PLOT_DOWNSAMPLE = 5000 # Reduced for faster loading
|
|
FILTER_LOW_ORDERS = 4
|
|
COLORSCALE = "Turbo" # Colorscale: 'RdBu_r', 'Viridis', 'Plasma', 'Turbo'
|
|
|
|
SUBCASE_MAP = {
|
|
"1": "90",
|
|
"2": "20",
|
|
"3": "40",
|
|
"4": "60",
|
|
}
|
|
REF_SUBCASE = "2"
|
|
|
|
def noll_indices(j: int):
|
|
if j < 1:
|
|
raise ValueError("Noll index j must be >= 1")
|
|
count = 0
|
|
n = 0
|
|
while True:
|
|
if n == 0:
|
|
ms = [0]
|
|
elif n % 2 == 0:
|
|
ms = [0] + [m for k in range(1, n // 2 + 1) for m in (-2 * k, 2 * k)]
|
|
else:
|
|
ms = [m for k in range(0, (n + 1) // 2) for m in (-(2 * k + 1), (2 * k + 1))]
|
|
for m in ms:
|
|
count += 1
|
|
if count == j:
|
|
return n, m
|
|
n += 1
|
|
|
|
def zernike_noll(j: int, r: np.ndarray, th: np.ndarray) -> np.ndarray:
|
|
n, m = noll_indices(j)
|
|
R = np.zeros_like(r)
|
|
for s in range((n - abs(m)) // 2 + 1):
|
|
c = (
|
|
(-1) ** s
|
|
* factorial(n - s)
|
|
/ (
|
|
factorial(s)
|
|
* factorial((n + abs(m)) // 2 - s)
|
|
* factorial((n - abs(m)) // 2 - s)
|
|
)
|
|
)
|
|
R += c * r ** (n - 2 * s)
|
|
if m == 0:
|
|
return R
|
|
return R * (np.cos(m * th) if m > 0 else np.sin(-m * th))
|
|
|
|
def zernike_common_name(n: int, m: int) -> str:
|
|
names = {
|
|
(0, 0): "Piston",
|
|
(1, -1): "Tilt X",
|
|
(1, 1): "Tilt Y",
|
|
(2, 0): "Defocus",
|
|
(2, -2): "Astig 45°",
|
|
(2, 2): "Astig 0°",
|
|
(3, -1): "Coma X",
|
|
(3, 1): "Coma Y",
|
|
(3, -3): "Trefoil X",
|
|
(3, 3): "Trefoil Y",
|
|
(4, 0): "Primary Spherical",
|
|
(4, -2): "Sec Astig X",
|
|
(4, 2): "Sec Astig Y",
|
|
(4, -4): "Quadrafoil X",
|
|
(4, 4): "Quadrafoil Y",
|
|
(5, -1): "Sec Coma X",
|
|
(5, 1): "Sec Coma Y",
|
|
(5, -3): "Sec Trefoil X",
|
|
(5, 3): "Sec Trefoil Y",
|
|
(5, -5): "Pentafoil X",
|
|
(5, 5): "Pentafoil Y",
|
|
(6, 0): "Sec Spherical",
|
|
}
|
|
return names.get((n, m), f"Z(n={n}, m={m})")
|
|
|
|
def zernike_label(j: int) -> str:
|
|
n, m = noll_indices(j)
|
|
return f"J{j:02d} - {zernike_common_name(n, m)}"
|
|
|
|
def compute_manufacturing_metrics(coefficients: np.ndarray) -> dict:
|
|
"""Compute manufacturing-related aberration metrics."""
|
|
return {
|
|
"defocus_nm": float(abs(coefficients[3])), # J4
|
|
"astigmatism_rms": float(
|
|
np.sqrt(coefficients[4] ** 2 + coefficients[5] ** 2)
|
|
), # J5+J6
|
|
"coma_rms": float(np.sqrt(coefficients[6] ** 2 + coefficients[7] ** 2)), # J7+J8
|
|
"trefoil_rms": float(
|
|
np.sqrt(coefficients[8] ** 2 + coefficients[9] ** 2)
|
|
), # J9+J10
|
|
"spherical_nm": float(abs(coefficients[10]))
|
|
if len(coefficients) > 10
|
|
else 0.0, # J11
|
|
}
|
|
|
|
def compute_rms_filter_j1to3(X, Y, W_nm, coefficients, R):
|
|
"""Compute RMS with J1-J3 filtered (keeping defocus for optician workload)."""
|
|
Xc = X - np.mean(X)
|
|
Yc = Y - np.mean(Y)
|
|
r = np.hypot(Xc / R, Yc / R)
|
|
th = np.arctan2(Yc, Xc)
|
|
Z_j1to3 = np.column_stack([zernike_noll(j, r, th) for j in range(1, 4)])
|
|
W_filter_j1to3 = W_nm - Z_j1to3 @ coefficients[:3]
|
|
return float(np.sqrt(np.mean(W_filter_j1to3**2)))
|
|
|
|
def generate_zernike_html(
|
|
title: str,
|
|
X: np.ndarray,
|
|
Y: np.ndarray,
|
|
W_nm: np.ndarray,
|
|
coefficients: np.ndarray,
|
|
rms_global: float,
|
|
rms_filtered: float,
|
|
ref_title: str = "20 deg",
|
|
abs_pair=None,
|
|
is_manufacturing: bool = False,
|
|
mfg_metrics: dict = None,
|
|
correction_metrics: dict = None,
|
|
) -> str:
|
|
"""Generate HTML string for Zernike visualization with full tables."""
|
|
# Compute residual surface (filtered)
|
|
Xc = X - np.mean(X)
|
|
Yc = Y - np.mean(Y)
|
|
R = float(np.max(np.hypot(Xc, Yc)))
|
|
r = np.hypot(Xc / R, Yc / R)
|
|
th = np.arctan2(Yc, Xc)
|
|
|
|
Z = np.column_stack([zernike_noll(j, r, th) for j in range(1, N_MODES + 1)])
|
|
W_res_filt = W_nm - Z[:, :FILTER_LOW_ORDERS].dot(coefficients[:FILTER_LOW_ORDERS])
|
|
|
|
# Compute J1-J3 filtered RMS (optician workload metric)
|
|
rms_filter_j1to3 = compute_rms_filter_j1to3(X, Y, W_nm, coefficients, R)
|
|
|
|
# Downsample for display
|
|
n = len(X)
|
|
if n > PLOT_DOWNSAMPLE:
|
|
rng = np.random.default_rng(42)
|
|
sel = rng.choice(n, size=PLOT_DOWNSAMPLE, replace=False)
|
|
Xp, Yp, Wp = X[sel], Y[sel], W_res_filt[sel]
|
|
else:
|
|
Xp, Yp, Wp = X, Y, W_res_filt
|
|
|
|
res_amp = AMP * Wp
|
|
max_amp = float(np.max(np.abs(res_amp))) if res_amp.size else 1.0
|
|
|
|
# Create smooth shaded SURFACE mesh with lighting
|
|
surface_trace = None
|
|
try:
|
|
tri = Triangulation(Xp, Yp)
|
|
if tri.triangles is not None and len(tri.triangles) > 0:
|
|
i_idx, j_idx, k_idx = tri.triangles.T
|
|
surface_trace = go.Mesh3d(
|
|
x=Xp.tolist(),
|
|
y=Yp.tolist(),
|
|
z=res_amp.tolist(),
|
|
i=i_idx.tolist(),
|
|
j=j_idx.tolist(),
|
|
k=k_idx.tolist(),
|
|
intensity=res_amp.tolist(),
|
|
colorscale=COLORSCALE,
|
|
opacity=1.0,
|
|
flatshading=False, # Smooth shading
|
|
lighting=dict(
|
|
ambient=0.4, diffuse=0.8, specular=0.3, roughness=0.5, fresnel=0.2
|
|
),
|
|
lightposition=dict(x=100, y=200, z=300),
|
|
showscale=True,
|
|
colorbar=dict(
|
|
title=dict(text="Residual (nm)", side="right"),
|
|
thickness=15,
|
|
len=0.6,
|
|
tickformat=".1f",
|
|
),
|
|
hovertemplate="X: %{x:.1f}<br>Y: %{y:.1f}<br>Residual: %{z:.2f} nm<extra></extra>",
|
|
)
|
|
except Exception as e:
|
|
print(f"Triangulation failed: {e}")
|
|
|
|
labels = [zernike_label(j) for j in range(1, N_MODES + 1)]
|
|
coeff_abs = np.abs(coefficients)
|
|
mfg = compute_manufacturing_metrics(coefficients)
|
|
|
|
# Determine layout based on whether this is manufacturing (90 deg) view
|
|
if is_manufacturing and mfg_metrics and correction_metrics:
|
|
# Manufacturing view: 5 rows
|
|
fig = make_subplots(
|
|
rows=5,
|
|
cols=1,
|
|
specs=[
|
|
[{"type": "scene"}],
|
|
[{"type": "table"}],
|
|
[{"type": "table"}],
|
|
[{"type": "table"}],
|
|
[{"type": "xy"}],
|
|
],
|
|
row_heights=[0.35, 0.10, 0.15, 0.15, 0.25],
|
|
vertical_spacing=0.025,
|
|
subplot_titles=[
|
|
f"<b>Surface Residual (relative to {ref_title})</b>",
|
|
"<b>RMS Metrics</b>",
|
|
"<b>Mode Magnitudes (Absolute 90 deg)</b>",
|
|
"<b>Pre-Correction (90 deg - 20 deg)</b>",
|
|
f"<b>Zernike Coefficients ({N_MODES} modes)</b>",
|
|
],
|
|
)
|
|
else:
|
|
# Standard relative view: 4 rows with full coefficient table
|
|
fig = make_subplots(
|
|
rows=4,
|
|
cols=1,
|
|
specs=[
|
|
[{"type": "scene"}],
|
|
[{"type": "table"}],
|
|
[{"type": "table"}],
|
|
[{"type": "xy"}],
|
|
],
|
|
row_heights=[0.40, 0.12, 0.28, 0.20],
|
|
vertical_spacing=0.03,
|
|
subplot_titles=[
|
|
f"<b>Surface Residual (relative to {ref_title})</b>",
|
|
"<b>RMS Metrics</b>",
|
|
f"<b>Zernike Coefficients ({N_MODES} modes)</b>",
|
|
"<b>Top 20 |Zernike Coefficients| (nm)</b>",
|
|
],
|
|
)
|
|
|
|
# Add surface mesh (or fallback to scatter)
|
|
if surface_trace is not None:
|
|
fig.add_trace(surface_trace, row=1, col=1)
|
|
else:
|
|
# Fallback to scatter if triangulation failed
|
|
fig.add_trace(
|
|
go.Scatter3d(
|
|
x=Xp.tolist(),
|
|
y=Yp.tolist(),
|
|
z=res_amp.tolist(),
|
|
mode="markers",
|
|
marker=dict(
|
|
size=2, color=res_amp.tolist(), colorscale=COLORSCALE, showscale=True
|
|
),
|
|
showlegend=False,
|
|
),
|
|
row=1,
|
|
col=1,
|
|
)
|
|
|
|
fig.update_scenes(
|
|
camera=dict(eye=dict(x=1.2, y=1.2, z=0.8), up=dict(x=0, y=0, z=1)),
|
|
xaxis=dict(
|
|
title="X (mm)",
|
|
showgrid=True,
|
|
gridcolor="rgba(128,128,128,0.3)",
|
|
showbackground=True,
|
|
backgroundcolor="rgba(240,240,240,0.9)",
|
|
),
|
|
yaxis=dict(
|
|
title="Y (mm)",
|
|
showgrid=True,
|
|
gridcolor="rgba(128,128,128,0.3)",
|
|
showbackground=True,
|
|
backgroundcolor="rgba(240,240,240,0.9)",
|
|
),
|
|
zaxis=dict(
|
|
title="Residual (nm)",
|
|
range=[-max_amp * PANCAKE, max_amp * PANCAKE],
|
|
showgrid=True,
|
|
gridcolor="rgba(128,128,128,0.3)",
|
|
showbackground=True,
|
|
backgroundcolor="rgba(230,230,250,0.9)",
|
|
),
|
|
aspectmode="manual",
|
|
aspectratio=dict(x=1, y=1, z=0.4),
|
|
)
|
|
|
|
# Row 2: RMS table with all metrics
|
|
if abs_pair is not None:
|
|
abs_global, abs_filtered = abs_pair
|
|
fig.add_trace(
|
|
go.Table(
|
|
header=dict(
|
|
values=[
|
|
"<b>Metric</b>",
|
|
"<b>Relative (nm)</b>",
|
|
"<b>Absolute (nm)</b>",
|
|
],
|
|
align="left",
|
|
fill_color="rgb(55, 83, 109)",
|
|
font=dict(color="white", size=12),
|
|
),
|
|
cells=dict(
|
|
values=[
|
|
[
|
|
"Global RMS",
|
|
"Filtered RMS (J1-J4)",
|
|
"Filtered RMS (J1-J3, w/ defocus)",
|
|
],
|
|
[
|
|
f"{rms_global:.2f}",
|
|
f"{rms_filtered:.2f}",
|
|
f"{rms_filter_j1to3:.2f}",
|
|
],
|
|
[f"{abs_global:.2f}", f"{abs_filtered:.2f}", "-"],
|
|
],
|
|
align="left",
|
|
fill_color="rgb(243, 243, 243)",
|
|
),
|
|
),
|
|
row=2,
|
|
col=1,
|
|
)
|
|
else:
|
|
fig.add_trace(
|
|
go.Table(
|
|
header=dict(
|
|
values=["<b>Metric</b>", "<b>Value (nm)</b>"],
|
|
align="left",
|
|
fill_color="rgb(55, 83, 109)",
|
|
font=dict(color="white", size=12),
|
|
),
|
|
cells=dict(
|
|
values=[
|
|
[
|
|
"Global RMS",
|
|
"Filtered RMS (J1-J4)",
|
|
"Filtered RMS (J1-J3, w/ defocus)",
|
|
],
|
|
[
|
|
f"{rms_global:.2f}",
|
|
f"{rms_filtered:.2f}",
|
|
f"{rms_filter_j1to3:.2f}",
|
|
],
|
|
],
|
|
align="left",
|
|
fill_color="rgb(243, 243, 243)",
|
|
),
|
|
),
|
|
row=2,
|
|
col=1,
|
|
)
|
|
|
|
if is_manufacturing and mfg_metrics and correction_metrics:
|
|
# Row 3: Mode magnitudes at 90 deg (absolute)
|
|
fig.add_trace(
|
|
go.Table(
|
|
header=dict(
|
|
values=["<b>Mode</b>", "<b>Value (nm)</b>"],
|
|
align="left",
|
|
fill_color="rgb(55, 83, 109)",
|
|
font=dict(color="white", size=11),
|
|
),
|
|
cells=dict(
|
|
values=[
|
|
[
|
|
"Defocus (J4)",
|
|
"Astigmatism (J5+J6)",
|
|
"Coma (J7+J8)",
|
|
"Trefoil (J9+J10)",
|
|
"Spherical (J11)",
|
|
],
|
|
[
|
|
f"{mfg_metrics['defocus_nm']:.2f}",
|
|
f"{mfg_metrics['astigmatism_rms']:.2f}",
|
|
f"{mfg_metrics['coma_rms']:.2f}",
|
|
f"{mfg_metrics['trefoil_rms']:.2f}",
|
|
f"{mfg_metrics['spherical_nm']:.2f}",
|
|
],
|
|
],
|
|
align="left",
|
|
fill_color="rgb(243, 243, 243)",
|
|
),
|
|
),
|
|
row=3,
|
|
col=1,
|
|
)
|
|
|
|
# Row 4: Pre-correction (90 deg - 20 deg)
|
|
fig.add_trace(
|
|
go.Table(
|
|
header=dict(
|
|
values=["<b>Correction Mode</b>", "<b>Value (nm)</b>"],
|
|
align="left",
|
|
fill_color="rgb(55, 83, 109)",
|
|
font=dict(color="white", size=11),
|
|
),
|
|
cells=dict(
|
|
values=[
|
|
[
|
|
"Total RMS (J1-J3 filter)",
|
|
"Defocus (J4)",
|
|
"Astigmatism (J5+J6)",
|
|
"Coma (J7+J8)",
|
|
],
|
|
[
|
|
f"{correction_metrics.get('rms_filter_j1to3', 0):.2f}",
|
|
f"{correction_metrics['defocus_nm']:.2f}",
|
|
f"{correction_metrics['astigmatism_rms']:.2f}",
|
|
f"{correction_metrics['coma_rms']:.2f}",
|
|
],
|
|
],
|
|
align="left",
|
|
fill_color="rgb(243, 243, 243)",
|
|
),
|
|
),
|
|
row=4,
|
|
col=1,
|
|
)
|
|
|
|
# Row 5: Bar chart
|
|
sorted_idx = np.argsort(coeff_abs)[::-1][:20]
|
|
fig.add_trace(
|
|
go.Bar(
|
|
x=[float(coeff_abs[i]) for i in sorted_idx],
|
|
y=[labels[i] for i in sorted_idx],
|
|
orientation="h",
|
|
marker_color="rgb(55, 83, 109)",
|
|
hovertemplate="%{y}<br>|Coeff| = %{x:.3f} nm<extra></extra>",
|
|
showlegend=False,
|
|
),
|
|
row=5,
|
|
col=1,
|
|
)
|
|
else:
|
|
# Row 3: Full coefficient table
|
|
fig.add_trace(
|
|
go.Table(
|
|
header=dict(
|
|
values=[
|
|
"<b>Noll j</b>",
|
|
"<b>Mode Name</b>",
|
|
"<b>Coeff (nm)</b>",
|
|
"<b>|Coeff| (nm)</b>",
|
|
],
|
|
align="left",
|
|
fill_color="rgb(55, 83, 109)",
|
|
font=dict(color="white", size=11),
|
|
),
|
|
cells=dict(
|
|
values=[
|
|
list(range(1, N_MODES + 1)),
|
|
labels,
|
|
[f"{c:+.3f}" for c in coefficients],
|
|
[f"{abs(c):.3f}" for c in coefficients],
|
|
],
|
|
align="left",
|
|
fill_color="rgb(243, 243, 243)",
|
|
font=dict(size=10),
|
|
height=22,
|
|
),
|
|
),
|
|
row=3,
|
|
col=1,
|
|
)
|
|
|
|
# Row 4: Bar chart - top 20 modes by magnitude
|
|
sorted_idx = np.argsort(coeff_abs)[::-1][:20]
|
|
fig.add_trace(
|
|
go.Bar(
|
|
x=[float(coeff_abs[i]) for i in sorted_idx],
|
|
y=[labels[i] for i in sorted_idx],
|
|
orientation="h",
|
|
marker_color="rgb(55, 83, 109)",
|
|
hovertemplate="%{y}<br>|Coeff| = %{x:.3f} nm<extra></extra>",
|
|
showlegend=False,
|
|
),
|
|
row=4,
|
|
col=1,
|
|
)
|
|
|
|
fig.update_layout(
|
|
width=1400,
|
|
height=1800 if is_manufacturing else 1600,
|
|
margin=dict(t=80, b=20, l=20, r=20),
|
|
title=dict(text=f"<b>{title}</b>", font=dict(size=20), x=0.5),
|
|
paper_bgcolor="white",
|
|
plot_bgcolor="white",
|
|
)
|
|
|
|
return fig.to_html(include_plotlyjs="cdn", full_html=True)
|
|
|
|
# =====================================================================
|
|
# NEW: Use OPD method (accounts for lateral X,Y displacement)
|
|
# =====================================================================
|
|
op2_path = op2_files[0]
|
|
|
|
# Try OPD extractor first (more accurate), fall back to Standard if no BDF
|
|
use_opd = bdf_path is not None
|
|
if use_opd:
|
|
try:
|
|
opd_extractor = ZernikeOPDExtractor(
|
|
str(op2_path),
|
|
bdf_path=str(bdf_path),
|
|
n_modes=N_MODES,
|
|
filter_orders=FILTER_LOW_ORDERS,
|
|
)
|
|
except Exception as e:
|
|
print(f"OPD extractor failed, falling back to Standard: {e}")
|
|
use_opd = False
|
|
|
|
# Also create Standard extractor for comparison
|
|
std_extractor = ZernikeExtractor(str(op2_path), displacement_unit="mm", n_modes=N_MODES)
|
|
|
|
def generate_dual_method_html(
    title: str, target_sc: str, ref_sc: str, is_manufacturing: bool = False
) -> tuple:
    """Generate interactive HTML comparing OPD and Standard Zernike methods.

    Builds a single Plotly figure with four switchable 3D views (WFE residual,
    ΔX, ΔY, ΔZ displacement components) for the relative deformation between
    two subcases, plus an annotation panel with RMS / lateral-displacement
    metrics for both extraction methods.

    Args:
        title: Figure title (HTML allowed).
        target_sc: Target subcase id (key into SUBCASE_MAP / extractor data).
        ref_sc: Reference subcase id.
        is_manufacturing: Currently unused; kept for call compatibility.

    Returns:
        Tuple of (html_content, rms_global_opd, rms_filtered_opd, lateral_stats)
        where lateral_stats is a dict with max/rms lateral displacement,
        the method actually used ("opd" or "standard"), the Standard-method
        filtered RMS and the percentage difference between methods.
    """
    # Extract using OPD method (primary). The absolute-subcase calls are kept
    # even though their results are not referenced here: the extractor call
    # may validate/cache subcase data — TODO confirm before removing.
    if use_opd:
        opd_rel = opd_extractor.extract_relative(target_sc, ref_sc)
        opd_abs = opd_extractor.extract_subcase(target_sc)
    else:
        opd_rel = None
        opd_abs = None

    # Extract using Standard method (for comparison)
    std_rel = std_extractor.extract_relative(target_sc, ref_sc, include_coefficients=True)
    std_abs = std_extractor.extract_subcase(target_sc, include_coefficients=True)

    # Get OPD data with full arrays for visualization
    if use_opd:
        opd_data = opd_extractor._build_figure_opd_data(target_sc)
        opd_ref_data = opd_extractor._build_figure_opd_data(ref_sc)

        # Build relative displacement arrays (node-by-node). Nodes present in
        # the target but not in the reference subcase are skipped.
        ref_map = {int(nid): i for i, nid in enumerate(opd_ref_data["node_ids"])}
        X_list, Y_list, WFE_list = [], [], []
        dx_list, dy_list, dz_list = [], [], []

        for i, nid in enumerate(opd_data["node_ids"]):
            nid = int(nid)
            if nid not in ref_map:
                continue
            ref_idx = ref_map[nid]

            # Use deformed coordinates from OPD
            X_list.append(opd_data["x_deformed"][i])
            Y_list.append(opd_data["y_deformed"][i])
            WFE_list.append(opd_data["wfe_nm"][i] - opd_ref_data["wfe_nm"][ref_idx])

            # Relative displacements (target - reference)
            dx_list.append(opd_data["dx"][i] - opd_ref_data["dx"][ref_idx])
            dy_list.append(opd_data["dy"][i] - opd_ref_data["dy"][ref_idx])
            dz_list.append(opd_data["dz"][i] - opd_ref_data["dz"][ref_idx])

        X = np.array(X_list)
        Y = np.array(Y_list)
        W = np.array(WFE_list)
        dx = np.array(dx_list) * 1000.0  # mm to µm
        dy = np.array(dy_list) * 1000.0
        dz = np.array(dz_list) * 1000.0

        # Lateral (in-plane) displacement magnitude
        lateral_um = np.sqrt(dx**2 + dy**2)
        max_lateral = float(np.max(np.abs(lateral_um)))
        rms_lateral = float(np.sqrt(np.mean(lateral_um**2)))

        rms_global_opd = opd_rel["relative_global_rms_nm"]
        rms_filtered_opd = opd_rel["relative_filtered_rms_nm"]
        # Fall back to Standard coefficients if the OPD result lacks deltas
        coefficients = np.array(opd_rel.get("delta_coefficients", std_rel["coefficients"]))
    else:
        # Fallback to Standard method arrays (Z-only WFE, undeformed geometry)
        target_disp = std_extractor.displacements[target_sc]
        ref_disp = std_extractor.displacements[ref_sc]
        ref_map = {int(nid): i for i, nid in enumerate(ref_disp["node_ids"])}

        X_list, Y_list, W_list = [], [], []
        dx_list, dy_list, dz_list = [], [], []

        for i, nid in enumerate(target_disp["node_ids"]):
            nid = int(nid)
            if nid not in ref_map:
                continue
            geo = std_extractor.node_geometry.get(nid)
            if geo is None:
                continue
            ref_idx = ref_map[nid]

            X_list.append(geo[0])
            Y_list.append(geo[1])

            # WFE from the Z displacement component only
            target_wfe = target_disp["disp"][i, 2] * std_extractor.wfe_factor
            ref_wfe = ref_disp["disp"][ref_idx, 2] * std_extractor.wfe_factor
            W_list.append(target_wfe - ref_wfe)

            # Relative displacements (mm to µm)
            dx_list.append(
                (target_disp["disp"][i, 0] - ref_disp["disp"][ref_idx, 0]) * 1000.0
            )
            dy_list.append(
                (target_disp["disp"][i, 1] - ref_disp["disp"][ref_idx, 1]) * 1000.0
            )
            dz_list.append(
                (target_disp["disp"][i, 2] - ref_disp["disp"][ref_idx, 2]) * 1000.0
            )

        X = np.array(X_list)
        Y = np.array(Y_list)
        W = np.array(W_list)
        dx = np.array(dx_list)
        dy = np.array(dy_list)
        dz = np.array(dz_list)

        lateral_um = np.sqrt(dx**2 + dy**2)
        max_lateral = float(np.max(np.abs(lateral_um)))
        rms_lateral = float(np.sqrt(np.mean(lateral_um**2)))

        rms_global_opd = std_rel["relative_global_rms_nm"]
        rms_filtered_opd = std_rel["relative_filtered_rms_nm"]
        coefficients = np.array(std_rel["coefficients"])

    # Standard method RMS (filtered) for the comparison annotation
    rms_filtered_std = std_rel["relative_filtered_rms_nm"]

    # Compute residual surface: remove the first FILTER_LOW_ORDERS Zernike
    # modes (piston/tilt/...) from the relative WFE, on a unit-disc mapping
    # of the centered node coordinates.
    Xc = X - np.mean(X)
    Yc = Y - np.mean(Y)
    R = float(np.max(np.hypot(Xc, Yc)))
    r = np.hypot(Xc / R, Yc / R)
    th = np.arctan2(Yc, Xc)

    Z_basis = np.column_stack([zernike_noll(j, r, th) for j in range(1, N_MODES + 1)])
    W_res_filt = W - Z_basis[:, :FILTER_LOW_ORDERS].dot(coefficients[:FILTER_LOW_ORDERS])

    # Downsample for display (fixed seed for reproducible plots)
    n = len(X)
    if n > PLOT_DOWNSAMPLE:
        rng = np.random.default_rng(42)
        sel = rng.choice(n, size=PLOT_DOWNSAMPLE, replace=False)
        Xp, Yp = X[sel], Y[sel]
        Wp = W_res_filt[sel]
        dxp, dyp, dzp = dx[sel], dy[sel], dz[sel]
    else:
        Xp, Yp, Wp = X, Y, W_res_filt
        dxp, dyp, dzp = dx, dy, dz

    # Amplified residual surface used for the WFE view
    res_amp = AMP * Wp

    def build_mesh_trace(Zp, colorscale, colorbar_title, unit):
        """Build a Mesh3d trace for (Xp, Yp, Zp); fall back to a point cloud
        if Delaunay triangulation fails (e.g. degenerate point sets)."""
        try:
            tri = Triangulation(Xp, Yp)
            if tri.triangles is not None and len(tri.triangles) > 0:
                i_idx, j_idx, k_idx = tri.triangles.T
                return go.Mesh3d(
                    x=Xp.tolist(),
                    y=Yp.tolist(),
                    z=Zp.tolist(),
                    i=i_idx.tolist(),
                    j=j_idx.tolist(),
                    k=k_idx.tolist(),
                    intensity=Zp.tolist(),
                    colorscale=colorscale,
                    opacity=1.0,
                    flatshading=False,
                    lighting=dict(
                        ambient=0.4, diffuse=0.8, specular=0.3, roughness=0.5, fresnel=0.2
                    ),
                    lightposition=dict(x=100, y=200, z=300),
                    showscale=True,
                    colorbar=dict(
                        title=dict(text=colorbar_title, side="right"), thickness=15, len=0.5
                    ),
                    hovertemplate=f"X: %{{x:.1f}}<br>Y: %{{y:.1f}}<br>{unit}: %{{z:.3f}}<extra></extra>",
                )
        except Exception:
            pass
        return go.Scatter3d(
            x=Xp.tolist(),
            y=Yp.tolist(),
            z=Zp.tolist(),
            mode="markers",
            marker=dict(size=2, color=Zp.tolist(), colorscale=colorscale, showscale=True),
        )

    # Build traces for each view
    trace_wfe = build_mesh_trace(res_amp, COLORSCALE, "WFE (nm)", "WFE nm")
    trace_dx = build_mesh_trace(dxp, "RdBu_r", "ΔX (µm)", "ΔX µm")
    trace_dy = build_mesh_trace(dyp, "RdBu_r", "ΔY (µm)", "ΔY µm")
    trace_dz = build_mesh_trace(dzp, "RdBu_r", "ΔZ (µm)", "ΔZ µm")

    # Create figure with buttons to switch views
    fig = go.Figure()

    # Add all traces (only WFE visible initially)
    trace_wfe.visible = True
    trace_dx.visible = False
    trace_dy.visible = False
    trace_dz.visible = False

    fig.add_trace(trace_wfe)
    fig.add_trace(trace_dx)
    fig.add_trace(trace_dy)
    fig.add_trace(trace_dz)

    # Button menu toggling trace visibility (one view at a time)
    fig.update_layout(
        updatemenus=[
            dict(
                type="buttons",
                direction="right",
                x=0.0,
                y=1.12,
                xanchor="left",
                showactive=True,
                buttons=[
                    dict(
                        label="WFE (nm)",
                        method="update",
                        args=[{"visible": [True, False, False, False]}],
                    ),
                    dict(
                        label="ΔX (µm)",
                        method="update",
                        args=[{"visible": [False, True, False, False]}],
                    ),
                    dict(
                        label="ΔY (µm)",
                        method="update",
                        args=[{"visible": [False, False, True, False]}],
                    ),
                    dict(
                        label="ΔZ (µm)",
                        method="update",
                        args=[{"visible": [False, False, False, True]}],
                    ),
                ],
                font=dict(size=12),
                pad=dict(r=10, t=10),
            )
        ]
    )

    # Percentage difference between OPD and Standard filtered RMS
    pct_diff = (
        100.0 * (rms_filtered_opd - rms_filtered_std) / rms_filtered_std
        if rms_filtered_std > 0
        else 0.0
    )

    # Annotation panel text (newlines become <br> below)
    method_label = "OPD (X,Y,Z)" if use_opd else "Standard (Z-only)"
    annotations_text = f"""
<b>Method:</b> {method_label} {"← More Accurate" if use_opd else "(BDF not found)"}
<b>RMS Metrics (Filtered J1-J4):</b>
• OPD: {rms_filtered_opd:.2f} nm
• Standard: {rms_filtered_std:.2f} nm
• Δ: {pct_diff:+.1f}%
<b>Lateral Displacement:</b>
• Max: {max_lateral:.3f} µm
• RMS: {rms_lateral:.3f} µm
<b>Displacement RMS:</b>
• ΔX: {float(np.sqrt(np.mean(dx**2))):.3f} µm
• ΔY: {float(np.sqrt(np.mean(dy**2))):.3f} µm
• ΔZ: {float(np.sqrt(np.mean(dz**2))):.3f} µm
"""

    fig.update_layout(
        scene=dict(
            camera=dict(eye=dict(x=1.2, y=1.2, z=0.8), up=dict(x=0, y=0, z=1)),
            xaxis=dict(
                title="X (mm)",
                showgrid=True,
                gridcolor="rgba(128,128,128,0.3)",
                showbackground=True,
                backgroundcolor="rgba(240,240,240,0.9)",
            ),
            yaxis=dict(
                title="Y (mm)",
                showgrid=True,
                gridcolor="rgba(128,128,128,0.3)",
                showbackground=True,
                backgroundcolor="rgba(240,240,240,0.9)",
            ),
            zaxis=dict(
                title="Value",
                showgrid=True,
                gridcolor="rgba(128,128,128,0.3)",
                showbackground=True,
                backgroundcolor="rgba(230,230,250,0.9)",
            ),
            aspectmode="manual",
            aspectratio=dict(x=1, y=1, z=0.4),
        ),
        width=1400,
        height=900,
        margin=dict(t=120, b=20, l=20, r=20),
        title=dict(
            text=f"<b>{title}</b><br><sup>Click buttons to switch: WFE, ΔX, ΔY, ΔZ</sup>",
            font=dict(size=18),
            x=0.5,
        ),
        paper_bgcolor="white",
        plot_bgcolor="white",
        annotations=[
            dict(
                text=annotations_text.replace("\n", "<br>"),
                xref="paper",
                yref="paper",
                x=1.02,
                y=0.98,
                xanchor="left",
                yanchor="top",
                showarrow=False,
                font=dict(family="monospace", size=11),
                align="left",
                bgcolor="rgba(255,255,255,0.9)",
                bordercolor="rgba(0,0,0,0.3)",
                borderwidth=1,
                borderpad=8,
            ),
            dict(
                text="<b>View:</b>",
                xref="paper",
                yref="paper",
                x=0.0,
                y=1.15,
                xanchor="left",
                yanchor="top",
                showarrow=False,
                font=dict(size=12),
            ),
        ],
    )

    html_content = fig.to_html(include_plotlyjs="cdn", full_html=True)

    return (
        html_content,
        rms_global_opd,
        rms_filtered_opd,
        {
            "max_lateral_um": max_lateral,
            "rms_lateral_um": rms_lateral,
            "method": "opd" if use_opd else "standard",
            "rms_std": rms_filtered_std,
            "pct_diff": pct_diff,
        },
    )
|
|
|
|
# Generate results for each comparison
|
|
results = {}
|
|
comparisons = [
|
|
("3", "2", "40_vs_20", "40 deg vs 20 deg"),
|
|
("4", "2", "60_vs_20", "60 deg vs 20 deg"),
|
|
("1", "2", "90_vs_20", "90 deg vs 20 deg (manufacturing)"),
|
|
]
|
|
|
|
for target_sc, ref_sc, key, title_suffix in comparisons:
|
|
if target_sc not in std_extractor.displacements:
|
|
continue
|
|
|
|
target_angle = SUBCASE_MAP.get(target_sc, target_sc)
|
|
ref_angle = SUBCASE_MAP.get(ref_sc, ref_sc)
|
|
is_mfg = key == "90_vs_20"
|
|
|
|
html_content, rms_global, rms_filtered, lateral_stats = generate_dual_method_html(
|
|
title=f"iter{trial_number}: {target_angle}° vs {ref_angle}°",
|
|
target_sc=target_sc,
|
|
ref_sc=ref_sc,
|
|
is_manufacturing=is_mfg,
|
|
)
|
|
|
|
results[key] = {
|
|
"html": html_content,
|
|
"rms_global": rms_global,
|
|
"rms_filtered": rms_filtered,
|
|
"title": f"{target_angle}° vs {ref_angle}°",
|
|
"method": lateral_stats["method"],
|
|
"rms_std": lateral_stats["rms_std"],
|
|
"pct_diff": lateral_stats["pct_diff"],
|
|
"max_lateral_um": lateral_stats["max_lateral_um"],
|
|
"rms_lateral_um": lateral_stats["rms_lateral_um"],
|
|
}
|
|
|
|
if not results:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="Failed to generate Zernike analysis. Check if subcases are available.",
|
|
)
|
|
|
|
return {
|
|
"study_id": study_id,
|
|
"trial_number": trial_number,
|
|
"comparisons": results,
|
|
"available_comparisons": list(results.keys()),
|
|
"method": "opd" if use_opd else "standard",
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
raise HTTPException(
|
|
status_code=500, detail=f"Failed to generate Zernike analysis: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.get("/studies/{study_id}/export/{format}")
async def export_study_data(study_id: str, format: str):
    """Export study data in one of the supported formats: csv, json, config.

    Args:
        study_id: Study identifier (resolved to a directory on disk).
        format: "json" (trial data inline), "csv" (downloadable trial table)
            or "config" (the study's optimization_config.json). Case-insensitive.

    Returns:
        JSONResponse — either the trial data directly (json) or a
        {filename, content, content_type} payload for client-side download.

    Raises:
        HTTPException: 404 if the study/DB/config is missing, 400 for an
            unsupported format, 500 on unexpected failure.
    """
    try:
        study_dir = resolve_study_path(study_id)
        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study '{study_id}' not found")

        results_dir = get_results_dir(study_dir)
        db_path = results_dir / "study.db"

        if not db_path.exists():
            raise HTTPException(status_code=404, detail="No study data available")

        conn = sqlite3.connect(str(db_path))
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            # Get all completed trials with their objective values
            cursor.execute("""
                SELECT t.trial_id, t.number, tv.value as objective
                FROM trials t
                JOIN trial_values tv ON t.trial_id = tv.trial_id
                WHERE t.state = 'COMPLETE'
                ORDER BY t.number
            """)
            trials_data = []

            for row in cursor.fetchall():
                trial_id = row["trial_id"]

                # Get params for this trial
                cursor.execute(
                    """
                    SELECT param_name, param_value
                    FROM trial_params
                    WHERE trial_id = ?
                    """,
                    (trial_id,),
                )
                params = {r["param_name"]: r["param_value"] for r in cursor.fetchall()}

                # Get user attributes (stored as JSON strings by Optuna)
                cursor.execute(
                    """
                    SELECT key, value_json
                    FROM trial_user_attributes
                    WHERE trial_id = ?
                    """,
                    (trial_id,),
                )
                user_attrs = {}
                for r in cursor.fetchall():
                    try:
                        user_attrs[r["key"]] = json.loads(r["value_json"])
                    except (json.JSONDecodeError, TypeError):
                        # Not valid JSON (or NULL) — keep the raw value
                        user_attrs[r["key"]] = r["value_json"]

                trials_data.append(
                    {
                        "trial_number": row["number"],
                        "objective": row["objective"],
                        "params": params,
                        "user_attrs": user_attrs,
                    }
                )
        finally:
            # Always release the SQLite handle, even if a query fails
            conn.close()

        if format.lower() == "json":
            return JSONResponse(
                content={
                    "study_id": study_id,
                    "total_trials": len(trials_data),
                    "trials": trials_data,
                }
            )

        elif format.lower() == "csv":
            import io
            import csv

            if not trials_data:
                return JSONResponse(content={"error": "No data to export"})

            # Build CSV in memory; columns = fixed fields + union of all params
            output = io.StringIO()

            param_names = sorted(
                set(key for trial in trials_data for key in trial["params"].keys())
            )

            fieldnames = ["trial_number", "objective"] + param_names
            writer = csv.DictWriter(output, fieldnames=fieldnames)
            writer.writeheader()

            for trial in trials_data:
                row_data = {"trial_number": trial["trial_number"], "objective": trial["objective"]}
                row_data.update(trial["params"])
                writer.writerow(row_data)

            csv_content = output.getvalue()

            return JSONResponse(
                content={
                    "filename": f"{study_id}_data.csv",
                    "content": csv_content,
                    "content_type": "text/csv",
                }
            )

        elif format.lower() == "config":
            # Export optimization config as a downloadable JSON payload
            setup_dir = study_dir / "1_setup"
            config_path = setup_dir / "optimization_config.json"

            if config_path.exists():
                with open(config_path, "r") as f:
                    config = json.load(f)
                return JSONResponse(
                    content={
                        "filename": f"{study_id}_config.json",
                        "content": json.dumps(config, indent=2),
                        "content_type": "application/json",
                    }
                )
            else:
                raise HTTPException(status_code=404, detail="Config file not found")

        else:
            raise HTTPException(status_code=400, detail=f"Unsupported format: {format}")

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to export data: {str(e)}")
|
|
|
|
|
|
# ============================================================================
|
|
# NX Model Introspection Endpoints
|
|
# ============================================================================
|
|
|
|
|
|
def scan_nx_file_dependencies(model_dir: Path) -> dict:
    """
    Scan a model directory for NX files and build a dependency tree.

    This uses file naming conventions to determine relationships:
    - .sim files reference .afm (assembly FEM) or .fem files
    - .afm files reference multiple .fem files
    - .fem files reference _i.prt (idealized) files
    - _i.prt files are derived from .prt (geometry) files

    Args:
        model_dir: Directory containing the NX model files (scanned non-recursively).

    Returns:
        Dictionary with:
        - "files": file names grouped by type (sim/afm/fem/prt/idealized)
        - "dependencies": list of {source, target, type} edges
        - "root_sim": the first .sim file encountered (or None)
    """
    result = {
        "files": {
            "sim": [],  # Simulation files
            "afm": [],  # Assembly FEM files
            "fem": [],  # FEM files
            "prt": [],  # Part/geometry files
            "idealized": [],  # Idealized parts (*_i.prt)
        },
        "dependencies": [],  # List of {source, target, type} edges
        "root_sim": None,
    }

    # Collect all files by type. Suffix checks are ordered so that the more
    # specific "_i.prt" wins over the generic ".prt".
    for f in model_dir.iterdir():
        if not f.is_file():
            continue
        name = f.name.lower()

        if name.endswith(".sim"):
            result["files"]["sim"].append(f.name)
        elif name.endswith(".afm"):
            result["files"]["afm"].append(f.name)
        elif name.endswith(".fem"):
            result["files"]["fem"].append(f.name)
        elif name.endswith("_i.prt"):
            result["files"]["idealized"].append(f.name)
        elif name.endswith(".prt"):
            result["files"]["prt"].append(f.name)

    # Build dependency edges based on NX naming conventions
    # SIM -> AFM: SIM file basename matches AFM (e.g., ASSY_M1_assyfem1_sim1.sim -> ASSY_M1_assyfem1.afm)
    for sim in result["files"]["sim"]:
        sim_base = sim.lower().replace(".sim", "")
        # Remove _sim1, _sim2 suffix to get AFM name
        afm_base = sim_base.rsplit("_sim", 1)[0] if "_sim" in sim_base else sim_base

        # Check for matching AFM (for-else: fallback runs only when no AFM matched)
        for afm in result["files"]["afm"]:
            if afm.lower().replace(".afm", "") == afm_base:
                result["dependencies"].append({"source": sim, "target": afm, "type": "sim_to_afm"})
                break
        else:
            # No AFM, check for direct FEM reference
            for fem in result["files"]["fem"]:
                fem_base = fem.lower().replace(".fem", "")
                if afm_base.endswith(fem_base) or fem_base in afm_base:
                    result["dependencies"].append(
                        {"source": sim, "target": fem, "type": "sim_to_fem"}
                    )
                    break

        # Set root sim (first one found)
        if result["root_sim"] is None:
            result["root_sim"] = sim

    # AFM -> FEM: Assembly FEM references component FEMs.
    # Heuristic: every FEM that is not the assembly FEM itself is treated
    # as a component of each assembly.
    for afm in result["files"]["afm"]:
        afm_base = afm.lower().replace(".afm", "")

        for fem in result["files"]["fem"]:
            fem_base = fem.lower().replace(".fem", "")
            if fem_base != afm_base:
                result["dependencies"].append({"source": afm, "target": fem, "type": "afm_to_fem"})

    # FEM -> Idealized PRT: FEM references idealized geometry
    # Convention: Model_fem1.fem -> Model_fem1_i.prt
    for fem in result["files"]["fem"]:
        fem_base = fem.lower().replace(".fem", "")

        for idealized in result["files"]["idealized"]:
            idealized_base = idealized.lower().replace("_i.prt", "")
            if idealized_base == fem_base:
                result["dependencies"].append(
                    {"source": fem, "target": idealized, "type": "fem_to_idealized"}
                )
                break

    # Idealized PRT -> PRT: Idealized part derives from geometry
    # Convention: Model_fem1_i.prt -> Model.prt
    for idealized in result["files"]["idealized"]:
        idealized_base = idealized.lower().replace("_i.prt", "")
        # Remove fem suffix to get geometry name (Model_fem1_i -> Model)
        geom_base = (
            idealized_base.rsplit("_fem", 1)[0] if "_fem" in idealized_base else idealized_base
        )

        for prt in result["files"]["prt"]:
            prt_base = prt.lower().replace(".prt", "")
            if prt_base == geom_base:
                result["dependencies"].append(
                    {"source": idealized, "target": prt, "type": "idealized_to_prt"}
                )
                break

    return result
|
|
|
|
|
|
def scan_existing_fea_results(study_dir: Path) -> dict:
    """
    Locate existing FEA result files (.op2, .f06, .bdf/.dat) for a study.

    Three places are inspected, in order:
    1. 1_setup/model/ - results from manual NX runs
    2. 2_iterations/iter0 or trial_baseline - baseline results
    3. 2_iterations/iter{N}/ - most recent completed iteration

    Returns:
        Dict with "has_results", a "sources" list of
        {location, path, op2, f06, bdf[, timestamp]} entries, and
        "recommended" — the newest source that contains an .op2 file.
    """
    scan = {
        "has_results": False,
        "sources": [],  # List of {location, op2, f06, bdf, timestamp}
        "recommended": None,  # Best source to use
    }

    def _probe(folder: Path, location: str, require_op2: bool) -> Optional[dict]:
        """Build a source entry for *folder*, or None if it has no usable results."""
        op2 = list(folder.glob("*.op2"))
        f06 = list(folder.glob("*.f06"))
        dat = list(folder.glob("*.dat"))
        usable = bool(op2) if require_op2 else bool(op2 or f06)
        if not usable:
            return None
        entry = {
            "location": location,
            "path": str(folder),
            "op2": [f.name for f in op2],
            "f06": [f.name for f in f06],
            "bdf": [f.name for f in dat],
        }
        # A timestamp is only attached when an OP2 exists
        if op2:
            entry["timestamp"] = op2[0].stat().st_mtime
        return entry

    # 1. Manual results stored next to the model
    model_dir = study_dir / "1_setup" / "model"
    if model_dir.exists():
        entry = _probe(model_dir, "model", require_op2=False)
        if entry is not None:
            scan["sources"].append(entry)

    # 2 & 3. Results produced by optimization iterations
    iter_dir = study_dir / "2_iterations"
    if iter_dir.exists():
        # Baseline folders (either naming convention)
        for baseline_name in ("iter0", "trial_baseline"):
            baseline_dir = iter_dir / baseline_name
            if not baseline_dir.exists():
                continue
            entry = _probe(baseline_dir, f"iterations/{baseline_name}", require_op2=False)
            if entry is not None:
                scan["sources"].append(entry)

        def _iter_rank(folder: Path) -> int:
            """Numeric suffix of an iterN folder name; 0 when not numeric."""
            suffix = folder.name.replace("iter", "")
            return int(suffix) if suffix.isdigit() else 0

        # Newest iteration folder that actually produced an OP2
        candidates = sorted(
            (d for d in iter_dir.iterdir() if d.is_dir() and d.name.startswith("iter")),
            key=_iter_rank,
            reverse=True,
        )
        for folder in candidates[:3]:  # Only inspect the 3 most recent
            entry = _probe(folder, f"iterations/{folder.name}", require_op2=True)
            if entry is not None:
                scan["sources"].append(entry)
                break  # Found latest with results

    # Determine whether anything usable was found and pick the best source
    if scan["sources"]:
        scan["has_results"] = True
        with_op2 = [s for s in scan["sources"] if s.get("op2")]
        if with_op2:
            scan["recommended"] = max(with_op2, key=lambda s: s.get("timestamp", 0))

    return scan
|
|
|
|
|
|
@router.get("/studies/{study_id}/nx/introspect")
async def introspect_nx_model(study_id: str, force: bool = False):
    """
    Introspect NX model to extract expressions and properties.

    Uses the optimization_engine.extractors.introspect_part module to run
    NX journal that extracts expressions, mass properties, materials, etc.
    File-dependency scanning and existing-FEA-result detection are always
    performed (they do not require NX); results are cached on disk.

    Args:
        study_id: Study identifier
        force: Force re-introspection even if cached results exist

    Returns:
        JSON with {study_id, cached, introspection} where introspection
        includes expressions, file_dependencies and existing_fea_results.

    Raises:
        HTTPException: 404 if study/model/.prt not found, 500 on failure.
    """
    try:
        study_dir = resolve_study_path(study_id)
        print(f"[introspect] study_id={study_id}, study_dir={study_dir}")

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Find model directory - check multiple possible locations
        model_dir = None
        possible_dirs = [
            study_dir / "1_setup" / "model",  # Standard Atomizer structure
            study_dir / "1_model",
            study_dir / "0_model",
            study_dir / "model",
            study_dir / "1_setup",  # Model files directly in 1_setup
        ]
        for possible_dir in possible_dirs:
            if possible_dir.exists():
                prt_files = list(possible_dir.glob("*.prt"))
                print(
                    f"[introspect] checking {possible_dir}: exists=True, prt_count={len(prt_files)}"
                )
                if prt_files:
                    model_dir = possible_dir
                    break
            else:
                print(f"[introspect] checking {possible_dir}: exists=False")

        if model_dir is None:
            detail = f"[V2] No model dir for {study_id} in {study_dir}"
            raise HTTPException(status_code=404, detail=detail)

        # Find .prt file, excluding idealized parts (*_i.prt)
        prt_files = list(model_dir.glob("*.prt"))
        prt_files = [f for f in prt_files if "_i.prt" not in f.name.lower()]

        if not prt_files:
            raise HTTPException(status_code=404, detail=f"No .prt files found in {model_dir}")

        prt_file = prt_files[0]  # Use first non-idealized part

        # Check for cached results; a stale/corrupt cache falls through to re-run
        cache_file = model_dir / "_introspection_cache.json"
        if cache_file.exists() and not force:
            try:
                with open(cache_file, "r") as f:
                    cached = json.load(f)
                # Only trust the cache if it was produced for the same file
                if cached.get("part_file") == str(prt_file):
                    return {"study_id": study_id, "cached": True, "introspection": cached}
            except (OSError, json.JSONDecodeError, AttributeError):
                # Unreadable, invalid JSON, or not a dict — ignore and re-run
                pass

        # Scan file dependencies (fast, doesn't need NX)
        file_deps = scan_nx_file_dependencies(model_dir)

        # Scan for existing FEA results (OP2, F06, etc.) - no NX needed!
        existing_results = scan_existing_fea_results(study_dir)

        # Run introspection
        try:
            from optimization_engine.extractors.introspect_part import introspect_part

            result = introspect_part(str(prt_file), str(model_dir), verbose=False)

            # Add file dependencies to result
            result["file_dependencies"] = file_deps

            # Add existing FEA results info
            result["existing_fea_results"] = existing_results

            # Cache results for subsequent calls
            with open(cache_file, "w") as f:
                json.dump(result, f, indent=2)

            return {"study_id": study_id, "cached": False, "introspection": result}

        except ImportError:
            # If introspect_part not available, return just file dependencies and existing results
            return {
                "study_id": study_id,
                "cached": False,
                "introspection": {
                    "success": False,
                    "error": "introspect_part module not available",
                    "file_dependencies": file_deps,
                    "existing_fea_results": existing_results,
                },
            }
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Introspection failed: {str(e)}")

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to introspect: {str(e)}")
|
|
|
|
|
|
@router.post("/studies/{study_id}/nx/run-baseline")
async def run_baseline_simulation(study_id: str):
    """
    Run a baseline FEA simulation with current/default parameter values.

    Uses the same NXSolver pattern as run_optimization.py:
    1. NXSolver with use_iteration_folders=True and master_model_dir
    2. create_iteration_folder() to copy model files properly
    3. run_simulation() with the copied .sim file

    This ensures all NX file dependencies are handled correctly.

    Args:
        study_id: Study identifier

    Returns:
        JSON with baseline run status and paths to result files

    Raises:
        HTTPException: 404 if the study, model directory, or .sim file is
            missing; 500 on unexpected failures outside the solver itself.
    """
    import re
    import time

    try:
        study_dir = resolve_study_path(study_id)
        print(f"[run-baseline] study_id={study_id}, study_dir={study_dir}")

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Find model directory — first candidate that contains a .sim file wins.
        model_dir = None
        for possible_dir in [
            study_dir / "1_setup" / "model",
            study_dir / "1_model",
            study_dir / "0_model",
            study_dir / "model",
            study_dir / "1_setup",
        ]:
            if possible_dir.exists() and list(possible_dir.glob("*.sim")):
                model_dir = possible_dir
                break

        if model_dir is None:
            raise HTTPException(
                status_code=404, detail=f"No model directory with .sim file found for {study_id}"
            )

        # Find .sim file
        sim_files = list(model_dir.glob("*.sim"))
        if not sim_files:
            raise HTTPException(status_code=404, detail=f"No .sim file found in {model_dir}")

        sim_file = sim_files[0]
        print(f"[run-baseline] sim_file={sim_file.name}")
        print(f"[run-baseline] model_dir={model_dir}")

        # Ensure iterations directory exists
        iterations_dir = study_dir / "2_iterations"
        iterations_dir.mkdir(exist_ok=True)

        # BUGFIX: the except handlers below previously referenced an undefined
        # name `baseline_dir`, so any solver failure raised NameError and
        # masked the real error. Track the iteration folder once it is created
        # and fall back to the iterations dir if the failure happened earlier.
        iter_folder = None

        # Try to run the solver using the same pattern as run_optimization.py
        try:
            from optimization_engine.nx.solver import NXSolver
            from pathlib import Path as P

            # Load config/spec for NX settings
            spec_file = study_dir / "atomizer_spec.json"
            config_file = study_dir / "1_setup" / "optimization_config.json"

            nx_install_path = None
            sim_file_name = sim_file.name
            solution_name = "Solution 1"

            # Prefer the spec's nx_settings; fall back to the optimization config.
            if spec_file.exists():
                with open(spec_file) as f:
                    spec_data = json.load(f)
                nx_settings = spec_data.get("model", {}).get("nx_settings", {})
                nx_install_path = nx_settings.get("nx_install_path")
                sim_file_name = nx_settings.get("sim_file", sim_file.name)
                solution_name = nx_settings.get("solution_name", "Solution 1")
            elif config_file.exists():
                with open(config_file) as f:
                    config_data = json.load(f)
                nx_settings = config_data.get("nx_settings", {})
                nx_install_path = nx_settings.get("nx_install_path")
                sim_file_name = nx_settings.get("sim_file", sim_file.name)
                solution_name = nx_settings.get("solution_name", "Solution 1")

            # Default to common installation paths if not in spec/config
            if not nx_install_path:
                for candidate in [
                    "C:/Program Files/Siemens/DesigncenterNX2512",
                    "C:/Program Files/Siemens/NX2506",
                    "C:/Program Files/Siemens/NX2412",
                ]:
                    if P(candidate).exists():
                        nx_install_path = candidate
                        break

            # Extract the Nastran version number from the install path name.
            version_match = re.search(r"NX(\d+)|DesigncenterNX(\d+)", nx_install_path or "")
            nastran_version = (
                (version_match.group(1) or version_match.group(2)) if version_match else "2512"
            )

            print(f"[run-baseline] NX install: {nx_install_path}")
            print(f"[run-baseline] Nastran version: {nastran_version}")
            print(f"[run-baseline] Solution: {solution_name}")

            # Create solver with iteration folder support (same as run_optimization.py)
            solver = NXSolver(
                master_model_dir=str(model_dir),  # Source of model files
                nx_install_dir=nx_install_path,
                nastran_version=nastran_version,
                timeout=600,
                use_journal=True,
                use_iteration_folders=True,  # Key: let NXSolver handle file copying
                study_name=study_id,
            )

            # Create iteration folder (this copies all model files properly).
            # Iteration number 0 is reserved for the baseline.
            print(f"[run-baseline] Creating iteration folder...")
            iter_folder = solver.create_iteration_folder(
                iterations_base_dir=iterations_dir,
                iteration_number=0,  # baseline = iter0
                expression_updates=None,  # No expression changes for baseline
            )

            print(f"[run-baseline] Iteration folder: {iter_folder}")

            # Find the sim file in the iteration folder
            iter_sim_file = iter_folder / sim_file_name
            if not iter_sim_file.exists():
                # Try to find any .sim file
                sim_files_in_iter = list(iter_folder.glob("*.sim"))
                if sim_files_in_iter:
                    iter_sim_file = sim_files_in_iter[0]
                else:
                    raise FileNotFoundError(f"No .sim file found in {iter_folder}")

            print(f"[run-baseline] Running simulation: {iter_sim_file.name}")
            t_start = time.time()

            result = solver.run_simulation(
                sim_file=iter_sim_file,
                working_dir=iter_folder,
                cleanup=False,  # Keep all files for inspection
                expression_updates=None,  # Use baseline values
                solution_name=solution_name,
            )

            elapsed = time.time() - t_start
            print(
                f"[run-baseline] Simulation completed in {elapsed:.1f}s, success={result.get('success')}"
            )

            # Find result files
            op2_files = list(iter_folder.glob("*.op2"))
            f06_files = list(iter_folder.glob("*.f06"))
            bdf_files = list(iter_folder.glob("*.bdf")) + list(iter_folder.glob("*.dat"))
            log_files = list(iter_folder.glob("*.log"))

            # Parse Nastran log for specific error messages (best effort —
            # a missing/unreadable log never masks the solver result).
            error_details = result.get("errors", [])
            memory_error = False
            if not result.get("success") and log_files:
                try:
                    with open(log_files[0], "r") as f:
                        log_content = f.read()
                    if "Unable to allocate requested memory" in log_content:
                        memory_error = True
                        mem_match = re.search(r"Requested size = (\d+) Gbytes", log_content)
                        avail_match = re.search(
                            r"Physical memory available:\s+(\d+) MB", log_content
                        )
                        if mem_match and avail_match:
                            requested = int(mem_match.group(1))
                            available = int(avail_match.group(1)) / 1024
                            error_details.append(
                                f"Nastran memory allocation failed: Requested {requested}GB but only {available:.1f}GB available. "
                                "Try closing other applications or reduce memory in nastran.rcf"
                            )
                except Exception:
                    pass

            return {
                "success": result.get("success", False),
                "study_id": study_id,
                "baseline_dir": str(iter_folder),
                "sim_file": str(iter_sim_file),
                "elapsed_time": elapsed,
                "result_files": {
                    "op2": [f.name for f in op2_files],
                    "f06": [f.name for f in f06_files],
                    "bdf": [f.name for f in bdf_files],
                    "log": [f.name for f in log_files],
                },
                "errors": error_details,
                "message": "Baseline simulation complete"
                if result.get("success")
                else (
                    "Nastran out of memory - close other applications and retry"
                    if memory_error
                    else "Simulation failed"
                ),
            }

        except ImportError as e:
            return {
                "success": False,
                "study_id": study_id,
                "baseline_dir": str(iter_folder) if iter_folder else str(iterations_dir),
                "error": f"NXSolver not available: {str(e)}",
                "message": "Model files copied but solver not available. Run manually in NX.",
            }
        except Exception as e:
            return {
                "success": False,
                "study_id": study_id,
                "baseline_dir": str(iter_folder) if iter_folder else str(iterations_dir),
                "error": str(e),
                "message": f"Solver execution failed: {str(e)}",
            }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to run baseline: {str(e)}")
|
|
|
|
|
|
class ExtractRequest(BaseModel):
    """Request body for POST /studies/{study_id}/nx/extract."""

    # Path to a specific OP2 result file; when omitted the endpoint
    # auto-detects the most recent result in the study.
    op2_path: Optional[str] = None
    # Extractor IDs to run (e.g. "displacement", "stress", "frequency",
    # "mass_bdf", "zernike"); when omitted the endpoint uses its defaults.
    extractors: Optional[List[str]] = None
|
|
|
|
|
|
@router.post("/studies/{study_id}/nx/extract")
async def extract_from_existing_results(
    study_id: str,
    request: Optional[ExtractRequest] = None,  # annotation fixed: default is None
):
    """
    Extract physics data from existing FEA result files (OP2/F06).

    NO NX SOLVE NEEDED - uses PyNastran and Atomizer extractors directly
    on existing result files.

    Args:
        study_id: Study identifier
        request.op2_path: Optional specific OP2 file path. If not provided, uses latest available.
        request.extractors: List of extractor IDs to run. If not provided, runs defaults.
            Options: ["displacement", "stress", "frequency", "mass_bdf", "zernike"]

    Returns:
        JSON with extracted physics data from each extractor. Per-extractor
        failures are reported in the "errors" list rather than failing the
        whole request; "success" is True if at least one extractor produced
        data.

    Raises:
        HTTPException: 404 when the study or the requested/auto-detected OP2
            file cannot be found; 500 on unexpected failures.
    """
    from pathlib import Path as P

    # Handle optional request body
    op2_path = request.op2_path if request else None
    extractors = request.extractors if request else None

    try:
        study_dir = resolve_study_path(study_id)
        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Find OP2 file to use
        target_op2 = None
        target_dir = None

        if op2_path:
            # Use specified path
            target_op2 = P(op2_path)
            if not target_op2.exists():
                raise HTTPException(status_code=404, detail=f"OP2 file not found: {op2_path}")
            target_dir = target_op2.parent
        else:
            # Auto-detect from existing results
            existing = scan_existing_fea_results(study_dir)
            if not existing["has_results"]:
                raise HTTPException(
                    status_code=404,
                    detail="No existing FEA results found. Run baseline first or provide op2_path.",
                )

            recommended = existing["recommended"]
            if not recommended or not recommended.get("op2"):
                raise HTTPException(status_code=404, detail="No OP2 files found in study")

            target_dir = P(recommended["path"])
            target_op2 = target_dir / recommended["op2"][0]

        print(f"[extract] Using OP2: {target_op2}")

        # Find associated BDF/DAT file (same stem, .bdf preferred over .dat)
        bdf_file = None
        for ext in [".bdf", ".dat"]:
            candidate = target_op2.with_suffix(ext)
            if candidate.exists():
                bdf_file = candidate
                break

        # Run extractors
        results = {
            "study_id": study_id,
            "op2_file": str(target_op2),
            "bdf_file": str(bdf_file) if bdf_file else None,
            "extractions": {},
            "errors": [],
        }

        # Default extractors if not specified
        if not extractors:
            extractors = ["displacement", "mass_bdf"]

        # Import extractors lazily — each is optional, so an unavailable
        # module only disables that extractor instead of the whole endpoint.
        # Note: extractors are modules with same-named functions inside.
        for ext_id in extractors:
            try:
                if ext_id == "displacement":
                    from optimization_engine.extractors.extract_displacement import (
                        extract_displacement,
                    )

                    disp = extract_displacement(str(target_op2))
                    results["extractions"]["displacement"] = disp

                elif ext_id == "stress":
                    from optimization_engine.extractors.extract_solid_stress import (
                        extract_solid_stress,
                    )

                    stress = extract_solid_stress(str(target_op2))
                    results["extractions"]["stress"] = stress

                elif ext_id == "frequency":
                    from optimization_engine.extractors.extract_frequency import extract_frequency

                    freq = extract_frequency(str(target_op2))
                    results["extractions"]["frequency"] = freq

                elif ext_id == "mass_bdf":
                    if bdf_file:
                        from optimization_engine.extractors.extract_mass_from_bdf import (
                            extract_mass_from_bdf,
                        )

                        mass = extract_mass_from_bdf(str(bdf_file))
                        results["extractions"]["mass_bdf"] = mass
                    else:
                        results["errors"].append("mass_bdf: No BDF file found")

                elif ext_id == "zernike":
                    from optimization_engine.extractors.extract_zernike_opd import (
                        ZernikeOPDExtractor,
                    )

                    extractor = ZernikeOPDExtractor(
                        str(target_op2),
                        bdf_path=str(bdf_file) if bdf_file else None,
                        n_modes=36,
                    )
                    # Extract for subcase 1 (or first available)
                    try:
                        zernike_result = extractor.extract_subcase("1")
                        results["extractions"]["zernike"] = {
                            "subcase": "1",
                            "rms_nm": zernike_result.get("filtered_rms_nm"),
                            "peak_valley_nm": zernike_result.get("peak_to_valley_nm"),
                            # Only the first 10 coefficients are returned to keep
                            # the payload small.
                            "coefficients": zernike_result.get("coefficients", [])[:10]
                            if zernike_result.get("coefficients")
                            else [],
                        }
                    except Exception as ze:
                        results["errors"].append(f"zernike: {str(ze)}")

                else:
                    results["errors"].append(f"Unknown extractor: {ext_id}")

            except ImportError as ie:
                results["errors"].append(f"{ext_id}: Module not available - {str(ie)}")
            except Exception as e:
                results["errors"].append(f"{ext_id}: {str(e)}")

        results["success"] = len(results["extractions"]) > 0
        return results

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Extraction failed: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/nx/expressions")
async def get_nx_expressions(study_id: str):
    """
    Get just the expressions from NX model (simplified endpoint).

    Returns list of user expressions suitable for design variable selection.
    Results are cached in `_introspection_cache.json` beside the model; a
    corrupt or unreadable cache is ignored and introspection is re-run
    (previously a corrupt cache file made the endpoint fail with a 500).

    Args:
        study_id: Study identifier

    Returns:
        JSON with expressions array containing name, value, units, formula

    Raises:
        HTTPException: 404 when the study, model directory, or .prt file is
            missing; 500 when introspection fails.
    """
    try:
        study_dir = resolve_study_path(study_id)

        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Find model directory - check multiple possible locations
        model_dir = None
        for possible_dir in [
            study_dir / "1_setup" / "model",  # Standard Atomizer structure
            study_dir / "1_model",
            study_dir / "0_model",
            study_dir / "model",
            study_dir / "1_setup",  # Model files directly in 1_setup
        ]:
            # A valid model dir must actually contain .prt files
            if possible_dir.exists() and list(possible_dir.glob("*.prt")):
                model_dir = possible_dir
                break

        if model_dir is None:
            raise HTTPException(
                status_code=404, detail=f"[expr] No model dir for {study_id} in {study_dir}"
            )

        # Check cache first; a broken cache falls through to re-introspection.
        cache_file = model_dir / "_introspection_cache.json"

        if cache_file.exists():
            try:
                with open(cache_file, "r") as f:
                    cached = json.load(f)
            except (json.JSONDecodeError, OSError):
                cached = None

            if cached is not None:
                expressions = cached.get("expressions", {}).get("user", [])
                return {"study_id": study_id, "expressions": expressions, "count": len(expressions)}

        # No usable cache, need to run introspection. Idealized parts
        # (*_i.prt) are excluded — expressions live on the master part.
        prt_files = list(model_dir.glob("*.prt"))
        prt_files = [f for f in prt_files if "_i.prt" not in f.name.lower()]

        if not prt_files:
            raise HTTPException(status_code=404, detail="No .prt files found")

        prt_file = prt_files[0]

        try:
            from optimization_engine.extractors.introspect_part import introspect_part

            result = introspect_part(str(prt_file), str(model_dir), verbose=False)

            # Cache for future
            with open(cache_file, "w") as f:
                json.dump(result, f, indent=2)

            expressions = result.get("expressions", {}).get("user", [])
            return {"study_id": study_id, "expressions": expressions, "count": len(expressions)}

        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to get expressions: {str(e)}")

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get expressions: {str(e)}")
|
|
|
|
|
|
def introspect_fem_file(fem_path: Path) -> dict:
    """
    Introspect a .fem or .afm file using PyNastran.

    Extracts: element counts, material info, property info, node count, etc.

    Args:
        fem_path: Path to the Nastran-format .fem/.afm file.

    Returns:
        dict with "file_type"/"file_name" always present, "success" True
        plus mesh/material/property summaries on success, or "success"
        False with an "error" message when PyNastran is unavailable or
        the file cannot be parsed. Never raises.
    """
    result = {
        "file_type": "fem",
        "file_name": fem_path.name,
        "success": False,
    }

    try:
        from pyNastran.bdf.bdf import BDF

        # Read BDF/FEM file (xref=False: we only need card counts, not
        # cross-referenced objects, and it is much faster)
        bdf = BDF()
        bdf.read_bdf(str(fem_path), xref=False)

        # Extract info
        result["success"] = True
        result["nodes"] = {
            "count": len(bdf.nodes),
        }

        # Element counts by type (iterate values: the element IDs are unused)
        element_counts = {}
        for elem in bdf.elements.values():
            element_counts[elem.type] = element_counts.get(elem.type, 0) + 1
        result["elements"] = {
            "total": len(bdf.elements),
            "by_type": element_counts,
        }

        # Materials — only report the common engineering constants when the
        # card type defines them
        materials = []
        for mid, mat in bdf.materials.items():
            mat_info = {"id": mid, "type": mat.type}
            if hasattr(mat, "e"):
                mat_info["E"] = mat.e
            if hasattr(mat, "nu"):
                mat_info["nu"] = mat.nu
            if hasattr(mat, "rho"):
                mat_info["rho"] = mat.rho
            materials.append(mat_info)
        result["materials"] = materials

        # Properties
        properties = [{"id": pid, "type": prop.type} for pid, prop in bdf.properties.items()]
        result["properties"] = properties

        # Coordinate systems
        result["coord_systems"] = len(bdf.coords)

        # Loads and constraints summary
        result["loads"] = {
            "load_cases": len(bdf.loads),
        }
        result["constraints"] = {
            "spcs": len(bdf.spcs),
            "mpcs": len(bdf.mpcs),
        }

    except ImportError:
        result["error"] = "PyNastran not available"
    except Exception as e:
        result["error"] = str(e)

    return result
|
|
|
|
|
|
def introspect_sim_file(sim_path: Path, use_nx: bool = True) -> dict:
    """
    Introspect a .sim file - extract solution info.

    Args:
        sim_path: Path to the .sim file
        use_nx: If True, use NX journal for full introspection (requires NX license)
                If False, use file-based heuristics only

    Returns:
        dict with "file_type", "file_name", "success" (always True — this
        is best-effort), and either the NX journal output
        (introspection_method == "nx_journal") or file-based inferences
        (introspection_method == "file_heuristics", with "nx_error"
        explaining why NX introspection was skipped/failed).

    Note: .sim is a binary NX format. Full introspection requires NX Open.
    """
    result = {
        "file_type": "sim",
        "file_name": sim_path.name,
        "success": True,
    }

    # Try NX journal introspection if requested
    if use_nx:
        try:
            # Find NX installation (first existing candidate wins)
            nx_paths = [
                Path("C:/Program Files/Siemens/DesigncenterNX2512/NXBIN/run_journal.exe"),
                Path("C:/Program Files/Siemens/NX2412/NXBIN/run_journal.exe"),
                Path("C:/Program Files/Siemens/Simcenter3D_2412/NXBIN/run_journal.exe"),
            ]
            journal_runner = None
            for p in nx_paths:
                if p.exists():
                    journal_runner = p
                    break

            if journal_runner:
                journal_path = (
                    Path(__file__).parent.parent.parent.parent.parent
                    / "nx_journals"
                    / "introspect_sim.py"
                )
                if not journal_path.exists():
                    # Try alternate path
                    journal_path = Path("C:/Users/antoi/Atomizer/nx_journals/introspect_sim.py")

                if journal_path.exists():
                    cmd = [
                        str(journal_runner),
                        str(journal_path),
                        str(sim_path),
                        str(sim_path.parent),
                    ]

                    # Run NX journal with timeout
                    proc = subprocess.run(
                        cmd, capture_output=True, text=True, timeout=120, cwd=str(sim_path.parent)
                    )

                    # The journal writes its findings to a JSON file next to
                    # the sim; its presence is the success signal.
                    output_file = sim_path.parent / "_introspection_sim.json"
                    if output_file.exists():
                        with open(output_file, "r") as f:
                            nx_result = json.load(f)
                        result.update(nx_result)
                        result["introspection_method"] = "nx_journal"
                        return result
                    else:
                        result["nx_error"] = "Journal completed but no output file"
                        result["nx_stdout"] = proc.stdout[-2000:] if proc.stdout else None
                        result["nx_stderr"] = proc.stderr[-2000:] if proc.stderr else None
                else:
                    result["nx_error"] = f"Journal not found: {journal_path}"
            else:
                result["nx_error"] = "NX installation not found"
        except subprocess.TimeoutExpired:
            result["nx_error"] = "NX journal timed out (120s)"
        except Exception as e:
            result["nx_error"] = f"NX introspection failed: {str(e)}"

    # Fallback: file-based heuristics
    result["introspection_method"] = "file_heuristics"
    result["note"] = "SIM files are binary NX format. NX introspection unavailable."

    # BUGFIX: guard against a nonexistent parent directory — iterdir() below
    # used to raise FileNotFoundError for a sim path whose folder is gone.
    parent_dir = sim_path.parent
    if parent_dir.is_dir():
        # Check for associated .dat file that might have been exported
        dat_file = sim_path.parent / (sim_path.stem + ".dat")
        if not dat_file.exists():
            # Try common naming patterns
            for f in parent_dir.glob("*solution*.dat"):
                dat_file = f
                break

        if dat_file.exists():
            result["associated_dat"] = dat_file.name

        # Try to infer solver type from related files
        if any(f.suffix.lower() == ".afm" for f in parent_dir.iterdir() if f.is_file()):
            result["inferred"] = {"assembly_fem": True, "solver": "nastran"}
        elif any(f.suffix.lower() == ".fem" for f in parent_dir.iterdir() if f.is_file()):
            result["inferred"] = {"assembly_fem": False, "solver": "nastran"}

    return result
|
|
|
|
|
|
@router.get("/studies/{study_id}/nx/introspect/{file_name}")
async def introspect_specific_file(study_id: str, file_name: str, force: bool = False):
    """
    Introspect a specific file in the model directory.

    Supports ALL NX file types:
    - .prt / _i.prt - Uses NX journal to extract expressions, mass, etc.
    - .fem / .afm - Uses PyNastran to extract mesh info, materials, properties
    - .sim - Extracts simulation/solution info

    Args:
        study_id: Study identifier
        file_name: Name of the file (e.g., "M1_Blank.prt", "M1_Blank_fem1.fem").
            The extension may be omitted; common extensions are tried in order.
        force: Force re-introspection even if cached

    Returns:
        JSON with introspection results appropriate for the file type

    Raises:
        HTTPException: 404 when the study, model directory, or file is
            missing (the 404 detail lists the available files); 500 on
            unexpected failures.
    """
    try:
        study_dir = resolve_study_path(study_id)
        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Find model directory
        model_dir = None
        for possible_dir in [
            study_dir / "1_setup" / "model",
            study_dir / "1_model",
            study_dir / "model",
            study_dir / "1_setup",
        ]:
            if possible_dir.exists():
                model_dir = possible_dir
                break

        if model_dir is None:
            raise HTTPException(status_code=404, detail=f"No model directory found for {study_id}")

        # Determine file type and find the file
        file_name_lower = file_name.lower()
        target_file = None
        file_type = None

        # Add extension if missing
        if "." not in file_name:
            # Try common extensions in order
            for ext in [".prt", ".fem", ".afm", ".sim"]:
                candidate = model_dir / (file_name + ext)
                if candidate.exists():
                    target_file = candidate
                    file_type = ext[1:]  # Remove leading dot
                    break
        else:
            # Has extension, find exact match (case-insensitive)
            for f in model_dir.iterdir():
                if f.name.lower() == file_name_lower:
                    target_file = f
                    # _i.prt must be checked before the generic .prt suffix
                    if "_i.prt" in f.name.lower():
                        file_type = "idealized"
                    elif f.suffix.lower() == ".prt":
                        file_type = "prt"
                    elif f.suffix.lower() == ".fem":
                        file_type = "fem"
                    elif f.suffix.lower() == ".afm":
                        file_type = "afm"
                    elif f.suffix.lower() == ".sim":
                        file_type = "sim"
                    break

        if target_file is None:
            # List available files for helpful error
            available = [
                f.name
                for f in model_dir.iterdir()
                if f.is_file() and f.suffix.lower() in [".prt", ".fem", ".afm", ".sim"]
            ]
            raise HTTPException(
                status_code=404, detail=f"File '{file_name}' not found. Available: {available}"
            )

        # Check cache. BUGFIX: narrowed the former bare `except:` — it also
        # swallowed KeyboardInterrupt/SystemExit. A corrupt or unreadable
        # cache simply falls through to fresh introspection.
        cache_file = model_dir / f"_introspection_{target_file.stem}.json"
        if cache_file.exists() and not force:
            try:
                with open(cache_file, "r") as f:
                    cached = json.load(f)
                return {
                    "study_id": study_id,
                    "file_name": target_file.name,
                    "file_type": file_type,
                    "cached": True,
                    "introspection": cached,
                }
            except (json.JSONDecodeError, OSError):
                pass

        # Run appropriate introspection based on file type
        result = None

        if file_type in ["prt", "idealized"]:
            # Use NX journal introspection for .prt files
            try:
                from optimization_engine.extractors.introspect_part import introspect_part

                result = introspect_part(str(target_file), str(model_dir), verbose=False)
            except ImportError:
                result = {"success": False, "error": "introspect_part module not available"}
            except Exception as e:
                result = {"success": False, "error": str(e)}

        elif file_type in ["fem", "afm"]:
            # Use PyNastran for FEM files
            result = introspect_fem_file(target_file)

        elif file_type == "sim":
            # Extract what we can from SIM file
            result = introspect_sim_file(target_file)

        if result is None:
            result = {"success": False, "error": f"Unknown file type: {file_type}"}

        # Cache results (best effort — a write failure is non-fatal)
        try:
            with open(cache_file, "w") as f:
                json.dump(result, f, indent=2)
        except Exception:
            pass  # Caching is optional

        return {
            "study_id": study_id,
            "file_name": target_file.name,
            "file_type": file_type,
            "cached": False,
            "introspection": result,
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to introspect file: {str(e)}")
|
|
|
|
|
|
@router.get("/studies/{study_id}/nx/parts")
async def list_model_files(study_id: str):
    """
    List all NX model files in the study's model directory.

    Includes ALL file types in the NX dependency chain:
    - .sim (Simulation) - loads, BCs, solution settings
    - .afm (Assembly FEM) - assembly of component FEMs
    - .fem (FEM) - mesh, materials, properties
    - _i.prt (Idealized) - simplified geometry for meshing
    - .prt (Geometry) - full CAD geometry with expressions

    Returns:
        JSON with categorized list of all model files
    """
    try:
        study_dir = resolve_study_path(study_id)
        if not study_dir.exists():
            raise HTTPException(status_code=404, detail=f"Study {study_id} not found")

        # Locate the model directory: first existing candidate wins.
        candidates = (
            study_dir / "1_setup" / "model",
            study_dir / "1_model",
            study_dir / "model",
            study_dir / "1_setup",
        )
        model_dir = next((d for d in candidates if d.exists()), None)

        if model_dir is None:
            raise HTTPException(status_code=404, detail=f"No model directory found for {study_id}")

        # Classification rules, checked in order. _i.prt must precede the
        # generic .prt suffix so idealized parts are not mislabeled.
        rules = [
            (lambda n: n.endswith(".sim"), "sim", "Simulation (loads, BCs, solutions)"),
            (lambda n: n.endswith(".afm"), "afm", "Assembly FEM"),
            (lambda n: n.endswith(".fem"), "fem", "FEM (mesh, materials, properties)"),
            (lambda n: "_i.prt" in n, "idealized", "Idealized geometry for meshing"),
            (lambda n: n.endswith(".prt"), "prt", "CAD geometry with expressions"),
        ]

        files = {
            "sim": [],  # Simulation files
            "afm": [],  # Assembly FEM files
            "fem": [],  # FEM files
            "idealized": [],  # Idealized parts (*_i.prt)
            "prt": [],  # Geometry parts
        }

        for entry in sorted(model_dir.iterdir()):
            if not entry.is_file():
                continue

            lowered = entry.name.lower()
            for matches, category, description in rules:
                if matches(lowered):
                    files[category].append(
                        {
                            "name": entry.name,
                            "stem": entry.stem,
                            "size_kb": round(entry.stat().st_size / 1024, 1),
                            "has_cache": (
                                model_dir / f"_introspection_{entry.stem}.json"
                            ).exists(),
                            "type": category,
                            "description": description,
                        }
                    )
                    break

        # Flatten in category order for the introspection dropdown.
        all_files = [
            info
            for category in ("sim", "afm", "fem", "idealized", "prt")
            for info in files[category]
        ]

        return {
            "study_id": study_id,
            "model_dir": str(model_dir),
            "files": files,
            "all_files": all_files,
            "count": len(all_files),
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list model files: {str(e)}")
|