# Files
# opencode-session-manager/app.py
# 2026-03-07 17:15:22 -07:00
#
# 502 lines
# 16 KiB
# Python

"""Session Mover TUI Application.
A Textual-based TUI for managing OpenCode sessions.
"""
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Header, Footer, Input, Button, Select, Tree, DataTable, Label, Static
from textual.screen import ModalScreen
from typing import List, Optional
from database import Database, Session
from datetime import datetime
import os
class MoveCopyDialog(ModalScreen):
    """Modal dialog for moving or copying sessions to another project.

    The user picks a destination project (required) and optionally a
    workspace. A textual preview of the operation is shown, and the
    operation runs through ``db.move_sessions`` / ``db.copy_sessions``
    when confirmed via the Confirm button or the Enter binding.
    """

    BINDINGS = [
        ("escape", "cancel", "Cancel"),
        ("enter", "confirm", "Confirm"),
    ]

    def __init__(self, db: Database, sessions: List[Session], is_copy: bool = False):
        """Store the database handle, the sessions to act on and the mode.

        Args:
            db: Database whose ``projects`` / ``workspaces`` mappings feed the
                selectors and whose move/copy methods perform the operation.
            sessions: Sessions that will be moved or copied.
            is_copy: ``True`` to copy, ``False`` (default) to move.
        """
        super().__init__()
        self.db = db
        self.sessions = sessions
        self.is_copy = is_copy
        self.target_project: Optional[str] = None
        self.target_workspace: Optional[str] = None

    def compose(self) -> ComposeResult:
        """Build the dialog: title, project/workspace selectors, preview, buttons."""
        # The project currently shown in the main app is excluded from the
        # destination list.
        current_project_id = getattr(self.app, "filter_project_id", None)
        project_options = [
            (os.path.basename(p.worktree) if p.worktree else (p.name or p.id), p.id)
            for p in self.db.projects.values()
            if p.id != current_project_id
        ]
        if not project_options:
            project_options = [("-- Same project --", "")]

        workspace_options = [
            (f"{w.name or w.branch or w.id}", w.id)
            for w in self.db.workspaces.values()
        ]

        with Container(id="dialog"):
            yield Label(
                f"{'📋 Copy' if self.is_copy else '➡️ Move'} {len(self.sessions)} Session(s)",
                id="dialog-title",
            )
            yield Label("Destination Project:", id="preview-label")
            yield Select(
                [("-- Select --", "")] + project_options,
                id="project-select",
                prompt="Select a project",
            )
            yield Label("Workspace (optional):")
            yield Select(
                [("-- None --", "")] + workspace_options,
                id="workspace-select",
                prompt="Select workspace",
            )
            yield Static("", id="sql-preview")
            with Horizontal(id="dialog-buttons"):
                yield Button("Cancel", variant="default", id="cancel-btn")
                yield Button("Confirm", variant="primary", id="confirm-btn", disabled=True)

    def on_mount(self) -> None:
        """Set the border title of the dialog container."""
        self.query_one("#dialog").border_title = f"{'Copy' if self.is_copy else 'Move'} Sessions"

    @staticmethod
    def _known_id(value, known):
        """Return *value* when it is a non-empty key of *known*, else ``None``.

        This filters out both the empty-string placeholder options and the
        Select widget's "no selection" sentinel, which is a truthy object and
        would otherwise be mistaken for a real ID.
        """
        return value if value and value in known else None

    def on_select_changed(self, event: Select.Changed) -> None:
        """Record the chosen project/workspace and refresh preview and button."""
        if event.select.id == "project-select":
            self.target_project = self._known_id(event.value, self.db.projects)
        elif event.select.id == "workspace-select":
            self.target_workspace = self._known_id(event.value, self.db.workspaces)
        self.update_preview()
        # Confirm is only available once a destination project is chosen.
        self.query_one("#confirm-btn").disabled = self.target_project is None

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Route the Cancel / Confirm buttons to the matching actions."""
        button_id = event.button.id
        if button_id == "cancel-btn":
            event.stop()
            self.action_cancel()
        elif button_id == "confirm-btn":
            event.stop()
            self.action_confirm()

    def _preview_line(self, sess: Session, with_workspace: bool) -> str:
        """Return the preview line describing the operation for one session."""
        if self.is_copy:
            return f"  COPY session {sess.id[:12]}..."
        if with_workspace:
            return (
                f"  UPDATE session SET project={self.target_project[:12]}, "
                f"workspace={self.target_workspace[:12]} WHERE id={sess.id[:12]}..."
            )
        return f"  UPDATE session SET project={self.target_project[:12]} WHERE id={sess.id[:12]}..."

    def update_preview(self) -> None:
        """Render a human-readable summary of the pending operation."""
        preview = self.query_one("#sql-preview")
        if not self.target_project:
            preview.update("Select a project to see preview")
            return

        target_proj = self.db.projects.get(self.target_project)
        target_ws = (
            self.db.workspaces.get(self.target_workspace)
            if self.target_workspace
            else None
        )

        lines = ["", f"Source: {len(self.sessions)} session(s)"]
        if target_proj:
            lines.append(f"Destination Project: {target_proj.name or target_proj.worktree}")
        if target_ws:
            lines.append(
                f"Destination Workspace: {target_ws.name or target_ws.branch or target_ws.id}"
            )
        else:
            lines.append("Destination Workspace: (keep current or none)")
        lines.append("")
        lines.append("SQL to execute:")
        with_workspace = bool(target_ws)
        lines.extend(self._preview_line(sess, with_workspace) for sess in self.sessions)
        if self.is_copy:
            lines.append("\nNote: Copy creates new sessions with all related data.")
        preview.update("\n".join(lines))

    def action_cancel(self) -> None:
        """Close the dialog without performing any operation."""
        self.app.pop_screen()

    def action_confirm(self) -> None:
        """Run the move/copy against the database and report the outcome.

        Does nothing while no destination project is selected. On success the
        dialog is closed and the main app's session list is refreshed; on
        failure the dialog stays open and an error notification is shown.
        """
        if not self.target_project:
            return

        session_ids = [s.id for s in self.sessions]
        if self.is_copy:
            success, sql, _new_ids = self.db.copy_sessions(
                session_ids,
                self.target_project,
                self.target_workspace,
            )
        else:
            success, sql = self.db.move_sessions(
                session_ids,
                self.target_project,
                self.target_workspace,
            )

        # Grab the app before popping this screen off the stack.
        app = self.app
        if not success:
            app.notify(f"Operation failed: {sql}", severity="error")
            return

        app.notify(
            f"{'Copied' if self.is_copy else 'Moved'} {len(self.sessions)} session(s) successfully!",
            severity="information",
        )
        app.pop_screen()

        if not self.is_copy:
            # Moved sessions have left the project being viewed; drop them from
            # the app's selection so they are not silently acted on again.
            selected = getattr(app, "selected_session_ids", None)
            if isinstance(selected, set):
                selected.difference_update(session_ids)

        if hasattr(app, "refresh_sessions"):
            app.refresh_sessions()
class BackupDialog(ModalScreen):
    """Modal screen that reports where a database backup was written."""

    def __init__(self, backup_path):
        """Remember the backup location to display."""
        super().__init__()
        self.backup_path = backup_path

    def compose(self) -> ComposeResult:
        """Yield the title, the backup path text and a single OK button."""
        message = f"Database backed up to:\n{self.backup_path}"
        with Container(id="backup-dialog"):
            yield Label("Backup Created!", id="backup-title")
            yield Static(message, id="backup-path")
            with Horizontal():
                yield Button("OK", variant="primary", id="ok-btn")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss the dialog when the OK button is pressed."""
        if event.button.id != "ok-btn":
            return
        self.app.pop_screen()
