Add iterative project mode for multi-video documentation

New features:
- project.py: Project management with VideoEntry, ComponentKnowledge
  - Tracks videos, components, changes over time
  - Accumulates knowledge across multiple videos
  - Change history tracking per component
  - Atomizer hints merging

- incremental.py: Incremental processing
  - Processes videos in chronological order
  - Detects explicit changes from transcript (supersedes, adds, updates)
  - Merges component info intelligently
  - UnifiedDocumentGenerator for final output

- cli_project.py: New CLI commands
  - cad-doc project init <path> - Create new project
  - cad-doc project add <project> <video> - Add video
  - cad-doc project process <project> - Process pending videos
  - cad-doc project generate <project> - Generate unified docs
  - cad-doc project status <project> - Show project status
  - cad-doc project list-components <project> - List all components

Usage:
  cad-doc project init ./my-bracket
  cp video1.mp4 video2.mp4 ./my-bracket/videos/
  cad-doc project add ./my-bracket videos/video1.mp4
  cad-doc project add ./my-bracket videos/video2.mp4
  cad-doc project process ./my-bracket
  cad-doc project generate ./my-bracket
This commit is contained in:
Mario Lavoie
2026-01-28 00:37:45 +00:00
parent d2e63a335a
commit 3ee0b14a2b
4 changed files with 1086 additions and 0 deletions

View File

