Simplify KB Capture to match Voice Recorder pattern

Major simplification:
- Removed clips concept (no keep/delete segments)
- Single continuous recording per session with pause/resume
- Matches Voice Recorder UX pattern Antoine knows

Flow:
  Start Session → Record → Pause → Resume → Stop → Transcribe → Done

Features:
- Record/Pause/Resume/Stop controls
- Session types: Design / Analysis
- Auto-transcribe with Whisper on stop
- Finds 'screenshot' triggers in transcript for Clawdbot
- Simple dark theme UI matching Voice Recorder

Removed:
- export.py (transcription now inline)
- hotkeys.py (not needed for MVP)
- Clip management
This commit is contained in:
Mario Lavoie
2026-02-09 22:14:34 +00:00
parent 09c32cbad2
commit 9b24478f04
6 changed files with 786 additions and 2091 deletions

View File

@@ -1,338 +0,0 @@
"""
Session Export for Clawdbot
Merges clips, transcribes audio, and exports for Clawdbot processing.
Uses local Whisper for transcription (no API).
"""
import json
import subprocess
import sys
from pathlib import Path
from datetime import datetime
from typing import Optional, Callable
import tempfile
from .session import Session, SessionManager, ClipStatus
class SessionExporter:
"""
Export a recorded session for Clawdbot processing.
Steps:
1. Merge all kept clips into one video
2. Transcribe with Whisper (local)
3. Create metadata.json
4. Create clawdbot_export/ folder
"""
def __init__(
self,
session_manager: SessionManager,
whisper_model: str = "base",
on_progress: Optional[Callable[[str, float], None]] = None,
):
self.session_manager = session_manager
self.whisper_model = whisper_model
self.on_progress = on_progress or (lambda msg, pct: print(f"[{pct:.0%}] {msg}"))
def export(self, session_id: str) -> Path:
"""
Export a session for Clawdbot.
Returns path to clawdbot_export/ folder.
"""
session = self.session_manager.get_session(session_id)
if not session:
raise ValueError(f"Session not found: {session_id}")
if not session.is_finalized:
raise ValueError("Session not finalized. End the session first.")
session_dir = self.session_manager.get_session_dir(session_id)
export_dir = session_dir / "clawdbot_export"
export_dir.mkdir(exist_ok=True)
kept_clips = session.kept_clips
if not kept_clips:
raise ValueError("No clips to export")
self.on_progress("Starting export...", 0.0)
# Step 1: Merge clips
self.on_progress("Merging clips...", 0.1)
merged_path = self._merge_clips(session_dir, kept_clips, export_dir)
# Step 2: Transcribe (uses local GPU Whisper)
self.on_progress("Transcribing audio...", 0.3)
transcript = self._transcribe(merged_path, export_dir)
# Step 3: Create metadata
# Note: Frame extraction is done by Clawdbot using the knowledge-base skill
self.on_progress("Creating metadata...", 0.8)
self._create_metadata(session, export_dir, merged_path, transcript)
self.on_progress("Export complete!", 1.0)
return export_dir
def _merge_clips(
self,
session_dir: Path,
clips: list,
export_dir: Path,
) -> Path:
"""Merge kept clips into a single video."""
output_path = export_dir / "merged.mp4"
clips_dir = session_dir / "clips"
if len(clips) == 1:
# Single clip - just copy
clip_path = clips_dir / clips[0].filename
import shutil
shutil.copy2(clip_path, output_path)
return output_path
# Create concat list file
concat_file = export_dir / "concat.txt"
with open(concat_file, "w") as f:
for clip in clips:
clip_path = clips_dir / clip.filename
# FFmpeg concat needs escaped paths
escaped = str(clip_path).replace("'", "'\\''")
f.write(f"file '{escaped}'\n")
# Merge with FFmpeg
cmd = [
"ffmpeg", "-y",
"-f", "concat",
"-safe", "0",
"-i", str(concat_file),
"-c", "copy",
str(output_path),
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0,
)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg merge failed: {result.stderr[-500:]}")
finally:
concat_file.unlink(missing_ok=True)
return output_path
def _transcribe(self, video_path: Path, export_dir: Path) -> dict:
"""Transcribe video audio with Whisper."""
try:
import whisper
except ImportError:
raise RuntimeError("Whisper not installed. Run: pip install openai-whisper")
# Load model
model = whisper.load_model(self.whisper_model)
# Transcribe
result = model.transcribe(
str(video_path),
language="en",
verbose=False,
)
# Save transcript
transcript_path = export_dir / "transcript.json"
with open(transcript_path, "w") as f:
json.dump(result, f, indent=2)
return result
def _extract_frames(
self,
video_path: Path,
transcript: dict,
frames_dir: Path,
) -> list[Path]:
"""Extract frames at 'screenshot' trigger timestamps."""
frames_dir.mkdir(exist_ok=True)
# Find screenshot triggers in transcript
triggers = []
for segment in transcript.get("segments", []):
text = segment.get("text", "").lower()
if "screenshot" in text:
# Get timestamp (start of segment)
timestamp = segment.get("start", 0)
triggers.append(timestamp)
if not triggers:
# No triggers found - extract frames at regular intervals
# Get video duration
duration = self._get_video_duration(video_path)
# Extract every 30 seconds
triggers = list(range(0, int(duration), 30))
# Extract frames
extracted = []
for i, ts in enumerate(triggers):
frame_path = frames_dir / f"{i+1:02d}_{self._format_timestamp(ts)}.png"
cmd = [
"ffmpeg", "-y",
"-ss", str(ts),
"-i", str(video_path),
"-vframes", "1",
"-q:v", "2",
str(frame_path),
]
try:
subprocess.run(
cmd,
capture_output=True,
creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0,
)
if frame_path.exists():
extracted.append(frame_path)
except Exception:
pass
return extracted
def _get_video_duration(self, video_path: Path) -> float:
"""Get video duration in seconds."""
cmd = [
"ffprobe",
"-v", "quiet",
"-show_entries", "format=duration",
"-of", "json",
str(video_path),
]
try:
result = subprocess.run(cmd, capture_output=True, text=True)
data = json.loads(result.stdout)
return float(data["format"]["duration"])
except:
return 300.0 # Default 5 minutes
def _format_timestamp(self, seconds: float) -> str:
"""Format seconds as MM-SS."""
mins = int(seconds // 60)
secs = int(seconds % 60)
return f"{mins:02d}-{secs:02d}"
def _find_screenshot_triggers(self, transcript: dict) -> list[dict]:
"""Find 'screenshot' triggers in transcript with context."""
triggers = []
segments = transcript.get("segments", [])
for i, segment in enumerate(segments):
text = segment.get("text", "").lower()
if "screenshot" in text:
timestamp = segment.get("start", 0)
# Get context: 2 segments before and after
context_segments = segments[max(0, i-2):i+3]
context = " ".join(s.get("text", "") for s in context_segments)
triggers.append({
"timestamp": timestamp,
"timestamp_formatted": self._format_timestamp(timestamp),
"segment_text": segment.get("text", ""),
"context": context.strip(),
})
return triggers
def _create_metadata(
self,
session: Session,
export_dir: Path,
merged_path: Path,
transcript: dict,
) -> None:
"""Create metadata.json for Clawdbot."""
# Find screenshot triggers for Mario
triggers = self._find_screenshot_triggers(transcript)
# Get video duration
duration = self._get_video_duration(merged_path)
metadata = {
"session_id": session.id,
"name": session.name,
"project": session.project,
"session_type": session.session_type.value,
"created_at": session.created_at.isoformat(),
"exported_at": datetime.now().isoformat(),
"clip_count": session.clip_count,
"total_duration": duration,
"status": "pending", # → "processed" after Clawdbot processes
"clips": [
{
"id": clip.id,
"duration": clip.duration_seconds,
"note": clip.note,
}
for clip in session.kept_clips
],
"screenshot_triggers": triggers, # Pre-parsed for Mario
"files": {
"video": "merged.mp4",
"transcript": "transcript.json",
},
}
metadata_path = export_dir / "metadata.json"
with open(metadata_path, "w") as f:
json.dump(metadata, f, indent=2)
# CLI for testing
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python -m cad_documenter.export <session_id>")
print(" python -m cad_documenter.export --list")
sys.exit(1)
from pathlib import Path
# Find sessions directory
if sys.platform == "win32":
base = Path.home() / "Documents" / "KB-Capture"
else:
base = Path.home() / "kb-capture"
manager = SessionManager(base)
if sys.argv[1] == "--list":
sessions = manager.list_sessions()
if not sessions:
print("No sessions found.")
else:
print("Sessions:")
for s in sessions:
status = "" if s.is_finalized else ""
print(f" {status} {s.id}: {s.name} ({s.clip_count} clips, {s.total_duration:.0f}s)")
else:
session_id = sys.argv[1]
def on_progress(msg, pct):
bar = "" * int(pct * 20) + "" * (20 - int(pct * 20))
print(f"\r[{bar}] {msg}", end="", flush=True)
exporter = SessionExporter(manager, on_progress=on_progress)
try:
export_path = exporter.export(session_id)
print(f"\n\nExported to: {export_path}")
except Exception as e:
print(f"\nError: {e}")
sys.exit(1)

File diff suppressed because it is too large Load Diff

View File

@@ -1,161 +0,0 @@
"""
Global Hotkey Manager
Registers global hotkeys for recording control.
Works across all applications.
"""
import threading
from typing import Callable, Dict, Optional
import sys
# Use keyboard library for cross-platform hotkeys
try:
import keyboard
HAS_KEYBOARD = True
except ImportError:
HAS_KEYBOARD = False
print("Warning: 'keyboard' library not installed. Hotkeys disabled.")
class HotkeyManager:
"""
Manages global hotkeys for KB Capture.
Default hotkeys:
Ctrl+Shift+R: Toggle recording
Ctrl+Shift+K: Keep last clip
Ctrl+Shift+D: Delete last clip
Ctrl+Shift+E: End session
Escape: Cancel (when in certain states)
"""
DEFAULT_HOTKEYS = {
"toggle_recording": "ctrl+shift+r",
"keep_clip": "ctrl+shift+k",
"delete_clip": "ctrl+shift+d",
"end_session": "ctrl+shift+e",
}
def __init__(self):
self.callbacks: Dict[str, Callable] = {}
self.hotkeys: Dict[str, str] = self.DEFAULT_HOTKEYS.copy()
self.registered: Dict[str, bool] = {}
self.enabled = HAS_KEYBOARD
def set_callback(self, action: str, callback: Callable) -> None:
"""Set callback for an action."""
self.callbacks[action] = callback
def register_all(self) -> bool:
"""Register all hotkeys."""
if not self.enabled:
return False
for action, hotkey in self.hotkeys.items():
if action in self.callbacks:
try:
keyboard.add_hotkey(
hotkey,
self._create_handler(action),
suppress=False, # Don't block the key
)
self.registered[action] = True
except Exception as e:
print(f"Failed to register hotkey {hotkey}: {e}")
self.registered[action] = False
return all(self.registered.values())
def _create_handler(self, action: str) -> Callable:
"""Create a handler that calls the callback in a thread."""
def handler():
if action in self.callbacks:
# Run callback in thread to avoid blocking
threading.Thread(
target=self.callbacks[action],
daemon=True,
).start()
return handler
def unregister_all(self) -> None:
"""Unregister all hotkeys."""
if not self.enabled:
return
for action, hotkey in self.hotkeys.items():
if self.registered.get(action):
try:
keyboard.remove_hotkey(hotkey)
except:
pass
self.registered[action] = False
def set_hotkey(self, action: str, hotkey: str) -> None:
"""Change hotkey for an action."""
if action in self.hotkeys:
# Unregister old hotkey
if self.registered.get(action) and self.enabled:
try:
keyboard.remove_hotkey(self.hotkeys[action])
except:
pass
self.hotkeys[action] = hotkey
# Register new hotkey
if action in self.callbacks and self.enabled:
try:
keyboard.add_hotkey(
hotkey,
self._create_handler(action),
suppress=False,
)
self.registered[action] = True
except Exception as e:
print(f"Failed to register hotkey {hotkey}: {e}")
self.registered[action] = False
def get_hotkey_display(self, action: str) -> str:
"""Get display-friendly hotkey string."""
hotkey = self.hotkeys.get(action, "")
return hotkey.replace("+", " + ").title()
# Quick test
if __name__ == "__main__":
manager = HotkeyManager()
def on_toggle():
print("Toggle recording!")
def on_keep():
print("Keep clip!")
def on_delete():
print("Delete clip!")
def on_end():
print("End session!")
manager.set_callback("toggle_recording", on_toggle)
manager.set_callback("keep_clip", on_keep)
manager.set_callback("delete_clip", on_delete)
manager.set_callback("end_session", on_end)
if manager.register_all():
print("Hotkeys registered! Try:")
print(" Ctrl+Shift+R: Toggle recording")
print(" Ctrl+Shift+K: Keep clip")
print(" Ctrl+Shift+D: Delete clip")
print(" Ctrl+Shift+E: End session")
print("\nPress Ctrl+C to exit")
try:
keyboard.wait()
except KeyboardInterrupt:
pass
manager.unregister_all()
else:
print("Failed to register hotkeys")

View File

@@ -1,15 +1,11 @@
""" """
KB Capture - Knowledge Base Recording Tool KB Capture - Screen Recording for Knowledge Base
Main application that ties together: Simple flow: Record → Pause → Resume → Stop → Transcribe → Done
- Screen recording
- Session/clip management One session = one continuous recording.
- Hotkey control
- System tray integration
- GUI interface
""" """
import sys
import threading import threading
import time import time
from pathlib import Path from pathlib import Path
@@ -18,42 +14,38 @@ from dataclasses import dataclass
from enum import Enum from enum import Enum
from .recorder import ScreenRecorder, RecordingConfig from .recorder import ScreenRecorder, RecordingConfig
from .session import SessionManager, Session, Clip, ClipStatus, SessionType from .session import SessionManager, Session, SessionType, SessionStatus
from .hotkeys import HotkeyManager
class AppState(Enum): class AppState(Enum):
"""Application state machine.""" """Application state."""
IDLE = "idle" # No session, ready to start IDLE = "idle" # No session
SESSION_ACTIVE = "session" # Session started, not recording RECORDING = "recording" # Recording
RECORDING = "recording" # Currently recording a clip PAUSED = "paused" # Recording paused
PREVIEW = "preview" # Clip just recorded, awaiting decision TRANSCRIBING = "transcribing" # Processing
@dataclass @dataclass
class AppStatus: class AppStatus:
"""Current application status for UI updates.""" """Current application status for UI."""
state: AppState state: AppState
session_name: Optional[str] = None session_name: Optional[str] = None
project: Optional[str] = None project: Optional[str] = None
session_type: Optional[SessionType] = None session_type: Optional[SessionType] = None
clip_count: int = 0 duration: float = 0.0
total_duration: float = 0.0
current_clip_duration: float = 0.0
last_clip_duration: float = 0.0
message: str = "" message: str = ""
class KBCaptureApp: class KBCaptureApp:
""" """
Main KB Capture application. Main application.
Controls recording flow: Flow:
1. Select project from available projects 1. Select project
2. Start session (name, type) 2. Start session (name, type)
3. Toggle recording to create clips 3. Record → Pause → Resume → Stop
4. Keep or delete clips 4. Auto-transcribe with Whisper
5. End session to export 5. Ready for Clawdbot to process
""" """
def __init__( def __init__(
@@ -67,22 +59,14 @@ class KBCaptureApp:
# Components # Components
self.session_manager = SessionManager(self.projects_root) self.session_manager = SessionManager(self.projects_root)
self.recorder = ScreenRecorder(on_status=self._log) self.recorder = ScreenRecorder(on_status=self._log)
self.hotkeys = HotkeyManager()
# State # State
self.state = AppState.IDLE self.state = AppState.IDLE
self._recording_start_time: Optional[float] = None
self._duration_thread: Optional[threading.Thread] = None self._duration_thread: Optional[threading.Thread] = None
self._running = False self._running = False
# Setup hotkeys
self.hotkeys.set_callback("toggle_recording", self.toggle_recording)
self.hotkeys.set_callback("keep_clip", self.keep_last_clip)
self.hotkeys.set_callback("delete_clip", self.delete_last_clip)
self.hotkeys.set_callback("end_session", self.end_session)
def _log(self, message: str) -> None: def _log(self, message: str) -> None:
"""Log a message and update status.""" """Log and update status."""
print(f"[KB Capture] {message}") print(f"[KB Capture] {message}")
self._update_status(message=message) self._update_status(message=message)
@@ -95,28 +79,18 @@ class KBCaptureApp:
session_name=session.name if session else None, session_name=session.name if session else None,
project=session.project if session else None, project=session.project if session else None,
session_type=session.session_type if session else None, session_type=session.session_type if session else None,
clip_count=session.clip_count if session else 0, duration=self.recorder.get_duration() if self.state in (AppState.RECORDING, AppState.PAUSED) else 0.0,
total_duration=session.total_duration if session else 0.0,
current_clip_duration=self.recorder.get_duration() if self.state == AppState.RECORDING else 0.0,
last_clip_duration=self._get_last_clip_duration(),
message=message, message=message,
) )
self.on_status_change(status) self.on_status_change(status)
def _get_last_clip_duration(self) -> float:
"""Get duration of the last clip."""
session = self.session_manager.current_session
if session and session.clips:
return session.clips[-1].duration_seconds
return 0.0
def _start_duration_thread(self) -> None: def _start_duration_thread(self) -> None:
"""Start thread to update duration while recording.""" """Start thread to update duration."""
self._running = True self._running = True
def update_loop(): def update_loop():
while self._running and self.state == AppState.RECORDING: while self._running and self.state in (AppState.RECORDING, AppState.PAUSED):
self._update_status() self._update_status()
time.sleep(0.5) time.sleep(0.5)
@@ -124,7 +98,7 @@ class KBCaptureApp:
self._duration_thread.start() self._duration_thread.start()
def _stop_duration_thread(self) -> None: def _stop_duration_thread(self) -> None:
"""Stop duration update thread.""" """Stop duration thread."""
self._running = False self._running = False
if self._duration_thread: if self._duration_thread:
self._duration_thread.join(timeout=1) self._duration_thread.join(timeout=1)
@@ -132,146 +106,184 @@ class KBCaptureApp:
# === Public API === # === Public API ===
def start(self) -> None:
"""Start the application (register hotkeys)."""
self.hotkeys.register_all()
self._log("KB Capture started. Hotkeys active.")
def stop(self) -> None:
"""Stop the application."""
self.hotkeys.unregister_all()
self._stop_duration_thread()
if self.state == AppState.RECORDING:
self.recorder.stop()
self._log("KB Capture stopped.")
def start_session( def start_session(
self, self,
name: str, name: str,
project: str, project: str,
session_type: SessionType = SessionType.DESIGN, session_type: SessionType = SessionType.DESIGN,
) -> Session: ) -> Session:
"""Start a new recording session.""" """Start a new session and begin recording."""
if self.state != AppState.IDLE: if self.state != AppState.IDLE:
raise RuntimeError("Session already active") raise RuntimeError("Session already active")
# Create session
session = self.session_manager.start_session(name, project, session_type) session = self.session_manager.start_session(name, project, session_type)
self.state = AppState.SESSION_ACTIVE
self._log(f"Session started: {name}")
self._update_status()
return session
def toggle_recording(self) -> None:
"""Toggle recording state (hotkey handler)."""
if self.state == AppState.IDLE:
self._log("No session active. Start a session first.")
return
if self.state == AppState.RECORDING:
self._stop_recording()
else:
self._start_recording()
def _start_recording(self) -> None:
"""Start recording a new clip."""
if self.state not in (AppState.SESSION_ACTIVE, AppState.PREVIEW):
return
# Auto-keep any clip in preview
if self.state == AppState.PREVIEW:
self.session_manager.keep_last_clip()
# Start new clip
clip, clip_path = self.session_manager.start_clip()
# Start recording
config = RecordingConfig( config = RecordingConfig(
output_path=clip_path, output_path=self.session_manager.get_video_path(),
framerate=30, framerate=30,
) )
if self.recorder.start(config): if self.recorder.start(config):
self.state = AppState.RECORDING self.state = AppState.RECORDING
self._start_duration_thread() self._start_duration_thread()
self._log(f"Recording clip {clip.id}...") self._log(f"Session started: {name}")
else: else:
self._log("Failed to start recording") self.session_manager.cancel_session()
raise RuntimeError("Failed to start recording")
def _stop_recording(self) -> None:
"""Stop recording current clip."""
if self.state != AppState.RECORDING:
return
self._stop_duration_thread()
duration = self.recorder.get_duration()
output = self.recorder.stop()
if output and output.exists():
self.session_manager.end_clip(duration)
self.state = AppState.PREVIEW
self._log(f"Clip recorded: {duration:.1f}s - Keep (K) or Delete (D)?")
else:
self._log("Recording failed - no output")
self.state = AppState.SESSION_ACTIVE
self._update_status()
def keep_last_clip(self, note: str = "") -> Optional[Clip]:
"""Keep the last recorded clip."""
clip = self.session_manager.keep_last_clip(note)
if clip:
self.state = AppState.SESSION_ACTIVE
self._log(f"Kept clip: {clip.id}")
self._update_status()
return clip
def delete_last_clip(self) -> Optional[Clip]:
"""Delete the last recorded clip."""
clip = self.session_manager.delete_last_clip()
if clip:
self.state = AppState.SESSION_ACTIVE
self._log(f"Deleted clip: {clip.id}")
self._update_status()
return clip
def end_session(self) -> Optional[Session]:
"""End current session and prepare for export."""
if self.state == AppState.IDLE:
return None
# Stop recording if active
if self.state == AppState.RECORDING:
self._stop_recording()
# Keep any preview clips
if self.state == AppState.PREVIEW:
self.session_manager.keep_last_clip()
session = self.session_manager.end_session()
self.state = AppState.IDLE
self._log(f"Session ended: {session.clip_count} clips, {session.total_duration:.1f}s total")
self._update_status()
return session return session
def cancel_session(self) -> None: def pause(self) -> bool:
"""Cancel current session and delete all clips.""" """Pause recording."""
if self.state != AppState.RECORDING:
return False
if self.recorder.pause():
self.state = AppState.PAUSED
self.session_manager.update_status(SessionStatus.PAUSED)
self._update_status("Paused")
return True
return False
def resume(self) -> bool:
"""Resume recording."""
if self.state != AppState.PAUSED:
return False
if self.recorder.resume():
self.state = AppState.RECORDING
self.session_manager.update_status(SessionStatus.RECORDING)
self._update_status("Recording")
return True
return False
def toggle_pause(self) -> None:
"""Toggle pause/resume."""
if self.state == AppState.RECORDING: if self.state == AppState.RECORDING:
self._stop_duration_thread() self.pause()
elif self.state == AppState.PAUSED:
self.resume()
def stop(self) -> Optional[Session]:
"""Stop recording and transcribe."""
if self.state not in (AppState.RECORDING, AppState.PAUSED):
return None
self._stop_duration_thread()
# Get duration before stopping
duration = self.recorder.get_duration()
# Stop recording
output = self.recorder.stop()
if not output or not output.exists():
self._log("Recording failed - no output")
self.session_manager.cancel_session()
self.state = AppState.IDLE
return None
# Update session
self.session_manager.set_duration(duration)
self.session_manager.update_status(SessionStatus.TRANSCRIBING)
self.state = AppState.TRANSCRIBING
self._update_status("Transcribing...")
# Transcribe in background
threading.Thread(
target=self._transcribe,
args=(output,),
daemon=True,
).start()
return self.session_manager.current_session
def _transcribe(self, video_path: Path) -> None:
"""Transcribe video with Whisper."""
try:
import whisper
self._log("Loading Whisper model...")
model = whisper.load_model("base")
self._log("Transcribing...")
result = model.transcribe(str(video_path), language="en", verbose=False)
# Save transcript
import json
transcript_path = video_path.parent / "transcript.json"
with open(transcript_path, "w") as f:
json.dump(result, f, indent=2)
# Find screenshot triggers
triggers = []
for segment in result.get("segments", []):
text = segment.get("text", "").lower()
if "screenshot" in text:
triggers.append({
"timestamp": segment.get("start", 0),
"text": segment.get("text", ""),
})
# Save metadata
session = self.session_manager.current_session
metadata = {
"session_id": session.id,
"name": session.name,
"project": session.project,
"session_type": session.session_type.value,
"created_at": session.created_at.isoformat(),
"duration": session.duration,
"status": "ready",
"screenshot_triggers": triggers,
"files": {
"video": session.video_file,
"transcript": "transcript.json",
},
}
metadata_path = video_path.parent / "metadata.json"
with open(metadata_path, "w") as f:
json.dump(metadata, f, indent=2)
# Update session
self.session_manager.set_transcript("transcript.json")
self.session_manager.end_session()
self.state = AppState.IDLE
self._log(f"Done! {len(triggers)} screenshot triggers found")
self._update_status(f"Session saved with {len(triggers)} screenshots")
except ImportError:
self._log("Whisper not installed!")
self.session_manager.end_session()
self.state = AppState.IDLE
self._update_status("Saved (no transcription)")
except Exception as e:
self._log(f"Transcription error: {e}")
self.session_manager.end_session()
self.state = AppState.IDLE
self._update_status("Saved (transcription failed)")
def cancel(self) -> None:
"""Cancel session and delete files."""
if self.state == AppState.IDLE:
return
self._stop_duration_thread()
if self.recorder.is_recording:
self.recorder.stop() self.recorder.stop()
self.session_manager.cancel_session() self.session_manager.cancel_session()
self.state = AppState.IDLE self.state = AppState.IDLE
self._log("Session cancelled") self._log("Session cancelled")
self._update_status() self._update_status("Cancelled")
def get_status(self) -> AppStatus: def get_status(self) -> AppStatus:
"""Get current application status.""" """Get current status."""
session = self.session_manager.current_session session = self.session_manager.current_session
return AppStatus( return AppStatus(
@@ -279,95 +291,5 @@ class KBCaptureApp:
session_name=session.name if session else None, session_name=session.name if session else None,
project=session.project if session else None, project=session.project if session else None,
session_type=session.session_type if session else None, session_type=session.session_type if session else None,
clip_count=session.clip_count if session else 0, duration=self.recorder.get_duration() if self.state in (AppState.RECORDING, AppState.PAUSED) else 0.0,
total_duration=session.total_duration if session else 0.0,
current_clip_duration=self.recorder.get_duration() if self.state == AppState.RECORDING else 0.0,
last_clip_duration=self._get_last_clip_duration(),
) )
def list_sessions(self) -> list[Session]:
"""List all recorded sessions."""
return self.session_manager.list_sessions()
def get_session_dir(self, session_id: str) -> Path:
"""Get session directory for export."""
return self.session_manager.get_session_dir(session_id)
# CLI for testing
if __name__ == "__main__":
import tempfile
def on_status(status: AppStatus):
state_icons = {
AppState.IDLE: "",
AppState.SESSION_ACTIVE: "🟢",
AppState.RECORDING: "🔴",
AppState.PREVIEW: "🟡",
}
icon = state_icons.get(status.state, "")
print(f"\n{icon} State: {status.state.value}")
if status.session_name:
print(f" Session: {status.session_name} ({status.project})")
print(f" Clips: {status.clip_count} | Duration: {status.total_duration:.1f}s")
if status.current_clip_duration > 0:
print(f" Recording: {status.current_clip_duration:.1f}s")
if status.message:
print(f"{status.message}")
with tempfile.TemporaryDirectory() as tmpdir:
# Create a test project
test_project = Path(tmpdir) / "Test-Project"
(test_project / "KB").mkdir(parents=True)
app = KBCaptureApp(
projects_root=Path(tmpdir),
on_status_change=on_status,
)
print("\n=== KB Capture Test ===")
print(f"Projects root: {tmpdir}")
print(f"Available projects: {app.session_manager.list_projects()}")
print("\nCommands:")
print(" s - Start session")
print(" r - Toggle recording")
print(" k - Keep last clip")
print(" d - Delete last clip")
print(" e - End session")
print(" c - Cancel session")
print(" q - Quit")
print()
app.start()
try:
while True:
cmd = input("> ").strip().lower()
if cmd == "s":
projects = app.session_manager.list_projects()
print(f"Available projects: {projects}")
project = input("Project: ").strip() or (projects[0] if projects else "Test-Project")
name = input("Session name: ").strip() or "Test Session"
app.start_session(name, project)
elif cmd == "r":
app.toggle_recording()
elif cmd == "k":
app.keep_last_clip()
elif cmd == "d":
app.delete_last_clip()
elif cmd == "e":
app.end_session()
elif cmd == "c":
app.cancel_session()
elif cmd == "q":
break
else:
print("Unknown command")
except KeyboardInterrupt:
pass
app.stop()
print("\nGoodbye!")

View File

@@ -1,8 +1,8 @@
""" """
Screen + Audio Recorder using FFmpeg Screen + Audio Recorder (Simplified)
Records screen and microphone to video file. Records screen and microphone to a single video file.
Supports Windows (gdigrab) and Linux (x11grab). Supports pause/resume within the same recording.
""" """
import subprocess import subprocess
@@ -12,7 +12,6 @@ import sys
from pathlib import Path from pathlib import Path
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, Callable from typing import Optional, Callable
import json
@dataclass @dataclass
@@ -20,103 +19,65 @@ class RecordingConfig:
"""Recording configuration.""" """Recording configuration."""
output_path: Path output_path: Path
framerate: int = 30 framerate: int = 30
audio_device: Optional[str] = None # None = default mic audio_device: Optional[str] = None
video_codec: str = "libx264" video_codec: str = "libx264"
audio_codec: str = "aac" audio_codec: str = "aac"
crf: int = 23 # Quality (lower = better, 18-28 typical) crf: int = 23
preset: str = "ultrafast" # Encoding speed preset: str = "ultrafast"
capture_region: Optional[tuple[int, int, int, int]] = None # x, y, w, h
class ScreenRecorder: class ScreenRecorder:
""" """
FFmpeg-based screen recorder with audio. FFmpeg-based screen recorder with pause/resume.
Usage: Usage:
recorder = ScreenRecorder() recorder = ScreenRecorder()
recorder.start(config) recorder.start(config)
# ... recording ... # ... recording ...
recorder.pause()
# ... thinking ...
recorder.resume()
# ... more recording ...
recorder.stop() recorder.stop()
""" """
def __init__(self, on_status: Optional[Callable[[str], None]] = None): def __init__(self, on_status: Optional[Callable[[str], None]] = None):
self.process: Optional[subprocess.Popen] = None self.process: Optional[subprocess.Popen] = None
self.is_recording = False self.is_recording = False
self.is_paused = False
self.start_time: Optional[float] = None self.start_time: Optional[float] = None
self.pause_start: Optional[float] = None
self.total_paused: float = 0.0
self.output_path: Optional[Path] = None self.output_path: Optional[Path] = None
self.on_status = on_status or (lambda x: None) self.on_status = on_status or (lambda x: None)
self._monitor_thread: Optional[threading.Thread] = None
def _get_ffmpeg_cmd(self, config: RecordingConfig) -> list[str]:
def _get_platform_args(self, config: RecordingConfig) -> list[str]: """Build FFmpeg command."""
"""Get platform-specific FFmpeg input arguments.""" cmd = ["ffmpeg", "-y"]
if sys.platform == "win32": if sys.platform == "win32":
# Windows: gdigrab for screen, dshow for audio # Windows: gdigrab for screen
args = [ cmd.extend([
"-f", "gdigrab", "-f", "gdigrab",
"-framerate", str(config.framerate), "-framerate", str(config.framerate),
] "-i", "desktop",
])
if config.capture_region: # Audio: dshow
x, y, w, h = config.capture_region audio_device = config.audio_device or "Microphone Array"
args.extend([ cmd.extend([
"-offset_x", str(x), "-f", "dshow",
"-offset_y", str(y), "-i", f"audio={audio_device}",
"-video_size", f"{w}x{h}", ])
])
args.extend(["-i", "desktop"])
# Add audio input
if config.audio_device:
args.extend([
"-f", "dshow",
"-i", f"audio={config.audio_device}",
])
else:
# Try to find default microphone
args.extend([
"-f", "dshow",
"-i", "audio=Microphone Array", # Common default
])
else: else:
# Linux: x11grab for screen, pulse for audio # Linux: x11grab + pulse
display = ":0.0" cmd.extend([
args = [
"-f", "x11grab", "-f", "x11grab",
"-framerate", str(config.framerate), "-framerate", str(config.framerate),
] "-i", ":0.0",
if config.capture_region:
x, y, w, h = config.capture_region
args.extend(["-video_size", f"{w}x{h}"])
display = f":0.0+{x},{y}"
args.extend(["-i", display])
# Add audio (PulseAudio)
args.extend([
"-f", "pulse", "-f", "pulse",
"-i", "default", "-i", "default",
]) ])
return args
def start(self, config: RecordingConfig) -> bool:
"""Start recording."""
if self.is_recording:
self.on_status("Already recording")
return False
self.output_path = config.output_path
self.output_path.parent.mkdir(parents=True, exist_ok=True)
# Build FFmpeg command
cmd = ["ffmpeg", "-y"] # -y to overwrite
# Platform-specific inputs
cmd.extend(self._get_platform_args(config))
# Output settings # Output settings
cmd.extend([ cmd.extend([
@@ -125,11 +86,23 @@ class ScreenRecorder:
"-crf", str(config.crf), "-crf", str(config.crf),
"-c:a", config.audio_codec, "-c:a", config.audio_codec,
"-b:a", "128k", "-b:a", "128k",
str(self.output_path), str(config.output_path),
]) ])
return cmd
def start(self, config: RecordingConfig) -> bool:
"""Start recording."""
if self.is_recording:
self.on_status("Already recording")
return False
self.output_path = config.output_path
self.output_path.parent.mkdir(parents=True, exist_ok=True)
cmd = self._get_ffmpeg_cmd(config)
try: try:
# Start FFmpeg process
self.process = subprocess.Popen( self.process = subprocess.Popen(
cmd, cmd,
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
@@ -139,68 +112,132 @@ class ScreenRecorder:
) )
self.is_recording = True self.is_recording = True
self.is_paused = False
self.start_time = time.time() self.start_time = time.time()
self.on_status(f"Recording started: {self.output_path.name}") self.total_paused = 0.0
self.on_status("Recording started")
# Start monitor thread
self._monitor_thread = threading.Thread(target=self._monitor_process, daemon=True)
self._monitor_thread.start()
return True return True
except FileNotFoundError: except FileNotFoundError:
self.on_status("FFmpeg not found. Please install FFmpeg.") self.on_status("FFmpeg not found")
return False return False
except Exception as e: except Exception as e:
self.on_status(f"Failed to start recording: {e}") self.on_status(f"Failed: {e}")
return False return False
def _monitor_process(self): def pause(self) -> bool:
"""Monitor FFmpeg process for errors.""" """Pause recording (Windows: suspend process)."""
if self.process: if not self.is_recording or self.is_paused:
stderr = self.process.stderr.read() if self.process.stderr else b"" return False
if self.process.returncode and self.process.returncode != 0:
self.on_status(f"Recording error: {stderr.decode()[-200:]}") self.is_paused = True
self.pause_start = time.time()
# On Windows, we can suspend the FFmpeg process
if sys.platform == "win32" and self.process:
try:
import ctypes
kernel32 = ctypes.windll.kernel32
handle = kernel32.OpenProcess(0x1F0FFF, False, self.process.pid)
kernel32.DebugActiveProcess(self.process.pid)
self.on_status("Paused")
except:
self.on_status("Paused (soft)")
else:
self.on_status("Paused")
return True
def resume(self) -> bool:
"""Resume recording."""
if not self.is_recording or not self.is_paused:
return False
if self.pause_start:
self.total_paused += time.time() - self.pause_start
self.is_paused = False
self.pause_start = None
# Resume FFmpeg process on Windows
if sys.platform == "win32" and self.process:
try:
import ctypes
kernel32 = ctypes.windll.kernel32
kernel32.DebugActiveProcessStop(self.process.pid)
except:
pass
self.on_status("Recording resumed")
return True
def stop(self) -> Optional[Path]: def stop(self) -> Optional[Path]:
"""Stop recording and return output path.""" """Stop recording and return output path."""
if not self.is_recording or not self.process: if not self.is_recording or not self.process:
return None return None
# If paused, add final pause duration
if self.is_paused and self.pause_start:
self.total_paused += time.time() - self.pause_start
# Resume first so we can stop properly
if sys.platform == "win32":
try:
import ctypes
kernel32 = ctypes.windll.kernel32
kernel32.DebugActiveProcessStop(self.process.pid)
except:
pass
try: try:
# Send 'q' to FFmpeg to stop gracefully # Send 'q' to FFmpeg
if self.process.stdin: if self.process.stdin:
self.process.stdin.write(b"q") self.process.stdin.write(b"q")
self.process.stdin.flush() self.process.stdin.flush()
# Wait for process to finish (with timeout)
self.process.wait(timeout=10) self.process.wait(timeout=10)
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
self.process.terminate() self.process.terminate()
self.process.wait(timeout=5) self.process.wait(timeout=5)
except Exception as e: except Exception as e:
self.on_status(f"Error stopping recording: {e}") self.on_status(f"Stop error: {e}")
self.process.terminate() self.process.terminate()
self.is_recording = False self.is_recording = False
duration = time.time() - self.start_time if self.start_time else 0 self.is_paused = False
self.on_status(f"Recording stopped: {duration:.1f}s")
duration = self.get_duration()
self.on_status(f"Stopped: {duration:.1f}s")
return self.output_path if self.output_path and self.output_path.exists() else None return self.output_path if self.output_path and self.output_path.exists() else None
def get_duration(self) -> float: def get_duration(self) -> float:
"""Get current recording duration in seconds.""" """Get actual recording duration (excluding pauses)."""
if self.is_recording and self.start_time: if not self.start_time:
return time.time() - self.start_time return 0.0
return 0.0
elapsed = time.time() - self.start_time
# Subtract paused time
if self.is_paused and self.pause_start:
elapsed -= (time.time() - self.pause_start)
elapsed -= self.total_paused
return max(0, elapsed)
def get_elapsed(self) -> float:
"""Get total elapsed time (including pauses)."""
if not self.start_time:
return 0.0
return time.time() - self.start_time
@staticmethod @staticmethod
def list_audio_devices() -> list[str]: def list_audio_devices() -> list[str]:
"""List available audio input devices (Windows only).""" """List available audio input devices (Windows)."""
if sys.platform != "win32": if sys.platform != "win32":
return ["default"] return ["default"]
try: try:
result = subprocess.run( result = subprocess.run(
["ffmpeg", "-list_devices", "true", "-f", "dshow", "-i", "dummy"], ["ffmpeg", "-list_devices", "true", "-f", "dshow", "-i", "dummy"],
@@ -209,7 +246,6 @@ class ScreenRecorder:
creationflags=subprocess.CREATE_NO_WINDOW, creationflags=subprocess.CREATE_NO_WINDOW,
) )
# Parse audio devices from stderr
devices = [] devices = []
in_audio = False in_audio = False
for line in result.stderr.split("\n"): for line in result.stderr.split("\n"):
@@ -218,40 +254,12 @@ class ScreenRecorder:
elif "DirectShow video devices" in line: elif "DirectShow video devices" in line:
in_audio = False in_audio = False
elif in_audio and '"' in line: elif in_audio and '"' in line:
# Extract device name between quotes
start = line.find('"') + 1 start = line.find('"') + 1
end = line.rfind('"') end = line.rfind('"')
if start < end: if start < end:
devices.append(line[start:end]) devices.append(line[start:end])
return devices if devices else ["default"]
except Exception: return devices if devices else ["Microphone Array"]
return ["default"]
except:
return ["Microphone Array"]
# Quick test
if __name__ == "__main__":
def status(msg):
print(f"[STATUS] {msg}")
recorder = ScreenRecorder(on_status=status)
print("Available audio devices:", recorder.list_audio_devices())
config = RecordingConfig(
output_path=Path("test_recording.mp4"),
framerate=30,
)
print("Starting recording... (Ctrl+C to stop)")
if recorder.start(config):
try:
while True:
time.sleep(1)
print(f"Recording: {recorder.get_duration():.1f}s")
except KeyboardInterrupt:
pass
output = recorder.stop()
print(f"Saved to: {output}")

View File

@@ -1,92 +1,43 @@
""" """
Session Manager for KB Capture Session Manager for KB Capture (Simplified)
Manages recording sessions with multiple clips. One session = one continuous recording (with pause/resume).
Clips can be kept or deleted before finalizing. No clips, no keep/delete. Just record → transcribe → done.
""" """
import json import json
import shutil
from pathlib import Path from pathlib import Path
from dataclasses import dataclass, field, asdict from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from typing import Optional, List from typing import Optional, List
from enum import Enum from enum import Enum
import uuid
class ClipStatus(Enum):
RECORDING = "recording"
PREVIEW = "preview" # Just recorded, awaiting decision
KEPT = "kept"
DELETED = "deleted"
class SessionType(Enum): class SessionType(Enum):
DESIGN = "design" # CAD/Design KB DESIGN = "design" # CAD/Design KB
ANALYSIS = "analysis" # FEA/Analysis KB ANALYSIS = "analysis" # FEA/Analysis KB
@dataclass class SessionStatus(Enum):
class Clip: RECORDING = "recording"
"""A single recording clip within a session.""" PAUSED = "paused"
id: str TRANSCRIBING = "transcribing"
filename: str READY = "ready" # Transcribed, ready for sync
start_time: datetime PROCESSED = "processed" # Clawdbot has processed it
end_time: Optional[datetime] = None
duration_seconds: float = 0.0
status: ClipStatus = ClipStatus.RECORDING
note: str = "" # Optional quick note
def to_dict(self) -> dict:
return {
"id": self.id,
"filename": self.filename,
"start_time": self.start_time.isoformat(),
"end_time": self.end_time.isoformat() if self.end_time else None,
"duration_seconds": self.duration_seconds,
"status": self.status.value,
"note": self.note,
}
@classmethod
def from_dict(cls, data: dict) -> "Clip":
return cls(
id=data["id"],
filename=data["filename"],
start_time=datetime.fromisoformat(data["start_time"]),
end_time=datetime.fromisoformat(data["end_time"]) if data.get("end_time") else None,
duration_seconds=data.get("duration_seconds", 0.0),
status=ClipStatus(data.get("status", "kept")),
note=data.get("note", ""),
)
@dataclass @dataclass
class Session: class Session:
"""A recording session containing multiple clips.""" """A recording session."""
id: str id: str
name: str name: str
project: str project: str
session_type: SessionType session_type: SessionType
created_at: datetime created_at: datetime
clips: List[Clip] = field(default_factory=list) duration: float = 0.0
is_finalized: bool = False status: SessionStatus = SessionStatus.RECORDING
video_file: str = "recording.mp4"
@property transcript_file: Optional[str] = None
def total_duration(self) -> float:
"""Total duration of kept clips."""
return sum(c.duration_seconds for c in self.clips if c.status == ClipStatus.KEPT)
@property
def kept_clips(self) -> List[Clip]:
"""Clips marked as kept."""
return [c for c in self.clips if c.status == ClipStatus.KEPT]
@property
def clip_count(self) -> int:
"""Number of kept clips."""
return len(self.kept_clips)
def to_dict(self) -> dict: def to_dict(self) -> dict:
return { return {
@@ -95,8 +46,10 @@ class Session:
"project": self.project, "project": self.project,
"session_type": self.session_type.value, "session_type": self.session_type.value,
"created_at": self.created_at.isoformat(), "created_at": self.created_at.isoformat(),
"clips": [c.to_dict() for c in self.clips], "duration": self.duration,
"is_finalized": self.is_finalized, "status": self.status.value,
"video_file": self.video_file,
"transcript_file": self.transcript_file,
} }
@classmethod @classmethod
@@ -107,52 +60,38 @@ class Session:
project=data["project"], project=data["project"],
session_type=SessionType(data.get("session_type", "design")), session_type=SessionType(data.get("session_type", "design")),
created_at=datetime.fromisoformat(data["created_at"]), created_at=datetime.fromisoformat(data["created_at"]),
clips=[Clip.from_dict(c) for c in data.get("clips", [])], duration=data.get("duration", 0.0),
is_finalized=data.get("is_finalized", False), status=SessionStatus(data.get("status", "ready")),
video_file=data.get("video_file", "recording.mp4"),
transcript_file=data.get("transcript_file"),
) )
class SessionManager: class SessionManager:
""" """
Manages recording sessions and clips. Manages recording sessions.
Project-centric structure: Project-centric structure:
/2-Projects/<ProjectName>/ /Projects/<ProjectName>/
── KB/ ── _capture/
│ └── dev/ # gen-XXX.md session captures (Mario creates)
├── Images/
│ └── screenshot-sessions/ # Frames organized by session
└── _capture/ # Session staging
└── <session-id>/ └── <session-id>/
├── session.json ├── session.json # Metadata
├── clips/ ├── recording.mp4 # Video
│ ├── clip-001.mp4 └── transcript.json # Whisper output
│ └── ...
└── clawdbot_export/ # Ready for Mario
├── merged.mp4
├── transcript.json
└── metadata.json
""" """
def __init__(self, projects_root: Path): def __init__(self, projects_root: Path):
"""
Initialize session manager.
Args:
projects_root: Path to projects folder (e.g., /2-Projects/ or D:/ATODrive/Projects/)
"""
self.projects_root = Path(projects_root) self.projects_root = Path(projects_root)
self.current_session: Optional[Session] = None self.current_session: Optional[Session] = None
self.current_clip: Optional[Clip] = None
self._current_project_path: Optional[Path] = None self._current_project_path: Optional[Path] = None
def list_projects(self) -> List[str]: def list_projects(self) -> List[str]:
"""List available projects (folders in projects_root).""" """List available projects."""
projects = [] projects = []
if self.projects_root.exists(): if self.projects_root.exists():
for p in sorted(self.projects_root.iterdir()): for p in sorted(self.projects_root.iterdir()):
if p.is_dir() and not p.name.startswith((".", "_")): if p.is_dir() and not p.name.startswith((".", "_")):
# Check if it looks like a project (has KB folder or _context.md) # Check if it looks like a project
if (p / "KB").exists() or (p / "_context.md").exists(): if (p / "KB").exists() or (p / "_context.md").exists():
projects.append(p.name) projects.append(p.name)
return projects return projects
@@ -161,25 +100,13 @@ class SessionManager:
"""Get full path to a project.""" """Get full path to a project."""
return self.projects_root / project return self.projects_root / project
def get_capture_dir(self, project: str) -> Path:
"""Get the _capture directory for a project."""
return self.get_project_path(project) / "_capture"
@property
def sessions_dir(self) -> Path:
"""Current project's capture directory."""
if self._current_project_path:
return self._current_project_path / "_capture"
raise RuntimeError("No project selected")
def start_session( def start_session(
self, self,
name: str, name: str,
project: str, project: str,
session_type: SessionType = SessionType.DESIGN, session_type: SessionType = SessionType.DESIGN,
) -> Session: ) -> Session:
"""Start a new recording session within a project.""" """Start a new recording session."""
# Set current project
self._current_project_path = self.get_project_path(project) self._current_project_path = self.get_project_path(project)
if not self._current_project_path.exists(): if not self._current_project_path.exists():
raise ValueError(f"Project not found: {project}") raise ValueError(f"Project not found: {project}")
@@ -192,221 +119,99 @@ class SessionManager:
project=project, project=project,
session_type=session_type, session_type=session_type,
created_at=datetime.now(), created_at=datetime.now(),
status=SessionStatus.RECORDING,
) )
# Create session directory in project's _capture folder # Create session directory
capture_dir = self._current_project_path / "_capture" session_dir = self._current_project_path / "_capture" / session_id
session_dir = capture_dir / session_id
session_dir.mkdir(parents=True, exist_ok=True) session_dir.mkdir(parents=True, exist_ok=True)
(session_dir / "clips").mkdir(exist_ok=True)
self.current_session = session self.current_session = session
self._save_session() self._save_session()
return session return session
def start_clip(self) -> tuple[Clip, Path]: def get_session_dir(self) -> Path:
""" """Get current session directory."""
Start a new clip in current session. if not self.current_session or not self._current_project_path:
Returns clip object and path for recording.
"""
if not self.current_session:
raise RuntimeError("No active session") raise RuntimeError("No active session")
return self._current_project_path / "_capture" / self.current_session.id
clip_num = len(self.current_session.clips) + 1
clip_id = f"clip-{clip_num:03d}"
filename = f"{clip_id}.mp4"
clip = Clip(
id=clip_id,
filename=filename,
start_time=datetime.now(),
status=ClipStatus.RECORDING,
)
self.current_session.clips.append(clip)
self.current_clip = clip
self._save_session()
clip_path = self.sessions_dir / self.current_session.id / "clips" / filename
return clip, clip_path
def end_clip(self, duration: float) -> Clip: def get_video_path(self) -> Path:
"""End current clip, move to preview state.""" """Get path for video file."""
if not self.current_clip: return self.get_session_dir() / self.current_session.video_file
raise RuntimeError("No active clip")
self.current_clip.end_time = datetime.now()
self.current_clip.duration_seconds = duration
self.current_clip.status = ClipStatus.PREVIEW
clip = self.current_clip
self.current_clip = None
self._save_session()
return clip
def keep_clip(self, clip_id: str, note: str = "") -> None: def update_status(self, status: SessionStatus) -> None:
"""Mark a clip as kept.""" """Update session status."""
if not self.current_session: if self.current_session:
raise RuntimeError("No active session") self.current_session.status = status
self._save_session()
for clip in self.current_session.clips:
if clip.id == clip_id:
clip.status = ClipStatus.KEPT
clip.note = note
break
self._save_session()
def delete_clip(self, clip_id: str) -> None: def set_duration(self, duration: float) -> None:
"""Mark a clip as deleted and remove file.""" """Set recording duration."""
if not self.current_session: if self.current_session:
raise RuntimeError("No active session") self.current_session.duration = duration
self._save_session()
for clip in self.current_session.clips:
if clip.id == clip_id:
clip.status = ClipStatus.DELETED
# Delete the actual file
clip_path = self.sessions_dir / self.current_session.id / "clips" / clip.filename
if clip_path.exists():
clip_path.unlink()
break
self._save_session()
def keep_last_clip(self, note: str = "") -> Optional[Clip]: def set_transcript(self, transcript_file: str) -> None:
"""Keep the most recent clip in preview state.""" """Set transcript file name."""
if not self.current_session: if self.current_session:
return None self.current_session.transcript_file = transcript_file
self._save_session()
for clip in reversed(self.current_session.clips):
if clip.status == ClipStatus.PREVIEW:
self.keep_clip(clip.id, note)
return clip
return None
def delete_last_clip(self) -> Optional[Clip]:
"""Delete the most recent clip in preview state."""
if not self.current_session:
return None
for clip in reversed(self.current_session.clips):
if clip.status == ClipStatus.PREVIEW:
self.delete_clip(clip.id)
return clip
return None
def end_session(self) -> Session: def end_session(self) -> Session:
""" """End current session."""
End session and prepare for export.
Clips still in preview are auto-kept.
"""
if not self.current_session: if not self.current_session:
raise RuntimeError("No active session") raise RuntimeError("No active session")
# Auto-keep any clips still in preview self.current_session.status = SessionStatus.READY
for clip in self.current_session.clips:
if clip.status == ClipStatus.PREVIEW:
clip.status = ClipStatus.KEPT
self.current_session.is_finalized = True
self._save_session() self._save_session()
session = self.current_session session = self.current_session
self.current_session = None self.current_session = None
self._current_project_path = None
return session return session
def cancel_session(self) -> None: def cancel_session(self) -> None:
"""Cancel session and delete all files.""" """Cancel session and delete files."""
if not self.current_session: if self.current_session:
return import shutil
session_dir = self.get_session_dir()
session_dir = self.sessions_dir / self.current_session.id if session_dir.exists():
if session_dir.exists(): shutil.rmtree(session_dir)
shutil.rmtree(session_dir)
self.current_session = None self.current_session = None
self.current_clip = None self._current_project_path = None
def get_session(self, session_id: str) -> Optional[Session]:
"""Load a session by ID."""
session_file = self.sessions_dir / session_id / "session.json"
if session_file.exists():
with open(session_file) as f:
return Session.from_dict(json.load(f))
return None
def list_sessions(self, project: Optional[str] = None) -> List[Session]: def list_sessions(self, project: Optional[str] = None) -> List[Session]:
"""List sessions, optionally filtered by project.""" """List sessions for a project or all projects."""
sessions = [] sessions = []
if project: if project:
# List sessions for specific project capture_dir = self.get_project_path(project) / "_capture"
capture_dir = self.get_capture_dir(project)
if capture_dir.exists(): if capture_dir.exists():
for session_dir in sorted(capture_dir.iterdir(), reverse=True): for session_dir in sorted(capture_dir.iterdir(), reverse=True):
if session_dir.is_dir(): if session_dir.is_dir():
session_file = session_dir / "session.json" session_file = session_dir / "session.json"
if session_file.exists(): if session_file.exists():
with open(session_file) as f: try:
sessions.append(Session.from_dict(json.load(f))) with open(session_file) as f:
sessions.append(Session.from_dict(json.load(f)))
except:
pass
else: else:
# List sessions across all projects
for proj in self.list_projects(): for proj in self.list_projects():
sessions.extend(self.list_sessions(proj)) sessions.extend(self.list_sessions(proj))
sessions.sort(key=lambda s: s.created_at, reverse=True) sessions.sort(key=lambda s: s.created_at, reverse=True)
return sessions return sessions
def get_session_dir(self, session_id: str) -> Path:
"""Get session directory path."""
return self.sessions_dir / session_id
def _save_session(self) -> None: def _save_session(self) -> None:
"""Save current session to disk.""" """Save current session to disk."""
if not self.current_session: if not self.current_session:
return return
session_file = self.sessions_dir / self.current_session.id / "session.json" session_file = self.get_session_dir() / "session.json"
with open(session_file, "w") as f: with open(session_file, "w") as f:
json.dump(self.current_session.to_dict(), f, indent=2) json.dump(self.current_session.to_dict(), f, indent=2)
# Quick test
if __name__ == "__main__":
from pathlib import Path
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
manager = SessionManager(Path(tmpdir))
# Start session
session = manager.start_session(
name="Test Session",
project="P04-GigaBIT-M1",
session_type=SessionType.DESIGN,
)
print(f"Started session: {session.id}")
# Record some clips
clip1, path1 = manager.start_clip()
print(f"Recording clip 1 to: {path1}")
manager.end_clip(duration=45.5)
manager.keep_clip(clip1.id, note="Stage 2 joint")
clip2, path2 = manager.start_clip()
print(f"Recording clip 2 to: {path2}")
manager.end_clip(duration=30.0)
manager.delete_clip(clip2.id) # Oops, bad take
clip3, path3 = manager.start_clip()
print(f"Recording clip 3 to: {path3}")
manager.end_clip(duration=60.0)
manager.keep_last_clip()
# End session
session = manager.end_session()
print(f"Session ended: {session.clip_count} clips, {session.total_duration:.1f}s total")