MINOR: Various cleanups in connect (apache#15734)
Reviewers: Chia-Ping Tsai <[email protected]>
mimaison authored Apr 17, 2024
1 parent 363f4d2 · commit 7b1c777
Showing 54 changed files with 97 additions and 235 deletions.
@@ -48,14 +48,13 @@
*/
public class PropertyFileLoginModule implements LoginModule {
private static final Logger log = LoggerFactory.getLogger(PropertyFileLoginModule.class);
private static final Map<String, Properties> CREDENTIAL_PROPERTIES_MAP = new ConcurrentHashMap<>();
private static final String FILE_OPTIONS = "file";
private static final Map<String, Properties> CREDENTIAL_PROPERTIES = new ConcurrentHashMap<>();

private CallbackHandler callbackHandler;
private String fileName;
private boolean authenticated;

private static final Map<String, Properties> CREDENTIAL_PROPERTIES = new ConcurrentHashMap<>();

@Override
public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
this.callbackHandler = callbackHandler;
@@ -56,7 +56,7 @@ public void setup() {
@Test
public void testPutFlush() {
HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
final String newLine = System.getProperty("line.separator");
final String newLine = System.lineSeparator();

// We do not call task.start() since it would override the output stream
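
A side note on the change above, as a standalone sketch (not part of this commit): System.lineSeparator(), available since Java 7, returns exactly the value of the "line.separator" system property, so the swap is behaviour-preserving.

public class LineSeparatorDemo {
    public static void main(String[] args) {
        // Both expressions yield the platform separator ("\n" on Unix, "\r\n" on Windows).
        String viaProperty = System.getProperty("line.separator"); // older, stringly-typed lookup
        String viaMethod = System.lineSeparator();                 // preferred since Java 7
        System.out.println(viaProperty.equals(viaMethod));         // prints: true
    }
}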

@@ -203,10 +203,10 @@ Set<String> findConsumerGroups()
.collect(Collectors.toSet());
// Only perform checkpoints for groups that have offsets for at least one topic that's accepted
// by the topic filter.
if (consumedTopics.size() > 0) {
checkpointGroups.add(group);
} else {
if (consumedTopics.isEmpty()) {
irrelevantGroups.add(group);
} else {
checkpointGroups.add(group);
}
}
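
The pattern in this hunk — inverting the condition so the empty case comes first and using isEmpty() instead of size() > 0 — recurs throughout the commit. A self-contained sketch of the same refactor; consumedTopicsFor is a hypothetical stand-in for the topic-filter lookup in findConsumerGroups():

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IsEmptyDemo {
    public static void main(String[] args) {
        Set<String> checkpointGroups = new HashSet<>();
        Set<String> irrelevantGroups = new HashSet<>();
        for (String group : List.of("groupA", "groupB")) {
            List<String> consumedTopics = consumedTopicsFor(group);
            // Before: if (consumedTopics.size() > 0) { checkpointGroups.add(group); } else { ... }
            if (consumedTopics.isEmpty()) {
                irrelevantGroups.add(group);
            } else {
                checkpointGroups.add(group);
            }
        }
        System.out.println("checkpoint=" + checkpointGroups + " irrelevant=" + irrelevantGroups);
    }

    // Hypothetical helper standing in for the real topic-filter lookup.
    private static List<String> consumedTopicsFor(String group) {
        return group.equals("groupA") ? List.of("topic1") : List.of();
    }
}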

@@ -117,7 +117,6 @@ public class MirrorMaker {
private final Time time;
private final MirrorMakerConfig config;
private final Set<String> clusters;
private final Set<SourceAndTarget> herderPairs;
private final MirrorRestServer internalServer;
private final RestClient restClient;

@@ -149,13 +148,13 @@ public MirrorMaker(MirrorMakerConfig config, List<String> clusters, Time time) {
this.clusters = config.clusters();
}
log.info("Targeting clusters {}", this.clusters);
this.herderPairs = config.clusterPairs().stream()
Set<SourceAndTarget> herderPairs = config.clusterPairs().stream()
.filter(x -> this.clusters.contains(x.target()))
.collect(Collectors.toSet());
if (herderPairs.isEmpty()) {
throw new IllegalArgumentException("No source->target replication flows.");
}
this.herderPairs.forEach(this::addHerder);
herderPairs.forEach(this::addHerder);
shutdownHook = new ShutdownHook();
}

@@ -21,8 +21,6 @@
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.ws.rs.NotFoundException;
@@ -37,8 +35,6 @@ public class InternalMirrorResource extends InternalClusterResource {
@Context
private UriInfo uriInfo;

private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class);

private final Map<SourceAndTarget, Herder> herders;

@Inject
@@ -91,13 +91,7 @@

public class MirrorSourceConnectorTest {
private ConfigPropertyFilter getConfigPropertyFilter() {
return new ConfigPropertyFilter() {
@Override
public boolean shouldReplicateConfigProperty(String prop) {
return true;
}

};
return prop -> true;
}

@Test
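
Since the interface being overridden here exposes a single abstract method at this call site, the anonymous class collapses to a lambda. A standalone sketch of the same refactor using a hypothetical PropertyFilter interface rather than Connect's ConfigPropertyFilter:

public class LambdaDemo {
    // Hypothetical functional interface standing in for ConfigPropertyFilter.
    interface PropertyFilter {
        boolean shouldReplicate(String prop);
    }

    public static void main(String[] args) {
        // Before: an anonymous class overriding the one abstract method.
        PropertyFilter verbose = new PropertyFilter() {
            @Override
            public boolean shouldReplicate(String prop) {
                return true;
            }
        };
        // After: the same behaviour as a lambda.
        PropertyFilter concise = prop -> true;

        System.out.println(verbose.shouldReplicate("cleanup.policy")); // true
        System.out.println(concise.shouldReplicate("cleanup.policy")); // true
    }
}
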
@@ -331,17 +331,13 @@ public void testSyncTopicACLsUseProvidedForwardingAdmin() throws Exception {
}

void waitForTopicToPersistInFakeLocalMetadataStore(String topicName) throws InterruptedException {
waitForCondition(() -> {
return FakeLocalMetadataStore.containsTopic(topicName);
}, FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS,
waitForCondition(() -> FakeLocalMetadataStore.containsTopic(topicName), FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS,
"Topic: " + topicName + " didn't get created in the FakeLocalMetadataStore"
);
}

void waitForTopicConfigPersistInFakeLocalMetaDataStore(String topicName, String configName, String expectedConfigValue) throws InterruptedException {
waitForCondition(() -> {
return FakeLocalMetadataStore.topicConfig(topicName).getOrDefault(configName, "").equals(expectedConfigValue);
}, FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS,
waitForCondition(() -> FakeLocalMetadataStore.topicConfig(topicName).getOrDefault(configName, "").equals(expectedConfigValue), FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS,
"Topic: " + topicName + "'s configs don't have " + configName + ":" + expectedConfigValue
);
}
@@ -36,8 +36,6 @@
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
@@ -55,7 +53,6 @@
* </p>
*/
public class ConnectDistributed extends AbstractConnectCli<DistributedConfig> {
private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);

public ConnectDistributed(String... args) {
super(args);
@@ -668,7 +668,7 @@ private static ConfigInfos validateClientOverrides(String connName,

ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
if (configValue.errorMessages().size() > 0) {
if (!configValue.errorMessages().isEmpty()) {
errorCount++;
}
ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null);
@@ -391,7 +391,7 @@ boolean sendRecords() {
int processed = 0;
recordBatch(toSend.size());
final SourceRecordWriteCounter counter =
toSend.size() > 0 ? new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup) : null;
toSend.isEmpty() ? null : new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
for (final SourceRecord preTransformRecord : toSend) {
ProcessingContext<SourceRecord> context = new ProcessingContext<>(preTransformRecord);
final SourceRecord record = transformationChain.apply(context, preTransformRecord);
@@ -346,7 +346,7 @@ public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<Str
ConfigDef newDef = new ConfigDef(baseConfigDef);
new EnrichablePlugin<Transformation<?>>("Transformation", TRANSFORMS_CONFIG, TRANSFORMS_GROUP, (Class) Transformation.class,
props, requireFullConfig) {
@SuppressWarnings("rawtypes")

@Override
protected Set<PluginDesc<Transformation<?>>> plugins() {
return plugins.transformations();
@@ -203,7 +203,7 @@ void initialize() {
}
}

private boolean doStart() throws Throwable {
private boolean doStart() {
try {
switch (state) {
case STARTED:
@@ -235,12 +235,12 @@ private synchronized void onFailure(Throwable t) {
this.state = State.FAILED;
}

private void resume() throws Throwable {
private void resume() {
if (doStart())
statusListener.onResume(connName);
}

private void start() throws Throwable {
private void start() {
if (doStart())
statusListener.onStartup(connName);
}
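
For context on the removed throws Throwable clauses in this file: when a method already catches everything it can raise, the clause only forces callers to handle an exception that can never escape. A hedged, standalone sketch, not the actual Connect code:

public class ThrowsCleanupDemo {
    // Before: private boolean doStart() throws Throwable — yet nothing ever propagates.
    private boolean doStart() {
        try {
            // hypothetical start-up work that may fail
            if (Math.random() < 0.1) {
                throw new IllegalStateException("failed to start");
            }
            return true;
        } catch (Throwable t) {
            onFailure(t); // every failure is handled here, so no throws clause is needed
            return false;
        }
    }

    private void onFailure(Throwable t) {
        System.err.println("startup failed: " + t.getMessage());
    }

    public static void main(String[] args) {
        System.out.println(new ThrowsCleanupDemo().doStart());
    }
}
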
@@ -392,7 +392,7 @@ void doTransitionTo(TargetState targetState, Callback<TargetState> stateChangeCa
}
}

private void doTransitionTo(TargetState targetState) throws Throwable {
private void doTransitionTo(TargetState targetState) {
log.debug("{} Transition connector to {}", this, targetState);
if (targetState == TargetState.PAUSED) {
suspend(true);
@@ -794,8 +794,6 @@ private void onPartitionsRemoved(Collection<TopicPartition> partitions, boolean
}

static class SinkTaskMetricsGroup {
private final ConnectorTaskId id;
private final ConnectMetrics metrics;
private final MetricGroup metricGroup;
private final Sensor sinkRecordRead;
private final Sensor sinkRecordSend;
@@ -805,13 +803,10 @@ static class SinkTaskMetricsGroup {
private final Sensor offsetCompletionSkip;
private final Sensor putBatchTime;
private final Sensor sinkRecordActiveCount;
private long activeRecords;
private Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<>();
private Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();

public SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
this.metrics = connectMetrics;
this.id = id;

ConnectMetricsRegistry registry = connectMetrics.registry();
metricGroup = connectMetrics
@@ -855,7 +850,7 @@ public SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
void computeSinkRecordLag() {
Map<TopicPartition, OffsetAndMetadata> consumed = this.consumedOffsets;
Map<TopicPartition, OffsetAndMetadata> committed = this.committedOffsets;
activeRecords = 0L;
long activeRecords = 0L;
for (Map.Entry<TopicPartition, OffsetAndMetadata> committedOffsetEntry : committed.entrySet()) {
final TopicPartition partition = committedOffsetEntry.getKey();
final OffsetAndMetadata consumedOffsetMeta = consumed.get(partition);
@@ -30,16 +30,6 @@ public RequestTargetException(String s, String forwardUrl) {
this.forwardUrl = forwardUrl;
}

public RequestTargetException(String s, Throwable throwable, String forwardUrl) {
super(s, throwable);
this.forwardUrl = forwardUrl;
}

public RequestTargetException(Throwable throwable, String forwardUrl) {
super(throwable);
this.forwardUrl = forwardUrl;
}

public String forwardUrl() {
return forwardUrl;
}
@@ -46,7 +46,7 @@ public class DelegatingClassLoader extends URLClassLoader {
private final ConcurrentMap<String, String> aliases;

// Although this classloader does not load classes directly but rather delegates loading to a
// PluginClassLoader or its parent through its base class, because of the use of inheritance in
// PluginClassLoader or its parent through its base class, because of the use of inheritance
// in the latter case, this classloader needs to also be declared as parallel capable to use
// fine-grain locking when loading classes.
static {
@@ -398,7 +398,7 @@ public URI advertisedUrl() {
String advertisedHostname = config.advertisedHostName();
if (advertisedHostname != null && !advertisedHostname.isEmpty())
builder.host(advertisedHostname);
else if (serverConnector != null && serverConnector.getHost() != null && serverConnector.getHost().length() > 0)
else if (serverConnector != null && serverConnector.getHost() != null && !serverConnector.getHost().isEmpty())
builder.host(serverConnector.getHost());

Integer advertisedPort = config.advertisedPort();
@@ -365,7 +365,7 @@ public KafkaConfigBackingStore(Converter converter, DistributedConfig config, Wo

this.usesFencableWriter = config.transactionalLeaderEnabled();
this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
if (this.topic == null || this.topic.trim().length() == 0)
if (this.topic == null || this.topic.trim().isEmpty())
throw new ConfigException("Must specify topic for connector configuration.");

configLog = setupAndCreateKafkaBasedLog(this.topic, config);
@@ -173,7 +173,7 @@ public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdmin, Supplier<String>
@Override
public void configure(final WorkerConfig config) {
String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
if (topic == null || topic.trim().length() == 0)
if (topic == null || topic.trim().isEmpty())
throw new ConfigException("Offset storage topic must be specified");

this.exactlyOnce = config.exactlyOnceSourceEnabled();
@@ -249,7 +249,7 @@ public void start() {
+ "support for source connectors, or upgrade to a newer Kafka broker version.";
} else {
message = "When " + ConsumerConfig.ISOLATION_LEVEL_CONFIG + "is set to "
+ IsolationLevel.READ_COMMITTED.toString()
+ IsolationLevel.READ_COMMITTED
+ ", a Kafka broker version that allows admin clients to read consumer offsets is required. "
+ "Please either reconfigure the worker or connector, or upgrade to a newer Kafka broker version.";
}
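
Two of the idioms in this hunk, shown as a standalone sketch (not the Kafka code): string concatenation invokes toString() on its operands implicitly, so the explicit call on the enum constant is redundant, and trim().isEmpty() is equivalent to trim().length() == 0.

public class CleanupIdiomsDemo {
    // Hypothetical enum standing in for the real IsolationLevel.
    enum IsolationLevel { READ_COMMITTED }

    public static void main(String[] args) {
        // Concatenation calls toString() implicitly, so both messages are identical.
        String explicit = "isolation.level is set to " + IsolationLevel.READ_COMMITTED.toString();
        String implicit = "isolation.level is set to " + IsolationLevel.READ_COMMITTED;
        System.out.println(explicit.equals(implicit)); // true

        // Empty-after-trim checks: isEmpty() states the intent directly.
        String topic = "   ";
        System.out.println(topic.trim().length() == 0); // true
        System.out.println(topic.trim().isEmpty());     // true
    }
}
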
@@ -170,7 +170,7 @@ public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdm
@Override
public void configure(final WorkerConfig config) {
this.statusTopic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
if (this.statusTopic == null || this.statusTopic.trim().length() == 0)
if (this.statusTopic == null || this.statusTopic.trim().isEmpty())
throw new ConfigException("Must specify topic for connector status.");

sendRetryExecutor = Executors.newSingleThreadExecutor(
@@ -298,7 +298,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) return;
// TODO: retry more gracefully and not forever
if (exception instanceof RetriableException) {
sendRetryExecutor.submit((Runnable) () -> kafkaLog.send(key, value, this));
sendRetryExecutor.submit(() -> kafkaLog.send(key, value, this));
} else {
log.error("Failed to write status update", exception);
}
@@ -332,7 +332,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
return;
}

sendRetryExecutor.submit((Runnable) () -> kafkaLog.send(key, value, this));
sendRetryExecutor.submit(() -> kafkaLog.send(key, value, this));
} else {
log.error("Failed to write status update", exception);
}
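
Why the (Runnable) cast can be dropped: ExecutorService.submit is overloaded for Runnable and Callable, but a lambda whose body is a call to a void method is only compatible with Runnable, so overload resolution needs no help. A minimal sketch under the assumption that the send call returns void; the send below is a hypothetical stand-in for the kafkaLog.send call above:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SubmitOverloadDemo {
    // Hypothetical stand-in for kafkaLog.send(key, value, callback); returns void.
    static void send(String key, String value) {
        System.out.println("sending " + key + "=" + value);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // The lambda's body is a void method call, so only submit(Runnable) applies;
        // the old (Runnable) cast added nothing.
        executor.submit(() -> send("status", "RUNNING"));
        executor.shutdown();
    }
}
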
@@ -19,8 +19,6 @@
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Collection;
@@ -38,7 +36,6 @@
* background thread.
*/
public abstract class MemoryOffsetBackingStore implements OffsetBackingStore {
private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class);

protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
protected ExecutorService executor;
@@ -40,22 +40,18 @@ private <P extends Predicate<?>> DocInfo(Class<P> predicateClass, String overvie
}
}

private static final List<DocInfo> PREDICATES;
static {
List<DocInfo> collect = new Plugins(Collections.emptyMap()).predicates().stream()
.map(p -> {
try {
String overviewDoc = (String) p.pluginClass().getDeclaredField("OVERVIEW_DOC").get(null);
ConfigDef configDef = (ConfigDef) p.pluginClass().getDeclaredField("CONFIG_DEF").get(null);
return new DocInfo(p.pluginClass(), overviewDoc, configDef);
} catch (ReflectiveOperationException e) {
throw new RuntimeException("Predicate class " + p.pluginClass().getName() + " lacks either a `public static final String OVERVIEW_DOC` or `public static final ConfigDef CONFIG_DEF`");
}
})
.sorted(Comparator.comparing(docInfo -> docInfo.predicateName))
.collect(Collectors.toList());
PREDICATES = collect;
}
private static final List<DocInfo> PREDICATES = new Plugins(Collections.emptyMap()).predicates().stream()
.map(p -> {
try {
String overviewDoc = (String) p.pluginClass().getDeclaredField("OVERVIEW_DOC").get(null);
ConfigDef configDef = (ConfigDef) p.pluginClass().getDeclaredField("CONFIG_DEF").get(null);
return new DocInfo(p.pluginClass(), overviewDoc, configDef);
} catch (ReflectiveOperationException e) {
throw new RuntimeException("Predicate class " + p.pluginClass().getName() + " lacks either a `public static final String OVERVIEW_DOC` or `public static final ConfigDef CONFIG_DEF`");
}
})
.sorted(Comparator.comparing(docInfo -> docInfo.predicateName))
.collect(Collectors.toList());

private static void printPredicateHtml(PrintStream out, DocInfo docInfo) {
out.println("<div id=\"" + docInfo.predicateName + "\">");
@@ -251,7 +251,7 @@ public void start() {
throw new ConnectException(
"Must provide a TopicAdmin to KafkaBasedLog when consumer is configured with "
+ ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to "
+ IsolationLevel.READ_COMMITTED.toString()
+ IsolationLevel.READ_COMMITTED
);
}
initializer.accept(admin);
@@ -106,18 +106,6 @@ public static LoggingContext forConnector(String connectorName) {
return context;
}

/**
* Modify the current {@link MDC} logging context to set the {@link #CONNECTOR_CONTEXT connector context} to include the
* supplied connector name and the {@link Scope#VALIDATE} scope.
*
* @param connectorName the connector name
*/
public static LoggingContext forValidation(String connectorName) {
LoggingContext context = new LoggingContext();
MDC.put(CONNECTOR_CONTEXT, prefixFor(connectorName, Scope.VALIDATE, null));
return context;
}

/**
* Modify the current {@link MDC} logging context to set the {@link #CONNECTOR_CONTEXT connector context} to include the
* connector name and task number using the supplied {@link ConnectorTaskId}, and to set the scope to {@link Scope#TASK}.