import type { MessageBus } from '../bus/queue.ts';
import type { OutboundMessage } from '../bus/types.ts';
import type { BaseChannel } from './base.ts';

/**
 * Owns a set of registered channels and forwards outbound bus messages to
 * the channel whose `name` matches the message's `channel` field.
 */
export class ChannelManager {
  private _channels: BaseChannel[] = [];
  private _bus: MessageBus;
  private _running = false;
  /** How long one dispatcher wait lasts before `_running` is re-checked. */
  private _pollIntervalMs: number;
  /**
   * The single in-flight `consumeOutbound()` call, kept across poll timeouts.
   * Promises cannot be cancelled, so issuing a fresh consume after every
   * timeout would leave abandoned consumers behind that could each swallow a
   * message nobody is waiting for any more.
   */
  private _pendingOutbound: ReturnType<MessageBus['consumeOutbound']> | null = null;

  /**
   * @param bus Message bus that outbound messages are consumed from.
   * @param pollIntervalMs Maximum time (ms) the dispatcher waits for a message
   *   before re-checking whether it has been stopped. Defaults to 1000.
   */
  constructor(bus: MessageBus, pollIntervalMs = 1000) {
    this._bus = bus;
    this._pollIntervalMs = pollIntervalMs;
  }

  /** Adds a channel to the set that is started, stopped and routed to. */
  register(channel: BaseChannel): void {
    this._channels.push(channel);
  }

  /**
   * Starts every registered channel in parallel together with the outbound
   * dispatcher. A channel that fails to start is logged and does not affect
   * the others. The returned promise settles only once all `start()` calls
   * have settled and the dispatcher loop has exited (i.e. after `stopAll()`).
   */
  async startAll(): Promise<void> {
    this._running = true;
    await Promise.all([
      ...this._channels.map((ch) =>
        ch
          .start()
          .catch((err: unknown) =>
            console.error(`[channel:${ch.name}] Failed to start: ${String(err)}`),
          ),
      ),
      this._dispatchOutbound(),
    ]);
  }

  /**
   * Signals the dispatcher to exit and asks every channel to stop. A failure
   * in one channel's `stop()` is logged so the remaining channels still stop.
   */
  stopAll(): void {
    this._running = false;
    for (const ch of this._channels) {
      const report = (err: unknown): void =>
        console.error(`[channel:${ch.name}] Failed to stop: ${String(err)}`);
      try {
        // Promise.resolve() covers both a synchronous and a promise-returning stop().
        void Promise.resolve(ch.stop()).catch(report);
      } catch (err) {
        report(err);
      }
    }
  }

  /** Dispatcher loop: routes outbound messages until `stopAll()` is called. */
  private async _dispatchOutbound(): Promise<void> {
    while (this._running) {
      const msg = await this._nextOutbound();
      if (!msg) continue; // poll timeout
      if (!msg.content) continue; // empty progress marker etc.
      await this._route(msg);
    }
  }

  /**
   * Waits up to `_pollIntervalMs` for the next outbound message.
   *
   * @returns The message, or `null` if the wait timed out.
   */
  private async _nextOutbound(): Promise<OutboundMessage | null> {
    if (this._pendingOutbound === null) {
      this._pendingOutbound = this._bus.consumeOutbound();
    }
    const pending = this._pendingOutbound;

    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<{ kind: 'timeout' }>((resolve) => {
      timer = setTimeout(() => resolve({ kind: 'timeout' }), this._pollIntervalMs);
    });
    const consumed = pending.then((msg) => ({ kind: 'message' as const, msg }));

    try {
      const outcome = await Promise.race([consumed, timeout]);
      // On timeout the consume stays pending and is reused by the next call.
      if (outcome.kind === 'timeout') return null;
      // If another dispatcher loop already claimed this result, don't route it twice.
      if (this._pendingOutbound !== pending) return null;
      this._pendingOutbound = null;
      return outcome.msg ?? null;
    } catch (err) {
      // The consume itself failed; drop it so a later call can issue a new one.
      if (this._pendingOutbound === pending) this._pendingOutbound = null;
      throw err;
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Delivers one outbound message to its target channel.
   *
   * Messages flagged `_progress` in their metadata (this includes tool hints
   * that carry the flag) are suppressed for now; channels may opt in later.
   * Messages addressed to a channel that is not registered here are ignored.
   * Send failures are logged, never thrown, so the dispatcher keeps running.
   */
  private async _route(msg: OutboundMessage): Promise<void> {
    if (msg.metadata?.['_progress'] === true) return;

    const channel = this._channels.find((ch) => ch.name === msg.channel);
    if (!channel) {
      // Not a registered channel (e.g. the CLI channel — handled by the CLI directly).
      return;
    }

    const content = msg.content ?? '';
    // A `channel_id` in the metadata overrides the message's own chat id.
    const chatId = (msg.metadata?.['channel_id'] as string | undefined) ?? msg.chatId;
    const rootId = msg.metadata?.['root_id'] as string | undefined;

    try {
      await channel.send(chatId, content, rootId ? { rootId } : undefined);
    } catch (err) {
      console.error(`[channel:${channel.name}] Failed to send: ${String(err)}`);
    }
  }
}