Skip to content

Commit

Permalink
ATLAS-4963: checkstyle compliance updates - atlas-notification module (
Browse files Browse the repository at this point in the history
…#280)

(cherry picked from commit 391ddba)
  • Loading branch information
mneethiraj committed Feb 12, 2025
1 parent 3fa48f5 commit 8aa08f8
Show file tree
Hide file tree
Showing 59 changed files with 1,649 additions and 1,682 deletions.
1 change: 1 addition & 0 deletions dev-support/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@
<suppress files="AtlasGraphTraversalSource.java" checks="MethodName"/>
<suppress files="DeleteHandlerV1.java" checks="MethodName"/>
<suppress files="Id.java" checks="MethodName"/>
<suppress files="KafkaNotification.java" checks="StaticVariableName"/>
</suppressions>
5 changes: 5 additions & 0 deletions notification/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
<name>Apache Atlas Notification</name>
<description>Apache Atlas Notification</description>

<properties>
<checkstyle.failOnViolation>true</checkstyle.failOnViolation>
<checkstyle.skip>false</checkstyle.skip>
</properties>

<dependencies>

<dependency>
Expand Down
411 changes: 208 additions & 203 deletions notification/src/main/java/org/apache/atlas/hook/AtlasHook.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -21,7 +21,6 @@
* Exception class for Atlas Hooks.
*/
public class AtlasHookException extends Exception {

public AtlasHookException() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -39,11 +39,17 @@
* Use this class to create a Kafka topic with specific configuration like number of partitions, replicas, etc.
*/
public class AtlasTopicCreator {

private static final Logger LOG = LoggerFactory.getLogger(AtlasTopicCreator.class);

public static final String ATLAS_NOTIFICATION_CREATE_TOPICS_KEY = "atlas.notification.create.topics";

public static void main(String[] args) throws AtlasException {
Configuration configuration = ApplicationProperties.get();
AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();

atlasTopicCreator.createAtlasTopic(configuration, args);
}

/**
* Create an Atlas topic.
*
Expand All @@ -61,46 +67,46 @@ public void createAtlasTopic(Configuration atlasProperties, String... topicNames
if (!handleSecurity(atlasProperties)) {
return;
}
try(KafkaUtils kafkaUtils = getKafkaUtils(atlasProperties)) {

try (KafkaUtils kafkaUtils = getKafkaUtils(atlasProperties)) {
int numPartitions = atlasProperties.getInt("atlas.notification.partitions", 1);
int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);
int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);

kafkaUtils.createTopics(Arrays.asList(topicNames), numPartitions, numReplicas);
} catch (Exception e) {
LOG.error("Error while creating topics e :" + e.getMessage(), e);
LOG.error("Error while creating topics e :{}", e.getMessage(), e);
}
} else {
LOG.info("Not creating topics {} as {} is false", StringUtils.join(topicNames, ","),
ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
LOG.info("Not creating topics {} as {} is false", StringUtils.join(topicNames, ","), ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
}
}

@VisibleForTesting
protected boolean handleSecurity(Configuration atlasProperties) {
if (AuthenticationUtil.isKerberosAuthenticationEnabled(atlasProperties)) {
String kafkaPrincipal = atlasProperties.getString("atlas.notification.kafka.service.principal");
String kafkaKeyTab = atlasProperties.getString("atlas.notification.kafka.keytab.location");
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
String kafkaPrincipal = atlasProperties.getString("atlas.notification.kafka.service.principal");
String kafkaKeyTab = atlasProperties.getString("atlas.notification.kafka.keytab.location");
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();

SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, hadoopConf);

try {
String serverPrincipal = SecurityUtil.getServerPrincipal(kafkaPrincipal, (String) null);

UserGroupInformation.setConfiguration(hadoopConf);
UserGroupInformation.loginUserFromKeytab(serverPrincipal, kafkaKeyTab);
} catch (IOException e) {
LOG.warn("Could not login as {} from keytab file {}", kafkaPrincipal, kafkaKeyTab, e);

return false;
}
}

return true;
}

// This method is added to mock the creation of kafkaUtils object while writing the test cases
KafkaUtils getKafkaUtils(Configuration configuration) {
return new KafkaUtils(configuration);
}

public static void main(String[] args) throws AtlasException {
Configuration configuration = ApplicationProperties.get();
AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
atlasTopicCreator.createAtlasTopic(configuration, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,11 +18,9 @@

package org.apache.atlas.hook;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A logger wrapper that can be used to write messages that failed to be sent to a log file.
*/
Expand All @@ -32,4 +30,4 @@ public class FailedMessagesLogger {
public void log(String message) {
LOG.info(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -21,6 +21,11 @@
import org.apache.atlas.notification.AtlasNotificationMessageDeserializer;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.commons.collections.MapUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,12 +34,6 @@
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

/**
* Kafka specific notification consumer.
*
Expand All @@ -45,7 +44,7 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {

private final KafkaConsumer kafkaConsumer;
private final boolean autoCommitEnabled;
private long pollTimeoutMilliSeconds = 1000L;
private final long pollTimeoutMilliSeconds;

public AtlasKafkaConsumer(NotificationInterface.NotificationType notificationType, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) {
this(notificationType.getDeserializer(), kafkaConsumer, autoCommitEnabled, pollTimeoutMilliSeconds);
Expand All @@ -59,32 +58,11 @@ public AtlasKafkaConsumer(AtlasNotificationMessageDeserializer<T> deserializer,
this.pollTimeoutMilliSeconds = pollTimeoutMilliSeconds;
}

public List<AtlasKafkaMessage<T>> receive() {
return this.receive(this.pollTimeoutMilliSeconds);
}

@Override
public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
return receive(this.pollTimeoutMilliSeconds, null);
}

@Override
public List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receive(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
}

@Override
public List<AtlasKafkaMessage<T>> receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receiveRawRecords(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
}


@Override
public void commit(TopicPartition partition, long offset) {
if (!autoCommitEnabled) {
if (LOG.isDebugEnabled()) {
LOG.info(" commiting the offset ==>> " + offset);
}
LOG.debug(" commiting the offset ==>> {}", offset);

kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
}
}
Expand All @@ -103,6 +81,25 @@ public void wakeup() {
}
}

public List<AtlasKafkaMessage<T>> receive() {
return this.receive(this.pollTimeoutMilliSeconds);
}

@Override
public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
return receive(this.pollTimeoutMilliSeconds, null);
}

@Override
public List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receive(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
}

@Override
public List<AtlasKafkaMessage<T>> receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receiveRawRecords(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
}

private List<AtlasKafkaMessage<T>> receiveRawRecords(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receive(timeoutMilliSeconds, lastCommittedPartitionOffset, true);
}
Expand All @@ -112,7 +109,7 @@ private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPa
}

private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset, boolean isRawDataRequired) {
List<AtlasKafkaMessage<T>> messages = new ArrayList();
List<AtlasKafkaMessage<T>> messages = new ArrayList<>();

ConsumerRecords<?, ?> records = kafkaConsumer != null ? kafkaConsumer.poll(timeoutMilliSeconds) : null;

Expand All @@ -127,10 +124,9 @@ private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPa
if (MapUtils.isNotEmpty(lastCommittedPartitionOffset)
&& lastCommittedPartitionOffset.containsKey(topicPartition)
&& record.offset() < lastCommittedPartitionOffset.get(topicPartition)) {

commit(topicPartition, record.offset());
LOG.info("Skipping already processed message: topic={}, partition={} offset={}. Last processed offset={}",
record.topic(), record.partition(), record.offset(), lastCommittedPartitionOffset.get(topicPartition));
record.topic(), record.partition(), record.offset(), lastCommittedPartitionOffset.get(topicPartition));
continue;
}

Expand All @@ -147,21 +143,18 @@ private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPa
continue;
}

AtlasKafkaMessage kafkaMessage = null;
AtlasKafkaMessage kafkaMessage;

if (isRawDataRequired) {
kafkaMessage = new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(),
deserializer.getMsgCreated(), deserializer.getSpooled(), deserializer.getSource(), record.value().toString());
kafkaMessage = new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(), deserializer.getMsgCreated(), deserializer.getSpooled(), deserializer.getSource(), record.value().toString());
} else {
kafkaMessage = new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(),
deserializer.getMsgCreated(), deserializer.getSpooled(), deserializer.getSource());
kafkaMessage = new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(), deserializer.getMsgCreated(), deserializer.getSpooled(), deserializer.getSource());
}

messages.add(kafkaMessage);
}
}

return messages;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -79,7 +79,11 @@ public long getMsgCreated() {
return this.msgCreated;
}

public String getSource() { return this.source; }
public String getSource() {
return this.source;
}

public String getRawRecordData() { return this.rawRecordData; }
public String getRawRecordData() {
return this.rawRecordData;
}
}
Loading

0 comments on commit 8aa08f8

Please sign in to comment.