@@ -11,6 +11,7 @@ from rich.table import Table
from .pipeline import DocumentationPipeline, PipelineProgress, PipelineStage, create_pipeline
from .config import Config, load_config
from .cli_project import project as project_commands
console = Console()
@@ -380,5 +381,9 @@ def main():
cli()
# Register project subcommands
cli.add_command(project_commands)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,276 @@
"""CLI commands for project management."""
import click
from pathlib import Path
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from .project import Project
from .incremental import IncrementalProcessor, UnifiedDocumentGenerator
from .config import load_config
console = Console()
@click.group()
def project():
"""Manage iterative documentation projects."""
pass
@project.command()
@click.argument("path", type=click.Path(path_type=Path))
@click.option("--name", "-n", help="Project name (defaults to folder name)")
@click.option("--description", "-d", default="", help="Project description")
def init(path: Path, name: str | None, description: str):
"""Create a new documentation project.
Example:
cad-doc project init ./my-bracket-project
"""
if name is None:
name = path.name
try:
proj = Project.create(path, name, description)
console.print(f"[green]✓[/green] Created project: [cyan]{name}[/cyan]")
console.print(f" Location: {path.absolute()}")
console.print()
console.print("Next steps:")
console.print(f" 1. Copy videos to [cyan]{path}/videos/[/cyan]")
console.print(f" 2. Run [cyan]cad-doc project add {path} <video>[/cyan]")
console.print(f" 3. Run [cyan]cad-doc project process {path}[/cyan]")
except ValueError as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
@project.command()
@click.argument("project_path", type=click.Path(exists=True, path_type=Path))
@click.argument("video", type=click.Path(exists=True, path_type=Path))
@click.option("--no-copy", is_flag=True, help="Don't copy video, just reference it")
def add(project_path: Path, video: Path, no_copy: bool):
"""Add a video to the project.
Example:
cad-doc project add ./my-project video.mp4
"""
try:
proj = Project.load(project_path)
entry = proj.add_video(video, copy=not no_copy)
console.print(f"[green]✓[/green] Added: [cyan]{video.name}[/cyan]")
console.print(f" Status: {entry.status}")
pending = len(proj.get_pending_videos())
console.print(f"\nPending videos: {pending}")
console.print(f"Run [cyan]cad-doc project process {project_path}[/cyan] to process")
except FileNotFoundError as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
@project.command()
@click.argument("project_path", type=click.Path(exists=True, path_type=Path))
@click.option("--all", "process_all", is_flag=True, help="Reprocess all videos, not just pending")
def process(project_path: Path, process_all: bool):
"""Process pending videos in the project.
Example:
cad-doc project process ./my-project
"""
try:
proj = Project.load(project_path)
config = load_config()
pending = proj.get_pending_videos()
if not pending:
console.print("[yellow]No pending videos to process[/yellow]")
return
console.print(f"Processing {len(pending)} video(s)...")
console.print()
processor = IncrementalProcessor(proj, config)
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
def on_progress(msg):
progress.update(task, description=msg)
task = progress.add_task("Starting...", total=None)
results = processor.process_pending(progress_callback=on_progress)
# Show results
console.print()
console.print(Panel.fit(
f"[bold green]Processing complete![/bold green]\n\n"
f"Videos processed: {results['processed']}\n"
f"New components: {len(results['new_components'])}\n"
f"Updated components: {len(results['updated_components'])}\n"
f"Changes detected: {len(results['changes_detected'])}",
title="Results"
))
if results['new_components']:
console.print("\n[bold]New components:[/bold]")
for name in results['new_components']:
console.print(f" [green]+[/green] {name}")
if results['updated_components']:
console.print("\n[bold]Updated components:[/bold]")
for name in results['updated_components']:
console.print(f" [yellow]~[/yellow] {name}")
if results['errors']:
console.print("\n[bold red]Errors:[/bold red]")
for err in results['errors']:
console.print(f" [red]✗[/red] {err['video']}: {err['error']}")
console.print(f"\nRun [cyan]cad-doc project generate {project_path}[/cyan] to create documentation")
except FileNotFoundError as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
@project.command()
@click.argument("project_path", type=click.Path(exists=True, path_type=Path))
@click.option("--no-history", is_flag=True, help="Don't include change history")
@click.option("--no-bom", is_flag=True, help="Don't include Bill of Materials")
@click.option("--no-atomizer", is_flag=True, help="Don't include Atomizer hints")
@click.option("--pdf", is_flag=True, help="Also generate PDF")
def generate(project_path: Path, no_history: bool, no_bom: bool, no_atomizer: bool, pdf: bool):
"""Generate unified documentation from all processed videos.
Example:
cad-doc project generate ./my-project
"""
try:
proj = Project.load(project_path)
generator = UnifiedDocumentGenerator(proj)
console.print("Generating unified documentation...")
doc_path = generator.generate(
include_history=not no_history,
include_bom=not no_bom,
include_atomizer=not no_atomizer,
)
console.print(f"[green]✓[/green] Documentation: [cyan]{doc_path}[/cyan]")
if pdf:
console.print("[yellow]PDF generation not yet implemented for projects[/yellow]")
# Show summary
status = proj.get_status()
console.print()
console.print(f"Components documented: {status['total_components']}")
console.print(f"From {status['processed']} videos ({status['total_duration']:.1f}s total)")
except FileNotFoundError as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
@project.command()
@click.argument("project_path", type=click.Path(exists=True, path_type=Path))
def status(project_path: Path):
"""Show project status.
Example:
cad-doc project status ./my-project
"""
try:
proj = Project.load(project_path)
status = proj.get_status()
console.print(Panel.fit(
f"[bold]{status['name']}[/bold]",
subtitle=f"Last updated: {status['last_updated'][:16]}"
))
# Videos table
table = Table(title="Videos")
table.add_column("#", style="dim")
table.add_column("Filename")
table.add_column("Status")
table.add_column("Components")
for i, video in enumerate(proj.manifest.videos, 1):
status_style = {
"pending": "yellow",
"processed": "green",
"error": "red",
}.get(video.status, "white")
table.add_row(
str(i),
video.filename,
f"[{status_style}]{video.status}[/{status_style}]",
str(len(video.components_found)) if video.components_found else "-"
)
console.print(table)
# Summary
console.print()
console.print(f"Total videos: {status['total_videos']} ({status['pending']} pending)")
console.print(f"Components: {status['total_components']}")
console.print(f"Total duration: {status['total_duration']:.1f}s")
console.print(f"Total frames: {status['total_frames']}")
if status['pending'] > 0:
console.print()
console.print(f"[yellow]Run [cyan]cad-doc project process {project_path}[/cyan] to process pending videos[/yellow]")
except FileNotFoundError as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)
@project.command("list-components")
@click.argument("project_path", type=click.Path(exists=True, path_type=Path))
def list_components(project_path: Path):
"""List all components in the project.
Example:
cad-doc project list-components ./my-project
"""
try:
proj = Project.load(project_path)
components = proj.get_all_components()
if not components:
console.print("[yellow]No components found. Process some videos first.[/yellow]")
return
table = Table(title=f"Components ({len(components)})")
table.add_column("Name")
table.add_column("Material")
table.add_column("Function")
table.add_column("Changes")
for comp in components:
table.add_row(
comp.name,
comp.material or "-",
comp.function[:40] + "..." if len(comp.function) > 40 else comp.function or "-",
str(len(comp.history)) if comp.history else "-"
)
console.print(table)
except FileNotFoundError as e:
console.print(f"[red]Error:[/red] {e}")
raise SystemExit(1)

