Skip to content

Commit

Permalink
HttpSink plugin functionality for opensearch-project#874. (opensearch…
Browse files Browse the repository at this point in the history
…-project#3065)

HttpSink plugin functionality for opensearch-project#874.
Signed-off-by: mallikagogoi7 <[email protected]>
  • Loading branch information
mallikagogoi7 authored Oct 25, 2023
1 parent fe750f5 commit a9e419a
Show file tree
Hide file tree
Showing 43 changed files with 1,584 additions and 476 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.accumulator;

import java.io.IOException;
import java.io.OutputStream;

/**
* A buffer can hold data before flushing it any Sink.
Expand All @@ -21,5 +22,8 @@ public interface Buffer {
long getDuration();

byte[] getSinkBufferData() throws IOException;
void writeEvent(byte[] bytes) throws IOException;

OutputStream getOutputStream();

void setEventCount(int eventCount);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.commons.lang3.time.StopWatch;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -49,16 +50,13 @@ public byte[] getSinkBufferData() throws IOException {
return byteArrayOutputStream.toByteArray();
}

/**
* write byte array to output stream.
*
* @param bytes byte array.
* @throws IOException while writing to output stream fails.
*/
@Override
public void writeEvent(byte[] bytes) throws IOException {
byteArrayOutputStream.write(bytes);
byteArrayOutputStream.write(System.lineSeparator().getBytes());
eventCount++;
public OutputStream getOutputStream() {
return byteArrayOutputStream;
}

@Override
public void setEventCount(int eventCount) {
this.eventCount = eventCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,6 @@ public byte[] getSinkBufferData() throws IOException {
return fileData;
}

/**
* write byte array to output stream.
* @param bytes byte array.
* @throws IOException while writing to output stream fails.
*/
@Override
public void writeEvent(byte[] bytes) throws IOException {
outputStream.write(bytes);
outputStream.write(System.lineSeparator().getBytes());
eventCount++;
}

/**
* Flushing the buffered data into the output stream.
*/
Expand All @@ -106,4 +94,14 @@ protected void removeTemporaryFile() {
}
}
}

@Override
public OutputStream getOutputStream() {
return outputStream;
}

