Merge pull request #1813 from AutoMQ/merge_3.8.0
feat(all): merge apache kafka 3.8.0 771b957
superhx authored Aug 15, 2024
2 parents 9e4eafb + cf8ef53 commit 64bad21
Showing 467 changed files with 17,076 additions and 8,542 deletions.
70 changes: 0 additions & 70 deletions automq-shell/src/main/java/com/automq/shell/util/CLIUtils.java

This file was deleted.

52 changes: 0 additions & 52 deletions automq-shell/src/main/java/com/automq/shell/util/S3PropUtil.java

This file was deleted.

67 changes: 63 additions & 4 deletions build.gradle
@@ -937,6 +937,7 @@ project(':core') {
api libs.scalaLibrary

implementation project(':server-common')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':group-coordinator')
implementation project(':transaction-coordinator')
implementation project(':metadata')
@@ -1367,6 +1368,66 @@ project(':metadata') {
}
}

project(':group-coordinator:group-coordinator-api') {
base {
archivesName = "kafka-group-coordinator-api"
}

dependencies {
implementation project(':clients')
}

task createVersionFile() {
def receiptFile = file("$buildDir/kafka/$buildVersionFileName")
inputs.property "commitId", commitId
inputs.property "version", version
outputs.file receiptFile

doLast {
def data = [
commitId: commitId,
version: version,
]

receiptFile.parentFile.mkdirs()
def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n")
receiptFile.setText(content, "ISO-8859-1")
}
}

sourceSets {
main {
java {
srcDirs = ["src/main/java"]
}
}
test {
java {
srcDirs = ["src/test/java"]
}
}
}

jar {
dependsOn createVersionFile
from("$buildDir") {
include "kafka/$buildVersionFileName"
}
}

clean.doFirst {
delete "$buildDir/kafka/"
}

javadoc {
include "**/org/apache/kafka/coordinator/group/api/**"
}

checkstyle {
configProperties = checkstyleConfigProperties("import-control-group-coordinator.xml")
}
}

project(':group-coordinator') {
base {
archivesName = "kafka-group-coordinator"
@@ -1380,6 +1441,7 @@ project(':group-coordinator') {
implementation project(':server-common')
implementation project(':clients')
implementation project(':metadata')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':storage')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
@@ -2597,10 +2659,6 @@ project(':streams:test-utils') {
testRuntimeOnly libs.slf4jlog4j
}

-  javadoc {
-    include "**/org/apache/kafka/streams/test/**"
-  }

tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.runtimeClasspath) {
exclude('kafka-streams*')
@@ -3023,6 +3081,7 @@ project(':jmh-benchmarks') {
implementation project(':raft')
implementation project(':clients')
implementation project(':group-coordinator')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':metadata')
implementation project(':storage')
implementation project(':streams')
1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
@@ -38,6 +38,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.mockito" class="AssignmentsManagerTest"/>
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.opentest4j" class="RemoteLogManagerTest"/>
<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry
https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable -->
<disallow class="com.yammer.metrics.Metrics" />
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -443,6 +443,7 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
@@ -202,7 +202,7 @@
files="StreamThread.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
files="(InternalTopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>

<suppress checks="CyclomaticComplexity"
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup|SubscriptionWrapperSerde|AssignorConfiguration).java"/>
@@ -211,7 +211,7 @@
files="StreamsMetricsImpl.java"/>

<suppress checks="NPathComplexity"
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|GlobalStateManagerImpl|KStreamImplJoin|TopologyConfig|KTableKTableOuterJoin).java"/>
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|TaskAssignmentUtils|GlobalStateManagerImpl|KStreamImplJoin|TopologyConfig|KTableKTableOuterJoin).java"/>

<suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
files="Murmur3.java"/>
@@ -245,7 +245,9 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
throttleTimeSensor,
logContext,
hostResolver,
-                clientTelemetrySender);
+                clientTelemetrySender,
+                MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
+            );
} catch (Throwable t) {
closeQuietly(selector, "Selector");
closeQuietly(channelBuilder, "ChannelBuilder");
@@ -220,6 +220,19 @@ public class CommonClientConfigs {
public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " +
"This configuration is used as the default timeout for all client operations that do not specify a <code>timeout</code> parameter.";

public static final String METADATA_RECOVERY_STRATEGY_CONFIG = "metadata.recovery.strategy";
public static final String METADATA_RECOVERY_STRATEGY_DOC = "Controls how the client recovers when none of the brokers known to it is available. " +
"If set to <code>none</code>, the client fails. If set to <code>rebootstrap</code>, " +
"the client repeats the bootstrap process using <code>bootstrap.servers</code>. " +
"Rebootstrapping is useful when a client communicates with brokers so infrequently " +
"that the set of brokers may change entirely before the client refreshes metadata. " +
"Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. " +
"Brokers appear unavailable when disconnected and no current retry attempt is in-progress. " +
"Consider increasing <code>reconnect.backoff.ms</code> and <code>reconnect.backoff.max.ms</code> and " +
"decreasing <code>socket.connection.setup.timeout.ms</code> and <code>socket.connection.setup.timeout.max.ms</code> " +
"for the client.";
public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = MetadataRecoveryStrategy.NONE.name;

/**
* Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
* is explicitly configured but the maximum reconnect backoff is not explicitly configured.
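A minimal usage sketch for the new setting (not part of this commit; the broker list and group id below are placeholders for illustration):

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RebootstrapExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker list and group id for illustration only.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092,broker-2:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // New in this change set: repeat the bootstrap process from bootstrap.servers
        // when all last-known brokers appear unavailable ("none" keeps the old behaviour).
        props.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual
        }
    }
}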
@@ -130,7 +130,7 @@ public interface KafkaClient extends Closeable {
* @param now The current time in ms
* @return The node with the fewest in-flight requests.
*/
-    Node leastLoadedNode(long now);
+    LeastLoadedNode leastLoadedNode(long now);

/**
* The number of currently in-flight requests for which we have not yet returned a response
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;

import org.apache.kafka.common.Node;

public class LeastLoadedNode {
private final Node node;
private final boolean atLeastOneConnectionReady;

public LeastLoadedNode(Node node, boolean atLeastOneConnectionReady) {
this.node = node;
this.atLeastOneConnectionReady = atLeastOneConnectionReady;
}

public Node node() {
return node;
}

/**
* Indicates if the least loaded node is available or at least a ready connection exists.
*
* <p>There may be no node available while ready connections to live nodes exist. This may happen when
* the connections are overloaded with in-flight requests. This function takes this into account.
*/
public boolean hasNodeAvailableOrConnectionReady() {
return node != null || atLeastOneConnectionReady;
}
}
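An illustrative caller-side sketch of the new return type (the real NetworkClient call sites are not shown in this excerpt; `client` stands for any KafkaClient implementation):

// Hypothetical caller; `client` is some KafkaClient implementation.
long now = System.currentTimeMillis();
LeastLoadedNode leastLoaded = client.leastLoadedNode(now);
if (leastLoaded.node() != null) {
    // A usable node exists: send the pending request to it.
} else if (!leastLoaded.hasNodeAvailableOrConnectionReady()) {
    // No node is available and no connection is ready either; with
    // metadata.recovery.strategy=rebootstrap this is the condition under
    // which a client can fall back to its original bootstrap addresses.
}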
8 changes: 8 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -82,6 +82,8 @@ public class Metadata implements Closeable {
private final ClusterResourceListeners clusterResourceListeners;
private boolean isClosed;
private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
/** Addresses with which the metadata was originally bootstrapped. */
private List<InetSocketAddress> bootstrapAddresses;

/**
* Create a new Metadata instance
@@ -304,6 +306,12 @@ public synchronized void bootstrap(List<InetSocketAddress> addresses) {
this.needFullUpdate = true;
this.updateVersion += 1;
this.metadataSnapshot = MetadataSnapshot.bootstrap(addresses);
this.bootstrapAddresses = addresses;
}

public synchronized void rebootstrap() {
log.info("Rebootstrapping with {}", this.bootstrapAddresses);
this.bootstrap(this.bootstrapAddresses);
}

/**
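A rough sketch of how the pieces above could fit together (the actual NetworkClient wiring is outside this excerpt; the field and method names are illustrative only):

// Assumed fields on the networking layer; names are illustrative, not from this diff.
// private final MetadataRecoveryStrategy recoveryStrategy;
// private final Metadata metadata;

void maybeRebootstrap(LeastLoadedNode leastLoaded) {
    // If nothing is reachable and rebootstrap was requested, fall back to the
    // addresses the Metadata instance was originally bootstrapped with.
    if (!leastLoaded.hasNodeAvailableOrConnectionReady()
            && recoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) {
        metadata.rebootstrap();
    }
}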