Files
node-metaverse/lib/classes/BatchQueue.ts
2025-01-17 23:53:31 +00:00

127 lines
3.2 KiB
TypeScript

import { Subject } from 'rxjs';
export class BatchQueue<T>
{
private running = false;
private readonly pending: Set<T> = new Set<T>();
private readonly onResult = new Subject<{
batch: Set<T>,
failed?: Set<T>,
exception?: unknown
}>();
public constructor(private readonly batchSize: number, private readonly func: (items: Set<T>) => Promise<Set<T>>)
{
}
public async add(ids: T[]): Promise<T[]>
{
const waiting = new Set<T>();
for (const id of ids)
{
waiting.add(id);
this.pending.add(id);
}
if (!this.running)
{
this.processBatch().catch((_e: unknown) =>
{
// ignore
});
}
return new Promise<T[]>((resolve, reject) =>
{
const failed: T[] = [];
const subs = this.onResult.subscribe((results: {
batch: Set<T>,
failed?: Set<T>,
exception?: unknown
}) =>
{
let included = false;
for (const v of results.batch.values())
{
if (waiting.has(v))
{
included = true;
if (results.failed?.has(v))
{
failed.push(v);
}
waiting.delete(v);
}
}
if (!included)
{
return;
}
if (results.exception !== undefined)
{
subs.unsubscribe();
reject(new Error(String(results.exception)));
return;
}
if (waiting.size === 0)
{
subs.unsubscribe();
resolve(failed);
return;
}
});
});
}
private async processBatch(): Promise<void>
{
if (this.running)
{
return;
}
try
{
this.running = true;
const thisBatch = new Set<T>();
const values = this.pending.values();
for (const v of values)
{
thisBatch.add(v);
this.pending.delete(v);
if (thisBatch.size >= this.batchSize)
{
break;
}
}
try
{
const failedItems = await this.func(thisBatch)
this.onResult.next({
batch: thisBatch,
failed: failedItems
});
}
catch (e)
{
this.onResult.next({
batch: thisBatch,
exception: e
});
}
}
finally
{
this.running = false;
if (this.pending.size > 0)
{
this.processBatch().catch((_e: unknown) =>
{
// ignore
});
}
}
}
}