Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(all): merge apache kafka 3.8.0 771b957 #1813

Merged
merged 129 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
129 commits
Select commit Hold shift + click to select a range
59ba28f
KAFKA-16860; [1/2] Introduce group.version feature flag (#16120)
dajac May 31, 2024
5257451
KAFKA-16860; [2/2] Introduce group.version feature flag (#16149)
dajac May 31, 2024
92ed1ed
KAFKA-16864; Optimize uniform (homogenous) assignor (#16088)
dajac May 31, 2024
a39f3ec
KAFKA-16639 Ensure HeartbeatRequestManager generates leave request re…
frankvicky May 31, 2024
34f5d5b
KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 (#15…
cmccabe Jun 1, 2024
dc5a22b
KAFKA-16807 DescribeLogDirsResponseData#results#topics have unexpecte…
m1a2st Jun 2, 2024
495ec16
KAFKA-16881: InitialState type leaks into the Connect REST API OpenAP…
m1a2st Jun 3, 2024
1b11cf0
MINOR: Small refactor in TargetAssignmentBuilder (#16174)
dajac Jun 3, 2024
9c72048
KAFKA-16861: Don't convert to group to classic if the size is larger …
frankvicky Jun 3, 2024
cd52f33
KAFKA-16105: Reset read offsets when seeking to beginning in TBRLMM (…
AnatolyPopov Jun 3, 2024
961c28a
MINOR: Fix type in MetadataVersion.IBP_4_0_IV0 (#16181)
dajac Jun 4, 2024
0aa0a01
KAFKA-16664; Re-add EventAccumulator.poll(long, TimeUnit) (#16144)
jeffkbkim Jun 4, 2024
c295fef
KAFKA-16047: Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProduc…
edoardocomar Jun 4, 2024
7404fdf
KAFKA-16837, KAFKA-16838: Ignore task configs for deleted connectors,…
C0urante Jun 4, 2024
15ab07a
MINOR: Log time taken to compute the target assignment (#16185)
dajac Jun 4, 2024
ebc68f0
KAFKA-16821; Member Subscription Spec Interface (#16068)
rreddy-22 Jun 4, 2024
be15aa4
KAFKA-16583: Handle PartitionChangeRecord without directory IDs (#16118)
soarez Jun 4, 2024
b4f17a0
KAFKA-16525; Dynamic KRaft network manager and channel (#15986)
jsancio Jun 3, 2024
ec278a6
MINOR: Fix return tag on Javadocs for consumer group-related Admin me…
C0urante Jun 4, 2024
0e94509
KAFKA-15305 The background thread should try to process the remaining…
frankvicky Jun 4, 2024
bf0ca84
MINOR: Adjust validateOffsetCommit/Fetch in ConsumerGroup to ensure c…
dongnuo123 Jun 5, 2024
eabb07b
KAFKA-16888 Fix failed StorageToolTest.testFormatSucceedsIfAllDirecto…
brandboat Jun 4, 2024
fe7ebf0
KAFKA-15265: Integrate RLMQuotaManager for throttling fetches from re…
abhijeetk88 Jun 5, 2024
c1accdb
KAFKA-16858: Throw DataException from validateValue on array and map …
gharris1727 Jun 5, 2024
dd6d6f4
KAFKA-16573: Specify node and store where serdes are needed (#15790)
AyoubOm Jun 5, 2024
1f6c5dc
KAFKA-12317: Update FK-left-join documentation (#15689)
florin-akermann Jun 5, 2024
041f651
KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handl…
apourchet Jun 4, 2024
e82731f
KAFKA-15045: (KIP-924 pt. 17) State store computation fixed (#16194)
apourchet Jun 5, 2024
04f7ed4
KAFKA-16814 KRaft broker cannot startup when `partition.metadata` is …
brandboat Jun 4, 2024
25ca963
KAFKA-16530: Fix high-watermark calculation to not assume the leader …
ahuang98 Jun 6, 2024
1b0edf4
KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` …
dajac Jun 6, 2024
5d4e426
KAFKA-16903: Consider produce error of different task (#16222)
cadonna Jun 6, 2024
c60b618
KAFKA-15045: (KIP-924 pt. 19) Update to new AssignmentConfigs (#16219)
apourchet Jun 6, 2024
491a079
KAFKA-16786: Remove old assignment strategy usage in new consumer (#1…
lianetm Jun 6, 2024
fc267f4
KAFKA-16200: Enforce that RequestManager implementations respect user…
kirktrue Jun 7, 2024
2ab6a36
KAFKA-16606 Gate JBOD configuration on 3.7-IV2 (#15834)
soarez Jun 7, 2024
0b2829b
KAFKA-16493: Avoid unneeded subscription regex check if metadata vers…
Phuc-Hong-Tran Jun 7, 2024
85fc07f
KAFKA-16911: Update docs for KIP-862 (#16246)
JimGalasyn Jun 7, 2024
b65a79e
MINOR: update all-latency-max, range-latency-avg|max and add prefix-s…
sebastienviale Jun 7, 2024
3874560
MINOR: Update docs for for KIP-671 (#16247)
JimGalasyn Jun 7, 2024
7879f1c
KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs
cmccabe Jun 4, 2024
a8316f4
MINOR: Fix broken ReassignPartitionsCommandTest test (#16251)
soarez Jun 8, 2024
d5c9381
KAFKA-16886: Detect replica demotion in AssignmentsManager (#16232)
soarez Jun 8, 2024
81196bc
KAFKA-15045: (KIP-924 pt. 18) Better assignment testing (#16201)
apourchet Jun 9, 2024
b0333f2
KAFKA-15045: (KIP-924 pt. 20) Custom task assignment configuration fi…
apourchet Jun 9, 2024
db3bf4a
KAFKA-14509; [4/4] Handle includeAuthorizedOperations in ConsumerGrou…
riedelmax Jun 10, 2024
a2760e0
MINOR: Rename uniform assignor's internal builders (#16233)
dajac Jun 10, 2024
deeb847
modifying Readme for Docker official images (#16226)
KrishVora01 Jun 10, 2024
113baae
[MINOR] KAFKA-16373: KIP-1028: Modifying dockerfile comments (#16261)
KrishVora01 Jun 10, 2024
facbab2
KAFKA-9228: Restart tasks on runtime-only connector config changes (#…
C0urante Jun 10, 2024
68d92a5
KAFKA-16866 Used the right constant in RemoteLogManagerTest#testFetch…
kamalcph May 31, 2024
c6f0db3
KAFKA-16785 Migrate TopicBasedRemoteLogMetadataManagerRestartTest to …
brandboat Jun 2, 2024
2273e06
MINOR: Fix missing wait topic finished in TopicBasedRemoteLogMetadata…
brandboat Jun 2, 2024
f9c3703
KAFKA-16859 Cleanup check if tiered storage is enabled (#16153)
m1a2st Jun 3, 2024
e4a3da6
KAFKA-16852 Adding two thread pools kafka-16852 (#16154)
muralibasani Jun 3, 2024
944f469
KAFKA-16880 Update equals and hashcode methods for two attributes (#…
muralibasani Jun 4, 2024
69158f6
KAFKA-16882 Migrate RemoteLogSegmentLifecycleTest to ClusterInstance …
kamalcph Jun 4, 2024
b6848d6
KAFKA-15776: Introduce remote.fetch.max.timeout.ms to configure Delay…
kamalcph Jun 5, 2024
025e791
MINOR: Cleanup the storage module unit tests (#16202)
kamalcph Jun 6, 2024
9460e6b
KAFKA-16884 Refactor RemoteLogManagerConfig with AbstractConfig (#16199)
muralibasani Jun 6, 2024
781b93b
KAFKA-16885 Renamed the enableRemoteStorageSystem to isRemoteStorageS…
chiacyu Jun 9, 2024
d94a28b
KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamic…
kamalcph Jun 10, 2024
e75cc45
KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images static ass…
KrishVora01 Jun 11, 2024
15db823
KAFKA-16373: KIP-1028: Modfiying download url for kafka dockerfile (#…
KrishVora01 Jun 11, 2024
bcd95f6
KAFKA-16904: Metric to measure the latency of remote read requests (#…
kamalcph Jun 11, 2024
46d7e44
KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws NPE when on…
dajac Jun 11, 2024
5c13a6c
MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureCo…
C0urante Jun 11, 2024
444e5de
KAFKA-15045: (KIP-924 pt. 21) UUID to ProcessId migration (#16269)
apourchet Jun 11, 2024
520fbb4
MINOR: Wait for embedded clusters to start before using them in Conne…
C0urante Jun 11, 2024
7c30eed
KAFKA-16541 Fix potential leader-epoch checkpoint file corruption (#1…
ocadaruma Jun 6, 2024
0b4fcbb
KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remot…
abhijeetk88 Jun 12, 2024
77a6fe9
KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and oth…
apourchet Jun 12, 2024
8de153e
KAFKA-10199: Enable state updater by default (#16107)
cadonna Jun 12, 2024
6016b15
KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215)
dajac Jun 12, 2024
5fd9bd1
KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas (#16…
abhijeetk88 Jun 12, 2024
72e72e3
KAFKA-16865: Add IncludeTopicAuthorizedOperations option for Describe…
tinaselenge Jun 12, 2024
0e7134c
KAFKA-16570 FenceProducers API returns "unexpected error" when succes…
edoardocomar Jun 12, 2024
62fb6a3
KAFKA-8206: KIP-899: Allow client to rebootstrap (#13277)
ivanyu Jun 12, 2024
87264e6
KAFKA-15045: (KIP-924 pt. 23) More TaskAssignmentUtils tests (#16292)
apourchet Jun 12, 2024
d7fe53f
MINOR: Remove Java 7 example code (#16308)
JimGalasyn Jun 12, 2024
1b1821d
KAFKA-16935: Automatically wait for cluster startup in embedded Conne…
C0urante Jun 13, 2024
91bd1ba
KAFKA-16890: Compute valid log-start-offset when deleting overlapping…
kamalcph Jun 12, 2024
15b6235
MINOR: Add readiness check for connector and separate Kafka cluster i…
C0urante Jun 13, 2024
4b4a591
KAFKA-15045: (KIP-924 pt. 24) internal TaskAssignor rename to LegacyT…
apourchet Jun 13, 2024
c05c5a9
MINOR: Make online downgrade failure logs less noisy and update the t…
dongnuo123 Jun 13, 2024
562b800
KAFKA-15045: (KIP-924 pt. 25) Rename old internal StickyTaskAssignor …
ableegoldman Jun 13, 2024
6330447
MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)
mjsax Jun 13, 2024
3e9fe3a
MINOR: update Kafka Streams docs with 3.3 KIP information (#16316)
mjsax Jun 13, 2024
8940507
KAFKA-15045: (KIP-924 pt. 26) default standby task assignment nit (#1…
apourchet Jun 14, 2024
af4ccc5
KAFKA-16948: Reset tier lag metrics on becoming follower (#16321)
kamalcph Jun 14, 2024
34cf02c
KAFKA-16933: New consumer unsubscribe close commit fixes (#16272)
lianetm Jun 14, 2024
ee9c1c1
MINOR: Add integration tag to AdminFenceProducersIntegrationTest (#16…
edoardocomar Jun 13, 2024
dd24c92
KAFKA-16946: Utils.getHost/getPort cannot parse SASL_PLAINTEXT://host…
frankvicky Jun 14, 2024
7435dfa
MINOR: update Kafka Streams docs with 3.4 KIP information (#16336)
mjsax Jun 14, 2024
1e83351
KAFKA-16637 AsyncKafkaConsumer removes offset fetch responses from ca…
kirktrue Jun 15, 2024
c29260f
Revert "KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs"
cmccabe Jun 15, 2024
2c27977
Revert new configurations from KAFKA-16525; Dynamic KRaft network man…
cmccabe Jun 15, 2024
f2e471f
KAFKA-16932: Add documentation for the native docker image (#16338)
kagarwal06 Jun 16, 2024
99b33e2
[MINOR] Add a note for JBOD support for tiered storage (#16369)
showuon Jun 17, 2024
f2e99f3
KAFKA-16969: Log error if config conficts with MV (#16366)
soarez Jun 18, 2024
4ff8e16
KAFKA-15265: Reapply dynamic remote configs after broker restart (#16…
kamalcph Jun 18, 2024
86abaf2
KAFKA-16954: fix consumer close to release assignment in background (…
lianetm Jun 18, 2024
b682e4d
MINOR: update documentation link to 3.8 (#16382)
jlprat Jun 18, 2024
6669c30
MINOR: Fix doc for zookeeper.ssl.client.enable (#16374)
mimaison Jun 18, 2024
3f605fd
KAFKA-16988: add 1 more node for test_exactly_once_source system test…
showuon Jun 18, 2024
0225c49
KAFKA-15302: Stale value returned when using store.all() with key del…
jinyongchoi Jun 19, 2024
45027f3
KAFKA-15774: use the default dsl store supplier for fkj subscriptions…
rodesai Jun 19, 2024
26763c5
MINOR: use 2 logdirs in ZK migration system tests (#15394)
gaurav-narula Jun 18, 2024
35b34a8
Revert "KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’…
jolshan Jun 21, 2024
5edab42
MINOR: Update 3.8 documentation for Kafka Streams (#16265)
lucasbru Jun 26, 2024
31a9c70
KAFKA-17050: Revert group.version from 3.8 (#16478)
jolshan Jun 28, 2024
b20a735
Revert "KAFKA-16154" and mark MV 3.8-IV0 as the latest production (#…
jlprat Jul 2, 2024
2fbe32e
MINOR: Update dependency numbers in LICENSE file (#16514)
jlprat Jul 3, 2024
c8f88ed
KAFKA-17083: Update LATEST_STABLE_METADATA_VERSION in system tests (#…
soarez Jul 5, 2024
1dd9385
KAFKA-10199: Close pending active tasks to init on partitions lost (#…
cadonna Jul 8, 2024
1dd16c4
MINOR: Generate javadocs on all source files for streams:test-utils (…
vinnybod Jul 9, 2024
4ecbb75
KAFKA-17085: Handle tasks in state updater before tasks in task regis…
cadonna Jul 10, 2024
1d803c4
KAFKA-17111: explicitly register Afterburner module in JsonSerializer…
vbalani002 Jul 11, 2024
7d9b8e4
KAFKA-17098: Re-add task to state updater if transit to RUNNING fails…
cadonna Jul 11, 2024
21464d5
KAFKA-16905: Fix blocking DescribeCluster call in AdminClient Describ…
apoorvmittal10 Jun 6, 2024
f05e167
KAFKA-16916: Fixing error in completing future (#16249)
apoorvmittal10 Jun 8, 2024
bd29da9
KAFKA-17150: Use Utils.loadClass instead of Class.forName to resolve …
gharris1727 Jul 17, 2024
0172280
KAFKA-17148: Remove print MetaPropertiesEnsemble from kafka-storage t…
gharris1727 Jul 18, 2024
29e7796
KAFKA-17142 Fix deadlock caused by LogManagerTest#testLogRecoveryMetr…
FrankYang0529 Jul 18, 2024
7495337
KAFKA-17166 Use NoOpScheduler to rewrite LogManagerTest#testLogRecove…
FrankYang0529 Jul 22, 2024
771b957
Bump version to 3.8.0
jlprat Jul 23, 2024
d22931a
feat(all): merge apache kafka 3.8.0 771b957
superhx Aug 15, 2024
baafb4a
fix(all): fix merge compile error
superhx Aug 15, 2024
cf8ef53
fix(merge): fix storage auto format
superhx Aug 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
70 changes: 0 additions & 70 deletions automq-shell/src/main/java/com/automq/shell/util/CLIUtils.java

This file was deleted.

This file was deleted.

67 changes: 63 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ project(':core') {
api libs.scalaLibrary

implementation project(':server-common')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':group-coordinator')
implementation project(':transaction-coordinator')
implementation project(':metadata')
Expand Down Expand Up @@ -1367,6 +1368,66 @@ project(':metadata') {
}
}

project(':group-coordinator:group-coordinator-api') {
base {
archivesName = "kafka-group-coordinator-api"
}

dependencies {
implementation project(':clients')
}

task createVersionFile() {
def receiptFile = file("$buildDir/kafka/$buildVersionFileName")
inputs.property "commitId", commitId
inputs.property "version", version
outputs.file receiptFile

doLast {
def data = [
commitId: commitId,
version: version,
]

receiptFile.parentFile.mkdirs()
def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n")
receiptFile.setText(content, "ISO-8859-1")
}
}

sourceSets {
main {
java {
srcDirs = ["src/main/java"]
}
}
test {
java {
srcDirs = ["src/test/java"]
}
}
}

jar {
dependsOn createVersionFile
from("$buildDir") {
include "kafka/$buildVersionFileName"
}
}

clean.doFirst {
delete "$buildDir/kafka/"
}

javadoc {
include "**/org/apache/kafka/coordinator/group/api/**"
}

checkstyle {
configProperties = checkstyleConfigProperties("import-control-group-coordinator.xml")
}
}

project(':group-coordinator') {
base {
archivesName = "kafka-group-coordinator"
Expand All @@ -1380,6 +1441,7 @@ project(':group-coordinator') {
implementation project(':server-common')
implementation project(':clients')
implementation project(':metadata')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':storage')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
Expand Down Expand Up @@ -2597,10 +2659,6 @@ project(':streams:test-utils') {
testRuntimeOnly libs.slf4jlog4j
}

javadoc {
include "**/org/apache/kafka/streams/test/**"
}

tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.runtimeClasspath) {
exclude('kafka-streams*')
Expand Down Expand Up @@ -3023,6 +3081,7 @@ project(':jmh-benchmarks') {
implementation project(':raft')
implementation project(':clients')
implementation project(':group-coordinator')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':metadata')
implementation project(':storage')
implementation project(':streams')
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.mockito" class="AssignmentsManagerTest"/>
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.opentest4j" class="RemoteLogManagerTest"/>
<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry
https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable -->
<disallow class="com.yammer.metrics.Metrics" />
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
Expand Down
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@
files="StreamThread.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
files="(InternalTopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>

<suppress checks="CyclomaticComplexity"
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup|SubscriptionWrapperSerde|AssignorConfiguration).java"/>
Expand All @@ -211,7 +211,7 @@
files="StreamsMetricsImpl.java"/>

<suppress checks="NPathComplexity"
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|GlobalStateManagerImpl|KStreamImplJoin|TopologyConfig|KTableKTableOuterJoin).java"/>
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|TaskAssignmentUtils|GlobalStateManagerImpl|KStreamImplJoin|TopologyConfig|KTableKTableOuterJoin).java"/>

<suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
files="Murmur3.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
throttleTimeSensor,
logContext,
hostResolver,
clientTelemetrySender);
clientTelemetrySender,
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
);
} catch (Throwable t) {
closeQuietly(selector, "Selector");
closeQuietly(channelBuilder, "ChannelBuilder");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,19 @@ public class CommonClientConfigs {
public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " +
"This configuration is used as the default timeout for all client operations that do not specify a <code>timeout</code> parameter.";

public static final String METADATA_RECOVERY_STRATEGY_CONFIG = "metadata.recovery.strategy";
public static final String METADATA_RECOVERY_STRATEGY_DOC = "Controls how the client recovers when none of the brokers known to it is available. " +
"If set to <code>none</code>, the client fails. If set to <code>rebootstrap</code>, " +
"the client repeats the bootstrap process using <code>bootstrap.servers</code>. " +
"Rebootstrapping is useful when a client communicates with brokers so infrequently " +
"that the set of brokers may change entirely before the client refreshes metadata. " +
"Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. " +
"Brokers appear unavailable when disconnected and no current retry attempt is in-progress. " +
"Consider increasing <code>reconnect.backoff.ms</code> and <code>reconnect.backoff.max.ms</code> and " +
"decreasing <code>socket.connection.setup.timeout.ms</code> and <code>socket.connection.setup.timeout.max.ms</code> " +
"for the client.";
public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = MetadataRecoveryStrategy.NONE.name;

/**
* Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
* is explicitly configured but the maximum reconnect backoff is not explicitly configured.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public interface KafkaClient extends Closeable {
* @param now The current time in ms
* @return The node with the fewest in-flight requests.
*/
Node leastLoadedNode(long now);
LeastLoadedNode leastLoadedNode(long now);

/**
* The number of currently in-flight requests for which we have not yet returned a response
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;

import org.apache.kafka.common.Node;

public class LeastLoadedNode {
private final Node node;
private final boolean atLeastOneConnectionReady;

public LeastLoadedNode(Node node, boolean atLeastOneConnectionReady) {
this.node = node;
this.atLeastOneConnectionReady = atLeastOneConnectionReady;
}

public Node node() {
return node;
}

/**
* Indicates if the least loaded node is available or at least a ready connection exists.
*
* <p>There may be no node available while ready connections to live nodes exist. This may happen when
* the connections are overloaded with in-flight requests. This function takes this into account.
*/
public boolean hasNodeAvailableOrConnectionReady() {
return node != null || atLeastOneConnectionReady;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class Metadata implements Closeable {
private final ClusterResourceListeners clusterResourceListeners;
private boolean isClosed;
private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
/** Addresses with which the metadata was originally bootstrapped. */
private List<InetSocketAddress> bootstrapAddresses;

/**
* Create a new Metadata instance
Expand Down Expand Up @@ -304,6 +306,12 @@ public synchronized void bootstrap(List<InetSocketAddress> addresses) {
this.needFullUpdate = true;
this.updateVersion += 1;
this.metadataSnapshot = MetadataSnapshot.bootstrap(addresses);
this.bootstrapAddresses = addresses;
}

public synchronized void rebootstrap() {
log.info("Rebootstrapping with {}", this.bootstrapAddresses);
this.bootstrap(this.bootstrapAddresses);
}

/**
Expand Down
Loading
Loading