Skip to content

Commit

Permalink
KAFKA-15662: Add support for clientInstanceIds in Kafka Stream (apach…
Browse files Browse the repository at this point in the history
…e#14935)

Part of KIP-714.

Add support for collecting the client instance id of the global consumer.

Reviewers: Walker Carlson <[email protected]>, Lucas Brutschy <[email protected]>
  • Loading branch information
mjsax authored Dec 8, 2023
1 parent e6e7d8c commit fb5d45d
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 17 deletions.
37 changes: 27 additions & 10 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
import org.apache.kafka.streams.errors.ProcessorStateException;
Expand Down Expand Up @@ -1895,9 +1896,7 @@ public synchronized ClientInstanceIds clientInstanceIds(final Duration timeout)
throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
}

long remainingTimeMs = timeout.toMillis();
long startTimestampMs = time.milliseconds();

final Timer remainingTime = time.timer(timeout.toMillis());
final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl();

// (1) fan-out calls to threads
Expand All @@ -1911,13 +1910,15 @@ public synchronized ClientInstanceIds clientInstanceIds(final Duration timeout)
}

// GlobalThread
KafkaFuture<Uuid> globalThreadFuture = null;
if (globalStreamThread != null) {
globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout);
}

// (2) get admin client instance id in a blocking fashion, while Stream/GlobalThreads work in parallel
try {
clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
final long nowMs = time.milliseconds();
remainingTimeMs -= nowMs - startTimestampMs;
startTimestampMs = nowMs;
remainingTime.update(time.milliseconds());
} catch (final IllegalStateException telemetryDisabledError) {
// swallow
log.debug("Telemetry is disabled on the admin client.");
Expand All @@ -1933,15 +1934,13 @@ public synchronized ClientInstanceIds clientInstanceIds(final Duration timeout)
for (final Map.Entry<String, KafkaFuture<Uuid>> consumerFuture : consumerFutures.entrySet()) {
final Uuid instanceId = getOrThrowException(
consumerFuture.getValue(),
remainingTimeMs,
remainingTime.remainingMs(),
() -> String.format(
"Could not retrieve consumer instance id for %s.",
consumerFuture.getKey()
)
);
final long nowMs = time.milliseconds();
remainingTimeMs -= nowMs - startTimestampMs;
startTimestampMs = nowMs;
remainingTime.update(time.milliseconds());

// could be `null` if telemetry is disabled on the consumer itself
if (instanceId != null) {
Expand All @@ -1957,6 +1956,24 @@ public synchronized ClientInstanceIds clientInstanceIds(final Duration timeout)
// (3b) collect producers from StreamsThread

// (3c) collect from GlobalThread
if (globalThreadFuture != null) {
final Uuid instanceId = getOrThrowException(
globalThreadFuture,
remainingTime.remainingMs(),
() -> "Could not retrieve global consumer client instance id."
);
remainingTime.update(time.milliseconds());

// could be `null` if telemetry is disabled on the client itself
if (instanceId != null) {
clientInstanceIds.addConsumerInstanceId(
globalStreamThread.getName(),
instanceId
);
} else {
log.debug("Telemetry is disabled for the global consumer.");
}
}

return clientInstanceIds;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
Expand Down Expand Up @@ -65,6 +69,8 @@ public class GlobalStreamThread extends Thread {
private final AtomicLong cacheSize;
private volatile StreamsException startupException;
private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
private volatile long fetchDeadlineClientInstanceId = -1;
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();

/**
* The states that the global stream thread can be in
Expand Down Expand Up @@ -310,6 +316,32 @@ public void run() {
cache.resize(size);
}
stateConsumer.pollAndUpdate();

if (fetchDeadlineClientInstanceId != -1) {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
try {
// we pass in a timeout of zero, to just trigger the "get instance id" background RPC,
// we don't want to block the global thread that can do useful work in the meantime
clientInstanceIdFuture.complete(globalConsumer.clientInstanceId(Duration.ZERO));
fetchDeadlineClientInstanceId = -1;
} catch (final IllegalStateException disabledError) {
// if telemetry is disabled on a client, we swallow the error,
// to allow returning a partial result for all other clients
clientInstanceIdFuture.complete(null);
fetchDeadlineClientInstanceId = -1;
} catch (final TimeoutException swallow) {
// swallow
} catch (final Exception error) {
clientInstanceIdFuture.completeExceptionally(error);
fetchDeadlineClientInstanceId = -1;
}
} else {
clientInstanceIdFuture.completeExceptionally(
new TimeoutException("Could not retrieve global consumer client instance id.")
);
fetchDeadlineClientInstanceId = -1;
}
}
}
} catch (final InvalidOffsetException recoverableException) {
wipeStateStore = true;
Expand Down Expand Up @@ -454,4 +486,24 @@ public void shutdown() {
// Exposes the global consumer's metrics as a read-only map.
public Map<MetricName, Metric> consumerMetrics() {
    final Map<MetricName, ? extends Metric> globalConsumerMetrics = globalConsumer.metrics();
    return Collections.unmodifiableMap(globalConsumerMetrics);
}

// Returns the future that carries the global consumer's client instance id and (re-)arms the
// fetch deadline whenever no successfully completed result is available yet.
//
// this method is NOT thread-safe (we rely on the caller to be `synchronized`)
public KafkaFuture<Uuid> globalConsumerInstanceId(final Duration timeout) {
    final boolean stillPending = !clientInstanceIdFuture.isDone();
    final boolean previouslyFailed = !stillPending && clientInstanceIdFuture.isCompletedExceptionally();

    if (previouslyFailed) {
        // a failed attempt is not reused: hand out a fresh future for the retry
        clientInstanceIdFuture = new KafkaFutureImpl<>();
    }

    if (stillPending || previouslyFailed) {
        fetchDeadlineClientInstanceId = time.milliseconds() + timeout.toMillis();
    }

    // a successfully completed future is returned as-is, without touching the deadline
    return clientInstanceIdFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -737,22 +737,22 @@ void maybeGetClientInstanceIds() {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
try {
mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
maybeResetFetchDeadline();
} catch (final IllegalStateException disabledError) {
// if telemetry is disabled on a client, we swallow the error,
// to allow returning a partial result for all other clients
mainConsumerInstanceIdFuture.complete(null);
maybeResetFetchDeadline();
} catch (final TimeoutException swallow) {
// swallow
} catch (final Exception error) {
mainConsumerInstanceIdFuture.completeExceptionally(error);
maybeResetFetchDeadline();
}
} else {
mainConsumerInstanceIdFuture.completeExceptionally(
new TimeoutException("Could not retrieve main consumer client instance id.")
);
}
}
maybeResetFetchDeadline();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1400,15 +1400,37 @@ public void shouldThrowTimeoutExceptionWhenMainConsumerFutureDoesNotComplete() {
}
}

@Test
public void shouldThrowTimeoutExceptionWhenGlobalConsumerFutureDoesNotComplete() {
    adminClient.setClientInstanceId(Uuid.randomUuid());

    // a global table is added to the topology so that a global stream thread gets constructed
    final StreamsBuilder builder = getBuilderWithSource();
    builder.globalTable("anyTopic");

    try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
        streams.start();

        // the returned future is never completed, so waiting on it has to run into the timeout
        final KafkaFutureImpl<Uuid> neverCompletedFuture = new KafkaFutureImpl<>();
        when(globalStreamThreadMockedConstruction.constructed().get(0).globalConsumerInstanceId(any()))
            .thenReturn(neverCompletedFuture);

        final TimeoutException thrown = assertThrows(
            TimeoutException.class,
            () -> streams.clientInstanceIds(Duration.ZERO)
        );

        // the Kafka timeout is expected to wrap the JDK timeout
        assertThat(thrown.getCause(), instanceOf(java.util.concurrent.TimeoutException.class));
    }
}

@Test
public void shouldCountDownTimeoutAcrossClient() {
adminClient.setClientInstanceId(Uuid.randomUuid());
adminClient.advanceTimeOnClientInstanceId(time, Duration.ofMillis(10L).toMillis());

final Time mockTime = time;
final AtomicLong expectedTimeout = new AtomicLong(20L);
final AtomicLong expectedTimeout = new AtomicLong(50L);
final AtomicBoolean didAssertThreadOne = new AtomicBoolean(false);
final AtomicBoolean didAssertThreadTwo = new AtomicBoolean(false);
final AtomicBoolean didAssertGlobalThread = new AtomicBoolean(false);

when(streamThreadOne.consumerClientInstanceIds(any()))
.thenReturn(Collections.singletonMap("consumer1", new KafkaFutureImpl<Uuid>() {
@Override
Expand All @@ -1430,14 +1452,29 @@ public Uuid get(final long timeout, final TimeUnit timeUnit) {
}
}));

final StreamsBuilder builder = getBuilderWithSource();
builder.globalTable("anyTopic");

try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
streams.start();
streams.clientInstanceIds(Duration.ofMillis(30L));

when(globalStreamThreadMockedConstruction.constructed().get(0).globalConsumerInstanceId(any()))
.thenReturn(new KafkaFutureImpl<Uuid>() {
@Override
public Uuid get(final long timeout, final TimeUnit timeUnit) {
didAssertGlobalThread.set(true);
assertThat(timeout, equalTo(expectedTimeout.getAndAdd(-8L)));
mockTime.sleep(8L);
return null;
}
});

streams.clientInstanceIds(Duration.ofMillis(60L));
}

assertThat(didAssertThreadOne.get(), equalTo(true));
assertThat(didAssertThreadTwo.get(), equalTo(true));
assertThat(didAssertGlobalThread.get(), equalTo(true));
}

@Deprecated // testing old PAPI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1506,7 +1506,7 @@ public void shouldNotSetEnableMetricCollectionByDefault() {
}

@Test
public void shouldEnableMetricCollectionForAllInternalClients() {
public void shouldEnableMetricCollectionForAllInternalClientsByDefault() {
props.put(StreamsConfig.ENABLE_METRICS_PUSH_CONFIG, true);
final StreamsConfig streamsConfig = new StreamsConfig(props);

Expand Down
Loading

0 comments on commit fb5d45d

Please sign in to comment.