From 9b24478f041295167b89b1fe6e6a402e01a17373 Mon Sep 17 00:00:00 2001 From: Mario Lavoie Date: Mon, 9 Feb 2026 22:14:34 +0000 Subject: [PATCH] Simplify KB Capture to match Voice Recorder pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Major simplification: - Removed clips concept (no keep/delete segments) - Single continuous recording per session with pause/resume - Matches Voice Recorder UX pattern Antoine knows Flow: Start Session → Record → Pause → Resume → Stop → Transcribe → Done Features: - Record/Pause/Resume/Stop controls - Session types: Design / Analysis - Auto-transcribe with Whisper on stop - Finds 'screenshot' triggers in transcript for Clawdbot - Simple dark theme UI matching Voice Recorder Removed: - export.py (transcription now inline) - hotkeys.py (not needed for MVP) - Clip management --- src/cad_documenter/export.py | 338 -------- src/cad_documenter/gui_capture.py | 1339 +++++++++-------------------- src/cad_documenter/hotkeys.py | 161 ---- src/cad_documenter/kb_capture.py | 422 ++++----- src/cad_documenter/recorder.py | 270 +++--- src/cad_documenter/session.py | 347 ++------ 6 files changed, 786 insertions(+), 2091 deletions(-) delete mode 100644 src/cad_documenter/export.py delete mode 100644 src/cad_documenter/hotkeys.py diff --git a/src/cad_documenter/export.py b/src/cad_documenter/export.py deleted file mode 100644 index d12b6eb..0000000 --- a/src/cad_documenter/export.py +++ /dev/null @@ -1,338 +0,0 @@ -""" -Session Export for Clawdbot - -Merges clips, transcribes audio, and exports for Clawdbot processing. -Uses local Whisper for transcription (no API). -""" - -import json -import subprocess -import sys -from pathlib import Path -from datetime import datetime -from typing import Optional, Callable -import tempfile - -from .session import Session, SessionManager, ClipStatus - - -class SessionExporter: - """ - Export a recorded session for Clawdbot processing. - - Steps: - 1. Merge all kept clips into one video - 2. Transcribe with Whisper (local) - 3. Create metadata.json - 4. Create clawdbot_export/ folder - """ - - def __init__( - self, - session_manager: SessionManager, - whisper_model: str = "base", - on_progress: Optional[Callable[[str, float], None]] = None, - ): - self.session_manager = session_manager - self.whisper_model = whisper_model - self.on_progress = on_progress or (lambda msg, pct: print(f"[{pct:.0%}] {msg}")) - - def export(self, session_id: str) -> Path: - """ - Export a session for Clawdbot. - - Returns path to clawdbot_export/ folder. - """ - session = self.session_manager.get_session(session_id) - if not session: - raise ValueError(f"Session not found: {session_id}") - - if not session.is_finalized: - raise ValueError("Session not finalized. End the session first.") - - session_dir = self.session_manager.get_session_dir(session_id) - export_dir = session_dir / "clawdbot_export" - export_dir.mkdir(exist_ok=True) - - kept_clips = session.kept_clips - if not kept_clips: - raise ValueError("No clips to export") - - self.on_progress("Starting export...", 0.0) - - # Step 1: Merge clips - self.on_progress("Merging clips...", 0.1) - merged_path = self._merge_clips(session_dir, kept_clips, export_dir) - - # Step 2: Transcribe (uses local GPU Whisper) - self.on_progress("Transcribing audio...", 0.3) - transcript = self._transcribe(merged_path, export_dir) - - # Step 3: Create metadata - # Note: Frame extraction is done by Clawdbot using the knowledge-base skill - self.on_progress("Creating metadata...", 0.8) - self._create_metadata(session, export_dir, merged_path, transcript) - - self.on_progress("Export complete!", 1.0) - - return export_dir - - def _merge_clips( - self, - session_dir: Path, - clips: list, - export_dir: Path, - ) -> Path: - """Merge kept clips into a single video.""" - output_path = export_dir / "merged.mp4" - clips_dir = session_dir / "clips" - - if len(clips) == 1: - # Single clip - just copy - clip_path = clips_dir / clips[0].filename - import shutil - shutil.copy2(clip_path, output_path) - return output_path - - # Create concat list file - concat_file = export_dir / "concat.txt" - with open(concat_file, "w") as f: - for clip in clips: - clip_path = clips_dir / clip.filename - # FFmpeg concat needs escaped paths - escaped = str(clip_path).replace("'", "'\\''") - f.write(f"file '{escaped}'\n") - - # Merge with FFmpeg - cmd = [ - "ffmpeg", "-y", - "-f", "concat", - "-safe", "0", - "-i", str(concat_file), - "-c", "copy", - str(output_path), - ] - - try: - result = subprocess.run( - cmd, - capture_output=True, - text=True, - creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0, - ) - if result.returncode != 0: - raise RuntimeError(f"FFmpeg merge failed: {result.stderr[-500:]}") - finally: - concat_file.unlink(missing_ok=True) - - return output_path - - def _transcribe(self, video_path: Path, export_dir: Path) -> dict: - """Transcribe video audio with Whisper.""" - try: - import whisper - except ImportError: - raise RuntimeError("Whisper not installed. Run: pip install openai-whisper") - - # Load model - model = whisper.load_model(self.whisper_model) - - # Transcribe - result = model.transcribe( - str(video_path), - language="en", - verbose=False, - ) - - # Save transcript - transcript_path = export_dir / "transcript.json" - with open(transcript_path, "w") as f: - json.dump(result, f, indent=2) - - return result - - def _extract_frames( - self, - video_path: Path, - transcript: dict, - frames_dir: Path, - ) -> list[Path]: - """Extract frames at 'screenshot' trigger timestamps.""" - frames_dir.mkdir(exist_ok=True) - - # Find screenshot triggers in transcript - triggers = [] - for segment in transcript.get("segments", []): - text = segment.get("text", "").lower() - if "screenshot" in text: - # Get timestamp (start of segment) - timestamp = segment.get("start", 0) - triggers.append(timestamp) - - if not triggers: - # No triggers found - extract frames at regular intervals - # Get video duration - duration = self._get_video_duration(video_path) - # Extract every 30 seconds - triggers = list(range(0, int(duration), 30)) - - # Extract frames - extracted = [] - for i, ts in enumerate(triggers): - frame_path = frames_dir / f"{i+1:02d}_{self._format_timestamp(ts)}.png" - - cmd = [ - "ffmpeg", "-y", - "-ss", str(ts), - "-i", str(video_path), - "-vframes", "1", - "-q:v", "2", - str(frame_path), - ] - - try: - subprocess.run( - cmd, - capture_output=True, - creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0, - ) - if frame_path.exists(): - extracted.append(frame_path) - except Exception: - pass - - return extracted - - def _get_video_duration(self, video_path: Path) -> float: - """Get video duration in seconds.""" - cmd = [ - "ffprobe", - "-v", "quiet", - "-show_entries", "format=duration", - "-of", "json", - str(video_path), - ] - - try: - result = subprocess.run(cmd, capture_output=True, text=True) - data = json.loads(result.stdout) - return float(data["format"]["duration"]) - except: - return 300.0 # Default 5 minutes - - def _format_timestamp(self, seconds: float) -> str: - """Format seconds as MM-SS.""" - mins = int(seconds // 60) - secs = int(seconds % 60) - return f"{mins:02d}-{secs:02d}" - - def _find_screenshot_triggers(self, transcript: dict) -> list[dict]: - """Find 'screenshot' triggers in transcript with context.""" - triggers = [] - segments = transcript.get("segments", []) - - for i, segment in enumerate(segments): - text = segment.get("text", "").lower() - if "screenshot" in text: - timestamp = segment.get("start", 0) - - # Get context: 2 segments before and after - context_segments = segments[max(0, i-2):i+3] - context = " ".join(s.get("text", "") for s in context_segments) - - triggers.append({ - "timestamp": timestamp, - "timestamp_formatted": self._format_timestamp(timestamp), - "segment_text": segment.get("text", ""), - "context": context.strip(), - }) - - return triggers - - def _create_metadata( - self, - session: Session, - export_dir: Path, - merged_path: Path, - transcript: dict, - ) -> None: - """Create metadata.json for Clawdbot.""" - # Find screenshot triggers for Mario - triggers = self._find_screenshot_triggers(transcript) - - # Get video duration - duration = self._get_video_duration(merged_path) - - metadata = { - "session_id": session.id, - "name": session.name, - "project": session.project, - "session_type": session.session_type.value, - "created_at": session.created_at.isoformat(), - "exported_at": datetime.now().isoformat(), - "clip_count": session.clip_count, - "total_duration": duration, - "status": "pending", # → "processed" after Clawdbot processes - "clips": [ - { - "id": clip.id, - "duration": clip.duration_seconds, - "note": clip.note, - } - for clip in session.kept_clips - ], - "screenshot_triggers": triggers, # Pre-parsed for Mario - "files": { - "video": "merged.mp4", - "transcript": "transcript.json", - }, - } - - metadata_path = export_dir / "metadata.json" - with open(metadata_path, "w") as f: - json.dump(metadata, f, indent=2) - - -# CLI for testing -if __name__ == "__main__": - import sys - - if len(sys.argv) < 2: - print("Usage: python -m cad_documenter.export ") - print(" python -m cad_documenter.export --list") - sys.exit(1) - - from pathlib import Path - - # Find sessions directory - if sys.platform == "win32": - base = Path.home() / "Documents" / "KB-Capture" - else: - base = Path.home() / "kb-capture" - - manager = SessionManager(base) - - if sys.argv[1] == "--list": - sessions = manager.list_sessions() - if not sessions: - print("No sessions found.") - else: - print("Sessions:") - for s in sessions: - status = "✓" if s.is_finalized else "○" - print(f" {status} {s.id}: {s.name} ({s.clip_count} clips, {s.total_duration:.0f}s)") - else: - session_id = sys.argv[1] - - def on_progress(msg, pct): - bar = "█" * int(pct * 20) + "░" * (20 - int(pct * 20)) - print(f"\r[{bar}] {msg}", end="", flush=True) - - exporter = SessionExporter(manager, on_progress=on_progress) - - try: - export_path = exporter.export(session_id) - print(f"\n\nExported to: {export_path}") - except Exception as e: - print(f"\nError: {e}") - sys.exit(1) diff --git a/src/cad_documenter/gui_capture.py b/src/cad_documenter/gui_capture.py index 7be7ae0..86c3d7c 100644 --- a/src/cad_documenter/gui_capture.py +++ b/src/cad_documenter/gui_capture.py @@ -1,21 +1,15 @@ """ -KB Capture GUI v2 +KB Capture GUI -Professional recording tool for engineering knowledge capture. -Features: -- Browse and select projects folder -- Create new projects with full KB structure -- Visual project cards -- Clip-based recording with keep/delete -- Session history +Simple recording interface modeled after Voice Recorder. +Record → Pause → Resume → Stop → Transcribe → Done """ import sys import json import threading -import time from pathlib import Path -from typing import Optional, List +from typing import Optional from tkinter import filedialog, messagebox from datetime import datetime @@ -23,33 +17,31 @@ try: import customtkinter as ctk from customtkinter import CTk, CTkFrame, CTkLabel, CTkButton, CTkEntry from customtkinter import CTkOptionMenu, CTkScrollableFrame, CTkToplevel - from customtkinter import CTkInputDialog HAS_CTK = True except ImportError: HAS_CTK = False - print("CustomTkinter not installed. Run: pip install customtkinter") from .kb_capture import KBCaptureApp, AppState, AppStatus -from .session import SessionType, ClipStatus, Session +from .session import SessionType # ============================================================================ -# THEME +# THEME (matching Voice Recorder) # ============================================================================ COLORS = { - "bg": "#0f0f0f", - "bg_card": "#1a1a1a", - "bg_hover": "#252525", - "accent": "#3b82f6", # Blue - "accent_hover": "#2563eb", - "success": "#22c55e", # Green - "danger": "#ef4444", # Red - "warning": "#f59e0b", # Amber - "text": "#ffffff", - "text_secondary": "#a1a1aa", - "text_muted": "#52525b", - "border": "#27272a", + "bg": "#0d1117", + "bg_card": "#161b22", + "bg_elevated": "#1c2128", + "border": "#30363d", + "text": "#e6edf3", + "text_secondary": "#7d8590", + "text_muted": "#484f58", + "red": "#f85149", + "green": "#3fb950", + "blue": "#58a6ff", + "orange": "#d29922", + "purple": "#a371f7", } @@ -58,7 +50,6 @@ COLORS = { # ============================================================================ def get_config_path() -> Path: - """Get path to config file.""" if sys.platform == "win32": config_dir = Path.home() / "AppData" / "Local" / "KBCapture" else: @@ -68,7 +59,6 @@ def get_config_path() -> Path: def load_config() -> dict: - """Load config from file.""" config_path = get_config_path() if config_path.exists(): try: @@ -80,457 +70,46 @@ def load_config() -> dict: def save_config(config: dict) -> None: - """Save config to file.""" - config_path = get_config_path() - with open(config_path, "w") as f: + with open(get_config_path(), "w") as f: json.dump(config, f, indent=2) # ============================================================================ -# PROJECT MANAGEMENT +# PROJECT CREATION # ============================================================================ -def create_project_structure(projects_root: Path, name: str, description: str = "") -> Path: - """ - Create a new project with full KB structure. - - Structure: - / - ├── _context.md # Project context - ├── KB/ - │ ├── _index.md # KB overview - │ ├── Design/ - │ │ ├── _index.md - │ │ ├── architecture/ - │ │ ├── components/ - │ │ ├── materials/ - │ │ └── dev/ # Session captures go here - │ └── Analysis/ - │ ├── _index.md - │ ├── models/ - │ ├── load-cases/ - │ └── results/ - ├── Images/ - │ ├── components/ - │ └── screenshot-sessions/ - └── _capture/ # Recording sessions staged here - """ +def create_project(projects_root: Path, name: str, description: str = "") -> Path: + """Create a new project with KB structure.""" project_path = projects_root / name if project_path.exists(): raise ValueError(f"Project already exists: {name}") - # Create directory structure dirs = [ - "KB/Design/architecture", - "KB/Design/components", + "KB/Design/components", "KB/Design/materials", "KB/Design/dev", "KB/Analysis/models", - "KB/Analysis/load-cases", "KB/Analysis/results", "Images/components", - "Images/screenshot-sessions", "_capture", ] for d in dirs: (project_path / d).mkdir(parents=True, exist_ok=True) - # Create _context.md - context_content = f"""# {name} + # Create context file + context = f"""# {name} {description} -## Project Overview - -*Add project description, objectives, and key requirements here.* - -## Key Contacts - -| Role | Name | Notes | -|------|------|-------| -| Engineer | | | -| Client | | | - -## Timeline - -- **Created:** {datetime.now().strftime("%Y-%m-%d")} -- **Status:** Active - -## Notes - -*Add project-specific notes here.* +Created: {datetime.now().strftime("%Y-%m-%d")} """ - (project_path / "_context.md").write_text(context_content) - - # Create KB index - kb_index = f"""# {name} — Knowledge Base - -## Overview - -This knowledge base captures all design and analysis knowledge for {name}. - -## Structure - -- **Design/** — CAD/design documentation - - `architecture/` — System-level design - - `components/` — Individual part documentation - - `materials/` — Material selections and specs - - `dev/` — Session captures (gen-001, gen-002, ...) - -- **Analysis/** — FEA/simulation documentation - - `models/` — Model definitions - - `load-cases/` — Boundary conditions and loads - - `results/` — Analysis outputs and validation - -## Recent Sessions - -*Sessions will be listed here as they're processed.* - ---- - -*Last updated: {datetime.now().strftime("%Y-%m-%d")}* -""" - (project_path / "KB" / "_index.md").write_text(kb_index) - - # Create Design index - design_index = f"""# Design Knowledge Base - -## Components - -*Component documentation will appear here.* - -## Architecture - -*System-level design documentation.* - -## Materials - -*Material selections and specifications.* -""" - (project_path / "KB" / "Design" / "_index.md").write_text(design_index) - - # Create Analysis index - analysis_index = f"""# Analysis Knowledge Base - -## Models - -*FEA model documentation.* - -## Load Cases - -*Boundary conditions and load definitions.* - -## Results - -*Analysis results and validation.* -""" - (project_path / "KB" / "Analysis" / "_index.md").write_text(analysis_index) + (project_path / "_context.md").write_text(context) return project_path -def get_project_stats(project_path: Path) -> dict: - """Get statistics for a project.""" - stats = { - "sessions": 0, - "total_clips": 0, - "total_duration": 0, - "last_session": None, - "has_kb": (project_path / "KB").exists(), - } - - capture_dir = project_path / "_capture" - if capture_dir.exists(): - for session_dir in capture_dir.iterdir(): - if session_dir.is_dir(): - session_file = session_dir / "session.json" - if session_file.exists(): - try: - with open(session_file) as f: - session = json.load(f) - stats["sessions"] += 1 - stats["total_clips"] += len([c for c in session.get("clips", []) if c.get("status") == "kept"]) - stats["total_duration"] += session.get("total_duration", 0) - - created = session.get("created_at") - if created and (not stats["last_session"] or created > stats["last_session"]): - stats["last_session"] = created - except: - pass - - return stats - - -# ============================================================================ -# DIALOGS -# ============================================================================ - -class NewProjectDialog: - """Dialog for creating a new project.""" - - def __init__(self, parent, projects_root: Path): - self.result = None - self.projects_root = projects_root - - self.dialog = CTkToplevel(parent) - self.dialog.title("New Project") - self.dialog.geometry("450x300") - self.dialog.transient(parent) - self.dialog.grab_set() - self.dialog.configure(fg_color=COLORS["bg"]) - - # Center on parent - self.dialog.update_idletasks() - x = parent.winfo_x() + (parent.winfo_width() - 450) // 2 - y = parent.winfo_y() + (parent.winfo_height() - 300) // 2 - self.dialog.geometry(f"+{x}+{y}") - - self._build_ui() - - # Focus - self.name_entry.focus_set() - - def _build_ui(self): - # Title - CTkLabel( - self.dialog, - text="Create New Project", - font=("", 20, "bold"), - text_color=COLORS["text"], - ).pack(pady=(20, 10)) - - CTkLabel( - self.dialog, - text="This will create a project with full KB structure", - font=("", 12), - text_color=COLORS["text_secondary"], - ).pack(pady=(0, 20)) - - # Form - form = CTkFrame(self.dialog, fg_color="transparent") - form.pack(fill="x", padx=30) - - # Name - CTkLabel(form, text="Project Name:", text_color=COLORS["text_secondary"]).pack(anchor="w") - self.name_entry = CTkEntry( - form, - placeholder_text="e.g., P05-NewProject", - width=380, - height=35, - ) - self.name_entry.pack(fill="x", pady=(5, 15)) - - # Description - CTkLabel(form, text="Description (optional):", text_color=COLORS["text_secondary"]).pack(anchor="w") - self.desc_entry = CTkEntry( - form, - placeholder_text="Brief project description", - width=380, - height=35, - ) - self.desc_entry.pack(fill="x", pady=(5, 20)) - - # Buttons - btn_frame = CTkFrame(self.dialog, fg_color="transparent") - btn_frame.pack(fill="x", padx=30, pady=20) - - CTkButton( - btn_frame, - text="Cancel", - width=100, - fg_color="transparent", - border_width=1, - border_color=COLORS["border"], - hover_color=COLORS["bg_hover"], - command=self.dialog.destroy, - ).pack(side="left") - - CTkButton( - btn_frame, - text="Create Project", - width=150, - fg_color=COLORS["accent"], - hover_color=COLORS["accent_hover"], - command=self._create, - ).pack(side="right") - - def _create(self): - name = self.name_entry.get().strip() - if not name: - messagebox.showerror("Error", "Project name is required") - return - - # Sanitize name - name = "".join(c for c in name if c.isalnum() or c in "-_ ") - name = name.replace(" ", "-") - - try: - path = create_project_structure( - self.projects_root, - name, - self.desc_entry.get().strip(), - ) - self.result = name - self.dialog.destroy() - except ValueError as e: - messagebox.showerror("Error", str(e)) - except Exception as e: - messagebox.showerror("Error", f"Failed to create project: {e}") - - -# ============================================================================ -# COMPONENTS -# ============================================================================ - -class ProjectCard(CTkFrame): - """A card displaying a project.""" - - def __init__( - self, - master, - name: str, - stats: dict, - selected: bool = False, - on_select=None, - **kwargs - ): - super().__init__(master, **kwargs) - - self.name = name - self.on_select = on_select - - self.configure( - fg_color=COLORS["accent"] if selected else COLORS["bg_card"], - corner_radius=10, - cursor="hand2", - ) - - # Bind click - self.bind("", lambda e: self._clicked()) - - # Content - self.grid_columnconfigure(0, weight=1) - - # Name - name_label = CTkLabel( - self, - text=name, - font=("", 14, "bold"), - text_color=COLORS["text"], - anchor="w", - ) - name_label.grid(row=0, column=0, sticky="w", padx=15, pady=(12, 2)) - name_label.bind("", lambda e: self._clicked()) - - # Stats - if stats["sessions"] > 0: - stats_text = f"{stats['sessions']} sessions • {int(stats['total_duration']//60)}m recorded" - else: - stats_text = "No recordings yet" - - stats_label = CTkLabel( - self, - text=stats_text, - font=("", 11), - text_color=COLORS["text_secondary"] if not selected else COLORS["text"], - anchor="w", - ) - stats_label.grid(row=1, column=0, sticky="w", padx=15, pady=(0, 12)) - stats_label.bind("", lambda e: self._clicked()) - - # Status indicator - if not stats["has_kb"]: - warning = CTkLabel( - self, - text="⚠️ No KB", - font=("", 10), - text_color=COLORS["warning"], - ) - warning.grid(row=0, column=1, padx=15) - - def _clicked(self): - if self.on_select: - self.on_select(self.name) - - -class ClipCard(CTkFrame): - """A card showing a single clip.""" - - def __init__( - self, - master, - clip_id: str, - duration: float, - status: ClipStatus, - note: str = "", - on_delete=None, - **kwargs - ): - super().__init__(master, **kwargs) - - self.configure(fg_color=COLORS["bg_card"], corner_radius=8) - - # Status colors - status_colors = { - ClipStatus.KEPT: COLORS["success"], - ClipStatus.PREVIEW: COLORS["warning"], - ClipStatus.DELETED: COLORS["danger"], - ClipStatus.RECORDING: COLORS["danger"], - } - color = status_colors.get(status, COLORS["text_muted"]) - - # Layout - self.grid_columnconfigure(1, weight=1) - - # Status dot - CTkLabel(self, text="●", text_color=color, font=("", 14)).grid( - row=0, column=0, padx=(12, 8), pady=10 - ) - - # Clip info - info_frame = CTkFrame(self, fg_color="transparent") - info_frame.grid(row=0, column=1, sticky="w", pady=10) - - CTkLabel( - info_frame, - text=clip_id, - font=("", 12, "bold"), - text_color=COLORS["text"], - ).pack(anchor="w") - - if note: - CTkLabel( - info_frame, - text=note, - font=("", 11), - text_color=COLORS["text_secondary"], - ).pack(anchor="w") - - # Duration - mins = int(duration // 60) - secs = int(duration % 60) - CTkLabel( - self, - text=f"{mins}:{secs:02d}", - font=("", 12), - text_color=COLORS["text_secondary"], - ).grid(row=0, column=2, padx=10) - - # Delete button - if status in (ClipStatus.PREVIEW, ClipStatus.KEPT) and on_delete: - CTkButton( - self, - text="🗑️", - width=28, - height=28, - fg_color="transparent", - hover_color=COLORS["bg_hover"], - command=lambda: on_delete(clip_id), - ).grid(row=0, column=3, padx=(0, 10)) - - # ============================================================================ # MAIN GUI # ============================================================================ @@ -542,10 +121,10 @@ class KBCaptureGUI: if not HAS_CTK: raise RuntimeError("CustomTkinter not installed") - # Load config + # Config self.config = load_config() - # Get projects root from config or None + # Projects root self.projects_root = None if self.config.get("projects_root"): saved = Path(self.config["projects_root"]) @@ -554,18 +133,17 @@ class KBCaptureGUI: # App self.app = None - self.selected_project = None + self.recording_indicator_visible = True # Window ctk.set_appearance_mode("dark") self.window = CTk() self.window.title("KB Capture") - self.window.geometry("600x700") - self.window.minsize(500, 600) + self.window.geometry("400x600") + self.window.minsize(380, 550) self.window.configure(fg_color=COLORS["bg"]) - # Build UI self._build_ui() # Initialize app if we have a projects root @@ -573,386 +151,240 @@ class KBCaptureGUI: self._init_app() self._refresh_projects() - # Cleanup + # Start indicator animation + self._update_indicator() + self.window.protocol("WM_DELETE_WINDOW", self._on_close) def _init_app(self): """Initialize the capture app.""" - if self.app: - self.app.stop() - self.app = KBCaptureApp( projects_root=self.projects_root, on_status_change=self._on_status_change, ) - self.app.start() def _build_ui(self): - """Build the main interface.""" + """Build the interface.""" + main = CTkFrame(self.window, fg_color=COLORS["bg"]) + main.pack(fill="both", expand=True, padx=20, pady=16) + # Header - header = CTkFrame(self.window, fg_color="transparent", height=60) - header.pack(fill="x", padx=20, pady=(15, 10)) - header.pack_propagate(False) + header = CTkFrame(main, fg_color="transparent") + header.pack(fill="x", pady=(0, 12)) CTkLabel( header, text="KB Capture", - font=("", 24, "bold"), - text_color=COLORS["text"], - ).pack(side="left", pady=10) - - # Settings button (right side) - CTkButton( - header, - text="⚙️", - width=40, - height=40, - fg_color="transparent", - hover_color=COLORS["bg_hover"], - command=self._show_settings, - ).pack(side="right") - - # Folder selector - folder_frame = CTkFrame(self.window, fg_color=COLORS["bg_card"], corner_radius=10) - folder_frame.pack(fill="x", padx=20, pady=(0, 15)) - - folder_inner = CTkFrame(folder_frame, fg_color="transparent") - folder_inner.pack(fill="x", padx=15, pady=12) - - CTkLabel( - folder_inner, - text="📁", - font=("", 18), - ).pack(side="left", padx=(0, 10)) - - self.folder_label = CTkLabel( - folder_inner, - text=str(self.projects_root) if self.projects_root else "No folder selected — click Browse", - font=("", 12), - text_color=COLORS["text"] if self.projects_root else COLORS["text_muted"], - anchor="w", - ) - self.folder_label.pack(side="left", fill="x", expand=True) - - CTkButton( - folder_inner, - text="Browse...", - width=80, - height=30, - fg_color=COLORS["accent"], - hover_color=COLORS["accent_hover"], - command=self._browse_folder, - ).pack(side="right") - - # Main content area (will switch between project list and recording view) - self.content_frame = CTkFrame(self.window, fg_color="transparent") - self.content_frame.pack(fill="both", expand=True, padx=20, pady=(0, 15)) - - # Show appropriate view - if self.projects_root: - self._show_project_list() - else: - self._show_welcome() - - # Bottom bar (recording controls) - hidden until session starts - self.controls_frame = CTkFrame(self.window, fg_color=COLORS["bg_card"], corner_radius=0) - # Will be shown when session starts - - def _show_welcome(self): - """Show welcome screen when no folder selected.""" - for widget in self.content_frame.winfo_children(): - widget.destroy() - - welcome = CTkFrame(self.content_frame, fg_color="transparent") - welcome.place(relx=0.5, rely=0.4, anchor="center") - - CTkLabel( - welcome, - text="👋 Welcome to KB Capture", - font=("", 20, "bold"), - text_color=COLORS["text"], - ).pack(pady=(0, 10)) - - CTkLabel( - welcome, - text="Select your projects folder to get started", - font=("", 14), - text_color=COLORS["text_secondary"], - ).pack(pady=(0, 20)) - - CTkButton( - welcome, - text="Browse for Projects Folder", - width=200, - height=40, - font=("", 14), - fg_color=COLORS["accent"], - hover_color=COLORS["accent_hover"], - command=self._browse_folder, - ).pack() - - def _show_project_list(self): - """Show project list view.""" - for widget in self.content_frame.winfo_children(): - widget.destroy() - - # Hide controls if visible - self.controls_frame.pack_forget() - - # Header row - header = CTkFrame(self.content_frame, fg_color="transparent") - header.pack(fill="x", pady=(0, 10)) - - CTkLabel( - header, - text="Projects", - font=("", 16, "bold"), + font=("Segoe UI Semibold", 18), text_color=COLORS["text"], ).pack(side="left") + # Folder button CTkButton( header, - text="+ New Project", - width=120, + text="📁", + width=32, height=32, - fg_color=COLORS["success"], + fg_color="transparent", + hover_color=COLORS["bg_card"], + command=self._browse_folder, + ).pack(side="right") + + # Timer card + timer_frame = CTkFrame(main, fg_color=COLORS["bg_card"], corner_radius=10) + timer_frame.pack(fill="x", pady=(0, 12)) + + timer_inner = CTkFrame(timer_frame, fg_color="transparent") + timer_inner.pack(pady=20) + + # Recording indicator + timer + timer_row = CTkFrame(timer_inner, fg_color="transparent") + timer_row.pack() + + self.indicator = CTkLabel( + timer_row, + text="", + font=("", 10), + text_color=COLORS["red"], + ) + self.indicator.pack(side="left", padx=(0, 4)) + + self.timer_label = CTkLabel( + timer_row, + text="00:00:00", + font=("Consolas", 36, "bold"), + text_color=COLORS["text_muted"], + ) + self.timer_label.pack(side="left") + + self.status_label = CTkLabel( + timer_inner, + text="Select a project to start", + font=("", 11), + text_color=COLORS["text_secondary"], + ) + self.status_label.pack(pady=(8, 0)) + + # Recording controls + controls = CTkFrame(main, fg_color="transparent") + controls.pack(fill="x", pady=12) + controls.grid_columnconfigure((0, 1), weight=1) + + self.record_btn = CTkButton( + controls, + text="Record", + height=45, + font=("Segoe UI Semibold", 12), + fg_color=COLORS["red"], + hover_color="#dc2626", + state="disabled", + command=self._toggle_recording, + ) + self.record_btn.grid(row=0, column=0, padx=(0, 6), sticky="ew") + + self.pause_btn = CTkButton( + controls, + text="Pause", + height=45, + font=("", 12), + fg_color=COLORS["border"], + hover_color=COLORS["bg_elevated"], + state="disabled", + command=self._toggle_pause, + ) + self.pause_btn.grid(row=0, column=1, padx=(6, 0), sticky="ew") + + # Separator + CTkFrame(main, fg_color=COLORS["border"], height=1).pack(fill="x", pady=12) + + # Project selector + proj_frame = CTkFrame(main, fg_color="transparent") + proj_frame.pack(fill="x") + + proj_header = CTkFrame(proj_frame, fg_color="transparent") + proj_header.pack(fill="x", pady=(0, 8)) + + CTkLabel( + proj_header, + text="Project", + font=("Segoe UI Semibold", 11), + text_color=COLORS["text_secondary"], + ).pack(side="left") + + CTkButton( + proj_header, + text="+ New", + width=60, + height=24, + font=("", 10), + fg_color=COLORS["green"], hover_color="#16a34a", command=self._new_project, ).pack(side="right") - CTkButton( - header, - text="↻ Refresh", - width=80, - height=32, - fg_color="transparent", - border_width=1, + self.project_menu = CTkOptionMenu( + proj_frame, + values=["(Select folder first)"], + width=340, + height=35, + fg_color=COLORS["bg_card"], + button_color=COLORS["bg_elevated"], + button_hover_color=COLORS["border"], + ) + self.project_menu.pack(fill="x") + + # Session name + CTkLabel( + main, + text="Session Name", + font=("Segoe UI Semibold", 11), + text_color=COLORS["text_secondary"], + ).pack(anchor="w", pady=(16, 8)) + + self.name_entry = CTkEntry( + main, + placeholder_text="What are you working on?", + height=35, + fg_color=COLORS["bg_card"], border_color=COLORS["border"], - hover_color=COLORS["bg_hover"], - command=self._refresh_projects, - ).pack(side="right", padx=(0, 10)) - - # Project list - self.projects_scroll = CTkScrollableFrame( - self.content_frame, - fg_color="transparent", ) - self.projects_scroll.pack(fill="both", expand=True) + self.name_entry.pack(fill="x") - # Populate projects - self._populate_projects() - - def _populate_projects(self): - """Populate the projects list.""" - for widget in self.projects_scroll.winfo_children(): - widget.destroy() + # Session type + type_frame = CTkFrame(main, fg_color="transparent") + type_frame.pack(fill="x", pady=(12, 0)) + type_frame.grid_columnconfigure((0, 1), weight=1) - if not self.app: - CTkLabel( - self.projects_scroll, - text="No projects found", - text_color=COLORS["text_muted"], - ).pack(pady=40) - return + self.type_var = ctk.StringVar(value="design") - projects = self.app.session_manager.list_projects() - - if not projects: - empty = CTkFrame(self.projects_scroll, fg_color="transparent") - empty.pack(pady=40) - - CTkLabel( - empty, - text="No projects yet", - font=("", 14), - text_color=COLORS["text_secondary"], - ).pack() - - CTkLabel( - empty, - text="Click '+ New Project' to create one", - font=("", 12), - text_color=COLORS["text_muted"], - ).pack(pady=(5, 0)) - return - - for name in projects: - stats = get_project_stats(self.projects_root / name) - card = ProjectCard( - self.projects_scroll, - name=name, - stats=stats, - selected=(name == self.selected_project), - on_select=self._select_project, - ) - card.pack(fill="x", pady=3) - - # Start Session button (shown when project selected) - if self.selected_project: - btn_frame = CTkFrame(self.projects_scroll, fg_color="transparent") - btn_frame.pack(fill="x", pady=(20, 10)) - - CTkButton( - btn_frame, - text=f"▶️ Start Recording Session", - height=45, - font=("", 14, "bold"), - fg_color=COLORS["accent"], - hover_color=COLORS["accent_hover"], - command=self._start_session_dialog, - ).pack(fill="x") - - def _show_recording_view(self): - """Show the recording interface.""" - for widget in self.content_frame.winfo_children(): - widget.destroy() - - # Session info - session = self.app.session_manager.current_session - - info_frame = CTkFrame(self.content_frame, fg_color=COLORS["bg_card"], corner_radius=10) - info_frame.pack(fill="x", pady=(0, 15)) - - info_inner = CTkFrame(info_frame, fg_color="transparent") - info_inner.pack(fill="x", padx=15, pady=12) - - CTkLabel( - info_inner, - text=session.project, - font=("", 12), - text_color=COLORS["text_secondary"], - ).pack(anchor="w") - - CTkLabel( - info_inner, - text=session.name, - font=("", 18, "bold"), - text_color=COLORS["text"], - ).pack(anchor="w") - - type_color = COLORS["accent"] if session.session_type == SessionType.DESIGN else COLORS["warning"] - CTkLabel( - info_inner, - text=f"● {session.session_type.value.title()}", + self.design_btn = CTkButton( + type_frame, + text="🎨 Design", + height=40, font=("", 11), - text_color=type_color, - ).pack(anchor="w", pady=(5, 0)) - - # Status display - self.status_frame = CTkFrame(self.content_frame, fg_color="transparent") - self.status_frame.pack(fill="x", pady=(0, 10)) - - self.status_icon = CTkLabel( - self.status_frame, - text="●", - font=("", 40), - text_color=COLORS["success"], + fg_color=COLORS["blue"], + hover_color="#2563eb", + command=lambda: self._set_type("design"), ) - self.status_icon.pack() + self.design_btn.grid(row=0, column=0, padx=(0, 4), sticky="ew") - self.status_label = CTkLabel( - self.status_frame, - text="Ready to record", - font=("", 16), - text_color=COLORS["text"], + self.analysis_btn = CTkButton( + type_frame, + text="📊 Analysis", + height=40, + font=("", 11), + fg_color=COLORS["border"], + hover_color=COLORS["bg_elevated"], + command=lambda: self._set_type("analysis"), ) - self.status_label.pack() + self.analysis_btn.grid(row=0, column=1, padx=(4, 0), sticky="ew") - self.duration_label = CTkLabel( - self.status_frame, - text="", - font=("", 24, "bold"), - text_color=COLORS["text"], + # Spacer + CTkFrame(main, fg_color="transparent").pack(fill="both", expand=True) + + # Folder path + self.folder_label = CTkLabel( + main, + text=str(self.projects_root) if self.projects_root else "No folder selected", + font=("", 9), + text_color=COLORS["text_muted"], + cursor="hand2", ) - self.duration_label.pack(pady=(5, 0)) - - # Clips list - clips_header = CTkFrame(self.content_frame, fg_color="transparent") - clips_header.pack(fill="x", pady=(10, 5)) - - CTkLabel( - clips_header, - text="Clips", - font=("", 14, "bold"), - text_color=COLORS["text"], - ).pack(side="left") - - self.clips_count_label = CTkLabel( - clips_header, - text="0 clips", - font=("", 12), - text_color=COLORS["text_secondary"], - ) - self.clips_count_label.pack(side="right") - - self.clips_scroll = CTkScrollableFrame( - self.content_frame, - fg_color="transparent", - ) - self.clips_scroll.pack(fill="both", expand=True) - - # Show controls bar - self._show_controls() - - def _show_controls(self): - """Show the recording controls bar.""" - self.controls_frame.pack(fill="x", side="bottom") - - for widget in self.controls_frame.winfo_children(): - widget.destroy() - - inner = CTkFrame(self.controls_frame, fg_color="transparent") - inner.pack(fill="x", padx=15, pady=15) - inner.grid_columnconfigure((0, 1, 2), weight=1) - - # Record button - self.record_btn = CTkButton( - inner, - text="⏺️ Record", - height=50, - font=("", 14, "bold"), - fg_color=COLORS["danger"], - hover_color="#dc2626", - command=self._toggle_recording, - ) - self.record_btn.grid(row=0, column=0, padx=5, sticky="ew") - - # Keep button - self.keep_btn = CTkButton( - inner, - text="✓ Keep", - height=50, - font=("", 14), - fg_color=COLORS["success"], - hover_color="#16a34a", - command=self._keep_clip, - state="disabled", - ) - self.keep_btn.grid(row=0, column=1, padx=5, sticky="ew") - - # End button - self.end_btn = CTkButton( - inner, - text="End Session", - height=50, - font=("", 14), - fg_color=COLORS["bg_hover"], - hover_color=COLORS["border"], - command=self._end_session, - ) - self.end_btn.grid(row=0, column=2, padx=5, sticky="ew") + self.folder_label.pack(pady=(12, 0)) + self.folder_label.bind("", lambda e: self._browse_folder()) # Hotkey hints CTkLabel( - self.controls_frame, - text="Ctrl+Shift+R: Record • K: Keep • D: Delete • E: End", - font=("", 10), + main, + text="Ctrl+Shift+R: Record/Stop • Ctrl+Shift+P: Pause/Resume", + font=("", 9), text_color=COLORS["text_muted"], - ).pack(pady=(0, 10)) + ).pack(pady=(8, 0)) - # ======================================================================== - # ACTIONS - # ======================================================================== + def _set_type(self, type_id: str): + """Set session type.""" + self.type_var.set(type_id) + if type_id == "design": + self.design_btn.configure(fg_color=COLORS["blue"]) + self.analysis_btn.configure(fg_color=COLORS["border"]) + else: + self.design_btn.configure(fg_color=COLORS["border"]) + self.analysis_btn.configure(fg_color=COLORS["orange"]) + + def _update_indicator(self): + """Animate recording indicator.""" + if self.app and self.app.state == AppState.RECORDING: + self.recording_indicator_visible = not self.recording_indicator_visible + self.indicator.configure( + text="●" if self.recording_indicator_visible else "", + text_color=COLORS["red"], + ) + elif self.app and self.app.state == AppState.PAUSED: + self.indicator.configure(text="●", text_color=COLORS["orange"]) + else: + self.indicator.configure(text="") + + self.window.after(500, self._update_indicator) def _browse_folder(self): """Browse for projects folder.""" @@ -965,179 +397,206 @@ class KBCaptureGUI: if folder: self.projects_root = Path(folder) - self.folder_label.configure( - text=str(self.projects_root), - text_color=COLORS["text"], - ) + self.folder_label.configure(text=str(self.projects_root)) - # Save config self.config["projects_root"] = str(self.projects_root) save_config(self.config) - # Initialize app self._init_app() - self._show_project_list() - self._refresh_projects() - - def _new_project(self): - """Show new project dialog.""" - dialog = NewProjectDialog(self.window, self.projects_root) - self.window.wait_window(dialog.dialog) - - if dialog.result: - self.selected_project = dialog.result self._refresh_projects() def _refresh_projects(self): - """Refresh the projects list.""" - self._populate_projects() - - def _select_project(self, name: str): - """Select a project.""" - self.selected_project = name - self._populate_projects() - - def _start_session_dialog(self): - """Show dialog to start a session.""" - if not self.selected_project: + """Refresh project list.""" + if not self.app: + self.project_menu.configure(values=["(Select folder first)"]) return - # Simple dialog for session name - dialog = CTkInputDialog( - text=f"Session name for {self.selected_project}:", - title="Start Session", - ) - name = dialog.get_input() - - if name: - self._start_session(name, SessionType.DESIGN) + projects = self.app.session_manager.list_projects() + if projects: + self.project_menu.configure(values=projects) + self.project_menu.set(projects[0]) + self.record_btn.configure(state="normal") + self.status_label.configure(text="Ready to record") + else: + self.project_menu.configure(values=["(No projects - click + New)"]) + self.record_btn.configure(state="disabled") + self.status_label.configure(text="Create a project first") - def _start_session(self, name: str, session_type: SessionType): - """Start a recording session.""" + def _new_project(self): + """Create new project.""" + if not self.projects_root: + messagebox.showwarning("No Folder", "Select a projects folder first") + return + + # Simple dialog + dialog = CTkToplevel(self.window) + dialog.title("New Project") + dialog.geometry("350x180") + dialog.transient(self.window) + dialog.grab_set() + dialog.configure(fg_color=COLORS["bg"]) + + # Center + dialog.update_idletasks() + x = self.window.winfo_x() + (self.window.winfo_width() - 350) // 2 + y = self.window.winfo_y() + (self.window.winfo_height() - 180) // 2 + dialog.geometry(f"+{x}+{y}") + + CTkLabel( + dialog, + text="Project Name", + font=("", 12), + text_color=COLORS["text_secondary"], + ).pack(pady=(20, 8), padx=20, anchor="w") + + name_entry = CTkEntry( + dialog, + placeholder_text="e.g., P05-NewProject", + width=310, + height=35, + ) + name_entry.pack(padx=20) + name_entry.focus_set() + + def create(): + name = name_entry.get().strip() + if not name: + return + + # Sanitize + name = "".join(c for c in name if c.isalnum() or c in "-_ ") + name = name.replace(" ", "-") + + try: + create_project(self.projects_root, name) + dialog.destroy() + self._refresh_projects() + self.project_menu.set(name) + except ValueError as e: + messagebox.showerror("Error", str(e)) + + btn_frame = CTkFrame(dialog, fg_color="transparent") + btn_frame.pack(fill="x", padx=20, pady=20) + + CTkButton( + btn_frame, + text="Cancel", + width=80, + fg_color="transparent", + border_width=1, + border_color=COLORS["border"], + command=dialog.destroy, + ).pack(side="left") + + CTkButton( + btn_frame, + text="Create", + width=100, + fg_color=COLORS["green"], + command=create, + ).pack(side="right") + + name_entry.bind("", lambda e: create()) + + def _toggle_recording(self): + """Start or stop recording.""" + if not self.app: + return + + if self.app.state == AppState.IDLE: + self._start_recording() + elif self.app.state in (AppState.RECORDING, AppState.PAUSED): + self._stop_recording() + + def _start_recording(self): + """Start recording session.""" + project = self.project_menu.get() + if project.startswith("("): + messagebox.showwarning("No Project", "Select a project first") + return + + name = self.name_entry.get().strip() or f"Session {datetime.now().strftime('%H:%M')}" + session_type = SessionType.DESIGN if self.type_var.get() == "design" else SessionType.ANALYSIS + try: - self.app.start_session(name, self.selected_project, session_type) - self._show_recording_view() + self.app.start_session(name, project, session_type) + + self.record_btn.configure(text="Stop", fg_color=COLORS["border"]) + self.pause_btn.configure(state="normal", fg_color=COLORS["orange"]) + self.project_menu.configure(state="disabled") + self.name_entry.configure(state="disabled") + self.design_btn.configure(state="disabled") + self.analysis_btn.configure(state="disabled") + self.timer_label.configure(text_color=COLORS["text"]) + self.status_label.configure(text="Recording...", text_color=COLORS["red"]) + except Exception as e: messagebox.showerror("Error", str(e)) - def _toggle_recording(self): - """Toggle recording state.""" - if self.app: - self.app.toggle_recording() + def _stop_recording(self): + """Stop recording and transcribe.""" + if not self.app: + return + + self.app.stop() + + self.record_btn.configure(text="Record", fg_color=COLORS["red"], state="disabled") + self.pause_btn.configure(state="disabled", text="Pause", fg_color=COLORS["border"]) + self.timer_label.configure(text_color=COLORS["green"]) + self.status_label.configure(text="Transcribing...", text_color=COLORS["orange"]) - def _keep_clip(self): - """Keep the last clip.""" - if self.app: - self.app.keep_last_clip() - - def _delete_clip(self, clip_id: str): - """Delete a clip.""" - if self.app: - self.app.session_manager.delete_clip(clip_id) - self._update_clips_list() - - def _end_session(self): - """End the current session.""" - if self.app: - session = self.app.end_session() - if session: - messagebox.showinfo( - "Session Saved", - f"Recorded {session.clip_count} clips ({session.total_duration:.0f}s total)\n\n" - f"Saved to: {self.selected_project}/_capture/{session.id}/" - ) - self._show_project_list() - self._refresh_projects() - - def _show_settings(self): - """Show settings (placeholder).""" - messagebox.showinfo("Settings", "Settings coming soon!") - - # ======================================================================== - # STATUS UPDATES - # ======================================================================== + def _toggle_pause(self): + """Toggle pause/resume.""" + if not self.app: + return + + self.app.toggle_pause() + + if self.app.state == AppState.PAUSED: + self.pause_btn.configure(text="Resume", fg_color=COLORS["green"]) + self.timer_label.configure(text_color=COLORS["orange"]) + self.status_label.configure(text="Paused", text_color=COLORS["orange"]) + else: + self.pause_btn.configure(text="Pause", fg_color=COLORS["orange"]) + self.timer_label.configure(text_color=COLORS["text"]) + self.status_label.configure(text="Recording...", text_color=COLORS["red"]) def _on_status_change(self, status: AppStatus): - """Handle status updates from the app.""" + """Handle status updates.""" self.window.after(0, lambda: self._update_ui(status)) def _update_ui(self, status: AppStatus): - """Update UI based on status.""" - if not hasattr(self, 'status_icon'): - return + """Update UI from status.""" + # Timer + if status.state in (AppState.RECORDING, AppState.PAUSED): + secs = int(status.duration) + hours = secs // 3600 + mins = (secs % 3600) // 60 + secs = secs % 60 + self.timer_label.configure(text=f"{hours:02d}:{mins:02d}:{secs:02d}") - # Update status display - if status.state == AppState.RECORDING: - self.status_icon.configure(text_color=COLORS["danger"]) - self.status_label.configure(text="Recording...") - self.record_btn.configure(text="⏹️ Stop", fg_color=COLORS["danger"]) - self.keep_btn.configure(state="disabled") - - # Duration - secs = int(status.current_clip_duration) - self.duration_label.configure(text=f"{secs//60}:{secs%60:02d}") - - elif status.state == AppState.PREVIEW: - self.status_icon.configure(text_color=COLORS["warning"]) - self.status_label.configure(text="Keep or delete?") - self.record_btn.configure(text="⏺️ Record", fg_color=COLORS["success"]) - self.keep_btn.configure(state="normal") - - secs = int(status.last_clip_duration) - self.duration_label.configure(text=f"{secs//60}:{secs%60:02d}") - - else: # SESSION_ACTIVE - self.status_icon.configure(text_color=COLORS["success"]) - self.status_label.configure(text="Ready to record") - self.record_btn.configure(text="⏺️ Record", fg_color=COLORS["danger"]) - self.keep_btn.configure(state="disabled") - self.duration_label.configure(text="") + # Back to idle + if status.state == AppState.IDLE and self.record_btn.cget("text") != "Record": + self.record_btn.configure(text="Record", fg_color=COLORS["red"], state="normal") + self.pause_btn.configure(state="disabled", text="Pause", fg_color=COLORS["border"]) + self.project_menu.configure(state="normal") + self.name_entry.configure(state="normal") + self.design_btn.configure(state="normal") + self.analysis_btn.configure(state="normal") + self.name_entry.delete(0, "end") - # Update clips list - self._update_clips_list() - - def _update_clips_list(self): - """Update the clips list display.""" - if not self.app or not hasattr(self, 'clips_scroll'): - return - - session = self.app.session_manager.current_session - if not session: - return - - # Clear - for widget in self.clips_scroll.winfo_children(): - widget.destroy() - - # Count - kept = [c for c in session.clips if c.status != ClipStatus.DELETED] - self.clips_count_label.configure(text=f"{len(kept)} clips • {session.total_duration:.0f}s") - - if not kept: - CTkLabel( - self.clips_scroll, - text="No clips yet\nPress Ctrl+Shift+R to record", - font=("", 12), - text_color=COLORS["text_muted"], - justify="center", - ).pack(pady=30) - return - - # Add clips (newest first) - for clip in reversed(kept): - ClipCard( - self.clips_scroll, - clip_id=clip.id, - duration=clip.duration_seconds, - status=clip.status, - note=clip.note, - on_delete=self._delete_clip, - ).pack(fill="x", pady=2) + # Status message + if status.message: + self.status_label.configure(text=status.message, text_color=COLORS["text_secondary"]) def _on_close(self): """Handle window close.""" - if self.app: - self.app.stop() + if self.app and self.app.state != AppState.IDLE: + if messagebox.askyesno("Recording Active", "Cancel current recording?"): + self.app.cancel() + else: + return + self.window.destroy() def run(self): diff --git a/src/cad_documenter/hotkeys.py b/src/cad_documenter/hotkeys.py deleted file mode 100644 index 5f75743..0000000 --- a/src/cad_documenter/hotkeys.py +++ /dev/null @@ -1,161 +0,0 @@ -""" -Global Hotkey Manager - -Registers global hotkeys for recording control. -Works across all applications. -""" - -import threading -from typing import Callable, Dict, Optional -import sys - -# Use keyboard library for cross-platform hotkeys -try: - import keyboard - HAS_KEYBOARD = True -except ImportError: - HAS_KEYBOARD = False - print("Warning: 'keyboard' library not installed. Hotkeys disabled.") - - -class HotkeyManager: - """ - Manages global hotkeys for KB Capture. - - Default hotkeys: - Ctrl+Shift+R: Toggle recording - Ctrl+Shift+K: Keep last clip - Ctrl+Shift+D: Delete last clip - Ctrl+Shift+E: End session - Escape: Cancel (when in certain states) - """ - - DEFAULT_HOTKEYS = { - "toggle_recording": "ctrl+shift+r", - "keep_clip": "ctrl+shift+k", - "delete_clip": "ctrl+shift+d", - "end_session": "ctrl+shift+e", - } - - def __init__(self): - self.callbacks: Dict[str, Callable] = {} - self.hotkeys: Dict[str, str] = self.DEFAULT_HOTKEYS.copy() - self.registered: Dict[str, bool] = {} - self.enabled = HAS_KEYBOARD - - def set_callback(self, action: str, callback: Callable) -> None: - """Set callback for an action.""" - self.callbacks[action] = callback - - def register_all(self) -> bool: - """Register all hotkeys.""" - if not self.enabled: - return False - - for action, hotkey in self.hotkeys.items(): - if action in self.callbacks: - try: - keyboard.add_hotkey( - hotkey, - self._create_handler(action), - suppress=False, # Don't block the key - ) - self.registered[action] = True - except Exception as e: - print(f"Failed to register hotkey {hotkey}: {e}") - self.registered[action] = False - - return all(self.registered.values()) - - def _create_handler(self, action: str) -> Callable: - """Create a handler that calls the callback in a thread.""" - def handler(): - if action in self.callbacks: - # Run callback in thread to avoid blocking - threading.Thread( - target=self.callbacks[action], - daemon=True, - ).start() - return handler - - def unregister_all(self) -> None: - """Unregister all hotkeys.""" - if not self.enabled: - return - - for action, hotkey in self.hotkeys.items(): - if self.registered.get(action): - try: - keyboard.remove_hotkey(hotkey) - except: - pass - self.registered[action] = False - - def set_hotkey(self, action: str, hotkey: str) -> None: - """Change hotkey for an action.""" - if action in self.hotkeys: - # Unregister old hotkey - if self.registered.get(action) and self.enabled: - try: - keyboard.remove_hotkey(self.hotkeys[action]) - except: - pass - - self.hotkeys[action] = hotkey - - # Register new hotkey - if action in self.callbacks and self.enabled: - try: - keyboard.add_hotkey( - hotkey, - self._create_handler(action), - suppress=False, - ) - self.registered[action] = True - except Exception as e: - print(f"Failed to register hotkey {hotkey}: {e}") - self.registered[action] = False - - def get_hotkey_display(self, action: str) -> str: - """Get display-friendly hotkey string.""" - hotkey = self.hotkeys.get(action, "") - return hotkey.replace("+", " + ").title() - - -# Quick test -if __name__ == "__main__": - manager = HotkeyManager() - - def on_toggle(): - print("Toggle recording!") - - def on_keep(): - print("Keep clip!") - - def on_delete(): - print("Delete clip!") - - def on_end(): - print("End session!") - - manager.set_callback("toggle_recording", on_toggle) - manager.set_callback("keep_clip", on_keep) - manager.set_callback("delete_clip", on_delete) - manager.set_callback("end_session", on_end) - - if manager.register_all(): - print("Hotkeys registered! Try:") - print(" Ctrl+Shift+R: Toggle recording") - print(" Ctrl+Shift+K: Keep clip") - print(" Ctrl+Shift+D: Delete clip") - print(" Ctrl+Shift+E: End session") - print("\nPress Ctrl+C to exit") - - try: - keyboard.wait() - except KeyboardInterrupt: - pass - - manager.unregister_all() - else: - print("Failed to register hotkeys") diff --git a/src/cad_documenter/kb_capture.py b/src/cad_documenter/kb_capture.py index 254e0ec..7e78bb8 100644 --- a/src/cad_documenter/kb_capture.py +++ b/src/cad_documenter/kb_capture.py @@ -1,15 +1,11 @@ """ -KB Capture - Knowledge Base Recording Tool +KB Capture - Screen Recording for Knowledge Base -Main application that ties together: -- Screen recording -- Session/clip management -- Hotkey control -- System tray integration -- GUI interface +Simple flow: Record → Pause → Resume → Stop → Transcribe → Done + +One session = one continuous recording. """ -import sys import threading import time from pathlib import Path @@ -18,42 +14,38 @@ from dataclasses import dataclass from enum import Enum from .recorder import ScreenRecorder, RecordingConfig -from .session import SessionManager, Session, Clip, ClipStatus, SessionType -from .hotkeys import HotkeyManager +from .session import SessionManager, Session, SessionType, SessionStatus class AppState(Enum): - """Application state machine.""" - IDLE = "idle" # No session, ready to start - SESSION_ACTIVE = "session" # Session started, not recording - RECORDING = "recording" # Currently recording a clip - PREVIEW = "preview" # Clip just recorded, awaiting decision + """Application state.""" + IDLE = "idle" # No session + RECORDING = "recording" # Recording + PAUSED = "paused" # Recording paused + TRANSCRIBING = "transcribing" # Processing @dataclass class AppStatus: - """Current application status for UI updates.""" + """Current application status for UI.""" state: AppState session_name: Optional[str] = None project: Optional[str] = None session_type: Optional[SessionType] = None - clip_count: int = 0 - total_duration: float = 0.0 - current_clip_duration: float = 0.0 - last_clip_duration: float = 0.0 + duration: float = 0.0 message: str = "" class KBCaptureApp: """ - Main KB Capture application. + Main application. - Controls recording flow: - 1. Select project from available projects + Flow: + 1. Select project 2. Start session (name, type) - 3. Toggle recording to create clips - 4. Keep or delete clips - 5. End session to export + 3. Record → Pause → Resume → Stop + 4. Auto-transcribe with Whisper + 5. Ready for Clawdbot to process """ def __init__( @@ -67,22 +59,14 @@ class KBCaptureApp: # Components self.session_manager = SessionManager(self.projects_root) self.recorder = ScreenRecorder(on_status=self._log) - self.hotkeys = HotkeyManager() # State self.state = AppState.IDLE - self._recording_start_time: Optional[float] = None self._duration_thread: Optional[threading.Thread] = None self._running = False - - # Setup hotkeys - self.hotkeys.set_callback("toggle_recording", self.toggle_recording) - self.hotkeys.set_callback("keep_clip", self.keep_last_clip) - self.hotkeys.set_callback("delete_clip", self.delete_last_clip) - self.hotkeys.set_callback("end_session", self.end_session) def _log(self, message: str) -> None: - """Log a message and update status.""" + """Log and update status.""" print(f"[KB Capture] {message}") self._update_status(message=message) @@ -95,28 +79,18 @@ class KBCaptureApp: session_name=session.name if session else None, project=session.project if session else None, session_type=session.session_type if session else None, - clip_count=session.clip_count if session else 0, - total_duration=session.total_duration if session else 0.0, - current_clip_duration=self.recorder.get_duration() if self.state == AppState.RECORDING else 0.0, - last_clip_duration=self._get_last_clip_duration(), + duration=self.recorder.get_duration() if self.state in (AppState.RECORDING, AppState.PAUSED) else 0.0, message=message, ) self.on_status_change(status) - def _get_last_clip_duration(self) -> float: - """Get duration of the last clip.""" - session = self.session_manager.current_session - if session and session.clips: - return session.clips[-1].duration_seconds - return 0.0 - def _start_duration_thread(self) -> None: - """Start thread to update duration while recording.""" + """Start thread to update duration.""" self._running = True def update_loop(): - while self._running and self.state == AppState.RECORDING: + while self._running and self.state in (AppState.RECORDING, AppState.PAUSED): self._update_status() time.sleep(0.5) @@ -124,7 +98,7 @@ class KBCaptureApp: self._duration_thread.start() def _stop_duration_thread(self) -> None: - """Stop duration update thread.""" + """Stop duration thread.""" self._running = False if self._duration_thread: self._duration_thread.join(timeout=1) @@ -132,146 +106,184 @@ class KBCaptureApp: # === Public API === - def start(self) -> None: - """Start the application (register hotkeys).""" - self.hotkeys.register_all() - self._log("KB Capture started. Hotkeys active.") - - def stop(self) -> None: - """Stop the application.""" - self.hotkeys.unregister_all() - self._stop_duration_thread() - - if self.state == AppState.RECORDING: - self.recorder.stop() - - self._log("KB Capture stopped.") - def start_session( self, name: str, project: str, session_type: SessionType = SessionType.DESIGN, ) -> Session: - """Start a new recording session.""" + """Start a new session and begin recording.""" if self.state != AppState.IDLE: raise RuntimeError("Session already active") + # Create session session = self.session_manager.start_session(name, project, session_type) - self.state = AppState.SESSION_ACTIVE - - self._log(f"Session started: {name}") - self._update_status() - - return session - - def toggle_recording(self) -> None: - """Toggle recording state (hotkey handler).""" - if self.state == AppState.IDLE: - self._log("No session active. Start a session first.") - return - - if self.state == AppState.RECORDING: - self._stop_recording() - else: - self._start_recording() - - def _start_recording(self) -> None: - """Start recording a new clip.""" - if self.state not in (AppState.SESSION_ACTIVE, AppState.PREVIEW): - return - - # Auto-keep any clip in preview - if self.state == AppState.PREVIEW: - self.session_manager.keep_last_clip() - - # Start new clip - clip, clip_path = self.session_manager.start_clip() + # Start recording config = RecordingConfig( - output_path=clip_path, + output_path=self.session_manager.get_video_path(), framerate=30, ) if self.recorder.start(config): self.state = AppState.RECORDING self._start_duration_thread() - self._log(f"Recording clip {clip.id}...") + self._log(f"Session started: {name}") else: - self._log("Failed to start recording") - - def _stop_recording(self) -> None: - """Stop recording current clip.""" - if self.state != AppState.RECORDING: - return - - self._stop_duration_thread() - duration = self.recorder.get_duration() - output = self.recorder.stop() - - if output and output.exists(): - self.session_manager.end_clip(duration) - self.state = AppState.PREVIEW - self._log(f"Clip recorded: {duration:.1f}s - Keep (K) or Delete (D)?") - else: - self._log("Recording failed - no output") - self.state = AppState.SESSION_ACTIVE - - self._update_status() - - def keep_last_clip(self, note: str = "") -> Optional[Clip]: - """Keep the last recorded clip.""" - clip = self.session_manager.keep_last_clip(note) - if clip: - self.state = AppState.SESSION_ACTIVE - self._log(f"Kept clip: {clip.id}") - self._update_status() - return clip - - def delete_last_clip(self) -> Optional[Clip]: - """Delete the last recorded clip.""" - clip = self.session_manager.delete_last_clip() - if clip: - self.state = AppState.SESSION_ACTIVE - self._log(f"Deleted clip: {clip.id}") - self._update_status() - return clip - - def end_session(self) -> Optional[Session]: - """End current session and prepare for export.""" - if self.state == AppState.IDLE: - return None - - # Stop recording if active - if self.state == AppState.RECORDING: - self._stop_recording() - - # Keep any preview clips - if self.state == AppState.PREVIEW: - self.session_manager.keep_last_clip() - - session = self.session_manager.end_session() - self.state = AppState.IDLE - - self._log(f"Session ended: {session.clip_count} clips, {session.total_duration:.1f}s total") - self._update_status() + self.session_manager.cancel_session() + raise RuntimeError("Failed to start recording") return session - def cancel_session(self) -> None: - """Cancel current session and delete all clips.""" + def pause(self) -> bool: + """Pause recording.""" + if self.state != AppState.RECORDING: + return False + + if self.recorder.pause(): + self.state = AppState.PAUSED + self.session_manager.update_status(SessionStatus.PAUSED) + self._update_status("Paused") + return True + return False + + def resume(self) -> bool: + """Resume recording.""" + if self.state != AppState.PAUSED: + return False + + if self.recorder.resume(): + self.state = AppState.RECORDING + self.session_manager.update_status(SessionStatus.RECORDING) + self._update_status("Recording") + return True + return False + + def toggle_pause(self) -> None: + """Toggle pause/resume.""" if self.state == AppState.RECORDING: - self._stop_duration_thread() + self.pause() + elif self.state == AppState.PAUSED: + self.resume() + + def stop(self) -> Optional[Session]: + """Stop recording and transcribe.""" + if self.state not in (AppState.RECORDING, AppState.PAUSED): + return None + + self._stop_duration_thread() + + # Get duration before stopping + duration = self.recorder.get_duration() + + # Stop recording + output = self.recorder.stop() + + if not output or not output.exists(): + self._log("Recording failed - no output") + self.session_manager.cancel_session() + self.state = AppState.IDLE + return None + + # Update session + self.session_manager.set_duration(duration) + self.session_manager.update_status(SessionStatus.TRANSCRIBING) + self.state = AppState.TRANSCRIBING + self._update_status("Transcribing...") + + # Transcribe in background + threading.Thread( + target=self._transcribe, + args=(output,), + daemon=True, + ).start() + + return self.session_manager.current_session + + def _transcribe(self, video_path: Path) -> None: + """Transcribe video with Whisper.""" + try: + import whisper + + self._log("Loading Whisper model...") + model = whisper.load_model("base") + + self._log("Transcribing...") + result = model.transcribe(str(video_path), language="en", verbose=False) + + # Save transcript + import json + transcript_path = video_path.parent / "transcript.json" + with open(transcript_path, "w") as f: + json.dump(result, f, indent=2) + + # Find screenshot triggers + triggers = [] + for segment in result.get("segments", []): + text = segment.get("text", "").lower() + if "screenshot" in text: + triggers.append({ + "timestamp": segment.get("start", 0), + "text": segment.get("text", ""), + }) + + # Save metadata + session = self.session_manager.current_session + metadata = { + "session_id": session.id, + "name": session.name, + "project": session.project, + "session_type": session.session_type.value, + "created_at": session.created_at.isoformat(), + "duration": session.duration, + "status": "ready", + "screenshot_triggers": triggers, + "files": { + "video": session.video_file, + "transcript": "transcript.json", + }, + } + + metadata_path = video_path.parent / "metadata.json" + with open(metadata_path, "w") as f: + json.dump(metadata, f, indent=2) + + # Update session + self.session_manager.set_transcript("transcript.json") + self.session_manager.end_session() + + self.state = AppState.IDLE + self._log(f"Done! {len(triggers)} screenshot triggers found") + self._update_status(f"Session saved with {len(triggers)} screenshots") + + except ImportError: + self._log("Whisper not installed!") + self.session_manager.end_session() + self.state = AppState.IDLE + self._update_status("Saved (no transcription)") + except Exception as e: + self._log(f"Transcription error: {e}") + self.session_manager.end_session() + self.state = AppState.IDLE + self._update_status("Saved (transcription failed)") + + def cancel(self) -> None: + """Cancel session and delete files.""" + if self.state == AppState.IDLE: + return + + self._stop_duration_thread() + + if self.recorder.is_recording: self.recorder.stop() self.session_manager.cancel_session() self.state = AppState.IDLE - self._log("Session cancelled") - self._update_status() + self._update_status("Cancelled") def get_status(self) -> AppStatus: - """Get current application status.""" + """Get current status.""" session = self.session_manager.current_session return AppStatus( @@ -279,95 +291,5 @@ class KBCaptureApp: session_name=session.name if session else None, project=session.project if session else None, session_type=session.session_type if session else None, - clip_count=session.clip_count if session else 0, - total_duration=session.total_duration if session else 0.0, - current_clip_duration=self.recorder.get_duration() if self.state == AppState.RECORDING else 0.0, - last_clip_duration=self._get_last_clip_duration(), + duration=self.recorder.get_duration() if self.state in (AppState.RECORDING, AppState.PAUSED) else 0.0, ) - - def list_sessions(self) -> list[Session]: - """List all recorded sessions.""" - return self.session_manager.list_sessions() - - def get_session_dir(self, session_id: str) -> Path: - """Get session directory for export.""" - return self.session_manager.get_session_dir(session_id) - - -# CLI for testing -if __name__ == "__main__": - import tempfile - - def on_status(status: AppStatus): - state_icons = { - AppState.IDLE: "⚪", - AppState.SESSION_ACTIVE: "🟢", - AppState.RECORDING: "🔴", - AppState.PREVIEW: "🟡", - } - icon = state_icons.get(status.state, "⚪") - - print(f"\n{icon} State: {status.state.value}") - if status.session_name: - print(f" Session: {status.session_name} ({status.project})") - print(f" Clips: {status.clip_count} | Duration: {status.total_duration:.1f}s") - if status.current_clip_duration > 0: - print(f" Recording: {status.current_clip_duration:.1f}s") - if status.message: - print(f" → {status.message}") - - with tempfile.TemporaryDirectory() as tmpdir: - # Create a test project - test_project = Path(tmpdir) / "Test-Project" - (test_project / "KB").mkdir(parents=True) - - app = KBCaptureApp( - projects_root=Path(tmpdir), - on_status_change=on_status, - ) - - print("\n=== KB Capture Test ===") - print(f"Projects root: {tmpdir}") - print(f"Available projects: {app.session_manager.list_projects()}") - print("\nCommands:") - print(" s - Start session") - print(" r - Toggle recording") - print(" k - Keep last clip") - print(" d - Delete last clip") - print(" e - End session") - print(" c - Cancel session") - print(" q - Quit") - print() - - app.start() - - try: - while True: - cmd = input("> ").strip().lower() - - if cmd == "s": - projects = app.session_manager.list_projects() - print(f"Available projects: {projects}") - project = input("Project: ").strip() or (projects[0] if projects else "Test-Project") - name = input("Session name: ").strip() or "Test Session" - app.start_session(name, project) - elif cmd == "r": - app.toggle_recording() - elif cmd == "k": - app.keep_last_clip() - elif cmd == "d": - app.delete_last_clip() - elif cmd == "e": - app.end_session() - elif cmd == "c": - app.cancel_session() - elif cmd == "q": - break - else: - print("Unknown command") - - except KeyboardInterrupt: - pass - - app.stop() - print("\nGoodbye!") diff --git a/src/cad_documenter/recorder.py b/src/cad_documenter/recorder.py index 4478d48..d0e02fc 100644 --- a/src/cad_documenter/recorder.py +++ b/src/cad_documenter/recorder.py @@ -1,8 +1,8 @@ """ -Screen + Audio Recorder using FFmpeg +Screen + Audio Recorder (Simplified) -Records screen and microphone to video file. -Supports Windows (gdigrab) and Linux (x11grab). +Records screen and microphone to a single video file. +Supports pause/resume within the same recording. """ import subprocess @@ -12,7 +12,6 @@ import sys from pathlib import Path from dataclasses import dataclass from typing import Optional, Callable -import json @dataclass @@ -20,103 +19,65 @@ class RecordingConfig: """Recording configuration.""" output_path: Path framerate: int = 30 - audio_device: Optional[str] = None # None = default mic + audio_device: Optional[str] = None video_codec: str = "libx264" audio_codec: str = "aac" - crf: int = 23 # Quality (lower = better, 18-28 typical) - preset: str = "ultrafast" # Encoding speed - capture_region: Optional[tuple[int, int, int, int]] = None # x, y, w, h + crf: int = 23 + preset: str = "ultrafast" class ScreenRecorder: """ - FFmpeg-based screen recorder with audio. + FFmpeg-based screen recorder with pause/resume. Usage: recorder = ScreenRecorder() recorder.start(config) # ... recording ... + recorder.pause() + # ... thinking ... + recorder.resume() + # ... more recording ... recorder.stop() """ def __init__(self, on_status: Optional[Callable[[str], None]] = None): self.process: Optional[subprocess.Popen] = None self.is_recording = False + self.is_paused = False self.start_time: Optional[float] = None + self.pause_start: Optional[float] = None + self.total_paused: float = 0.0 self.output_path: Optional[Path] = None self.on_status = on_status or (lambda x: None) - self._monitor_thread: Optional[threading.Thread] = None - - def _get_platform_args(self, config: RecordingConfig) -> list[str]: - """Get platform-specific FFmpeg input arguments.""" + + def _get_ffmpeg_cmd(self, config: RecordingConfig) -> list[str]: + """Build FFmpeg command.""" + cmd = ["ffmpeg", "-y"] if sys.platform == "win32": - # Windows: gdigrab for screen, dshow for audio - args = [ + # Windows: gdigrab for screen + cmd.extend([ "-f", "gdigrab", "-framerate", str(config.framerate), - ] + "-i", "desktop", + ]) - if config.capture_region: - x, y, w, h = config.capture_region - args.extend([ - "-offset_x", str(x), - "-offset_y", str(y), - "-video_size", f"{w}x{h}", - ]) - - args.extend(["-i", "desktop"]) - - # Add audio input - if config.audio_device: - args.extend([ - "-f", "dshow", - "-i", f"audio={config.audio_device}", - ]) - else: - # Try to find default microphone - args.extend([ - "-f", "dshow", - "-i", "audio=Microphone Array", # Common default - ]) - + # Audio: dshow + audio_device = config.audio_device or "Microphone Array" + cmd.extend([ + "-f", "dshow", + "-i", f"audio={audio_device}", + ]) else: - # Linux: x11grab for screen, pulse for audio - display = ":0.0" - args = [ + # Linux: x11grab + pulse + cmd.extend([ "-f", "x11grab", "-framerate", str(config.framerate), - ] - - if config.capture_region: - x, y, w, h = config.capture_region - args.extend(["-video_size", f"{w}x{h}"]) - display = f":0.0+{x},{y}" - - args.extend(["-i", display]) - - # Add audio (PulseAudio) - args.extend([ + "-i", ":0.0", "-f", "pulse", "-i", "default", ]) - - return args - - def start(self, config: RecordingConfig) -> bool: - """Start recording.""" - if self.is_recording: - self.on_status("Already recording") - return False - - self.output_path = config.output_path - self.output_path.parent.mkdir(parents=True, exist_ok=True) - - # Build FFmpeg command - cmd = ["ffmpeg", "-y"] # -y to overwrite - - # Platform-specific inputs - cmd.extend(self._get_platform_args(config)) # Output settings cmd.extend([ @@ -125,11 +86,23 @@ class ScreenRecorder: "-crf", str(config.crf), "-c:a", config.audio_codec, "-b:a", "128k", - str(self.output_path), + str(config.output_path), ]) + return cmd + + def start(self, config: RecordingConfig) -> bool: + """Start recording.""" + if self.is_recording: + self.on_status("Already recording") + return False + + self.output_path = config.output_path + self.output_path.parent.mkdir(parents=True, exist_ok=True) + + cmd = self._get_ffmpeg_cmd(config) + try: - # Start FFmpeg process self.process = subprocess.Popen( cmd, stdin=subprocess.PIPE, @@ -139,68 +112,132 @@ class ScreenRecorder: ) self.is_recording = True + self.is_paused = False self.start_time = time.time() - self.on_status(f"Recording started: {self.output_path.name}") - - # Start monitor thread - self._monitor_thread = threading.Thread(target=self._monitor_process, daemon=True) - self._monitor_thread.start() + self.total_paused = 0.0 + self.on_status("Recording started") return True except FileNotFoundError: - self.on_status("FFmpeg not found. Please install FFmpeg.") + self.on_status("FFmpeg not found") return False except Exception as e: - self.on_status(f"Failed to start recording: {e}") + self.on_status(f"Failed: {e}") return False - def _monitor_process(self): - """Monitor FFmpeg process for errors.""" - if self.process: - stderr = self.process.stderr.read() if self.process.stderr else b"" - if self.process.returncode and self.process.returncode != 0: - self.on_status(f"Recording error: {stderr.decode()[-200:]}") + def pause(self) -> bool: + """Pause recording (Windows: suspend process).""" + if not self.is_recording or self.is_paused: + return False + + self.is_paused = True + self.pause_start = time.time() + + # On Windows, we can suspend the FFmpeg process + if sys.platform == "win32" and self.process: + try: + import ctypes + kernel32 = ctypes.windll.kernel32 + handle = kernel32.OpenProcess(0x1F0FFF, False, self.process.pid) + kernel32.DebugActiveProcess(self.process.pid) + self.on_status("Paused") + except: + self.on_status("Paused (soft)") + else: + self.on_status("Paused") + + return True + + def resume(self) -> bool: + """Resume recording.""" + if not self.is_recording or not self.is_paused: + return False + + if self.pause_start: + self.total_paused += time.time() - self.pause_start + + self.is_paused = False + self.pause_start = None + + # Resume FFmpeg process on Windows + if sys.platform == "win32" and self.process: + try: + import ctypes + kernel32 = ctypes.windll.kernel32 + kernel32.DebugActiveProcessStop(self.process.pid) + except: + pass + + self.on_status("Recording resumed") + return True def stop(self) -> Optional[Path]: """Stop recording and return output path.""" if not self.is_recording or not self.process: return None - + + # If paused, add final pause duration + if self.is_paused and self.pause_start: + self.total_paused += time.time() - self.pause_start + # Resume first so we can stop properly + if sys.platform == "win32": + try: + import ctypes + kernel32 = ctypes.windll.kernel32 + kernel32.DebugActiveProcessStop(self.process.pid) + except: + pass + try: - # Send 'q' to FFmpeg to stop gracefully + # Send 'q' to FFmpeg if self.process.stdin: self.process.stdin.write(b"q") self.process.stdin.flush() - # Wait for process to finish (with timeout) self.process.wait(timeout=10) except subprocess.TimeoutExpired: self.process.terminate() self.process.wait(timeout=5) except Exception as e: - self.on_status(f"Error stopping recording: {e}") + self.on_status(f"Stop error: {e}") self.process.terminate() - + self.is_recording = False - duration = time.time() - self.start_time if self.start_time else 0 - self.on_status(f"Recording stopped: {duration:.1f}s") + self.is_paused = False + + duration = self.get_duration() + self.on_status(f"Stopped: {duration:.1f}s") return self.output_path if self.output_path and self.output_path.exists() else None def get_duration(self) -> float: - """Get current recording duration in seconds.""" - if self.is_recording and self.start_time: - return time.time() - self.start_time - return 0.0 + """Get actual recording duration (excluding pauses).""" + if not self.start_time: + return 0.0 + + elapsed = time.time() - self.start_time + + # Subtract paused time + if self.is_paused and self.pause_start: + elapsed -= (time.time() - self.pause_start) + elapsed -= self.total_paused + + return max(0, elapsed) + + def get_elapsed(self) -> float: + """Get total elapsed time (including pauses).""" + if not self.start_time: + return 0.0 + return time.time() - self.start_time @staticmethod def list_audio_devices() -> list[str]: - """List available audio input devices (Windows only).""" + """List available audio input devices (Windows).""" if sys.platform != "win32": return ["default"] - + try: result = subprocess.run( ["ffmpeg", "-list_devices", "true", "-f", "dshow", "-i", "dummy"], @@ -209,7 +246,6 @@ class ScreenRecorder: creationflags=subprocess.CREATE_NO_WINDOW, ) - # Parse audio devices from stderr devices = [] in_audio = False for line in result.stderr.split("\n"): @@ -218,40 +254,12 @@ class ScreenRecorder: elif "DirectShow video devices" in line: in_audio = False elif in_audio and '"' in line: - # Extract device name between quotes start = line.find('"') + 1 end = line.rfind('"') if start < end: devices.append(line[start:end]) - - return devices if devices else ["default"] - except Exception: - return ["default"] - - -# Quick test -if __name__ == "__main__": - def status(msg): - print(f"[STATUS] {msg}") - - recorder = ScreenRecorder(on_status=status) - - print("Available audio devices:", recorder.list_audio_devices()) - - config = RecordingConfig( - output_path=Path("test_recording.mp4"), - framerate=30, - ) - - print("Starting recording... (Ctrl+C to stop)") - if recorder.start(config): - try: - while True: - time.sleep(1) - print(f"Recording: {recorder.get_duration():.1f}s") - except KeyboardInterrupt: - pass - - output = recorder.stop() - print(f"Saved to: {output}") + return devices if devices else ["Microphone Array"] + + except: + return ["Microphone Array"] diff --git a/src/cad_documenter/session.py b/src/cad_documenter/session.py index f425e62..12d20cf 100644 --- a/src/cad_documenter/session.py +++ b/src/cad_documenter/session.py @@ -1,92 +1,43 @@ """ -Session Manager for KB Capture +Session Manager for KB Capture (Simplified) -Manages recording sessions with multiple clips. -Clips can be kept or deleted before finalizing. +One session = one continuous recording (with pause/resume). +No clips, no keep/delete. Just record → transcribe → done. """ import json -import shutil from pathlib import Path -from dataclasses import dataclass, field, asdict +from dataclasses import dataclass from datetime import datetime from typing import Optional, List from enum import Enum -import uuid - - -class ClipStatus(Enum): - RECORDING = "recording" - PREVIEW = "preview" # Just recorded, awaiting decision - KEPT = "kept" - DELETED = "deleted" class SessionType(Enum): - DESIGN = "design" # CAD/Design KB + DESIGN = "design" # CAD/Design KB ANALYSIS = "analysis" # FEA/Analysis KB -@dataclass -class Clip: - """A single recording clip within a session.""" - id: str - filename: str - start_time: datetime - end_time: Optional[datetime] = None - duration_seconds: float = 0.0 - status: ClipStatus = ClipStatus.RECORDING - note: str = "" # Optional quick note - - def to_dict(self) -> dict: - return { - "id": self.id, - "filename": self.filename, - "start_time": self.start_time.isoformat(), - "end_time": self.end_time.isoformat() if self.end_time else None, - "duration_seconds": self.duration_seconds, - "status": self.status.value, - "note": self.note, - } - - @classmethod - def from_dict(cls, data: dict) -> "Clip": - return cls( - id=data["id"], - filename=data["filename"], - start_time=datetime.fromisoformat(data["start_time"]), - end_time=datetime.fromisoformat(data["end_time"]) if data.get("end_time") else None, - duration_seconds=data.get("duration_seconds", 0.0), - status=ClipStatus(data.get("status", "kept")), - note=data.get("note", ""), - ) +class SessionStatus(Enum): + RECORDING = "recording" + PAUSED = "paused" + TRANSCRIBING = "transcribing" + READY = "ready" # Transcribed, ready for sync + PROCESSED = "processed" # Clawdbot has processed it @dataclass class Session: - """A recording session containing multiple clips.""" + """A recording session.""" id: str name: str project: str session_type: SessionType created_at: datetime - clips: List[Clip] = field(default_factory=list) - is_finalized: bool = False - - @property - def total_duration(self) -> float: - """Total duration of kept clips.""" - return sum(c.duration_seconds for c in self.clips if c.status == ClipStatus.KEPT) - - @property - def kept_clips(self) -> List[Clip]: - """Clips marked as kept.""" - return [c for c in self.clips if c.status == ClipStatus.KEPT] - - @property - def clip_count(self) -> int: - """Number of kept clips.""" - return len(self.kept_clips) + duration: float = 0.0 + status: SessionStatus = SessionStatus.RECORDING + video_file: str = "recording.mp4" + transcript_file: Optional[str] = None def to_dict(self) -> dict: return { @@ -95,8 +46,10 @@ class Session: "project": self.project, "session_type": self.session_type.value, "created_at": self.created_at.isoformat(), - "clips": [c.to_dict() for c in self.clips], - "is_finalized": self.is_finalized, + "duration": self.duration, + "status": self.status.value, + "video_file": self.video_file, + "transcript_file": self.transcript_file, } @classmethod @@ -107,52 +60,38 @@ class Session: project=data["project"], session_type=SessionType(data.get("session_type", "design")), created_at=datetime.fromisoformat(data["created_at"]), - clips=[Clip.from_dict(c) for c in data.get("clips", [])], - is_finalized=data.get("is_finalized", False), + duration=data.get("duration", 0.0), + status=SessionStatus(data.get("status", "ready")), + video_file=data.get("video_file", "recording.mp4"), + transcript_file=data.get("transcript_file"), ) class SessionManager: """ - Manages recording sessions and clips. + Manages recording sessions. Project-centric structure: - /2-Projects// - ├── KB/ - │ └── dev/ # gen-XXX.md session captures (Mario creates) - ├── Images/ - │ └── screenshot-sessions/ # Frames organized by session - └── _capture/ # Session staging + /Projects// + └── _capture/ └── / - ├── session.json - ├── clips/ - │ ├── clip-001.mp4 - │ └── ... - └── clawdbot_export/ # Ready for Mario - ├── merged.mp4 - ├── transcript.json - └── metadata.json + ├── session.json # Metadata + ├── recording.mp4 # Video + └── transcript.json # Whisper output """ def __init__(self, projects_root: Path): - """ - Initialize session manager. - - Args: - projects_root: Path to projects folder (e.g., /2-Projects/ or D:/ATODrive/Projects/) - """ self.projects_root = Path(projects_root) self.current_session: Optional[Session] = None - self.current_clip: Optional[Clip] = None self._current_project_path: Optional[Path] = None def list_projects(self) -> List[str]: - """List available projects (folders in projects_root).""" + """List available projects.""" projects = [] if self.projects_root.exists(): for p in sorted(self.projects_root.iterdir()): if p.is_dir() and not p.name.startswith((".", "_")): - # Check if it looks like a project (has KB folder or _context.md) + # Check if it looks like a project if (p / "KB").exists() or (p / "_context.md").exists(): projects.append(p.name) return projects @@ -161,25 +100,13 @@ class SessionManager: """Get full path to a project.""" return self.projects_root / project - def get_capture_dir(self, project: str) -> Path: - """Get the _capture directory for a project.""" - return self.get_project_path(project) / "_capture" - - @property - def sessions_dir(self) -> Path: - """Current project's capture directory.""" - if self._current_project_path: - return self._current_project_path / "_capture" - raise RuntimeError("No project selected") - def start_session( self, name: str, project: str, session_type: SessionType = SessionType.DESIGN, ) -> Session: - """Start a new recording session within a project.""" - # Set current project + """Start a new recording session.""" self._current_project_path = self.get_project_path(project) if not self._current_project_path.exists(): raise ValueError(f"Project not found: {project}") @@ -192,221 +119,99 @@ class SessionManager: project=project, session_type=session_type, created_at=datetime.now(), + status=SessionStatus.RECORDING, ) - # Create session directory in project's _capture folder - capture_dir = self._current_project_path / "_capture" - session_dir = capture_dir / session_id + # Create session directory + session_dir = self._current_project_path / "_capture" / session_id session_dir.mkdir(parents=True, exist_ok=True) - (session_dir / "clips").mkdir(exist_ok=True) self.current_session = session self._save_session() return session - def start_clip(self) -> tuple[Clip, Path]: - """ - Start a new clip in current session. - Returns clip object and path for recording. - """ - if not self.current_session: + def get_session_dir(self) -> Path: + """Get current session directory.""" + if not self.current_session or not self._current_project_path: raise RuntimeError("No active session") - - clip_num = len(self.current_session.clips) + 1 - clip_id = f"clip-{clip_num:03d}" - filename = f"{clip_id}.mp4" - - clip = Clip( - id=clip_id, - filename=filename, - start_time=datetime.now(), - status=ClipStatus.RECORDING, - ) - - self.current_session.clips.append(clip) - self.current_clip = clip - self._save_session() - - clip_path = self.sessions_dir / self.current_session.id / "clips" / filename - return clip, clip_path + return self._current_project_path / "_capture" / self.current_session.id - def end_clip(self, duration: float) -> Clip: - """End current clip, move to preview state.""" - if not self.current_clip: - raise RuntimeError("No active clip") - - self.current_clip.end_time = datetime.now() - self.current_clip.duration_seconds = duration - self.current_clip.status = ClipStatus.PREVIEW - - clip = self.current_clip - self.current_clip = None - self._save_session() - - return clip + def get_video_path(self) -> Path: + """Get path for video file.""" + return self.get_session_dir() / self.current_session.video_file - def keep_clip(self, clip_id: str, note: str = "") -> None: - """Mark a clip as kept.""" - if not self.current_session: - raise RuntimeError("No active session") - - for clip in self.current_session.clips: - if clip.id == clip_id: - clip.status = ClipStatus.KEPT - clip.note = note - break - - self._save_session() + def update_status(self, status: SessionStatus) -> None: + """Update session status.""" + if self.current_session: + self.current_session.status = status + self._save_session() - def delete_clip(self, clip_id: str) -> None: - """Mark a clip as deleted and remove file.""" - if not self.current_session: - raise RuntimeError("No active session") - - for clip in self.current_session.clips: - if clip.id == clip_id: - clip.status = ClipStatus.DELETED - - # Delete the actual file - clip_path = self.sessions_dir / self.current_session.id / "clips" / clip.filename - if clip_path.exists(): - clip_path.unlink() - break - - self._save_session() + def set_duration(self, duration: float) -> None: + """Set recording duration.""" + if self.current_session: + self.current_session.duration = duration + self._save_session() - def keep_last_clip(self, note: str = "") -> Optional[Clip]: - """Keep the most recent clip in preview state.""" - if not self.current_session: - return None - - for clip in reversed(self.current_session.clips): - if clip.status == ClipStatus.PREVIEW: - self.keep_clip(clip.id, note) - return clip - return None - - def delete_last_clip(self) -> Optional[Clip]: - """Delete the most recent clip in preview state.""" - if not self.current_session: - return None - - for clip in reversed(self.current_session.clips): - if clip.status == ClipStatus.PREVIEW: - self.delete_clip(clip.id) - return clip - return None + def set_transcript(self, transcript_file: str) -> None: + """Set transcript file name.""" + if self.current_session: + self.current_session.transcript_file = transcript_file + self._save_session() def end_session(self) -> Session: - """ - End session and prepare for export. - Clips still in preview are auto-kept. - """ + """End current session.""" if not self.current_session: raise RuntimeError("No active session") - # Auto-keep any clips still in preview - for clip in self.current_session.clips: - if clip.status == ClipStatus.PREVIEW: - clip.status = ClipStatus.KEPT - - self.current_session.is_finalized = True + self.current_session.status = SessionStatus.READY self._save_session() session = self.current_session self.current_session = None + self._current_project_path = None return session def cancel_session(self) -> None: - """Cancel session and delete all files.""" - if not self.current_session: - return - - session_dir = self.sessions_dir / self.current_session.id - if session_dir.exists(): - shutil.rmtree(session_dir) + """Cancel session and delete files.""" + if self.current_session: + import shutil + session_dir = self.get_session_dir() + if session_dir.exists(): + shutil.rmtree(session_dir) self.current_session = None - self.current_clip = None - - def get_session(self, session_id: str) -> Optional[Session]: - """Load a session by ID.""" - session_file = self.sessions_dir / session_id / "session.json" - if session_file.exists(): - with open(session_file) as f: - return Session.from_dict(json.load(f)) - return None + self._current_project_path = None def list_sessions(self, project: Optional[str] = None) -> List[Session]: - """List sessions, optionally filtered by project.""" + """List sessions for a project or all projects.""" sessions = [] if project: - # List sessions for specific project - capture_dir = self.get_capture_dir(project) + capture_dir = self.get_project_path(project) / "_capture" if capture_dir.exists(): for session_dir in sorted(capture_dir.iterdir(), reverse=True): if session_dir.is_dir(): session_file = session_dir / "session.json" if session_file.exists(): - with open(session_file) as f: - sessions.append(Session.from_dict(json.load(f))) + try: + with open(session_file) as f: + sessions.append(Session.from_dict(json.load(f))) + except: + pass else: - # List sessions across all projects for proj in self.list_projects(): sessions.extend(self.list_sessions(proj)) sessions.sort(key=lambda s: s.created_at, reverse=True) return sessions - def get_session_dir(self, session_id: str) -> Path: - """Get session directory path.""" - return self.sessions_dir / session_id - def _save_session(self) -> None: """Save current session to disk.""" if not self.current_session: return - session_file = self.sessions_dir / self.current_session.id / "session.json" + session_file = self.get_session_dir() / "session.json" with open(session_file, "w") as f: json.dump(self.current_session.to_dict(), f, indent=2) - - -# Quick test -if __name__ == "__main__": - from pathlib import Path - import tempfile - - with tempfile.TemporaryDirectory() as tmpdir: - manager = SessionManager(Path(tmpdir)) - - # Start session - session = manager.start_session( - name="Test Session", - project="P04-GigaBIT-M1", - session_type=SessionType.DESIGN, - ) - print(f"Started session: {session.id}") - - # Record some clips - clip1, path1 = manager.start_clip() - print(f"Recording clip 1 to: {path1}") - manager.end_clip(duration=45.5) - manager.keep_clip(clip1.id, note="Stage 2 joint") - - clip2, path2 = manager.start_clip() - print(f"Recording clip 2 to: {path2}") - manager.end_clip(duration=30.0) - manager.delete_clip(clip2.id) # Oops, bad take - - clip3, path3 = manager.start_clip() - print(f"Recording clip 3 to: {path3}") - manager.end_clip(duration=60.0) - manager.keep_last_clip() - - # End session - session = manager.end_session() - print(f"Session ended: {session.clip_count} clips, {session.total_duration:.1f}s total")