"""
Auto-Training Trigger System for AtomizerField

Monitors training data collection and automatically triggers neural network
training when enough data is accumulated. This is the key component to close
the neural loop.

Workflow:
1. Monitor training data directory for new trials
2. When min_points threshold is reached, trigger training
3. Validate trained model against FEA
4. Deploy model for neural-accelerated optimization

Usage:
    from optimization_engine.auto_trainer import AutoTrainer

    trainer = AutoTrainer(
        study_name="uav_arm_optimization",
        min_points=50,
        epochs=100
    )

    # Check if ready to train
    if trainer.should_train():
        model_path = trainer.train()
        trainer.validate_model(model_path)

    # Or run continuous monitoring
    trainer.watch()
"""

import json
import logging
import re
import shutil
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Tuple, List

logger = logging.getLogger(__name__)

# Splits a name into alternating text / digit-run tokens (digits at odd indexes).
_DIGIT_RUNS = re.compile(r"(\d+)")


def _natural_key(name: str) -> List[Any]:
    """Return a sort key that orders embedded numbers numerically.

    With this key ``trial_2`` sorts before ``trial_10``; a plain string sort
    would put ``trial_10`` first.
    """
    tokens = _DIGIT_RUNS.split(name)
    # re.split with one capture group always yields text at even indexes and
    # digit runs at odd indexes, so ints are only ever compared with ints.
    return [int(tok) if idx % 2 else tok for idx, tok in enumerate(tokens)]


