Files
Atomizer/projects/hydrotech-beam/studies/01_doe_landscape/sampling.py

275 lines
9.0 KiB
Python
Raw Normal View History

2026-02-15 08:00:21 +00:00
"""Latin Hypercube Sampling for Hydrotech Beam DoE.
Generates LHS sample points for the 4 design variables with:
- Maximin LHS for space-filling coverage
- Integer rounding for hole_count (DV4)
- Stratified integer sampling to ensure all 11 hole_count levels are covered
- Baseline (Trial 0) always included
References:
OPTIMIZATION_STRATEGY.md §2.3 Phase 1 configuration
OPTIMIZATION_STRATEGY.md §1.4 Integer handling
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
import numpy as np
from scipy.stats.qmc import LatinHypercube
from geometric_checks import DesignPoint
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Design variable bounds
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class DVBounds:
"""Design variable bounds and metadata."""
name: str
nx_expression: str
lower: float
upper: float
baseline: float
is_integer: bool = False
DV_DEFINITIONS: list[DVBounds] = [
DVBounds("beam_half_core_thickness", "beam_half_core_thickness", 10.0, 40.0, 25.162),
DVBounds("beam_face_thickness", "beam_face_thickness", 10.0, 40.0, 21.504),
DVBounds("holes_diameter", "holes_diameter", 150.0, 450.0, 300.0),
DVBounds("hole_count", "hole_count", 5.0, 15.0, 10.0, is_integer=True),
]
N_DVS = len(DV_DEFINITIONS)
BASELINE_VALUES = [dv.baseline for dv in DV_DEFINITIONS]
def get_baseline_point() -> DesignPoint:
"""Return the baseline design point (Trial 0).
Returns:
DesignPoint with confirmed baseline values from NX introspection.
"""
return DesignPoint(
beam_half_core_thickness=25.162,
beam_face_thickness=21.504,
holes_diameter=300.0,
hole_count=10,
)
def generate_lhs_samples(
n_samples: int = 50,
seed: int = 42,
include_baseline: bool = True,
) -> list[DesignPoint]:
"""Generate LHS sample points for the DoE study.
Strategy for integer coverage (hole_count = 5..15, 11 levels):
1. Generate `n_samples` LHS points with continuous DV4
2. Round DV4 to nearest integer
3. Check coverage if any of the 11 levels are missing, replace
the closest duplicate trial with the missing level
This ensures all integer levels are represented while maintaining
the space-filling property of LHS for the continuous variables.
Args:
n_samples: Number of LHS sample points (default: 50).
seed: Random seed for reproducibility.
include_baseline: If True, prepend baseline as Trial 0.
Returns:
List of DesignPoint instances. If include_baseline is True,
the first element is the baseline (Trial 0).
"""
logger.info(
"Generating %d LHS samples (seed=%d, baseline=%s)",
n_samples, seed, include_baseline,
)
# Generate unit hypercube LHS samples
sampler = LatinHypercube(d=N_DVS, seed=seed, optimization="random-cd")
unit_samples = sampler.random(n=n_samples) # shape: (n_samples, 4)
# Scale to design variable bounds
samples = _scale_samples(unit_samples)
# Round hole_count to nearest integer
samples[:, 3] = np.round(samples[:, 3]).astype(int)
samples[:, 3] = np.clip(samples[:, 3], 5, 15)
# Ensure full integer coverage for hole_count
samples = _ensure_integer_coverage(samples, rng=np.random.default_rng(seed))
# Convert to DesignPoint list
points: list[DesignPoint] = []
if include_baseline:
points.append(get_baseline_point())
for row in samples:
points.append(
DesignPoint(
beam_half_core_thickness=float(row[0]),
beam_face_thickness=float(row[1]),
holes_diameter=float(row[2]),
hole_count=int(row[3]),
)
)
logger.info(
"Generated %d total points (%d LHS + %s baseline)",
len(points),
n_samples,
"1" if include_baseline else "0",
)
_log_coverage(points)
return points
def _scale_samples(unit_samples: np.ndarray) -> np.ndarray:
"""Scale unit hypercube [0,1]^d samples to design variable bounds.
Args:
unit_samples: Array of shape (n, 4) with values in [0, 1].
Returns:
Scaled array with values in [lower, upper] for each DV.
"""
lower = np.array([dv.lower for dv in DV_DEFINITIONS])
upper = np.array([dv.upper for dv in DV_DEFINITIONS])
return lower + unit_samples * (upper - lower)
def _ensure_integer_coverage(
samples: np.ndarray,
rng: np.random.Generator,
) -> np.ndarray:
"""Ensure all 11 hole_count levels (5-15) are represented.
If any integer level is missing, replace a duplicate from the most
over-represented level with a sample at the missing level.
Continuous DVs for replacement samples are drawn randomly within bounds.
Args:
samples: Array of shape (n, 4) with rounded hole_count in col 3.
rng: NumPy random generator for reproducibility.
Returns:
Modified samples array with full integer coverage.
"""
all_levels = set(range(5, 16)) # {5, 6, 7, ..., 15}
present_levels = set(int(x) for x in samples[:, 3])
missing_levels = all_levels - present_levels
if not missing_levels:
logger.info("All 11 hole_count levels represented ✓")
return samples
# Skip patching when sample size is too small to cover all levels
n_samples = len(samples)
if n_samples < len(all_levels):
logger.info(
"Only %d samples — too few to cover all 11 hole_count levels "
"(need ≥11). Skipping stratified patching.",
n_samples,
)
return samples
logger.warning(
"Missing hole_count levels: %s — patching with replacements",
sorted(missing_levels),
)
for missing_level in sorted(missing_levels):
# Find the most over-represented level
unique, counts = np.unique(samples[:, 3].astype(int), return_counts=True)
most_common_idx = np.argmax(counts)
most_common_level = unique[most_common_idx]
# Find indices with the most common level
candidates = np.where(samples[:, 3].astype(int) == most_common_level)[0]
replace_idx = rng.choice(candidates)
# Replace: keep continuous DVs random within bounds, set hole_count
for j, dv in enumerate(DV_DEFINITIONS):
if dv.is_integer:
samples[replace_idx, j] = missing_level
else:
samples[replace_idx, j] = rng.uniform(dv.lower, dv.upper)
logger.info(
" Replaced trial at idx %d: hole_count %d%d",
replace_idx, most_common_level, missing_level,
)
return samples
def _log_coverage(points: list[DesignPoint]) -> None:
"""Log hole_count coverage statistics."""
counts: dict[int, int] = {}
for p in points:
counts[p.hole_count] = counts.get(p.hole_count, 0) + 1
logger.info("Hole count coverage:")
for level in range(5, 16):
n = counts.get(level, 0)
logger.info(" hole_count=%2d: %d trials", level, n)
def points_to_dicts(points: list[DesignPoint]) -> list[dict[str, Any]]:
"""Convert DesignPoint list to list of dicts (for Optuna enqueue).
Args:
points: List of DesignPoint instances.
Returns:
List of dicts with DV names as keys.
"""
return [
{
"beam_half_core_thickness": p.beam_half_core_thickness,
"beam_face_thickness": p.beam_face_thickness,
"holes_diameter": p.holes_diameter,
"hole_count": p.hole_count,
}
for p in points
]
# ---------------------------------------------------------------------------
# Quick self-test
# ---------------------------------------------------------------------------
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
points = generate_lhs_samples(n_samples=50, seed=42)
print(f"\nGenerated {len(points)} total points")
print(f" Trial 0 (baseline): {points[0]}")
print(f" Trial 1 (first LHS): {points[1]}")
print(f" Trial {len(points)-1} (last LHS): {points[-1]}")
# Verify coverage
hole_counts = {p.hole_count for p in points}
expected = set(range(5, 16))
assert hole_counts == expected, (
f"Missing hole_count levels: {expected - hole_counts}"
)
print("\nAll 11 hole_count levels covered ✓")
# Verify bounds
for p in points[1:]: # skip baseline
assert 10.0 <= p.beam_half_core_thickness <= 40.0
assert 10.0 <= p.beam_face_thickness <= 40.0
assert 150.0 <= p.holes_diameter <= 450.0
assert 5 <= p.hole_count <= 15
print("All samples within bounds ✓")