diff --git a/src/main/java/io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.java b/src/main/java/io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.java index b4d5b3ec..dcfe39ff 100644 --- a/src/main/java/io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.java +++ b/src/main/java/io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.java @@ -21,7 +21,7 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.internal.ContextInternal; -import io.vertx.core.impl.TaskQueue; +import io.vertx.core.internal.EventExecutor; import io.vertx.core.internal.VertxInternal; import io.vertx.kafka.client.common.KafkaClientOptions; import io.vertx.kafka.client.common.tracing.ProducerTracer; @@ -33,6 +33,8 @@ import java.time.Duration; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; /** * Kafka write stream implementation @@ -46,14 +48,14 @@ public class KafkaWriteStreamImpl implements KafkaWriteStream { private Handler exceptionHandler; private final VertxInternal vertx; private final ProducerTracer tracer; - private final TaskQueue taskQueue; + private final Executor workerExec; public KafkaWriteStreamImpl(Vertx vertx, Producer producer, KafkaClientOptions options) { ContextInternal ctxInt = ((ContextInternal) vertx.getOrCreateContext()).unwrap(); this.producer = producer; this.vertx = (VertxInternal) vertx; this.tracer = ProducerTracer.create(ctxInt.tracer(), options); - this.taskQueue = new TaskQueue(); + this.workerExec = ((VertxInternal)vertx).createWorkerContext().executor(); } private int len(Object value) { @@ -72,8 +74,8 @@ public Future send(ProducerRecord record) { ProducerTracer.StartedSpan startedSpan = this.tracer == null ? null : this.tracer.prepareSendMessage(ctx, record); int len = this.len(record.value()); this.pending += len; - return ctx.executeBlocking(() -> { - Promise prom = ctx.promise(); + Promise prom = ctx.promise(); + workerExec.execute(() -> { try { this.producer.send(record, (metadata, err) -> { @@ -129,9 +131,8 @@ public Future send(ProducerRecord record) { } prom.fail(e); } - return prom.future(); - }, taskQueue) - .compose(f -> f); + }); + return prom.future(); } @Override @@ -191,17 +192,15 @@ public KafkaWriteStreamImpl exceptionHandler(Handler handler) { @Override public Future> partitionsFor(String topic) { - ContextInternal ctx = vertx.getOrCreateContext(); - return ctx.executeBlocking(() -> this.producer.partitionsFor(topic), taskQueue); + return execute(() -> this.producer.partitionsFor(topic)); } @Override public Future flush() { - ContextInternal ctx = vertx.getOrCreateContext(); - return ctx.executeBlocking(() -> { + return execute(() -> { this.producer.flush(); return null; - }, taskQueue); + }); } @Override @@ -211,15 +210,28 @@ public Future close() { @Override public Future close(long timeout) { - ContextInternal ctx = vertx.getOrCreateContext(); - return ctx.executeBlocking(() -> { + return execute(() -> { if (timeout > 0) { this.producer.close(Duration.ofMillis(timeout)); } else { this.producer.close(); } return null; - }, taskQueue); + }); + } + + private Future execute(Callable callable) { + ContextInternal ctx = vertx.getOrCreateContext(); + return ctx.future(promise -> { + workerExec.execute(() -> { + try { + T res = callable.call(); + promise.complete(res); + } catch (Exception e) { + promise.fail(e); + } + }); + }); } @Override @@ -228,11 +240,10 @@ public Producer unwrap() { } Future executeBlocking(final BlockingStatement statement) { - ContextInternal ctx = vertx.getOrCreateContext(); - return ctx.executeBlocking(() -> { + return execute(() -> { statement.execute(); return null; - }, taskQueue); + }); } @FunctionalInterface