Skip to content

Commit

Permalink
Fix for [BUG] Data Prepper is losing connections from S3 pool (#3836)
Browse files Browse the repository at this point in the history
* Fix for [BUG] Data Prepper is losing connections from S3 pool

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed CheckStyle errors

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Dec 11, 2023
1 parent 073419f commit f9be56a
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;
import java.util.Objects;

public interface InputCodec {
/**
Expand All @@ -31,8 +32,14 @@ public interface InputCodec {
* @param eventConsumer The consumer which handles each event from the stream
* @throws IOException throws IOException when invalid input is received or incorrect codec name is provided
*/
void parse(
default void parse(
InputFile inputFile,
DecompressionEngine decompressionEngine,
Consumer<Record<Event>> eventConsumer) throws IOException;
Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputFile);
Objects.requireNonNull(eventConsumer);
try (InputStream inputStream = inputFile.newStream()) {
parse(decompressionEngine.createInputStream(inputStream), eventConsumer);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.opensearch.dataprepper.model.codec;

import org.apache.parquet.io.SeekableInputStream;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.io.InputFile;
import org.opensearch.dataprepper.model.record.Record;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Unit tests for the default {@code InputCodec.parse(InputFile, DecompressionEngine, Consumer)}
 * overload, verifying that the input stream it opens is always closed — both on
 * successful parsing and when the stream-based {@code parse} throws. Closing the
 * stream on the failure path is the fix for the S3 connection-pool leak.
 */
public class InputCodecTest {
    private InputFile inputFile;
    private SeekableInputStream inputStream;
    private DecompressionEngine decompressionEngine;

    /**
     * Builds the shared mock fixture: an {@link InputFile} whose stream is handed
     * back unchanged by the {@link DecompressionEngine}.
     */
    private void setUpMocks() throws IOException {
        inputFile = mock(InputFile.class);
        inputStream = mock(SeekableInputStream.class);
        decompressionEngine = mock(DecompressionEngine.class);
        when(inputFile.newStream()).thenReturn(inputStream);
        when(decompressionEngine.createInputStream(any(InputStream.class))).thenReturn(inputStream);
    }

    @Test
    void testParse_success() throws Exception {
        setUpMocks();
        final InputCodec objectUnderTest = new InputCodec() {
            @Override
            public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
                // no-op: only the resource handling of the default overload is under test
            }
        };

        objectUnderTest.parse(inputFile, decompressionEngine, rec -> {});

        // The default overload opened the stream, so it must close it.
        verify(inputStream).close();
    }

    @Test
    void testParse_exception() throws Exception {
        setUpMocks();
        final InputCodec objectUnderTest = new InputCodec() {
            @Override
            public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
                throw new RuntimeException("error");
            }
        };

        assertThrows(RuntimeException.class,
                () -> objectUnderTest.parse(inputFile, decompressionEngine, rec -> {}));

        // Even when parsing fails, the stream must be closed (the connection-leak fix).
        verify(inputStream).close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
import org.apache.avro.file.DataFileStream;
import org.apache.avro.util.Utf8;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.io.InputFile;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -47,14 +45,6 @@ public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer
parseAvroStream(inputStream, eventConsumer);
}

@Override
public void parse(final InputFile inputFile, final DecompressionEngine decompressionEngine, final Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputFile);
Objects.requireNonNull(eventConsumer);

parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer);
}

private void parseAvroStream(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) {

try {
Expand Down Expand Up @@ -107,4 +97,4 @@ else if(value instanceof Utf8){
return eventData;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.io.InputFile;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -52,11 +50,6 @@ public void parse(final InputStream inputStream, final Consumer<Record<Event>> e
}
}

@Override
public void parse(final InputFile inputFile, DecompressionEngine decompressionEngine, Consumer<Record<Event>> eventConsumer) throws IOException {
parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer);
}

private void parseBufferedReader(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
final CsvMapper mapper = createCsvMapper();
final CsvSchema schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.io.InputFile;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -48,11 +46,6 @@ public void parse(final InputStream inputStream, final Consumer<Record<Event>> e
}
}

@Override
public void parse(final InputFile inputFile, final DecompressionEngine decompressionEngine, final Consumer<Record<Event>> eventConsumer) throws IOException {
parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer);
}

private void parseBufferedReader(final BufferedReader reader, final Consumer<Record<Event>> eventConsumer) throws IOException {
final boolean doAddHeaderToOutgoingEvents = Objects.nonNull(headerDestination);
boolean hasReadHeader = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,13 @@
package org.opensearch.dataprepper.plugins.codec.json;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.io.InputFile;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.codec.JsonDecoder;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Consumer;

/**
* An implementation of {@link InputCodec} which parses JSON Objects for arrays.
*/
@DataPrepperPlugin(name = "json", pluginType = InputCodec.class)
public class JsonInputCodec extends JsonDecoder implements InputCodec {

@Override
public void parse(
final InputFile inputFile,
final DecompressionEngine decompressionEngine,
final Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputFile);
Objects.requireNonNull(eventConsumer);

parse(decompressionEngine.createInputStream(inputFile.newStream()), eventConsumer);
}

}

0 comments on commit f9be56a

Please sign in to comment.