diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java index 0ed92f1095d8a..7803dffa9416d 100644 --- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java @@ -48,14 +48,13 @@ */ public class PropertyFileLoginModule implements LoginModule { private static final Logger log = LoggerFactory.getLogger(PropertyFileLoginModule.class); - private static final Map CREDENTIAL_PROPERTIES_MAP = new ConcurrentHashMap<>(); private static final String FILE_OPTIONS = "file"; + private static final Map CREDENTIAL_PROPERTIES = new ConcurrentHashMap<>(); + private CallbackHandler callbackHandler; private String fileName; private boolean authenticated; - private static final Map CREDENTIAL_PROPERTIES = new ConcurrentHashMap<>(); - @Override public void initialize(Subject subject, CallbackHandler callbackHandler, Map sharedState, Map options) { this.callbackHandler = callbackHandler; diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java index 7f41444b1559f..11106e5a179f9 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java @@ -56,7 +56,7 @@ public void setup() { @Test public void testPutFlush() { HashMap offsets = new HashMap<>(); - final String newLine = System.getProperty("line.separator"); + final String newLine = System.lineSeparator(); // We do not call task.start() since it would override the output stream diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java index 4139555708a42..cb9158fd8abe6 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java @@ -203,10 +203,10 @@ Set findConsumerGroups() .collect(Collectors.toSet()); // Only perform checkpoints for groups that have offsets for at least one topic that's accepted // by the topic filter. 
- if (consumedTopics.size() > 0) { - checkpointGroups.add(group); - } else { + if (consumedTopics.isEmpty()) { irrelevantGroups.add(group); + } else { + checkpointGroups.add(group); } } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index 635457a2b20d3..785edc44a165c 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -117,7 +117,6 @@ public class MirrorMaker { private final Time time; private final MirrorMakerConfig config; private final Set clusters; - private final Set herderPairs; private final MirrorRestServer internalServer; private final RestClient restClient; @@ -149,13 +148,13 @@ public MirrorMaker(MirrorMakerConfig config, List clusters, Time time) { this.clusters = config.clusters(); } log.info("Targeting clusters {}", this.clusters); - this.herderPairs = config.clusterPairs().stream() + Set herderPairs = config.clusterPairs().stream() .filter(x -> this.clusters.contains(x.target())) .collect(Collectors.toSet()); if (herderPairs.isEmpty()) { throw new IllegalArgumentException("No source->target replication flows."); } - this.herderPairs.forEach(this::addHerder); + herderPairs.forEach(this::addHerder); shutdownHook = new ShutdownHook(); } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java index 5c46bd9c6c519..752fb0ffce4bd 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java @@ -21,8 +21,6 @@ import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestRequestTimeout; import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.inject.Inject; import javax.ws.rs.NotFoundException; @@ -37,8 +35,6 @@ public class InternalMirrorResource extends InternalClusterResource { @Context private UriInfo uriInfo; - private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class); - private final Map herders; @Inject diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java index 107b72c6c975b..e82a50509ea53 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java @@ -91,13 +91,7 @@ public class MirrorSourceConnectorTest { private ConfigPropertyFilter getConfigPropertyFilter() { - return new ConfigPropertyFilter() { - @Override - public boolean shouldReplicateConfigProperty(String prop) { - return true; - } - - }; + return prop -> true; } @Test diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java index 9f45136e24a0b..b54aa7073ce88 100644 --- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java @@ -331,17 +331,13 @@ public void testSyncTopicACLsUseProvidedForwardingAdmin() throws Exception { } void waitForTopicToPersistInFakeLocalMetadataStore(String topicName) throws InterruptedException { - waitForCondition(() -> { - return FakeLocalMetadataStore.containsTopic(topicName); - }, FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS, + waitForCondition(() -> FakeLocalMetadataStore.containsTopic(topicName), FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS, "Topic: " + topicName + " didn't get created in the FakeLocalMetadataStore" ); } void waitForTopicConfigPersistInFakeLocalMetaDataStore(String topicName, String configName, String expectedConfigValue) throws InterruptedException { - waitForCondition(() -> { - return FakeLocalMetadataStore.topicConfig(topicName).getOrDefault(configName, "").equals(expectedConfigValue); - }, FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS, + waitForCondition(() -> FakeLocalMetadataStore.topicConfig(topicName).getOrDefault(configName, "").equals(expectedConfigValue), FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS, "Topic: " + topicName + "'s configs don't have " + configName + ":" + expectedConfigValue ); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index 2f5109b65f038..57600bdbea868 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -36,8 +36,6 @@ import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.SharedTopicAdmin; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; @@ -55,7 +53,6 @@ *
* </p>
*/ public class ConnectDistributed extends AbstractConnectCli { - private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class); public ConnectDistributed(String... args) { super(args); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index e9a3a094470da..574d800a88ee8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -668,7 +668,7 @@ private static ConfigInfos validateClientOverrides(String connName, ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(), validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages()); - if (configValue.errorMessages().size() > 0) { + if (!configValue.errorMessages().isEmpty()) { errorCount++; } ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index 49c41e4bd7c37..6afd27139bbfc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -391,7 +391,7 @@ boolean sendRecords() { int processed = 0; recordBatch(toSend.size()); final SourceRecordWriteCounter counter = - toSend.size() > 0 ? new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup) : null; + toSend.isEmpty() ? 
null : new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup); for (final SourceRecord preTransformRecord : toSend) { ProcessingContext context = new ProcessingContext<>(preTransformRecord); final SourceRecord record = transformationChain.apply(context, preTransformRecord); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index d5cdc23fa364a..a1c2a36905492 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -346,7 +346,7 @@ public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map>("Transformation", TRANSFORMS_CONFIG, TRANSFORMS_GROUP, (Class) Transformation.class, props, requireFullConfig) { - @SuppressWarnings("rawtypes") + @Override protected Set>> plugins() { return plugins.transformations(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java index 4bbe1f42b8b3f..84cec3640b9cd 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java @@ -203,7 +203,7 @@ void initialize() { } } - private boolean doStart() throws Throwable { + private boolean doStart() { try { switch (state) { case STARTED: @@ -235,12 +235,12 @@ private synchronized void onFailure(Throwable t) { this.state = State.FAILED; } - private void resume() throws Throwable { + private void resume() { if (doStart()) statusListener.onResume(connName); } - private void start() throws Throwable { + private void start() { if (doStart()) statusListener.onStartup(connName); } @@ -392,7 +392,7 @@ void doTransitionTo(TargetState targetState, Callback stateChangeCa } } - private void doTransitionTo(TargetState targetState) throws Throwable { + private void doTransitionTo(TargetState targetState) { log.debug("{} Transition connector to {}", this, targetState); if (targetState == TargetState.PAUSED) { suspend(true); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 25d8b54d4d6d5..eb085063a93de 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -794,8 +794,6 @@ private void onPartitionsRemoved(Collection partitions, boolean } static class SinkTaskMetricsGroup { - private final ConnectorTaskId id; - private final ConnectMetrics metrics; private final MetricGroup metricGroup; private final Sensor sinkRecordRead; private final Sensor sinkRecordSend; @@ -805,13 +803,10 @@ static class SinkTaskMetricsGroup { private final Sensor offsetCompletionSkip; private final Sensor putBatchTime; private final Sensor sinkRecordActiveCount; - private long activeRecords; private Map consumedOffsets = new HashMap<>(); private Map committedOffsets = new HashMap<>(); public SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) { - this.metrics = connectMetrics; - this.id = id; ConnectMetricsRegistry registry = connectMetrics.registry(); metricGroup = connectMetrics @@ -855,7 +850,7 @@ public 
SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) { void computeSinkRecordLag() { Map consumed = this.consumedOffsets; Map committed = this.committedOffsets; - activeRecords = 0L; + long activeRecords = 0L; for (Map.Entry committedOffsetEntry : committed.entrySet()) { final TopicPartition partition = committedOffsetEntry.getKey(); final OffsetAndMetadata consumedOffsetMeta = consumed.get(partition); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/RequestTargetException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/RequestTargetException.java index 3c03e7b85565b..063e6e2410e81 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/RequestTargetException.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/RequestTargetException.java @@ -30,16 +30,6 @@ public RequestTargetException(String s, String forwardUrl) { this.forwardUrl = forwardUrl; } - public RequestTargetException(String s, Throwable throwable, String forwardUrl) { - super(s, throwable); - this.forwardUrl = forwardUrl; - } - - public RequestTargetException(Throwable throwable, String forwardUrl) { - super(throwable); - this.forwardUrl = forwardUrl; - } - public String forwardUrl() { return forwardUrl; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index dd342c67a4321..fdbadef7b6939 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -46,7 +46,7 @@ public class DelegatingClassLoader extends URLClassLoader { private final ConcurrentMap aliases; // Although this classloader does not load classes directly but rather delegates loading to a - // PluginClassLoader or its parent through its base class, because of the use of inheritance in + // PluginClassLoader or its parent through its base class, because of the use of inheritance // in the latter case, this classloader needs to also be declared as parallel capable to use // fine-grain locking when loading classes. 
static { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index b6bb4fa3bd157..66c6943889e86 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -398,7 +398,7 @@ public URI advertisedUrl() { String advertisedHostname = config.advertisedHostName(); if (advertisedHostname != null && !advertisedHostname.isEmpty()) builder.host(advertisedHostname); - else if (serverConnector != null && serverConnector.getHost() != null && serverConnector.getHost().length() > 0) + else if (serverConnector != null && serverConnector.getHost() != null && !serverConnector.getHost().isEmpty()) builder.host(serverConnector.getHost()); Integer advertisedPort = config.advertisedPort(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 48ae74b0813f4..da1722a8b4609 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -365,7 +365,7 @@ public KafkaConfigBackingStore(Converter converter, DistributedConfig config, Wo this.usesFencableWriter = config.transactionalLeaderEnabled(); this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG); - if (this.topic == null || this.topic.trim().length() == 0) + if (this.topic == null || this.topic.trim().isEmpty()) throw new ConfigException("Must specify topic for connector configuration."); configLog = setupAndCreateKafkaBasedLog(this.topic, config); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index 786bd55351fcb..4d447759eddfa 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -173,7 +173,7 @@ public KafkaOffsetBackingStore(Supplier topicAdmin, Supplier @Override public void configure(final WorkerConfig config) { String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG); - if (topic == null || topic.trim().length() == 0) + if (topic == null || topic.trim().isEmpty()) throw new ConfigException("Offset storage topic must be specified"); this.exactlyOnce = config.exactlyOnceSourceEnabled(); @@ -249,7 +249,7 @@ public void start() { + "support for source connectors, or upgrade to a newer Kafka broker version."; } else { message = "When " + ConsumerConfig.ISOLATION_LEVEL_CONFIG + "is set to " - + IsolationLevel.READ_COMMITTED.toString() + + IsolationLevel.READ_COMMITTED + ", a Kafka broker version that allows admin clients to read consumer offsets is required. 
" + "Please either reconfigure the worker or connector, or upgrade to a newer Kafka broker version."; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index d1580e3a7c1d7..4843b8799b8c3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -170,7 +170,7 @@ public KafkaStatusBackingStore(Time time, Converter converter, Supplier kafkaLog.send(key, value, this)); + sendRetryExecutor.submit(() -> kafkaLog.send(key, value, this)); } else { log.error("Failed to write status update", exception); } @@ -332,7 +332,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { return; } - sendRetryExecutor.submit((Runnable) () -> kafkaLog.send(key, value, this)); + sendRetryExecutor.submit(() -> kafkaLog.send(key, value, this)); } else { log.error("Failed to write status update", exception); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java index d29f41326cc93..bfbc910b8359c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java @@ -19,8 +19,6 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.common.utils.ThreadUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.Collection; @@ -38,7 +36,6 @@ * background thread. */ public abstract class MemoryOffsetBackingStore implements OffsetBackingStore { - private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class); protected Map data = new HashMap<>(); protected ExecutorService executor; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java index 1fe80a4db0fea..ed09d4a37a005 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java @@ -40,22 +40,18 @@ private
<P extends Predicate<?>> DocInfo(Class<P>
predicateClass, String overvie } } - private static final List PREDICATES; - static { - List collect = new Plugins(Collections.emptyMap()).predicates().stream() - .map(p -> { - try { - String overviewDoc = (String) p.pluginClass().getDeclaredField("OVERVIEW_DOC").get(null); - ConfigDef configDef = (ConfigDef) p.pluginClass().getDeclaredField("CONFIG_DEF").get(null); - return new DocInfo(p.pluginClass(), overviewDoc, configDef); - } catch (ReflectiveOperationException e) { - throw new RuntimeException("Predicate class " + p.pluginClass().getName() + " lacks either a `public static final String OVERVIEW_DOC` or `public static final ConfigDef CONFIG_DEF`"); - } - }) - .sorted(Comparator.comparing(docInfo -> docInfo.predicateName)) - .collect(Collectors.toList()); - PREDICATES = collect; - } + private static final List PREDICATES = new Plugins(Collections.emptyMap()).predicates().stream() + .map(p -> { + try { + String overviewDoc = (String) p.pluginClass().getDeclaredField("OVERVIEW_DOC").get(null); + ConfigDef configDef = (ConfigDef) p.pluginClass().getDeclaredField("CONFIG_DEF").get(null); + return new DocInfo(p.pluginClass(), overviewDoc, configDef); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Predicate class " + p.pluginClass().getName() + " lacks either a `public static final String OVERVIEW_DOC` or `public static final ConfigDef CONFIG_DEF`"); + } + }) + .sorted(Comparator.comparing(docInfo -> docInfo.predicateName)) + .collect(Collectors.toList()); private static void printPredicateHtml(PrintStream out, DocInfo docInfo) { out.println("
<div id=\"" + docInfo.predicateName + "\">
"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index a66ac4e0546d9..36203399766c3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -251,7 +251,7 @@ public void start() { throw new ConnectException( "Must provide a TopicAdmin to KafkaBasedLog when consumer is configured with " + ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to " - + IsolationLevel.READ_COMMITTED.toString() + + IsolationLevel.READ_COMMITTED ); } initializer.accept(admin); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java index 902ddb67bd653..fa6a0b9cccd91 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java @@ -106,18 +106,6 @@ public static LoggingContext forConnector(String connectorName) { return context; } - /** - * Modify the current {@link MDC} logging context to set the {@link #CONNECTOR_CONTEXT connector context} to include the - * supplied connector name and the {@link Scope#VALIDATE} scope. - * - * @param connectorName the connector name - */ - public static LoggingContext forValidation(String connectorName) { - LoggingContext context = new LoggingContext(); - MDC.put(CONNECTOR_CONTEXT, prefixFor(connectorName, Scope.VALIDATE, null)); - return context; - } - /** * Modify the current {@link MDC} logging context to set the {@link #CONNECTOR_CONTEXT connector context} to include the * connector name and task number using the supplied {@link ConnectorTaskId}, and to set the scope to {@link Scope#TASK}. diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index 3db23d9e90920..a9a90db45af98 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -92,10 +92,6 @@ public Set createdTopics() { return created; } - public Set existingTopics() { - return existing; - } - public boolean isCreated(String topicName) { return created.contains(topicName); } @@ -135,8 +131,6 @@ public String toString() { private static final String CLEANUP_POLICY_CONFIG = TopicConfig.CLEANUP_POLICY_CONFIG; private static final String CLEANUP_POLICY_COMPACT = TopicConfig.CLEANUP_POLICY_COMPACT; - private static final String MIN_INSYNC_REPLICAS_CONFIG = TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG; - private static final String UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG; /** * A builder of {@link NewTopic} instances. @@ -207,29 +201,6 @@ public NewTopicBuilder compacted() { return this; } - /** - * Specify the minimum number of in-sync replicas required for this topic. 
- * - * @param minInSyncReplicas the minimum number of in-sync replicas allowed for the topic; must be positive - * @return this builder to allow methods to be chained; never null - */ - public NewTopicBuilder minInSyncReplicas(short minInSyncReplicas) { - this.configs.put(MIN_INSYNC_REPLICAS_CONFIG, Short.toString(minInSyncReplicas)); - return this; - } - - /** - * Specify whether the broker is allowed to elect a leader that was not an in-sync replica when no ISRs - * are available. - * - * @param allow true if unclean leaders can be elected, or false if they are not allowed - * @return this builder to allow methods to be chained; never null - */ - public NewTopicBuilder uncleanLeaderElection(boolean allow) { - this.configs.put(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, Boolean.toString(allow)); - return this; - } - /** * Specify the configuration properties for the topic, overwriting any previously-set properties. * diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreation.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreation.java index e9ab0e56d4a1c..45c12aa292a39 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreation.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreation.java @@ -17,8 +17,6 @@ package org.apache.kafka.connect.util; import org.apache.kafka.connect.runtime.WorkerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashSet; @@ -33,7 +31,6 @@ * enabled for source connectors at the worker and the connector configurations. */ public class TopicCreation { - private static final Logger log = LoggerFactory.getLogger(TopicCreation.class); private static final TopicCreation EMPTY = new TopicCreation(false, null, Collections.emptyMap(), Collections.emptySet()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java index 9af731e7ac5fb..b6a73d56de9cf 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java @@ -50,10 +50,10 @@ private List configValues(Map clientConfig) { } protected void assertNoError(List configValues) { - Assert.assertTrue(configValues.stream().allMatch(configValue -> configValue.errorMessages().size() == 0)); + Assert.assertTrue(configValues.stream().allMatch(configValue -> configValue.errorMessages().isEmpty())); } protected void assertError(List configValues) { - Assert.assertTrue(configValues.stream().anyMatch(configValue -> configValue.errorMessages().size() > 0)); + Assert.assertTrue(configValues.stream().anyMatch(configValue -> !configValue.errorMessages().isEmpty())); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/converters/BooleanConverterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/converters/BooleanConverterTest.java index 5eadf849b33d9..d7f9654130c0d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/converters/BooleanConverterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/converters/BooleanConverterTest.java @@ -41,7 +41,7 @@ public class BooleanConverterTest { @Before public void setUp() { - 
converter.configure(Collections.emptyMap(), false); + converter.configure(Collections.emptyMap(), false); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java index 4c1439cf358a2..e971a3151c70e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java @@ -40,7 +40,7 @@ public class ByteArrayConverterTest { @Before public void setUp() { - converter.configure(Collections.emptyMap(), false); + converter.configure(Collections.emptyMap(), false); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java index bed05fa21e41c..33cc97d394e92 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java @@ -102,16 +102,6 @@ public Collection tasks() { return taskHandles.values(); } - /** - * Delete the task handle for this task id. - * - * @param taskId the task id. - */ - public void deleteTask(String taskId) { - log.info("Removing handle for {} task in connector {}", taskId, connectorName); - taskHandles.remove(taskId); - } - /** * Delete all task handles for this connector. */ diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java index ffc5885ba77a1..2773c3964f362 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java @@ -94,7 +94,6 @@ public boolean alterOffsets(Map connectorConfig, Map committedOffsets; @@ -113,7 +112,7 @@ public String version() { @Override public void start(Map props) { taskId = props.get("task.id"); - connectorName = props.get("connector.name"); + String connectorName = props.get("connector.name"); taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId); log.debug("Starting task {}", taskId); taskHandle.recordTaskStart(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index b632267598f62..dc507b68df7c6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -283,7 +283,7 @@ private void getAndVerifySourceConnectorOffsets(Map connectorCon } @Test - public void testAlterOffsetsNonExistentConnector() throws Exception { + public void testAlterOffsetsNonExistentConnector() { ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.alterConnectorOffsets("non-existent-connector", new ConnectorOffsets(Collections.singletonList( new ConnectorOffset(Collections.emptyMap(), Collections.emptyMap()))))); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatch.java 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatch.java index b77007c5f2e13..a5dfdc42b4d38 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatch.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatch.java @@ -37,8 +37,8 @@ public class StartAndStopLatch { StartAndStopLatch(int expectedStarts, int expectedStops, Consumer uponCompletion, List dependents, Time clock) { - this.startLatch = new CountDownLatch(expectedStarts < 0 ? 0 : expectedStarts); - this.stopLatch = new CountDownLatch(expectedStops < 0 ? 0 : expectedStops); + this.startLatch = new CountDownLatch(Math.max(expectedStarts, 0)); + this.stopLatch = new CountDownLatch(Math.max(expectedStops, 0)); this.dependents = dependents; this.uponCompletion = uponCompletion; this.clock = clock; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatchTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatchTest.java index 4dd352961ee7a..ebf546dfcd012 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatchTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatchTest.java @@ -39,10 +39,10 @@ @Category(IntegrationTest.class) public class StartAndStopLatchTest { + private final AtomicBoolean completed = new AtomicBoolean(); private Time clock; private StartAndStopLatch latch; private List dependents; - private AtomicBoolean completed = new AtomicBoolean(); private ExecutorService waiters; private Future future; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 47be1509f7aaf..a0b8cecc8f427 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -83,7 +83,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executor; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; @@ -229,7 +228,7 @@ public void tearDown() { } @Test - public void testErrorHandlingInSinkTasks() throws Exception { + public void testErrorHandlingInSinkTasks() { Map reportProps = new HashMap<>(); reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true"); reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true"); @@ -484,7 +483,7 @@ private void createSourceTask(TargetState initialState, RetryWithToleranceOperat offsetReader, offsetWriter, offsetStore, workerConfig, ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator, - statusBackingStore, (Executor) Runnable::run, () -> errorReporters)); + statusBackingStore, Runnable::run, () -> errorReporters)); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java index 54c467ceb48cf..6d5e6350357ed 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java @@ -80,7 +80,6 @@ import java.util.concurrent.Executors; 
import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -1027,11 +1026,11 @@ private void awaitPolls(int minimum, List records) { ConcurrencyUtils.awaitLatch(pollLatch.get(), "task was not polled " + minimum + " time(s) quickly enough"); } - private void awaitEmptyPolls(int minimum) throws InterruptedException { + private void awaitEmptyPolls(int minimum) { awaitPolls(minimum, Collections.emptyList()); } - private void awaitPolls(int minimum) throws InterruptedException { + private void awaitPolls(int minimum) { awaitPolls(minimum, RECORDS); } @@ -1051,7 +1050,7 @@ private void startTaskThread() { workerTaskFuture = executor.submit(workerTask); } - private void expectSuccessfulFlushes() throws InterruptedException, TimeoutException { + private void expectSuccessfulFlushes() { when(offsetWriter.beginFlush()).thenReturn(true); when(offsetWriter.doFlush(any())).thenAnswer(invocation -> { Callback cb = invocation.getArgument(0); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StateTrackerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StateTrackerTest.java index eb12b4dc039b7..507d196d42982 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StateTrackerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/StateTrackerTest.java @@ -30,14 +30,12 @@ public class StateTrackerTest { private StateTracker tracker; private MockTime time; - private State state; @Before public void setUp() { time = new MockTime(); time.sleep(1000L); tracker = new StateTracker(); - state = State.UNASSIGNED; } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java index fdb4c542f7d1b..6dcbe6c38e617 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java @@ -33,13 +33,12 @@ public class ConnectProtocolCompatibilityTest { private static final String LEADER_URL = "leaderUrl:8083"; private static final long CONFIG_OFFSET = 1; - private String connectorId1 = "connector1"; - private String connectorId2 = "connector2"; - private String connectorId3 = "connector3"; - private ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0); - private ConnectorTaskId taskId1x1 = new ConnectorTaskId(connectorId1, 1); - private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0); - private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0); + private final String connectorId1 = "connector1"; + private final String connectorId2 = "connector2"; + private final String connectorId3 = "connector3"; + private final ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0); + private final ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0); + private final ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0); @Test public void testEagerToEagerMetadata() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java index be5686a222b33..c7fc0d8299a11 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java @@ -146,9 +146,9 @@ public void testSupportedKeyGeneratorAlgorithms() { private void testSupportedAlgorithms(String type, String... expectedAlgorithms) { Set supportedAlgorithms = DistributedConfig.supportedAlgorithms(type); - Set unuspportedAlgorithms = new HashSet<>(Arrays.asList(expectedAlgorithms)); - unuspportedAlgorithms.removeAll(supportedAlgorithms); - assertEquals(type + " algorithms were found that should be supported by this JVM but are not", Collections.emptySet(), unuspportedAlgorithms); + Set unsupportedAlgorithms = new HashSet<>(Arrays.asList(expectedAlgorithms)); + unsupportedAlgorithms.removeAll(supportedAlgorithms); + assertEquals(type + " algorithms were found that should be supported by this JVM but are not", Collections.emptySet(), unsupportedAlgorithms); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index e680f41d2099c..fa05e55015efd 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -4122,10 +4122,10 @@ private static List expectRecordStages(Callback callback) { } // We need to use a real class here due to some issue with mocking java.lang.Class - private abstract class BogusSourceConnector extends SourceConnector { + private static abstract class BogusSourceConnector extends SourceConnector { } - private abstract class BogusSourceTask extends SourceTask { + private static abstract class BogusSourceTask extends SourceTask { } /** diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java index 75296d2e14402..2d10f1c588ff1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java @@ -77,11 +77,8 @@ public class WorkerCoordinatorIncrementalTest { private String connectorId1 = "connector1"; private String connectorId2 = "connector2"; - private String connectorId3 = "connector3"; private ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0); - private ConnectorTaskId taskId1x1 = new ConnectorTaskId(connectorId1, 1); private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0); - private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0); private String groupId = "test-group"; private int sessionTimeoutMs = 10; @@ -115,8 +112,6 @@ public class WorkerCoordinatorIncrementalTest { private int configStorageCalls; private ClusterConfigState configState1; - private ClusterConfigState configState2; - private ClusterConfigState configStateSingleTaskConnectors; // Arguments are: // - Protocol type diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java index 216e8c86c16c6..e0018a13e9544 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java @@ -56,6 +56,7 @@ import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_STAGE; import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -257,7 +258,7 @@ public void testDlqHeaderConsumerRecord() { assertEquals(Transformation.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXECUTING_CLASS)); assertEquals(ConnectException.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXCEPTION)); assertEquals("Test Exception", headerValue(producerRecord, ERROR_HEADER_EXCEPTION_MESSAGE)); - assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).length() > 0); + assertFalse(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).isEmpty()); assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).startsWith("org.apache.kafka.connect.errors.ConnectException: Test Exception")); } @@ -285,7 +286,7 @@ public void testDlqHeaderOnNullExceptionMessage() { assertEquals(Transformation.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXECUTING_CLASS)); assertEquals(NullPointerException.class.getName(), headerValue(producerRecord, ERROR_HEADER_EXCEPTION)); assertNull(producerRecord.headers().lastHeader(ERROR_HEADER_EXCEPTION_MESSAGE).value()); - assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).length() > 0); + assertFalse(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).isEmpty()); assertTrue(headerValue(producerRecord, ERROR_HEADER_EXCEPTION_STACK_TRACE).startsWith("java.lang.NullPointerException")); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java index f9709ffda62a3..8360e9a6c36be 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java @@ -202,8 +202,7 @@ private void testHandleExceptionInStage(Stage type, Exception ex) { } private RetryWithToleranceOperator setupExecutor() { - RetryWithToleranceOperator retryWithToleranceOperator = genericOperator(0, ALL, errorHandlingMetrics); - return retryWithToleranceOperator; + return genericOperator(0, ALL, errorHandlingMetrics); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java index 171fbde699770..12eac8c94de06 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java @@ -204,7 +204,7 @@ 
public void testPluginDescEquality() { assertNotEquals(transformDescPluginPath, transformDescClasspath); } - @SuppressWarnings({"rawtypes", "unchecked"}) + @SuppressWarnings("rawtypes") @Test public void testPluginDescComparison() { PluginDesc connectorDescPluginPath = new PluginDesc<>( diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java index cd0ee9532c0e1..4f20ccdeaae7e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java @@ -40,7 +40,7 @@ @RunWith(Parameterized.class) public class PluginScannerTest { - private enum ScannerType { Reflection, ServiceLoader }; + private enum ScannerType { Reflection, ServiceLoader } @Rule public TemporaryFolder pluginDir = new TemporaryFolder(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 4b60223cbbcc7..8a637beee5ec0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -450,7 +450,7 @@ public void newPluginsShouldConfigureWithPluginClassLoader() { } @Test - public void pluginClassLoaderReadVersionFromResourceExistingOnlyInChild() throws Exception { + public void pluginClassLoaderReadVersionFromResourceExistingOnlyInChild() { assertClassLoaderReadsVersionFromResource( TestPlugin.ALIASED_STATIC_FIELD, TestPlugin.READ_VERSION_FROM_RESOURCE_V1, @@ -459,7 +459,7 @@ public void pluginClassLoaderReadVersionFromResourceExistingOnlyInChild() throws } @Test - public void pluginClassLoaderReadVersionFromResourceExistingOnlyInParent() throws Exception { + public void pluginClassLoaderReadVersionFromResourceExistingOnlyInParent() { assertClassLoaderReadsVersionFromResource( TestPlugin.READ_VERSION_FROM_RESOURCE_V1, TestPlugin.ALIASED_STATIC_FIELD, @@ -468,7 +468,7 @@ public void pluginClassLoaderReadVersionFromResourceExistingOnlyInParent() throw } @Test - public void pluginClassLoaderReadVersionFromResourceExistingInParentAndChild() throws Exception { + public void pluginClassLoaderReadVersionFromResourceExistingInParentAndChild() { assertClassLoaderReadsVersionFromResource( TestPlugin.READ_VERSION_FROM_RESOURCE_V1, TestPlugin.READ_VERSION_FROM_RESOURCE_V2, @@ -663,11 +663,6 @@ protected void instantiateAndConfigureConverter(String configPropName, ClassLoad assertNotNull(converter); } - protected void instantiateAndConfigureHeaderConverter(String configPropName) { - headerConverter = (TestHeaderConverter) plugins.newHeaderConverter(config, configPropName, ClassLoaderUsage.CURRENT_CLASSLOADER); - assertNotNull(headerConverter); - } - protected void instantiateAndConfigureInternalConverter(boolean isKey, Map config) { internalConverter = (TestInternalConverter) plugins.newInternalConverter(isKey, TestInternalConverter.class.getName(), config); assertNotNull(internalConverter); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java index 8b664227a1949..a99235a1d1404 100644 --- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java @@ -63,7 +63,7 @@ default Map flatten() { for (Entry child : otherSamples.entrySet()) { for (Entry flattened : child.getValue().flatten().entrySet()) { String key = child.getKey(); - if (flattened.getKey().length() > 0) { + if (!flattened.getKey().isEmpty()) { key += "." + flattened.getKey(); } out.put(key, flattened.getValue()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java index d6fd3cd54bca2..2bc03b2b7c25f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -19,14 +19,13 @@ import java.io.BufferedInputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.StringWriter; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -340,7 +339,7 @@ private static Path createPluginJar(String resourceDir, Predicate remove Path binDir = Files.createTempDirectory(resourceDir + ".bin."); compileJavaSources(inputDir, binDir); Path jarFile = Files.createTempFile(resourceDir + ".", ".jar"); - try (JarOutputStream jar = openJarFile(jarFile.toFile())) { + try (JarOutputStream jar = openJarFile(jarFile)) { writeJar(jar, inputDir, removeRuntimeClasses); writeJar(jar, binDir, removeRuntimeClasses); } @@ -356,29 +355,28 @@ private static Path resourceDirectoryPath(String resourceDir) throws IOException if (resource == null) { throw new IOException("Could not find test plugin resource: " + resourceDir); } - File file = new File(resource.getFile()); - if (!file.isDirectory()) { + Path file = Paths.get(resource.getFile()); + if (!Files.isDirectory(file)) { throw new IOException("Resource is not a directory: " + resourceDir); } - if (!file.canRead()) { + if (!Files.isReadable(file)) { throw new IOException("Resource directory is not readable: " + resourceDir); } - return file.toPath(); + return file; } - private static JarOutputStream openJarFile(File jarFile) throws IOException { + private static JarOutputStream openJarFile(Path jarFile) throws IOException { Manifest manifest = new Manifest(); manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); - return new JarOutputStream(new FileOutputStream(jarFile), manifest); + return new JarOutputStream(Files.newOutputStream(jarFile), manifest); } private static void removeDirectory(Path binDir) throws IOException { - List classFiles = Files.walk(binDir) + List classFiles = Files.walk(binDir) .sorted(Comparator.reverseOrder()) - .map(Path::toFile) .collect(Collectors.toList()); - for (File classFile : classFiles) { - if (!classFile.delete()) { + for (Path classFile : classFiles) { + if (!Files.deleteIfExists(classFile)) { throw new IOException("Could not delete: " + classFile); } } @@ -429,7 +427,7 @@ private static void writeJar(JarOutputStream jar, Path inputDir, Predicate !removeRuntimeClasses.test(path.toFile().getName())) .collect(Collectors.toList()); for (Path path : paths) { - try (InputStream in = new 
BufferedInputStream(new FileInputStream(path.toFile()))) { + try (InputStream in = new BufferedInputStream(Files.newInputStream(path))) { jar.putNextEntry(new JarEntry( inputDir.relativize(path) .toFile() diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java index 1e06a77645313..72a51afd500be 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java @@ -328,7 +328,7 @@ public void testDisableAdminEndpoint() throws IOException { } @Test - public void testRequestLogs() throws IOException, InterruptedException { + public void testRequestLogs() throws IOException { Map configMap = new HashMap<>(baseServerProps()); doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId(); @@ -421,11 +421,6 @@ private HttpResponse executeRequest(URI serverUrl, HttpRequest request) throws I return response; } - private static String prettyPrint(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map); - } - private void expectEmptyRestExtensions() { doReturn(Collections.emptyList()).when(plugins).newPlugins( eq(Collections.emptyList()), diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index 2c9eaf599b944..65f27f7e9256f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime.rest.resources; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.errors.AlreadyExistsException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.AbstractStatus; @@ -59,7 +58,6 @@ import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; -import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -145,8 +143,6 @@ public class ConnectorsResourceTest { private static final Set CONNECTOR_ACTIVE_TOPICS = new HashSet<>( Arrays.asList("foo_topic", "bar_topic")); - private static final Set CONNECTOR2_ACTIVE_TOPICS = new HashSet<>( - Arrays.asList("foo_topic", "baz_topic")); private static final RestRequestTimeout REQUEST_TIMEOUT = () -> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; @@ -176,8 +172,7 @@ public void teardown() { } private static Map getConnectorConfig(Map mapToClone) { - Map result = new HashMap<>(mapToClone); - return result; + return new HashMap<>(mapToClone); } @Test @@ -258,7 +253,6 @@ public void testFullExpandConnectors() { @Test public void testExpandConnectorsWithConnectorNotFound() { when(herder.connectors()).thenReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); - ConnectorStateInfo connector = mock(ConnectorStateInfo.class); ConnectorStateInfo connector2 = mock(ConnectorStateInfo.class); when(herder.connectorStatus(CONNECTOR2_NAME)).thenReturn(connector2); doThrow(mock(NotFoundException.class)).when(herder).connectorStatus(CONNECTOR_NAME); @@ -503,7 +497,7 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 2c9eaf599b944..65f27f7e9256f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.runtime.rest.resources;

-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.AbstractStatus;
@@ -59,7 +58,6 @@
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
-import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -145,8 +143,6 @@ public class ConnectorsResourceTest {

     private static final Set<String> CONNECTOR_ACTIVE_TOPICS = new HashSet<>(
             Arrays.asList("foo_topic", "bar_topic"));
-    private static final Set<String> CONNECTOR2_ACTIVE_TOPICS = new HashSet<>(
-            Arrays.asList("foo_topic", "baz_topic"));

     private static final RestRequestTimeout REQUEST_TIMEOUT = () -> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;

@@ -176,8 +172,7 @@ public void teardown() {
     }

     private static Map<String, String> getConnectorConfig(Map<String, String> mapToClone) {
-        Map<String, String> result = new HashMap<>(mapToClone);
-        return result;
+        return new HashMap<>(mapToClone);
     }

     @Test
@@ -258,7 +253,6 @@ public void testFullExpandConnectors() {
     @Test
     public void testExpandConnectorsWithConnectorNotFound() {
         when(herder.connectors()).thenReturn(Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME));
-        ConnectorStateInfo connector = mock(ConnectorStateInfo.class);
         ConnectorStateInfo connector2 = mock(ConnectorStateInfo.class);
         when(herder.connectorStatus(CONNECTOR2_NAME)).thenReturn(connector2);
         doThrow(mock(NotFoundException.class)).when(herder).connectorStatus(CONNECTOR_NAME);
@@ -503,7 +497,7 @@ public void testGetTasksConfig() throws Throwable {
     }

     @Test
-    public void testGetTasksConfigConnectorNotFound() throws Throwable {
+    public void testGetTasksConfigConnectorNotFound() {
         final ArgumentCaptor<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb = ArgumentCaptor.forClass(Callback.class);
         expectAndCallbackException(cb, new NotFoundException("not found"))
                 .when(herder).tasksConfig(eq(CONNECTOR_NAME), cb.capture());
@@ -806,8 +800,7 @@ public void testResetConnectorActiveTopics() {
     @Test
     public void testCompleteOrForwardWithErrorAndNoForwardUrl() {
         final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
-        String leaderUrl = null;
-        expectAndCallbackException(cb, new NotLeaderException("not leader", leaderUrl))
+        expectAndCallbackException(cb, new NotLeaderException("not leader", null))
                 .when(herder).deleteConnectorConfig(eq(CONNECTOR_NAME), cb.capture());

         ConnectRestException e = assertThrows(ConnectRestException.class, () ->
@@ -920,10 +913,6 @@ public void testResetOffsets() throws Throwable {
         assertEquals(msg, response.getEntity());
     }

-    private <T> byte[] serializeAsBytes(final T value) throws IOException {
-        return new ObjectMapper().writeValueAsBytes(value);
-    }
-
     private <T> Stubber expectAndCallbackResult(final ArgumentCaptor<Callback<T>> cb, final T value) {
         return doAnswer(invocation -> {
             cb.getValue().onCompletion(null, value);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
index 0dff57fb59367..c48b60fbd96ff 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
@@ -188,7 +188,7 @@ public void testFenceZombiesWithInternalRequestSignature() throws Throwable {
     }

     @Test
-    public void testFenceZombiesConnectorNotFound() throws Throwable {
+    public void testFenceZombiesConnectorNotFound() {
         @SuppressWarnings("unchecked")
         final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
index b8884b5ada8f9..19b51b0ec4ae2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
@@ -136,7 +136,7 @@ public void testNoBackoffTimeAndFail() throws Exception {
     public void testBackoffMoreThanTimeoutWillOnlyExecuteOnce() throws Exception {
         Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));

-        TimeoutException e = assertThrows(TimeoutException.class,
+        assertThrows(TimeoutException.class,
                 () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 100, mockTime));
         Mockito.verify(mockCallable, Mockito.times(1)).call();
     }
@@ -181,7 +181,7 @@ public void testSupplier() throws Exception {
     public void testWakeupException() throws Exception {
         Mockito.when(mockCallable.call()).thenThrow(new WakeupException());

-        ConnectException e = assertThrows(ConnectException.class,
+        assertThrows(ConnectException.class,
                 () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 10, mockTime));
         Mockito.verify(mockCallable, Mockito.atLeastOnce()).call();
     }
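
Note on the RetryUtilTest hunks above: they drop bindings such as TimeoutException e = ... that were never read. assertThrows returns the thrown exception, so the result is worth binding only when the test goes on to inspect it. A small sketch of both shapes, with made-up names, using JUnit 5's Assertions (JUnit 4.13's Assert.assertThrows behaves the same):

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;

class AssertThrowsSketch {

    @Test
    void throwIsEnough() {
        // Nothing inspects the exception, so the bare call is sufficient.
        assertThrows(IllegalStateException.class, AssertThrowsSketch::explode);
    }

    @Test
    void inspectTheException() {
        // Here the message matters, so binding the returned exception is justified.
        IllegalStateException e = assertThrows(IllegalStateException.class, AssertThrowsSketch::explode);
        assertEquals("boom", e.getMessage());
    }

    private static void explode() {
        throw new IllegalStateException("boom");
    }
}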
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 07ccbde9de0af..9dab14df97ac3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -611,7 +611,7 @@ public void endOffsetsShouldFailWithUnsupportedVersionWhenVersionUnsupportedErro
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
             env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset));
             TopicAdmin admin = new TopicAdmin(env.adminClient());
-            UnsupportedVersionException e = assertThrows(UnsupportedVersionException.class, () -> admin.endOffsets(tps));
+            assertThrows(UnsupportedVersionException.class, () -> admin.endOffsets(tps));
         }
     }

@@ -629,7 +629,7 @@ public void endOffsetsShouldFailWithTimeoutExceptionWhenTimeoutErrorOccurs() {
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
             env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset));
             TopicAdmin admin = new TopicAdmin(env.adminClient());
-            TimeoutException e = assertThrows(TimeoutException.class, () -> admin.endOffsets(tps));
+            assertThrows(TimeoutException.class, () -> admin.endOffsets(tps));
         }
     }

@@ -705,7 +705,6 @@ public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() {
         String topicName = "myTopic";
         TopicPartition tp1 = new TopicPartition(topicName, 0);
         Set<TopicPartition> tps = Collections.singleton(tp1);
-        long offset = 1000;
         Cluster cluster = createCluster(1, topicName, 1);
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@@ -733,14 +732,13 @@ private Cluster createCluster(int numNodes, String topicName, int partitions) {
         for (int i = 0; i < partitions; ++i) {
             pInfos.add(new PartitionInfo(topicName, i, leader, nodeArray, nodeArray));
         }
-        Cluster cluster = new Cluster(
+        return new Cluster(
                 "mockClusterId",
                 nodes.values(),
                 pInfos,
                 Collections.emptySet(),
                 Collections.emptySet(),
                 leader);
-        return cluster;
     }

     private MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
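
Note on the createCluster hunk above: it folds a single-use local into the return statement, the same shape as several other cleanups in this patch. A tiny before/after sketch; ClusterStandIn is a made-up type standing in for Cluster:

import java.util.Collections;
import java.util.List;

// "ClusterStandIn" is a hypothetical value type; only the refactor's shape matters.
class ClusterStandIn {
    final List<String> nodes;

    ClusterStandIn(List<String> nodes) {
        this.nodes = nodes;
    }

    // Before: a single-use local that is immediately returned adds a name but no information.
    static ClusterStandIn buildVerbose() {
        ClusterStandIn cluster = new ClusterStandIn(Collections.singletonList("localhost:9092"));
        return cluster;
    }

    // After: return the constructed value directly, as the patch does.
    static ClusterStandIn build() {
        return new ClusterStandIn(Collections.singletonList("localhost:9092"));
    }
}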
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
index 936363b4963ff..d90bbe8cd234e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
@@ -18,8 +18,6 @@

 import org.apache.kafka.connect.cli.ConnectDistributed;
 import org.apache.kafka.connect.runtime.Connect;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.net.URI;
 import java.util.Map;
@@ -29,7 +27,6 @@
  * A handle to a worker executing in a Connect cluster.
 */
 public class WorkerHandle {
-    private static final Logger log = LoggerFactory.getLogger(WorkerHandle.class);

     private final String workerName;
     private final Connect worker;
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index 38f5a8a019a33..92d69f8adc1d8 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -53,10 +53,6 @@ interface ConfigName {
         String EXCLUDE = "exclude";
         String INCLUDE = "include";

-        // for backwards compatibility
-        String INCLUDE_ALIAS = "whitelist";
-        String EXCLUDE_ALIAS = "blacklist";
-
         String RENAME = "renames";
     }
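
Note on the final ReplaceField hunk: with the deprecated "whitelist"/"blacklist" aliases gone, "include", "exclude", and "renames" are the remaining config names. A minimal sketch of driving the transform with the surviving keys on a schemaless record; the topic, field names, and values are invented for illustration:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.ReplaceField;

public class ReplaceFieldSketch {
    public static void main(String[] args) {
        // Configure with the canonical keys; the removed aliases would no longer be recognized.
        Map<String, String> props = new HashMap<>();
        props.put("include", "id,name");    // keep only these fields
        props.put("renames", "id:user_id"); // rename while copying

        ReplaceField.Value<SinkRecord> transform = new ReplaceField.Value<>();
        transform.configure(props);

        Map<String, Object> value = new HashMap<>();
        value.put("id", 42);
        value.put("name", "kafka");
        value.put("secret", "s3cr3t"); // dropped: not listed in "include"

        // Null schemas select the schemaless code path.
        SinkRecord record = new SinkRecord("topic", 0, null, null, null, value, 0L);
        System.out.println(transform.apply(record).value()); // e.g. {user_id=42, name=kafka}
        transform.close();
    }
}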