From 36d599f1183f3bdfcef32183c80f573bb9a20e6c Mon Sep 17 00:00:00 2001
From: David Venable
Date: Wed, 26 Jun 2024 14:25:57 -0500
Subject: [PATCH 1/2] Enhanced Kafka source logging through the use of MDC and
 better thread names for Kafka source threads. Resolves #4126. (#4663)

Signed-off-by: David Venable
---
 .../plugins/kafka/common/KafkaMdc.java        |   4 +-
 .../thread/KafkaPluginThreadFactory.java      |  33 ++++-
 .../plugins/kafka/source/KafkaSource.java     | 132 ++++++++++--------
 .../thread/KafkaPluginThreadFactoryTest.java  |  73 ++++++++++
 .../plugins/kafka/source/KafkaSourceTest.java |  40 ++++++
 5 files changed, 225 insertions(+), 57 deletions(-)

diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaMdc.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaMdc.java
index 9ae8985908..785d565e78 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaMdc.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaMdc.java
@@ -3,6 +3,8 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package org.opensearch.dataprepper.plugins.kafka.common;public class KafkaMdc {
+package org.opensearch.dataprepper.plugins.kafka.common;
+
+public class KafkaMdc {
     public static final String MDC_KAFKA_PLUGIN_KEY = "kafkaPluginType";
 }
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactory.java
index a05540c320..b5dede6cda 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactory.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactory.java
@@ -25,7 +25,16 @@ public class KafkaPluginThreadFactory implements ThreadFactory {
             final ThreadFactory delegateThreadFactory,
             final String kafkaPluginType) {
         this.delegateThreadFactory = delegateThreadFactory;
-        this.threadPrefix = "kafka-" + kafkaPluginType + "-";
+        this.threadPrefix = createPluginPart(kafkaPluginType);
+        this.kafkaPluginType = kafkaPluginType;
+    }
+
+    KafkaPluginThreadFactory(
+            final ThreadFactory delegateThreadFactory,
+            final String kafkaPluginType,
+            final String kafkaTopic) {
+        this.delegateThreadFactory = delegateThreadFactory;
+        this.threadPrefix = normalizeName(kafkaTopic) + "-" + createPluginPart(kafkaPluginType);
         this.kafkaPluginType = kafkaPluginType;
     }
 
@@ -39,6 +48,28 @@ public static KafkaPluginThreadFactory defaultExecutorThreadFactory(final String
         return new KafkaPluginThreadFactory(Executors.defaultThreadFactory(), kafkaPluginType);
     }
 
+    /**
+     * Creates an instance specifically for use with {@link Executors}.
+     *
+     * @param kafkaPluginType The name of the plugin type. e.g. sink, source, buffer
+     * @return An instance of the {@link KafkaPluginThreadFactory}.
+     */
+    public static KafkaPluginThreadFactory defaultExecutorThreadFactory(
+            final String kafkaPluginType,
+            final String kafkaTopic) {
+        return new KafkaPluginThreadFactory(Executors.defaultThreadFactory(), kafkaPluginType, kafkaTopic);
+    }
+
+    private static String createPluginPart(final String kafkaPluginType) {
+        return "kafka-" + kafkaPluginType + "-";
+    }
+
+    private static String normalizeName(final String kafkaTopic) {
+        final String limitedName = kafkaTopic.length() > 20 ? kafkaTopic.substring(0, 20) : kafkaTopic;
+        return limitedName
+                .toLowerCase().replaceAll("[^a-z0-9]", "-");
+    }
+
     @Override
     public Thread newThread(final Runnable runnable) {
         final Thread thread = delegateThreadFactory.newThread(() -> {
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
index 525c754929..e235594ce2 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
@@ -29,6 +29,8 @@
 import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.source.Source;
+import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
+import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory;
 import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig;
@@ -46,6 +48,7 @@
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -73,10 +76,10 @@ public class KafkaSource implements Source<Record<Event>> {
     private static final String NO_RESOLVABLE_URLS_ERROR_MESSAGE = "No resolvable bootstrap urls given in bootstrap.servers";
     private static final long RETRY_SLEEP_INTERVAL = 30000;
+    private static final String MDC_KAFKA_PLUGIN_VALUE = "source";
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
     private final KafkaSourceConfig sourceConfig;
     private final AtomicBoolean shutdownInProgress;
-    private ExecutorService executorService;
     private final PluginMetrics pluginMetrics;
     private KafkaCustomConsumer consumer;
     private KafkaConsumer kafkaConsumer;
@@ -112,59 +115,65 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
 
     @Override
     public void start(Buffer<Record<Event>> buffer) {
-        Properties authProperties = new Properties();
-        KafkaSecurityConfigurer.setDynamicSaslClientCallbackHandler(authProperties, sourceConfig, pluginConfigObservable);
-        KafkaSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
-        sourceConfig.getTopics().forEach(topic -> {
-            consumerGroupID = topic.getGroupId();
-            KafkaTopicConsumerMetrics topicMetrics = new KafkaTopicConsumerMetrics(topic.getName(), pluginMetrics, true);
-            Properties consumerProperties = getConsumerProperties(topic, authProperties);
-            MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
-            try {
-                int numWorkers = topic.getWorkers();
-                executorService = Executors.newFixedThreadPool(numWorkers);
-                allTopicExecutorServices.add(executorService);
-
-                IntStream.range(0, numWorkers).forEach(index -> {
-                    while (true) {
-                        try {
-                            kafkaConsumer = createKafkaConsumer(schema, consumerProperties);
-                            break;
-                        } catch (ConfigException ce) {
-                            if (ce.getMessage().contains(NO_RESOLVABLE_URLS_ERROR_MESSAGE)) {
-                                LOG.warn("Exception while creating Kafka consumer: ", ce);
-                                LOG.warn("Bootstrap URL could not be resolved. Retrying in {} ms...", RETRY_SLEEP_INTERVAL);
-                                try {
-                                    sleep(RETRY_SLEEP_INTERVAL);
-                                } catch (InterruptedException ie) {
-                                    Thread.currentThread().interrupt();
-                                    throw new RuntimeException(ie);
-                                }
-                            } else {
-                                throw ce;
-                            }
-                        }
-                    }
-                    consumer = new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType,
-                            acknowledgementSetManager, null, topicMetrics, PauseConsumePredicate.noPause());
-                    allTopicConsumers.add(consumer);
-
-                    executorService.submit(consumer);
-                });
-            } catch (Exception e) {
-                if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) {
-                    LOG.error("The kafka broker is not available...");
-                } else {
-                    LOG.error("Failed to setup the Kafka Source Plugin.", e);
-                }
-                throw new RuntimeException(e);
-            }
-            LOG.info("Started Kafka source for topic " + topic.getName());
-        });
+        try {
+            setMdc();
+            Properties authProperties = new Properties();
+            KafkaSecurityConfigurer.setDynamicSaslClientCallbackHandler(authProperties, sourceConfig, pluginConfigObservable);
+            KafkaSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG);
+            sourceConfig.getTopics().forEach(topic -> {
+                consumerGroupID = topic.getGroupId();
+                KafkaTopicConsumerMetrics topicMetrics = new KafkaTopicConsumerMetrics(topic.getName(), pluginMetrics, true);
+                Properties consumerProperties = getConsumerProperties(topic, authProperties);
+                MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType);
+                try {
+                    int numWorkers = topic.getWorkers();
+                    final ExecutorService executorService = Executors.newFixedThreadPool(
+                            numWorkers, KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE, topic.getName()));
+                    allTopicExecutorServices.add(executorService);
+
+                    IntStream.range(0, numWorkers).forEach(index -> {
+                        while (true) {
+                            try {
+                                kafkaConsumer = createKafkaConsumer(schema, consumerProperties);
+                                break;
+                            } catch (ConfigException ce) {
+                                if (ce.getMessage().contains(NO_RESOLVABLE_URLS_ERROR_MESSAGE)) {
+                                    LOG.warn("Exception while creating Kafka consumer: ", ce);
+                                    LOG.warn("Bootstrap URL could not be resolved. Retrying in {} ms...", RETRY_SLEEP_INTERVAL);
+                                    try {
+                                        sleep(RETRY_SLEEP_INTERVAL);
+                                    } catch (InterruptedException ie) {
+                                        Thread.currentThread().interrupt();
+                                        throw new RuntimeException(ie);
+                                    }
+                                } else {
+                                    throw ce;
+                                }
+                            }
+                        }
+                        consumer = new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType,
+                                acknowledgementSetManager, null, topicMetrics, PauseConsumePredicate.noPause());
+                        allTopicConsumers.add(consumer);
+                        executorService.submit(consumer);
+                    });
+                } catch (Exception e) {
+                    if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) {
+                        LOG.error("The kafka broker is not available...");
+                    } else {
+                        LOG.error("Failed to setup the Kafka Source Plugin.", e);
+                    }
+                    throw new RuntimeException(e);
+                }
+                LOG.info("Started Kafka source for topic " + topic.getName());
+            });
+        } finally {
+            removeMdc();
+        }
     }
 
