feat: Add Studio UI, intake system, and extractor improvements

Dashboard:
- Add Studio page with drag-drop model upload and Claude chat
- Add intake system for study creation workflow
- Improve session manager and context builder
- Add intake API routes and frontend components

Optimization Engine:
- Add CLI module for command-line operations
- Add intake module for study preprocessing
- Add validation module with gate checks
- Improve Zernike extractor documentation
- Update spec models with better validation
- Enhance solve_simulation robustness

Documentation:
- Add ATOMIZER_STUDIO.md planning doc
- Add ATOMIZER_UX_SYSTEM.md for UX patterns
- Update extractor library docs
- Add study-readme-generator skill

Tools:
- Add test scripts for extraction validation
- Add Zernike recentering test

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 12:02:30 -05:00
parent 3193831340
commit a26914bbe8
56 changed files with 14173 additions and 646 deletions

View File

@@ -0,0 +1,19 @@
"""
Atomizer CLI
============
Command-line interface for Atomizer operations.
Commands:
- atomizer intake <folder> - Process an intake folder
- atomizer validate <study> - Validate a study before running
- atomizer finalize <study> - Generate final report
Usage:
from optimization_engine.cli import main
main()
"""
from .main import main, app
__all__ = ["main", "app"]

View File

@@ -0,0 +1,383 @@
"""
Atomizer CLI Main Entry Point
=============================
Provides the `atomizer` command with subcommands:
- intake: Process an intake folder
- validate: Validate a study
- finalize: Generate final report
- list: List studies
Usage:
atomizer intake bracket_project
atomizer validate bracket_mass_opt
atomizer finalize bracket_mass_opt --format html
"""
from __future__ import annotations
import sys
from pathlib import Path
from typing import Optional
import argparse
import logging
def setup_logging(verbose: bool = False):
"""Setup logging configuration."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(message)s",
)
def find_project_root() -> Path:
"""Find the Atomizer project root."""
current = Path(__file__).parent
while current != current.parent:
if (current / "CLAUDE.md").exists():
return current
current = current.parent
return Path.cwd()
def cmd_intake(args):
"""Process an intake folder."""
from optimization_engine.intake import IntakeProcessor
# Determine inbox folder
inbox_path = Path(args.folder)
if not inbox_path.is_absolute():
# Check if it's in _inbox
project_root = find_project_root()
inbox_dir = project_root / "studies" / "_inbox"
if (inbox_dir / args.folder).exists():
inbox_path = inbox_dir / args.folder
elif (project_root / "studies" / args.folder).exists():
inbox_path = project_root / "studies" / args.folder
if not inbox_path.exists():
print(f"Error: Folder not found: {inbox_path}")
return 1
print(f"Processing intake: {inbox_path}")
print("=" * 60)
# Progress callback
def progress(message: str, percent: float):
bar_width = 30
filled = int(bar_width * percent)
bar = "=" * filled + "-" * (bar_width - filled)
print(f"\r[{bar}] {percent * 100:5.1f}% {message}", end="", flush=True)
if percent >= 1.0:
print() # Newline at end
try:
processor = IntakeProcessor(
inbox_path,
progress_callback=progress if not args.quiet else None,
)
context = processor.process(
run_baseline=not args.skip_baseline,
copy_files=True,
run_introspection=True,
)
print("\n" + "=" * 60)
print("INTAKE COMPLETE")
print("=" * 60)
# Show summary
summary = context.get_context_summary()
print(f"\nStudy: {context.study_name}")
print(f"Location: {processor.study_dir}")
print(f"\nContext loaded:")
print(f" Model: {'Yes' if summary['has_model'] else 'No'}")
print(f" Introspection: {'Yes' if summary['has_introspection'] else 'No'}")
print(f" Baseline: {'Yes' if summary['has_baseline'] else 'No'}")
print(f" Goals: {'Yes' if summary['has_goals'] else 'No'}")
print(f" Pre-config: {'Yes' if summary['has_preconfig'] else 'No'}")
print(
f" Expressions: {summary['num_expressions']} ({summary['num_dv_candidates']} candidates)"
)
if context.has_baseline:
print(f"\nBaseline: {context.get_baseline_summary()}")
if summary["warnings"]:
print(f"\nWarnings:")
for w in summary["warnings"]:
print(f" - {w}")
if args.interview:
print(f"\nTo continue with interview: atomizer interview {context.study_name}")
elif args.canvas:
print(f"\nOpen dashboard to configure in Canvas mode")
else:
print(f"\nNext steps:")
print(f" 1. Review context in {processor.study_dir / '0_intake'}")
print(f" 2. Configure study via interview or canvas")
print(f" 3. Run: atomizer validate {context.study_name}")
return 0
except Exception as e:
print(f"\nError: {e}")
if args.verbose:
import traceback
traceback.print_exc()
return 1
def cmd_validate(args):
"""Validate a study before running."""
from optimization_engine.validation import ValidationGate
# Find study directory
study_path = Path(args.study)
if not study_path.is_absolute():
project_root = find_project_root()
study_path = project_root / "studies" / args.study
if not study_path.exists():
print(f"Error: Study not found: {study_path}")
return 1
print(f"Validating study: {study_path.name}")
print("=" * 60)
# Progress callback
def progress(message: str, percent: float):
bar_width = 30
filled = int(bar_width * percent)
bar = "=" * filled + "-" * (bar_width - filled)
print(f"\r[{bar}] {percent * 100:5.1f}% {message}", end="", flush=True)
if percent >= 1.0:
print()
try:
gate = ValidationGate(
study_path,
progress_callback=progress if not args.quiet else None,
)
result = gate.validate(
run_test_trials=not args.skip_trials,
n_test_trials=args.trials,
)
print("\n" + "=" * 60)
if result.passed:
print("VALIDATION PASSED")
else:
print("VALIDATION FAILED")
print("=" * 60)
# Show spec validation
if result.spec_check:
print(f"\nSpec Validation:")
print(f" Errors: {len(result.spec_check.errors)}")
print(f" Warnings: {len(result.spec_check.warnings)}")
for issue in result.spec_check.errors:
print(f" [ERROR] {issue.message}")
for issue in result.spec_check.warnings[:5]: # Limit warnings shown
print(f" [WARN] {issue.message}")
# Show test trials
if result.test_trials:
print(f"\nTest Trials:")
successful = [t for t in result.test_trials if t.success]
print(f" Completed: {len(successful)}/{len(result.test_trials)}")
if result.results_vary:
print(f" Results vary: Yes (good!)")
else:
print(f" Results vary: NO - MESH MAY NOT BE UPDATING!")
# Show trial results table
print(f"\n {'Trial':<8} {'Status':<10} {'Time (s)':<10}", end="")
if successful and successful[0].objectives:
for obj in list(successful[0].objectives.keys())[:3]:
print(f" {obj:<12}", end="")
print()
print(" " + "-" * 50)
for trial in result.test_trials:
status = "OK" if trial.success else "FAIL"
print(
f" {trial.trial_number:<8} {status:<10} {trial.solve_time_seconds:<10.1f}",
end="",
)
for val in list(trial.objectives.values())[:3]:
print(f" {val:<12.4f}", end="")
print()
# Show estimates
if result.avg_solve_time:
print(f"\nRuntime Estimate:")
print(f" Avg solve time: {result.avg_solve_time:.1f}s")
if result.estimated_total_runtime:
hours = result.estimated_total_runtime / 3600
print(f" Est. total: {hours:.1f} hours")
# Show errors
if result.errors:
print(f"\nErrors:")
for err in result.errors:
print(f" - {err}")
# Approve if passed and requested
if result.passed:
if args.approve:
gate.approve()
print(f"\nStudy approved for optimization.")
else:
print(f"\nTo approve and start: atomizer validate {args.study} --approve")
# Save result
output_path = gate.save_result(result)
print(f"\nResult saved: {output_path}")
return 0 if result.passed else 1
except Exception as e:
print(f"\nError: {e}")
if args.verbose:
import traceback
traceback.print_exc()
return 1
def cmd_list(args):
"""List available studies."""
project_root = find_project_root()
studies_dir = project_root / "studies"
print("Available Studies:")
print("=" * 60)
# List inbox items
inbox_dir = studies_dir / "_inbox"
if inbox_dir.exists():
inbox_items = [d for d in inbox_dir.iterdir() if d.is_dir() and not d.name.startswith(".")]
if inbox_items:
print("\nPending Intake (_inbox/):")
for item in sorted(inbox_items):
has_config = (item / "intake.yaml").exists()
has_model = bool(list(item.glob("**/*.sim")))
status = []
if has_config:
status.append("config")
if has_model:
status.append("model")
print(f" {item.name:<30} [{', '.join(status) or 'empty'}]")
# List active studies
print("\nActive Studies:")
for study_dir in sorted(studies_dir.iterdir()):
if (
study_dir.is_dir()
and not study_dir.name.startswith("_")
and not study_dir.name.startswith(".")
):
# Check status
has_spec = (study_dir / "atomizer_spec.json").exists() or (
study_dir / "optimization_config.json"
).exists()
has_db = (study_dir / "3_results" / "study.db").exists() or (
study_dir / "2_results" / "study.db"
).exists()
has_approval = (study_dir / ".validation_approved").exists()
status = []
if has_spec:
status.append("configured")
if has_approval:
status.append("approved")
if has_db:
status.append("has_results")
print(f" {study_dir.name:<30} [{', '.join(status) or 'new'}]")
return 0
def cmd_finalize(args):
"""Generate final report for a study."""
print(f"Finalize command not yet implemented for: {args.study}")
print("This will generate the interactive HTML report.")
return 0
def create_parser() -> argparse.ArgumentParser:
"""Create the argument parser."""
parser = argparse.ArgumentParser(
prog="atomizer",
description="Atomizer - FEA Optimization Command Line Interface",
)
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
parser.add_argument("-q", "--quiet", action="store_true", help="Minimal output")
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# intake command
intake_parser = subparsers.add_parser("intake", help="Process an intake folder")
intake_parser.add_argument("folder", help="Path to intake folder")
intake_parser.add_argument("--skip-baseline", action="store_true", help="Skip baseline solve")
intake_parser.add_argument(
"--interview", action="store_true", help="Continue to interview mode"
)
intake_parser.add_argument("--canvas", action="store_true", help="Open in canvas mode")
intake_parser.set_defaults(func=cmd_intake)
# validate command
validate_parser = subparsers.add_parser("validate", help="Validate a study")
validate_parser.add_argument("study", help="Study name or path")
validate_parser.add_argument("--skip-trials", action="store_true", help="Skip test trials")
validate_parser.add_argument("--trials", type=int, default=3, help="Number of test trials")
validate_parser.add_argument(
"--approve", action="store_true", help="Approve if validation passes"
)
validate_parser.set_defaults(func=cmd_validate)
# list command
list_parser = subparsers.add_parser("list", help="List studies")
list_parser.set_defaults(func=cmd_list)
# finalize command
finalize_parser = subparsers.add_parser("finalize", help="Generate final report")
finalize_parser.add_argument("study", help="Study name or path")
finalize_parser.add_argument("--format", choices=["html", "pdf", "all"], default="html")
finalize_parser.set_defaults(func=cmd_finalize)
return parser
def main(args=None):
"""Main entry point."""
parser = create_parser()
parsed_args = parser.parse_args(args)
setup_logging(getattr(parsed_args, "verbose", False))
if parsed_args.command is None:
parser.print_help()
return 0
return parsed_args.func(parsed_args)
# For typer/click compatibility
app = main
if __name__ == "__main__":
sys.exit(main())
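
The parser/set_defaults pattern above keeps each subcommand self-contained. As a hedged illustration only (not part of this commit), a hypothetical `status` subcommand would plug in like this:

# Hypothetical example - shows where a new subcommand would hook in.
def cmd_status(args):
    """Print a one-line status for a study."""
    study_dir = find_project_root() / "studies" / args.study
    approved = (study_dir / ".validation_approved").exists()
    print(f"{args.study}: {'approved' if approved else 'not approved'}")
    return 0

# ...and inside create_parser(), alongside the other subparsers:
#     status_parser = subparsers.add_parser("status", help="Show study status")
#     status_parser.add_argument("study", help="Study name or path")
#     status_parser.set_defaults(func=cmd_status)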

View File

@@ -7,7 +7,7 @@ They provide validation and type safety for the unified configuration system.
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from pydantic import BaseModel, Field, field_validator, model_validator
import re
@@ -16,17 +16,34 @@ import re
# Enums
# ============================================================================
class SpecCreatedBy(str, Enum):
"""Who/what created the spec."""
CANVAS = "canvas"
CLAUDE = "claude"
API = "api"
MIGRATION = "migration"
MANUAL = "manual"
DASHBOARD_INTAKE = "dashboard_intake"
class SpecStatus(str, Enum):
"""Study lifecycle status."""
DRAFT = "draft"
INTROSPECTED = "introspected"
CONFIGURED = "configured"
VALIDATED = "validated"
READY = "ready"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class SolverType(str, Enum):
"""Supported solver types."""
NASTRAN = "nastran"
NX_NASTRAN = "NX_Nastran"
ABAQUS = "abaqus"
@@ -34,6 +51,7 @@ class SolverType(str, Enum):
class SubcaseType(str, Enum):
"""Subcase analysis types."""
STATIC = "static"
MODAL = "modal"
THERMAL = "thermal"
@@ -42,6 +60,7 @@ class SubcaseType(str, Enum):
class DesignVariableType(str, Enum):
"""Design variable types."""
CONTINUOUS = "continuous"
INTEGER = "integer"
CATEGORICAL = "categorical"
@@ -49,6 +68,7 @@ class DesignVariableType(str, Enum):
class ExtractorType(str, Enum):
"""Physics extractor types."""
DISPLACEMENT = "displacement"
FREQUENCY = "frequency"
STRESS = "stress"
@@ -62,18 +82,21 @@ class ExtractorType(str, Enum):
class OptimizationDirection(str, Enum):
"""Optimization direction."""
MINIMIZE = "minimize"
MAXIMIZE = "maximize"
class ConstraintType(str, Enum):
"""Constraint types."""
HARD = "hard"
SOFT = "soft"
class ConstraintOperator(str, Enum):
"""Constraint comparison operators."""
LE = "<="
GE = ">="
LT = "<"
@@ -83,6 +106,7 @@ class ConstraintOperator(str, Enum):
class PenaltyMethod(str, Enum):
"""Penalty methods for constraints."""
LINEAR = "linear"
QUADRATIC = "quadratic"
EXPONENTIAL = "exponential"
@@ -90,6 +114,7 @@ class PenaltyMethod(str, Enum):
class AlgorithmType(str, Enum):
"""Optimization algorithm types."""
TPE = "TPE"
CMA_ES = "CMA-ES"
NSGA_II = "NSGA-II"
@@ -100,6 +125,7 @@ class AlgorithmType(str, Enum):
class SurrogateType(str, Enum):
"""Surrogate model types."""
MLP = "MLP"
GNN = "GNN"
ENSEMBLE = "ensemble"
@@ -109,58 +135,104 @@ class SurrogateType(str, Enum):
# Position Model
# ============================================================================
class CanvasPosition(BaseModel):
"""Canvas position for nodes."""
x: float = 0
y: float = 0
# ============================================================================
# Introspection Models (for intake workflow)
# ============================================================================
class ExpressionInfo(BaseModel):
"""Information about an NX expression from introspection."""
name: str = Field(..., description="Expression name in NX")
value: Optional[float] = Field(default=None, description="Current value")
units: Optional[str] = Field(default=None, description="Physical units")
formula: Optional[str] = Field(default=None, description="Expression formula if any")
is_candidate: bool = Field(
default=False, description="Whether this is a design variable candidate"
)
confidence: float = Field(
default=0.0, ge=0.0, le=1.0, description="Confidence that this is a DV"
)
class BaselineData(BaseModel):
"""Results from baseline FEA solve."""
timestamp: datetime = Field(..., description="When baseline was run")
solve_time_seconds: float = Field(..., description="How long the solve took")
mass_kg: Optional[float] = Field(default=None, description="Computed mass from BDF/FEM")
max_displacement_mm: Optional[float] = Field(
default=None, description="Max displacement result"
)
max_stress_mpa: Optional[float] = Field(default=None, description="Max von Mises stress")
success: bool = Field(default=True, description="Whether baseline solve succeeded")
error: Optional[str] = Field(default=None, description="Error message if failed")
class IntrospectionData(BaseModel):
"""Model introspection results stored in the spec."""
timestamp: datetime = Field(..., description="When introspection was run")
solver_type: Optional[str] = Field(default=None, description="Detected solver type")
mass_kg: Optional[float] = Field(
default=None, description="Mass from expressions or properties"
)
volume_mm3: Optional[float] = Field(default=None, description="Volume from mass properties")
expressions: List[ExpressionInfo] = Field(
default_factory=list, description="Discovered expressions"
)
baseline: Optional[BaselineData] = Field(default=None, description="Baseline solve results")
warnings: List[str] = Field(default_factory=list, description="Warnings from introspection")
def get_design_candidates(self) -> List[ExpressionInfo]:
"""Return expressions marked as design variable candidates."""
return [e for e in self.expressions if e.is_candidate]
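
For illustration, a small sketch of how these introspection models compose (names and values are made up):

from datetime import datetime

intro = IntrospectionData(
    timestamp=datetime.now(),
    solver_type="NX_Nastran",
    expressions=[
        ExpressionInfo(name="rib_thickness", value=4.0, units="mm",
                       is_candidate=True, confidence=0.9),
        ExpressionInfo(name="hole_offset", value=12.5, units="mm"),  # not flagged as a DV
    ],
)
for e in intro.get_design_candidates():
    print(e.name, e.value, e.confidence)  # -> rib_thickness 4.0 0.9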
# ============================================================================
# Meta Models
# ============================================================================
class SpecMeta(BaseModel):
"""Metadata about the spec."""
    version: str = Field(..., pattern=r"^2\.\d+$", description="Schema version (e.g., '2.0')")
    created: Optional[datetime] = Field(default=None, description="When the spec was created")
    modified: Optional[datetime] = Field(
        default=None, description="When the spec was last modified"
    )
    created_by: Optional[SpecCreatedBy] = Field(
        default=None, description="Who/what created the spec"
    )
    modified_by: Optional[str] = Field(default=None, description="Who/what last modified the spec")
    study_name: str = Field(
        ...,
        min_length=3,
        max_length=100,
        pattern=r"^[a-z0-9_]+$",
        description="Unique study identifier (snake_case)",
    )
    description: Optional[str] = Field(
        default=None, max_length=1000, description="Human-readable description"
    )
    tags: Optional[List[str]] = Field(default=None, description="Tags for categorization")
    engineering_context: Optional[str] = Field(
        default=None, description="Real-world engineering context"
    )
    status: SpecStatus = Field(default=SpecStatus.DRAFT, description="Study lifecycle status")
    topic: Optional[str] = Field(
        default=None,
        pattern=r"^[A-Za-z0-9_]+$",
        description="Topic folder for grouping related studies",
    )
@@ -168,15 +240,20 @@ class SpecMeta(BaseModel):
# Model Configuration Models
# ============================================================================
class NxPartConfig(BaseModel):
"""NX geometry part file configuration."""
path: Optional[str] = Field(default=None, description="Path to .prt file")
hash: Optional[str] = Field(default=None, description="File hash for change detection")
idealized_part: Optional[str] = Field(
    default=None, description="Idealized part filename (_i.prt)"
)
class FemConfig(BaseModel):
"""FEM mesh file configuration."""
path: Optional[str] = Field(default=None, description="Path to .fem file")
element_count: Optional[int] = Field(default=None, description="Number of elements")
node_count: Optional[int] = Field(default=None, description="Number of nodes")
@@ -184,6 +261,7 @@ class FemConfig(BaseModel):
class Subcase(BaseModel):
"""Simulation subcase definition."""
id: int
name: Optional[str] = None
type: Optional[SubcaseType] = None
@@ -191,18 +269,18 @@ class Subcase(BaseModel):
class SimConfig(BaseModel):
"""Simulation file configuration."""
path: str = Field(..., description="Path to .sim file")
solver: SolverType = Field(..., description="Solver type")
solution_type: Optional[str] = Field(
    default=None, pattern=r"^SOL\d+$", description="Solution type (e.g., SOL101)"
)
subcases: Optional[List[Subcase]] = Field(default=None, description="Defined subcases")
class NxSettings(BaseModel):
"""NX runtime settings."""
nx_install_path: Optional[str] = None
simulation_timeout_s: Optional[int] = Field(default=None, ge=60, le=7200)
auto_start_nx: Optional[bool] = None
@@ -210,23 +288,31 @@ class NxSettings(BaseModel):
class ModelConfig(BaseModel):
"""NX model files and configuration."""
nx_part: Optional[NxPartConfig] = None
fem: Optional[FemConfig] = None
sim: Optional[SimConfig] = Field(
    default=None, description="Simulation file config (required for optimization)"
)
nx_settings: Optional[NxSettings] = None
introspection: Optional[IntrospectionData] = Field(
default=None, description="Model introspection results from intake"
)
# ============================================================================
# Design Variable Models
# ============================================================================
class DesignVariableBounds(BaseModel):
"""Design variable bounds."""
min: float
max: float
@model_validator(mode="after")
def validate_bounds(self) -> "DesignVariableBounds":
if self.min >= self.max:
raise ValueError(f"min ({self.min}) must be less than max ({self.max})")
return self
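
A quick sanity check of this validator (in pydantic v2 the ValueError surfaces as a ValidationError):

from pydantic import ValidationError

DesignVariableBounds(min=2.0, max=10.0)      # ok
try:
    DesignVariableBounds(min=10.0, max=2.0)  # min >= max -> rejected
except ValidationError as exc:
    print(exc.errors()[0]["msg"])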
@@ -234,16 +320,13 @@ class DesignVariableBounds(BaseModel):
class DesignVariable(BaseModel):
"""A design variable to optimize."""
id: str = Field(..., pattern=r"^dv_\d{3}$", description="Unique identifier (pattern: dv_XXX)")
name: str = Field(..., description="Human-readable name")
expression_name: str = Field(
    ...,
    pattern=r"^[a-zA-Z_][a-zA-Z0-9_]*$",
    description="NX expression name (must match model)",
)
type: DesignVariableType = Field(..., description="Variable type")
bounds: DesignVariableBounds = Field(..., description="Value bounds")
@@ -259,8 +342,10 @@ class DesignVariable(BaseModel):
# Extractor Models
# ============================================================================
class ExtractorConfig(BaseModel):
"""Type-specific extractor configuration."""
inner_radius_mm: Optional[float] = None
outer_radius_mm: Optional[float] = None
n_modes: Optional[int] = None
@@ -279,6 +364,7 @@ class ExtractorConfig(BaseModel):
class CustomFunction(BaseModel):
"""Custom function definition for custom_function extractors."""
name: Optional[str] = Field(default=None, description="Function name")
module: Optional[str] = Field(default=None, description="Python module path")
signature: Optional[str] = Field(default=None, description="Function signature")
@@ -287,32 +373,33 @@ class CustomFunction(BaseModel):
class ExtractorOutput(BaseModel):
"""Output definition for an extractor."""
name: str = Field(..., description="Output name (used by objectives/constraints)")
metric: Optional[str] = Field(
    default=None, description="Specific metric (max, total, rms, etc.)"
)
subcase: Optional[int] = Field(default=None, description="Subcase ID for this output")
units: Optional[str] = None
class Extractor(BaseModel):
"""Physics extractor that computes outputs from FEA."""
id: str = Field(..., pattern=r"^ext_\d{3}$", description="Unique identifier (pattern: ext_XXX)")
name: str = Field(..., description="Human-readable name")
type: ExtractorType = Field(..., description="Extractor type")
builtin: bool = Field(default=True, description="Whether this is a built-in extractor")
config: Optional[ExtractorConfig] = Field(
    default=None, description="Type-specific configuration"
)
function: Optional[CustomFunction] = Field(
    default=None, description="Custom function definition (for custom_function type)"
)
outputs: List[ExtractorOutput] = Field(..., min_length=1, description="Output values")
canvas_position: Optional[CanvasPosition] = None
@model_validator(mode="after")
def validate_custom_function(self) -> "Extractor":
if self.type == ExtractorType.CUSTOM_FUNCTION and self.function is None:
raise ValueError("custom_function extractor requires function definition")
return self
@@ -322,19 +409,18 @@ class Extractor(BaseModel):
# Objective Models
# ============================================================================
class ObjectiveSource(BaseModel):
"""Source reference for objective value."""
extractor_id: str = Field(..., description="Reference to extractor")
output_name: str = Field(..., description="Which output from the extractor")
class Objective(BaseModel):
"""Optimization objective."""
id: str = Field(..., pattern=r"^obj_\d{3}$", description="Unique identifier (pattern: obj_XXX)")
name: str = Field(..., description="Human-readable name")
direction: OptimizationDirection = Field(..., description="Optimization direction")
weight: float = Field(default=1.0, ge=0, description="Weight for weighted sum")
@@ -349,14 +435,17 @@ class Objective(BaseModel):
# Constraint Models
# ============================================================================
class ConstraintSource(BaseModel):
"""Source reference for constraint value."""
extractor_id: str
output_name: str
class PenaltyConfig(BaseModel):
"""Penalty method configuration for constraints."""
method: Optional[PenaltyMethod] = None
weight: Optional[float] = None
margin: Optional[float] = Field(default=None, description="Soft margin before penalty kicks in")
@@ -364,11 +453,8 @@ class PenaltyConfig(BaseModel):
class Constraint(BaseModel):
"""Hard or soft constraint."""
id: str = Field(..., pattern=r"^con_\d{3}$", description="Unique identifier (pattern: con_XXX)")
name: str
type: ConstraintType = Field(..., description="Constraint type")
operator: ConstraintOperator = Field(..., description="Comparison operator")
@@ -383,8 +469,10 @@ class Constraint(BaseModel):
# Optimization Models
# ============================================================================
class AlgorithmConfig(BaseModel):
"""Algorithm-specific settings."""
population_size: Optional[int] = None
n_generations: Optional[int] = None
mutation_prob: Optional[float] = None
@@ -399,22 +487,24 @@ class AlgorithmConfig(BaseModel):
class Algorithm(BaseModel):
"""Optimization algorithm configuration."""
type: AlgorithmType
config: Optional[AlgorithmConfig] = None
class OptimizationBudget(BaseModel):
"""Computational budget for optimization."""
max_trials: Optional[int] = Field(default=None, ge=1, le=10000)
max_time_hours: Optional[float] = None
convergence_patience: Optional[int] = Field(
    default=None, description="Stop if no improvement for N trials"
)
class SurrogateConfig(BaseModel):
"""Neural surrogate model configuration."""
n_models: Optional[int] = None
architecture: Optional[List[int]] = None
train_every_n_trials: Optional[int] = None
@@ -425,6 +515,7 @@ class SurrogateConfig(BaseModel):
class Surrogate(BaseModel):
"""Surrogate model settings."""
enabled: Optional[bool] = None
type: Optional[SurrogateType] = None
config: Optional[SurrogateConfig] = None
@@ -432,6 +523,7 @@ class Surrogate(BaseModel):
class OptimizationConfig(BaseModel):
"""Optimization algorithm configuration."""
algorithm: Algorithm
budget: OptimizationBudget
surrogate: Optional[Surrogate] = None
@@ -442,8 +534,10 @@ class OptimizationConfig(BaseModel):
# Workflow Models
# ============================================================================
class WorkflowStage(BaseModel):
"""A stage in a multi-stage optimization workflow."""
id: str
name: str
algorithm: Optional[str] = None
@@ -453,6 +547,7 @@ class WorkflowStage(BaseModel):
class WorkflowTransition(BaseModel):
"""Transition between workflow stages."""
from_: str = Field(..., alias="from")
to: str
condition: Optional[str] = None
@@ -463,6 +558,7 @@ class WorkflowTransition(BaseModel):
class Workflow(BaseModel):
"""Multi-stage optimization workflow."""
stages: Optional[List[WorkflowStage]] = None
transitions: Optional[List[WorkflowTransition]] = None
@@ -471,8 +567,10 @@ class Workflow(BaseModel):
# Reporting Models
# ============================================================================
class InsightConfig(BaseModel):
"""Insight-specific configuration."""
include_html: Optional[bool] = None
show_pareto_evolution: Optional[bool] = None
@@ -482,6 +580,7 @@ class InsightConfig(BaseModel):
class Insight(BaseModel):
"""Reporting insight definition."""
type: Optional[str] = None
for_trials: Optional[str] = None
config: Optional[InsightConfig] = None
@@ -489,6 +588,7 @@ class Insight(BaseModel):
class ReportingConfig(BaseModel):
"""Reporting configuration."""
auto_report: Optional[bool] = None
report_triggers: Optional[List[str]] = None
insights: Optional[List[Insight]] = None
@@ -498,8 +598,10 @@ class ReportingConfig(BaseModel):
# Canvas Models
# ============================================================================
class CanvasViewport(BaseModel):
"""Canvas viewport settings."""
x: float = 0
y: float = 0
zoom: float = 1.0
@@ -507,6 +609,7 @@ class CanvasViewport(BaseModel):
class CanvasEdge(BaseModel):
"""Connection between canvas nodes."""
source: str
target: str
sourceHandle: Optional[str] = None
@@ -515,6 +618,7 @@ class CanvasEdge(BaseModel):
class CanvasGroup(BaseModel):
"""Grouping of canvas nodes."""
id: str
name: str
node_ids: List[str]
@@ -522,6 +626,7 @@ class CanvasGroup(BaseModel):
class CanvasConfig(BaseModel):
"""Canvas UI state (persisted for reconstruction)."""
layout_version: Optional[str] = None
viewport: Optional[CanvasViewport] = None
edges: Optional[List[CanvasEdge]] = None
@@ -532,6 +637,7 @@ class CanvasConfig(BaseModel):
# Main AtomizerSpec Model
# ============================================================================
class AtomizerSpec(BaseModel):
"""
AtomizerSpec v2.0 - The unified configuration schema for Atomizer optimization studies.
@@ -542,36 +648,32 @@ class AtomizerSpec(BaseModel):
- Claude Assistant (reading and modifying)
- Optimization Engine (execution)
"""
meta: SpecMeta = Field(..., description="Metadata about the spec")
model: ModelConfig = Field(..., description="NX model files and configuration")
design_variables: List[DesignVariable] = Field(
    default_factory=list,
    max_length=50,
    description="Design variables to optimize (required for running)",
)
extractors: List[Extractor] = Field(
    default_factory=list, description="Physics extractors (required for running)"
)
objectives: List[Objective] = Field(
    default_factory=list,
    max_length=5,
    description="Optimization objectives (required for running)",
)
constraints: Optional[List[Constraint]] = Field(
    default=None, description="Hard and soft constraints"
)
optimization: OptimizationConfig = Field(..., description="Algorithm configuration")
workflow: Optional[Workflow] = Field(default=None, description="Multi-stage workflow")
reporting: Optional[ReportingConfig] = Field(default=None, description="Reporting config")
canvas: Optional[CanvasConfig] = Field(default=None, description="Canvas UI state")
@model_validator(mode="after")
def validate_references(self) -> "AtomizerSpec":
"""Validate that all references are valid."""
# Collect valid extractor IDs and their outputs
extractor_outputs: Dict[str, set] = {}
@@ -638,13 +740,44 @@ class AtomizerSpec(BaseModel):
"""Check if this is a multi-objective optimization."""
return len(self.objectives) > 1
def is_ready_for_optimization(self) -> Tuple[bool, List[str]]:
"""
Check if spec is complete enough to run optimization.
Returns:
Tuple of (is_ready, list of missing requirements)
"""
missing = []
# Check required fields for optimization
if not self.model.sim:
missing.append("No simulation file (.sim) configured")
if not self.design_variables:
missing.append("No design variables defined")
if not self.extractors:
missing.append("No extractors defined")
if not self.objectives:
missing.append("No objectives defined")
# Check that enabled DVs have valid bounds
for dv in self.get_enabled_design_variables():
if dv.bounds.min >= dv.bounds.max:
missing.append(f"Design variable '{dv.name}' has invalid bounds")
return len(missing) == 0, missing
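
A sketch of the intended gating flow (the spec path is illustrative):

from pathlib import Path

spec = AtomizerSpec.model_validate_json(
    Path("studies/bracket_mass_opt/atomizer_spec.json").read_text()
)
ready, missing = spec.is_ready_for_optimization()
if not ready:
    print("Spec is not runnable yet:")
    for item in missing:
        print(f"  - {item}")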
# ============================================================================
# Validation Response Models
# ============================================================================
class ValidationError(BaseModel):
"""A validation error."""
type: str # 'schema', 'semantic', 'reference'
path: List[str]
message: str
@@ -652,6 +785,7 @@ class ValidationError(BaseModel):
class ValidationWarning(BaseModel):
"""A validation warning."""
type: str
path: List[str]
message: str
@@ -659,6 +793,7 @@ class ValidationWarning(BaseModel):
class ValidationSummary(BaseModel):
"""Summary of spec contents."""
design_variables: int
extractors: int
objectives: int
@@ -668,6 +803,7 @@ class ValidationSummary(BaseModel):
class ValidationReport(BaseModel):
"""Full validation report."""
valid: bool
errors: List[ValidationError]
warnings: List[ValidationWarning]

View File

@@ -65,6 +65,16 @@ from optimization_engine.extractors.extract_zernike_figure import (
extract_zernike_figure_rms,
)
# Displacement extraction
from optimization_engine.extractors.extract_displacement import (
extract_displacement,
)
# Mass extraction from BDF
from optimization_engine.extractors.extract_mass_from_bdf import (
extract_mass_from_bdf,
)
# Part mass and material extractor (from NX .prt files)
from optimization_engine.extractors.extract_part_mass_material import (
extract_part_mass_material,
@@ -145,72 +155,76 @@ from optimization_engine.extractors.spec_extractor_builder import (
)
__all__ = [
    # Displacement extraction
    "extract_displacement",
    # Mass extraction (from BDF)
    "extract_mass_from_bdf",
    # Part mass & material (from .prt)
    "extract_part_mass_material",
    "extract_part_mass",
    "extract_part_material",
    "PartMassExtractor",
    # Stress extractors
    "extract_solid_stress",
    "extract_principal_stress",
    "extract_max_principal_stress",
    "extract_min_principal_stress",
    # Strain energy
    "extract_strain_energy",
    "extract_total_strain_energy",
    "extract_strain_energy_density",
    # SPC forces / reactions
    "extract_spc_forces",
    "extract_total_reaction_force",
    "extract_reaction_component",
    "check_force_equilibrium",
    # Zernike (telescope mirrors) - Standard Z-only method
    "ZernikeExtractor",
    "extract_zernike_from_op2",
    "extract_zernike_filtered_rms",
    "extract_zernike_relative_rms",
    # Zernike OPD (RECOMMENDED - uses actual geometry, no shape assumption)
    # Supports annular apertures via inner_radius parameter
    "ZernikeOPDExtractor",
    "extract_zernike_opd",
    "extract_zernike_opd_filtered_rms",
    "compute_zernike_coefficients_annular",
    # Zernike Analytic (parabola-based with lateral displacement correction)
    "ZernikeAnalyticExtractor",
    "extract_zernike_analytic",
    "extract_zernike_analytic_filtered_rms",
    "compare_zernike_methods",
    # Backwards compatibility (deprecated)
    "ZernikeFigureExtractor",
    "extract_zernike_figure",
    "extract_zernike_figure_rms",
    # Temperature (Phase 3 - thermal)
    "extract_temperature",
    "extract_temperature_gradient",
    "extract_heat_flux",
    "get_max_temperature",
    # Modal mass (Phase 3 - dynamics)
    "extract_modal_mass",
    "extract_frequencies",
    "get_first_frequency",
    "get_modal_mass_ratio",
    # Part introspection (Phase 4)
    "introspect_part",
    "get_expressions_dict",
    "get_expression_value",
    "print_introspection_summary",
    # Custom extractor loader (Phase 5)
    "CustomExtractor",
    "CustomExtractorLoader",
    "CustomExtractorContext",
    "ExtractorSecurityError",
    "ExtractorValidationError",
    "load_custom_extractors",
    "execute_custom_extractor",
    "validate_custom_extractor",
    # Spec extractor builder
    "SpecExtractorBuilder",
    "build_extractors_from_spec",
    "get_extractor_outputs",
    "list_available_builtin_extractors",
]
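
A hedged usage sketch of one of the re-exported extractors (the OP2 path is illustrative; see extract_solid_stress below for the signature):

from pathlib import Path
from optimization_engine.extractors import extract_solid_stress

result = extract_solid_stress(Path("trial_001.op2"))  # checks all solid element types
print(result["max_von_mises"], result["units"])       # e.g.: 182.3 MPa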

View File

@@ -1,26 +1,30 @@
"""
Extract mass from Nastran BDF/DAT file as fallback when OP2 doesn't have GRDPNT
Extract mass from Nastran BDF/DAT file.
This module provides a simple wrapper around the BDFMassExtractor class.
"""
from pathlib import Path
from typing import Dict, Any
import re
from optimization_engine.extractors.bdf_mass_extractor import BDFMassExtractor
def extract_mass_from_bdf(bdf_file: Path) -> Dict[str, Any]:
"""
Extract mass from Nastran BDF file by parsing material and element definitions.
This is a fallback when OP2 doesn't have PARAM,GRDPNT output.
Extract mass from Nastran BDF file.
Args:
bdf_file: Path to .dat or .bdf file
Returns:
dict: {
'mass_kg': total mass in kg,
'mass_g': total mass in grams,
'method': 'bdf_calculation'
'total_mass': mass in kg (primary key),
'mass_kg': mass in kg,
'mass_g': mass in grams,
'cg': center of gravity [x, y, z],
'num_elements': number of elements,
'breakdown': mass by element type
}
"""
bdf_file = Path(bdf_file)
@@ -28,35 +32,23 @@ def extract_mass_from_bdf(bdf_file: Path) -> Dict[str, Any]:
    if not bdf_file.exists():
        raise FileNotFoundError(f"BDF file not found: {bdf_file}")

    extractor = BDFMassExtractor(str(bdf_file))
    result = extractor.extract_mass()

    # Add 'total_mass' as primary key for compatibility
    result["total_mass"] = result["mass_kg"]

    return result


if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        bdf_file = Path(sys.argv[1])
        result = extract_mass_from_bdf(bdf_file)
        print(f"Mass from BDF: {result['mass_kg']:.6f} kg ({result['mass_g']:.3f} g)")
        print(f"CG: {result['cg']}")
        print(f"Elements: {result['num_elements']}")
    else:
        print(f"Usage: python {sys.argv[0]} <bdf_file>")

View File

@@ -1,74 +1,86 @@
"""
Extract maximum von Mises stress from structural analysis
Auto-generated by Atomizer Phase 3 - pyNastran Research Agent
Extract maximum von Mises stress from structural analysis.
Pattern: solid_stress
Element Type: CTETRA
Result Type: stress
API: model.ctetra_stress[subcase] or model.chexa_stress[subcase]
Supports all solid element types (CTETRA, CHEXA, CPENTA, CPYRAM) and
shell elements (CQUAD4, CTRIA3).
Unit Note: NX Nastran in kg-mm-s outputs stress in kPa. This extractor
converts to MPa (divide by 1000) for engineering use.
"""
from pathlib import Path
from typing import Dict, Any
from typing import Dict, Any, Optional
import numpy as np
from pyNastran.op2.op2 import OP2
def extract_solid_stress(op2_file: Path, subcase: int = 1, element_type: str = 'ctetra'):
"""Extract stress from solid elements."""
from pyNastran.op2.op2 import OP2
import numpy as np
def extract_solid_stress(
op2_file: Path,
subcase: int = 1,
element_type: Optional[str] = None,
convert_to_mpa: bool = True,
) -> Dict[str, Any]:
"""
Extract maximum von Mises stress from solid elements.
model = OP2()
Args:
op2_file: Path to OP2 results file
subcase: Subcase ID (default 1)
element_type: Specific element type to check ('ctetra', 'chexa', etc.)
If None, checks ALL solid element types and returns max.
convert_to_mpa: If True, divide by 1000 to convert kPa to MPa (default True)
Returns:
dict with 'max_von_mises' (in MPa if convert_to_mpa=True),
'max_stress_element', and 'element_type'
"""
model = OP2(debug=False, log=None)
model.read_op2(str(op2_file))
# Get stress object for element type
# Different element types have different stress attributes
stress_attr_map = {
'ctetra': 'ctetra_stress',
'chexa': 'chexa_stress',
'cquad4': 'cquad4_stress',
'ctria3': 'ctria3_stress'
}
# All solid element types to check
solid_element_types = ["ctetra", "chexa", "cpenta", "cpyram"]
shell_element_types = ["cquad4", "ctria3"]
stress_attr = stress_attr_map.get(element_type.lower())
if not stress_attr:
raise ValueError(f"Unknown element type: {element_type}")
# Access stress through op2_results container
# pyNastran structure: model.op2_results.stress.cquad4_stress[subcase]
stress_dict = None
if hasattr(model, 'op2_results') and hasattr(model.op2_results, 'stress'):
stress_container = model.op2_results.stress
if hasattr(stress_container, stress_attr):
stress_dict = getattr(stress_container, stress_attr)
if stress_dict is None:
raise ValueError(f"No {element_type} stress results in OP2. Available attributes: {[a for a in dir(model) if 'stress' in a.lower()]}")
# stress_dict is a dictionary with subcase IDs as keys
available_subcases = list(stress_dict.keys())
if not available_subcases:
raise ValueError(f"No stress data found in OP2 file")
# Use the specified subcase or first available
if subcase in available_subcases:
actual_subcase = subcase
# If specific element type requested, only check that one
if element_type:
element_types_to_check = [element_type.lower()]
else:
actual_subcase = available_subcases[0]
# Check all solid types by default
element_types_to_check = solid_element_types
stress = stress_dict[actual_subcase]
if not hasattr(model, "op2_results") or not hasattr(model.op2_results, "stress"):
raise ValueError("No stress results in OP2 file")
itime = 0
stress_container = model.op2_results.stress
# Extract von Mises if available
if stress.is_von_mises: # Property, not method
# Different element types have von Mises at different column indices
# Shell elements (CQUAD4, CTRIA3): 8 columns, von Mises at column 7
# Solid elements (CTETRA, CHEXA): 10 columns, von Mises at column 9
# Find max stress across all requested element types
max_stress = 0.0
max_stress_elem = 0
max_stress_type = None
for elem_type in element_types_to_check:
stress_attr = f"{elem_type}_stress"
if not hasattr(stress_container, stress_attr):
continue
stress_dict = getattr(stress_container, stress_attr)
if not stress_dict:
continue
# Get subcase
available_subcases = list(stress_dict.keys())
if not available_subcases:
continue
actual_subcase = subcase if subcase in available_subcases else available_subcases[0]
stress = stress_dict[actual_subcase]
if not stress.is_von_mises:
continue
# Determine von Mises column
ncols = stress.data.shape[2]
if ncols == 8:
# Shell elements - von Mises is last column
von_mises_col = 7
@@ -76,27 +88,37 @@ def extract_solid_stress(op2_file: Path, subcase: int = 1, element_type: str = '
        elif ncols == 10:
            # Solid elements - von Mises is column 9
            von_mises_col = 9
        else:
            # Unknown format, try last column
            von_mises_col = ncols - 1

        von_mises = stress.data[itime, :, von_mises_col]
        elem_max = float(np.max(von_mises))

        # Track the worst element across all checked types
        if elem_max > max_stress:
            max_stress = elem_max
            element_ids = [eid for (eid, node) in stress.element_node]
            max_stress_elem = int(element_ids[np.argmax(von_mises)])
            max_stress_type = elem_type.upper()

    if max_stress_type is None:
        raise ValueError(f"No stress results found for element types: {element_types_to_check}")

    # Convert from kPa to MPa (NX kg-mm-s unit system outputs kPa)
    if convert_to_mpa:
        max_stress = max_stress / 1000.0

    return {
        "max_von_mises": max_stress,
        "max_stress_element": max_stress_elem,
        "element_type": max_stress_type,
        "units": "MPa" if convert_to_mpa else "kPa",
    }


if __name__ == "__main__":
    # Example usage
    import sys

    if len(sys.argv) > 1:
        op2_file = Path(sys.argv[1])
        result = extract_solid_stress(op2_file)
View File

@@ -473,23 +473,33 @@ def extract_displacements_by_subcase(
ngt = darr.node_gridtype.astype(int)
node_ids = ngt if ngt.ndim == 1 else ngt[:, 0]
# Try to identify subcase from subtitle, label, or isubcase
subtitle = getattr(darr, 'subtitle', None)
op2_label = getattr(darr, 'label', None)
isubcase = getattr(darr, 'isubcase', None)

# Extract numeric from subtitle first, then label, then isubcase
import re
subcase_id = None

# Priority 1: subtitle (e.g., "GRAVITY 20 DEG")
if isinstance(subtitle, str) and subtitle.strip():
    m = re.search(r'-?\d+', subtitle)
    if m:
        subcase_id = m.group(0)

# Priority 2: label field (e.g., "90 SUBCASE 1")
if subcase_id is None and isinstance(op2_label, str) and op2_label.strip():
    m = re.search(r'-?\d+', op2_label)
    if m:
        subcase_id = m.group(0)

# Priority 3: isubcase number
if subcase_id is None and isinstance(isubcase, int):
    subcase_id = str(isubcase)

if subcase_id:
    result[subcase_id] = {
        'node_ids': node_ids.astype(int),
        'disp': dmat.copy()
    }
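
Callers keyed on subcase IDs are unaffected; a sketch of downstream use, assuming the function takes an OP2 results object as in the surrounding module (the full signature is truncated above) and that the first three disp columns are translations:

import numpy as np

by_subcase = extract_displacements_by_subcase(op2_model)  # op2_model is hypothetical here
for subcase_id, data in by_subcase.items():
    mags = np.linalg.norm(data["disp"][:, :3], axis=1)    # assume t1..t3 are columns 0-2
    print(subcase_id, "max |u| =", float(mags.max()))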

View File

@@ -0,0 +1,46 @@
"""
Atomizer Intake System
======================
Provides structured intake processing for optimization studies.
Components:
- IntakeConfig: Pydantic schema for intake.yaml
- StudyContext: Complete assembled context for study creation
- IntakeProcessor: File handling and processing
- ContextAssembler: Combines all context sources
Usage:
from optimization_engine.intake import IntakeProcessor, IntakeConfig
processor = IntakeProcessor(inbox_folder)
context = processor.process()
"""
from .config import (
IntakeConfig,
StudyConfig,
ObjectiveConfig,
ConstraintConfig,
DesignVariableConfig,
BudgetConfig,
AlgorithmConfig,
MaterialConfig,
)
from .context import StudyContext, IntrospectionData, BaselineResult
from .processor import IntakeProcessor
__all__ = [
"IntakeConfig",
"StudyConfig",
"ObjectiveConfig",
"ConstraintConfig",
"DesignVariableConfig",
"BudgetConfig",
"AlgorithmConfig",
"MaterialConfig",
"StudyContext",
"IntrospectionData",
"BaselineResult",
"IntakeProcessor",
]

View File

@@ -0,0 +1,371 @@
"""
Intake Configuration Schema
===========================
Pydantic models for intake.yaml configuration files.
These models define the structure of pre-configuration that users can
provide to skip interview questions and speed up study setup.
"""
from __future__ import annotations
from pathlib import Path
from typing import Optional, List, Literal, Union, Any, Dict
from pydantic import BaseModel, Field, field_validator, model_validator
import yaml
class ObjectiveConfig(BaseModel):
"""Configuration for an optimization objective."""
goal: Literal["minimize", "maximize"]
target: str = Field(
description="What to optimize: mass, displacement, stress, frequency, stiffness, or custom name"
)
weight: float = Field(default=1.0, ge=0.0, le=10.0)
extractor: Optional[str] = Field(
default=None, description="Custom extractor function name (auto-detected if not specified)"
)
@field_validator("target")
@classmethod
def validate_target(cls, v: str) -> str:
"""Normalize target names."""
known_targets = {
"mass",
"weight",
"displacement",
"deflection",
"stress",
"frequency",
"stiffness",
"strain_energy",
"volume",
}
normalized = v.lower().strip()
# Map common aliases
aliases = {
"weight": "mass",
"deflection": "displacement",
}
return aliases.get(normalized, normalized)
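
For example, alias handling behaves like this:

obj = ObjectiveConfig(goal="minimize", target="Weight")
assert obj.target == "mass"                  # alias normalized
obj = ObjectiveConfig(goal="maximize", target="first_mode_margin")
assert obj.target == "first_mode_margin"     # custom names pass through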
class ConstraintConfig(BaseModel):
"""Configuration for an optimization constraint."""
type: str = Field(
description="Constraint type: max_stress, max_displacement, min_frequency, etc."
)
threshold: float
units: str = ""
description: Optional[str] = None
@field_validator("type")
@classmethod
def normalize_type(cls, v: str) -> str:
"""Normalize constraint type names."""
return v.lower().strip().replace(" ", "_")
class DesignVariableConfig(BaseModel):
"""Configuration for a design variable."""
name: str = Field(description="NX expression name")
bounds: tuple[float, float] = Field(description="(min, max) bounds")
units: Optional[str] = None
description: Optional[str] = None
step: Optional[float] = Field(default=None, description="Step size for discrete variables")
@field_validator("bounds")
@classmethod
def validate_bounds(cls, v: tuple[float, float]) -> tuple[float, float]:
"""Ensure bounds are valid."""
if len(v) != 2:
raise ValueError("Bounds must be a tuple of (min, max)")
if v[0] >= v[1]:
raise ValueError(f"Lower bound ({v[0]}) must be less than upper bound ({v[1]})")
return v
@property
def range(self) -> float:
"""Get the range of the design variable."""
return self.bounds[1] - self.bounds[0]
@property
def range_ratio(self) -> float:
"""Get the ratio of upper to lower bound."""
if self.bounds[0] == 0:
return float("inf")
return self.bounds[1] / self.bounds[0]
class BudgetConfig(BaseModel):
"""Configuration for optimization budget."""
max_trials: int = Field(default=100, ge=1, le=10000)
timeout_per_trial: int = Field(default=300, ge=10, le=7200, description="Seconds per FEA solve")
target_runtime: Optional[str] = Field(
default=None, description="Target total runtime (e.g., '2h', '30m')"
)
def get_target_runtime_seconds(self) -> Optional[int]:
"""Parse target_runtime string to seconds."""
if not self.target_runtime:
return None
runtime = self.target_runtime.lower().strip()
if runtime.endswith("h"):
return int(float(runtime[:-1]) * 3600)
elif runtime.endswith("m"):
return int(float(runtime[:-1]) * 60)
elif runtime.endswith("s"):
return int(float(runtime[:-1]))
else:
# Assume seconds
return int(float(runtime))
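
For example:

budget = BudgetConfig(max_trials=200, target_runtime="2h")
assert budget.get_target_runtime_seconds() == 7200
assert BudgetConfig(target_runtime="30m").get_target_runtime_seconds() == 1800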
class AlgorithmConfig(BaseModel):
"""Configuration for optimization algorithm."""
method: Literal["auto", "TPE", "CMA-ES", "NSGA-II", "random"] = "auto"
neural_acceleration: bool = Field(
default=False, description="Enable surrogate model for speedup"
)
priority: Literal["speed", "accuracy", "balanced"] = "balanced"
seed: Optional[int] = Field(default=None, description="Random seed for reproducibility")
class MaterialConfig(BaseModel):
"""Configuration for material properties."""
name: str
yield_stress: Optional[float] = Field(default=None, ge=0, description="Yield stress in MPa")
ultimate_stress: Optional[float] = Field(
default=None, ge=0, description="Ultimate stress in MPa"
)
density: Optional[float] = Field(default=None, ge=0, description="Density in kg/m3")
youngs_modulus: Optional[float] = Field(
default=None, ge=0, description="Young's modulus in GPa"
)
poissons_ratio: Optional[float] = Field(
default=None, ge=0, le=0.5, description="Poisson's ratio"
)
class ObjectivesConfig(BaseModel):
"""Configuration for all objectives."""
primary: ObjectiveConfig
secondary: Optional[List[ObjectiveConfig]] = None
@property
def is_multi_objective(self) -> bool:
"""Check if this is a multi-objective problem."""
return self.secondary is not None and len(self.secondary) > 0
@property
def all_objectives(self) -> List[ObjectiveConfig]:
"""Get all objectives as a flat list."""
objectives = [self.primary]
if self.secondary:
objectives.extend(self.secondary)
return objectives
class StudyConfig(BaseModel):
"""Configuration for study metadata."""
name: Optional[str] = Field(
default=None, description="Study name (auto-generated from folder if omitted)"
)
type: Literal["single_objective", "multi_objective"] = "single_objective"
description: Optional[str] = None
tags: Optional[List[str]] = None
class IntakeConfig(BaseModel):
"""
Complete intake.yaml configuration schema.
All fields are optional - anything not specified will be asked
in the interview or auto-detected from introspection.
"""
study: Optional[StudyConfig] = None
objectives: Optional[ObjectivesConfig] = None
constraints: Optional[List[ConstraintConfig]] = None
design_variables: Optional[List[DesignVariableConfig]] = None
budget: Optional[BudgetConfig] = None
algorithm: Optional[AlgorithmConfig] = None
material: Optional[MaterialConfig] = None
notes: Optional[str] = None
@classmethod
def from_yaml(cls, yaml_path: Union[str, Path]) -> "IntakeConfig":
"""Load configuration from a YAML file."""
yaml_path = Path(yaml_path)
if not yaml_path.exists():
raise FileNotFoundError(f"Intake config not found: {yaml_path}")
with open(yaml_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if data is None:
return cls()
return cls.model_validate(data)
@classmethod
def from_yaml_safe(cls, yaml_path: Union[str, Path]) -> Optional["IntakeConfig"]:
"""Load configuration from YAML, returning None if file doesn't exist."""
yaml_path = Path(yaml_path)
if not yaml_path.exists():
return None
try:
return cls.from_yaml(yaml_path)
except Exception:
return None
def to_yaml(self, yaml_path: Union[str, Path]) -> None:
"""Save configuration to a YAML file."""
yaml_path = Path(yaml_path)
data = self.model_dump(exclude_none=True)
with open(yaml_path, "w", encoding="utf-8") as f:
yaml.dump(data, f, default_flow_style=False, sort_keys=False)
def get_value(self, key: str) -> Optional[Any]:
"""
Get a configuration value by dot-notation key.
Examples:
config.get_value("study.name")
config.get_value("budget.max_trials")
config.get_value("objectives.primary.goal")
"""
parts = key.split(".")
value: Any = self
for part in parts:
if value is None:
return None
if hasattr(value, part):
value = getattr(value, part)
elif isinstance(value, dict):
value = value.get(part)
else:
return None
return value
def is_complete(self) -> bool:
"""Check if all required configuration is provided."""
return (
self.objectives is not None
and self.design_variables is not None
and len(self.design_variables) > 0
)
def get_missing_fields(self) -> List[str]:
"""Get list of fields that still need to be configured."""
missing = []
if self.objectives is None:
missing.append("objectives")
if self.design_variables is None or len(self.design_variables) == 0:
missing.append("design_variables")
if self.constraints is None:
missing.append("constraints (recommended)")
if self.budget is None:
missing.append("budget")
return missing
@model_validator(mode="after")
def validate_consistency(self) -> "IntakeConfig":
"""Validate consistency between configuration sections."""
# Check study type matches objectives
if self.study and self.objectives:
is_multi = self.objectives.is_multi_objective
declared_multi = self.study.type == "multi_objective"
if is_multi and not declared_multi:
# Auto-correct study type
self.study.type = "multi_objective"
return self
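# Minimal intake.yaml sketch (illustrative values; every section is optional
# and maps onto the models above):
#
#   study:
#     name: bracket_mass_opt
#     type: single_objective
#   objectives:
#     primary:
#       target: mass
#       goal: minimize
#   budget:
#     max_trials: 50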
# Common material presets
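# (Values use MPa for stresses, kg/m^3 for density, and GPa for Young's modulus.)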
MATERIAL_PRESETS: Dict[str, MaterialConfig] = {
"aluminum_6061_t6": MaterialConfig(
name="Aluminum 6061-T6",
yield_stress=276,
ultimate_stress=310,
density=2700,
youngs_modulus=68.9,
poissons_ratio=0.33,
),
"aluminum_7075_t6": MaterialConfig(
name="Aluminum 7075-T6",
yield_stress=503,
ultimate_stress=572,
density=2810,
youngs_modulus=71.7,
poissons_ratio=0.33,
),
"steel_a36": MaterialConfig(
name="Steel A36",
yield_stress=250,
ultimate_stress=400,
density=7850,
youngs_modulus=200,
poissons_ratio=0.26,
),
"stainless_304": MaterialConfig(
name="Stainless Steel 304",
yield_stress=215,
ultimate_stress=505,
density=8000,
youngs_modulus=193,
poissons_ratio=0.29,
),
"titanium_6al4v": MaterialConfig(
name="Titanium Ti-6Al-4V",
yield_stress=880,
ultimate_stress=950,
density=4430,
youngs_modulus=113.8,
poissons_ratio=0.342,
),
}
def get_material_preset(name: str) -> Optional[MaterialConfig]:
"""
Get a material preset by name (fuzzy matching).
Examples:
get_material_preset("6061") # Returns aluminum_6061_t6
get_material_preset("steel") # Returns steel_a36
"""
name_lower = name.lower().replace("-", "_").replace(" ", "_")
# Direct match
if name_lower in MATERIAL_PRESETS:
return MATERIAL_PRESETS[name_lower]
# Partial match
for key, material in MATERIAL_PRESETS.items():
if name_lower in key or name_lower in material.name.lower():
return material
return None
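# Illustrative lookups (a sketch of the fuzzy matching above):
#
#   get_material_preset("6061")      # -> Aluminum 6061-T6 (partial key match)
#   get_material_preset("titanium")  # -> Titanium Ti-6Al-4V
#   get_material_preset("inconel")   # -> None (no preset defined)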

View File

@@ -0,0 +1,540 @@
"""
Study Context
=============
Complete assembled context for study creation, combining:
- Model introspection results
- Context files (goals.md, PDFs, images)
- Pre-configuration (intake.yaml)
- LAC memory (similar studies, recommendations)
This context object is used by both Interview Mode and Canvas Mode
to provide intelligent suggestions and pre-filled values.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any
from enum import Enum
import json
class ConfidenceLevel(str, Enum):
"""Confidence level for suggestions."""
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
@dataclass
class ExpressionInfo:
"""Information about an NX expression."""
name: str
value: Optional[float] = None
units: Optional[str] = None
formula: Optional[str] = None
type: str = "Number"
is_design_candidate: bool = False
confidence: ConfidenceLevel = ConfidenceLevel.MEDIUM
reason: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"value": self.value,
"units": self.units,
"formula": self.formula,
"type": self.type,
"is_design_candidate": self.is_design_candidate,
"confidence": self.confidence.value,
"reason": self.reason,
}
@dataclass
class SolutionInfo:
"""Information about an NX solution."""
name: str
type: str # SOL 101, SOL 103, etc.
description: Optional[str] = None
@dataclass
class BoundaryConditionInfo:
"""Information about a boundary condition."""
name: str
type: str # Fixed, Pinned, etc.
location: Optional[str] = None
@dataclass
class LoadInfo:
"""Information about a load."""
name: str
type: str # Force, Pressure, etc.
magnitude: Optional[float] = None
units: Optional[str] = None
location: Optional[str] = None
@dataclass
class MaterialInfo:
"""Information about a material in the model."""
name: str
yield_stress: Optional[float] = None
density: Optional[float] = None
youngs_modulus: Optional[float] = None
@dataclass
class MeshInfo:
"""Information about the mesh."""
element_count: int = 0
node_count: int = 0
element_types: List[str] = field(default_factory=list)
quality_metrics: Dict[str, float] = field(default_factory=dict)
@dataclass
class BaselineResult:
"""Results from baseline solve."""
mass_kg: Optional[float] = None
max_displacement_mm: Optional[float] = None
max_stress_mpa: Optional[float] = None
max_strain: Optional[float] = None
first_frequency_hz: Optional[float] = None
strain_energy_j: Optional[float] = None
solve_time_seconds: Optional[float] = None
success: bool = False
error: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
"mass_kg": self.mass_kg,
"max_displacement_mm": self.max_displacement_mm,
"max_stress_mpa": self.max_stress_mpa,
"max_strain": self.max_strain,
"first_frequency_hz": self.first_frequency_hz,
"strain_energy_j": self.strain_energy_j,
"solve_time_seconds": self.solve_time_seconds,
"success": self.success,
"error": self.error,
}
def get_summary(self) -> str:
"""Get a human-readable summary of baseline results."""
if not self.success:
return f"Baseline solve failed: {self.error or 'Unknown error'}"
parts = []
if self.mass_kg is not None:
parts.append(f"mass={self.mass_kg:.2f}kg")
if self.max_displacement_mm is not None:
parts.append(f"disp={self.max_displacement_mm:.3f}mm")
if self.max_stress_mpa is not None:
parts.append(f"stress={self.max_stress_mpa:.1f}MPa")
if self.first_frequency_hz is not None:
parts.append(f"freq={self.first_frequency_hz:.1f}Hz")
return ", ".join(parts) if parts else "No results"
@dataclass
class IntrospectionData:
"""Complete introspection results from NX model."""
success: bool = False
timestamp: Optional[datetime] = None
error: Optional[str] = None
# Part information
expressions: List[ExpressionInfo] = field(default_factory=list)
bodies: List[Dict[str, Any]] = field(default_factory=list)
# Simulation information
solutions: List[SolutionInfo] = field(default_factory=list)
boundary_conditions: List[BoundaryConditionInfo] = field(default_factory=list)
loads: List[LoadInfo] = field(default_factory=list)
materials: List[MaterialInfo] = field(default_factory=list)
mesh_info: Optional[MeshInfo] = None
# Available result types (from OP2)
available_results: Dict[str, bool] = field(default_factory=dict)
subcases: List[int] = field(default_factory=list)
# Baseline solve
baseline: Optional[BaselineResult] = None
def get_expression_names(self) -> List[str]:
"""Get list of all expression names."""
return [e.name for e in self.expressions]
def get_design_candidates(self) -> List[ExpressionInfo]:
"""Get expressions that look like design variables."""
return [e for e in self.expressions if e.is_design_candidate]
def get_expression(self, name: str) -> Optional[ExpressionInfo]:
"""Get expression by name."""
for expr in self.expressions:
if expr.name == name:
return expr
return None
def get_solver_type(self) -> Optional[str]:
"""Get the primary solver type (SOL 101, etc.)."""
if self.solutions:
return self.solutions[0].type
return None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"success": self.success,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"error": self.error,
"expressions": [e.to_dict() for e in self.expressions],
"solutions": [{"name": s.name, "type": s.type} for s in self.solutions],
"boundary_conditions": [
{"name": bc.name, "type": bc.type} for bc in self.boundary_conditions
],
"loads": [
{"name": l.name, "type": l.type, "magnitude": l.magnitude} for l in self.loads
],
"materials": [{"name": m.name, "yield_stress": m.yield_stress} for m in self.materials],
"available_results": self.available_results,
"subcases": self.subcases,
"baseline": self.baseline.to_dict() if self.baseline else None,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "IntrospectionData":
"""Create from dictionary."""
introspection = cls(
success=data.get("success", False),
error=data.get("error"),
)
if data.get("timestamp"):
introspection.timestamp = datetime.fromisoformat(data["timestamp"])
# Parse expressions
for expr_data in data.get("expressions", []):
introspection.expressions.append(
ExpressionInfo(
name=expr_data["name"],
value=expr_data.get("value"),
units=expr_data.get("units"),
formula=expr_data.get("formula"),
type=expr_data.get("type", "Number"),
is_design_candidate=expr_data.get("is_design_candidate", False),
                    confidence=ConfidenceLevel(expr_data.get("confidence", "medium")),
                    reason=expr_data.get("reason"),
)
)
# Parse solutions
for sol_data in data.get("solutions", []):
introspection.solutions.append(
SolutionInfo(
name=sol_data["name"],
type=sol_data["type"],
)
)
introspection.available_results = data.get("available_results", {})
introspection.subcases = data.get("subcases", [])
# Parse baseline
if data.get("baseline"):
baseline_data = data["baseline"]
introspection.baseline = BaselineResult(
mass_kg=baseline_data.get("mass_kg"),
max_displacement_mm=baseline_data.get("max_displacement_mm"),
max_stress_mpa=baseline_data.get("max_stress_mpa"),
solve_time_seconds=baseline_data.get("solve_time_seconds"),
success=baseline_data.get("success", False),
error=baseline_data.get("error"),
)
return introspection
@dataclass
class DVSuggestion:
"""Suggested design variable."""
name: str
current_value: Optional[float] = None
suggested_bounds: Optional[tuple[float, float]] = None
units: Optional[str] = None
confidence: ConfidenceLevel = ConfidenceLevel.MEDIUM
reason: str = ""
source: str = "introspection" # introspection, preconfig, lac
lac_insight: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"current_value": self.current_value,
"suggested_bounds": list(self.suggested_bounds) if self.suggested_bounds else None,
"units": self.units,
"confidence": self.confidence.value,
"reason": self.reason,
"source": self.source,
"lac_insight": self.lac_insight,
}
@dataclass
class ObjectiveSuggestion:
"""Suggested optimization objective."""
name: str
goal: str # minimize, maximize
extractor: str
confidence: ConfidenceLevel = ConfidenceLevel.MEDIUM
reason: str = ""
source: str = "goals"
@dataclass
class ConstraintSuggestion:
"""Suggested optimization constraint."""
name: str
type: str # less_than, greater_than
suggested_threshold: Optional[float] = None
units: Optional[str] = None
confidence: ConfidenceLevel = ConfidenceLevel.MEDIUM
reason: str = ""
source: str = "requirements"
@dataclass
class ImageAnalysis:
"""Analysis result from Claude Vision for an image."""
image_path: Path
component_type: Optional[str] = None
dimensions: List[str] = field(default_factory=list)
load_conditions: List[str] = field(default_factory=list)
annotations: List[str] = field(default_factory=list)
suggestions: List[str] = field(default_factory=list)
raw_analysis: Optional[str] = None
@dataclass
class LACInsight:
"""Insight from Learning Atomizer Core."""
study_name: str
similarity_score: float
geometry_type: str
method_used: str
objectives: List[str]
trials_to_convergence: Optional[int] = None
success: bool = True
lesson: Optional[str] = None
@dataclass
class StudyContext:
"""
Complete context for study creation.
This is the central data structure that combines all information
gathered during intake processing, ready for use by Interview Mode
or Canvas Mode.
"""
# === Identity ===
study_name: str
source_folder: Path
created_at: datetime = field(default_factory=datetime.now)
# === Model Files ===
sim_file: Optional[Path] = None
fem_file: Optional[Path] = None
prt_file: Optional[Path] = None
idealized_prt_file: Optional[Path] = None
# === From Introspection ===
introspection: Optional[IntrospectionData] = None
# === From Context Files ===
goals_text: Optional[str] = None
requirements_text: Optional[str] = None
constraints_text: Optional[str] = None
notes_text: Optional[str] = None
image_analyses: List[ImageAnalysis] = field(default_factory=list)
# === From intake.yaml ===
preconfig: Optional[Any] = None # IntakeConfig, imported dynamically to avoid circular import
# === From LAC ===
similar_studies: List[LACInsight] = field(default_factory=list)
recommended_method: Optional[str] = None
known_issues: List[str] = field(default_factory=list)
user_preferences: Dict[str, Any] = field(default_factory=dict)
# === Derived Suggestions ===
suggested_dvs: List[DVSuggestion] = field(default_factory=list)
suggested_objectives: List[ObjectiveSuggestion] = field(default_factory=list)
suggested_constraints: List[ConstraintSuggestion] = field(default_factory=list)
# === Status ===
warnings: List[str] = field(default_factory=list)
errors: List[str] = field(default_factory=list)
@property
def has_introspection(self) -> bool:
"""Check if introspection data is available."""
return self.introspection is not None and self.introspection.success
@property
def has_baseline(self) -> bool:
"""Check if baseline results are available."""
return (
self.introspection is not None
and self.introspection.baseline is not None
and self.introspection.baseline.success
)
@property
def has_preconfig(self) -> bool:
"""Check if pre-configuration is available."""
return self.preconfig is not None
@property
def ready_for_interview(self) -> bool:
"""Check if context is ready for interview mode."""
return self.has_introspection and len(self.errors) == 0
@property
def ready_for_canvas(self) -> bool:
"""Check if context is ready for canvas mode."""
return self.has_introspection and self.sim_file is not None
def get_baseline_summary(self) -> str:
"""Get human-readable baseline summary."""
        if self.introspection is None or self.introspection.baseline is None:
            return "No baseline data"
return self.introspection.baseline.get_summary()
def get_missing_required(self) -> List[str]:
"""Get list of missing required items."""
missing = []
if self.sim_file is None:
missing.append("Simulation file (.sim)")
if not self.has_introspection:
missing.append("Model introspection")
return missing
def get_context_summary(self) -> Dict[str, Any]:
"""Get a summary of loaded context for display."""
return {
"study_name": self.study_name,
"has_model": self.sim_file is not None,
"has_introspection": self.has_introspection,
"has_baseline": self.has_baseline,
"has_goals": self.goals_text is not None,
"has_requirements": self.requirements_text is not None,
"has_preconfig": self.has_preconfig,
"num_expressions": len(self.introspection.expressions) if self.introspection else 0,
"num_dv_candidates": len(self.introspection.get_design_candidates())
if self.introspection
else 0,
"num_similar_studies": len(self.similar_studies),
"warnings": self.warnings,
"errors": self.errors,
}
def to_interview_context(self) -> Dict[str, Any]:
"""Get context formatted for interview mode."""
return {
"study_name": self.study_name,
"baseline": (
self.introspection.baseline.to_dict()
if self.introspection is not None and self.introspection.baseline is not None
else None
),
"expressions": [e.to_dict() for e in self.introspection.expressions]
if self.introspection
else [],
"design_candidates": [e.to_dict() for e in self.introspection.get_design_candidates()]
if self.introspection
else [],
"solver_type": self.introspection.get_solver_type() if self.introspection else None,
"goals_text": self.goals_text,
"requirements_text": self.requirements_text,
"preconfig": self.preconfig.model_dump() if self.preconfig else None,
"suggested_dvs": [dv.to_dict() for dv in self.suggested_dvs],
"similar_studies": [
{"name": s.study_name, "method": s.method_used, "similarity": s.similarity_score}
for s in self.similar_studies
],
"recommended_method": self.recommended_method,
}
def save(self, output_path: Path) -> None:
"""Save context to JSON file."""
data = {
"study_name": self.study_name,
"source_folder": str(self.source_folder),
"created_at": self.created_at.isoformat(),
"sim_file": str(self.sim_file) if self.sim_file else None,
"fem_file": str(self.fem_file) if self.fem_file else None,
"prt_file": str(self.prt_file) if self.prt_file else None,
"introspection": self.introspection.to_dict() if self.introspection else None,
"goals_text": self.goals_text,
"requirements_text": self.requirements_text,
"suggested_dvs": [dv.to_dict() for dv in self.suggested_dvs],
"warnings": self.warnings,
"errors": self.errors,
}
with open(output_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
@classmethod
def load(cls, input_path: Path) -> "StudyContext":
"""Load context from JSON file."""
with open(input_path, "r", encoding="utf-8") as f:
data = json.load(f)
context = cls(
study_name=data["study_name"],
source_folder=Path(data["source_folder"]),
created_at=datetime.fromisoformat(data["created_at"]),
)
if data.get("sim_file"):
context.sim_file = Path(data["sim_file"])
if data.get("fem_file"):
context.fem_file = Path(data["fem_file"])
if data.get("prt_file"):
context.prt_file = Path(data["prt_file"])
if data.get("introspection"):
context.introspection = IntrospectionData.from_dict(data["introspection"])
context.goals_text = data.get("goals_text")
context.requirements_text = data.get("requirements_text")
context.warnings = data.get("warnings", [])
        context.errors = data.get("errors", [])
        # Restore DV suggestions so save()/load() round-trips symmetrically
        for dv_data in data.get("suggested_dvs", []):
            bounds = dv_data.get("suggested_bounds")
            context.suggested_dvs.append(
                DVSuggestion(
                    name=dv_data["name"],
                    current_value=dv_data.get("current_value"),
                    suggested_bounds=tuple(bounds) if bounds else None,
                    units=dv_data.get("units"),
                    confidence=ConfidenceLevel(dv_data.get("confidence", "medium")),
                    reason=dv_data.get("reason", ""),
                    source=dv_data.get("source", "introspection"),
                    lac_insight=dv_data.get("lac_insight"),
                )
            )
        return context
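# Round-trip sketch (paths are illustrative):
#
#   ctx = StudyContext(study_name="bracket", source_folder=Path("studies/_inbox/bracket"))
#   ctx.save(Path("study_context.json"))
#   restored = StudyContext.load(Path("study_context.json"))
#   assert restored.study_name == "bracket"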

View File

@@ -0,0 +1,789 @@
"""
Intake Processor
================
Processes intake folders to create study context:
1. Validates folder structure
2. Copies model files to study directory
3. Parses intake.yaml pre-configuration
4. Extracts text from context files (goals.md, PDFs)
5. Runs model introspection
6. Optionally runs baseline solve
7. Assembles complete StudyContext
Usage:
from optimization_engine.intake import IntakeProcessor
processor = IntakeProcessor(Path("studies/_inbox/my_project"))
context = processor.process(run_baseline=True)
"""
from __future__ import annotations
import logging
import shutil
import re
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Callable, Dict, Any
from .config import IntakeConfig, DesignVariableConfig
from .context import (
StudyContext,
IntrospectionData,
ExpressionInfo,
SolutionInfo,
BaselineResult,
DVSuggestion,
ObjectiveSuggestion,
ConstraintSuggestion,
ConfidenceLevel,
)
logger = logging.getLogger(__name__)
class IntakeError(Exception):
"""Error during intake processing."""
pass
class IntakeProcessor:
"""
Processes an intake folder to create a complete StudyContext.
The processor handles:
- File discovery and validation
- Model file copying
- Configuration parsing
- Context file extraction
- Model introspection (via NX journals)
- Baseline solve (optional)
- Suggestion generation
"""
def __init__(
self,
inbox_folder: Path,
studies_dir: Optional[Path] = None,
progress_callback: Optional[Callable[[str, float], None]] = None,
):
"""
Initialize the intake processor.
Args:
inbox_folder: Path to the intake folder (in _inbox/)
studies_dir: Base studies directory (default: auto-detect)
progress_callback: Optional callback for progress updates (message, percent)
"""
self.inbox_folder = Path(inbox_folder)
self.progress_callback = progress_callback or (lambda m, p: None)
# Validate inbox folder exists
if not self.inbox_folder.exists():
raise IntakeError(f"Inbox folder not found: {self.inbox_folder}")
# Determine study name from folder name
self.study_name = self.inbox_folder.name
if self.study_name.startswith("_"):
# Strip leading underscore (used for examples)
self.study_name = self.study_name[1:]
# Set studies directory
if studies_dir is None:
# Find project root
current = Path(__file__).parent
while current != current.parent:
if (current / "CLAUDE.md").exists():
studies_dir = current / "studies"
break
current = current.parent
else:
studies_dir = Path.cwd() / "studies"
self.studies_dir = Path(studies_dir)
self.study_dir = self.studies_dir / self.study_name
# Initialize context
self.context = StudyContext(
study_name=self.study_name,
source_folder=self.inbox_folder,
)
def process(
self,
run_baseline: bool = True,
copy_files: bool = True,
run_introspection: bool = True,
) -> StudyContext:
"""
Process the intake folder and create StudyContext.
Args:
run_baseline: Run a baseline FEA solve to get actual values
copy_files: Copy model files to study directory
run_introspection: Run NX model introspection
Returns:
Complete StudyContext ready for interview or canvas
"""
logger.info(f"Processing intake: {self.inbox_folder}")
try:
# Step 1: Discover files
self._progress("Discovering files...", 0.0)
self._discover_files()
# Step 2: Parse intake.yaml
self._progress("Parsing configuration...", 0.1)
self._parse_config()
# Step 3: Extract context files
self._progress("Extracting context...", 0.2)
self._extract_context_files()
# Step 4: Copy model files
if copy_files:
self._progress("Copying model files...", 0.3)
self._copy_model_files()
# Step 5: Run introspection
if run_introspection:
self._progress("Introspecting model...", 0.4)
self._run_introspection()
# Step 6: Run baseline solve
if run_baseline and self.context.sim_file:
self._progress("Running baseline solve...", 0.6)
self._run_baseline_solve()
# Step 7: Generate suggestions
self._progress("Generating suggestions...", 0.8)
self._generate_suggestions()
# Step 8: Save context
self._progress("Saving context...", 0.9)
self._save_context()
self._progress("Complete!", 1.0)
except Exception as e:
self.context.errors.append(str(e))
logger.error(f"Intake processing failed: {e}")
raise
return self.context
def _progress(self, message: str, percent: float) -> None:
"""Report progress."""
logger.info(f"[{percent * 100:.0f}%] {message}")
self.progress_callback(message, percent)
def _discover_files(self) -> None:
"""Discover model and context files in the inbox folder."""
# Look for model files
models_dir = self.inbox_folder / "models"
if models_dir.exists():
search_dir = models_dir
else:
# Fall back to root folder
search_dir = self.inbox_folder
# Find simulation file (required)
sim_files = list(search_dir.glob("*.sim"))
if sim_files:
self.context.sim_file = sim_files[0]
logger.info(f"Found sim file: {self.context.sim_file.name}")
else:
self.context.warnings.append("No .sim file found in models/")
# Find FEM file
fem_files = list(search_dir.glob("*.fem"))
if fem_files:
self.context.fem_file = fem_files[0]
logger.info(f"Found fem file: {self.context.fem_file.name}")
# Find part file
prt_files = [f for f in search_dir.glob("*.prt") if "_i.prt" not in f.name.lower()]
if prt_files:
self.context.prt_file = prt_files[0]
logger.info(f"Found prt file: {self.context.prt_file.name}")
# Find idealized part (CRITICAL!)
idealized_files = list(search_dir.glob("*_i.prt")) + list(search_dir.glob("*_I.prt"))
if idealized_files:
self.context.idealized_prt_file = idealized_files[0]
logger.info(f"Found idealized prt: {self.context.idealized_prt_file.name}")
else:
self.context.warnings.append(
"No idealized part (*_i.prt) found - mesh may not update during optimization!"
)
def _parse_config(self) -> None:
"""Parse intake.yaml if present."""
config_path = self.inbox_folder / "intake.yaml"
if config_path.exists():
try:
self.context.preconfig = IntakeConfig.from_yaml(config_path)
logger.info("Loaded intake.yaml configuration")
# Update study name if specified
if self.context.preconfig.study and self.context.preconfig.study.name:
self.context.study_name = self.context.preconfig.study.name
self.study_name = self.context.study_name
self.study_dir = self.studies_dir / self.study_name
except Exception as e:
self.context.warnings.append(f"Failed to parse intake.yaml: {e}")
logger.warning(f"Failed to parse intake.yaml: {e}")
else:
logger.info("No intake.yaml found, will use interview mode")
def _extract_context_files(self) -> None:
"""Extract text from context files."""
context_dir = self.inbox_folder / "context"
# Read goals.md
goals_path = context_dir / "goals.md"
if goals_path.exists():
self.context.goals_text = goals_path.read_text(encoding="utf-8")
logger.info("Loaded goals.md")
# Read constraints.txt
constraints_path = context_dir / "constraints.txt"
if constraints_path.exists():
self.context.constraints_text = constraints_path.read_text(encoding="utf-8")
logger.info("Loaded constraints.txt")
# Read any .txt or .md files in context/
if context_dir.exists():
for txt_file in context_dir.glob("*.txt"):
if txt_file.name != "constraints.txt":
content = txt_file.read_text(encoding="utf-8")
if self.context.notes_text:
self.context.notes_text += f"\n\n--- {txt_file.name} ---\n{content}"
else:
self.context.notes_text = content
        # Extract PDF text (basic implementation)
        # TODO: Add PyMuPDF and Claude Vision integration
        pdf_paths = context_dir.glob("*.pdf") if context_dir.exists() else []
        for pdf_path in pdf_paths:
try:
text = self._extract_pdf_text(pdf_path)
if text:
self.context.requirements_text = text
logger.info(f"Extracted text from {pdf_path.name}")
except Exception as e:
self.context.warnings.append(f"Failed to extract PDF {pdf_path.name}: {e}")
def _extract_pdf_text(self, pdf_path: Path) -> Optional[str]:
"""Extract text from PDF using PyMuPDF if available."""
try:
import fitz # PyMuPDF
doc = fitz.open(pdf_path)
text_parts = []
for page in doc:
text_parts.append(page.get_text())
doc.close()
return "\n".join(text_parts)
except ImportError:
logger.warning("PyMuPDF not installed, skipping PDF extraction")
return None
except Exception as e:
logger.warning(f"PDF extraction failed: {e}")
return None
def _copy_model_files(self) -> None:
"""Copy model files to study directory."""
# Create study directory structure
model_dir = self.study_dir / "1_model"
model_dir.mkdir(parents=True, exist_ok=True)
(self.study_dir / "2_iterations").mkdir(exist_ok=True)
(self.study_dir / "3_results").mkdir(exist_ok=True)
# Copy files
files_to_copy = [
self.context.sim_file,
self.context.fem_file,
self.context.prt_file,
self.context.idealized_prt_file,
]
for src in files_to_copy:
if src and src.exists():
dst = model_dir / src.name
if not dst.exists():
shutil.copy2(src, dst)
logger.info(f"Copied: {src.name}")
else:
logger.info(f"Already exists: {src.name}")
# Update paths to point to copied files
if self.context.sim_file:
self.context.sim_file = model_dir / self.context.sim_file.name
if self.context.fem_file:
self.context.fem_file = model_dir / self.context.fem_file.name
if self.context.prt_file:
self.context.prt_file = model_dir / self.context.prt_file.name
if self.context.idealized_prt_file:
self.context.idealized_prt_file = model_dir / self.context.idealized_prt_file.name
def _run_introspection(self) -> None:
"""Run NX model introspection."""
if not self.context.sim_file or not self.context.sim_file.exists():
self.context.warnings.append("Cannot introspect - no sim file")
return
introspection = IntrospectionData(timestamp=datetime.now())
try:
# Try to use existing introspection modules
from optimization_engine.extractors.introspect_part import introspect_part_expressions
# Introspect part for expressions
if self.context.prt_file and self.context.prt_file.exists():
expressions = introspect_part_expressions(str(self.context.prt_file))
for expr in expressions:
is_candidate = self._is_design_candidate(expr["name"], expr.get("value"))
introspection.expressions.append(
ExpressionInfo(
name=expr["name"],
value=expr.get("value"),
units=expr.get("units"),
formula=expr.get("formula"),
type=expr.get("type", "Number"),
is_design_candidate=is_candidate,
confidence=ConfidenceLevel.HIGH
if is_candidate
else ConfidenceLevel.MEDIUM,
)
)
introspection.success = True
logger.info(f"Introspected {len(introspection.expressions)} expressions")
except ImportError:
logger.warning("Introspection module not available, using fallback")
introspection.success = False
introspection.error = "Introspection module not available"
except Exception as e:
logger.error(f"Introspection failed: {e}")
introspection.success = False
introspection.error = str(e)
self.context.introspection = introspection
def _is_design_candidate(self, name: str, value: Optional[float]) -> bool:
"""Check if an expression looks like a design variable candidate."""
# Skip if no value or non-numeric
if value is None:
return False
# Skip system/reference expressions
if name.startswith("p") and name[1:].isdigit():
return False
# Skip mass-related outputs (not inputs)
if "mass" in name.lower() and "input" not in name.lower():
return False
# Look for typical design parameter names
design_keywords = [
"thickness",
"width",
"height",
"length",
"radius",
"diameter",
"angle",
"offset",
"depth",
"size",
"span",
"pitch",
"gap",
"rib",
"flange",
"web",
"wall",
"fillet",
"chamfer",
]
name_lower = name.lower()
return any(kw in name_lower for kw in design_keywords)
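    # Heuristic sketch (name, value -> result):
    #   ("flange_thickness", 4.0) -> True   (matches "thickness"/"flange")
    #   ("p42", 10.0)             -> False  (system/reference expression)
    #   ("total_mass", 2.5)       -> False  (mass output, not an input)
    #   ("bolt_preload", 500.0)   -> False  (no design keyword)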
def _run_baseline_solve(self) -> None:
"""Run baseline FEA solve to get actual values."""
if not self.context.introspection:
self.context.introspection = IntrospectionData(timestamp=datetime.now())
baseline = BaselineResult()
try:
from optimization_engine.nx.solver import NXSolver
solver = NXSolver()
model_dir = self.context.sim_file.parent
result = solver.run_simulation(
sim_file=self.context.sim_file,
working_dir=model_dir,
expression_updates={}, # No updates for baseline
cleanup=True,
)
if result["success"]:
baseline.success = True
baseline.solve_time_seconds = result.get("solve_time", 0)
# Extract results from OP2
op2_file = result.get("op2_file")
if op2_file and Path(op2_file).exists():
self._extract_baseline_results(baseline, Path(op2_file), model_dir)
logger.info(f"Baseline solve complete: {baseline.get_summary()}")
else:
baseline.success = False
baseline.error = result.get("error", "Unknown error")
logger.warning(f"Baseline solve failed: {baseline.error}")
except ImportError:
logger.warning("NXSolver not available, skipping baseline")
baseline.success = False
baseline.error = "NXSolver not available"
except Exception as e:
logger.error(f"Baseline solve failed: {e}")
baseline.success = False
baseline.error = str(e)
self.context.introspection.baseline = baseline
def _extract_baseline_results(
self, baseline: BaselineResult, op2_file: Path, model_dir: Path
) -> None:
"""Extract results from OP2 file."""
try:
# Try to extract displacement
from optimization_engine.extractors.extract_displacement import extract_displacement
disp_result = extract_displacement(op2_file, subcase=1)
baseline.max_displacement_mm = disp_result.get("max_displacement")
except Exception as e:
logger.debug(f"Displacement extraction failed: {e}")
try:
# Try to extract stress
from optimization_engine.extractors.extract_von_mises_stress import extract_solid_stress
stress_result = extract_solid_stress(op2_file, subcase=1)
baseline.max_stress_mpa = stress_result.get("max_von_mises")
except Exception as e:
logger.debug(f"Stress extraction failed: {e}")
try:
# Try to extract mass from BDF
from optimization_engine.extractors.bdf_mass_extractor import extract_mass_from_bdf
dat_files = list(model_dir.glob("*.dat"))
if dat_files:
baseline.mass_kg = extract_mass_from_bdf(str(dat_files[0]))
except Exception as e:
logger.debug(f"Mass extraction failed: {e}")
def _generate_suggestions(self) -> None:
"""Generate intelligent suggestions based on all context."""
self._generate_dv_suggestions()
self._generate_objective_suggestions()
self._generate_constraint_suggestions()
self._query_lac()
def _generate_dv_suggestions(self) -> None:
"""Generate design variable suggestions."""
suggestions: Dict[str, DVSuggestion] = {}
# From introspection
if self.context.introspection:
for expr in self.context.introspection.get_design_candidates():
if expr.value is not None and isinstance(expr.value, (int, float)):
# Calculate suggested bounds (50% to 150% of current value)
if expr.value > 0:
bounds = (expr.value * 0.5, expr.value * 1.5)
else:
bounds = (expr.value * 1.5, expr.value * 0.5)
suggestions[expr.name] = DVSuggestion(
name=expr.name,
current_value=expr.value,
suggested_bounds=bounds,
units=expr.units,
confidence=expr.confidence,
reason=f"Numeric expression with value {expr.value}",
source="introspection",
)
# Override/add from preconfig
if self.context.preconfig and self.context.preconfig.design_variables:
for dv in self.context.preconfig.design_variables:
if dv.name in suggestions:
# Update existing suggestion
suggestions[dv.name].suggested_bounds = dv.bounds
suggestions[dv.name].units = dv.units or suggestions[dv.name].units
suggestions[dv.name].source = "preconfig"
suggestions[dv.name].confidence = ConfidenceLevel.HIGH
else:
# Add new suggestion
suggestions[dv.name] = DVSuggestion(
name=dv.name,
suggested_bounds=dv.bounds,
units=dv.units,
confidence=ConfidenceLevel.HIGH,
reason="Specified in intake.yaml",
source="preconfig",
)
self.context.suggested_dvs = list(suggestions.values())
logger.info(f"Generated {len(self.context.suggested_dvs)} DV suggestions")
def _generate_objective_suggestions(self) -> None:
"""Generate objective suggestions from context."""
suggestions = []
# From preconfig
if self.context.preconfig and self.context.preconfig.objectives:
obj = self.context.preconfig.objectives.primary
extractor = self._get_extractor_for_target(obj.target)
suggestions.append(
ObjectiveSuggestion(
name=obj.target,
goal=obj.goal,
extractor=extractor,
confidence=ConfidenceLevel.HIGH,
reason="Specified in intake.yaml",
source="preconfig",
)
)
# From goals text (simple keyword matching)
elif self.context.goals_text:
goals_lower = self.context.goals_text.lower()
if "minimize" in goals_lower and "mass" in goals_lower:
suggestions.append(
ObjectiveSuggestion(
name="mass",
goal="minimize",
extractor="extract_mass_from_bdf",
confidence=ConfidenceLevel.MEDIUM,
reason="Found 'minimize mass' in goals",
source="goals",
)
)
elif "minimize" in goals_lower and "weight" in goals_lower:
suggestions.append(
ObjectiveSuggestion(
name="mass",
goal="minimize",
extractor="extract_mass_from_bdf",
confidence=ConfidenceLevel.MEDIUM,
reason="Found 'minimize weight' in goals",
source="goals",
)
)
if "maximize" in goals_lower and "stiffness" in goals_lower:
suggestions.append(
ObjectiveSuggestion(
name="stiffness",
goal="maximize",
extractor="extract_displacement", # Inverse of displacement
confidence=ConfidenceLevel.MEDIUM,
reason="Found 'maximize stiffness' in goals",
source="goals",
)
)
self.context.suggested_objectives = suggestions
def _generate_constraint_suggestions(self) -> None:
"""Generate constraint suggestions from context."""
suggestions = []
# From preconfig
if self.context.preconfig and self.context.preconfig.constraints:
for const in self.context.preconfig.constraints:
suggestions.append(
ConstraintSuggestion(
name=const.type,
type="less_than" if "max" in const.type else "greater_than",
suggested_threshold=const.threshold,
units=const.units,
confidence=ConfidenceLevel.HIGH,
reason="Specified in intake.yaml",
source="preconfig",
)
)
# From requirements text
if self.context.requirements_text:
# Simple pattern matching for constraints
text = self.context.requirements_text
# Look for stress limits
stress_pattern = r"(?:max(?:imum)?|stress)\s*[:<]?\s*(\d+(?:\.\d+)?)\s*(?:MPa|mpa)"
matches = re.findall(stress_pattern, text, re.IGNORECASE)
if matches:
suggestions.append(
ConstraintSuggestion(
name="max_stress",
type="less_than",
suggested_threshold=float(matches[0]),
units="MPa",
confidence=ConfidenceLevel.MEDIUM,
reason=f"Found stress limit in requirements: {matches[0]} MPa",
source="requirements",
)
)
# Look for displacement limits
disp_pattern = (
r"(?:max(?:imum)?|displacement|deflection)\s*[:<]?\s*(\d+(?:\.\d+)?)\s*(?:mm|MM)"
)
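            # Matches e.g. "deflection: 1.5 mm", "max displacement < 2 mm"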
matches = re.findall(disp_pattern, text, re.IGNORECASE)
if matches:
suggestions.append(
ConstraintSuggestion(
name="max_displacement",
type="less_than",
suggested_threshold=float(matches[0]),
units="mm",
confidence=ConfidenceLevel.MEDIUM,
reason=f"Found displacement limit in requirements: {matches[0]} mm",
source="requirements",
)
)
self.context.suggested_constraints = suggestions
def _get_extractor_for_target(self, target: str) -> str:
"""Map optimization target to extractor function."""
extractors = {
"mass": "extract_mass_from_bdf",
"displacement": "extract_displacement",
"stress": "extract_solid_stress",
"frequency": "extract_frequency",
"stiffness": "extract_displacement", # Inverse
"strain_energy": "extract_strain_energy",
}
return extractors.get(target.lower(), f"extract_{target}")
def _query_lac(self) -> None:
"""Query Learning Atomizer Core for similar studies."""
try:
from knowledge_base.lac import get_lac
lac = get_lac()
# Build query from context
query_parts = [self.study_name]
if self.context.goals_text:
query_parts.append(self.context.goals_text[:200])
query = " ".join(query_parts)
# Get similar studies
similar = lac.query_similar_optimizations(query)
# Get method recommendation
n_objectives = 1
if self.context.preconfig and self.context.preconfig.objectives:
n_objectives = len(self.context.preconfig.objectives.all_objectives)
recommendation = lac.get_best_method_for(
geometry_type="unknown", n_objectives=n_objectives
)
if recommendation:
self.context.recommended_method = recommendation.get("method")
logger.info(f"LAC query complete: {len(similar)} similar studies found")
except ImportError:
logger.debug("LAC not available")
except Exception as e:
logger.debug(f"LAC query failed: {e}")
def _save_context(self) -> None:
"""Save assembled context to study directory."""
# Ensure study directory exists
self.study_dir.mkdir(parents=True, exist_ok=True)
# Save context JSON
context_path = self.study_dir / "0_intake" / "study_context.json"
context_path.parent.mkdir(exist_ok=True)
self.context.save(context_path)
# Save introspection report
if self.context.introspection:
introspection_path = self.study_dir / "0_intake" / "introspection.json"
import json
with open(introspection_path, "w") as f:
json.dump(self.context.introspection.to_dict(), f, indent=2)
# Copy original context files
intake_dir = self.study_dir / "0_intake" / "original_context"
intake_dir.mkdir(parents=True, exist_ok=True)
context_source = self.inbox_folder / "context"
if context_source.exists():
for f in context_source.iterdir():
if f.is_file():
shutil.copy2(f, intake_dir / f.name)
# Copy intake.yaml
intake_yaml = self.inbox_folder / "intake.yaml"
if intake_yaml.exists():
shutil.copy2(intake_yaml, self.study_dir / "0_intake" / "intake.yaml")
logger.info(f"Saved context to {self.study_dir / '0_intake'}")
def process_intake(
inbox_folder: Path,
run_baseline: bool = True,
progress_callback: Optional[Callable[[str, float], None]] = None,
) -> StudyContext:
"""
Convenience function to process an intake folder.
Args:
inbox_folder: Path to inbox folder
run_baseline: Run baseline solve
progress_callback: Optional progress callback
Returns:
Complete StudyContext
"""
processor = IntakeProcessor(inbox_folder, progress_callback=progress_callback)
return processor.process(run_baseline=run_baseline)
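# Illustrative call (folder path is hypothetical):
#
#   def on_progress(message: str, percent: float) -> None:
#       print(f"{percent * 100:5.1f}% {message}")
#
#   context = process_intake(
#       Path("studies/_inbox/bracket_project"), progress_callback=on_progress
#   )
#   print(context.get_baseline_summary())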

View File

@@ -70,15 +70,15 @@ def extract_part_mass(theSession, part, output_dir):
import json
results = {
'part_file': part.Name,
'mass_kg': 0.0,
'mass_g': 0.0,
'volume_mm3': 0.0,
'surface_area_mm2': 0.0,
'center_of_gravity_mm': [0.0, 0.0, 0.0],
'num_bodies': 0,
'success': False,
'error': None
"part_file": part.Name,
"mass_kg": 0.0,
"mass_g": 0.0,
"volume_mm3": 0.0,
"surface_area_mm2": 0.0,
"center_of_gravity_mm": [0.0, 0.0, 0.0],
"num_bodies": 0,
"success": False,
"error": None,
}
try:
@@ -88,10 +88,10 @@ def extract_part_mass(theSession, part, output_dir):
if body.IsSolidBody:
bodies.append(body)
results['num_bodies'] = len(bodies)
results["num_bodies"] = len(bodies)
if not bodies:
results['error'] = "No solid bodies found"
results["error"] = "No solid bodies found"
raise ValueError("No solid bodies found in part")
# Get the measure manager
@@ -104,30 +104,30 @@ def extract_part_mass(theSession, part, output_dir):
uc.GetBase("Area"),
uc.GetBase("Volume"),
uc.GetBase("Mass"),
uc.GetBase("Length")
uc.GetBase("Length"),
]
# Create mass properties measurement
measureBodies = measureManager.NewMassProperties(mass_units, 0.99, bodies)
if measureBodies:
results['mass_kg'] = measureBodies.Mass
results['mass_g'] = results['mass_kg'] * 1000.0
results["mass_kg"] = measureBodies.Mass
results["mass_g"] = results["mass_kg"] * 1000.0
try:
results['volume_mm3'] = measureBodies.Volume
results["volume_mm3"] = measureBodies.Volume
except:
pass
try:
results['surface_area_mm2'] = measureBodies.Area
results["surface_area_mm2"] = measureBodies.Area
except:
pass
try:
cog = measureBodies.Centroid
if cog:
results['center_of_gravity_mm'] = [cog.X, cog.Y, cog.Z]
results["center_of_gravity_mm"] = [cog.X, cog.Y, cog.Z]
except:
pass
@@ -136,26 +136,26 @@ def extract_part_mass(theSession, part, output_dir):
except:
pass
results['success'] = True
results["success"] = True
except Exception as e:
results['error'] = str(e)
results['success'] = False
results["error"] = str(e)
results["success"] = False
# Write results to JSON file
output_file = os.path.join(output_dir, "_temp_part_properties.json")
with open(output_file, 'w') as f:
with open(output_file, "w") as f:
json.dump(results, f, indent=2)
# Write simple mass value for backward compatibility
mass_file = os.path.join(output_dir, "_temp_mass.txt")
with open(mass_file, 'w') as f:
f.write(str(results['mass_kg']))
with open(mass_file, "w") as f:
f.write(str(results["mass_kg"]))
if not results['success']:
raise ValueError(results['error'])
if not results["success"]:
raise ValueError(results["error"])
return results['mass_kg']
return results["mass_kg"]
def find_or_open_part(theSession, part_path):
@@ -164,7 +164,7 @@ def find_or_open_part(theSession, part_path):
In NX, calling Parts.Open() on an already-loaded part raises 'File already exists'.
"""
part_name = os.path.splitext(os.path.basename(part_path))[0]
# Try to find in already-loaded parts
for part in theSession.Parts:
if part.Name == part_name:
@@ -174,9 +174,9 @@ def find_or_open_part(theSession, part_path):
return part, True
except:
pass
# Not found, open it
markId = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, f'Load {part_name}')
markId = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, f"Load {part_name}")
part, partLoadStatus = theSession.Parts.Open(part_path)
partLoadStatus.Dispose()
return part, False
@@ -194,26 +194,28 @@ def main(args):
"""
if len(args) < 1:
print("ERROR: No .sim file path provided")
print("Usage: run_journal.exe solve_simulation.py <sim_file_path> [solution_name] [expr1=val1] ...")
print(
"Usage: run_journal.exe solve_simulation.py <sim_file_path> [solution_name] [expr1=val1] ..."
)
return False
sim_file_path = args[0]
solution_name = args[1] if len(args) > 1 and args[1] != 'None' else None
solution_name = args[1] if len(args) > 1 and args[1] != "None" else None
# Parse expression updates
expression_updates = {}
for arg in args[2:]:
if '=' in arg:
name, value = arg.split('=', 1)
if "=" in arg:
name, value = arg.split("=", 1)
expression_updates[name] = float(value)
# Get working directory
working_dir = os.path.dirname(os.path.abspath(sim_file_path))
sim_filename = os.path.basename(sim_file_path)
print(f"[JOURNAL] " + "="*60)
print(f"[JOURNAL] " + "=" * 60)
print(f"[JOURNAL] NX SIMULATION SOLVER (Assembly FEM Workflow)")
print(f"[JOURNAL] " + "="*60)
print(f"[JOURNAL] " + "=" * 60)
print(f"[JOURNAL] Simulation: {sim_filename}")
print(f"[JOURNAL] Working directory: {working_dir}")
print(f"[JOURNAL] Solution: {solution_name or 'Solution 1'}")
@@ -226,7 +228,9 @@ def main(args):
# Set load options
theSession.Parts.LoadOptions.LoadLatest = False
theSession.Parts.LoadOptions.ComponentLoadMethod = NXOpen.LoadOptions.LoadMethod.FromDirectory
theSession.Parts.LoadOptions.ComponentLoadMethod = (
NXOpen.LoadOptions.LoadMethod.FromDirectory
)
theSession.Parts.LoadOptions.SetSearchDirectories([working_dir], [True])
theSession.Parts.LoadOptions.ComponentsToLoad = NXOpen.LoadOptions.LoadComponents.All
theSession.Parts.LoadOptions.PartLoadOption = NXOpen.LoadOptions.LoadOption.FullyLoad
@@ -240,7 +244,7 @@ def main(args):
pass
# Check for assembly FEM files
afm_files = [f for f in os.listdir(working_dir) if f.endswith('.afm')]
afm_files = [f for f in os.listdir(working_dir) if f.endswith(".afm")]
is_assembly = len(afm_files) > 0
if is_assembly and expression_updates:
@@ -262,11 +266,14 @@ def main(args):
except Exception as e:
print(f"[JOURNAL] FATAL ERROR: {e}")
import traceback
traceback.print_exc()
return False
def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expression_updates, working_dir):
def solve_assembly_fem_workflow(
theSession, sim_file_path, solution_name, expression_updates, working_dir
):
"""
Full assembly FEM workflow based on recorded NX journal.
@@ -285,8 +292,7 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
sim_file_full_path = os.path.join(working_dir, sim_filename)
print(f"[JOURNAL] Opening SIM file: {sim_filename}")
basePart, partLoadStatus = theSession.Parts.OpenActiveDisplay(
sim_file_full_path,
NXOpen.DisplayPartOption.AllowAdditional
sim_file_full_path, NXOpen.DisplayPartOption.AllowAdditional
)
partLoadStatus.Dispose()
@@ -330,7 +336,9 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
print(f"[JOURNAL] WARNING: M1_Blank_fem1_i.prt not found!")
# Load M1_Vertical_Support_Skeleton_fem1_i.prt (CRITICAL: idealized geometry for support)
skeleton_idealized_prt_path = os.path.join(working_dir, "M1_Vertical_Support_Skeleton_fem1_i.prt")
skeleton_idealized_prt_path = os.path.join(
working_dir, "M1_Vertical_Support_Skeleton_fem1_i.prt"
)
if os.path.exists(skeleton_idealized_prt_path):
print(f"[JOURNAL] Loading M1_Vertical_Support_Skeleton_fem1_i.prt...")
part3_skel, was_loaded = find_or_open_part(theSession, skeleton_idealized_prt_path)
@@ -347,11 +355,13 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
# Find and switch to M1_Blank part
try:
part3 = theSession.Parts.FindObject("M1_Blank")
markId3 = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Change Displayed Part")
markId3 = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Visible, "Change Displayed Part"
)
status1, partLoadStatus3 = theSession.Parts.SetActiveDisplay(
part3,
NXOpen.DisplayPartOption.AllowAdditional,
NXOpen.PartDisplayPartWorkPartOption.UseLast
NXOpen.PartDisplayPartWorkPartOption.UseLast,
)
partLoadStatus3.Dispose()
@@ -366,10 +376,10 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
# Write expressions to a temp file and import (more reliable than editing one by one)
exp_file_path = os.path.join(working_dir, "_temp_expressions.exp")
with open(exp_file_path, 'w') as f:
with open(exp_file_path, "w") as f:
for expr_name, expr_value in expression_updates.items():
# Determine unit
if 'angle' in expr_name.lower() or 'vertical' in expr_name.lower():
if "angle" in expr_name.lower() or "vertical" in expr_name.lower():
unit_str = "Degrees"
else:
unit_str = "MilliMeter"
@@ -377,12 +387,13 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
print(f"[JOURNAL] {expr_name} = {expr_value} ({unit_str})")
print(f"[JOURNAL] Importing expressions from file...")
markId_import = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Import Expressions")
markId_import = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Visible, "Import Expressions"
)
try:
expModified, errorMessages = workPart.Expressions.ImportFromFile(
exp_file_path,
NXOpen.ExpressionCollection.ImportMode.Replace
exp_file_path, NXOpen.ExpressionCollection.ImportMode.Replace
)
print(f"[JOURNAL] Expressions imported: {expModified} modified")
if errorMessages:
@@ -390,14 +401,18 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
# Update geometry after import
print(f"[JOURNAL] Rebuilding M1_Blank geometry...")
markId_update = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Invisible, "NX update")
markId_update = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Invisible, "NX update"
)
nErrs = theSession.UpdateManager.DoUpdate(markId_update)
theSession.DeleteUndoMark(markId_update, "NX update")
print(f"[JOURNAL] M1_Blank geometry rebuilt ({nErrs} errors)")
# CRITICAL: Save M1_Blank after geometry update so FEM can read updated geometry
print(f"[JOURNAL] Saving M1_Blank...")
partSaveStatus_blank = workPart.Save(NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue)
partSaveStatus_blank = workPart.Save(
NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue
)
partSaveStatus_blank.Dispose()
print(f"[JOURNAL] M1_Blank saved")
@@ -445,11 +460,13 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
print(f"[JOURNAL] Updating {part_name}...")
linked_part = theSession.Parts.FindObject(part_name)
markId_linked = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, f"Update {part_name}")
markId_linked = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Visible, f"Update {part_name}"
)
status_linked, partLoadStatus_linked = theSession.Parts.SetActiveDisplay(
linked_part,
NXOpen.DisplayPartOption.AllowAdditional,
NXOpen.PartDisplayPartWorkPartOption.UseLast
NXOpen.PartDisplayPartWorkPartOption.UseLast,
)
partLoadStatus_linked.Dispose()
@@ -457,14 +474,18 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
theSession.ApplicationSwitchImmediate("UG_APP_MODELING")
# Update to propagate linked expression changes
markId_linked_update = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Invisible, "NX update")
markId_linked_update = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Invisible, "NX update"
)
nErrs_linked = theSession.UpdateManager.DoUpdate(markId_linked_update)
theSession.DeleteUndoMark(markId_linked_update, "NX update")
print(f"[JOURNAL] {part_name} geometry rebuilt ({nErrs_linked} errors)")
# CRITICAL: Save part after geometry update so FEM can read updated geometry
print(f"[JOURNAL] Saving {part_name}...")
partSaveStatus_linked = linked_part.Save(NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue)
partSaveStatus_linked = linked_part.Save(
NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue
)
partSaveStatus_linked.Dispose()
print(f"[JOURNAL] {part_name} saved")
@@ -482,7 +503,9 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
sim_part_name = os.path.splitext(sim_filename)[0] # e.g., "ASSY_M1_assyfem1_sim1"
print(f"[JOURNAL] Looking for sim part: {sim_part_name}")
markId_sim = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Change Displayed Part")
markId_sim = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Visible, "Change Displayed Part"
)
try:
# First try to find it among loaded parts (like recorded journal)
@@ -490,7 +513,7 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
status_sim, partLoadStatus = theSession.Parts.SetActiveDisplay(
simPart1,
NXOpen.DisplayPartOption.AllowAdditional,
NXOpen.PartDisplayPartWorkPartOption.UseLast
NXOpen.PartDisplayPartWorkPartOption.UseLast,
)
partLoadStatus.Dispose()
print(f"[JOURNAL] Found and activated existing sim part")
@@ -498,8 +521,7 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
# Fallback: Open fresh if not found
print(f"[JOURNAL] Sim part not found, opening fresh: {sim_filename}")
basePart, partLoadStatus = theSession.Parts.OpenActiveDisplay(
sim_file_path,
NXOpen.DisplayPartOption.AllowAdditional
sim_file_path, NXOpen.DisplayPartOption.AllowAdditional
)
partLoadStatus.Dispose()
@@ -517,23 +539,29 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
print(f"[JOURNAL] Updating M1_Blank_fem1...")
try:
component2 = component1.FindObject("COMPONENT M1_Blank_fem1 1")
markId_fem1 = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Make Work Part")
markId_fem1 = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Visible, "Make Work Part"
)
partLoadStatus5 = theSession.Parts.SetWorkComponent(
component2,
NXOpen.PartCollection.RefsetOption.Entire,
NXOpen.PartCollection.WorkComponentOption.Visible
NXOpen.PartCollection.WorkComponentOption.Visible,
)
workFemPart = theSession.Parts.BaseWork
partLoadStatus5.Dispose()
markId_update1 = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Update FE Model")
markId_update1 = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Visible, "Update FE Model"
)
fEModel1 = workFemPart.FindObject("FEModel")
fEModel1.UpdateFemodel()
print(f"[JOURNAL] M1_Blank_fem1 updated")
# CRITICAL: Save FEM file after update to persist mesh changes
print(f"[JOURNAL] Saving M1_Blank_fem1...")
partSaveStatus_fem1 = workFemPart.Save(NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue)
partSaveStatus_fem1 = workFemPart.Save(
NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue
)
partSaveStatus_fem1.Dispose()
print(f"[JOURNAL] M1_Blank_fem1 saved")
except Exception as e:
@@ -543,23 +571,29 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
print(f"[JOURNAL] Updating M1_Vertical_Support_Skeleton_fem1...")
try:
component3 = component1.FindObject("COMPONENT M1_Vertical_Support_Skeleton_fem1 3")
markId_fem2 = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Make Work Part")
markId_fem2 = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Visible, "Make Work Part"
)
partLoadStatus6 = theSession.Parts.SetWorkComponent(
component3,
NXOpen.PartCollection.RefsetOption.Entire,
NXOpen.PartCollection.WorkComponentOption.Visible
NXOpen.PartCollection.WorkComponentOption.Visible,
)
workFemPart = theSession.Parts.BaseWork
partLoadStatus6.Dispose()
markId_update2 = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Update FE Model")
markId_update2 = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Visible, "Update FE Model"
)
fEModel2 = workFemPart.FindObject("FEModel")
fEModel2.UpdateFemodel()
print(f"[JOURNAL] M1_Vertical_Support_Skeleton_fem1 updated")
# CRITICAL: Save FEM file after update to persist mesh changes
print(f"[JOURNAL] Saving M1_Vertical_Support_Skeleton_fem1...")
partSaveStatus_fem2 = workFemPart.Save(NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue)
partSaveStatus_fem2 = workFemPart.Save(
NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue
)
partSaveStatus_fem2.Dispose()
print(f"[JOURNAL] M1_Vertical_Support_Skeleton_fem1 saved")
except Exception as e:
@@ -578,7 +612,7 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
partLoadStatus8 = theSession.Parts.SetWorkComponent(
component1,
NXOpen.PartCollection.RefsetOption.Entire,
NXOpen.PartCollection.WorkComponentOption.Visible
NXOpen.PartCollection.WorkComponentOption.Visible,
)
workAssyFemPart = theSession.Parts.BaseWork
displaySimPart = theSession.Parts.BaseDisplay
@@ -643,13 +677,17 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
elif numMerged == 0:
print(f"[JOURNAL] No nodes were merged (0 returned)")
if numDuplicates is None:
print(f"[JOURNAL] WARNING: IdentifyDuplicateNodes returned None - mesh may need display refresh")
print(
f"[JOURNAL] WARNING: IdentifyDuplicateNodes returned None - mesh may need display refresh"
)
else:
print(f"[JOURNAL] MergeDuplicateNodes returned None - batch mode limitation")
except Exception as merge_error:
print(f"[JOURNAL] MergeDuplicateNodes failed: {merge_error}")
if numDuplicates is None:
print(f"[JOURNAL] This combined with IdentifyDuplicateNodes=None suggests display issue")
print(
f"[JOURNAL] This combined with IdentifyDuplicateNodes=None suggests display issue"
)
theSession.SetUndoMarkName(markId_merge, "Duplicate Nodes")
duplicateNodesCheckBuilder1.Destroy()
@@ -658,6 +696,7 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
except Exception as e:
print(f"[JOURNAL] WARNING: Node merge: {e}")
import traceback
traceback.print_exc()
# ==========================================================================
@@ -673,7 +712,9 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
theSession.SetUndoMarkName(markId_labels, "Assembly Label Manager Dialog")
markId_labels2 = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Invisible, "Assembly Label Manager")
markId_labels2 = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Invisible, "Assembly Label Manager"
)
# Set offsets for each FE model occurrence
# These offsets ensure unique node/element labels across components
@@ -720,7 +761,9 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
print(f"[JOURNAL] STEP 5b: Saving assembly FEM after all updates...")
try:
# Save the assembly FEM to persist all mesh updates and node merges
partSaveStatus_afem = workAssyFemPart.Save(NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue)
partSaveStatus_afem = workAssyFemPart.Save(
NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue
)
partSaveStatus_afem.Dispose()
print(f"[JOURNAL] Assembly FEM saved: {workAssyFemPart.Name}")
except Exception as e:
@@ -736,7 +779,7 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
partLoadStatus9 = theSession.Parts.SetWorkComponent(
NXOpen.Assemblies.Component.Null,
NXOpen.PartCollection.RefsetOption.Entire,
NXOpen.PartCollection.WorkComponentOption.Visible
NXOpen.PartCollection.WorkComponentOption.Visible,
)
workSimPart = theSession.Parts.BaseWork
partLoadStatus9.Dispose()
@@ -760,13 +803,15 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
psolutions1,
NXOpen.CAE.SimSolution.SolveOption.Solve,
NXOpen.CAE.SimSolution.SetupCheckOption.CompleteCheckAndOutputErrors,
NXOpen.CAE.SimSolution.SolveMode.Foreground # Use Foreground to ensure OP2 is complete
NXOpen.CAE.SimSolution.SolveMode.Foreground, # Use Foreground to ensure OP2 is complete
)
theSession.DeleteUndoMark(markId_solve2, None)
theSession.SetUndoMarkName(markId_solve, "Solve")
print(f"[JOURNAL] Solve completed: {numsolved} solved, {numfailed} failed, {numskipped} skipped")
print(
f"[JOURNAL] Solve completed: {numsolved} solved, {numfailed} failed, {numskipped} skipped"
)
# ==========================================================================
# STEP 7: SAVE ALL - Save all modified parts (FEM, SIM, PRT)
@@ -784,11 +829,14 @@ def solve_assembly_fem_workflow(theSession, sim_file_path, solution_name, expres
except Exception as e:
print(f"[JOURNAL] ERROR solving: {e}")
import traceback
traceback.print_exc()
return False
def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_updates, working_dir):
def solve_simple_workflow(
theSession, sim_file_path, solution_name, expression_updates, working_dir
):
"""
Workflow for single-part simulations with optional expression updates.
@@ -802,8 +850,7 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
# Open the .sim file
basePart1, partLoadStatus1 = theSession.Parts.OpenActiveDisplay(
sim_file_path,
NXOpen.DisplayPartOption.AllowAdditional
sim_file_path, NXOpen.DisplayPartOption.AllowAdditional
)
partLoadStatus1.Dispose()
@@ -830,11 +877,11 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
part_type = type(part).__name__
# Skip FEM and SIM parts by type
if 'fem' in part_type.lower() or 'sim' in part_type.lower():
if "fem" in part_type.lower() or "sim" in part_type.lower():
continue
# Skip parts with _fem or _sim in name
if '_fem' in part_name or '_sim' in part_name:
if "_fem" in part_name or "_sim" in part_name:
continue
geom_part = part
@@ -845,25 +892,38 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
if geom_part is None:
print(f"[JOURNAL] Geometry part not loaded, searching for .prt file...")
for filename in os.listdir(working_dir):
if filename.endswith('.prt') and '_fem' not in filename.lower() and '_sim' not in filename.lower():
# Skip idealized parts (_i.prt), FEM parts, and SIM parts
if (
filename.endswith(".prt")
and "_fem" not in filename.lower()
and "_sim" not in filename.lower()
and "_i.prt" not in filename.lower()
):
prt_path = os.path.join(working_dir, filename)
print(f"[JOURNAL] Loading geometry part: {filename}")
try:
geom_part, partLoadStatus = theSession.Parts.Open(prt_path)
loaded_part, partLoadStatus = theSession.Parts.Open(prt_path)
partLoadStatus.Dispose()
print(f"[JOURNAL] Geometry part loaded: {geom_part.Name}")
break
# Check if load actually succeeded (Parts.Open can return None)
if loaded_part is not None:
geom_part = loaded_part
print(f"[JOURNAL] Geometry part loaded: {geom_part.Name}")
break
else:
print(f"[JOURNAL] WARNING: Parts.Open returned None for {filename}")
except Exception as e:
print(f"[JOURNAL] WARNING: Could not load {filename}: {e}")
if geom_part:
try:
# Switch to the geometry part for expression editing
markId_expr = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Visible, "Update Expressions")
markId_expr = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Visible, "Update Expressions"
)
status, partLoadStatus = theSession.Parts.SetActiveDisplay(
geom_part,
NXOpen.DisplayPartOption.AllowAdditional,
NXOpen.PartDisplayPartWorkPartOption.UseLast
NXOpen.PartDisplayPartWorkPartOption.UseLast,
)
partLoadStatus.Dispose()
@@ -874,10 +934,10 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
# Write expressions to temp file and import
exp_file_path = os.path.join(working_dir, "_temp_expressions.exp")
with open(exp_file_path, 'w') as f:
with open(exp_file_path, "w") as f:
for expr_name, expr_value in expression_updates.items():
# Determine unit based on name
if 'angle' in expr_name.lower():
if "angle" in expr_name.lower():
unit_str = "Degrees"
else:
unit_str = "MilliMeter"
@@ -886,8 +946,7 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
print(f"[JOURNAL] Importing expressions...")
expModified, errorMessages = workPart.Expressions.ImportFromFile(
exp_file_path,
NXOpen.ExpressionCollection.ImportMode.Replace
exp_file_path, NXOpen.ExpressionCollection.ImportMode.Replace
)
print(f"[JOURNAL] Expressions modified: {expModified}")
if errorMessages:
@@ -895,14 +954,19 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
# Update geometry
print(f"[JOURNAL] Rebuilding geometry...")
markId_update = theSession.SetUndoMark(NXOpen.Session.MarkVisibility.Invisible, "NX update")
markId_update = theSession.SetUndoMark(
NXOpen.Session.MarkVisibility.Invisible, "NX update"
)
nErrs = theSession.UpdateManager.DoUpdate(markId_update)
theSession.DeleteUndoMark(markId_update, "NX update")
print(f"[JOURNAL] Geometry rebuilt ({nErrs} errors)")
# Save geometry part
print(f"[JOURNAL] Saving geometry part...")
partSaveStatus_geom = workPart.Save(NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue)
partSaveStatus_geom = workPart.Save(
NXOpen.BasePart.SaveComponents.TrueValue,
NXOpen.BasePart.CloseAfterSave.FalseValue,
)
partSaveStatus_geom.Dispose()
# Clean up temp file
@@ -914,6 +978,7 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
except Exception as e:
print(f"[JOURNAL] ERROR updating expressions: {e}")
import traceback
traceback.print_exc()
else:
print(f"[JOURNAL] WARNING: Could not find geometry part for expression updates!")
@@ -928,13 +993,18 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
# The chain is: .prt (geometry) -> _i.prt (idealized) -> .fem (mesh)
idealized_part = None
for filename in os.listdir(working_dir):
if '_i.prt' in filename.lower():
if "_i.prt" in filename.lower():
idealized_path = os.path.join(working_dir, filename)
print(f"[JOURNAL] Loading idealized part: {filename}")
try:
idealized_part, partLoadStatus = theSession.Parts.Open(idealized_path)
loaded_part, partLoadStatus = theSession.Parts.Open(idealized_path)
partLoadStatus.Dispose()
print(f"[JOURNAL] Idealized part loaded: {idealized_part.Name}")
# Check if load actually succeeded (Parts.Open can return None)
if loaded_part is not None:
idealized_part = loaded_part
print(f"[JOURNAL] Idealized part loaded: {idealized_part.Name}")
else:
print(f"[JOURNAL] WARNING: Parts.Open returned None for idealized part")
except Exception as e:
print(f"[JOURNAL] WARNING: Could not load idealized part: {e}")
break
@@ -942,7 +1012,7 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
# Find the FEM part
fem_part = None
for part in theSession.Parts:
if '_fem' in part.Name.lower() or part.Name.lower().endswith('.fem'):
if "_fem" in part.Name.lower() or part.Name.lower().endswith(".fem"):
fem_part = part
print(f"[JOURNAL] Found FEM part: {part.Name}")
break
@@ -956,7 +1026,7 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
status, partLoadStatus = theSession.Parts.SetActiveDisplay(
fem_part,
NXOpen.DisplayPartOption.AllowAdditional,
NXOpen.PartDisplayPartWorkPartOption.SameAsDisplay # Critical fix!
NXOpen.PartDisplayPartWorkPartOption.SameAsDisplay, # Critical fix!
)
partLoadStatus.Dispose()
@@ -972,13 +1042,17 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
print(f"[JOURNAL] FE model updated")
# Save FEM
partSaveStatus_fem = workFemPart.Save(NXOpen.BasePart.SaveComponents.TrueValue, NXOpen.BasePart.CloseAfterSave.FalseValue)
partSaveStatus_fem = workFemPart.Save(
NXOpen.BasePart.SaveComponents.TrueValue,
NXOpen.BasePart.CloseAfterSave.FalseValue,
)
partSaveStatus_fem.Dispose()
print(f"[JOURNAL] FEM saved")
except Exception as e:
print(f"[JOURNAL] ERROR updating FEM: {e}")
import traceback
traceback.print_exc()
# =========================================================================
@@ -990,7 +1064,7 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
status, partLoadStatus = theSession.Parts.SetActiveDisplay(
workSimPart,
NXOpen.DisplayPartOption.AllowAdditional,
NXOpen.PartDisplayPartWorkPartOption.UseLast
NXOpen.PartDisplayPartWorkPartOption.UseLast,
)
partLoadStatus.Dispose()
@@ -1016,13 +1090,15 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
psolutions1,
NXOpen.CAE.SimSolution.SolveOption.Solve,
NXOpen.CAE.SimSolution.SetupCheckOption.CompleteCheckAndOutputErrors,
NXOpen.CAE.SimSolution.SolveMode.Foreground # Use Foreground to wait for completion
NXOpen.CAE.SimSolution.SolveMode.Foreground, # Use Foreground to wait for completion
)
theSession.DeleteUndoMark(markId_solve2, None)
theSession.SetUndoMarkName(markId_solve, "Solve")
print(f"[JOURNAL] Solve completed: {numsolved} solved, {numfailed} failed, {numskipped} skipped")
print(
f"[JOURNAL] Solve completed: {numsolved} solved, {numfailed} failed, {numskipped} skipped"
)
# Save all
try:
@@ -1035,6 +1111,6 @@ def solve_simple_workflow(theSession, sim_file_path, solution_name, expression_u
return numfailed == 0
if __name__ == '__main__':
if __name__ == "__main__":
success = main(sys.argv[1:])
sys.exit(0 if success else 1)

View File

@@ -85,7 +85,7 @@
"created_by": {
"type": "string",
"description": "Who/what created the spec",
"enum": ["canvas", "claude", "api", "migration", "manual"]
"enum": ["canvas", "claude", "api", "migration", "manual", "dashboard_intake"]
},
"modified_by": {
"type": "string",
@@ -114,6 +114,17 @@
"engineering_context": {
"type": "string",
"description": "Real-world engineering scenario"
},
"status": {
"type": "string",
"description": "Study lifecycle status",
"enum": ["draft", "introspected", "configured", "validated", "ready", "running", "completed", "failed"],
"default": "draft"
},
"topic": {
"type": "string",
"description": "Topic folder for grouping related studies",
"pattern": "^[A-Za-z0-9_]+$"
}
}
},
@@ -215,6 +226,124 @@
"type": "boolean"
}
}
},
"introspection": {
"$ref": "#/definitions/introspection_data",
"description": "Model introspection results from intake workflow"
}
}
},
"introspection_data": {
"type": "object",
"description": "Model introspection results stored in the spec",
"properties": {
"timestamp": {
"type": "string",
"format": "date-time",
"description": "When introspection was run"
},
"solver_type": {
"type": "string",
"description": "Detected solver type"
},
"mass_kg": {
"type": "number",
"description": "Mass from expressions or mass properties"
},
"volume_mm3": {
"type": "number",
"description": "Volume from mass properties"
},
"expressions": {
"type": "array",
"description": "Discovered NX expressions",
"items": {
"$ref": "#/definitions/expression_info"
}
},
"baseline": {
"$ref": "#/definitions/baseline_data",
"description": "Baseline FEA solve results"
},
"warnings": {
"type": "array",
"description": "Warnings from introspection",
"items": {
"type": "string"
}
}
}
},
"expression_info": {
"type": "object",
"description": "Information about an NX expression from introspection",
"required": ["name"],
"properties": {
"name": {
"type": "string",
"description": "Expression name in NX"
},
"value": {
"type": "number",
"description": "Current value"
},
"units": {
"type": "string",
"description": "Physical units"
},
"formula": {
"type": "string",
"description": "Expression formula if any"
},
"is_candidate": {
"type": "boolean",
"description": "Whether this is a design variable candidate",
"default": false
},
"confidence": {
"type": "number",
"description": "Confidence that this is a design variable (0.0 to 1.0)",
"minimum": 0,
"maximum": 1
}
}
},
"baseline_data": {
"type": "object",
"description": "Results from baseline FEA solve",
"properties": {
"timestamp": {
"type": "string",
"format": "date-time",
"description": "When baseline was run"
},
"solve_time_seconds": {
"type": "number",
"description": "How long the solve took"
},
"mass_kg": {
"type": "number",
"description": "Computed mass from BDF/FEM"
},
"max_displacement_mm": {
"type": "number",
"description": "Max displacement result"
},
"max_stress_mpa": {
"type": "number",
"description": "Max von Mises stress"
},
"success": {
"type": "boolean",
"description": "Whether baseline solve succeeded",
"default": true
},
"error": {
"type": "string",
"description": "Error message if failed"
}
}
},
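
For orientation, a spec fragment exercising the new metadata and introspection fields could look like the sketch below. It is written as a Python dict to match the surrounding modules; the values, and the exact nesting of the "introspection" block, are illustrative assumptions rather than output from a real study.

# Hypothetical fragment -- keys follow the schema additions above, values are made up
spec_fragment = {
    "metadata": {
        "created_by": "dashboard_intake",
        "status": "introspected",       # new lifecycle field
        "topic": "bracket_studies",     # must match ^[A-Za-z0-9_]+$
    },
    "introspection": {
        "timestamp": "2026-01-27T12:00:00",
        "solver_type": "NX Nastran",
        "mass_kg": 1.42,
        "expressions": [
            {"name": "wall_thickness", "value": 3.0, "units": "mm",
             "is_candidate": True, "confidence": 0.9},
        ],
        "baseline": {"mass_kg": 1.42, "max_displacement_mm": 0.08, "success": True},
    },
}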

View File

@@ -0,0 +1,31 @@
"""
Atomizer Validation System
==========================
Validates study configuration before optimization starts.
Components:
- ValidationGate: Main orchestrator for validation
- SpecChecker: Validates atomizer_spec.json
- TestTrialRunner: Runs 2-3 test trials to verify setup
Usage:
from optimization_engine.validation import ValidationGate
gate = ValidationGate(study_dir)
result = gate.validate(run_test_trials=True)
if result.passed:
gate.approve() # Start optimization
"""
from .gate import ValidationGate, ValidationResult, TestTrialResult
from .checker import SpecChecker, ValidationIssue
__all__ = [
"ValidationGate",
"ValidationResult",
"TestTrialResult",
"SpecChecker",
"ValidationIssue",
]

View File

@@ -0,0 +1,454 @@
"""
Specification Checker
=====================
Validates atomizer_spec.json (or optimization_config.json) for:
- Schema compliance
- Semantic correctness
- Anti-pattern detection
- Expression existence
This catches configuration errors BEFORE wasting time on failed trials.
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import List, Dict, Any, Optional
logger = logging.getLogger(__name__)
class IssueSeverity(str, Enum):
"""Severity level for validation issues."""
ERROR = "error" # Must fix before proceeding
WARNING = "warning" # Should review, but can proceed
INFO = "info" # Informational note
@dataclass
class ValidationIssue:
"""A single validation issue."""
severity: IssueSeverity
code: str
message: str
path: Optional[str] = None # JSON path to the issue
suggestion: Optional[str] = None
def __str__(self) -> str:
prefix = {
IssueSeverity.ERROR: "[ERROR]",
IssueSeverity.WARNING: "[WARN]",
IssueSeverity.INFO: "[INFO]",
}[self.severity]
location = f" at {self.path}" if self.path else ""
return f"{prefix} {self.message}{location}"
@dataclass
class CheckResult:
"""Result of running the spec checker."""
valid: bool
issues: List[ValidationIssue] = field(default_factory=list)
@property
def errors(self) -> List[ValidationIssue]:
return [i for i in self.issues if i.severity == IssueSeverity.ERROR]
@property
def warnings(self) -> List[ValidationIssue]:
return [i for i in self.issues if i.severity == IssueSeverity.WARNING]
def add_error(self, code: str, message: str, path: Optional[str] = None, suggestion: Optional[str] = None):
self.issues.append(
ValidationIssue(
severity=IssueSeverity.ERROR,
code=code,
message=message,
path=path,
suggestion=suggestion,
)
)
self.valid = False
def add_warning(self, code: str, message: str, path: Optional[str] = None, suggestion: Optional[str] = None):
self.issues.append(
ValidationIssue(
severity=IssueSeverity.WARNING,
code=code,
message=message,
path=path,
suggestion=suggestion,
)
)
def add_info(self, code: str, message: str, path: Optional[str] = None):
self.issues.append(
ValidationIssue(
severity=IssueSeverity.INFO,
code=code,
message=message,
path=path,
)
)
class SpecChecker:
"""
Validates study specification files.
Checks:
1. Required fields present
2. Design variable bounds valid
3. Expressions exist in model (if introspection available)
4. Extractors available for objectives/constraints
5. Anti-patterns (mass minimization without constraints, etc.)
"""
# Known extractors
KNOWN_EXTRACTORS = {
"extract_mass_from_bdf",
"extract_part_mass",
"extract_displacement",
"extract_solid_stress",
"extract_principal_stress",
"extract_frequency",
"extract_strain_energy",
"extract_temperature",
"extract_zernike_from_op2",
}
def __init__(
self,
spec_path: Optional[Path] = None,
available_expressions: Optional[List[str]] = None,
):
"""
Initialize the checker.
Args:
spec_path: Path to spec file (atomizer_spec.json or optimization_config.json)
available_expressions: List of expression names from introspection
"""
self.spec_path = spec_path
self.available_expressions = available_expressions or []
self.spec: Dict[str, Any] = {}
def check(self, spec_data: Optional[Dict[str, Any]] = None) -> CheckResult:
"""
Run all validation checks.
Args:
spec_data: Spec dict (or load from spec_path if not provided)
Returns:
CheckResult with all issues found
"""
result = CheckResult(valid=True)
# Load spec if not provided
if spec_data:
self.spec = spec_data
elif self.spec_path and self.spec_path.exists():
with open(self.spec_path) as f:
self.spec = json.load(f)
else:
result.add_error("SPEC_NOT_FOUND", "No specification file found")
return result
# Run checks
self._check_required_fields(result)
self._check_design_variables(result)
self._check_objectives(result)
self._check_constraints(result)
self._check_extractors(result)
self._check_anti_patterns(result)
self._check_files(result)
return result
def _check_required_fields(self, result: CheckResult) -> None:
"""Check that required fields are present."""
# Check for design variables
dvs = self.spec.get("design_variables", [])
if not dvs:
result.add_error(
"NO_DESIGN_VARIABLES",
"No design variables defined",
suggestion="Add at least one design variable to optimize",
)
# Check for objectives
objectives = self.spec.get("objectives", [])
if not objectives:
result.add_error(
"NO_OBJECTIVES",
"No objectives defined",
suggestion="Define at least one objective (e.g., minimize mass)",
)
# Check for simulation settings
sim = self.spec.get("simulation", {})
if not sim.get("sim_file"):
result.add_warning(
"NO_SIM_FILE", "No simulation file specified", path="simulation.sim_file"
)
def _check_design_variables(self, result: CheckResult) -> None:
"""Check design variable definitions."""
dvs = self.spec.get("design_variables", [])
for i, dv in enumerate(dvs):
param = dv.get("parameter", dv.get("expression_name", dv.get("name", f"dv_{i}")))
bounds = dv.get("bounds", [])
path = f"design_variables[{i}]"
# Handle both formats: [min, max] or {"min": x, "max": y}
if isinstance(bounds, dict):
min_val = bounds.get("min")
max_val = bounds.get("max")
elif isinstance(bounds, (list, tuple)) and len(bounds) == 2:
min_val, max_val = bounds
else:
result.add_error(
"INVALID_BOUNDS",
f"Design variable '{param}' has invalid bounds format",
path=path,
suggestion="Bounds must be [min, max] or {min: x, max: y}",
)
continue
# Convert to float if strings
try:
min_val = float(min_val)
max_val = float(max_val)
except (TypeError, ValueError):
result.add_error(
"INVALID_BOUNDS_TYPE",
f"Design variable '{param}' bounds must be numeric",
path=path,
)
continue
# Check bounds order
if min_val >= max_val:
result.add_error(
"BOUNDS_INVERTED",
f"Design variable '{param}': min ({min_val}) >= max ({max_val})",
path=path,
suggestion="Ensure min < max",
)
# Check bounds width (ratio checks are only meaningful for strictly positive bounds)
if max_val > 0 and min_val > 0:
ratio = max_val / min_val
if ratio > 100:
result.add_warning(
"BOUNDS_TOO_WIDE",
f"Design variable '{param}' has very wide bounds (ratio: {ratio:.1f}x)",
path=path,
suggestion="Consider narrowing bounds for faster convergence",
)
elif ratio < 1.1:
result.add_warning(
"BOUNDS_TOO_NARROW",
f"Design variable '{param}' has very narrow bounds (ratio: {ratio:.2f}x)",
path=path,
suggestion="Consider widening bounds to explore more design space",
)
# Check expression exists (if introspection available)
if self.available_expressions and param not in self.available_expressions:
result.add_error(
"EXPRESSION_NOT_FOUND",
f"Expression '{param}' not found in model",
path=path,
suggestion=f"Available expressions: {', '.join(self.available_expressions[:5])}...",
)
def _check_objectives(self, result: CheckResult) -> None:
"""Check objective definitions."""
objectives = self.spec.get("objectives", [])
for i, obj in enumerate(objectives):
name = obj.get("name", f"objective_{i}")
# Handle both formats: "goal" or "direction"
goal = obj.get("goal", obj.get("direction", "")).lower()
path = f"objectives[{i}]"
# Check goal is valid
if goal not in ("minimize", "maximize"):
result.add_error(
"INVALID_GOAL",
f"Objective '{name}' has invalid goal: '{goal}'",
path=path,
suggestion="Use 'minimize' or 'maximize'",
)
# Check extraction is defined
extraction = obj.get("extraction", {})
if not extraction.get("action"):
result.add_warning(
"NO_EXTRACTOR",
f"Objective '{name}' has no extractor specified",
path=path,
)
def _check_constraints(self, result: CheckResult) -> None:
"""Check constraint definitions."""
constraints = self.spec.get("constraints", [])
for i, const in enumerate(constraints):
name = const.get("name", f"constraint_{i}")
const_type = const.get("type", "").lower()
threshold = const.get("threshold")
path = f"constraints[{i}]"
# Check type is valid
if const_type not in ("less_than", "greater_than", "equal_to"):
result.add_warning(
"INVALID_CONSTRAINT_TYPE",
f"Constraint '{name}' has unusual type: '{const_type}'",
path=path,
suggestion="Use 'less_than' or 'greater_than'",
)
# Check threshold is defined
if threshold is None:
result.add_error(
"NO_THRESHOLD",
f"Constraint '{name}' has no threshold defined",
path=path,
)
def _check_extractors(self, result: CheckResult) -> None:
"""Check that referenced extractors exist."""
# Check objective extractors
for obj in self.spec.get("objectives", []):
extraction = obj.get("extraction", {})
action = extraction.get("action", "")
if action and action not in self.KNOWN_EXTRACTORS:
result.add_warning(
"UNKNOWN_EXTRACTOR",
f"Extractor '{action}' is not in the standard library",
suggestion="Ensure custom extractor is available",
)
# Check constraint extractors
for const in self.spec.get("constraints", []):
extraction = const.get("extraction", {})
action = extraction.get("action", "")
if action and action not in self.KNOWN_EXTRACTORS:
result.add_warning(
"UNKNOWN_EXTRACTOR",
f"Extractor '{action}' is not in the standard library",
)
def _check_anti_patterns(self, result: CheckResult) -> None:
"""Check for common optimization anti-patterns."""
objectives = self.spec.get("objectives", [])
constraints = self.spec.get("constraints", [])
# Anti-pattern: Mass minimization without stress/displacement constraints
has_mass_objective = any(
"mass" in obj.get("name", "").lower() and obj.get("goal") == "minimize"
for obj in objectives
)
has_structural_constraint = any(
any(
kw in const.get("name", "").lower()
for kw in ["stress", "displacement", "deflection"]
)
for const in constraints
)
if has_mass_objective and not has_structural_constraint:
result.add_warning(
"MASS_NO_CONSTRAINT",
"Mass minimization without structural constraints",
suggestion="Add stress or displacement constraints to prevent over-optimization",
)
# Anti-pattern: Too many design variables for trial count
n_dvs = len(self.spec.get("design_variables", []))
n_trials = self.spec.get("optimization_settings", {}).get("n_trials", 100)
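# e.g. 100 trials over 4 DVs = 25 trials/DV (fine); 100 over 15 DVs is ~6.7 and triggers the warning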
if n_dvs > 0 and n_trials / n_dvs < 10:
result.add_warning(
"LOW_TRIALS_PER_DV",
f"Only {n_trials / n_dvs:.1f} trials per design variable",
suggestion=f"Consider increasing trials to at least {n_dvs * 20} for better coverage",
)
# Anti-pattern: Too many objectives
n_objectives = len(objectives)
if n_objectives > 3:
result.add_warning(
"TOO_MANY_OBJECTIVES",
f"{n_objectives} objectives may lead to sparse Pareto front",
suggestion="Consider consolidating or using weighted objectives",
)
def _check_files(self, result: CheckResult) -> None:
"""Check that referenced files exist."""
if not self.spec_path:
return
# Spec may sit at the study root or under 1_setup/; resolve the study dir accordingly
study_dir = (
self.spec_path.parent.parent
if self.spec_path.parent.name == "1_setup"
else self.spec_path.parent
)
sim = self.spec.get("simulation", {})
sim_file = sim.get("sim_file")
if sim_file:
# Check multiple possible locations
possible_paths = [
study_dir / "1_model" / sim_file,
study_dir / "1_setup" / "model" / sim_file,
study_dir / sim_file,
]
found = any(p.exists() for p in possible_paths)
if not found:
result.add_error(
"SIM_FILE_NOT_FOUND",
f"Simulation file not found: {sim_file}",
path="simulation.sim_file",
suggestion="Ensure model files are copied to study directory",
)
def validate_spec(spec_path: Path, expressions: Optional[List[str]] = None) -> CheckResult:
"""
Convenience function to validate a spec file.
Args:
spec_path: Path to spec file
expressions: List of available expressions (from introspection)
Returns:
CheckResult with validation issues
"""
checker = SpecChecker(spec_path, expressions)
return checker.check()
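
As a usage sketch (the study path and expression names here are hypothetical), the module can be driven end-to-end via the convenience function:

# Minimal sketch, assuming a spec file exists at this path
from pathlib import Path

result = validate_spec(
    Path("studies/bracket_mass_opt/1_setup/atomizer_spec.json"),
    expressions=["wall_thickness", "rib_height"],  # e.g. from introspection
)
for issue in result.issues:
    print(issue)  # __str__ renders the [ERROR]/[WARN]/[INFO] prefixes
if not result.valid:
    raise SystemExit("Fix spec errors before launching trials")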

View File

@@ -0,0 +1,508 @@
"""
Validation Gate
===============
The final checkpoint before optimization begins.
1. Validates the study specification
2. Runs 2-3 test trials to verify:
- Parameters actually update the model
- Mesh regenerates correctly
- Extractors work
- Results are different (not stuck)
3. Estimates runtime
4. Gets user approval
This is CRITICAL for catching the "mesh not updating" issue that
wastes hours of optimization time.
"""
from __future__ import annotations
import json
import logging
import random
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any, Callable
import numpy as np
from .checker import SpecChecker, CheckResult, IssueSeverity
logger = logging.getLogger(__name__)
@dataclass
class TestTrialResult:
"""Result of a single test trial."""
trial_number: int
parameters: Dict[str, float]
objectives: Dict[str, float]
constraints: Dict[str, float] = field(default_factory=dict)
solve_time_seconds: float = 0.0
success: bool = False
error: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
"trial_number": self.trial_number,
"parameters": self.parameters,
"objectives": self.objectives,
"constraints": self.constraints,
"solve_time_seconds": self.solve_time_seconds,
"success": self.success,
"error": self.error,
}
@dataclass
class ValidationResult:
"""Complete validation result."""
passed: bool
timestamp: datetime = field(default_factory=datetime.now)
# Spec validation
spec_check: Optional[CheckResult] = None
# Test trials
test_trials: List[TestTrialResult] = field(default_factory=list)
results_vary: bool = False
variance_by_objective: Dict[str, float] = field(default_factory=dict)
# Runtime estimates
avg_solve_time: Optional[float] = None
estimated_total_runtime: Optional[float] = None
# Summary
errors: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
def add_error(self, message: str):
self.errors.append(message)
self.passed = False
def add_warning(self, message: str):
self.warnings.append(message)
def get_summary(self) -> str:
"""Get human-readable summary."""
lines = []
if self.passed:
lines.append("VALIDATION PASSED")
else:
lines.append("VALIDATION FAILED")
lines.append(f"\nSpec Validation:")
if self.spec_check:
lines.append(f" Errors: {len(self.spec_check.errors)}")
lines.append(f" Warnings: {len(self.spec_check.warnings)}")
lines.append(f"\nTest Trials:")
lines.append(
f" Completed: {len([t for t in self.test_trials if t.success])}/{len(self.test_trials)}"
)
lines.append(f" Results Vary: {'Yes' if self.results_vary else 'NO - PROBLEM!'}")
if self.variance_by_objective:
lines.append(" Variance by Objective:")
for obj, var in self.variance_by_objective.items():
lines.append(f" {obj}: {var:.6f}")
if self.avg_solve_time:
lines.append(f"\nRuntime Estimate:")
lines.append(f" Avg solve time: {self.avg_solve_time:.1f}s")
if self.estimated_total_runtime:
hours = self.estimated_total_runtime / 3600
lines.append(f" Est. total: {hours:.1f} hours")
return "\n".join(lines)
def to_dict(self) -> Dict[str, Any]:
return {
"passed": self.passed,
"timestamp": self.timestamp.isoformat(),
"spec_errors": len(self.spec_check.errors) if self.spec_check else 0,
"spec_warnings": len(self.spec_check.warnings) if self.spec_check else 0,
"test_trials": [t.to_dict() for t in self.test_trials],
"results_vary": self.results_vary,
"variance_by_objective": self.variance_by_objective,
"avg_solve_time": self.avg_solve_time,
"estimated_total_runtime": self.estimated_total_runtime,
"errors": self.errors,
"warnings": self.warnings,
}
class ValidationGate:
"""
Validates study setup before optimization.
This is the critical checkpoint that prevents wasted optimization time
by catching issues like:
- Missing files
- Invalid bounds
- Mesh not updating (all results identical)
- Broken extractors
"""
def __init__(
self,
study_dir: Path,
progress_callback: Optional[Callable[[str, float], None]] = None,
):
"""
Initialize the validation gate.
Args:
study_dir: Path to the study directory
progress_callback: Optional callback for progress updates
"""
self.study_dir = Path(study_dir)
self.progress_callback = progress_callback or (lambda m, p: None)
# Find spec file
self.spec_path = self._find_spec_path()
self.spec: Dict[str, Any] = {}
if self.spec_path and self.spec_path.exists():
with open(self.spec_path) as f:
self.spec = json.load(f)
def _find_spec_path(self) -> Optional[Path]:
"""Find the specification file."""
# Try atomizer_spec.json first (v2.0)
candidates = [
self.study_dir / "atomizer_spec.json",
self.study_dir / "1_setup" / "atomizer_spec.json",
self.study_dir / "optimization_config.json",
self.study_dir / "1_setup" / "optimization_config.json",
]
for path in candidates:
if path.exists():
return path
return None
def validate(
self,
run_test_trials: bool = True,
n_test_trials: int = 3,
available_expressions: Optional[List[str]] = None,
) -> ValidationResult:
"""
Run full validation.
Args:
run_test_trials: Whether to run test FEA solves
n_test_trials: Number of test trials (2-3 recommended)
available_expressions: Expression names from introspection
Returns:
ValidationResult with all findings
"""
result = ValidationResult(passed=True)
logger.info(f"Validating study: {self.study_dir.name}")
self._progress("Starting validation...", 0.0)
# Step 1: Check spec file exists
if not self.spec_path:
result.add_error("No specification file found")
return result
# Step 2: Validate spec
self._progress("Validating specification...", 0.1)
checker = SpecChecker(self.spec_path, available_expressions)
result.spec_check = checker.check(self.spec)
# Add spec errors to result
for issue in result.spec_check.errors:
result.add_error(str(issue))
for issue in result.spec_check.warnings:
result.add_warning(str(issue))
# Spec errors are blocking; stop before running test trials
if result.spec_check.errors:
self._progress("Validation failed: spec errors", 1.0)
return result
# Step 3: Run test trials
if run_test_trials:
self._progress("Running test trials...", 0.2)
self._run_test_trials(result, n_test_trials)
# Step 4: Calculate estimates
self._progress("Calculating estimates...", 0.9)
self._calculate_estimates(result)
self._progress("Validation complete", 1.0)
return result
def _progress(self, message: str, percent: float):
"""Report progress."""
logger.info(f"[{percent * 100:.0f}%] {message}")
self.progress_callback(message, percent)
def _run_test_trials(self, result: ValidationResult, n_trials: int) -> None:
"""Run test trials to verify setup."""
try:
from optimization_engine.nx.solver import NXSolver
except ImportError:
result.add_warning("NXSolver not available - skipping test trials")
return
# Get design variables
design_vars = self.spec.get("design_variables", [])
if not design_vars:
result.add_error("No design variables to test")
return
# Get model directory
model_dir = self._find_model_dir()
if not model_dir:
result.add_error("Model directory not found")
return
# Get sim file
sim_file = self._find_sim_file(model_dir)
if not sim_file:
result.add_error("Simulation file not found")
return
solver = NXSolver()
for i in range(n_trials):
self._progress(f"Running test trial {i + 1}/{n_trials}...", 0.2 + (0.6 * i / n_trials))
trial_result = TestTrialResult(trial_number=i + 1, parameters={}, objectives={})
# Generate random parameters within bounds
params = {}
for dv in design_vars:
param_name = dv.get("parameter", dv.get("expression_name", dv.get("name")))
bounds = dv.get("bounds", [0, 1])
# Normalize dict-style bounds ({"min": x, "max": y}) to [min, max], matching SpecChecker
bounds = [bounds["min"], bounds["max"]] if isinstance(bounds, dict) else bounds
# Use random value within bounds
value = random.uniform(float(bounds[0]), float(bounds[1]))
params[param_name] = value
trial_result.parameters = params
try:
start_time = time.time()
# Run simulation
solve_result = solver.run_simulation(
sim_file=sim_file,
working_dir=model_dir,
expression_updates=params,
cleanup=True,
)
trial_result.solve_time_seconds = time.time() - start_time
if solve_result.get("success"):
trial_result.success = True
# Extract results
op2_file = solve_result.get("op2_file")
if op2_file:
objectives = self._extract_objectives(Path(op2_file), model_dir)
trial_result.objectives = objectives
else:
trial_result.success = False
trial_result.error = solve_result.get("error", "Unknown error")
except Exception as e:
trial_result.success = False
trial_result.error = str(e)
logger.error(f"Test trial {i + 1} failed: {e}")
result.test_trials.append(trial_result)
# Check if results vary
self._check_results_variance(result)
def _find_model_dir(self) -> Optional[Path]:
"""Find the model directory."""
candidates = [
self.study_dir / "1_model",
self.study_dir / "1_setup" / "model",
self.study_dir,
]
for path in candidates:
if path.exists() and list(path.glob("*.sim")):
return path
return None
def _find_sim_file(self, model_dir: Path) -> Optional[Path]:
"""Find the simulation file."""
# From spec
sim = self.spec.get("simulation", {})
sim_name = sim.get("sim_file")
if sim_name:
sim_path = model_dir / sim_name
if sim_path.exists():
return sim_path
# Search for .sim files
sim_files = list(model_dir.glob("*.sim"))
if sim_files:
return sim_files[0]
return None
def _extract_objectives(self, op2_file: Path, model_dir: Path) -> Dict[str, float]:
"""Extract objective values from results."""
objectives = {}
# Extract based on configured objectives
for obj in self.spec.get("objectives", []):
name = obj.get("name", "objective")
extraction = obj.get("extraction", {})
action = extraction.get("action", "")
try:
if "mass" in action.lower():
from optimization_engine.extractors.bdf_mass_extractor import (
extract_mass_from_bdf,
)
dat_files = list(model_dir.glob("*.dat"))
if dat_files:
objectives[name] = extract_mass_from_bdf(str(dat_files[0]))
elif "displacement" in action.lower():
from optimization_engine.extractors.extract_displacement import (
extract_displacement,
)
result = extract_displacement(op2_file, subcase=1)
objectives[name] = result.get("max_displacement", 0)
elif "stress" in action.lower():
from optimization_engine.extractors.extract_von_mises_stress import (
extract_solid_stress,
)
result = extract_solid_stress(op2_file, subcase=1)
objectives[name] = result.get("max_von_mises", 0)
except Exception as e:
logger.debug(f"Failed to extract {name}: {e}")
return objectives
def _check_results_variance(self, result: ValidationResult) -> None:
"""Check if test trial results vary (indicating mesh is updating)."""
successful_trials = [t for t in result.test_trials if t.success]
if len(successful_trials) < 2:
result.add_warning("Not enough successful trials to check variance")
return
# Check variance for each objective; any single stuck objective
# marks the whole run as non-varying
result.results_vary = True
for obj_name in successful_trials[0].objectives.keys():
values = [t.objectives.get(obj_name, 0) for t in successful_trials]
if len(values) > 1:
variance = np.var(values)
result.variance_by_objective[obj_name] = variance
# Check if variance is too low (results are stuck)
mean_val = np.mean(values)
if mean_val != 0:
cv = np.sqrt(variance) / abs(mean_val) # Coefficient of variation
if cv < 0.001: # Less than 0.1% variation
result.add_error(
f"Results for '{obj_name}' are nearly identical (CV={cv:.6f}). "
"The mesh may not be updating!"
)
result.results_vary = False
elif variance < 1e-10:
# Can't calculate CV if mean is 0; fall back to raw variance
result.add_warning(f"Results for '{obj_name}' show no variation")
def _calculate_estimates(self, result: ValidationResult) -> None:
"""Calculate runtime estimates."""
successful_trials = [t for t in result.test_trials if t.success]
if successful_trials:
solve_times = [t.solve_time_seconds for t in successful_trials]
result.avg_solve_time = np.mean(solve_times)
# Get total trials from spec
n_trials = self.spec.get("optimization_settings", {}).get("n_trials", 100)
result.estimated_total_runtime = result.avg_solve_time * n_trials
def approve(self) -> bool:
"""
Mark the study as approved for optimization.
Creates an approval file to indicate validation passed.
"""
approval_file = self.study_dir / ".validation_approved"
try:
approval_file.write_text(datetime.now().isoformat())
logger.info(f"Study approved: {self.study_dir.name}")
return True
except Exception as e:
logger.error(f"Failed to approve: {e}")
return False
def is_approved(self) -> bool:
"""Check if study has been approved."""
approval_file = self.study_dir / ".validation_approved"
return approval_file.exists()
def save_result(self, result: ValidationResult) -> Path:
"""Save validation result to file."""
output_path = self.study_dir / "validation_result.json"
with open(output_path, "w") as f:
json.dump(result.to_dict(), f, indent=2)
return output_path
def validate_study(
study_dir: Path,
run_test_trials: bool = True,
n_test_trials: int = 3,
) -> ValidationResult:
"""
Convenience function to validate a study.
Args:
study_dir: Path to study directory
run_test_trials: Whether to run test FEA solves
n_test_trials: Number of test trials
Returns:
ValidationResult
"""
gate = ValidationGate(study_dir)
return gate.validate(run_test_trials=run_test_trials, n_test_trials=n_test_trials)
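
A minimal end-to-end sketch of the gate (the study path is hypothetical, and test trials only run when NXSolver is importable):

from pathlib import Path

gate = ValidationGate(Path("studies/bracket_mass_opt"))
result = gate.validate(run_test_trials=True, n_test_trials=3)
print(result.get_summary())
gate.save_result(result)                 # writes validation_result.json
if result.passed and not gate.is_approved():
    gate.approve()                       # creates the .validation_approved marker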