Skip to content

Commit

Permalink
Await for flushInProgress when flushingAll (#3)
Browse files Browse the repository at this point in the history
* Await for flushInProgress when flushingAll. This is important to ensure that all exports are complete before freezing lambda instance.

* fix lint error
  • Loading branch information
RafalSumislawski authored Oct 4, 2023
1 parent 1007539 commit b491aa2
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
private _timer: NodeJS.Timeout | undefined;
private _shutdownOnce: BindOnceFuture<void>;
private _droppedSpansCount: number = 0;
private _isFlushInProgress: boolean = false;
private _flushInProgress: Promise<void> | null = null;

constructor(private readonly _exporter: SpanExporter, config?: T) {
const env = getEnv();
Expand Down Expand Up @@ -162,6 +162,9 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
// run exports in parallel ignoring _isFlushInProgress
promises.push(this._export(spans));
}
if (this._flushInProgress) {
promises.push(this._flushInProgress);
}
Promise.all(promises)
.then(() => {
resolve();
Expand Down Expand Up @@ -230,24 +233,23 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
}

private _flush() {
if (this._isFlushInProgress) {
if (this._flushInProgress) {
return;
}
this._isFlushInProgress = true;
this._clearTimer();

const spans = this._finishedSpans.splice(0, this._maxExportBatchSize);
this._export(spans)
this._flushInProgress = this._export(spans);
this._flushInProgress
.then(() => {
this._isFlushInProgress = false;
this._flushInProgress = null;
if (this._finishedSpans.length >= this._maxExportBatchSize) {
this._flush();
} else if (this._finishedSpans.length > 0) {
this._maybeStartTimer();
}
})
.catch(e => {
this._isFlushInProgress = false;
this._flushInProgress = null;
globalErrorHandler(e);
if (this._finishedSpans.length >= this._maxExportBatchSize) {
this._flush();
Expand Down

0 comments on commit b491aa2

Please sign in to comment.