Files
CAD-Documenter/src/cad_documenter/session.py

362 lines
11 KiB
Python
Raw Normal View History

"""
Session Manager for KB Capture
Manages recording sessions with multiple clips.
Clips can be kept or deleted before finalizing.
"""
import json
import shutil
from pathlib import Path
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Optional, List
from enum import Enum
import uuid
class ClipStatus(Enum):
RECORDING = "recording"
PREVIEW = "preview" # Just recorded, awaiting decision
KEPT = "kept"
DELETED = "deleted"
class SessionType(Enum):
DESIGN = "design" # CAD/Design KB
ANALYSIS = "analysis" # FEA/Analysis KB
@dataclass
class Clip:
"""A single recording clip within a session."""
id: str
filename: str
start_time: datetime
end_time: Optional[datetime] = None
duration_seconds: float = 0.0
status: ClipStatus = ClipStatus.RECORDING
note: str = "" # Optional quick note
def to_dict(self) -> dict:
return {
"id": self.id,
"filename": self.filename,
"start_time": self.start_time.isoformat(),
"end_time": self.end_time.isoformat() if self.end_time else None,
"duration_seconds": self.duration_seconds,
"status": self.status.value,
"note": self.note,
}
@classmethod
def from_dict(cls, data: dict) -> "Clip":
return cls(
id=data["id"],
filename=data["filename"],
start_time=datetime.fromisoformat(data["start_time"]),
end_time=datetime.fromisoformat(data["end_time"]) if data.get("end_time") else None,
duration_seconds=data.get("duration_seconds", 0.0),
status=ClipStatus(data.get("status", "kept")),
note=data.get("note", ""),
)
@dataclass
class Session:
"""A recording session containing multiple clips."""
id: str
name: str
project: str
session_type: SessionType
created_at: datetime
clips: List[Clip] = field(default_factory=list)
is_finalized: bool = False
@property
def total_duration(self) -> float:
"""Total duration of kept clips."""
return sum(c.duration_seconds for c in self.clips if c.status == ClipStatus.KEPT)
@property
def kept_clips(self) -> List[Clip]:
"""Clips marked as kept."""
return [c for c in self.clips if c.status == ClipStatus.KEPT]
@property
def clip_count(self) -> int:
"""Number of kept clips."""
return len(self.kept_clips)
def to_dict(self) -> dict:
return {
"id": self.id,
"name": self.name,
"project": self.project,
"session_type": self.session_type.value,
"created_at": self.created_at.isoformat(),
"clips": [c.to_dict() for c in self.clips],
"is_finalized": self.is_finalized,
}
@classmethod
def from_dict(cls, data: dict) -> "Session":
return cls(
id=data["id"],
name=data["name"],
project=data["project"],
session_type=SessionType(data.get("session_type", "design")),
created_at=datetime.fromisoformat(data["created_at"]),
clips=[Clip.from_dict(c) for c in data.get("clips", [])],
is_finalized=data.get("is_finalized", False),
)
class SessionManager:
"""
Manages recording sessions and clips.
Directory structure:
sessions/
<session-id>/
session.json # Session metadata
clips/
clip-001.mp4
clip-002.mp4
...
export/ # Final export for Clawdbot
merged.mp4
transcript.json
metadata.json
"""
def __init__(self, base_path: Path):
self.base_path = Path(base_path)
self.sessions_dir = self.base_path / "sessions"
self.sessions_dir.mkdir(parents=True, exist_ok=True)
self.current_session: Optional[Session] = None
self.current_clip: Optional[Clip] = None
def start_session(
self,
name: str,
project: str,
session_type: SessionType = SessionType.DESIGN,
) -> Session:
"""Start a new recording session."""
session_id = datetime.now().strftime("%Y%m%d-%H%M%S")
session = Session(
id=session_id,
name=name,
project=project,
session_type=session_type,
created_at=datetime.now(),
)
# Create session directory
session_dir = self.sessions_dir / session_id
session_dir.mkdir(parents=True, exist_ok=True)
(session_dir / "clips").mkdir(exist_ok=True)
self.current_session = session
self._save_session()
return session
def start_clip(self) -> tuple[Clip, Path]:
"""
Start a new clip in current session.
Returns clip object and path for recording.
"""
if not self.current_session:
raise RuntimeError("No active session")
clip_num = len(self.current_session.clips) + 1
clip_id = f"clip-{clip_num:03d}"
filename = f"{clip_id}.mp4"
clip = Clip(
id=clip_id,
filename=filename,
start_time=datetime.now(),
status=ClipStatus.RECORDING,
)
self.current_session.clips.append(clip)
self.current_clip = clip
self._save_session()
clip_path = self.sessions_dir / self.current_session.id / "clips" / filename
return clip, clip_path
def end_clip(self, duration: float) -> Clip:
"""End current clip, move to preview state."""
if not self.current_clip:
raise RuntimeError("No active clip")
self.current_clip.end_time = datetime.now()
self.current_clip.duration_seconds = duration
self.current_clip.status = ClipStatus.PREVIEW
clip = self.current_clip
self.current_clip = None
self._save_session()
return clip
def keep_clip(self, clip_id: str, note: str = "") -> None:
"""Mark a clip as kept."""
if not self.current_session:
raise RuntimeError("No active session")
for clip in self.current_session.clips:
if clip.id == clip_id:
clip.status = ClipStatus.KEPT
clip.note = note
break
self._save_session()
def delete_clip(self, clip_id: str) -> None:
"""Mark a clip as deleted and remove file."""
if not self.current_session:
raise RuntimeError("No active session")
for clip in self.current_session.clips:
if clip.id == clip_id:
clip.status = ClipStatus.DELETED
# Delete the actual file
clip_path = self.sessions_dir / self.current_session.id / "clips" / clip.filename
if clip_path.exists():
clip_path.unlink()
break
self._save_session()
def keep_last_clip(self, note: str = "") -> Optional[Clip]:
"""Keep the most recent clip in preview state."""
if not self.current_session:
return None
for clip in reversed(self.current_session.clips):
if clip.status == ClipStatus.PREVIEW:
self.keep_clip(clip.id, note)
return clip
return None
def delete_last_clip(self) -> Optional[Clip]:
"""Delete the most recent clip in preview state."""
if not self.current_session:
return None
for clip in reversed(self.current_session.clips):
if clip.status == ClipStatus.PREVIEW:
self.delete_clip(clip.id)
return clip
return None
def end_session(self) -> Session:
"""
End session and prepare for export.
Clips still in preview are auto-kept.
"""
if not self.current_session:
raise RuntimeError("No active session")
# Auto-keep any clips still in preview
for clip in self.current_session.clips:
if clip.status == ClipStatus.PREVIEW:
clip.status = ClipStatus.KEPT
self.current_session.is_finalized = True
self._save_session()
session = self.current_session
self.current_session = None
return session
def cancel_session(self) -> None:
"""Cancel session and delete all files."""
if not self.current_session:
return
session_dir = self.sessions_dir / self.current_session.id
if session_dir.exists():
shutil.rmtree(session_dir)
self.current_session = None
self.current_clip = None
def get_session(self, session_id: str) -> Optional[Session]:
"""Load a session by ID."""
session_file = self.sessions_dir / session_id / "session.json"
if session_file.exists():
with open(session_file) as f:
return Session.from_dict(json.load(f))
return None
def list_sessions(self) -> List[Session]:
"""List all sessions."""
sessions = []
for session_dir in sorted(self.sessions_dir.iterdir(), reverse=True):
if session_dir.is_dir():
session_file = session_dir / "session.json"
if session_file.exists():
with open(session_file) as f:
sessions.append(Session.from_dict(json.load(f)))
return sessions
def get_session_dir(self, session_id: str) -> Path:
"""Get session directory path."""
return self.sessions_dir / session_id
def _save_session(self) -> None:
"""Save current session to disk."""
if not self.current_session:
return
session_file = self.sessions_dir / self.current_session.id / "session.json"
with open(session_file, "w") as f:
json.dump(self.current_session.to_dict(), f, indent=2)
# Quick test
if __name__ == "__main__":
from pathlib import Path
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
manager = SessionManager(Path(tmpdir))
# Start session
session = manager.start_session(
name="Test Session",
project="P04-GigaBIT-M1",
session_type=SessionType.DESIGN,
)
print(f"Started session: {session.id}")
# Record some clips
clip1, path1 = manager.start_clip()
print(f"Recording clip 1 to: {path1}")
manager.end_clip(duration=45.5)
manager.keep_clip(clip1.id, note="Stage 2 joint")
clip2, path2 = manager.start_clip()
print(f"Recording clip 2 to: {path2}")
manager.end_clip(duration=30.0)
manager.delete_clip(clip2.id) # Oops, bad take
clip3, path3 = manager.start_clip()
print(f"Recording clip 3 to: {path3}")
manager.end_clip(duration=60.0)
manager.keep_last_clip()
# End session
session = manager.end_session()
print(f"Session ended: {session.clip_count} clips, {session.total_duration:.1f}s total")