-    public KafkaConsumer createKafkaConsumer(final MessageFormat schema, final Properties consumerProperties) {
+    KafkaConsumer createKafkaConsumer(final MessageFormat schema, final Properties consumerProperties) {
         switch (schema) {
             case JSON:
                 return new KafkaConsumer<String, JsonNode>(consumerProperties);
@@ -183,19 +192,24 @@ public void start(Buffer<Record<Event>> buffer) {
 
     @Override
     public void stop() {
-        shutdownInProgress.set(true);
-        final long shutdownWaitTime = calculateLongestThreadWaitingTime();
+        try {
+            setMdc();
+            shutdownInProgress.set(true);
+            final long shutdownWaitTime = calculateLongestThreadWaitingTime();
 
-        LOG.info("Shutting down {} Executor services", allTopicExecutorServices.size());
-        allTopicExecutorServices.forEach(executor -> stopExecutor(executor, shutdownWaitTime));
+            LOG.info("Shutting down {} Executor services", allTopicExecutorServices.size());
+            allTopicExecutorServices.forEach(executor -> stopExecutor(executor, shutdownWaitTime));
 
-        LOG.info("Closing {} consumers", allTopicConsumers.size());
-        allTopicConsumers.forEach(consumer -> consumer.closeConsumer());
+            LOG.info("Closing {} consumers", allTopicConsumers.size());
+            allTopicConsumers.forEach(consumer -> consumer.closeConsumer());
 
-        LOG.info("Kafka source shutdown successfully...");
+            LOG.info("Kafka source shutdown successfully...");
+        } finally {
+            removeMdc();
+        }
     }
 
-    public void stopExecutor(final ExecutorService executorService, final long shutdownWaitTime) {
+    private void stopExecutor(final ExecutorService executorService, final long shutdownWaitTime) {
         executorService.shutdown();
         try {
             if (!executorService.awaitTermination(shutdownWaitTime, TimeUnit.SECONDS)) {
@@ -346,7 +360,7 @@ private void setPropertiesForSchemaRegistryConnectivity(Properties properties)
         }
     }
 
-    protected void sleep(final long millis) throws InterruptedException {
+    void sleep(final long millis) throws InterruptedException {
         Thread.sleep(millis);
     }
 
@@ -366,4 +380,12 @@ private void updateConfig(final KafkaClusterConfigSupplier kafkaClusterConfigSup
             }
         }
     }
+
+    private static void setMdc() {
+        MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, MDC_KAFKA_PLUGIN_VALUE);
+    }
+
+    private static void removeMdc() {
+        MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY);
+    }
 }
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactoryTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactoryTest.java
index 589f81a74c..1f1bc854dc 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactoryTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactoryTest.java
@@ -8,6 +8,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -37,10 +39,12 @@ class KafkaPluginThreadFactoryTest {
     @Mock
     private Runnable runnable;
     private String pluginType;
+    private String topic;
 
     @BeforeEach
     void setUp() {
         pluginType = UUID.randomUUID().toString();
+        topic = UUID.randomUUID().toString();
 
         when(delegateThreadFactory.newThread(any(Runnable.class))).thenReturn(innerThread);
     }
@@ -50,11 +54,20 @@ private KafkaPluginThreadFactory createObjectUnderTest() {
         return new KafkaPluginThreadFactory(delegateThreadFactory, pluginType);
     }
 
+    private KafkaPluginThreadFactory createObjectUnderTestWithTopic() {
+        return new KafkaPluginThreadFactory(delegateThreadFactory, pluginType, topic);
+    }
+
     @Test
     void newThread_creates_thread_from_delegate() {
         assertThat(createObjectUnderTest().newThread(runnable), equalTo(innerThread));
     }
 
+    @Test
+    void newThread_with_topic_creates_thread_from_delegate() {
+        assertThat(createObjectUnderTestWithTopic().newThread(runnable), equalTo(innerThread));
+    }
+
     @Test
     void newThread_creates_thread_with_name() {
         final KafkaPluginThreadFactory objectUnderTest = createObjectUnderTest();
@@ -69,6 +82,30 @@ void newThread_creates_thread_with_name() {
         verify(thread2).setName(String.format("kafka-%s-2", pluginType));
     }
 
+    @ParameterizedTest
+    @CsvSource({
+            "abcd12,abcd12",
+            "aBCd12,abcd12",
+            "abcd-12,abcd-12",
+            "has space,has-space",
+            "has!character,has-character",
+            "this-is-somewhat-too-long,this-is-somewhat-too"
+    })
+    void newThread_with_topic_creates_thread_with_name(
+            final String topicName,
+            final String expectedPrefix) {
+        this.topic = topicName;
+        final KafkaPluginThreadFactory objectUnderTest = createObjectUnderTestWithTopic();
+
+        final Thread thread1 = objectUnderTest.newThread(runnable);
+        assertThat(thread1, notNullValue());
+        verify(thread1).setName(String.format("%s-kafka-%s-1", expectedPrefix, pluginType));
+
+        final Thread thread2 = objectUnderTest.newThread(runnable);
+        assertThat(thread2, notNullValue());
+        verify(thread2).setName(String.format("%s-kafka-%s-2", expectedPrefix, pluginType));
+    }
+
     @Test
     void newThread_creates_thread_with_wrapping_runnable() {
         createObjectUnderTest().newThread(runnable);
@@ -85,6 +122,22 @@ void newThread_creates_thread_with_wrapping_runnable() {
         verify(runnable).run();
     }
 
+    @Test
+    void newThread_with_topic_creates_thread_with_wrapping_runnable() {
+        createObjectUnderTestWithTopic().newThread(runnable);
+
+        final ArgumentCaptor<Runnable> actualRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+        verify(delegateThreadFactory).newThread(actualRunnableCaptor.capture());
+
+        final Runnable actualRunnable = actualRunnableCaptor.getValue();
+
+        assertThat(actualRunnable, not(equalTo(runnable)));
+
+        verifyNoInteractions(runnable);
+        actualRunnable.run();
+        verify(runnable).run();
+    }
+
     @Test
     void newThread_creates_thread_that_calls_MDC_on_run() {
         createObjectUnderTest().newThread(runnable);
@@ -104,4 +157,24 @@ void newThread_creates_thread_that_calls_MDC_on_run() {
 
         assertThat(actualKafkaPluginType[0], equalTo(pluginType));
     }
+
+    @Test
+    void newThread_with_topic_creates_thread_that_calls_MDC_on_run() {
+        createObjectUnderTestWithTopic().newThread(runnable);
+
+        final ArgumentCaptor<Runnable> actualRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+        verify(delegateThreadFactory).newThread(actualRunnableCaptor.capture());
+
+        final Runnable actualRunnable = actualRunnableCaptor.getValue();
+
+        final String[] actualKafkaPluginType = new String[1];
+        doAnswer(a -> {
+            actualKafkaPluginType[0] = MDC.get(KafkaMdc.MDC_KAFKA_PLUGIN_KEY);
+            return null;
+        }).when(runnable).run();
+
+        actualRunnable.run();
+
+        assertThat(actualKafkaPluginType[0], equalTo(pluginType));
+    }
 }
\ No newline at end of file
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
index 1503a7424d..3433a92b76 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
@@ -6,11 +6,14 @@
 package org.opensearch.dataprepper.plugins.kafka.source;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
+import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
@@ -21,6 +24,7 @@
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
 import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
 import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
@@ -29,6 +33,7 @@
 import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
 import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier;
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
+import org.slf4j.MDC;
 
 import java.time.Duration;
 import java.util.Collections;
@@ -41,6 +46,7 @@
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -230,4 +236,38 @@ void test_updateConfig_not_using_kafkaClusterConfigExtension() {
         verify(sourceConfig, never()).setAwsConfig(any());
         verify(sourceConfig, never()).setEncryptionConfig(any());
     }
+
+    @Nested
+    class MdcTests {
+        private MockedStatic<MDC> mdcMockedStatic;
+
+        @BeforeEach
+        void setUp() {
+            mdcMockedStatic = mockStatic(MDC.class);
+        }
+
+        @AfterEach
+        void tearDown() {
+            mdcMockedStatic.close();
+        }
+
+        @Test
+        void start_sets_and_removes_MDC() {
+            when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
+            when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
+
+            createObjectUnderTest().start(buffer);
+
+            mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "source"));
+            mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
+        }
+
+        @Test
+        void stop_sets_and_removes_MDC() {
+            createObjectUnderTest().stop();
+
+            mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "source"));
+            mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
+        }
+    }
 }

