Skip to content

Commit

Permalink
Fixed build issue for #874.
Browse files Browse the repository at this point in the history
Signed-off-by: mallikagogoi7 <[email protected]>
  • Loading branch information
mallikagogoi7 committed Aug 10, 2023
1 parent 690fd4b commit d28b729
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.UUID;
Expand Down Expand Up @@ -112,7 +111,6 @@ public HttpSinkService createHttpSinkServiceUnderTest() throws NoSuchFieldExcept
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
codecConfiguration.getPluginSettings());
this.ndjsonOutputConfig = mock(NdjsonOutputConfig.class);
when(ndjsonOutputConfig.getExcludeKeys()).thenReturn(new ArrayList<>());
codec = new NdjsonOutputCodec(ndjsonOutputConfig);
this.bufferFactory = new InMemoryBufferFactory();
this.dlqPushHandler = new DlqPushHandler(httpSinkConfiguration.getDlqFile(), pluginFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.plugins.accumulator.BufferFactory;
Expand Down Expand Up @@ -113,7 +114,7 @@ public HTTPSink(final PluginSetting pluginSetting,
pluginMetrics,
pluginSetting,
codec,
Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null);
OutputCodecContext.fromSinkContext(sinkContext));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.types.ByteCount;

import org.opensearch.dataprepper.plugins.accumulator.Buffer;
Expand Down Expand Up @@ -105,7 +106,7 @@ public class HttpSinkService {

private final OutputCodec codec;

private final String tagsTargetKey;
private final OutputCodecContext codecContext;

public HttpSinkService(final HttpSinkConfiguration httpSinkConfiguration,
final BufferFactory bufferFactory,
Expand All @@ -116,7 +117,7 @@ public HttpSinkService(final HttpSinkConfiguration httpSinkConfiguration,
final PluginMetrics pluginMetrics,
final PluginSetting httpPluginSetting,
final OutputCodec codec,
final String tagsTargetKey) {
final OutputCodecContext codecContext) {

this.httpSinkConfiguration = httpSinkConfiguration;
this.bufferFactory = bufferFactory;
Expand All @@ -142,7 +143,7 @@ public HttpSinkService(final HttpSinkConfiguration httpSinkConfiguration,
this.httpSinkRecordsSuccessCounter = pluginMetrics.counter(HTTP_SINK_RECORDS_SUCCESS_COUNTER);
this.httpSinkRecordsFailedCounter = pluginMetrics.counter(HTTP_SINK_RECORDS_FAILED_COUNTER);
this.codec= codec;
this.tagsTargetKey = tagsTargetKey;
this.codecContext = codecContext;
}

/**
Expand All @@ -160,9 +161,9 @@ public void output(Collection<Record<Event>> records) {
try {
final Event event = record.getData();
if(currentBuffer.getEventCount() == 0) {
codec.start(outputStream,event , tagsTargetKey);
codec.start(outputStream,event , codecContext);
}
codec.writeEvent(event, outputStream, tagsTargetKey);
codec.writeEvent(event, outputStream);
int count = currentBuffer.getEventCount() +1;
currentBuffer.setEventCount(count);

Expand Down

0 comments on commit d28b729

Please sign in to comment.