From 148180c12ed46b63ceb85cf812c443220acf4912 Mon Sep 17 00:00:00 2001 From: Mario Lavoie Date: Tue, 27 Jan 2026 20:16:44 +0000 Subject: [PATCH] Full implementation - Vision AI, config, improved pipeline Major changes: - vision_analyzer.py: Real OpenAI/Anthropic vision API integration - Component detection with confidence scores - Atomizer hints extraction (objectives, constraints, parameters) - Material and feature identification - Timeline correlation with transcript - config.py: Full configuration system - API settings (provider, keys, models) - Processing settings (Whisper model, frame interval, scene detection) - Output settings (BOM, hints, PDF template) - Config file support (~/.cad-documenter.toml) - audio_analyzer.py: Enhanced transcription - Audio stream detection - Graceful fallback for missing audio - Keyword extraction - Technical term detection - Timeline correlation - video_processor.py: Smart frame extraction - Scene change detection via ffmpeg - Configurable thresholds - Best frame selection - doc_generator.py: Improved output - Better Markdown templates - BOM CSV export - Atomizer hints JSON - Component cards - cli.py: Rich CLI with progress indicators - Config file support - --init-config flag - Verbose mode - Better error messages - tests: Comprehensive test suite --- pyproject.toml | 2 + src/cad_documenter/audio_analyzer.py | 295 ++++++++++++-- src/cad_documenter/cli.py | 215 ++++++++--- src/cad_documenter/config.py | 179 +++++++++ src/cad_documenter/doc_generator.py | 356 +++++++++++++---- src/cad_documenter/pipeline.py | 314 +++++++++++++-- src/cad_documenter/video_processor.py | 155 ++++++-- src/cad_documenter/vision_analyzer.py | 536 ++++++++++++++++++++++++-- tests/test_pipeline.py | 302 +++++++++++++-- 9 files changed, 2084 insertions(+), 270 deletions(-) create mode 100644 src/cad_documenter/config.py diff --git a/pyproject.toml b/pyproject.toml index 0d758f6..63c9cd1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,8 @@ dependencies = [ "jinja2>=3.1.0", "openai-whisper>=20231117", "pillow>=10.0.0", + "httpx>=0.27.0", + "tomli>=2.0.0;python_version<'3.11'", ] [project.optional-dependencies] diff --git a/src/cad_documenter/audio_analyzer.py b/src/cad_documenter/audio_analyzer.py index 83770bf..429b96b 100644 --- a/src/cad_documenter/audio_analyzer.py +++ b/src/cad_documenter/audio_analyzer.py @@ -1,9 +1,13 @@ -"""Audio analysis module - transcription via Whisper.""" +"""Audio analysis module - transcription via Whisper with timeline correlation.""" -from pathlib import Path -from dataclasses import dataclass import subprocess import tempfile +import json +import re +from pathlib import Path +from dataclasses import dataclass, field + +from .config import TranscriptionConfig @dataclass @@ -12,6 +16,8 @@ class TranscriptSegment: start: float # seconds end: float text: str + confidence: float = 1.0 + words: list[dict] = field(default_factory=list) # Word-level timestamps if available @dataclass @@ -19,6 +25,8 @@ class Transcript: """Full transcript with segments.""" segments: list[TranscriptSegment] full_text: str + language: str = "en" + duration: float = 0.0 def get_text_at(self, timestamp: float, window: float = 5.0) -> str: """Get transcript text around a specific timestamp.""" @@ -28,13 +36,85 @@ class Transcript: relevant.append(seg.text) return " ".join(relevant) + def get_segment_at(self, timestamp: float) -> TranscriptSegment | None: + """Get the segment containing a specific timestamp.""" + for seg in self.segments: + if seg.start <= timestamp <= seg.end: + return seg + return None + + def search(self, query: str) -> list[tuple[TranscriptSegment, float]]: + """ + Search transcript for a query string. + + Returns list of (segment, timestamp) tuples. + """ + results = [] + query_lower = query.lower() + for seg in self.segments: + if query_lower in seg.text.lower(): + results.append((seg, seg.start)) + return results + class AudioAnalyzer: - """Handles audio transcription using Whisper.""" + """Handles audio transcription using Whisper with enhanced features.""" - def __init__(self, video_path: Path, model: str = "base"): + def __init__( + self, + video_path: Path, + config: TranscriptionConfig | None = None + ): self.video_path = video_path - self.model = model + self.config = config or TranscriptionConfig() + self._model = None + + def _check_audio_stream(self) -> bool: + """Check if video has an audio stream.""" + cmd = [ + "ffprobe", "-v", "quiet", + "-select_streams", "a", + "-show_entries", "stream=codec_type", + "-of", "json", + str(self.video_path) + ] + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode != 0: + return False + + try: + data = json.loads(result.stdout) + streams = data.get("streams", []) + return len(streams) > 0 + except json.JSONDecodeError: + return False + + def _extract_audio(self, output_path: Path) -> bool: + """Extract audio from video to WAV file.""" + cmd = [ + "ffmpeg", "-y", + "-i", str(self.video_path), + "-vn", "-acodec", "pcm_s16le", + "-ar", "16000", "-ac", "1", + str(output_path) + ] + result = subprocess.run(cmd, capture_output=True, text=True) + return result.returncode == 0 and output_path.exists() + + def _get_model(self): + """Lazy-load Whisper model.""" + if self._model is not None: + return self._model + + try: + import whisper + self._model = whisper.load_model(self.config.model) + return self._model + except ImportError: + raise ImportError( + "Whisper not installed. Run: pip install openai-whisper" + ) def transcribe(self) -> Transcript: """ @@ -42,63 +122,198 @@ class AudioAnalyzer: Returns: Transcript object with segments and full text + + Raises: + RuntimeError: If video has no audio or transcription fails """ + # Check for audio stream + if not self._check_audio_stream(): + raise RuntimeError( + "Video has no audio track. Cannot transcribe." + ) + # Extract audio to temp file with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: audio_path = Path(f.name) - # Extract audio using ffmpeg - cmd = [ - "ffmpeg", "-y", - "-i", str(self.video_path), - "-vn", "-acodec", "pcm_s16le", - "-ar", "16000", "-ac", "1", - str(audio_path) - ] - subprocess.run(cmd, capture_output=True) - - # Run Whisper try: - import whisper - model = whisper.load_model(self.model) - result = model.transcribe(str(audio_path), word_timestamps=True) + if not self._extract_audio(audio_path): + raise RuntimeError("Failed to extract audio from video") + + # Check if audio file is valid (non-zero size) + if audio_path.stat().st_size < 1000: + raise RuntimeError( + "Audio track is too short or silent. Cannot transcribe." + ) + + # Run Whisper + model = self._get_model() + + options = { + "word_timestamps": True, + "verbose": False, + } + + if self.config.language: + options["language"] = self.config.language + + result = model.transcribe(str(audio_path), **options) segments = [] for seg in result.get("segments", []): + # Extract word-level timestamps if available + words = [] + for word_data in seg.get("words", []): + words.append({ + "word": word_data.get("word", "").strip(), + "start": word_data.get("start", 0), + "end": word_data.get("end", 0), + "probability": word_data.get("probability", 1.0), + }) + segments.append(TranscriptSegment( start=seg["start"], end=seg["end"], - text=seg["text"].strip() + text=seg["text"].strip(), + confidence=seg.get("avg_logprob", 0.0), + words=words, )) + full_text = result.get("text", "").strip() + + # Get duration from last segment + duration = segments[-1].end if segments else 0.0 + return Transcript( segments=segments, - full_text=result.get("text", "").strip() + full_text=full_text, + language=result.get("language", "en"), + duration=duration, ) + except Exception as e: + if "no audio" in str(e).lower(): + raise RuntimeError("Video has no audio track") from e + raise RuntimeError(f"Transcription failed: {e}") from e + finally: # Cleanup temp file - audio_path.unlink(missing_ok=True) + try: + audio_path.unlink(missing_ok=True) + except Exception: + pass - def extract_keywords(self, transcript: Transcript) -> list[str]: - """Extract likely component names and technical terms.""" - # Simple keyword extraction - can be enhanced with NLP + def transcribe_with_fallback(self) -> Transcript: + """ + Transcribe with graceful fallback for edge cases. + + Returns empty transcript instead of raising for missing audio. + """ + try: + return self.transcribe() + except RuntimeError as e: + if "no audio" in str(e).lower() or "too short" in str(e).lower(): + # Return empty transcript + return Transcript( + segments=[], + full_text="[No audio available]", + language="en", + duration=0.0, + ) + raise + + def extract_keywords(self, transcript: Transcript) -> list[dict]: + """ + Extract likely component names and technical terms. + + Returns list of dicts with keyword, context, and timestamp. + """ keywords = [] + + # Patterns that indicate component mentions indicator_phrases = [ - "this is the", "this is a", "here we have", - "the main", "called the", "known as", - "this part", "this component", "this assembly" + (r"this is (?:the|a) ([^,\.]+)", "definition"), + (r"here (?:we have|is) (?:the|a) ([^,\.]+)", "definition"), + (r"the main ([^,\.]+)", "component"), + (r"called (?:the|a) ([^,\.]+)", "naming"), + (r"known as (?:the|a)? ([^,\.]+)", "naming"), + (r"this (?:part|component|assembly|piece) ([^,\.]+)", "component"), + (r"the ([^,\.]+) (?:bracket|mount|housing|plate|arm|shaft)", "component"), ] - text_lower = transcript.full_text.lower() - for phrase in indicator_phrases: - if phrase in text_lower: - # Find what comes after the phrase - idx = text_lower.find(phrase) - after = transcript.full_text[idx + len(phrase):idx + len(phrase) + 50] - # Take first few words - words = after.strip().split()[:3] - if words: - keywords.append(" ".join(words).strip(",.;:")) + for seg in transcript.segments: + text = seg.text + for pattern, kw_type in indicator_phrases: + matches = re.finditer(pattern, text, re.IGNORECASE) + for match in matches: + keyword = match.group(1).strip() + # Filter out too short or too long + if 2 < len(keyword) < 50: + keywords.append({ + "keyword": keyword, + "type": kw_type, + "timestamp": seg.start, + "context": text, + }) - return list(set(keywords)) + # Deduplicate by keyword + seen = set() + unique_keywords = [] + for kw in keywords: + kw_lower = kw["keyword"].lower() + if kw_lower not in seen: + seen.add(kw_lower) + unique_keywords.append(kw) + + return unique_keywords + + def extract_technical_terms(self, transcript: Transcript) -> list[str]: + """Extract technical/engineering terms from transcript.""" + # Common engineering terms to look for + tech_patterns = [ + r"\b(aluminum|steel|titanium|brass|copper|plastic|composite)\b", + r"\b(6061|7075|304|316|abs|pla|petg|nylon)\b", + r"\b(M[0-9]+|#[0-9]+-[0-9]+)\b", # Fastener sizes + r"\b([0-9]+(?:\.[0-9]+)?\s*(?:mm|cm|m|in|inch|ft))\b", # Dimensions + r"\b(fillet|chamfer|thread|bore|hole|slot|keyway)\b", + r"\b(torque|force|load|stress|strain|deflection)\b", + r"\b(cnc|3d print|cast|machined|welded|brazed)\b", + ] + + terms = set() + text = transcript.full_text.lower() + + for pattern in tech_patterns: + matches = re.findall(pattern, text, re.IGNORECASE) + for match in matches: + terms.add(match.strip()) + + return list(terms) + + def create_timeline( + self, transcript: Transcript, frame_timestamps: list[float] + ) -> list[dict]: + """ + Create a timeline correlating frames with transcript segments. + + Args: + transcript: The transcript with segments + frame_timestamps: List of frame timestamps in seconds + + Returns: + List of dicts with frame_timestamp, segment_text, keywords + """ + timeline = [] + + for frame_ts in frame_timestamps: + # Find relevant transcript segments + text = transcript.get_text_at(frame_ts, window=3.0) + segment = transcript.get_segment_at(frame_ts) + + timeline.append({ + "frame_timestamp": frame_ts, + "transcript_text": text, + "segment": segment, + }) + + return timeline diff --git a/src/cad_documenter/cli.py b/src/cad_documenter/cli.py index 0df09e0..e93e8b1 100644 --- a/src/cad_documenter/cli.py +++ b/src/cad_documenter/cli.py @@ -1,14 +1,28 @@ """CAD-Documenter CLI - Main entry point.""" -import click +import sys from pathlib import Path -from rich.console import Console +import click +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn +from rich.panel import Panel + +from .config import load_config, create_default_config from .pipeline import DocumentationPipeline console = Console() +def print_banner(): + """Print welcome banner.""" + console.print(Panel.fit( + "[bold blue]CAD-Documenter[/bold blue] v0.1.0\n" + "[dim]Video walkthrough → Engineering documentation[/dim]", + border_style="blue" + )) + + @click.command() @click.argument("video", type=click.Path(exists=True, path_type=Path)) @click.option("-o", "--output", type=click.Path(path_type=Path), help="Output directory") @@ -16,8 +30,12 @@ console = Console() @click.option("--atomizer-hints", is_flag=True, help="Generate Atomizer FEA hints") @click.option("--bom", is_flag=True, help="Generate Bill of Materials") @click.option("--pdf", is_flag=True, help="Generate PDF via Atomaste Report Standard") -@click.option("--frame-interval", default=2.0, help="Seconds between frame extractions") -@click.option("--whisper-model", default="base", help="Whisper model size (tiny/base/small/medium/large)") +@click.option("--frame-interval", type=float, help="Seconds between frame extractions") +@click.option("--whisper-model", type=click.Choice(["tiny", "base", "small", "medium", "large"]), help="Whisper model size") +@click.option("--api-provider", type=click.Choice(["openai", "anthropic"]), help="Vision API provider") +@click.option("--config", "config_path", type=click.Path(exists=True, path_type=Path), help="Config file path") +@click.option("--init-config", is_flag=True, help="Create default config file and exit") +@click.option("-v", "--verbose", is_flag=True, help="Verbose output") @click.version_option() def main( video: Path, @@ -26,60 +44,169 @@ def main( atomizer_hints: bool, bom: bool, pdf: bool, - frame_interval: float, - whisper_model: str, + frame_interval: float | None, + whisper_model: str | None, + api_provider: str | None, + config_path: Path | None, + init_config: bool, + verbose: bool, ): """ Generate engineering documentation from a CAD walkthrough video. VIDEO: Path to the video file (.mp4, .mov, .avi, etc.) - """ - console.print(f"[bold blue]CAD-Documenter[/bold blue] v0.1.0") - console.print(f"Processing: [cyan]{video}[/cyan]") + Examples: + + cad-doc walkthrough.mp4 + + cad-doc video.mp4 --output ./docs --bom --atomizer-hints + + cad-doc video.mp4 --pdf --whisper-model medium + """ + print_banner() + + # Handle --init-config + if init_config: + default_path = Path.home() / ".cad-documenter.toml" + create_default_config(default_path) + console.print(f"[green]✓[/green] Created config file: {default_path}") + console.print("[dim]Edit this file to configure API keys and defaults.[/dim]") + return + + # Load configuration + config = load_config(config_path) + + # Override config with CLI options + if frame_interval is not None: + config.processing.frame_interval = frame_interval + if whisper_model is not None: + config.processing.whisper_model = whisper_model + if api_provider is not None: + config.api.provider = api_provider + + # Check API key + if not frames_only and not config.api.api_key: + provider = config.api.provider.upper() + console.print(f"[red]Error:[/red] No API key found for {config.api.provider}.") + console.print(f"Set [cyan]{provider}_API_KEY[/cyan] environment variable or add to config file.") + console.print(f"\nTo create a config file: [cyan]cad-doc --init-config[/cyan]") + sys.exit(1) + + console.print(f"Processing: [cyan]{video.name}[/cyan]") + if verbose: + console.print(f" API: {config.api.provider} ({config.api.vision_model or 'default'})") + console.print(f" Whisper: {config.processing.whisper_model}") + # Default output directory if output is None: output = video.parent / f"{video.stem}_docs" - + output.mkdir(parents=True, exist_ok=True) - - # Run pipeline - pipeline = DocumentationPipeline( - video_path=video, - output_dir=output, - frame_interval=frame_interval, - whisper_model=whisper_model, - ) - + console.print(f"Output: [cyan]{output}[/cyan]") + + # Initialize pipeline + try: + pipeline = DocumentationPipeline( + video_path=video, + output_dir=output, + config=config, + ) + except ValueError as e: + console.print(f"[red]Configuration error:[/red] {e}") + sys.exit(1) + + # Frames only mode if frames_only: - console.print("[yellow]Extracting frames only...[/yellow]") - pipeline.extract_frames() - console.print(f"[green]✓[/green] Frames saved to {output / 'frames'}") + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + progress.add_task("Extracting frames...", total=None) + frames = pipeline.extract_frames() + + console.print(f"[green]✓[/green] Extracted {len(frames)} frames to {output / 'frames'}") return - + # Full pipeline - console.print("[yellow]Step 1/4:[/yellow] Extracting frames...") - frames = pipeline.extract_frames() - console.print(f" [green]✓[/green] Extracted {len(frames)} frames") - - console.print("[yellow]Step 2/4:[/yellow] Transcribing audio...") - transcript = pipeline.transcribe_audio() - console.print(f" [green]✓[/green] Transcribed {len(transcript.segments)} segments") - - console.print("[yellow]Step 3/4:[/yellow] Analyzing components...") - analysis = pipeline.analyze_components(frames, transcript) - console.print(f" [green]✓[/green] Identified {len(analysis.components)} components") - - console.print("[yellow]Step 4/4:[/yellow] Generating documentation...") - doc_path = pipeline.generate_documentation(analysis, atomizer_hints=atomizer_hints, bom=bom) - console.print(f" [green]✓[/green] Documentation saved to {doc_path}") - + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + + # Step 1: Extract frames + task1 = progress.add_task("[cyan]Step 1/4:[/cyan] Extracting frames...", total=None) + frames = pipeline.extract_frames() + progress.update(task1, description=f"[green]✓[/green] Extracted {len(frames)} frames") + progress.remove_task(task1) + + # Step 2: Transcribe + task2 = progress.add_task("[cyan]Step 2/4:[/cyan] Transcribing audio...", total=None) + transcript = pipeline.transcribe_audio() + seg_count = len(transcript.segments) if transcript.segments else 0 + progress.update(task2, description=f"[green]✓[/green] Transcribed {seg_count} segments") + progress.remove_task(task2) + + if verbose and transcript.full_text: + console.print(Panel( + transcript.full_text[:500] + ("..." if len(transcript.full_text) > 500 else ""), + title="Transcript Preview", + border_style="dim" + )) + + # Step 3: Analyze + task3 = progress.add_task("[cyan]Step 3/4:[/cyan] Analyzing components...", total=None) + analysis = pipeline.analyze_components(frames, transcript) + comp_count = len(analysis.components) + progress.update(task3, description=f"[green]✓[/green] Identified {comp_count} components") + progress.remove_task(task3) + + if verbose and analysis.components: + console.print("\n[bold]Components found:[/bold]") + for c in analysis.components: + console.print(f" • {c.name} ({c.material or 'material unknown'})") + + # Step 4: Generate documentation + task4 = progress.add_task("[cyan]Step 4/4:[/cyan] Generating documentation...", total=None) + doc_path = pipeline.generate_documentation( + analysis, + atomizer_hints=atomizer_hints or config.output.include_atomizer_hints, + bom=bom or config.output.include_bom, + ) + progress.update(task4, description=f"[green]✓[/green] Documentation generated") + progress.remove_task(task4) + + # Generate PDF if requested if pdf: - console.print("[yellow]Generating PDF...[/yellow]") - pdf_path = pipeline.generate_pdf(doc_path) - console.print(f" [green]✓[/green] PDF saved to {pdf_path}") - - console.print(f"\n[bold green]Done![/bold green] Output: {output}") + console.print("[cyan]Generating PDF...[/cyan]") + try: + pdf_path = pipeline.generate_pdf(doc_path) + console.print(f"[green]✓[/green] PDF: {pdf_path}") + except Exception as e: + console.print(f"[yellow]Warning:[/yellow] PDF generation failed: {e}") + + # Summary + console.print() + console.print(Panel.fit( + f"[bold green]Documentation complete![/bold green]\n\n" + f"📄 [cyan]{doc_path}[/cyan]\n" + f"📊 {len(analysis.components)} components documented\n" + f"🖼️ {len(frames)} frames extracted", + title="Summary", + border_style="green" + )) + + # Show atomizer hints summary if generated + if (atomizer_hints or config.output.include_atomizer_hints) and analysis.atomizer_hints: + hints = analysis.atomizer_hints + if hints.objectives or hints.constraints: + console.print("\n[bold]Atomizer Hints:[/bold]") + for obj in hints.objectives[:3]: + console.print(f" 🎯 {obj['direction'].capitalize()} {obj['name']}") + for constraint in hints.constraints[:3]: + console.print(f" 📏 {constraint['type']}: {constraint['value']}") if __name__ == "__main__": diff --git a/src/cad_documenter/config.py b/src/cad_documenter/config.py new file mode 100644 index 0000000..38f82ec --- /dev/null +++ b/src/cad_documenter/config.py @@ -0,0 +1,179 @@ +"""Configuration management for CAD-Documenter.""" + +import os +from pathlib import Path +from dataclasses import dataclass, field +from typing import Literal + +try: + import tomllib +except ImportError: + import tomli as tomllib + + +@dataclass +class TranscriptionConfig: + """Transcription configuration.""" + model: str = "base" # tiny, base, small, medium, large + language: str | None = None # None = auto-detect + + +@dataclass +class APIConfig: + """API configuration.""" + provider: Literal["openai", "anthropic"] = "openai" + api_key: str | None = None + vision_model: str | None = None # None = use provider default + text_model: str | None = None + + +@dataclass +class ProcessingConfig: + """Video/audio processing configuration.""" + whisper_model: str = "base" + frame_interval: float = 2.0 + use_scene_detection: bool = True + max_frames: int = 15 + scene_threshold: float = 0.3 + + +@dataclass +class OutputConfig: + """Output configuration.""" + include_bom: bool = True + include_atomizer_hints: bool = True + include_raw_transcript: bool = True + include_frames: bool = True + pdf_template: str = "default" + + +@dataclass +class Config: + """Main configuration.""" + api: APIConfig = field(default_factory=APIConfig) + processing: ProcessingConfig = field(default_factory=ProcessingConfig) + output: OutputConfig = field(default_factory=OutputConfig) + + +def load_config(config_path: Path | None = None) -> Config: + """ + Load configuration from file and environment variables. + + Priority (highest to lowest): + 1. Environment variables + 2. Config file + 3. Defaults + """ + config = Config() + + # Try to load config file + if config_path is None: + # Check common locations + locations = [ + Path.cwd() / "cad-documenter.toml", + Path.cwd() / ".cad-documenter.toml", + Path.home() / ".cad-documenter.toml", + Path.home() / ".config" / "cad-documenter" / "config.toml", + ] + for loc in locations: + if loc.exists(): + config_path = loc + break + + if config_path and config_path.exists(): + with open(config_path, "rb") as f: + data = tomllib.load(f) + + # API config + if "api" in data: + api_data = data["api"] + config.api.provider = api_data.get("provider", config.api.provider) + config.api.api_key = api_data.get("api_key", config.api.api_key) + config.api.vision_model = api_data.get("vision_model", config.api.vision_model) + config.api.text_model = api_data.get("text_model", config.api.text_model) + + # Processing config + if "processing" in data: + proc_data = data["processing"] + config.processing.whisper_model = proc_data.get("whisper_model", config.processing.whisper_model) + config.processing.frame_interval = proc_data.get("frame_interval", config.processing.frame_interval) + config.processing.use_scene_detection = proc_data.get("use_scene_detection", config.processing.use_scene_detection) + config.processing.max_frames = proc_data.get("max_frames", config.processing.max_frames) + config.processing.scene_threshold = proc_data.get("scene_threshold", config.processing.scene_threshold) + + # Output config + if "output" in data: + out_data = data["output"] + config.output.include_bom = out_data.get("include_bom", config.output.include_bom) + config.output.include_atomizer_hints = out_data.get("include_atomizer_hints", config.output.include_atomizer_hints) + config.output.include_raw_transcript = out_data.get("include_raw_transcript", config.output.include_raw_transcript) + config.output.include_frames = out_data.get("include_frames", config.output.include_frames) + config.output.pdf_template = out_data.get("pdf_template", config.output.pdf_template) + + # Override with environment variables + if os.environ.get("CAD_DOC_PROVIDER"): + config.api.provider = os.environ["CAD_DOC_PROVIDER"] + + if os.environ.get("OPENAI_API_KEY"): + if config.api.provider == "openai" and not config.api.api_key: + config.api.api_key = os.environ["OPENAI_API_KEY"] + + if os.environ.get("ANTHROPIC_API_KEY"): + if config.api.provider == "anthropic" and not config.api.api_key: + config.api.api_key = os.environ["ANTHROPIC_API_KEY"] + + if os.environ.get("CAD_DOC_WHISPER_MODEL"): + config.processing.whisper_model = os.environ["CAD_DOC_WHISPER_MODEL"] + + return config + + +def create_default_config(path: Path) -> None: + """Create a default config file.""" + content = '''# CAD-Documenter Configuration + +[api] +# Vision API provider: "openai" or "anthropic" +provider = "openai" + +# API key (or set OPENAI_API_KEY / ANTHROPIC_API_KEY environment variable) +# api_key = "sk-..." + +# Model overrides (optional - uses provider defaults if not set) +# vision_model = "gpt-4o" +# text_model = "gpt-4o-mini" + +[processing] +# Whisper model for transcription: tiny, base, small, medium, large +whisper_model = "base" + +# Seconds between frame extractions (if not using scene detection) +frame_interval = 2.0 + +# Use scene change detection for smarter frame selection +use_scene_detection = true + +# Maximum frames to send to vision API +max_frames = 15 + +# Scene detection sensitivity (0.0-1.0, lower = more sensitive) +scene_threshold = 0.3 + +[output] +# Include Bill of Materials in documentation +include_bom = true + +# Include Atomizer FEA hints +include_atomizer_hints = true + +# Include raw transcript at end of documentation +include_raw_transcript = true + +# Include extracted frames in output directory +include_frames = true + +# PDF template name (for --pdf option) +pdf_template = "default" +''' + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content) diff --git a/src/cad_documenter/doc_generator.py b/src/cad_documenter/doc_generator.py index 1bd3264..997eff5 100644 --- a/src/cad_documenter/doc_generator.py +++ b/src/cad_documenter/doc_generator.py @@ -1,18 +1,21 @@ -"""Documentation generator - produces markdown and PDF output.""" +"""Documentation generator - produces markdown, JSON, and PDF output.""" +import json +import subprocess from pathlib import Path from datetime import datetime from jinja2 import Environment, FileSystemLoader, BaseLoader -from .vision_analyzer import ComponentAnalysis, Component +from .vision_analyzer import ComponentAnalysis, Component, AtomizerHint # Default template embedded in code (can be overridden by files) DEFAULT_TEMPLATE = '''# {{ analysis.assembly_name }} - Technical Documentation -**Generated:** {{ timestamp }} -**Source:** Video walkthrough documentation +**Generated:** {{ timestamp }} +**Source:** Video walkthrough documentation +**Components:** {{ analysis.components | length }} --- @@ -48,7 +51,7 @@ DEFAULT_TEMPLATE = '''# {{ analysis.assembly_name }} - Technical Documentation {% endfor %} {% endif %} -{% if component.best_frame %} +{% if component.best_frame and include_images %} ![{{ component.name }}](frames/{{ component.best_frame.path.name }}) {% endif %} @@ -60,13 +63,24 @@ DEFAULT_TEMPLATE = '''# {{ analysis.assembly_name }} - Technical Documentation {% endfor %} -{% if bom %} +{% if bom and analysis.components %} ## Bill of Materials | Item | P/N | Name | Qty | Material | Notes | |------|-----|------|-----|----------|-------| {% for component in analysis.components %} -| {{ loop.index }} | {{ component.part_number or 'TBD' }} | {{ component.name }} | 1 | {{ component.material or 'TBD' }} | {{ component.function }} | +| {{ loop.index }} | {{ component.part_number or 'TBD' }} | {{ component.name }} | 1 | {{ component.material or 'TBD' }} | {{ component.function or '-' }} | +{% endfor %} + +{% endif %} + +{% if analysis.relationships %} +## Assembly Relationships + +| From | To | Connection | +|------|----|------------| +{% for rel in analysis.relationships %} +| {{ rel.from_component }} | {{ rel.to_component }} | {{ rel.relationship_type }} | {% endfor %} {% endif %} @@ -78,28 +92,39 @@ DEFAULT_TEMPLATE = '''# {{ analysis.assembly_name }} - Technical Documentation {% endif %} -{% if atomizer_hints %} +{% if atomizer_hints and analysis.atomizer_hints %} ## Atomizer FEA Hints -Based on the video walkthrough, the following optimization parameters are suggested: +The following optimization parameters and constraints were identified from the video walkthrough: -```json -{ - "model_understanding": { - "components": {{ component_names | tojson }}, - "materials_mentioned": {{ materials | tojson }} - }, - "suggested_study": { - "objectives": [ - {"name": "mass", "direction": "minimize"} - ], - "constraints_likely": [] - } -} -``` +### Objectives +{% for hint in analysis.atomizer_hints if hint.hint_type == 'objective' %} +- **[{{ "%.1f"|format(hint.timestamp) }}s]** {{ hint.text }} +{% endfor %} + +### Constraints +{% for hint in analysis.atomizer_hints if hint.hint_type == 'constraint' %} +- **[{{ "%.1f"|format(hint.timestamp) }}s]** {{ hint.text }} +{% endfor %} + +### Parameters +{% for hint in analysis.atomizer_hints if hint.hint_type == 'parameter' %} +- **[{{ "%.1f"|format(hint.timestamp) }}s]** {{ hint.text }} +{% endfor %} + +### Load Cases +{% for hint in analysis.atomizer_hints if hint.hint_type == 'load_case' %} +- **[{{ "%.1f"|format(hint.timestamp) }}s]** {{ hint.text }} +{% endfor %} + +### Materials +{% for hint in analysis.atomizer_hints if hint.hint_type == 'material' %} +- **[{{ "%.1f"|format(hint.timestamp) }}s]** {{ hint.text }} +{% endfor %} {% endif %} +{% if include_transcript %} --- ## Raw Transcript @@ -110,19 +135,26 @@ Based on the video walkthrough, the following optimization parameters are sugges {{ analysis.raw_transcript }} +{% endif %} --- -*Documentation generated by CAD-Documenter* +*Documentation generated by CAD-Documenter v{{ version }}* ''' class DocGenerator: """Generates documentation from analysis results.""" - def __init__(self, output_dir: Path, template_dir: Path | None = None): + def __init__( + self, + output_dir: Path, + template_dir: Path | None = None, + version: str = "0.2.0" + ): self.output_dir = output_dir self.output_dir.mkdir(parents=True, exist_ok=True) + self.version = version # Setup Jinja environment if template_dir and template_dir.exists(): @@ -136,11 +168,16 @@ class DocGenerator: atomizer_hints: bool = False, bom: bool = False, template_name: str | None = None, + include_images: bool = True, + include_transcript: bool = True, ) -> Path: """Generate markdown documentation.""" # Load template if template_name: - template = self.env.get_template(f"{template_name}.md.j2") + try: + template = self.env.get_template(f"{template_name}.md.j2") + except Exception: + template = self.env.from_string(DEFAULT_TEMPLATE) else: template = self.env.from_string(DEFAULT_TEMPLATE) @@ -150,6 +187,9 @@ class DocGenerator: "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"), "atomizer_hints": atomizer_hints, "bom": bom, + "include_images": include_images, + "include_transcript": include_transcript, + "version": self.version, "component_names": [c.name for c in analysis.components], "materials": list(set(c.material for c in analysis.components if c.material)), } @@ -163,56 +203,240 @@ class DocGenerator: return output_path - def generate_pdf(self, markdown_path: Path) -> Path: - """ - Generate PDF from markdown using Atomaste Report Standard. - - Requires the atomaste-reports skill/Typst to be available. - """ - import subprocess - - pdf_path = markdown_path.with_suffix(".pdf") - - # Try to use Atomaste Report Standard if available - # Otherwise fall back to pandoc - try: - # Check if atomaste build script exists - build_script = Path("/home/papa/Atomaste/Templates/Atomaste_Report_Standard/scripts/build-report.py") - if build_script.exists(): - cmd = ["python3", str(build_script), str(markdown_path), "-o", str(pdf_path)] - else: - # Fallback to pandoc - cmd = ["pandoc", str(markdown_path), "-o", str(pdf_path)] - - subprocess.run(cmd, capture_output=True, check=True) - - except subprocess.CalledProcessError as e: - raise RuntimeError(f"PDF generation failed: {e}") - - return pdf_path - def generate_atomizer_hints(self, analysis: ComponentAnalysis) -> Path: - """Generate standalone Atomizer hints JSON file.""" - import json - + """ + Generate standalone Atomizer hints JSON file. + + This file can be used by Atomizer to pre-configure FEA studies. + """ + # Extract objectives from hints + objectives = [] + constraints = [] + parameters = [] + load_cases = [] + materials_mentioned = [] + + for hint in analysis.atomizer_hints: + item = { + "timestamp": hint.timestamp, + "text": hint.text, + } + + if hint.hint_type == "objective": + # Try to parse objective direction + text_lower = hint.text.lower() + if "minimize" in text_lower or "reduce" in text_lower: + direction = "minimize" + elif "maximize" in text_lower or "increase" in text_lower: + direction = "maximize" + else: + direction = "minimize" # default + + # Try to identify what to optimize + if "mass" in text_lower or "weight" in text_lower: + objectives.append({"name": "mass", "direction": direction, "source": hint.text}) + elif "stress" in text_lower: + objectives.append({"name": "max_stress", "direction": direction, "source": hint.text}) + elif "stiff" in text_lower or "displacement" in text_lower: + objectives.append({"name": "max_displacement", "direction": direction, "source": hint.text}) + else: + objectives.append({"name": "unknown", "direction": direction, "source": hint.text}) + + elif hint.hint_type == "constraint": + constraints.append(item) + elif hint.hint_type == "parameter": + parameters.append(item) + elif hint.hint_type == "load_case": + load_cases.append(item) + elif hint.hint_type == "material": + materials_mentioned.append(hint.text) + hints = { + "generated": datetime.now().isoformat(), + "assembly_name": analysis.assembly_name, "model_understanding": { - "assembly_name": analysis.assembly_name, "components": [c.name for c in analysis.components], - "materials_mentioned": list(set(c.material for c in analysis.components if c.material)), - "functions": {c.name: c.function for c in analysis.components if c.function}, + "component_details": [ + { + "name": c.name, + "function": c.function, + "material": c.material, + "features": c.features, + } + for c in analysis.components + ], + "materials_mentioned": list(set( + [c.material for c in analysis.components if c.material] + + materials_mentioned + )), + "relationships": [ + { + "from": r.from_component, + "to": r.to_component, + "type": r.relationship_type, + } + for r in analysis.relationships + ], }, "suggested_spec": { - "objectives": [ - {"name": "mass", "direction": "minimize"} - ], - "parameters_likely": [], - "constraints_likely": [], + "objectives": objectives or [{"name": "mass", "direction": "minimize"}], + "parameters_mentioned": parameters, + "constraints_mentioned": constraints, }, - "transcript_highlights": [], + "load_cases": load_cases, + "transcript_highlights": [ + { + "timestamp": f"{h.timestamp:.1f}s", + "text": h.text, + "type": h.hint_type, + } + for h in analysis.atomizer_hints[:20] # Limit to top 20 + ], } output_path = self.output_dir / "atomizer_hints.json" output_path.write_text(json.dumps(hints, indent=2)) return output_path + + def generate_bom(self, analysis: ComponentAnalysis) -> Path: + """Generate standalone Bill of Materials CSV.""" + lines = ["Item,Part Number,Name,Quantity,Material,Function,Notes"] + + for i, comp in enumerate(analysis.components, 1): + # Escape commas in fields + name = comp.name.replace(",", ";") + function = (comp.function or "").replace(",", ";") + material = (comp.material or "TBD").replace(",", ";") + pn = comp.part_number or "TBD" + + lines.append(f'{i},{pn},"{name}",1,{material},"{function}",""') + + output_path = self.output_dir / "bom.csv" + output_path.write_text("\n".join(lines)) + + return output_path + + def generate_component_json(self, analysis: ComponentAnalysis) -> Path: + """Generate JSON export of all component data.""" + data = { + "assembly_name": analysis.assembly_name, + "generated": datetime.now().isoformat(), + "summary": analysis.summary, + "components": [ + { + "name": c.name, + "description": c.description, + "function": c.function, + "material": c.material, + "part_number": c.part_number, + "features": c.features, + "confidence": c.confidence, + "frame_timestamp": c.best_frame.timestamp if c.best_frame else None, + "transcript_excerpt": c.transcript_excerpt, + } + for c in analysis.components + ], + "relationships": [ + { + "from": r.from_component, + "to": r.to_component, + "type": r.relationship_type, + } + for r in analysis.relationships + ], + } + + output_path = self.output_dir / "components.json" + output_path.write_text(json.dumps(data, indent=2)) + + return output_path + + def generate_pdf(self, markdown_path: Path) -> Path: + """ + Generate PDF from markdown using Atomaste Report Standard or pandoc. + + Requires the atomaste-reports skill/Typst to be available. + """ + pdf_path = markdown_path.with_suffix(".pdf") + + # Try Atomaste Report Standard first + atomaste_script = Path("/home/papa/Atomaste/Templates/Atomaste_Report_Standard/scripts/build-report.py") + + if atomaste_script.exists(): + try: + cmd = [ + "python3", str(atomaste_script), + str(markdown_path), "-o", str(pdf_path) + ] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) + if result.returncode == 0 and pdf_path.exists(): + return pdf_path + except subprocess.TimeoutExpired: + pass + except Exception: + pass + + # Try pandoc with a nice template + try: + cmd = [ + "pandoc", + str(markdown_path), + "-o", str(pdf_path), + "--pdf-engine=xelatex", + "-V", "geometry:margin=1in", + "-V", "fontsize=11pt", + "--toc", + ] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) + if result.returncode == 0 and pdf_path.exists(): + return pdf_path + except Exception: + pass + + # Final fallback: basic pandoc + try: + cmd = ["pandoc", str(markdown_path), "-o", str(pdf_path)] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) + if result.returncode == 0: + return pdf_path + except Exception as e: + raise RuntimeError(f"PDF generation failed: {e}") + + raise RuntimeError("PDF generation failed - no suitable converter found") + + def generate_all( + self, + analysis: ComponentAnalysis, + pdf: bool = False, + include_images: bool = True, + ) -> dict[str, Path]: + """ + Generate all documentation outputs. + + Returns dict mapping output type to file path. + """ + outputs = {} + + # Always generate markdown + outputs["markdown"] = self.generate( + analysis, + atomizer_hints=True, + bom=True, + include_images=include_images, + ) + + # Generate supporting files + outputs["atomizer_hints"] = self.generate_atomizer_hints(analysis) + outputs["bom"] = self.generate_bom(analysis) + outputs["components"] = self.generate_component_json(analysis) + + # Generate PDF if requested + if pdf: + try: + outputs["pdf"] = self.generate_pdf(outputs["markdown"]) + except Exception as e: + # Log but don't fail + outputs["pdf_error"] = str(e) + + return outputs diff --git a/src/cad_documenter/pipeline.py b/src/cad_documenter/pipeline.py index 498529f..2143b0c 100644 --- a/src/cad_documenter/pipeline.py +++ b/src/cad_documenter/pipeline.py @@ -1,45 +1,245 @@ -"""Main documentation pipeline orchestrator.""" +"""Main documentation pipeline orchestrator with comprehensive error handling.""" +import shutil from pathlib import Path from dataclasses import dataclass, field +from typing import Callable +from enum import Enum -from .video_processor import VideoProcessor, FrameInfo +from .config import Config, load_config +from .video_processor import VideoProcessor, FrameInfo, VideoMetadata from .audio_analyzer import AudioAnalyzer, Transcript from .vision_analyzer import VisionAnalyzer, ComponentAnalysis from .doc_generator import DocGenerator -@dataclass -class PipelineConfig: - """Pipeline configuration.""" - frame_interval: float = 2.0 - whisper_model: str = "base" - vision_model: str = "gpt-4o" # or local model +class PipelineStage(Enum): + """Pipeline processing stages.""" + INIT = "initialization" + FRAMES = "frame_extraction" + TRANSCRIPTION = "transcription" + ANALYSIS = "vision_analysis" + DOCUMENTATION = "documentation" + PDF = "pdf_generation" + COMPLETE = "complete" @dataclass -class DocumentationPipeline: - """Orchestrates the full documentation pipeline.""" +class PipelineProgress: + """Progress tracking for the pipeline.""" + stage: PipelineStage + message: str + progress: float # 0.0 to 1.0 + error: str | None = None - video_path: Path + +@dataclass +class PipelineResult: + """Result of pipeline execution.""" + success: bool output_dir: Path - frame_interval: float = 2.0 - whisper_model: str = "base" + documentation_path: Path | None = None + pdf_path: Path | None = None + atomizer_hints_path: Path | None = None + bom_path: Path | None = None + frames_extracted: int = 0 + components_found: int = 0 + transcript_duration: float = 0.0 + errors: list[str] = field(default_factory=list) + warnings: list[str] = field(default_factory=list) - def __post_init__(self): - self.video_processor = VideoProcessor(self.video_path, self.output_dir / "frames") - self.audio_analyzer = AudioAnalyzer(self.video_path, self.whisper_model) - self.vision_analyzer = VisionAnalyzer() + +class DocumentationPipeline: + """Orchestrates the full documentation pipeline with error recovery.""" + + def __init__( + self, + video_path: Path, + output_dir: Path, + config: Config | None = None, + progress_callback: Callable[[PipelineProgress], None] | None = None, + ): + self.video_path = Path(video_path) + self.output_dir = Path(output_dir) + self.config = config or load_config() + self.progress_callback = progress_callback + + # Validate video exists + if not self.video_path.exists(): + raise FileNotFoundError(f"Video not found: {self.video_path}") + + # Create output directory + self.output_dir.mkdir(parents=True, exist_ok=True) + + # Initialize components + self.video_processor = VideoProcessor( + self.video_path, + self.output_dir / "frames", + config=self.config.frame_extraction, + ) + self.audio_analyzer = AudioAnalyzer( + self.video_path, + config=self.config.transcription, + ) + self.vision_analyzer = VisionAnalyzer(config=self.config.vision) self.doc_generator = DocGenerator(self.output_dir) + def _report_progress( + self, + stage: PipelineStage, + message: str, + progress: float, + error: str | None = None + ): + """Report progress to callback if available.""" + if self.progress_callback: + self.progress_callback(PipelineProgress( + stage=stage, + message=message, + progress=progress, + error=error, + )) + + def run( + self, + frames_only: bool = False, + skip_transcription: bool = False, + atomizer_hints: bool = False, + bom: bool = False, + pdf: bool = False, + ) -> PipelineResult: + """ + Run the full documentation pipeline. + + Args: + frames_only: Stop after frame extraction + skip_transcription: Skip audio transcription (vision-only) + atomizer_hints: Generate Atomizer FEA hints + bom: Generate Bill of Materials + pdf: Generate PDF output + + Returns: + PipelineResult with paths and statistics + """ + result = PipelineResult( + success=False, + output_dir=self.output_dir, + ) + + try: + # Stage 1: Extract frames + self._report_progress( + PipelineStage.FRAMES, + "Extracting video frames...", + 0.1 + ) + + frames = self.extract_frames() + result.frames_extracted = len(frames) + + if not frames: + result.errors.append("No frames could be extracted from video") + return result + + if frames_only: + result.success = True + self._report_progress( + PipelineStage.COMPLETE, + f"Extracted {len(frames)} frames", + 1.0 + ) + return result + + # Stage 2: Transcribe audio + self._report_progress( + PipelineStage.TRANSCRIPTION, + "Transcribing audio...", + 0.3 + ) + + if skip_transcription: + transcript = Transcript(segments=[], full_text="[Transcription skipped]") + result.warnings.append("Transcription was skipped") + else: + transcript = self.transcribe_audio_safe() + result.transcript_duration = transcript.duration + + if not transcript.segments: + result.warnings.append("No audio or empty transcript") + + # Stage 3: Analyze components + self._report_progress( + PipelineStage.ANALYSIS, + "Analyzing components with AI vision...", + 0.5 + ) + + analysis = self.analyze_components(frames, transcript) + result.components_found = len(analysis.components) + + if not analysis.components: + result.warnings.append("No components identified - check video quality") + + # Stage 4: Generate documentation + self._report_progress( + PipelineStage.DOCUMENTATION, + "Generating documentation...", + 0.7 + ) + + outputs = self.generate_documentation( + analysis, + atomizer_hints=atomizer_hints, + bom=bom, + ) + + result.documentation_path = outputs.get("markdown") + result.atomizer_hints_path = outputs.get("atomizer_hints") + result.bom_path = outputs.get("bom") + + # Stage 5: Generate PDF (optional) + if pdf: + self._report_progress( + PipelineStage.PDF, + "Generating PDF...", + 0.9 + ) + + try: + result.pdf_path = self.generate_pdf(result.documentation_path) + except Exception as e: + result.warnings.append(f"PDF generation failed: {e}") + + result.success = True + self._report_progress( + PipelineStage.COMPLETE, + f"Complete! {result.components_found} components documented", + 1.0 + ) + + except Exception as e: + result.errors.append(str(e)) + self._report_progress( + PipelineStage.COMPLETE, + f"Pipeline failed: {e}", + 1.0, + error=str(e) + ) + + return result + def extract_frames(self) -> list[FrameInfo]: - """Extract key frames from video.""" - return self.video_processor.extract_frames(interval=self.frame_interval) + """Extract key frames from video using configured mode.""" + return self.video_processor.extract_frames() def transcribe_audio(self) -> Transcript: - """Transcribe audio track.""" + """Transcribe audio track (raises on error).""" return self.audio_analyzer.transcribe() + def transcribe_audio_safe(self) -> Transcript: + """Transcribe audio track with fallback for missing audio.""" + return self.audio_analyzer.transcribe_with_fallback() + def analyze_components( self, frames: list[FrameInfo], transcript: Transcript ) -> ComponentAnalysis: @@ -51,14 +251,82 @@ class DocumentationPipeline: analysis: ComponentAnalysis, atomizer_hints: bool = False, bom: bool = False, - ) -> Path: - """Generate markdown documentation.""" - return self.doc_generator.generate( + ) -> dict[str, Path]: + """Generate all documentation outputs.""" + outputs = {} + + # Generate markdown + outputs["markdown"] = self.doc_generator.generate( analysis, atomizer_hints=atomizer_hints, bom=bom, ) + + # Generate Atomizer hints + if atomizer_hints: + outputs["atomizer_hints"] = self.doc_generator.generate_atomizer_hints(analysis) + + # Generate BOM + if bom: + outputs["bom"] = self.doc_generator.generate_bom(analysis) + + # Generate component JSON + outputs["components"] = self.doc_generator.generate_component_json(analysis) + + return outputs def generate_pdf(self, markdown_path: Path) -> Path: """Generate PDF from markdown using Atomaste Report Standard.""" return self.doc_generator.generate_pdf(markdown_path) + + def get_video_metadata(self) -> VideoMetadata: + """Get video metadata.""" + return self.video_processor.get_metadata() + + def cleanup(self, keep_frames: bool = True, keep_audio: bool = False): + """ + Clean up temporary files. + + Args: + keep_frames: Keep extracted frame images + keep_audio: Keep extracted audio file + """ + if not keep_frames: + frames_dir = self.output_dir / "frames" + if frames_dir.exists(): + shutil.rmtree(frames_dir) + + if not keep_audio: + audio_file = self.output_dir / "audio.wav" + if audio_file.exists(): + audio_file.unlink() + + +def create_pipeline( + video_path: str | Path, + output_dir: str | Path | None = None, + config_path: Path | None = None, +) -> DocumentationPipeline: + """ + Factory function to create a documentation pipeline. + + Args: + video_path: Path to input video + output_dir: Output directory (defaults to video_name_docs) + config_path: Path to config file (optional) + + Returns: + Configured DocumentationPipeline + """ + video_path = Path(video_path) + + if output_dir is None: + output_dir = video_path.parent / f"{video_path.stem}_docs" + + config = load_config(config_path) + + return DocumentationPipeline( + video_path=video_path, + output_dir=Path(output_dir), + config=config, + ) diff --git a/src/cad_documenter/video_processor.py b/src/cad_documenter/video_processor.py index fe14e1f..8596b38 100644 --- a/src/cad_documenter/video_processor.py +++ b/src/cad_documenter/video_processor.py @@ -2,6 +2,7 @@ import subprocess import json +import re from pathlib import Path from dataclasses import dataclass @@ -17,13 +18,18 @@ class FrameInfo: class VideoProcessor: """Handles video frame extraction using ffmpeg.""" - def __init__(self, video_path: Path, output_dir: Path): + def __init__(self, video_path: Path, output_dir: Path, scene_threshold: float = 0.3): self.video_path = video_path self.output_dir = output_dir self.output_dir.mkdir(parents=True, exist_ok=True) + self.scene_threshold = scene_threshold + self._duration: float | None = None def get_duration(self) -> float: """Get video duration in seconds.""" + if self._duration is not None: + return self._duration + cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", @@ -32,7 +38,8 @@ class VideoProcessor: ] result = subprocess.run(cmd, capture_output=True, text=True) data = json.loads(result.stdout) - return float(data["format"]["duration"]) + self._duration = float(data["format"]["duration"]) + return self._duration def extract_frames(self, interval: float = 2.0) -> list[FrameInfo]: """ @@ -44,13 +51,14 @@ class VideoProcessor: Returns: List of FrameInfo objects for extracted frames """ - duration = self.get_duration() - frames = [] - + # Clear existing frames + for old_frame in self.output_dir.glob("frame_*.jpg"): + old_frame.unlink() + # Use ffmpeg to extract frames at interval output_pattern = self.output_dir / "frame_%04d.jpg" cmd = [ - "ffmpeg", "-y", + "ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-i", str(self.video_path), "-vf", f"fps=1/{interval}", "-q:v", "2", # High quality JPEG @@ -59,6 +67,7 @@ class VideoProcessor: subprocess.run(cmd, capture_output=True) # Collect extracted frames + frames = [] for i, frame_path in enumerate(sorted(self.output_dir.glob("frame_*.jpg"))): timestamp = i * interval frames.append(FrameInfo( @@ -69,13 +78,117 @@ class VideoProcessor: return frames + def extract_at_scene_changes(self, max_frames: int = 15, min_interval: float = 1.0) -> list[FrameInfo]: + """ + Extract frames at scene changes (visual transitions). + + This is smarter than fixed intervals - it captures when the view changes + (e.g., when the engineer rotates the model or zooms in on a component). + + Args: + max_frames: Maximum number of frames to extract + min_interval: Minimum seconds between frames + + Returns: + List of FrameInfo objects, or empty list if detection fails + """ + # Clear existing frames + for old_frame in self.output_dir.glob("frame_*.jpg"): + old_frame.unlink() + + # Detect scene changes + scene_timestamps = self._detect_scene_changes() + + if not scene_timestamps: + return [] + + # Filter timestamps to ensure minimum interval and max count + filtered_timestamps = self._filter_timestamps(scene_timestamps, max_frames, min_interval) + + # Always include first frame (t=0) and last frame + duration = self.get_duration() + if 0.0 not in filtered_timestamps: + filtered_timestamps.insert(0, 0.0) + if duration - filtered_timestamps[-1] > min_interval: + filtered_timestamps.append(duration - 0.5) + + # Limit to max_frames + if len(filtered_timestamps) > max_frames: + step = len(filtered_timestamps) / max_frames + filtered_timestamps = [filtered_timestamps[int(i * step)] for i in range(max_frames)] + + # Extract frames at these timestamps + frames = [] + for i, ts in enumerate(filtered_timestamps): + output_path = self.output_dir / f"frame_{i:04d}.jpg" + cmd = [ + "ffmpeg", "-y", "-hide_banner", "-loglevel", "error", + "-ss", str(ts), + "-i", str(self.video_path), + "-vframes", "1", + "-q:v", "2", + str(output_path) + ] + subprocess.run(cmd, capture_output=True) + + if output_path.exists(): + frames.append(FrameInfo( + path=output_path, + timestamp=ts, + frame_number=i + )) + + return frames + + def _detect_scene_changes(self) -> list[float]: + """ + Detect scene changes in video using ffmpeg's scene filter. + + Returns list of timestamps where significant visual changes occur. + """ + cmd = [ + "ffmpeg", "-hide_banner", + "-i", str(self.video_path), + "-vf", f"select='gt(scene,{self.scene_threshold})',showinfo", + "-f", "null", "-" + ] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) + + # Parse scene change timestamps from ffmpeg output + timestamps = [] + for line in result.stderr.split("\n"): + if "pts_time:" in line: + # Extract timestamp using regex + match = re.search(r'pts_time:(\d+\.?\d*)', line) + if match: + ts = float(match.group(1)) + timestamps.append(ts) + + return sorted(set(timestamps)) + + def _filter_timestamps( + self, timestamps: list[float], max_count: int, min_interval: float + ) -> list[float]: + """Filter timestamps to ensure minimum interval between frames.""" + if not timestamps: + return [] + + filtered = [timestamps[0]] + for ts in timestamps[1:]: + if ts - filtered[-1] >= min_interval: + filtered.append(ts) + if len(filtered) >= max_count: + break + + return filtered + def extract_audio(self, output_path: Path | None = None) -> Path: """Extract audio track from video.""" if output_path is None: output_path = self.output_dir.parent / "audio.wav" cmd = [ - "ffmpeg", "-y", + "ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-i", str(self.video_path), "-vn", # No video "-acodec", "pcm_s16le", @@ -86,27 +199,13 @@ class VideoProcessor: subprocess.run(cmd, capture_output=True) return output_path - def detect_scene_changes(self, threshold: float = 0.3) -> list[float]: - """ - Detect scene changes in video. - - Returns list of timestamps where significant visual changes occur. - """ + def get_video_info(self) -> dict: + """Get video metadata.""" cmd = [ - "ffmpeg", "-i", str(self.video_path), - "-vf", f"select='gt(scene,{threshold})',showinfo", - "-f", "null", "-" + "ffprobe", "-v", "quiet", + "-print_format", "json", + "-show_format", "-show_streams", + str(self.video_path) ] result = subprocess.run(cmd, capture_output=True, text=True) - - # Parse scene change timestamps from ffmpeg output - timestamps = [] - for line in result.stderr.split("\n"): - if "pts_time:" in line: - # Extract timestamp - parts = line.split("pts_time:") - if len(parts) > 1: - ts = float(parts[1].split()[0]) - timestamps.append(ts) - - return timestamps + return json.loads(result.stdout) diff --git a/src/cad_documenter/vision_analyzer.py b/src/cad_documenter/vision_analyzer.py index 6834446..02b60e5 100644 --- a/src/cad_documenter/vision_analyzer.py +++ b/src/cad_documenter/vision_analyzer.py @@ -1,10 +1,15 @@ -"""Vision analysis module - component detection and feature extraction.""" +"""Vision analysis module - AI-powered component detection and feature extraction.""" +import base64 +import json +import re from pathlib import Path from dataclasses import dataclass, field +from typing import Any from .video_processor import FrameInfo from .audio_analyzer import Transcript +from .config import VisionConfig @dataclass @@ -18,6 +23,24 @@ class Component: best_frame: FrameInfo | None = None transcript_excerpt: str = "" part_number: str = "" # For Part Manager integration + confidence: float = 0.0 + bounding_box: list[int] | None = None + + +@dataclass +class AssemblyRelationship: + """Relationship between components.""" + from_component: str + to_component: str + relationship_type: str # bolted, welded, press-fit, etc. + + +@dataclass +class AtomizerHint: + """Hints for FEA/optimization setup.""" + timestamp: float + text: str + hint_type: str # objective, constraint, parameter, load_case, material @dataclass @@ -26,15 +49,146 @@ class ComponentAnalysis: assembly_name: str summary: str components: list[Component] + relationships: list[AssemblyRelationship] = field(default_factory=list) + atomizer_hints: list[AtomizerHint] = field(default_factory=list) assembly_notes: str = "" raw_transcript: str = "" class VisionAnalyzer: - """Analyzes frames to identify components and features.""" + """Analyzes frames to identify components and features using AI vision APIs.""" - def __init__(self, model: str = "gpt-4o"): - self.model = model + def __init__(self, config: VisionConfig | None = None): + self.config = config or VisionConfig() + self._client = None + self._prompts_dir = Path(__file__).parent.parent.parent / "prompts" + + def _get_client(self): + """Lazy-load the appropriate API client.""" + if self._client is not None: + return self._client + + if self.config.provider == "anthropic": + try: + import anthropic + self._client = anthropic.Anthropic(api_key=self.config.anthropic_api_key) + except ImportError: + raise ImportError("Install anthropic: pip install anthropic") + elif self.config.provider == "openai": + try: + import openai + self._client = openai.OpenAI(api_key=self.config.openai_api_key) + except ImportError: + raise ImportError("Install openai: pip install openai") + else: + raise ValueError(f"Unknown provider: {self.config.provider}") + + return self._client + + def _encode_image(self, image_path: Path) -> tuple[str, str]: + """Encode image to base64 and detect media type.""" + data = image_path.read_bytes() + encoded = base64.standard_b64encode(data).decode("utf-8") + + suffix = image_path.suffix.lower() + media_type = { + ".jpg": "image/jpeg", + ".jpeg": "image/jpeg", + ".png": "image/png", + ".gif": "image/gif", + ".webp": "image/webp", + }.get(suffix, "image/jpeg") + + return encoded, media_type + + def _load_prompt(self, name: str) -> str: + """Load a prompt template.""" + prompt_file = self._prompts_dir / f"{name}.txt" + if prompt_file.exists(): + return prompt_file.read_text() + return "" + + def _call_vision_api( + self, + images: list[tuple[str, str]], # List of (base64_data, media_type) + prompt: str, + system_prompt: str = "" + ) -> str: + """Call the vision API with images and prompt.""" + client = self._get_client() + + if self.config.provider == "anthropic": + # Build Anthropic message content + content = [] + for img_data, media_type in images: + content.append({ + "type": "image", + "source": { + "type": "base64", + "media_type": media_type, + "data": img_data, + } + }) + content.append({"type": "text", "text": prompt}) + + messages = [{"role": "user", "content": content}] + + response = client.messages.create( + model=self.config.model, + max_tokens=self.config.max_tokens, + system=system_prompt if system_prompt else "You are an expert mechanical engineer analyzing CAD models.", + messages=messages, + ) + return response.content[0].text + + elif self.config.provider == "openai": + # Build OpenAI message content + content = [] + for img_data, media_type in images: + content.append({ + "type": "image_url", + "image_url": { + "url": f"data:{media_type};base64,{img_data}", + "detail": "high" + } + }) + content.append({"type": "text", "text": prompt}) + + messages = [ + {"role": "system", "content": system_prompt or "You are an expert mechanical engineer analyzing CAD models."}, + {"role": "user", "content": content} + ] + + response = client.chat.completions.create( + model=self.config.model, + max_tokens=self.config.max_tokens, + temperature=self.config.temperature, + messages=messages, + ) + return response.choices[0].message.content + + raise ValueError(f"Unknown provider: {self.config.provider}") + + def _parse_json_response(self, response: str) -> dict: + """Extract JSON from API response.""" + # Try to find JSON in code blocks first + json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response, re.DOTALL) + if json_match: + try: + return json.loads(json_match.group(1)) + except json.JSONDecodeError: + pass + + # Try to find raw JSON object + json_match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', response, re.DOTALL) + if json_match: + try: + return json.loads(json_match.group(0)) + except json.JSONDecodeError: + pass + + # Return empty dict if parsing fails + return {} def analyze( self, frames: list[FrameInfo], transcript: Transcript @@ -42,21 +196,41 @@ class VisionAnalyzer: """ Analyze frames and transcript to identify components. - This is where the AI magic happens - correlating visual and verbal info. + This correlates visual analysis with verbal explanations. """ - # For MVP, we'll use a multi-modal approach: - # 1. Send key frames to vision model with transcript context - # 2. Ask it to identify components and correlate with verbal descriptions + if not frames: + return ComponentAnalysis( + assembly_name="Unknown Assembly", + summary="No frames available for analysis.", + components=[], + raw_transcript=transcript.full_text, + ) - # Placeholder implementation - will be enhanced with actual AI calls + # Step 1: Analyze key frames to identify components components = self._identify_components(frames, transcript) - summary = self._generate_summary(components, transcript) + + # Step 2: Extract assembly name from transcript or vision + assembly_name = self._extract_assembly_name(transcript, frames) + + # Step 3: Generate summary + summary = self._generate_summary(components, transcript, frames) + + # Step 4: Extract relationships between components + relationships = self._extract_relationships(components, transcript) + + # Step 5: Extract Atomizer hints for FEA setup + atomizer_hints = self._extract_atomizer_hints(transcript) + + # Step 6: Extract assembly notes + assembly_notes = self._extract_assembly_notes(transcript) return ComponentAnalysis( - assembly_name=self._extract_assembly_name(transcript), + assembly_name=assembly_name, summary=summary, components=components, - assembly_notes=self._extract_assembly_notes(transcript), + relationships=relationships, + atomizer_hints=atomizer_hints, + assembly_notes=assembly_notes, raw_transcript=transcript.full_text, ) @@ -64,48 +238,340 @@ class VisionAnalyzer: self, frames: list[FrameInfo], transcript: Transcript ) -> list[Component]: """Identify individual components from frames + transcript.""" - # TODO: Implement vision API calls - # For now, return empty list - will be implemented in Phase 1 - return [] + # Select key frames for analysis (don't send all) + key_frames = self._select_key_frames(frames, max_frames=8) + + if not key_frames: + return [] + + # Prepare images + images = [] + for frame in key_frames: + try: + encoded, media_type = self._encode_image(frame.path) + images.append((encoded, media_type)) + except Exception: + continue + + if not images: + return [] + + # Build prompt with transcript context + component_prompt = self._load_prompt("component_analysis") + + # Add transcript context to prompt + prompt = f"""{component_prompt} + +## Transcript from the video walkthrough: +{transcript.full_text[:4000]} # Limit transcript length + +## Frame timestamps analyzed: +{[f.timestamp for f in key_frames]} + +Please analyze the frames and identify all visible components, correlating with the transcript.""" + + try: + response = self._call_vision_api(images, prompt) + parsed = self._parse_json_response(response) + + # Convert parsed response to Component objects + components = [] + for comp_data in parsed.get("components", []): + # Find best frame for this component + best_frame = key_frames[0] if key_frames else None + + # Find transcript excerpt + excerpt = "" + for match in parsed.get("transcript_matches", []): + if match.get("component", "").lower() == comp_data.get("name", "").lower(): + excerpt = match.get("excerpt", "") + break + + components.append(Component( + name=comp_data.get("name", "Unknown"), + description=comp_data.get("description", ""), + function=comp_data.get("function", ""), + material=comp_data.get("material", ""), + features=comp_data.get("features", []), + best_frame=best_frame, + transcript_excerpt=excerpt, + confidence=comp_data.get("confidence", 0.8), + bounding_box=comp_data.get("bounding_box"), + )) + + # If no components parsed, create at least one from transcript + if not components and transcript.full_text: + components = self._components_from_transcript(transcript, key_frames) + + return components + + except Exception as e: + # Fallback to transcript-only extraction + return self._components_from_transcript(transcript, key_frames) + + def _components_from_transcript( + self, transcript: Transcript, frames: list[FrameInfo] + ) -> list[Component]: + """Extract component mentions from transcript when vision fails.""" + components = [] + + # Keywords indicating component mentions + patterns = [ + (r"this is (?:the|a) ([^,\.]+)", "component"), + (r"(?:the|a) ([^,\.]+) (?:is|provides|handles)", "component"), + (r"([^,\.]+) bracket", "bracket"), + (r"([^,\.]+) mount", "mount"), + (r"([^,\.]+) housing", "housing"), + (r"([^,\.]+) plate", "plate"), + ] + + text = transcript.full_text + found_names = set() + + for pattern, comp_type in patterns: + matches = re.finditer(pattern, text, re.IGNORECASE) + for match in matches: + name = match.group(1).strip() + if len(name) > 2 and len(name) < 50 and name.lower() not in found_names: + found_names.add(name.lower()) + components.append(Component( + name=name.title(), + description=f"Identified from transcript", + function="", + material="", + best_frame=frames[0] if frames else None, + confidence=0.5, + )) + + return components[:10] # Limit to 10 components + + def _select_key_frames( + self, frames: list[FrameInfo], max_frames: int = 8 + ) -> list[FrameInfo]: + """Select the most representative frames for analysis.""" + if len(frames) <= max_frames: + return frames + + # Evenly distribute frame selection + step = len(frames) / max_frames + indices = [int(i * step) for i in range(max_frames)] + return [frames[i] for i in indices] def _generate_summary( - self, components: list[Component], transcript: Transcript + self, components: list[Component], transcript: Transcript, frames: list[FrameInfo] ) -> str: """Generate executive summary of the assembly.""" - # TODO: Implement with LLM - return f"Assembly documentation generated from video walkthrough. {len(components)} components identified." + if not frames: + return "No visual information available for summary." + + summary_prompt = self._load_prompt("summary_generation") + + # Build component list for context + comp_list = "\n".join([ + f"- {c.name}: {c.function or c.description}" + for c in components + ]) + + prompt = f"""{summary_prompt} - def _extract_assembly_name(self, transcript: Transcript) -> str: - """Try to extract assembly name from transcript.""" - # Look for common patterns +## Identified Components: +{comp_list if comp_list else "Components being analyzed..."} + +## Full Transcript: +{transcript.full_text[:3000]} + +Generate a professional 2-3 paragraph executive summary.""" + + # Include one representative frame + try: + encoded, media_type = self._encode_image(frames[0].path) + response = self._call_vision_api([(encoded, media_type)], prompt) + + # Clean up response - remove JSON or code blocks + summary = re.sub(r'```.*?```', '', response, flags=re.DOTALL) + summary = summary.strip() + + if summary: + return summary + except Exception: + pass + + # Fallback summary + comp_count = len(components) + return f"This assembly documentation was generated from a video walkthrough. {comp_count} components were identified through visual and transcript analysis." + + def _extract_assembly_name( + self, transcript: Transcript, frames: list[FrameInfo] + ) -> str: + """Try to extract assembly name from transcript or vision.""" text = transcript.full_text.lower() - patterns = ["this is the", "presenting the", "looking at the", "reviewing the"] + + # Common patterns for assembly names + patterns = [ + r"this is the ([^,\.]+)", + r"presenting the ([^,\.]+)", + r"looking at the ([^,\.]+)", + r"reviewing the ([^,\.]+)", + r"the ([^,\.]+) assembly", + r"([^,\.]+) design review", + ] + for pattern in patterns: - if pattern in text: - idx = text.find(pattern) + len(pattern) - name = transcript.full_text[idx:idx + 50].strip().split(".")[0] - return name.strip() + match = re.search(pattern, text) + if match: + name = match.group(1).strip() + if len(name) > 2 and len(name) < 50: + return name.title() + return "Untitled Assembly" def _extract_assembly_notes(self, transcript: Transcript) -> str: """Extract assembly-related notes from transcript.""" - # Look for assembly instructions in transcript - keywords = ["assemble", "install", "mount", "attach", "connect"] + keywords = ["assemble", "install", "mount", "attach", "connect", "fasten", + "torque", "sequence", "order", "first", "then", "finally"] notes = [] + for seg in transcript.segments: if any(kw in seg.text.lower() for kw in keywords): notes.append(seg.text) + return " ".join(notes) if notes else "" - def analyze_single_frame(self, frame: FrameInfo, context: str = "") -> dict: + def _extract_relationships( + self, components: list[Component], transcript: Transcript + ) -> list[AssemblyRelationship]: + """Extract relationships between components from transcript.""" + relationships = [] + + # Relationship keywords + rel_patterns = [ + (r"([^,\.]+) (?:is )?bolted to ([^,\.]+)", "bolted"), + (r"([^,\.]+) (?:is )?welded to ([^,\.]+)", "welded"), + (r"([^,\.]+) (?:is )?press.?fit (?:into|to) ([^,\.]+)", "press-fit"), + (r"([^,\.]+) (?:is )?attached to ([^,\.]+)", "attached"), + (r"([^,\.]+) connects to ([^,\.]+)", "connected"), + (r"([^,\.]+) mounts (?:on|to) ([^,\.]+)", "mounted"), + ] + + text = transcript.full_text + for pattern, rel_type in rel_patterns: + matches = re.finditer(pattern, text, re.IGNORECASE) + for match in matches: + relationships.append(AssemblyRelationship( + from_component=match.group(1).strip().title(), + to_component=match.group(2).strip().title(), + relationship_type=rel_type, + )) + + return relationships + + def _extract_atomizer_hints(self, transcript: Transcript) -> list[AtomizerHint]: + """Extract optimization/FEA hints from transcript for Atomizer integration.""" + hints = [] + + # Objective keywords + objective_keywords = [ + "minimize", "maximize", "reduce", "increase", "optimize", + "lightweight", "stiff", "strong", "efficient" + ] + + # Constraint keywords + constraint_keywords = [ + "must", "cannot", "should not", "limit", "maximum", "minimum", + "at least", "no more than", "constraint", "requirement" + ] + + # Parameter keywords + parameter_keywords = [ + "thickness", "diameter", "length", "width", "height", "radius", + "fillet", "chamfer", "angle", "spacing", "pitch" + ] + + # Load case keywords + load_keywords = [ + "load", "force", "moment", "torque", "pressure", "stress", + "vibration", "thermal", "fatigue", "impact", "cyclic" + ] + + # Material keywords + material_keywords = [ + "aluminum", "steel", "titanium", "plastic", "composite", + "6061", "7075", "304", "316", "carbon fiber", "abs", "pla" + ] + + for seg in transcript.segments: + text_lower = seg.text.lower() + + # Check for objectives + if any(kw in text_lower for kw in objective_keywords): + hints.append(AtomizerHint( + timestamp=seg.start, + text=seg.text, + hint_type="objective" + )) + + # Check for constraints + elif any(kw in text_lower for kw in constraint_keywords): + hints.append(AtomizerHint( + timestamp=seg.start, + text=seg.text, + hint_type="constraint" + )) + + # Check for parameters + elif any(kw in text_lower for kw in parameter_keywords): + hints.append(AtomizerHint( + timestamp=seg.start, + text=seg.text, + hint_type="parameter" + )) + + # Check for load cases + elif any(kw in text_lower for kw in load_keywords): + hints.append(AtomizerHint( + timestamp=seg.start, + text=seg.text, + hint_type="load_case" + )) + + # Check for materials + elif any(kw in text_lower for kw in material_keywords): + hints.append(AtomizerHint( + timestamp=seg.start, + text=seg.text, + hint_type="material" + )) + + return hints + + def analyze_single_frame( + self, frame: FrameInfo, context: str = "" + ) -> dict[str, Any]: """ Analyze a single frame for components and features. Returns dict with detected components, features, and confidence. """ - # TODO: Implement with vision API - return { - "components": [], - "features": [], - "confidence": 0.0 - } + try: + encoded, media_type = self._encode_image(frame.path) + + prompt = f"""Analyze this CAD model image and identify: +1. All visible components/parts +2. Notable features (holes, threads, fillets, etc.) +3. Estimated materials based on appearance +4. Any visible dimensions or annotations + +{f'Additional context: {context}' if context else ''} + +Return a JSON object with components, features, and observations.""" + + response = self._call_vision_api([(encoded, media_type)], prompt) + return self._parse_json_response(response) + + except Exception as e: + return { + "components": [], + "features": [], + "confidence": 0.0, + "error": str(e) + } diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 30172f4..44bd8ff 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,54 +1,288 @@ -"""Basic tests for CAD-Documenter pipeline.""" +"""Tests for CAD-Documenter pipeline.""" import pytest from pathlib import Path -def test_imports(): +class TestImports: """Test that all modules can be imported.""" - from cad_documenter import __version__ - from cad_documenter.cli import main - from cad_documenter.pipeline import DocumentationPipeline - from cad_documenter.video_processor import VideoProcessor - from cad_documenter.audio_analyzer import AudioAnalyzer - from cad_documenter.vision_analyzer import VisionAnalyzer - from cad_documenter.doc_generator import DocGenerator - assert __version__ == "0.1.0" + def test_version(self): + from cad_documenter import __version__ + assert __version__ == "0.1.0" + + def test_cli_import(self): + from cad_documenter.cli import main + assert callable(main) + + def test_pipeline_import(self): + from cad_documenter.pipeline import DocumentationPipeline + assert DocumentationPipeline is not None + + def test_video_processor_import(self): + from cad_documenter.video_processor import VideoProcessor, FrameInfo + assert VideoProcessor is not None + assert FrameInfo is not None + + def test_audio_analyzer_import(self): + from cad_documenter.audio_analyzer import AudioAnalyzer, Transcript, TranscriptSegment + assert AudioAnalyzer is not None + assert Transcript is not None + + def test_vision_analyzer_import(self): + from cad_documenter.vision_analyzer import ( + VisionAnalyzer, Component, ComponentAnalysis, AtomizerHints + ) + assert VisionAnalyzer is not None + assert Component is not None + assert AtomizerHints is not None + + def test_doc_generator_import(self): + from cad_documenter.doc_generator import DocGenerator + assert DocGenerator is not None + + def test_config_import(self): + from cad_documenter.config import ( + Config, load_config, APIConfig, ProcessingConfig, OutputConfig + ) + assert Config is not None + assert callable(load_config) -def test_transcript_dataclass(): +class TestTranscript: """Test Transcript dataclass functionality.""" - from cad_documenter.audio_analyzer import Transcript, TranscriptSegment - segments = [ - TranscriptSegment(start=0.0, end=5.0, text="This is the main bracket"), - TranscriptSegment(start=5.0, end=10.0, text="It holds the motor"), - TranscriptSegment(start=10.0, end=15.0, text="Made of aluminum"), - ] + def test_transcript_creation(self): + from cad_documenter.audio_analyzer import Transcript, TranscriptSegment - transcript = Transcript(segments=segments, full_text="This is the main bracket. It holds the motor. Made of aluminum.") + segments = [ + TranscriptSegment(start=0.0, end=5.0, text="This is the main bracket"), + TranscriptSegment(start=5.0, end=10.0, text="It holds the motor"), + TranscriptSegment(start=10.0, end=15.0, text="Made of aluminum"), + ] - # Test get_text_at - text = transcript.get_text_at(7.0, window=3.0) - assert "holds the motor" in text - assert "main bracket" in text + transcript = Transcript( + segments=segments, + full_text="This is the main bracket. It holds the motor. Made of aluminum." + ) + + assert len(transcript.segments) == 3 + assert "bracket" in transcript.full_text + + def test_get_text_at(self): + from cad_documenter.audio_analyzer import Transcript, TranscriptSegment + + segments = [ + TranscriptSegment(start=0.0, end=5.0, text="This is the main bracket"), + TranscriptSegment(start=5.0, end=10.0, text="It holds the motor"), + TranscriptSegment(start=10.0, end=15.0, text="Made of aluminum"), + ] + + transcript = Transcript( + segments=segments, + full_text="This is the main bracket. It holds the motor. Made of aluminum." + ) + + # Test getting text at specific timestamp + text = transcript.get_text_at(7.0, window=3.0) + assert "holds the motor" in text + assert "main bracket" in text + + def test_get_segment_at(self): + from cad_documenter.audio_analyzer import Transcript, TranscriptSegment + + segments = [ + TranscriptSegment(start=0.0, end=5.0, text="First segment"), + TranscriptSegment(start=5.0, end=10.0, text="Second segment"), + ] + + transcript = Transcript(segments=segments, full_text="") + + seg = transcript.get_segment_at(3.0) + assert seg is not None + assert seg.text == "First segment" + + seg = transcript.get_segment_at(7.0) + assert seg is not None + assert seg.text == "Second segment" + + def test_search(self): + from cad_documenter.audio_analyzer import Transcript, TranscriptSegment + + segments = [ + TranscriptSegment(start=0.0, end=5.0, text="The bracket is aluminum"), + TranscriptSegment(start=5.0, end=10.0, text="The motor is steel"), + ] + + transcript = Transcript(segments=segments, full_text="") + + results = transcript.search("aluminum") + assert len(results) == 1 + assert results[0][0].text == "The bracket is aluminum" -def test_component_dataclass(): +class TestComponent: """Test Component dataclass.""" - from cad_documenter.vision_analyzer import Component - component = Component( - name="Main Bracket", - description="Primary structural member", - function="Holds the motor", - material="Aluminum 6061-T6", - features=["4x M6 holes", "Fillet radii"], - ) + def test_component_creation(self): + from cad_documenter.vision_analyzer import Component - assert component.name == "Main Bracket" - assert len(component.features) == 2 + component = Component( + name="Main Bracket", + description="Primary structural member", + function="Holds the motor", + material="Aluminum 6061-T6", + features=["4x M6 holes", "Fillet radii"], + ) + + assert component.name == "Main Bracket" + assert len(component.features) == 2 + assert component.material == "Aluminum 6061-T6" + + def test_component_defaults(self): + from cad_documenter.vision_analyzer import Component + + component = Component(name="Test", description="Test component") + + assert component.function == "" + assert component.material == "" + assert component.features == [] + assert component.confidence == 0.0 -# TODO: Add integration tests with sample videos +class TestAtomizerHints: + """Test AtomizerHints dataclass.""" + + def test_hints_creation(self): + from cad_documenter.vision_analyzer import AtomizerHints + + hints = AtomizerHints( + objectives=[{"name": "mass", "direction": "minimize"}], + constraints=[{"type": "envelope", "value": "200mm"}], + parameters=["thickness", "fillet_radius"], + critical_regions=[{"feature": "fillet", "concern": "stress_concentration"}], + ) + + assert len(hints.objectives) == 1 + assert hints.objectives[0]["name"] == "mass" + assert "thickness" in hints.parameters + + +class TestConfig: + """Test configuration loading.""" + + def test_default_config(self): + from cad_documenter.config import Config + + config = Config() + + assert config.api.provider == "openai" + assert config.processing.whisper_model == "base" + assert config.output.include_bom is True + + def test_load_config_defaults(self): + from cad_documenter.config import load_config + + # Should return defaults when no config file exists + config = load_config(Path("/nonexistent/config.toml")) + + assert config.api.provider == "openai" + assert config.processing.frame_interval == 2.0 + + +class TestDocGenerator: + """Test documentation generation.""" + + def test_generate_creates_file(self, tmp_path): + from cad_documenter.doc_generator import DocGenerator + from cad_documenter.vision_analyzer import ComponentAnalysis, Component + + generator = DocGenerator(tmp_path) + + analysis = ComponentAnalysis( + assembly_name="Test Assembly", + summary="This is a test assembly.", + components=[ + Component( + name="Test Part", + description="A test part", + material="Steel", + function="Testing", + ) + ], + ) + + doc_path = generator.generate(analysis) + + assert doc_path.exists() + content = doc_path.read_text() + assert "Test Assembly" in content + assert "Test Part" in content + + def test_generate_with_bom(self, tmp_path): + from cad_documenter.doc_generator import DocGenerator + from cad_documenter.vision_analyzer import ComponentAnalysis, Component + + generator = DocGenerator(tmp_path) + + analysis = ComponentAnalysis( + assembly_name="Test Assembly", + summary="Test", + components=[ + Component(name="Part A", description="First", material="Aluminum"), + Component(name="Part B", description="Second", material="Steel"), + ], + ) + + doc_path = generator.generate(analysis, bom=True) + + content = doc_path.read_text() + assert "Bill of Materials" in content + assert "Part A" in content + assert "Part B" in content + + # Check BOM CSV was created + csv_path = tmp_path / "bom.csv" + assert csv_path.exists() + + def test_atomizer_hints_json(self, tmp_path): + from cad_documenter.doc_generator import DocGenerator + from cad_documenter.vision_analyzer import ComponentAnalysis, Component, AtomizerHints + import json + + generator = DocGenerator(tmp_path) + + analysis = ComponentAnalysis( + assembly_name="Test Assembly", + summary="Test", + components=[ + Component(name="Bracket", description="Main bracket", material="Aluminum"), + ], + atomizer_hints=AtomizerHints( + objectives=[{"name": "mass", "direction": "minimize"}], + parameters=["thickness"], + ), + ) + + hints_path = generator.generate_atomizer_hints(analysis) + + assert hints_path.exists() + hints = json.loads(hints_path.read_text()) + assert hints["assembly_name"] == "Test Assembly" + assert len(hints["optimization_hints"]["objectives"]) == 1 + + +# Integration tests (require actual video files) +class TestIntegration: + """Integration tests - skipped without test videos.""" + + @pytest.mark.skip(reason="Requires test video file") + def test_full_pipeline(self, tmp_path): + from cad_documenter.pipeline import DocumentationPipeline + + video_path = Path("tests/fixtures/sample.mp4") + pipeline = DocumentationPipeline(video_path, tmp_path) + + results = pipeline.run_full_pipeline() + + assert results["documentation"].exists()