Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
kkondaka authored Jul 22, 2024
2 parents 932c549 + afe8464 commit 4ec547b
Show file tree
Hide file tree
Showing 142 changed files with 2,534 additions and 787 deletions.
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ subprojects {
}
}
dependencies {
implementation platform('com.fasterxml.jackson:jackson-bom:2.16.1')
implementation platform('com.fasterxml.jackson:jackson-bom:2.17.2')
implementation platform('org.eclipse.jetty:jetty-bom:9.4.53.v20231009')
implementation platform('io.micrometer:micrometer-bom:1.10.5')
implementation libs.guava.core
Expand Down Expand Up @@ -226,6 +226,9 @@ subprojects {

test {
useJUnitPlatform()
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.current()
}
reports {
junitXml.required
html.required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> {
private Thread retryThread;
private int maxRetries;
private int waitTimeMs;
private SinkThread sinkThread;

public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) {
this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
Expand All @@ -51,7 +52,8 @@ public void initialize() {
// the exceptions which are not retryable.
doInitialize();
if (!isReady() && retryThread == null) {
retryThread = new Thread(new SinkThread(this, maxRetries, waitTimeMs));
sinkThread = new SinkThread(this, maxRetries, waitTimeMs);
retryThread = new Thread(sinkThread);
retryThread.start();
}
}
Expand All @@ -76,7 +78,7 @@ public void output(Collection<T> records) {
@Override
public void shutdown() {
if (retryThread != null) {
retryThread.stop();
sinkThread.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class SinkThread implements Runnable {
private int maxRetries;
private int waitTimeMs;

private volatile boolean isStopped = false;

public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
this.sink = sink;
this.maxRetries = maxRetries;
Expand All @@ -19,11 +21,15 @@ public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
@Override
public void run() {
int numRetries = 0;
while (!sink.isReady() && numRetries++ < maxRetries) {
while (!sink.isReady() && numRetries++ < maxRetries && !isStopped) {
try {
Thread.sleep(waitTimeMs);
sink.doInitialize();
} catch (InterruptedException e){}
}
}

public void stop() {
isStopped = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,37 @@
package org.opensearch.dataprepper.metrics;

import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Statistic;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class MetricsTestUtil {

public static void initMetrics() {
Metrics.globalRegistry.getRegistries().forEach(meterRegistry -> Metrics.globalRegistry.remove(meterRegistry));
Metrics.globalRegistry.getMeters().forEach(meter -> Metrics.globalRegistry.remove(meter));
public static synchronized void initMetrics() {
final Set<MeterRegistry> registries = new HashSet<>(Metrics.globalRegistry.getRegistries());
registries.forEach(Metrics.globalRegistry::remove);

final List<Meter> meters = new ArrayList<>(Metrics.globalRegistry.getMeters());
meters.forEach(Metrics.globalRegistry::remove);

Metrics.addRegistry(new SimpleMeterRegistry());
}

public static List<Measurement> getMeasurementList(final String meterName) {
return StreamSupport.stream(getRegistry().find(meterName).meter().measure().spliterator(), false)
public static synchronized List<Measurement> getMeasurementList(final String meterName) {
final Meter meter = getRegistry().find(meterName).meter();
if(meter == null)
throw new RuntimeException("No metrics meter is available for " + meterName);

return StreamSupport.stream(meter.measure().spliterator(), false)
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,10 @@
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventHandle;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Arrays;
Expand All @@ -30,6 +25,12 @@
import java.util.UUID;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class AbstractSinkTest {
private int count;
Expand Down Expand Up @@ -71,13 +72,13 @@ void testMetrics() {
}

@Test
void testSinkNotReady() {
void testSinkNotReady() throws InterruptedException {
final String sinkName = "testSink";
final String pipelineName = "pipelineName";
MetricsTestUtil.initMetrics();
PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
AbstractSink<Record<String>> abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
abstractSink.initialize();
assertEquals(abstractSink.isReady(), false);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.RUNNABLE);
Expand All @@ -87,7 +88,10 @@ void testSinkNotReady() {
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED);
int initCountBeforeShutdown = abstractSink.initCount;
abstractSink.shutdown();
Thread.sleep(200);
assertThat(abstractSink.initCount, equalTo(initCountBeforeShutdown));
}

@Test
Expand Down
4 changes: 0 additions & 4 deletions data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ dependencies {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
testImplementation 'org.apache.logging.log4j:log4j-jpl:2.23.0'
testImplementation testLibs.spring.test
implementation libs.armeria.core
implementation libs.armeria.grpc
Expand All @@ -60,7 +59,6 @@ dependencies {
implementation 'software.amazon.awssdk:servicediscovery'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation testLibs.junit.vintage
testImplementation testLibs.mockito.inline
testImplementation libs.commons.lang3
testImplementation project(':data-prepper-test-event')
testImplementation project(':data-prepper-test-common')
Expand Down Expand Up @@ -90,8 +88,6 @@ task integrationTest(type: Test) {

classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'

filter {
includeTestsMatching '*IT'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,33 @@

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.dns.DnsAddressEndpointGroup;
import io.micrometer.core.instrument.Measurement;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.peerforwarder.HashRing;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.function.ToDoubleFunction;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.peerforwarder.discovery.PeerListProvider.PEER_ENDPOINTS;

@RunWith(MockitoJUnitRunner.class)
@ExtendWith(MockitoExtension.class)
public class DnsPeerListProviderTest {

private static final String ENDPOINT_1 = "10.1.1.1";
Expand All @@ -39,43 +42,40 @@ public class DnsPeerListProviderTest {
Endpoint.of(ENDPOINT_1),
Endpoint.of(ENDPOINT_2)
);
private static final String COMPONENT_SCOPE = "testComponentScope";
private static final String COMPONENT_ID = "testComponentId";

@Mock
private DnsAddressEndpointGroup dnsAddressEndpointGroup;

@Mock
private HashRing hashRing;

@Mock
private PluginMetrics pluginMetrics;

private CompletableFuture completableFuture;

private DnsPeerListProvider dnsPeerListProvider;

@Before
@BeforeEach
public void setup() {
MetricsTestUtil.initMetrics();
completableFuture = CompletableFuture.completedFuture(null);
when(dnsAddressEndpointGroup.whenReady()).thenReturn(completableFuture);

pluginMetrics = PluginMetrics.fromNames(COMPONENT_ID, COMPONENT_SCOPE);
dnsPeerListProvider = new DnsPeerListProvider(dnsAddressEndpointGroup, pluginMetrics);
}

@Test(expected = NullPointerException.class)
@Test
public void testDefaultListProviderWithNullHostname() {
new DnsPeerListProvider(null, pluginMetrics);
assertThrows(NullPointerException.class, () -> new DnsPeerListProvider(null, pluginMetrics));
}

@Test(expected = RuntimeException.class)
@Test
public void testConstructWithInterruptedException() throws Exception {
CompletableFuture mockFuture = mock(CompletableFuture.class);
when(mockFuture.get()).thenThrow(new InterruptedException());
when(dnsAddressEndpointGroup.whenReady()).thenReturn(mockFuture);

new DnsPeerListProvider(dnsAddressEndpointGroup, pluginMetrics);
assertThrows(RuntimeException.class, () -> new DnsPeerListProvider(dnsAddressEndpointGroup, pluginMetrics));
}

@Test
Expand All @@ -90,17 +90,27 @@ public void testGetPeerList() {
}

@Test
public void testActivePeerCounter() {
public void testActivePeerCounter_with_list() {
when(dnsAddressEndpointGroup.endpoints()).thenReturn(ENDPOINT_LIST);

final List<Measurement> endpointsMeasures = MetricsTestUtil.getMeasurementList(new StringJoiner(MetricNames.DELIMITER).add(COMPONENT_SCOPE).add(COMPONENT_ID)
.add(PeerListProvider.PEER_ENDPOINTS).toString());
assertEquals(1, endpointsMeasures.size());
final Measurement endpointsMeasure = endpointsMeasures.get(0);
assertEquals(2.0, endpointsMeasure.getValue(), 0);
final ArgumentCaptor<ToDoubleFunction<DnsAddressEndpointGroup>> gaugeFunctionCaptor = ArgumentCaptor.forClass(ToDoubleFunction.class);
verify(pluginMetrics).gauge(eq(PEER_ENDPOINTS), eq(dnsAddressEndpointGroup), gaugeFunctionCaptor.capture());

final ToDoubleFunction<DnsAddressEndpointGroup> gaugeFunction = gaugeFunctionCaptor.getValue();

assertThat(gaugeFunction.applyAsDouble(dnsAddressEndpointGroup), equalTo(2.0));
}

@Test
public void testActivePeerCounter_with_single() {
when(dnsAddressEndpointGroup.endpoints()).thenReturn(Collections.singletonList(Endpoint.of(ENDPOINT_1)));
assertEquals(1.0, endpointsMeasure.getValue(), 0);

final ArgumentCaptor<ToDoubleFunction<DnsAddressEndpointGroup>> gaugeFunctionCaptor = ArgumentCaptor.forClass(ToDoubleFunction.class);
verify(pluginMetrics).gauge(eq(PEER_ENDPOINTS), eq(dnsAddressEndpointGroup), gaugeFunctionCaptor.capture());

final ToDoubleFunction<DnsAddressEndpointGroup> gaugeFunction = gaugeFunctionCaptor.getValue();

assertThat(gaugeFunction.applyAsDouble(dnsAddressEndpointGroup), equalTo(1.0));
}

@Test
Expand Down
Loading

0 comments on commit 4ec547b

Please sign in to comment.