Commit
Merge branch 'opensearch-project:main' into main
kkondaka authored Jun 28, 2024
2 parents d29fac8 + f27b808 commit 281b8c6
Showing 6 changed files with 298 additions and 57 deletions.
73 changes: 73 additions & 0 deletions TRIAGING.md
@@ -0,0 +1,73 @@
<img src="https://raw.githubusercontent.com/opensearch-project/data-prepper/main/docs/images/DataPrepper_auto.svg" height="64px" alt="Data Prepper">

The Data Prepper maintainers seek to promote an inclusive and engaged community of contributors.
To facilitate this, weekly triage meetings are open to all, and attendance is encouraged for anyone who hopes to contribute, discuss an issue, or learn more about the project.
To learn more about contributing to the Data Prepper project, visit the [Contributing](./CONTRIBUTING.md) documentation.

### Do I need to attend for my issue to be addressed/triaged?

Attendance is not required for your issue to be triaged or addressed.
All new issues are triaged weekly.

### What happens if my issue does not get covered this time?

Each meeting we seek to address all new issues.
However, should we run out of time before your issue is discussed, you are always welcome to attend the next meeting or to follow up on the issue post itself.

### How do I join the triage meeting?

Meetings are hosted regularly Tuesdays at 2:30 PM US Central Time (12:30 PM Pacific Time) and can be joined via the links posted on the [OpenSearch Meetup Group](https://www.meetup.com/opensearch/events/) list of events.
The event will be titled `Data Prepper Triage Meeting`.

After joining the Zoom meeting, you can enable your camera and microphone to join the discussion.
If you do not have a webcam or microphone available, you can still join in via the text chat.

If you have an issue you'd like to bring forward, please have a link to it ready so it can be presented to everyone in the meeting.

### Is there an agenda for each week?

Meetings are 30 minutes and structured as follows:

1. Initial Gathering: As we gather, feel free to turn on video and engage in informal and open-to-all conversation. A volunteer Data Prepper maintainer will share the [Data Prepper Tracking Board](https://github.com/orgs/opensearch-project/projects/82/) and proceed.
2. Announcements: We will make any announcements at the beginning, if necessary.
3. Untriaged issues: We will review all untriaged [issues](https://github.com/orgs/opensearch-project/projects/82/views/6) for the Data Prepper repository. If you have an item here, you may spend a few minutes explaining your request.
4. Member Requests: Opportunity for any meeting member to ask for consideration of an issue or pull request.
5. Release review: If time permits, and we find it necessary, we will review [items for the current release](https://github.com/orgs/opensearch-project/projects/82/views/14).
6. Follow-up issues: If time permits, we will review the [follow up items](https://github.com/orgs/opensearch-project/projects/82/views/18).
7. Open Discussion: If time permits, meeting members may surface any topics that do not yet have an issue filed or a pull request created.

### Do I need to have already contributed to the project to attend a triage meeting?

No, all are welcome and encouraged to attend.
Attending the triage meetings is a great way for a new contributor to learn about the project as well as explore different avenues of contribution.

### What if I have follow-up questions on an issue?

If you have an existing issue you would like to discuss, you can always comment on the issue itself.
Alternatively, you are welcome to come to the triage meeting to discuss.

### Is this meeting a good place to get help using Data Prepper?

While we are always happy to help the community, the best resource for usage questions is the [Data Prepper discussion forum](https://github.com/opensearch-project/data-prepper/discussions) on GitHub.

There you can find answers to many common questions as well as speak with implementation experts and Data Prepper maintainers.

### What are the issue labels associated with triaging?

There are several labels that are particularly important for triaging in Data Prepper:

| Label | When applied | Meaning |
| ----- | ------------ | ------- |
| [untriaged](https://github.com/opensearch-project/data-prepper/labels/untriaged) | When issues are created or re-opened. | Issues labeled as `untriaged` require the attention of the repository maintainers and may need to be prioritized for quicker resolution. It's crucial to keep the count of 'untriaged' labels low to ensure all potential security issues are addressed in a timely manner. |
| [follow up](https://github.com/opensearch-project/data-prepper/labels/follow%20up) | During triage meetings. | Issues labeled as `follow up` have been triaged, but the maintainers may need to follow up on them further. This label lets us triage an issue as not critical while still being able to come back to it. |
| [help wanted](https://github.com/opensearch-project/data-prepper/labels/help%20wanted) | Anytime. | Issues marked as `help wanted` signal that they are actionable and not the current focus of the project maintainers. Community contributions are especially encouraged for these issues. |
| [good first issue](https://github.com/opensearch-project/data-prepper/labels/good%20first%20issue) | Anytime. | Issues labeled as `good first issue` are small in scope and can be resolved with a single pull request. These are recommended starting points for newcomers looking to make their first contributions. |


### Is this where I should bring up potential security vulnerabilities?

Due to the sensitive nature of security vulnerabilities, please report all potential vulnerabilities directly by following the steps outlined in the [Security Issue Response Process](https://github.com/opensearch-project/data-prepper/security/policy).

### Who should I contact if I have further questions?

You can always file an [issue](https://github.com/opensearch-project/data-prepper/issues/new/choose) for any question you have about the project.
@@ -3,6 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package org.opensearch.dataprepper.plugins.kafka.common;public class KafkaMdc {
+package org.opensearch.dataprepper.plugins.kafka.common;
+
+public class KafkaMdc {
public static final String MDC_KAFKA_PLUGIN_KEY = "kafkaPluginType";
}
@@ -25,7 +25,16 @@ public class KafkaPluginThreadFactory implements ThreadFactory {
final ThreadFactory delegateThreadFactory,
final String kafkaPluginType) {
this.delegateThreadFactory = delegateThreadFactory;
-        this.threadPrefix = "kafka-" + kafkaPluginType + "-";
+        this.threadPrefix = createPluginPart(kafkaPluginType);
this.kafkaPluginType = kafkaPluginType;
}

KafkaPluginThreadFactory(
final ThreadFactory delegateThreadFactory,
final String kafkaPluginType,
final String kafkaTopic) {
this.delegateThreadFactory = delegateThreadFactory;
this.threadPrefix = normalizeName(kafkaTopic) + "-" + createPluginPart(kafkaPluginType);
this.kafkaPluginType = kafkaPluginType;
}

@@ -39,6 +48,28 @@ public static KafkaPluginThreadFactory defaultExecutorThreadFactory(final String
return new KafkaPluginThreadFactory(Executors.defaultThreadFactory(), kafkaPluginType);
}

/**
 * Creates an instance specifically for use with {@link Executors}.
 *
 * @param kafkaPluginType The name of the plugin type. e.g. sink, source, buffer
 * @param kafkaTopic The name of the Kafka topic, used as a prefix in thread names.
 * @return An instance of the {@link KafkaPluginThreadFactory}.
 */
public static KafkaPluginThreadFactory defaultExecutorThreadFactory(
final String kafkaPluginType,
final String kafkaTopic) {
return new KafkaPluginThreadFactory(Executors.defaultThreadFactory(), kafkaPluginType, kafkaTopic);
}

private static String createPluginPart(final String kafkaPluginType) {
return "kafka-" + kafkaPluginType + "-";
}

private static String normalizeName(final String kafkaTopic) {
final String limitedName = kafkaTopic.length() > 20 ? kafkaTopic.substring(0, 20) : kafkaTopic;
return limitedName
.toLowerCase().replaceAll("[^a-z0-9]", "-");
}

@Override
public Thread newThread(final Runnable runnable) {
final Thread thread = delegateThreadFactory.newThread(() -> {
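The thread-name prefix logic added in this file can be sketched standalone. The class and method names below are illustrative stand-ins, not part of the Data Prepper API; the bodies mirror `createPluginPart` and `normalizeName` from the diff.

```java
// Hypothetical standalone sketch of the thread-prefix logic; names are
// illustrative, not part of the Data Prepper API.
public class ThreadPrefixSketch {
    // Mirrors createPluginPart: "kafka-<pluginType>-"
    static String createPluginPart(final String kafkaPluginType) {
        return "kafka-" + kafkaPluginType + "-";
    }

    // Mirrors normalizeName: truncate to 20 chars, lowercase,
    // and replace non-alphanumerics with '-'
    static String normalizeName(final String kafkaTopic) {
        final String limitedName = kafkaTopic.length() > 20 ? kafkaTopic.substring(0, 20) : kafkaTopic;
        return limitedName.toLowerCase().replaceAll("[^a-z0-9]", "-");
    }

    public static void main(String[] args) {
        // Topic name comes first, then the plugin part, as in the new constructor
        String prefix = normalizeName("My_Topic.Name") + "-" + createPluginPart("source");
        System.out.println(prefix); // my-topic-name-kafka-source-
    }
}
```

Truncating to 20 characters keeps thread names readable in thread dumps even for long topic names.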
@@ -29,6 +29,8 @@
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
+import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
+import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory;
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig;
@@ -46,6 +48,7 @@
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;

import java.io.IOException;
import java.util.ArrayList;
@@ -73,10 +76,10 @@
public class KafkaSource implements Source<Record<Event>> {
private static final String NO_RESOLVABLE_URLS_ERROR_MESSAGE = "No resolvable bootstrap urls given in bootstrap.servers";
private static final long RETRY_SLEEP_INTERVAL = 30000;
+    private static final String MDC_KAFKA_PLUGIN_VALUE = "source";
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
private final KafkaSourceConfig sourceConfig;
private final AtomicBoolean shutdownInProgress;
-    private ExecutorService executorService;
private final PluginMetrics pluginMetrics;
private KafkaCustomConsumer consumer;
private KafkaConsumer kafkaConsumer;
@@ -112,59 +115,65 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,

@Override
public void start(Buffer<Record<Event>> buffer) {
-        Properties authProperties = new Properties();
-        KafkaSecurityConfigurer.setDynamicSaslClientCallbackHandler(authProperties, sourceConfig, pluginConfigObservable);
-        KafkaSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
-        sourceConfig.getTopics().forEach(topic -> {
-            consumerGroupID = topic.getGroupId();
-            KafkaTopicConsumerMetrics topicMetrics = new KafkaTopicConsumerMetrics(topic.getName(), pluginMetrics, true);
-            Properties consumerProperties = getConsumerProperties(topic, authProperties);
-            MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
-            try {
-                int numWorkers = topic.getWorkers();
-                executorService = Executors.newFixedThreadPool(numWorkers);
-                allTopicExecutorServices.add(executorService);
-
-                IntStream.range(0, numWorkers).forEach(index -> {
-                    while (true) {
-                        try {
-                            kafkaConsumer = createKafkaConsumer(schema, consumerProperties);
-                            break;
-                        } catch (ConfigException ce) {
-                            if (ce.getMessage().contains(NO_RESOLVABLE_URLS_ERROR_MESSAGE)) {
-                                LOG.warn("Exception while creating Kafka consumer: ", ce);
-                                LOG.warn("Bootstrap URL could not be resolved. Retrying in {} ms...", RETRY_SLEEP_INTERVAL);
-                                try {
-                                    sleep(RETRY_SLEEP_INTERVAL);
-                                } catch (InterruptedException ie) {
-                                    Thread.currentThread().interrupt();
-                                    throw new RuntimeException(ie);
-                                }
-                            } else {
-                                throw ce;
-                            }
-                        }
-                    }
-                    consumer = new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType,
-                            acknowledgementSetManager, null, topicMetrics, PauseConsumePredicate.noPause());
-                    allTopicConsumers.add(consumer);
-
-                    executorService.submit(consumer);
-                });
-            } catch (Exception e) {
-                if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) {
-                    LOG.error("The kafka broker is not available...");
-                } else {
-                    LOG.error("Failed to setup the Kafka Source Plugin.", e);
-                }
-                throw new RuntimeException(e);
-            }
-            LOG.info("Started Kafka source for topic " + topic.getName());
-        });
+        try {
+            setMdc();
+            Properties authProperties = new Properties();
+            KafkaSecurityConfigurer.setDynamicSaslClientCallbackHandler(authProperties, sourceConfig, pluginConfigObservable);
+            KafkaSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
+            sourceConfig.getTopics().forEach(topic -> {
+                consumerGroupID = topic.getGroupId();
+                KafkaTopicConsumerMetrics topicMetrics = new KafkaTopicConsumerMetrics(topic.getName(), pluginMetrics, true);
+                Properties consumerProperties = getConsumerProperties(topic, authProperties);
+                MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
+                try {
+                    int numWorkers = topic.getWorkers();
+                    final ExecutorService executorService = Executors.newFixedThreadPool(
+                            numWorkers, KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE, topic.getName()));
+                    allTopicExecutorServices.add(executorService);
+
+                    IntStream.range(0, numWorkers).forEach(index -> {
+                        while (true) {
+                            try {
+                                kafkaConsumer = createKafkaConsumer(schema, consumerProperties);
+                                break;
+                            } catch (ConfigException ce) {
+                                if (ce.getMessage().contains(NO_RESOLVABLE_URLS_ERROR_MESSAGE)) {
+                                    LOG.warn("Exception while creating Kafka consumer: ", ce);
+                                    LOG.warn("Bootstrap URL could not be resolved. Retrying in {} ms...", RETRY_SLEEP_INTERVAL);
+                                    try {
+                                        sleep(RETRY_SLEEP_INTERVAL);
+                                    } catch (InterruptedException ie) {
+                                        Thread.currentThread().interrupt();
+                                        throw new RuntimeException(ie);
+                                    }
+                                } else {
+                                    throw ce;
+                                }
+                            }
+                        }
+                        consumer = new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType,
+                                acknowledgementSetManager, null, topicMetrics, PauseConsumePredicate.noPause());
+                        allTopicConsumers.add(consumer);
+
+                        executorService.submit(consumer);
+                    });
+                } catch (Exception e) {
+                    if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) {
+                        LOG.error("The kafka broker is not available...");
+                    } else {
+                        LOG.error("Failed to setup the Kafka Source Plugin.", e);
+                    }
+                    throw new RuntimeException(e);
+                }
+                LOG.info("Started Kafka source for topic " + topic.getName());
+            });
+        } finally {
+            removeMdc();
+        }
}

-    public KafkaConsumer<?, ?> createKafkaConsumer(final MessageFormat schema, final Properties consumerProperties) {
+    KafkaConsumer<?, ?> createKafkaConsumer(final MessageFormat schema, final Properties consumerProperties) {
switch (schema) {
case JSON:
return new KafkaConsumer<String, JsonNode>(consumerProperties);
@@ -183,19 +192,24 @@ public void start(Buffer<Record<Event>> buffer) {

@Override
public void stop() {
-        shutdownInProgress.set(true);
-        final long shutdownWaitTime = calculateLongestThreadWaitingTime();
-
-        LOG.info("Shutting down {} Executor services", allTopicExecutorServices.size());
-        allTopicExecutorServices.forEach(executor -> stopExecutor(executor, shutdownWaitTime));
-
-        LOG.info("Closing {} consumers", allTopicConsumers.size());
-        allTopicConsumers.forEach(consumer -> consumer.closeConsumer());
-
-        LOG.info("Kafka source shutdown successfully...");
+        try {
+            setMdc();
+            shutdownInProgress.set(true);
+            final long shutdownWaitTime = calculateLongestThreadWaitingTime();
+
+            LOG.info("Shutting down {} Executor services", allTopicExecutorServices.size());
+            allTopicExecutorServices.forEach(executor -> stopExecutor(executor, shutdownWaitTime));
+
+            LOG.info("Closing {} consumers", allTopicConsumers.size());
+            allTopicConsumers.forEach(consumer -> consumer.closeConsumer());
+
+            LOG.info("Kafka source shutdown successfully...");
+        } finally {
+            removeMdc();
+        }
}

-    public void stopExecutor(final ExecutorService executorService, final long shutdownWaitTime) {
+    private void stopExecutor(final ExecutorService executorService, final long shutdownWaitTime) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(shutdownWaitTime, TimeUnit.SECONDS)) {
@@ -346,7 +360,7 @@ private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
}
}

-    protected void sleep(final long millis) throws InterruptedException {
+    void sleep(final long millis) throws InterruptedException {
Thread.sleep(millis);
}

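The `stopExecutor` helper above drains worker pools on shutdown. A self-contained sketch of that shutdown sequence, with an assumed force-stop fallback (the full method body is elided from this diff, so the `shutdownNow` handling here is illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hedged sketch of the executor shutdown sequence used by stopExecutor:
// request an orderly shutdown, wait up to a deadline, then force-stop.
public class ShutdownSketch {
    static void stopExecutor(final ExecutorService executorService, final long shutdownWaitTimeSeconds) {
        executorService.shutdown(); // stop accepting new tasks
        try {
            if (!executorService.awaitTermination(shutdownWaitTimeSeconds, TimeUnit.SECONDS)) {
                executorService.shutdownNow(); // interrupt tasks still running past the deadline
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> System.out.println("task ran"));
        stopExecutor(pool, 5);
        System.out.println(pool.isShutdown()); // true
    }
}
```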
@@ -366,4 +380,12 @@ private void updateConfig(final KafkaClusterConfigSupplier kafkaClusterConfigSup
}
}
}
+
+    private static void setMdc() {
+        MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, MDC_KAFKA_PLUGIN_VALUE);
+    }
+
+    private static void removeMdc() {
+        MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY);
+    }
}
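The `setMdc`/`removeMdc` pair wraps each entry point in try/finally so the MDC key never leaks to unrelated work on the same thread. The class below is an illustrative stand-in for SLF4J's `MDC` (which is a thread-local key/value context), used only to demonstrate that pattern; it is not Data Prepper code.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for SLF4J's MDC, sketching the
// set-in-try / remove-in-finally pattern used in KafkaSource.
public class MdcPatternSketch {
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) { CONTEXT.get().put(key, value); }
    static String get(String key) { return CONTEXT.get().get(key); }
    static void remove(String key) { CONTEXT.get().remove(key); }

    public static void main(String[] args) {
        try {
            put("kafkaPluginType", "source");
            // ... work whose log lines carry the MDC value ...
            System.out.println(get("kafkaPluginType")); // source
        } finally {
            remove("kafkaPluginType"); // always cleared, even if the work throws
        }
        System.out.println(get("kafkaPluginType")); // null
    }
}
```

With a logging layout that includes the MDC key (for example a `%X{kafkaPluginType}` conversion in a Logback pattern), every log line emitted inside the try block is tagged with the plugin type.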