-
Notifications
You must be signed in to change notification settings - Fork 202
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
OpenSearch API source implementation
Signed-off-by: Souvik Bose <[email protected]>
- Loading branch information
Showing
20 changed files
with
1,994 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
41 changes: 41 additions & 0 deletions
41
...source-common/src/main/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodec.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package org.opensearch.dataprepper.http.codec; | ||
|
||
import com.fasterxml.jackson.core.type.TypeReference; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.linecorp.armeria.common.HttpData; | ||
|
||
import java.io.BufferedReader; | ||
import java.io.IOException; | ||
import java.io.InputStreamReader; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
public class MultiLineJsonCodec implements Codec<List<Map<String, Object>>> { | ||
private static final ObjectMapper objectMapper = new ObjectMapper(); | ||
private static final String REGEX = "\\r?\\n"; | ||
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = | ||
new TypeReference<Map<String, Object>>() {}; | ||
|
||
@Override | ||
public List<Map<String, Object>> parse(HttpData httpData) throws IOException { | ||
List<Map<String, Object>> jsonListData = new ArrayList<>(); | ||
|
||
String requestBody = new String(httpData.toInputStream().readAllBytes(), StandardCharsets.UTF_8); | ||
List<String> jsonLines = Arrays.asList(requestBody.split(REGEX)); | ||
|
||
for (String jsonLine: jsonLines) { | ||
if (isInvalidLine(jsonLine)) { | ||
throw new IOException("Error processing request payload."); | ||
} | ||
jsonListData.add(objectMapper.readValue(jsonLine, MAP_TYPE_REFERENCE)); | ||
} | ||
return jsonListData; | ||
} | ||
|
||
private static boolean isInvalidLine(final String str) { | ||
return str == null || str.isEmpty() || str.isBlank(); | ||
} | ||
} |
86 changes: 86 additions & 0 deletions
86
...ce-common/src/test/java/org/opensearch/dataprepper/http/codec/MultiLineJsonCodecTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
package org.opensearch.dataprepper.http.codec; | ||
|
||
import com.linecorp.armeria.common.HttpData; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertThrows; | ||
|
||
public class MultiLineJsonCodecTest { | ||
private final HttpData serializedRequest = HttpData.ofUtf8("" + | ||
"{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" + | ||
"{ \"text\": \"text1\", \"year\": \"2013\" }"); | ||
|
||
private final HttpData serializedRequestMultipleRows = HttpData.ofUtf8("" + | ||
"{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" + | ||
"{ \"text\": \"text1\", \"year\": \"2013\" }\n" + | ||
"{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" + | ||
"{ \"text\": \"text1\", \"year\": \"2013\" }\n" + | ||
"{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" + | ||
"{ \"text\": \"text1\", \"year\": \"2013\" }\n" + | ||
"{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" + | ||
"{ \"text\": \"text1\", \"year\": \"2013\" }\n" + | ||
"{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" + | ||
"{ \"text\": \"text1\", \"year\": \"2013\" }\n"); | ||
|
||
private final HttpData serializedRequestBad = HttpData.ofUtf8("{}\n\n{}"); | ||
private final HttpData serializedRequestBadWithBlanks = HttpData.ofUtf8("{}\n \n "); | ||
private final HttpData serializedRequestBadWithWhiteSpaces = HttpData.ofUtf8("\t\n\r\f {}"); | ||
private final HttpData serializedRequestBadEmpty = HttpData.ofUtf8(""); | ||
private final HttpData serializedRequestBadEmptyNewLines = HttpData.ofUtf8("\n\n\n\n\n\n\n \n"); | ||
private final HttpData serializedRequestBadInvalidJson = HttpData.ofUtf8("{\"text\":"); | ||
|
||
private final MultiLineJsonCodec multiLineJsonCodec = new MultiLineJsonCodec(); | ||
|
||
@Test | ||
public void testParseSuccess() throws IOException { | ||
// When | ||
List<Map<String, Object>> res = multiLineJsonCodec.parse(serializedRequest); | ||
|
||
// Then | ||
assertEquals(2, res.size()); | ||
assertEquals(res.get(0).containsKey("index"), true); | ||
Map<String, Object> innerMap = (Map<String, Object>) res.get(0).get("index"); | ||
assertEquals(innerMap.get("_index"), "test-index"); | ||
assertEquals(innerMap.get("_id"), "id1"); | ||
assertEquals(res.get(1).containsKey("text"), true); | ||
assertEquals(res.get(1).get("text"), "text1"); | ||
assertEquals(res.get(1).get("year"), "2013"); | ||
} | ||
|
||
@Test | ||
public void testParseSuccess2() throws IOException { | ||
// When | ||
List<Map<String, Object>> res = multiLineJsonCodec.parse(serializedRequestMultipleRows); | ||
|
||
|
||
// Then | ||
assertEquals(10, res.size()); | ||
|
||
for (int idx = 0; idx < res.size() - 1; idx++) { | ||
assertEquals(res.get(idx).containsKey("index"), true); | ||
Map<String, Object> innerMap = (Map<String, Object>) res.get(idx).get("index"); | ||
assertEquals(innerMap.get("_index"), "test-index"); | ||
assertEquals(innerMap.get("_id"), "id1"); | ||
assertEquals(res.get(idx+1).containsKey("text"), true); | ||
assertEquals(res.get(idx+1).get("text"), "text1"); | ||
assertEquals(res.get(idx+1).get("year"), "2013"); | ||
idx++; | ||
} | ||
} | ||
|
||
@Test | ||
public void testParseFailure() { | ||
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBad)); | ||
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadEmpty)); | ||
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadEmptyNewLines)); | ||
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadInvalidJson)); | ||
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadWithBlanks)); | ||
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadWithWhiteSpaces)); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
# OpenSearch API Source | ||
|
||
This is a source plugin that supports HTTP protocol. It supports [OpenSearch Bulk Document API](https://opensearch.org/docs/latest/api-reference/document-apis/bulk/). All the paths and HTTP methods for [Bulk API operation](https://opensearch.org/docs/latest/api-reference/document-apis/bulk/#path-and-http-methods) are supported. It will also support optional [bulk URL parameters](https://opensearch.org/docs/latest/api-reference/document-apis/bulk/#url-parameters). | ||
|
||
## Usages | ||
To get started with OpenSearch API source, create the following `pipeline.yaml` configuration: | ||
```yaml | ||
source: | ||
opensearch_api: | ||
``` | ||
### Response status | ||
* `200`: the request data has been successfully written into the buffer. | ||
* `400`: the request data is either malformed or uses an unsupported codec. | ||
* `408`: the request data fails to be written into the buffer within the timeout. | ||
* `413`: the request data size is larger than the configured capacity. | ||
* `429`: the request has been rejected due to the OpenSearch API source executor being in full capacity. | ||
|
||
## Configurations | ||
|
||
* port (Optional) => An `int` between 0 and 65535 represents the port source is running on. Default is ```9202```. | ||
* path (Optional) => A `string` which represents the URI path for endpoint invocation. It should start with `/` and length should be at least 1. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name. Default value is `/opensearch`. | ||
* health_check_service (Optional) => A `boolean` that determines if a `/health` endpoint on the defined port will be home to a health check. Default is `false` | ||
* unauthenticated_health_check (Optional) => A `boolean` that determines if the health endpoint will require authentication. This option is ignored if no authentication is defined. Default is `false` | ||
* request_timeout (Optional) => An `int` larger than 0 represents request timeout in millis. Default is ```10_000```. | ||
* thread_count (Optional) => An `int` larger than 0 represents the number of threads to keep in the ScheduledThreadPool. Default is `200`. | ||
* max_connection_count (Optional) => An `int` larger than 0 represents the maximum allowed number of open connections. Default is `500`. | ||
* max_pending_requests (Optional) => An `int` larger than 0 represents the maximum allowed number of tasks in the ScheduledThreadPool work queue. Default is `1024`. | ||
* authentication (Optional) => An authentication configuration. By default, this runs an unauthenticated server. See below for more information. | ||
* compression (Optional) : The compression type applied on the client request payload. Defaults to `none`. Supported values are: | ||
* `none`: no compression | ||
* `gzip`: apply GZip de-compression on the incoming request. | ||
|
||
### Authentication Configurations | ||
|
||
By default, the OpenSearch API source input is unauthenticated. | ||
|
||
The following is an example of how to run the server with HTTP Basic authentication: | ||
|
||
```yaml | ||
source: | ||
opensearch_api: | ||
authentication: | ||
http_basic: | ||
username: my-user | ||
password: my_s3cr3t | ||
``` | ||
|
||
You can also explicitly disable authentication with: | ||
|
||
```yaml | ||
source: | ||
opensearch_api: | ||
authentication: | ||
unauthenticated: | ||
``` | ||
|
||
This plugin uses pluggable authentication for HTTP servers. To provide custom authentication, | ||
create a plugin which implements [`ArmeriaHttpAuthenticationProvider`](../armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/ArmeriaHttpAuthenticationProvider.java) | ||
|
||
|
||
### SSL | ||
|
||
* ssl(Optional) => A `boolean` that enables TLS/SSL. Default is ```false```. | ||
* ssl_certificate_file(Optional) => A `String` that represents the SSL certificate chain file path or AWS S3 path. S3 path example `s3://<bucketName>/<path>`. Required if `ssl` is set to `true` and `use_acm_certificate_for_ssl` is set to `false`. | ||
* ssl_key_file(Optional) => A `String` that represents the SSL key file path or AWS S3 path. S3 path example `s3://<bucketName>/<path>`. Only decrypted key file is supported. Required if `ssl` is set to `true` and `use_acm_certificate_for_ssl` is set to `false`. | ||
* use_acm_certificate_for_ssl(Optional) : A `boolean` that enables TLS/SSL using certificate and private key from AWS Certificate Manager (ACM). Default is `false`. | ||
* acm_certificate_arn(Optional) : A `String` that represents the ACM certificate ARN. The ACM certificate takes precedence over S3 or local file system certificates. Required if `use_acm_certificate_for_ssl` is set to `true`. | ||
* acm_private_key_password(Optional): A `String` that represents the ACM private key password that will be used to decrypt the private key. If it's not provided, a random password will be generated. | ||
* acm_certificate_timeout_millis(Optional) : An `int` that represents the timeout in milliseconds for ACM to get certificates. Default value is `120000`. | ||
* aws_region(Optional) : A `String` that represents the AWS region to use for `ACM` and `S3`. Required if `use_acm_certificate_for_ssl` is set to `true`, or if `ssl_certificate_file` and `ssl_key_file` are AWS S3 paths. | ||
|
||
### Example to enable SSL using OpenSSL | ||
|
||
Create the following OpenSearch API source configuration in your `pipeline.yaml`. | ||
|
||
```yaml | ||
source: | ||
opensearch_api: | ||
ssl: true | ||
ssl_certificate_file: "/full/path/to/certfile.crt" | ||
ssl_key_file: "/full/path/to/keyfile.key" | ||
``` | ||
|
||
Generate a private key named `keyfile.key`, along with a self-signed certificate file named `certfile.crt`. | ||
|
||
``` | ||
openssl req -nodes -new -x509 -keyout keyfile.key -out certfile.crt -subj "/L=test/O=Example Com Inc./OU=Example Com Inc. Root CA/CN=Example Com Inc. Root CA" | ||
``` | ||
Make sure to replace the paths for the `ssl_certificate_file` and `ssl_key_file` for the OpenSearch API source configuration with the actual paths of the files. | ||
- Use the following command to send a sample index action on the Bulk API request by setting the index `index = movies` in the body of the request. | ||
``` | ||
curl -k -XPOST -H "Content-Type: application/json" -d '{ "index": { "_index": "movies", "_id": "tt1979320" } } | ||
{ "title": "Rush", "year": 2013}' \ | ||
https://localhost:9202/opensearch/_bulk | ||
``` | ||
- Alternatively, use the following command to set the index `index = movies` in the path | ||
``` | ||
curl -k -XPOST -H "Content-Type: application/json" -d '{ "index": { "_index": "movies", "_id": "tt1979320" } } | ||
{ "title": "Rush", "year": 2013}' \ | ||
https://localhost:9202/opensearch/movies/_bulk | ||
``` | ||
## Metrics | ||
### Counter | ||
- `requestsReceived`: measures total number of requests received by `/opensearch` endpoint. | ||
- `requestsRejected`: measures total number of requests rejected (429 response status code) by OpenSearch API source plugin. | ||
- `successRequests`: measures total number of requests successfully processed (200 response status code) by OpenSearch API source plugin. | ||
- `badRequests`: measures total number of requests with invalid content type or format processed by OpenSearch API source plugin (400 response status code). | ||
- `requestTimeouts`: measures total number of requests that time out in the OpenSearch API source server (408 response status code). | ||
- `requestsTooLarge`: measures total number of requests of which the events size in the content is larger than the buffer capacity (413 response status code). | ||
- `internalServerError`: measures total number of requests processed by the OpenSearch API source with custom exception type (500 response status code). | ||
### Timer | ||
- `requestProcessDuration`: measures latency of requests processed by the OpenSearch API source plugin in seconds. | ||
### Distribution Summary | ||
- `payloadSize`: measures the distribution of incoming requests payload sizes in bytes. | ||
## Developer Guide | ||
This plugin is compatible with Java 14. See | ||
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) | ||
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
// Applies the standard Gradle 'java' plugin to this module. | ||
plugins { | ||
id 'java' | ||
} | ||
|
||
// Module dependencies: sibling Data Prepper projects, Armeria, Commons IO and AWS SDK modules. | ||
// NOTE(review): the AWS SDK artifacts carry no version here; presumably versions are managed elsewhere in the build. Confirm. | ||
dependencies { | ||
implementation project(':data-prepper-api') | ||
implementation project(':data-prepper-plugins:blocking-buffer') | ||
implementation project(':data-prepper-plugins:http-source-common') | ||
implementation project(':data-prepper-plugins:common') | ||
implementation project(':data-prepper-plugins:armeria-common') | ||
implementation libs.armeria.core | ||
implementation libs.commons.io | ||
implementation 'software.amazon.awssdk:acm' | ||
implementation 'software.amazon.awssdk:s3' | ||
implementation 'software.amazon.awssdk:apache-client' | ||
testImplementation project(':data-prepper-test-common') | ||
testImplementation project(':data-prepper-api').sourceSets.test.output | ||
testImplementation 'org.assertj:assertj-core:3.25.3' | ||
testImplementation testLibs.mockito.inline | ||
// Lombok is declared compile-only and as an annotation processor, so it is not a runtime dependency. | ||
compileOnly 'org.projectlombok:lombok:1.18.20' | ||
annotationProcessor 'org.projectlombok:lombok:1.18.20' | ||
} | ||
|
||
// Coverage verification runs after the JaCoCo report task and sets a minimum coverage limit of 0.90 for this module. | ||
jacocoTestCoverageVerification { | ||
dependsOn jacocoTestReport | ||
violationRules { | ||
rule { //in addition to core projects rule | ||
limit { | ||
minimum = 0.90 | ||
} | ||
} | ||
} | ||
} | ||
Oops, something went wrong.