class AutoTrainer:
    """
    Automatic neural network training trigger for AtomizerField.

    Monitors training data accumulation and triggers training when thresholds
    are met. Progress (last trained count, model version, history) is persisted
    as JSON in ``output_dir`` so it survives restarts.
    """

    # Upper bound on a single training subprocess run.
    TRAINING_TIMEOUT_SECONDS = 3600 * 4

    def __init__(
        self,
        study_name: str,
        training_data_dir: Optional[Path] = None,
        min_points: int = 50,
        epochs: int = 100,
        val_split: float = 0.2,
        retrain_threshold: int = 50,
        atomizer_field_dir: Optional[Path] = None,
        output_dir: Optional[Path] = None
    ):
        """
        Initialize the auto trainer.

        Args:
            study_name: Name of the optimization study
            training_data_dir: Directory containing exported training data
            min_points: Minimum data points before training (default: 50)
            epochs: Training epochs (default: 100)
            val_split: Validation split ratio (default: 0.2)
            retrain_threshold: New points needed for retraining (default: 50)
            atomizer_field_dir: Path to atomizer-field repository
            output_dir: Directory for trained models
        """
        self.study_name = study_name
        self.min_points = min_points
        self.epochs = epochs
        self.val_split = val_split
        self.retrain_threshold = retrain_threshold

        # Set up directories (defaults are derived from this file's location).
        # Path(...) also lets callers pass plain strings.
        project_root = Path(__file__).parent.parent
        self.training_data_dir = (
            Path(training_data_dir) if training_data_dir
            else project_root / "atomizer_field_training_data" / study_name
        )
        self.atomizer_field_dir = (
            Path(atomizer_field_dir) if atomizer_field_dir
            else project_root / "atomizer-field"
        )
        self.output_dir = (
            Path(output_dir) if output_dir
            else self.atomizer_field_dir / "runs" / f"{study_name}_auto"
        )

        # Tracking state
        self.last_trained_count = 0
        self.model_version = 0
        self.training_history: List[Dict[str, Any]] = []

        # Load state if exists
        self._load_state()

        logger.info(f"AutoTrainer initialized for {study_name}")
        logger.info(f"Training data: {self.training_data_dir}")
        logger.info(f"Min points: {min_points}, Retrain threshold: {retrain_threshold}")

    # ------------------------------------------------------------------
    # State persistence
    # ------------------------------------------------------------------

    def _state_file(self) -> Path:
        """Get path to state file."""
        return self.output_dir / "auto_trainer_state.json"

    def _load_state(self) -> None:
        """Load trainer state from disk.

        A missing file leaves the defaults untouched. An unreadable or
        malformed file is logged and ignored rather than aborting
        construction.
        """
        state_file = self._state_file()
        if not state_file.exists():
            return

        try:
            with open(state_file, "r", encoding="utf-8") as f:
                state = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning(f"Ignoring unreadable state file {state_file}: {exc}")
            return

        if not isinstance(state, dict):
            logger.warning(f"Ignoring malformed state file {state_file}")
            return

        self.last_trained_count = state.get("last_trained_count", 0)
        self.model_version = state.get("model_version", 0)
        self.training_history = state.get("training_history", [])
        logger.info(
            f"Loaded state: {self.last_trained_count} points trained, "
            f"version {self.model_version}"
        )

    def _save_state(self) -> None:
        """Save trainer state to disk."""
        self.output_dir.mkdir(parents=True, exist_ok=True)
        state_file = self._state_file()

        state = {
            "study_name": self.study_name,
            "last_trained_count": self.last_trained_count,
            "model_version": self.model_version,
            "training_history": self.training_history,
            "last_updated": datetime.now().isoformat()
        }

        # Write to a sibling temp file and rename, so an interrupted write
        # cannot leave a truncated state file behind.
        tmp_file = state_file.with_name(state_file.name + ".tmp")
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        tmp_file.replace(state_file)

    # ------------------------------------------------------------------
    # Data discovery
    # ------------------------------------------------------------------

    @staticmethod
    def _is_complete_trial(trial_dir: Path) -> bool:
        """Return True if trial_dir is a directory holding all required files."""
        return (
            trial_dir.is_dir()
            and (trial_dir / "input" / "model.bdf").exists()
            and (trial_dir / "output" / "model.op2").exists()
            and (trial_dir / "metadata.json").exists()
        )

    def _valid_trial_dirs(self) -> List[Path]:
        """Return complete ``trial_*`` directories in natural (numeric) order."""
        if not self.training_data_dir.exists():
            return []
        trials = [
            entry for entry in self.training_data_dir.glob("trial_*")
            if self._is_complete_trial(entry)
        ]
        trials.sort(key=lambda entry: _natural_key(entry.name))
        return trials

    def count_training_points(self) -> int:
        """
        Count available training data points.

        Returns:
            Number of trial directories with valid training data
        """
        return len(self._valid_trial_dirs())

    def _should_train_with(self, current_count: int) -> bool:
        """Apply the training thresholds to an already-computed point count."""
        # First training - check minimum threshold
        if self.last_trained_count == 0:
            return current_count >= self.min_points

        # Retraining - check new points threshold
        return current_count - self.last_trained_count >= self.retrain_threshold

    def should_train(self) -> bool:
        """
        Check if there's enough new data to trigger training.

        Returns:
            True if training should be triggered
        """
        return self._should_train_with(self.count_training_points())

    def get_new_points_count(self) -> int:
        """Get number of new points since last training."""
        return self.count_training_points() - self.last_trained_count

    def prepare_training_split(self) -> Tuple[Path, Path]:
        """
        Prepare train/validation split from collected data.

        Only complete trials (the same ones count_training_points() counts)
        are copied. Trials are ordered numerically and the latest ones go to
        the validation set. Any existing split directories are deleted first.

        Returns:
            Tuple of (train_dir, val_dir) paths
        """
        train_dir = self.training_data_dir.parent / f"{self.study_name}_train"
        val_dir = self.training_data_dir.parent / f"{self.study_name}_val"

        # Clear existing splits
        for split_dir in (train_dir, val_dir):
            if split_dir.exists():
                shutil.rmtree(split_dir)
            split_dir.mkdir(parents=True)

        trial_dirs = self._valid_trial_dirs()
        n_trials = len(trial_dirs)

        # At least one validation trial when possible, but never so many that
        # the training set ends up empty.
        if n_trials < 2:
            n_val = 0
        else:
            n_val = min(max(1, int(n_trials * self.val_split)), n_trials - 1)

        # Split: use latest trials for validation
        split_at = n_trials - n_val
        train_trials = trial_dirs[:split_at]
        val_trials = trial_dirs[split_at:]

        # Copy to split directories
        for trial_dir in train_trials:
            shutil.copytree(trial_dir, train_dir / trial_dir.name)
        for trial_dir in val_trials:
            shutil.copytree(trial_dir, val_dir / trial_dir.name)

        logger.info(f"Split data: {len(train_trials)} train, {len(val_trials)} validation")

        return train_dir, val_dir

    # ------------------------------------------------------------------
    # Training
    # ------------------------------------------------------------------

    @staticmethod
    def _find_checkpoint(directory: Path) -> Optional[Path]:
        """Locate a model checkpoint file under directory.

        Top-level ``*.pt`` / ``*.pth`` files are preferred; otherwise nested
        ``best*.pt`` / ``checkpoint*.pt`` files are considered. Among the
        candidates a file named ``best*`` wins, then the most recently
        modified, then the name, so the choice is deterministic.

        Returns:
            The chosen checkpoint path, or None if nothing matched.
        """
        pattern_groups = (("*.pt", "*.pth"), ("**/best*.pt", "**/checkpoint*.pt"))
        for patterns in pattern_groups:
            candidates = {
                path
                for pattern in patterns
                for path in directory.glob(pattern)
                if path.is_file()
            }
            if candidates:
                return max(
                    candidates,
                    key=lambda p: (p.name.startswith("best"), p.stat().st_mtime, p.name),
                )
        return None

    def train(self, train_parametric: bool = True) -> Optional[Path]:
        """
        Trigger neural network training.

        The model version is only advanced (and state saved) after the
        training subprocess exits successfully, so failed attempts do not
        consume version numbers.

        Args:
            train_parametric: If True, train parametric predictor (fast).
                If False, train field predictor (slower, more detailed).

        Returns:
            Path to trained model checkpoint (or the version output directory
            when no checkpoint file is found), or None if training failed
        """
        current_count = self.count_training_points()

        if current_count < self.min_points:
            logger.warning(
                f"Not enough data for training: {current_count} < {self.min_points}"
            )
            return None

        # Choose training script; check it before copying any data around.
        script_name = "train_parametric.py" if train_parametric else "train.py"
        train_script = self.atomizer_field_dir / script_name

        if not train_script.exists():
            logger.error(f"Training script not found: {train_script}")
            return None

        logger.info(f"Starting training with {current_count} data points...")

        # Prepare train/val split
        train_dir, val_dir = self.prepare_training_split()

        # Record what was actually copied, in case trials arrived between the
        # count above and the split.
        trained_count = sum(1 for _ in train_dir.iterdir()) + sum(1 for _ in val_dir.iterdir())

        # Candidate version; committed to self.model_version only on success.
        next_version = self.model_version + 1
        version_output_dir = self.output_dir / f"v{next_version}"
        version_output_dir.mkdir(parents=True, exist_ok=True)

        # Build training command
        cmd = [
            sys.executable,
            str(train_script),
            "--train_dir", str(train_dir),
            "--val_dir", str(val_dir),
            "--epochs", str(self.epochs),
            "--output_dir", str(version_output_dir)
        ]

        logger.info(f"Running: {' '.join(cmd)}")

        # Run training
        start_time = time.time()

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                cwd=str(self.atomizer_field_dir),
                timeout=self.TRAINING_TIMEOUT_SECONDS
            )

            elapsed = time.time() - start_time

            if result.returncode != 0:
                logger.error(f"Training failed:\n{result.stderr}")
                return None

            logger.info(f"Training completed in {elapsed/60:.1f} minutes")

            # Find model checkpoint
            model_path = self._find_checkpoint(version_output_dir)
            if model_path is not None:
                logger.info(f"Model saved: {model_path}")
            else:
                logger.warning("No checkpoint file found after training")
                model_path = version_output_dir

            # Update state
            self.model_version = next_version
            self.last_trained_count = trained_count
            self.training_history.append({
                "version": self.model_version,
                "timestamp": datetime.now().isoformat(),
                "data_points": trained_count,
                "epochs": self.epochs,
                "training_time_seconds": elapsed,
                "model_path": str(model_path)
            })
            self._save_state()

            return model_path

        except subprocess.TimeoutExpired:
            hours = self.TRAINING_TIMEOUT_SECONDS / 3600
            logger.error(f"Training timed out after {hours:g} hours")
            return None
        except Exception as e:
            # Boundary handler: callers (e.g. watch()) rely on None on failure.
            logger.exception(f"Training error: {e}")
            return None

    def validate_model(
        self,
        model_path: Path,
        n_validation_trials: int = 5
    ) -> Dict[str, Any]:
        """
        Validate trained model against FEA results.

        NOTE: validation is not implemented yet; the error metrics returned
        are placeholders (0.0), not measurements.

        Args:
            model_path: Path to trained model
            n_validation_trials: Number of trials to validate

        Returns:
            Validation metrics dictionary
        """
        logger.info(f"Validating model: {model_path}")

        # This would integrate with the neural surrogate to compare predictions vs FEA
        # For now, return placeholder metrics
        validation_results = {
            "model_path": str(model_path),
            "n_validation_trials": n_validation_trials,
            "mean_error_percent": 0.0,  # Would be computed
            "max_error_percent": 0.0,
            "validated_at": datetime.now().isoformat()
        }

        # TODO: Implement actual validation
        # - Load model
        # - Run predictions on held-out trials
        # - Compare with FEA results
        # - Compute error metrics

        return validation_results

    def get_latest_model(self) -> Optional[Path]:
        """
        Get path to latest trained model.

        Uses the same checkpoint lookup as train(), so both report the same
        file for a given version.

        Returns:
            Path to latest model checkpoint (or the version directory if it
            holds no checkpoint file), or None if no model exists
        """
        if self.model_version == 0:
            return None

        latest_dir = self.output_dir / f"v{self.model_version}"
        if not latest_dir.exists():
            return None

        return self._find_checkpoint(latest_dir) or latest_dir

    # ------------------------------------------------------------------
    # Monitoring / reporting
    # ------------------------------------------------------------------

    def watch(self, check_interval: int = 60) -> None:
        """
        Continuously monitor for new data and trigger training.

        Runs until interrupted with Ctrl+C.

        Args:
            check_interval: Seconds between checks (default: 60)
        """
        logger.info(f"Starting auto-trainer watch mode for {self.study_name}")
        logger.info(f"Check interval: {check_interval}s")
        logger.info(f"Min points: {self.min_points}, Retrain threshold: {self.retrain_threshold}")

        try:
            while True:
                current_count = self.count_training_points()
                new_points = current_count - self.last_trained_count

                status = f"[{datetime.now().strftime('%H:%M:%S')}] "
                status += f"Points: {current_count} (new: {new_points})"

                if self._should_train_with(current_count):
                    status += " -> TRAINING"
                    print(status)
                    model_path = self.train()
                    if model_path:
                        print(f"Training complete: {model_path}")
                    else:
                        # train() already logged the cause; retry next cycle.
                        print("Training failed")
                else:
                    if self.last_trained_count == 0:
                        needed = self.min_points - current_count
                        status += f" (need {needed} more for first training)"
                    else:
                        needed = self.retrain_threshold - new_points
                        status += f" (need {needed} more for retraining)"
                    print(status)

                time.sleep(check_interval)

        except KeyboardInterrupt:
            logger.info("Watch mode stopped")

    def get_status(self) -> Dict[str, Any]:
        """
        Get current trainer status.

        Returns:
            Status dictionary with counts and state
        """
        # Scan the data directory and model directory once each.
        current_count = self.count_training_points()
        new_points = current_count - self.last_trained_count
        latest_model = self.get_latest_model()

        return {
            "study_name": self.study_name,
            "total_points": current_count,
            "new_points_since_training": new_points,
            "last_trained_count": self.last_trained_count,
            "model_version": self.model_version,
            "min_points_threshold": self.min_points,
            "retrain_threshold": self.retrain_threshold,
            "should_train": self._should_train_with(current_count),
            "latest_model": str(latest_model) if latest_model else None,
            "training_history_count": len(self.training_history)
        }


