"""
Files API Routes

Provides file browsing and import capabilities for the Canvas Builder.
Supports importing NX model files from anywhere on the file system.
"""

import glob
import logging
import os
import re
import shutil
from pathlib import Path
from typing import List, Optional

from fastapi import APIRouter, Query, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel

logger = logging.getLogger(__name__)

router = APIRouter()

# File extensions accepted as NX model files (order is used in error messages).
NX_MODEL_EXTENSIONS = (".prt", ".sim", ".fem", ".afem")

# Suffixes stripped from a file stem to recover the base model name.
# Order matters: "_i" must go first so that "model_fem1_i" reduces to
# "model_fem1" and then to "model"; stripping "_fem1" first would never match.
_NX_STEM_SUFFIXES = (
    re.compile(r"_i$"),
    re.compile(r"_sim\d*$"),
    re.compile(r"_fem\d*$"),
)


class ImportRequest(BaseModel):
    """Request to import a file from a Windows path"""

    # Absolute path of the file to import.
    source_path: str
    # Target study, relative to the studies root (may contain separators).
    study_name: str
    # When True, files related by NX naming convention are copied as well.
    copy_related: bool = True


# Path to studies root (go up 5 levels from this file)
_file_path = os.path.abspath(__file__)
ATOMIZER_ROOT = Path(
    os.path.normpath(
        os.path.dirname(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(_file_path))))
        )
    )
)
STUDIES_ROOT = ATOMIZER_ROOT / "studies"


def _resolve_in_studies(relative: str) -> Path:
    """
    Join a client-supplied relative path onto STUDIES_ROOT safely.

    The result is normalised lexically (symlinks are not followed), so ".."
    segments and absolute paths cannot point outside the studies root.

    Args:
        relative: Path relative to the studies root ("" means the root itself)

    Returns:
        Normalised path located at or below STUDIES_ROOT

    Raises:
        ValueError: If the path would escape the studies root
    """
    candidate = Path(os.path.normpath(STUDIES_ROOT / relative))
    if candidate != STUDIES_ROOT and STUDIES_ROOT not in candidate.parents:
        raise ValueError(f"Path escapes studies root: {relative}")
    return candidate


def _resolve_study_dir(study_name: str) -> Path:
    """
    Resolve a study name to its folder, rejecting unsafe or empty names.

    Raises:
        HTTPException: 400 if the name is empty or escapes the studies root
    """
    try:
        study_dir = _resolve_in_studies(study_name)
    except ValueError:
        raise HTTPException(
            status_code=400, detail=f"Invalid study name: {study_name}"
        ) from None
    if study_dir == STUDIES_ROOT:
        # An empty name would otherwise create "1_model" directly in the root.
        raise HTTPException(status_code=400, detail=f"Invalid study name: {study_name}")
    return study_dir


def _studies_relative(path: Path) -> str:
    """Return *path* relative to STUDIES_ROOT using forward slashes."""
    return str(path.relative_to(STUDIES_ROOT)).replace("\\", "/")


def _parse_extensions(types: str) -> set:
    """
    Parse a comma-separated extension list into a set of lowercase suffixes.

    Entries given without a leading dot ("sim") are normalised to ".sim" so
    they can match ``Path.suffix`` values.
    """
    extensions = set()
    for raw in types.split(","):
        ext = raw.strip().lower()
        if ext:
            extensions.add(ext if ext.startswith(".") else f".{ext}")
    return extensions


def _safe_upload_name(filename: Optional[str]) -> Optional[str]:
    """
    Reduce an uploaded file name to its final path component.

    Returns:
        The bare file name, or None if the name is missing or unusable
    """
    if not filename:
        return None
    # Treat both separator styles as separators regardless of host OS.
    name = Path(filename.replace("\\", "/")).name
    if name in ("", ".", ".."):
        return None
    return name


