Files
Atomizer/studies/drone_gimbal_arm_optimization/run_optimization.py
Anto01 d2c18bb7db feat: Migrate drone_gimbal_arm_optimization to use structured logging system (Phase 1.3.1)
Migrate drone_gimbal_arm study as reference implementation for Phase 1.3 logging system.

Changes:
- Replace all print() statements with logger calls throughout run_optimization.py
- Add logger.trial_start() and logger.trial_complete() for structured trial logging
- Use logger.trial_failed() for error handling with full tracebacks
- Add logger.study_start() and logger.study_complete() for lifecycle logging
- Replace constraint violation prints with logger.warning()
- Create comprehensive LOGGING_MIGRATION_GUIDE.md with before/after examples

Benefits:
- Color-coded console output (green INFO, yellow WARNING, red ERROR)
- Automatic file logging to 2_results/optimization.log with rotation (50MB, 3 backups)
- Structured format with timestamps for dashboard integration
- Professional error handling with exc_info=True
- Reference implementation for migrating remaining studies

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 09:39:56 -05:00

323 lines
13 KiB
Python

"""
Drone Gimbal Arm Optimization - Protocol 11 (Multi-Objective NSGA-II)
======================================================================
Multi-objective optimization using NSGA-II to find Pareto front:
1. Minimize mass (target < 120g)
2. Maximize fundamental frequency (target > 150 Hz)
Constraints:
- Max displacement < 1.5mm (850g camera payload)
- Max stress < 120 MPa (Al 6061-T6, SF=2.3)
- Natural frequency > 150 Hz (avoid rotor resonance)
Usage:
python run_optimization.py --trials 30
python run_optimization.py --trials 5 # Quick test
python run_optimization.py --resume # Continue existing study
"""
import sys
import json
import argparse
from pathlib import Path
from datetime import datetime
# Add project root to path
project_root = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(project_root))
import optuna
from optuna.samplers import NSGAIISampler
from optimization_engine.nx_solver import NXSolver
from optimization_engine.extractors.extract_displacement import extract_displacement
from optimization_engine.extractors.extract_von_mises_stress import extract_solid_stress
from optimization_engine.extractors.op2_extractor import OP2Extractor
from optimization_engine.extractors.extract_frequency import extract_frequency
from optimization_engine.extractors.extract_mass_from_bdf import extract_mass_from_bdf
from optimization_engine.logger import get_logger
# Import central configuration
try:
import config as atomizer_config
except ImportError:
atomizer_config = None
def load_config(config_file: Path) -> dict:
"""Load configuration from JSON file."""
with open(config_file, 'r') as f:
return json.load(f)
def main():
parser = argparse.ArgumentParser(description='Run drone gimbal arm multi-objective optimization')
parser.add_argument('--trials', type=int, default=30, help='Number of optimization trials')
parser.add_argument('--resume', action='store_true', help='Resume existing study')
args = parser.parse_args()
# Get study directory
study_dir = Path(__file__).parent
results_dir = study_dir / "2_results"
results_dir.mkdir(exist_ok=True)
# Initialize logger with file logging
logger = get_logger(
"drone_gimbal_arm",
study_dir=results_dir
)
logger.info("=" * 80)
logger.info("DRONE GIMBAL ARM OPTIMIZATION - PROTOCOL 11 (NSGA-II)")
logger.info("=" * 80)
logger.info("")
logger.info("Engineering Scenario:")
logger.info(" Professional aerial cinematography drone camera gimbal support arm")
logger.info("")
logger.info("Objectives:")
logger.info(" 1. MINIMIZE mass (target < 4000g, baseline = 4500g)")
logger.info(" 2. MAXIMIZE fundamental frequency (target > 150 Hz)")
logger.info("")
logger.info("Constraints:")
logger.info(" - Max displacement < 1.5mm (850g camera payload)")
logger.info(" - Max von Mises stress < 120 MPa (Al 6061-T6, SF=2.3)")
logger.info(" - Natural frequency > 150 Hz (avoid rotor resonance 80-120 Hz)")
logger.info("")
logger.info("Design Variables:")
logger.info(" - beam_half_core_thickness: 5-10 mm")
logger.info(" - beam_face_thickness: 1-3 mm")
logger.info(" - holes_diameter: 10-50 mm")
logger.info(" - hole_count: 8-14")
logger.info("")
logger.info(f"Running {args.trials} trials with NSGA-II sampler...")
logger.info("=" * 80)
logger.info("")
# Load configuration
opt_config_file = study_dir / "1_setup" / "optimization_config.json"
if not opt_config_file.exists():
logger.error(f"Optimization config not found: {opt_config_file}")
sys.exit(1)
opt_config = load_config(opt_config_file)
logger.info(f"Loaded optimization config: {opt_config['study_name']}")
logger.info(f"Protocol: {opt_config['optimization_settings']['protocol']}")
logger.info("")
# Setup paths
model_dir = study_dir / "1_setup" / "model"
model_file = model_dir / "Beam.prt"
sim_file = model_dir / "Beam_sim1.sim"
# Initialize NX solver
nx_solver = NXSolver(
nastran_version=atomizer_config.NX_VERSION if atomizer_config else "2412",
timeout=atomizer_config.NASTRAN_TIMEOUT if atomizer_config else 600,
use_journal=True,
enable_session_management=True,
study_name="drone_gimbal_arm_optimization"
)
def objective(trial: optuna.Trial) -> tuple:
"""
Multi-objective function for NSGA-II.
Returns:
(mass, frequency): Tuple for NSGA-II (minimize mass, maximize frequency)
"""
# Sample design variables
design_vars = {}
for dv in opt_config['design_variables']:
design_vars[dv['parameter']] = trial.suggest_float(
dv['parameter'],
dv['bounds'][0],
dv['bounds'][1]
)
logger.trial_start(trial.number, design_vars)
# Run simulation
logger.info("Running simulation...")
try:
result = nx_solver.run_simulation(
sim_file=sim_file,
working_dir=model_dir,
expression_updates=design_vars,
solution_name=None # Solve all solutions (static + modal)
)
if not result['success']:
error_msg = result.get('error', 'Unknown error')
logger.trial_failed(trial.number, error_msg)
trial.set_user_attr("feasible", False)
trial.set_user_attr("error", error_msg)
# Prune failed simulations instead of returning penalty values
raise optuna.TrialPruned(f"Simulation failed: {error_msg}")
op2_file = result['op2_file']
logger.info(f"Simulation successful: {op2_file}")
# Extract all objectives and constraints
logger.info("Extracting results...")
# Extract mass (grams) from CAD expression p173
# This expression measures the CAD mass directly
from optimization_engine.extractors.extract_mass_from_expression import extract_mass_from_expression
prt_file = model_file # Beam.prt
mass_kg = extract_mass_from_expression(prt_file, expression_name="p173")
mass = mass_kg * 1000.0 # Convert to grams
logger.info(f" mass: {mass:.3f} g (from CAD expression p173)")
# Extract frequency (Hz) - from modal analysis (solution 2)
# The drone gimbal has TWO solutions: solution_1 (static) and solution_2 (modal)
op2_modal = str(op2_file).replace("solution_1", "solution_2")
freq_result = extract_frequency(op2_modal, subcase=1, mode_number=1)
frequency = freq_result['frequency']
logger.info(f" fundamental_frequency: {frequency:.3f} Hz")
# Extract displacement (mm) - from static analysis (subcase 1)
disp_result = extract_displacement(op2_file, subcase=1)
max_disp = disp_result['max_displacement']
logger.info(f" max_displacement_limit: {max_disp:.3f} mm")
# Extract stress (MPa) - from static analysis (subcase 1)
stress_result = extract_solid_stress(op2_file, subcase=1, element_type='cquad4')
max_stress = stress_result['max_von_mises']
logger.info(f" max_stress_limit: {max_stress:.3f} MPa")
# Frequency constraint uses same value as objective
min_freq = frequency
logger.info(f" min_frequency_limit: {min_freq:.3f} Hz")
# Check constraints
constraint_values = {
'max_displacement_limit': max_disp,
'max_stress_limit': max_stress,
'min_frequency_limit': min_freq
}
constraint_violations = []
for constraint in opt_config['constraints']:
name = constraint['name']
value = constraint_values[name]
threshold = constraint['threshold']
c_type = constraint['type']
if c_type == 'less_than' and value > threshold:
violation = (value - threshold) / threshold
constraint_violations.append(f"{name}: {value:.2f} > {threshold} (violation: {violation:.1%})")
elif c_type == 'greater_than' and value < threshold:
violation = (threshold - value) / threshold
constraint_violations.append(f"{name}: {value:.2f} < {threshold} (violation: {violation:.1%})")
if constraint_violations:
logger.warning("Constraint violations:")
for v in constraint_violations:
logger.warning(f" - {v}")
trial.set_user_attr("constraint_violations", constraint_violations)
trial.set_user_attr("feasible", False)
# NSGA-II handles constraints through constraint_satisfied flag - no penalty needed
else:
logger.info("All constraints satisfied")
trial.set_user_attr("feasible", True)
# Store all results as trial attributes for dashboard
trial.set_user_attr("mass", mass)
trial.set_user_attr("frequency", frequency)
trial.set_user_attr("max_displacement", max_disp)
trial.set_user_attr("max_stress", max_stress)
trial.set_user_attr("design_vars", design_vars)
# Log successful trial completion
objectives = {"mass": mass, "frequency": frequency}
constraints_dict = {
"max_displacement_limit": max_disp,
"max_stress_limit": max_stress,
"min_frequency_limit": min_freq
}
feasible = len(constraint_violations) == 0
logger.trial_complete(trial.number, objectives, constraints_dict, feasible)
# Return tuple for NSGA-II: (minimize mass, maximize frequency)
# Using proper semantic directions in study creation
return (mass, frequency)
except optuna.TrialPruned:
# Re-raise pruned exceptions (don't catch them)
raise
except Exception as e:
logger.trial_failed(trial.number, str(e))
logger.error("Full traceback:", exc_info=True)
trial.set_user_attr("error", str(e))
trial.set_user_attr("feasible", False)
# Prune corrupted trials instead of returning penalty values
raise optuna.TrialPruned(f"Trial failed with exception: {str(e)}")
# Create Optuna study with NSGA-II sampler
study_name = opt_config['study_name']
storage = f"sqlite:///{results_dir / 'study.db'}"
if args.resume:
logger.info(f"Resuming existing study: {study_name}")
study = optuna.load_study(
study_name=study_name,
storage=storage,
sampler=NSGAIISampler()
)
logger.info(f"Loaded study with {len(study.trials)} existing trials")
else:
logger.info(f"Creating new study: {study_name}")
study = optuna.create_study(
study_name=study_name,
storage=storage,
directions=['minimize', 'maximize'], # Minimize mass, maximize frequency
sampler=NSGAIISampler(),
load_if_exists=True # Always allow resuming existing study
)
# Log study start
logger.study_start(study_name, n_trials=args.trials, sampler="NSGAIISampler")
logger.info("")
study.optimize(
objective,
n_trials=args.trials,
show_progress_bar=True
)
# Log study completion
n_successful = len([t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE])
logger.study_complete(study_name, n_trials=len(study.trials), n_successful=n_successful)
logger.info("")
logger.info(f"Pareto front solutions: {len(study.best_trials)}")
logger.info("")
# Show Pareto front
logger.info("Pareto Front (non-dominated solutions):")
logger.info("")
for i, trial in enumerate(study.best_trials):
mass = trial.values[0]
freq = trial.values[1] # Frequency is stored as positive now
feasible = trial.user_attrs.get('feasible', False)
logger.info(f" Solution #{i+1} (Trial {trial.number}):")
logger.info(f" Mass: {mass:.2f} g")
logger.info(f" Frequency: {freq:.2f} Hz")
logger.info(f" Feasible: {feasible}")
logger.info("")
logger.info("Results available in: studies/drone_gimbal_arm_optimization/2_results/")
logger.info("")
logger.info("View in Dashboard:")
logger.info(" 1. Ensure backend is running: cd atomizer-dashboard/backend && python -m uvicorn api.main:app --reload")
logger.info(" 2. Open dashboard: http://localhost:3003")
logger.info(" 3. Select study: drone_gimbal_arm_optimization")
logger.info("")
if __name__ == "__main__":
main()