@Override
public void setEventCount(int eventCount) {
this.eventCount = eventCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

@ExtendWith(MockitoExtension.class)
Expand All @@ -24,18 +19,6 @@ class InMemoryBufferTest {

private InMemoryBuffer inMemoryBuffer;

@Test
void test_with_write_event_into_buffer() throws IOException {
inMemoryBuffer = new InMemoryBuffer();

while (inMemoryBuffer.getEventCount() < MAX_EVENTS) {
inMemoryBuffer.writeEvent(generateByteArray());
}
assertThat(inMemoryBuffer.getSize(), greaterThanOrEqualTo(54110L));
assertThat(inMemoryBuffer.getEventCount(), equalTo(MAX_EVENTS));
assertThat(inMemoryBuffer.getDuration(), greaterThanOrEqualTo(0L));

}

@Test
void test_getSinkData_success() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.equalTo;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
Expand All @@ -38,19 +37,6 @@ void setUp() throws IOException {
localFileBuffer = new LocalFileBuffer(tempFile);
}

@Test
void test_with_write_events_into_buffer() throws IOException {
while (localFileBuffer.getEventCount() < 55) {
localFileBuffer.writeEvent(generateByteArray());
}
assertThat(localFileBuffer.getSize(), greaterThan(1l));
assertThat(localFileBuffer.getEventCount(), equalTo(55));
assertThat(localFileBuffer.getDuration(), equalTo(0L));
localFileBuffer.flushAndCloseStream();
localFileBuffer.removeTemporaryFile();
assertFalse(tempFile.exists(), "The temp file has not been deleted.");
}

@Test
void test_without_write_events_into_buffer() {
assertThat(localFileBuffer.getSize(), equalTo(0L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ThresholdValidatorTest {

Expand All @@ -23,20 +22,6 @@ class ThresholdValidatorTest {
@BeforeEach
void setUp() throws IOException {
inMemoryBuffer = new InMemoryBufferFactory().getBuffer();

while (inMemoryBuffer.getEventCount() < 100) {
inMemoryBuffer.writeEvent(generateByteArray());
}
}

@Test
void test_exceedThreshold_true_dueTo_maxEvents_is_less_than_buffered_event_count() {
final int maxEvents = 95;
final ByteCount maxBytes = ByteCount.parse("50kb");
final long maxCollectionDuration = 15;
boolean isThresholdExceed = ThresholdValidator.checkThresholdExceed(inMemoryBuffer, maxEvents,
maxBytes, maxCollectionDuration);
assertTrue(isThresholdExceed, "Threshold not exceeded");
}

@Test
Expand All @@ -49,16 +34,6 @@ void test_exceedThreshold_false_dueTo_maxEvents_is_greater_than_buffered_event_c
assertFalse(isThresholdExceed, "Threshold exceeded");
}

@Test
void test_exceedThreshold_ture_dueTo_maxBytes_is_less_than_buffered_byte_count() {
final int maxEvents = 500;
final ByteCount maxBytes = ByteCount.parse("1b");
final long maxCollectionDuration = 15;
boolean isThresholdExceed = ThresholdValidator.checkThresholdExceed(inMemoryBuffer, maxEvents, maxBytes,
maxCollectionDuration);
assertTrue(isThresholdExceed, "Threshold not exceeded");
}

@Test
void test_exceedThreshold_false_dueTo_maxBytes_is_greater_than_buffered_byte_count() {
final int maxEvents = 500;
Expand All @@ -69,58 +44,4 @@ void test_exceedThreshold_false_dueTo_maxBytes_is_greater_than_buffered_byte_cou
assertFalse(isThresholdExceed, "Threshold exceeded");
}

@Test
void test_exceedThreshold_ture_dueTo_maxCollectionDuration_is_less_than_buffered_event_collection_duration()
throws IOException, InterruptedException {
final int maxEvents = 500;
final ByteCount maxBytes = ByteCount.parse("500mb");
final long maxCollectionDuration = 10;

inMemoryBuffer = new InMemoryBufferFactory().getBuffer();
boolean isThresholdExceed = Boolean.FALSE;
synchronized (this) {
while (inMemoryBuffer.getEventCount() < 100) {
inMemoryBuffer.writeEvent(generateByteArray());
isThresholdExceed = ThresholdValidator.checkThresholdExceed(inMemoryBuffer, maxEvents,
maxBytes, maxCollectionDuration);
if (isThresholdExceed) {
break;
}
wait(5000);
}
}
assertTrue(isThresholdExceed, "Threshold not exceeded");
}

@Test
void test_exceedThreshold_ture_dueTo_maxCollectionDuration_is_greater_than_buffered_event_collection_duration()
throws IOException, InterruptedException {
final int maxEvents = 500;
final ByteCount maxBytes = ByteCount.parse("500mb");
final long maxCollectionDuration = 240;

inMemoryBuffer = new InMemoryBufferFactory().getBuffer();

boolean isThresholdExceed = Boolean.FALSE;
synchronized (this) {
while (inMemoryBuffer.getEventCount() < 100) {
inMemoryBuffer.writeEvent(generateByteArray());
isThresholdExceed = ThresholdValidator.checkThresholdExceed(inMemoryBuffer,
maxEvents, maxBytes, maxCollectionDuration);
if (isThresholdExceed) {
break;
}
wait(50);
}
}
assertFalse(isThresholdExceed, "Threshold exceeded");
}

private byte[] generateByteArray() {
byte[] bytes = new byte[10000];
for (int i = 0; i < 10000; i++) {
bytes[i] = (byte) i;
}
return bytes;
}
}
Loading

0 comments on commit a9e419a

Please sign in to comment.