Files
opencode-session-manager/app.py
2026-03-07 17:15:22 -07:00

661 lines
22 KiB
Python

"""Session Mover TUI Application.
A Textual-based TUI for managing OpenCode sessions.
"""
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Header, Footer, Input, Button, Select, ListView, ListItem, DataTable, Label, Static
from textual.screen import ModalScreen
from typing import List, Optional
from database import Database, Session
from datetime import datetime
import os
class MoveCopyDialog(ModalScreen):
"""Dialog for moving or copying sessions."""
BINDINGS = [
("escape", "cancel", "Cancel"),
("enter", "confirm", "Confirm"),
]
def __init__(self, db: Database, sessions: List[Session], is_copy: bool = False):
super().__init__()
self.db = db
self.sessions = sessions
self.is_copy = is_copy
self.target_project: Optional[str] = None
self.target_workspace: Optional[str] = None
def compose(self) -> ComposeResult:
# Get currently selected project from tree to exclude from destination list
current_project_id = self.app.filter_project_id if hasattr(self.app, 'filter_project_id') else None
project_options = sorted(
[
(os.path.basename(p.worktree) if p.worktree else (p.name or p.id), p.id)
for p in self.db.projects.values()
if p.id != current_project_id
],
key=lambda x: x[0].lower()
)
if not project_options:
project_options = [("-- Same project --", "")]
with Container(id="dialog"):
yield Label(f"{'📋 Copy' if self.is_copy else '➡️ Move'} {len(self.sessions)} Session(s)", id="dialog-title")
yield Label("Destination Project:", id="preview-label")
yield Select(
[("-- Select --", "")] + project_options,
id="project-select",
prompt="Select a project"
)
yield Label("Workspace (optional):")
yield Select(
[("-- None --", "")] + [
(f"{w.name or w.branch or w.id}", w.id)
for w in self.db.workspaces.values()
],
id="workspace-select",
prompt="Select workspace"
)
yield Static("", id="sql-preview")
with Horizontal(id="dialog-buttons"):
yield Button("Cancel", variant="default", id="cancel-btn")
yield Button("Confirm", variant="primary", id="confirm-btn", disabled=True)
def on_mount(self) -> None:
self.query_one("#dialog").border_title = f"{'Copy' if self.is_copy else 'Move'} Sessions"
def on_select_changed(self, event):
if event.select.id == "project-select":
self.target_project = event.value if event.value else None
elif event.select.id == "workspace-select":
self.target_workspace = event.value if event.value else None
self.update_preview()
confirm_btn = self.query_one("#confirm-btn")
confirm_btn.disabled = not (self.target_project is not None)
def update_preview(self):
if not self.target_project:
preview = self.query_one("#sql-preview")
preview.update("Select a project to see preview")
return
target_proj = self.db.projects.get(self.target_project)
target_ws = self.db.workspaces.get(self.target_workspace) if self.target_workspace else None
lines = ["", f"Source: {len(self.sessions)} session(s)"]
if target_proj:
lines.append(f"Destination Project: {target_proj.name or target_proj.worktree}")
if target_ws:
lines.append(f"Destination Workspace: {target_ws.name or target_ws.branch or target_ws.id}")
else:
lines.append("Destination Workspace: (keep current or none)")
lines.append("")
lines.append("SQL to execute:")
for sess in self.sessions:
if self.is_copy:
lines.append(f" COPY session {sess.id[:12]}...")
else:
if target_ws:
lines.append(f" UPDATE session SET project={self.target_project[:12]}, workspace={self.target_workspace[:12] if self.target_workspace else 'NULL'} WHERE id={sess.id[:12]}...")
else:
lines.append(f" UPDATE session SET project={self.target_project[:12]} WHERE id={sess.id[:12]}...")
if self.is_copy:
lines.append("\nNote: Copy creates new sessions with all related data.")
self.query_one("#sql-preview").update("\n".join(lines))
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "cancel-btn":
self.action_cancel()
elif event.button.id == "confirm-btn":
self.action_confirm()
def action_cancel(self) -> None:
self.app.pop_screen()
def action_confirm(self) -> None:
if not self.target_project:
return
# Execute operation
if self.is_copy:
success, sql, new_ids = self.db.copy_sessions(
[s.id for s in self.sessions],
self.target_project,
self.target_workspace
)
else:
success, sql = self.db.move_sessions(
[s.id for s in self.sessions],
self.target_project,
self.target_workspace
)
if success:
self.app.notify(f"{'Copied' if self.is_copy else 'Moved'} {len(self.sessions)} session(s) successfully!", severity="information")
self.app.pop_screen()
# Get reference to main app and refresh
app = self.app
if hasattr(app, 'refresh_sessions'):
app.refresh_project_list()
app.refresh_sessions()
else:
self.app.notify(f"Operation failed: {sql}", severity="error")
class BackupDialog(ModalScreen):
"""Dialog showing backup creation."""
def __init__(self, backup_path):
super().__init__()
self.backup_path = backup_path
def compose(self) -> ComposeResult:
with Container(id="backup-dialog"):
yield Label("Backup Created!", id="backup-title")
yield Static(f"Database backed up to:\n{self.backup_path}", id="backup-path")
with Horizontal():
yield Button("OK", variant="primary", id="ok-btn")
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "ok-btn":
self.app.pop_screen()
class DeleteConfirmDialog(ModalScreen):
"""Confirmation dialog before deleting sessions."""
BINDINGS = [
("escape", "cancel", "Cancel"),
]
def __init__(self, sessions: List[Session]):
super().__init__()
self.sessions = sessions
def compose(self) -> ComposeResult:
titles = "\n".join(f"{s.title or s.id[:12]}" for s in self.sessions[:10])
if len(self.sessions) > 10:
titles += f"\n ... and {len(self.sessions) - 10} more"
with Container(id="dialog"):
yield Label("Delete Sessions", id="dialog-title")
yield Static(f"Permanently delete {len(self.sessions)} session(s)?\n\n{titles}\n\nThis cannot be undone.", id="delete-warning")
with Horizontal(id="dialog-buttons"):
yield Button("Cancel", variant="default", id="cancel-btn")
yield Button("Delete", variant="error", id="confirm-btn")
def on_mount(self) -> None:
self.query_one("#dialog").border_title = "Confirm Delete"
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "cancel-btn":
self.action_cancel()
elif event.button.id == "confirm-btn":
self.dismiss(True)
def action_cancel(self) -> None:
self.dismiss(False)
class SessionMoverApp(App):
"""Main TUI Application."""
CSS = """
Screen {
layout: vertical;
}
#main-hsplit {
height: 1fr;
}
#project-panel {
width: 30%;
border-right: solid $primary;
padding: 0 1;
}
#session-panel {
width: 70%;
padding: 1;
}
#project-panel-title, #session-panel-title {
height: 1;
padding: 0 1;
background: $primary-darken-2;
color: $text;
text-style: bold;
}
#project-list {
height: 1fr;
border: solid $primary-lighten-2;
}
.proj-row {
width: 100%;
height: 1;
}
.proj-name {
width: 1fr;
}
.proj-count {
width: auto;
color: $text-muted;
}
#filter-bar {
height: auto;
padding: 0 0 1 0;
}
#filter-input {
width: 1fr;
}
#sessions-table {
height: 1fr;
}
DataTable {
border: solid $primary-lighten-2;
}
/* Footer will be at bottom automatically */
#current-selection {
height: auto;
padding: 1;
background: $accent;
color: $text;
}
#selection-count {
height: auto;
padding: 1;
background: $surface;
color: $text-muted;
}
/* Dialog styles */
MoveCopyDialog, BackupDialog, DeleteConfirmDialog {
align: center middle;
}
#delete-warning {
margin: 1 0;
color: $warning;
}
#dialog, #backup-dialog {
width: 60;
height: auto;
background: $surface;
padding: 1 2;
border: solid $primary;
}
#dialog-title, #backup-title {
text-align: center;
text-style: bold;
margin-bottom: 1;
}
#dialog-select {
margin-bottom: 1;
}
#sql-preview {
height: 8;
border: solid $primary-darken-1;
padding: 1;
margin: 1 0;
}
#preview-label {
text-style: bold;
}
#dialog-buttons {
height: auto;
align: center middle;
}
#dialog-buttons Button {
margin: 0 1;
min-width: 15;
}
"""
BINDINGS = [
("q", "quit", "Quit"),
("b", "backup", "Backup"),
("m", "move", "Move"),
("c", "copy", "Copy"),
("a", "toggle_archived", "Archived"),
("d", "delete", "Delete"),
("f", "focus_filter", "Filter"),
("r", "refresh", "Refresh"),
("space", "toggle_selection", "Select"),
("escape", "clear_selection", "Clear"),
]
def __init__(self, db_path: str = "opencode.db"):
super().__init__()
self.db = Database(db_path)
self.selected_session_ids: set = set() # Track by ID
self.show_archived = False
self.current_filter = ""
self.backup_made = False
self.filter_project_id = None # Start with no project selected
self.filter_workspace_id = None
def compose(self) -> ComposeResult:
yield Header()
with Horizontal(id="main-hsplit"):
with Vertical(id="project-panel"):
yield Label("All Projects", id="project-panel-title")
yield ListView(id="project-list")
with Vertical(id="session-panel"):
yield Label("Sessions", id="session-panel-title")
yield Static("Select a project from the left", id="current-selection")
with Horizontal(id="filter-bar"):
yield Input(placeholder="🔍 Filter... (f)", id="filter-input")
yield Button("Clear", id="filter-clear", variant="default")
yield Button("Archived", id="archived-toggle", variant="default")
yield DataTable(id="sessions-table")
yield Static("0 selected", id="selection-count")
yield Footer()
def on_mount(self) -> None:
"""Initialize app."""
self.db.connect()
self.db.load_reference_data()
self._project_ids: List[str] = []
self.refresh_project_list()
# Keep filter widgets out of tab order; use 'f' to focus
self.query_one("#filter-input", Input).can_focus = False
self.query_one("#filter-clear", Button).can_focus = False
self.query_one("#archived-toggle", Button).can_focus = False
# Setup sessions table (but don't load data yet)
table = self.query_one("#sessions-table")
table.zebra_stripes = True
table.cursor_type = "row"
table.show_cursor = False
table.add_columns(" ", "ID", "Title", "Project", "Workspace", "Msgs", "Created")
# Don't load sessions until a project is selected
status = self.query_one("#current-selection")
status.update("Select a project from the left")
def on_input_changed(self, event: Input.Changed) -> None:
if event.input.id == "filter-input":
self.current_filter = event.value
self.refresh_sessions()
def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input.id == "filter-input":
inp = self.query_one("#filter-input", Input)
inp.can_focus = False
self.query_one("#sessions-table", DataTable).focus()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "filter-clear":
self.query_one("#filter-input", Input).value = ""
self.current_filter = ""
self.refresh_sessions()
elif event.button.id == "archived-toggle":
self.action_toggle_archived()
def action_focus_filter(self) -> None:
inp = self.query_one("#filter-input", Input)
inp.can_focus = True
inp.focus()
def on_data_table_row_selected(self, event):
"""Handle row selection in either table."""
# Use click to toggle selection in sessions table only
pass # Handled by click handler
def refresh_project_list(self) -> None:
"""Update the project list."""
lv = self.query_one("#project-list", ListView)
lv.clear()
self._project_ids = []
counts = self.db.get_session_counts_by_project()
projects = sorted(
self.db.projects.values(),
key=lambda p: (os.path.basename(p.worktree) if p.worktree else (p.name or "")).lower()
)
for proj in projects:
proj_name = os.path.basename(proj.worktree) if proj.worktree else (proj.name or "Unknown")
count = counts.get(proj.id, 0)
lv.append(ListItem(
Horizontal(
Label(proj_name, classes="proj-name"),
Label(f"({count})", classes="proj-count"),
classes="proj-row",
)
))
self._project_ids.append(proj.id)
def refresh_sessions(self) -> None:
"""Update the sessions table."""
table = self.query_one("#sessions-table")
# Save cursor position before clearing
cursor_row = table.cursor_row
cursor_column = table.cursor_column
table.clear(columns=True)
table.add_columns(" ", "ID", "Title", "Project", "Workspace", "Msgs", "Created")
sessions = self.db.get_sessions(
include_archived=self.show_archived,
project_id=self.filter_project_id,
workspace_id=self.filter_workspace_id,
search=self.current_filter
)
for sess in sessions:
created_dt = datetime.fromtimestamp(sess.time_created / 1000).strftime("%m-%d %H:%M") if sess.time_created else "N/A"
# Check if selected by ID
marker = "" if sess.id in self.selected_session_ids else ""
table.add_row(
marker,
sess.id[:8],
sess.title,
sess.project_name[:20] if sess.project_name and len(sess.project_name) > 20 else (sess.project_name or sess.project_id[:8]),
sess.workspace_name[:15] if sess.workspace_name and len(sess.workspace_name) > 15 else (sess.workspace_name or ""),
str(sess.message_count),
created_dt,
key=sess.id # Full ID as key
)
# Restore cursor position if valid
if cursor_row is not None and cursor_row < len(sessions):
table.move_cursor(row=cursor_row, column=cursor_column)
# Update selection count
count = len(self.selected_session_ids)
count_widget = self.query_one("#selection-count")
if count > 0:
count_widget.update(f"{count} session(s) selected")
else:
count_widget.update("0 selected")
def on_descendant_focus(self, event) -> None:
try:
table = self.query_one("#sessions-table", DataTable)
table.show_cursor = table.has_focus
except Exception:
pass
def on_descendant_blur(self, event) -> None:
try:
table = self.query_one("#sessions-table", DataTable)
table.show_cursor = table.has_focus
except Exception:
pass
def on_list_view_highlighted(self, event: ListView.Highlighted) -> None:
"""Handle project list selection."""
if event.list_view.id != "project-list":
return
index = event.list_view.index
if index is None or index >= len(self._project_ids):
return
proj_id = self._project_ids[index]
proj = self.db.projects.get(proj_id)
self.filter_project_id = proj_id
self.filter_workspace_id = None
status = self.query_one("#current-selection")
if proj:
status.update(f"📁 {proj.worktree}")
if self.selected_session_ids:
count = len(self.selected_session_ids)
self.selected_session_ids.clear()
self.notify(f"Selection cleared ({count} session(s))", severity="information")
self.refresh_sessions()
def action_toggle_selection(self) -> None:
"""Toggle selection of current row."""
table = self.query_one("#sessions-table")
row_key = table.cursor_row
if row_key is not None:
sessions = self.db.get_sessions(
include_archived=self.show_archived,
project_id=self.filter_project_id,
workspace_id=self.filter_workspace_id,
search=self.current_filter
)
if 0 <= row_key < len(sessions):
session_id = sessions[row_key].id
if session_id in self.selected_session_ids:
self.selected_session_ids.discard(session_id)
else:
self.selected_session_ids.add(session_id)
self.refresh_sessions()
def action_clear_selection(self) -> None:
"""Clear filter if active, otherwise clear session selections."""
inp = self.query_one("#filter-input", Input)
if inp.has_focus or self.current_filter:
inp.value = ""
self.current_filter = ""
inp.can_focus = False
self.query_one("#sessions-table", DataTable).focus()
self.refresh_sessions()
else:
self.selected_session_ids.clear()
self.refresh_sessions()
def action_move(self) -> None:
"""Open move dialog."""
if not self.selected_session_ids:
self.notify("No sessions selected", severity="warning")
return
# Get session objects for the dialog
sessions = [self.db.get_session(sid) for sid in self.selected_session_ids]
sessions = [s for s in sessions if s]
if not self.backup_made:
self.db.create_backup()
self.backup_made = True
self.notify("Backup created", severity="information")
self.push_screen(MoveCopyDialog(self.db, sessions, is_copy=False))
def action_copy(self) -> None:
"""Open copy dialog."""
if not self.selected_session_ids:
self.notify("No sessions selected", severity="warning")
return
# Get session objects for the dialog
sessions = [self.db.get_session(sid) for sid in self.selected_session_ids]
sessions = [s for s in sessions if s]
if not self.backup_made:
self.db.create_backup()
self.backup_made = True
self.notify("Backup created", severity="information")
self.push_screen(MoveCopyDialog(self.db, sessions, is_copy=True))
def action_delete(self) -> None:
"""Open delete confirmation dialog."""
if not self.selected_session_ids:
self.notify("No sessions selected", severity="warning")
return
sessions = [self.db.get_session(sid) for sid in self.selected_session_ids]
sessions = [s for s in sessions if s]
if not self.backup_made:
self.db.create_backup()
self.backup_made = True
self.notify("Backup created", severity="information")
def handle_result(confirmed: bool) -> None:
if not confirmed:
return
success, err = self.db.delete_sessions([s.id for s in sessions])
if success:
self.selected_session_ids.clear()
self.notify(f"Deleted {len(sessions)} session(s)", severity="information")
self.refresh_project_list()
self.refresh_sessions()
else:
self.notify(f"Delete failed: {err}", severity="error")
self.push_screen(DeleteConfirmDialog(sessions), handle_result)
def action_toggle_archived(self) -> None:
"""Toggle archived sessions visibility."""
self.show_archived = not self.show_archived
btn = self.query_one("#archived-toggle", Button)
btn.variant = "success" if self.show_archived else "default"
title = self.query_one("#session-panel-title", Label)
title.update("Sessions (+ Archived)" if self.show_archived else "Sessions")
self.refresh_sessions()
def action_refresh(self) -> None:
"""Reload all data from database."""
self.db.load_reference_data()
self.refresh_project_list()
self.refresh_sessions()
self.notify("Data refreshed", severity="information")
def on_unmount(self) -> None:
"""Cleanup on exit."""
self.db.close()
if __name__ == "__main__":
app = SessionMoverApp()
app.run()