diff --git a/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpSource.java b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpSource.java index 4e07dfac9c..493904e7a2 100644 --- a/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpSource.java +++ b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpSource.java @@ -143,7 +143,7 @@ public void start(final Buffer buffer) { if (sourceConfig.hasHealthCheckService()) { logger.info("{} source health check is enabled", sourceName); - sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); + sb.service(getHttpHealthCheckPath(), HealthCheckService.builder().longPolling(0).build()); } server = sb.build(); @@ -191,4 +191,7 @@ public void stop() { public abstract BaseHttpService getHttpService(int bufferTimeoutInMillis, Buffer buffer, PluginMetrics pluginMetrics); + public String getHttpHealthCheckPath() { + return HTTP_HEALTH_CHECK_PATH; + } } diff --git a/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodecTest.java b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodecTest.java index 24da331c62..4bcb4b3031 100644 --- a/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodecTest.java +++ b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodecTest.java @@ -34,11 +34,14 @@ public class MultiLineJsonCodecTest { private final HttpData serializedRequestBadEmptyNewLines = HttpData.ofUtf8("\n\n\n\n\n\n\n \n"); private final HttpData serializedRequestBadInvalidJson = HttpData.ofUtf8("{\"text\":"); - private final MultiLineJsonCodec multiLineJsonCodec = new MultiLineJsonCodec(); + private MultiLineJsonCodec createObjectUnderTest() { + return new MultiLineJsonCodec(); + } @Test public void testParseSuccess() throws IOException { // When + MultiLineJsonCodec multiLineJsonCodec = createObjectUnderTest(); List> res = multiLineJsonCodec.parse(serializedRequest); // Then @@ -55,9 +58,9 @@ public void testParseSuccess() throws IOException { @Test public void testParseSuccess2() throws IOException { // When + MultiLineJsonCodec multiLineJsonCodec = createObjectUnderTest(); List> res = multiLineJsonCodec.parse(serializedRequestMultipleRows); - // Then assertEquals(10, res.size()); @@ -75,6 +78,7 @@ public void testParseSuccess2() throws IOException { @Test public void testParseFailure() { + MultiLineJsonCodec multiLineJsonCodec = createObjectUnderTest(); assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBad)); assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadEmpty)); assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadEmptyNewLines)); diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java index 1a6c9b4bd1..9bfa1a9c53 100644 --- a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java +++ b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java @@ -21,6 +21,7 @@ @DataPrepperPlugin(name = "opensearch_api", pluginType = Source.class, pluginConfigurationType = OpenSearchAPISourceConfig.class) public class OpenSearchAPISource extends BaseHttpSource> { private static final String SOURCE_NAME = "OpenSearch API"; + private static final String HTTP_HEALTH_CHECK_PATH = "/"; @DataPrepperPluginConstructor public OpenSearchAPISource(final OpenSearchAPISourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, @@ -32,4 +33,9 @@ public OpenSearchAPISource(final OpenSearchAPISourceConfig sourceConfig, final P public BaseHttpService getHttpService(final int bufferWriteTimeoutInMillis, final Buffer> buffer, final PluginMetrics pluginMetrics) { return new OpenSearchAPIService(bufferWriteTimeoutInMillis, buffer, pluginMetrics); } + + @Override + public String getHttpHealthCheckPath() { + return HTTP_HEALTH_CHECK_PATH; + } } diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceConfig.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceConfig.java index ed864df778..3a1faec3df 100644 --- a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceConfig.java +++ b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceConfig.java @@ -10,7 +10,7 @@ public class OpenSearchAPISourceConfig extends BaseHttpServerConfig { static final String DEFAULT_ENDPOINT_URI = "/"; - static final int DEFAULT_PORT = 9202; + static final int DEFAULT_PORT = 9200; @Override public int getDefaultPort() { diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceTest.java index 0b4553646f..c7b8f60edd 100644 --- a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceTest.java +++ b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -50,6 +51,7 @@ import org.opensearch.dataprepper.plugins.HttpBasicArmeriaHttpAuthenticationProvider; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.source.file.FileSourceConfig; import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkAPIEventMetadataKeyAttributes; import java.io.ByteArrayOutputStream; @@ -62,6 +64,7 @@ import java.util.List; import java.util.Map; import java.util.StringJoiner; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.zip.GZIPOutputStream; @@ -83,16 +86,20 @@ class OpenSearchAPISourceTest { private static final ObjectMapper mapper = new ObjectMapper(); private final String PLUGIN_NAME = "opensearch_api"; - private final String TEST_PIPELINE_NAME = "test_pipeline"; - private final String TEST_INDEX = "test-index"; - private final String AUTHORITY = "127.0.0.1:9202"; - private final int DEFAULT_PORT = 9202; + private final String AUTHORITY = "127.0.0.1:9200"; + private final int DEFAULT_PORT = 9200; private final int DEFAULT_REQUEST_TIMEOUT_MS = 10_000; private final int DEFAULT_THREAD_COUNT = 200; private final int MAX_CONNECTIONS_COUNT = 500; private final int MAX_PENDING_REQUESTS_COUNT = 1024; private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile(); private final String TEST_SSL_KEY_FILE = getClass().getClassLoader().getResource("test_decrypted_key.key").getFile(); + + private String testIndex; + private String testPipelineName; + private String testRoutingId; + private String testQueryParams; + @Mock private ServerBuilder serverBuilder; @@ -123,7 +130,7 @@ private BlockingBuffer> getBuffer() { integerHashMap.put("buffer_size", 1); integerHashMap.put("batch_size", 1); final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap); - pluginSetting.setPipelineName(TEST_PIPELINE_NAME); + pluginSetting.setPipelineName(testPipelineName); return new BlockingBuffer<>(pluginSetting); } @@ -132,7 +139,7 @@ private BlockingBuffer> getBuffer() { */ private void refreshMeasurements() { final String metricNamePrefix = new StringJoiner(MetricNames.DELIMITER) - .add(TEST_PIPELINE_NAME).add(PLUGIN_NAME).toString(); + .add(testPipelineName).add(PLUGIN_NAME).toString(); requestsReceivedMeasurements = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) .add(OpenSearchAPIService.REQUESTS_RECEIVED).toString()); @@ -172,7 +179,7 @@ private byte[] createGZipCompressedPayload(final String payload) throws IOExcept } @BeforeEach - public void setUp() { + public void setUp() throws Exception { lenient().when(serverBuilder.annotatedService(any())).thenReturn(serverBuilder); lenient().when(serverBuilder.http(anyInt())).thenReturn(serverBuilder); lenient().when(serverBuilder.https(anyInt())).thenReturn(serverBuilder); @@ -190,17 +197,22 @@ public void setUp() { lenient().when(sourceConfig.getCompression()).thenReturn(CompressionOption.NONE); MetricsTestUtil.initMetrics(); - pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); + testPipelineName = UUID.randomUUID().toString(); + pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, testPipelineName); pluginFactory = mock(PluginFactory.class); final ArmeriaHttpAuthenticationProvider authenticationProvider = new HttpBasicArmeriaHttpAuthenticationProvider(new HttpBasicAuthenticationConfig("test", "test")); - when(pluginFactory.loadPlugin(eq(ArmeriaHttpAuthenticationProvider.class), any(PluginSetting.class))) + lenient().when(pluginFactory.loadPlugin(eq(ArmeriaHttpAuthenticationProvider.class), any(PluginSetting.class))) .thenReturn(authenticationProvider); testBuffer = getBuffer(); + pipelineDescription = mock(PipelineDescription.class); - when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + lenient().when(pipelineDescription.getPipelineName()).thenReturn(testPipelineName); + testIndex = UUID.randomUUID().toString(); + testRoutingId = UUID.randomUUID().toString(); + testQueryParams = "?pipeline=" + testPipelineName + "&routing=" + testRoutingId; openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); } @@ -223,17 +235,17 @@ private void assertSecureResponseWithStatusCode(final AggregatedHttpResponse res } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testBulkRequestAPIResponse200(boolean includeIndexInPath) throws IOException { + @CsvSource({"false,false", "false,true", "true,false", "true,true"}) + public void testBulkRequestAPIResponse200(boolean includeIndexInPath, boolean useQueryParams) throws IOException { int numberOfRecords = 1; - testBulkRequestAPI200(includeIndexInPath, false, numberOfRecords); + testBulkRequestAPI200(includeIndexInPath, false, useQueryParams, numberOfRecords); } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testBulkRequestAPICompressionResponse200(boolean includeIndexInPath) throws IOException { + @CsvSource({"false,false", "false,true", "true,false", "true,true"}) + public void testBulkRequestAPICompressionResponse200(boolean includeIndexInPath, boolean useQueryParams) throws IOException { int numberOfRecords = 1; - testBulkRequestAPI200(includeIndexInPath, true, numberOfRecords); + testBulkRequestAPI200(includeIndexInPath, true, useQueryParams, numberOfRecords); } @Test @@ -246,7 +258,7 @@ public void testHealthCheck() { .scheme(SessionProtocol.HTTP) .authority(AUTHORITY) .method(HttpMethod.GET) - .path("/health") + .path("/") .build()) .aggregate() .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); @@ -261,7 +273,7 @@ public void testHealthCheckUnauthenticatedDisabled() { "username", "test", "password", "test" ))); - pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); + pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, testPipelineName); openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); openSearchAPISource.start(testBuffer); @@ -271,7 +283,7 @@ public void testHealthCheckUnauthenticatedDisabled() { .scheme(SessionProtocol.HTTP) .authority(AUTHORITY) .method(HttpMethod.GET) - .path("/health") + .path("/") .build()) .aggregate() .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.UNAUTHORIZED)).join(); @@ -290,7 +302,7 @@ public void testBulkRequestJsonResponse400WithEmptyPayload(boolean includeIndexI .scheme(SessionProtocol.HTTP) .authority(AUTHORITY) .method(HttpMethod.POST) - .path(includeIndexInPath ? "/" + TEST_INDEX + "/_bulk" : "/_bulk") + .path(includeIndexInPath ? "/" + testIndex + "/_bulk" + testQueryParams : "/_bulk" + testQueryParams) .contentType(MediaType.JSON_UTF_8) .build(), HttpData.ofUtf8(testBadData)) @@ -310,7 +322,7 @@ public void testBulkRequestJsonResponse400WithEmptyPayload(boolean includeIndexI @ParameterizedTest @ValueSource(booleans = {false, true}) - public void testBulkRequestJsonResponse400WithInvalidPayload(boolean includeIndexInPath) throws JsonProcessingException { + public void testBulkRequestJsonResponse400WithInvalidPayload(boolean includeIndexInPath) throws Exception { // Prepare List jsonList = new ArrayList<>(); for (int i = 0; i < 2; i++) { @@ -325,7 +337,7 @@ public void testBulkRequestJsonResponse400WithInvalidPayload(boolean includeInde .scheme(SessionProtocol.HTTP) .authority(AUTHORITY) .method(HttpMethod.POST) - .path(includeIndexInPath ? "/" + TEST_INDEX + "/_bulk" : "/_bulk") + .path(includeIndexInPath ? "/" + testIndex + "/_bulk" + testQueryParams : "/_bulk" + testQueryParams) .contentType(MediaType.JSON_UTF_8) .build(), HttpData.ofUtf8(testBadData)) @@ -346,68 +358,36 @@ public void testBulkRequestJsonResponse400WithInvalidPayload(boolean includeInde @ParameterizedTest @ValueSource(booleans = {false, true}) public void testBulkRequestAPIJsonResponse413(boolean includeIndexInPath) throws JsonProcessingException { - testBulkRequestJsonResponse413(includeIndexInPath); - } - - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testBulkRequestAPIJsonResponse408(boolean includeIndexInPath) throws JsonProcessingException { - testBulkRequestJsonResponse408(includeIndexInPath); - } - - private void testBulkRequestAPI200(boolean includeIndexInPath, boolean useCompression, int numberOfRecords) throws IOException { - final String testData = generateTestData(includeIndexInPath, numberOfRecords); + // Prepare + final String testData = generateTestData(includeIndexInPath, 50); final int testPayloadSize = testData.getBytes().length; - if (useCompression) { - when(sourceConfig.getCompression()).thenReturn(CompressionOption.GZIP); - } - - openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); openSearchAPISource.start(testBuffer); refreshMeasurements(); // When - if (useCompression) { - WebClient.of().execute(RequestHeaders.builder() - .scheme(SessionProtocol.HTTP) - .authority(AUTHORITY) - .method(HttpMethod.POST) - .path(includeIndexInPath ? "/" + TEST_INDEX + "/_bulk" : "/_bulk") - .add(HttpHeaderNames.CONTENT_ENCODING, "gzip") - .build(), - createGZipCompressedPayload(testData)) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); - } else { - WebClient.of().execute(RequestHeaders.builder() - .scheme(SessionProtocol.HTTP) - .authority(AUTHORITY) - .method(HttpMethod.POST) - .path(includeIndexInPath ? "/" + TEST_INDEX + "/_bulk" : "/_bulk") - .contentType(MediaType.JSON_UTF_8) - .build(), - HttpData.ofUtf8(testData)) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); - } - // Then - Assertions.assertFalse(testBuffer.isEmpty()); + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.POST) + .path(includeIndexInPath ? "/" + testIndex + "/_bulk" + testQueryParams : "/_bulk" + testQueryParams) + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.ofUtf8(testData)) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.REQUEST_ENTITY_TOO_LARGE)).join(); - final Map.Entry>, CheckpointState> result = testBuffer.read(100); - List> records = new ArrayList<>(result.getKey()); - Assertions.assertEquals(numberOfRecords, records.size()); - final Record record = records.get(0); - Assertions.assertEquals("text-data", record.getData().get("text", String.class)); - Assertions.assertEquals("index", record.getData().getMetadata().getAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION)); - Assertions.assertEquals(TEST_INDEX, record.getData().getMetadata().getAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX)); - Assertions.assertEquals("123", record.getData().getMetadata().getAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID)); + // Then + Assertions.assertTrue(testBuffer.isEmpty()); // Verify metrics final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( requestsReceivedMeasurements, Statistic.COUNT); Assertions.assertEquals(1.0, requestReceivedCount.getValue()); final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( successRequestsMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, successRequestsCount.getValue()); + Assertions.assertEquals(0.0, successRequestsCount.getValue()); + final Measurement requestsTooLargeCount = MetricsTestUtil.getMeasurementFromList( + requestsTooLargeMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestsTooLargeCount.getValue()); final Measurement requestProcessDurationCount = MetricsTestUtil.getMeasurementFromList( requestProcessDurationMeasurements, Statistic.COUNT); Assertions.assertEquals(1.0, requestProcessDurationCount.getValue()); @@ -417,24 +397,13 @@ private void testBulkRequestAPI200(boolean includeIndexInPath, boolean useCompre final Measurement payloadSizeMax = MetricsTestUtil.getMeasurementFromList( payloadSizeSummaryMeasurements, Statistic.MAX); Assertions.assertEquals(testPayloadSize, payloadSizeMax.getValue()); - Assertions.assertTrue(requestProcessDurationMax.getValue() > 0); - } - - private String generateTestData(boolean includeIndexInPath, int numberOfRecords) throws JsonProcessingException { - List jsonList = new ArrayList<>(); - for (int i = 0; i < numberOfRecords; i++) { - if (includeIndexInPath) { - jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Map.of("_id", "123")))); - } else { - jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Map.of("_index", TEST_INDEX, "_id", "123")))); - } - jsonList.add(mapper.writeValueAsString(Collections.singletonMap("text", "text-data"))); - } - return String.join("\n", jsonList); } - private void testBulkRequestJsonResponse408(boolean includeIndexInPath) throws JsonProcessingException { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestAPIJsonResponse408(boolean includeIndexInPath) throws JsonProcessingException { // Prepare + reset(); final int testMaxPendingRequests = 1; final int testThreadCount = 1; final int serverTimeoutInMillis = 500; @@ -450,7 +419,7 @@ private void testBulkRequestJsonResponse408(boolean includeIndexInPath) throws J final RequestHeaders testRequestHeaders = RequestHeaders.builder().scheme(SessionProtocol.HTTP) .authority(AUTHORITY) .method(HttpMethod.POST) - .path(includeIndexInPath ? "/" + TEST_INDEX + "/_bulk" : "/_bulk") + .path(includeIndexInPath ? "/" + testIndex + "/_bulk" + testQueryParams : "/_bulk" + testQueryParams) .contentType(MediaType.JSON_UTF_8) .build(); final HttpData testHttpData = HttpData.ofUtf8(generateTestData(includeIndexInPath, 1)); @@ -482,37 +451,60 @@ private void testBulkRequestJsonResponse408(boolean includeIndexInPath) throws J Assertions.assertTrue(maxDurationInMillis > bufferTimeoutInMillis); } - private void testBulkRequestJsonResponse413(boolean includeIndexInPath) throws JsonProcessingException { - // Prepare - final String testData = generateTestData(includeIndexInPath, 50); + private void testBulkRequestAPI200(boolean includeIndexInPath, boolean useCompression, boolean useQueryParams, int numberOfRecords) throws IOException { + final String testData = generateTestData(includeIndexInPath, numberOfRecords); final int testPayloadSize = testData.getBytes().length; + if (useCompression) { + when(sourceConfig.getCompression()).thenReturn(CompressionOption.GZIP); + } + + openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); openSearchAPISource.start(testBuffer); refreshMeasurements(); + final String queryParams = useQueryParams ? testQueryParams : ""; // When - WebClient.of().execute(RequestHeaders.builder() - .scheme(SessionProtocol.HTTP) - .authority(AUTHORITY) - .method(HttpMethod.POST) - .path(includeIndexInPath ? "/" + TEST_INDEX + "/_bulk" : "/_bulk") - .contentType(MediaType.JSON_UTF_8) - .build(), - HttpData.ofUtf8(testData)) - .aggregate() - .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.REQUEST_ENTITY_TOO_LARGE)).join(); - + if (useCompression) { + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.POST) + .path(includeIndexInPath ? "/" + testIndex + "/_bulk" + queryParams : "/_bulk" + queryParams) + .add(HttpHeaderNames.CONTENT_ENCODING, "gzip") + .build(), + createGZipCompressedPayload(testData)) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); + } else { + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.POST) + .path(includeIndexInPath ? "/" + testIndex + "/_bulk" + testQueryParams : "/_bulk" + testQueryParams) + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.ofUtf8(testData)) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); + } // Then - Assertions.assertTrue(testBuffer.isEmpty()); + Assertions.assertFalse(testBuffer.isEmpty()); + + final Map.Entry>, CheckpointState> result = testBuffer.read(100); + List> records = new ArrayList<>(result.getKey()); + Assertions.assertEquals(numberOfRecords, records.size()); + final Record record = records.get(0); + Assertions.assertEquals("text-data", record.getData().get("text", String.class)); + Assertions.assertEquals("index", record.getData().getMetadata().getAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION)); + Assertions.assertEquals(testIndex, record.getData().getMetadata().getAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX)); + Assertions.assertEquals("123", record.getData().getMetadata().getAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID)); // Verify metrics final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( requestsReceivedMeasurements, Statistic.COUNT); Assertions.assertEquals(1.0, requestReceivedCount.getValue()); final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( successRequestsMeasurements, Statistic.COUNT); - Assertions.assertEquals(0.0, successRequestsCount.getValue()); - final Measurement requestsTooLargeCount = MetricsTestUtil.getMeasurementFromList( - requestsTooLargeMeasurements, Statistic.COUNT); - Assertions.assertEquals(1.0, requestsTooLargeCount.getValue()); + Assertions.assertEquals(1.0, successRequestsCount.getValue()); final Measurement requestProcessDurationCount = MetricsTestUtil.getMeasurementFromList( requestProcessDurationMeasurements, Statistic.COUNT); Assertions.assertEquals(1.0, requestProcessDurationCount.getValue()); @@ -522,6 +514,20 @@ private void testBulkRequestJsonResponse413(boolean includeIndexInPath) throws J final Measurement payloadSizeMax = MetricsTestUtil.getMeasurementFromList( payloadSizeSummaryMeasurements, Statistic.MAX); Assertions.assertEquals(testPayloadSize, payloadSizeMax.getValue()); + Assertions.assertTrue(requestProcessDurationMax.getValue() > 0); + } + + private String generateTestData(boolean includeIndexInPath, int numberOfRecords) throws JsonProcessingException { + List jsonList = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) { + if (includeIndexInPath) { + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Map.of("_id", "123")))); + } else { + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Map.of("_index", testIndex, "_id", "123")))); + } + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("text", "text-data"))); + } + return String.join("\n", jsonList); } @ParameterizedTest @@ -539,7 +545,7 @@ public void testOpenSearchAPISourceServerConnectionsMetric(boolean includeIndexI final RequestHeaders testRequestHeaders = RequestHeaders.builder().scheme(SessionProtocol.HTTP) .authority(AUTHORITY) .method(HttpMethod.POST) - .path(includeIndexInPath ? "/" + TEST_INDEX + "/_bulk" : "/_bulk") + .path(includeIndexInPath ? "/" + testIndex + "/_bulk" + testQueryParams : "/_bulk" + testQueryParams) .contentType(MediaType.JSON_UTF_8) .build(); final HttpData testHttpData = HttpData.ofUtf8(generateTestData(includeIndexInPath, 1)); @@ -559,10 +565,10 @@ void testBulkRequestAPIJsonResponse(boolean includeIndexInPath) throws JsonProce reset(sourceConfig); when(sourceConfig.getPort()).thenReturn(DEFAULT_PORT); when(sourceConfig.getPath()).thenReturn(OpenSearchAPISourceConfig.DEFAULT_ENDPOINT_URI); - lenient().when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(DEFAULT_REQUEST_TIMEOUT_MS); - lenient().when(sourceConfig.getThreadCount()).thenReturn(DEFAULT_THREAD_COUNT); - lenient().when(sourceConfig.getMaxConnectionCount()).thenReturn(MAX_CONNECTIONS_COUNT); - lenient().when(sourceConfig.getMaxPendingRequests()).thenReturn(MAX_PENDING_REQUESTS_COUNT); + when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(DEFAULT_REQUEST_TIMEOUT_MS); + when(sourceConfig.getThreadCount()).thenReturn(DEFAULT_THREAD_COUNT); + when(sourceConfig.getMaxConnectionCount()).thenReturn(MAX_CONNECTIONS_COUNT); + when(sourceConfig.getMaxPendingRequests()).thenReturn(MAX_PENDING_REQUESTS_COUNT); when(sourceConfig.isSsl()).thenReturn(true); when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE); when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); @@ -575,7 +581,7 @@ void testBulkRequestAPIJsonResponse(boolean includeIndexInPath) throws JsonProce .scheme(SessionProtocol.HTTPS) .authority(AUTHORITY) .method(HttpMethod.POST) - .path(includeIndexInPath ? "/" + TEST_INDEX + "/_bulk" : "/_bulk") + .path(includeIndexInPath ? "/" + testIndex + "/_bulk" + testQueryParams : "/_bulk" + testQueryParams) .contentType(MediaType.JSON_UTF_8) .build(), HttpData.ofUtf8(generateTestData(includeIndexInPath, 1))) @@ -586,15 +592,15 @@ void testBulkRequestAPIJsonResponse(boolean includeIndexInPath) throws JsonProce @Test public void request_that_exceeds_maxRequestLength_returns_413() throws JsonProcessingException { reset(sourceConfig); - lenient().when(sourceConfig.getPort()).thenReturn(DEFAULT_PORT); - lenient().when(sourceConfig.getPath()).thenReturn(OpenSearchAPISourceConfig.DEFAULT_ENDPOINT_URI); - lenient().when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(10_000); - lenient().when(sourceConfig.getThreadCount()).thenReturn(200); - lenient().when(sourceConfig.getMaxConnectionCount()).thenReturn(500); - lenient().when(sourceConfig.getMaxPendingRequests()).thenReturn(1024); - lenient().when(sourceConfig.hasHealthCheckService()).thenReturn(true); - lenient().when(sourceConfig.getCompression()).thenReturn(CompressionOption.NONE); - lenient().when(sourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4)); + when(sourceConfig.getPort()).thenReturn(DEFAULT_PORT); + when(sourceConfig.getPath()).thenReturn(OpenSearchAPISourceConfig.DEFAULT_ENDPOINT_URI); + when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(10_000); + when(sourceConfig.getThreadCount()).thenReturn(200); + when(sourceConfig.getMaxConnectionCount()).thenReturn(500); + when(sourceConfig.getMaxPendingRequests()).thenReturn(1024); + when(sourceConfig.hasHealthCheckService()).thenReturn(false); + when(sourceConfig.getCompression()).thenReturn(CompressionOption.NONE); + when(sourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4)); openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); // Prepare final String testData = "" +