// Source listing metadata: 66 lines, 1.8 KiB, TypeScript
/**
 * An async iterator that allows pushing values from outside
 * and consuming them with `for await...of`.
 *
 * Values pushed while no consumer is waiting are buffered and handed out
 * oldest-first. `next()` calls made while the buffer is empty are parked and
 * resolved oldest-first by later `push()` calls, or with `done: true` by
 * `close()`.
 *
 * Every iterator returned by `[Symbol.asyncIterator]()` reads from the same
 * buffer, so each pushed value is delivered to exactly one `next()` call.
 */
export class AsyncPushIterator<T> implements AsyncIterable<T> {
  /** Values pushed while no consumer was waiting, oldest first. */
  private readonly queue: T[] = [];

  /** Resolve callbacks of `next()` calls still waiting for a result, oldest first. */
  private resolvers: ((result: IteratorResult<T>) => void)[] = [];

  /** Set by `close()`; once true, `push()` ignores its argument. */
  private closed = false;

  /**
   * Push a value into the iterator.
   *
   * If a consumer is already waiting, the oldest waiter receives the value.
   * Otherwise the value is buffered until consumed. After `close()` the call
   * is a silent no-op.
   *
   * @param value - The value to deliver to a consumer.
   */
  push(value: T): void {
    if (this.closed) return;

    // Resolvers are always functions, so `undefined` can only mean "no waiter".
    const resolve = this.resolvers.shift();
    if (resolve !== undefined) {
      resolve({ value, done: false });
    } else {
      this.queue.push(value);
    }
  }

  /**
   * Close the iterator. No more values can be pushed.
   *
   * Every pending consumer receives `{ value: undefined, done: true }`.
   * Values that were buffered before closing can still be consumed.
   * Calling `close()` again has no further effect.
   */
  close(): void {
    this.closed = true;

    // Detach the waiter list first so the instance never holds settled resolvers.
    const pending = this.resolvers;
    this.resolvers = [];
    for (const resolve of pending) {
      resolve({ value: undefined, done: true });
    }
  }

  /**
   * Create an iterator over the pushed values.
   *
   * @returns An iterator whose `next()` yields buffered values first, then
   * reports `done: true` once the instance is closed and the buffer is empty,
   * and otherwise waits for the next `push()` or `close()`.
   */
  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: (): Promise<IteratorResult<T>> => {
        // Buffered values are drained first, even after close().
        if (this.queue.length > 0) {
          // The length check guarantees an element; `as T` (rather than `!`)
          // keeps a legitimately pushed `undefined` typed correctly.
          return Promise.resolve({ value: this.queue.shift() as T, done: false });
        }

        // Closed with nothing left to deliver: the iteration is finished.
        if (this.closed) {
          return Promise.resolve({ value: undefined, done: true });
        }

        // Park this call until push() or close() resolves it.
        return new Promise((resolve) => {
          this.resolvers.push(resolve);
        });
      },
    };
  }

  /**
   * Async-dispose hook: closes the iterator.
   *
   * @returns A promise that is already resolved; closing is synchronous.
   */
  [Symbol.asyncDispose](): Promise<void> {
    this.close();
    return Promise.resolve();
  }
}