From 67a9eae78b0a472039bd494bc0349697fb3666f4 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Sun, 7 Apr 2024 13:29:11 -0400 Subject: [PATCH 1/8] AsyncReporter/SpanHandler: make queuedMaxBytes=0 disable pre-flight size checks Signed-off-by: Andriy Redko --- .../internal/BoundedQueueBenchmarks.java | 146 ++++++++++++++++++ .../reporter/internal/AsyncReporter.java | 12 +- .../reporter/internal/BoundedQueue.java | 50 ++++++ .../reporter/internal/ByteBoundedQueue.java | 23 ++- .../reporter/internal/SizeBoundedQueue.java | 131 ++++++++++++++++ .../internal/SizeBoundedQueueTest.java | 53 +++++++ 6 files changed, 406 insertions(+), 9 deletions(-) create mode 100644 benchmarks/src/test/java/zipkin2/reporter/internal/BoundedQueueBenchmarks.java create mode 100644 core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java create mode 100644 core/src/main/java/zipkin2/reporter/internal/SizeBoundedQueue.java create mode 100644 core/src/test/java/zipkin2/reporter/internal/SizeBoundedQueueTest.java diff --git a/benchmarks/src/test/java/zipkin2/reporter/internal/BoundedQueueBenchmarks.java b/benchmarks/src/test/java/zipkin2/reporter/internal/BoundedQueueBenchmarks.java new file mode 100644 index 00000000..6ab67cb5 --- /dev/null +++ b/benchmarks/src/test/java/zipkin2/reporter/internal/BoundedQueueBenchmarks.java @@ -0,0 +1,146 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.reporter.internal; + +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@Measurement(iterations = 5, time = 1) +@Warmup(iterations = 10, time = 1) +@Fork(3) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Group) +public class BoundedQueueBenchmarks { + static final byte ONE = 1; + + @Param( {"0", "10000"}) + public int maxBytes; + + @AuxCounters + @State(Scope.Thread) + public static class OfferCounters { + public int offersFailed; + public int offersMade; + + @Setup(Level.Iteration) + public void clean() { + offersFailed = offersMade = 0; + } + } + + @AuxCounters + @State(Scope.Thread) + public static class DrainCounters { + public int drained; + + @Setup(Level.Iteration) + public void clean() { + drained = 0; + } + } + + private static ThreadLocal marker = new ThreadLocal<>(); + + @State(Scope.Thread) + public static class ConsumerMarker { + public ConsumerMarker() { + marker.set(this); + } + } + + BoundedQueue q; + + @Setup + public void setup() { + q = BoundedQueue.create(10000, maxBytes); + } + + @Benchmark @Group("no_contention") @GroupThreads(1) + public void no_contention_offer(OfferCounters counters) { + if (q.offer(ONE, 1)) { + counters.offersMade++; + } else { + counters.offersFailed++; + } + } + + @Benchmark @Group("no_contention") @GroupThreads(1) + public void no_contention_drain(DrainCounters counters, ConsumerMarker cm) { + q.drainTo((s, b) -> { + counters.drained++; + return true; + }, 1000); + } + + @Benchmark @Group("mild_contention") @GroupThreads(2) + public void mild_contention_offer(OfferCounters counters) { + if (q.offer(ONE, 1)) { + counters.offersMade++; + } else { + counters.offersFailed++; + } + } + + @Benchmark @Group("mild_contention") @GroupThreads(1) + public void mild_contention_drain(DrainCounters counters, ConsumerMarker cm) { + q.drainTo((s, b) -> { + counters.drained++; + return true; + }, 1000); + } + + @Benchmark @Group("high_contention") @GroupThreads(8) + public void high_contention_offer(OfferCounters counters) { + if (q.offer(ONE, 1)) { + counters.offersMade++; + } else { + counters.offersFailed++; + } + } + + @Benchmark @Group("high_contention") @GroupThreads(1) + public void high_contention_drain(DrainCounters counters, ConsumerMarker cm) { + q.drainTo((s, b) -> { + counters.drained++; + return true; + }, 1000); + } + + @TearDown(Level.Iteration) + public void emptyQ() { + // If this thread didn't drain, return + if (marker.get() == null) return; + q.clear(); + } + + // Convenience main entry-point + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include(".*" + BoundedQueueBenchmarks.class.getSimpleName() + ".*") + .build(); + + new Runner(opt).run(); + } +} diff --git a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java index 093ce106..c0678c96 100644 --- a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java +++ b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java @@ -82,8 +82,8 @@ public static final class Builder { this.messageMaxBytes = asyncReporter.messageMaxBytes; this.messageTimeoutNanos = asyncReporter.messageTimeoutNanos; this.closeTimeoutNanos = asyncReporter.closeTimeoutNanos; - this.queuedMaxSpans = asyncReporter.pending.maxSize; - this.queuedMaxBytes = asyncReporter.pending.maxBytes; + this.queuedMaxSpans = asyncReporter.pending.maxSize(); + this.queuedMaxBytes = asyncReporter.pending.maxBytes(); } static int onePercentOfMemory() { @@ -181,7 +181,7 @@ static final class BoundedAsyncReporter extends AsyncReporter { static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName()); final AtomicBoolean started, closed; final BytesEncoder encoder; - final ByteBoundedQueue pending; + final BoundedQueue pending; final BytesMessageSender sender; final int messageMaxBytes; final long messageTimeoutNanos, closeTimeoutNanos; @@ -193,7 +193,7 @@ static final class BoundedAsyncReporter extends AsyncReporter { private boolean shouldWarnException = true; BoundedAsyncReporter(Builder builder, BytesEncoder encoder) { - this.pending = new ByteBoundedQueue(builder.queuedMaxSpans, builder.queuedMaxBytes); + this.pending = BoundedQueue.create(builder.queuedMaxSpans, builder.queuedMaxBytes); this.sender = builder.sender; this.messageMaxBytes = builder.messageMaxBytes; this.messageTimeoutNanos = builder.messageTimeoutNanos; @@ -241,8 +241,8 @@ void flush(BufferNextMessage bundler) { pending.drainTo(bundler, bundler.remainingNanos()); // record after flushing reduces the amount of gauge events vs on doing this on report - metrics.updateQueuedSpans(pending.count); - metrics.updateQueuedBytes(pending.sizeInBytes); + metrics.updateQueuedSpans(pending.count()); + metrics.updateQueuedBytes(pending.sizeInBytes()); // loop around if we are running, and the bundle isn't full // if we are closed, try to send what's pending diff --git a/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java new file mode 100644 index 00000000..c51efaef --- /dev/null +++ b/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java @@ -0,0 +1,50 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package zipkin2.reporter.internal; + +/** + * Multi-producer, multi-consumer queue that could be bounded by count or/and size. + */ +abstract class BoundedQueue implements SpanWithSizeConsumer { + static BoundedQueue create(int maxSize, int maxBytes) { + if (maxBytes > 0) { + return new ByteBoundedQueue(maxSize, maxBytes); + } else { + return new SizeBoundedQueue(maxSize); + } + } + + /** + * Max element's count of this bounded queue + */ + abstract int maxSize(); + + /** + * Max element'size of this bounded queue + */ + abstract int maxBytes(); + + /** + * Clear this bounded queue + */ + abstract int clear(); + + /** + * Element's count of this bounded queue + */ + abstract int count(); + + /** + * Element's size of this bounded queue + */ + abstract int sizeInBytes(); + + /** + * Drains this bounded queue. Blocks for up to nanosTimeout for spans to appear. + * Then, consume as many as possible. + */ + abstract int drainTo(SpanWithSizeConsumer bundler, long remainingNanos); +} diff --git a/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java index cedbbe60..39f21ccd 100644 --- a/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java @@ -13,7 +13,7 @@ * *

This is similar to {@link java.util.concurrent.ArrayBlockingQueue} in implementation. */ -final class ByteBoundedQueue implements SpanWithSizeConsumer { +final class ByteBoundedQueue extends BoundedQueue implements SpanWithSizeConsumer { final ReentrantLock lock = new ReentrantLock(false); final Condition available = lock.newCondition(); @@ -60,7 +60,7 @@ final class ByteBoundedQueue implements SpanWithSizeConsumer { } /** Blocks for up to nanosTimeout for spans to appear. Then, consume as many as possible. */ - int drainTo(SpanWithSizeConsumer consumer, long nanosTimeout) { + @Override int drainTo(SpanWithSizeConsumer consumer, long nanosTimeout) { try { // This may be called by multiple threads. If one is holding a lock, another is waiting. We // use lockInterruptibly to ensure the one waiting can be interrupted. @@ -81,7 +81,7 @@ int drainTo(SpanWithSizeConsumer consumer, long nanosTimeout) { } /** Clears the queue unconditionally and returns count of spans cleared. */ - int clear() { + @Override int clear() { lock.lock(); try { int result = count; @@ -115,6 +115,23 @@ int doDrain(SpanWithSizeConsumer consumer) { sizeInBytes -= drainedSizeInBytes; return drainedCount; } + + @Override int count() { + return count; + } + + @Override int maxBytes() { + return maxBytes; + } + + @Override int maxSize() { + return maxSize; + } + + @Override + public int sizeInBytes() { + return sizeInBytes; + } } interface SpanWithSizeConsumer { diff --git a/core/src/main/java/zipkin2/reporter/internal/SizeBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/SizeBoundedQueue.java new file mode 100644 index 00000000..b3caf2e0 --- /dev/null +++ b/core/src/main/java/zipkin2/reporter/internal/SizeBoundedQueue.java @@ -0,0 +1,131 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.reporter.internal; + +import java.util.Arrays; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Multi-producer, multi-consumer queue that is bounded by count. + * + *

This is similar to {@link java.util.concurrent.ArrayBlockingQueue} in implementation. + */ +final class SizeBoundedQueue extends BoundedQueue implements UnsizedSpanConsumer { + + final ReentrantLock lock = new ReentrantLock(false); + final Condition available = lock.newCondition(); + + final int maxSize; + + final S[] elements; + int count; + int writePos; + int readPos; + + @SuppressWarnings("unchecked") SizeBoundedQueue(int maxSize) { + this.elements = (S[]) new Object[maxSize]; + this.maxSize = maxSize; + } + + @Override public boolean offer(S next, int nextSizeInBytes) { + return offer(next); + } + + /** + * Returns true if the element could be added or false if it could not due to its size. + */ + @Override public boolean offer(S next) { + lock.lock(); + try { + if (count == maxSize) return false; + + elements[writePos++] = next; + + if (writePos == maxSize) writePos = 0; // circle back to the front of the array + + count++; + + available.signal(); // alert any drainers + return true; + } finally { + lock.unlock(); + } + } + + /** Blocks for up to nanosTimeout for spans to appear. Then, consume as many as possible. */ + @Override int drainTo(SpanWithSizeConsumer consumer, long nanosTimeout) { + try { + // This may be called by multiple threads. If one is holding a lock, another is waiting. We + // use lockInterruptibly to ensure the one waiting can be interrupted. + lock.lockInterruptibly(); + try { + long nanosLeft = nanosTimeout; + while (count == 0) { + if (nanosLeft <= 0) return 0; + nanosLeft = available.awaitNanos(nanosLeft); + } + return doDrain(consumer); + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + return 0; + } + } + + /** Clears the queue unconditionally and returns count of spans cleared. */ + @Override public int clear() { + lock.lock(); + try { + int result = count; + count = readPos = writePos = 0; + Arrays.fill(elements, null); + return result; + } finally { + lock.unlock(); + } + } + + int doDrain(SpanWithSizeConsumer consumer) { + int drainedCount = 0; + while (drainedCount < count) { + S next = elements[readPos]; + + if (next == null) break; + if (consumer.offer(next, 0)) { + drainedCount++; + + elements[readPos] = null; + if (++readPos == elements.length) readPos = 0; // circle back to the front of the array + } else { + break; + } + } + count -= drainedCount; + return drainedCount; + } + + @Override int count() { + return count; + } + + @Override int maxBytes() { + return 0; + } + + @Override int maxSize() { + return maxSize; + } + + @Override int sizeInBytes() { + return 0; + } +} + +interface UnsizedSpanConsumer { + /** Returns true if the element could be added or false if it could not due to its size. */ + boolean offer(S next); +} diff --git a/core/src/test/java/zipkin2/reporter/internal/SizeBoundedQueueTest.java b/core/src/test/java/zipkin2/reporter/internal/SizeBoundedQueueTest.java new file mode 100644 index 00000000..50cd00ca --- /dev/null +++ b/core/src/test/java/zipkin2/reporter/internal/SizeBoundedQueueTest.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.reporter.internal; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class SizeBoundedQueueTest { + SizeBoundedQueue queue = new SizeBoundedQueue<>(10); + + @Test void offer_failsWhenFull_size() { + for (int i = 0; i < queue.maxSize; i++) { + assertThat(queue.offer(new byte[1], 1)).isTrue(); + } + assertThat(queue.offer(new byte[1], 1)).isFalse(); + } + + @Test void offer_updatesCount() { + for (int i = 0; i < queue.maxSize; i++) { + queue.offer(new byte[1], 1); + } + assertThat(queue.count).isEqualTo(10); + } + + @Test void offer_sizeInBytes() { + for (int i = 0; i < queue.maxSize; i++) { + queue.offer(new byte[1], 1); + } + assertThat(queue.sizeInBytes()).isEqualTo(0); + } + + @Test void circular() { + SizeBoundedQueue queue = new SizeBoundedQueue<>(10); + + List polled = new ArrayList<>(); + SpanWithSizeConsumer consumer = (next, ignored) -> polled.add(next); + + // Offer more than the capacity, flushing via poll on interval + for (int i = 0; i < 15; i++) { + queue.offer(i, 1); + queue.drainTo(consumer, 1); + } + + // ensure we have all of the spans + assertThat(polled) + .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14); + } +} From b1f5195008fa06b688a42304e108c5434b09d35d Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Tue, 9 Apr 2024 17:07:26 -0400 Subject: [PATCH 2/8] Remove messageMaxBytes fro AsyncReporter::report in favor of postponing them till flushing occurs Signed-off-by: Andriy Redko --- .../internal/AsyncReporterBenchmarks.java | 15 +++++++++- .../reporter/internal/AsyncReporter.java | 28 +++++++++++++------ 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/benchmarks/src/test/java/zipkin2/reporter/internal/AsyncReporterBenchmarks.java b/benchmarks/src/test/java/zipkin2/reporter/internal/AsyncReporterBenchmarks.java index 7b0ff0d2..34b73d7e 100644 --- a/benchmarks/src/test/java/zipkin2/reporter/internal/AsyncReporterBenchmarks.java +++ b/benchmarks/src/test/java/zipkin2/reporter/internal/AsyncReporterBenchmarks.java @@ -6,6 +6,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + import org.openjdk.jmh.annotations.AuxCounters; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -24,6 +26,7 @@ import org.openjdk.jmh.annotations.Warmup; import zipkin2.Span; import zipkin2.TestObjects; +import zipkin2.reporter.BytesEncoder; import zipkin2.reporter.Encoding; import zipkin2.reporter.InMemoryReporterMetrics; import zipkin2.reporter.SpanBytesEncoder; @@ -42,6 +45,9 @@ public class AsyncReporterBenchmarks { @Param public Encoding encoding; + @Param({"0", "20000000"}) + public int maxBytes; + @AuxCounters @State(Scope.Thread) public static class InMemoryReporterMetricsAsCounters { @@ -77,10 +83,17 @@ public void clean() { @Setup(Level.Trial) public void setup() { + final BytesEncoder encoder = Stream + .of(SpanBytesEncoder.JSON_V2, SpanBytesEncoder.PROTO3, SpanBytesEncoder.THRIFT) + .filter(e -> e.encoding().equals(encoding)) + .findAny() + .orElseThrow(() -> new IllegalStateException("Unable to find BytesEncoder for " + encoding)); + reporter = AsyncReporter.newBuilder(new NoopSender(encoding)) .messageMaxBytes(1000000) // example default from Kafka message.max.bytes + .queuedMaxBytes(maxBytes) .metrics(metrics) - .build(SpanBytesEncoder.JSON_V2); + .build(encoder); } @Benchmark @Group("no_contention") @GroupThreads(1) diff --git a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java index c0678c96..ba59bacb 100644 --- a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java +++ b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java @@ -216,20 +216,30 @@ void startFlusherThread() { flushThread.start(); } + @SuppressWarnings("unchecked") @Override public void report(S next) { if (next == null) throw new NullPointerException("span == null"); // Lazy start so that reporters never used don't spawn threads if (started.compareAndSet(false, true)) startFlusherThread(); metrics.incrementSpans(1); - int nextSizeInBytes = encoder.sizeInBytes(next); - int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes); - metrics.incrementSpanBytes(nextSizeInBytes); - if (closed.get() || - // don't enqueue something larger than we can drain - messageSizeOfNextSpan > messageMaxBytes || - !pending.offer(next, nextSizeInBytes)) { - metrics.incrementSpansDropped(1); - } + + if (pending instanceof UnsizedSpanConsumer) { + // enqueue now and filter our when we drain + final UnsizedSpanConsumer consumer = (UnsizedSpanConsumer)pending; + if (closed.get() || !consumer.offer(next)) { + metrics.incrementSpansDropped(1); + } + } else { + int nextSizeInBytes = encoder.sizeInBytes(next); + int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes); + metrics.incrementSpanBytes(nextSizeInBytes); + if (closed.get() || + // don't enqueue something larger than we can drain + messageSizeOfNextSpan > messageMaxBytes || + !pending.offer(next, nextSizeInBytes)) { + metrics.incrementSpansDropped(1); + } + } } @Override public void flush() { From 81c97edc897c4242834762ff0197562c3c92f140 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 10 Apr 2024 09:44:14 -0400 Subject: [PATCH 3/8] Address code review comments Signed-off-by: Andriy Redko --- .../main/java/zipkin2/reporter/internal/BoundedQueue.java | 2 +- .../{SizeBoundedQueue.java => CountBoundedQueue.java} | 4 ++-- ...SizeBoundedQueueTest.java => CountBoundedQueueTest.java} | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) rename core/src/main/java/zipkin2/reporter/internal/{SizeBoundedQueue.java => CountBoundedQueue.java} (95%) rename core/src/test/java/zipkin2/reporter/internal/{SizeBoundedQueueTest.java => CountBoundedQueueTest.java} (88%) diff --git a/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java index c51efaef..648269af 100644 --- a/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java @@ -13,7 +13,7 @@ static BoundedQueue create(int maxSize, int maxBytes) { if (maxBytes > 0) { return new ByteBoundedQueue(maxSize, maxBytes); } else { - return new SizeBoundedQueue(maxSize); + return new CountBoundedQueue(maxSize); } } diff --git a/core/src/main/java/zipkin2/reporter/internal/SizeBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java similarity index 95% rename from core/src/main/java/zipkin2/reporter/internal/SizeBoundedQueue.java rename to core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java index b3caf2e0..5dde59fd 100644 --- a/core/src/main/java/zipkin2/reporter/internal/SizeBoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java @@ -13,7 +13,7 @@ * *

This is similar to {@link java.util.concurrent.ArrayBlockingQueue} in implementation. */ -final class SizeBoundedQueue extends BoundedQueue implements UnsizedSpanConsumer { +final class CountBoundedQueue extends BoundedQueue implements UnsizedSpanConsumer { final ReentrantLock lock = new ReentrantLock(false); final Condition available = lock.newCondition(); @@ -25,7 +25,7 @@ final class SizeBoundedQueue extends BoundedQueue implements UnsizedSpanCo int writePos; int readPos; - @SuppressWarnings("unchecked") SizeBoundedQueue(int maxSize) { + @SuppressWarnings("unchecked") CountBoundedQueue(int maxSize) { this.elements = (S[]) new Object[maxSize]; this.maxSize = maxSize; } diff --git a/core/src/test/java/zipkin2/reporter/internal/SizeBoundedQueueTest.java b/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java similarity index 88% rename from core/src/test/java/zipkin2/reporter/internal/SizeBoundedQueueTest.java rename to core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java index 50cd00ca..b9fd6964 100644 --- a/core/src/test/java/zipkin2/reporter/internal/SizeBoundedQueueTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java @@ -10,8 +10,8 @@ import static org.assertj.core.api.Assertions.assertThat; -class SizeBoundedQueueTest { - SizeBoundedQueue queue = new SizeBoundedQueue<>(10); +class CountBoundedQueueTest { + CountBoundedQueue queue = new CountBoundedQueue<>(10); @Test void offer_failsWhenFull_size() { for (int i = 0; i < queue.maxSize; i++) { @@ -35,7 +35,7 @@ class SizeBoundedQueueTest { } @Test void circular() { - SizeBoundedQueue queue = new SizeBoundedQueue<>(10); + CountBoundedQueue queue = new CountBoundedQueue<>(10); List polled = new ArrayList<>(); SpanWithSizeConsumer consumer = (next, ignored) -> polled.add(next); From 14741093d31bcc31c4aa7e9966d0551da4fd3bd1 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 10 Apr 2024 08:37:40 -1000 Subject: [PATCH 4/8] general idea Signed-off-by: Adrian Cole --- .../internal/BoundedQueueBenchmarks.java | 2 +- .../internal/ByteBoundedQueueBenchmarks.java | 2 +- .../reporter/internal/AsyncReporter.java | 29 ++----- .../reporter/internal/BoundedQueue.java | 86 +++++++++++-------- .../reporter/internal/ByteBoundedQueue.java | 35 ++++++-- .../reporter/internal/CountBoundedQueue.java | 17 ++-- .../internal/ByteBoundedQueueTest.java | 4 +- 7 files changed, 95 insertions(+), 80 deletions(-) diff --git a/benchmarks/src/test/java/zipkin2/reporter/internal/BoundedQueueBenchmarks.java b/benchmarks/src/test/java/zipkin2/reporter/internal/BoundedQueueBenchmarks.java index 6ab67cb5..4162d2fe 100644 --- a/benchmarks/src/test/java/zipkin2/reporter/internal/BoundedQueueBenchmarks.java +++ b/benchmarks/src/test/java/zipkin2/reporter/internal/BoundedQueueBenchmarks.java @@ -74,7 +74,7 @@ public ConsumerMarker() { @Setup public void setup() { - q = BoundedQueue.create(10000, maxBytes); + q = BoundedQueue.create(null, null, null, 10000, 10000, maxBytes); } @Benchmark @Group("no_contention") @GroupThreads(1) diff --git a/benchmarks/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueBenchmarks.java b/benchmarks/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueBenchmarks.java index 5a4ada72..8744a26c 100644 --- a/benchmarks/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueBenchmarks.java +++ b/benchmarks/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueBenchmarks.java @@ -70,7 +70,7 @@ public ConsumerMarker() { @Setup public void setup() { - q = new ByteBoundedQueue<>(10000, 10000); + q = new ByteBoundedQueue<>(null, null, null, 10000, 10000, 10000); } @Benchmark @Group("no_contention") @GroupThreads(1) diff --git a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java index ba59bacb..0fd11503 100644 --- a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java +++ b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java @@ -45,7 +45,8 @@ * @param type of the span, usually {@code zipkin2.Span} * @since 3.0 */ -public abstract class AsyncReporter extends Component implements Reporter, Closeable, Flushable { +public abstract class AsyncReporter extends Component + implements Reporter, Closeable, Flushable { public static Builder newBuilder(BytesMessageSender sender) { return new Builder(sender); } @@ -193,7 +194,8 @@ static final class BoundedAsyncReporter extends AsyncReporter { private boolean shouldWarnException = true; BoundedAsyncReporter(Builder builder, BytesEncoder encoder) { - this.pending = BoundedQueue.create(builder.queuedMaxSpans, builder.queuedMaxBytes); + this.pending = BoundedQueue.create(encoder, builder.sender, builder.metrics, + builder.messageMaxBytes, builder.queuedMaxSpans, builder.queuedMaxBytes); this.sender = builder.sender; this.messageMaxBytes = builder.messageMaxBytes; this.messageTimeoutNanos = builder.messageTimeoutNanos; @@ -222,24 +224,11 @@ void startFlusherThread() { // Lazy start so that reporters never used don't spawn threads if (started.compareAndSet(false, true)) startFlusherThread(); metrics.incrementSpans(1); - - if (pending instanceof UnsizedSpanConsumer) { - // enqueue now and filter our when we drain - final UnsizedSpanConsumer consumer = (UnsizedSpanConsumer)pending; - if (closed.get() || !consumer.offer(next)) { - metrics.incrementSpansDropped(1); - } - } else { - int nextSizeInBytes = encoder.sizeInBytes(next); - int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes); - metrics.incrementSpanBytes(nextSizeInBytes); - if (closed.get() || - // don't enqueue something larger than we can drain - messageSizeOfNextSpan > messageMaxBytes || - !pending.offer(next, nextSizeInBytes)) { - metrics.incrementSpansDropped(1); - } - } + + // enqueue now and filter our when we drain + if (closed.get() || !pending.offer(next)) { + metrics.incrementSpansDropped(1); + } } @Override public void flush() { diff --git a/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java index 648269af..a18425db 100644 --- a/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java @@ -5,46 +5,60 @@ package zipkin2.reporter.internal; +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.ReporterMetrics; + /** * Multi-producer, multi-consumer queue that could be bounded by count or/and size. */ abstract class BoundedQueue implements SpanWithSizeConsumer { - static BoundedQueue create(int maxSize, int maxBytes) { - if (maxBytes > 0) { - return new ByteBoundedQueue(maxSize, maxBytes); - } else { - return new CountBoundedQueue(maxSize); - } + static BoundedQueue create(BytesEncoder encoder, BytesMessageSender sender, + ReporterMetrics metrics, int messageMaxBytes, int maxSize, int maxBytes) { + if (maxBytes > 0) { + return new ByteBoundedQueue(encoder, sender, metrics, messageMaxBytes, maxSize, maxBytes); + } else { + return new CountBoundedQueue(maxSize); } + } + + /** + * Max element's count of this bounded queue + */ + abstract int maxSize(); + + /** + * Max element'size of this bounded queue + */ + abstract int maxBytes(); + + /** + * Clear this bounded queue + */ + abstract int clear(); + + /** + * Element's count of this bounded queue + */ + abstract int count(); - /** - * Max element's count of this bounded queue - */ - abstract int maxSize(); - - /** - * Max element'size of this bounded queue - */ - abstract int maxBytes(); - - /** - * Clear this bounded queue - */ - abstract int clear(); - - /** - * Element's count of this bounded queue - */ - abstract int count(); - - /** - * Element's size of this bounded queue - */ - abstract int sizeInBytes(); - - /** - * Drains this bounded queue. Blocks for up to nanosTimeout for spans to appear. - * Then, consume as many as possible. - */ - abstract int drainTo(SpanWithSizeConsumer bundler, long remainingNanos); + /** + * Element's size of this bounded queue + */ + abstract int sizeInBytes(); + + /** + * Drains this bounded queue. Blocks for up to nanosTimeout for spans to appear. + * Then, consume as many as possible. + */ + abstract int drainTo(SpanWithSizeConsumer bundler, long remainingNanos); + + /** Returns true if the element could be added or false if it could not. */ + abstract boolean offer(S next); } + +interface SpanWithSizeConsumer { + /** Returns true if the element could be added or false if it could not due to its size. */ + boolean offer(S next, int nextSizeInBytes); +} + diff --git a/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java index 39f21ccd..b75b4c11 100644 --- a/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java @@ -7,13 +7,20 @@ import java.util.Arrays; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.ReporterMetrics; /** * Multi-producer, multi-consumer queue that is bounded by both count and size. * *

This is similar to {@link java.util.concurrent.ArrayBlockingQueue} in implementation. */ -final class ByteBoundedQueue extends BoundedQueue implements SpanWithSizeConsumer { +final class ByteBoundedQueue extends BoundedQueue { + final BytesEncoder encoder; + final BytesMessageSender sender; + final ReporterMetrics metrics; + final int messageMaxBytes; final ReentrantLock lock = new ReentrantLock(false); final Condition available = lock.newCondition(); @@ -28,7 +35,13 @@ final class ByteBoundedQueue extends BoundedQueue implements SpanWithSizeC int writePos; int readPos; - @SuppressWarnings("unchecked") ByteBoundedQueue(int maxSize, int maxBytes) { + @SuppressWarnings("unchecked") ByteBoundedQueue(BytesEncoder encoder, + BytesMessageSender sender, ReporterMetrics metrics, int messageMaxBytes, int maxSize, + int maxBytes) { + this.encoder = encoder; + this.sender = sender; + this.metrics = metrics; + this.messageMaxBytes = messageMaxBytes; this.elements = (S[]) new Object[maxSize]; this.sizesInBytes = new int[maxSize]; this.maxSize = maxSize; @@ -80,6 +93,15 @@ final class ByteBoundedQueue extends BoundedQueue implements SpanWithSizeC } } + @Override boolean offer(S next) { + int nextSizeInBytes = encoder.sizeInBytes(next); + int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes); + metrics.incrementSpanBytes(nextSizeInBytes); + // don't enqueue something larger than we can drain + if (messageSizeOfNextSpan > messageMaxBytes) return false; + return offer(next, nextSizeInBytes); + } + /** Clears the queue unconditionally and returns count of spans cleared. */ @Override int clear() { lock.lock(); @@ -115,7 +137,7 @@ int doDrain(SpanWithSizeConsumer consumer) { sizeInBytes -= drainedSizeInBytes; return drainedCount; } - + @Override int count() { return count; } @@ -127,14 +149,9 @@ int doDrain(SpanWithSizeConsumer consumer) { @Override int maxSize() { return maxSize; } - + @Override public int sizeInBytes() { return sizeInBytes; } } - -interface SpanWithSizeConsumer { - /** Returns true if the element could be added or false if it could not due to its size. */ - boolean offer(S next, int nextSizeInBytes); -} diff --git a/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java index 5dde59fd..c6bfbbd9 100644 --- a/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java @@ -13,7 +13,7 @@ * *

This is similar to {@link java.util.concurrent.ArrayBlockingQueue} in implementation. */ -final class CountBoundedQueue extends BoundedQueue implements UnsizedSpanConsumer { +final class CountBoundedQueue extends BoundedQueue { final ReentrantLock lock = new ReentrantLock(false); final Condition available = lock.newCondition(); @@ -33,7 +33,7 @@ final class CountBoundedQueue extends BoundedQueue implements UnsizedSpanC @Override public boolean offer(S next, int nextSizeInBytes) { return offer(next); } - + /** * Returns true if the element could be added or false if it could not due to its size. */ @@ -107,25 +107,20 @@ int doDrain(SpanWithSizeConsumer consumer) { count -= drainedCount; return drainedCount; } - + @Override int count() { return count; } - + @Override int maxBytes() { return 0; } - + @Override int maxSize() { return maxSize; } - + @Override int sizeInBytes() { return 0; } } - -interface UnsizedSpanConsumer { - /** Returns true if the element could be added or false if it could not due to its size. */ - boolean offer(S next); -} diff --git a/core/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueTest.java b/core/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueTest.java index 984a790c..2892b071 100644 --- a/core/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueTest.java @@ -11,7 +11,7 @@ import static org.assertj.core.api.Assertions.assertThat; class ByteBoundedQueueTest { - ByteBoundedQueue queue = new ByteBoundedQueue<>(10, 10); + ByteBoundedQueue queue = new ByteBoundedQueue<>(null, null, null, 10, 10, 10); @Test void offer_failsWhenFull_size() { for (int i = 0; i < queue.maxSize; i++) { @@ -40,7 +40,7 @@ class ByteBoundedQueueTest { } @Test void circular() { - ByteBoundedQueue queue = new ByteBoundedQueue<>(10, 10); + ByteBoundedQueue queue = new ByteBoundedQueue<>(null, null, null, 10, 10, 10); List polled = new ArrayList<>(); SpanWithSizeConsumer consumer = (next, ignored) -> polled.add(next); From ea6bd61578cbb18f0ac0dc32487130a3bc318412 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 10 Apr 2024 19:22:04 -0400 Subject: [PATCH 5/8] Remove count(), maxBytes() and sizeInBytes() from BoundedQueue Signed-off-by: Andriy Redko --- .../reporter/internal/AsyncReporter.java | 8 +++---- .../reporter/internal/BoundedQueue.java | 17 +-------------- .../reporter/internal/ByteBoundedQueue.java | 17 ++++----------- .../reporter/internal/CountBoundedQueue.java | 21 +++++++------------ .../internal/ByteBoundedQueueTest.java | 6 ++++-- .../internal/CountBoundedQueueTest.java | 13 ++++-------- 6 files changed, 24 insertions(+), 58 deletions(-) diff --git a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java index 0fd11503..4ebbc17d 100644 --- a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java +++ b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java @@ -84,7 +84,7 @@ public static final class Builder { this.messageTimeoutNanos = asyncReporter.messageTimeoutNanos; this.closeTimeoutNanos = asyncReporter.closeTimeoutNanos; this.queuedMaxSpans = asyncReporter.pending.maxSize(); - this.queuedMaxBytes = asyncReporter.pending.maxBytes(); + this.queuedMaxBytes = asyncReporter.queuedMaxBytes; } static int onePercentOfMemory() { @@ -184,6 +184,7 @@ static final class BoundedAsyncReporter extends AsyncReporter { final BytesEncoder encoder; final BoundedQueue pending; final BytesMessageSender sender; + final int queuedMaxBytes; final int messageMaxBytes; final long messageTimeoutNanos, closeTimeoutNanos; final CountDownLatch close; @@ -197,6 +198,7 @@ static final class BoundedAsyncReporter extends AsyncReporter { this.pending = BoundedQueue.create(encoder, builder.sender, builder.metrics, builder.messageMaxBytes, builder.queuedMaxSpans, builder.queuedMaxBytes); this.sender = builder.sender; + this.queuedMaxBytes = builder.queuedMaxBytes; this.messageMaxBytes = builder.messageMaxBytes; this.messageTimeoutNanos = builder.messageTimeoutNanos; this.closeTimeoutNanos = builder.closeTimeoutNanos; @@ -239,10 +241,6 @@ void startFlusherThread() { void flush(BufferNextMessage bundler) { pending.drainTo(bundler, bundler.remainingNanos()); - // record after flushing reduces the amount of gauge events vs on doing this on report - metrics.updateQueuedSpans(pending.count()); - metrics.updateQueuedBytes(pending.sizeInBytes()); - // loop around if we are running, and the bundle isn't full // if we are closed, try to send what's pending if (!bundler.isReady() && !closed.get()) return; diff --git a/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java index a18425db..db67611e 100644 --- a/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java @@ -18,7 +18,7 @@ static BoundedQueue create(BytesEncoder encoder, BytesMessageSender se if (maxBytes > 0) { return new ByteBoundedQueue(encoder, sender, metrics, messageMaxBytes, maxSize, maxBytes); } else { - return new CountBoundedQueue(maxSize); + return new CountBoundedQueue(metrics, maxSize); } } @@ -27,26 +27,11 @@ static BoundedQueue create(BytesEncoder encoder, BytesMessageSender se */ abstract int maxSize(); - /** - * Max element'size of this bounded queue - */ - abstract int maxBytes(); - /** * Clear this bounded queue */ abstract int clear(); - /** - * Element's count of this bounded queue - */ - abstract int count(); - - /** - * Element's size of this bounded queue - */ - abstract int sizeInBytes(); - /** * Drains this bounded queue. Blocks for up to nanosTimeout for spans to appear. * Then, consume as many as possible. diff --git a/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java index b75b4c11..9e875750 100644 --- a/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java @@ -90,6 +90,10 @@ final class ByteBoundedQueue extends BoundedQueue { } } catch (InterruptedException e) { return 0; + } finally { + // record after draining reduces the amount of gauge events vs on doing this on report + metrics.updateQueuedSpans(count); + metrics.updateQueuedBytes(sizeInBytes); } } @@ -138,20 +142,7 @@ int doDrain(SpanWithSizeConsumer consumer) { return drainedCount; } - @Override int count() { - return count; - } - - @Override int maxBytes() { - return maxBytes; - } - @Override int maxSize() { return maxSize; } - - @Override - public int sizeInBytes() { - return sizeInBytes; - } } diff --git a/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java index c6bfbbd9..3b756b14 100644 --- a/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java @@ -8,6 +8,8 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import zipkin2.reporter.ReporterMetrics; + /** * Multi-producer, multi-consumer queue that is bounded by count. * @@ -17,6 +19,7 @@ final class CountBoundedQueue extends BoundedQueue { final ReentrantLock lock = new ReentrantLock(false); final Condition available = lock.newCondition(); + final ReporterMetrics metrics; final int maxSize; @@ -25,7 +28,8 @@ final class CountBoundedQueue extends BoundedQueue { int writePos; int readPos; - @SuppressWarnings("unchecked") CountBoundedQueue(int maxSize) { + @SuppressWarnings("unchecked") CountBoundedQueue(ReporterMetrics metrics, int maxSize) { + this.metrics = metrics; this.elements = (S[]) new Object[maxSize]; this.maxSize = maxSize; } @@ -73,6 +77,9 @@ final class CountBoundedQueue extends BoundedQueue { } } catch (InterruptedException e) { return 0; + } finally { + // record after draining reduces the amount of gauge events vs on doing this on report + metrics.updateQueuedSpans(count); } } @@ -108,19 +115,7 @@ int doDrain(SpanWithSizeConsumer consumer) { return drainedCount; } - @Override int count() { - return count; - } - - @Override int maxBytes() { - return 0; - } - @Override int maxSize() { return maxSize; } - - @Override int sizeInBytes() { - return 0; - } } diff --git a/core/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueTest.java b/core/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueTest.java index 2892b071..8cefef06 100644 --- a/core/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueTest.java @@ -8,10 +8,12 @@ import java.util.List; import org.junit.jupiter.api.Test; +import zipkin2.reporter.ReporterMetrics; + import static org.assertj.core.api.Assertions.assertThat; class ByteBoundedQueueTest { - ByteBoundedQueue queue = new ByteBoundedQueue<>(null, null, null, 10, 10, 10); + ByteBoundedQueue queue = new ByteBoundedQueue<>(null, null, ReporterMetrics.NOOP_METRICS, 10, 10, 10); @Test void offer_failsWhenFull_size() { for (int i = 0; i < queue.maxSize; i++) { @@ -40,7 +42,7 @@ class ByteBoundedQueueTest { } @Test void circular() { - ByteBoundedQueue queue = new ByteBoundedQueue<>(null, null, null, 10, 10, 10); + ByteBoundedQueue queue = new ByteBoundedQueue<>(null, null, ReporterMetrics.NOOP_METRICS, 10, 10, 10); List polled = new ArrayList<>(); SpanWithSizeConsumer consumer = (next, ignored) -> polled.add(next); diff --git a/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java b/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java index b9fd6964..a02ea416 100644 --- a/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java @@ -8,10 +8,12 @@ import java.util.List; import org.junit.jupiter.api.Test; +import zipkin2.reporter.ReporterMetrics; + import static org.assertj.core.api.Assertions.assertThat; class CountBoundedQueueTest { - CountBoundedQueue queue = new CountBoundedQueue<>(10); + CountBoundedQueue queue = new CountBoundedQueue<>(ReporterMetrics.NOOP_METRICS, 10); @Test void offer_failsWhenFull_size() { for (int i = 0; i < queue.maxSize; i++) { @@ -27,15 +29,8 @@ class CountBoundedQueueTest { assertThat(queue.count).isEqualTo(10); } - @Test void offer_sizeInBytes() { - for (int i = 0; i < queue.maxSize; i++) { - queue.offer(new byte[1], 1); - } - assertThat(queue.sizeInBytes()).isEqualTo(0); - } - @Test void circular() { - CountBoundedQueue queue = new CountBoundedQueue<>(10); + CountBoundedQueue queue = new CountBoundedQueue<>(ReporterMetrics.NOOP_METRICS, 10); List polled = new ArrayList<>(); SpanWithSizeConsumer consumer = (next, ignored) -> polled.add(next); From ee98443700a26caacdf600d3712ada1212f27a3e Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Thu, 11 Apr 2024 18:42:02 -0400 Subject: [PATCH 6/8] Fixed CountBoundedQueue to respect messageMaxBytes and report metrics propertly, added tests Signed-off-by: Andriy Redko --- .../reporter/internal/BoundedQueue.java | 2 +- .../reporter/internal/CountBoundedQueue.java | 21 ++- .../reporter/internal/AsyncReporterTest.java | 152 +++++++++++++----- .../internal/CountBoundedQueueTest.java | 20 ++- 4 files changed, 145 insertions(+), 50 deletions(-) diff --git a/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java index db67611e..2ca40683 100644 --- a/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java @@ -18,7 +18,7 @@ static BoundedQueue create(BytesEncoder encoder, BytesMessageSender se if (maxBytes > 0) { return new ByteBoundedQueue(encoder, sender, metrics, messageMaxBytes, maxSize, maxBytes); } else { - return new CountBoundedQueue(metrics, maxSize); + return new CountBoundedQueue(encoder, sender, metrics, messageMaxBytes, maxSize); } } diff --git a/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java index 3b756b14..f5f44deb 100644 --- a/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java @@ -8,6 +8,8 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.ReporterMetrics; /** @@ -19,8 +21,11 @@ final class CountBoundedQueue extends BoundedQueue { final ReentrantLock lock = new ReentrantLock(false); final Condition available = lock.newCondition(); - final ReporterMetrics metrics; + final BytesEncoder encoder; + final BytesMessageSender sender; + final ReporterMetrics metrics; + final int messageMaxBytes; final int maxSize; final S[] elements; @@ -28,8 +33,12 @@ final class CountBoundedQueue extends BoundedQueue { int writePos; int readPos; - @SuppressWarnings("unchecked") CountBoundedQueue(ReporterMetrics metrics, int maxSize) { + @SuppressWarnings("unchecked") CountBoundedQueue(BytesEncoder encoder, + BytesMessageSender sender, ReporterMetrics metrics, int messageMaxBytes, int maxSize) { + this.encoder = encoder; + this.sender = sender; this.metrics = metrics; + this.messageMaxBytes = messageMaxBytes; this.elements = (S[]) new Object[maxSize]; this.maxSize = maxSize; } @@ -102,7 +111,13 @@ int doDrain(SpanWithSizeConsumer consumer) { S next = elements[readPos]; if (next == null) break; - if (consumer.offer(next, 0)) { + + int nextSizeInBytes = encoder.sizeInBytes(next); + int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes); + metrics.incrementSpanBytes(nextSizeInBytes); + + if (messageSizeOfNextSpan > messageMaxBytes) break; + if (consumer.offer(next, nextSizeInBytes)) { drainedCount++; elements[readPos] = null; diff --git a/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java b/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java index b132cbf1..5f47982d 100644 --- a/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java @@ -16,8 +16,10 @@ import java.util.logging.Level; import java.util.logging.LogRecord; import java.util.logging.Logger; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + import zipkin2.Span; import zipkin2.TestObjects; import zipkin2.reporter.BytesEncoder; @@ -39,102 +41,127 @@ class AsyncReporterTest { Encoding.JSON.listSizeInBytes( Collections.singletonList(SpanBytesEncoder.JSON_V2.encode(span))); - AsyncReporter reporter; InMemoryReporterMetrics metrics = new InMemoryReporterMetrics(); - @AfterEach void close() { - if (reporter != null) reporter.close(); - } - - @Test void messageMaxBytes_defaultsToSender() { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void messageMaxBytes_defaultsToSender(int queuedMaxBytes) { AtomicInteger sentSpans = new AtomicInteger(); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> sentSpans.addAndGet(spans.size())) - .messageMaxBytes(sizeInBytesOfSingleSpanMessage)) + .messageMaxBytes(sizeInBytesOfSingleSpanMessage)) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); reporter.report(span); // drops reporter.flush(); + reporter.close(); assertThat(sentSpans.get()).isEqualTo(1); } - @Test void messageMaxBytes_dropsWhenOverqueuing() { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void messageMaxBytes_dropsWhenOverqueuing(int queuedMaxBytes) { AtomicInteger sentSpans = new AtomicInteger(); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> sentSpans.addAndGet(spans.size()))) .messageMaxBytes(sizeInBytesOfSingleSpanMessage) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); reporter.report(span); // dropped the one that queued more than allowed bytes reporter.flush(); + reporter.close(); assertThat(sentSpans.get()).isEqualTo(1); } - @Test void messageMaxBytes_dropsWhenTooLarge() { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void messageMaxBytes_dropsWhenTooLarge(int queuedMaxBytes) { AtomicInteger sentSpans = new AtomicInteger(); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> sentSpans.addAndGet(spans.size()))) .messageMaxBytes(sizeInBytesOfSingleSpanMessage) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span.toBuilder().addAnnotation(1L, "fooooo").build()); reporter.flush(); + reporter.close(); assertThat(sentSpans.get()).isEqualTo(0); } - @Test void queuedMaxSpans_dropsWhenOverqueuing() { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void queuedMaxSpans_dropsWhenOverqueuing(int queuedMaxBytes) { AtomicInteger sentSpans = new AtomicInteger(); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> sentSpans.addAndGet(spans.size()))) .queuedMaxSpans(1) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); reporter.report(span); // dropped the one that queued more than allowed count reporter.flush(); - + reporter.close(); + assertThat(sentSpans.get()).isEqualTo(1); } - @Test void report_incrementsMetrics() { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void report_incrementsMetrics(int queuedMaxBytes) { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .metrics(metrics) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); reporter.report(span); + reporter.flush(); + reporter.close(); + assertThat(metrics.spans()).isEqualTo(2); assertThat(metrics.spanBytes()).isEqualTo(SpanBytesEncoder.JSON_V2.encode(span).length * 2); } - @Test void report_incrementsSpansDropped() { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void report_incrementsSpansDropped(int queuedMaxBytes) { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .queuedMaxSpans(1) .metrics(metrics) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); reporter.report(span); + reporter.flush(); + reporter.close(); assertThat(metrics.spans()).isEqualTo(2); assertThat(metrics.spansDropped()).isEqualTo(1); } - @Test void flush_incrementsMetrics() { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void flush_incrementsMetrics(int queuedMaxBytes) { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .metrics(metrics) .messageMaxBytes(sizeInBytesOfSingleSpanMessage) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -144,33 +171,40 @@ class AsyncReporterTest { reporter.flush(); assertThat(metrics.queuedSpans()).isEqualTo(1); // still one span in the backlog - assertThat(metrics.queuedBytes()).isEqualTo(SpanBytesEncoder.JSON_V2.encode(span).length); + assertThat(metrics.queuedBytes()).isEqualTo(queuedMaxBytes > 0 ? SpanBytesEncoder.JSON_V2.encode(span).length : 0); assertThat(metrics.messages()).isEqualTo(1); assertThat(metrics.messageBytes()).isEqualTo(sizeInBytesOfSingleSpanMessage); reporter.flush(); + reporter.close(); assertThat(metrics.queuedSpans()).isZero(); assertThat(metrics.queuedBytes()).isZero(); assertThat(metrics.messages()).isEqualTo(2); assertThat(metrics.messageBytes()).isEqualTo(sizeInBytesOfSingleSpanMessage * 2); } - @Test void flush_incrementsMessagesDropped() { - reporter = AsyncReporter.newBuilder(FakeSender.create() + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void flush_incrementsMessagesDropped(int queuedMaxBytes) { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> { throw new RuntimeException(); })) .metrics(metrics) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); reporter.flush(); + reporter.close(); assertThat(metrics.messagesDropped()).isEqualTo(1); } - @Test void flush_logsFirstErrorAsWarn() { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void flush_logsFirstErrorAsWarn(int queuedMaxBytes) { List logRecords = new ArrayList<>(); Handler testHandler = new Handler() { @Override @@ -191,10 +225,11 @@ public void close() throws SecurityException { logger.addHandler(testHandler); logger.setLevel(Level.FINE); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> { throw new RuntimeException(); })) + .queuedMaxBytes(queuedMaxBytes) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); @@ -211,13 +246,18 @@ public void close() throws SecurityException { assertThat(logRecords.get(2).getLevel()).isEqualTo(Level.FINE); assertThat(logRecords.get(2).getMessage()).contains("RuntimeException"); + + reporter.close(); } /** It can take up to the messageTimeout past the first span to send */ - @Test void messageTimeout_flushesWhenTimeoutExceeded() throws InterruptedException { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void messageTimeout_flushesWhenTimeoutExceeded(int queuedMaxBytes) throws InterruptedException { CountDownLatch sentSpans = new CountDownLatch(1); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> sentSpans.countDown())) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(10, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -226,12 +266,17 @@ public void close() throws SecurityException { .isFalse(); assertThat(sentSpans.await(10, TimeUnit.MILLISECONDS)) .isTrue(); + + reporter.close(); } - @Test void messageTimeout_disabled() throws InterruptedException { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void messageTimeout_disabled(int queuedMaxBytes) throws InterruptedException { CountDownLatch sentSpans = new CountDownLatch(1); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> sentSpans.countDown())) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.NANOSECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -241,11 +286,13 @@ public void close() throws SecurityException { // Since no threads started, the above lingers assertThat(sentSpans.await(10, TimeUnit.MILLISECONDS)) .isFalse(); + + reporter.close(); } @Test void senderThread_threadHasAPrettyName() throws InterruptedException { BlockingQueue threadName = new LinkedBlockingQueue<>(); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> threadName.offer(Thread.currentThread().getName()))) .build(SpanBytesEncoder.JSON_V2); @@ -254,10 +301,12 @@ public void close() throws SecurityException { // check name is pretty assertThat(threadName.take()) .isEqualTo("AsyncReporter{FakeSender}"); + + reporter.close(); } @Test void close_close_stopsFlushThread() throws InterruptedException { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .metrics(metrics) .messageTimeout(2, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -273,11 +322,13 @@ public void close() throws SecurityException { BoundedAsyncReporter impl = (BoundedAsyncReporter) reporter; assertThat(impl.close.await(3, TimeUnit.MILLISECONDS)) .isTrue(); + + reporter.close(); } @Test void flush_throwsOnClose() { assertThrows(IllegalStateException.class, () -> { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .metrics(metrics) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -289,7 +340,7 @@ public void close() throws SecurityException { } @Test void report_doesntThrowWhenClosed() { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .metrics(metrics) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -310,7 +361,7 @@ public void close() throws SecurityException { }); @Test void senderThread_dropsOnSenderClose_flushThread() throws InterruptedException { - reporter = AsyncReporter.newBuilder(sleepingSender) + AsyncReporter reporter = AsyncReporter.newBuilder(sleepingSender) .metrics(metrics) .messageMaxBytes(sizeInBytesOfSingleSpanMessage) .build(SpanBytesEncoder.JSON_V2); @@ -325,12 +376,14 @@ public void close() throws SecurityException { assertThat(metrics.messagesDropped()).isEqualTo(1); assertThat(metrics.messagesDroppedByCause().keySet().iterator().next()) .isEqualTo(ClosedSenderException.class); + + reporter.close(); } @Test void senderThread_dropsOnReporterClose_flushThread() throws InterruptedException { CountDownLatch received = new CountDownLatch(1); CountDownLatch sent = new CountDownLatch(1); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> { received.countDown(); try { @@ -352,9 +405,12 @@ public void close() throws SecurityException { assertThat(metrics.spansDropped()).isEqualTo(1); } - @Test void blocksToClearPendingSpans() throws InterruptedException { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void blocksToClearPendingSpans(int queuedMaxBytes) throws InterruptedException { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .metrics(metrics) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(30, TimeUnit.SECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -367,8 +423,10 @@ public void close() throws SecurityException { assertThat(metrics.spansDropped()).isEqualTo(0); } - @Test void quitsBlockingWhenOverTimeout() throws InterruptedException { - reporter = AsyncReporter.newBuilder(FakeSender.create() + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void quitsBlockingWhenOverTimeout(int queuedMaxBytes) throws InterruptedException { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> { // note: we don't yet have a hook to cancel a sender, so this will remain in-flight // eventhough we are unblocking close. A later close on sender usually will kill in-flight @@ -379,6 +437,7 @@ public void close() throws SecurityException { } })) .metrics(metrics) + .queuedMaxBytes(queuedMaxBytes) .closeTimeout(1, TimeUnit.NANOSECONDS) .messageTimeout(30, TimeUnit.SECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -391,10 +450,12 @@ public void close() throws SecurityException { reporter.close(); // close while there's a pending span assertThat(System.nanoTime() - start) .isLessThan(TimeUnit.MILLISECONDS.toNanos(10)); // give wiggle room + + reporter.close(); } @Test void flush_incrementsMetricsAndThrowsWhenClosed() { - reporter = AsyncReporter.newBuilder(sleepingSender) + AsyncReporter reporter = AsyncReporter.newBuilder(sleepingSender) .metrics(metrics) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -411,7 +472,7 @@ public void close() throws SecurityException { } @Test void flush_incrementsMetricsAndThrowsWhenSenderClosed() { - reporter = AsyncReporter.newBuilder(sleepingSender) + AsyncReporter reporter = AsyncReporter.newBuilder(sleepingSender) .metrics(metrics) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -425,12 +486,14 @@ public void close() throws SecurityException { } catch (IllegalStateException e) { assertThat(metrics.spansDropped()).isEqualTo(1); assertThat(metrics.messagesDropped()).isEqualTo(1); + } finally { + reporter.close(); } } @Test void build_threadFactory() { Thread thread = new Thread(); - reporter = AsyncReporter.newBuilder(FakeSender.create()) + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .threadFactory(r -> thread) .build(SpanBytesEncoder.JSON_V2); @@ -442,6 +505,7 @@ public void close() throws SecurityException { assertThat(thread.toString()).contains("AsyncReporter{FakeSender}"); assertThat(thread.isDaemon()).isTrue(); + reporter.close(); thread.interrupt(); } diff --git a/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java b/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java index a02ea416..36e15d2d 100644 --- a/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java @@ -8,12 +8,16 @@ import java.util.List; import org.junit.jupiter.api.Test; +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.Encoding; +import zipkin2.reporter.FakeSender; import zipkin2.reporter.ReporterMetrics; +import zipkin2.reporter.SpanBytesEncoder; import static org.assertj.core.api.Assertions.assertThat; class CountBoundedQueueTest { - CountBoundedQueue queue = new CountBoundedQueue<>(ReporterMetrics.NOOP_METRICS, 10); + CountBoundedQueue queue = new CountBoundedQueue<>(null, null, ReporterMetrics.NOOP_METRICS, 10, 10); @Test void offer_failsWhenFull_size() { for (int i = 0; i < queue.maxSize; i++) { @@ -30,7 +34,19 @@ class CountBoundedQueueTest { } @Test void circular() { - CountBoundedQueue queue = new CountBoundedQueue<>(ReporterMetrics.NOOP_METRICS, 10); + CountBoundedQueue queue = new CountBoundedQueue<>(new BytesEncoder() { + @Override public Encoding encoding() { + throw new UnsupportedOperationException(); + } + + @Override public int sizeInBytes(Integer input) { + return 4; + } + + @Override public byte[] encode(Integer input) { + throw new UnsupportedOperationException(); + } + }, FakeSender.create(), ReporterMetrics.NOOP_METRICS, 10, 10); List polled = new ArrayList<>(); SpanWithSizeConsumer consumer = (next, ignored) -> polled.add(next); From 9705256e96a42ed63c211ceaec1203f0b411abc4 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Thu, 11 Apr 2024 19:47:50 -0400 Subject: [PATCH 7/8] Fix CountBoundedQueue draining for oversized spans Signed-off-by: Andriy Redko --- .../reporter/internal/CountBoundedQueue.java | 8 ++++++-- .../reporter/internal/AsyncReporterTest.java | 20 +++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java index f5f44deb..85c35170 100644 --- a/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java @@ -116,8 +116,12 @@ int doDrain(SpanWithSizeConsumer consumer) { int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes); metrics.incrementSpanBytes(nextSizeInBytes); - if (messageSizeOfNextSpan > messageMaxBytes) break; - if (consumer.offer(next, nextSizeInBytes)) { + if (messageSizeOfNextSpan > messageMaxBytes) { + drainedCount++; + metrics.incrementSpansDropped(1); + elements[readPos] = null; // should have been dropped before + if (++readPos == elements.length) readPos = 0; // circle back to the front of the array + } else if (consumer.offer(next, nextSizeInBytes)) { drainedCount++; elements[readPos] = null; diff --git a/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java b/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java index 5f47982d..ee6a16d7 100644 --- a/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java @@ -154,6 +154,26 @@ void report_incrementsSpansDropped(int queuedMaxBytes) { assertThat(metrics.spans()).isEqualTo(2); assertThat(metrics.spansDropped()).isEqualTo(1); } + + + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void report_incrementsSpansDroppedOversizing(int queuedMaxBytes) { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) + .messageMaxBytes(1) + .metrics(metrics) + .queuedMaxBytes(queuedMaxBytes) + .messageTimeout(0, TimeUnit.MILLISECONDS) + .build(SpanBytesEncoder.JSON_V2); + + reporter.report(span); + reporter.report(span); + reporter.flush(); + reporter.close(); + + assertThat(metrics.spans()).isEqualTo(2); + assertThat(metrics.spansDropped()).isEqualTo(2); + } @ParameterizedTest(name = "queuedMaxBytes={0}") @ValueSource(ints = { 0, 1000000 }) From 4369adeaba490b12cd79468aed77d30772b92d2b Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Thu, 11 Apr 2024 20:47:06 -0400 Subject: [PATCH 8/8] Minor formatting fixes and code simplification Signed-off-by: Andriy Redko --- .../reporter/internal/CountBoundedQueue.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java index 85c35170..2127d5fe 100644 --- a/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java @@ -117,18 +117,14 @@ int doDrain(SpanWithSizeConsumer consumer) { metrics.incrementSpanBytes(nextSizeInBytes); if (messageSizeOfNextSpan > messageMaxBytes) { - drainedCount++; - metrics.incrementSpansDropped(1); - elements[readPos] = null; // should have been dropped before - if (++readPos == elements.length) readPos = 0; // circle back to the front of the array - } else if (consumer.offer(next, nextSizeInBytes)) { - drainedCount++; - - elements[readPos] = null; - if (++readPos == elements.length) readPos = 0; // circle back to the front of the array - } else { + metrics.incrementSpansDropped(1); + } else if (!consumer.offer(next, nextSizeInBytes)) { break; } + + drainedCount++; + elements[readPos] = null; + if (++readPos == elements.length) readPos = 0; // circle back to the front of the array } count -= drainedCount; return drainedCount;