send(zipkin2.Span... spans) {
+ void sendSpans(zipkin2.Span... spans) {
SpanBytesEncoder bytesEncoder =
sender.encoding() == Encoding.JSON ? SpanBytesEncoder.JSON_V2 : SpanBytesEncoder.PROTO3;
- return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList()));
+ sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList()));
}
- @AfterEach
- void afterEachTest() throws IOException {
+ @AfterEach void afterEachTest() throws IOException {
server.close();
}
}
diff --git a/reporter/sender-sqs/pom.xml b/reporter/sender-sqs/pom.xml
index c4e782f3..4369f4e3 100644
--- a/reporter/sender-sqs/pom.xml
+++ b/reporter/sender-sqs/pom.xml
@@ -20,7 +20,7 @@
zipkin-aws-parent
io.zipkin.aws
- 1.1.2-SNAPSHOT
+ 1.2.0-SNAPSHOT
../../pom.xml
diff --git a/reporter/sender-sqs/src/main/java/zipkin2/reporter/sqs/SQSSender.java b/reporter/sender-sqs/src/main/java/zipkin2/reporter/sqs/SQSSender.java
index 675f9ef3..450619a1 100644
--- a/reporter/sender-sqs/src/main/java/zipkin2/reporter/sqs/SQSSender.java
+++ b/reporter/sender-sqs/src/main/java/zipkin2/reporter/sqs/SQSSender.java
@@ -15,26 +15,21 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
-import com.amazonaws.handlers.AsyncHandler;
-import com.amazonaws.services.sqs.AmazonSQSAsync;
-import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageRequest;
-import com.amazonaws.services.sqs.model.SendMessageResult;
import com.amazonaws.util.Base64;
import java.nio.charset.Charset;
import java.util.List;
-import java.util.concurrent.Future;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.BytesMessageEncoder;
-import zipkin2.reporter.Call;
-import zipkin2.reporter.Callback;
-import zipkin2.reporter.CheckResult;
+import zipkin2.reporter.BytesMessageSender;
+import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Encoding;
-import zipkin2.reporter.Sender;
import zipkin2.reporter.internal.Nullable;
/**
- * Zipkin Sender implementation that sends spans to an SQS queue.
+ * Zipkin {@link BytesMessageSender} implementation that sends spans to an SQS queue.
*
* The {@link AsyncReporter} batches spans into a single message to improve throughput and lower
* API requests to SQS. Based on current service capabilities, a message will contain roughly 256KiB
@@ -42,7 +37,7 @@
*
*
This sends (usually TBinaryProtocol big-endian) encoded spans to an SQS queue.
*/
-public final class SQSSender extends Sender {
+public final class SQSSender extends BytesMessageSender.Base {
private static final Charset UTF_8 = Charset.forName("UTF-8");
public static SQSSender create(String url) {
@@ -127,40 +122,31 @@ public Builder toBuilder() {
@Nullable final AWSCredentialsProvider credentialsProvider;
@Nullable final EndpointConfiguration endpointConfiguration;
final int messageMaxBytes;
- final Encoding encoding;
SQSSender(Builder builder) {
+ super(builder.encoding);
this.queueUrl = builder.queueUrl;
this.credentialsProvider = builder.credentialsProvider;
this.endpointConfiguration = builder.endpointConfiguration;
this.messageMaxBytes = builder.messageMaxBytes;
- this.encoding = builder.encoding;
- }
-
- @Override public CheckResult check() {
- // TODO need to do something better here.
- return CheckResult.OK;
}
/** get and close are typically called from different threads */
- volatile AmazonSQSAsync asyncClient;
+ volatile AmazonSQS client;
volatile boolean closeCalled;
- AmazonSQSAsync get() {
- if (asyncClient == null) {
+ AmazonSQS get() {
+ if (client == null) {
synchronized (this) {
- if (asyncClient == null) {
- asyncClient = AmazonSQSAsyncClientBuilder.standard()
+ if (client == null) {
+ client = AmazonSQSClientBuilder.standard()
.withCredentials(credentialsProvider)
- .withEndpointConfiguration(endpointConfiguration).build();
+ .withEndpointConfiguration(endpointConfiguration)
+ .build();
}
}
}
- return asyncClient;
- }
-
- @Override public Encoding encoding() {
- return encoding;
+ return client;
}
@Override public int messageMaxBytes() {
@@ -172,27 +158,25 @@ AmazonSQSAsync get() {
return (listSize + 2) * 4 / 3; // account for base64 encoding
}
- @Override
- public Call sendSpans(List list) {
- if (closeCalled) throw new IllegalStateException("closed");
+ @Override public void send(List list) {
+ if (closeCalled) throw new ClosedSenderException();
byte[] encodedSpans = BytesMessageEncoder.forEncoding(encoding()).encode(list);
String body =
- encoding() == Encoding.JSON && isAscii(encodedSpans)
- ? new String(encodedSpans, UTF_8)
+ encoding() == Encoding.JSON && isAscii(encodedSpans) ? new String(encodedSpans, UTF_8)
: Base64.encodeAsString(encodedSpans);
- return new SQSCall(new SendMessageRequest(queueUrl, body));
+ get().sendMessage(new SendMessageRequest(queueUrl, body));
}
@Override public synchronized void close() {
if (closeCalled) return;
- AmazonSQSAsync asyncClient = this.asyncClient;
- if (asyncClient != null) asyncClient.shutdown();
+ AmazonSQS client = this.client;
+ if (client != null) client.shutdown();
closeCalled = true;
}
- @Override public final String toString() {
+ @Override public String toString() {
return "SQSSender{queueUrl=" + queueUrl + "}";
}
@@ -204,55 +188,4 @@ static boolean isAscii(byte[] encodedSpans) {
}
return true;
}
-
- class SQSCall extends Call.Base {
- private final SendMessageRequest message;
- volatile Future future;
-
- SQSCall(SendMessageRequest message) {
- this.message = message;
- }
-
- @Override
- protected Void doExecute() {
- get().sendMessage(message);
- return null;
- }
-
- @Override protected void doEnqueue(Callback callback) {
- future = get().sendMessageAsync(message, new AsyncHandlerAdapter(callback));
- if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans");
- }
-
- @Override protected void doCancel() {
- Future maybeFuture = future;
- if (maybeFuture != null) maybeFuture.cancel(true);
- }
-
- @Override protected boolean doIsCanceled() {
- Future maybeFuture = future;
- return maybeFuture != null && maybeFuture.isCancelled();
- }
-
- @Override public Call clone() {
- return new SQSCall(message.clone());
- }
- }
-
- static final class AsyncHandlerAdapter
- implements AsyncHandler {
- final Callback callback;
-
- AsyncHandlerAdapter(Callback callback) {
- this.callback = callback;
- }
-
- @Override public void onError(Exception e) {
- callback.onError(e);
- }
-
- @Override public void onSuccess(SendMessageRequest request, SendMessageResult result) {
- callback.onSuccess(null);
- }
- }
}
diff --git a/reporter/sender-sqs/src/test/java/zipkin2/reporter/sqs/SQSSenderTest.java b/reporter/sender-sqs/src/test/java/zipkin2/reporter/sqs/SQSSenderTest.java
index b6a25a7e..b4bf7963 100644
--- a/reporter/sender-sqs/src/test/java/zipkin2/reporter/sqs/SQSSenderTest.java
+++ b/reporter/sender-sqs/src/test/java/zipkin2/reporter/sqs/SQSSenderTest.java
@@ -17,17 +17,12 @@
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import zipkin2.Span;
import zipkin2.junit.aws.AmazonSQSExtension;
-import zipkin2.reporter.Call;
-import zipkin2.reporter.Callback;
-import zipkin2.reporter.CheckResult;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.SpanBytesEncoder;
@@ -36,13 +31,11 @@
import static zipkin2.TestObjects.CLIENT_SPAN;
class SQSSenderTest {
- @RegisterExtension
- AmazonSQSExtension sqs = new AmazonSQSExtension();
+ @RegisterExtension AmazonSQSExtension sqs = new AmazonSQSExtension();
private SQSSender sender;
- @BeforeEach
- public void setup() {
+ @BeforeEach void setup() {
sender =
SQSSender.newBuilder()
.queueUrl(sqs.queueUrl())
@@ -52,63 +45,38 @@ public void setup() {
.build();
}
- @Test
- void sendsSpans() throws Exception {
- send(CLIENT_SPAN, CLIENT_SPAN).execute();
+ @Test void send() {
+ sendSpans(CLIENT_SPAN, CLIENT_SPAN);
assertThat(readSpans()).containsExactly(CLIENT_SPAN, CLIENT_SPAN);
}
- @Test
- void sendsSpans_json_unicode() throws Exception {
+ @Test void send_empty() {
+ sendSpans();
+
+ assertThat(readSpans()).isEmpty();
+ }
+
+ @Test void send_json_unicode() {
Span unicode = CLIENT_SPAN.toBuilder().putTag("error", "\uD83D\uDCA9").build();
- send(unicode).execute();
+ sendSpans(unicode);
assertThat(readSpans()).containsExactly(unicode);
}
- @Test
- void sendsSpans_PROTO3() throws Exception {
+ @Test void send_PROTO3() {
sender.close();
sender = sender.toBuilder().encoding(Encoding.PROTO3).build();
- send(CLIENT_SPAN, CLIENT_SPAN).execute();
+ sendSpans(CLIENT_SPAN, CLIENT_SPAN);
assertThat(readSpans()).containsExactly(CLIENT_SPAN, CLIENT_SPAN);
}
- @Test
- void outOfBandCancel() throws Exception {
- SQSSender.SQSCall call = (SQSSender.SQSCall) send(CLIENT_SPAN, CLIENT_SPAN);
- assertThat(call.isCanceled()).isFalse(); // sanity check
-
- CountDownLatch latch = new CountDownLatch(1);
- call.enqueue(new Callback<>() {
- @Override
- public void onSuccess(Void aVoid) {
- call.future.cancel(true);
- latch.countDown();
- }
-
- @Override
- public void onError(Throwable throwable) {
- latch.countDown();
- }
- });
-
- latch.await(5, TimeUnit.SECONDS);
- assertThat(call.isCanceled()).isTrue();
- }
-
- @Test
- void checkOk() {
- assertThat(sender.check()).isEqualTo(CheckResult.OK);
- }
-
- Call send(Span... spans) {
+ void sendSpans(Span... spans) {
SpanBytesEncoder bytesEncoder =
sender.encoding() == Encoding.JSON ? SpanBytesEncoder.JSON_V2 : SpanBytesEncoder.PROTO3;
- return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList()));
+ sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList()));
}
List readSpans() {
diff --git a/storage/xray-udp/pom.xml b/storage/xray-udp/pom.xml
index f687037d..a0db7fad 100644
--- a/storage/xray-udp/pom.xml
+++ b/storage/xray-udp/pom.xml
@@ -18,7 +18,7 @@
zipkin-aws-parent
io.zipkin.aws
- 1.1.2-SNAPSHOT
+ 1.2.0-SNAPSHOT
../../pom.xml
4.0.0