From f27b8083ce618ef101e048bd3580d65f54ecff10 Mon Sep 17 00:00:00 2001
From: David Venable
Date: Thu, 27 Jun 2024 19:47:18 -0500
Subject: [PATCH 2/2] Adds the TRIAGING.md file to outline our triaging
 process (#4630)

Adds the TRIAGING.md file, which outlines for the community the Data
Prepper triaging process.

Signed-off-by: David Venable
---
 TRIAGING.md | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 73 insertions(+)
 create mode 100644 TRIAGING.md

diff --git a/TRIAGING.md b/TRIAGING.md
new file mode 100644
index 0000000000..a4a25e1932
--- /dev/null
+++ b/TRIAGING.md
@@ -0,0 +1,73 @@
+Data Prepper
+
+The Data Prepper maintainers seek to promote an inclusive and engaged community of contributors.
+In order to facilitate this, weekly triage meetings are open to all, and attendance is encouraged for anyone who hopes to contribute, discuss an issue, or learn more about the project.
+To learn more about contributing to the Data Prepper project, visit the [Contributing](./CONTRIBUTING.md) documentation.
+
+### Do I need to attend for my issue to be addressed/triaged?
+
+Attendance is not required for your issue to be triaged or addressed.
+All new issues are triaged weekly.
+
+### What happens if my issue does not get covered this time?
+
+At each meeting, we seek to address all new issues.
+However, should we run out of time before your issue is discussed, you are always welcome to attend the next meeting or to follow up on the issue post itself.
+
+### How do I join the triage meeting?
+
+Meetings are hosted regularly on Tuesdays at 2:30 PM US Central Time (12:30 PM Pacific Time) and can be joined via the links posted on the [OpenSearch Meetup Group](https://www.meetup.com/opensearch/events/) list of events.
+The event will be titled `Data Prepper Triage Meeting`.
+
+After joining the Zoom meeting, you can enable your video / voice to join the discussion.
+If you do not have a webcam or microphone available, you can still join in via the text chat.
+
+If you have an issue you'd like to bring forth, please consider getting a link to the issue so it can be presented to everyone in the meeting.
+
+### Is there an agenda for each week?
+
+Meetings are 30 minutes and structured as follows:
+
+1. Initial Gathering: As we gather, feel free to turn on video and engage in informal and open-to-all conversation. A volunteer Data Prepper maintainer will share the [Data Prepper Tracking Board](https://github.com/orgs/opensearch-project/projects/82/) and proceed.
+2. Announcements: We will make any announcements at the beginning, if necessary.
+3. Untriaged issues: We will review all untriaged [issues](https://github.com/orgs/opensearch-project/projects/82/views/6) for the Data Prepper repository. If you have an item here, you may spend a few minutes explaining your request.
+4. Member Requests: Opportunity for any meeting member to ask for consideration of an issue or pull request.
+5. Release review: If time permits, and we find it necessary, we will review [items for the current release](https://github.com/orgs/opensearch-project/projects/82/views/14).
+6. Follow-up issues: If time permits, we will review the [follow up items](https://github.com/orgs/opensearch-project/projects/82/views/18).
+7. Open Discussion: If time permits, members of the meeting may surface any topics that do not yet have an issue filed or a pull request created.
+
+### Do I need to have already contributed to the project to attend a triage meeting?
+
+No, all are welcome and encouraged to attend.
+Attending the triage meetings is a great way for a new contributor to learn about the project as well as explore different avenues of contribution.
+
+### What if I have follow-up questions on an issue?
+
+If you have an existing issue you would like to discuss, you can always comment on the issue itself.
+Alternatively, you are welcome to come to the triage meeting to discuss it.
+
+### Is this meeting a good place to get help using Data Prepper?
+
+While we are always happy to help the community, the best resource for usage questions is [the Data Prepper discussion forum](https://github.com/opensearch-project/data-prepper/discussions) on GitHub.
+
+There you can find answers to many common questions as well as speak with implementation experts and Data Prepper maintainers.
+
+### What are the issue labels associated with triaging?
+
+There are several labels that are particularly important for triaging in Data Prepper:
+
+| Label | When applied | Meaning |
+| ----- | ------------ | ------- |
+| [untriaged](https://github.com/opensearch-project/data-prepper/labels/untriaged) | When issues are created or re-opened. | Issues labeled as `untriaged` require the attention of the repository maintainers and may need to be prioritized for quicker resolution. It's crucial to keep the count of `untriaged` labels low so that all new issues are addressed in a timely manner. |
+| [follow up](https://github.com/opensearch-project/data-prepper/labels/follow%20up) | During triage meetings. | Issues labeled as `follow up` have been triaged. However, the maintainers may need to follow up further on them. This label lets us triage an issue as not critical while still being able to come back to it. |
+| [help wanted](https://github.com/opensearch-project/data-prepper/labels/help%20wanted) | Anytime. | Issues marked as `help wanted` signal that they are actionable and not the current focus of the project maintainers. Community contributions are especially encouraged for these issues. |
+| [good first issue](https://github.com/opensearch-project/data-prepper/labels/good%20first%20issue) | Anytime. | Issues labeled as `good first issue` are small in scope and can be resolved with a single pull request. These are recommended starting points for newcomers looking to make their first contributions. |
+
+
+### Is this where I should bring up potential security vulnerabilities?
+
+Due to the sensitive nature of security vulnerabilities, please report all potential vulnerabilities directly by following the steps outlined in the [Security Issue Response Process](https://github.com/opensearch-project/data-prepper/security/policy).
+
+### Who should I contact if I have further questions?
+
+You can always file an [issue](https://github.com/opensearch-project/data-prepper/issues/new/choose) for any question you have about the project.