Full implementation - Vision AI, config, improved pipeline

Major changes:
- vision_analyzer.py: Real OpenAI/Anthropic vision API integration
  - Component detection with confidence scores
  - Atomizer hints extraction (objectives, constraints, parameters)
  - Material and feature identification
  - Timeline correlation with transcript

- config.py: Full configuration system
  - API settings (provider, keys, models)
  - Processing settings (Whisper model, frame interval, scene detection)
  - Output settings (BOM, hints, PDF template)
  - Config file support (~/.cad-documenter.toml)

- audio_analyzer.py: Enhanced transcription
  - Audio stream detection
  - Graceful fallback for missing audio
  - Keyword extraction
  - Technical term detection
  - Timeline correlation

- video_processor.py: Smart frame extraction
  - Scene change detection via ffmpeg
  - Configurable thresholds
  - Best frame selection

- doc_generator.py: Improved output
  - Better Markdown templates
  - BOM CSV export
  - Atomizer hints JSON
  - Component cards

- cli.py: Rich CLI with progress indicators
  - Config file support
  - --init-config flag
  - Verbose mode
  - Better error messages

- tests: Comprehensive test suite
This commit is contained in:
Mario Lavoie
2026-01-27 20:16:44 +00:00
parent 1e94a98e5b
commit 148180c12e
9 changed files with 2084 additions and 270 deletions

View File

@@ -15,6 +15,8 @@ dependencies = [
"jinja2>=3.1.0",
"openai-whisper>=20231117",
"pillow>=10.0.0",
"httpx>=0.27.0",
"tomli>=2.0.0;python_version<'3.11'",
]
[project.optional-dependencies]

View File

@@ -1,9 +1,13 @@
"""Audio analysis module - transcription via Whisper."""
"""Audio analysis module - transcription via Whisper with timeline correlation."""
from pathlib import Path
from dataclasses import dataclass
import subprocess
import tempfile
import json
import re
from pathlib import Path
from dataclasses import dataclass, field
from .config import TranscriptionConfig
@dataclass
@@ -12,6 +16,8 @@ class TranscriptSegment:
start: float # seconds
end: float
text: str
confidence: float = 1.0
words: list[dict] = field(default_factory=list) # Word-level timestamps if available
@dataclass
@@ -19,6 +25,8 @@ class Transcript:
"""Full transcript with segments."""
segments: list[TranscriptSegment]
full_text: str
language: str = "en"
duration: float = 0.0
def get_text_at(self, timestamp: float, window: float = 5.0) -> str:
"""Get transcript text around a specific timestamp."""
@@ -28,13 +36,85 @@ class Transcript:
relevant.append(seg.text)
return " ".join(relevant)
def get_segment_at(self, timestamp: float) -> TranscriptSegment | None:
    """Return the segment whose [start, end] range contains *timestamp*.

    Returns None when no segment covers the timestamp.
    """
    covering = (s for s in self.segments if s.start <= timestamp <= s.end)
    return next(covering, None)
def search(self, query: str) -> list[tuple[TranscriptSegment, float]]:
    """
    Case-insensitive substring search over all transcript segments.

    Returns:
        List of (segment, start_timestamp) tuples, in segment order.
    """
    needle = query.lower()
    return [
        (seg, seg.start)
        for seg in self.segments
        if needle in seg.text.lower()
    ]
