Files
Atomizer/atomizer-dashboard/backend/api/routes/intake.py
Anto01 a26914bbe8 feat: Add Studio UI, intake system, and extractor improvements
Dashboard:
- Add Studio page with drag-drop model upload and Claude chat
- Add intake system for study creation workflow
- Improve session manager and context builder
- Add intake API routes and frontend components

Optimization Engine:
- Add CLI module for command-line operations
- Add intake module for study preprocessing
- Add validation module with gate checks
- Improve Zernike extractor documentation
- Update spec models with better validation
- Enhance solve_simulation robustness

Documentation:
- Add ATOMIZER_STUDIO.md planning doc
- Add ATOMIZER_UX_SYSTEM.md for UX patterns
- Update extractor library docs
- Add study-readme-generator skill

Tools:
- Add test scripts for extraction validation
- Add Zernike recentering test

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 12:02:30 -05:00

1722 lines
54 KiB
Python

"""
Intake API Routes
Provides endpoints for the study intake workflow:
1. Create inbox folder with initial AtomizerSpec (draft status)
2. Run NX introspection and update spec
3. List inbox folders with status
4. List existing topic folders
The intake workflow:
User drops files → /create → draft spec created
→ /introspect → expressions discovered, spec updated
→ Frontend configures → configured status
→ /finalize (Phase 5) → baseline solve, move to studies/{topic}/
"""
import json
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, BackgroundTasks, UploadFile, File
from pydantic import BaseModel, Field
from api.services.spec_manager import SpecManager, SpecNotFoundError
# Path setup: this file lives at
# Atomizer/atomizer-dashboard/backend/api/routes/intake.py, so the Atomizer
# repo root is five directory levels above it.
import os

_file_path = os.path.abspath(__file__)
# parents[4] == dirname applied five times to the absolute file path.
ATOMIZER_ROOT = Path(_file_path).parents[4]
STUDIES_ROOT = ATOMIZER_ROOT / "studies"
INBOX_ROOT = STUDIES_ROOT / "_inbox"

router = APIRouter()
# ==============================================================================
# Request/Response Models
# ==============================================================================
class CreateInboxRequest(BaseModel):
    """Request to create a new inbox folder."""

    # Study identifier: lowercase snake_case, 3-100 chars; becomes the folder name.
    study_name: str = Field(..., min_length=3, max_length=100, pattern=r"^[a-z0-9_]+$")
    # Optional human-readable summary, stored in the spec's meta block.
    description: Optional[str] = Field(default=None, max_length=1000)
    # Optional intended topic folder, stored in the spec's meta block.
    topic: Optional[str] = Field(default=None, pattern=r"^[A-Za-z0-9_]+$")
class CreateInboxResponse(BaseModel):
    """Response from creating inbox folder."""

    success: bool
    study_name: str
    # Paths are relative to ATOMIZER_ROOT.
    inbox_path: str
    spec_path: str
    # Always "draft" for a freshly created study.
    status: str
class IntrospectRequest(BaseModel):
    """Request to run introspection on inbox study."""

    study_name: str = Field(..., description="Name of the inbox study")
    # When omitted, the endpoint auto-detects the primary .prt file
    # (preferring names without "_i.prt").
    model_file: Optional[str] = Field(
        default=None, description="Specific model file to introspect (optional)"
    )
class IntrospectResponse(BaseModel):
    """Response from introspection."""

    success: bool
    study_name: str
    # Spec status after a successful run ("introspected").
    status: str
    # Number of user expressions discovered in the part.
    expressions_count: int
    # Subset of expressions flagged as design-variable candidates.
    candidates_count: int
    # Part mass from mass properties, if available.
    mass_kg: Optional[float]
    warnings: List[str]
class InboxStudy(BaseModel):
    """Summary of an inbox study."""

    study_name: str
    # Spec meta status, or "error" / "no_spec" / "unknown" for degraded folders.
    status: str
    description: Optional[str]
    topic: Optional[str]
    # ISO timestamps from spec meta, when a spec could be loaded.
    created: Optional[str]
    modified: Optional[str]
    # Names of model files found in the folder (and its models/ subdir).
    model_files: List[str]
    # True when the context/ directory exists and is non-empty.
    has_context: bool
class ListInboxResponse(BaseModel):
    """Response listing inbox studies."""

    studies: List[InboxStudy]
    # Convenience count; equals len(studies).
    total: int
class TopicInfo(BaseModel):
    """Information about a topic folder."""

    name: str
    # Number of child folders that look like studies (contain a spec/config file).
    study_count: int
    # Path relative to ATOMIZER_ROOT.
    path: str
class ListTopicsResponse(BaseModel):
    """Response listing topic folders."""

    topics: List[TopicInfo]
    # Convenience count; equals len(topics).
    total: int
# ==============================================================================
# Utility Functions
# ==============================================================================
def create_initial_spec(
    study_name: str, description: Optional[str], topic: Optional[str]
) -> Dict[str, Any]:
    """Create an initial AtomizerSpec v2.0 for a new inbox study.

    Args:
        study_name: Folder/study identifier.
        description: Optional human-readable summary (may be None).
        topic: Optional intended topic folder (may be None).

    Returns:
        A plain dict in AtomizerSpec v2.0 shape with status "draft" and
        empty design-variable/extractor/objective/constraint lists.
    """
    # Single UTC timestamp in ISO-8601 "Z" form, shared by created/modified.
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    meta = {
        "version": "2.0",
        "study_name": study_name,
        "description": description,
        "created": timestamp,
        "modified": timestamp,
        "created_by": "dashboard_intake",
        "modified_by": "dashboard_intake",
        "status": "draft",
        "topic": topic,
        "tags": [],
    }
    # Model slots are filled in later by introspection/finalization.
    model = {"sim": None, "prt": None, "fem": None, "introspection": None}

    return {
        "meta": meta,
        "model": model,
        "design_variables": [],
        "extractors": [],
        "objectives": [],
        "constraints": [],
        "optimization": {
            "algorithm": {"type": "TPE"},
            "budget": {"max_trials": 100},
        },
        "canvas": {
            "edges": [],
            "layout_version": "2.0",
        },
    }
def find_model_files(inbox_path: Path) -> Dict[str, List[Path]]:
    """Scan an inbox folder for NX model files.

    Searches the folder itself plus, when present, its models/ subdirectory.

    Args:
        inbox_path: Root of the inbox study folder.

    Returns:
        Dict with keys 'sim', 'prt', 'fem', 'other' mapping to lists of Paths.
        .afem files are grouped under 'fem'; 'other' is always empty (reserved).
    """
    # Map each recognized extension to its result bucket.
    bucket_for = {".sim": "sim", ".prt": "prt", ".fem": "fem", ".afem": "fem"}
    found: Dict[str, List[Path]] = {"sim": [], "prt": [], "fem": [], "other": []}

    roots = [inbox_path]
    models_dir = inbox_path / "models"
    if models_dir.exists():
        roots.append(models_dir)

    for root in roots:
        for entry in root.iterdir():
            if not entry.is_file():
                continue
            bucket = bucket_for.get(entry.suffix.lower())
            if bucket is not None:
                found[bucket].append(entry)
    return found
