diff --git a/data-prepper-plugins/http-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/http/HttpSinkServiceIT.java b/data-prepper-plugins/http-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/http/HttpSinkServiceIT.java index d9a66da7b3..70ec2c2a17 100644 --- a/data-prepper-plugins/http-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/http/HttpSinkServiceIT.java +++ b/data-prepper-plugins/http-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/http/HttpSinkServiceIT.java @@ -37,7 +37,6 @@ import java.text.MessageFormat; import java.time.Duration; -import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.UUID; @@ -112,7 +111,6 @@ public HttpSinkService createHttpSinkServiceUnderTest() throws NoSuchFieldExcept final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); this.ndjsonOutputConfig = mock(NdjsonOutputConfig.class); - when(ndjsonOutputConfig.getExcludeKeys()).thenReturn(new ArrayList<>()); codec = new NdjsonOutputCodec(ndjsonOutputConfig); this.bufferFactory = new InMemoryBufferFactory(); this.dlqPushHandler = new DlqPushHandler(httpSinkConfiguration.getDlqFile(), pluginFactory, diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/HTTPSink.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/HTTPSink.java index ab696a8899..b456836927 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/HTTPSink.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/HTTPSink.java @@ -21,6 +21,7 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.accumulator.BufferFactory; @@ -113,7 +114,7 @@ public HTTPSink(final PluginSetting pluginSetting, pluginMetrics, pluginSetting, codec, - Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null); + OutputCodecContext.fromSinkContext(sinkContext)); } @Override diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java index 4555e4616f..534d41dfb1 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/service/HttpSinkService.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.accumulator.Buffer; @@ -105,7 +106,7 @@ public class HttpSinkService { private final OutputCodec codec; - private final String tagsTargetKey; + private final OutputCodecContext codecContext; public HttpSinkService(final HttpSinkConfiguration httpSinkConfiguration, final BufferFactory bufferFactory, @@ -116,7 +117,7 @@ public HttpSinkService(final HttpSinkConfiguration httpSinkConfiguration, final PluginMetrics pluginMetrics, final PluginSetting httpPluginSetting, final OutputCodec codec, - final String tagsTargetKey) { + final OutputCodecContext codecContext) { this.httpSinkConfiguration = httpSinkConfiguration; this.bufferFactory = bufferFactory; @@ -142,7 +143,7 @@ public HttpSinkService(final HttpSinkConfiguration httpSinkConfiguration, this.httpSinkRecordsSuccessCounter = pluginMetrics.counter(HTTP_SINK_RECORDS_SUCCESS_COUNTER); this.httpSinkRecordsFailedCounter = pluginMetrics.counter(HTTP_SINK_RECORDS_FAILED_COUNTER); this.codec= codec; - this.tagsTargetKey = tagsTargetKey; + this.codecContext = codecContext; } /** @@ -160,9 +161,9 @@ public void output(Collection> records) { try { final Event event = record.getData(); if(currentBuffer.getEventCount() == 0) { - codec.start(outputStream,event , tagsTargetKey); + codec.start(outputStream,event , codecContext); } - codec.writeEvent(event, outputStream, tagsTargetKey); + codec.writeEvent(event, outputStream); int count = currentBuffer.getEventCount() +1; currentBuffer.setEventCount(count);