class AudioAnalyzer:
"""Handles audio transcription using Whisper."""
"""Handles audio transcription using Whisper with enhanced features."""
def __init__(self, video_path: Path, model: str = "base"):
def __init__(
self,
video_path: Path,
config: TranscriptionConfig | None = None
):
self.video_path = video_path
self.model = model
self.config = config or TranscriptionConfig()
self._model = None
def _check_audio_stream(self) -> bool:
"""Check if video has an audio stream."""
cmd = [
"ffprobe", "-v", "quiet",
"-select_streams", "a",
"-show_entries", "stream=codec_type",
"-of", "json",
str(self.video_path)
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
return False
try:
data = json.loads(result.stdout)
streams = data.get("streams", [])
return len(streams) > 0
except json.JSONDecodeError:
return False
def _extract_audio(self, output_path: Path) -> bool:
    """
    Demux the video's audio track to *output_path* as 16 kHz mono PCM WAV.

    Returns:
        True only when ffmpeg exits with status 0 and the file exists.
    """
    command = [
        "ffmpeg", "-y",
        "-i", str(self.video_path),
        "-vn", "-acodec", "pcm_s16le",
        "-ar", "16000", "-ac", "1",
        str(output_path),
    ]
    completed = subprocess.run(command, capture_output=True, text=True)
    succeeded = completed.returncode == 0
    return succeeded and output_path.exists()
def _get_model(self):
"""Lazy-load Whisper model."""
if self._model is not None:
return self._model
try:
import whisper
self._model = whisper.load_model(self.config.model)
return self._model
except ImportError:
raise ImportError(
"Whisper not installed. Run: pip install openai-whisper"
)
def transcribe(self) -> Transcript:
"""
@@ -42,63 +122,198 @@ class AudioAnalyzer:
Returns:
Transcript object with segments and full text
Raises:
RuntimeError: If video has no audio or transcription fails
"""
# Check for audio stream
if not self._check_audio_stream():
raise RuntimeError(
"Video has no audio track. Cannot transcribe."
)
# Extract audio to temp file
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
audio_path = Path(f.name)
# Extract audio using ffmpeg
cmd = [
"ffmpeg", "-y",
"-i", str(self.video_path),
"-vn", "-acodec", "pcm_s16le",
"-ar", "16000", "-ac", "1",
str(audio_path)
]
subprocess.run(cmd, capture_output=True)
try:
if not self._extract_audio(audio_path):
raise RuntimeError("Failed to extract audio from video")
# Check if audio file is valid (non-zero size)
if audio_path.stat().st_size < 1000:
raise RuntimeError(
"Audio track is too short or silent. Cannot transcribe."
)
# Run Whisper
try:
import whisper
model = whisper.load_model(self.model)
result = model.transcribe(str(audio_path), word_timestamps=True)
model = self._get_model()
options = {
"word_timestamps": True,
"verbose": False,
}
if self.config.language:
options["language"] = self.config.language
result = model.transcribe(str(audio_path), **options)
segments = []
for seg in result.get("segments", []):
# Extract word-level timestamps if available
words = []
for word_data in seg.get("words", []):
words.append({
"word": word_data.get("word", "").strip(),
"start": word_data.get("start", 0),
"end": word_data.get("end", 0),
"probability": word_data.get("probability", 1.0),
})
segments.append(TranscriptSegment(
start=seg["start"],
end=seg["end"],
text=seg["text"].strip()
text=seg["text"].strip(),
confidence=seg.get("avg_logprob", 0.0),
words=words,
))
full_text = result.get("text", "").strip()
# Get duration from last segment
duration = segments[-1].end if segments else 0.0
return Transcript(
segments=segments,
full_text=result.get("text", "").strip()
full_text=full_text,
language=result.get("language", "en"),
duration=duration,
)
except Exception as e:
if "no audio" in str(e).lower():
raise RuntimeError("Video has no audio track") from e
raise RuntimeError(f"Transcription failed: {e}") from e
finally:
# Cleanup temp file
try:
audio_path.unlink(missing_ok=True)
except Exception:
pass
def extract_keywords(self, transcript: Transcript) -> list[str]:
"""Extract likely component names and technical terms."""
# Simple keyword extraction - can be enhanced with NLP
def transcribe_with_fallback(self) -> Transcript:
    """
    Transcribe, degrading gracefully for videos without usable audio.

    Missing-audio / too-short failures yield an empty placeholder
    Transcript; every other RuntimeError is propagated unchanged.
    """
    try:
        return self.transcribe()
    except RuntimeError as err:
        message = str(err).lower()
        recoverable = "no audio" in message or "too short" in message
        if not recoverable:
            raise
        return Transcript(
            segments=[],
            full_text="[No audio available]",
            language="en",
            duration=0.0,
        )
def extract_keywords(self, transcript: Transcript) -> list[dict]:
"""
Extract likely component names and technical terms.
Returns list of dicts with keyword, context, and timestamp.
"""
keywords = []
# Patterns that indicate component mentions
indicator_phrases = [
"this is the", "this is a", "here we have",
"the main", "called the", "known as",
"this part", "this component", "this assembly"
(r"this is (?:the|a) ([^,\.]+)", "definition"),
(r"here (?:we have|is) (?:the|a) ([^,\.]+)", "definition"),
(r"the main ([^,\.]+)", "component"),
(r"called (?:the|a) ([^,\.]+)", "naming"),
(r"known as (?:the|a)? ([^,\.]+)", "naming"),
(r"this (?:part|component|assembly|piece) ([^,\.]+)", "component"),
(r"the ([^,\.]+) (?:bracket|mount|housing|plate|arm|shaft)", "component"),
]
text_lower = transcript.full_text.lower()
for phrase in indicator_phrases:
if phrase in text_lower:
# Find what comes after the phrase
idx = text_lower.find(phrase)
after = transcript.full_text[idx + len(phrase):idx + len(phrase) + 50]
# Take first few words
words = after.strip().split()[:3]
if words:
keywords.append(" ".join(words).strip(",.;:"))
for seg in transcript.segments:
text = seg.text
for pattern, kw_type in indicator_phrases:
matches = re.finditer(pattern, text, re.IGNORECASE)
for match in matches:
keyword = match.group(1).strip()
# Filter out too short or too long
if 2 < len(keyword) < 50:
keywords.append({
"keyword": keyword,
"type": kw_type,
"timestamp": seg.start,
"context": text,
})
return list(set(keywords))
# Deduplicate by keyword
seen = set()
unique_keywords = []
for kw in keywords:
kw_lower = kw["keyword"].lower()
if kw_lower not in seen:
seen.add(kw_lower)
unique_keywords.append(kw)
return unique_keywords
def extract_technical_terms(self, transcript: Transcript) -> list[str]:
    """
    Extract engineering vocabulary from the transcript.

    Looks for materials, alloy/plastic grades, fastener sizes,
    dimensions with units, machined features, load terms, and
    manufacturing processes.

    Returns:
        Deduplicated list of matched terms (lowercase, order unspecified).
    """
    tech_patterns = [
        r"\b(aluminum|steel|titanium|brass|copper|plastic|composite)\b",
        r"\b(6061|7075|304|316|abs|pla|petg|nylon)\b",
        # BUGFIX: '#' is not a word character, so a leading \b could never
        # match before '#10-32'; a negative lookbehind works for both
        # metric (M4) and imperial (#10-32) fastener callouts.
        r"(?<!\w)(M[0-9]+|#[0-9]+-[0-9]+)\b",
        r"\b([0-9]+(?:\.[0-9]+)?\s*(?:mm|cm|m|in|inch|ft))\b",  # dimensions
        r"\b(fillet|chamfer|thread|bore|hole|slot|keyway)\b",
        r"\b(torque|force|load|stress|strain|deflection)\b",
        r"\b(cnc|3d print|cast|machined|welded|brazed)\b",
    ]
    terms = set()
    text = transcript.full_text.lower()
    for pattern in tech_patterns:
        for match in re.findall(pattern, text, re.IGNORECASE):
            terms.add(match.strip())
    return list(terms)
def create_timeline(
    self, transcript: Transcript, frame_timestamps: list[float]
) -> list[dict]:
    """
    Correlate extracted frames with the transcript.

    Args:
        transcript: Transcript queried around each frame time.
        frame_timestamps: Frame capture times, in seconds.

    Returns:
        One dict per frame with keys: frame_timestamp, transcript_text
        (nearby speech within a 3 s window), and segment (the containing
        segment or None).
    """
    return [
        {
            "frame_timestamp": ts,
            "transcript_text": transcript.get_text_at(ts, window=3.0),
            "segment": transcript.get_segment_at(ts),
        }
        for ts in frame_timestamps
    ]

View File

@@ -1,14 +1,28 @@
"""CAD-Documenter CLI - Main entry point."""
import click
import sys
from pathlib import Path
from rich.console import Console
import click
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.panel import Panel
from .config import load_config, create_default_config
from .pipeline import DocumentationPipeline
console = Console()
def print_banner():
    """Render the application banner as a blue-bordered panel."""
    banner_text = (
        "[bold blue]CAD-Documenter[/bold blue] v0.1.0\n"
        "[dim]Video walkthrough → Engineering documentation[/dim]"
    )
    console.print(Panel.fit(banner_text, border_style="blue"))
@click.command()
@click.argument("video", type=click.Path(exists=True, path_type=Path))
@click.option("-o", "--output", type=click.Path(path_type=Path), help="Output directory")
@@ -16,8 +30,12 @@ console = Console()
@click.option("--atomizer-hints", is_flag=True, help="Generate Atomizer FEA hints")
@click.option("--bom", is_flag=True, help="Generate Bill of Materials")
@click.option("--pdf", is_flag=True, help="Generate PDF via Atomaste Report Standard")
@click.option("--frame-interval", default=2.0, help="Seconds between frame extractions")
@click.option("--whisper-model", default="base", help="Whisper model size (tiny/base/small/medium/large)")
@click.option("--frame-interval", type=float, help="Seconds between frame extractions")
@click.option("--whisper-model", type=click.Choice(["tiny", "base", "small", "medium", "large"]), help="Whisper model size")
@click.option("--api-provider", type=click.Choice(["openai", "anthropic"]), help="Vision API provider")
@click.option("--config", "config_path", type=click.Path(exists=True, path_type=Path), help="Config file path")
@click.option("--init-config", is_flag=True, help="Create default config file and exit")
@click.option("-v", "--verbose", is_flag=True, help="Verbose output")
@click.version_option()
def main(
video: Path,
@@ -26,60 +44,169 @@ def main(
atomizer_hints: bool,
bom: bool,
pdf: bool,
frame_interval: float,
whisper_model: str,
frame_interval: float | None,
whisper_model: str | None,
api_provider: str | None,
config_path: Path | None,
init_config: bool,
verbose: bool,
):
"""
Generate engineering documentation from a CAD walkthrough video.
VIDEO: Path to the video file (.mp4, .mov, .avi, etc.)
Examples:
cad-doc walkthrough.mp4
cad-doc video.mp4 --output ./docs --bom --atomizer-hints
cad-doc video.mp4 --pdf --whisper-model medium
"""
console.print(f"[bold blue]CAD-Documenter[/bold blue] v0.1.0")
console.print(f"Processing: [cyan]{video}[/cyan]")
print_banner()
# Handle --init-config
if init_config:
default_path = Path.home() / ".cad-documenter.toml"
create_default_config(default_path)
console.print(f"[green]✓[/green] Created config file: {default_path}")
console.print("[dim]Edit this file to configure API keys and defaults.[/dim]")
return
# Load configuration
config = load_config(config_path)
# Override config with CLI options
if frame_interval is not None:
config.processing.frame_interval = frame_interval
if whisper_model is not None:
config.processing.whisper_model = whisper_model
if api_provider is not None:
config.api.provider = api_provider
# Check API key
if not frames_only and not config.api.api_key:
provider = config.api.provider.upper()
console.print(f"[red]Error:[/red] No API key found for {config.api.provider}.")
console.print(f"Set [cyan]{provider}_API_KEY[/cyan] environment variable or add to config file.")
console.print(f"\nTo create a config file: [cyan]cad-doc --init-config[/cyan]")
sys.exit(1)
console.print(f"Processing: [cyan]{video.name}[/cyan]")
if verbose:
console.print(f" API: {config.api.provider} ({config.api.vision_model or 'default'})")
console.print(f" Whisper: {config.processing.whisper_model}")
# Default output directory
if output is None:
output = video.parent / f"{video.stem}_docs"
output.mkdir(parents=True, exist_ok=True)
console.print(f"Output: [cyan]{output}[/cyan]")
# Run pipeline
# Initialize pipeline
try:
pipeline = DocumentationPipeline(
video_path=video,
output_dir=output,
frame_interval=frame_interval,
whisper_model=whisper_model,
config=config,
)
except ValueError as e:
console.print(f"[red]Configuration error:[/red] {e}")
sys.exit(1)
# Frames only mode
if frames_only:
console.print("[yellow]Extracting frames only...[/yellow]")
pipeline.extract_frames()
console.print(f"[green]✓[/green] Frames saved to {output / 'frames'}")
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
progress.add_task("Extracting frames...", total=None)
frames = pipeline.extract_frames()
console.print(f"[green]✓[/green] Extracted {len(frames)} frames to {output / 'frames'}")
return
# Full pipeline
console.print("[yellow]Step 1/4:[/yellow] Extracting frames...")
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
# Step 1: Extract frames
task1 = progress.add_task("[cyan]Step 1/4:[/cyan] Extracting frames...", total=None)
frames = pipeline.extract_frames()
console.print(f" [green]✓[/green] Extracted {len(frames)} frames")
progress.update(task1, description=f"[green]✓[/green] Extracted {len(frames)} frames")
progress.remove_task(task1)
console.print("[yellow]Step 2/4:[/yellow] Transcribing audio...")
# Step 2: Transcribe
task2 = progress.add_task("[cyan]Step 2/4:[/cyan] Transcribing audio...", total=None)
transcript = pipeline.transcribe_audio()
console.print(f" [green]✓[/green] Transcribed {len(transcript.segments)} segments")
seg_count = len(transcript.segments) if transcript.segments else 0
progress.update(task2, description=f"[green]✓[/green] Transcribed {seg_count} segments")
progress.remove_task(task2)
console.print("[yellow]Step 3/4:[/yellow] Analyzing components...")
if verbose and transcript.full_text:
console.print(Panel(
transcript.full_text[:500] + ("..." if len(transcript.full_text) > 500 else ""),
title="Transcript Preview",
border_style="dim"
))
# Step 3: Analyze
task3 = progress.add_task("[cyan]Step 3/4:[/cyan] Analyzing components...", total=None)
analysis = pipeline.analyze_components(frames, transcript)
console.print(f" [green]✓[/green] Identified {len(analysis.components)} components")
comp_count = len(analysis.components)
progress.update(task3, description=f"[green]✓[/green] Identified {comp_count} components")
progress.remove_task(task3)
console.print("[yellow]Step 4/4:[/yellow] Generating documentation...")
doc_path = pipeline.generate_documentation(analysis, atomizer_hints=atomizer_hints, bom=bom)
console.print(f" [green]✓[/green] Documentation saved to {doc_path}")
if verbose and analysis.components:
console.print("\n[bold]Components found:[/bold]")
for c in analysis.components:
console.print(f"{c.name} ({c.material or 'material unknown'})")
# Step 4: Generate documentation
task4 = progress.add_task("[cyan]Step 4/4:[/cyan] Generating documentation...", total=None)
doc_path = pipeline.generate_documentation(
analysis,
atomizer_hints=atomizer_hints or config.output.include_atomizer_hints,
bom=bom or config.output.include_bom,
)
progress.update(task4, description=f"[green]✓[/green] Documentation generated")
progress.remove_task(task4)
# Generate PDF if requested
if pdf:
console.print("[yellow]Generating PDF...[/yellow]")
console.print("[cyan]Generating PDF...[/cyan]")
try:
pdf_path = pipeline.generate_pdf(doc_path)
console.print(f" [green]✓[/green] PDF saved to {pdf_path}")
console.print(f"[green]✓[/green] PDF: {pdf_path}")
except Exception as e:
console.print(f"[yellow]Warning:[/yellow] PDF generation failed: {e}")
console.print(f"\n[bold green]Done![/bold green] Output: {output}")
# Summary
console.print()
console.print(Panel.fit(
f"[bold green]Documentation complete![/bold green]\n\n"
f"📄 [cyan]{doc_path}[/cyan]\n"
f"📊 {len(analysis.components)} components documented\n"
f"🖼️ {len(frames)} frames extracted",
title="Summary",
border_style="green"
))
# Show atomizer hints summary if generated
if (atomizer_hints or config.output.include_atomizer_hints) and analysis.atomizer_hints:
hints = analysis.atomizer_hints
if hints.objectives or hints.constraints:
console.print("\n[bold]Atomizer Hints:[/bold]")
for obj in hints.objectives[:3]:
console.print(f" 🎯 {obj['direction'].capitalize()} {obj['name']}")
for constraint in hints.constraints[:3]:
console.print(f" 📏 {constraint['type']}: {constraint['value']}")
if __name__ == "__main__":

View File

@@ -0,0 +1,179 @@
"""Configuration management for CAD-Documenter."""
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Literal
try:
import tomllib
except ImportError:
import tomli as tomllib
@dataclass
class TranscriptionConfig:
    """Transcription configuration."""
    model: str = "base"  # Whisper model size: tiny, base, small, medium, large
    language: str | None = None  # ISO language code to force; None = auto-detect
@dataclass
class APIConfig:
    """API configuration."""
    provider: Literal["openai", "anthropic"] = "openai"  # vision/text backend
    api_key: str | None = None  # from config file or *_API_KEY env var
    vision_model: str | None = None  # None = use provider default
    text_model: str | None = None  # None = use provider default
@dataclass
class ProcessingConfig:
    """Video/audio processing configuration."""
    whisper_model: str = "base"  # Whisper model size used for transcription
    frame_interval: float = 2.0  # seconds between frame extractions
    use_scene_detection: bool = True  # prefer scene-change frames over fixed interval
    max_frames: int = 15  # maximum frames sent to the vision API
    scene_threshold: float = 0.3  # scene sensitivity, 0.0-1.0 (lower = more sensitive)
@dataclass
class OutputConfig:
    """Output configuration."""
    include_bom: bool = True  # include Bill of Materials in documentation
    include_atomizer_hints: bool = True  # include Atomizer FEA hints
    include_raw_transcript: bool = True  # append raw transcript to documentation
    include_frames: bool = True  # keep extracted frames in the output directory
    pdf_template: str = "default"  # template name used by the --pdf option
@dataclass
class Config:
    """Main configuration: aggregates the api, processing, and output sections."""
    api: APIConfig = field(default_factory=APIConfig)
    processing: ProcessingConfig = field(default_factory=ProcessingConfig)
    output: OutputConfig = field(default_factory=OutputConfig)
def load_config(config_path: Path | None = None) -> Config:
    """
    Load configuration from file and environment variables.

    Priority (highest to lowest):
        1. Environment variables (provider and whisper model override the
           file; API keys only fill in when the file did not set one)
        2. Config file (explicit path, else first hit in standard locations)
        3. Built-in defaults

    Args:
        config_path: Explicit TOML config file. When None, standard
            locations are searched, most specific (cwd) first.

    Returns:
        A fully populated Config.
    """
    config = Config()

    if config_path is None:
        # Search the standard locations in precedence order.
        candidates = [
            Path.cwd() / "cad-documenter.toml",
            Path.cwd() / ".cad-documenter.toml",
            Path.home() / ".cad-documenter.toml",
            Path.home() / ".config" / "cad-documenter" / "config.toml",
        ]
        config_path = next((p for p in candidates if p.exists()), None)

    if config_path and config_path.exists():
        with open(config_path, "rb") as f:
            data = tomllib.load(f)
        # Each TOML section maps 1:1 onto a dataclass; copy every known
        # field that the section provides, ignoring unknown keys.
        _apply_section(config.api, data.get("api"))
        _apply_section(config.processing, data.get("processing"))
        _apply_section(config.output, data.get("output"))

    # Environment-variable overrides.
    if os.environ.get("CAD_DOC_PROVIDER"):
        config.api.provider = os.environ["CAD_DOC_PROVIDER"]
    if os.environ.get("OPENAI_API_KEY"):
        if config.api.provider == "openai" and not config.api.api_key:
            config.api.api_key = os.environ["OPENAI_API_KEY"]
    if os.environ.get("ANTHROPIC_API_KEY"):
        if config.api.provider == "anthropic" and not config.api.api_key:
            config.api.api_key = os.environ["ANTHROPIC_API_KEY"]
    if os.environ.get("CAD_DOC_WHISPER_MODEL"):
        config.processing.whisper_model = os.environ["CAD_DOC_WHISPER_MODEL"]
    return config


def _apply_section(section, values) -> None:
    """Overwrite *section*'s dataclass fields with matching keys from *values*."""
    from dataclasses import fields
    if not values:
        return
    for f in fields(section):
        if f.name in values:
            setattr(section, f.name, values[f.name])
def create_default_config(path: Path) -> None:
"""Create a default config file."""
content = '''# CAD-Documenter Configuration
[api]
# Vision API provider: "openai" or "anthropic"
provider = "openai"
# API key (or set OPENAI_API_KEY / ANTHROPIC_API_KEY environment variable)
# api_key = "sk-..."
# Model overrides (optional - uses provider defaults if not set)
# vision_model = "gpt-4o"
# text_model = "gpt-4o-mini"
[processing]
# Whisper model for transcription: tiny, base, small, medium, large
whisper_model = "base"
# Seconds between frame extractions (if not using scene detection)
frame_interval = 2.0
# Use scene change detection for smarter frame selection
use_scene_detection = true
# Maximum frames to send to vision API
max_frames = 15
# Scene detection sensitivity (0.0-1.0, lower = more sensitive)
scene_threshold = 0.3
[output]
# Include Bill of Materials in documentation
include_bom = true
# Include Atomizer FEA hints
include_atomizer_hints = true
# Include raw transcript at end of documentation
include_raw_transcript = true
# Include extracted frames in output directory
include_frames = true
# PDF template name (for --pdf option)
pdf_template = "default"
'''
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)

View File

@@ -1,11 +1,13 @@
"""Documentation generator - produces markdown and PDF output."""
"""Documentation generator - produces markdown, JSON, and PDF output."""
import json
import subprocess
from pathlib import Path
from datetime import datetime
from jinja2 import Environment, FileSystemLoader, BaseLoader
from .vision_analyzer import ComponentAnalysis, Component
from .vision_analyzer import ComponentAnalysis, Component, AtomizerHint
# Default template embedded in code (can be overridden by files)
@@ -13,6 +15,7 @@ DEFAULT_TEMPLATE = '''# {{ analysis.assembly_name }} - Technical Documentation
**Generated:** {{ timestamp }}
**Source:** Video walkthrough documentation
**Components:** {{ analysis.components | length }}
---
@@ -48,7 +51,7 @@ DEFAULT_TEMPLATE = '''# {{ analysis.assembly_name }} - Technical Documentation
{% endfor %}
{% endif %}
{% if component.best_frame %}
{% if component.best_frame and include_images %}
![{{ component.name }}](frames/{{ component.best_frame.path.name }})
{% endif %}
@@ -60,13 +63,24 @@ DEFAULT_TEMPLATE = '''# {{ analysis.assembly_name }} - Technical Documentation
{% endfor %}
{% if bom %}
{% if bom and analysis.components %}
## Bill of Materials
| Item | P/N | Name | Qty | Material | Notes |
|------|-----|------|-----|----------|-------|
{% for component in analysis.components %}
| {{ loop.index }} | {{ component.part_number or 'TBD' }} | {{ component.name }} | 1 | {{ component.material or 'TBD' }} | {{ component.function }} |
| {{ loop.index }} | {{ component.part_number or 'TBD' }} | {{ component.name }} | 1 | {{ component.material or 'TBD' }} | {{ component.function or '-' }} |
{% endfor %}
{% endif %}
{% if analysis.relationships %}
## Assembly Relationships
| From | To | Connection |
|------|----|------------|
{% for rel in analysis.relationships %}
| {{ rel.from_component }} | {{ rel.to_component }} | {{ rel.relationship_type }} |
{% endfor %}
{% endif %}
@@ -78,28 +92,39 @@ DEFAULT_TEMPLATE = '''# {{ analysis.assembly_name }} - Technical Documentation
{% endif %}
{% if atomizer_hints %}
{% if atomizer_hints and analysis.atomizer_hints %}
## Atomizer FEA Hints
Based on the video walkthrough, the following optimization parameters are suggested:
The following optimization parameters and constraints were identified from the video walkthrough:
```json
{
"model_understanding": {
"components": {{ component_names | tojson }},
"materials_mentioned": {{ materials | tojson }}
},
"suggested_study": {
"objectives": [
{"name": "mass", "direction": "minimize"}
],
"constraints_likely": []
}
}
```
### Objectives
{% for hint in analysis.atomizer_hints if hint.hint_type == 'objective' %}
- **[{{ "%.1f"|format(hint.timestamp) }}s]** {{ hint.text }}
{% endfor %}
### Constraints
{% for hint in analysis.atomizer_hints if hint.hint_type == 'constraint' %}
- **[{{ "%.1f"|format(hint.timestamp) }}s]** {{ hint.text }}
{% endfor %}
### Parameters
{% for hint in analysis.atomizer_hints if hint.hint_type == 'parameter' %}
- **[{{ "%.1f"|format(hint.timestamp) }}s]** {{ hint.text }}
{% endfor %}
### Load Cases
{% for hint in analysis.atomizer_hints if hint.hint_type == 'load_case' %}
- **[{{ "%.1f"|format(hint.timestamp) }}s]** {{ hint.text }}
{% endfor %}
### Materials
{% for hint in analysis.atomizer_hints if hint.hint_type == 'material' %}
- **[{{ "%.1f"|format(hint.timestamp) }}s]** {{ hint.text }}
{% endfor %}
{% endif %}
{% if include_transcript %}
---
## Raw Transcript
@@ -110,19 +135,26 @@ Based on the video walkthrough, the following optimization parameters are sugges
{{ analysis.raw_transcript }}
</details>
{% endif %}
---
*Documentation generated by CAD-Documenter*
*Documentation generated by CAD-Documenter v{{ version }}*
'''
class DocGenerator:
"""Generates documentation from analysis results."""
def __init__(self, output_dir: Path, template_dir: Path | None = None):
def __init__(
self,
output_dir: Path,
template_dir: Path | None = None,
version: str = "0.2.0"
):
self.output_dir = output_dir
self.output_dir.mkdir(parents=True, exist_ok=True)
self.version = version
# Setup Jinja environment
if template_dir and template_dir.exists():
@@ -136,11 +168,16 @@ class DocGenerator:
atomizer_hints: bool = False,
bom: bool = False,
template_name: str | None = None,
include_images: bool = True,
include_transcript: bool = True,
) -> Path:
"""Generate markdown documentation."""
# Load template
if template_name:
try:
template = self.env.get_template(f"{template_name}.md.j2")
except Exception:
template = self.env.from_string(DEFAULT_TEMPLATE)
else:
template = self.env.from_string(DEFAULT_TEMPLATE)
@@ -150,6 +187,9 @@ class DocGenerator:
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
"atomizer_hints": atomizer_hints,
"bom": bom,
"include_images": include_images,
"include_transcript": include_transcript,
"version": self.version,
"component_names": [c.name for c in analysis.components],
"materials": list(set(c.material for c in analysis.components if c.material)),
}
@@ -163,56 +203,240 @@ class DocGenerator:
return output_path
def generate_pdf(self, markdown_path: Path) -> Path:
"""
Generate PDF from markdown using Atomaste Report Standard.
Requires the atomaste-reports skill/Typst to be available.
"""
import subprocess
pdf_path = markdown_path.with_suffix(".pdf")
# Try to use Atomaste Report Standard if available
# Otherwise fall back to pandoc
try:
# Check if atomaste build script exists
build_script = Path("/home/papa/Atomaste/Templates/Atomaste_Report_Standard/scripts/build-report.py")
if build_script.exists():
cmd = ["python3", str(build_script), str(markdown_path), "-o", str(pdf_path)]
else:
# Fallback to pandoc
cmd = ["pandoc", str(markdown_path), "-o", str(pdf_path)]
subprocess.run(cmd, capture_output=True, check=True)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"PDF generation failed: {e}")
return pdf_path
def generate_atomizer_hints(self, analysis: ComponentAnalysis) -> Path:
"""Generate standalone Atomizer hints JSON file."""
import json
"""
Generate standalone Atomizer hints JSON file.
This file can be used by Atomizer to pre-configure FEA studies.
"""
# Extract objectives from hints
objectives = []
constraints = []
parameters = []
load_cases = []
materials_mentioned = []
for hint in analysis.atomizer_hints:
item = {
"timestamp": hint.timestamp,
"text": hint.text,
}
if hint.hint_type == "objective":
# Try to parse objective direction
text_lower = hint.text.lower()
if "minimize" in text_lower or "reduce" in text_lower:
direction = "minimize"
elif "maximize" in text_lower or "increase" in text_lower:
direction = "maximize"
else:
direction = "minimize" # default
# Try to identify what to optimize
if "mass" in text_lower or "weight" in text_lower:
objectives.append({"name": "mass", "direction": direction, "source": hint.text})
elif "stress" in text_lower:
objectives.append({"name": "max_stress", "direction": direction, "source": hint.text})
elif "stiff" in text_lower or "displacement" in text_lower:
objectives.append({"name": "max_displacement", "direction": direction, "source": hint.text})
else:
objectives.append({"name": "unknown", "direction": direction, "source": hint.text})
elif hint.hint_type == "constraint":
constraints.append(item)
elif hint.hint_type == "parameter":
parameters.append(item)
elif hint.hint_type == "load_case":
load_cases.append(item)
elif hint.hint_type == "material":
materials_mentioned.append(hint.text)
hints = {
"model_understanding": {
"generated": datetime.now().isoformat(),
"assembly_name": analysis.assembly_name,
"model_understanding": {
"components": [c.name for c in analysis.components],
"materials_mentioned": list(set(c.material for c in analysis.components if c.material)),
"functions": {c.name: c.function for c in analysis.components if c.function},
"component_details": [
{
"name": c.name,
"function": c.function,
"material": c.material,
"features": c.features,
}
for c in analysis.components
],
"materials_mentioned": list(set(
[c.material for c in analysis.components if c.material] +
materials_mentioned
)),
"relationships": [
{
"from": r.from_component,
"to": r.to_component,
"type": r.relationship_type,
}
for r in analysis.relationships
],
},
"suggested_spec": {
"objectives": [
{"name": "mass", "direction": "minimize"}
],
"parameters_likely": [],
"constraints_likely": [],
"objectives": objectives or [{"name": "mass", "direction": "minimize"}],
"parameters_mentioned": parameters,
"constraints_mentioned": constraints,
},
"transcript_highlights": [],
"load_cases": load_cases,
"transcript_highlights": [
{
"timestamp": f"{h.timestamp:.1f}s",
"text": h.text,
"type": h.hint_type,
}
for h in analysis.atomizer_hints[:20] # Limit to top 20
],
}
output_path = self.output_dir / "atomizer_hints.json"
output_path.write_text(json.dumps(hints, indent=2))
return output_path
def generate_bom(self, analysis: "ComponentAnalysis") -> Path:
    """Generate a standalone Bill of Materials CSV (``bom.csv``).

    One row per component, in the order they appear in *analysis*.
    Fields are written with the ``csv`` module so embedded commas and
    double quotes are escaped correctly (the previous manual
    comma->semicolon substitution silently altered the data, and an
    embedded double quote still produced malformed CSV).

    Args:
        analysis: Component analysis whose ``components`` populate the rows.

    Returns:
        Path to the written ``bom.csv`` in the output directory.
    """
    import csv  # local import keeps module-level dependencies unchanged

    output_path = self.output_dir / "bom.csv"
    # newline="" is required by the csv module to avoid extra blank lines.
    with output_path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(
            ["Item", "Part Number", "Name", "Quantity", "Material", "Function", "Notes"]
        )
        for i, comp in enumerate(analysis.components, 1):
            writer.writerow([
                i,
                comp.part_number or "TBD",
                comp.name,
                1,
                comp.material or "TBD",
                comp.function or "",
                "",
            ])
    return output_path
def generate_component_json(self, analysis: ComponentAnalysis) -> Path:
    """Write the full component analysis as ``components.json``.

    Serializes the assembly name, a generation timestamp, the summary,
    every component (including its best-frame timestamp and transcript
    excerpt), and all component relationships into one JSON document.

    Returns:
        Path to the written ``components.json`` in the output directory.
    """
    def _component_entry(c):
        # Flatten one Component into plain JSON-safe values.
        return {
            "name": c.name,
            "description": c.description,
            "function": c.function,
            "material": c.material,
            "part_number": c.part_number,
            "features": c.features,
            "confidence": c.confidence,
            "frame_timestamp": c.best_frame.timestamp if c.best_frame else None,
            "transcript_excerpt": c.transcript_excerpt,
        }

    def _relationship_entry(r):
        return {
            "from": r.from_component,
            "to": r.to_component,
            "type": r.relationship_type,
        }

    payload = {
        "assembly_name": analysis.assembly_name,
        "generated": datetime.now().isoformat(),
        "summary": analysis.summary,
        "components": [_component_entry(c) for c in analysis.components],
        "relationships": [_relationship_entry(r) for r in analysis.relationships],
    }
    output_path = self.output_dir / "components.json"
    output_path.write_text(json.dumps(payload, indent=2))
    return output_path
def generate_pdf(self, markdown_path: Path) -> Path:
    """
    Generate PDF from markdown using Atomaste Report Standard or pandoc.
    Requires the atomaste-reports skill/Typst to be available.

    Tries three converters in order, returning on the first success:
      1. The Atomaste Report Standard build script.
      2. pandoc with xelatex, 1in margins, 11pt font and a TOC.
      3. A bare pandoc invocation with default settings.

    Args:
        markdown_path: Source markdown file; the PDF is written next to
            it with the same stem.

    Raises:
        RuntimeError: if every converter fails or none is installed.
    """
    pdf_path = markdown_path.with_suffix(".pdf")
    # Try Atomaste Report Standard first
    # NOTE(review): absolute user-specific path — only resolves on the
    # original author's machine; consider making this configurable.
    atomaste_script = Path("/home/papa/Atomaste/Templates/Atomaste_Report_Standard/scripts/build-report.py")
    if atomaste_script.exists():
        try:
            cmd = [
                "python3", str(atomaste_script),
                str(markdown_path), "-o", str(pdf_path)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
            if result.returncode == 0 and pdf_path.exists():
                return pdf_path
        except subprocess.TimeoutExpired:
            # Converter hung; fall through to pandoc.
            pass
        except Exception:
            # Best-effort: any failure just moves on to the next converter.
            pass
    # Try pandoc with a nice template
    try:
        cmd = [
            "pandoc",
            str(markdown_path),
            "-o", str(pdf_path),
            "--pdf-engine=xelatex",
            "-V", "geometry:margin=1in",
            "-V", "fontsize=11pt",
            "--toc",
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        if result.returncode == 0 and pdf_path.exists():
            return pdf_path
    except Exception:
        pass
    # Final fallback: basic pandoc
    try:
        cmd = ["pandoc", str(markdown_path), "-o", str(pdf_path)]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        if result.returncode == 0:
            return pdf_path
    except Exception as e:
        raise RuntimeError(f"PDF generation failed: {e}")
    raise RuntimeError("PDF generation failed - no suitable converter found")
def generate_all(
    self,
    analysis: ComponentAnalysis,
    pdf: bool = False,
    include_images: bool = True,
) -> dict[str, Path]:
    """
    Generate every documentation output for *analysis*.

    Always produces the markdown report (with hints and BOM sections
    enabled), the atomizer-hints JSON, the BOM CSV, and the component
    JSON.  A PDF is attempted only when *pdf* is true; a PDF failure is
    recorded under the ``"pdf_error"`` key instead of raising.

    Returns:
        Mapping of output type ("markdown", "atomizer_hints", "bom",
        "components", and optionally "pdf" or "pdf_error") to file path.
    """
    md_path = self.generate(
        analysis,
        atomizer_hints=True,
        bom=True,
        include_images=include_images,
    )
    outputs = {
        "markdown": md_path,
        "atomizer_hints": self.generate_atomizer_hints(analysis),
        "bom": self.generate_bom(analysis),
        "components": self.generate_component_json(analysis),
    }
    if pdf:
        try:
            outputs["pdf"] = self.generate_pdf(md_path)
        except Exception as e:
            # PDF is optional polish — record the failure, don't abort.
            outputs["pdf_error"] = str(e)
    return outputs

View File

@@ -1,45 +1,245 @@
"""Main documentation pipeline orchestrator."""
"""Main documentation pipeline orchestrator with comprehensive error handling."""
import shutil
from pathlib import Path
from dataclasses import dataclass, field
from typing import Callable
from enum import Enum
from .video_processor import VideoProcessor, FrameInfo
from .config import Config, load_config
from .video_processor import VideoProcessor, FrameInfo, VideoMetadata
from .audio_analyzer import AudioAnalyzer, Transcript
from .vision_analyzer import VisionAnalyzer, ComponentAnalysis
from .doc_generator import DocGenerator
@dataclass
class PipelineConfig:
"""Pipeline configuration."""
frame_interval: float = 2.0
whisper_model: str = "base"
vision_model: str = "gpt-4o" # or local model
class PipelineStage(Enum):
    """Pipeline processing stages.

    The string values are human-readable stage names used in progress
    reporting; member order mirrors execution order.
    """
    INIT = "initialization"            # pipeline object construction
    FRAMES = "frame_extraction"        # key-frame extraction
    TRANSCRIPTION = "transcription"    # audio transcription
    ANALYSIS = "vision_analysis"       # AI vision component analysis
    DOCUMENTATION = "documentation"    # markdown/BOM/hints generation
    PDF = "pdf_generation"             # optional PDF conversion
    COMPLETE = "complete"              # terminal stage (success or failure)
@dataclass
class DocumentationPipeline:
"""Orchestrates the full documentation pipeline."""
class PipelineProgress:
"""Progress tracking for the pipeline."""
stage: PipelineStage
message: str
progress: float # 0.0 to 1.0
error: str | None = None
video_path: Path
@dataclass
class PipelineResult:
"""Result of pipeline execution."""
success: bool
output_dir: Path
frame_interval: float = 2.0
whisper_model: str = "base"
documentation_path: Path | None = None
pdf_path: Path | None = None
atomizer_hints_path: Path | None = None
bom_path: Path | None = None
frames_extracted: int = 0
components_found: int = 0
transcript_duration: float = 0.0
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
def __post_init__(self):
self.video_processor = VideoProcessor(self.video_path, self.output_dir / "frames")
self.audio_analyzer = AudioAnalyzer(self.video_path, self.whisper_model)
self.vision_analyzer = VisionAnalyzer()
class DocumentationPipeline:
"""Orchestrates the full documentation pipeline with error recovery."""
def __init__(
    self,
    video_path: Path,
    output_dir: Path,
    config: Config | None = None,
    progress_callback: Callable[[PipelineProgress], None] | None = None,
):
    """Initialize the pipeline and construct its processing components.

    Args:
        video_path: Source video to document; must already exist.
        output_dir: Directory for all outputs (created if missing).
        config: Full configuration; defaults to load_config() discovery.
        progress_callback: Optional callable invoked with a
            PipelineProgress for each stage transition.

    Raises:
        FileNotFoundError: if *video_path* does not exist.
    """
    self.video_path = Path(video_path)
    self.output_dir = Path(output_dir)
    self.config = config or load_config()
    self.progress_callback = progress_callback
    # Validate video exists
    if not self.video_path.exists():
        raise FileNotFoundError(f"Video not found: {self.video_path}")
    # Create output directory
    self.output_dir.mkdir(parents=True, exist_ok=True)
    # Initialize components
    # Frames land in a dedicated subdirectory so cleanup() can remove
    # them without touching the generated documents.
    self.video_processor = VideoProcessor(
        self.video_path,
        self.output_dir / "frames",
        config=self.config.frame_extraction,
    )
    self.audio_analyzer = AudioAnalyzer(
        self.video_path,
        config=self.config.transcription,
    )
    self.vision_analyzer = VisionAnalyzer(config=self.config.vision)
    self.doc_generator = DocGenerator(self.output_dir)
def _report_progress(
    self,
    stage: PipelineStage,
    message: str,
    progress: float,
    error: str | None = None
):
    """Notify the registered callback of a stage update.

    No-op when no progress_callback was supplied at construction.
    """
    if self.progress_callback is None:
        return
    update = PipelineProgress(
        stage=stage,
        message=message,
        progress=progress,
        error=error,
    )
    self.progress_callback(update)
def run(
    self,
    frames_only: bool = False,
    skip_transcription: bool = False,
    atomizer_hints: bool = False,
    bom: bool = False,
    pdf: bool = False,
) -> PipelineResult:
    """
    Run the full documentation pipeline.

    Args:
        frames_only: Stop after frame extraction
        skip_transcription: Skip audio transcription (vision-only)
        atomizer_hints: Generate Atomizer FEA hints
        bom: Generate Bill of Materials
        pdf: Generate PDF output

    Returns:
        PipelineResult with paths and statistics.  This method does not
        raise: fatal errors are recorded in ``result.errors`` (with
        ``success`` left False) and recoverable issues (missing audio,
        PDF failure) are downgraded to ``result.warnings``.
    """
    result = PipelineResult(
        success=False,
        output_dir=self.output_dir,
    )
    try:
        # Stage 1: Extract frames
        self._report_progress(
            PipelineStage.FRAMES,
            "Extracting video frames...",
            0.1
        )
        frames = self.extract_frames()
        result.frames_extracted = len(frames)
        if not frames:
            # Nothing to analyze — fatal for every downstream stage.
            result.errors.append("No frames could be extracted from video")
            return result
        if frames_only:
            result.success = True
            self._report_progress(
                PipelineStage.COMPLETE,
                f"Extracted {len(frames)} frames",
                1.0
            )
            return result
        # Stage 2: Transcribe audio
        self._report_progress(
            PipelineStage.TRANSCRIPTION,
            "Transcribing audio...",
            0.3
        )
        if skip_transcription:
            # Placeholder transcript keeps downstream stages uniform.
            transcript = Transcript(segments=[], full_text="[Transcription skipped]")
            result.warnings.append("Transcription was skipped")
        else:
            transcript = self.transcribe_audio_safe()
            result.transcript_duration = transcript.duration
            if not transcript.segments:
                result.warnings.append("No audio or empty transcript")
        # Stage 3: Analyze components
        self._report_progress(
            PipelineStage.ANALYSIS,
            "Analyzing components with AI vision...",
            0.5
        )
        analysis = self.analyze_components(frames, transcript)
        result.components_found = len(analysis.components)
        if not analysis.components:
            result.warnings.append("No components identified - check video quality")
        # Stage 4: Generate documentation
        self._report_progress(
            PipelineStage.DOCUMENTATION,
            "Generating documentation...",
            0.7
        )
        outputs = self.generate_documentation(
            analysis,
            atomizer_hints=atomizer_hints,
            bom=bom,
        )
        result.documentation_path = outputs.get("markdown")
        result.atomizer_hints_path = outputs.get("atomizer_hints")
        result.bom_path = outputs.get("bom")
        # Stage 5: Generate PDF (optional)
        if pdf:
            self._report_progress(
                PipelineStage.PDF,
                "Generating PDF...",
                0.9
            )
            try:
                result.pdf_path = self.generate_pdf(result.documentation_path)
            except Exception as e:
                # PDF is best-effort; a failure downgrades to a warning.
                result.warnings.append(f"PDF generation failed: {e}")
        result.success = True
        self._report_progress(
            PipelineStage.COMPLETE,
            f"Complete! {result.components_found} components documented",
            1.0
        )
    except Exception as e:
        # Catch-all boundary: convert any stage failure into result data.
        result.errors.append(str(e))
        self._report_progress(
            PipelineStage.COMPLETE,
            f"Pipeline failed: {e}",
            1.0,
            error=str(e)
        )
    return result
def extract_frames(self) -> list[FrameInfo]:
"""Extract key frames from video."""
return self.video_processor.extract_frames(interval=self.frame_interval)
"""Extract key frames from video using configured mode."""
return self.video_processor.extract_frames()
def transcribe_audio(self) -> Transcript:
"""Transcribe audio track."""
"""Transcribe audio track (raises on error)."""
return self.audio_analyzer.transcribe()
def transcribe_audio_safe(self) -> Transcript:
    """Transcribe audio track with fallback for missing audio.

    Delegates to the analyzer's fallback path so a video without a
    usable audio stream is intended to yield an empty/placeholder
    Transcript instead of an error (see
    AudioAnalyzer.transcribe_with_fallback).
    """
    return self.audio_analyzer.transcribe_with_fallback()
def analyze_components(
self, frames: list[FrameInfo], transcript: Transcript
) -> ComponentAnalysis:
@@ -51,14 +251,82 @@ class DocumentationPipeline:
analysis: ComponentAnalysis,
atomizer_hints: bool = False,
bom: bool = False,
) -> Path:
"""Generate markdown documentation."""
return self.doc_generator.generate(
) -> dict[str, Path]:
"""Generate all documentation outputs."""
outputs = {}
# Generate markdown
outputs["markdown"] = self.doc_generator.generate(
analysis,
atomizer_hints=atomizer_hints,
bom=bom,
)
# Generate Atomizer hints
if atomizer_hints:
outputs["atomizer_hints"] = self.doc_generator.generate_atomizer_hints(analysis)
# Generate BOM
if bom:
outputs["bom"] = self.doc_generator.generate_bom(analysis)
# Generate component JSON
outputs["components"] = self.doc_generator.generate_component_json(analysis)
return outputs
def generate_pdf(self, markdown_path: Path) -> Path:
    """Generate PDF from markdown using Atomaste Report Standard.

    Thin delegate to DocGenerator.generate_pdf, which raises
    RuntimeError when no converter succeeds.
    """
    return self.doc_generator.generate_pdf(markdown_path)
def get_video_metadata(self) -> VideoMetadata:
    """Return the source video's metadata via the video processor."""
    return self.video_processor.get_metadata()
def cleanup(self, keep_frames: bool = True, keep_audio: bool = False):
    """
    Remove temporary pipeline artifacts from the output directory.

    Args:
        keep_frames: When False, delete the ``frames/`` subdirectory.
        keep_audio: When False, delete the extracted ``audio.wav``.

    Missing files/directories are silently ignored, so the call is
    idempotent.
    """
    frames_dir = self.output_dir / "frames"
    audio_file = self.output_dir / "audio.wav"
    if not keep_frames and frames_dir.exists():
        shutil.rmtree(frames_dir)
    if not keep_audio and audio_file.exists():
        audio_file.unlink()
def create_pipeline(
    video_path: str | Path,
    output_dir: str | Path | None = None,
    config_path: Path | None = None,
) -> DocumentationPipeline:
    """
    Factory function to create a documentation pipeline.

    Args:
        video_path: Path to input video
        output_dir: Output directory (defaults to ``<video_stem>_docs``
            next to the video)
        config_path: Path to config file (optional)

    Returns:
        Configured DocumentationPipeline
    """
    source = Path(video_path)
    target = (
        Path(output_dir)
        if output_dir is not None
        else source.parent / f"{source.stem}_docs"
    )
    return DocumentationPipeline(
        video_path=source,
        output_dir=target,
        config=load_config(config_path),
    )

View File

@@ -2,6 +2,7 @@
import subprocess
import json
import re
from pathlib import Path
from dataclasses import dataclass
@@ -17,13 +18,18 @@ class FrameInfo:
class VideoProcessor:
"""Handles video frame extraction using ffmpeg."""
def __init__(self, video_path: Path, output_dir: Path):
def __init__(self, video_path: Path, output_dir: Path, scene_threshold: float = 0.3):
self.video_path = video_path
self.output_dir = output_dir
self.output_dir.mkdir(parents=True, exist_ok=True)
self.scene_threshold = scene_threshold
self._duration: float | None = None
def get_duration(self) -> float:
"""Get video duration in seconds."""
if self._duration is not None:
return self._duration
cmd = [
"ffprobe", "-v", "quiet",
"-print_format", "json",
@@ -32,7 +38,8 @@ class VideoProcessor:
]
result = subprocess.run(cmd, capture_output=True, text=True)
data = json.loads(result.stdout)
return float(data["format"]["duration"])
self._duration = float(data["format"]["duration"])
return self._duration
def extract_frames(self, interval: float = 2.0) -> list[FrameInfo]:
"""
@@ -44,13 +51,14 @@ class VideoProcessor:
Returns:
List of FrameInfo objects for extracted frames
"""
duration = self.get_duration()
frames = []
# Clear existing frames
for old_frame in self.output_dir.glob("frame_*.jpg"):
old_frame.unlink()
# Use ffmpeg to extract frames at interval
output_pattern = self.output_dir / "frame_%04d.jpg"
cmd = [
"ffmpeg", "-y",
"ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
"-i", str(self.video_path),
"-vf", f"fps=1/{interval}",
"-q:v", "2", # High quality JPEG
@@ -59,6 +67,7 @@ class VideoProcessor:
subprocess.run(cmd, capture_output=True)
# Collect extracted frames
frames = []
for i, frame_path in enumerate(sorted(self.output_dir.glob("frame_*.jpg"))):
timestamp = i * interval
frames.append(FrameInfo(
@@ -69,13 +78,117 @@ class VideoProcessor:
return frames
def extract_at_scene_changes(self, max_frames: int = 15, min_interval: float = 1.0) -> list[FrameInfo]:
    """
    Extract frames at scene changes (visual transitions).

    Smarter than fixed-interval sampling: captures the moments the view
    changes (e.g. the engineer rotates the model or zooms onto a
    component).

    Args:
        max_frames: Maximum number of frames to extract
        min_interval: Minimum seconds between frames

    Returns:
        List of FrameInfo objects, or empty list if detection fails
    """
    # Start from a clean slate so stale frames never leak into results.
    for stale in self.output_dir.glob("frame_*.jpg"):
        stale.unlink()

    scene_times = self._detect_scene_changes()
    if not scene_times:
        return []

    # Enforce spacing/count first, then pin the clip's start and end.
    selected = self._filter_timestamps(scene_times, max_frames, min_interval)
    duration = self.get_duration()
    if 0.0 not in selected:
        selected.insert(0, 0.0)
    if duration - selected[-1] > min_interval:
        selected.append(duration - 0.5)

    # Downsample evenly if pinning pushed us over the budget.
    if len(selected) > max_frames:
        stride = len(selected) / max_frames
        selected = [selected[int(i * stride)] for i in range(max_frames)]

    extracted = []
    for index, ts in enumerate(selected):
        frame_path = self.output_dir / f"frame_{index:04d}.jpg"
        cmd = [
            "ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
            "-ss", str(ts),
            "-i", str(self.video_path),
            "-vframes", "1",
            "-q:v", "2",
            str(frame_path)
        ]
        subprocess.run(cmd, capture_output=True)
        if frame_path.exists():
            extracted.append(FrameInfo(
                path=frame_path,
                timestamp=ts,
                frame_number=index
            ))
    return extracted
def _detect_scene_changes(self) -> list[float]:
"""
Detect scene changes in video using ffmpeg's scene filter.
Returns list of timestamps where significant visual changes occur.
"""
cmd = [
"ffmpeg", "-hide_banner",
"-i", str(self.video_path),
"-vf", f"select='gt(scene,{self.scene_threshold})',showinfo",
"-f", "null", "-"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
# Parse scene change timestamps from ffmpeg output
timestamps = []
for line in result.stderr.split("\n"):
if "pts_time:" in line:
# Extract timestamp using regex
match = re.search(r'pts_time:(\d+\.?\d*)', line)
if match:
ts = float(match.group(1))
timestamps.append(ts)
return sorted(set(timestamps))
def _filter_timestamps(
self, timestamps: list[float], max_count: int, min_interval: float
) -> list[float]:
"""Filter timestamps to ensure minimum interval between frames."""
if not timestamps:
return []
filtered = [timestamps[0]]
for ts in timestamps[1:]:
if ts - filtered[-1] >= min_interval:
filtered.append(ts)
if len(filtered) >= max_count:
break
return filtered
def extract_audio(self, output_path: Path | None = None) -> Path:
"""Extract audio track from video."""
if output_path is None:
output_path = self.output_dir.parent / "audio.wav"
cmd = [
"ffmpeg", "-y",
"ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
"-i", str(self.video_path),
"-vn", # No video
"-acodec", "pcm_s16le",
@@ -86,27 +199,13 @@ class VideoProcessor:
subprocess.run(cmd, capture_output=True)
return output_path
def detect_scene_changes(self, threshold: float = 0.3) -> list[float]:
"""
Detect scene changes in video.
Returns list of timestamps where significant visual changes occur.
"""
def get_video_info(self) -> dict:
"""Get video metadata."""
cmd = [
"ffmpeg", "-i", str(self.video_path),
"-vf", f"select='gt(scene,{threshold})',showinfo",
"-f", "null", "-"
"ffprobe", "-v", "quiet",
"-print_format", "json",
"-show_format", "-show_streams",
str(self.video_path)
]
result = subprocess.run(cmd, capture_output=True, text=True)
# Parse scene change timestamps from ffmpeg output
timestamps = []
for line in result.stderr.split("\n"):
if "pts_time:" in line:
# Extract timestamp
parts = line.split("pts_time:")
if len(parts) > 1:
ts = float(parts[1].split()[0])
timestamps.append(ts)
return timestamps
return json.loads(result.stdout)

View File

@@ -1,10 +1,15 @@
"""Vision analysis module - component detection and feature extraction."""
"""Vision analysis module - AI-powered component detection and feature extraction."""
import base64
import json
import re
from pathlib import Path
from dataclasses import dataclass, field
from typing import Any
from .video_processor import FrameInfo
from .audio_analyzer import Transcript
from .config import VisionConfig
@dataclass
@@ -18,6 +23,24 @@ class Component:
best_frame: FrameInfo | None = None
transcript_excerpt: str = ""
part_number: str = "" # For Part Manager integration
confidence: float = 0.0
bounding_box: list[int] | None = None
@dataclass
class AssemblyRelationship:
    """Relationship between components.

    Extracted from transcript phrasing (e.g. "X is bolted to Y");
    component names are stored as the matched, title-cased text.
    """
    from_component: str  # name of the component being joined
    to_component: str  # name of the component it joins to
    relationship_type: str  # bolted, welded, press-fit, etc.
@dataclass
class AtomizerHint:
    """Hints for FEA/optimization setup.

    One transcript segment containing optimization-relevant language,
    classified by keyword matching for downstream Atomizer export.
    """
    timestamp: float  # segment start time in seconds
    text: str  # verbatim transcript segment text
    hint_type: str  # objective, constraint, parameter, load_case, material
@dataclass
@@ -26,15 +49,146 @@ class ComponentAnalysis:
assembly_name: str
summary: str
components: list[Component]
relationships: list[AssemblyRelationship] = field(default_factory=list)
atomizer_hints: list[AtomizerHint] = field(default_factory=list)
assembly_notes: str = ""
raw_transcript: str = ""
class VisionAnalyzer:
"""Analyzes frames to identify components and features."""
"""Analyzes frames to identify components and features using AI vision APIs."""
def __init__(self, model: str = "gpt-4o"):
self.model = model
def __init__(self, config: VisionConfig | None = None):
self.config = config or VisionConfig()
self._client = None
self._prompts_dir = Path(__file__).parent.parent.parent / "prompts"
def _get_client(self):
"""Lazy-load the appropriate API client."""
if self._client is not None:
return self._client
if self.config.provider == "anthropic":
try:
import anthropic
self._client = anthropic.Anthropic(api_key=self.config.anthropic_api_key)
except ImportError:
raise ImportError("Install anthropic: pip install anthropic")
elif self.config.provider == "openai":
try:
import openai
self._client = openai.OpenAI(api_key=self.config.openai_api_key)
except ImportError:
raise ImportError("Install openai: pip install openai")
else:
raise ValueError(f"Unknown provider: {self.config.provider}")
return self._client
def _encode_image(self, image_path: Path) -> tuple[str, str]:
"""Encode image to base64 and detect media type."""
data = image_path.read_bytes()
encoded = base64.standard_b64encode(data).decode("utf-8")
suffix = image_path.suffix.lower()
media_type = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".webp": "image/webp",
}.get(suffix, "image/jpeg")
return encoded, media_type
def _load_prompt(self, name: str) -> str:
"""Load a prompt template."""
prompt_file = self._prompts_dir / f"{name}.txt"
if prompt_file.exists():
return prompt_file.read_text()
return ""
def _call_vision_api(
self,
images: list[tuple[str, str]], # List of (base64_data, media_type)
prompt: str,
system_prompt: str = ""
) -> str:
"""Call the vision API with images and prompt."""
client = self._get_client()
if self.config.provider == "anthropic":
# Build Anthropic message content
content = []
for img_data, media_type in images:
content.append({
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": img_data,
}
})
content.append({"type": "text", "text": prompt})
messages = [{"role": "user", "content": content}]
response = client.messages.create(
model=self.config.model,
max_tokens=self.config.max_tokens,
system=system_prompt if system_prompt else "You are an expert mechanical engineer analyzing CAD models.",
messages=messages,
)
return response.content[0].text
elif self.config.provider == "openai":
# Build OpenAI message content
content = []
for img_data, media_type in images:
content.append({
"type": "image_url",
"image_url": {
"url": f"data:{media_type};base64,{img_data}",
"detail": "high"
}
})
content.append({"type": "text", "text": prompt})
messages = [
{"role": "system", "content": system_prompt or "You are an expert mechanical engineer analyzing CAD models."},
{"role": "user", "content": content}
]
response = client.chat.completions.create(
model=self.config.model,
max_tokens=self.config.max_tokens,
temperature=self.config.temperature,
messages=messages,
)
return response.choices[0].message.content
raise ValueError(f"Unknown provider: {self.config.provider}")
def _parse_json_response(self, response: str) -> dict:
"""Extract JSON from API response."""
# Try to find JSON in code blocks first
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
# Try to find raw JSON object
json_match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', response, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(0))
except json.JSONDecodeError:
pass
# Return empty dict if parsing fails
return {}
def analyze(
self, frames: list[FrameInfo], transcript: Transcript
@@ -42,21 +196,41 @@ class VisionAnalyzer:
"""
Analyze frames and transcript to identify components.
This is where the AI magic happens - correlating visual and verbal info.
This correlates visual analysis with verbal explanations.
"""
# For MVP, we'll use a multi-modal approach:
# 1. Send key frames to vision model with transcript context
# 2. Ask it to identify components and correlate with verbal descriptions
if not frames:
return ComponentAnalysis(
assembly_name="Unknown Assembly",
summary="No frames available for analysis.",
components=[],
raw_transcript=transcript.full_text,
)
# Placeholder implementation - will be enhanced with actual AI calls
# Step 1: Analyze key frames to identify components
components = self._identify_components(frames, transcript)
summary = self._generate_summary(components, transcript)
# Step 2: Extract assembly name from transcript or vision
assembly_name = self._extract_assembly_name(transcript, frames)
# Step 3: Generate summary
summary = self._generate_summary(components, transcript, frames)
# Step 4: Extract relationships between components
relationships = self._extract_relationships(components, transcript)
# Step 5: Extract Atomizer hints for FEA setup
atomizer_hints = self._extract_atomizer_hints(transcript)
# Step 6: Extract assembly notes
assembly_notes = self._extract_assembly_notes(transcript)
return ComponentAnalysis(
assembly_name=self._extract_assembly_name(transcript),
assembly_name=assembly_name,
summary=summary,
components=components,
assembly_notes=self._extract_assembly_notes(transcript),
relationships=relationships,
atomizer_hints=atomizer_hints,
assembly_notes=assembly_notes,
raw_transcript=transcript.full_text,
)
@@ -64,48 +238,340 @@ class VisionAnalyzer:
self, frames: list[FrameInfo], transcript: Transcript
) -> list[Component]:
"""Identify individual components from frames + transcript."""
# TODO: Implement vision API calls
# For now, return empty list - will be implemented in Phase 1
# Select key frames for analysis (don't send all)
key_frames = self._select_key_frames(frames, max_frames=8)
if not key_frames:
return []
# Prepare images
images = []
for frame in key_frames:
try:
encoded, media_type = self._encode_image(frame.path)
images.append((encoded, media_type))
except Exception:
continue
if not images:
return []
# Build prompt with transcript context
component_prompt = self._load_prompt("component_analysis")
# Add transcript context to prompt
prompt = f"""{component_prompt}
## Transcript from the video walkthrough:
{transcript.full_text[:4000]} # Limit transcript length
## Frame timestamps analyzed:
{[f.timestamp for f in key_frames]}
Please analyze the frames and identify all visible components, correlating with the transcript."""
try:
response = self._call_vision_api(images, prompt)
parsed = self._parse_json_response(response)
# Convert parsed response to Component objects
components = []
for comp_data in parsed.get("components", []):
# Find best frame for this component
best_frame = key_frames[0] if key_frames else None
# Find transcript excerpt
excerpt = ""
for match in parsed.get("transcript_matches", []):
if match.get("component", "").lower() == comp_data.get("name", "").lower():
excerpt = match.get("excerpt", "")
break
components.append(Component(
name=comp_data.get("name", "Unknown"),
description=comp_data.get("description", ""),
function=comp_data.get("function", ""),
material=comp_data.get("material", ""),
features=comp_data.get("features", []),
best_frame=best_frame,
transcript_excerpt=excerpt,
confidence=comp_data.get("confidence", 0.8),
bounding_box=comp_data.get("bounding_box"),
))
# If no components parsed, create at least one from transcript
if not components and transcript.full_text:
components = self._components_from_transcript(transcript, key_frames)
return components
except Exception as e:
# Fallback to transcript-only extraction
return self._components_from_transcript(transcript, key_frames)
def _components_from_transcript(
    self, transcript: Transcript, frames: list[FrameInfo]
) -> list[Component]:
    """Heuristic fallback: mine component names from the transcript.

    Used when the vision API fails or returns nothing parseable.  Scans
    for naming phrases ("this is the ...", "... bracket") and returns
    low-confidence Component stubs (at most 10), each anchored to the
    first available frame.
    """
    # (pattern, nominal component type) — the type currently only
    # documents why each pattern exists.
    mention_patterns = [
        (r"this is (?:the|a) ([^,\.]+)", "component"),
        (r"(?:the|a) ([^,\.]+) (?:is|provides|handles)", "component"),
        (r"([^,\.]+) bracket", "bracket"),
        (r"([^,\.]+) mount", "mount"),
        (r"([^,\.]+) housing", "housing"),
        (r"([^,\.]+) plate", "plate"),
    ]
    anchor_frame = frames[0] if frames else None
    seen = set()
    stubs = []
    for pattern, _comp_type in mention_patterns:
        for match in re.finditer(pattern, transcript.full_text, re.IGNORECASE):
            candidate = match.group(1).strip()
            key = candidate.lower()
            if len(candidate) > 2 and len(candidate) < 50 and key not in seen:
                seen.add(key)
                stubs.append(Component(
                    name=candidate.title(),
                    description=f"Identified from transcript",
                    function="",
                    material="",
                    best_frame=anchor_frame,
                    confidence=0.5,
                ))
    return stubs[:10]  # Limit to 10 components
def _select_key_frames(
    self, frames: list[FrameInfo], max_frames: int = 8
) -> list[FrameInfo]:
    """Pick up to *max_frames* frames spread evenly across the video.

    Returns the input unchanged when it already fits the budget;
    otherwise samples indices at a uniform stride.
    """
    if len(frames) <= max_frames:
        return frames
    stride = len(frames) / max_frames
    return [frames[int(k * stride)] for k in range(max_frames)]
def _generate_summary(
self, components: list[Component], transcript: Transcript
self, components: list[Component], transcript: Transcript, frames: list[FrameInfo]
) -> str:
"""Generate executive summary of the assembly."""
# TODO: Implement with LLM
return f"Assembly documentation generated from video walkthrough. {len(components)} components identified."
if not frames:
return "No visual information available for summary."
def _extract_assembly_name(self, transcript: Transcript) -> str:
"""Try to extract assembly name from transcript."""
# Look for common patterns
summary_prompt = self._load_prompt("summary_generation")
# Build component list for context
comp_list = "\n".join([
f"- {c.name}: {c.function or c.description}"
for c in components
])
prompt = f"""{summary_prompt}
## Identified Components:
{comp_list if comp_list else "Components being analyzed..."}
## Full Transcript:
{transcript.full_text[:3000]}
Generate a professional 2-3 paragraph executive summary."""
# Include one representative frame
try:
encoded, media_type = self._encode_image(frames[0].path)
response = self._call_vision_api([(encoded, media_type)], prompt)
# Clean up response - remove JSON or code blocks
summary = re.sub(r'```.*?```', '', response, flags=re.DOTALL)
summary = summary.strip()
if summary:
return summary
except Exception:
pass
# Fallback summary
comp_count = len(components)
return f"This assembly documentation was generated from a video walkthrough. {comp_count} components were identified through visual and transcript analysis."
def _extract_assembly_name(
self, transcript: Transcript, frames: list[FrameInfo]
) -> str:
"""Try to extract assembly name from transcript or vision."""
text = transcript.full_text.lower()
patterns = ["this is the", "presenting the", "looking at the", "reviewing the"]
# Common patterns for assembly names
patterns = [
r"this is the ([^,\.]+)",
r"presenting the ([^,\.]+)",
r"looking at the ([^,\.]+)",
r"reviewing the ([^,\.]+)",
r"the ([^,\.]+) assembly",
r"([^,\.]+) design review",
]
for pattern in patterns:
if pattern in text:
idx = text.find(pattern) + len(pattern)
name = transcript.full_text[idx:idx + 50].strip().split(".")[0]
return name.strip()
match = re.search(pattern, text)
if match:
name = match.group(1).strip()
if len(name) > 2 and len(name) < 50:
return name.title()
return "Untitled Assembly"
def _extract_assembly_notes(self, transcript: Transcript) -> str:
"""Extract assembly-related notes from transcript."""
# Look for assembly instructions in transcript
keywords = ["assemble", "install", "mount", "attach", "connect"]
keywords = ["assemble", "install", "mount", "attach", "connect", "fasten",
"torque", "sequence", "order", "first", "then", "finally"]
notes = []
for seg in transcript.segments:
if any(kw in seg.text.lower() for kw in keywords):
notes.append(seg.text)
return " ".join(notes) if notes else ""
def _extract_relationships(
    self, components: list["Component"], transcript: "Transcript"
) -> list["AssemblyRelationship"]:
    """Extract pairwise component relationships mentioned in the transcript.

    Matches spoken joining phrases such as "X is bolted to Y" against the
    full transcript text (case-insensitive) and emits one
    AssemblyRelationship per match, with both names title-cased.

    Args:
        components: Accepted for future name validation — currently unused.
        transcript: Full transcript; only ``full_text`` is read.
    """
    # (regex, relationship_type) pairs for spoken joining phrases.
    rel_patterns = [
        (r"([^,\.]+) (?:is )?bolted to ([^,\.]+)", "bolted"),
        (r"([^,\.]+) (?:is )?welded to ([^,\.]+)", "welded"),
        (r"([^,\.]+) (?:is )?press.?fit (?:into|to) ([^,\.]+)", "press-fit"),
        (r"([^,\.]+) (?:is )?attached to ([^,\.]+)", "attached"),
        (r"([^,\.]+) connects to ([^,\.]+)", "connected"),
        (r"([^,\.]+) mounts (?:on|to) ([^,\.]+)", "mounted"),
    ]
    relationships = []
    text = transcript.full_text
    for pattern, rel_type in rel_patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            relationships.append(AssemblyRelationship(
                from_component=match.group(1).strip().title(),
                to_component=match.group(2).strip().title(),
                relationship_type=rel_type,
            ))
    return relationships
def _extract_atomizer_hints(self, transcript: Transcript) -> list[AtomizerHint]:
    """Extract optimization/FEA hints from transcript for Atomizer integration.

    Each transcript segment is tagged with at most one hint type — the
    first category (in priority order) whose keyword list matches the
    segment text, mirroring an if/elif chain.
    """
    # Priority-ordered (hint_type, keywords) categories.
    categories = [
        ("objective", [
            "minimize", "maximize", "reduce", "increase", "optimize",
            "lightweight", "stiff", "strong", "efficient",
        ]),
        ("constraint", [
            "must", "cannot", "should not", "limit", "maximum", "minimum",
            "at least", "no more than", "constraint", "requirement",
        ]),
        ("parameter", [
            "thickness", "diameter", "length", "width", "height", "radius",
            "fillet", "chamfer", "angle", "spacing", "pitch",
        ]),
        ("load_case", [
            "load", "force", "moment", "torque", "pressure", "stress",
            "vibration", "thermal", "fatigue", "impact", "cyclic",
        ]),
        ("material", [
            "aluminum", "steel", "titanium", "plastic", "composite",
            "6061", "7075", "304", "316", "carbon fiber", "abs", "pla",
        ]),
    ]
    hints = []
    for segment in transcript.segments:
        lowered = segment.text.lower()
        for hint_type, keywords in categories:
            if any(kw in lowered for kw in keywords):
                hints.append(AtomizerHint(
                    timestamp=segment.start,
                    text=segment.text,
                    hint_type=hint_type,
                ))
                break  # first matching category wins, as in the elif chain
    return hints
def analyze_single_frame(
    self, frame: "FrameInfo", context: str = ""
) -> "dict[str, Any]":
    """Analyze a single frame for components and features.

    Encodes the frame image, queries the configured vision API with a
    structured prompt, and parses the JSON response.

    Args:
        frame: Frame to analyze; only ``frame.path`` is read here.
        context: Optional extra context appended to the prompt.

    Returns:
        Parsed dict with detected components, features, and confidence.
        On any failure, a best-effort stub dict with empty results,
        ``confidence`` 0.0, and an ``error`` message — never raises.
    """
    try:
        encoded, media_type = self._encode_image(frame.path)
        prompt = f"""Analyze this CAD model image and identify:
1. All visible components/parts
2. Notable features (holes, threads, fillets, etc.)
3. Estimated materials based on appearance
4. Any visible dimensions or annotations
{f'Additional context: {context}' if context else ''}
Return a JSON object with components, features, and observations."""
        response = self._call_vision_api([(encoded, media_type)], prompt)
        return self._parse_json_response(response)
    except Exception as e:
        # Frame analysis is best-effort; callers get a stub on failure.
        return {
            "components": [],
            "features": [],
            "confidence": 0.0,
            "error": str(e),
        }

View File

@@ -1,24 +1,58 @@
"""Basic tests for CAD-Documenter pipeline."""
"""Tests for CAD-Documenter pipeline."""
import pytest
from pathlib import Path
class TestImports:
    """Test that all modules can be imported."""

    def test_version(self):
        from cad_documenter import __version__
        assert __version__ == "0.1.0"

    def test_cli_import(self):
        from cad_documenter.cli import main
        assert callable(main)

    def test_pipeline_import(self):
        from cad_documenter.pipeline import DocumentationPipeline
        assert DocumentationPipeline is not None

    def test_video_processor_import(self):
        from cad_documenter.video_processor import VideoProcessor, FrameInfo
        assert VideoProcessor is not None
        assert FrameInfo is not None

    def test_audio_analyzer_import(self):
        from cad_documenter.audio_analyzer import AudioAnalyzer, Transcript, TranscriptSegment
        assert AudioAnalyzer is not None
        assert Transcript is not None

    def test_vision_analyzer_import(self):
        from cad_documenter.vision_analyzer import (
            VisionAnalyzer, Component, ComponentAnalysis, AtomizerHints
        )
        assert VisionAnalyzer is not None
        assert Component is not None
        assert AtomizerHints is not None

    def test_doc_generator_import(self):
        from cad_documenter.doc_generator import DocGenerator
        assert DocGenerator is not None

    def test_config_import(self):
        from cad_documenter.config import (
            Config, load_config, APIConfig, ProcessingConfig, OutputConfig
        )
        assert Config is not None
        assert callable(load_config)
class TestTranscript:
    """Test Transcript dataclass functionality."""

    def test_transcript_creation(self):
        from cad_documenter.audio_analyzer import Transcript, TranscriptSegment
        segments = [
            TranscriptSegment(start=0.0, end=5.0, text="This is the main bracket"),
            TranscriptSegment(start=5.0, end=10.0, text="It holds the motor"),
            TranscriptSegment(start=10.0, end=15.0, text="Made of aluminum"),
        ]
        transcript = Transcript(
            segments=segments,
            full_text="This is the main bracket. It holds the motor. Made of aluminum."
        )
        assert len(transcript.segments) == 3
        assert "bracket" in transcript.full_text

    def test_get_text_at(self):
        from cad_documenter.audio_analyzer import Transcript, TranscriptSegment
        segments = [
            TranscriptSegment(start=0.0, end=5.0, text="This is the main bracket"),
            TranscriptSegment(start=5.0, end=10.0, text="It holds the motor"),
            TranscriptSegment(start=10.0, end=15.0, text="Made of aluminum"),
        ]
        transcript = Transcript(
            segments=segments,
            full_text="This is the main bracket. It holds the motor. Made of aluminum."
        )
        # A 3s window around t=7.0 spans the first and second segments.
        text = transcript.get_text_at(7.0, window=3.0)
        assert "holds the motor" in text
        assert "main bracket" in text

    def test_get_segment_at(self):
        from cad_documenter.audio_analyzer import Transcript, TranscriptSegment
        segments = [
            TranscriptSegment(start=0.0, end=5.0, text="First segment"),
            TranscriptSegment(start=5.0, end=10.0, text="Second segment"),
        ]
        transcript = Transcript(segments=segments, full_text="")
        seg = transcript.get_segment_at(3.0)
        assert seg is not None
        assert seg.text == "First segment"
        seg = transcript.get_segment_at(7.0)
        assert seg is not None
        assert seg.text == "Second segment"

    def test_search(self):
        from cad_documenter.audio_analyzer import Transcript, TranscriptSegment
        segments = [
            TranscriptSegment(start=0.0, end=5.0, text="The bracket is aluminum"),
            TranscriptSegment(start=5.0, end=10.0, text="The motor is steel"),
        ]
        transcript = Transcript(segments=segments, full_text="")
        results = transcript.search("aluminum")
        assert len(results) == 1
        assert results[0][0].text == "The bracket is aluminum"
class TestComponent:
    """Test Component dataclass."""

    def test_component_creation(self):
        from cad_documenter.vision_analyzer import Component
        # NOTE(review): constructor arguments reconstructed to satisfy the
        # assertions below — confirm against the original fixture values.
        component = Component(
            name="Main Bracket",
            description="Primary structural bracket",
            material="Aluminum 6061-T6",
            features=["mounting holes", "fillets"],
        )
        assert component.name == "Main Bracket"
        assert len(component.features) == 2
        assert component.material == "Aluminum 6061-T6"

    def test_component_defaults(self):
        from cad_documenter.vision_analyzer import Component
        component = Component(name="Test", description="Test component")
        assert component.function == ""
        assert component.material == ""
        assert component.features == []
        assert component.confidence == 0.0
class TestAtomizerHints:
    """Test AtomizerHints dataclass."""

    def test_hints_creation(self):
        from cad_documenter.vision_analyzer import AtomizerHints
        # Populate one entry per hint category and verify round-tripping.
        created = AtomizerHints(
            objectives=[{"name": "mass", "direction": "minimize"}],
            constraints=[{"type": "envelope", "value": "200mm"}],
            parameters=["thickness", "fillet_radius"],
            critical_regions=[{"feature": "fillet", "concern": "stress_concentration"}],
        )
        assert len(created.objectives) == 1
        assert created.objectives[0]["name"] == "mass"
        assert "thickness" in created.parameters
class TestConfig:
    """Test configuration loading."""

    def test_default_config(self):
        from cad_documenter.config import Config
        defaults = Config()
        # Out-of-the-box settings exposed by config.py.
        assert defaults.api.provider == "openai"
        assert defaults.processing.whisper_model == "base"
        assert defaults.output.include_bom is True

    def test_load_config_defaults(self):
        from cad_documenter.config import load_config
        # A missing config file must fall back to defaults rather than raise.
        loaded = load_config(Path("/nonexistent/config.toml"))
        assert loaded.api.provider == "openai"
        assert loaded.processing.frame_interval == 2.0
class TestDocGenerator:
    """Test documentation generation."""

    def test_generate_creates_file(self, tmp_path):
        from cad_documenter.doc_generator import DocGenerator
        from cad_documenter.vision_analyzer import ComponentAnalysis, Component
        gen = DocGenerator(tmp_path)
        part = Component(
            name="Test Part",
            description="A test part",
            material="Steel",
            function="Testing",
        )
        sample = ComponentAnalysis(
            assembly_name="Test Assembly",
            summary="This is a test assembly.",
            components=[part],
        )
        doc_path = gen.generate(sample)
        assert doc_path.exists()
        rendered = doc_path.read_text()
        assert "Test Assembly" in rendered
        assert "Test Part" in rendered

    def test_generate_with_bom(self, tmp_path):
        from cad_documenter.doc_generator import DocGenerator
        from cad_documenter.vision_analyzer import ComponentAnalysis, Component
        gen = DocGenerator(tmp_path)
        sample = ComponentAnalysis(
            assembly_name="Test Assembly",
            summary="Test",
            components=[
                Component(name="Part A", description="First", material="Aluminum"),
                Component(name="Part B", description="Second", material="Steel"),
            ],
        )
        rendered = gen.generate(sample, bom=True).read_text()
        assert "Bill of Materials" in rendered
        assert "Part A" in rendered
        assert "Part B" in rendered
        # The BOM must also be exported as a standalone CSV file.
        assert (tmp_path / "bom.csv").exists()

    def test_atomizer_hints_json(self, tmp_path):
        import json
        from cad_documenter.doc_generator import DocGenerator
        from cad_documenter.vision_analyzer import ComponentAnalysis, Component, AtomizerHints
        gen = DocGenerator(tmp_path)
        sample = ComponentAnalysis(
            assembly_name="Test Assembly",
            summary="Test",
            components=[
                Component(name="Bracket", description="Main bracket", material="Aluminum"),
            ],
            atomizer_hints=AtomizerHints(
                objectives=[{"name": "mass", "direction": "minimize"}],
                parameters=["thickness"],
            ),
        )
        hints_path = gen.generate_atomizer_hints(sample)
        assert hints_path.exists()
        payload = json.loads(hints_path.read_text())
        assert payload["assembly_name"] == "Test Assembly"
        assert len(payload["optimization_hints"]["objectives"]) == 1
# Integration tests (require actual video files)
class TestIntegration:
    """Integration tests - skipped without test videos."""

    @pytest.mark.skip(reason="Requires test video file")
    def test_full_pipeline(self, tmp_path):
        # End-to-end smoke test: run the whole pipeline on a fixture video
        # and confirm documentation output is produced in tmp_path.
        from cad_documenter.pipeline import DocumentationPipeline
        video_path = Path("tests/fixtures/sample.mp4")
        pipeline = DocumentationPipeline(video_path, tmp_path)
        results = pipeline.run_full_pipeline()
        # "documentation" presumably maps to the generated Markdown path — TODO confirm
        assert results["documentation"].exists()