// File listing metadata: 52 lines, 1.2 KiB, TypeScript
import type { InboundMessage, OutboundMessage } from './types.ts';
|
|
|
|
/**
 * A simple async FIFO queue that mirrors Python's asyncio.Queue behaviour.
 *
 * Items come out in the order they were put in. A `dequeue()` on an empty
 * queue returns a promise that stays pending until a later `enqueue()` hands
 * it an item; pending consumers are served in the order they called
 * `dequeue()`.
 */
class AsyncQueue<T> {
  /** Items enqueued while no consumer was waiting, oldest first. */
  private _items: T[] = [];

  /** Resolvers of still-pending `dequeue()` promises, oldest first. */
  private _waiters: Array<(value: T) => void> = [];

  /**
   * Adds an item to the queue.
   *
   * If a consumer is already waiting, the item is handed straight to the
   * oldest waiter and never enters the buffer; otherwise it is buffered.
   *
   * @param item - The value to deliver.
   */
  enqueue(item: T): void {
    const waiter = this._waiters.shift();
    if (waiter) {
      waiter(item);
      return;
    }
    this._items.push(item);
  }

  /**
   * Removes and returns the oldest item.
   *
   * @returns A promise resolved with the oldest buffered item, or — when the
   * buffer is empty — a promise that resolves with the next enqueued item.
   */
  dequeue(): Promise<T> {
    // Decide emptiness from the length, not from the shifted value: comparing
    // the value against `undefined` would mistake a legitimately queued
    // `undefined` for "empty", drop it, and park the caller instead.
    if (this._items.length > 0) {
      // The length check above guarantees shift() yields a stored item.
      return Promise.resolve(this._items.shift() as T);
    }
    return new Promise<T>((resolve) => {
      this._waiters.push(resolve);
    });
  }

  /** Number of buffered items; pending consumers are not counted. */
  get size(): number {
    return this._items.length;
  }
}
|
|
|
|
/**
 * Pairs two independent FIFO queues, one per message direction.
 *
 * Publishing never blocks. Consuming returns a promise that resolves with the
 * oldest pending message of that direction, or with the next one published if
 * none is pending.
 */
export class MessageBus {
  /** Queue of messages published via `publishInbound`. */
  private readonly inboundQueue = new AsyncQueue<InboundMessage>();

  /** Queue of messages published via `publishOutbound`. */
  private readonly outboundQueue = new AsyncQueue<OutboundMessage>();

  /**
   * Queues an inbound message for a later `consumeInbound()` call.
   *
   * @param msg - The inbound message to queue.
   */
  publishInbound(msg: InboundMessage): void {
    this.inboundQueue.enqueue(msg);
  }

  /**
   * Takes the next inbound message.
   *
   * @returns A promise for the oldest queued inbound message, pending until
   * one is published if the queue is currently empty.
   */
  consumeInbound(): Promise<InboundMessage> {
    const nextInbound = this.inboundQueue.dequeue();
    return nextInbound;
  }

  /**
   * Queues an outbound message for a later `consumeOutbound()` call.
   *
   * @param msg - The outbound message to queue.
   */
  publishOutbound(msg: OutboundMessage): void {
    this.outboundQueue.enqueue(msg);
  }

  /**
   * Takes the next outbound message.
   *
   * @returns A promise for the oldest queued outbound message, pending until
   * one is published if the queue is currently empty.
   */
  consumeOutbound(): Promise<OutboundMessage> {
    const nextOutbound = this.outboundQueue.dequeue();
    return nextOutbound;
  }
}
|