"""Session Mover TUI Application.

A Textual-based TUI for managing OpenCode sessions.
"""

import os
from datetime import datetime
from typing import List, Optional

from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import (
    Button,
    DataTable,
    Footer,
    Header,
    Input,
    Label,
    ListItem,
    ListView,
    Select,
    Static,
)

from database import Database, Session


def _project_label(project, fallback: str) -> str:
    """Return the display name used for *project* in lists and selects.

    The basename of the project's worktree is preferred; when there is no
    worktree the project name is used, and *fallback* when that is empty too.
    """
    if project.worktree:
        return os.path.basename(project.worktree)
    return project.name or fallback


def _chosen_value(value: object) -> Optional[str]:
    """Normalise a ``Select`` value to a non-empty string or ``None``.

    Only non-empty strings count as a choice. This rejects both the ``""``
    placeholder options built by the dialog and the widget's own
    "no selection" sentinel, which is a truthy non-string object and would
    otherwise be mistaken for a real ID.
    """
    if isinstance(value, str) and value:
        return value
    return None


class MoveCopyDialog(ModalScreen):
    """Dialog for moving or copying sessions."""

    BINDINGS = [
        ("escape", "cancel", "Cancel"),
        ("enter", "confirm", "Confirm"),
    ]

    def __init__(self, db: Database, sessions: List[Session], is_copy: bool = False):
        """Store the database handle, the sessions to act on and the mode.

        Args:
            db: Database used for project/workspace lookups and for executing
                the move or copy.
            sessions: Sessions the operation applies to.
            is_copy: ``True`` to copy the sessions, ``False`` to move them.
        """
        super().__init__()
        self.db = db
        self.sessions = sessions
        self.is_copy = is_copy
        self.target_project: Optional[str] = None
        self.target_workspace: Optional[str] = None

    def compose(self) -> ComposeResult:
        """Build the destination selectors, the preview area and the buttons."""
        # The project currently shown in the main app is excluded from the
        # destination list.
        current_project_id = getattr(self.app, "filter_project_id", None)

        project_options = [
            (_project_label(p, p.id), p.id)
            for p in self.db.projects.values()
            if p.id != current_project_id
        ]
        if not project_options:
            project_options = [("-- Same project --", "")]

        workspace_options = [
            (f"{w.name or w.branch or w.id}", w.id)
            for w in self.db.workspaces.values()
        ]

        action = "📋 Copy" if self.is_copy else "➡️ Move"

        with Container(id="dialog"):
            yield Label(
                f"{action} {len(self.sessions)} Session(s)", id="dialog-title"
            )

            yield Label("Destination Project:", id="preview-label")
            yield Select(
                [("-- Select --", "")] + project_options,
                id="project-select",
                prompt="Select a project",
            )

            yield Label("Workspace (optional):")
            yield Select(
                [("-- None --", "")] + workspace_options,
                id="workspace-select",
                prompt="Select workspace",
            )

            yield Static("", id="sql-preview")

            with Horizontal(id="dialog-buttons"):
                yield Button("Cancel", variant="default", id="cancel-btn")
                yield Button(
                    "Confirm", variant="primary", id="confirm-btn", disabled=True
                )

    def on_mount(self) -> None:
        """Set the dialog border title according to the mode."""
        verb = "Copy" if self.is_copy else "Move"
        self.query_one("#dialog").border_title = f"{verb} Sessions"

    def on_select_changed(self, event: Select.Changed) -> None:
        """Record the chosen project/workspace and refresh preview and button."""
        if event.select.id == "project-select":
            self.target_project = _chosen_value(event.value)
        elif event.select.id == "workspace-select":
            self.target_workspace = _chosen_value(event.value)

        self.update_preview()

        # Confirm is only available once a destination project is chosen.
        self.query_one("#confirm-btn", Button).disabled = self.target_project is None

    def update_preview(self) -> None:
        """Render a textual summary of the pending operation."""
        preview = self.query_one("#sql-preview", Static)
        if not self.target_project:
            preview.update("Select a project to see preview")
            return

        target_proj = self.db.projects.get(self.target_project)
        target_ws = (
            self.db.workspaces.get(self.target_workspace)
            if self.target_workspace
            else None
        )

        lines = ["", f"Source: {len(self.sessions)} session(s)"]
        if target_proj:
            lines.append(
                f"Destination Project: {target_proj.name or target_proj.worktree}"
            )
        if target_ws:
            lines.append(
                "Destination Workspace: "
                f"{target_ws.name or target_ws.branch or target_ws.id}"
            )
        else:
            lines.append("Destination Workspace: (keep current or none)")

        lines.append("")
        lines.append("SQL to execute:")

        if self.is_copy:
            lines.extend(f" COPY session {sess.id[:12]}..." for sess in self.sessions)
            lines.append("\nNote: Copy creates new sessions with all related data.")
        else:
            # The SET clause is the same for every session, so build it once.
            assignment = f"project={self.target_project[:12]}"
            if target_ws:
                assignment += f", workspace={self.target_workspace[:12]}"
            lines.extend(
                f" UPDATE session SET {assignment} WHERE id={sess.id[:12]}..."
                for sess in self.sessions
            )

        preview.update("\n".join(lines))

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dispatch the Cancel and Confirm buttons to their actions."""
        if event.button.id == "cancel-btn":
            self.action_cancel()
        elif event.button.id == "confirm-btn":
            self.action_confirm()

    def action_cancel(self) -> None:
        """Close the dialog without doing anything."""
        self.app.pop_screen()

    def action_confirm(self) -> None:
        """Execute the move or copy and report the outcome.

        Does nothing until a destination project has been chosen. On failure
        the dialog stays open and the second value returned by the database
        call is shown in an error notification.
        """
        if not self.target_project:
            return

        session_ids = [s.id for s in self.sessions]
        if self.is_copy:
            success, sql, _new_ids = self.db.copy_sessions(
                session_ids, self.target_project, self.target_workspace
            )
        else:
            success, sql = self.db.move_sessions(
                session_ids, self.target_project, self.target_workspace
            )

        # Keep a reference before this screen is popped.
        app = self.app
        if not success:
            app.notify(f"Operation failed: {sql}", severity="error")
            return

        app.notify(
            f"{'Copied' if self.is_copy else 'Moved'} "
            f"{len(self.sessions)} session(s) successfully!",
            severity="information",
        )
        app.pop_screen()

        if not self.is_copy:
            # The destination list excludes the project on display, so moved
            # sessions leave the visible table. Drop them from the selection
            # so later actions cannot target rows the user can no longer see.
            selected = getattr(app, "selected_session_ids", None)
            if selected is not None:
                selected.clear()

        if hasattr(app, "refresh_sessions"):
            app.refresh_sessions()


class BackupDialog(ModalScreen):
    """Dialog showing backup creation."""

    def __init__(self, backup_path):
        """Remember the backup location to display.

        Args:
            backup_path: Location of the backup; it is interpolated into the
                message as-is.
        """
        super().__init__()
        self.backup_path = backup_path

    def compose(self) -> ComposeResult:
        """Build the confirmation message and the OK button."""
        with Container(id="backup-dialog"):
            yield Label("Backup Created!", id="backup-title")
            yield Static(
                f"Database backed up to:\n{self.backup_path}", id="backup-path"
            )
            with Horizontal():
                yield Button("OK", variant="primary", id="ok-btn")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Close the dialog when OK is pressed."""
        if event.button.id == "ok-btn":
            self.app.pop_screen()


