Skip to content

Commit

Permalink
Fix usage of TaskQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Oct 4, 2024
1 parent fbe6f1e commit 5737a69
Showing 1 changed file with 30 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.impl.TaskQueue;
import io.vertx.core.internal.EventExecutor;
import io.vertx.core.internal.VertxInternal;
import io.vertx.kafka.client.common.KafkaClientOptions;
import io.vertx.kafka.client.common.tracing.ProducerTracer;
Expand All @@ -33,6 +33,8 @@

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

/**
* Kafka write stream implementation
Expand All @@ -46,14 +48,14 @@ public class KafkaWriteStreamImpl<K, V> implements KafkaWriteStream<K, V> {
private Handler<Throwable> exceptionHandler;
private final VertxInternal vertx;
private final ProducerTracer tracer;
private final TaskQueue taskQueue;
private final Executor workerExec;

public KafkaWriteStreamImpl(Vertx vertx, Producer<K, V> producer, KafkaClientOptions options) {
ContextInternal ctxInt = ((ContextInternal) vertx.getOrCreateContext()).unwrap();
this.producer = producer;
this.vertx = (VertxInternal) vertx;
this.tracer = ProducerTracer.create(ctxInt.tracer(), options);
this.taskQueue = new TaskQueue();
this.workerExec = ((VertxInternal)vertx).createWorkerContext().executor();
}

private int len(Object value) {
Expand All @@ -72,8 +74,8 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
ProducerTracer.StartedSpan startedSpan = this.tracer == null ? null : this.tracer.prepareSendMessage(ctx, record);
int len = this.len(record.value());
this.pending += len;
return ctx.executeBlocking(() -> {
Promise<RecordMetadata> prom = ctx.promise();
Promise<RecordMetadata> prom = ctx.promise();
workerExec.execute(() -> {
try {
this.producer.send(record, (metadata, err) -> {

Expand Down Expand Up @@ -129,9 +131,8 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
}
prom.fail(e);
}
return prom.future();
}, taskQueue)
.compose(f -> f);
});
return prom.future();
}

@Override
Expand Down Expand Up @@ -191,17 +192,15 @@ public KafkaWriteStreamImpl<K, V> exceptionHandler(Handler<Throwable> handler) {

@Override
public Future<List<PartitionInfo>> partitionsFor(String topic) {
ContextInternal ctx = vertx.getOrCreateContext();
return ctx.executeBlocking(() -> this.producer.partitionsFor(topic), taskQueue);
return execute(() -> this.producer.partitionsFor(topic));
}

@Override
public Future<Void> flush() {
ContextInternal ctx = vertx.getOrCreateContext();
return ctx.executeBlocking(() -> {
return execute(() -> {
this.producer.flush();
return null;
}, taskQueue);
});
}

@Override
Expand All @@ -211,15 +210,28 @@ public Future<Void> close() {

@Override
public Future<Void> close(long timeout) {
ContextInternal ctx = vertx.getOrCreateContext();
return ctx.executeBlocking(() -> {
return execute(() -> {
if (timeout > 0) {
this.producer.close(Duration.ofMillis(timeout));
} else {
this.producer.close();
}
return null;
}, taskQueue);
});
}

private <T> Future<T> execute(Callable<T> callable) {
ContextInternal ctx = vertx.getOrCreateContext();
return ctx.future(promise -> {
workerExec.execute(() -> {
try {
T res = callable.call();
promise.complete(res);
} catch (Exception e) {
promise.fail(e);
}
});
});
}

@Override
Expand All @@ -228,11 +240,10 @@ public Producer<K, V> unwrap() {
}

Future<Void> executeBlocking(final BlockingStatement statement) {
ContextInternal ctx = vertx.getOrCreateContext();
return ctx.executeBlocking(() -> {
return execute(() -> {
statement.execute();
return null;
}, taskQueue);
});
}

@FunctionalInterface
Expand Down

0 comments on commit 5737a69

Please sign in to comment.