def check_training_status(study_name: str) -> Dict[str, Any]:
    """
    Quick check of training data status for a study.

    Args:
        study_name: Name of the study

    Returns:
        Status dictionary
    """
    trainer = AutoTrainer(study_name=study_name)
    return trainer.get_status()


def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.
    """
    import argparse

    parser = argparse.ArgumentParser(description="AtomizerField Auto-Trainer")
    parser.add_argument("study_name", help="Name of the optimization study")
    parser.add_argument("--train", action="store_true", help="Trigger training now")
    parser.add_argument("--watch", action="store_true", help="Watch mode - continuous monitoring")
    parser.add_argument("--status", action="store_true", help="Show status only")
    parser.add_argument("--min-points", type=int, default=50, help="Minimum points for training")
    parser.add_argument("--epochs", type=int, default=100, help="Training epochs")
    parser.add_argument("--interval", type=int, default=60, help="Check interval for watch mode")

    args = parser.parse_args(argv)

    # Set up logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s'
    )

    trainer = AutoTrainer(
        study_name=args.study_name,
        min_points=args.min_points,
        epochs=args.epochs
    )

    if args.status:
        status = trainer.get_status()
        print(f"\nAuto-Trainer Status: {args.study_name}")
        print("=" * 50)
        for key, value in status.items():
            print(f"  {key}: {value}")

    elif args.train:
        if trainer.should_train():
            print("Training triggered...")
            model_path = trainer.train()
            if model_path:
                print(f"Success! Model at: {model_path}")
            else:
                print("Training failed")
        else:
            print("Not enough data for training")
            if trainer.last_trained_count == 0:
                print(f"Current: {trainer.count_training_points()}, Need: {args.min_points}")
            else:
                # After a first training the retrain threshold is what gates.
                print(
                    f"New since last training: {trainer.get_new_points_count()}, "
                    f"Need: {trainer.retrain_threshold}"
                )

    elif args.watch:
        trainer.watch(check_interval=args.interval)

    else:
        # Default: show status and recommendation
        status = trainer.get_status()
        print(f"\nAuto-Trainer Status: {args.study_name}")
        print("=" * 50)
        print(f"  Data points: {status['total_points']}")
        print(f"  New since last training: {status['new_points_since_training']}")
        print(f"  Model version: v{status['model_version']}")
        print(f"  Should train: {status['should_train']}")
        print()

        if status['should_train']:
            print("Ready to train! Run with --train to start training.")
        else:
            if status['last_trained_count'] == 0:
                needed = status['min_points_threshold'] - status['total_points']
                print(f"Need {needed} more points for initial training.")
            else:
                needed = status['retrain_threshold'] - status['new_points_since_training']
                print(f"Need {needed} more new points for retraining.")


if __name__ == "__main__":
    main()