From d5371cfe7585ac591268420685883f6a754719ab Mon Sep 17 00:00:00 2001 From: Mario Lavoie Date: Mon, 9 Feb 2026 12:50:22 +0000 Subject: [PATCH] Add KB Capture v2 - clip-based recording system New features: - Clip-based workflow: record short clips, keep or delete - Toggle recording with Ctrl+Shift+R - Session management (start, clips, end) - Modern CustomTkinter GUI with dark theme - Global hotkeys for hands-free control - Whisper transcription (local, no API) - FFmpeg screen + audio capture - Export to clawdbot_export/ for Mario processing Files added: - recorder.py: FFmpeg screen recording - session.py: Session/clip management - hotkeys.py: Global hotkey registration - kb_capture.py: Main application logic - gui_capture.py: Modern GUI - export.py: Merge clips, transcribe, export Docs: - docs/KB-CAPTURE.md: Full documentation Entry point: uv run kb-capture --- docs/KB-CAPTURE.md | 181 +++++++++++ pyproject.toml | 6 + src/cad_documenter/export.py | 311 +++++++++++++++++++ src/cad_documenter/gui_capture.py | 480 ++++++++++++++++++++++++++++++ src/cad_documenter/hotkeys.py | 161 ++++++++++ src/cad_documenter/kb_capture.py | 364 ++++++++++++++++++++++ src/cad_documenter/recorder.py | 257 ++++++++++++++++ src/cad_documenter/session.py | 361 ++++++++++++++++++++++ 8 files changed, 2121 insertions(+) create mode 100644 docs/KB-CAPTURE.md create mode 100644 src/cad_documenter/export.py create mode 100644 src/cad_documenter/gui_capture.py create mode 100644 src/cad_documenter/hotkeys.py create mode 100644 src/cad_documenter/kb_capture.py create mode 100644 src/cad_documenter/recorder.py create mode 100644 src/cad_documenter/session.py diff --git a/docs/KB-CAPTURE.md b/docs/KB-CAPTURE.md new file mode 100644 index 0000000..0b888df --- /dev/null +++ b/docs/KB-CAPTURE.md @@ -0,0 +1,181 @@ +# KB Capture v2 + +**Clip-based recording for engineering knowledge capture.** + +## Overview + +KB Capture is a lightweight recording tool that captures your CAD/FEM work as short clips, not one long video. Record what matters, delete mistakes, keep the good stuff. + +## Quick Start + +```bash +# Install +cd CAD-Documenter +uv sync +uv pip install customtkinter keyboard + +# Launch +uv run kb-capture +``` + +## Workflow + +### 1. Start Session +- Open KB Capture (system tray or GUI) +- Enter project name (e.g., "P04-GigaBIT-M1") +- Enter session description (e.g., "Vertical support refinement") +- Select type: **Design** (CAD) or **Analysis** (FEA) +- Click **Start Session** + +### 2. Record Clips +While working in NX/CAD: +- Press **Ctrl+Shift+R** to start recording +- Narrate what you're doing +- Say "screenshot" when you want a frame captured +- Press **Ctrl+Shift+R** again to stop + +### 3. Review Clips +After each clip: +- **Keep (K)**: Keep the clip +- **Delete (D)**: Discard the clip (bad take) +- Or just start recording again (auto-keeps previous) + +### 4. End Session +- Press **Ctrl+Shift+E** or click **End Session** +- Clips are merged and transcribed +- Exported to `clawdbot_export/` for Mario processing + +## Keyboard Shortcuts + +| Action | Shortcut | +|--------|----------| +| Start/Stop Recording | Ctrl+Shift+R | +| Keep Last Clip | Ctrl+Shift+K | +| Delete Last Clip | Ctrl+Shift+D | +| End Session | Ctrl+Shift+E | + +## Session Types + +| Type | Updates | Use For | +|------|---------|---------| +| **Design** | KB/Design/ | CAD work, component design, assembly | +| **Analysis** | KB/Analysis/ | FEA setup, mesh, BCs, results | + +## Output + +After ending a session: + +``` +sessions// +├── clips/ +│ ├── clip-001.mp4 +│ ├── clip-002.mp4 +│ └── ... +├── session.json +└── clawdbot_export/ + ├── merged.mp4 # All clips merged + ├── transcript.json # Whisper transcription + ├── frames/ # Extracted at "screenshot" triggers + │ ├── 01_00-30.png + │ └── ... + └── metadata.json # Session info for Clawdbot +``` + +## What Happens Next + +1. **Syncthing** syncs `clawdbot_export/` to Clawdbot +2. **Mario** detects new session +3. **Vision analysis** categorizes frames +4. **KB updated** with new information +5. **Slack notification** when complete + +## Tips + +### Recording +- Narrate naturally — explain what you're doing +- Say "screenshot" before important views +- Keep clips short (30s - 2min) +- It's okay to delete bad takes + +### Organization +- One session per work block (30-60 min) +- Use descriptive session names +- Match project name to your PKM folder + +### Quality +- Close unnecessary windows before recording +- Undock NX 3D viewport for clean captures +- Speak clearly for better transcription + +## Troubleshooting + +### Hotkeys not working +- Run as Administrator (Windows) +- Check for conflicts with other apps +- Try restarting KB Capture + +### Recording fails +- Ensure FFmpeg is installed: `choco install ffmpeg` +- Check disk space +- Check microphone permissions + +### No transcription +- Whisper needs ~2GB RAM for 'base' model +- Try 'tiny' model: `--whisper-model tiny` +- Check CUDA/GPU drivers for faster processing + +## Architecture + +``` +┌─────────────────────────────────────────┐ +│ KB Capture (Windows) │ +├─────────────────────────────────────────┤ +│ ┌───────────┐ ┌──────────────────┐ │ +│ │ Hotkeys │ │ GUI (optional) │ │ +│ └─────┬─────┘ └────────┬─────────┘ │ +│ │ │ │ +│ ▼ ▼ │ +│ ┌─────────────────────────────────┐ │ +│ │ Session Manager │ │ +│ │ (clips, keep/delete, merge) │ │ +│ └─────────────┬───────────────────┘ │ +│ │ │ +│ ┌─────────────┼───────────────────┐ │ +│ │ ▼ │ │ +│ │ ┌─────────────────────────┐ │ │ +│ │ │ Screen Recorder │ │ │ +│ │ │ (FFmpeg gdigrab) │ │ │ +│ │ └─────────────────────────┘ │ │ +│ │ │ │ +│ │ ┌─────────────────────────┐ │ │ +│ │ │ Whisper Transcriber │ │ │ +│ │ │ (local GPU) │ │ │ +│ │ └─────────────────────────┘ │ │ +│ └─────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────┐ │ +│ │ clawdbot_export/ │ │ +│ │ merged.mp4 + transcript.json │ │ +│ └─────────────┬───────────────────┘ │ +└────────────────┼────────────────────────┘ + │ Syncthing + ▼ +┌─────────────────────────────────────────┐ +│ Clawdbot (Mario) │ +│ Vision analysis → KB update → Notify │ +└─────────────────────────────────────────┘ +``` + +## Requirements + +- Windows 10/11 +- Python 3.12+ +- FFmpeg (`choco install ffmpeg`) +- CUDA GPU (recommended for Whisper) +- ~4GB RAM (for Whisper 'base' model) + +## Related + +- [CAD-Documenter README](../README.md) — Original project overview +- [Knowledge Base Skill](http://100.80.199.40:3000/Antoine/clawdbot-shared-skills) — How Mario processes sessions diff --git a/pyproject.toml b/pyproject.toml index b3dd238..15d8291 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,11 @@ dev = [ gui = [ "customtkinter>=5.2.0", ] +capture = [ + "customtkinter>=5.2.0", + "keyboard>=0.13.5", # Global hotkeys + "pystray>=0.19.0", # System tray (optional) +] pdf = [ "pandoc", # For PDF generation fallback ] @@ -48,6 +53,7 @@ pdf = [ [project.scripts] cad-doc = "cad_documenter.cli:main" cad-doc-gui = "cad_documenter.gui:main" +kb-capture = "cad_documenter.gui_capture:main" [project.urls] Homepage = "http://100.80.199.40:3000/Antoine/CAD-Documenter" diff --git a/src/cad_documenter/export.py b/src/cad_documenter/export.py new file mode 100644 index 0000000..d6d8452 --- /dev/null +++ b/src/cad_documenter/export.py @@ -0,0 +1,311 @@ +""" +Session Export for Clawdbot + +Merges clips, transcribes audio, and exports for Clawdbot processing. +Uses local Whisper for transcription (no API). +""" + +import json +import subprocess +import sys +from pathlib import Path +from datetime import datetime +from typing import Optional, Callable +import tempfile + +from .session import Session, SessionManager, ClipStatus + + +class SessionExporter: + """ + Export a recorded session for Clawdbot processing. + + Steps: + 1. Merge all kept clips into one video + 2. Transcribe with Whisper (local) + 3. Create metadata.json + 4. Create clawdbot_export/ folder + """ + + def __init__( + self, + session_manager: SessionManager, + whisper_model: str = "base", + on_progress: Optional[Callable[[str, float], None]] = None, + ): + self.session_manager = session_manager + self.whisper_model = whisper_model + self.on_progress = on_progress or (lambda msg, pct: print(f"[{pct:.0%}] {msg}")) + + def export(self, session_id: str) -> Path: + """ + Export a session for Clawdbot. + + Returns path to clawdbot_export/ folder. + """ + session = self.session_manager.get_session(session_id) + if not session: + raise ValueError(f"Session not found: {session_id}") + + if not session.is_finalized: + raise ValueError("Session not finalized. End the session first.") + + session_dir = self.session_manager.get_session_dir(session_id) + export_dir = session_dir / "clawdbot_export" + export_dir.mkdir(exist_ok=True) + + kept_clips = session.kept_clips + if not kept_clips: + raise ValueError("No clips to export") + + self.on_progress("Starting export...", 0.0) + + # Step 1: Merge clips + self.on_progress("Merging clips...", 0.1) + merged_path = self._merge_clips(session_dir, kept_clips, export_dir) + + # Step 2: Transcribe + self.on_progress("Transcribing audio...", 0.3) + transcript = self._transcribe(merged_path, export_dir) + + # Step 3: Extract frames at screenshot triggers + self.on_progress("Extracting frames...", 0.7) + frames_dir = export_dir / "frames" + self._extract_frames(merged_path, transcript, frames_dir) + + # Step 4: Create metadata + self.on_progress("Creating metadata...", 0.9) + self._create_metadata(session, export_dir, merged_path) + + self.on_progress("Export complete!", 1.0) + + return export_dir + + def _merge_clips( + self, + session_dir: Path, + clips: list, + export_dir: Path, + ) -> Path: + """Merge kept clips into a single video.""" + output_path = export_dir / "merged.mp4" + clips_dir = session_dir / "clips" + + if len(clips) == 1: + # Single clip - just copy + clip_path = clips_dir / clips[0].filename + import shutil + shutil.copy2(clip_path, output_path) + return output_path + + # Create concat list file + concat_file = export_dir / "concat.txt" + with open(concat_file, "w") as f: + for clip in clips: + clip_path = clips_dir / clip.filename + # FFmpeg concat needs escaped paths + escaped = str(clip_path).replace("'", "'\\''") + f.write(f"file '{escaped}'\n") + + # Merge with FFmpeg + cmd = [ + "ffmpeg", "-y", + "-f", "concat", + "-safe", "0", + "-i", str(concat_file), + "-c", "copy", + str(output_path), + ] + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0, + ) + if result.returncode != 0: + raise RuntimeError(f"FFmpeg merge failed: {result.stderr[-500:]}") + finally: + concat_file.unlink(missing_ok=True) + + return output_path + + def _transcribe(self, video_path: Path, export_dir: Path) -> dict: + """Transcribe video audio with Whisper.""" + try: + import whisper + except ImportError: + raise RuntimeError("Whisper not installed. Run: pip install openai-whisper") + + # Load model + model = whisper.load_model(self.whisper_model) + + # Transcribe + result = model.transcribe( + str(video_path), + language="en", + verbose=False, + ) + + # Save transcript + transcript_path = export_dir / "transcript.json" + with open(transcript_path, "w") as f: + json.dump(result, f, indent=2) + + return result + + def _extract_frames( + self, + video_path: Path, + transcript: dict, + frames_dir: Path, + ) -> list[Path]: + """Extract frames at 'screenshot' trigger timestamps.""" + frames_dir.mkdir(exist_ok=True) + + # Find screenshot triggers in transcript + triggers = [] + for segment in transcript.get("segments", []): + text = segment.get("text", "").lower() + if "screenshot" in text: + # Get timestamp (start of segment) + timestamp = segment.get("start", 0) + triggers.append(timestamp) + + if not triggers: + # No triggers found - extract frames at regular intervals + # Get video duration + duration = self._get_video_duration(video_path) + # Extract every 30 seconds + triggers = list(range(0, int(duration), 30)) + + # Extract frames + extracted = [] + for i, ts in enumerate(triggers): + frame_path = frames_dir / f"{i+1:02d}_{self._format_timestamp(ts)}.png" + + cmd = [ + "ffmpeg", "-y", + "-ss", str(ts), + "-i", str(video_path), + "-vframes", "1", + "-q:v", "2", + str(frame_path), + ] + + try: + subprocess.run( + cmd, + capture_output=True, + creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0, + ) + if frame_path.exists(): + extracted.append(frame_path) + except Exception: + pass + + return extracted + + def _get_video_duration(self, video_path: Path) -> float: + """Get video duration in seconds.""" + cmd = [ + "ffprobe", + "-v", "quiet", + "-show_entries", "format=duration", + "-of", "json", + str(video_path), + ] + + try: + result = subprocess.run(cmd, capture_output=True, text=True) + data = json.loads(result.stdout) + return float(data["format"]["duration"]) + except: + return 300.0 # Default 5 minutes + + def _format_timestamp(self, seconds: float) -> str: + """Format seconds as MM-SS.""" + mins = int(seconds // 60) + secs = int(seconds % 60) + return f"{mins:02d}-{secs:02d}" + + def _create_metadata( + self, + session: Session, + export_dir: Path, + merged_path: Path, + ) -> None: + """Create metadata.json for Clawdbot.""" + metadata = { + "session_id": session.id, + "name": session.name, + "project": session.project, + "session_type": session.session_type.value, + "created_at": session.created_at.isoformat(), + "exported_at": datetime.now().isoformat(), + "clip_count": session.clip_count, + "total_duration": session.total_duration, + "clips": [ + { + "id": clip.id, + "duration": clip.duration_seconds, + "note": clip.note, + } + for clip in session.kept_clips + ], + "files": { + "video": "merged.mp4", + "transcript": "transcript.json", + "frames": "frames/", + }, + } + + metadata_path = export_dir / "metadata.json" + with open(metadata_path, "w") as f: + json.dump(metadata, f, indent=2) + + +# CLI for testing +if __name__ == "__main__": + import sys + + if len(sys.argv) < 2: + print("Usage: python -m cad_documenter.export ") + print(" python -m cad_documenter.export --list") + sys.exit(1) + + from pathlib import Path + + # Find sessions directory + if sys.platform == "win32": + base = Path.home() / "Documents" / "KB-Capture" + else: + base = Path.home() / "kb-capture" + + manager = SessionManager(base) + + if sys.argv[1] == "--list": + sessions = manager.list_sessions() + if not sessions: + print("No sessions found.") + else: + print("Sessions:") + for s in sessions: + status = "✓" if s.is_finalized else "○" + print(f" {status} {s.id}: {s.name} ({s.clip_count} clips, {s.total_duration:.0f}s)") + else: + session_id = sys.argv[1] + + def on_progress(msg, pct): + bar = "█" * int(pct * 20) + "░" * (20 - int(pct * 20)) + print(f"\r[{bar}] {msg}", end="", flush=True) + + exporter = SessionExporter(manager, on_progress=on_progress) + + try: + export_path = exporter.export(session_id) + print(f"\n\nExported to: {export_path}") + except Exception as e: + print(f"\nError: {e}") + sys.exit(1) diff --git a/src/cad_documenter/gui_capture.py b/src/cad_documenter/gui_capture.py new file mode 100644 index 0000000..e4d9b02 --- /dev/null +++ b/src/cad_documenter/gui_capture.py @@ -0,0 +1,480 @@ +""" +KB Capture GUI + +Modern, clean interface for recording engineering sessions. +Uses CustomTkinter for a native look. +""" + +import sys +import threading +from pathlib import Path +from typing import Optional + +try: + import customtkinter as ctk + from customtkinter import CTk, CTkFrame, CTkLabel, CTkButton, CTkEntry + from customtkinter import CTkOptionMenu, CTkScrollableFrame, CTkProgressBar + HAS_CTK = True +except ImportError: + HAS_CTK = False + print("CustomTkinter not installed. Run: pip install customtkinter") + +from .kb_capture import KBCaptureApp, AppState, AppStatus +from .session import SessionType, ClipStatus + + +# Colors +COLORS = { + "idle": "#6B7280", # Gray + "session": "#10B981", # Green + "recording": "#EF4444", # Red + "preview": "#F59E0B", # Amber + "bg_dark": "#1F2937", + "bg_light": "#374151", + "text": "#F9FAFB", + "text_dim": "#9CA3AF", +} + + +class ClipCard(CTkFrame): + """A card showing a single clip.""" + + def __init__( + self, + master, + clip_id: str, + duration: float, + status: ClipStatus, + note: str = "", + on_delete: Optional[callable] = None, + **kwargs + ): + super().__init__(master, **kwargs) + + self.configure(fg_color=COLORS["bg_light"], corner_radius=8) + + # Status indicator + status_colors = { + ClipStatus.KEPT: "#10B981", + ClipStatus.PREVIEW: "#F59E0B", + ClipStatus.DELETED: "#EF4444", + ClipStatus.RECORDING: "#EF4444", + } + color = status_colors.get(status, COLORS["idle"]) + + # Layout + self.grid_columnconfigure(1, weight=1) + + # Status dot + dot = CTkLabel(self, text="●", text_color=color, font=("", 16)) + dot.grid(row=0, column=0, padx=(10, 5), pady=10) + + # Clip info + info_frame = CTkFrame(self, fg_color="transparent") + info_frame.grid(row=0, column=1, sticky="w", pady=10) + + title = CTkLabel( + info_frame, + text=clip_id, + font=("", 14, "bold"), + text_color=COLORS["text"], + ) + title.pack(anchor="w") + + subtitle = CTkLabel( + info_frame, + text=f"{duration:.1f}s" + (f" • {note}" if note else ""), + font=("", 12), + text_color=COLORS["text_dim"], + ) + subtitle.pack(anchor="w") + + # Duration + duration_label = CTkLabel( + self, + text=f"{int(duration//60)}:{int(duration%60):02d}", + font=("", 14), + text_color=COLORS["text_dim"], + ) + duration_label.grid(row=0, column=2, padx=10) + + # Delete button (only for preview/kept) + if status in (ClipStatus.PREVIEW, ClipStatus.KEPT) and on_delete: + delete_btn = CTkButton( + self, + text="🗑️", + width=30, + height=30, + fg_color="transparent", + hover_color=COLORS["bg_dark"], + command=lambda: on_delete(clip_id), + ) + delete_btn.grid(row=0, column=3, padx=(0, 10)) + + +class KBCaptureGUI: + """Main GUI window for KB Capture.""" + + def __init__(self, base_path: Path): + if not HAS_CTK: + raise RuntimeError("CustomTkinter not installed") + + # App + self.app = KBCaptureApp( + base_path=base_path, + on_status_change=self._on_status_change, + ) + + # Window + ctk.set_appearance_mode("dark") + ctk.set_default_color_theme("blue") + + self.window = CTk() + self.window.title("KB Capture") + self.window.geometry("500x600") + self.window.minsize(400, 500) + + # Build UI + self._build_ui() + + # Start app + self.app.start() + + # Cleanup on close + self.window.protocol("WM_DELETE_WINDOW", self._on_close) + + def _build_ui(self): + """Build the main UI.""" + self.window.grid_columnconfigure(0, weight=1) + self.window.grid_rowconfigure(2, weight=1) + + # === Header === + header = CTkFrame(self.window, fg_color="transparent") + header.grid(row=0, column=0, sticky="ew", padx=20, pady=(20, 10)) + header.grid_columnconfigure(1, weight=1) + + self.status_indicator = CTkLabel( + header, + text="●", + font=("", 32), + text_color=COLORS["idle"], + ) + self.status_indicator.grid(row=0, column=0, padx=(0, 10)) + + self.status_label = CTkLabel( + header, + text="Ready", + font=("", 24, "bold"), + text_color=COLORS["text"], + ) + self.status_label.grid(row=0, column=1, sticky="w") + + self.duration_label = CTkLabel( + header, + text="", + font=("", 20), + text_color=COLORS["text_dim"], + ) + self.duration_label.grid(row=0, column=2) + + # === Session Info / Start Form === + self.session_frame = CTkFrame(self.window, fg_color=COLORS["bg_light"], corner_radius=12) + self.session_frame.grid(row=1, column=0, sticky="ew", padx=20, pady=10) + self.session_frame.grid_columnconfigure(1, weight=1) + + # Project + CTkLabel(self.session_frame, text="Project:", text_color=COLORS["text_dim"]).grid( + row=0, column=0, padx=15, pady=(15, 5), sticky="w" + ) + self.project_entry = CTkEntry( + self.session_frame, + placeholder_text="P04-GigaBIT-M1", + width=250, + ) + self.project_entry.grid(row=0, column=1, padx=(0, 15), pady=(15, 5), sticky="ew") + + # Session name + CTkLabel(self.session_frame, text="Session:", text_color=COLORS["text_dim"]).grid( + row=1, column=0, padx=15, pady=5, sticky="w" + ) + self.name_entry = CTkEntry( + self.session_frame, + placeholder_text="What are you working on?", + width=250, + ) + self.name_entry.grid(row=1, column=1, padx=(0, 15), pady=5, sticky="ew") + + # Session type + CTkLabel(self.session_frame, text="Type:", text_color=COLORS["text_dim"]).grid( + row=2, column=0, padx=15, pady=5, sticky="w" + ) + self.type_menu = CTkOptionMenu( + self.session_frame, + values=["Design", "Analysis"], + width=150, + ) + self.type_menu.grid(row=2, column=1, padx=(0, 15), pady=5, sticky="w") + + # Start button + self.start_btn = CTkButton( + self.session_frame, + text="Start Session", + font=("", 14, "bold"), + height=40, + command=self._start_session, + ) + self.start_btn.grid(row=3, column=0, columnspan=2, padx=15, pady=15, sticky="ew") + + # === Clips List === + clips_header = CTkFrame(self.window, fg_color="transparent") + clips_header.grid(row=2, column=0, sticky="new", padx=20, pady=(10, 0)) + clips_header.grid_columnconfigure(0, weight=1) + + CTkLabel( + clips_header, + text="Clips", + font=("", 16, "bold"), + text_color=COLORS["text"], + ).grid(row=0, column=0, sticky="w") + + self.clips_count = CTkLabel( + clips_header, + text="0 clips • 0:00", + font=("", 14), + text_color=COLORS["text_dim"], + ) + self.clips_count.grid(row=0, column=1, sticky="e") + + self.clips_frame = CTkScrollableFrame( + self.window, + fg_color="transparent", + ) + self.clips_frame.grid(row=3, column=0, sticky="nsew", padx=20, pady=10) + self.clips_frame.grid_columnconfigure(0, weight=1) + self.window.grid_rowconfigure(3, weight=1) + + # Empty state + self.empty_label = CTkLabel( + self.clips_frame, + text="No clips yet.\nPress Ctrl+Shift+R to start recording.", + font=("", 14), + text_color=COLORS["text_dim"], + justify="center", + ) + self.empty_label.grid(row=0, column=0, pady=40) + + # === Control Bar === + controls = CTkFrame(self.window, fg_color=COLORS["bg_light"], corner_radius=0) + controls.grid(row=4, column=0, sticky="sew", pady=0) + controls.grid_columnconfigure((0, 1, 2), weight=1) + + self.record_btn = CTkButton( + controls, + text="⏺️ Record", + font=("", 14), + height=50, + fg_color=COLORS["recording"], + hover_color="#DC2626", + command=self.app.toggle_recording, + state="disabled", + ) + self.record_btn.grid(row=0, column=0, padx=5, pady=10, sticky="ew") + + self.keep_btn = CTkButton( + controls, + text="✓ Keep", + font=("", 14), + height=50, + fg_color=COLORS["session"], + hover_color="#059669", + command=lambda: self.app.keep_last_clip(), + state="disabled", + ) + self.keep_btn.grid(row=0, column=1, padx=5, pady=10, sticky="ew") + + self.end_btn = CTkButton( + controls, + text="End Session", + font=("", 14), + height=50, + fg_color=COLORS["idle"], + hover_color="#4B5563", + command=self._end_session, + state="disabled", + ) + self.end_btn.grid(row=0, column=2, padx=5, pady=10, sticky="ew") + + # Hotkey hints + hints = CTkLabel( + controls, + text="Ctrl+Shift+R: Record • K: Keep • D: Delete • E: End", + font=("", 11), + text_color=COLORS["text_dim"], + ) + hints.grid(row=1, column=0, columnspan=3, pady=(0, 10)) + + def _start_session(self): + """Start a new session.""" + project = self.project_entry.get() or "P04-GigaBIT-M1" + name = self.name_entry.get() or "Recording Session" + session_type = SessionType.DESIGN if self.type_menu.get() == "Design" else SessionType.ANALYSIS + + self.app.start_session(name, project, session_type) + + # Update UI + self.start_btn.configure(state="disabled", text="Session Active") + self.record_btn.configure(state="normal") + self.end_btn.configure(state="normal") + self.project_entry.configure(state="disabled") + self.name_entry.configure(state="disabled") + self.type_menu.configure(state="disabled") + + def _end_session(self): + """End the current session.""" + session = self.app.end_session() + + if session: + # Show summary + self.status_label.configure(text="Session Saved!") + + # Reset UI + self.start_btn.configure(state="normal", text="Start Session") + self.record_btn.configure(state="disabled") + self.keep_btn.configure(state="disabled") + self.end_btn.configure(state="disabled") + self.project_entry.configure(state="normal") + self.name_entry.configure(state="normal") + self.type_menu.configure(state="normal") + + # Clear clips + for widget in self.clips_frame.winfo_children(): + widget.destroy() + self.empty_label = CTkLabel( + self.clips_frame, + text=f"Session saved: {session.clip_count} clips, {session.total_duration:.0f}s\nReady for next session.", + font=("", 14), + text_color=COLORS["text_dim"], + justify="center", + ) + self.empty_label.grid(row=0, column=0, pady=40) + + def _on_status_change(self, status: AppStatus): + """Handle status updates from the app.""" + # Run on main thread + self.window.after(0, lambda: self._update_ui(status)) + + def _update_ui(self, status: AppStatus): + """Update UI based on status.""" + # State colors and labels + state_config = { + AppState.IDLE: (COLORS["idle"], "Ready", ""), + AppState.SESSION_ACTIVE: (COLORS["session"], "Session Active", ""), + AppState.RECORDING: (COLORS["recording"], "Recording", ""), + AppState.PREVIEW: (COLORS["preview"], "Review Clip", ""), + } + + color, label, _ = state_config.get(status.state, (COLORS["idle"], "Ready", "")) + + self.status_indicator.configure(text_color=color) + + if status.state == AppState.RECORDING: + self.status_label.configure(text=f"Recording... {status.current_clip_duration:.1f}s") + self.record_btn.configure(text="⏹️ Stop", fg_color=COLORS["recording"]) + elif status.state == AppState.PREVIEW: + self.status_label.configure(text=f"Keep or Delete? ({status.last_clip_duration:.1f}s)") + self.record_btn.configure(text="⏺️ Record", fg_color=COLORS["session"]) + self.keep_btn.configure(state="normal") + else: + self.status_label.configure(text=label) + self.record_btn.configure(text="⏺️ Record", fg_color=COLORS["recording"]) + if status.state != AppState.IDLE: + self.keep_btn.configure(state="disabled") + + # Duration + if status.state == AppState.RECORDING: + secs = int(status.current_clip_duration) + self.duration_label.configure(text=f"{secs//60}:{secs%60:02d}") + elif status.total_duration > 0: + secs = int(status.total_duration) + self.duration_label.configure(text=f"Total: {secs//60}:{secs%60:02d}") + else: + self.duration_label.configure(text="") + + # Clips count + self.clips_count.configure( + text=f"{status.clip_count} clips • {int(status.total_duration//60)}:{int(status.total_duration%60):02d}" + ) + + # Update clips list + self._update_clips_list() + + def _update_clips_list(self): + """Update the clips list display.""" + session = self.app.session_manager.current_session + if not session: + return + + # Clear existing + for widget in self.clips_frame.winfo_children(): + widget.destroy() + + if not session.clips: + self.empty_label = CTkLabel( + self.clips_frame, + text="No clips yet.\nPress Ctrl+Shift+R to start recording.", + font=("", 14), + text_color=COLORS["text_dim"], + justify="center", + ) + self.empty_label.grid(row=0, column=0, pady=40) + return + + # Add clip cards (reversed for newest first) + for i, clip in enumerate(reversed(session.clips)): + if clip.status != ClipStatus.DELETED: + card = ClipCard( + self.clips_frame, + clip_id=clip.id, + duration=clip.duration_seconds, + status=clip.status, + note=clip.note, + on_delete=self._delete_clip, + ) + card.grid(row=i, column=0, sticky="ew", pady=2) + + def _delete_clip(self, clip_id: str): + """Delete a clip.""" + self.app.session_manager.delete_clip(clip_id) + self._update_clips_list() + + def _on_close(self): + """Handle window close.""" + self.app.stop() + self.window.destroy() + + def run(self): + """Run the GUI main loop.""" + self.window.mainloop() + + +def main(): + """Entry point for GUI.""" + if not HAS_CTK: + print("Error: CustomTkinter not installed") + print("Install with: pip install customtkinter") + sys.exit(1) + + # Default to user's documents folder + if sys.platform == "win32": + base = Path.home() / "Documents" / "KB-Capture" + else: + base = Path.home() / "kb-capture" + + base.mkdir(parents=True, exist_ok=True) + + gui = KBCaptureGUI(base_path=base) + gui.run() + + +if __name__ == "__main__": + main() diff --git a/src/cad_documenter/hotkeys.py b/src/cad_documenter/hotkeys.py new file mode 100644 index 0000000..5f75743 --- /dev/null +++ b/src/cad_documenter/hotkeys.py @@ -0,0 +1,161 @@ +""" +Global Hotkey Manager + +Registers global hotkeys for recording control. +Works across all applications. +""" + +import threading +from typing import Callable, Dict, Optional +import sys + +# Use keyboard library for cross-platform hotkeys +try: + import keyboard + HAS_KEYBOARD = True +except ImportError: + HAS_KEYBOARD = False + print("Warning: 'keyboard' library not installed. Hotkeys disabled.") + + +class HotkeyManager: + """ + Manages global hotkeys for KB Capture. + + Default hotkeys: + Ctrl+Shift+R: Toggle recording + Ctrl+Shift+K: Keep last clip + Ctrl+Shift+D: Delete last clip + Ctrl+Shift+E: End session + Escape: Cancel (when in certain states) + """ + + DEFAULT_HOTKEYS = { + "toggle_recording": "ctrl+shift+r", + "keep_clip": "ctrl+shift+k", + "delete_clip": "ctrl+shift+d", + "end_session": "ctrl+shift+e", + } + + def __init__(self): + self.callbacks: Dict[str, Callable] = {} + self.hotkeys: Dict[str, str] = self.DEFAULT_HOTKEYS.copy() + self.registered: Dict[str, bool] = {} + self.enabled = HAS_KEYBOARD + + def set_callback(self, action: str, callback: Callable) -> None: + """Set callback for an action.""" + self.callbacks[action] = callback + + def register_all(self) -> bool: + """Register all hotkeys.""" + if not self.enabled: + return False + + for action, hotkey in self.hotkeys.items(): + if action in self.callbacks: + try: + keyboard.add_hotkey( + hotkey, + self._create_handler(action), + suppress=False, # Don't block the key + ) + self.registered[action] = True + except Exception as e: + print(f"Failed to register hotkey {hotkey}: {e}") + self.registered[action] = False + + return all(self.registered.values()) + + def _create_handler(self, action: str) -> Callable: + """Create a handler that calls the callback in a thread.""" + def handler(): + if action in self.callbacks: + # Run callback in thread to avoid blocking + threading.Thread( + target=self.callbacks[action], + daemon=True, + ).start() + return handler + + def unregister_all(self) -> None: + """Unregister all hotkeys.""" + if not self.enabled: + return + + for action, hotkey in self.hotkeys.items(): + if self.registered.get(action): + try: + keyboard.remove_hotkey(hotkey) + except: + pass + self.registered[action] = False + + def set_hotkey(self, action: str, hotkey: str) -> None: + """Change hotkey for an action.""" + if action in self.hotkeys: + # Unregister old hotkey + if self.registered.get(action) and self.enabled: + try: + keyboard.remove_hotkey(self.hotkeys[action]) + except: + pass + + self.hotkeys[action] = hotkey + + # Register new hotkey + if action in self.callbacks and self.enabled: + try: + keyboard.add_hotkey( + hotkey, + self._create_handler(action), + suppress=False, + ) + self.registered[action] = True + except Exception as e: + print(f"Failed to register hotkey {hotkey}: {e}") + self.registered[action] = False + + def get_hotkey_display(self, action: str) -> str: + """Get display-friendly hotkey string.""" + hotkey = self.hotkeys.get(action, "") + return hotkey.replace("+", " + ").title() + + +# Quick test +if __name__ == "__main__": + manager = HotkeyManager() + + def on_toggle(): + print("Toggle recording!") + + def on_keep(): + print("Keep clip!") + + def on_delete(): + print("Delete clip!") + + def on_end(): + print("End session!") + + manager.set_callback("toggle_recording", on_toggle) + manager.set_callback("keep_clip", on_keep) + manager.set_callback("delete_clip", on_delete) + manager.set_callback("end_session", on_end) + + if manager.register_all(): + print("Hotkeys registered! Try:") + print(" Ctrl+Shift+R: Toggle recording") + print(" Ctrl+Shift+K: Keep clip") + print(" Ctrl+Shift+D: Delete clip") + print(" Ctrl+Shift+E: End session") + print("\nPress Ctrl+C to exit") + + try: + keyboard.wait() + except KeyboardInterrupt: + pass + + manager.unregister_all() + else: + print("Failed to register hotkeys") diff --git a/src/cad_documenter/kb_capture.py b/src/cad_documenter/kb_capture.py new file mode 100644 index 0000000..72ebe1f --- /dev/null +++ b/src/cad_documenter/kb_capture.py @@ -0,0 +1,364 @@ +""" +KB Capture - Knowledge Base Recording Tool + +Main application that ties together: +- Screen recording +- Session/clip management +- Hotkey control +- System tray integration +- GUI interface +""" + +import sys +import threading +import time +from pathlib import Path +from typing import Optional, Callable +from dataclasses import dataclass +from enum import Enum + +from .recorder import ScreenRecorder, RecordingConfig +from .session import SessionManager, Session, Clip, ClipStatus, SessionType +from .hotkeys import HotkeyManager + + +class AppState(Enum): + """Application state machine.""" + IDLE = "idle" # No session, ready to start + SESSION_ACTIVE = "session" # Session started, not recording + RECORDING = "recording" # Currently recording a clip + PREVIEW = "preview" # Clip just recorded, awaiting decision + + +@dataclass +class AppStatus: + """Current application status for UI updates.""" + state: AppState + session_name: Optional[str] = None + project: Optional[str] = None + session_type: Optional[SessionType] = None + clip_count: int = 0 + total_duration: float = 0.0 + current_clip_duration: float = 0.0 + last_clip_duration: float = 0.0 + message: str = "" + + +class KBCaptureApp: + """ + Main KB Capture application. + + Controls recording flow: + 1. Start session (project, name, type) + 2. Toggle recording to create clips + 3. Keep or delete clips + 4. End session to export + """ + + def __init__( + self, + base_path: Path, + on_status_change: Optional[Callable[[AppStatus], None]] = None, + ): + self.base_path = Path(base_path) + self.on_status_change = on_status_change or (lambda x: None) + + # Components + self.session_manager = SessionManager(self.base_path) + self.recorder = ScreenRecorder(on_status=self._log) + self.hotkeys = HotkeyManager() + + # State + self.state = AppState.IDLE + self._recording_start_time: Optional[float] = None + self._duration_thread: Optional[threading.Thread] = None + self._running = False + + # Setup hotkeys + self.hotkeys.set_callback("toggle_recording", self.toggle_recording) + self.hotkeys.set_callback("keep_clip", self.keep_last_clip) + self.hotkeys.set_callback("delete_clip", self.delete_last_clip) + self.hotkeys.set_callback("end_session", self.end_session) + + def _log(self, message: str) -> None: + """Log a message and update status.""" + print(f"[KB Capture] {message}") + self._update_status(message=message) + + def _update_status(self, message: str = "") -> None: + """Update status and notify listeners.""" + session = self.session_manager.current_session + + status = AppStatus( + state=self.state, + session_name=session.name if session else None, + project=session.project if session else None, + session_type=session.session_type if session else None, + clip_count=session.clip_count if session else 0, + total_duration=session.total_duration if session else 0.0, + current_clip_duration=self.recorder.get_duration() if self.state == AppState.RECORDING else 0.0, + last_clip_duration=self._get_last_clip_duration(), + message=message, + ) + + self.on_status_change(status) + + def _get_last_clip_duration(self) -> float: + """Get duration of the last clip.""" + session = self.session_manager.current_session + if session and session.clips: + return session.clips[-1].duration_seconds + return 0.0 + + def _start_duration_thread(self) -> None: + """Start thread to update duration while recording.""" + self._running = True + + def update_loop(): + while self._running and self.state == AppState.RECORDING: + self._update_status() + time.sleep(0.5) + + self._duration_thread = threading.Thread(target=update_loop, daemon=True) + self._duration_thread.start() + + def _stop_duration_thread(self) -> None: + """Stop duration update thread.""" + self._running = False + if self._duration_thread: + self._duration_thread.join(timeout=1) + self._duration_thread = None + + # === Public API === + + def start(self) -> None: + """Start the application (register hotkeys).""" + self.hotkeys.register_all() + self._log("KB Capture started. Hotkeys active.") + + def stop(self) -> None: + """Stop the application.""" + self.hotkeys.unregister_all() + self._stop_duration_thread() + + if self.state == AppState.RECORDING: + self.recorder.stop() + + self._log("KB Capture stopped.") + + def start_session( + self, + name: str, + project: str, + session_type: SessionType = SessionType.DESIGN, + ) -> Session: + """Start a new recording session.""" + if self.state != AppState.IDLE: + raise RuntimeError("Session already active") + + session = self.session_manager.start_session(name, project, session_type) + self.state = AppState.SESSION_ACTIVE + + self._log(f"Session started: {name}") + self._update_status() + + return session + + def toggle_recording(self) -> None: + """Toggle recording state (hotkey handler).""" + if self.state == AppState.IDLE: + self._log("No session active. Start a session first.") + return + + if self.state == AppState.RECORDING: + self._stop_recording() + else: + self._start_recording() + + def _start_recording(self) -> None: + """Start recording a new clip.""" + if self.state not in (AppState.SESSION_ACTIVE, AppState.PREVIEW): + return + + # Auto-keep any clip in preview + if self.state == AppState.PREVIEW: + self.session_manager.keep_last_clip() + + # Start new clip + clip, clip_path = self.session_manager.start_clip() + + config = RecordingConfig( + output_path=clip_path, + framerate=30, + ) + + if self.recorder.start(config): + self.state = AppState.RECORDING + self._start_duration_thread() + self._log(f"Recording clip {clip.id}...") + else: + self._log("Failed to start recording") + + def _stop_recording(self) -> None: + """Stop recording current clip.""" + if self.state != AppState.RECORDING: + return + + self._stop_duration_thread() + duration = self.recorder.get_duration() + output = self.recorder.stop() + + if output and output.exists(): + self.session_manager.end_clip(duration) + self.state = AppState.PREVIEW + self._log(f"Clip recorded: {duration:.1f}s - Keep (K) or Delete (D)?") + else: + self._log("Recording failed - no output") + self.state = AppState.SESSION_ACTIVE + + self._update_status() + + def keep_last_clip(self, note: str = "") -> Optional[Clip]: + """Keep the last recorded clip.""" + clip = self.session_manager.keep_last_clip(note) + if clip: + self.state = AppState.SESSION_ACTIVE + self._log(f"Kept clip: {clip.id}") + self._update_status() + return clip + + def delete_last_clip(self) -> Optional[Clip]: + """Delete the last recorded clip.""" + clip = self.session_manager.delete_last_clip() + if clip: + self.state = AppState.SESSION_ACTIVE + self._log(f"Deleted clip: {clip.id}") + self._update_status() + return clip + + def end_session(self) -> Optional[Session]: + """End current session and prepare for export.""" + if self.state == AppState.IDLE: + return None + + # Stop recording if active + if self.state == AppState.RECORDING: + self._stop_recording() + + # Keep any preview clips + if self.state == AppState.PREVIEW: + self.session_manager.keep_last_clip() + + session = self.session_manager.end_session() + self.state = AppState.IDLE + + self._log(f"Session ended: {session.clip_count} clips, {session.total_duration:.1f}s total") + self._update_status() + + return session + + def cancel_session(self) -> None: + """Cancel current session and delete all clips.""" + if self.state == AppState.RECORDING: + self._stop_duration_thread() + self.recorder.stop() + + self.session_manager.cancel_session() + self.state = AppState.IDLE + + self._log("Session cancelled") + self._update_status() + + def get_status(self) -> AppStatus: + """Get current application status.""" + session = self.session_manager.current_session + + return AppStatus( + state=self.state, + session_name=session.name if session else None, + project=session.project if session else None, + session_type=session.session_type if session else None, + clip_count=session.clip_count if session else 0, + total_duration=session.total_duration if session else 0.0, + current_clip_duration=self.recorder.get_duration() if self.state == AppState.RECORDING else 0.0, + last_clip_duration=self._get_last_clip_duration(), + ) + + def list_sessions(self) -> list[Session]: + """List all recorded sessions.""" + return self.session_manager.list_sessions() + + def get_session_dir(self, session_id: str) -> Path: + """Get session directory for export.""" + return self.session_manager.get_session_dir(session_id) + + +# CLI for testing +if __name__ == "__main__": + import tempfile + + def on_status(status: AppStatus): + state_icons = { + AppState.IDLE: "⚪", + AppState.SESSION_ACTIVE: "🟢", + AppState.RECORDING: "🔴", + AppState.PREVIEW: "🟡", + } + icon = state_icons.get(status.state, "⚪") + + print(f"\n{icon} State: {status.state.value}") + if status.session_name: + print(f" Session: {status.session_name} ({status.project})") + print(f" Clips: {status.clip_count} | Duration: {status.total_duration:.1f}s") + if status.current_clip_duration > 0: + print(f" Recording: {status.current_clip_duration:.1f}s") + if status.message: + print(f" → {status.message}") + + with tempfile.TemporaryDirectory() as tmpdir: + app = KBCaptureApp( + base_path=Path(tmpdir), + on_status_change=on_status, + ) + + print("\n=== KB Capture Test ===") + print("Commands:") + print(" s - Start session") + print(" r - Toggle recording") + print(" k - Keep last clip") + print(" d - Delete last clip") + print(" e - End session") + print(" c - Cancel session") + print(" q - Quit") + print() + + app.start() + + try: + while True: + cmd = input("> ").strip().lower() + + if cmd == "s": + name = input("Session name: ").strip() or "Test Session" + project = input("Project: ").strip() or "P04-GigaBIT-M1" + app.start_session(name, project) + elif cmd == "r": + app.toggle_recording() + elif cmd == "k": + app.keep_last_clip() + elif cmd == "d": + app.delete_last_clip() + elif cmd == "e": + app.end_session() + elif cmd == "c": + app.cancel_session() + elif cmd == "q": + break + else: + print("Unknown command") + + except KeyboardInterrupt: + pass + + app.stop() + print("\nGoodbye!") diff --git a/src/cad_documenter/recorder.py b/src/cad_documenter/recorder.py new file mode 100644 index 0000000..4478d48 --- /dev/null +++ b/src/cad_documenter/recorder.py @@ -0,0 +1,257 @@ +""" +Screen + Audio Recorder using FFmpeg + +Records screen and microphone to video file. +Supports Windows (gdigrab) and Linux (x11grab). +""" + +import subprocess +import threading +import time +import sys +from pathlib import Path +from dataclasses import dataclass +from typing import Optional, Callable +import json + + +@dataclass +class RecordingConfig: + """Recording configuration.""" + output_path: Path + framerate: int = 30 + audio_device: Optional[str] = None # None = default mic + video_codec: str = "libx264" + audio_codec: str = "aac" + crf: int = 23 # Quality (lower = better, 18-28 typical) + preset: str = "ultrafast" # Encoding speed + capture_region: Optional[tuple[int, int, int, int]] = None # x, y, w, h + + +class ScreenRecorder: + """ + FFmpeg-based screen recorder with audio. + + Usage: + recorder = ScreenRecorder() + recorder.start(config) + # ... recording ... + recorder.stop() + """ + + def __init__(self, on_status: Optional[Callable[[str], None]] = None): + self.process: Optional[subprocess.Popen] = None + self.is_recording = False + self.start_time: Optional[float] = None + self.output_path: Optional[Path] = None + self.on_status = on_status or (lambda x: None) + self._monitor_thread: Optional[threading.Thread] = None + + def _get_platform_args(self, config: RecordingConfig) -> list[str]: + """Get platform-specific FFmpeg input arguments.""" + + if sys.platform == "win32": + # Windows: gdigrab for screen, dshow for audio + args = [ + "-f", "gdigrab", + "-framerate", str(config.framerate), + ] + + if config.capture_region: + x, y, w, h = config.capture_region + args.extend([ + "-offset_x", str(x), + "-offset_y", str(y), + "-video_size", f"{w}x{h}", + ]) + + args.extend(["-i", "desktop"]) + + # Add audio input + if config.audio_device: + args.extend([ + "-f", "dshow", + "-i", f"audio={config.audio_device}", + ]) + else: + # Try to find default microphone + args.extend([ + "-f", "dshow", + "-i", "audio=Microphone Array", # Common default + ]) + + else: + # Linux: x11grab for screen, pulse for audio + display = ":0.0" + args = [ + "-f", "x11grab", + "-framerate", str(config.framerate), + ] + + if config.capture_region: + x, y, w, h = config.capture_region + args.extend(["-video_size", f"{w}x{h}"]) + display = f":0.0+{x},{y}" + + args.extend(["-i", display]) + + # Add audio (PulseAudio) + args.extend([ + "-f", "pulse", + "-i", "default", + ]) + + return args + + def start(self, config: RecordingConfig) -> bool: + """Start recording.""" + if self.is_recording: + self.on_status("Already recording") + return False + + self.output_path = config.output_path + self.output_path.parent.mkdir(parents=True, exist_ok=True) + + # Build FFmpeg command + cmd = ["ffmpeg", "-y"] # -y to overwrite + + # Platform-specific inputs + cmd.extend(self._get_platform_args(config)) + + # Output settings + cmd.extend([ + "-c:v", config.video_codec, + "-preset", config.preset, + "-crf", str(config.crf), + "-c:a", config.audio_codec, + "-b:a", "128k", + str(self.output_path), + ]) + + try: + # Start FFmpeg process + self.process = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0, + ) + + self.is_recording = True + self.start_time = time.time() + self.on_status(f"Recording started: {self.output_path.name}") + + # Start monitor thread + self._monitor_thread = threading.Thread(target=self._monitor_process, daemon=True) + self._monitor_thread.start() + + return True + + except FileNotFoundError: + self.on_status("FFmpeg not found. Please install FFmpeg.") + return False + except Exception as e: + self.on_status(f"Failed to start recording: {e}") + return False + + def _monitor_process(self): + """Monitor FFmpeg process for errors.""" + if self.process: + stderr = self.process.stderr.read() if self.process.stderr else b"" + if self.process.returncode and self.process.returncode != 0: + self.on_status(f"Recording error: {stderr.decode()[-200:]}") + + def stop(self) -> Optional[Path]: + """Stop recording and return output path.""" + if not self.is_recording or not self.process: + return None + + try: + # Send 'q' to FFmpeg to stop gracefully + if self.process.stdin: + self.process.stdin.write(b"q") + self.process.stdin.flush() + + # Wait for process to finish (with timeout) + self.process.wait(timeout=10) + + except subprocess.TimeoutExpired: + self.process.terminate() + self.process.wait(timeout=5) + except Exception as e: + self.on_status(f"Error stopping recording: {e}") + self.process.terminate() + + self.is_recording = False + duration = time.time() - self.start_time if self.start_time else 0 + self.on_status(f"Recording stopped: {duration:.1f}s") + + return self.output_path if self.output_path and self.output_path.exists() else None + + def get_duration(self) -> float: + """Get current recording duration in seconds.""" + if self.is_recording and self.start_time: + return time.time() - self.start_time + return 0.0 + + @staticmethod + def list_audio_devices() -> list[str]: + """List available audio input devices (Windows only).""" + if sys.platform != "win32": + return ["default"] + + try: + result = subprocess.run( + ["ffmpeg", "-list_devices", "true", "-f", "dshow", "-i", "dummy"], + capture_output=True, + text=True, + creationflags=subprocess.CREATE_NO_WINDOW, + ) + + # Parse audio devices from stderr + devices = [] + in_audio = False + for line in result.stderr.split("\n"): + if "DirectShow audio devices" in line: + in_audio = True + elif "DirectShow video devices" in line: + in_audio = False + elif in_audio and '"' in line: + # Extract device name between quotes + start = line.find('"') + 1 + end = line.rfind('"') + if start < end: + devices.append(line[start:end]) + + return devices if devices else ["default"] + + except Exception: + return ["default"] + + +# Quick test +if __name__ == "__main__": + def status(msg): + print(f"[STATUS] {msg}") + + recorder = ScreenRecorder(on_status=status) + + print("Available audio devices:", recorder.list_audio_devices()) + + config = RecordingConfig( + output_path=Path("test_recording.mp4"), + framerate=30, + ) + + print("Starting recording... (Ctrl+C to stop)") + if recorder.start(config): + try: + while True: + time.sleep(1) + print(f"Recording: {recorder.get_duration():.1f}s") + except KeyboardInterrupt: + pass + + output = recorder.stop() + print(f"Saved to: {output}") diff --git a/src/cad_documenter/session.py b/src/cad_documenter/session.py new file mode 100644 index 0000000..92036fc --- /dev/null +++ b/src/cad_documenter/session.py @@ -0,0 +1,361 @@ +""" +Session Manager for KB Capture + +Manages recording sessions with multiple clips. +Clips can be kept or deleted before finalizing. +""" + +import json +import shutil +from pathlib import Path +from dataclasses import dataclass, field, asdict +from datetime import datetime +from typing import Optional, List +from enum import Enum +import uuid + + +class ClipStatus(Enum): + RECORDING = "recording" + PREVIEW = "preview" # Just recorded, awaiting decision + KEPT = "kept" + DELETED = "deleted" + + +class SessionType(Enum): + DESIGN = "design" # CAD/Design KB + ANALYSIS = "analysis" # FEA/Analysis KB + + +@dataclass +class Clip: + """A single recording clip within a session.""" + id: str + filename: str + start_time: datetime + end_time: Optional[datetime] = None + duration_seconds: float = 0.0 + status: ClipStatus = ClipStatus.RECORDING + note: str = "" # Optional quick note + + def to_dict(self) -> dict: + return { + "id": self.id, + "filename": self.filename, + "start_time": self.start_time.isoformat(), + "end_time": self.end_time.isoformat() if self.end_time else None, + "duration_seconds": self.duration_seconds, + "status": self.status.value, + "note": self.note, + } + + @classmethod + def from_dict(cls, data: dict) -> "Clip": + return cls( + id=data["id"], + filename=data["filename"], + start_time=datetime.fromisoformat(data["start_time"]), + end_time=datetime.fromisoformat(data["end_time"]) if data.get("end_time") else None, + duration_seconds=data.get("duration_seconds", 0.0), + status=ClipStatus(data.get("status", "kept")), + note=data.get("note", ""), + ) + + +@dataclass +class Session: + """A recording session containing multiple clips.""" + id: str + name: str + project: str + session_type: SessionType + created_at: datetime + clips: List[Clip] = field(default_factory=list) + is_finalized: bool = False + + @property + def total_duration(self) -> float: + """Total duration of kept clips.""" + return sum(c.duration_seconds for c in self.clips if c.status == ClipStatus.KEPT) + + @property + def kept_clips(self) -> List[Clip]: + """Clips marked as kept.""" + return [c for c in self.clips if c.status == ClipStatus.KEPT] + + @property + def clip_count(self) -> int: + """Number of kept clips.""" + return len(self.kept_clips) + + def to_dict(self) -> dict: + return { + "id": self.id, + "name": self.name, + "project": self.project, + "session_type": self.session_type.value, + "created_at": self.created_at.isoformat(), + "clips": [c.to_dict() for c in self.clips], + "is_finalized": self.is_finalized, + } + + @classmethod + def from_dict(cls, data: dict) -> "Session": + return cls( + id=data["id"], + name=data["name"], + project=data["project"], + session_type=SessionType(data.get("session_type", "design")), + created_at=datetime.fromisoformat(data["created_at"]), + clips=[Clip.from_dict(c) for c in data.get("clips", [])], + is_finalized=data.get("is_finalized", False), + ) + + +class SessionManager: + """ + Manages recording sessions and clips. + + Directory structure: + sessions/ + ├── / + │ ├── session.json # Session metadata + │ ├── clips/ + │ │ ├── clip-001.mp4 + │ │ ├── clip-002.mp4 + │ │ └── ... + │ └── export/ # Final export for Clawdbot + │ ├── merged.mp4 + │ ├── transcript.json + │ └── metadata.json + """ + + def __init__(self, base_path: Path): + self.base_path = Path(base_path) + self.sessions_dir = self.base_path / "sessions" + self.sessions_dir.mkdir(parents=True, exist_ok=True) + + self.current_session: Optional[Session] = None + self.current_clip: Optional[Clip] = None + + def start_session( + self, + name: str, + project: str, + session_type: SessionType = SessionType.DESIGN, + ) -> Session: + """Start a new recording session.""" + session_id = datetime.now().strftime("%Y%m%d-%H%M%S") + + session = Session( + id=session_id, + name=name, + project=project, + session_type=session_type, + created_at=datetime.now(), + ) + + # Create session directory + session_dir = self.sessions_dir / session_id + session_dir.mkdir(parents=True, exist_ok=True) + (session_dir / "clips").mkdir(exist_ok=True) + + self.current_session = session + self._save_session() + + return session + + def start_clip(self) -> tuple[Clip, Path]: + """ + Start a new clip in current session. + Returns clip object and path for recording. + """ + if not self.current_session: + raise RuntimeError("No active session") + + clip_num = len(self.current_session.clips) + 1 + clip_id = f"clip-{clip_num:03d}" + filename = f"{clip_id}.mp4" + + clip = Clip( + id=clip_id, + filename=filename, + start_time=datetime.now(), + status=ClipStatus.RECORDING, + ) + + self.current_session.clips.append(clip) + self.current_clip = clip + self._save_session() + + clip_path = self.sessions_dir / self.current_session.id / "clips" / filename + return clip, clip_path + + def end_clip(self, duration: float) -> Clip: + """End current clip, move to preview state.""" + if not self.current_clip: + raise RuntimeError("No active clip") + + self.current_clip.end_time = datetime.now() + self.current_clip.duration_seconds = duration + self.current_clip.status = ClipStatus.PREVIEW + + clip = self.current_clip + self.current_clip = None + self._save_session() + + return clip + + def keep_clip(self, clip_id: str, note: str = "") -> None: + """Mark a clip as kept.""" + if not self.current_session: + raise RuntimeError("No active session") + + for clip in self.current_session.clips: + if clip.id == clip_id: + clip.status = ClipStatus.KEPT + clip.note = note + break + + self._save_session() + + def delete_clip(self, clip_id: str) -> None: + """Mark a clip as deleted and remove file.""" + if not self.current_session: + raise RuntimeError("No active session") + + for clip in self.current_session.clips: + if clip.id == clip_id: + clip.status = ClipStatus.DELETED + + # Delete the actual file + clip_path = self.sessions_dir / self.current_session.id / "clips" / clip.filename + if clip_path.exists(): + clip_path.unlink() + break + + self._save_session() + + def keep_last_clip(self, note: str = "") -> Optional[Clip]: + """Keep the most recent clip in preview state.""" + if not self.current_session: + return None + + for clip in reversed(self.current_session.clips): + if clip.status == ClipStatus.PREVIEW: + self.keep_clip(clip.id, note) + return clip + return None + + def delete_last_clip(self) -> Optional[Clip]: + """Delete the most recent clip in preview state.""" + if not self.current_session: + return None + + for clip in reversed(self.current_session.clips): + if clip.status == ClipStatus.PREVIEW: + self.delete_clip(clip.id) + return clip + return None + + def end_session(self) -> Session: + """ + End session and prepare for export. + Clips still in preview are auto-kept. + """ + if not self.current_session: + raise RuntimeError("No active session") + + # Auto-keep any clips still in preview + for clip in self.current_session.clips: + if clip.status == ClipStatus.PREVIEW: + clip.status = ClipStatus.KEPT + + self.current_session.is_finalized = True + self._save_session() + + session = self.current_session + self.current_session = None + + return session + + def cancel_session(self) -> None: + """Cancel session and delete all files.""" + if not self.current_session: + return + + session_dir = self.sessions_dir / self.current_session.id + if session_dir.exists(): + shutil.rmtree(session_dir) + + self.current_session = None + self.current_clip = None + + def get_session(self, session_id: str) -> Optional[Session]: + """Load a session by ID.""" + session_file = self.sessions_dir / session_id / "session.json" + if session_file.exists(): + with open(session_file) as f: + return Session.from_dict(json.load(f)) + return None + + def list_sessions(self) -> List[Session]: + """List all sessions.""" + sessions = [] + for session_dir in sorted(self.sessions_dir.iterdir(), reverse=True): + if session_dir.is_dir(): + session_file = session_dir / "session.json" + if session_file.exists(): + with open(session_file) as f: + sessions.append(Session.from_dict(json.load(f))) + return sessions + + def get_session_dir(self, session_id: str) -> Path: + """Get session directory path.""" + return self.sessions_dir / session_id + + def _save_session(self) -> None: + """Save current session to disk.""" + if not self.current_session: + return + + session_file = self.sessions_dir / self.current_session.id / "session.json" + with open(session_file, "w") as f: + json.dump(self.current_session.to_dict(), f, indent=2) + + +# Quick test +if __name__ == "__main__": + from pathlib import Path + import tempfile + + with tempfile.TemporaryDirectory() as tmpdir: + manager = SessionManager(Path(tmpdir)) + + # Start session + session = manager.start_session( + name="Test Session", + project="P04-GigaBIT-M1", + session_type=SessionType.DESIGN, + ) + print(f"Started session: {session.id}") + + # Record some clips + clip1, path1 = manager.start_clip() + print(f"Recording clip 1 to: {path1}") + manager.end_clip(duration=45.5) + manager.keep_clip(clip1.id, note="Stage 2 joint") + + clip2, path2 = manager.start_clip() + print(f"Recording clip 2 to: {path2}") + manager.end_clip(duration=30.0) + manager.delete_clip(clip2.id) # Oops, bad take + + clip3, path3 = manager.start_clip() + print(f"Recording clip 3 to: {path3}") + manager.end_clip(duration=60.0) + manager.keep_last_clip() + + # End session + session = manager.end_session() + print(f"Session ended: {session.clip_count} clips, {session.total_duration:.1f}s total")