
Commit

Merge remote-tracking branch 'apache-kafka/trunk' into merge-trunk
mjsax committed Feb 11, 2025
2 parents c661aa4 + ce8b08c commit 668b255
Showing 238 changed files with 6,589 additions and 4,632 deletions.
4 changes: 1 addition & 3 deletions README.md
@@ -9,9 +9,7 @@
[![CI](https://github.com/apache/kafka/actions/workflows/ci.yml/badge.svg?branch=trunk&event=push)](https://github.com/apache/kafka/actions/workflows/ci.yml?query=event%3Apush+branch%3Atrunk)
[![Flaky Test Report](https://github.com/apache/kafka/actions/workflows/generate-reports.yml/badge.svg?branch=trunk&event=schedule)](https://github.com/apache/kafka/actions/workflows/generate-reports.yml?query=event%3Aschedule+branch%3Atrunk)

-[**Apache Kafka**](https://kafka.apache.org) is an open-source distributed event streaming platform used by thousands of
-
-companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
+[**Apache Kafka**](https://kafka.apache.org) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

You need to have [Java](http://www.oracle.com/technetwork/java/javase/downloads/index.html) installed.

9 changes: 5 additions & 4 deletions build.gradle
@@ -1135,6 +1135,7 @@ project(':core') {
implementation libs.jacksonModuleScala
implementation libs.jacksonDataformatCsv
implementation libs.jacksonJDK8Datatypes
implementation libs.jacksonDatabindYaml
implementation libs.joptSimple
implementation libs.jose4j
implementation libs.metrics
@@ -1591,7 +1592,7 @@ project(':group-coordinator') {


project(':test-common:test-common-internal-api') {
-// Interfaces, config classes, and other test APIs. Java 17 only
+// Interfaces, config classes, and other test APIs. Java 17 is the minimum Java version.
base {
archivesName = "kafka-test-common-internal-api"
}
@@ -1618,7 +1619,7 @@ project(':test-common:test-common-internal-api') {
}

project(':test-common:test-common-util') {
-// Runtime-only JUnit extensions for entire project. Java 11 only
+// Runtime-only JUnit extensions for entire project. Java 11 is the minimum Java version required.
base {
archivesName = "kafka-test-common-util"
}
@@ -1641,7 +1642,7 @@ project(':test-common:test-common-util') {
}

project(':test-common:test-common-runtime') {
-// Runtime-only JUnit extensions for integration tests. Java 17 only
+// Runtime-only JUnit extensions for integration tests. Java 17 is the minimum Java version.
base {
archivesName = "kafka-test-common-runtime"
}
@@ -1864,7 +1865,7 @@ project(':examples') {
}

checkstyle {
-configProperties = checkstyleConfigProperties("import-control-core.xml")
+configProperties = checkstyleConfigProperties("import-control-examples.xml")
}
}

19 changes: 15 additions & 4 deletions checkstyle/import-control-core.xml
@@ -1,6 +1,6 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
"-//Checkstyle//DTD ImportControl Configuration 1.4//EN"
"https://checkstyle.sourceforge.io/dtds/import_control_1_4.dtd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -74,6 +74,14 @@
<allow class="kafka.server.MetadataCache" />
</subpackage>

<subpackage name="docker">
<allow class="com.fasterxml.jackson.annotation.JsonAnyGetter" />
<allow class="com.fasterxml.jackson.annotation.JsonAnySetter" />
<allow class="com.fasterxml.jackson.annotation.JsonProperty" />
<allow class="com.fasterxml.jackson.annotation.JsonPropertyOrder" />
<allow class="com.fasterxml.jackson.annotation.JsonIgnoreProperties" />
</subpackage>

<subpackage name="examples">
<allow pkg="org.apache.kafka.clients" />
</subpackage>
@@ -135,6 +143,7 @@
<allow pkg="kafka.security.authorizer"/>
<allow pkg="kafka.server"/>
<allow pkg="kafka.zk"/>
<allow pkg="org.apache.kafka.clients"/>
<allow pkg="org.apache.kafka.clients.admin"/>
<allow pkg="org.apache.kafka.coordinator.group"/>
<allow pkg="org.apache.kafka.metadata"/>
@@ -160,8 +169,10 @@
</subpackage>

<subpackage name="utils">
<allow pkg="org.apache.logging.log4j" />
<allow pkg="org.apache.logging.log4j.core.config" />
<file name="LoggingController.scala">
<allow pkg="org.apache.logging.log4j" />
<allow pkg="org.apache.logging.log4j.core.config" />
</file>
</subpackage>

<subpackage name="clients">
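The new docker rules allow five specific Jackson annotation classes rather than the whole com.fasterxml.jackson package. A hypothetical sketch of the kind of class this permits (the class name and fields are illustrative, not taken from this commit):

package kafka.docker;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

import java.util.LinkedHashMap;
import java.util.Map;

// Declared properties are serialized first, unknown input is tolerated,
// and everything else lands in a catch-all map; only the five annotations
// on the allow-list above are used.
@JsonPropertyOrder({"version"})
@JsonIgnoreProperties(ignoreUnknown = true)
public class DockerNodeConfig {

    @JsonProperty("version")
    public String version;

    private final Map<String, Object> extra = new LinkedHashMap<>();

    @JsonAnyGetter
    public Map<String, Object> any() {
        return extra;
    }

    @JsonAnySetter
    public void set(String name, Object value) {
        extra.put(name, value);
    }
}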
42 changes: 42 additions & 0 deletions checkstyle/import-control-examples.xml
@@ -0,0 +1,42 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<import-control pkg="kafka.examples">

<!-- common library dependencies -->
<allow pkg="java" />
<allow pkg="scala" />
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />

<!-- public API-->
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="org.apache.kafka.common.serialization" />


</import-control>
1 change: 1 addition & 0 deletions checkstyle/import-control-storage.xml
@@ -85,6 +85,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage.internals"/>
<allow pkg="org.apache.kafka.storage.log.metrics"/>
<allow pkg="org.apache.kafka.test" />
<allow pkg="com.github.benmanes.caffeine.cache" />
<allow pkg="org.apache.kafka.coordinator.transaction"/>
10 changes: 6 additions & 4 deletions checkstyle/import-control.xml
@@ -1,6 +1,6 @@
<!DOCTYPE import-control PUBLIC
"-//Puppy Crawl//DTD Import Control 1.1//EN"
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
"-//Checkstyle//DTD ImportControl Configuration 1.4//EN"
"https://checkstyle.sourceforge.io/dtds/import_control_1_4.dtd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -581,7 +581,10 @@
<allow pkg="javax.crypto"/>
<allow pkg="org.apache.maven.artifact.versioning" />
<allow pkg="org.eclipse.jetty.util" />
<allow pkg="org.apache.logging.log4j" />

<file name="(Mock)?Loggers(Test)?" regex="true">
<allow pkg="org.apache.logging.log4j" />
</file>

<subpackage name="rest">
<allow pkg="org.eclipse.jetty" />
@@ -593,7 +596,6 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.http"/>
<allow pkg="io.swagger.v3.oas.annotations"/>
<allow pkg="org.apache.logging.log4j" />
</subpackage>

<subpackage name="isolation">
Expand Down
3 changes: 3 additions & 0 deletions checkstyle/suppressions.xml
@@ -183,6 +183,9 @@
<suppress checks="JavaNCSS"
files="(DistributedHerder|Worker)Test.java"/>

<suppress checks="ParameterNumber"
files="WorkerSinkTaskTest.java"/>

<!-- Raft -->
<suppress checks="NPathComplexity"
files="(DynamicVoter|RecordsIterator).java"/>
@@ -20,8 +20,10 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Map;
@@ -42,12 +44,16 @@ public interface AcknowledgementCommitCallback {
*
* @param exception The exception thrown during processing of the request, or null if the acknowledgement completed successfully.
* <p><ul>
* <li> {@link AuthorizationException} if not authorized to the topic or group
* <li> {@link InvalidRecordStateException} if the record state is invalid
* <li> {@link AuthorizationException} if not authorized to the topic of group
* <li> {@link NotLeaderOrFollowerException} if the leader had changed by the time the acknowledgements were sent
* <li> {@link DisconnectException} if the broker disconnected before the request could be completed
* <li> {@link WakeupException} if {@link KafkaShareConsumer#wakeup()} is called before or while this function is called
* <li> {@link InterruptException} if the calling thread is interrupted before or while this function is called
* <li> {@link KafkaException} for any other unrecoverable errors
* </ul>
* <p>Note that even if the exception is a retriable exception, the acknowledgement could not be completed and the
* records need to be fetched again. The callback is called after any retries have been performed.
*/
void onComplete(Map<TopicIdPartition, Set<Long>> offsets, Exception exception);
}
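With the expanded exception contract, a callback can branch on the exception type to decide what to log or surface. A minimal sketch (the handling choices are illustrative; registration via KafkaShareConsumer#setAcknowledgementCommitCallback is assumed from KIP-932):

import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;

import java.util.Map;
import java.util.Set;

public class LoggingAckCallback implements AcknowledgementCommitCallback {
    @Override
    public void onComplete(Map<TopicIdPartition, Set<Long>> offsets, Exception exception) {
        if (exception == null) {
            offsets.forEach((tp, offs) ->
                    System.out.printf("acknowledged %d offsets on %s%n", offs.size(), tp));
        } else if (exception instanceof NotLeaderOrFollowerException
                || exception instanceof DisconnectException) {
            // Per the javadoc above, even a retriable exception means these
            // acknowledgements were lost and the records will be fetched again.
            System.err.println("acknowledgements lost, records will be redelivered: " + exception);
        } else {
            System.err.println("acknowledgement failed: " + exception);
        }
    }
}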
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;

import org.slf4j.Logger;
@@ -45,10 +46,7 @@ public boolean hasEnteredCallback() {
void onComplete(List<Map<TopicIdPartition, Acknowledgements>> acknowledgementsMapList) {
final ArrayList<Throwable> exceptions = new ArrayList<>();
acknowledgementsMapList.forEach(acknowledgementsMap -> acknowledgementsMap.forEach((partition, acknowledgements) -> {
-Exception exception = null;
-if (acknowledgements.getAcknowledgeErrorCode() != null) {
-exception = acknowledgements.getAcknowledgeErrorCode().exception();
-}
+KafkaException exception = acknowledgements.getAcknowledgeException();
Set<Long> offsets = acknowledgements.getAcknowledgementsTypeMap().keySet();
Set<Long> offsetsCopy = Collections.unmodifiableSet(offsets);
enteredCallback = true;
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.protocol.Errors;

import java.util.ArrayList;
@@ -35,15 +36,20 @@ public class Acknowledgements {
// The acknowledgements keyed by offset. If the record is a gap, the AcknowledgeType will be null.
private final Map<Long, AcknowledgeType> acknowledgements;

-// When the broker responds to the acknowledgements, this is the error code returned.
-private Errors acknowledgeErrorCode;
+// When the broker responds to the acknowledgements, this is the exception thrown.
+private KafkaException acknowledgeException;

// Set when the broker has responded to the acknowledgements.
private boolean completed;

public static Acknowledgements empty() {
return new Acknowledgements(new TreeMap<>());
}

private Acknowledgements(Map<Long, AcknowledgeType> acknowledgements) {
this.acknowledgements = acknowledgements;
this.acknowledgeException = null;
this.completed = false;
}

/**
@@ -115,25 +121,26 @@ public int size() {
* @return Whether the acknowledgements were sent to the broker and a response received
*/
public boolean isCompleted() {
-return acknowledgeErrorCode != null;
+return completed;
}

/**
- * Set the acknowledgement error code when the response has been received from the broker.
+ * Completes the acknowledgements when the response has been received from the broker.
*
- * @param acknowledgeErrorCode the error code
+ * @param acknowledgeException the exception (will be null if successful)
*/
-public void setAcknowledgeErrorCode(Errors acknowledgeErrorCode) {
-this.acknowledgeErrorCode = acknowledgeErrorCode;
+public void complete(KafkaException acknowledgeException) {
+this.acknowledgeException = acknowledgeException;
+completed = true;
}

/**
- * Get the acknowledgement error code when the response has been received from the broker.
+ * Get the acknowledgement exception when the response has been received from the broker.
*
* @return the error code
*/
-public Errors getAcknowledgeErrorCode() {
-return acknowledgeErrorCode;
+public KafkaException getAcknowledgeException() {
+return acknowledgeException;
}

/**
@@ -301,10 +308,10 @@ private boolean canOptimiseForSingleAcknowledgeType(AcknowledgementBatch acknowl
public String toString() {
StringBuilder sb = new StringBuilder("Acknowledgements(");
sb.append(acknowledgements);
-if (acknowledgeErrorCode != null) {
-sb.append(", errorCode=");
-sb.append(acknowledgeErrorCode.code());
-}
+sb.append(", acknowledgeException=");
+sb.append(acknowledgeException != null ? Errors.forException(acknowledgeException) : "null");
+sb.append(", completed=");
+sb.append(completed);
sb.append(")");
return sb.toString();
}
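The lifecycle change is easiest to see end to end: completion is now an explicit flag plus an optional exception, instead of being inferred from a non-null error code. A sketch of the new flow, assuming the class also exposes an add(offset, type) recording method that this hunk does not show:

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;

public class AcknowledgementsLifecycle {
    public static void main(String[] args) {
        Acknowledgements acks = Acknowledgements.empty();
        acks.add(100L, AcknowledgeType.ACCEPT); // assumed recording method
        System.out.println(acks.isCompleted()); // false: no broker response yet
        // A successful response completes the batch with a null exception,
        // a state the old error-code field could not represent cleanly.
        acks.complete(null);
        System.out.println(acks.isCompleted());             // true
        System.out.println(acks.getAcknowledgeException()); // null
    }
}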
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import java.util.Objects;

/**
* This class combines Acknowledgements with the id of the node to use for acknowledging.
*/
public class NodeAcknowledgements {
private final int nodeId;
private final Acknowledgements acknowledgements;

public NodeAcknowledgements(int nodeId, Acknowledgements acknowledgements) {
this.nodeId = nodeId;
this.acknowledgements = Objects.requireNonNull(acknowledgements);
}

public int nodeId() {
return nodeId;
}

public Acknowledgements acknowledgements() {
return acknowledgements;
}
}
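Usage is straightforward; beyond the two accessors, the only behavior is the null check in the constructor (a hypothetical snippet):

import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements;

public class NodeAcknowledgementsDemo {
    public static void main(String[] args) {
        NodeAcknowledgements nodeAcks =
                new NodeAcknowledgements(1, Acknowledgements.empty());
        System.out.println(nodeAcks.nodeId()); // 1
        // new NodeAcknowledgements(1, null) would throw NullPointerException
        // because of the Objects.requireNonNull in the constructor.
    }
}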
@@ -55,7 +55,7 @@
* to keep track of aborted transactions or the need to keep track of fetch position.
*/
public class ShareCompletedFetch {

final int nodeId;
final TopicIdPartition partition;
final ShareFetchResponseData.PartitionData partitionData;
final short requestVersion;
@@ -79,12 +79,14 @@ public class ShareCompletedFetch {

ShareCompletedFetch(final LogContext logContext,
final BufferSupplier decompressionBufferSupplier,
final int nodeId,
final TopicIdPartition partition,
final ShareFetchResponseData.PartitionData partitionData,
final ShareFetchMetricsAggregator metricAggregator,
final short requestVersion) {
this.log = logContext.logger(org.apache.kafka.clients.consumer.internals.ShareCompletedFetch.class);
this.decompressionBufferSupplier = decompressionBufferSupplier;
this.nodeId = nodeId;
this.partition = partition;
this.partitionData = partitionData;
this.metricAggregator = metricAggregator;
@@ -156,7 +158,7 @@ <K, V> ShareInFlightBatch<K, V> fetchRecords(final Deserializers<K, V> deseriali
final int maxRecords,
final boolean checkCrcs) {
// Creating an empty ShareInFlightBatch
ShareInFlightBatch<K, V> inFlightBatch = new ShareInFlightBatch<>(partition);
ShareInFlightBatch<K, V> inFlightBatch = new ShareInFlightBatch<>(nodeId, partition);

if (cachedBatchException != null) {
// In the event that a CRC check fails, reject the entire record batch because it is corrupt.
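The thread running through these changes: the broker node id is captured when a fetch completes and travels with the in-flight batch, so acknowledgements can later be routed back to the node that served the records. A schematic model of that flow with simplified stand-in types (not the real internals):

import java.util.List;

public class AckRoutingSketch {
    // Simplified stand-ins for ShareCompletedFetch and NodeAcknowledgements.
    record CompletedFetch(int nodeId, String partition) { }
    record NodeAcks(int nodeId, List<Long> offsets) { }

    public static void main(String[] args) {
        CompletedFetch fetch = new CompletedFetch(42, "topicA-0");
        // The batch inherits the node id from the fetch that produced it.
        NodeAcks acks = new NodeAcks(fetch.nodeId(), List.of(100L, 101L));
        System.out.println("acknowledge on node " + acks.nodeId());
    }
}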