import { Subject } from 'rxjs';

/** Outcome of one processed batch, broadcast to every caller still waiting in `add()`. */
interface BatchResult<T> {
  /** The items that were handed to the worker function in this batch. */
  batch: Set<T>;
  /** Items of `batch` the worker reported as failed (absent when the worker threw). */
  failed?: Set<T>;
  /** Whatever the worker threw or rejected with, if it did. */
  exception?: unknown;
}

/**
 * Collects items from any number of `add()` calls and feeds them to a worker
 * function in batches of at most `batchSize`, one batch at a time.
 *
 * Pending items are kept in a `Set`, so an item queued by several callers
 * before it is picked up is only sent to the worker once.
 */
export class BatchQueue<T> {
  /** True while a batch is in flight; guarantees batches never overlap. */
  private running = false;

  /** Items queued but not yet handed to the worker. */
  private readonly pending: Set<T> = new Set<T>();

  /** Emits once per processed batch (success or exception). */
  private readonly onResult = new Subject<BatchResult<T>>();

  /**
   * @param batchSize Maximum number of items per batch. The limit is checked
   *   after each item is taken, so a value below 1 still yields one-item batches.
   * @param func Worker invoked with each batch; resolves with the subset of
   *   items that failed.
   */
  public constructor(
    private readonly batchSize: number,
    private readonly func: (items: Set<T>) => Promise<Set<T>>,
  ) {}

  /**
   * Queues `ids` for processing and waits until every one of them has been
   * part of a processed batch.
   *
   * @param ids Items to process; duplicates are collapsed.
   * @returns The items from this call that the worker reported as failed.
   *   An empty `ids` resolves immediately with `[]`.
   * @throws Error wrapping the stringified exception if a batch containing at
   *   least one of these items made the worker throw or reject.
   */
  public async add(ids: T[]): Promise<T[]> {
    const waiting = new Set<T>(ids);

    // Nothing to wait for: no batch result could ever match an empty set, so
    // the promise below would never settle.
    if (waiting.size === 0) {
      return [];
    }

    for (const id of waiting) {
      this.pending.add(id);
    }

    return new Promise<T[]>((resolve, reject) => {
      const failed: T[] = [];

      const subs = this.onResult.subscribe((results: BatchResult<T>) => {
        let included = false;
        for (const v of results.batch.values()) {
          if (waiting.has(v)) {
            included = true;
            if (results.failed?.has(v)) {
              failed.push(v);
            }
            waiting.delete(v);
          }
        }

        // This batch held none of our items; keep waiting.
        if (!included) {
          return;
        }

        if (results.exception !== undefined) {
          subs.unsubscribe();
          reject(new Error(String(results.exception)));
          return;
        }

        if (waiting.size === 0) {
          subs.unsubscribe();
          resolve(failed);
        }
      });

      // Start processing only after subscribing: if the worker throws
      // synchronously the result is emitted before processBatch() returns,
      // and a late subscriber would miss it and hang forever.
      if (!this.running) {
        this.startProcessing();
      }
    });
  }

  /**
   * Fire-and-forget launch of `processBatch()`. Worker failures are already
   * delivered through `onResult`, so a rejection here is deliberately ignored.
   */
  private startProcessing(): void {
    void this.processBatch().catch((_e: unknown) => {
      // ignore
    });
  }

  /**
   * Takes up to `batchSize` items off `pending`, runs the worker on them and
   * publishes the outcome. Schedules itself again while items remain.
   */
  private async processBatch(): Promise<void> {
    // Never run two batches at once, and never call the worker with nothing.
    if (this.running || this.pending.size === 0) {
      return;
    }

    this.running = true;
    try {
      const thisBatch = new Set<T>();
      // Deleting the current entry while iterating a Set is well-defined.
      for (const v of this.pending.values()) {
        thisBatch.add(v);
        this.pending.delete(v);
        if (thisBatch.size >= this.batchSize) {
          break;
        }
      }

      try {
        const failedItems = await this.func(thisBatch);
        this.onResult.next({ batch: thisBatch, failed: failedItems });
      } catch (e: unknown) {
        this.onResult.next({ batch: thisBatch, exception: e });
      }
    } finally {
      this.running = false;
      if (this.pending.size > 0) {
        this.startProcessing();
      }
    }
  }
}