Add context documents and APM integration
Context Documents: - context.py: ContextLoader loads project-specific context for LLM - project-brief.md, terminology.md, materials-standard.md - naming-conventions.md, existing-components.md - Templates auto-created with new projects - Context injected into vision analysis prompts APM Integration: - apm_integration.py: Client for Atomaste Part Manager - APMClient: search, get_part, create_part - ComponentMatcher: fuzzy matches components to APM parts - Auto-suggests P/N during video processing - Falls back gracefully if APM not available Updates: - project.py: Creates context/ folder with templates - incremental.py: Loads context, uses APM for P/N lookup - vision_analyzer.py: Accepts context parameter for prompts Usage: - Edit context/*.md files to give LLM project knowledge - APM P/N lookup happens automatically if apm CLI available
This commit is contained in:
333
src/cad_documenter/apm_integration.py
Normal file
333
src/cad_documenter/apm_integration.py
Normal file
@@ -0,0 +1,333 @@
|
||||
"""Integration with Atomaste Part Manager (APM)."""
|
||||
|
||||
import subprocess
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
from typing import Callable
|
||||
|
||||
|
||||
@dataclass
|
||||
class APMPart:
|
||||
"""Part information from APM."""
|
||||
pn: str
|
||||
description: str
|
||||
material: str = ""
|
||||
project: str = ""
|
||||
status: str = ""
|
||||
file_path: str = ""
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict) -> "APMPart":
|
||||
return cls(
|
||||
pn=data.get("pn", data.get("part_number", "")),
|
||||
description=data.get("description", data.get("desc", "")),
|
||||
material=data.get("material", ""),
|
||||
project=data.get("project", ""),
|
||||
status=data.get("status", ""),
|
||||
file_path=data.get("file_path", data.get("path", "")),
|
||||
)
|
||||
|
||||
|
||||
class APMClient:
|
||||
"""Client for interacting with Atomaste Part Manager."""
|
||||
|
||||
def __init__(self, apm_path: str = "apm"):
|
||||
"""
|
||||
Initialize APM client.
|
||||
|
||||
Args:
|
||||
apm_path: Path to apm executable or just "apm" if in PATH
|
||||
"""
|
||||
self.apm_path = apm_path
|
||||
self._available: bool | None = None
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Check if APM is available."""
|
||||
if self._available is not None:
|
||||
return self._available
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[self.apm_path, "--version"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
self._available = result.returncode == 0
|
||||
except (subprocess.SubprocessError, FileNotFoundError):
|
||||
self._available = False
|
||||
|
||||
return self._available
|
||||
|
||||
def search(self, query: str, limit: int = 10) -> list[APMPart]:
|
||||
"""
|
||||
Search APM for parts matching query.
|
||||
|
||||
Args:
|
||||
query: Search string (matches description, P/N)
|
||||
limit: Maximum results to return
|
||||
|
||||
Returns:
|
||||
List of matching parts
|
||||
"""
|
||||
if not self.is_available():
|
||||
return []
|
||||
|
||||
try:
|
||||
# Try JSON output first
|
||||
result = subprocess.run(
|
||||
[self.apm_path, "search", query, "--json", "--limit", str(limit)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
data = json.loads(result.stdout)
|
||||
if isinstance(data, list):
|
||||
return [APMPart.from_dict(p) for p in data]
|
||||
elif isinstance(data, dict) and "results" in data:
|
||||
return [APMPart.from_dict(p) for p in data["results"]]
|
||||
|
||||
# Fallback: parse text output
|
||||
result = subprocess.run(
|
||||
[self.apm_path, "search", query],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
return self._parse_text_output(result.stdout)
|
||||
|
||||
except (subprocess.SubprocessError, json.JSONDecodeError):
|
||||
pass
|
||||
|
||||
return []
|
||||
|
||||
def get_part(self, pn: str) -> APMPart | None:
|
||||
"""
|
||||
Get part details by P/N.
|
||||
|
||||
Args:
|
||||
pn: Part number (e.g., "P-10001")
|
||||
|
||||
Returns:
|
||||
Part details or None if not found
|
||||
"""
|
||||
if not self.is_available():
|
||||
return None
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[self.apm_path, "show", pn, "--json"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10
|
||||
)
|
||||
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
data = json.loads(result.stdout)
|
||||
return APMPart.from_dict(data)
|
||||
|
||||
except (subprocess.SubprocessError, json.JSONDecodeError):
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
def create_part(
|
||||
self,
|
||||
part_type: str = "P",
|
||||
description: str = "",
|
||||
material: str = "",
|
||||
project: str = "",
|
||||
) -> str | None:
|
||||
"""
|
||||
Create a new part in APM.
|
||||
|
||||
Args:
|
||||
part_type: Part type prefix (P, A, S, W, C)
|
||||
description: Part description
|
||||
material: Material specification
|
||||
project: Project code
|
||||
|
||||
Returns:
|
||||
New part number or None if failed
|
||||
"""
|
||||
if not self.is_available():
|
||||
return None
|
||||
|
||||
cmd = [self.apm_path, "new", part_type]
|
||||
|
||||
if description:
|
||||
cmd.extend(["--desc", description])
|
||||
if material:
|
||||
cmd.extend(["--material", material])
|
||||
if project:
|
||||
cmd.extend(["--project", project])
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
# Extract P/N from output
|
||||
# Expected format: "Created P-10001" or similar
|
||||
match = re.search(r'([PASWC]-\d+)', result.stdout)
|
||||
if match:
|
||||
return match.group(1)
|
||||
|
||||
except subprocess.SubprocessError:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
def _parse_text_output(self, output: str) -> list[APMPart]:
|
||||
"""Parse text output from apm search."""
|
||||
parts = []
|
||||
|
||||
# Try to parse table format
|
||||
lines = output.strip().split("\n")
|
||||
|
||||
for line in lines:
|
||||
# Look for P/N pattern
|
||||
match = re.search(r'([PASWC]-\d+)\s+(.+)', line)
|
||||
if match:
|
||||
parts.append(APMPart(
|
||||
pn=match.group(1),
|
||||
description=match.group(2).strip(),
|
||||
))
|
||||
|
||||
return parts
|
||||
|
||||
|
||||
class ComponentMatcher:
|
||||
"""Matches detected components to APM parts."""
|
||||
|
||||
def __init__(self, apm_client: APMClient):
|
||||
self.apm = apm_client
|
||||
|
||||
def find_matches(
|
||||
self,
|
||||
component_name: str,
|
||||
material: str = "",
|
||||
threshold: float = 0.5
|
||||
) -> list[tuple[APMPart, float]]:
|
||||
"""
|
||||
Find APM parts matching a component.
|
||||
|
||||
Args:
|
||||
component_name: Detected component name
|
||||
material: Detected material (optional)
|
||||
threshold: Minimum match score (0-1)
|
||||
|
||||
Returns:
|
||||
List of (part, score) tuples, sorted by score descending
|
||||
"""
|
||||
if not self.apm.is_available():
|
||||
return []
|
||||
|
||||
# Search by name
|
||||
matches = self.apm.search(component_name)
|
||||
|
||||
# Score matches
|
||||
scored = []
|
||||
for part in matches:
|
||||
score = self._calculate_score(component_name, material, part)
|
||||
if score >= threshold:
|
||||
scored.append((part, score))
|
||||
|
||||
# Sort by score
|
||||
scored.sort(key=lambda x: x[1], reverse=True)
|
||||
|
||||
return scored
|
||||
|
||||
def _calculate_score(
|
||||
self,
|
||||
component_name: str,
|
||||
material: str,
|
||||
part: APMPart
|
||||
) -> float:
|
||||
"""Calculate match score between component and APM part."""
|
||||
score = 0.0
|
||||
|
||||
# Name similarity
|
||||
name_lower = component_name.lower()
|
||||
desc_lower = part.description.lower()
|
||||
|
||||
# Exact match
|
||||
if name_lower == desc_lower:
|
||||
score += 1.0
|
||||
# Contains
|
||||
elif name_lower in desc_lower or desc_lower in name_lower:
|
||||
score += 0.7
|
||||
# Word overlap
|
||||
else:
|
||||
name_words = set(name_lower.split())
|
||||
desc_words = set(desc_lower.split())
|
||||
overlap = len(name_words & desc_words)
|
||||
total = len(name_words | desc_words)
|
||||
if total > 0:
|
||||
score += 0.5 * (overlap / total)
|
||||
|
||||
# Material match bonus
|
||||
if material and part.material:
|
||||
mat_lower = material.lower()
|
||||
part_mat_lower = part.material.lower()
|
||||
|
||||
if mat_lower == part_mat_lower:
|
||||
score += 0.3
|
||||
elif mat_lower in part_mat_lower or part_mat_lower in mat_lower:
|
||||
score += 0.15
|
||||
|
||||
return min(score, 1.0)
|
||||
|
||||
def suggest_pn(
|
||||
self,
|
||||
component_name: str,
|
||||
material: str = ""
|
||||
) -> tuple[str | None, str]:
|
||||
"""
|
||||
Suggest a P/N for a component.
|
||||
|
||||
Returns:
|
||||
(pn, source) where source is "apm" or "suggested" or None
|
||||
"""
|
||||
matches = self.find_matches(component_name, material)
|
||||
|
||||
if matches:
|
||||
best_match, score = matches[0]
|
||||
if score >= 0.7:
|
||||
return best_match.pn, "apm"
|
||||
|
||||
return None, "none"
|
||||
|
||||
|
||||
def get_apm_client() -> APMClient:
|
||||
"""Get APM client, checking common installation paths."""
|
||||
# Try standard path first
|
||||
client = APMClient("apm")
|
||||
if client.is_available():
|
||||
return client
|
||||
|
||||
# Try common Windows paths
|
||||
common_paths = [
|
||||
Path.home() / "apm" / "apm.exe",
|
||||
Path.home() / ".apm" / "apm.exe",
|
||||
Path("C:/Program Files/APM/apm.exe"),
|
||||
]
|
||||
|
||||
for path in common_paths:
|
||||
if path.exists():
|
||||
client = APMClient(str(path))
|
||||
if client.is_available():
|
||||
return client
|
||||
|
||||
# Return default (may not work)
|
||||
return APMClient("apm")
|
||||
185
src/cad_documenter/context.py
Normal file
185
src/cad_documenter/context.py
Normal file
@@ -0,0 +1,185 @@
|
||||
"""Context document management for LLM analysis."""
|
||||
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProjectContext:
|
||||
"""Aggregated context from all context documents."""
|
||||
project_brief: str = ""
|
||||
terminology: str = ""
|
||||
materials: str = ""
|
||||
naming_conventions: str = ""
|
||||
existing_components: str = ""
|
||||
custom_context: str = ""
|
||||
|
||||
def to_prompt(self) -> str:
|
||||
"""Convert to prompt text for LLM."""
|
||||
sections = []
|
||||
|
||||
if self.project_brief:
|
||||
sections.append(f"## Project Brief\n{self.project_brief}")
|
||||
|
||||
if self.terminology:
|
||||
sections.append(f"## Terminology\n{self.terminology}")
|
||||
|
||||
if self.materials:
|
||||
sections.append(f"## Approved Materials\n{self.materials}")
|
||||
|
||||
if self.naming_conventions:
|
||||
sections.append(f"## Naming Conventions\n{self.naming_conventions}")
|
||||
|
||||
if self.existing_components:
|
||||
sections.append(f"## Existing Components\n{self.existing_components}")
|
||||
|
||||
if self.custom_context:
|
||||
sections.append(f"## Additional Context\n{self.custom_context}")
|
||||
|
||||
if not sections:
|
||||
return ""
|
||||
|
||||
return "# PROJECT CONTEXT\n\n" + "\n\n".join(sections)
|
||||
|
||||
def __bool__(self) -> bool:
|
||||
"""Check if any context is present."""
|
||||
return bool(
|
||||
self.project_brief or self.terminology or self.materials or
|
||||
self.naming_conventions or self.existing_components or self.custom_context
|
||||
)
|
||||
|
||||
|
||||
class ContextLoader:
|
||||
"""Loads context documents from a project."""
|
||||
|
||||
# Standard context file names
|
||||
CONTEXT_FILES = {
|
||||
"project_brief": ["project-brief.md", "project.md", "brief.md", "README.md"],
|
||||
"terminology": ["terminology.md", "terms.md", "glossary.md"],
|
||||
"materials": ["materials.md", "materials-standard.md", "approved-materials.md"],
|
||||
"naming_conventions": ["naming.md", "naming-conventions.md", "conventions.md"],
|
||||
"existing_components": ["components.md", "existing-components.md", "parts.md"],
|
||||
"custom_context": ["context.md", "custom.md", "notes.md"],
|
||||
}
|
||||
|
||||
def __init__(self, context_dir: Path):
|
||||
self.context_dir = Path(context_dir)
|
||||
|
||||
def load(self) -> ProjectContext:
|
||||
"""Load all context documents."""
|
||||
context = ProjectContext()
|
||||
|
||||
if not self.context_dir.exists():
|
||||
return context
|
||||
|
||||
for field, filenames in self.CONTEXT_FILES.items():
|
||||
for filename in filenames:
|
||||
filepath = self.context_dir / filename
|
||||
if filepath.exists():
|
||||
content = filepath.read_text(encoding="utf-8")
|
||||
setattr(context, field, content)
|
||||
break # Use first found
|
||||
|
||||
# Also load any .md files not in standard names
|
||||
standard_files = set()
|
||||
for filenames in self.CONTEXT_FILES.values():
|
||||
standard_files.update(filenames)
|
||||
|
||||
extra_context = []
|
||||
for md_file in self.context_dir.glob("*.md"):
|
||||
if md_file.name not in standard_files:
|
||||
content = md_file.read_text(encoding="utf-8")
|
||||
extra_context.append(f"### {md_file.stem}\n{content}")
|
||||
|
||||
if extra_context:
|
||||
if context.custom_context:
|
||||
context.custom_context += "\n\n" + "\n\n".join(extra_context)
|
||||
else:
|
||||
context.custom_context = "\n\n".join(extra_context)
|
||||
|
||||
return context
|
||||
|
||||
@classmethod
|
||||
def create_template(cls, context_dir: Path):
|
||||
"""Create template context files."""
|
||||
context_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
templates = {
|
||||
"project-brief.md": """# Project Brief
|
||||
|
||||
## Overview
|
||||
Describe the project, its purpose, and key requirements.
|
||||
|
||||
## Client
|
||||
- **Client:**
|
||||
- **Project Code:**
|
||||
|
||||
## Key Requirements
|
||||
-
|
||||
-
|
||||
|
||||
## Constraints
|
||||
-
|
||||
-
|
||||
""",
|
||||
"terminology.md": """# Project Terminology
|
||||
|
||||
Define project-specific terms and abbreviations.
|
||||
|
||||
| Term | Definition |
|
||||
|------|------------|
|
||||
| | |
|
||||
|
||||
## Abbreviations
|
||||
-
|
||||
-
|
||||
""",
|
||||
"materials-standard.md": """# Approved Materials
|
||||
|
||||
## Metals
|
||||
- **AL6061-T6** — General purpose aluminum
|
||||
- **SS304** — Stainless steel, corrosion resistant
|
||||
- **SS316** — Stainless steel, marine grade
|
||||
|
||||
## Plastics
|
||||
- **ABS** — General purpose
|
||||
- **PETG** — Higher temp resistance
|
||||
- **Nylon** — Wear resistant
|
||||
|
||||
## Add project-specific materials below:
|
||||
-
|
||||
""",
|
||||
"naming-conventions.md": """# Naming Conventions
|
||||
|
||||
## Part Number Format
|
||||
- **P-XXXXX** — Parts
|
||||
- **A-XXXXX** — Assemblies
|
||||
- **S-XXXXX** — Sub-assemblies
|
||||
- **W-XXXXX** — Weldments
|
||||
- **C-XXXXX** — Commercial/purchased
|
||||
|
||||
## Component Naming
|
||||
- Use descriptive names: "Motor Bracket" not "Part1"
|
||||
- Include function: "Support-Arm-Left"
|
||||
-
|
||||
""",
|
||||
"existing-components.md": """# Existing Components
|
||||
|
||||
List components that already exist in APM or from previous work.
|
||||
|
||||
| P/N | Name | Material | Notes |
|
||||
|-----|------|----------|-------|
|
||||
| | | | |
|
||||
|
||||
## Standard Parts Used
|
||||
- M6 fasteners
|
||||
-
|
||||
""",
|
||||
}
|
||||
|
||||
for filename, content in templates.items():
|
||||
filepath = context_dir / filename
|
||||
if not filepath.exists():
|
||||
filepath.write_text(content, encoding="utf-8")
|
||||
|
||||
return context_dir
|
||||
@@ -10,6 +10,8 @@ from .pipeline import DocumentationPipeline
|
||||
from .vision_analyzer import ComponentAnalysis, Component
|
||||
from .audio_analyzer import Transcript
|
||||
from .config import Config, load_config
|
||||
from .context import ContextLoader, ProjectContext
|
||||
from .apm_integration import get_apm_client, ComponentMatcher
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -52,6 +54,14 @@ class IncrementalProcessor:
|
||||
def __init__(self, project: Project, config: Config | None = None):
|
||||
self.project = project
|
||||
self.config = config or load_config()
|
||||
|
||||
# Load context documents
|
||||
self.context_loader = ContextLoader(project.context_dir)
|
||||
self.context = self.context_loader.load()
|
||||
|
||||
# Initialize APM client for P/N lookup
|
||||
self.apm_client = get_apm_client()
|
||||
self.component_matcher = ComponentMatcher(self.apm_client) if self.apm_client.is_available() else None
|
||||
|
||||
def process_pending(self, progress_callback=None) -> dict:
|
||||
"""Process all pending videos."""
|
||||
@@ -113,8 +123,13 @@ class IncrementalProcessor:
|
||||
transcript_file = self.project.transcripts_dir / f"{video_path.stem}.json"
|
||||
self._save_transcript(transcript, transcript_file)
|
||||
|
||||
# Analyze components
|
||||
analysis = pipeline.analyze_components(frames, transcript)
|
||||
# Analyze components with project context
|
||||
context_text = self.context.to_prompt() if self.context else ""
|
||||
|
||||
# Use vision analyzer directly to pass context
|
||||
from .vision_analyzer import VisionAnalyzer
|
||||
vision_analyzer = VisionAnalyzer(config=self.config.vision)
|
||||
analysis = vision_analyzer.analyze(frames, transcript, context=context_text)
|
||||
|
||||
# Detect explicit changes from transcript
|
||||
changes = self.detect_changes(transcript)
|
||||
@@ -185,6 +200,16 @@ class IncrementalProcessor:
|
||||
for component in analysis.components:
|
||||
existing = self.project.find_component(component.name)
|
||||
|
||||
# Try to find P/N from APM
|
||||
part_number = component.part_number
|
||||
if not part_number and self.component_matcher:
|
||||
pn, source = self.component_matcher.suggest_pn(
|
||||
component.name,
|
||||
component.material
|
||||
)
|
||||
if pn:
|
||||
part_number = pn
|
||||
|
||||
if existing:
|
||||
# Update existing component
|
||||
self.project.update_component(
|
||||
@@ -195,6 +220,7 @@ class IncrementalProcessor:
|
||||
material=component.material,
|
||||
features=component.features,
|
||||
confidence=component.confidence,
|
||||
part_number=part_number or existing.part_number,
|
||||
)
|
||||
updated_components.append(component.name)
|
||||
else:
|
||||
@@ -207,6 +233,7 @@ class IncrementalProcessor:
|
||||
material=component.material,
|
||||
features=component.features,
|
||||
confidence=component.confidence,
|
||||
part_number=part_number,
|
||||
)
|
||||
new_components.append(component.name)
|
||||
|
||||
|
||||
@@ -73,6 +73,7 @@ class Project:
|
||||
self.frames_dir = self.project_dir / "frames"
|
||||
self.output_dir = self.project_dir / "output"
|
||||
self.transcripts_dir = self.knowledge_dir / "transcripts"
|
||||
self.context_dir = self.project_dir / "context" # NEW: Context documents
|
||||
|
||||
self.manifest: ProjectManifest | None = None
|
||||
|
||||
@@ -91,6 +92,11 @@ class Project:
|
||||
(project_dir / "knowledge" / "transcripts").mkdir()
|
||||
(project_dir / "frames").mkdir()
|
||||
(project_dir / "output").mkdir()
|
||||
(project_dir / "context").mkdir()
|
||||
|
||||
# Create context templates
|
||||
from .context import ContextLoader
|
||||
ContextLoader.create_template(project_dir / "context")
|
||||
|
||||
# Create manifest
|
||||
now = datetime.now().isoformat()
|
||||
@@ -116,35 +122,49 @@ class Project:
|
||||
{name}/
|
||||
├── project.json # Project manifest
|
||||
├── videos/ # Add your walkthrough videos here
|
||||
├── context/ # Context documents for LLM (edit these!)
|
||||
│ ├── project-brief.md
|
||||
│ ├── terminology.md
|
||||
│ ├── materials-standard.md
|
||||
│ ├── naming-conventions.md
|
||||
│ └── existing-components.md
|
||||
├── knowledge/ # Accumulated knowledge base
|
||||
│ └── transcripts/ # Video transcripts
|
||||
├── frames/ # Extracted keyframes
|
||||
└── output/ # Generated documentation
|
||||
```
|
||||
|
||||
## Usage
|
||||
## Quick Start
|
||||
|
||||
```bash
|
||||
# Add a video
|
||||
cad-doc project add videos/my-video.mp4
|
||||
1. **Edit context files** in `context/` folder (optional but recommended)
|
||||
2. **Record a video** explaining your CAD model
|
||||
3. **Add and process:**
|
||||
```bash
|
||||
cad-doc project add ./ video.mp4
|
||||
cad-doc project process ./
|
||||
```
|
||||
4. **Repeat** as you make changes
|
||||
5. **Generate final docs:**
|
||||
```bash
|
||||
cad-doc project generate ./
|
||||
```
|
||||
|
||||
# Process all pending videos
|
||||
cad-doc project process
|
||||
## Context Documents
|
||||
|
||||
# Generate documentation
|
||||
cad-doc project generate
|
||||
Edit files in `context/` to give the AI better understanding:
|
||||
|
||||
# Check status
|
||||
cad-doc project status
|
||||
```
|
||||
- **project-brief.md** — What this project is about
|
||||
- **terminology.md** — Project-specific terms
|
||||
- **materials-standard.md** — Approved materials
|
||||
- **naming-conventions.md** — How you name parts
|
||||
- **existing-components.md** — Parts already in APM
|
||||
|
||||
## Workflow
|
||||
## Tips
|
||||
|
||||
1. Record a video explaining your CAD model
|
||||
2. Copy it to the `videos/` folder
|
||||
3. Run `cad-doc project add` and `cad-doc project process`
|
||||
4. Repeat as you make changes
|
||||
5. Run `cad-doc project generate` for unified documentation
|
||||
- Speak clearly, name each component
|
||||
- Mention materials and functions
|
||||
- For updates, say "now we use X instead of Y"
|
||||
- French or English both work fine
|
||||
"""
|
||||
(project_dir / "README.md").write_text(readme)
|
||||
|
||||
|
||||
@@ -191,13 +191,19 @@ class VisionAnalyzer:
|
||||
return {}
|
||||
|
||||
def analyze(
|
||||
self, frames: list[FrameInfo], transcript: Transcript
|
||||
self, frames: list[FrameInfo], transcript: Transcript, context: str = ""
|
||||
) -> ComponentAnalysis:
|
||||
"""
|
||||
Analyze frames and transcript to identify components.
|
||||
|
||||
This correlates visual analysis with verbal explanations.
|
||||
|
||||
Args:
|
||||
frames: Extracted video frames
|
||||
transcript: Whisper transcript
|
||||
context: Optional project context (from context documents)
|
||||
"""
|
||||
self._context = context # Store for use in prompts
|
||||
if not frames:
|
||||
return ComponentAnalysis(
|
||||
assembly_name="Unknown Assembly",
|
||||
@@ -259,11 +265,20 @@ class VisionAnalyzer:
|
||||
# Build prompt with transcript context
|
||||
component_prompt = self._load_prompt("component_analysis")
|
||||
|
||||
# Add context if available
|
||||
context_section = ""
|
||||
if hasattr(self, '_context') and self._context:
|
||||
context_section = f"""
|
||||
## Project Context (use this to understand terminology, materials, naming):
|
||||
{self._context[:3000]}
|
||||
|
||||
"""
|
||||
|
||||
# Add transcript context to prompt
|
||||
prompt = f"""{component_prompt}
|
||||
|
||||
{context_section}
|
||||
## Transcript from the video walkthrough:
|
||||
{transcript.full_text[:4000]} # Limit transcript length
|
||||
{transcript.full_text[:4000]}
|
||||
|
||||
## Frame timestamps analyzed:
|
||||
{[f.timestamp for f in key_frames]}
|
||||
|
||||
Reference in New Issue
Block a user