@router.get("/list")
async def list_files(path: str = "", types: str = ".sim,.prt,.fem,.afem"):
    """
    List files in a directory, filtered by type.

    Args:
        path: Relative path from studies root (empty for root)
        types: Comma-separated list of file extensions to include

    Returns:
        List of files and directories with their paths. On failure the
        list is empty and an "error" key describes the problem.
    """
    allowed_types = _parse_extensions(types)

    try:
        base_path = _resolve_in_studies(path)
    except ValueError:
        return {"files": [], "path": path, "error": "Invalid path"}

    if not base_path.exists():
        return {"files": [], "path": path, "error": "Directory not found"}
    if not base_path.is_dir():
        return {"files": [], "path": path, "error": "Not a directory"}

    files = []

    try:
        # Directories first, then files; each group sorted case-insensitively.
        for entry in sorted(base_path.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())):
            # Skip hidden files and directories
            if entry.name.startswith("."):
                continue

            if entry.is_dir():
                files.append(
                    {
                        "name": entry.name,
                        "path": _studies_relative(entry),
                        "isDirectory": True,
                    }
                )
            elif entry.suffix.lower() in allowed_types:
                files.append(
                    {
                        "name": entry.name,
                        "path": _studies_relative(entry),
                        "isDirectory": False,
                        "size": entry.stat().st_size,
                    }
                )
    except PermissionError:
        return {"files": [], "path": path, "error": "Permission denied"}
    except Exception as e:
        # Route boundary: report the failure to the client instead of a 500.
        return {"files": [], "path": path, "error": str(e)}

    return {"files": files, "path": path}


@router.get("/search")
async def search_files(query: str, types: str = ".sim,.prt,.fem,.afem", max_results: int = 50):
    """
    Search for files by name pattern.

    Args:
        query: Search pattern (partial name match, case-insensitive)
        types: Comma-separated list of file extensions to include
        max_results: Maximum number of results to return

    Returns:
        List of matching files with their paths
    """
    allowed_types = _parse_extensions(types)
    query_lower = query.lower()

    files = []

    def search_recursive(directory: Path, depth: int = 0):
        """Recursively search for matching files"""
        if depth > 10 or len(files) >= max_results:  # Limit depth and results
            return

        try:
            for entry in directory.iterdir():
                if len(files) >= max_results:
                    return

                if entry.name.startswith("."):
                    continue

                if entry.is_dir():
                    search_recursive(entry, depth + 1)
                elif entry.suffix.lower() in allowed_types and query_lower in entry.name.lower():
                    files.append(
                        {
                            "name": entry.name,
                            "path": _studies_relative(entry),
                            "isDirectory": False,
                            "size": entry.stat().st_size,
                        }
                    )
        except OSError as e:
            # Best effort: an unreadable directory must not abort the search.
            logger.debug("Skipping %s during search: %s", directory, e)

    search_recursive(STUDIES_ROOT)

    return {"files": files, "query": query, "total": len(files)}


@router.get("/exists")
async def check_file_exists(path: str):
    """
    Check if a file exists.

    Args:
        path: Relative path from studies root

    Returns:
        Boolean indicating if file exists and file info. Paths that point
        outside the studies root are reported as not existing.
    """
    try:
        file_path = _resolve_in_studies(path)
    except ValueError:
        return {"exists": False, "path": path}

    exists = file_path.exists()

    result = {
        "exists": exists,
        "path": path,
    }

    if exists:
        result["isDirectory"] = file_path.is_dir()
        if file_path.is_file():
            result["size"] = file_path.stat().st_size
            result["name"] = file_path.name

    return result


def find_related_nx_files(source_path: Path) -> List[Path]:
    """
    Find all related NX files based on naming conventions.

    Given a .sim file like 'model_sim1.sim', finds:
    - model.prt (geometry part)
    - model_fem1.fem (FEM file)
    - model_fem1_i.prt (idealized part)
    - model_sim1.sim (simulation)

    The same set is found when starting from any member of the family,
    including the idealized part 'model_fem1_i.prt'.

    Args:
        source_path: Path to any NX file

    Returns:
        List of all related file paths that exist
    """
    parent = source_path.parent

    # Extract base name by removing _i, _sim1, _fem1 suffixes (in that order)
    base_name = source_path.stem
    for suffix_pattern in _NX_STEM_SUFFIXES:
        base_name = suffix_pattern.sub("", base_name)

    # Escape glob metacharacters so names containing "[", "*" or "?" match literally.
    safe_base = glob.escape(base_name)

    # Define patterns to search for
    patterns = [
        f"{safe_base}.prt",  # Main geometry
        f"{safe_base}_i.prt",  # Idealized part
        f"{safe_base}_fem*.fem",  # FEM files
        f"{safe_base}_fem*_i.prt",  # Idealized FEM parts
        f"{safe_base}_sim*.sim",  # Simulation files
        f"{safe_base}.afem",  # Assembled FEM
    ]

    related = []

    # Search for matching files (sorted so the result order is deterministic)
    for pattern in patterns:
        for match in sorted(parent.glob(pattern)):
            if match.is_file() and match not in related:
                related.append(match)

    # Also include the source file itself
    if source_path.exists() and source_path not in related:
        related.append(source_path)

    return related