def identify_design_candidates(expressions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Identify which expressions are likely design variable candidates.

    Mutates each expression dict in place, adding:
    - is_candidate (bool): True when confidence >= 0.5
    - confidence (float): 0.0-1.0 heuristic score

    Heuristics:
    - Has numeric value
    - Name doesn't start with '_' (internal)
    - Name contains common DV keywords (thickness, width, height, radius, etc.)
    - Units are length-related (mm, in, etc.)

    Args:
        expressions: Expression dicts from introspection; 'name', 'value' and
            'units' keys may be missing or None.

    Returns:
        The same list, with candidate flags added.
    """
    candidate_keywords = {
        "thickness",
        "thick",
        "width",
        "height",
        "depth",
        "radius",
        "diameter",
        "length",
        "offset",
        "gap",
        "spacing",
        "angle",
        "dist",
        "size",
        "web",
        "flange",
        "rib",
        "wall",
        "fillet",
        "chamfer",
    }
    for expr in expressions:
        # 'name' may be present but None in introspection output; `or ""`
        # guards against AttributeError on .lower() (bug fix vs .get default).
        name_lower = (expr.get("name") or "").lower()

        # Skip internal expressions
        if name_lower.startswith(("_", "nx_")):
            expr["is_candidate"] = False
            expr["confidence"] = 0.0
            continue

        # Check for candidate keywords
        confidence = 0.0
        has_keyword = any(kw in name_lower for kw in candidate_keywords)
        if has_keyword:
            confidence = 0.8
        elif expr.get("value") is not None:
            # Has numeric value
            confidence = 0.3

        # Boost if has length units; 'units' may also be None (bug fix).
        units = (expr.get("units") or "").lower()
        if units in {"mm", "in", "inch", "cm", "m"}:
            confidence = min(1.0, confidence + 0.2)

        expr["is_candidate"] = confidence >= 0.5
        expr["confidence"] = confidence
    return expressions
# ==============================================================================
# Endpoints
# ==============================================================================
@router.post("/intake/create", response_model=CreateInboxResponse)
async def create_inbox_study(request: CreateInboxRequest):
    """
    Create a new inbox folder with an initial AtomizerSpec.

    This creates:
    - studies/_inbox/{study_name}/
    - studies/_inbox/{study_name}/models/ (for model files)
    - studies/_inbox/{study_name}/context/ (for goals.md, etc.)
    - studies/_inbox/{study_name}/atomizer_spec.json (draft status)

    The user can then drag model files into the models/ folder.

    Raises:
        HTTPException 409: study already exists in the inbox or a topic folder.
        HTTPException 500: folder/spec creation failed (partial state removed).
    """
    # Ensure inbox root exists (parents=True also creates studies/ on first use,
    # so the topic-folder scan below cannot hit a missing STUDIES_ROOT)
    INBOX_ROOT.mkdir(parents=True, exist_ok=True)

    # Check for existing study
    inbox_path = INBOX_ROOT / request.study_name
    if inbox_path.exists():
        raise HTTPException(
            status_code=409, detail=f"Study '{request.study_name}' already exists in inbox"
        )

    # Also check if it exists in any topic folder (topics are non-underscore dirs)
    for topic_dir in STUDIES_ROOT.iterdir():
        if topic_dir.is_dir() and not topic_dir.name.startswith("_"):
            if (topic_dir / request.study_name).exists():
                raise HTTPException(
                    status_code=409,
                    detail=f"Study '{request.study_name}' already exists in topic '{topic_dir.name}'",
                )

    try:
        # Create folder structure
        inbox_path.mkdir(parents=True)
        (inbox_path / "models").mkdir()
        (inbox_path / "context").mkdir()

        # Create initial spec (status = "draft")
        spec_data = create_initial_spec(request.study_name, request.description, request.topic)
        spec_path = inbox_path / "atomizer_spec.json"
        with open(spec_path, "w", encoding="utf-8") as f:
            json.dump(spec_data, f, indent=2, ensure_ascii=False)

        return CreateInboxResponse(
            success=True,
            study_name=request.study_name,
            inbox_path=str(inbox_path.relative_to(ATOMIZER_ROOT)),
            spec_path=str(spec_path.relative_to(ATOMIZER_ROOT)),
            status="draft",
        )
    except Exception as e:
        # Clean up on failure so a retry isn't blocked by the 409 check above
        if inbox_path.exists():
            shutil.rmtree(inbox_path)
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/intake/introspect", response_model=IntrospectResponse)
async def introspect_inbox_study(request: IntrospectRequest, background_tasks: BackgroundTasks):
    """
    Run NX introspection on an inbox study's model files.

    This:
    1. Finds the primary .prt file in the inbox
    2. Runs the introspect_part extractor
    3. Updates the spec with:
       - Expressions discovered
       - Mass properties
       - Design variable candidates (auto-identified)
    4. Updates status to 'introspected'

    The introspection runs synchronously (typically 30-60 seconds).
    (NOTE(review): background_tasks is currently unused.)

    Raises:
        HTTPException 404: study, spec, or requested model file not found.
        HTTPException 400: no .prt files available to introspect.
        HTTPException 500: extractor import or execution failure.
    """
    inbox_path = INBOX_ROOT / request.study_name
    if not inbox_path.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{request.study_name}' not found")

    # Load current spec
    try:
        spec_manager = SpecManager(inbox_path)
        spec_data = spec_manager.load_raw()
    except SpecNotFoundError:
        raise HTTPException(
            status_code=404, detail=f"No atomizer_spec.json found for '{request.study_name}'"
        )

    # Find model files
    model_files = find_model_files(inbox_path)

    # Determine which file to introspect
    if request.model_file:
        # User specified a file: look in models/ first, then the inbox root
        prt_path = inbox_path / "models" / request.model_file
        if not prt_path.exists():
            prt_path = inbox_path / request.model_file
        if not prt_path.exists():
            raise HTTPException(
                status_code=404, detail=f"Model file not found: {request.model_file}"
            )
    else:
        # Auto-detect: prefer main .prt file (not _i.prt)
        prt_files = [p for p in model_files["prt"] if "_i.prt" not in p.name.lower()]
        if not prt_files:
            prt_files = model_files["prt"]
        if not prt_files:
            raise HTTPException(
                status_code=400,
                detail="No .prt files found in inbox. Please add model files first.",
            )
        prt_path = prt_files[0]

    # Run introspection
    warnings = []  # reserved for non-fatal issues; never populated in this path
    try:
        # Import the introspect_part function. The repo root is pushed onto
        # sys.path so the optimization_engine package resolves; note this
        # inserts again on every call.
        import sys

        sys.path.insert(0, str(ATOMIZER_ROOT))
        from optimization_engine.extractors.introspect_part import introspect_part

        result = introspect_part(str(prt_path), verbose=False)

        if not result.get("success"):
            raise HTTPException(
                status_code=500,
                detail=f"Introspection failed: {result.get('error', 'Unknown error')}",
            )

        # Extract and process expressions (adds is_candidate/confidence in place)
        user_expressions = result.get("expressions", {}).get("user", [])
        expressions = identify_design_candidates(user_expressions)

        # Get mass properties
        mass_props = result.get("mass_properties", {})
        mass_kg = mass_props.get("mass_kg")
        volume_mm3 = mass_props.get("volume_mm3")

        # Build introspection data for spec
        now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        introspection_data = {
            "timestamp": now,
            "solver_type": None,  # Will be set from .sim file later
            "mass_kg": mass_kg,
            "volume_mm3": volume_mm3,
            "expressions": expressions,
            "warnings": warnings,
            "baseline": None,  # Set during finalization
        }

        # Update spec with model file paths (first file of each kind wins)
        if model_files["sim"]:
            spec_data["model"]["sim"] = {
                "path": model_files["sim"][0].name,
                "solver": "nastran",  # Default, can be updated
            }
        if model_files["prt"]:
            # Main prt (not _i.prt)
            main_prt = next(
                (p for p in model_files["prt"] if "_i.prt" not in p.name.lower()),
                model_files["prt"][0],
            )
            spec_data["model"]["prt"] = {"path": main_prt.name}
        if model_files["fem"]:
            spec_data["model"]["fem"] = {"path": model_files["fem"][0].name}

        # Add introspection data to the spec_data
        spec_data["model"]["introspection"] = introspection_data
        spec_data["meta"]["status"] = "introspected"

        # Save the complete spec with model paths and introspection
        spec_manager.save(spec_data, modified_by="introspection")

        # Count candidates
        candidates = [e for e in expressions if e.get("is_candidate")]

        return IntrospectResponse(
            success=True,
            study_name=request.study_name,
            status="introspected",
            expressions_count=len(expressions),
            candidates_count=len(candidates),
            mass_kg=mass_kg,
            warnings=warnings,
        )
    except HTTPException:
        raise
    except ImportError as e:
        raise HTTPException(status_code=500, detail=f"Failed to import introspection module: {e}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Introspection error: {e}")
@router.get("/intake/list", response_model=ListInboxResponse)
async def list_inbox_studies():
    """
    List all studies in the inbox folder.

    Returns summary information including:
    - Study name and status
    - Description and topic
    - Model files present
    - Whether context files exist

    Folders with a broken spec are reported with status "error"; folders
    without a spec at all are reported with status "no_spec".
    """
    if not INBOX_ROOT.exists():
        return ListInboxResponse(studies=[], total=0)

    studies = []
    for item in sorted(INBOX_ROOT.iterdir()):
        if not item.is_dir():
            continue
        if item.name.startswith("."):
            continue

        # Try to load spec
        spec_path = item / "atomizer_spec.json"
        if spec_path.exists():
            try:
                with open(spec_path, "r", encoding="utf-8") as f:
                    spec = json.load(f)
                meta = spec.get("meta", {})

                # Find model files (flatten all buckets into one name list)
                model_files = find_model_files(item)
                all_model_files = []
                for file_list in model_files.values():
                    all_model_files.extend([f.name for f in file_list])

                # Check for context files (non-empty context/ directory).
                # NOTE(review): the outer exists() check makes the inner
                # `context_dir.exists() and` redundant.
                context_dir = item / "context"
                has_context = (
                    context_dir.exists() and any(context_dir.iterdir())
                    if context_dir.exists()
                    else False
                )

                studies.append(
                    InboxStudy(
                        study_name=meta.get("study_name", item.name),
                        status=meta.get("status", "unknown"),
                        description=meta.get("description"),
                        topic=meta.get("topic"),
                        created=meta.get("created"),
                        modified=meta.get("modified"),
                        model_files=all_model_files,
                        has_context=has_context,
                    )
                )
            except Exception as e:
                # Spec exists but couldn't be parsed
                studies.append(
                    InboxStudy(
                        study_name=item.name,
                        status="error",
                        description=f"Error loading spec: {e}",
                        topic=None,
                        created=None,
                        modified=None,
                        model_files=[],
                        has_context=False,
                    )
                )
        else:
            # No spec file - orphaned folder
            model_files = find_model_files(item)
            all_model_files = []
            for file_list in model_files.values():
                all_model_files.extend([f.name for f in file_list])
            studies.append(
                InboxStudy(
                    study_name=item.name,
                    status="no_spec",
                    description="No atomizer_spec.json found",
                    topic=None,
                    created=None,
                    modified=None,
                    model_files=all_model_files,
                    has_context=False,
                )
            )
    return ListInboxResponse(studies=studies, total=len(studies))
@router.get("/intake/topics", response_model=ListTopicsResponse)
async def list_topics():
    """
    List existing topic folders in the studies directory.

    Topics are top-level folders that don't start with '_' (like _inbox).
    Returns the topic name, study count, and path.
    """
    if not STUDIES_ROOT.exists():
        return ListTopicsResponse(topics=[], total=0)

    topics = []
    for item in sorted(STUDIES_ROOT.iterdir()):
        if not item.is_dir():
            continue
        # Skip internal folders (_inbox, _inbox_archive, ...) and hidden dirs
        if item.name.startswith("_") or item.name.startswith("."):
            continue

        # Count studies in this topic
        study_count = 0
        for child in item.iterdir():
            if child.is_dir() and not child.name.startswith("."):
                # Check if it's actually a study (has atomizer_spec.json or optimization_config.json)
                if (child / "atomizer_spec.json").exists() or (
                    child / "optimization_config.json"
                ).exists():
                    study_count += 1

        topics.append(
            TopicInfo(
                name=item.name, study_count=study_count, path=str(item.relative_to(ATOMIZER_ROOT))
            )
        )
    return ListTopicsResponse(topics=topics, total=len(topics))
@router.get("/intake/{study_name}")
async def get_inbox_study(study_name: str):
    """
    Get detailed information about a specific inbox study.

    Returns the full spec plus additional file information.

    Raises:
        HTTPException 404: study folder or spec file not found.
    """
    inbox_path = INBOX_ROOT / study_name
    if not inbox_path.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{study_name}' not found")

    # Load spec
    spec_path = inbox_path / "atomizer_spec.json"
    if not spec_path.exists():
        raise HTTPException(
            status_code=404, detail=f"No atomizer_spec.json found for '{study_name}'"
        )
    with open(spec_path, "r", encoding="utf-8") as f:
        spec = json.load(f)

    # Find all files (model files grouped by kind; 'other' bucket not reported)
    model_files = find_model_files(inbox_path)
    all_files = {
        "sim": [f.name for f in model_files["sim"]],
        "prt": [f.name for f in model_files["prt"]],
        "fem": [f.name for f in model_files["fem"]],
    }

    # Check context
    context_dir = inbox_path / "context"
    context_files = []
    if context_dir.exists():
        context_files = [f.name for f in context_dir.iterdir() if f.is_file()]

    return {
        "study_name": study_name,
        "inbox_path": str(inbox_path.relative_to(ATOMIZER_ROOT)),
        "spec": spec,
        "files": all_files,
        "context_files": context_files,
    }
@router.delete("/intake/{study_name}")
async def delete_inbox_study(study_name: str):
    """
    Delete an inbox study folder and all its contents.

    This is permanent - use with caution.

    Raises:
        HTTPException 404: study not found.
        HTTPException 500: removal failed.
    """
    target = INBOX_ROOT / study_name
    if not target.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{study_name}' not found")
    try:
        shutil.rmtree(target)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to delete: {e}")
    return {"success": True, "deleted": study_name}
# ==============================================================================
# README Generation Endpoint
# ==============================================================================
class GenerateReadmeRequest(BaseModel):
    """Request to generate README for a study."""

    # NOTE(review): the /readme endpoint takes study_name from the URL path;
    # this request model appears unused by it - confirm before removing.
    study_name: str = Field(..., description="Name of the inbox study")
class GenerateReadmeResponse(BaseModel):
    """Response from README generation."""

    success: bool
    # Full Markdown content of the generated README.
    content: str
    # Saved location, relative to ATOMIZER_ROOT.
    path: str
@router.post("/intake/{study_name}/readme", response_model=GenerateReadmeResponse)
async def generate_readme(study_name: str):
    """
    Generate a README.md for an inbox study using Claude AI.

    This:
    1. Loads the current spec and introspection data
    2. Reads any context files (goals.md, etc.)
    3. Calls Claude to generate an intelligent README
    4. Saves the README to the inbox folder
    5. Updates status to 'configured'

    Raises:
        HTTPException 404: study or spec not found.
        HTTPException 500: README generation failed.
    """
    inbox_path = INBOX_ROOT / study_name
    if not inbox_path.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{study_name}' not found")

    # Load spec
    try:
        spec_manager = SpecManager(inbox_path)
        spec_data = spec_manager.load_raw()
    except SpecNotFoundError:
        raise HTTPException(
            status_code=404, detail=f"No atomizer_spec.json found for '{study_name}'"
        )

    # Load context files (text-like formats only; unreadable files are skipped)
    context_files = {}
    context_dir = inbox_path / "context"
    if context_dir.exists():
        for f in context_dir.iterdir():
            if f.is_file() and f.suffix in {".md", ".txt", ".json"}:
                try:
                    context_files[f.name] = f.read_text(encoding="utf-8")
                except Exception:
                    # Best-effort: skip files that can't be decoded as UTF-8
                    pass

    # Generate README using Claude
    try:
        from api.services.claude_readme import get_readme_generator

        generator = get_readme_generator()
        topic = spec_data.get("meta", {}).get("topic")
        readme_content = generator.generate_readme(
            study_name=study_name,
            spec=spec_data,
            context_files=context_files if context_files else None,
            topic=topic,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"README generation failed: {e}")

    # Save README
    readme_path = inbox_path / "README.md"
    readme_path.write_text(readme_content, encoding="utf-8")

    # Update status to configured
    spec_manager.update_status("configured", modified_by="readme_generator")

    return GenerateReadmeResponse(
        success=True,
        content=readme_content,
        path=str(readme_path.relative_to(ATOMIZER_ROOT)),
    )
# ==============================================================================
# Finalize Endpoint (Phase 5)
# ==============================================================================
class FinalizeRequest(BaseModel):
    """Request to finalize an inbox study."""

    # Target topic folder under studies/ (created if missing).
    topic: str = Field(..., pattern=r"^[A-Za-z0-9_]+$", description="Target topic folder")
    run_baseline: bool = Field(default=True, description="Whether to run baseline FEA solve")
class FinalizeResponse(BaseModel):
    """Response from finalization."""

    success: bool
    study_name: str
    # Final study location, relative to ATOMIZER_ROOT.
    final_path: str
    status: str
    # None when baseline was not requested; otherwise the solve success flag.
    baseline_success: Optional[bool] = None
    # True when a README.md was copied from the inbox to the final folder.
    readme_generated: bool
@router.post("/intake/{study_name}/finalize", response_model=FinalizeResponse)
async def finalize_inbox_study(study_name: str, request: FinalizeRequest):
    """
    Finalize an inbox study and move it to the studies directory.

    This:
    1. Validates the spec is ready
    2. Optionally runs baseline FEA solve
    3. Creates the study folder structure in studies/{topic}/{study_name}/
    4. Copies all files from inbox
    5. Archives the inbox folder to _inbox_archive/
    6. Updates status to 'ready'

    Raises:
        HTTPException 404: study or spec not found.
        HTTPException 400: study has not been introspected yet.
        HTTPException 409: study already exists in the target topic.
        HTTPException 500: failure while building the final folder
            (the partially built target is removed).
    """
    inbox_path = INBOX_ROOT / study_name
    if not inbox_path.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{study_name}' not found")

    # Load and validate spec
    try:
        spec_manager = SpecManager(inbox_path)
        spec_data = spec_manager.load_raw()
    except SpecNotFoundError:
        raise HTTPException(
            status_code=404, detail=f"No atomizer_spec.json found for '{study_name}'"
        )

    # Check status - must be at least 'introspected'
    current_status = spec_data.get("meta", {}).get("status", "draft")
    if current_status == "draft":
        raise HTTPException(
            status_code=400,
            detail="Study must be introspected before finalization. Run introspection first.",
        )

    # Determine target path
    topic_path = STUDIES_ROOT / request.topic
    final_path = topic_path / study_name

    # Check if target already exists
    if final_path.exists():
        raise HTTPException(
            status_code=409,
            detail=f"Study '{study_name}' already exists in topic '{request.topic}'",
        )

    baseline_success = None

    # Run baseline solve if requested (best-effort: failure is recorded, not fatal)
    if request.run_baseline:
        try:
            baseline_success = await _run_baseline_solve(inbox_path, spec_manager)
        except Exception as e:
            baseline_success = False
            # Update spec with failure info
            spec_manager.add_baseline(
                {
                    "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
                    "solve_time_seconds": 0,
                    "success": False,
                    "error": str(e),
                },
                modified_by="finalize",
            )

    # Create target directory structure
    try:
        topic_path.mkdir(parents=True, exist_ok=True)
        final_path.mkdir()

        # Create standard study folder structure
        (final_path / "1_model").mkdir()
        (final_path / "2_iterations").mkdir()
        (final_path / "3_results").mkdir()

        # Copy model files to 1_model
        models_dir = inbox_path / "models"
        if models_dir.exists():
            for f in models_dir.iterdir():
                if f.is_file():
                    shutil.copy2(f, final_path / "1_model" / f.name)

        # Also copy any model files from inbox root
        for ext in [".sim", ".prt", ".fem", ".afem"]:
            for f in inbox_path.glob(f"*{ext}"):
                if f.is_file():
                    shutil.copy2(f, final_path / "1_model" / f.name)

        # Copy README if it exists. Record the flag NOW: the inbox folder is
        # moved to the archive below, so checking readme_src.exists() after
        # the move would always report False (bug fix).
        readme_src = inbox_path / "README.md"
        readme_generated = readme_src.exists()
        if readme_generated:
            shutil.copy2(readme_src, final_path / "README.md")

        # Copy context files
        context_src = inbox_path / "context"
        if context_src.exists():
            shutil.copytree(context_src, final_path / "context")

        # Update spec with final paths and status
        spec_data["meta"]["status"] = "ready"
        spec_data["meta"]["topic"] = request.topic

        # Update model paths to be relative to 1_model
        model = spec_data.get("model", {})
        for key in ("sim", "prt", "fem"):
            entry = model.get(key)
            if entry and entry.get("path"):
                entry["path"] = f"1_model/{Path(entry['path']).name}"

        # Save updated spec to final location
        final_spec_path = final_path / "atomizer_spec.json"
        with open(final_spec_path, "w", encoding="utf-8") as f:
            json.dump(spec_data, f, indent=2, ensure_ascii=False)

        # Archive inbox folder (timestamped name avoids collisions on re-intake)
        archive_root = STUDIES_ROOT / "_inbox_archive"
        archive_root.mkdir(exist_ok=True)
        archive_path = archive_root / f"{study_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        shutil.move(str(inbox_path), str(archive_path))

        return FinalizeResponse(
            success=True,
            study_name=study_name,
            final_path=str(final_path.relative_to(ATOMIZER_ROOT)),
            status="ready",
            baseline_success=baseline_success,
            readme_generated=readme_generated,
        )
    except HTTPException:
        raise
    except Exception as e:
        # Clean up on failure so the 409 check doesn't block a retry
        if final_path.exists():
            shutil.rmtree(final_path)
        raise HTTPException(status_code=500, detail=f"Finalization failed: {e}")
async def _run_baseline_solve(inbox_path: Path, spec_manager: SpecManager) -> bool:
    """
    Run baseline FEA solve for the study.

    This is a simplified version - full implementation would use the NX solver.
    For now, we just record that baseline was attempted.

    Args:
        inbox_path: Inbox study folder containing the model files.
        spec_manager: Spec manager used to persist the baseline record.

    Returns:
        True when the (stub) baseline completes.

    Raises:
        ValueError: when no .sim file is present to solve.
    """
    import time

    start_time = time.time()

    # Find sim file
    model_files = find_model_files(inbox_path)
    if not model_files["sim"]:
        raise ValueError("No .sim file found for baseline solve")

    # In a full implementation, we would:
    # 1. Load the sim file in NX
    # 2. Run the solve
    # 3. Extract baseline results
    # For now, simulate the baseline solve
    # TODO: Integrate with actual NX solver
    solve_time = time.time() - start_time

    # Record baseline data
    spec_manager.add_baseline(
        {
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "solve_time_seconds": solve_time,
            "success": True,
            "error": None,
            # These would be populated from actual FEA results:
            "mass_kg": None,
            "max_displacement_mm": None,
            "max_stress_mpa": None,
        },
        modified_by="baseline_solve",
    )
    return True
# ==============================================================================
# File Upload Endpoint
# ==============================================================================
@router.post("/intake/{study_name}/upload")
async def upload_files_to_inbox(
    study_name: str,
    files: List[UploadFile] = File(..., description="Model files to upload"),
):
    """
    Upload model files to an inbox study's models folder.

    Accepts .prt, .sim, .fem, .afem files.
    Files are uploaded via multipart/form-data.

    Client-supplied filenames are reduced to their basename before writing so
    a crafted name (e.g. '../../x.prt') cannot write outside models/.

    Raises:
        HTTPException 404: study not found.
    """
    inbox_path = INBOX_ROOT / study_name
    if not inbox_path.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{study_name}' not found")

    models_dir = inbox_path / "models"
    models_dir.mkdir(exist_ok=True)

    uploaded = []
    valid_extensions = {".prt", ".sim", ".fem", ".afem"}

    for file in files:
        if not file.filename:
            uploaded.append(
                {"name": "unknown", "status": "rejected", "reason": "No filename provided"}
            )
            continue

        # Path-traversal guard: keep only the final path component.
        safe_name = Path(file.filename).name
        if not safe_name:
            uploaded.append(
                {"name": file.filename, "status": "rejected", "reason": "Invalid filename"}
            )
            continue

        suffix = Path(safe_name).suffix.lower()
        if suffix not in valid_extensions:
            uploaded.append(
                {
                    "name": file.filename,
                    "status": "rejected",
                    "reason": f"Invalid type: {suffix}. Allowed: {', '.join(valid_extensions)}",
                }
            )
            continue

        dest_path = models_dir / safe_name
        content = await file.read()
        dest_path.write_bytes(content)

        uploaded.append(
            {
                "name": file.filename,
                "status": "uploaded",
                "path": str(dest_path.relative_to(ATOMIZER_ROOT)),
                "size": len(content),
            }
        )

    return {
        "success": True,
        "study_name": study_name,
        "uploaded_files": uploaded,
        "total_uploaded": len([f for f in uploaded if f["status"] == "uploaded"]),
    }
# ==============================================================================
# Context File Upload Endpoint
# ==============================================================================
@router.post("/intake/{study_name}/context")
async def upload_context_files(
    study_name: str,
    files: List[UploadFile] = File(..., description="Context files to upload"),
):
    """
    Upload context files to an inbox study's context folder.

    Context files help Claude understand the study goals and generate better
    documentation. Accepts: .md, .txt, .pdf, .png, .jpg, .jpeg, .json, .csv

    Files are uploaded via multipart/form-data. Client-supplied filenames are
    reduced to their basename before writing so a crafted name cannot write
    outside the context folder.

    Raises:
        HTTPException 404: study not found.
    """
    inbox_path = INBOX_ROOT / study_name
    if not inbox_path.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{study_name}' not found")

    context_dir = inbox_path / "context"
    context_dir.mkdir(exist_ok=True)

    uploaded = []
    valid_extensions = {".md", ".txt", ".pdf", ".png", ".jpg", ".jpeg", ".json", ".csv", ".docx"}

    for file in files:
        if not file.filename:
            uploaded.append(
                {"name": "unknown", "status": "rejected", "reason": "No filename provided"}
            )
            continue

        # Path-traversal guard: keep only the final path component.
        safe_name = Path(file.filename).name
        if not safe_name:
            uploaded.append(
                {"name": file.filename, "status": "rejected", "reason": "Invalid filename"}
            )
            continue

        suffix = Path(safe_name).suffix.lower()
        if suffix not in valid_extensions:
            uploaded.append(
                {
                    "name": file.filename,
                    "status": "rejected",
                    "reason": f"Invalid type: {suffix}. Allowed: {', '.join(sorted(valid_extensions))}",
                }
            )
            continue

        dest_path = context_dir / safe_name
        content = await file.read()
        dest_path.write_bytes(content)

        uploaded.append(
            {
                "name": file.filename,
                "status": "uploaded",
                "path": str(dest_path.relative_to(ATOMIZER_ROOT)),
                "size": len(content),
                "folder": "context",
            }
        )

    return {
        "success": True,
        "study_name": study_name,
        "uploaded_files": uploaded,
        "total_uploaded": len([f for f in uploaded if f["status"] == "uploaded"]),
    }
@router.get("/intake/{study_name}/context")
async def list_context_files(study_name: str):
    """
    List all context files for an inbox study.

    Hidden files (dotfiles) and subdirectories are skipped.

    Raises:
        HTTPException 404: study not found.
    """
    inbox_path = INBOX_ROOT / study_name
    if not inbox_path.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{study_name}' not found")

    context_dir = inbox_path / "context"
    entries = []
    if context_dir.exists():
        visible = [
            p
            for p in sorted(context_dir.iterdir())
            if p.is_file() and not p.name.startswith(".")
        ]
        entries = [
            {
                "name": p.name,
                "path": str(p.relative_to(ATOMIZER_ROOT)),
                "size": p.stat().st_size,
                "extension": p.suffix.lower(),
            }
            for p in visible
        ]

    return {
        "study_name": study_name,
        "context_files": entries,
        "total": len(entries),
    }
@router.delete("/intake/{study_name}/context/{filename}")
async def delete_context_file(study_name: str, filename: str):
    """
    Delete a specific context file.

    The route path must declare {filename} so FastAPI binds the handler's
    `filename` argument from the URL (restored from a mangled placeholder).

    Raises:
        HTTPException 404: study or context file not found.
    """
    inbox_path = INBOX_ROOT / study_name
    if not inbox_path.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{study_name}' not found")

    # Path-traversal guard: only the basename may address a context file.
    safe_name = Path(filename).name
    file_path = inbox_path / "context" / safe_name
    if not file_path.exists():
        raise HTTPException(status_code=404, detail=f"Context file '{filename}' not found")
    file_path.unlink()
    return {
        "success": True,
        "deleted": filename,
    }
# ==============================================================================
# Design Variables Endpoint
# ==============================================================================
class CreateDesignVariablesRequest(BaseModel):
    """Request to create design variables from selected expressions."""

    expression_names: List[str] = Field(
        ..., description="List of expression names to convert to DVs"
    )
    auto_bounds: bool = Field(
        default=True, description="Automatically set bounds based on current value"
    )
    # Fractional half-width of the auto-generated bounds around the baseline value.
    bound_factor: float = Field(
        default=0.5, description="Factor for auto-bounds (e.g., 0.5 = +/- 50%)"
    )
class DesignVariableCreated(BaseModel):
    """Info about a created design variable."""

    id: str
    name: str
    # Name of the expression this DV was created from.
    expression_name: str
    bounds_min: float
    bounds_max: float
    # Current (baseline) value of the expression.
    baseline: float
    units: Optional[str]
class CreateDesignVariablesResponse(BaseModel):
    """Response from creating design variables."""

    success: bool
    study_name: str
    # Newly created DVs (already-existing or unmatched names are skipped).
    created: List[DesignVariableCreated]
    total_created: int
@router.post("/intake/{study_name}/design-variables", response_model=CreateDesignVariablesResponse)
async def create_design_variables(study_name: str, request: CreateDesignVariablesRequest):
    """
    Create design variables from selected expressions.

    This:
    1. Reads the current spec
    2. Finds matching expressions from introspection data
    3. Creates design variables with auto-generated bounds
    4. Updates the spec with the new design variables
    5. Updates status to 'configured' if not already

    Bounds are automatically set based on current value:
    - min = value * (1 - bound_factor)
    - max = value * (1 + bound_factor)

    For negative baselines the two products come out in reverse order, so
    min/max are normalized after either branch.

    Raises:
        HTTPException: 404 if the study or its spec is missing,
            400 if no introspection data is available.
    """
    inbox_path = INBOX_ROOT / study_name
    if not inbox_path.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{study_name}' not found")

    # Load current spec
    try:
        spec_manager = SpecManager(inbox_path)
        spec_data = spec_manager.load_raw()
    except SpecNotFoundError:
        raise HTTPException(
            status_code=404, detail=f"No atomizer_spec.json found for '{study_name}'"
        )

    # Get introspection data
    introspection = spec_data.get("model", {}).get("introspection")
    if not introspection or not introspection.get("expressions"):
        raise HTTPException(
            status_code=400,
            detail="No introspection data found. Run introspection first.",
        )

    # Build lookup of expressions by name
    expr_lookup = {e["name"]: e for e in introspection["expressions"]}

    # Existing DVs, used to avoid creating duplicates. Use .get() so a
    # hand-edited spec with a malformed DV entry doesn't 500 the request.
    existing_dvs = {
        dv.get("expression_name") for dv in spec_data.get("design_variables", [])
    }

    # Create design variables for each selected expression
    created = []
    new_dvs = []
    for expr_name in request.expression_names:
        if expr_name in existing_dvs:
            continue  # Skip existing
        expr = expr_lookup.get(expr_name)
        if not expr:
            continue  # Expression not found in introspection
        value = expr.get("value")
        if value is None:
            continue  # No numeric value

        # Generate bounds
        if request.auto_bounds and value != 0:
            bounds_min = value * (1 - request.bound_factor)
            bounds_max = value * (1 + request.bound_factor)
        else:
            # Default bounds for zero baseline or manual (non-auto) mode
            bounds_min = value - 10 if value == 0 else value * 0.5
            bounds_max = value + 10 if value == 0 else value * 1.5
        # Normalize: a negative baseline flips the ordering in BOTH branches
        # above, so the swap must apply unconditionally (previously only the
        # auto-bounds branch was normalized).
        if bounds_min > bounds_max:
            bounds_min, bounds_max = bounds_max, bounds_min

        # Generate unique ID (sequential over existing + newly added DVs)
        dv_id = f"dv_{len(spec_data.get('design_variables', [])) + len(new_dvs) + 1:03d}"

        # Create design variable
        dv = {
            "id": dv_id,
            "name": expr_name.replace("_", " ").title(),
            "expression_name": expr_name,
            "type": "continuous",
            "bounds": {
                "min": round(bounds_min, 4),
                "max": round(bounds_max, 4),
            },
            "baseline": round(value, 4),
            "units": expr.get("units"),
            "enabled": True,
        }
        new_dvs.append(dv)
        created.append(
            DesignVariableCreated(
                id=dv_id,
                name=dv["name"],
                expression_name=expr_name,
                bounds_min=dv["bounds"]["min"],
                bounds_max=dv["bounds"]["max"],
                baseline=dv["baseline"],
                units=dv.get("units"),
            )
        )

    # Add new DVs to spec
    if "design_variables" not in spec_data:
        spec_data["design_variables"] = []
    spec_data["design_variables"].extend(new_dvs)

    # Advance workflow status once the first DVs land
    if new_dvs and spec_data.get("meta", {}).get("status") == "introspected":
        spec_data["meta"]["status"] = "configured"

    # Save updated spec
    spec_manager.save(spec_data, modified_by="design_variable_creator")

    return CreateDesignVariablesResponse(
        success=True,
        study_name=study_name,
        created=created,
        total_created=len(created),
    )
# ==============================================================================
# Studio Endpoints (Atomizer Studio - Unified Creation Environment)
# ==============================================================================
class CreateDraftResponse(BaseModel):
    """Response from creating an anonymous draft."""

    success: bool
    # Generated folder name, e.g. "draft_20260124_120000_ab12cd".
    draft_id: str
    # Paths relative to ATOMIZER_ROOT.
    inbox_path: str
    spec_path: str
    # Always "draft" on success.
    status: str
@router.post("/intake/draft", response_model=CreateDraftResponse)
async def create_draft():
    """
    Create an anonymous draft study for the Studio workflow.

    Builds a temporary study folder with a unique ID that can be renamed
    during finalization — the "untitled document" pattern.

    Returns:
        CreateDraftResponse with the draft_id and paths relative to
        ATOMIZER_ROOT.
    """
    import uuid

    # Timestamp plus a short random suffix keeps concurrent drafts distinct.
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    suffix = uuid.uuid4().hex[:6]
    draft_id = f"draft_{stamp}_{suffix}"

    # Ensure inbox root exists before creating the draft folder.
    INBOX_ROOT.mkdir(parents=True, exist_ok=True)
    inbox_path = INBOX_ROOT / draft_id
    try:
        # Folder skeleton: the draft root plus model/context upload targets.
        inbox_path.mkdir(parents=True)
        for sub in ("models", "context"):
            (inbox_path / sub).mkdir()

        # Seed the draft with an initial spec, flagged as a draft.
        spec_data = create_initial_spec(draft_id, "Untitled Study (Draft)", None)
        spec_data["meta"]["is_draft"] = True
        spec_path = inbox_path / "atomizer_spec.json"
        with open(spec_path, "w", encoding="utf-8") as fh:
            json.dump(spec_data, fh, indent=2, ensure_ascii=False)

        return CreateDraftResponse(
            success=True,
            draft_id=draft_id,
            inbox_path=str(inbox_path.relative_to(ATOMIZER_ROOT)),
            spec_path=str(spec_path.relative_to(ATOMIZER_ROOT)),
            status="draft",
        )
    except Exception as exc:
        # Roll back the partially created folder so no orphan drafts remain.
        if inbox_path.exists():
            shutil.rmtree(inbox_path)
        raise HTTPException(status_code=500, detail=str(exc))
class ContextContentResponse(BaseModel):
    """Response containing extracted text from context files."""

    success: bool
    study_name: str
    # All extracted file texts concatenated, each under a "=== name ===" header.
    content: str
    # Per-file metadata: name, extension, size, status, characters, error.
    files_read: List[Dict[str, Any]]
    total_characters: int
@router.get("/intake/{study_name}/context/content", response_model=ContextContentResponse)
async def get_context_content(study_name: str):
    """
    Extract and return text content from all context files.

    Reads .md/.txt/.json/.csv files directly and attempts to extract text
    from PDFs via pypdf (optional dependency). The combined content is
    returned for AI context injection.

    Hidden files (names starting with ".") are skipped, matching the
    context-file listing endpoint, so OS junk like .DS_Store never leaks
    into the AI context.

    Raises:
        HTTPException: 404 if the inbox study does not exist.

    Returns:
        ContextContentResponse with combined text and per-file status.
    """
    inbox_path = INBOX_ROOT / study_name
    if not inbox_path.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{study_name}' not found")

    context_dir = inbox_path / "context"
    if not context_dir.exists():
        # No context folder yet: empty but successful payload.
        return ContextContentResponse(
            success=True,
            study_name=study_name,
            content="",
            files_read=[],
            total_characters=0,
        )

    combined_content = []
    files_read = []
    for file_path in sorted(context_dir.iterdir()):
        # Skip directories and hidden files (consistent with the listing
        # endpoint, which also filters names starting with ".").
        if not file_path.is_file() or file_path.name.startswith("."):
            continue
        file_info = {
            "name": file_path.name,
            "extension": file_path.suffix.lower(),
            "size": file_path.stat().st_size,
            "status": "success",
            "characters": 0,
        }
        try:
            suffix = file_path.suffix.lower()
            if suffix in {".md", ".txt", ".json", ".csv"}:
                # Plain-text formats: read directly.
                text = file_path.read_text(encoding="utf-8")
                combined_content.append(f"=== {file_path.name} ===\n{text}\n")
                file_info["characters"] = len(text)
            elif suffix == ".pdf":
                # PDFs need pypdf; degrade gracefully when unavailable.
                try:
                    import pypdf

                    reader = pypdf.PdfReader(str(file_path))
                    text_parts = []
                    for page in reader.pages:
                        page_text = page.extract_text()
                        if page_text:
                            text_parts.append(page_text)
                    text = "\n".join(text_parts)
                    combined_content.append(f"=== {file_path.name} ===\n{text}\n")
                    file_info["characters"] = len(text)
                except ImportError:
                    # pypdf not installed: record, but don't fail the request.
                    file_info["status"] = "skipped"
                    file_info["error"] = "pypdf not installed"
                except Exception as pdf_err:
                    file_info["status"] = "error"
                    file_info["error"] = str(pdf_err)
            else:
                file_info["status"] = "skipped"
                file_info["error"] = f"Unsupported format: {suffix}"
        except Exception as e:
            # Per-file failures (e.g. bad encoding) are reported, not raised.
            file_info["status"] = "error"
            file_info["error"] = str(e)
        files_read.append(file_info)

    full_content = "\n".join(combined_content)
    return ContextContentResponse(
        success=True,
        study_name=study_name,
        content=full_content,
        files_read=files_read,
        total_characters=len(full_content),
    )
class EnhancedFinalizeRequest(BaseModel):
    """Enhanced request to finalize with rename support."""

    # Target topic folder under studies/; alphanumeric + underscore only.
    topic: str = Field(..., pattern=r"^[A-Za-z0-9_]+$", description="Target topic folder")
    # Optional new study name; lowercase snake_case, used to rename drafts.
    # Must not start with "draft_" (enforced in the endpoint).
    new_name: Optional[str] = Field(
        default=None,
        min_length=3,
        max_length=100,
        pattern=r"^[a-z0-9_]+$",
        description="New study name (for renaming drafts)",
    )
    # Disabled by default to keep Studio iteration fast.
    run_baseline: bool = Field(default=False, description="Whether to run baseline FEA solve")
class EnhancedFinalizeResponse(BaseModel):
    """Enhanced response from finalization."""

    success: bool
    # Draft ID the study was finalized from.
    original_name: str
    # Name after optional rename.
    final_name: str
    # Path relative to ATOMIZER_ROOT.
    final_path: str
    status: str
    # None when no baseline solve was requested.
    baseline_success: Optional[bool] = None
    # True if a README.md existed in the inbox and was copied over.
    readme_generated: bool
@router.post("/intake/{study_name}/finalize/studio", response_model=EnhancedFinalizeResponse)
async def finalize_studio_draft(study_name: str, request: EnhancedFinalizeRequest):
    """
    Finalize a Studio draft with rename support.

    This is an enhanced version of finalize that:
    1. Supports renaming draft_xxx to a proper study name
    2. Does NOT require introspection (allows manual configuration)
    3. Has baseline solve disabled by default for faster iteration

    The filesystem transaction: (optional) baseline solve in the inbox,
    then create studies/{topic}/{final_name} with the standard folder
    layout, copy model/context/README files, write the updated spec, and
    finally move the inbox folder into an archive. On failure after the
    target exists, the partially built target is removed.

    Args:
        study_name: Current draft ID (e.g., "draft_20260124_abc123")
        request: Finalization options including new_name

    Raises:
        HTTPException: 404 if the study or its spec is missing, 400 for an
            invalid new_name, 409 if the target study already exists,
            500 if the filesystem transaction fails.

    Returns:
        EnhancedFinalizeResponse with final paths
    """
    inbox_path = INBOX_ROOT / study_name
    if not inbox_path.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{study_name}' not found")
    # Load spec
    try:
        spec_manager = SpecManager(inbox_path)
        spec_data = spec_manager.load_raw()
    except SpecNotFoundError:
        raise HTTPException(
            status_code=404, detail=f"No atomizer_spec.json found for '{study_name}'"
        )
    # Determine final name: keep the current name unless a rename was given.
    final_name = request.new_name if request.new_name else study_name
    # Validate final name doesn't start with draft_ if it was renamed
    if request.new_name and request.new_name.startswith("draft_"):
        raise HTTPException(
            status_code=400,
            detail="Final study name cannot start with 'draft_'. Please provide a proper name.",
        )
    # Determine target path
    topic_path = STUDIES_ROOT / request.topic
    final_path = topic_path / final_name
    # Check if target already exists
    if final_path.exists():
        raise HTTPException(
            status_code=409,
            detail=f"Study '{final_name}' already exists in topic '{request.topic}'",
        )
    baseline_success = None
    # Run baseline solve if requested (disabled by default for Studio).
    # A failed solve is recorded in the spec but does NOT abort finalization.
    if request.run_baseline:
        try:
            baseline_success = await _run_baseline_solve(inbox_path, spec_manager)
        except Exception as e:
            baseline_success = False
            spec_manager.add_baseline(
                {
                    "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
                    "solve_time_seconds": 0,
                    "success": False,
                    "error": str(e),
                },
                modified_by="finalize_studio",
            )
    # Create target directory structure
    try:
        topic_path.mkdir(parents=True, exist_ok=True)
        final_path.mkdir()
        # Create standard study folder structure
        (final_path / "1_model").mkdir()
        (final_path / "2_iterations").mkdir()
        (final_path / "3_results").mkdir()
        # Copy model files to 1_model
        models_dir = inbox_path / "models"
        if models_dir.exists():
            for f in models_dir.iterdir():
                if f.is_file():
                    shutil.copy2(f, final_path / "1_model" / f.name)
        # Copy any model files from inbox root (same-named files from
        # models/ are simply overwritten by copy2)
        for ext in [".sim", ".prt", ".fem", ".afem"]:
            for f in inbox_path.glob(f"*{ext}"):
                if f.is_file():
                    shutil.copy2(f, final_path / "1_model" / f.name)
        # Copy README if exists
        readme_src = inbox_path / "README.md"
        readme_generated = readme_src.exists()
        if readme_generated:
            shutil.copy2(readme_src, final_path / "README.md")
        # Copy context files (only when the folder is non-empty, since
        # copytree requires the destination not to exist)
        context_src = inbox_path / "context"
        if context_src.exists() and any(context_src.iterdir()):
            context_dst = final_path / "context"
            shutil.copytree(context_src, context_dst)
        # Update spec with final name and paths
        spec_data["meta"]["study_name"] = final_name
        spec_data["meta"]["status"] = "ready"
        spec_data["meta"]["topic"] = request.topic
        spec_data["meta"]["is_draft"] = False
        spec_data["meta"]["finalized_from"] = study_name
        # Rewrite model paths to be relative to 1_model (only basename kept)
        model = spec_data.get("model", {})
        if model.get("sim") and model["sim"].get("path"):
            model["sim"]["path"] = f"1_model/{Path(model['sim']['path']).name}"
        if model.get("prt") and model["prt"].get("path"):
            model["prt"]["path"] = f"1_model/{Path(model['prt']['path']).name}"
        if model.get("fem") and model["fem"].get("path"):
            model["fem"]["path"] = f"1_model/{Path(model['fem']['path']).name}"
        # Save updated spec to final location
        final_spec_path = final_path / "atomizer_spec.json"
        with open(final_spec_path, "w", encoding="utf-8") as f:
            json.dump(spec_data, f, indent=2, ensure_ascii=False)
        # Archive inbox folder (don't delete, archive for safety)
        archive_root = STUDIES_ROOT / "_inbox_archive"
        archive_root.mkdir(exist_ok=True)
        archive_name = f"{study_name}_finalized_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        archive_path = archive_root / archive_name
        shutil.move(str(inbox_path), str(archive_path))
        return EnhancedFinalizeResponse(
            success=True,
            original_name=study_name,
            final_name=final_name,
            final_path=str(final_path.relative_to(ATOMIZER_ROOT)),
            status="ready",
            baseline_success=baseline_success,
            readme_generated=readme_generated,
        )
    except HTTPException:
        raise
    except Exception as e:
        # Clean up the partially built target on failure; the inbox folder
        # is untouched at this point (the archive move is the last step).
        if final_path.exists():
            shutil.rmtree(final_path)
        raise HTTPException(status_code=500, detail=f"Finalization failed: {e}")
class DraftSpecResponse(BaseModel):
    """Response with full draft spec for Studio."""

    success: bool
    draft_id: str
    # Raw atomizer_spec.json content.
    spec: Dict[str, Any]
    # Bare filenames of uploaded model files (all extensions combined).
    model_files: List[str]
    # Bare filenames in the context/ folder.
    context_files: List[str]
    # True when introspection ran and produced at least one expression.
    introspection_available: bool
    design_variable_count: int
    objective_count: int
@router.get("/intake/{study_name}/studio", response_model=DraftSpecResponse)
async def get_studio_draft(study_name: str):
    """
    Get complete draft information for Studio UI.

    This is a convenience endpoint that returns everything the Studio needs:
    - Full spec
    - List of uploaded files
    - Introspection status
    - Configuration counts

    Hidden context files (names starting with ".") are excluded, matching
    the context-file listing endpoint.

    Raises:
        HTTPException: 404 if the inbox study or its spec is missing.

    Returns:
        DraftSpecResponse with all Studio-relevant data
    """
    inbox_path = INBOX_ROOT / study_name
    if not inbox_path.exists():
        raise HTTPException(status_code=404, detail=f"Inbox study '{study_name}' not found")

    # Load spec
    spec_path = inbox_path / "atomizer_spec.json"
    if not spec_path.exists():
        raise HTTPException(
            status_code=404, detail=f"No atomizer_spec.json found for '{study_name}'"
        )
    with open(spec_path, "r", encoding="utf-8") as f:
        spec = json.load(f)

    # Flatten the per-extension file map from find_model_files
    model_files = find_model_files(inbox_path)
    all_model_files = []
    for file_list in model_files.values():
        all_model_files.extend(f.name for f in file_list)

    # Context files, skipping hidden entries (consistent with the listing
    # endpoint, which filters names starting with ".")
    context_dir = inbox_path / "context"
    context_files = []
    if context_dir.exists():
        context_files = [
            f.name
            for f in context_dir.iterdir()
            if f.is_file() and not f.name.startswith(".")
        ]

    # Introspection is "available" only if it produced expressions
    introspection = spec.get("model", {}).get("introspection")
    introspection_available = introspection is not None and bool(introspection.get("expressions"))

    return DraftSpecResponse(
        success=True,
        draft_id=study_name,
        spec=spec,
        model_files=all_model_files,
        context_files=context_files,
        introspection_available=introspection_available,
        design_variable_count=len(spec.get("design_variables", [])),
        objective_count=len(spec.get("objectives", [])),
    )