Compare commits
9 Commits
main
...
feature/kb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7f8ef93247 | ||
|
|
8c5e35c301 | ||
|
|
d998a9a2b1 | ||
|
|
9b24478f04 | ||
|
|
09c32cbad2 | ||
|
|
e647255c60 | ||
|
|
978c79abc0 | ||
|
|
0266fda42b | ||
|
|
d5371cfe75 |
193
docs/KB-CAPTURE.md
Normal file
193
docs/KB-CAPTURE.md
Normal file
@@ -0,0 +1,193 @@
|
||||
# KB Capture v2
|
||||
|
||||
**Clip-based recording for engineering knowledge capture.**
|
||||
|
||||
## Overview
|
||||
|
||||
KB Capture is a lightweight recording tool that captures your CAD/FEM work as short clips, not one long video. Record what matters, delete mistakes, keep the good stuff.
|
||||
|
||||
## Quick Start
|
||||
|
||||
```bash
|
||||
# Install
|
||||
cd CAD-Documenter
|
||||
uv sync
|
||||
uv pip install customtkinter keyboard
|
||||
|
||||
# Launch
|
||||
uv run kb-capture
|
||||
```
|
||||
|
||||
## Workflow
|
||||
|
||||
### 1. Start Session
|
||||
- Open KB Capture (GUI)
|
||||
- **Select project** from dropdown (scans your projects folder)
|
||||
- Enter session description (e.g., "Vertical support refinement")
|
||||
- Select type: **Design** (CAD) or **Analysis** (FEA)
|
||||
- Click **Start Session**
|
||||
|
||||
> Projects are auto-discovered from your projects folder (D:/ATODrive/Projects on Windows)
|
||||
|
||||
### 2. Record Clips
|
||||
While working in NX/CAD:
|
||||
- Press **Ctrl+Shift+R** to start recording
|
||||
- Narrate what you're doing
|
||||
- Say "screenshot" when you want a frame captured
|
||||
- Press **Ctrl+Shift+R** again to stop
|
||||
|
||||
### 3. Review Clips
|
||||
After each clip:
|
||||
- **Keep (K)**: Keep the clip
|
||||
- **Delete (D)**: Discard the clip (bad take)
|
||||
- Or just start recording again (auto-keeps previous)
|
||||
|
||||
### 4. End Session
|
||||
- Press **Ctrl+Shift+E** or click **End Session**
|
||||
- Clips are merged and transcribed
|
||||
- Exported to `clawdbot_export/` for Mario processing
|
||||
|
||||
## Keyboard Shortcuts
|
||||
|
||||
| Action | Shortcut |
|
||||
|--------|----------|
|
||||
| Start/Stop Recording | Ctrl+Shift+R |
|
||||
| Keep Last Clip | Ctrl+Shift+K |
|
||||
| Delete Last Clip | Ctrl+Shift+D |
|
||||
| End Session | Ctrl+Shift+E |
|
||||
|
||||
## Session Types
|
||||
|
||||
| Type | Updates | Use For |
|
||||
|------|---------|---------|
|
||||
| **Design** | KB/Design/ | CAD work, component design, assembly |
|
||||
| **Analysis** | KB/Analysis/ | FEA setup, mesh, BCs, results |
|
||||
|
||||
## Output
|
||||
|
||||
Sessions are stored **inside the project folder**:
|
||||
|
||||
```
|
||||
/2-Projects/<ProjectName>/
|
||||
├── KB/
|
||||
│ └── dev/ # Mario creates gen-XXX.md here
|
||||
├── Images/
|
||||
│ └── screenshot-sessions/ # Mario moves frames here
|
||||
└── _capture/ # Session staging area
|
||||
└── <session-id>/
|
||||
├── clips/
|
||||
│ ├── clip-001.mp4
|
||||
│ └── ...
|
||||
├── session.json
|
||||
└── clawdbot_export/ # Ready for Mario
|
||||
├── merged.mp4
|
||||
├── transcript.json
|
||||
├── frames/
|
||||
│ ├── 01_00-30.png
|
||||
│ └── ...
|
||||
└── metadata.json
|
||||
```
|
||||
|
||||
**Key insight:** Sessions belong to PROJECTS, not to KB Capture. This means:
|
||||
- All project data stays together
|
||||
- Mario knows which KB to update
|
||||
- Easy to archive/delete projects
|
||||
|
||||
## What Happens Next
|
||||
|
||||
1. **Syncthing** syncs `clawdbot_export/` to Clawdbot
|
||||
2. **Mario** detects new session
|
||||
3. **Vision analysis** categorizes frames
|
||||
4. **KB updated** with new information
|
||||
5. **Slack notification** when complete
|
||||
|
||||
## Tips
|
||||
|
||||
### Recording
|
||||
- Narrate naturally — explain what you're doing
|
||||
- Say "screenshot" before important views
|
||||
- Keep clips short (30s - 2min)
|
||||
- It's okay to delete bad takes
|
||||
|
||||
### Organization
|
||||
- One session per work block (30-60 min)
|
||||
- Use descriptive session names
|
||||
- Match project name to your PKM folder
|
||||
|
||||
### Quality
|
||||
- Close unnecessary windows before recording
|
||||
- Undock NX 3D viewport for clean captures
|
||||
- Speak clearly for better transcription
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Hotkeys not working
|
||||
- Run as Administrator (Windows)
|
||||
- Check for conflicts with other apps
|
||||
- Try restarting KB Capture
|
||||
|
||||
### Recording fails
|
||||
- Ensure FFmpeg is installed: `choco install ffmpeg`
|
||||
- Check disk space
|
||||
- Check microphone permissions
|
||||
|
||||
### No transcription
|
||||
- Whisper needs ~2GB RAM for 'base' model
|
||||
- Try 'tiny' model: `--whisper-model tiny`
|
||||
- Check CUDA/GPU drivers for faster processing
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────┐
|
||||
│ KB Capture (Windows) │
|
||||
├─────────────────────────────────────────┤
|
||||
│ ┌───────────┐ ┌──────────────────┐ │
|
||||
│ │ Hotkeys │ │ GUI (optional) │ │
|
||||
│ └─────┬─────┘ └────────┬─────────┘ │
|
||||
│ │ │ │
|
||||
│ ▼ ▼ │
|
||||
│ ┌─────────────────────────────────┐ │
|
||||
│ │ Session Manager │ │
|
||||
│ │ (clips, keep/delete, merge) │ │
|
||||
│ └─────────────┬───────────────────┘ │
|
||||
│ │ │
|
||||
│ ┌─────────────┼───────────────────┐ │
|
||||
│ │ ▼ │ │
|
||||
│ │ ┌─────────────────────────┐ │ │
|
||||
│ │ │ Screen Recorder │ │ │
|
||||
│ │ │ (FFmpeg gdigrab) │ │ │
|
||||
│ │ └─────────────────────────┘ │ │
|
||||
│ │ │ │
|
||||
│ │ ┌─────────────────────────┐ │ │
|
||||
│ │ │ Whisper Transcriber │ │ │
|
||||
│ │ │ (local GPU) │ │ │
|
||||
│ │ └─────────────────────────┘ │ │
|
||||
│ └─────────────────────────────────┘ │
|
||||
│ │ │
|
||||
│ ▼ │
|
||||
│ ┌─────────────────────────────────┐ │
|
||||
│ │ clawdbot_export/ │ │
|
||||
│ │ merged.mp4 + transcript.json │ │
|
||||
│ └─────────────┬───────────────────┘ │
|
||||
└────────────────┼────────────────────────┘
|
||||
│ Syncthing
|
||||
▼
|
||||
┌─────────────────────────────────────────┐
|
||||
│ Clawdbot (Mario) │
|
||||
│ Vision analysis → KB update → Notify │
|
||||
└─────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## Requirements
|
||||
|
||||
- Windows 10/11
|
||||
- Python 3.12+
|
||||
- FFmpeg (`choco install ffmpeg`)
|
||||
- CUDA GPU (recommended for Whisper)
|
||||
- ~4GB RAM (for Whisper 'base' model)
|
||||
|
||||
## Related
|
||||
|
||||
- [CAD-Documenter README](../README.md) — Original project overview
|
||||
- [Knowledge Base Skill](http://100.80.199.40:3000/Antoine/clawdbot-shared-skills) — How Mario processes sessions
|
||||
@@ -41,6 +41,11 @@ dev = [
|
||||
gui = [
|
||||
"customtkinter>=5.2.0",
|
||||
]
|
||||
capture = [
|
||||
"customtkinter>=5.2.0",
|
||||
"keyboard>=0.13.5", # Global hotkeys
|
||||
"pystray>=0.19.0", # System tray (optional)
|
||||
]
|
||||
pdf = [
|
||||
"pandoc", # For PDF generation fallback
|
||||
]
|
||||
@@ -48,6 +53,7 @@ pdf = [
|
||||
[project.scripts]
|
||||
cad-doc = "cad_documenter.cli:main"
|
||||
cad-doc-gui = "cad_documenter.gui:main"
|
||||
kb-capture = "cad_documenter.gui_capture:main"
|
||||
|
||||
[project.urls]
|
||||
Homepage = "http://100.80.199.40:3000/Antoine/CAD-Documenter"
|
||||
|
||||
726
src/cad_documenter/gui_capture.py
Normal file
726
src/cad_documenter/gui_capture.py
Normal file
@@ -0,0 +1,726 @@
|
||||
"""
|
||||
KB Capture GUI (OBS Mode)
|
||||
|
||||
Load OBS recordings → Transcribe → Process for KB
|
||||
|
||||
Simple flow: Load Video → Transcribe → Done
|
||||
"""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import shutil
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from tkinter import filedialog, messagebox
|
||||
from datetime import datetime
|
||||
|
||||
try:
|
||||
import customtkinter as ctk
|
||||
from customtkinter import CTk, CTkFrame, CTkLabel, CTkButton, CTkEntry
|
||||
from customtkinter import CTkOptionMenu, CTkToplevel, CTkProgressBar
|
||||
HAS_CTK = True
|
||||
except ImportError:
|
||||
HAS_CTK = False
|
||||
|
||||
from .session import SessionManager, Session, SessionType, SessionStatus
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# THEME
|
||||
# ============================================================================
|
||||
|
||||
COLORS = {
|
||||
"bg": "#0d1117",
|
||||
"bg_card": "#161b22",
|
||||
"bg_elevated": "#1c2128",
|
||||
"border": "#30363d",
|
||||
"text": "#e6edf3",
|
||||
"text_secondary": "#7d8590",
|
||||
"text_muted": "#484f58",
|
||||
"red": "#f85149",
|
||||
"green": "#3fb950",
|
||||
"blue": "#58a6ff",
|
||||
"orange": "#d29922",
|
||||
"purple": "#a371f7",
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CONFIG
|
||||
# ============================================================================
|
||||
|
||||
def get_config_path() -> Path:
|
||||
if sys.platform == "win32":
|
||||
config_dir = Path.home() / "AppData" / "Local" / "KBCapture"
|
||||
else:
|
||||
config_dir = Path.home() / ".config" / "kb-capture"
|
||||
config_dir.mkdir(parents=True, exist_ok=True)
|
||||
return config_dir / "config.json"
|
||||
|
||||
|
||||
def load_config() -> dict:
|
||||
config_path = get_config_path()
|
||||
if config_path.exists():
|
||||
try:
|
||||
with open(config_path) as f:
|
||||
return json.load(f)
|
||||
except:
|
||||
pass
|
||||
return {}
|
||||
|
||||
|
||||
def save_config(config: dict) -> None:
|
||||
with open(get_config_path(), "w") as f:
|
||||
json.dump(config, f, indent=2)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# PROJECT CREATION
|
||||
# ============================================================================
|
||||
|
||||
def create_project(projects_root: Path, name: str, description: str = "") -> Path:
|
||||
"""Create a new project with KB structure."""
|
||||
project_path = projects_root / name
|
||||
|
||||
if project_path.exists():
|
||||
raise ValueError(f"Project already exists: {name}")
|
||||
|
||||
dirs = [
|
||||
"KB/Design/components",
|
||||
"KB/Design/materials",
|
||||
"KB/Design/dev",
|
||||
"KB/Analysis/models",
|
||||
"KB/Analysis/results",
|
||||
"Images/components",
|
||||
"_capture",
|
||||
]
|
||||
|
||||
for d in dirs:
|
||||
(project_path / d).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Create context file
|
||||
context = f"""# {name}
|
||||
|
||||
{description}
|
||||
|
||||
Created: {datetime.now().strftime("%Y-%m-%d")}
|
||||
"""
|
||||
(project_path / "_context.md").write_text(context)
|
||||
|
||||
return project_path
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# MAIN GUI
|
||||
# ============================================================================
|
||||
|
||||
class KBCaptureGUI:
|
||||
"""Main application window - OBS video loading only."""
|
||||
|
||||
def __init__(self):
|
||||
if not HAS_CTK:
|
||||
raise RuntimeError("CustomTkinter not installed")
|
||||
|
||||
# Config
|
||||
self.config = load_config()
|
||||
|
||||
# Projects root
|
||||
self.projects_root = None
|
||||
if self.config.get("projects_root"):
|
||||
saved = Path(self.config["projects_root"])
|
||||
if saved.exists():
|
||||
self.projects_root = saved
|
||||
|
||||
# Session manager
|
||||
self.session_manager = None
|
||||
|
||||
# Processing state
|
||||
self.is_processing = False
|
||||
|
||||
# Window
|
||||
ctk.set_appearance_mode("dark")
|
||||
|
||||
self.window = CTk()
|
||||
self.window.title("KB Capture")
|
||||
self.window.geometry("420x520")
|
||||
self.window.minsize(400, 480)
|
||||
self.window.configure(fg_color=COLORS["bg"])
|
||||
|
||||
self._build_ui()
|
||||
|
||||
# Initialize if we have a projects root
|
||||
if self.projects_root:
|
||||
self._init_session_manager()
|
||||
self._refresh_projects()
|
||||
|
||||
self.window.protocol("WM_DELETE_WINDOW", self._on_close)
|
||||
|
||||
def _init_session_manager(self):
|
||||
"""Initialize session manager."""
|
||||
self.session_manager = SessionManager(self.projects_root)
|
||||
|
||||
def _build_ui(self):
|
||||
"""Build the interface."""
|
||||
main = CTkFrame(self.window, fg_color=COLORS["bg"])
|
||||
main.pack(fill="both", expand=True, padx=20, pady=16)
|
||||
|
||||
# Header
|
||||
header = CTkFrame(main, fg_color="transparent")
|
||||
header.pack(fill="x", pady=(0, 16))
|
||||
|
||||
CTkLabel(
|
||||
header,
|
||||
text="KB Capture",
|
||||
font=("Segoe UI Semibold", 20),
|
||||
text_color=COLORS["text"],
|
||||
).pack(side="left")
|
||||
|
||||
# Folder button
|
||||
CTkButton(
|
||||
header,
|
||||
text="📁",
|
||||
width=32,
|
||||
height=32,
|
||||
fg_color="transparent",
|
||||
hover_color=COLORS["bg_card"],
|
||||
command=self._browse_folder,
|
||||
).pack(side="right")
|
||||
|
||||
# Info card
|
||||
info_frame = CTkFrame(main, fg_color=COLORS["bg_card"], corner_radius=10)
|
||||
info_frame.pack(fill="x", pady=(0, 16))
|
||||
|
||||
info_inner = CTkFrame(info_frame, fg_color="transparent")
|
||||
info_inner.pack(pady=16, padx=16, fill="x")
|
||||
|
||||
CTkLabel(
|
||||
info_inner,
|
||||
text="📹 Record with OBS, then load here",
|
||||
font=("Segoe UI Semibold", 13),
|
||||
text_color=COLORS["text"],
|
||||
).pack(anchor="w")
|
||||
|
||||
CTkLabel(
|
||||
info_inner,
|
||||
text="Video → Transcribe → Ready for Clawdbot",
|
||||
font=("", 11),
|
||||
text_color=COLORS["text_secondary"],
|
||||
).pack(anchor="w", pady=(4, 0))
|
||||
|
||||
# Status area
|
||||
self.status_label = CTkLabel(
|
||||
info_inner,
|
||||
text="",
|
||||
font=("", 11),
|
||||
text_color=COLORS["text_muted"],
|
||||
)
|
||||
self.status_label.pack(anchor="w", pady=(8, 0))
|
||||
|
||||
# Progress bar (hidden by default)
|
||||
self.progress = CTkProgressBar(
|
||||
info_inner,
|
||||
width=300,
|
||||
height=6,
|
||||
fg_color=COLORS["border"],
|
||||
progress_color=COLORS["blue"],
|
||||
)
|
||||
# Don't pack yet - will show during processing
|
||||
|
||||
# Separator
|
||||
CTkFrame(main, fg_color=COLORS["border"], height=1).pack(fill="x", pady=8)
|
||||
|
||||
# Project selector
|
||||
proj_frame = CTkFrame(main, fg_color="transparent")
|
||||
proj_frame.pack(fill="x")
|
||||
|
||||
proj_header = CTkFrame(proj_frame, fg_color="transparent")
|
||||
proj_header.pack(fill="x", pady=(0, 8))
|
||||
|
||||
CTkLabel(
|
||||
proj_header,
|
||||
text="Project",
|
||||
font=("Segoe UI Semibold", 11),
|
||||
text_color=COLORS["text_secondary"],
|
||||
).pack(side="left")
|
||||
|
||||
CTkButton(
|
||||
proj_header,
|
||||
text="+ New",
|
||||
width=60,
|
||||
height=24,
|
||||
font=("", 10),
|
||||
fg_color=COLORS["green"],
|
||||
hover_color="#16a34a",
|
||||
command=self._new_project,
|
||||
).pack(side="right")
|
||||
|
||||
self.project_menu = CTkOptionMenu(
|
||||
proj_frame,
|
||||
values=["(Select folder first)"],
|
||||
width=360,
|
||||
height=35,
|
||||
fg_color=COLORS["bg_card"],
|
||||
button_color=COLORS["bg_elevated"],
|
||||
button_hover_color=COLORS["border"],
|
||||
)
|
||||
self.project_menu.pack(fill="x")
|
||||
|
||||
# Session name
|
||||
CTkLabel(
|
||||
main,
|
||||
text="Session Name",
|
||||
font=("Segoe UI Semibold", 11),
|
||||
text_color=COLORS["text_secondary"],
|
||||
).pack(anchor="w", pady=(16, 8))
|
||||
|
||||
self.name_entry = CTkEntry(
|
||||
main,
|
||||
placeholder_text="e.g., Vertical support walkthrough",
|
||||
height=35,
|
||||
fg_color=COLORS["bg_card"],
|
||||
border_color=COLORS["border"],
|
||||
)
|
||||
self.name_entry.pack(fill="x")
|
||||
|
||||
# Session type
|
||||
CTkLabel(
|
||||
main,
|
||||
text="Session Type",
|
||||
font=("Segoe UI Semibold", 11),
|
||||
text_color=COLORS["text_secondary"],
|
||||
).pack(anchor="w", pady=(16, 8))
|
||||
|
||||
type_frame = CTkFrame(main, fg_color="transparent")
|
||||
type_frame.pack(fill="x")
|
||||
type_frame.grid_columnconfigure((0, 1), weight=1)
|
||||
|
||||
self.type_var = ctk.StringVar(value="design")
|
||||
|
||||
self.design_btn = CTkButton(
|
||||
type_frame,
|
||||
text="🎨 Design → KB/Design/",
|
||||
height=40,
|
||||
font=("", 11),
|
||||
fg_color=COLORS["blue"],
|
||||
hover_color="#2563eb",
|
||||
command=lambda: self._set_type("design"),
|
||||
)
|
||||
self.design_btn.grid(row=0, column=0, padx=(0, 4), sticky="ew")
|
||||
|
||||
self.analysis_btn = CTkButton(
|
||||
type_frame,
|
||||
text="📊 Analysis → KB/Analysis/",
|
||||
height=40,
|
||||
font=("", 11),
|
||||
fg_color=COLORS["border"],
|
||||
hover_color=COLORS["bg_elevated"],
|
||||
command=lambda: self._set_type("analysis"),
|
||||
)
|
||||
self.analysis_btn.grid(row=0, column=1, padx=(4, 0), sticky="ew")
|
||||
|
||||
# Spacer
|
||||
CTkFrame(main, fg_color="transparent").pack(fill="both", expand=True)
|
||||
|
||||
# Load button (main action)
|
||||
self.load_btn = CTkButton(
|
||||
main,
|
||||
text="📂 Load OBS Recording",
|
||||
height=50,
|
||||
font=("Segoe UI Semibold", 14),
|
||||
fg_color=COLORS["blue"],
|
||||
hover_color="#2563eb",
|
||||
state="disabled",
|
||||
command=self._load_video,
|
||||
)
|
||||
self.load_btn.pack(fill="x", pady=(16, 0))
|
||||
|
||||
# Recent sessions link
|
||||
self.recent_label = CTkLabel(
|
||||
main,
|
||||
text="",
|
||||
font=("", 10),
|
||||
text_color=COLORS["text_muted"],
|
||||
cursor="hand2",
|
||||
)
|
||||
self.recent_label.pack(pady=(12, 0))
|
||||
|
||||
# Folder path
|
||||
self.folder_label = CTkLabel(
|
||||
main,
|
||||
text=str(self.projects_root) if self.projects_root else "Click 📁 to select projects folder",
|
||||
font=("", 9),
|
||||
text_color=COLORS["text_muted"],
|
||||
cursor="hand2",
|
||||
)
|
||||
self.folder_label.pack(pady=(8, 0))
|
||||
self.folder_label.bind("<Button-1>", lambda e: self._browse_folder())
|
||||
|
||||
def _set_type(self, type_id: str):
|
||||
"""Set session type."""
|
||||
self.type_var.set(type_id)
|
||||
if type_id == "design":
|
||||
self.design_btn.configure(fg_color=COLORS["blue"])
|
||||
self.analysis_btn.configure(fg_color=COLORS["border"])
|
||||
else:
|
||||
self.design_btn.configure(fg_color=COLORS["border"])
|
||||
self.analysis_btn.configure(fg_color=COLORS["orange"])
|
||||
|
||||
def _browse_folder(self):
|
||||
"""Browse for projects folder."""
|
||||
initial = str(self.projects_root) if self.projects_root else str(Path.home())
|
||||
|
||||
folder = filedialog.askdirectory(
|
||||
title="Select Projects Folder (e.g., ATODrive/Projects)",
|
||||
initialdir=initial,
|
||||
)
|
||||
|
||||
if folder:
|
||||
self.projects_root = Path(folder)
|
||||
self.folder_label.configure(text=str(self.projects_root))
|
||||
|
||||
self.config["projects_root"] = str(self.projects_root)
|
||||
save_config(self.config)
|
||||
|
||||
self._init_session_manager()
|
||||
self._refresh_projects()
|
||||
|
||||
def _refresh_projects(self):
|
||||
"""Refresh project list."""
|
||||
if not self.session_manager:
|
||||
self.project_menu.configure(values=["(Select folder first)"])
|
||||
return
|
||||
|
||||
projects = self.session_manager.list_projects()
|
||||
if projects:
|
||||
self.project_menu.configure(values=projects)
|
||||
# Try to restore last used project
|
||||
last_project = self.config.get("last_project")
|
||||
if last_project in projects:
|
||||
self.project_menu.set(last_project)
|
||||
else:
|
||||
self.project_menu.set(projects[0])
|
||||
self.load_btn.configure(state="normal")
|
||||
self._update_recent()
|
||||
else:
|
||||
self.project_menu.configure(values=["(No projects - click + New)"])
|
||||
self.load_btn.configure(state="disabled")
|
||||
|
||||
def _update_recent(self):
|
||||
"""Update recent sessions count."""
|
||||
project = self.project_menu.get()
|
||||
if project.startswith("("):
|
||||
self.recent_label.configure(text="")
|
||||
return
|
||||
|
||||
sessions = self.session_manager.list_sessions(project)
|
||||
if sessions:
|
||||
self.recent_label.configure(text=f"📋 {len(sessions)} previous sessions")
|
||||
else:
|
||||
self.recent_label.configure(text="No sessions yet")
|
||||
|
||||
def _new_project(self):
|
||||
"""Create new project."""
|
||||
if not self.projects_root:
|
||||
messagebox.showwarning("No Folder", "Select a projects folder first")
|
||||
return
|
||||
|
||||
# Simple dialog
|
||||
dialog = CTkToplevel(self.window)
|
||||
dialog.title("New Project")
|
||||
dialog.geometry("350x180")
|
||||
dialog.transient(self.window)
|
||||
dialog.grab_set()
|
||||
dialog.configure(fg_color=COLORS["bg"])
|
||||
|
||||
# Center
|
||||
dialog.update_idletasks()
|
||||
x = self.window.winfo_x() + (self.window.winfo_width() - 350) // 2
|
||||
y = self.window.winfo_y() + (self.window.winfo_height() - 180) // 2
|
||||
dialog.geometry(f"+{x}+{y}")
|
||||
|
||||
CTkLabel(
|
||||
dialog,
|
||||
text="Project Name",
|
||||
font=("", 12),
|
||||
text_color=COLORS["text_secondary"],
|
||||
).pack(pady=(20, 8), padx=20, anchor="w")
|
||||
|
||||
name_entry = CTkEntry(
|
||||
dialog,
|
||||
placeholder_text="e.g., P05-NewProject",
|
||||
width=310,
|
||||
height=35,
|
||||
)
|
||||
name_entry.pack(padx=20)
|
||||
name_entry.focus_set()
|
||||
|
||||
def create():
|
||||
name = name_entry.get().strip()
|
||||
if not name:
|
||||
return
|
||||
|
||||
# Sanitize
|
||||
name = "".join(c for c in name if c.isalnum() or c in "-_ ")
|
||||
name = name.replace(" ", "-")
|
||||
|
||||
try:
|
||||
create_project(self.projects_root, name)
|
||||
dialog.destroy()
|
||||
self._refresh_projects()
|
||||
self.project_menu.set(name)
|
||||
except ValueError as e:
|
||||
messagebox.showerror("Error", str(e))
|
||||
|
||||
btn_frame = CTkFrame(dialog, fg_color="transparent")
|
||||
btn_frame.pack(fill="x", padx=20, pady=20)
|
||||
|
||||
CTkButton(
|
||||
btn_frame,
|
||||
text="Cancel",
|
||||
width=80,
|
||||
fg_color="transparent",
|
||||
border_width=1,
|
||||
border_color=COLORS["border"],
|
||||
command=dialog.destroy,
|
||||
).pack(side="left")
|
||||
|
||||
CTkButton(
|
||||
btn_frame,
|
||||
text="Create",
|
||||
width=100,
|
||||
fg_color=COLORS["green"],
|
||||
command=create,
|
||||
).pack(side="right")
|
||||
|
||||
name_entry.bind("<Return>", lambda e: create())
|
||||
|
||||
def _load_video(self):
|
||||
"""Load an OBS recording for processing."""
|
||||
if self.is_processing:
|
||||
return
|
||||
|
||||
project = self.project_menu.get()
|
||||
if project.startswith("("):
|
||||
messagebox.showwarning("No Project", "Select a project first")
|
||||
return
|
||||
|
||||
# Save last project
|
||||
self.config["last_project"] = project
|
||||
save_config(self.config)
|
||||
|
||||
# Ask for video file
|
||||
initial_dir = self.config.get("last_video_dir", str(Path.home() / "Videos"))
|
||||
|
||||
video_path = filedialog.askopenfilename(
|
||||
title="Select OBS Recording",
|
||||
initialdir=initial_dir,
|
||||
filetypes=[
|
||||
("Video files", "*.mp4 *.mkv *.avi *.mov *.webm *.flv"),
|
||||
("All files", "*.*"),
|
||||
],
|
||||
)
|
||||
|
||||
if not video_path:
|
||||
return
|
||||
|
||||
video_path = Path(video_path)
|
||||
if not video_path.exists():
|
||||
messagebox.showerror("Error", "File not found")
|
||||
return
|
||||
|
||||
# Save last video directory
|
||||
self.config["last_video_dir"] = str(video_path.parent)
|
||||
save_config(self.config)
|
||||
|
||||
# Get session name
|
||||
name = self.name_entry.get().strip() or video_path.stem
|
||||
session_type = SessionType.DESIGN if self.type_var.get() == "design" else SessionType.ANALYSIS
|
||||
|
||||
# Start processing
|
||||
self.is_processing = True
|
||||
self._set_processing_ui(True, "Copying video...")
|
||||
|
||||
# Run in background
|
||||
threading.Thread(
|
||||
target=self._process_video,
|
||||
args=(video_path, name, project, session_type),
|
||||
daemon=True,
|
||||
).start()
|
||||
|
||||
def _process_video(self, video_path: Path, name: str, project: str, session_type: SessionType):
|
||||
"""Process video in background thread."""
|
||||
try:
|
||||
# Create session
|
||||
session = self.session_manager.start_session(name, project, session_type)
|
||||
session_dir = self.session_manager.get_session_dir()
|
||||
|
||||
# Copy video
|
||||
dest_video = session_dir / "recording.mp4"
|
||||
self._update_status("Copying video...")
|
||||
shutil.copy2(video_path, dest_video)
|
||||
|
||||
# Transcribe
|
||||
self._update_status("Loading Whisper model...")
|
||||
self._transcribe(dest_video, session)
|
||||
|
||||
except Exception as e:
|
||||
self._update_status(f"Error: {e}", error=True)
|
||||
if self.session_manager.current_session:
|
||||
self.session_manager.cancel_session()
|
||||
finally:
|
||||
self.is_processing = False
|
||||
self.window.after(0, lambda: self._set_processing_ui(False))
|
||||
|
||||
def _transcribe(self, video_path: Path, session: Session):
|
||||
"""Transcribe video with Whisper."""
|
||||
try:
|
||||
import whisper
|
||||
|
||||
self._update_status("Transcribing (this takes a while)...")
|
||||
model = whisper.load_model("base")
|
||||
|
||||
# Whisper can handle video files directly
|
||||
result = model.transcribe(str(video_path), language="en", verbose=False)
|
||||
|
||||
# Save transcript
|
||||
transcript_path = video_path.parent / "transcript.json"
|
||||
with open(transcript_path, "w") as f:
|
||||
json.dump(result, f, indent=2)
|
||||
|
||||
# Find screenshot triggers
|
||||
triggers = []
|
||||
for segment in result.get("segments", []):
|
||||
text = segment.get("text", "").lower()
|
||||
if "screenshot" in text:
|
||||
triggers.append({
|
||||
"timestamp": segment.get("start", 0),
|
||||
"text": segment.get("text", ""),
|
||||
})
|
||||
|
||||
# Get video duration
|
||||
duration = self._get_video_duration(video_path)
|
||||
|
||||
# Save metadata
|
||||
metadata = {
|
||||
"session_id": session.id,
|
||||
"name": session.name,
|
||||
"project": session.project,
|
||||
"session_type": session.session_type.value,
|
||||
"created_at": session.created_at.isoformat(),
|
||||
"duration": duration,
|
||||
"status": "ready",
|
||||
"screenshot_triggers": triggers,
|
||||
"source_file": video_path.name,
|
||||
"files": {
|
||||
"video": "recording.mp4",
|
||||
"transcript": "transcript.json",
|
||||
},
|
||||
}
|
||||
|
||||
metadata_path = video_path.parent / "metadata.json"
|
||||
with open(metadata_path, "w") as f:
|
||||
json.dump(metadata, f, indent=2)
|
||||
|
||||
# Update session
|
||||
self.session_manager.set_duration(duration)
|
||||
self.session_manager.set_transcript("transcript.json")
|
||||
self.session_manager.end_session()
|
||||
|
||||
self._update_status(f"✅ Done! {len(triggers)} screenshot triggers found", success=True)
|
||||
self.window.after(0, self._update_recent)
|
||||
|
||||
except ImportError:
|
||||
self._update_status("⚠️ Saved (Whisper not installed)", warning=True)
|
||||
self.session_manager.end_session()
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
if "audio" in error_msg.lower():
|
||||
self._update_status("⚠️ Saved (video has no audio track)", warning=True)
|
||||
# Still save the session without transcript
|
||||
self.session_manager.end_session()
|
||||
else:
|
||||
raise
|
||||
|
||||
def _get_video_duration(self, video_path: Path) -> float:
|
||||
"""Get video duration using ffprobe."""
|
||||
try:
|
||||
import subprocess
|
||||
result = subprocess.run(
|
||||
[
|
||||
"ffprobe", "-v", "error",
|
||||
"-show_entries", "format=duration",
|
||||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||||
str(video_path),
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
return float(result.stdout.strip())
|
||||
except:
|
||||
return 0.0
|
||||
|
||||
def _update_status(self, text: str, error: bool = False, success: bool = False, warning: bool = False):
|
||||
"""Update status label from any thread."""
|
||||
def update():
|
||||
if error:
|
||||
color = COLORS["red"]
|
||||
elif success:
|
||||
color = COLORS["green"]
|
||||
elif warning:
|
||||
color = COLORS["orange"]
|
||||
else:
|
||||
color = COLORS["text_secondary"]
|
||||
self.status_label.configure(text=text, text_color=color)
|
||||
|
||||
self.window.after(0, update)
|
||||
|
||||
def _set_processing_ui(self, processing: bool, status: str = ""):
|
||||
"""Update UI for processing state."""
|
||||
if processing:
|
||||
self.load_btn.configure(state="disabled", text="Processing...")
|
||||
self.project_menu.configure(state="disabled")
|
||||
self.name_entry.configure(state="disabled")
|
||||
self.design_btn.configure(state="disabled")
|
||||
self.analysis_btn.configure(state="disabled")
|
||||
self.progress.pack(fill="x", pady=(8, 0))
|
||||
self.progress.configure(mode="indeterminate")
|
||||
self.progress.start()
|
||||
if status:
|
||||
self.status_label.configure(text=status, text_color=COLORS["text_secondary"])
|
||||
else:
|
||||
self.load_btn.configure(state="normal", text="📂 Load OBS Recording")
|
||||
self.project_menu.configure(state="normal")
|
||||
self.name_entry.configure(state="normal")
|
||||
self.design_btn.configure(state="normal")
|
||||
self.analysis_btn.configure(state="normal")
|
||||
self.progress.stop()
|
||||
self.progress.pack_forget()
|
||||
self.name_entry.delete(0, "end")
|
||||
|
||||
def _on_close(self):
|
||||
"""Handle window close."""
|
||||
if self.is_processing:
|
||||
if not messagebox.askyesno("Processing", "Video is being processed. Close anyway?"):
|
||||
return
|
||||
self.window.destroy()
|
||||
|
||||
def run(self):
|
||||
"""Run the application."""
|
||||
self.window.mainloop()
|
||||
|
||||
|
||||
def main():
|
||||
"""Entry point."""
|
||||
if not HAS_CTK:
|
||||
print("Error: CustomTkinter not installed")
|
||||
print("Install with: pip install customtkinter")
|
||||
sys.exit(1)
|
||||
|
||||
gui = KBCaptureGUI()
|
||||
gui.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
330
src/cad_documenter/kb_capture.py
Normal file
330
src/cad_documenter/kb_capture.py
Normal file
@@ -0,0 +1,330 @@
|
||||
"""
|
||||
KB Capture - Screen Recording for Knowledge Base
|
||||
|
||||
Simple flow: Record → Pause → Resume → Stop → Transcribe → Done
|
||||
|
||||
One session = one continuous recording.
|
||||
"""
|
||||
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Optional, Callable
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
from .recorder import ScreenRecorder, RecordingConfig
|
||||
from .session import SessionManager, Session, SessionType, SessionStatus
|
||||
|
||||
|
||||
class AppState(Enum):
|
||||
"""Application state."""
|
||||
IDLE = "idle" # No session
|
||||
RECORDING = "recording" # Recording
|
||||
PAUSED = "paused" # Recording paused
|
||||
TRANSCRIBING = "transcribing" # Processing
|
||||
|
||||
|
||||
@dataclass
|
||||
class AppStatus:
|
||||
"""Current application status for UI."""
|
||||
state: AppState
|
||||
session_name: Optional[str] = None
|
||||
project: Optional[str] = None
|
||||
session_type: Optional[SessionType] = None
|
||||
duration: float = 0.0
|
||||
message: str = ""
|
||||
|
||||
|
||||
class KBCaptureApp:
|
||||
"""
|
||||
Main application.
|
||||
|
||||
Flow:
|
||||
1. Select project
|
||||
2. Start session (name, type)
|
||||
3. Record → Pause → Resume → Stop
|
||||
4. Auto-transcribe with Whisper
|
||||
5. Ready for Clawdbot to process
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
projects_root: Path,
|
||||
on_status_change: Optional[Callable[[AppStatus], None]] = None,
|
||||
):
|
||||
self.projects_root = Path(projects_root)
|
||||
self.on_status_change = on_status_change or (lambda x: None)
|
||||
|
||||
# Components
|
||||
self.session_manager = SessionManager(self.projects_root)
|
||||
self.recorder = ScreenRecorder(on_status=self._log)
|
||||
|
||||
# State
|
||||
self.state = AppState.IDLE
|
||||
self._duration_thread: Optional[threading.Thread] = None
|
||||
self._running = False
|
||||
|
||||
def _log(self, message: str) -> None:
|
||||
"""Log and update status."""
|
||||
print(f"[KB Capture] {message}")
|
||||
self._update_status(message=message)
|
||||
|
||||
def _update_status(self, message: str = "") -> None:
|
||||
"""Update status and notify listeners."""
|
||||
session = self.session_manager.current_session
|
||||
|
||||
status = AppStatus(
|
||||
state=self.state,
|
||||
session_name=session.name if session else None,
|
||||
project=session.project if session else None,
|
||||
session_type=session.session_type if session else None,
|
||||
duration=self.recorder.get_duration() if self.state in (AppState.RECORDING, AppState.PAUSED) else 0.0,
|
||||
message=message,
|
||||
)
|
||||
|
||||
self.on_status_change(status)
|
||||
|
||||
def _start_duration_thread(self) -> None:
|
||||
"""Start thread to update duration."""
|
||||
self._running = True
|
||||
|
||||
def update_loop():
|
||||
while self._running and self.state in (AppState.RECORDING, AppState.PAUSED):
|
||||
self._update_status()
|
||||
time.sleep(0.5)
|
||||
|
||||
self._duration_thread = threading.Thread(target=update_loop, daemon=True)
|
||||
self._duration_thread.start()
|
||||
|
||||
def _stop_duration_thread(self) -> None:
|
||||
"""Stop duration thread."""
|
||||
self._running = False
|
||||
if self._duration_thread:
|
||||
self._duration_thread.join(timeout=1)
|
||||
self._duration_thread = None
|
||||
|
||||
# === Public API ===
|
||||
|
||||
def start_session(
|
||||
self,
|
||||
name: str,
|
||||
project: str,
|
||||
session_type: SessionType = SessionType.DESIGN,
|
||||
screen_index: int = None,
|
||||
) -> Session:
|
||||
"""Start a new session and begin recording."""
|
||||
if self.state != AppState.IDLE:
|
||||
raise RuntimeError("Session already active")
|
||||
|
||||
# Create session
|
||||
session = self.session_manager.start_session(name, project, session_type)
|
||||
|
||||
# Start recording
|
||||
config = RecordingConfig(
|
||||
output_path=self.session_manager.get_video_path(),
|
||||
framerate=30,
|
||||
screen_index=screen_index,
|
||||
)
|
||||
|
||||
if self.recorder.start(config):
|
||||
self.state = AppState.RECORDING
|
||||
self._start_duration_thread()
|
||||
self._log(f"Session started: {name}")
|
||||
else:
|
||||
self.session_manager.cancel_session()
|
||||
raise RuntimeError("Failed to start recording - check FFmpeg is installed")
|
||||
|
||||
return session
|
||||
|
||||
def pause(self) -> bool:
|
||||
"""Pause recording."""
|
||||
if self.state != AppState.RECORDING:
|
||||
return False
|
||||
|
||||
if self.recorder.pause():
|
||||
self.state = AppState.PAUSED
|
||||
self.session_manager.update_status(SessionStatus.PAUSED)
|
||||
self._update_status("Paused")
|
||||
return True
|
||||
return False
|
||||
|
||||
def resume(self) -> bool:
|
||||
"""Resume recording."""
|
||||
if self.state != AppState.PAUSED:
|
||||
return False
|
||||
|
||||
if self.recorder.resume():
|
||||
self.state = AppState.RECORDING
|
||||
self.session_manager.update_status(SessionStatus.RECORDING)
|
||||
self._update_status("Recording")
|
||||
return True
|
||||
return False
|
||||
|
||||
def toggle_pause(self) -> None:
|
||||
"""Toggle pause/resume."""
|
||||
if self.state == AppState.RECORDING:
|
||||
self.pause()
|
||||
elif self.state == AppState.PAUSED:
|
||||
self.resume()
|
||||
|
||||
def stop(self) -> Optional[Session]:
|
||||
"""Stop recording and transcribe."""
|
||||
if self.state not in (AppState.RECORDING, AppState.PAUSED):
|
||||
return None
|
||||
|
||||
self._stop_duration_thread()
|
||||
|
||||
# Get duration before stopping
|
||||
duration = self.recorder.get_duration()
|
||||
|
||||
# Stop recording
|
||||
try:
|
||||
output = self.recorder.stop()
|
||||
except Exception as e:
|
||||
self._log(f"Stop error: {e}")
|
||||
output = None
|
||||
|
||||
if not output or not output.exists():
|
||||
self._log("Recording failed - no output file")
|
||||
self.session_manager.cancel_session()
|
||||
self.state = AppState.IDLE
|
||||
self._update_status("Recording failed - try again")
|
||||
return None
|
||||
|
||||
# Update session
|
||||
self.session_manager.set_duration(duration)
|
||||
self.session_manager.update_status(SessionStatus.TRANSCRIBING)
|
||||
self.state = AppState.TRANSCRIBING
|
||||
self._update_status("Transcribing...")
|
||||
|
||||
# Transcribe in background
|
||||
threading.Thread(
|
||||
target=self._transcribe,
|
||||
args=(output,),
|
||||
daemon=True,
|
||||
).start()
|
||||
|
||||
return self.session_manager.current_session
|
||||
|
||||
def _transcribe(self, video_path: Path) -> None:
|
||||
"""Transcribe video with Whisper."""
|
||||
try:
|
||||
import whisper
|
||||
|
||||
self._log("Loading Whisper model...")
|
||||
model = whisper.load_model("base")
|
||||
|
||||
self._log("Transcribing...")
|
||||
result = model.transcribe(str(video_path), language="en", verbose=False)
|
||||
|
||||
# Save transcript
|
||||
import json
|
||||
transcript_path = video_path.parent / "transcript.json"
|
||||
with open(transcript_path, "w") as f:
|
||||
json.dump(result, f, indent=2)
|
||||
|
||||
# Find screenshot triggers
|
||||
triggers = []
|
||||
for segment in result.get("segments", []):
|
||||
text = segment.get("text", "").lower()
|
||||
if "screenshot" in text:
|
||||
triggers.append({
|
||||
"timestamp": segment.get("start", 0),
|
||||
"text": segment.get("text", ""),
|
||||
})
|
||||
|
||||
# Save metadata
|
||||
session = self.session_manager.current_session
|
||||
metadata = {
|
||||
"session_id": session.id,
|
||||
"name": session.name,
|
||||
"project": session.project,
|
||||
"session_type": session.session_type.value,
|
||||
"created_at": session.created_at.isoformat(),
|
||||
"duration": session.duration,
|
||||
"status": "ready",
|
||||
"screenshot_triggers": triggers,
|
||||
"files": {
|
||||
"video": session.video_file,
|
||||
"transcript": "transcript.json",
|
||||
},
|
||||
}
|
||||
|
||||
metadata_path = video_path.parent / "metadata.json"
|
||||
with open(metadata_path, "w") as f:
|
||||
json.dump(metadata, f, indent=2)
|
||||
|
||||
# Update session
|
||||
self.session_manager.set_transcript("transcript.json")
|
||||
self.session_manager.end_session()
|
||||
|
||||
self.state = AppState.IDLE
|
||||
self._log(f"Done! {len(triggers)} screenshot triggers found")
|
||||
self._update_status(f"Session saved with {len(triggers)} screenshots")
|
||||
|
||||
except ImportError:
|
||||
self._log("Whisper not installed!")
|
||||
self.session_manager.end_session()
|
||||
self.state = AppState.IDLE
|
||||
self._update_status("Saved (no transcription)")
|
||||
except Exception as e:
|
||||
self._log(f"Transcription error: {e}")
|
||||
self.session_manager.end_session()
|
||||
self.state = AppState.IDLE
|
||||
self._update_status("Saved (transcription failed)")
|
||||
|
||||
def cancel(self) -> None:
|
||||
"""Cancel session and delete files."""
|
||||
if self.state == AppState.IDLE:
|
||||
return
|
||||
|
||||
self._stop_duration_thread()
|
||||
|
||||
if self.recorder.is_recording:
|
||||
try:
|
||||
self.recorder.stop()
|
||||
except:
|
||||
pass
|
||||
|
||||
self.session_manager.cancel_session()
|
||||
self.state = AppState.IDLE
|
||||
self._log("Session cancelled")
|
||||
self._update_status("Cancelled")
|
||||
|
||||
def force_reset(self) -> None:
|
||||
"""Force reset to idle state (emergency recovery)."""
|
||||
self._stop_duration_thread()
|
||||
|
||||
# Force stop recorder
|
||||
if self.recorder.process:
|
||||
try:
|
||||
self.recorder.process.terminate()
|
||||
except:
|
||||
pass
|
||||
|
||||
self.recorder.is_recording = False
|
||||
self.recorder.is_paused = False
|
||||
|
||||
# Cancel any session
|
||||
if self.session_manager.current_session:
|
||||
try:
|
||||
self.session_manager.cancel_session()
|
||||
except:
|
||||
self.session_manager.current_session = None
|
||||
|
||||
self.state = AppState.IDLE
|
||||
self._log("Force reset complete")
|
||||
self._update_status("Ready to record")
|
||||
|
||||
def get_status(self) -> AppStatus:
|
||||
"""Get current status."""
|
||||
session = self.session_manager.current_session
|
||||
|
||||
return AppStatus(
|
||||
state=self.state,
|
||||
session_name=session.name if session else None,
|
||||
project=session.project if session else None,
|
||||
session_type=session.session_type if session else None,
|
||||
duration=self.recorder.get_duration() if self.state in (AppState.RECORDING, AppState.PAUSED) else 0.0,
|
||||
)
|
||||
431
src/cad_documenter/recorder.py
Normal file
431
src/cad_documenter/recorder.py
Normal file
@@ -0,0 +1,431 @@
|
||||
"""
|
||||
Screen + Audio Recorder (Simplified)
|
||||
|
||||
Records screen and microphone to a single video file.
|
||||
Supports pause/resume within the same recording.
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import signal
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Callable
|
||||
|
||||
|
||||
@dataclass
|
||||
class RecordingConfig:
|
||||
"""Recording configuration."""
|
||||
output_path: Path
|
||||
framerate: int = 30
|
||||
audio_device: Optional[str] = None
|
||||
video_codec: str = "libx264"
|
||||
audio_codec: str = "aac"
|
||||
crf: int = 23
|
||||
preset: str = "ultrafast"
|
||||
screen_index: Optional[int] = None # None = all screens, 0 = first, 1 = second, etc.
|
||||
|
||||
|
||||
class ScreenRecorder:
|
||||
"""
|
||||
FFmpeg-based screen recorder with pause/resume.
|
||||
|
||||
Usage:
|
||||
recorder = ScreenRecorder()
|
||||
recorder.start(config)
|
||||
# ... recording ...
|
||||
recorder.pause()
|
||||
# ... thinking ...
|
||||
recorder.resume()
|
||||
# ... more recording ...
|
||||
recorder.stop()
|
||||
"""
|
||||
|
||||
def __init__(self, on_status: Optional[Callable[[str], None]] = None):
|
||||
self.process: Optional[subprocess.Popen] = None
|
||||
self.is_recording = False
|
||||
self.is_paused = False
|
||||
self.start_time: Optional[float] = None
|
||||
self.pause_start: Optional[float] = None
|
||||
self.total_paused: float = 0.0
|
||||
self.output_path: Optional[Path] = None
|
||||
self.on_status = on_status or (lambda x: None)
|
||||
self._stderr_thread: Optional[threading.Thread] = None
|
||||
self._last_error: str = ""
|
||||
|
||||
def _get_ffmpeg_cmd(self, config: RecordingConfig) -> list[str]:
|
||||
"""Build FFmpeg command."""
|
||||
cmd = ["ffmpeg", "-y"]
|
||||
|
||||
if sys.platform == "win32":
|
||||
# Windows: gdigrab for screen capture
|
||||
cmd.extend([
|
||||
"-f", "gdigrab",
|
||||
"-framerate", str(config.framerate),
|
||||
"-draw_mouse", "1",
|
||||
])
|
||||
|
||||
# Handle screen selection
|
||||
if config.screen_index is not None:
|
||||
# Get screen geometry
|
||||
screens = self._get_screen_geometries()
|
||||
if config.screen_index < len(screens):
|
||||
screen = screens[config.screen_index]
|
||||
# Use offset to capture specific screen
|
||||
cmd.extend([
|
||||
"-offset_x", str(screen["left"]),
|
||||
"-offset_y", str(screen["top"]),
|
||||
"-video_size", f"{screen['width']}x{screen['height']}",
|
||||
])
|
||||
|
||||
cmd.extend(["-i", "desktop"])
|
||||
|
||||
# Audio: dshow - try to find a working device
|
||||
audio_device = config.audio_device
|
||||
if not audio_device:
|
||||
devices = self.list_audio_devices()
|
||||
audio_device = devices[0] if devices else None
|
||||
|
||||
if audio_device:
|
||||
cmd.extend([
|
||||
"-f", "dshow",
|
||||
"-i", f"audio={audio_device}",
|
||||
])
|
||||
else:
|
||||
# Linux: x11grab + pulse
|
||||
cmd.extend([
|
||||
"-f", "x11grab",
|
||||
"-framerate", str(config.framerate),
|
||||
"-i", ":0.0",
|
||||
"-f", "pulse",
|
||||
"-i", "default",
|
||||
])
|
||||
|
||||
# Output settings
|
||||
cmd.extend([
|
||||
"-c:v", config.video_codec,
|
||||
"-preset", config.preset,
|
||||
"-crf", str(config.crf),
|
||||
"-pix_fmt", "yuv420p", # Ensure compatibility
|
||||
])
|
||||
|
||||
# Only add audio codec if we have audio input
|
||||
if sys.platform != "win32" or config.audio_device or self.list_audio_devices():
|
||||
cmd.extend([
|
||||
"-c:a", config.audio_codec,
|
||||
"-b:a", "128k",
|
||||
])
|
||||
|
||||
cmd.append(str(config.output_path))
|
||||
|
||||
return cmd
|
||||
|
||||
def _read_stderr(self):
|
||||
"""Read stderr in background to prevent blocking."""
|
||||
if not self.process or not self.process.stderr:
|
||||
return
|
||||
try:
|
||||
for line in self.process.stderr:
|
||||
if isinstance(line, bytes):
|
||||
line = line.decode('utf-8', errors='ignore')
|
||||
self._last_error = line.strip()
|
||||
except:
|
||||
pass
|
||||
|
||||
def start(self, config: RecordingConfig) -> bool:
|
||||
"""Start recording."""
|
||||
if self.is_recording:
|
||||
self.on_status("Already recording")
|
||||
return False
|
||||
|
||||
self.output_path = config.output_path
|
||||
self.output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
cmd = self._get_ffmpeg_cmd(config)
|
||||
self.on_status(f"Starting: {' '.join(cmd[:6])}...")
|
||||
|
||||
try:
|
||||
# On Windows, use different process creation flags
|
||||
if sys.platform == "win32":
|
||||
# CREATE_NEW_PROCESS_GROUP allows sending CTRL_BREAK_EVENT
|
||||
self.process = subprocess.Popen(
|
||||
cmd,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.CREATE_NO_WINDOW,
|
||||
)
|
||||
else:
|
||||
self.process = subprocess.Popen(
|
||||
cmd,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
|
||||
# Start stderr reader thread
|
||||
self._stderr_thread = threading.Thread(target=self._read_stderr, daemon=True)
|
||||
self._stderr_thread.start()
|
||||
|
||||
# Wait a moment to see if FFmpeg crashes immediately
|
||||
time.sleep(0.5)
|
||||
if self.process.poll() is not None:
|
||||
self.on_status(f"FFmpeg failed: {self._last_error}")
|
||||
return False
|
||||
|
||||
self.is_recording = True
|
||||
self.is_paused = False
|
||||
self.start_time = time.time()
|
||||
self.total_paused = 0.0
|
||||
self.on_status("Recording started")
|
||||
|
||||
return True
|
||||
|
||||
except FileNotFoundError:
|
||||
self.on_status("FFmpeg not found - install from ffmpeg.org")
|
||||
return False
|
||||
except Exception as e:
|
||||
self.on_status(f"Failed to start: {e}")
|
||||
return False
|
||||
|
||||
def pause(self) -> bool:
|
||||
"""Pause recording (Windows: suspend process)."""
|
||||
if not self.is_recording or self.is_paused:
|
||||
return False
|
||||
|
||||
self.is_paused = True
|
||||
self.pause_start = time.time()
|
||||
|
||||
# On Windows, we can suspend the FFmpeg process
|
||||
if sys.platform == "win32" and self.process:
|
||||
try:
|
||||
import ctypes
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
handle = kernel32.OpenProcess(0x1F0FFF, False, self.process.pid)
|
||||
if handle:
|
||||
# Use NtSuspendProcess for cleaner suspension
|
||||
ntdll = ctypes.windll.ntdll
|
||||
ntdll.NtSuspendProcess(handle)
|
||||
kernel32.CloseHandle(handle)
|
||||
self.on_status("Paused")
|
||||
else:
|
||||
self.on_status("Paused (soft)")
|
||||
except Exception as e:
|
||||
self.on_status(f"Paused (soft): {e}")
|
||||
else:
|
||||
# On Linux, send SIGSTOP
|
||||
if self.process:
|
||||
self.process.send_signal(signal.SIGSTOP)
|
||||
self.on_status("Paused")
|
||||
|
||||
return True
|
||||
|
||||
def resume(self) -> bool:
|
||||
"""Resume recording."""
|
||||
if not self.is_recording or not self.is_paused:
|
||||
return False
|
||||
|
||||
if self.pause_start:
|
||||
self.total_paused += time.time() - self.pause_start
|
||||
|
||||
self.is_paused = False
|
||||
self.pause_start = None
|
||||
|
||||
# Resume FFmpeg process
|
||||
if sys.platform == "win32" and self.process:
|
||||
try:
|
||||
import ctypes
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
handle = kernel32.OpenProcess(0x1F0FFF, False, self.process.pid)
|
||||
if handle:
|
||||
ntdll = ctypes.windll.ntdll
|
||||
ntdll.NtResumeProcess(handle)
|
||||
kernel32.CloseHandle(handle)
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
# On Linux, send SIGCONT
|
||||
if self.process:
|
||||
self.process.send_signal(signal.SIGCONT)
|
||||
|
||||
self.on_status("Recording resumed")
|
||||
return True
|
||||
|
||||
def stop(self) -> Optional[Path]:
|
||||
"""Stop recording and return output path."""
|
||||
if not self.is_recording or not self.process:
|
||||
return None
|
||||
|
||||
# If paused, resume first so we can stop properly
|
||||
if self.is_paused:
|
||||
if self.pause_start:
|
||||
self.total_paused += time.time() - self.pause_start
|
||||
|
||||
if sys.platform == "win32":
|
||||
try:
|
||||
import ctypes
|
||||
kernel32 = ctypes.windll.kernel32
|
||||
handle = kernel32.OpenProcess(0x1F0FFF, False, self.process.pid)
|
||||
if handle:
|
||||
ntdll = ctypes.windll.ntdll
|
||||
ntdll.NtResumeProcess(handle)
|
||||
kernel32.CloseHandle(handle)
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
self.process.send_signal(signal.SIGCONT)
|
||||
|
||||
time.sleep(0.2) # Give it a moment to resume
|
||||
|
||||
try:
|
||||
if sys.platform == "win32":
|
||||
# On Windows, send CTRL_BREAK_EVENT to gracefully stop FFmpeg
|
||||
# This works because we used CREATE_NEW_PROCESS_GROUP
|
||||
try:
|
||||
os.kill(self.process.pid, signal.CTRL_BREAK_EVENT)
|
||||
self.process.wait(timeout=5)
|
||||
except (subprocess.TimeoutExpired, OSError):
|
||||
# Fallback: try stdin 'q'
|
||||
try:
|
||||
if self.process.stdin:
|
||||
self.process.stdin.write(b"q\n")
|
||||
self.process.stdin.flush()
|
||||
self.process.wait(timeout=5)
|
||||
except:
|
||||
self.process.terminate()
|
||||
self.process.wait(timeout=3)
|
||||
else:
|
||||
# On Linux, send SIGINT (Ctrl+C equivalent)
|
||||
self.process.send_signal(signal.SIGINT)
|
||||
self.process.wait(timeout=10)
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
self.on_status("Timeout - forcing stop")
|
||||
self.process.terminate()
|
||||
try:
|
||||
self.process.wait(timeout=5)
|
||||
except:
|
||||
self.process.kill()
|
||||
except Exception as e:
|
||||
self.on_status(f"Stop error: {e}")
|
||||
try:
|
||||
self.process.terminate()
|
||||
except:
|
||||
pass
|
||||
|
||||
self.is_recording = False
|
||||
self.is_paused = False
|
||||
|
||||
duration = self.get_duration()
|
||||
self.on_status(f"Stopped: {duration:.1f}s")
|
||||
|
||||
# Check if output file exists and has content
|
||||
if self.output_path and self.output_path.exists():
|
||||
if self.output_path.stat().st_size > 1000: # At least 1KB
|
||||
return self.output_path
|
||||
else:
|
||||
self.on_status("Recording too short or failed")
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
def get_duration(self) -> float:
|
||||
"""Get actual recording duration (excluding pauses)."""
|
||||
if not self.start_time:
|
||||
return 0.0
|
||||
|
||||
elapsed = time.time() - self.start_time
|
||||
|
||||
# Subtract paused time
|
||||
if self.is_paused and self.pause_start:
|
||||
elapsed -= (time.time() - self.pause_start)
|
||||
elapsed -= self.total_paused
|
||||
|
||||
return max(0, elapsed)
|
||||
|
||||
def get_elapsed(self) -> float:
|
||||
"""Get total elapsed time (including pauses)."""
|
||||
if not self.start_time:
|
||||
return 0.0
|
||||
return time.time() - self.start_time
|
||||
|
||||
@staticmethod
|
||||
def _get_screen_geometries() -> list[dict]:
|
||||
"""Get geometry of all screens (Windows)."""
|
||||
screens = []
|
||||
|
||||
if sys.platform != "win32":
|
||||
return [{"left": 0, "top": 0, "width": 1920, "height": 1080}]
|
||||
|
||||
try:
|
||||
import ctypes
|
||||
|
||||
class RECT(ctypes.Structure):
|
||||
_fields_ = [
|
||||
("left", ctypes.c_long),
|
||||
("top", ctypes.c_long),
|
||||
("right", ctypes.c_long),
|
||||
("bottom", ctypes.c_long),
|
||||
]
|
||||
|
||||
def callback(hMonitor, hdcMonitor, lprcMonitor, dwData):
|
||||
rect = lprcMonitor.contents
|
||||
screens.append({
|
||||
"left": rect.left,
|
||||
"top": rect.top,
|
||||
"width": rect.right - rect.left,
|
||||
"height": rect.bottom - rect.top,
|
||||
})
|
||||
return True
|
||||
|
||||
MONITORENUMPROC = ctypes.WINFUNCTYPE(
|
||||
ctypes.c_bool, ctypes.c_ulong, ctypes.c_ulong,
|
||||
ctypes.POINTER(RECT), ctypes.c_double
|
||||
)
|
||||
|
||||
user32 = ctypes.windll.user32
|
||||
user32.EnumDisplayMonitors(None, None, MONITORENUMPROC(callback), 0)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error getting screen geometries: {e}")
|
||||
|
||||
return screens if screens else [{"left": 0, "top": 0, "width": 1920, "height": 1080}]
|
||||
|
||||
@staticmethod
|
||||
def list_audio_devices() -> list[str]:
|
||||
"""List available audio input devices (Windows)."""
|
||||
if sys.platform != "win32":
|
||||
return ["default"]
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["ffmpeg", "-list_devices", "true", "-f", "dshow", "-i", "dummy"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
creationflags=subprocess.CREATE_NO_WINDOW,
|
||||
timeout=10,
|
||||
)
|
||||
|
||||
devices = []
|
||||
in_audio = False
|
||||
for line in result.stderr.split("\n"):
|
||||
if "DirectShow audio devices" in line:
|
||||
in_audio = True
|
||||
elif "DirectShow video devices" in line:
|
||||
in_audio = False
|
||||
elif in_audio and '"' in line:
|
||||
start = line.find('"') + 1
|
||||
end = line.rfind('"')
|
||||
if start < end:
|
||||
device = line[start:end]
|
||||
# Skip virtual/system devices that don't work well
|
||||
if "virtual" not in device.lower():
|
||||
devices.append(device)
|
||||
|
||||
return devices if devices else []
|
||||
|
||||
except:
|
||||
return []
|
||||
217
src/cad_documenter/session.py
Normal file
217
src/cad_documenter/session.py
Normal file
@@ -0,0 +1,217 @@
|
||||
"""
|
||||
Session Manager for KB Capture (Simplified)
|
||||
|
||||
One session = one continuous recording (with pause/resume).
|
||||
No clips, no keep/delete. Just record → transcribe → done.
|
||||
"""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Optional, List
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class SessionType(Enum):
|
||||
DESIGN = "design" # CAD/Design KB
|
||||
ANALYSIS = "analysis" # FEA/Analysis KB
|
||||
|
||||
|
||||
class SessionStatus(Enum):
|
||||
RECORDING = "recording"
|
||||
PAUSED = "paused"
|
||||
TRANSCRIBING = "transcribing"
|
||||
READY = "ready" # Transcribed, ready for sync
|
||||
PROCESSED = "processed" # Clawdbot has processed it
|
||||
|
||||
|
||||
@dataclass
|
||||
class Session:
|
||||
"""A recording session."""
|
||||
id: str
|
||||
name: str
|
||||
project: str
|
||||
session_type: SessionType
|
||||
created_at: datetime
|
||||
duration: float = 0.0
|
||||
status: SessionStatus = SessionStatus.RECORDING
|
||||
video_file: str = "recording.mp4"
|
||||
transcript_file: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"id": self.id,
|
||||
"name": self.name,
|
||||
"project": self.project,
|
||||
"session_type": self.session_type.value,
|
||||
"created_at": self.created_at.isoformat(),
|
||||
"duration": self.duration,
|
||||
"status": self.status.value,
|
||||
"video_file": self.video_file,
|
||||
"transcript_file": self.transcript_file,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict) -> "Session":
|
||||
return cls(
|
||||
id=data["id"],
|
||||
name=data["name"],
|
||||
project=data["project"],
|
||||
session_type=SessionType(data.get("session_type", "design")),
|
||||
created_at=datetime.fromisoformat(data["created_at"]),
|
||||
duration=data.get("duration", 0.0),
|
||||
status=SessionStatus(data.get("status", "ready")),
|
||||
video_file=data.get("video_file", "recording.mp4"),
|
||||
transcript_file=data.get("transcript_file"),
|
||||
)
|
||||
|
||||
|
||||
class SessionManager:
|
||||
"""
|
||||
Manages recording sessions.
|
||||
|
||||
Project-centric structure:
|
||||
/Projects/<ProjectName>/
|
||||
└── _capture/
|
||||
└── <session-id>/
|
||||
├── session.json # Metadata
|
||||
├── recording.mp4 # Video
|
||||
└── transcript.json # Whisper output
|
||||
"""
|
||||
|
||||
def __init__(self, projects_root: Path):
|
||||
self.projects_root = Path(projects_root)
|
||||
self.current_session: Optional[Session] = None
|
||||
self._current_project_path: Optional[Path] = None
|
||||
|
||||
def list_projects(self) -> List[str]:
|
||||
"""List available projects."""
|
||||
projects = []
|
||||
if self.projects_root.exists():
|
||||
for p in sorted(self.projects_root.iterdir()):
|
||||
if p.is_dir() and not p.name.startswith((".", "_")):
|
||||
# Check if it looks like a project
|
||||
if (p / "KB").exists() or (p / "_context.md").exists():
|
||||
projects.append(p.name)
|
||||
return projects
|
||||
|
||||
def get_project_path(self, project: str) -> Path:
|
||||
"""Get full path to a project."""
|
||||
return self.projects_root / project
|
||||
|
||||
def start_session(
|
||||
self,
|
||||
name: str,
|
||||
project: str,
|
||||
session_type: SessionType = SessionType.DESIGN,
|
||||
) -> Session:
|
||||
"""Start a new recording session."""
|
||||
self._current_project_path = self.get_project_path(project)
|
||||
if not self._current_project_path.exists():
|
||||
raise ValueError(f"Project not found: {project}")
|
||||
|
||||
session_id = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
|
||||
session = Session(
|
||||
id=session_id,
|
||||
name=name,
|
||||
project=project,
|
||||
session_type=session_type,
|
||||
created_at=datetime.now(),
|
||||
status=SessionStatus.RECORDING,
|
||||
)
|
||||
|
||||
# Create session directory
|
||||
session_dir = self._current_project_path / "_capture" / session_id
|
||||
session_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.current_session = session
|
||||
self._save_session()
|
||||
|
||||
return session
|
||||
|
||||
def get_session_dir(self) -> Path:
|
||||
"""Get current session directory."""
|
||||
if not self.current_session or not self._current_project_path:
|
||||
raise RuntimeError("No active session")
|
||||
return self._current_project_path / "_capture" / self.current_session.id
|
||||
|
||||
def get_video_path(self) -> Path:
|
||||
"""Get path for video file."""
|
||||
return self.get_session_dir() / self.current_session.video_file
|
||||
|
||||
def update_status(self, status: SessionStatus) -> None:
|
||||
"""Update session status."""
|
||||
if self.current_session:
|
||||
self.current_session.status = status
|
||||
self._save_session()
|
||||
|
||||
def set_duration(self, duration: float) -> None:
|
||||
"""Set recording duration."""
|
||||
if self.current_session:
|
||||
self.current_session.duration = duration
|
||||
self._save_session()
|
||||
|
||||
def set_transcript(self, transcript_file: str) -> None:
|
||||
"""Set transcript file name."""
|
||||
if self.current_session:
|
||||
self.current_session.transcript_file = transcript_file
|
||||
self._save_session()
|
||||
|
||||
def end_session(self) -> Session:
|
||||
"""End current session."""
|
||||
if not self.current_session:
|
||||
raise RuntimeError("No active session")
|
||||
|
||||
self.current_session.status = SessionStatus.READY
|
||||
self._save_session()
|
||||
|
||||
session = self.current_session
|
||||
self.current_session = None
|
||||
self._current_project_path = None
|
||||
|
||||
return session
|
||||
|
||||
def cancel_session(self) -> None:
|
||||
"""Cancel session and delete files."""
|
||||
if self.current_session:
|
||||
import shutil
|
||||
session_dir = self.get_session_dir()
|
||||
if session_dir.exists():
|
||||
shutil.rmtree(session_dir)
|
||||
|
||||
self.current_session = None
|
||||
self._current_project_path = None
|
||||
|
||||
def list_sessions(self, project: Optional[str] = None) -> List[Session]:
|
||||
"""List sessions for a project or all projects."""
|
||||
sessions = []
|
||||
|
||||
if project:
|
||||
capture_dir = self.get_project_path(project) / "_capture"
|
||||
if capture_dir.exists():
|
||||
for session_dir in sorted(capture_dir.iterdir(), reverse=True):
|
||||
if session_dir.is_dir():
|
||||
session_file = session_dir / "session.json"
|
||||
if session_file.exists():
|
||||
try:
|
||||
with open(session_file) as f:
|
||||
sessions.append(Session.from_dict(json.load(f)))
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
for proj in self.list_projects():
|
||||
sessions.extend(self.list_sessions(proj))
|
||||
sessions.sort(key=lambda s: s.created_at, reverse=True)
|
||||
|
||||
return sessions
|
||||
|
||||
def _save_session(self) -> None:
|
||||
"""Save current session to disk."""
|
||||
if not self.current_session:
|
||||
return
|
||||
|
||||
session_file = self.get_session_dir() / "session.json"
|
||||
with open(session_file, "w") as f:
|
||||
json.dump(self.current_session.to_dict(), f, indent=2)
|
||||
Reference in New Issue
Block a user