@router.get("/validate-path")
async def validate_external_path(path: str):
    """
    Validate an external Windows path and return info about related files.

    Args:
        path: Absolute Windows path (e.g., C:\\Models\\bracket.sim)

    Returns:
        Information about the file and related files
    """
    try:
        source_path = Path(path)

        if not source_path.exists():
            return {
                "valid": False,
                "error": f"Path does not exist: {path}",
            }

        if not source_path.is_file():
            return {
                "valid": False,
                "error": "Path is not a file",
            }

        # Check if it's a valid NX file type
        if source_path.suffix.lower() not in NX_MODEL_EXTENSIONS:
            return {
                "valid": False,
                "error": f"Invalid file type. Expected: {', '.join(NX_MODEL_EXTENSIONS)}",
            }

        # Find related files
        related = find_related_nx_files(source_path)

        return {
            "valid": True,
            "path": str(source_path),
            "name": source_path.name,
            "size": source_path.stat().st_size,
            "related_files": [
                {
                    "name": f.name,
                    "path": str(f),
                    "size": f.stat().st_size,
                    "type": f.suffix.lower(),
                }
                for f in related
            ],
        }

    except Exception as e:
        # Route boundary: validation reports problems rather than raising.
        return {
            "valid": False,
            "error": str(e),
        }


