Files
Atomizer/projects/hydrotech-beam/studies/01_doe_landscape/sampling.py

275 lines
8.8 KiB
Python

"""Latin Hypercube Sampling for Hydrotech Beam DoE.
Generates LHS sample points for the 4 design variables with:
- Maximin LHS for space-filling coverage
- Integer rounding for hole_count (DV4)
- Stratified integer sampling to ensure all 11 hole_count levels are covered
- Baseline (Trial 0) always included
References:
OPTIMIZATION_STRATEGY.md §2.3 — Phase 1 configuration
OPTIMIZATION_STRATEGY.md §1.4 — Integer handling
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
import numpy as np
from scipy.stats.qmc import LatinHypercube
from geometric_checks import DesignPoint
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Design variable bounds
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class DVBounds:
"""Design variable bounds and metadata."""
name: str
nx_expression: str
lower: float
upper: float
baseline: float
is_integer: bool = False
DV_DEFINITIONS: list[DVBounds] = [
DVBounds("beam_half_core_thickness", "beam_half_core_thickness", 10.0, 40.0, 25.162),
DVBounds("beam_face_thickness", "beam_face_thickness", 10.0, 40.0, 21.504),
DVBounds("holes_diameter", "holes_diameter", 150.0, 450.0, 300.0),
DVBounds("hole_count", "hole_count", 5.0, 15.0, 10.0, is_integer=True),
]
N_DVS = len(DV_DEFINITIONS)
BASELINE_VALUES = [dv.baseline for dv in DV_DEFINITIONS]
def get_baseline_point() -> DesignPoint:
"""Return the baseline design point (Trial 0).
Returns:
DesignPoint with confirmed baseline values from NX introspection.
"""
return DesignPoint(
beam_half_core_thickness=25.162,
beam_face_thickness=21.504,
holes_diameter=300.0,
hole_count=10,
)
def generate_lhs_samples(
n_samples: int = 50,
seed: int = 42,
include_baseline: bool = True,
) -> list[DesignPoint]:
"""Generate LHS sample points for the DoE study.
Strategy for integer coverage (hole_count = 5..15, 11 levels):
1. Generate `n_samples` LHS points with continuous DV4
2. Round DV4 to nearest integer
3. Check coverage — if any of the 11 levels are missing, replace
the closest duplicate trial with the missing level
This ensures all integer levels are represented while maintaining
the space-filling property of LHS for the continuous variables.
Args:
n_samples: Number of LHS sample points (default: 50).
seed: Random seed for reproducibility.
include_baseline: If True, prepend baseline as Trial 0.
Returns:
List of DesignPoint instances. If include_baseline is True,
the first element is the baseline (Trial 0).
"""
logger.info(
"Generating %d LHS samples (seed=%d, baseline=%s)",
n_samples, seed, include_baseline,
)
# Generate unit hypercube LHS samples
sampler = LatinHypercube(d=N_DVS, seed=seed, optimization="random-cd")
unit_samples = sampler.random(n=n_samples) # shape: (n_samples, 4)
# Scale to design variable bounds
samples = _scale_samples(unit_samples)
# Round hole_count to nearest integer
samples[:, 3] = np.round(samples[:, 3]).astype(int)
samples[:, 3] = np.clip(samples[:, 3], 5, 15)
# Ensure full integer coverage for hole_count
samples = _ensure_integer_coverage(samples, rng=np.random.default_rng(seed))
# Convert to DesignPoint list
points: list[DesignPoint] = []
if include_baseline:
points.append(get_baseline_point())
for row in samples:
points.append(
DesignPoint(
beam_half_core_thickness=float(row[0]),
beam_face_thickness=float(row[1]),
holes_diameter=float(row[2]),
hole_count=int(row[3]),
)
)
logger.info(
"Generated %d total points (%d LHS + %s baseline)",
len(points),
n_samples,
"1" if include_baseline else "0",
)
_log_coverage(points)
return points
def _scale_samples(unit_samples: np.ndarray) -> np.ndarray:
"""Scale unit hypercube [0,1]^d samples to design variable bounds.
Args:
unit_samples: Array of shape (n, 4) with values in [0, 1].
Returns:
Scaled array with values in [lower, upper] for each DV.
"""
lower = np.array([dv.lower for dv in DV_DEFINITIONS])
upper = np.array([dv.upper for dv in DV_DEFINITIONS])
return lower + unit_samples * (upper - lower)
def _ensure_integer_coverage(
samples: np.ndarray,
rng: np.random.Generator,
) -> np.ndarray:
"""Ensure all 11 hole_count levels (5-15) are represented.
If any integer level is missing, replace a duplicate from the most
over-represented level with a sample at the missing level.
Continuous DVs for replacement samples are drawn randomly within bounds.
Args:
samples: Array of shape (n, 4) with rounded hole_count in col 3.
rng: NumPy random generator for reproducibility.
Returns:
Modified samples array with full integer coverage.
"""
all_levels = set(range(5, 16)) # {5, 6, 7, ..., 15}
present_levels = set(int(x) for x in samples[:, 3])
missing_levels = all_levels - present_levels
if not missing_levels:
logger.info("All 11 hole_count levels represented ✓")
return samples
# Skip patching when sample size is too small to cover all levels
n_samples = len(samples)
if n_samples < len(all_levels):
logger.info(
"Only %d samples — too few to cover all 11 hole_count levels "
"(need ≥11). Skipping stratified patching.",
n_samples,
)
return samples
logger.warning(
"Missing hole_count levels: %s — patching with replacements",
sorted(missing_levels),
)
for missing_level in sorted(missing_levels):
# Find the most over-represented level
unique, counts = np.unique(samples[:, 3].astype(int), return_counts=True)
most_common_idx = np.argmax(counts)
most_common_level = unique[most_common_idx]
# Find indices with the most common level
candidates = np.where(samples[:, 3].astype(int) == most_common_level)[0]
replace_idx = rng.choice(candidates)
# Replace: keep continuous DVs random within bounds, set hole_count
for j, dv in enumerate(DV_DEFINITIONS):
if dv.is_integer:
samples[replace_idx, j] = missing_level
else:
samples[replace_idx, j] = rng.uniform(dv.lower, dv.upper)
logger.info(
" Replaced trial at idx %d: hole_count %d%d",
replace_idx, most_common_level, missing_level,
)
return samples
def _log_coverage(points: list[DesignPoint]) -> None:
"""Log hole_count coverage statistics."""
counts: dict[int, int] = {}
for p in points:
counts[p.hole_count] = counts.get(p.hole_count, 0) + 1
logger.info("Hole count coverage:")
for level in range(5, 16):
n = counts.get(level, 0)
logger.info(" hole_count=%2d: %d trials", level, n)
def points_to_dicts(points: list[DesignPoint]) -> list[dict[str, Any]]:
"""Convert DesignPoint list to list of dicts (for Optuna enqueue).
Args:
points: List of DesignPoint instances.
Returns:
List of dicts with DV names as keys.
"""
return [
{
"beam_half_core_thickness": p.beam_half_core_thickness,
"beam_face_thickness": p.beam_face_thickness,
"holes_diameter": p.holes_diameter,
"hole_count": p.hole_count,
}
for p in points
]
# ---------------------------------------------------------------------------
# Quick self-test
# ---------------------------------------------------------------------------
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
points = generate_lhs_samples(n_samples=50, seed=42)
print(f"\nGenerated {len(points)} total points")
print(f" Trial 0 (baseline): {points[0]}")
print(f" Trial 1 (first LHS): {points[1]}")
print(f" Trial {len(points)-1} (last LHS): {points[-1]}")
# Verify coverage
hole_counts = {p.hole_count for p in points}
expected = set(range(5, 16))
assert hole_counts == expected, (
f"Missing hole_count levels: {expected - hole_counts}"
)
print("\nAll 11 hole_count levels covered ✓")
# Verify bounds
for p in points[1:]: # skip baseline
assert 10.0 <= p.beam_half_core_thickness <= 40.0
assert 10.0 <= p.beam_face_thickness <= 40.0
assert 150.0 <= p.holes_diameter <= 450.0
assert 5 <= p.hole_count <= 15
print("All samples within bounds ✓")