class SessionMoverApp(App):
    """Main TUI application for browsing, selecting and moving/copying sessions.

    The left panel lists projects; highlighting one loads its sessions into
    the table on the right. Sessions are selected with Space and then moved
    or copied through ``MoveCopyDialog``.
    """

    CSS = """
    Screen {
        layout: vertical;
    }
    #main-hsplit {
        height: 1fr;
    }
    #project-panel {
        width: 30%;
        border-right: solid $primary;
        padding: 0 1;
    }
    #session-panel {
        width: 70%;
        padding: 1;
    }
    #project-tree {
        height: 1fr;
        border: solid $primary-lighten-2;
    }
    Tree {
        padding: 0 1;
    }
    #filter-bar {
        height: auto;
        padding: 0 0 1 0;
    }
    #filter-input {
        width: 1fr;
    }
    #sessions-table {
        height: 1fr;
    }
    DataTable {
        border: solid $primary-lighten-2;
    }
    /* Footer will be at bottom automatically */
    #current-selection {
        height: auto;
        padding: 1;
        background: $accent;
        color: $text;
    }
    #selection-count {
        height: auto;
        padding: 1;
        background: $surface;
        color: $text-muted;
    }
    /* Dialog styles */
    #dialog, #backup-dialog {
        width: 50;
        height: auto;
        background: $surface;
        padding: 1;
        border: solid $primary;
    }
    #dialog-title, #backup-title {
        text-align: center;
        text-style: bold;
        margin-bottom: 1;
    }
    #project-select, #workspace-select {
        margin-bottom: 1;
    }
    #sql-preview {
        height: 8;
        border: solid $primary-darken-1;
        padding: 1;
        margin: 1 0;
    }
    #preview-label {
        text-style: bold;
    }
    #dialog-buttons {
        height: auto;
        align: center middle;
    }
    #dialog-buttons Button {
        margin: 0 1;
        min-width: 15;
    }
    """

    BINDINGS = [
        ("q", "quit", "Quit"),
        ("b", "backup", "Backup"),
        ("m", "move", "Move"),
        ("c", "copy", "Copy"),
        ("a", "toggle_archived", "Archived"),
        ("r", "refresh", "Refresh"),
        ("space", "toggle_selection", "Select"),
        ("escape", "clear_selection", "Clear"),
    ]

    # Column headers of the sessions table, in display order.
    SESSION_COLUMNS = (" ", "ID", "Title", "Project", "Workspace", "Msgs", "Created")

    # Text shown in the first column of a selected row.
    SELECTED_MARKER = "✓"

    def __init__(self, db_path: str = "opencode.db"):
        """Create the app and its (not yet connected) database handle.

        Args:
            db_path: Path handed to ``Database``; defaults to ``"opencode.db"``.
        """
        super().__init__()
        self.db = Database(db_path)
        self.selected_session_ids: set = set()  # Selected sessions, tracked by full ID
        self.show_archived = False
        self.current_filter = ""
        self.backup_made = False
        self.filter_project_id = None  # No project selected at start
        self.filter_workspace_id = None

    def compose(self) -> ComposeResult:
        """Lay out the project tree, filter bar, sessions table and footer."""
        yield Header()
        with Horizontal(id="main-hsplit"):
            with Vertical(id="project-panel"):
                yield Tree("Projects", id="project-tree")
            with Vertical(id="session-panel"):
                yield Static("Select a project from the left", id="current-selection")
                with Horizontal(id="filter-bar"):
                    yield Input(placeholder="🔍 Filter...", id="filter-input")
                    yield Button("Clear", id="filter-clear", variant="default")
                yield DataTable(id="sessions-table")
                yield Static("0 selected", id="selection-count")
        yield Footer()

    def on_mount(self) -> None:
        """Connect to the database and prepare the tree and table widgets."""
        self.db.connect()
        self.db.load_reference_data()

        tree = self.query_one("#project-tree")
        tree.root.expand()
        self.refresh_project_tree()

        # The table is configured here but stays empty until a project is
        # highlighted in the tree.
        table = self.query_one("#sessions-table")
        table.zebra_stripes = True
        table.cursor_type = "row"
        table.add_columns(*self.SESSION_COLUMNS)

        self.query_one("#current-selection").update("Select a project from the left")

    def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
        """Ignore row-selected events.

        Selection is toggled through the ``space`` binding
        (``action_toggle_selection``), not through this event.
        """

    def on_input_changed(self, event: Input.Changed) -> None:
        """Apply the filter box text to the session list as the user types."""
        if event.input.id != "filter-input":
            return
        self.current_filter = event.value
        # Before any project is chosen the table is intentionally left empty;
        # the stored filter is applied once a project is highlighted.
        if self.filter_project_id is not None:
            self.refresh_sessions()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle the filter bar's Clear button; other buttons are ignored."""
        if event.button.id != "filter-clear":
            return
        self.current_filter = ""
        # Emptying the input raises Input.Changed when the text actually
        # changes, which refreshes the table via on_input_changed.
        self.query_one("#filter-input").value = ""

    def refresh_project_tree(self) -> None:
        """Rebuild the project tree as a flat list under the root node."""
        tree = self.query_one("#project-tree")
        tree.clear()
        root = tree.root
        root.data = None
        root.expand()
        for proj in self.db.projects.values():
            proj_name = (
                os.path.basename(proj.worktree)
                if proj.worktree
                else (proj.name or "Unknown")
            )
            root.add(
                proj_name,
                {"type": "project", "id": proj.id, "label": proj_name, "full_path": proj.worktree},
            )

    def _query_sessions(self):
        """Fetch sessions matching the current project/workspace/text filters."""
        return self.db.get_sessions(
            include_archived=self.show_archived,
            project_id=self.filter_project_id,
            workspace_id=self.filter_workspace_id,
            search=self.current_filter,
        )

    def refresh_sessions(self) -> None:
        """Reload the sessions table and the selection counter."""
        table = self.query_one("#sessions-table")

        # Remember where the cursor was so it can be put back after the rebuild.
        cursor_row = table.cursor_row
        cursor_column = table.cursor_column

        table.clear(columns=True)
        table.add_columns(*self.SESSION_COLUMNS)

        sessions = self._query_sessions()
        for sess in sessions:
            # time_created is divided by 1000 before conversion, i.e. it is
            # treated as a millisecond timestamp.
            created_dt = (
                datetime.fromtimestamp(sess.time_created / 1000).strftime("%m-%d %H:%M")
                if sess.time_created
                else "N/A"
            )
            marker = self.SELECTED_MARKER if sess.id in self.selected_session_ids else ""
            project_label = sess.project_name[:20] if sess.project_name else sess.project_id[:8]
            workspace_label = (sess.workspace_name or "")[:15]
            table.add_row(
                marker,
                sess.id[:8],
                sess.title,
                project_label,
                workspace_label,
                str(sess.message_count),
                created_dt,
                key=sess.id,  # Full ID as row key
            )

        if cursor_row is not None and cursor_row < len(sessions):
            table.move_cursor(row=cursor_row, column=cursor_column)

        count = len(self.selected_session_ids)
        count_widget = self.query_one("#selection-count")
        if count > 0:
            count_widget.update(f"{count} session(s) selected")
        else:
            count_widget.update("0 selected")

    def on_tree_node_highlighted(self, event: Tree.NodeHighlighted) -> None:
        """Switch the session list to the project highlighted in the tree."""
        node = event.node
        if node is None:
            return
        node_data = getattr(node, "data", None)
        if node_data is None or node_data.get("type") != "project":
            return

        proj_id = node_data.get("id")
        self.filter_project_id = proj_id
        self.filter_workspace_id = None

        proj = self.db.projects.get(proj_id)
        if proj:
            self.query_one("#current-selection").update(f"📁 {proj.worktree}")

        # A selection does not carry over from one project to another.
        if self.selected_session_ids:
            count = len(self.selected_session_ids)
            self.selected_session_ids.clear()
            self.notify(f"Selection cleared ({count} session(s))", severity="information")

        self.refresh_sessions()

    def action_toggle_selection(self) -> None:
        """Toggle selection of the session under the table cursor."""
        table = self.query_one("#sessions-table")
        cursor_row = table.cursor_row
        if cursor_row is None or not 0 <= cursor_row < table.row_count:
            return
        # Rows are keyed by the full session ID, so the displayed row itself
        # identifies the session without re-querying the database.
        session_id = table.coordinate_to_cell_key(table.cursor_coordinate).row_key.value
        if session_id is None:
            return
        if session_id in self.selected_session_ids:
            self.selected_session_ids.discard(session_id)
        else:
            self.selected_session_ids.add(session_id)
        self.refresh_sessions()

    def action_clear_selection(self) -> None:
        """Deselect every session."""
        self.selected_session_ids.clear()
        self.refresh_sessions()

    def _ensure_backup(self) -> None:
        """Create a database backup once per run, before the first change."""
        if self.backup_made:
            return
        self.db.create_backup()
        self.backup_made = True
        self.notify("Backup created", severity="information")

    def _open_transfer_dialog(self, is_copy: bool) -> None:
        """Open ``MoveCopyDialog`` for the selected sessions (shared by move/copy)."""
        if not self.selected_session_ids:
            self.notify("No sessions selected", severity="warning")
            return

        # Resolve IDs to session objects, dropping any that no longer exist.
        looked_up = (self.db.get_session(sid) for sid in self.selected_session_ids)
        sessions = [s for s in looked_up if s]
        if not sessions:
            self.notify("No sessions selected", severity="warning")
            return

        self._ensure_backup()
        self.push_screen(MoveCopyDialog(self.db, sessions, is_copy=is_copy))

    def action_move(self) -> None:
        """Open the move dialog for the selected sessions."""
        self._open_transfer_dialog(is_copy=False)

    def action_copy(self) -> None:
        """Open the copy dialog for the selected sessions."""
        self._open_transfer_dialog(is_copy=True)

    def action_backup(self) -> None:
        """Create a database backup on demand (the ``b`` binding)."""
        # NOTE(review): assumes create_backup() returns the backup location;
        # confirm against Database. A plain notification is used when it
        # returns nothing.
        backup_path = self.db.create_backup()
        self.backup_made = True
        if backup_path:
            self.push_screen(BackupDialog(backup_path))
        else:
            self.notify("Backup created", severity="information")

    def action_toggle_archived(self) -> None:
        """Toggle whether archived sessions are included in the list."""
        self.show_archived = not self.show_archived
        self.refresh_sessions()

    def action_refresh(self) -> None:
        """Reload reference data, the project tree and the session list."""
        self.db.load_reference_data()
        self.refresh_project_tree()
        self.refresh_sessions()
        self.notify("Data refreshed", severity="information")

    def on_unmount(self) -> None:
        """Close the database connection when the app shuts down."""
        self.db.close()
if __name__ == "__main__":
    # Start the TUI with the default database path when run as a script.
    SessionMoverApp().run()