class DeleteConfirmDialog(ModalScreen):
    """Confirmation dialog before deleting sessions.

    Dismisses with ``True`` when the user confirms and ``False`` otherwise.
    """

    BINDINGS = [
        ("escape", "cancel", "Cancel"),
    ]

    def __init__(self, sessions: List[Session]):
        """Store the sessions that are about to be deleted."""
        super().__init__()
        self.sessions = sessions

    def compose(self) -> ComposeResult:
        """Build the warning text (listing at most ten sessions) and buttons."""
        titles = "\n".join(f" • {s.title or s.id[:12]}" for s in self.sessions[:10])
        if len(self.sessions) > 10:
            titles += f"\n ... and {len(self.sessions) - 10} more"

        with Container(id="dialog"):
            yield Label("Delete Sessions", id="dialog-title")
            yield Static(
                f"Permanently delete {len(self.sessions)} session(s)?"
                f"\n\n{titles}\n\nThis cannot be undone.",
                id="delete-warning",
            )
            with Horizontal(id="dialog-buttons"):
                yield Button("Cancel", variant="default", id="cancel-btn")
                yield Button("Delete", variant="error", id="confirm-btn")

    def on_mount(self) -> None:
        """Set the dialog border title."""
        self.query_one("#dialog").border_title = "Confirm Delete"

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with the user's decision."""
        if event.button.id == "cancel-btn":
            self.action_cancel()
        elif event.button.id == "confirm-btn":
            self.dismiss(True)

    def action_cancel(self) -> None:
        """Dismiss the dialog reporting that nothing should be deleted."""
        self.dismiss(False)