View File

@@ -0,0 +1,414 @@
"""Incremental processing for iterative documentation."""
import re
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
from .project import Project, VideoEntry, ComponentKnowledge
from .pipeline import DocumentationPipeline
from .vision_analyzer import ComponentAnalysis, Component
from .audio_analyzer import Transcript
from .config import Config, load_config
@dataclass
class ChangeDetection:
"""Detected change from transcript."""
change_type: str # "supersedes", "adds", "updates", "removes"
component: str | None
field: str | None
old_value: str | None
new_value: str | None
quote: str
timestamp: float
class IncrementalProcessor:
"""Processes videos incrementally, building up knowledge."""
# Patterns that indicate changes/updates
CHANGE_PATTERNS = [
# Superseding patterns
(r"(?:now|instead)\s+(?:we\s+)?(?:use|using)\s+(.+?)(?:\s+instead\s+of\s+(.+))?", "supersedes"),
(r"(?:i|we)\s+changed\s+(?:the\s+)?(.+?)\s+(?:from\s+(.+?)\s+)?to\s+(.+)", "supersedes"),
(r"(?:the\s+)?new\s+(.+?)\s+(?:is|replaces)", "supersedes"),
(r"replaced\s+(?:the\s+)?(.+?)\s+with\s+(.+)", "supersedes"),
# Additive patterns
(r"(?:i|we)\s+added\s+(?:a\s+)?(.+)", "adds"),
(r"(?:in\s+addition|also|plus)\s+(?:we\s+have\s+)?(?:a\s+)?(.+)", "adds"),
(r"(?:there(?:'s| is)\s+)?(?:a\s+)?new\s+(.+)", "adds"),
# Update patterns
(r"(?:i|we)\s+(?:updated|modified|adjusted|refined)\s+(?:the\s+)?(.+)", "updates"),
(r"(?:the\s+)?(.+?)\s+(?:is\s+)?now\s+(.+)", "updates"),
# Removal patterns
(r"(?:i|we)\s+removed\s+(?:the\s+)?(.+)", "removes"),
(r"(?:no\s+longer|don't)\s+(?:have|need|use)\s+(?:the\s+)?(.+)", "removes"),
]
def __init__(self, project: Project, config: Config | None = None):
self.project = project
self.config = config or load_config()
def process_pending(self, progress_callback=None) -> dict:
"""Process all pending videos."""
pending = self.project.get_pending_videos()
if not pending:
return {"processed": 0, "message": "No pending videos"}
results = {
"processed": 0,
"errors": [],
"new_components": [],
"updated_components": [],
"changes_detected": [],
}
for i, video_entry in enumerate(pending):
if progress_callback:
progress_callback(f"Processing {video_entry.filename} ({i+1}/{len(pending)})")
try:
video_results = self.process_video(video_entry)
results["processed"] += 1
results["new_components"].extend(video_results.get("new_components", []))
results["updated_components"].extend(video_results.get("updated_components", []))
results["changes_detected"].extend(video_results.get("changes", []))
except Exception as e:
video_entry.status = "error"
video_entry.error_message = str(e)
results["errors"].append({
"video": video_entry.filename,
"error": str(e)
})
self.project.save()
return results
def process_video(self, video_entry: VideoEntry) -> dict:
"""Process a single video and merge into project knowledge."""
video_path = self.project.get_video_path(video_entry)
# Create output dir for this video's frames
video_frames_dir = self.project.frames_dir / video_path.stem
video_frames_dir.mkdir(exist_ok=True)
# Run pipeline
pipeline = DocumentationPipeline(
video_path=video_path,
output_dir=video_frames_dir,
config=self.config,
)
# Extract and transcribe
frames = pipeline.extract_frames()
transcript = pipeline.transcribe_audio()
# Save transcript
transcript_file = self.project.transcripts_dir / f"{video_path.stem}.json"
self._save_transcript(transcript, transcript_file)
# Analyze components
analysis = pipeline.analyze_components(frames, transcript)
# Detect explicit changes from transcript
changes = self.detect_changes(transcript)
# Merge into project knowledge
merge_results = self.merge_analysis(
analysis,
video_entry.filename,
changes
)
# Update video entry
video_entry.status = "processed"
video_entry.processed_at = datetime.now().isoformat()
video_entry.duration = transcript.duration if hasattr(transcript, 'duration') else 0
video_entry.transcript_file = transcript_file.name
video_entry.frames_extracted = len(frames)
video_entry.components_found = [c.name for c in analysis.components]
# Update project totals
self.project.manifest.total_frames += len(frames)
self.project.manifest.total_duration += video_entry.duration or 0
return {
"frames": len(frames),
"transcript_duration": video_entry.duration,
"components_analyzed": len(analysis.components),
"changes": changes,
**merge_results
}
def detect_changes(self, transcript: Transcript) -> list[ChangeDetection]:
"""Detect explicit changes mentioned in transcript."""
changes = []
for segment in transcript.segments:
text = segment.text.lower()
for pattern, change_type in self.CHANGE_PATTERNS:
matches = re.finditer(pattern, text, re.IGNORECASE)
for match in matches:
groups = match.groups()
change = ChangeDetection(
change_type=change_type,
component=groups[0] if groups else None,
field=None,
old_value=groups[1] if len(groups) > 1 else None,
new_value=groups[2] if len(groups) > 2 else groups[0],
quote=segment.text,
timestamp=segment.start,
)
changes.append(change)
return changes
def merge_analysis(
self,
analysis: ComponentAnalysis,
source_video: str,
changes: list[ChangeDetection]
) -> dict:
"""Merge analysis results into project knowledge."""
new_components = []
updated_components = []
# Process each detected component
for component in analysis.components:
existing = self.project.find_component(component.name)
if existing:
# Update existing component
self.project.update_component(
name=component.name,
source_video=source_video,
description=component.description,
function=component.function,
material=component.material,
features=component.features,
confidence=component.confidence,
)
updated_components.append(component.name)
else:
# New component
self.project.update_component(
name=component.name,
source_video=source_video,
description=component.description,
function=component.function,
material=component.material,
features=component.features,
confidence=component.confidence,
)
new_components.append(component.name)
# Apply explicit changes from transcript
for change in changes:
if change.change_type == "supersedes" and change.component:
comp = self.project.find_component(change.component)
if comp and change.new_value:
# Try to determine what field changed
if self._looks_like_material(change.new_value):
self.project.update_component(
name=comp.name,
source_video=source_video,
material=change.new_value,
)
else:
# Add to description/notes
pass
# Merge assembly relationships
if analysis.assembly_notes:
self.project.manifest.assembly_relationships.append({
"source_video": source_video,
"notes": analysis.assembly_notes,
})
# Merge Atomizer hints
if analysis.atomizer_hints:
hints_dict = {
"objectives": analysis.atomizer_hints.objectives,
"constraints": analysis.atomizer_hints.constraints,
"parameters": analysis.atomizer_hints.parameters,
"critical_regions": analysis.atomizer_hints.critical_regions,
}
self.project.merge_atomizer_hints(hints_dict, source_video)
return {
"new_components": new_components,
"updated_components": updated_components,
}
def _looks_like_material(self, text: str) -> bool:
"""Check if text looks like a material name."""
materials = [
"aluminum", "aluminium", "steel", "stainless", "titanium",
"brass", "copper", "plastic", "nylon", "abs", "pla", "petg",
"carbon fiber", "composite", "wood", "rubber", "silicone",
"6061", "7075", "304", "316", "4140",
]
text_lower = text.lower()
return any(mat in text_lower for mat in materials)
def _save_transcript(self, transcript: Transcript, path: Path):
"""Save transcript to JSON file."""
import json
data = {
"full_text": transcript.full_text,
"duration": getattr(transcript, 'duration', 0),
"segments": [
{
"start": seg.start,
"end": seg.end,
"text": seg.text,
}
for seg in transcript.segments
]
}
with open(path, "w") as f:
json.dump(data, f, indent=2)
class UnifiedDocumentGenerator:
"""Generates unified documentation from accumulated project knowledge."""
def __init__(self, project: Project):
self.project = project
def generate(
self,
include_history: bool = True,
include_atomizer: bool = True,
include_bom: bool = True,
) -> Path:
"""Generate unified documentation from all project knowledge."""
output_dir = self.project.output_dir
output_dir.mkdir(exist_ok=True)
# Build document
doc_parts = []
# Header
manifest = self.project.manifest
doc_parts.append(f"# {manifest.name}\n")
doc_parts.append(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n")
doc_parts.append(f"**Videos processed:** {len([v for v in manifest.videos if v.status == 'processed'])}\n")
doc_parts.append(f"**Total duration:** {manifest.total_duration:.1f}s\n")
if manifest.description:
doc_parts.append(f"\n{manifest.description}\n")
doc_parts.append("\n---\n")
# Components
doc_parts.append("\n## Components\n")
components = self.project.get_all_components()
components.sort(key=lambda c: c.first_seen)
for i, comp in enumerate(components, 1):
doc_parts.append(f"\n### {i}. {comp.name}\n")
if comp.description:
doc_parts.append(f"\n{comp.description}\n")
doc_parts.append("\n| Property | Value |")
doc_parts.append("\n|----------|-------|")
if comp.function:
doc_parts.append(f"\n| **Function** | {comp.function} |")
if comp.material:
doc_parts.append(f"\n| **Material** | {comp.material} |")
if comp.part_number:
doc_parts.append(f"\n| **Part Number** | {comp.part_number} |")
doc_parts.append("\n")
if comp.features:
doc_parts.append("\n**Key Features:**\n")
for feat in comp.features:
doc_parts.append(f"- {feat}\n")
# History
if include_history and comp.history:
doc_parts.append("\n<details>\n<summary>Change History</summary>\n\n")
for change in comp.history:
doc_parts.append(f"- **{change['timestamp'][:10]}**: {change['field']} changed from \"{change['old']}\" to \"{change['new']}\"\n")
doc_parts.append("\n</details>\n")
doc_parts.append("\n---\n")
# BOM
if include_bom and components:
doc_parts.append("\n## Bill of Materials\n")
doc_parts.append("\n| Item | Part Number | Name | Material | Function |")
doc_parts.append("\n|------|-------------|------|----------|----------|")
for i, comp in enumerate(components, 1):
pn = comp.part_number or "TBD"
mat = comp.material or "TBD"
func = comp.function or "-"
doc_parts.append(f"\n| {i} | {pn} | {comp.name} | {mat} | {func} |")
doc_parts.append("\n")
# Atomizer hints
if include_atomizer and manifest.atomizer_hints:
doc_parts.append("\n## FEA / Atomizer Hints\n")
hints = manifest.atomizer_hints
if hints.get("objectives"):
doc_parts.append("\n### Objectives\n")
for obj in hints["objectives"]:
doc_parts.append(f"- **{obj.get('direction', '').capitalize()} {obj.get('name', '')}**\n")
if hints.get("constraints"):
doc_parts.append("\n### Constraints\n")
for const in hints["constraints"]:
doc_parts.append(f"- {const.get('type', '')}: {const.get('value', '')}\n")
if hints.get("parameters"):
doc_parts.append("\n### Parameters to Optimize\n")
for param in hints["parameters"]:
doc_parts.append(f"- {param}\n")
if hints.get("critical_regions"):
doc_parts.append("\n### Critical Regions\n")
for region in hints["critical_regions"]:
doc_parts.append(f"- **{region.get('feature', '')}**: {region.get('concern', '')}\n")
# Video sources
doc_parts.append("\n## Source Videos\n")
doc_parts.append("\n| # | Filename | Processed | Components Found |")
doc_parts.append("\n|---|----------|-----------|------------------|")
for i, video in enumerate(manifest.videos, 1):
date = video.processed_at[:10] if video.processed_at else "pending"
comps = len(video.components_found)
doc_parts.append(f"\n| {i} | {video.filename} | {date} | {comps} |")
doc_parts.append("\n")
# Write document
doc_path = output_dir / "documentation.md"
doc_path.write_text("".join(doc_parts))
# Also write atomizer hints JSON
if manifest.atomizer_hints:
import json
hints_path = output_dir / "atomizer_hints.json"
hints_path.write_text(json.dumps(manifest.atomizer_hints, indent=2))
return doc_path

View File

@@ -0,0 +1,391 @@
"""Project management for iterative documentation."""
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, field, asdict
from typing import Literal
import re
@dataclass
class VideoEntry:
"""A video in the project."""
filename: str
added_at: str # ISO timestamp
processed_at: str | None = None
duration: float | None = None
transcript_file: str | None = None
frames_extracted: int = 0
components_found: list[str] = field(default_factory=list)
status: Literal["pending", "processed", "error"] = "pending"
error_message: str | None = None
@dataclass
class ComponentKnowledge:
"""Accumulated knowledge about a component."""
name: str
first_seen: str # ISO timestamp
last_updated: str
description: str = ""
function: str = ""
material: str = ""
features: list[str] = field(default_factory=list)
part_number: str = ""
confidence: float = 0.0
# History tracking
history: list[dict] = field(default_factory=list)
# [{"timestamp": "...", "field": "material", "old": "aluminum", "new": "steel", "source_video": "..."}]
@dataclass
class ProjectManifest:
"""Project manifest tracking all videos and knowledge."""
name: str
created_at: str
updated_at: str
description: str = ""
# Videos
videos: list[VideoEntry] = field(default_factory=list)
# Accumulated knowledge
components: dict[str, ComponentKnowledge] = field(default_factory=dict)
assembly_relationships: list[dict] = field(default_factory=list)
atomizer_hints: dict = field(default_factory=dict)
# Metadata
total_duration: float = 0.0
total_frames: int = 0
version: str = "1.0"
class Project:
"""Manages an iterative documentation project."""
def __init__(self, project_dir: Path):
self.project_dir = Path(project_dir)
self.manifest_path = self.project_dir / "project.json"
self.videos_dir = self.project_dir / "videos"
self.knowledge_dir = self.project_dir / "knowledge"
self.frames_dir = self.project_dir / "frames"
self.output_dir = self.project_dir / "output"
self.transcripts_dir = self.knowledge_dir / "transcripts"
self.manifest: ProjectManifest | None = None
@classmethod
def create(cls, project_dir: Path, name: str, description: str = "") -> "Project":
"""Create a new project."""
project_dir = Path(project_dir)
if project_dir.exists() and any(project_dir.iterdir()):
raise ValueError(f"Directory {project_dir} is not empty")
# Create directory structure
project_dir.mkdir(parents=True, exist_ok=True)
(project_dir / "videos").mkdir()
(project_dir / "knowledge").mkdir()
(project_dir / "knowledge" / "transcripts").mkdir()
(project_dir / "frames").mkdir()
(project_dir / "output").mkdir()
# Create manifest
now = datetime.now().isoformat()
manifest = ProjectManifest(
name=name,
created_at=now,
updated_at=now,
description=description,
)
project = cls(project_dir)
project.manifest = manifest
project.save()
# Create README
readme = f"""# {name}
{description}
## Structure
```
{name}/
├── project.json # Project manifest
├── videos/ # Add your walkthrough videos here
├── knowledge/ # Accumulated knowledge base
│ └── transcripts/ # Video transcripts
├── frames/ # Extracted keyframes
└── output/ # Generated documentation
```
## Usage
```bash
# Add a video
cad-doc project add videos/my-video.mp4
# Process all pending videos
cad-doc project process
# Generate documentation
cad-doc project generate
# Check status
cad-doc project status
```
## Workflow
1. Record a video explaining your CAD model
2. Copy it to the `videos/` folder
3. Run `cad-doc project add` and `cad-doc project process`
4. Repeat as you make changes
5. Run `cad-doc project generate` for unified documentation
"""
(project_dir / "README.md").write_text(readme)
return project
@classmethod
def load(cls, project_dir: Path) -> "Project":
"""Load an existing project."""
project = cls(project_dir)
if not project.manifest_path.exists():
raise FileNotFoundError(f"No project found at {project_dir}")
with open(project.manifest_path) as f:
data = json.load(f)
# Reconstruct manifest
videos = [VideoEntry(**v) for v in data.get("videos", [])]
components = {
k: ComponentKnowledge(**v)
for k, v in data.get("components", {}).items()
}
project.manifest = ProjectManifest(
name=data["name"],
created_at=data["created_at"],
updated_at=data["updated_at"],
description=data.get("description", ""),
videos=videos,
components=components,
assembly_relationships=data.get("assembly_relationships", []),
atomizer_hints=data.get("atomizer_hints", {}),
total_duration=data.get("total_duration", 0.0),
total_frames=data.get("total_frames", 0),
version=data.get("version", "1.0"),
)
return project
def save(self):
"""Save project manifest."""
if self.manifest is None:
raise ValueError("No manifest to save")
self.manifest.updated_at = datetime.now().isoformat()
# Convert to dict for JSON serialization
data = {
"name": self.manifest.name,
"created_at": self.manifest.created_at,
"updated_at": self.manifest.updated_at,
"description": self.manifest.description,
"videos": [asdict(v) for v in self.manifest.videos],
"components": {k: asdict(v) for k, v in self.manifest.components.items()},
"assembly_relationships": self.manifest.assembly_relationships,
"atomizer_hints": self.manifest.atomizer_hints,
"total_duration": self.manifest.total_duration,
"total_frames": self.manifest.total_frames,
"version": self.manifest.version,
}
with open(self.manifest_path, "w") as f:
json.dump(data, f, indent=2)
def add_video(self, video_path: Path, copy: bool = True) -> VideoEntry:
"""Add a video to the project."""
video_path = Path(video_path)
if not video_path.exists():
raise FileNotFoundError(f"Video not found: {video_path}")
# Copy or link video to project
if copy:
dest = self.videos_dir / video_path.name
if not dest.exists():
import shutil
shutil.copy2(video_path, dest)
filename = video_path.name
else:
filename = str(video_path.absolute())
# Check if already added
for v in self.manifest.videos:
if v.filename == filename:
return v # Already exists
# Create entry
entry = VideoEntry(
filename=filename,
added_at=datetime.now().isoformat(),
)
self.manifest.videos.append(entry)
self.save()
return entry
def get_pending_videos(self) -> list[VideoEntry]:
"""Get videos that haven't been processed yet."""
return [v for v in self.manifest.videos if v.status == "pending"]
def get_video_path(self, entry: VideoEntry) -> Path:
"""Get the full path to a video."""
if Path(entry.filename).is_absolute():
return Path(entry.filename)
return self.videos_dir / entry.filename
def get_videos_chronological(self) -> list[VideoEntry]:
"""Get all videos in chronological order."""
return sorted(self.manifest.videos, key=lambda v: v.added_at)
def update_component(
self,
name: str,
source_video: str,
**updates
) -> ComponentKnowledge:
"""Update or create a component with change tracking."""
now = datetime.now().isoformat()
name_key = self._normalize_name(name)
if name_key in self.manifest.components:
# Existing component - track changes
comp = self.manifest.components[name_key]
for field_name, new_value in updates.items():
if not new_value:
continue
old_value = getattr(comp, field_name, None)
# Only track if actually changed
if old_value and old_value != new_value:
comp.history.append({
"timestamp": now,
"field": field_name,
"old": old_value,
"new": new_value,
"source_video": source_video,
})
setattr(comp, field_name, new_value)
comp.last_updated = now
else:
# New component
comp = ComponentKnowledge(
name=name,
first_seen=now,
last_updated=now,
**{k: v for k, v in updates.items() if v}
)
self.manifest.components[name_key] = comp
return comp
def _normalize_name(self, name: str) -> str:
"""Normalize component name for matching."""
# Lowercase, remove extra spaces, standardize
name = name.lower().strip()
name = re.sub(r'\s+', ' ', name)
return name
def find_component(self, name: str) -> ComponentKnowledge | None:
"""Find a component by name (fuzzy matching)."""
name_key = self._normalize_name(name)
# Exact match
if name_key in self.manifest.components:
return self.manifest.components[name_key]
# Fuzzy match - check if name is contained
for key, comp in self.manifest.components.items():
if name_key in key or key in name_key:
return comp
return None
def get_all_components(self) -> list[ComponentKnowledge]:
"""Get all components."""
return list(self.manifest.components.values())
def merge_atomizer_hints(self, new_hints: dict, source_video: str):
"""Merge new Atomizer hints with existing."""
existing = self.manifest.atomizer_hints
# Merge objectives (deduplicate)
existing_objectives = existing.get("objectives", [])
new_objectives = new_hints.get("objectives", [])
for obj in new_objectives:
# Check if similar objective exists
exists = any(
o.get("name") == obj.get("name") and o.get("direction") == obj.get("direction")
for o in existing_objectives
)
if not exists:
obj["source_video"] = source_video
existing_objectives.append(obj)
existing["objectives"] = existing_objectives
# Merge constraints
existing_constraints = existing.get("constraints", [])
new_constraints = new_hints.get("constraints", [])
for const in new_constraints:
const["source_video"] = source_video
existing_constraints.append(const)
existing["constraints"] = existing_constraints
# Merge parameters (deduplicate)
existing_params = set(existing.get("parameters", []))
new_params = set(new_hints.get("parameters", []))
existing["parameters"] = list(existing_params | new_params)
# Merge critical regions
existing_regions = existing.get("critical_regions", [])
new_regions = new_hints.get("critical_regions", [])
for region in new_regions:
region["source_video"] = source_video
existing_regions.append(region)
existing["critical_regions"] = existing_regions
self.manifest.atomizer_hints = existing
def get_status(self) -> dict:
"""Get project status summary."""
videos = self.manifest.videos
return {
"name": self.manifest.name,
"total_videos": len(videos),
"pending": len([v for v in videos if v.status == "pending"]),
"processed": len([v for v in videos if v.status == "processed"]),
"errors": len([v for v in videos if v.status == "error"]),
"total_components": len(self.manifest.components),
"total_duration": self.manifest.total_duration,
"total_frames": self.manifest.total_frames,
"last_updated": self.manifest.updated_at,
}