Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reapply "Run tests on the current JVM for Java 17 & 21 / Gradle 8.8 #4771

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ subprojects {

test {
useJUnitPlatform()
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.current()
}
reports {
junitXml.required
html.required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> {
private Thread retryThread;
private int maxRetries;
private int waitTimeMs;
private SinkThread sinkThread;

public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) {
this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
Expand All @@ -51,7 +52,8 @@ public void initialize() {
// the exceptions which are not retryable.
doInitialize();
if (!isReady() && retryThread == null) {
retryThread = new Thread(new SinkThread(this, maxRetries, waitTimeMs));
sinkThread = new SinkThread(this, maxRetries, waitTimeMs);
retryThread = new Thread(sinkThread);
retryThread.start();
}
}
Expand All @@ -76,7 +78,7 @@ public void output(Collection<T> records) {
@Override
public void shutdown() {
if (retryThread != null) {
retryThread.stop();
sinkThread.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class SinkThread implements Runnable {
private int maxRetries;
private int waitTimeMs;

private volatile boolean isStopped = false;

public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
this.sink = sink;
this.maxRetries = maxRetries;
Expand All @@ -19,11 +21,15 @@ public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
@Override
public void run() {
int numRetries = 0;
while (!sink.isReady() && numRetries++ < maxRetries) {
while (!sink.isReady() && numRetries++ < maxRetries && !isStopped) {
try {
Thread.sleep(waitTimeMs);
sink.doInitialize();
} catch (InterruptedException e){}
}
}

public void stop() {
isStopped = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,10 @@
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventHandle;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Arrays;
Expand All @@ -30,6 +25,12 @@
import java.util.UUID;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class AbstractSinkTest {
private int count;
Expand Down Expand Up @@ -71,13 +72,13 @@ void testMetrics() {
}

@Test
void testSinkNotReady() {
void testSinkNotReady() throws InterruptedException {
final String sinkName = "testSink";
final String pipelineName = "pipelineName";
MetricsTestUtil.initMetrics();
PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
AbstractSink<Record<String>> abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
abstractSink.initialize();
assertEquals(abstractSink.isReady(), false);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.RUNNABLE);
Expand All @@ -87,7 +88,10 @@ void testSinkNotReady() {
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED);
int initCountBeforeShutdown = abstractSink.initCount;
abstractSink.shutdown();
Thread.sleep(200);
assertThat(abstractSink.initCount, equalTo(initCountBeforeShutdown));
}

@Test
Expand Down
3 changes: 0 additions & 3 deletions data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ dependencies {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
testImplementation 'org.apache.logging.log4j:log4j-jpl:2.23.0'
testImplementation testLibs.spring.test
implementation libs.armeria.core
implementation libs.armeria.grpc
Expand Down Expand Up @@ -89,8 +88,6 @@ task integrationTest(type: Test) {

classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'

filter {
includeTestsMatching '*IT'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.UUID;
import java.util.stream.Stream;

Expand Down Expand Up @@ -218,7 +218,7 @@ static class SomeUnknownTypesArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
return Stream.of(
arguments(Random.class),
arguments(Timer.class),
arguments(InputStream.class),
arguments(File.class)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
return Stream.of(
Arguments.of(0, randomInt + 1, 0.0),
Arguments.of(1, 100, 1.0),
Arguments.of(randomInt, randomInt, 100.0),
Arguments.of(randomInt + 1, randomInt + 1, 100.0),
Arguments.of(randomInt, randomInt + 250, ((double) randomInt / (randomInt + 250)) * 100),
Arguments.of(6, 9, 66.66666666666666),
Arguments.of(531, 1000, 53.1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import org.mockito.Mock;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
Expand All @@ -28,6 +31,7 @@
import java.io.ByteArrayInputStream;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -56,7 +60,7 @@ public EventJsonInputCodec createInputCodec() {
@ParameterizedTest
@ValueSource(strings = {"", "{}"})
public void emptyTest(String input) throws Exception {
input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":["+input+"]}";
input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[" + input + "]}";
ByteArrayInputStream inputStream = new ByteArrayInputStream(input.getBytes());
inputCodec = createInputCodec();
Consumer<Record<Event>> consumer = mock(Consumer.class);
Expand All @@ -70,15 +74,15 @@ public void inCompatibleVersionTest() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\"3.0\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\"" + EventJsonDefines.VERSION + "\":\"3.0\", \"" + EventJsonDefines.EVENTS + "\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
comma = ",";
}
input += "]}";
Expand All @@ -95,24 +99,24 @@ public void basicTest() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
comma = ",";
}
input += "]}";
inputStream = new ByteArrayInputStream(input.getBytes());
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(inputStream, records::add);
assertThat(records.size(), equalTo(2));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -126,24 +130,24 @@ public void test_with_timeReceivedOverridden() throws Exception {
final String key = UUID.randomUUID().toString();
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);
Instant startTime = Instant.now().minusSeconds(5);
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS).minusSeconds(5);
Event event = createEvent(data, startTime);

Map<String, Object> dataMap = event.toMap();
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
String input = "{\""+EventJsonDefines.VERSION+"\":\""+DataPrepperVersion.getCurrentVersion().toString()+"\", \""+EventJsonDefines.EVENTS+"\":[";
String input = "{\"" + EventJsonDefines.VERSION + "\":\"" + DataPrepperVersion.getCurrentVersion().toString() + "\", \"" + EventJsonDefines.EVENTS + "\":[";
String comma = "";
for (int i = 0; i < 2; i++) {
input += comma+"{\"data\":"+objectMapper.writeValueAsString(dataMap)+","+"\"metadata\":"+objectMapper.writeValueAsString(metadataMap)+"}";
input += comma + "{\"data\":" + objectMapper.writeValueAsString(dataMap) + "," + "\"metadata\":" + objectMapper.writeValueAsString(metadataMap) + "}";
comma = ",";
}
input += "]}";
inputStream = new ByteArrayInputStream(input.getBytes());
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(inputStream, records::add);
assertThat(records.size(), equalTo(2));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), not(equalTo(startTime)));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -159,7 +163,7 @@ private Event createEvent(final Map<String, Object> json, final Instant timeRece
if (timeReceived != null) {
logBuilder.withTimeReceived(timeReceived);
}
final JacksonEvent event = (JacksonEvent)logBuilder.build();
final JacksonEvent event = (JacksonEvent) logBuilder.build();

return event;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import org.mockito.Mock;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

Expand All @@ -22,6 +25,7 @@
import org.opensearch.dataprepper.model.log.JacksonLog;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -64,7 +68,7 @@ public void basicTest() throws Exception {
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);

Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);
outputCodec = createOutputCodec();
inputCodec = createInputCodec();
Expand All @@ -75,8 +79,8 @@ public void basicTest() throws Exception {
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(1));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -90,7 +94,7 @@ public void multipleEventsTest() throws Exception {
final String value = UUID.randomUUID().toString();
Map<String, Object> data = Map.of(key, value);

Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);
outputCodec = createOutputCodec();
inputCodec = createInputCodec();
Expand All @@ -103,8 +107,8 @@ public void multipleEventsTest() throws Exception {
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(3));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags().size(), equalTo(0));
Expand All @@ -122,7 +126,7 @@ public void extendedTest() throws Exception {

Set<String> tags = Set.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
List<String> tagsList = tags.stream().collect(Collectors.toList());
Instant startTime = Instant.now();
Instant startTime = Instant.now().truncatedTo(ChronoUnit.MICROS);
Event event = createEvent(data, startTime);
Instant origTime = startTime.minusSeconds(5);
event.getMetadata().setExternalOriginationTime(origTime);
Expand All @@ -135,11 +139,11 @@ public void extendedTest() throws Exception {
outputCodec.complete(outputStream);
assertThat(outputCodec.getExtension(), equalTo(EventJsonOutputCodec.EVENT_JSON));
List<Record<Event>> records = new LinkedList<>();
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);
inputCodec.parse(new ByteArrayInputStream(outputStream.toByteArray()), records::add);

assertThat(records.size(), equalTo(1));
for(Record record : records) {
Event e = (Event)record.getData();
for (Record record : records) {
Event e = (Event) record.getData();
assertThat(e.get(key, String.class), equalTo(value));
assertThat(e.getMetadata().getTimeReceived(), equalTo(startTime));
assertThat(e.getMetadata().getTags(), equalTo(tags));
Expand All @@ -157,7 +161,7 @@ private Event createEvent(final Map<String, Object> json, final Instant timeRece
if (timeReceived != null) {
logBuilder.withTimeReceived(timeReceived);
}
final JacksonEvent event = (JacksonEvent)logBuilder.build();
final JacksonEvent event = (JacksonEvent) logBuilder.build();

return event;
}
Expand Down
Loading
Loading