Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ATLAS-4963: checkstyle compliance updates - atlas-notification module #280

Merged
merged 2 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev-support/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@
<suppress files="AtlasGraphTraversalSource.java" checks="MethodName"/>
<suppress files="DeleteHandlerV1.java" checks="MethodName"/>
<suppress files="Id.java" checks="MethodName"/>
<suppress files="KafkaNotification.java" checks="StaticVariableName"/>
</suppressions>
5 changes: 5 additions & 0 deletions notification/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
<name>Apache Atlas Notification</name>
<description>Apache Atlas Notification</description>

<properties>
<checkstyle.failOnViolation>true</checkstyle.failOnViolation>
<checkstyle.skip>false</checkstyle.skip>
</properties>

<dependencies>

<dependency>
Expand Down
411 changes: 208 additions & 203 deletions notification/src/main/java/org/apache/atlas/hook/AtlasHook.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -21,7 +21,6 @@
* Exception class for Atlas Hooks.
*/
public class AtlasHookException extends Exception {

public AtlasHookException() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -39,11 +39,17 @@
* Use this class to create a Kafka topic with specific configuration like number of partitions, replicas, etc.
*/
public class AtlasTopicCreator {

private static final Logger LOG = LoggerFactory.getLogger(AtlasTopicCreator.class);

public static final String ATLAS_NOTIFICATION_CREATE_TOPICS_KEY = "atlas.notification.create.topics";

public static void main(String[] args) throws AtlasException {
Configuration configuration = ApplicationProperties.get();
AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();

atlasTopicCreator.createAtlasTopic(configuration, args);
}

/**
* Create an Atlas topic.
*
Expand All @@ -61,46 +67,46 @@ public void createAtlasTopic(Configuration atlasProperties, String... topicNames
if (!handleSecurity(atlasProperties)) {
return;
}
try(KafkaUtils kafkaUtils = getKafkaUtils(atlasProperties)) {

try (KafkaUtils kafkaUtils = getKafkaUtils(atlasProperties)) {
int numPartitions = atlasProperties.getInt("atlas.notification.partitions", 1);
int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);
int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);

kafkaUtils.createTopics(Arrays.asList(topicNames), numPartitions, numReplicas);
} catch (Exception e) {
LOG.error("Error while creating topics e :" + e.getMessage(), e);
LOG.error("Error while creating topics e :{}", e.getMessage(), e);
}
} else {
LOG.info("Not creating topics {} as {} is false", StringUtils.join(topicNames, ","),
ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
LOG.info("Not creating topics {} as {} is false", StringUtils.join(topicNames, ","), ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
}
}

@VisibleForTesting
protected boolean handleSecurity(Configuration atlasProperties) {
if (AuthenticationUtil.isKerberosAuthenticationEnabled(atlasProperties)) {
String kafkaPrincipal = atlasProperties.getString("atlas.notification.kafka.service.principal");
String kafkaKeyTab = atlasProperties.getString("atlas.notification.kafka.keytab.location");
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
String kafkaPrincipal = atlasProperties.getString("atlas.notification.kafka.service.principal");
String kafkaKeyTab = atlasProperties.getString("atlas.notification.kafka.keytab.location");
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();

SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, hadoopConf);

try {
String serverPrincipal = SecurityUtil.getServerPrincipal(kafkaPrincipal, (String) null);

UserGroupInformation.setConfiguration(hadoopConf);
UserGroupInformation.loginUserFromKeytab(serverPrincipal, kafkaKeyTab);
} catch (IOException e) {
LOG.warn("Could not login as {} from keytab file {}", kafkaPrincipal, kafkaKeyTab, e);

return false;
}
}

return true;
}

// This method is added to mock the creation of kafkaUtils object while writing the test cases
KafkaUtils getKafkaUtils(Configuration configuration) {
return new KafkaUtils(configuration);
}

public static void main(String[] args) throws AtlasException {
Configuration configuration = ApplicationProperties.get();
AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
atlasTopicCreator.createAtlasTopic(configuration, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,11 +18,9 @@

package org.apache.atlas.hook;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A logger wrapper that can be used to write messages that failed to be sent to a log file.
*/
Expand All @@ -32,4 +30,4 @@ public class FailedMessagesLogger {
public void log(String message) {
LOG.info(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -21,6 +21,11 @@
import org.apache.atlas.notification.AtlasNotificationMessageDeserializer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.commons.collections.MapUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,12 +34,6 @@
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

/**
* Kafka specific notification consumer.
*
Expand All @@ -45,7 +44,7 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {

private final KafkaConsumer kafkaConsumer;
private final boolean autoCommitEnabled;
private long pollTimeoutMilliSeconds = 1000L;
private final long pollTimeoutMilliSeconds;

public AtlasKafkaConsumer(NotificationInterface.NotificationType notificationType, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
this(notificationType.getDeserializer(), kafkaConsumer, autoCommitEnabled, pollTimeoutMilliSeconds);
Expand All @@ -59,32 +58,11 @@ public AtlasKafkaConsumer(AtlasNotificationMessageDeserializer<T> deserializer,
this.pollTimeoutMilliSeconds = pollTimeoutMilliSeconds;
}

public List<AtlasKafkaMessage<T>> receive() {
return this.receive(this.pollTimeoutMilliSeconds);
}

@Override
public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
return receive(this.pollTimeoutMilliSeconds, null);
}

@Override
public List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receive(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
}

@Override
public List<AtlasKafkaMessage<T>> receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receiveRawRecords(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
}


@Override
public void commit(TopicPartition partition, long offset) {
if (!autoCommitEnabled) {
if (LOG.isDebugEnabled()) {
LOG.info(" commiting the offset ==>> " + offset);
}
LOG.debug(" commiting the offset ==>> {}", offset);

kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
}
}
Expand All @@ -103,6 +81,25 @@ public void wakeup() {
}
}

public List<AtlasKafkaMessage<T>> receive() {
return this.receive(this.pollTimeoutMilliSeconds);
}

@Override
public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
return receive(this.pollTimeoutMilliSeconds, null);
}

@Override
public List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receive(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
}

@Override
public List<AtlasKafkaMessage<T>> receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receiveRawRecords(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
}

private List<AtlasKafkaMessage<T>> receiveRawRecords(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receive(timeoutMilliSeconds, lastCommittedPartitionOffset, true);
}
Expand All @@ -112,7 +109,7 @@ private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPa
}

private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset, boolean isRawDataRequired) {
List<AtlasKafkaMessage<T>> messages = new ArrayList();
List<AtlasKafkaMessage<T>> messages = new ArrayList<>();

ConsumerRecords<?, ?> records = kafkaConsumer != null ? kafkaConsumer.poll(timeoutMilliSeconds) : null;

Expand All @@ -127,10 +124,9 @@ private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPa
if (MapUtils.isNotEmpty(lastCommittedPartitionOffset)
&& lastCommittedPartitionOffset.containsKey(topicPartition)
&& record.offset() < lastCommittedPartitionOffset.get(topicPartition)) {

commit(topicPartition, record.offset());
LOG.info("Skipping already processed message: topic={}, partition={} offset={}. Last processed offset={}",
record.topic(), record.partition(), record.offset(), lastCommittedPartitionOffset.get(topicPartition));
record.topic(), record.partition(), record.offset(), lastCommittedPartitionOffset.get(topicPartition));
continue;
}

Expand All @@ -147,21 +143,18 @@ private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPa
continue;
}

AtlasKafkaMessage kafkaMessage = null;
AtlasKafkaMessage kafkaMessage;

if (isRawDataRequired) {
kafkaMessage = new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(),
deserializer.getMsgCreated(), deserializer.getSpooled(), deserializer.getSource(), record.value().toString());
kafkaMessage = new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(), deserializer.getMsgCreated(), deserializer.getSpooled(), deserializer.getSource(), record.value().toString());
} else {
kafkaMessage = new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(),
deserializer.getMsgCreated(), deserializer.getSpooled(), deserializer.getSource());
kafkaMessage = new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(), deserializer.getMsgCreated(), deserializer.getSpooled(), deserializer.getSource());
}

messages.add(kafkaMessage);
}
}

return messages;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -79,7 +79,11 @@ public long getMsgCreated() {
return this.msgCreated;
}

public String getSource() { return this.source; }
public String getSource() {
return this.source;
}

public String getRawRecordData() { return this.rawRecordData; }
public String getRawRecordData() {
return this.rawRecordData;
}
}
Loading