From 9f17acbea9f53c35ff29bea4c39b47394dcd3393 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Wed, 5 Jun 2024 11:20:53 -0400 Subject: [PATCH] OpenSearch API source implementation Signed-off-by: Souvik Bose --- .../HttpRequestExceptionHandler.java | 3 + .../http/codec/MultiLineJsonCodec.java | 41 + .../http/codec/MultiLineJsonCodecTest.java | 86 ++ .../source/loghttp/HTTPSourceConfigTest.java | 1 - .../opensearch-api-source/README.md | 129 +++ .../opensearch-api-source/build.gradle | 38 + .../opensearchapi/OpenSearchAPIService.java | 209 +++++ .../opensearchapi/OpenSearchAPISource.java | 198 +++++ .../OpenSearchAPISourceConfig.java | 24 + .../BulkAPIEventMetadataKeyAttributes.java | 10 + .../model/BulkAPIRequestParams.java | 14 + .../model/BulkActionAndMetadataObject.java | 41 + .../src/main}/resources/test_cert.crt | 0 .../main}/resources/test_decrypted_key.key | 0 .../OpenSearchAPIServiceTest.java | 329 ++++++++ .../OpenSearchAPISourceConfigTest.java | 17 + .../OpenSearchAPISourceTest.java | 778 ++++++++++++++++++ ...BulkAPIEventMetadataKeyAttributesTest.java | 17 + .../BulkActionAndMetadataObjectTest.java | 58 ++ settings.gradle | 3 +- 20 files changed, 1994 insertions(+), 2 deletions(-) create mode 100644 data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodec.java create mode 100644 data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodecTest.java create mode 100644 data-prepper-plugins/opensearch-api-source/README.md create mode 100644 data-prepper-plugins/opensearch-api-source/build.gradle create mode 100644 data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java create mode 100644 data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java create mode 100644 
data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceConfig.java create mode 100644 data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkAPIEventMetadataKeyAttributes.java create mode 100644 data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkAPIRequestParams.java create mode 100644 data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkActionAndMetadataObject.java rename data-prepper-plugins/{http-source/src/test => opensearch-api-source/src/main}/resources/test_cert.crt (100%) rename data-prepper-plugins/{http-source/src/test => opensearch-api-source/src/main}/resources/test_decrypted_key.key (100%) create mode 100644 data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIServiceTest.java create mode 100644 data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceConfigTest.java create mode 100644 data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceTest.java create mode 100644 data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkAPIEventMetadataKeyAttributesTest.java create mode 100644 data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkActionAndMetadataObjectTest.java diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/HttpRequestExceptionHandler.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/HttpRequestExceptionHandler.java index 
2d0fde0196..77f7eddedc 100644 --- a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/HttpRequestExceptionHandler.java +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/HttpRequestExceptionHandler.java @@ -69,6 +69,9 @@ private HttpStatus handleException(final Throwable e) { } else if (e instanceof SizeOverflowException) { requestsTooLargeCounter.increment(); return HttpStatus.REQUEST_ENTITY_TOO_LARGE; + } else if (e instanceof IllegalArgumentException) { + badRequestsCounter.increment(); + return HttpStatus.BAD_REQUEST; } internalServerErrorCounter.increment(); diff --git a/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodec.java b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodec.java new file mode 100644 index 0000000000..c0e1885f25 --- /dev/null +++ b/data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodec.java @@ -0,0 +1,41 @@ +package org.opensearch.dataprepper.http.codec; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linecorp.armeria.common.HttpData; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class MultiLineJsonCodec implements Codec>> { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String REGEX = "\\r?\\n"; + private static final TypeReference> MAP_TYPE_REFERENCE = + new TypeReference>() {}; + + @Override + public List> parse(HttpData httpData) throws IOException { + List> jsonListData = new ArrayList<>(); + + String requestBody = new String(httpData.toInputStream().readAllBytes(), 
StandardCharsets.UTF_8); + List jsonLines = Arrays.asList(requestBody.split(REGEX)); + + for (String jsonLine: jsonLines) { + if (isInvalidLine(jsonLine)) { + throw new IOException("Error processing request payload."); + } + jsonListData.add(objectMapper.readValue(jsonLine, MAP_TYPE_REFERENCE)); + } + return jsonListData; + } + + private static boolean isInvalidLine(final String str) { + return str == null || str.isEmpty() || str.isBlank(); + } +} diff --git a/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodecTest.java b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodecTest.java new file mode 100644 index 0000000000..24da331c62 --- /dev/null +++ b/data-prepper-plugins/http-source-common/src/test/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodecTest.java @@ -0,0 +1,86 @@ +package org.opensearch.dataprepper.http.codec; + +import com.linecorp.armeria.common.HttpData; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class MultiLineJsonCodecTest { + private final HttpData serializedRequest = HttpData.ofUtf8("" + + "{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" + + "{ \"text\": \"text1\", \"year\": \"2013\" }"); + + private final HttpData serializedRequestMultipleRows = HttpData.ofUtf8("" + + "{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" + + "{ \"text\": \"text1\", \"year\": \"2013\" }\n" + + "{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" + + "{ \"text\": \"text1\", \"year\": \"2013\" }\n" + + "{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" + + "{ \"text\": \"text1\", \"year\": \"2013\" }\n" + + "{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } 
}\n" + + "{ \"text\": \"text1\", \"year\": \"2013\" }\n" + + "{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" + + "{ \"text\": \"text1\", \"year\": \"2013\" }\n"); + + private final HttpData serializedRequestBad = HttpData.ofUtf8("{}\n\n{}"); + private final HttpData serializedRequestBadWithBlanks = HttpData.ofUtf8("{}\n \n "); + private final HttpData serializedRequestBadWithWhiteSpaces = HttpData.ofUtf8("\t\n\r\f {}"); + private final HttpData serializedRequestBadEmpty = HttpData.ofUtf8(""); + private final HttpData serializedRequestBadEmptyNewLines = HttpData.ofUtf8("\n\n\n\n\n\n\n \n"); + private final HttpData serializedRequestBadInvalidJson = HttpData.ofUtf8("{\"text\":"); + + private final MultiLineJsonCodec multiLineJsonCodec = new MultiLineJsonCodec(); + + @Test + public void testParseSuccess() throws IOException { + // When + List> res = multiLineJsonCodec.parse(serializedRequest); + + // Then + assertEquals(2, res.size()); + assertEquals(res.get(0).containsKey("index"), true); + Map innerMap = (Map) res.get(0).get("index"); + assertEquals(innerMap.get("_index"), "test-index"); + assertEquals(innerMap.get("_id"), "id1"); + assertEquals(res.get(1).containsKey("text"), true); + assertEquals(res.get(1).get("text"), "text1"); + assertEquals(res.get(1).get("year"), "2013"); + } + + @Test + public void testParseSuccess2() throws IOException { + // When + List> res = multiLineJsonCodec.parse(serializedRequestMultipleRows); + + + // Then + assertEquals(10, res.size()); + + for (int idx = 0; idx < res.size() - 1; idx++) { + assertEquals(res.get(idx).containsKey("index"), true); + Map innerMap = (Map) res.get(idx).get("index"); + assertEquals(innerMap.get("_index"), "test-index"); + assertEquals(innerMap.get("_id"), "id1"); + assertEquals(res.get(idx+1).containsKey("text"), true); + assertEquals(res.get(idx+1).get("text"), "text1"); + assertEquals(res.get(idx+1).get("year"), "2013"); + idx++; + } + } + + @Test + public void testParseFailure() { + 
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBad)); + assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadEmpty)); + assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadEmptyNewLines)); + assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadInvalidJson)); + assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadWithBlanks)); + assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadWithWhiteSpaces)); + } + +} diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java index bf05e6b6b2..70051bff18 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java +++ b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java @@ -21,6 +21,5 @@ void testDefault() { assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getPath()); assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getDefaultPort()); assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getDefaultPath()); - } } diff --git a/data-prepper-plugins/opensearch-api-source/README.md b/data-prepper-plugins/opensearch-api-source/README.md new file mode 100644 index 0000000000..7b6ec28da7 --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/README.md @@ -0,0 +1,129 @@ +# OpenSearch API Source + +This is a source plugin that supports HTTP protocol. It supports [OpenSearch Bulk Document API](https://opensearch.org/docs/latest/api-reference/document-apis/bulk/). 
All the paths and HTTP methods for [Bulk API operation](https://opensearch.org/docs/latest/api-reference/document-apis/bulk/#path-and-http-methods) are supported. It will also support optional [bulk URL parameters](https://opensearch.org/docs/latest/api-reference/document-apis/bulk/#url-parameters). + +## Usages +To get started with OpenSearch API source, create the following `pipeline.yaml` configuration: +```yaml +source: + opensearch_api: +``` + +### Response status + +* `200`: the request data has been successfully written into the buffer. +* `400`: the request data is either in mal-format or unsupported codec. +* `408`: the request data fails to be written into the buffer within the timeout. +* `413`: the request data size is larger than the configured capacity. +* `429`: the request has been rejected due to the OpenSearch API source executor being in full capacity. + +## Configurations + +* port (Optional) => An `int` between 0 and 65535 represents the port source is running on. Default is ```9202```. +* path (Optional) => A `string` which represents the URI path for endpoint invocation. It should start with `/` and length should be at least 1. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name. Default value is `/opensearch`. +* health_check_service (Optional) => A `boolean` that determines if a `/health` endpoint on the defined port will be home to a health check. Default is `false` +* unauthenticated_health_check (Optional) => A `boolean` that determines if the health endpoint will require authentication. This option is ignored if no authentication is defined. Default is `false` +* request_timeout (Optional) => An `int` larger than 0 represents request timeout in millis. Default is ```10_000```. +* thread_count (Optional) => An `int` larger than 0 represents the number of threads to keep in the ScheduledThreadPool. Default is `200`. 
+* max_connection_count (Optional) => An `int` larger than 0 represents the maximum allowed number of open connections. Default is `500`. +* max_pending_requests (Optional) => An `int` larger than 0 represents the maximum allowed number of tasks in the ScheduledThreadPool work queue. Default is `1024`. +* authentication (Optional) => An authentication configuration. By default, this runs an unauthenticated server. See below for more information. +* compression (Optional) : The compression type applied on the client request payload. Defaults to `none`. Supported values are: + * `none`: no compression + * `gzip`: apply GZip de-compression on the incoming request. + +### Authentication Configurations + +By default, the OpenSearch API source input is unauthenticated. + +The following is an example of how to run the server with HTTP Basic authentication: + +```yaml +source: + opensearch_api: + authentication: + http_basic: + username: my-user + password: my_s3cr3t +``` + +You can also explicitly disable authentication with: + +```yaml +source: + opensearch_api: + authentication: + unauthenticated: +``` + +This plugin uses pluggable authentication for HTTP servers. To provide custom authentication, +create a plugin which implements [`ArmeriaHttpAuthenticationProvider`](../armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/ArmeriaHttpAuthenticationProvider.java) + + +### SSL + +* ssl(Optional) => A `boolean` that enables TLS/SSL. Default is ```false```. +* ssl_certificate_file(Optional) => A `String` that represents the SSL certificate chain file path or AWS S3 path. S3 path example `s3:///`. Required if `ssl` is set to `true` and `use_acm_certificate_for_ssl` is set to `false`. +* ssl_key_file(Optional) => A `String` that represents the SSL key file path or AWS S3 path. S3 path example `s3:///`. Only decrypted key file is supported. Required if `ssl` is set to `true` and `use_acm_certificate_for_ssl` is set to `false`. 
+* use_acm_certificate_for_ssl(Optional) : A `boolean` that enables TLS/SSL using certificate and private key from AWS Certificate Manager (ACM). Default is `false`. +* acm_certificate_arn(Optional) : A `String` that represents the ACM certificate ARN. ACM certificate take preference over S3 or local file system certificate. Required if `use_acm_certificate_for_ssl` is set to `true`. +* acm_private_key_password(Optional): A `String` that represents the ACM private key password which that will be used to decrypt the private key. If it's not provided, a random password will be generated. +* acm_certificate_timeout_millis(Optional) : An `int` that represents the timeout in milliseconds for ACM to get certificates. Default value is `120000`. +* aws_region(Optional) : A `String` that represents the AWS region to use `ACM`, `S3`. Required if `use_acm_certificate_for_ssl` is set to `true` or `ssl_certificate_file` and `ssl_key_file` is `AWS S3`. + +### Example to enable SSL using OpenSSL + +Create the following OpenSearch API source configuration in your `pipeline.yaml`. + +```yaml +source: + opensearch_api: + ssl: true + ssl_certificate_file: "/full/path/to/certfile.crt" + ssl_key_file: "/full/path/to/keyfile.key" +``` + +Generate a private key named `keyfile.key`, along with a self-signed certificate file named `certfile.crt`. + +``` +openssl req -nodes -new -x509 -keyout keyfile.key -out certfile.crt -subj "/L=test/O=Example Com Inc./OU=Example Com Inc. Root CA/CN=Example Com Inc. Root CA" +``` + +Make sure to replace the paths for the `ssl_certificate_file` and `ssl_key_file` for the OpenSearch API source configuration with the actual paths of the files. + +- Use the following command to send a sample index action on the Bulk API request by setting the index `index = movies` in the body of the request. 
+ +``` +curl -k -XPOST -H "Content-Type: application/json" -d '{ "index": { "_index": "movies", "_id": "tt1979320" } } +{ "title": "Rush", "year": 2013}' +http://localhost:9202/opensearch/_bulk +``` + +- Alternatively, use the following command to set the index `index = movies` in the path +``` +curl -k -XPOST -H "Content-Type: application/json" -d '{ "index": { "_index": "movies", "_id": "tt1979320" } } +{ "title": "Rush", "year": 2013}' +http://localhost:9202/opensearch/movies/_bulk +``` + +# Metrics + +### Counter +- `requestsReceived`: measures total number of requests received by `/opensearch` endpoint. +- `requestsRejected`: measures total number of requests rejected (429 response status code) by OpenSearch API source plugin. +- `successRequests`: measures total number of requests successfully processed (200 response status code) by OpenSearch API source plugin. +- `badRequests`: measures total number of requests with invalid content type or format processed by OpenSearch API source plugin (400 response status code). +- `requestTimeouts`: measures total number of requests that time out in the OpenSearch API source server (415 response status code). +- `requestsTooLarge`: measures total number of requests of which the events size in the content is larger than the buffer capacity (413 response status code). +- `internalServerError`: measures total number of requests processed by the OpenSearch API source with custom exception type (500 response status code). + +### Timer +- `requestProcessDuration`: measures latency of requests processed by the OpenSearch API source plugin in seconds. + +### Distribution Summary +- `payloadSize`: measures the distribution of incoming requests payload sizes in bytes. + +## Developer Guide +This plugin is compatible with Java 14. 
See +- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) +- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) diff --git a/data-prepper-plugins/opensearch-api-source/build.gradle b/data-prepper-plugins/opensearch-api-source/build.gradle new file mode 100644 index 0000000000..874cbc4781 --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/build.gradle @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +dependencies { + implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:blocking-buffer') + implementation project(':data-prepper-plugins:http-source-common') + implementation project(':data-prepper-plugins:common') + implementation project(':data-prepper-plugins:armeria-common') + implementation libs.armeria.core + implementation libs.commons.io + implementation 'software.amazon.awssdk:acm' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:apache-client' + testImplementation project(':data-prepper-test-common') + testImplementation project(':data-prepper-api').sourceSets.test.output + testImplementation 'org.assertj:assertj-core:3.25.3' + testImplementation testLibs.mockito.inline + compileOnly 'org.projectlombok:lombok:1.18.20' + annotationProcessor 'org.projectlombok:lombok:1.18.20' +} + +jacocoTestCoverageVerification { + dependsOn jacocoTestReport + violationRules { + rule { //in addition to core projects rule + limit { + minimum = 0.90 + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java new file mode 100644 index 0000000000..d57da3632e 
--- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIService.java @@ -0,0 +1,209 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearchapi; + +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.annotation.Param; +import io.micrometer.common.util.StringUtils; +import org.opensearch.dataprepper.http.codec.MultiLineJsonCodec; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventType; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; +import org.opensearch.dataprepper.model.record.Record; +import com.linecorp.armeria.common.AggregatedHttpRequest; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.server.annotation.Blocking; +import com.linecorp.armeria.server.annotation.Post; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Timer; +import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkAPIRequestParams; +import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkActionAndMetadataObject; +import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkAPIEventMetadataKeyAttributes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Optional; +import java.util.List; +import java.util.Map; +import java.util.Arrays; +import java.util.ArrayList; + +/* +* 
OpenSearch API Service class is responsible for handling bulk API requests. +* The bulk API is responsible for 1/ parsing the request body, 2/ validating against the schema for Document API (Bulk) and finally creating data prepper events. +* Bulk API supports query parameters "pipeline", "routing" and "refresh" +*/ +@Blocking +public class OpenSearchAPIService { + + //TODO: Will need to revisit the metrics per API endpoint + public static final String REQUESTS_RECEIVED = "RequestsReceived"; + public static final String SUCCESS_REQUESTS = "SuccessRequests"; + public static final String PAYLOAD_SIZE = "PayloadSize"; + public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration"; + + private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAPIService.class); + + // TODO: support other data-types as request body, e.g. json_lines, msgpack + private final MultiLineJsonCodec jsonCodec = new MultiLineJsonCodec(); + private final Buffer> buffer; + private final int bufferWriteTimeoutInMillis; + private final Counter requestsReceivedCounter; + private final Counter successRequestsCounter; + private final DistributionSummary payloadSizeSummary; + private final Timer requestProcessDuration; + + public OpenSearchAPIService(final int bufferWriteTimeoutInMillis, final Buffer> buffer, final PluginMetrics pluginMetrics) { + this.buffer = buffer; + this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; + + //TODO: Will need to revisit the metrics per API endpoint + requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED); + successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS); + payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE); + requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION); + } + + @Post("/_bulk") + public HttpResponse doPostBulk(final ServiceRequestContext serviceRequestContext, final AggregatedHttpRequest aggregatedHttpRequest, + @Param("pipeline") Optional pipeline, @Param("routing") 
Optional routing) throws Exception { + + requestsReceivedCounter.increment(); + payloadSizeSummary.record(aggregatedHttpRequest.content().length()); + + if(serviceRequestContext.isTimedOut()) { + return HttpResponse.of(HttpStatus.REQUEST_TIMEOUT); + } + BulkAPIRequestParams bulkAPIRequestParams = BulkAPIRequestParams.builder() + .pipeline(pipeline.orElse("")) + .routing(routing.orElse("")) + .build(); + return requestProcessDuration.recordCallable(() -> processBulkRequest(aggregatedHttpRequest, bulkAPIRequestParams)); + } + + @Post("/{index}/_bulk") + public HttpResponse doPostBulkIndex(final ServiceRequestContext serviceRequestContext, final AggregatedHttpRequest aggregatedHttpRequest, @Param("index") Optional index, + @Param("pipeline") Optional pipeline, @Param("routing") Optional routing) throws Exception { + requestsReceivedCounter.increment(); + payloadSizeSummary.record(aggregatedHttpRequest.content().length()); + + if(serviceRequestContext.isTimedOut()) { + return HttpResponse.of(HttpStatus.REQUEST_TIMEOUT); + } + BulkAPIRequestParams bulkAPIRequestParams = BulkAPIRequestParams.builder() + .index(index.orElse("")) + .pipeline(pipeline.orElse("")) + .routing(routing.orElse("")) + .build(); + return requestProcessDuration.recordCallable(() -> processBulkRequest(aggregatedHttpRequest, bulkAPIRequestParams)); + } + + private HttpResponse processBulkRequest(final AggregatedHttpRequest aggregatedHttpRequest, final BulkAPIRequestParams bulkAPIRequestParams) throws Exception { + final HttpData content = aggregatedHttpRequest.content(); + List> bulkRequestPayloadList; + + // parse the request payload + try { + bulkRequestPayloadList = jsonCodec.parse(content); + } catch (IOException e) { + LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage()); + throw new IOException("Bad request data format.", e.getCause()); + } + + try { + if (buffer.isByteBuffer()) { + buffer.writeBytes(content.array(), null, bufferWriteTimeoutInMillis); 
+ } else { + List> records = generateEventsFromBulkRequest(bulkRequestPayloadList, bulkAPIRequestParams); + buffer.writeAll(records, bufferWriteTimeoutInMillis); + } + } catch (Exception e) { + LOG.error("Failed to write the request of size {} due to: {}", content.length(), e.getMessage()); + throw e; + } + successRequestsCounter.increment(); + return HttpResponse.of(HttpStatus.OK); + } + + private boolean isValidBulkAction(Map actionMap) { + return Arrays.stream(OpenSearchBulkActions.values()) + .anyMatch(bulkAction -> actionMap.containsKey(bulkAction.toString())); + } + + private List> generateEventsFromBulkRequest(final List> bulkRequestPayloadList, final BulkAPIRequestParams bulkAPIRequestParams) throws Exception { + if (bulkRequestPayloadList.isEmpty()) { + throw new IOException("Invalid request data."); + } + + List> records = new ArrayList<>(); + Iterator> bulkRequestPayloadListIterator = bulkRequestPayloadList.iterator(); + + while (bulkRequestPayloadListIterator.hasNext()) { + Map actionMetadataRow = bulkRequestPayloadListIterator.next(); + if (!isValidBulkAction(actionMetadataRow)) { + throw new IOException("Invalid request data."); + } + + BulkActionAndMetadataObject bulkActionAndMetadataObject = new BulkActionAndMetadataObject(actionMetadataRow); + final boolean isDeleteAction = bulkActionAndMetadataObject.getAction().equals(OpenSearchBulkActions.DELETE.toString()); + Optional> documentDataObject = Optional.empty(); + if (!isDeleteAction) { + if (!bulkRequestPayloadListIterator.hasNext()) { + throw new IOException("Invalid request data."); + } + documentDataObject = Optional.of(bulkRequestPayloadListIterator.next()); + // Performing another validation check to make sure that the doc row is not a valid action row + if (!documentDataObject.isPresent() || isValidBulkAction(documentDataObject.get())) { + throw new IOException("Invalid request data."); + } + } + final JacksonEvent event = createBulkRequestActionEvent(bulkActionAndMetadataObject, 
bulkAPIRequestParams, documentDataObject); + records.add(new Record<>(event)); + } + + return records; + } + + private JacksonEvent createBulkRequestActionEvent( + final BulkActionAndMetadataObject bulkActionAndMetadataObject, + final BulkAPIRequestParams bulkAPIRequestParams, Optional> optionalDocumentData) { + + final JacksonEvent.Builder eventBuilder = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()); + optionalDocumentData.ifPresent(eventBuilder::withData); + final JacksonEvent event = eventBuilder.build(); + + final String index = bulkActionAndMetadataObject.getIndex().isBlank() || bulkActionAndMetadataObject.getIndex().isEmpty() + ? bulkAPIRequestParams.getIndex() : bulkActionAndMetadataObject.getIndex(); + + event.getMetadata().setAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION, bulkActionAndMetadataObject.getAction()); + event.getMetadata().setAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX, index); + + String docId = bulkActionAndMetadataObject.getDocId(); + if (!StringUtils.isBlank(docId) && !StringUtils.isEmpty(docId)) { + event.getMetadata().setAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID, docId); + } + + String pipeline = bulkAPIRequestParams.getPipeline(); + if (!StringUtils.isBlank(pipeline) && !StringUtils.isEmpty(pipeline)) { + event.getMetadata().setAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_PIPELINE, pipeline); + } + + String routing = bulkAPIRequestParams.getRouting(); + if (!StringUtils.isBlank(routing) && !StringUtils.isEmpty(routing)) { + event.getMetadata().setAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ROUTING, routing); + } + + return event; + } +} diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java 
b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java new file mode 100644 index 0000000000..c10b918bb1 --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISource.java @@ -0,0 +1,198 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearchapi; + +import com.linecorp.armeria.server.HttpService; +import com.linecorp.armeria.server.Server; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.encoding.DecodingService; +import com.linecorp.armeria.server.healthcheck.HealthCheckService; +import com.linecorp.armeria.server.throttling.ThrottlingService; +import org.opensearch.dataprepper.HttpRequestExceptionHandler; +import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; +import org.opensearch.dataprepper.http.LogThrottlingRejectHandler; +import org.opensearch.dataprepper.http.LogThrottlingStrategy; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.codec.JsonDecoder; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.Source; +import 
org.opensearch.dataprepper.plugins.certificate.CertificateProvider; +import org.opensearch.dataprepper.plugins.certificate.model.Certificate; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.function.Function; + +@DataPrepperPlugin(name = "opensearch_api", pluginType = Source.class, pluginConfigurationType = OpenSearchAPISourceConfig.class) +public class OpenSearchAPISource implements Source> { + private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAPISource.class); + private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; + public static final String REGEX_HEALTH = "regex:^/(?!health$).*$"; + static final String SERVER_CONNECTIONS = "serverConnections"; + + private final OpenSearchAPISourceConfig sourceConfig; + private final CertificateProviderFactory certificateProviderFactory; + private final ArmeriaHttpAuthenticationProvider authenticationProvider; + private final HttpRequestExceptionHandler httpRequestExceptionHandler; + private final String pipelineName; + private Server server; + private final PluginMetrics pluginMetrics; + private static final String HTTP_HEALTH_CHECK_PATH = "/health"; + private ByteDecoder byteDecoder; + + @DataPrepperPluginConstructor + public OpenSearchAPISource(final OpenSearchAPISourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, + final PipelineDescription pipelineDescription) { + this.sourceConfig = sourceConfig; + this.pluginMetrics = pluginMetrics; + this.pipelineName = pipelineDescription.getPipelineName(); 
+ this.byteDecoder = new JsonDecoder(); + this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig); + final PluginModel authenticationConfiguration = sourceConfig.getAuthentication(); + final PluginSetting authenticationPluginSetting; + + if (authenticationConfiguration == null || authenticationConfiguration.getPluginName().equals(ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME)) { + LOG.warn("Creating OpenSearch API source without authentication. This is not secure."); + LOG.warn("In order to set up Http Basic authentication for the OpenSearch API source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#authentication-configurations"); + } + + if(authenticationConfiguration != null) { + authenticationPluginSetting = + new PluginSetting(authenticationConfiguration.getPluginName(), authenticationConfiguration.getPluginSettings()); + } else { + authenticationPluginSetting = + new PluginSetting(ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, Collections.emptyMap()); + } + authenticationPluginSetting.setPipelineName(pipelineName); + authenticationProvider = pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, authenticationPluginSetting); + httpRequestExceptionHandler = new HttpRequestExceptionHandler(pluginMetrics); + } + + @Override + public void start(final Buffer> buffer) { + if (buffer == null) { + throw new IllegalStateException("Buffer provided is null"); + } + if (server == null) { + final ServerBuilder sb = Server.builder(); + + sb.disableServerHeader(); + + if (sourceConfig.isSsl()) { + LOG.info("Creating http source with SSL/TLS enabled."); + final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); + final Certificate certificate = certificateProvider.getCertificate(); + // TODO: enable encrypted key with password + sb.https(sourceConfig.getPort()).tls( + new 
ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), + new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) + ) + ); + } else { + LOG.warn("Creating OpenSearch API source without SSL/TLS. This is not secure."); + LOG.warn("In order to set up TLS for the OpenSearch API source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl"); + sb.http(sourceConfig.getPort()); + } + + if(sourceConfig.getAuthentication() != null) { + final Optional> optionalAuthDecorator = authenticationProvider.getAuthenticationDecorator(); + + if (sourceConfig.isUnauthenticatedHealthCheck()) { + optionalAuthDecorator.ifPresent(authDecorator -> sb.decorator(REGEX_HEALTH, authDecorator)); + } else { + optionalAuthDecorator.ifPresent(sb::decorator); + } + } + + sb.maxNumConnections(sourceConfig.getMaxConnectionCount()); + sb.requestTimeout(Duration.ofMillis(sourceConfig.getRequestTimeoutInMillis())); + if(sourceConfig.getMaxRequestLength() != null) { + sb.maxRequestLength(sourceConfig.getMaxRequestLength().getBytes()); + } + final int threads = sourceConfig.getThreadCount(); + final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads); + sb.blockingTaskExecutor(blockingTaskExecutor, true); + final int maxPendingRequests = sourceConfig.getMaxPendingRequests(); + final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy( + maxPendingRequests, blockingTaskExecutor.getQueue()); + final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics); + + final String httpSourcePath = sourceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); + sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler)); + final OpenSearchAPIService openSearchAPIService = new 
OpenSearchAPIService(sourceConfig.getBufferTimeoutInMillis(), buffer, pluginMetrics); + + if (CompressionOption.NONE.equals(sourceConfig.getCompression())) { + sb.annotatedService(httpSourcePath, openSearchAPIService, httpRequestExceptionHandler); + } else { + sb.annotatedService(httpSourcePath, openSearchAPIService, DecodingService.newDecorator(), httpRequestExceptionHandler); + } + + if (sourceConfig.hasHealthCheckService()) { + LOG.info("OpenSearch API source health check is enabled"); + sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); + } + + server = sb.build(); + pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections); + } + + try { + server.start().get(); + } catch (ExecutionException ex) { + if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } else { + throw new RuntimeException(ex); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ex); + } + LOG.info("Started OpenSearch API source on port " + sourceConfig.getPort() + "..."); + } + + @Override + public ByteDecoder getDecoder() { + return byteDecoder; + } + + @Override + public void stop() { + if (server != null) { + try { + server.stop().get(); + } catch (ExecutionException ex) { + if (ex.getCause() != null && ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } else { + throw new RuntimeException(ex); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ex); + } + } + LOG.info("Stopped OpenSearch API source."); + } +} diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceConfig.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceConfig.java new file mode 
100644 index 0000000000..646ecfe1a1 --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceConfig.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearchapi; + +import org.opensearch.dataprepper.http.BaseHttpServerConfig; + +public class OpenSearchAPISourceConfig extends BaseHttpServerConfig { + + static final String DEFAULT_ENDPOINT_URI = "/opensearch"; + static final int DEFAULT_PORT = 9202; + + @Override + public int getDefaultPort() { + return DEFAULT_PORT; + } + + @Override + public String getDefaultPath() { + return DEFAULT_ENDPOINT_URI; + } +} diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkAPIEventMetadataKeyAttributes.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkAPIEventMetadataKeyAttributes.java new file mode 100644 index 0000000000..2313167358 --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkAPIEventMetadataKeyAttributes.java @@ -0,0 +1,10 @@ +package org.opensearch.dataprepper.plugins.source.opensearchapi.model; + +public class BulkAPIEventMetadataKeyAttributes { + + public static final String BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION = "opensearch_action"; + public static final String BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX = "opensearch_index"; + public static final String BULK_API_EVENT_METADATA_ATTRIBUTE_ID = "opensearch_id"; + public static final String BULK_API_EVENT_METADATA_ATTRIBUTE_PIPELINE = "opensearch_pipeline"; + public static final String BULK_API_EVENT_METADATA_ATTRIBUTE_ROUTING = "opensearch_routing"; +} diff --git 
a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkAPIRequestParams.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkAPIRequestParams.java new file mode 100644 index 0000000000..71979e0b43 --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkAPIRequestParams.java @@ -0,0 +1,14 @@ +package org.opensearch.dataprepper.plugins.source.opensearchapi.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; + +@AllArgsConstructor +@Getter +@Builder +public class BulkAPIRequestParams { + private final String index; + private final String pipeline; + private final String routing; +} diff --git a/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkActionAndMetadataObject.java b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkActionAndMetadataObject.java new file mode 100644 index 0000000000..71eda44b76 --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkActionAndMetadataObject.java @@ -0,0 +1,41 @@ +package org.opensearch.dataprepper.plugins.source.opensearchapi.model; + +import lombok.Getter; + +import java.util.HashMap; +import java.util.Map; + +public class BulkActionAndMetadataObject { + @Getter + private String action; + + private Map requestModel; + private static final String emptyStringLiteral = ""; + + public BulkActionAndMetadataObject(Map requestModel) { + this.requestModel = requestModel; + this.action = isRequestModelValid() ? 
+ requestModel.keySet().stream().findFirst().orElse(emptyStringLiteral) : emptyStringLiteral; + } + + public String getDocId() { + return getKeyInNestedMap("_id"); + } + public String getIndex() { + return getKeyInNestedMap("_index"); + } + + private String getKeyInNestedMap(final String key) { + if (!isRequestModelValid()) return emptyStringLiteral; + + Object apiAttributesMap = requestModel.getOrDefault(this.action, new HashMap()); + if (!(apiAttributesMap instanceof Map)) return emptyStringLiteral; + + return ((Map) apiAttributesMap).getOrDefault(key, emptyStringLiteral); + } + + private boolean isRequestModelValid() { + return requestModel != null && !requestModel.isEmpty(); + } + +} diff --git a/data-prepper-plugins/http-source/src/test/resources/test_cert.crt b/data-prepper-plugins/opensearch-api-source/src/main/resources/test_cert.crt similarity index 100% rename from data-prepper-plugins/http-source/src/test/resources/test_cert.crt rename to data-prepper-plugins/opensearch-api-source/src/main/resources/test_cert.crt diff --git a/data-prepper-plugins/http-source/src/test/resources/test_decrypted_key.key b/data-prepper-plugins/opensearch-api-source/src/main/resources/test_decrypted_key.key similarity index 100% rename from data-prepper-plugins/http-source/src/test/resources/test_decrypted_key.key rename to data-prepper-plugins/opensearch-api-source/src/main/resources/test_decrypted_key.key diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIServiceTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIServiceTest.java new file mode 100644 index 0000000000..b6bfbc4932 --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPIServiceTest.java @@ -0,0 +1,329 @@ +/* + * Copyright OpenSearch Contributors + * 
SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearchapi; + +import com.linecorp.armeria.server.ServiceRequestContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.buffer.SizeOverflowException; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linecorp.armeria.common.AggregatedHttpRequest; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.RequestHeaders; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Timer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; +import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.List; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.never; + +@ExtendWith(MockitoExtension.class) +class OpenSearchAPIServiceTest { + private static final ObjectMapper mapper = new ObjectMapper(); + private static final int TEST_BUFFER_CAPACITY = 15; + private static final int TEST_TIMEOUT_IN_MILLIS = 500; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Counter requestsReceivedCounter; + + @Mock + private Counter successRequestsCounter; + + @Mock + private DistributionSummary payloadSizeSummary; + + @Mock + private Timer requestProcessDuration; + + @Mock + private ServiceRequestContext serviceRequestContext; + + private OpenSearchAPIService openSearchAPIService; + + @BeforeEach + public void setUp() throws Exception { + lenient().when(pluginMetrics.counter(openSearchAPIService.REQUESTS_RECEIVED)).thenReturn(requestsReceivedCounter); + lenient().when(pluginMetrics.counter(openSearchAPIService.SUCCESS_REQUESTS)).thenReturn(successRequestsCounter); + lenient().when(pluginMetrics.summary(openSearchAPIService.PAYLOAD_SIZE)).thenReturn(payloadSizeSummary); + lenient().when(pluginMetrics.timer(openSearchAPIService.REQUEST_PROCESS_DURATION)).thenReturn(requestProcessDuration); + lenient().when(serviceRequestContext.isTimedOut()).thenReturn(false); + lenient().when(requestProcessDuration.recordCallable(ArgumentMatchers.>any())).thenAnswer( + (Answer) invocation -> { + final Object[] args = invocation.getArguments(); + @SuppressWarnings("unchecked") + final Callable callable = (Callable) args[0]; + return callable.call(); + } + ); + + Buffer> blockingBuffer = new BlockingBuffer<>(TEST_BUFFER_CAPACITY, 8, "test-pipeline"); + openSearchAPIService = new OpenSearchAPIService(TEST_TIMEOUT_IN_MILLIS, blockingBuffer, pluginMetrics); + } + + 
@ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestAPISuccess(boolean testBulkRequestAPIWithIndexInPath) throws Exception { + + AggregatedHttpRequest testRequest; + AggregatedHttpResponse postResponse; + + if (testBulkRequestAPIWithIndexInPath) { + // Prepare + testRequest = generateRandomValidBulkRequestWithNoIndexInBody(2); + // When + postResponse = openSearchAPIService.doPostBulkIndex(serviceRequestContext, testRequest, Optional.empty(), + Optional.ofNullable("pipeline-1"), Optional.ofNullable("routing-1")).aggregate().get(); + } else { + // Prepare + testRequest = generateRandomValidBulkRequest(2); + // When + postResponse = openSearchAPIService.doPostBulk(serviceRequestContext, testRequest, + Optional.empty(), Optional.empty()).aggregate().get(); + } + + // Then + assertEquals(HttpStatus.OK, postResponse.status()); + verify(requestsReceivedCounter, times(1)).increment(); + verify(successRequestsCounter, times(1)).increment(); + final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); + verify(payloadSizeSummary, times(1)).record(payloadLengthCaptor.capture()); + assertEquals(testRequest.content().length(), Math.round(payloadLengthCaptor.getValue())); + verify(requestProcessDuration, times(1)).recordCallable(ArgumentMatchers.>any()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestAPISuccessWithMultipleBulkActions(boolean testBulkRequestAPIWithIndexInPath) throws Exception { + // Prepare + AggregatedHttpRequest testRequest = generateGoodBulkRequestWithMultipleActions(2); + + // When + AggregatedHttpResponse postResponse; + if (testBulkRequestAPIWithIndexInPath) { + postResponse = openSearchAPIService.doPostBulkIndex(serviceRequestContext, testRequest, Optional.empty(), + Optional.ofNullable("pipeline-1"), Optional.ofNullable("routing-1")).aggregate().get(); + } else { + postResponse = openSearchAPIService.doPostBulk(serviceRequestContext, 
testRequest, + Optional.empty(), Optional.empty()).aggregate().get(); + } + + // Then + assertEquals(HttpStatus.OK, postResponse.status()); + verify(requestsReceivedCounter, times(1)).increment(); + verify(successRequestsCounter, times(1)).increment(); + final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); + verify(payloadSizeSummary, times(1)).record(payloadLengthCaptor.capture()); + assertEquals(testRequest.content().length(), Math.round(payloadLengthCaptor.getValue())); + verify(requestProcessDuration, times(1)).recordCallable(ArgumentMatchers.>any()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestAPIBadRequestWithEmpty(boolean testBulkRequestAPIWithIndexInPath) throws Exception { + testBadRequestWithPayload(testBulkRequestAPIWithIndexInPath, ""); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestAPIBadRequestWithInvalidPayload(boolean testBulkRequestAPIWithIndexInPath) throws Exception { + List jsonList = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Collections.singletonMap("_index", "test-index")))); + } + testBadRequestWithPayload(testBulkRequestAPIWithIndexInPath, String.join("\n", jsonList)); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestAPIBadRequestWithInvalidPayload2(boolean testBulkRequestAPIWithIndexInPath) throws Exception { + List jsonList = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("text", Collections.singletonMap("x", "test")))); + } + testBadRequestWithPayload(testBulkRequestAPIWithIndexInPath, String.join("\n", jsonList)); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestAPIBadRequestWithInvalidPayload3(boolean testBulkRequestAPIWithIndexInPath) throws Exception { + List 
jsonList = new ArrayList<>(); + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Collections.singletonMap("_index", "test-index")))); + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("log", UUID.randomUUID().toString()))); + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Collections.singletonMap("_index", "test-index")))); + + testBadRequestWithPayload(testBulkRequestAPIWithIndexInPath, String.join("\n", jsonList)); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestAPIEntityTooLarge(boolean testBulkRequestAPIWithIndexInPath) throws Exception { + // Prepare + AggregatedHttpRequest testTooLargeRequest = generateRandomValidBulkRequest(TEST_BUFFER_CAPACITY + 1); + + // When + if (testBulkRequestAPIWithIndexInPath) { + assertThrows(SizeOverflowException.class, () -> openSearchAPIService.doPostBulkIndex(serviceRequestContext, testTooLargeRequest, Optional.empty(), + Optional.empty(), Optional.empty()).aggregate().get()); + } else { + assertThrows(SizeOverflowException.class, () -> openSearchAPIService.doPostBulk(serviceRequestContext, testTooLargeRequest, Optional.empty(), + Optional.empty()).aggregate().get()); + } + + // Then + verify(requestsReceivedCounter, times(1)).increment(); + verify(successRequestsCounter, never()).increment(); + final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); + verify(payloadSizeSummary, times(1)).record(payloadLengthCaptor.capture()); + assertEquals(testTooLargeRequest.content().length(), Math.round(payloadLengthCaptor.getValue())); + verify(requestProcessDuration, times(1)).recordCallable(ArgumentMatchers.>any()); + } + + @Test + public void testBulkRequestWithIndexAPIRequestTimeout() throws Exception { + // Prepare + AggregatedHttpRequest populateDataRequest = generateRandomValidBulkRequest(3); + + lenient().when(serviceRequestContext.isTimedOut()).thenReturn(true); + + AggregatedHttpResponse 
response = openSearchAPIService.doPostBulkIndex(serviceRequestContext, populateDataRequest, Optional.empty(), + Optional.empty(), Optional.empty()).aggregate().get(); + assertEquals(HttpStatus.REQUEST_TIMEOUT, response.status()); + + // Then + verify(requestsReceivedCounter, times(1)).increment(); + } + + @Test + public void testBulkRequestAPIRequestTimeout() throws Exception { + // Prepare + AggregatedHttpRequest populateDataRequest = generateRandomValidBulkRequest(3); + + lenient().when(serviceRequestContext.isTimedOut()).thenReturn(true); + AggregatedHttpResponse response = openSearchAPIService.doPostBulk(serviceRequestContext, populateDataRequest, Optional.empty(), + Optional.empty()).aggregate().get(); + assertEquals(HttpStatus.REQUEST_TIMEOUT, response.status()); + + // Then + verify(requestsReceivedCounter, times(1)).increment(); + } + + private void testBadRequestWithPayload(boolean testBulkRequestAPIWithIndexInPath, String requestBody) throws Exception { + // Prepare + RequestHeaders requestHeaders = RequestHeaders.builder() + .contentType(MediaType.JSON) + .method(HttpMethod.POST) + .path("/opensearch") + .build(); + + HttpData httpData = HttpData.ofUtf8(requestBody); + AggregatedHttpRequest testBadRequest = HttpRequest.of(requestHeaders, httpData).aggregate().get(); + // When + if (testBulkRequestAPIWithIndexInPath) { + assertThrows(IOException.class, () -> openSearchAPIService.doPostBulkIndex(serviceRequestContext, testBadRequest, Optional.empty(), + Optional.empty(), Optional.empty()).aggregate().get()); + } else { + assertThrows(IOException.class, () -> openSearchAPIService.doPostBulk(serviceRequestContext, testBadRequest, Optional.empty(), + Optional.empty()).aggregate().get()); + } + + // Then + verify(requestsReceivedCounter, times(1)).increment(); + verify(successRequestsCounter, never()).increment(); + final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); + verify(payloadSizeSummary, 
times(1)).record(payloadLengthCaptor.capture()); + assertEquals(testBadRequest.content().length(), Math.round(payloadLengthCaptor.getValue())); + verify(requestProcessDuration, times(1)).recordCallable(ArgumentMatchers.>any()); + } + + private AggregatedHttpRequest generateRandomValidBulkRequest(int numJson) throws JsonProcessingException, + ExecutionException, InterruptedException { + RequestHeaders requestHeaders = RequestHeaders.builder() + .contentType(MediaType.JSON) + .method(HttpMethod.POST) + .path("/opensearch") + .build(); + List jsonList = new ArrayList<>(); + for (int i = 0; i < numJson; i++) { + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Map.of("_index", "test-index", "_id", "123")))); + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("log", UUID.randomUUID().toString()))); + } + HttpData httpData = HttpData.ofUtf8(String.join("\n", jsonList)); + return HttpRequest.of(requestHeaders, httpData).aggregate().get(); + } + + private AggregatedHttpRequest generateRandomValidBulkRequestWithNoIndexInBody(int numJson) throws JsonProcessingException, + ExecutionException, InterruptedException { + RequestHeaders requestHeaders = RequestHeaders.builder() + .contentType(MediaType.JSON) + .method(HttpMethod.POST) + .path("/opensearch") + .build(); + List jsonList = new ArrayList<>(); + for (int i = 0; i < numJson; i++) { + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Map.of("_id", "123")))); + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("log", UUID.randomUUID().toString()))); + } + HttpData httpData = HttpData.ofUtf8(String.join("\n", jsonList)); + return HttpRequest.of(requestHeaders, httpData).aggregate().get(); + } + + private AggregatedHttpRequest generateGoodBulkRequestWithMultipleActions(int numJson) throws JsonProcessingException, ExecutionException, InterruptedException { + RequestHeaders requestHeaders = RequestHeaders.builder() + .contentType(MediaType.JSON) + 
.method(HttpMethod.POST) + .path("/opensearch") + .build(); + List jsonList = new ArrayList<>(); + for (int i = 0; i < numJson; i++) { + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Map.of("_index", "test-index", "_id", "123")))); + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("log", UUID.randomUUID().toString()))); + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("delete", Map.of("_index", "test-index", "_id", "124")))); + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("create", Map.of("_index", "test-index", "_id", "125")))); + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("log", UUID.randomUUID().toString()))); + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("update", Map.of("_index", "test-index", "_id", "126")))); + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("log", UUID.randomUUID().toString()))); + } + HttpData httpData = HttpData.ofUtf8(String.join("\n", jsonList)); + return HttpRequest.of(requestHeaders, httpData).aggregate().get(); + } +} diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceConfigTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceConfigTest.java new file mode 100644 index 0000000000..636bb8d66a --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceConfigTest.java @@ -0,0 +1,17 @@ +package org.opensearch.dataprepper.plugins.source.opensearchapi; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class OpenSearchAPISourceConfigTest { + @Test + void testDefault() { + // Prepare + final OpenSearchAPISourceConfig sourceConfig = new OpenSearchAPISourceConfig(); + + // 
When/Then + assertEquals(OpenSearchAPISourceConfig.DEFAULT_PORT, sourceConfig.getPort()); + assertEquals(OpenSearchAPISourceConfig.DEFAULT_ENDPOINT_URI, sourceConfig.getPath()); + } +} diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceTest.java new file mode 100644 index 0000000000..f09f5d37e9 --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/OpenSearchAPISourceTest.java @@ -0,0 +1,778 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearchapi; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linecorp.armeria.client.ClientFactory; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpHeaderNames; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.server.Server; +import com.linecorp.armeria.server.ServerBuilder; +import io.micrometer.core.instrument.Measurement; +import io.micrometer.core.instrument.Statistic; +import org.apache.commons.io.IOUtils; +import io.netty.util.AsciiString; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import 
org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.HttpRequestExceptionHandler; +import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; +import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; +import org.opensearch.dataprepper.http.LogThrottlingRejectHandler; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.metrics.MetricsTestUtil; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.HttpBasicArmeriaHttpAuthenticationProvider; +import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.source.opensearchapi.model.BulkAPIEventMetadataKeyAttributes; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; 
+import java.util.StringJoiner; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.zip.GZIPOutputStream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class OpenSearchAPISourceTest { + private final String PLUGIN_NAME = "opensearch_api"; + private final String TEST_PIPELINE_NAME = "test_pipeline"; + private final String TEST_INDEX = "test-index"; + private final String AUTHORITY = "127.0.0.1:9202"; + private final int DEFAULT_PORT = 9202; + private final int DEFAULT_REQUEST_TIMEOUT_MS = 10_000; + private final int DEFAULT_THREAD_COUNT = 200; + private final int MAX_CONNECTIONS_COUNT = 500; + private final int MAX_PENDING_REQUESTS_COUNT = 1024; + + private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile(); + private final String TEST_SSL_KEY_FILE = getClass().getClassLoader().getResource("test_decrypted_key.key").getFile(); + + private static final ObjectMapper mapper = new ObjectMapper(); + + @Mock + private ServerBuilder serverBuilder; + + @Mock + private Server server; + + @Mock + private CompletableFuture completableFuture; + + private BlockingBuffer> testBuffer; + private OpenSearchAPISource openSearchAPISource; + private List requestsReceivedMeasurements; + private List 
successRequestsMeasurements; +    private List requestTimeoutsMeasurements; +    private List badRequestsMeasurements; +    private List requestsTooLargeMeasurements; +    private List rejectedRequestsMeasurements; +    private List requestProcessDurationMeasurements; +    private List payloadSizeSummaryMeasurements; +    private List serverConnectionsMeasurements; +    private OpenSearchAPISourceConfig sourceConfig; +    private PluginMetrics pluginMetrics; +    private PluginFactory pluginFactory; +    private PipelineDescription pipelineDescription; + +    private BlockingBuffer> getBuffer() { +        final HashMap integerHashMap = new HashMap<>(); +        integerHashMap.put("buffer_size", 1); +        integerHashMap.put("batch_size", 1); +        final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap); +        pluginSetting.setPipelineName(TEST_PIPELINE_NAME); +        return new BlockingBuffer<>(pluginSetting); +    } + +    /** +     * This method should be invoked after {@link OpenSearchAPISource#start(Buffer)} to scrape metrics +     */ +    private void refreshMeasurements() { +        final String metricNamePrefix = new StringJoiner(MetricNames.DELIMITER) +                .add(TEST_PIPELINE_NAME).add(PLUGIN_NAME).toString(); +        requestsReceivedMeasurements = MetricsTestUtil.getMeasurementList( +                new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) +                        .add(OpenSearchAPIService.REQUESTS_RECEIVED).toString()); +        successRequestsMeasurements = MetricsTestUtil.getMeasurementList( +                new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) +                        .add(OpenSearchAPIService.SUCCESS_REQUESTS).toString()); +        requestTimeoutsMeasurements = MetricsTestUtil.getMeasurementList( +                new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) +                        .add(HttpRequestExceptionHandler.REQUEST_TIMEOUTS).toString()); +        badRequestsMeasurements = MetricsTestUtil.getMeasurementList( +                new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) +                        .add(HttpRequestExceptionHandler.BAD_REQUESTS).toString()); +        requestsTooLargeMeasurements =
MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) + .add(HttpRequestExceptionHandler.REQUESTS_TOO_LARGE).toString()); + rejectedRequestsMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) + .add(LogThrottlingRejectHandler.REQUESTS_REJECTED).toString()); + requestProcessDurationMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) + .add(OpenSearchAPIService.REQUEST_PROCESS_DURATION).toString()); + payloadSizeSummaryMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) + .add(OpenSearchAPIService.PAYLOAD_SIZE).toString()); + serverConnectionsMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) + .add(OpenSearchAPISource.SERVER_CONNECTIONS).toString()); + } + + private byte[] createGZipCompressedPayload(final String payload) throws IOException { + // Create a GZip compressed request body + final ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + try (final GZIPOutputStream gzipStream = new GZIPOutputStream(byteStream)) { + gzipStream.write(payload.getBytes(StandardCharsets.UTF_8)); + } + return byteStream.toByteArray(); + } + + @BeforeEach + public void setUp() { + lenient().when(serverBuilder.annotatedService(any())).thenReturn(serverBuilder); + lenient().when(serverBuilder.http(anyInt())).thenReturn(serverBuilder); + lenient().when(serverBuilder.https(anyInt())).thenReturn(serverBuilder); + lenient().when(serverBuilder.build()).thenReturn(server); + lenient().when(server.start()).thenReturn(completableFuture); + + sourceConfig = mock(OpenSearchAPISourceConfig.class); + lenient().when(sourceConfig.getPort()).thenReturn(DEFAULT_PORT); + lenient().when(sourceConfig.getPath()).thenReturn(OpenSearchAPISourceConfig.DEFAULT_ENDPOINT_URI); + 
lenient().when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(DEFAULT_REQUEST_TIMEOUT_MS); + lenient().when(sourceConfig.getThreadCount()).thenReturn(DEFAULT_THREAD_COUNT); + lenient().when(sourceConfig.getMaxConnectionCount()).thenReturn(MAX_CONNECTIONS_COUNT); + lenient().when(sourceConfig.getMaxPendingRequests()).thenReturn(MAX_PENDING_REQUESTS_COUNT); + lenient().when(sourceConfig.hasHealthCheckService()).thenReturn(true); + lenient().when(sourceConfig.getCompression()).thenReturn(CompressionOption.NONE); + + MetricsTestUtil.initMetrics(); + pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); + + pluginFactory = mock(PluginFactory.class); + final ArmeriaHttpAuthenticationProvider authenticationProvider = new HttpBasicArmeriaHttpAuthenticationProvider(new HttpBasicAuthenticationConfig("test", "test")); + when(pluginFactory.loadPlugin(eq(ArmeriaHttpAuthenticationProvider.class), any(PluginSetting.class))) + .thenReturn(authenticationProvider); + + testBuffer = getBuffer(); + pipelineDescription = mock(PipelineDescription.class); + when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); + + openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + } + + @AfterEach + public void cleanUp() { + if (openSearchAPISource != null) { + openSearchAPISource.stop(); + } + } + + private void assertSecureResponseWithStatusCode(final AggregatedHttpResponse response, final HttpStatus expectedStatus) { + assertThat("Http Status", response.status(), equalTo(expectedStatus)); + + final List headerKeys = response.headers() + .stream() + .map(Map.Entry::getKey) + .map(AsciiString::toString) + .collect(Collectors.toList()); + assertThat("Response Header Keys", headerKeys, not(contains("server"))); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestAPIResponse200(boolean includeIndexInPath) throws IOException { + int numberOfRecords = 1; 
+ testBulkRequestAPI200(includeIndexInPath, false, numberOfRecords); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestAPICompressionResponse200(boolean includeIndexInPath) throws IOException { + int numberOfRecords = 1; + testBulkRequestAPI200(includeIndexInPath, true, numberOfRecords); + } + + @Test + public void testHealthCheck() { + // Prepare + openSearchAPISource.start(testBuffer); + + // When + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.GET) + .path("/health") + .build()) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); + } + + @Test + public void testHealthCheckUnauthenticatedDisabled() { + // Prepare + when(sourceConfig.isUnauthenticatedHealthCheck()).thenReturn(false); + when(sourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic", + Map.of( + "username", "test", + "password", "test" + ))); + pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); + openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + + openSearchAPISource.start(testBuffer); + + // When + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.GET) + .path("/health") + .build()) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.UNAUTHORIZED)).join(); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestJsonResponse400WithEmptyPayload(boolean includeIndexInPath) { + // Prepare + final String testBadData = ""; //Empty body + openSearchAPISource.start(testBuffer); + refreshMeasurements(); + + // When + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.POST) + .path(includeIndexInPath ? 
"/opensearch/" + TEST_INDEX + "/_bulk" :"/opensearch/_bulk") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.ofUtf8(testBadData)) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.BAD_REQUEST)).join(); + + // Then + Assertions.assertTrue(testBuffer.isEmpty()); + // Verify metrics + final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( + requestsReceivedMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestReceivedCount.getValue()); + final Measurement badRequestsCount = MetricsTestUtil.getMeasurementFromList( + badRequestsMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, badRequestsCount.getValue()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestJsonResponse400WithInvalidPayload(boolean includeIndexInPath) throws JsonProcessingException { + // Prepare + List jsonList = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Collections.singletonMap("_index", "test-index")))); + } + final String testBadData = String.join("\n", jsonList); + openSearchAPISource.start(testBuffer); + refreshMeasurements(); + + // When + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.POST) + .path(includeIndexInPath ? 
"/opensearch/" + TEST_INDEX + "/_bulk" :"/opensearch/_bulk") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.ofUtf8(testBadData)) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.BAD_REQUEST)).join(); + + // Then + Assertions.assertTrue(testBuffer.isEmpty()); + // Verify metrics + final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( + requestsReceivedMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestReceivedCount.getValue()); + final Measurement badRequestsCount = MetricsTestUtil.getMeasurementFromList( + badRequestsMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, badRequestsCount.getValue()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestAPIJsonResponse413(boolean includeIndexInPath) throws JsonProcessingException { + testBulkRequestJsonResponse413(includeIndexInPath); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testBulkRequestAPIJsonResponse408(boolean includeIndexInPath) throws JsonProcessingException { + testBulkRequestJsonResponse408(includeIndexInPath); + } + + private void testBulkRequestAPI200(boolean includeIndexInPath, boolean useCompression, int numberOfRecords) throws IOException { + final String testData = generateTestData(includeIndexInPath, numberOfRecords); + final int testPayloadSize = testData.getBytes().length; + if (useCompression) { + when(sourceConfig.getCompression()).thenReturn(CompressionOption.GZIP); + } + + openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + openSearchAPISource.start(testBuffer); + refreshMeasurements(); + + // When + if (useCompression) { + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.POST) + .path(includeIndexInPath ? 
"/opensearch/" + TEST_INDEX + "/_bulk" :"/opensearch/_bulk") + .add(HttpHeaderNames.CONTENT_ENCODING, "gzip") + .build(), + createGZipCompressedPayload(testData)) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); + } else { + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.POST) + .path(includeIndexInPath ? "/opensearch/" + TEST_INDEX + "/_bulk" : "/opensearch/_bulk") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.ofUtf8(testData)) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); + } + // Then + Assertions.assertFalse(testBuffer.isEmpty()); + + final Map.Entry>, CheckpointState> result = testBuffer.read(100); + List> records = new ArrayList<>(result.getKey()); + Assertions.assertEquals(numberOfRecords, records.size()); + final Record record = records.get(0); + Assertions.assertEquals("text-data", record.getData().get("text", String.class)); + Assertions.assertEquals("index", record.getData().getMetadata().getAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION)); + Assertions.assertEquals(TEST_INDEX, record.getData().getMetadata().getAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX)); + Assertions.assertEquals("123", record.getData().getMetadata().getAttribute(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID)); + // Verify metrics + final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( + requestsReceivedMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestReceivedCount.getValue()); + final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( + successRequestsMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, successRequestsCount.getValue()); + final Measurement requestProcessDurationCount = 
MetricsTestUtil.getMeasurementFromList( + requestProcessDurationMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestProcessDurationCount.getValue()); + final Measurement requestProcessDurationMax = MetricsTestUtil.getMeasurementFromList( + requestProcessDurationMeasurements, Statistic.MAX); + Assertions.assertTrue(requestProcessDurationMax.getValue() > 0); + final Measurement payloadSizeMax = MetricsTestUtil.getMeasurementFromList( + payloadSizeSummaryMeasurements, Statistic.MAX); + Assertions.assertEquals(testPayloadSize, payloadSizeMax.getValue()); + Assertions.assertTrue(requestProcessDurationMax.getValue() > 0); + } + + private String generateTestData(boolean includeIndexInPath, int numberOfRecords) throws JsonProcessingException { + List jsonList = new ArrayList<>(); + for (int i = 0; i < numberOfRecords; i++) { + if (includeIndexInPath) { + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Map.of("_id", "123")))); + } else { + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("index", Map.of("_index", TEST_INDEX, "_id", "123")))); + } + jsonList.add(mapper.writeValueAsString(Collections.singletonMap("text", "text-data"))); + } + return String.join("\n", jsonList); + } + + private void testBulkRequestJsonResponse408(boolean includeIndexInPath) throws JsonProcessingException { + // Prepare + final int testMaxPendingRequests = 1; + final int testThreadCount = 1; + final int serverTimeoutInMillis = 500; + final int bufferTimeoutInMillis = 400; + when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(serverTimeoutInMillis); + when(sourceConfig.getBufferTimeoutInMillis()).thenReturn(bufferTimeoutInMillis); + when(sourceConfig.getMaxPendingRequests()).thenReturn(testMaxPendingRequests); + when(sourceConfig.getThreadCount()).thenReturn(testThreadCount); + openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + // Start the source + 
openSearchAPISource.start(testBuffer); + refreshMeasurements(); + final RequestHeaders testRequestHeaders = RequestHeaders.builder().scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.POST) + .path(includeIndexInPath? "/opensearch/"+TEST_INDEX+"/_bulk" : "/opensearch/_bulk") + .contentType(MediaType.JSON_UTF_8) + .build(); + final HttpData testHttpData = HttpData.ofUtf8(generateTestData(includeIndexInPath, 1)); + + // Fill in the buffer + WebClient.of().execute(testRequestHeaders, testHttpData).aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); + + // Disable client timeout + WebClient testWebClient = WebClient.builder().responseTimeoutMillis(0).build(); + + // When/Then + testWebClient.execute(testRequestHeaders, testHttpData) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.REQUEST_TIMEOUT)).join(); + // verify metrics + final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( + requestsReceivedMeasurements, Statistic.COUNT); + Assertions.assertEquals(2.0, requestReceivedCount.getValue()); + final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( + successRequestsMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, successRequestsCount.getValue()); + final Measurement requestTimeoutsCount = MetricsTestUtil.getMeasurementFromList( + requestTimeoutsMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestTimeoutsCount.getValue()); + final Measurement requestProcessDurationMax = MetricsTestUtil.getMeasurementFromList( + requestProcessDurationMeasurements, Statistic.MAX); + final double maxDurationInMillis = 1000 * requestProcessDurationMax.getValue(); + Assertions.assertTrue(maxDurationInMillis > bufferTimeoutInMillis); + } + + private void testBulkRequestJsonResponse413(boolean includeIndexInPath) throws JsonProcessingException { + // Prepare + final String testData = 
generateTestData(includeIndexInPath, 50); + final int testPayloadSize = testData.getBytes().length; + openSearchAPISource.start(testBuffer); + refreshMeasurements(); + + // When + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.POST) + .path(includeIndexInPath ? "/opensearch/"+TEST_INDEX+"/_bulk" : "/opensearch/_bulk") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.ofUtf8(testData)) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.REQUEST_ENTITY_TOO_LARGE)).join(); + + // Then + Assertions.assertTrue(testBuffer.isEmpty()); + // Verify metrics + final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( + requestsReceivedMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestReceivedCount.getValue()); + final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( + successRequestsMeasurements, Statistic.COUNT); + Assertions.assertEquals(0.0, successRequestsCount.getValue()); + final Measurement requestsTooLargeCount = MetricsTestUtil.getMeasurementFromList( + requestsTooLargeMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestsTooLargeCount.getValue()); + final Measurement requestProcessDurationCount = MetricsTestUtil.getMeasurementFromList( + requestProcessDurationMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestProcessDurationCount.getValue()); + final Measurement requestProcessDurationMax = MetricsTestUtil.getMeasurementFromList( + requestProcessDurationMeasurements, Statistic.MAX); + Assertions.assertTrue(requestProcessDurationMax.getValue() > 0); + final Measurement payloadSizeMax = MetricsTestUtil.getMeasurementFromList( + payloadSizeSummaryMeasurements, Statistic.MAX); + Assertions.assertEquals(testPayloadSize, payloadSizeMax.getValue()); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void 
testOpenSearchAPISourceServerConnectionsMetric(boolean includeIndexInPath) throws JsonProcessingException { + // Prepare + openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + openSearchAPISource.start(testBuffer); + refreshMeasurements(); + + // Verify connections metric value is 0 + Measurement serverConnectionsMeasurement = MetricsTestUtil.getMeasurementFromList(serverConnectionsMeasurements, Statistic.VALUE); + Assertions.assertEquals(0, serverConnectionsMeasurement.getValue()); + + final RequestHeaders testRequestHeaders = RequestHeaders.builder().scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.POST) + .path(includeIndexInPath ? "/opensearch/"+TEST_INDEX+"/_bulk" : "/opensearch/_bulk") + .contentType(MediaType.JSON_UTF_8) + .build(); + final HttpData testHttpData = HttpData.ofUtf8(generateTestData(includeIndexInPath, 1)); + + // Send request + WebClient.of().execute(testRequestHeaders, testHttpData).aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); + + // Verify connections metric value is 1 + serverConnectionsMeasurement = MetricsTestUtil.getMeasurementFromList(serverConnectionsMeasurements, Statistic.VALUE); + Assertions.assertEquals(1.0, serverConnectionsMeasurement.getValue()); + } + + @Test + public void testOpenSearchAPISourceServerStartCertFileSuccess() throws IOException { + try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { + armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); + when(server.stop()).thenReturn(completableFuture); + + final Path certFilePath = new File(TEST_SSL_CERTIFICATE_FILE).toPath(); + final Path keyFilePath = new File(TEST_SSL_KEY_FILE).toPath(); + final String certAsString = Files.readString(certFilePath); + final String keyAsString = Files.readString(keyFilePath); + + when(sourceConfig.isSsl()).thenReturn(true); + 
when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE); + when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); + openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + openSearchAPISource.start(testBuffer); + openSearchAPISource.stop(); + + final ArgumentCaptor certificateIs = ArgumentCaptor.forClass(InputStream.class); + final ArgumentCaptor privateKeyIs = ArgumentCaptor.forClass(InputStream.class); + verify(serverBuilder).tls(certificateIs.capture(), privateKeyIs.capture()); + final String actualCertificate = IOUtils.toString(certificateIs.getValue(), StandardCharsets.UTF_8.name()); + final String actualPrivateKey = IOUtils.toString(privateKeyIs.getValue(), StandardCharsets.UTF_8.name()); + assertThat(actualCertificate, is(certAsString)); + assertThat(actualPrivateKey, is(keyAsString)); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testBulkRequestAPIJsonResponse(boolean includeIndexInPath) throws JsonProcessingException { + reset(sourceConfig); + when(sourceConfig.getPort()).thenReturn(DEFAULT_PORT); + when(sourceConfig.getPath()).thenReturn(OpenSearchAPISourceConfig.DEFAULT_ENDPOINT_URI); + lenient().when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(DEFAULT_REQUEST_TIMEOUT_MS); + lenient().when(sourceConfig.getThreadCount()).thenReturn(DEFAULT_THREAD_COUNT); + lenient().when(sourceConfig.getMaxConnectionCount()).thenReturn(MAX_CONNECTIONS_COUNT); + lenient().when(sourceConfig.getMaxPendingRequests()).thenReturn(MAX_PENDING_REQUESTS_COUNT); + when(sourceConfig.isSsl()).thenReturn(true); + when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE); + when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); + openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + + testBuffer = getBuffer(); + openSearchAPISource.start(testBuffer); + + 
WebClient.builder().factory(ClientFactory.insecure()).build().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTPS) + .authority(AUTHORITY) + .method(HttpMethod.POST) + .path(includeIndexInPath ? "/opensearch/"+TEST_INDEX+"/_bulk" : "/opensearch/_bulk") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.ofUtf8(generateTestData(includeIndexInPath, 1))) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); + } + + @Test + public void testDoubleStart() { + // starting server + openSearchAPISource.start(testBuffer); + // double start server + Assertions.assertThrows(IllegalStateException.class, () -> openSearchAPISource.start(testBuffer)); + } + + @Test + public void testStartWithEmptyBuffer() { + final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + Assertions.assertThrows(IllegalStateException.class, () -> source.start(null)); + } + + @Test + public void testStartWithServerExecutionExceptionNoCause() throws ExecutionException, InterruptedException { + // Prepare + final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { + armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); + when(completableFuture.get()).thenThrow(new ExecutionException("", null)); + + // When/Then + Assertions.assertThrows(RuntimeException.class, () -> source.start(testBuffer)); + } + } + + @Test + public void testStartWithServerExecutionExceptionWithCause() throws ExecutionException, InterruptedException { + // Prepare + final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { + armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); + final 
NullPointerException expCause = new NullPointerException(); + when(completableFuture.get()).thenThrow(new ExecutionException("", expCause)); + + // When/Then + final RuntimeException ex = Assertions.assertThrows(RuntimeException.class, () -> source.start(testBuffer)); + Assertions.assertEquals(expCause, ex); + } + } + + @Test + public void testStartWithInterruptedException() throws ExecutionException, InterruptedException { + // Prepare + final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { + armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); + when(completableFuture.get()).thenThrow(new InterruptedException()); + + // When/Then + Assertions.assertThrows(RuntimeException.class, () -> source.start(testBuffer)); + Assertions.assertTrue(Thread.interrupted()); + } + } + + @Test + public void testStopWithServerExecutionExceptionNoCause() throws ExecutionException, InterruptedException { + // Prepare + final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { + armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); + source.start(testBuffer); + when(server.stop()).thenReturn(completableFuture); + + // When/Then + when(completableFuture.get()).thenThrow(new ExecutionException("", null)); + Assertions.assertThrows(RuntimeException.class, source::stop); + } + } + + @Test + public void testStopWithServerExecutionExceptionWithCause() throws ExecutionException, InterruptedException { + // Prepare + final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { + armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); 
+ source.start(testBuffer); + when(server.stop()).thenReturn(completableFuture); + final NullPointerException expCause = new NullPointerException(); + when(completableFuture.get()).thenThrow(new ExecutionException("", expCause)); + + // When/Then + final RuntimeException ex = Assertions.assertThrows(RuntimeException.class, source::stop); + Assertions.assertEquals(expCause, ex); + } + } + + @Test + public void testStopWithInterruptedException() throws ExecutionException, InterruptedException { + // Prepare + final OpenSearchAPISource source = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { + armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); + source.start(testBuffer); + when(server.stop()).thenReturn(completableFuture); + when(completableFuture.get()).thenThrow(new InterruptedException()); + + // When/Then + Assertions.assertThrows(RuntimeException.class, source::stop); + Assertions.assertTrue(Thread.interrupted()); + } + } + + @Test + public void testRunAnotherSourceWithSamePort() { + // starting server + openSearchAPISource.start(testBuffer); + + final OpenSearchAPISource secondSource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + //Expect RuntimeException because when port is already in use, BindException is thrown which is not RuntimeException + Assertions.assertThrows(RuntimeException.class, () -> secondSource.start(testBuffer)); + } + + @Test + public void request_that_exceeds_maxRequestLength_returns_413() throws JsonProcessingException { + reset(sourceConfig); + lenient().when(sourceConfig.getPort()).thenReturn(DEFAULT_PORT); + lenient().when(sourceConfig.getPath()).thenReturn(OpenSearchAPISourceConfig.DEFAULT_ENDPOINT_URI); + lenient().when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(10_000); + lenient().when(sourceConfig.getThreadCount()).thenReturn(200); + 
lenient().when(sourceConfig.getMaxConnectionCount()).thenReturn(500); + lenient().when(sourceConfig.getMaxPendingRequests()).thenReturn(1024); + lenient().when(sourceConfig.hasHealthCheckService()).thenReturn(true); + lenient().when(sourceConfig.getCompression()).thenReturn(CompressionOption.NONE); + lenient().when(sourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4)); + openSearchAPISource = new OpenSearchAPISource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + // Prepare + final String testData = "" + + "{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" + + "{ \"text\": \"text1\", \"year\": \"2013\" }"; + + assertThat((long) testData.getBytes().length, greaterThan(sourceConfig.getMaxRequestLength().getBytes())); + openSearchAPISource.start(testBuffer); + refreshMeasurements(); + + // When + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority(AUTHORITY) + .method(HttpMethod.POST) + .path("/opensearch") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.ofUtf8(testData)) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.REQUEST_ENTITY_TOO_LARGE)).join(); + + // Then + Assertions.assertTrue(testBuffer.isEmpty()); + } +} diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkAPIEventMetadataKeyAttributesTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkAPIEventMetadataKeyAttributesTest.java new file mode 100644 index 0000000000..a9e47fede7 --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkAPIEventMetadataKeyAttributesTest.java @@ -0,0 +1,17 @@ +package org.opensearch.dataprepper.plugins.source.opensearchapi.model; + +import org.junit.jupiter.api.Test; + +import static 
org.junit.jupiter.api.Assertions.assertEquals; + +public class BulkAPIEventMetadataKeyAttributesTest { + + @Test + public void testEventMetadataKeyAttributes() { + assertEquals(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ACTION, "opensearch_action"); + assertEquals(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_INDEX, "opensearch_index"); + assertEquals(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ID, "opensearch_id"); + assertEquals(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_PIPELINE, "opensearch_pipeline"); + assertEquals(BulkAPIEventMetadataKeyAttributes.BULK_API_EVENT_METADATA_ATTRIBUTE_ROUTING, "opensearch_routing"); + } +} diff --git a/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkActionAndMetadataObjectTest.java b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkActionAndMetadataObjectTest.java new file mode 100644 index 0000000000..27da1d8d8f --- /dev/null +++ b/data-prepper-plugins/opensearch-api-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearchapi/model/BulkActionAndMetadataObjectTest.java @@ -0,0 +1,58 @@ +package org.opensearch.dataprepper.plugins.source.opensearchapi.model; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class BulkActionAndMetadataObjectTest { + + private final String bulkActionBodyTwoLevelMap = "{\"index\": {\"_index\": \"test-index-1\"}}"; + private final String bulkActionBodyOneLevelMap = "{\"text\": \"message-1\"}"; + private ObjectMapper objectMapper; + private static final TypeReference> MAP_TYPE_REFERENCE 
= new TypeReference<>() {}; + private static final String emptyStringLiteral = ""; + @BeforeEach + void setup() { + objectMapper = new ObjectMapper(); + } + + @Test + public void testDeserializedJsonToTwoLevelMap() throws Exception { + Map bulkActionBodyMap = objectMapper.readValue(bulkActionBodyTwoLevelMap, MAP_TYPE_REFERENCE); + BulkActionAndMetadataObject BulkActionAndMetadataObject = new BulkActionAndMetadataObject(bulkActionBodyMap); + assertEquals(BulkActionAndMetadataObject.getAction(), "index"); + assertEquals(BulkActionAndMetadataObject.getIndex(), "test-index-1"); + assertEquals(BulkActionAndMetadataObject.getDocId(), emptyStringLiteral); + } + + @Test + public void testDeserializedJsonToOneLevelMap() throws Exception { + Map bulkActionBodyMap = objectMapper.readValue(bulkActionBodyOneLevelMap, MAP_TYPE_REFERENCE); + BulkActionAndMetadataObject BulkActionAndMetadataObject = new BulkActionAndMetadataObject(bulkActionBodyMap); + assertEquals(BulkActionAndMetadataObject.getAction(), "text"); + assertEquals(BulkActionAndMetadataObject.getIndex(), emptyStringLiteral); + assertEquals(BulkActionAndMetadataObject.getDocId(), emptyStringLiteral); + } + + @Test + public void testDeserializedJsonToEmptyMap() { + BulkActionAndMetadataObject BulkActionAndMetadataObject = new BulkActionAndMetadataObject(new HashMap<>()); + assertEquals(BulkActionAndMetadataObject.getAction(), emptyStringLiteral); + assertEquals(BulkActionAndMetadataObject.getIndex(), emptyStringLiteral); + assertEquals(BulkActionAndMetadataObject.getDocId(), emptyStringLiteral); + } + + @Test + public void testDeserializedJsonToNullMap() { + BulkActionAndMetadataObject BulkActionAndMetadataObject = new BulkActionAndMetadataObject(null); + assertEquals(BulkActionAndMetadataObject.getAction(), emptyStringLiteral); + assertEquals(BulkActionAndMetadataObject.getIndex(), emptyStringLiteral); + assertEquals(BulkActionAndMetadataObject.getDocId(), emptyStringLiteral); + } +} diff --git a/settings.gradle 
b/settings.gradle index a2495d9ffc..cb485a20d5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -173,4 +173,5 @@ include 'data-prepper-plugins:http-common' include 'data-prepper-plugins:flatten-processor' include 'data-prepper-plugins:mongodb' include 'data-prepper-plugins:rds-source' -include 'data-prepper-plugins:http-source-common' \ No newline at end of file +include 'data-prepper-plugins:http-source-common' +include 'data-prepper-plugins:opensearch-api-source' \ No newline at end of file