@router.post("/import-from-path")
async def import_from_path(request: ImportRequest):
    """
    Import NX model files from an external path into a study folder.

    This will:
    1. Create the study folder if it doesn't exist
    2. Copy the specified file
    3. Optionally copy all related files (.prt, .sim, .fem, _i.prt)

    Files that already exist in the study are skipped, never overwritten.

    Args:
        request: ImportRequest with source_path, study_name, and copy_related flag

    Returns:
        List of imported files

    Raises:
        HTTPException: 404 if the source is missing, 400 if the source is not
            an NX model file or the study name is invalid, 500 on copy errors
    """
    try:
        source_path = Path(request.source_path)

        if not source_path.exists():
            raise HTTPException(
                status_code=404, detail=f"Source file not found: {request.source_path}"
            )

        if not source_path.is_file():
            raise HTTPException(
                status_code=400, detail=f"Source path is not a file: {request.source_path}"
            )

        # Same type restriction as /validate-path and /upload.
        if source_path.suffix.lower() not in NX_MODEL_EXTENSIONS:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid file type. Expected: {', '.join(NX_MODEL_EXTENSIONS)}",
            )

        # Create study folder structure
        study_dir = _resolve_study_dir(request.study_name)
        model_dir = study_dir / "1_model"
        model_dir.mkdir(parents=True, exist_ok=True)

        # Find files to copy
        if request.copy_related:
            files_to_copy = find_related_nx_files(source_path)
        else:
            files_to_copy = [source_path]

        imported = []
        for src_file in files_to_copy:
            dest_file = model_dir / src_file.name

            # Skip if already exists (avoid overwrite)
            if dest_file.exists():
                imported.append(
                    {
                        "name": src_file.name,
                        "status": "skipped",
                        "reason": "Already exists",
                        "path": _studies_relative(dest_file),
                    }
                )
                continue

            # Copy file (copy2 also preserves timestamps/metadata)
            shutil.copy2(src_file, dest_file)
            imported.append(
                {
                    "name": src_file.name,
                    "status": "imported",
                    "path": _studies_relative(dest_file),
                    "size": dest_file.stat().st_size,
                }
            )

        return {
            "success": True,
            "study_name": request.study_name,
            "imported_files": imported,
            "total_imported": sum(1 for f in imported if f["status"] == "imported"),
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/upload")
async def upload_files(
    files: List[UploadFile] = File(...),
    study_name: str = Query(...),
):
    """
    Upload NX model files to a study folder.

    Only the final component of each uploaded file name is used, so a client
    cannot write outside the study's "1_model" folder. An existing file with
    the same name is overwritten.

    Args:
        files: List of files to upload
        study_name: Target study name

    Returns:
        List of uploaded files

    Raises:
        HTTPException: 400 if the study name is invalid, 500 on write errors
    """
    try:
        # Create study folder structure
        study_dir = _resolve_study_dir(study_name)
        model_dir = study_dir / "1_model"
        model_dir.mkdir(parents=True, exist_ok=True)

        uploaded = []
        for file in files:
            safe_name = _safe_upload_name(file.filename)
            if safe_name is None:
                uploaded.append(
                    {
                        "name": file.filename,
                        "status": "rejected",
                        "reason": "Invalid file name",
                    }
                )
                continue

            # Validate file type
            suffix = Path(safe_name).suffix.lower()
            if suffix not in NX_MODEL_EXTENSIONS:
                uploaded.append(
                    {
                        "name": file.filename,
                        "status": "rejected",
                        "reason": f"Invalid file type: {suffix}",
                    }
                )
                continue

            dest_file = model_dir / safe_name

            # Save file
            content = await file.read()
            with open(dest_file, "wb") as f:
                f.write(content)

            uploaded.append(
                {
                    "name": safe_name,
                    "status": "uploaded",
                    "path": _studies_relative(dest_file),
                    "size": len(content),
                }
            )

        return {
            "success": True,
            "study_name": study_name,
            "uploaded_files": uploaded,
            "total_uploaded": sum(1 for f in uploaded if f["status"] == "uploaded"),
        }

    except HTTPException:
        # Keep deliberate 4xx responses instead of rewrapping them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/structure/{study_id:path}")
async def get_study_structure(study_id: str):
    """
    Get the file structure tree for a study.

    Args:
        study_id: Study ID (can include path separators like M1_Mirror/m1_mirror_flatback)

    Returns:
        Hierarchical file tree with type information

    Raises:
        HTTPException: 400 if the ID is not a directory or escapes the studies
            root, 404 if the study does not exist
    """
    # Resolve study path
    try:
        study_path = _resolve_in_studies(study_id)
    except ValueError:
        raise HTTPException(
            status_code=400, detail=f"Invalid study path: {study_id}"
        ) from None

    if not study_path.exists():
        raise HTTPException(status_code=404, detail=f"Study not found: {study_id}")

    if not study_path.is_dir():
        raise HTTPException(status_code=400, detail=f"Not a directory: {study_id}")

    # File extensions to highlight as model files
    model_extensions = set(NX_MODEL_EXTENSIONS)
    result_extensions = {".op2", ".f06", ".dat", ".bdf", ".csv", ".json"}

    def build_tree(directory: Path, depth: int = 0) -> List[dict]:
        """Recursively build file tree."""
        if depth > 5:  # Limit depth to prevent infinite recursion
            return []

        entries = []

        try:
            # Directories first, then files; each group sorted case-insensitively.
            items = sorted(directory.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower()))

            for item in items:
                # Skip hidden files/dirs and __pycache__
                if item.name.startswith(".") or item.name == "__pycache__":
                    continue

                if item.is_dir():
                    # Trial folders can be numerous and large: list them but
                    # do not recurse, so their children are always empty.
                    if item.name.startswith("trial_"):
                        children = []
                    else:
                        children = build_tree(item, depth + 1)

                    entries.append(
                        {
                            "name": item.name,
                            "path": _studies_relative(item),
                            "type": "directory",
                            "children": children,
                        }
                    )
                else:
                    ext = item.suffix.lower()
                    entries.append(
                        {
                            "name": item.name,
                            "path": _studies_relative(item),
                            "type": "file",
                            "extension": ext,
                            "size": item.stat().st_size,
                            "isModelFile": ext in model_extensions,
                            "isResultFile": ext in result_extensions,
                        }
                    )

        except PermissionError:
            # Best effort: unreadable directories show whatever was collected.
            logger.debug("Permission denied reading directory %s", directory)
        except Exception as e:
            logger.warning("Error reading directory %s: %s", directory, e)

        return entries

    # Build the tree starting from study root
    files = build_tree(study_path)

    return {
        "study_id": study_id,
        "path": str(study_path),
        "files": files,
    }