class SessionMoverApp(App):
    """Main TUI Application."""

    CSS = """
    Screen {
        layout: vertical;
    }

    #main-hsplit {
        height: 1fr;
    }

    #project-panel {
        width: 30%;
        border-right: solid $primary;
        padding: 0 1;
    }

    #session-panel {
        width: 70%;
        padding: 1;
    }

    #project-panel-title, #session-panel-title {
        height: 1;
        padding: 0 1;
        background: $primary-darken-2;
        color: $text;
        text-style: bold;
    }

    #project-list {
        height: 1fr;
        border: solid $primary-lighten-2;
    }

    #filter-bar {
        height: auto;
        padding: 0 0 1 0;
    }

    #filter-input {
        width: 1fr;
    }

    #sessions-table {
        height: 1fr;
    }

    DataTable {
        border: solid $primary-lighten-2;
    }

    /* Footer will be at bottom automatically */

    #current-selection {
        height: auto;
        padding: 1;
        background: $accent;
        color: $text;
    }

    #selection-count {
        height: auto;
        padding: 1;
        background: $surface;
        color: $text-muted;
    }

    /* Dialog styles */
    MoveCopyDialog, BackupDialog, DeleteConfirmDialog {
        align: center middle;
    }

    #delete-warning {
        margin: 1 0;
        color: $warning;
    }

    #dialog, #backup-dialog {
        width: 60;
        height: auto;
        background: $surface;
        padding: 1 2;
        border: solid $primary;
    }

    #dialog-title, #backup-title {
        text-align: center;
        text-style: bold;
        margin-bottom: 1;
    }

    #dialog-select {
        margin-bottom: 1;
    }

    #sql-preview {
        height: 8;
        border: solid $primary-darken-1;
        padding: 1;
        margin: 1 0;
    }

    #preview-label {
        text-style: bold;
    }

    #dialog-buttons {
        height: auto;
        align: center middle;
    }

    #dialog-buttons Button {
        margin: 0 1;
        min-width: 15;
    }
    """

    BINDINGS = [
        ("q", "quit", "Quit"),
        ("b", "backup", "Backup"),
        ("m", "move", "Move"),
        ("c", "copy", "Copy"),
        ("a", "toggle_archived", "Archived"),
        ("d", "delete", "Delete"),
        ("f", "focus_filter", "Filter"),
        ("r", "refresh", "Refresh"),
        ("space", "toggle_selection", "Select"),
        ("escape", "clear_selection", "Clear"),
    ]

    # Column headers of the sessions table, shared by on_mount and
    # refresh_sessions so the two cannot drift apart.
    _SESSION_COLUMNS = (" ", "ID", "Title", "Project", "Workspace", "Msgs", "Created")

    def __init__(self, db_path: str = "opencode.db"):
        """Create the app and its database handle.

        Args:
            db_path: Path passed to ``Database``; the connection itself is
                opened in ``on_mount``.
        """
        super().__init__()
        self.db = Database(db_path)
        self.selected_session_ids: set = set()  # Track by ID
        self.show_archived = False
        self.current_filter = ""
        self.backup_made = False
        self.filter_project_id = None  # Start with no project selected
        self.filter_workspace_id = None
        # Project IDs in the same order as the items of the project list.
        self._project_ids: List[str] = []
        # Session IDs in the same order as the rows of the sessions table.
        self._visible_session_ids: List[str] = []

    def compose(self) -> ComposeResult:
        """Build the project panel, the session panel and header/footer."""
        yield Header()

        with Horizontal(id="main-hsplit"):
            with Vertical(id="project-panel"):
                yield Label("All Projects", id="project-panel-title")
                yield ListView(id="project-list")

            with Vertical(id="session-panel"):
                yield Label("Sessions", id="session-panel-title")
                yield Static("Select a project from the left", id="current-selection")
                with Horizontal(id="filter-bar"):
                    yield Input(placeholder="🔍 Filter... (f)", id="filter-input")
                    yield Button("Clear", id="filter-clear", variant="default")
                    yield Button("Archived", id="archived-toggle", variant="default")
                yield DataTable(id="sessions-table")
                yield Static("0 selected", id="selection-count")

        yield Footer()

    def on_mount(self) -> None:
        """Initialize app."""
        self.db.connect()
        self.db.load_reference_data()

        self.refresh_project_list()

        # Keep filter widgets out of tab order; use 'f' to focus
        self.query_one("#filter-input", Input).can_focus = False
        self.query_one("#filter-clear", Button).can_focus = False
        self.query_one("#archived-toggle", Button).can_focus = False

        # Setup sessions table (but don't load data yet)
        table = self.query_one("#sessions-table", DataTable)
        table.zebra_stripes = True
        table.cursor_type = "row"
        table.add_columns(*self._SESSION_COLUMNS)

        # Don't load sessions until a project is selected
        self.query_one("#current-selection", Static).update(
            "Select a project from the left"
        )

    def on_input_changed(self, event: Input.Changed) -> None:
        """Apply the filter text as the user types."""
        if event.input.id == "filter-input":
            self.current_filter = event.value
            self.refresh_sessions()

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Leave the filter input and hand focus back to the sessions table."""
        if event.input.id == "filter-input":
            self.query_one("#filter-input", Input).can_focus = False
            self.query_one("#sessions-table", DataTable).focus()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle the filter-bar buttons."""
        if event.button.id == "filter-clear":
            self.query_one("#filter-input", Input).value = ""
            self.current_filter = ""
            self.refresh_sessions()
        elif event.button.id == "archived-toggle":
            self.action_toggle_archived()

    def action_focus_filter(self) -> None:
        """Make the filter input focusable and focus it."""
        inp = self.query_one("#filter-input", Input)
        inp.can_focus = True
        inp.focus()

    def on_data_table_row_selected(self, event) -> None:
        """Ignore row-selected events; this handler is intentionally a no-op."""

    def refresh_project_list(self) -> None:
        """Update the project list."""
        lv = self.query_one("#project-list", ListView)
        lv.clear()
        self._project_ids = []

        projects = sorted(
            self.db.projects.values(),
            key=lambda p: _project_label(p, "").lower(),
        )
        for proj in projects:
            lv.append(ListItem(Label(_project_label(proj, "Unknown"))))
            self._project_ids.append(proj.id)

    def refresh_sessions(self) -> None:
        """Update the sessions table."""
        table = self.query_one("#sessions-table", DataTable)

        # Save cursor position before clearing
        cursor_row = table.cursor_row
        cursor_column = table.cursor_column

        table.clear(columns=True)
        table.add_columns(*self._SESSION_COLUMNS)

        sessions = self.db.get_sessions(
            include_archived=self.show_archived,
            project_id=self.filter_project_id,
            workspace_id=self.filter_workspace_id,
            search=self.current_filter,
        )
        # Remember what each row shows so row indices can be mapped back to
        # session IDs without querying the database again.
        self._visible_session_ids = [sess.id for sess in sessions]

        for sess in sessions:
            created_dt = (
                datetime.fromtimestamp(sess.time_created / 1000).strftime("%m-%d %H:%M")
                if sess.time_created
                else "N/A"
            )
            # Check if selected by ID
            marker = "✅" if sess.id in self.selected_session_ids else ""

            table.add_row(
                marker,
                sess.id[:8],
                # Show untitled sessions as empty rather than the text "None".
                sess.title or "",
                (sess.project_name or sess.project_id[:8])[:20],
                (sess.workspace_name or "")[:15],
                str(sess.message_count),
                created_dt,
                key=sess.id,  # Full ID as key
            )

        # Restore cursor position if valid
        if cursor_row is not None and cursor_row < len(sessions):
            table.move_cursor(row=cursor_row, column=cursor_column)

        # Update selection count
        count = len(self.selected_session_ids)
        count_widget = self.query_one("#selection-count", Static)
        if count > 0:
            count_widget.update(f"✓ {count} session(s) selected")
        else:
            count_widget.update("0 selected")

    def on_list_view_highlighted(self, event: ListView.Highlighted) -> None:
        """Handle project list selection."""
        if event.list_view.id != "project-list":
            return

        index = event.list_view.index
        if index is None or index >= len(self._project_ids):
            return

        proj_id = self._project_ids[index]
        proj = self.db.projects.get(proj_id)

        self.filter_project_id = proj_id
        self.filter_workspace_id = None

        if proj:
            self.query_one("#current-selection", Static).update(f"📁 {proj.worktree}")

        # The selection does not carry over to another project.
        if self.selected_session_ids:
            count = len(self.selected_session_ids)
            self.selected_session_ids.clear()
            self.notify(
                f"Selection cleared ({count} session(s))", severity="information"
            )

        self.refresh_sessions()

    def action_toggle_selection(self) -> None:
        """Toggle selection of current row."""
        table = self.query_one("#sessions-table", DataTable)
        row_index = table.cursor_row
        if row_index is None or not 0 <= row_index < len(self._visible_session_ids):
            return

        # Map the cursor row through the IDs recorded at the last refresh so
        # the toggled session is always the one actually displayed.
        session_id = self._visible_session_ids[row_index]
        if session_id in self.selected_session_ids:
            self.selected_session_ids.discard(session_id)
        else:
            self.selected_session_ids.add(session_id)
        self.refresh_sessions()

    def action_clear_selection(self) -> None:
        """Clear filter if active, otherwise clear session selections."""
        inp = self.query_one("#filter-input", Input)
        if inp.has_focus or self.current_filter:
            inp.value = ""
            self.current_filter = ""
            inp.can_focus = False
            self.query_one("#sessions-table", DataTable).focus()
        else:
            self.selected_session_ids.clear()
        self.refresh_sessions()

    def _resolve_selected_sessions(self) -> Optional[List[Session]]:
        """Return the selected sessions, or ``None`` after warning the user.

        ``None`` is returned when nothing is selected or when none of the
        selected IDs can be loaded from the database any more.
        """
        if not self.selected_session_ids:
            self.notify("No sessions selected", severity="warning")
            return None

        loaded = (self.db.get_session(sid) for sid in self.selected_session_ids)
        sessions = [s for s in loaded if s]
        if not sessions:
            self.notify("Selected sessions no longer exist", severity="warning")
            return None
        return sessions

    def _ensure_backup(self) -> None:
        """Create a database backup before the first modifying action."""
        if self.backup_made:
            return
        self.db.create_backup()
        self.backup_made = True
        self.notify("Backup created", severity="information")

    def action_move(self) -> None:
        """Open move dialog."""
        sessions = self._resolve_selected_sessions()
        if sessions is None:
            return
        self._ensure_backup()
        self.push_screen(MoveCopyDialog(self.db, sessions, is_copy=False))

    def action_copy(self) -> None:
        """Open copy dialog."""
        sessions = self._resolve_selected_sessions()
        if sessions is None:
            return
        self._ensure_backup()
        self.push_screen(MoveCopyDialog(self.db, sessions, is_copy=True))

    def action_delete(self) -> None:
        """Open delete confirmation dialog."""
        sessions = self._resolve_selected_sessions()
        if sessions is None:
            return
        self._ensure_backup()

        def handle_result(confirmed: bool) -> None:
            # Called with the value the confirmation dialog was dismissed with.
            if not confirmed:
                return
            success, err = self.db.delete_sessions([s.id for s in sessions])
            if success:
                self.selected_session_ids.clear()
                self.notify(
                    f"Deleted {len(sessions)} session(s)", severity="information"
                )
                self.refresh_sessions()
            else:
                self.notify(f"Delete failed: {err}", severity="error")

        self.push_screen(DeleteConfirmDialog(sessions), handle_result)

    def action_toggle_archived(self) -> None:
        """Toggle archived sessions visibility."""
        self.show_archived = not self.show_archived

        btn = self.query_one("#archived-toggle", Button)
        btn.variant = "success" if self.show_archived else "default"

        title = self.query_one("#session-panel-title", Label)
        title.update("Sessions (+ Archived)" if self.show_archived else "Sessions")

        self.refresh_sessions()

    def action_refresh(self) -> None:
        """Reload all data from database."""
        self.db.load_reference_data()
        self.refresh_project_list()
        self.refresh_sessions()
        self.notify("Data refreshed", severity="information")

    def on_unmount(self) -> None:
        """Cleanup on exit."""
        self.db.close()


if __name__ == "__main__":
    app = SessionMoverApp()
    app.run()