feat: claude one-shot port from nanobot python codebase (v0.1.4.post4)
This commit is contained in:
51
src/bus/queue.ts
Normal file
51
src/bus/queue.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import type { InboundMessage, OutboundMessage } from './types.ts';
|
||||
|
||||
/** A simple async FIFO queue that mirrors Python's asyncio.Queue behaviour. */
|
||||
class AsyncQueue<T> {
|
||||
private _items: T[] = [];
|
||||
private _waiters: Array<(value: T) => void> = [];
|
||||
|
||||
enqueue(item: T): void {
|
||||
const waiter = this._waiters.shift();
|
||||
if (waiter) {
|
||||
waiter(item);
|
||||
} else {
|
||||
this._items.push(item);
|
||||
}
|
||||
}
|
||||
|
||||
dequeue(): Promise<T> {
|
||||
const item = this._items.shift();
|
||||
if (item !== undefined) {
|
||||
return Promise.resolve(item);
|
||||
}
|
||||
return new Promise<T>((resolve) => {
|
||||
this._waiters.push(resolve);
|
||||
});
|
||||
}
|
||||
|
||||
get size(): number {
|
||||
return this._items.length;
|
||||
}
|
||||
}
|
||||
|
||||
export class MessageBus {
|
||||
private _inbound = new AsyncQueue<InboundMessage>();
|
||||
private _outbound = new AsyncQueue<OutboundMessage>();
|
||||
|
||||
publishInbound(msg: InboundMessage): void {
|
||||
this._inbound.enqueue(msg);
|
||||
}
|
||||
|
||||
consumeInbound(): Promise<InboundMessage> {
|
||||
return this._inbound.dequeue();
|
||||
}
|
||||
|
||||
publishOutbound(msg: OutboundMessage): void {
|
||||
this._outbound.enqueue(msg);
|
||||
}
|
||||
|
||||
consumeOutbound(): Promise<OutboundMessage> {
|
||||
return this._outbound.dequeue();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user