Skip to content

Commit

Permalink
Make sure that spans are always consumed even if exporting fails to a…
Browse files Browse the repository at this point in the history
…void infinite loops
  • Loading branch information
RafalSumislawski committed Sep 25, 2023
1 parent b05ebf7 commit 65e96e7
Showing 1 changed file with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
* */
private _flushAll(): Promise<void> {
return new Promise((resolve, reject) => {
this._clearTimer();
const promises = [];
// calculate number of batches
const count = Math.ceil(
this._finishedSpans.length / this._maxExportBatchSize
);
for (let i = 0, j = count; i < j; i++) {
promises.push(this._flushOneBatch());
const spans = this._finishedSpans.splice(0, this._maxExportBatchSize);
promises.push(this._export(spans));
}
Promise.all(promises)
.then(() => {
Expand All @@ -166,9 +168,8 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
});
}

private _flushOneBatch(): Promise<void> {
this._clearTimer();
if (this._finishedSpans.length === 0) {
private _export(spans: ReadableSpan[]): Promise<void> {
if (spans.length === 0) {
return Promise.resolve();
}
return new Promise((resolve, reject) => {
Expand All @@ -178,11 +179,6 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
}, this._exportTimeoutMillis);
// prevent downstream exporter calls from generating spans
context.with(suppressTracing(context.active()), () => {
// Reset the finished spans buffer here because the next invocations of the _flush method
// could pass the same finished spans to the exporter if the buffer is cleared
// outside the execution of this callback.
const spans = this._finishedSpans.splice(0, this._maxExportBatchSize);

const doExport = () =>
this._exporter.export(spans, result => {
clearTimeout(timer);
Expand Down Expand Up @@ -232,12 +228,13 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
}

private _flushOneBatchAndContinue() {
this._flushOneBatch()
this._clearTimer();
const spans = this._finishedSpans.splice(0, this._maxExportBatchSize);
this._export(spans)
.then(() => {
if (this._finishedSpans.length >= this._maxExportBatchSize) {
this._flushOneBatchAndContinue()
} else if (this._finishedSpans.length > 0) {
this._clearTimer();
this._maybeStartTimer();
}
})
Expand All @@ -246,7 +243,6 @@ export abstract class BatchSpanProcessorBase<T extends BufferConfig>
if (this._finishedSpans.length >= this._maxExportBatchSize) {
this._flushOneBatchAndContinue()
} else if (this._finishedSpans.length > 0) {
this._clearTimer();
this._maybeStartTimer();
}
});
Expand Down

0 comments on commit 65e96e7

Please sign in to comment.