9 Commits

Author SHA1 Message Date
Mario Lavoie
7f8ef93247 refactor: OBS-only mode - remove broken screen recording
Simplified to just load OBS recordings:
- Removed all FFmpeg screen capture code
- Single 'Load OBS Recording' button
- Cleaner UI with progress indicator
- Handles videos without audio gracefully
- Remembers last project and video folder

Flow: Select Project → Load Video → Transcribe → Done

OBS handles recording, this tool handles processing.
2026-02-10 18:36:17 +00:00
Mario Lavoie
8c5e35c301 fix: Address all issues from Antoine's review
UI Feedback:
- Show what's being captured (screen + mic) during recording
- Design/Analysis toggle now shows target KB path
- Color-coded status messages (red=error, green=success)

Multi-Screen Support:
- Added screen selector dropdown
- Enumerate displays using Windows API
- Pass screen geometry to FFmpeg for specific monitor capture

Button State Fixes:
- Disable Record button immediately on click (prevent double-click)
- Properly enable Pause button when recording starts
- All inputs disabled during recording

Error Recovery:
- Reset UI properly after recording failure
- Added 'Reset' button that appears on errors
- force_reset() method for emergency recovery
- Can always get back to idle state

Other:
- Better error messages
- Capture info label shows screen + mic being used
2026-02-10 18:23:08 +00:00
Mario Lavoie
d998a9a2b1 fix: Windows recording + pause button + Load Video feature
- recorder.py: Fixed Windows FFmpeg handling
  - Use CTRL_BREAK_EVENT instead of stdin 'q' to stop recording
  - Use NtSuspendProcess/NtResumeProcess for pause/resume
  - Add stderr reader thread to prevent blocking
  - Better error handling and status messages
  - Check if recording actually started before returning success

- gui_capture.py: Added Load Video feature
  - New 'Load' button in header to import existing videos
  - Copies video to session folder and runs transcription
  - Supports mp4, mkv, avi, mov, webm formats

Fixes:
- Record button now properly enables Pause button
- Recording stops cleanly on Windows
- Can load pre-recorded videos (from OBS, etc.)
2026-02-10 18:14:01 +00:00
Mario Lavoie
9b24478f04 Simplify KB Capture to match Voice Recorder pattern
Major simplification:
- Removed clips concept (no keep/delete segments)
- Single continuous recording per session with pause/resume
- Matches Voice Recorder UX pattern Antoine knows

Flow:
  Start Session → Record → Pause → Resume → Stop → Transcribe → Done

Features:
- Record/Pause/Resume/Stop controls
- Session types: Design / Analysis
- Auto-transcribe with Whisper on stop
- Finds 'screenshot' triggers in transcript for Clawdbot
- Simple dark theme UI matching Voice Recorder

Removed:
- export.py (transcription now inline)
- hotkeys.py (not needed for MVP)
- Clip management
2026-02-09 22:14:34 +00:00
Mario Lavoie
09c32cbad2 Clarify Windows vs Clawdbot responsibilities
Windows KB Capture now:
- Records clips
- Merges video
- Transcribes with Whisper (GPU)
- Finds screenshot triggers with context
- Exports: merged.mp4, transcript.json, metadata.json

Clawdbot (via knowledge-base skill) now:
- Extracts frames at trigger timestamps
- Vision analyzes frames
- Updates KB files
- Organizes images

Removed frame extraction from Windows - that's Mario's job.
Added screenshot_triggers to metadata.json with context for Mario.
2026-02-09 22:05:33 +00:00
Mario Lavoie
e647255c60 Major GUI overhaul - professional UX
Features:
- Create new projects from UI (full KB structure auto-generated)
- Visual project cards with stats (sessions, duration)
- Browse for any projects folder (SeaDrive, local, etc.)
- Persistent config (remembers your folder)
- Clean dark theme
- Better onboarding (welcome screen)
- Session info display during recording
- Improved clips list with keep/delete

Project structure created automatically:
  KB/Design/{architecture,components,materials,dev}
  KB/Analysis/{models,load-cases,results}
  Images/{components,screenshot-sessions}
  _capture/
  _context.md
2026-02-09 22:00:30 +00:00
Mario Lavoie
978c79abc0 Add Browse button for projects folder selection
- Added Browse... button to select projects folder
- Saves selected folder to config file (persistent)
- Works with SeaDrive paths
- Graceful handling when no folder selected
- Auto-detects common paths on startup
2026-02-09 21:55:16 +00:00
Mario Lavoie
0266fda42b Make KB Capture project-centric
- Sessions now live inside project folders: <Project>/_capture/<session>/
- Project picker dropdown (scans projects folder)
- Auto-discovers projects with KB/ folder or _context.md
- Windows: D:/ATODrive/Projects
- Linux: ~/obsidian-vault/2-Projects

This aligns with the KB structure where Mario updates:
- KB/dev/gen-XXX.md (session captures)
- Images/screenshot-sessions/ (frames)
2026-02-09 12:53:46 +00:00
Mario Lavoie
d5371cfe75 Add KB Capture v2 - clip-based recording system
New features:
- Clip-based workflow: record short clips, keep or delete
- Toggle recording with Ctrl+Shift+R
- Session management (start, clips, end)
- Modern CustomTkinter GUI with dark theme
- Global hotkeys for hands-free control
- Whisper transcription (local, no API)
- FFmpeg screen + audio capture
- Export to clawdbot_export/ for Mario processing

Files added:
- recorder.py: FFmpeg screen recording
- session.py: Session/clip management
- hotkeys.py: Global hotkey registration
- kb_capture.py: Main application logic
- gui_capture.py: Modern GUI
- export.py: Merge clips, transcribe, export

Docs:
- docs/KB-CAPTURE.md: Full documentation

Entry point: uv run kb-capture
2026-02-09 12:50:22 +00:00
6 changed files with 1903 additions and 0 deletions

193
docs/KB-CAPTURE.md Normal file

@@ -0,0 +1,193 @@
# KB Capture v2
**Clip-based recording for engineering knowledge capture.**
## Overview
KB Capture is a lightweight recording tool that captures your CAD/FEM work as short clips, not one long video. Record what matters, delete mistakes, keep the good stuff.
## Quick Start
```bash
# Install
cd CAD-Documenter
uv sync
uv pip install customtkinter keyboard
# Launch
uv run kb-capture
```
## Workflow
### 1. Start Session
- Open KB Capture (GUI)
- **Select project** from dropdown (scans your projects folder)
- Enter session description (e.g., "Vertical support refinement")
- Select type: **Design** (CAD) or **Analysis** (FEA)
- Click **Start Session**
> Projects are auto-discovered from your projects folder (D:/ATODrive/Projects on Windows)
### 2. Record Clips
While working in NX/CAD:
- Press **Ctrl+Shift+R** to start recording
- Narrate what you're doing
- Say "screenshot" when you want a frame captured
- Press **Ctrl+Shift+R** again to stop
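
Saying "screenshot" works because the transcript is later scanned for that word. A rough sketch of the scan, assuming Whisper-style segments (dicts with `start` and `text` keys); the function name `find_screenshot_triggers` is hypothetical:

```python
def find_screenshot_triggers(segments: list[dict], keyword: str = "screenshot") -> list[dict]:
    """Return one trigger per transcript segment that mentions the keyword."""
    triggers = []
    for segment in segments:
        text = segment.get("text", "")
        # Case-insensitive match, so "Screenshot" and "screenshot" both count.
        if keyword in text.lower():
            triggers.append({
                "timestamp": segment.get("start", 0.0),  # seconds from start of video
                "text": text.strip(),
            })
    return triggers


# Example input shaped like Whisper's result["segments"]
example_segments = [
    {"start": 0.0, "text": " Opening the bracket model."},
    {"start": 12.5, "text": " Screenshot of the fillet here."},
]
triggers = find_screenshot_triggers(example_segments)
```

Each trigger keeps the spoken sentence as context, so whoever extracts the frame later knows what the view was meant to show.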
### 3. Review Clips
After each clip:
- **Keep (K)**: Keep the clip
- **Delete (D)**: Discard the clip (bad take)
- Or just start recording again (auto-keeps previous)
### 4. End Session
- Press **Ctrl+Shift+E** or click **End Session**
- Clips are merged and transcribed
- Exported to `clawdbot_export/` for Mario processing
## Keyboard Shortcuts
| Action | Shortcut |
|--------|----------|
| Start/Stop Recording | Ctrl+Shift+R |
| Keep Last Clip | Ctrl+Shift+K |
| Delete Last Clip | Ctrl+Shift+D |
| End Session | Ctrl+Shift+E |
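
The table above can be expressed as a small binding map. This is only a sketch: the action names and `register_hotkeys` are hypothetical, and the registrar is passed in as a parameter so the example does not depend on any hotkey library being installed.

```python
from typing import Callable

# Shortcut strings follow the table above; the action names are hypothetical.
HOTKEYS = {
    "ctrl+shift+r": "toggle_recording",
    "ctrl+shift+k": "keep_last_clip",
    "ctrl+shift+d": "delete_last_clip",
    "ctrl+shift+e": "end_session",
}


def register_hotkeys(
    add_hotkey: Callable[[str, Callable[[], None]], object],
    handlers: dict[str, Callable[[], None]],
) -> list[str]:
    """Bind each shortcut to its handler; return the shortcuts that were bound."""
    bound = []
    for combo, action in HOTKEYS.items():
        handler = handlers.get(action)
        if handler is None:
            continue  # no handler supplied for this action
        add_hotkey(combo, handler)
        bound.append(combo)
    return bound
```

With the `keyboard` package installed, `register_hotkeys(keyboard.add_hotkey, handlers)` would register the shortcuts globally.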
## Session Types
| Type | Updates | Use For |
|------|---------|---------|
| **Design** | KB/Design/ | CAD work, component design, assembly |
| **Analysis** | KB/Analysis/ | FEA setup, mesh, BCs, results |
## Output
Sessions are stored **inside the project folder**:
```
/2-Projects/<ProjectName>/
├── KB/
│   └── dev/                      # Mario creates gen-XXX.md here
├── Images/
│   └── screenshot-sessions/      # Mario moves frames here
└── _capture/                     # Session staging area
    └── <session-id>/
        ├── clips/
        │   ├── clip-001.mp4
        │   └── ...
        ├── session.json
        └── clawdbot_export/      # Ready for Mario
            ├── merged.mp4
            ├── transcript.json
            ├── frames/
            │   ├── 01_00-30.png
            │   └── ...
            └── metadata.json
```
**Key insight:** Sessions belong to PROJECTS, not to KB Capture. This means:
- All project data stays together
- Mario knows which KB to update
- Easy to archive/delete projects
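
A small sketch of how these locations could be derived from a project name and session id. The helper `session_paths` is hypothetical, and the session id format is not specified here, so it is passed in as an opaque string:

```python
from pathlib import Path


def session_paths(projects_root: Path, project: str, session_id: str) -> dict[str, Path]:
    """Map the layout above to concrete paths for one session (hypothetical helper)."""
    session_dir = projects_root / project / "_capture" / session_id
    export_dir = session_dir / "clawdbot_export"
    return {
        "session_dir": session_dir,
        "clips": session_dir / "clips",
        "session_json": session_dir / "session.json",
        "export_dir": export_dir,
        "merged_video": export_dir / "merged.mp4",
        "transcript": export_dir / "transcript.json",
        "metadata": export_dir / "metadata.json",
    }
```

Because every path hangs off the project folder, archiving or deleting a project takes its capture sessions with it.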
## What Happens Next
1. **Syncthing** syncs `clawdbot_export/` to Clawdbot
2. **Mario** detects new session
3. **Vision analysis** categorizes frames
4. **KB updated** with new information
5. **Slack notification** when complete
## Tips
### Recording
- Narrate naturally — explain what you're doing
- Say "screenshot" before important views
- Keep clips short (30s - 2min)
- It's okay to delete bad takes
### Organization
- One session per work block (30-60 min)
- Use descriptive session names
- Match project name to your PKM folder
### Quality
- Close unnecessary windows before recording
- Undock NX 3D viewport for clean captures
- Speak clearly for better transcription
## Troubleshooting
### Hotkeys not working
- Run as Administrator (Windows)
- Check for conflicts with other apps
- Try restarting KB Capture
### Recording fails
- Ensure FFmpeg is installed: `choco install ffmpeg`
- Check disk space
- Check microphone permissions
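
The first two checks can be automated before a session starts. A minimal sketch using only the standard library; the function name `preflight` and the 1 GB default threshold are assumptions, not part of KB Capture:

```python
import shutil
from pathlib import Path


def preflight(output_dir: Path, min_free_gb: float = 1.0) -> list[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    # shutil.which returns None when the executable is not on PATH.
    if shutil.which("ffmpeg") is None:
        problems.append("FFmpeg not found on PATH")
    free_gb = shutil.disk_usage(output_dir).free / 1024**3
    if free_gb < min_free_gb:
        problems.append(f"Only {free_gb:.1f} GB free in {output_dir}")
    return problems
```

Microphone permissions are not covered here because they cannot be checked portably from the standard library.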
### No transcription
- Whisper needs ~2GB RAM for 'base' model
- Try 'tiny' model: `--whisper-model tiny`
- Check CUDA/GPU drivers for faster processing
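
Falling back to the smaller model can also be done in code. A hedged sketch: `load_with_fallback` is a hypothetical helper, and the loader is passed in as a parameter so the example runs without Whisper installed.

```python
from typing import Callable, Sequence


def load_with_fallback(
    load_model: Callable[[str], object],
    names: Sequence[str] = ("base", "tiny"),
):
    """Try each model name in order; return (name, model) for the first that loads."""
    last_error = None
    for name in names:
        try:
            return name, load_model(name)
        except (RuntimeError, MemoryError) as exc:
            # For example, running out of memory while loading the larger model.
            last_error = exc
    raise RuntimeError(f"No Whisper model could be loaded: {last_error}")
```

With the `openai-whisper` package installed, `load_with_fallback(whisper.load_model)` would try `base` first and then `tiny`.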
## Architecture
```
┌─────────────────────────────────────────┐
│          KB Capture (Windows)           │
├─────────────────────────────────────────┤
│  ┌───────────┐    ┌──────────────────┐  │
│  │  Hotkeys  │    │  GUI (optional)  │  │
│  └─────┬─────┘    └────────┬─────────┘  │
│        │                   │            │
│        ▼                   ▼            │
│  ┌─────────────────────────────────┐    │
│  │         Session Manager         │    │
│  │   (clips, keep/delete, merge)   │    │
│  └─────────────┬───────────────────┘    │
│                │                        │
│  ┌─────────────┼───────────────────┐    │
│  │             ▼                   │    │
│  │  ┌─────────────────────────┐    │    │
│  │  │     Screen Recorder     │    │    │
│  │  │    (FFmpeg gdigrab)     │    │    │
│  │  └─────────────────────────┘    │    │
│  │                                 │    │
│  │  ┌─────────────────────────┐    │    │
│  │  │   Whisper Transcriber   │    │    │
│  │  │       (local GPU)       │    │    │
│  │  └─────────────────────────┘    │    │
│  └─────────────────────────────────┘    │
│                │                        │
│                ▼                        │
│  ┌─────────────────────────────────┐    │
│  │        clawdbot_export/         │    │
│  │  merged.mp4 + transcript.json   │    │
│  └─────────────┬───────────────────┘    │
└────────────────┼────────────────────────┘
                 │ Syncthing
┌─────────────────────────────────────────┐
│            Clawdbot (Mario)             │
│  Vision analysis → KB update → Notify   │
└─────────────────────────────────────────┘
```
## Requirements
- Windows 10/11
- Python 3.12+
- FFmpeg (`choco install ffmpeg`)
- CUDA GPU (recommended for Whisper)
- ~4GB RAM (for Whisper 'base' model)
## Related
- [CAD-Documenter README](../README.md) — Original project overview
- [Knowledge Base Skill](http://100.80.199.40:3000/Antoine/clawdbot-shared-skills) — How Mario processes sessions


@@ -41,6 +41,11 @@ dev = [
gui = [
    "customtkinter>=5.2.0",
]
capture = [
    "customtkinter>=5.2.0",
    "keyboard>=0.13.5",  # Global hotkeys
    "pystray>=0.19.0",  # System tray (optional)
]
pdf = [
    "pandoc",  # For PDF generation fallback
]
@@ -48,6 +53,7 @@ pdf = [
[project.scripts]
cad-doc = "cad_documenter.cli:main"
cad-doc-gui = "cad_documenter.gui:main"
kb-capture = "cad_documenter.gui_capture:main"
[project.urls]
Homepage = "http://100.80.199.40:3000/Antoine/CAD-Documenter"


@@ -0,0 +1,726 @@
"""
KB Capture GUI (OBS Mode)

Load OBS recordings → Transcribe → Process for KB
Simple flow: Load Video → Transcribe → Done
"""

import sys
import json
import shutil
import threading
from pathlib import Path
from typing import Optional
from tkinter import filedialog, messagebox
from datetime import datetime

try:
    import customtkinter as ctk
    from customtkinter import CTk, CTkFrame, CTkLabel, CTkButton, CTkEntry
    from customtkinter import CTkOptionMenu, CTkToplevel, CTkProgressBar
    HAS_CTK = True
except ImportError:
    HAS_CTK = False

from .session import SessionManager, Session, SessionType, SessionStatus

# ============================================================================
# THEME
# ============================================================================
COLORS = {
    "bg": "#0d1117",
    "bg_card": "#161b22",
    "bg_elevated": "#1c2128",
    "border": "#30363d",
    "text": "#e6edf3",
    "text_secondary": "#7d8590",
    "text_muted": "#484f58",
    "red": "#f85149",
    "green": "#3fb950",
    "blue": "#58a6ff",
    "orange": "#d29922",
    "purple": "#a371f7",
}

# ============================================================================
# CONFIG
# ============================================================================
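def get_config_path() -> Path:
    """Return the per-user config file path, creating its folder if needed."""
    if sys.platform == "win32":
        config_dir = Path.home() / "AppData" / "Local" / "KBCapture"
    else:
        config_dir = Path.home() / ".config" / "kb-capture"
    config_dir.mkdir(parents=True, exist_ok=True)
    return config_dir / "config.json"


def load_config() -> dict:
    """Load the saved config; fall back to an empty dict if missing or unreadable."""
    config_path = get_config_path()
    if config_path.exists():
        try:
            with open(config_path) as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            # Corrupt or unreadable config: start fresh rather than crash
            pass
    return {}


def save_config(config: dict) -> None:
    """Write the config as pretty-printed JSON."""
    with open(get_config_path(), "w") as f:
        json.dump(config, f, indent=2)
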
# ============================================================================
# PROJECT CREATION
# ============================================================================
def create_project(projects_root: Path, name: str, description: str = "") -> Path:
    """Create a new project with KB structure."""
    project_path = projects_root / name
    if project_path.exists():
        raise ValueError(f"Project already exists: {name}")

    dirs = [
        "KB/Design/components",
        "KB/Design/materials",
        "KB/Design/dev",
        "KB/Analysis/models",
        "KB/Analysis/results",
        "Images/components",
        "_capture",
    ]
    for d in dirs:
        (project_path / d).mkdir(parents=True, exist_ok=True)

    # Create context file
    context = f"""# {name}
{description}
Created: {datetime.now().strftime("%Y-%m-%d")}
"""
    # Explicit encoding so non-ASCII project names survive on Windows
    (project_path / "_context.md").write_text(context, encoding="utf-8")
    return project_path

# ============================================================================
# MAIN GUI
# ============================================================================
class KBCaptureGUI:
    """Main application window - OBS video loading only."""

    def __init__(self):
        if not HAS_CTK:
            raise RuntimeError("CustomTkinter not installed")

        # Config
        self.config = load_config()

        # Projects root
        self.projects_root = None
        if self.config.get("projects_root"):
            saved = Path(self.config["projects_root"])
            if saved.exists():
                self.projects_root = saved

        # Session manager
        self.session_manager = None

        # Processing state
        self.is_processing = False

        # Window
        ctk.set_appearance_mode("dark")
        self.window = CTk()
        self.window.title("KB Capture")
        self.window.geometry("420x520")
        self.window.minsize(400, 480)
        self.window.configure(fg_color=COLORS["bg"])

        self._build_ui()

        # Initialize if we have a projects root
        if self.projects_root:
            self._init_session_manager()
            self._refresh_projects()

        self.window.protocol("WM_DELETE_WINDOW", self._on_close)

    def _init_session_manager(self):
        """Initialize session manager."""
        self.session_manager = SessionManager(self.projects_root)

    def _build_ui(self):
        """Build the interface."""
        main = CTkFrame(self.window, fg_color=COLORS["bg"])
        main.pack(fill="both", expand=True, padx=20, pady=16)

        # Header
        header = CTkFrame(main, fg_color="transparent")
        header.pack(fill="x", pady=(0, 16))
        CTkLabel(
            header,
            text="KB Capture",
            font=("Segoe UI Semibold", 20),
            text_color=COLORS["text"],
        ).pack(side="left")

        # Folder button
        CTkButton(
            header,
            text="📁",
            width=32,
            height=32,
            fg_color="transparent",
            hover_color=COLORS["bg_card"],
            command=self._browse_folder,
        ).pack(side="right")

        # Info card
        info_frame = CTkFrame(main, fg_color=COLORS["bg_card"], corner_radius=10)
        info_frame.pack(fill="x", pady=(0, 16))
        info_inner = CTkFrame(info_frame, fg_color="transparent")
        info_inner.pack(pady=16, padx=16, fill="x")
        CTkLabel(
            info_inner,
            text="📹 Record with OBS, then load here",
            font=("Segoe UI Semibold", 13),
            text_color=COLORS["text"],
        ).pack(anchor="w")
        CTkLabel(
            info_inner,
            text="Video → Transcribe → Ready for Clawdbot",
            font=("", 11),
            text_color=COLORS["text_secondary"],
        ).pack(anchor="w", pady=(4, 0))

        # Status area
        self.status_label = CTkLabel(
            info_inner,
            text="",
            font=("", 11),
            text_color=COLORS["text_muted"],
        )
        self.status_label.pack(anchor="w", pady=(8, 0))

        # Progress bar (hidden by default)
        self.progress = CTkProgressBar(
            info_inner,
            width=300,
            height=6,
            fg_color=COLORS["border"],
            progress_color=COLORS["blue"],
        )
        # Don't pack yet - will show during processing

        # Separator
        CTkFrame(main, fg_color=COLORS["border"], height=1).pack(fill="x", pady=8)

        # Project selector
        proj_frame = CTkFrame(main, fg_color="transparent")
        proj_frame.pack(fill="x")
        proj_header = CTkFrame(proj_frame, fg_color="transparent")
        proj_header.pack(fill="x", pady=(0, 8))
        CTkLabel(
            proj_header,
            text="Project",
            font=("Segoe UI Semibold", 11),
            text_color=COLORS["text_secondary"],
        ).pack(side="left")
        CTkButton(
            proj_header,
            text="+ New",
            width=60,
            height=24,
            font=("", 10),
            fg_color=COLORS["green"],
            hover_color="#16a34a",
            command=self._new_project,
        ).pack(side="right")
        self.project_menu = CTkOptionMenu(
            proj_frame,
            values=["(Select folder first)"],
            width=360,
            height=35,
            fg_color=COLORS["bg_card"],
            button_color=COLORS["bg_elevated"],
            button_hover_color=COLORS["border"],
        )
        self.project_menu.pack(fill="x")

        # Session name
        CTkLabel(
            main,
            text="Session Name",
            font=("Segoe UI Semibold", 11),
            text_color=COLORS["text_secondary"],
        ).pack(anchor="w", pady=(16, 8))
        self.name_entry = CTkEntry(
            main,
            placeholder_text="e.g., Vertical support walkthrough",
            height=35,
            fg_color=COLORS["bg_card"],
            border_color=COLORS["border"],
        )
        self.name_entry.pack(fill="x")

        # Session type
        CTkLabel(
            main,
            text="Session Type",
            font=("Segoe UI Semibold", 11),
            text_color=COLORS["text_secondary"],
        ).pack(anchor="w", pady=(16, 8))
        type_frame = CTkFrame(main, fg_color="transparent")
        type_frame.pack(fill="x")
        type_frame.grid_columnconfigure((0, 1), weight=1)
        self.type_var = ctk.StringVar(value="design")
        self.design_btn = CTkButton(
            type_frame,
            text="🎨 Design → KB/Design/",
            height=40,
            font=("", 11),
            fg_color=COLORS["blue"],
            hover_color="#2563eb",
            command=lambda: self._set_type("design"),
        )
        self.design_btn.grid(row=0, column=0, padx=(0, 4), sticky="ew")
        self.analysis_btn = CTkButton(
            type_frame,
            text="📊 Analysis → KB/Analysis/",
            height=40,
            font=("", 11),
            fg_color=COLORS["border"],
            hover_color=COLORS["bg_elevated"],
            command=lambda: self._set_type("analysis"),
        )
        self.analysis_btn.grid(row=0, column=1, padx=(4, 0), sticky="ew")

        # Spacer
        CTkFrame(main, fg_color="transparent").pack(fill="both", expand=True)

        # Load button (main action)
        self.load_btn = CTkButton(
            main,
            text="📂 Load OBS Recording",
            height=50,
            font=("Segoe UI Semibold", 14),
            fg_color=COLORS["blue"],
            hover_color="#2563eb",
            state="disabled",
            command=self._load_video,
        )
        self.load_btn.pack(fill="x", pady=(16, 0))

        # Recent sessions link
        self.recent_label = CTkLabel(
            main,
            text="",
            font=("", 10),
            text_color=COLORS["text_muted"],
            cursor="hand2",
        )
        self.recent_label.pack(pady=(12, 0))

        # Folder path
        self.folder_label = CTkLabel(
            main,
            text=str(self.projects_root) if self.projects_root else "Click 📁 to select projects folder",
            font=("", 9),
            text_color=COLORS["text_muted"],
            cursor="hand2",
        )
        self.folder_label.pack(pady=(8, 0))
        self.folder_label.bind("<Button-1>", lambda e: self._browse_folder())

    def _set_type(self, type_id: str):
        """Set session type."""
        self.type_var.set(type_id)
        if type_id == "design":
            self.design_btn.configure(fg_color=COLORS["blue"])
            self.analysis_btn.configure(fg_color=COLORS["border"])
        else:
            self.design_btn.configure(fg_color=COLORS["border"])
            self.analysis_btn.configure(fg_color=COLORS["orange"])

    def _browse_folder(self):
        """Browse for projects folder."""
        initial = str(self.projects_root) if self.projects_root else str(Path.home())
        folder = filedialog.askdirectory(
            title="Select Projects Folder (e.g., ATODrive/Projects)",
            initialdir=initial,
        )
        if folder:
            self.projects_root = Path(folder)
            self.folder_label.configure(text=str(self.projects_root))
            self.config["projects_root"] = str(self.projects_root)
            save_config(self.config)
            self._init_session_manager()
            self._refresh_projects()

    def _refresh_projects(self):
        """Refresh project list."""
        if not self.session_manager:
            self.project_menu.configure(values=["(Select folder first)"])
            return

        projects = self.session_manager.list_projects()
        if projects:
            self.project_menu.configure(values=projects)
            # Try to restore last used project
            last_project = self.config.get("last_project")
            if last_project in projects:
                self.project_menu.set(last_project)
            else:
                self.project_menu.set(projects[0])
            self.load_btn.configure(state="normal")
            self._update_recent()
        else:
            self.project_menu.configure(values=["(No projects - click + New)"])
            self.load_btn.configure(state="disabled")

    def _update_recent(self):
        """Update recent sessions count."""
        project = self.project_menu.get()
        if project.startswith("("):
            self.recent_label.configure(text="")
            return

        sessions = self.session_manager.list_sessions(project)
        if sessions:
            self.recent_label.configure(text=f"📋 {len(sessions)} previous sessions")
        else:
            self.recent_label.configure(text="No sessions yet")

    def _new_project(self):
        """Create new project."""
        if not self.projects_root:
            messagebox.showwarning("No Folder", "Select a projects folder first")
            return

        # Simple dialog
        dialog = CTkToplevel(self.window)
        dialog.title("New Project")
        dialog.geometry("350x180")
        dialog.transient(self.window)
        dialog.grab_set()
        dialog.configure(fg_color=COLORS["bg"])

        # Center
        dialog.update_idletasks()
        x = self.window.winfo_x() + (self.window.winfo_width() - 350) // 2
        y = self.window.winfo_y() + (self.window.winfo_height() - 180) // 2
        dialog.geometry(f"+{x}+{y}")

        CTkLabel(
            dialog,
            text="Project Name",
            font=("", 12),
            text_color=COLORS["text_secondary"],
        ).pack(pady=(20, 8), padx=20, anchor="w")
        name_entry = CTkEntry(
            dialog,
            placeholder_text="e.g., P05-NewProject",
            width=310,
            height=35,
        )
        name_entry.pack(padx=20)
        name_entry.focus_set()

        def create():
            name = name_entry.get().strip()
            if not name:
                return
            # Sanitize
            name = "".join(c for c in name if c.isalnum() or c in "-_ ")
            name = name.replace(" ", "-")
            try:
                create_project(self.projects_root, name)
                dialog.destroy()
                self._refresh_projects()
                self.project_menu.set(name)
            except ValueError as e:
                messagebox.showerror("Error", str(e))

        btn_frame = CTkFrame(dialog, fg_color="transparent")
        btn_frame.pack(fill="x", padx=20, pady=20)
        CTkButton(
            btn_frame,
            text="Cancel",
            width=80,
            fg_color="transparent",
            border_width=1,
            border_color=COLORS["border"],
            command=dialog.destroy,
        ).pack(side="left")
        CTkButton(
            btn_frame,
            text="Create",
            width=100,
            fg_color=COLORS["green"],
            command=create,
        ).pack(side="right")
        name_entry.bind("<Return>", lambda e: create())

    def _load_video(self):
        """Load an OBS recording for processing."""
        if self.is_processing:
            return

        project = self.project_menu.get()
        if project.startswith("("):
            messagebox.showwarning("No Project", "Select a project first")
            return

        # Save last project
        self.config["last_project"] = project
        save_config(self.config)

        # Ask for video file
        initial_dir = self.config.get("last_video_dir", str(Path.home() / "Videos"))
        video_path = filedialog.askopenfilename(
            title="Select OBS Recording",
            initialdir=initial_dir,
            filetypes=[
                ("Video files", "*.mp4 *.mkv *.avi *.mov *.webm *.flv"),
                ("All files", "*.*"),
            ],
        )
        if not video_path:
            return

        video_path = Path(video_path)
        if not video_path.exists():
            messagebox.showerror("Error", "File not found")
            return

        # Save last video directory
        self.config["last_video_dir"] = str(video_path.parent)
        save_config(self.config)

        # Get session name
        name = self.name_entry.get().strip() or video_path.stem
        session_type = SessionType.DESIGN if self.type_var.get() == "design" else SessionType.ANALYSIS

        # Start processing
        self.is_processing = True
        self._set_processing_ui(True, "Copying video...")

        # Run in background
        threading.Thread(
            target=self._process_video,
            args=(video_path, name, project, session_type),
            daemon=True,
        ).start()

    def _process_video(self, video_path: Path, name: str, project: str, session_type: SessionType):
        """Process video in background thread."""
        try:
            # Create session
            session = self.session_manager.start_session(name, project, session_type)
            session_dir = self.session_manager.get_session_dir()

            # Copy video
            dest_video = session_dir / "recording.mp4"
            self._update_status("Copying video...")
            shutil.copy2(video_path, dest_video)

            # Transcribe
            self._update_status("Loading Whisper model...")
            self._transcribe(dest_video, session)
        except Exception as e:
            self._update_status(f"Error: {e}", error=True)
            if self.session_manager.current_session:
                self.session_manager.cancel_session()
        finally:
            self.is_processing = False
            self.window.after(0, lambda: self._set_processing_ui(False))

    def _transcribe(self, video_path: Path, session: Session):
        """Transcribe video with Whisper."""
        try:
            import whisper

            self._update_status("Transcribing (this takes a while)...")
            model = whisper.load_model("base")
            # Whisper can handle video files directly
            result = model.transcribe(str(video_path), language="en", verbose=False)

            # Save transcript
            transcript_path = video_path.parent / "transcript.json"
            with open(transcript_path, "w") as f:
                json.dump(result, f, indent=2)

            # Find screenshot triggers
            triggers = []
            for segment in result.get("segments", []):
                text = segment.get("text", "").lower()
                if "screenshot" in text:
                    triggers.append({
                        "timestamp": segment.get("start", 0),
                        "text": segment.get("text", ""),
                    })

            # Get video duration
            duration = self._get_video_duration(video_path)

            # Save metadata
            metadata = {
                "session_id": session.id,
                "name": session.name,
                "project": session.project,
                "session_type": session.session_type.value,
                "created_at": session.created_at.isoformat(),
                "duration": duration,
                "status": "ready",
                "screenshot_triggers": triggers,
                "source_file": video_path.name,
                "files": {
                    "video": "recording.mp4",
                    "transcript": "transcript.json",
                },
            }
            metadata_path = video_path.parent / "metadata.json"
            with open(metadata_path, "w") as f:
                json.dump(metadata, f, indent=2)

            # Update session
            self.session_manager.set_duration(duration)
            self.session_manager.set_transcript("transcript.json")
            self.session_manager.end_session()
            self._update_status(f"✅ Done! {len(triggers)} screenshot triggers found", success=True)
            self.window.after(0, self._update_recent)
        except ImportError:
            self._update_status("⚠️ Saved (Whisper not installed)", warning=True)
            self.session_manager.end_session()
        except Exception as e:
            error_msg = str(e)
            if "audio" in error_msg.lower():
                self._update_status("⚠️ Saved (video has no audio track)", warning=True)
                # Still save the session without transcript
                self.session_manager.end_session()
            else:
                raise

    def _get_video_duration(self, video_path: Path) -> float:
        """Get video duration in seconds using ffprobe (0.0 if it cannot be read)."""
        import subprocess

        try:
            result = subprocess.run(
                [
                    "ffprobe", "-v", "error",
                    "-show_entries", "format=duration",
                    "-of", "default=noprint_wrappers=1:nokey=1",
                    str(video_path),
                ],
                capture_output=True,
                text=True,
                timeout=30,
            )
            return float(result.stdout.strip())
        except (OSError, ValueError, subprocess.SubprocessError):
            # ffprobe missing, timed out, or printed something that is not a number
            return 0.0

    def _update_status(self, text: str, error: bool = False, success: bool = False, warning: bool = False):
        """Update status label from any thread."""
        def update():
            if error:
                color = COLORS["red"]
            elif success:
                color = COLORS["green"]
            elif warning:
                color = COLORS["orange"]
            else:
                color = COLORS["text_secondary"]
            self.status_label.configure(text=text, text_color=color)

        self.window.after(0, update)

    def _set_processing_ui(self, processing: bool, status: str = ""):
        """Update UI for processing state."""
        if processing:
            self.load_btn.configure(state="disabled", text="Processing...")
            self.project_menu.configure(state="disabled")
            self.name_entry.configure(state="disabled")
            self.design_btn.configure(state="disabled")
            self.analysis_btn.configure(state="disabled")
            self.progress.pack(fill="x", pady=(8, 0))
            self.progress.configure(mode="indeterminate")
            self.progress.start()
            if status:
                self.status_label.configure(text=status, text_color=COLORS["text_secondary"])
        else:
            self.load_btn.configure(state="normal", text="📂 Load OBS Recording")
            self.project_menu.configure(state="normal")
            self.name_entry.configure(state="normal")
            self.design_btn.configure(state="normal")
            self.analysis_btn.configure(state="normal")
            self.progress.stop()
            self.progress.pack_forget()
            self.name_entry.delete(0, "end")

    def _on_close(self):
        """Handle window close."""
        if self.is_processing:
            if not messagebox.askyesno("Processing", "Video is being processed. Close anyway?"):
                return
        self.window.destroy()

    def run(self):
        """Run the application."""
        self.window.mainloop()


def main():
    """Entry point."""
    if not HAS_CTK:
        print("Error: CustomTkinter not installed")
        print("Install with: pip install customtkinter")
        sys.exit(1)
    gui = KBCaptureGUI()
    gui.run()


if __name__ == "__main__":
    main()


@@ -0,0 +1,330 @@
"""
KB Capture - Screen Recording for Knowledge Base

Simple flow: Record → Pause → Resume → Stop → Transcribe → Done
One session = one continuous recording.
"""

import threading
import time
from pathlib import Path
from typing import Optional, Callable
from dataclasses import dataclass
from enum import Enum

from .recorder import ScreenRecorder, RecordingConfig
from .session import SessionManager, Session, SessionType, SessionStatus


class AppState(Enum):
    """Application state."""
    IDLE = "idle"                  # No session
    RECORDING = "recording"        # Recording
    PAUSED = "paused"              # Recording paused
    TRANSCRIBING = "transcribing"  # Processing


@dataclass
class AppStatus:
    """Current application status for UI."""
    state: AppState
    session_name: Optional[str] = None
    project: Optional[str] = None
    session_type: Optional[SessionType] = None
    duration: float = 0.0
    message: str = ""


class KBCaptureApp:
    """
    Main application.

    Flow:
    1. Select project
    2. Start session (name, type)
    3. Record → Pause → Resume → Stop
    4. Auto-transcribe with Whisper
    5. Ready for Clawdbot to process
    """

    def __init__(
        self,
        projects_root: Path,
        on_status_change: Optional[Callable[[AppStatus], None]] = None,
    ):
        self.projects_root = Path(projects_root)
        self.on_status_change = on_status_change or (lambda x: None)

        # Components
        self.session_manager = SessionManager(self.projects_root)
        self.recorder = ScreenRecorder(on_status=self._log)

        # State
        self.state = AppState.IDLE
        self._duration_thread: Optional[threading.Thread] = None
        self._running = False

    def _log(self, message: str) -> None:
        """Log and update status."""
        print(f"[KB Capture] {message}")
        self._update_status(message=message)

    def _update_status(self, message: str = "") -> None:
        """Update status and notify listeners."""
        session = self.session_manager.current_session
        status = AppStatus(
            state=self.state,
            session_name=session.name if session else None,
            project=session.project if session else None,
            session_type=session.session_type if session else None,
            duration=self.recorder.get_duration() if self.state in (AppState.RECORDING, AppState.PAUSED) else 0.0,
            message=message,
        )
        self.on_status_change(status)

    def _start_duration_thread(self) -> None:
        """Start thread to update duration."""
        self._running = True

        def update_loop():
            while self._running and self.state in (AppState.RECORDING, AppState.PAUSED):
                self._update_status()
                time.sleep(0.5)

        self._duration_thread = threading.Thread(target=update_loop, daemon=True)
        self._duration_thread.start()

    def _stop_duration_thread(self) -> None:
        """Stop duration thread."""
        self._running = False
        if self._duration_thread:
            self._duration_thread.join(timeout=1)
            self._duration_thread = None

    # === Public API ===

    def start_session(
        self,
        name: str,
        project: str,
        session_type: SessionType = SessionType.DESIGN,
        screen_index: Optional[int] = None,
    ) -> Session:
        """Start a new session and begin recording."""
        if self.state != AppState.IDLE:
            raise RuntimeError("Session already active")

        # Create session
        session = self.session_manager.start_session(name, project, session_type)

        # Start recording
        config = RecordingConfig(
            output_path=self.session_manager.get_video_path(),
            framerate=30,
            screen_index=screen_index,
        )
        if self.recorder.start(config):
            self.state = AppState.RECORDING
            self._start_duration_thread()
            self._log(f"Session started: {name}")
        else:
            self.session_manager.cancel_session()
            raise RuntimeError("Failed to start recording - check FFmpeg is installed")
        return session

    def pause(self) -> bool:
        """Pause recording."""
        if self.state != AppState.RECORDING:
            return False
        if self.recorder.pause():
            self.state = AppState.PAUSED
            self.session_manager.update_status(SessionStatus.PAUSED)
            self._update_status("Paused")
            return True
        return False

    def resume(self) -> bool:
        """Resume recording."""
        if self.state != AppState.PAUSED:
            return False
        if self.recorder.resume():
            self.state = AppState.RECORDING
            self.session_manager.update_status(SessionStatus.RECORDING)
            self._update_status("Recording")
            return True
        return False

    def toggle_pause(self) -> None:
        """Toggle pause/resume."""
        if self.state == AppState.RECORDING:
            self.pause()
        elif self.state == AppState.PAUSED:
            self.resume()

    def stop(self) -> Optional[Session]:
        """Stop recording and transcribe."""
        if self.state not in (AppState.RECORDING, AppState.PAUSED):
            return None

        self._stop_duration_thread()

        # Get duration before stopping
        duration = self.recorder.get_duration()

        # Stop recording
        try:
            output = self.recorder.stop()
        except Exception as e:
            self._log(f"Stop error: {e}")
            output = None

        if not output or not output.exists():
            self._log("Recording failed - no output file")
            self.session_manager.cancel_session()
            self.state = AppState.IDLE
            self._update_status("Recording failed - try again")
            return None

        # Update session
        self.session_manager.set_duration(duration)
        self.session_manager.update_status(SessionStatus.TRANSCRIBING)
        self.state = AppState.TRANSCRIBING
        self._update_status("Transcribing...")

        # Transcribe in background
        threading.Thread(
            target=self._transcribe,
            args=(output,),
            daemon=True,
        ).start()
        return self.session_manager.current_session

def _transcribe(self, video_path: Path) -> None:
"""Transcribe video with Whisper."""
try:
import whisper
self._log("Loading Whisper model...")
model = whisper.load_model("base")
self._log("Transcribing...")
result = model.transcribe(str(video_path), language="en", verbose=False)
# Save transcript
import json
transcript_path = video_path.parent / "transcript.json"
with open(transcript_path, "w") as f:
json.dump(result, f, indent=2)
# Find screenshot triggers
triggers = []
for segment in result.get("segments", []):
text = segment.get("text", "").lower()
if "screenshot" in text:
triggers.append({
"timestamp": segment.get("start", 0),
"text": segment.get("text", ""),
})
# Save metadata
session = self.session_manager.current_session
metadata = {
"session_id": session.id,
"name": session.name,
"project": session.project,
"session_type": session.session_type.value,
"created_at": session.created_at.isoformat(),
"duration": session.duration,
"status": "ready",
"screenshot_triggers": triggers,
"files": {
"video": session.video_file,
"transcript": "transcript.json",
},
}
metadata_path = video_path.parent / "metadata.json"
with open(metadata_path, "w") as f:
json.dump(metadata, f, indent=2)
# Update session
self.session_manager.set_transcript("transcript.json")
self.session_manager.end_session()
self.state = AppState.IDLE
self._log(f"Done! {len(triggers)} screenshot triggers found")
self._update_status(f"Session saved with {len(triggers)} screenshot triggers")
except ImportError:
self._log("Whisper not installed!")
self.session_manager.end_session()
self.state = AppState.IDLE
self._update_status("Saved (no transcription)")
except Exception as e:
self._log(f"Transcription error: {e}")
self.session_manager.end_session()
self.state = AppState.IDLE
self._update_status("Saved (transcription failed)")
def cancel(self) -> None:
"""Cancel session and delete files."""
if self.state == AppState.IDLE:
return
self._stop_duration_thread()
if self.recorder.is_recording:
try:
self.recorder.stop()
except Exception:  # best effort: the session is being discarded anyway
pass
self.session_manager.cancel_session()
self.state = AppState.IDLE
self._log("Session cancelled")
self._update_status("Cancelled")
def force_reset(self) -> None:
"""Force reset to idle state (emergency recovery)."""
self._stop_duration_thread()
# Force stop recorder
if self.recorder.process:
try:
self.recorder.process.terminate()
except Exception:  # the process may already have exited
pass
self.recorder.is_recording = False
self.recorder.is_paused = False
# Cancel any session
if self.session_manager.current_session:
try:
self.session_manager.cancel_session()
except Exception:  # file cleanup failed; drop the in-memory session regardless
self.session_manager.current_session = None
self.state = AppState.IDLE
self._log("Force reset complete")
self._update_status("Ready to record")
def get_status(self) -> AppStatus:
"""Get current status."""
session = self.session_manager.current_session
return AppStatus(
state=self.state,
session_name=session.name if session else None,
project=session.project if session else None,
session_type=session.session_type if session else None,
duration=self.recorder.get_duration() if self.state in (AppState.RECORDING, AppState.PAUSED) else 0.0,
)
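
The screenshot-trigger scan in `_transcribe` above can be tried in isolation. The following is a minimal sketch, assuming segments are dicts with `start` and `text` keys as the code above reads them; the function name `find_screenshot_triggers`, the `keyword` parameter, and the sample segments are hypothetical and not part of this commit.

```python
from typing import Any


def find_screenshot_triggers(
    segments: list[dict[str, Any]], keyword: str = "screenshot"
) -> list[dict[str, Any]]:
    """Return one trigger per segment whose text mentions the keyword."""
    triggers = []
    for segment in segments:
        text = segment.get("text", "")
        # Case-insensitive substring match, mirroring the check in _transcribe.
        if keyword in text.lower():
            triggers.append({"timestamp": segment.get("start", 0), "text": text})
    return triggers


# Hypothetical segments, shaped like the entries read from result["segments"].
sample_segments = [
    {"start": 0.0, "end": 4.2, "text": " Opening the bracket model."},
    {"start": 4.2, "end": 9.8, "text": " Screenshot of the fillet here."},
    {"start": 9.8, "end": 12.0, "text": " Moving on to the mesh."},
]
triggers = find_screenshot_triggers(sample_segments)
print(triggers)
```

Keeping the scan as a pure function would make it possible to unit-test the trigger logic without loading a Whisper model.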

@@ -0,0 +1,431 @@
"""
Screen + Audio Recorder (Simplified)
Records screen and microphone to a single video file.
Supports pause/resume within the same recording.
"""
import subprocess
import threading
import time
import sys
import os
import signal
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, Callable
@dataclass
class RecordingConfig:
"""Recording configuration."""
output_path: Path
framerate: int = 30
audio_device: Optional[str] = None
video_codec: str = "libx264"
audio_codec: str = "aac"
crf: int = 23
preset: str = "ultrafast"
screen_index: Optional[int] = None # None = all screens, 0 = first, 1 = second, etc.
class ScreenRecorder:
"""
FFmpeg-based screen recorder with pause/resume.
Usage:
recorder = ScreenRecorder()
recorder.start(config)
# ... recording ...
recorder.pause()
# ... thinking ...
recorder.resume()
# ... more recording ...
recorder.stop()
"""
def __init__(self, on_status: Optional[Callable[[str], None]] = None):
self.process: Optional[subprocess.Popen] = None
self.is_recording = False
self.is_paused = False
self.start_time: Optional[float] = None
self.pause_start: Optional[float] = None
self.total_paused: float = 0.0
self.output_path: Optional[Path] = None
self.on_status = on_status or (lambda x: None)
self._stderr_thread: Optional[threading.Thread] = None
self._last_error: str = ""
def _get_ffmpeg_cmd(self, config: RecordingConfig) -> list[str]:
"""Build FFmpeg command."""
cmd = ["ffmpeg", "-y"]
if sys.platform == "win32":
# Windows: gdigrab for screen capture
cmd.extend([
"-f", "gdigrab",
"-framerate", str(config.framerate),
"-draw_mouse", "1",
])
# Handle screen selection
if config.screen_index is not None:
# Get screen geometry
screens = self._get_screen_geometries()
if config.screen_index < len(screens):
screen = screens[config.screen_index]
# Use offset to capture specific screen
cmd.extend([
"-offset_x", str(screen["left"]),
"-offset_y", str(screen["top"]),
"-video_size", f"{screen['width']}x{screen['height']}",
])
cmd.extend(["-i", "desktop"])
# Audio: dshow - try to find a working device
audio_device = config.audio_device
if not audio_device:
devices = self.list_audio_devices()
audio_device = devices[0] if devices else None
if audio_device:
cmd.extend([
"-f", "dshow",
"-i", f"audio={audio_device}",
])
else:
# Linux: x11grab + pulse
cmd.extend([
"-f", "x11grab",
"-framerate", str(config.framerate),
"-i", ":0.0",
"-f", "pulse",
"-i", "default",
])
# Output settings
cmd.extend([
"-c:v", config.video_codec,
"-preset", config.preset,
"-crf", str(config.crf),
"-pix_fmt", "yuv420p", # Ensure compatibility
])
# Only add audio codec if we have audio input
if sys.platform != "win32" or config.audio_device or self.list_audio_devices():
cmd.extend([
"-c:a", config.audio_codec,
"-b:a", "128k",
])
cmd.append(str(config.output_path))
return cmd
def _read_stderr(self):
"""Read stderr in background to prevent blocking."""
if not self.process or not self.process.stderr:
return
try:
for line in self.process.stderr:
if isinstance(line, bytes):
line = line.decode('utf-8', errors='ignore')
self._last_error = line.strip()
except Exception:  # the pipe closes when FFmpeg exits; nothing to recover
pass
def start(self, config: RecordingConfig) -> bool:
"""Start recording."""
if self.is_recording:
self.on_status("Already recording")
return False
self.output_path = config.output_path
self.output_path.parent.mkdir(parents=True, exist_ok=True)
cmd = self._get_ffmpeg_cmd(config)
self.on_status(f"Starting: {' '.join(cmd[:6])}...")
try:
# On Windows, use different process creation flags
if sys.platform == "win32":
# CREATE_NEW_PROCESS_GROUP allows sending CTRL_BREAK_EVENT
self.process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.CREATE_NO_WINDOW,
)
else:
self.process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# Start stderr reader thread
self._stderr_thread = threading.Thread(target=self._read_stderr, daemon=True)
self._stderr_thread.start()
# Wait a moment to see if FFmpeg crashes immediately
time.sleep(0.5)
if self.process.poll() is not None:
self.on_status(f"FFmpeg failed: {self._last_error}")
return False
self.is_recording = True
self.is_paused = False
self.start_time = time.time()
self.total_paused = 0.0
self.on_status("Recording started")
return True
except FileNotFoundError:
self.on_status("FFmpeg not found - install from ffmpeg.org")
return False
except Exception as e:
self.on_status(f"Failed to start: {e}")
return False
def pause(self) -> bool:
"""Pause recording (Windows: suspend process)."""
if not self.is_recording or self.is_paused:
return False
self.is_paused = True
self.pause_start = time.time()
# On Windows, we can suspend the FFmpeg process
if sys.platform == "win32" and self.process:
try:
import ctypes
kernel32 = ctypes.windll.kernel32
# 0x1F0FFF is PROCESS_ALL_ACCESS
handle = kernel32.OpenProcess(0x1F0FFF, False, self.process.pid)
if handle:
# Use NtSuspendProcess for cleaner suspension
ntdll = ctypes.windll.ntdll
ntdll.NtSuspendProcess(handle)
kernel32.CloseHandle(handle)
self.on_status("Paused")
else:
self.on_status("Paused (soft)")
except Exception as e:
self.on_status(f"Paused (soft): {e}")
else:
# On Linux, send SIGSTOP
if self.process:
self.process.send_signal(signal.SIGSTOP)
self.on_status("Paused")
return True
def resume(self) -> bool:
"""Resume recording."""
if not self.is_recording or not self.is_paused:
return False
if self.pause_start:
self.total_paused += time.time() - self.pause_start
self.is_paused = False
self.pause_start = None
# Resume FFmpeg process
if sys.platform == "win32" and self.process:
try:
import ctypes
kernel32 = ctypes.windll.kernel32
handle = kernel32.OpenProcess(0x1F0FFF, False, self.process.pid)
if handle:
ntdll = ctypes.windll.ntdll
ntdll.NtResumeProcess(handle)
kernel32.CloseHandle(handle)
except Exception:
pass
else:
# On Linux, send SIGCONT
if self.process:
self.process.send_signal(signal.SIGCONT)
self.on_status("Recording resumed")
return True
def stop(self) -> Optional[Path]:
"""Stop recording and return output path."""
if not self.is_recording or not self.process:
return None
# If paused, resume first so we can stop properly
if self.is_paused:
if self.pause_start:
self.total_paused += time.time() - self.pause_start
if sys.platform == "win32":
try:
import ctypes
kernel32 = ctypes.windll.kernel32
handle = kernel32.OpenProcess(0x1F0FFF, False, self.process.pid)
if handle:
ntdll = ctypes.windll.ntdll
ntdll.NtResumeProcess(handle)
kernel32.CloseHandle(handle)
except Exception:
pass
else:
self.process.send_signal(signal.SIGCONT)
time.sleep(0.2) # Give it a moment to resume
try:
if sys.platform == "win32":
# On Windows, send CTRL_BREAK_EVENT to gracefully stop FFmpeg
# This works because we used CREATE_NEW_PROCESS_GROUP
try:
os.kill(self.process.pid, signal.CTRL_BREAK_EVENT)
self.process.wait(timeout=5)
except (subprocess.TimeoutExpired, OSError):
# Fallback: try stdin 'q'
try:
if self.process.stdin:
self.process.stdin.write(b"q\n")
self.process.stdin.flush()
self.process.wait(timeout=5)
except Exception:  # stdin closed or FFmpeg still running: terminate instead
self.process.terminate()
self.process.wait(timeout=3)
else:
# On Linux, send SIGINT (Ctrl+C equivalent)
self.process.send_signal(signal.SIGINT)
self.process.wait(timeout=10)
except subprocess.TimeoutExpired:
self.on_status("Timeout - forcing stop")
self.process.terminate()
try:
self.process.wait(timeout=5)
except Exception:  # still alive after terminate(): kill it
self.process.kill()
except Exception as e:
self.on_status(f"Stop error: {e}")
try:
self.process.terminate()
except Exception:
pass
self.is_recording = False
self.is_paused = False
duration = self.get_duration()
self.on_status(f"Stopped: {duration:.1f}s")
# Check if output file exists and has content
if self.output_path and self.output_path.exists():
if self.output_path.stat().st_size > 1000: # At least 1KB
return self.output_path
else:
self.on_status("Recording too short or failed")
return None
return None
def get_duration(self) -> float:
"""Get actual recording duration (excluding pauses)."""
if not self.start_time:
return 0.0
elapsed = time.time() - self.start_time
# Subtract paused time
if self.is_paused and self.pause_start:
elapsed -= (time.time() - self.pause_start)
elapsed -= self.total_paused
return max(0.0, elapsed)
def get_elapsed(self) -> float:
"""Get total elapsed time (including pauses)."""
if not self.start_time:
return 0.0
return time.time() - self.start_time
@staticmethod
def _get_screen_geometries() -> list[dict]:
"""Get geometry of all screens (Windows)."""
screens = []
if sys.platform != "win32":
return [{"left": 0, "top": 0, "width": 1920, "height": 1080}]
try:
import ctypes
class RECT(ctypes.Structure):
_fields_ = [
("left", ctypes.c_long),
("top", ctypes.c_long),
("right", ctypes.c_long),
("bottom", ctypes.c_long),
]
def callback(hMonitor, hdcMonitor, lprcMonitor, dwData):
rect = lprcMonitor.contents
screens.append({
"left": rect.left,
"top": rect.top,
"width": rect.right - rect.left,
"height": rect.bottom - rect.top,
})
return True
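# MonitorEnumProc is BOOL (HMONITOR, HDC, LPRECT, LPARAM).
# Handles and LPARAM are pointer-sized, so c_ulong/c_double are wrong on 64-bit Windows.
MONITORENUMPROC = ctypes.WINFUNCTYPE(
ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p,
ctypes.POINTER(RECT), ctypes.c_ssize_t
)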
user32 = ctypes.windll.user32
user32.EnumDisplayMonitors(None, None, MONITORENUMPROC(callback), 0)
except Exception as e:
print(f"Error getting screen geometries: {e}")
return screens if screens else [{"left": 0, "top": 0, "width": 1920, "height": 1080}]
@staticmethod
def list_audio_devices() -> list[str]:
"""List available audio input devices (Windows)."""
if sys.platform != "win32":
return ["default"]
try:
result = subprocess.run(
["ffmpeg", "-list_devices", "true", "-f", "dshow", "-i", "dummy"],
capture_output=True,
text=True,
creationflags=subprocess.CREATE_NO_WINDOW,
timeout=10,
)
devices = []
in_audio = False
for line in result.stderr.split("\n"):
if "DirectShow audio devices" in line:
in_audio = True
elif "DirectShow video devices" in line:
in_audio = False
elif in_audio and '"' in line:
start = line.find('"') + 1
end = line.rfind('"')
if start < end:
device = line[start:end]
# Skip virtual/system devices that don't work well
if "virtual" not in device.lower():
devices.append(device)
return devices
except Exception:  # FFmpeg missing, timed out, or produced unexpected output
return []
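
The pause-aware bookkeeping behind `get_duration` above can be exercised without FFmpeg. This is a minimal sketch, not the recorder's own code: the class name `PauseClock` and the injectable `now` parameter are hypothetical, and it defaults to `time.monotonic` rather than the `time.time` used above so that system clock adjustments cannot skew the result.

```python
import time
from typing import Callable, Optional


class PauseClock:
    """Track recording time while excluding paused intervals."""

    def __init__(self, now: Callable[[], float] = time.monotonic):
        self._now = now
        self._start: Optional[float] = None
        self._pause_start: Optional[float] = None
        self._total_paused = 0.0

    def start(self) -> None:
        self._start = self._now()
        self._pause_start = None
        self._total_paused = 0.0

    def pause(self) -> None:
        # Ignore pause() before start() or while already paused.
        if self._start is not None and self._pause_start is None:
            self._pause_start = self._now()

    def resume(self) -> None:
        if self._pause_start is not None:
            self._total_paused += self._now() - self._pause_start
            self._pause_start = None

    def duration(self) -> float:
        if self._start is None:
            return 0.0
        now = self._now()
        paused = self._total_paused
        if self._pause_start is not None:
            # Count the pause that is still in progress.
            paused += now - self._pause_start
        return max(0.0, now - self._start - paused)


# Drive the clock with a fake time source so the arithmetic is deterministic.
fake_time = [100.0]
clock = PauseClock(now=lambda: fake_time[0])
clock.start()
fake_time[0] = 110.0  # 10 s of recording
clock.pause()
fake_time[0] = 125.0  # 15 s paused
paused_reading = clock.duration()
clock.resume()
fake_time[0] = 130.0  # 5 s more recording
final_reading = clock.duration()
print(paused_reading, final_reading)
```

Injecting the time source is a design choice made for this sketch only; it keeps the pause arithmetic testable without sleeping.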

@@ -0,0 +1,217 @@
"""
Session Manager for KB Capture (Simplified)
One session = one continuous recording (with pause/resume).
No clips, no keep/delete. Just record → transcribe → done.
"""
import json
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List
from enum import Enum
class SessionType(Enum):
DESIGN = "design" # CAD/Design KB
ANALYSIS = "analysis" # FEA/Analysis KB
class SessionStatus(Enum):
RECORDING = "recording"
PAUSED = "paused"
TRANSCRIBING = "transcribing"
READY = "ready" # Transcribed, ready for sync
PROCESSED = "processed" # Clawdbot has processed it
@dataclass
class Session:
"""A recording session."""
id: str
name: str
project: str
session_type: SessionType
created_at: datetime
duration: float = 0.0
status: SessionStatus = SessionStatus.RECORDING
video_file: str = "recording.mp4"
transcript_file: Optional[str] = None
def to_dict(self) -> dict:
return {
"id": self.id,
"name": self.name,
"project": self.project,
"session_type": self.session_type.value,
"created_at": self.created_at.isoformat(),
"duration": self.duration,
"status": self.status.value,
"video_file": self.video_file,
"transcript_file": self.transcript_file,
}
@classmethod
def from_dict(cls, data: dict) -> "Session":
return cls(
id=data["id"],
name=data["name"],
project=data["project"],
session_type=SessionType(data.get("session_type", "design")),
created_at=datetime.fromisoformat(data["created_at"]),
duration=data.get("duration", 0.0),
status=SessionStatus(data.get("status", "ready")),
video_file=data.get("video_file", "recording.mp4"),
transcript_file=data.get("transcript_file"),
)
class SessionManager:
"""
Manages recording sessions.
Project-centric structure:
/Projects/<ProjectName>/
└── _capture/
└── <session-id>/
├── session.json # Metadata
├── recording.mp4 # Video
└── transcript.json # Whisper output
"""
def __init__(self, projects_root: Path):
self.projects_root = Path(projects_root)
self.current_session: Optional[Session] = None
self._current_project_path: Optional[Path] = None
def list_projects(self) -> List[str]:
"""List available projects."""
projects = []
if self.projects_root.exists():
for p in sorted(self.projects_root.iterdir()):
if p.is_dir() and not p.name.startswith((".", "_")):
# Check if it looks like a project
if (p / "KB").exists() or (p / "_context.md").exists():
projects.append(p.name)
return projects
def get_project_path(self, project: str) -> Path:
"""Get full path to a project."""
return self.projects_root / project
def start_session(
self,
name: str,
project: str,
session_type: SessionType = SessionType.DESIGN,
) -> Session:
"""Start a new recording session."""
self._current_project_path = self.get_project_path(project)
if not self._current_project_path.exists():
raise ValueError(f"Project not found: {project}")
session_id = datetime.now().strftime("%Y%m%d-%H%M%S")
session = Session(
id=session_id,
name=name,
project=project,
session_type=session_type,
created_at=datetime.now(),
status=SessionStatus.RECORDING,
)
# Create session directory
session_dir = self._current_project_path / "_capture" / session_id
session_dir.mkdir(parents=True, exist_ok=True)
self.current_session = session
self._save_session()
return session
def get_session_dir(self) -> Path:
"""Get current session directory."""
if not self.current_session or not self._current_project_path:
raise RuntimeError("No active session")
return self._current_project_path / "_capture" / self.current_session.id
def get_video_path(self) -> Path:
"""Get path for video file."""
return self.get_session_dir() / self.current_session.video_file
def update_status(self, status: SessionStatus) -> None:
"""Update session status."""
if self.current_session:
self.current_session.status = status
self._save_session()
def set_duration(self, duration: float) -> None:
"""Set recording duration."""
if self.current_session:
self.current_session.duration = duration
self._save_session()
def set_transcript(self, transcript_file: str) -> None:
"""Set transcript file name."""
if self.current_session:
self.current_session.transcript_file = transcript_file
self._save_session()
def end_session(self) -> Session:
"""End current session."""
if not self.current_session:
raise RuntimeError("No active session")
self.current_session.status = SessionStatus.READY
self._save_session()
session = self.current_session
self.current_session = None
self._current_project_path = None
return session
def cancel_session(self) -> None:
"""Cancel session and delete files."""
if self.current_session:
import shutil
session_dir = self.get_session_dir()
if session_dir.exists():
shutil.rmtree(session_dir)
self.current_session = None
self._current_project_path = None
def list_sessions(self, project: Optional[str] = None) -> List[Session]:
"""List sessions for a project or all projects."""
sessions = []
if project:
capture_dir = self.get_project_path(project) / "_capture"
if capture_dir.exists():
for session_dir in sorted(capture_dir.iterdir(), reverse=True):
if session_dir.is_dir():
session_file = session_dir / "session.json"
if session_file.exists():
try:
with open(session_file) as f:
sessions.append(Session.from_dict(json.load(f)))
except Exception:  # skip unreadable or malformed session files
pass
else:
for proj in self.list_projects():
sessions.extend(self.list_sessions(proj))
sessions.sort(key=lambda s: s.created_at, reverse=True)
return sessions
def _save_session(self) -> None:
"""Save current session to disk."""
if not self.current_session:
return
session_file = self.get_session_dir() / "session.json"
with open(session_file, "w") as f:
json.dump(self.current_session.to_dict(), f, indent=2)
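
The on-disk layout described in the `SessionManager` docstring (`<project>/_capture/<session-id>/session.json`) can be reproduced in a temporary directory. The sketch below is illustrative only: `write_session`, `read_sessions`, and the project name `DemoProject` are hypothetical, and the record holds just a subset of the fields that `Session.to_dict` writes.

```python
import json
import tempfile
from datetime import datetime
from pathlib import Path


def write_session(project_dir: Path, name: str, now: datetime) -> Path:
    """Create _capture/<session-id>/session.json under the project directory."""
    session_id = now.strftime("%Y%m%d-%H%M%S")
    session_dir = project_dir / "_capture" / session_id
    session_dir.mkdir(parents=True, exist_ok=True)
    record = {
        "id": session_id,
        "name": name,
        "created_at": now.isoformat(),
        "status": "recording",
    }
    (session_dir / "session.json").write_text(
        json.dumps(record, indent=2), encoding="utf-8"
    )
    return session_dir


def read_sessions(project_dir: Path) -> list[dict]:
    """Return session records, newest first (timestamp ids sort lexically)."""
    capture_dir = project_dir / "_capture"
    if not capture_dir.exists():
        return []
    sessions = []
    for session_dir in sorted(capture_dir.iterdir(), reverse=True):
        session_file = session_dir / "session.json"
        if session_file.is_file():
            sessions.append(json.loads(session_file.read_text(encoding="utf-8")))
    return sessions


with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp) / "DemoProject"  # hypothetical project name
    project.mkdir()
    write_session(project, "first", datetime(2026, 2, 10, 18, 0, 0))
    write_session(project, "second", datetime(2026, 2, 10, 18, 30, 0))
    listed = [s["name"] for s in read_sessions(project)]
print(listed)
```

Because the session id is a second-resolution timestamp, two sessions started within the same second would share a directory; the sketch inherits that limitation from the naming scheme above.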