Skip to content

Commit

Permalink
OpenSearch API source implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <[email protected]>
  • Loading branch information
sb2k16 committed Jun 5, 2024
1 parent 2180a69 commit 9f17acb
Show file tree
Hide file tree
Showing 20 changed files with 1,994 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ private HttpStatus handleException(final Throwable e) {
} else if (e instanceof SizeOverflowException) {
requestsTooLargeCounter.increment();
return HttpStatus.REQUEST_ENTITY_TOO_LARGE;
} else if (e instanceof IllegalArgumentException) {
badRequestsCounter.increment();
return HttpStatus.BAD_REQUEST;
}

internalServerErrorCounter.increment();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.opensearch.dataprepper.http.codec;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linecorp.armeria.common.HttpData;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class MultiLineJsonCodec implements Codec<List<Map<String, Object>>> {
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String REGEX = "\\r?\\n";
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE =
new TypeReference<Map<String, Object>>() {};

@Override
public List<Map<String, Object>> parse(HttpData httpData) throws IOException {
List<Map<String, Object>> jsonListData = new ArrayList<>();

String requestBody = new String(httpData.toInputStream().readAllBytes(), StandardCharsets.UTF_8);
List<String> jsonLines = Arrays.asList(requestBody.split(REGEX));

for (String jsonLine: jsonLines) {
if (isInvalidLine(jsonLine)) {
throw new IOException("Error processing request payload.");
}
jsonListData.add(objectMapper.readValue(jsonLine, MAP_TYPE_REFERENCE));
}
return jsonListData;
}

private static boolean isInvalidLine(final String str) {
return str == null || str.isEmpty() || str.isBlank();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.opensearch.dataprepper.http.codec;

import com.linecorp.armeria.common.HttpData;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class MultiLineJsonCodecTest {
private final HttpData serializedRequest = HttpData.ofUtf8("" +
"{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" +
"{ \"text\": \"text1\", \"year\": \"2013\" }");

private final HttpData serializedRequestMultipleRows = HttpData.ofUtf8("" +
"{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" +
"{ \"text\": \"text1\", \"year\": \"2013\" }\n" +
"{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" +
"{ \"text\": \"text1\", \"year\": \"2013\" }\n" +
"{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" +
"{ \"text\": \"text1\", \"year\": \"2013\" }\n" +
"{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" +
"{ \"text\": \"text1\", \"year\": \"2013\" }\n" +
"{ \"index\": { \"_index\": \"test-index\", \"_id\": \"id1\" } }\n" +
"{ \"text\": \"text1\", \"year\": \"2013\" }\n");

private final HttpData serializedRequestBad = HttpData.ofUtf8("{}\n\n{}");
private final HttpData serializedRequestBadWithBlanks = HttpData.ofUtf8("{}\n \n ");
private final HttpData serializedRequestBadWithWhiteSpaces = HttpData.ofUtf8("\t\n\r\f {}");
private final HttpData serializedRequestBadEmpty = HttpData.ofUtf8("");
private final HttpData serializedRequestBadEmptyNewLines = HttpData.ofUtf8("\n\n\n\n\n\n\n \n");
private final HttpData serializedRequestBadInvalidJson = HttpData.ofUtf8("{\"text\":");

private final MultiLineJsonCodec multiLineJsonCodec = new MultiLineJsonCodec();

@Test
public void testParseSuccess() throws IOException {
// When
List<Map<String, Object>> res = multiLineJsonCodec.parse(serializedRequest);

// Then
assertEquals(2, res.size());
assertEquals(res.get(0).containsKey("index"), true);
Map<String, Object> innerMap = (Map<String, Object>) res.get(0).get("index");
assertEquals(innerMap.get("_index"), "test-index");
assertEquals(innerMap.get("_id"), "id1");
assertEquals(res.get(1).containsKey("text"), true);
assertEquals(res.get(1).get("text"), "text1");
assertEquals(res.get(1).get("year"), "2013");
}

@Test
public void testParseSuccess2() throws IOException {
// When
List<Map<String, Object>> res = multiLineJsonCodec.parse(serializedRequestMultipleRows);


// Then
assertEquals(10, res.size());

for (int idx = 0; idx < res.size() - 1; idx++) {
assertEquals(res.get(idx).containsKey("index"), true);
Map<String, Object> innerMap = (Map<String, Object>) res.get(idx).get("index");
assertEquals(innerMap.get("_index"), "test-index");
assertEquals(innerMap.get("_id"), "id1");
assertEquals(res.get(idx+1).containsKey("text"), true);
assertEquals(res.get(idx+1).get("text"), "text1");
assertEquals(res.get(idx+1).get("year"), "2013");
idx++;
}
}

@Test
public void testParseFailure() {
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBad));
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadEmpty));
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadEmptyNewLines));
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadInvalidJson));
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadWithBlanks));
assertThrows(IOException.class, () -> multiLineJsonCodec.parse(serializedRequestBadWithWhiteSpaces));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,5 @@ void testDefault() {
assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getPath());
assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getDefaultPort());
assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getDefaultPath());

}
}
129 changes: 129 additions & 0 deletions data-prepper-plugins/opensearch-api-source/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# OpenSearch API Source

This is a source plugin that supports HTTP protocol. It supports [OpenSearch Bulk Document API](https://opensearch.org/docs/latest/api-reference/document-apis/bulk/). All the paths and HTTP methods for [Bulk API operation](https://opensearch.org/docs/latest/api-reference/document-apis/bulk/#path-and-http-methods) are supported. It will also support optional [bulk URL parameters](https://opensearch.org/docs/latest/api-reference/document-apis/bulk/#url-parameters).

## Usages
To get started with OpenSearch API source, create the following `pipeline.yaml` configuration:
```yaml
source:
opensearch_api:
```
### Response status
* `200`: the request data has been successfully written into the buffer.
* `400`: the request data is either in mal-format or unsupported codec.
* `408`: the request data fails to be written into the buffer within the timeout.
* `413`: the request data size is larger than the configured capacity.
* `429`: the request has been rejected due to the OpenSearch API source executor being in full capacity.

## Configurations

* port (Optional) => An `int` between 0 and 65535 represents the port source is running on. Default is ```9202```.
* path (Optional) => A `string` which represents the URI path for endpoint invocation. It should start with `/` and length should be at least 1. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name. Default value is `/opensearch`.
* health_check_service (Optional) => A `boolean` that determines if a `/health` endpoint on the defined port will be home to a health check. Default is `false`
* unauthenticated_health_check (Optional) => A `boolean` that determines if the health endpoint will require authentication. This option is ignored if no authentication is defined. Default is `false`
* request_timeout (Optional) => An `int` larger than 0 represents request timeout in millis. Default is ```10_000```.
* thread_count (Optional) => An `int` larger than 0 represents the number of threads to keep in the ScheduledThreadPool. Default is `200`.
* max_connection_count (Optional) => An `int` larger than 0 represents the maximum allowed number of open connections. Default is `500`.
* max_pending_requests (Optional) => An `int` larger than 0 represents the maximum allowed number of tasks in the ScheduledThreadPool work queue. Default is `1024`.
* authentication (Optional) => An authentication configuration. By default, this runs an unauthenticated server. See below for more information.
* compression (Optional) : The compression type applied on the client request payload. Defaults to `none`. Supported values are:
* `none`: no compression
* `gzip`: apply GZip de-compression on the incoming request.

### Authentication Configurations

By default, the OpenSearch API source input is unauthenticated.

The following is an example of how to run the server with HTTP Basic authentication:

```yaml
source:
opensearch_api:
authentication:
http_basic:
username: my-user
password: my_s3cr3t
```

You can also explicitly disable authentication with:

```yaml
source:
opensearch_api:
authentication:
unauthenticated:
```

This plugin uses pluggable authentication for HTTP servers. To provide custom authentication,
create a plugin which implements [`ArmeriaHttpAuthenticationProvider`](../armeria-common/src/main/java/org/opensearch/dataprepper/armeria/authentication/ArmeriaHttpAuthenticationProvider.java)


### SSL

* ssl(Optional) => A `boolean` that enables TLS/SSL. Default is ```false```.
* ssl_certificate_file(Optional) => A `String` that represents the SSL certificate chain file path or AWS S3 path. S3 path example `s3://<bucketName>/<path>`. Required if `ssl` is set to `true` and `use_acm_certificate_for_ssl` is set to `false`.
* ssl_key_file(Optional) => A `String` that represents the SSL key file path or AWS S3 path. S3 path example `s3://<bucketName>/<path>`. Only decrypted key file is supported. Required if `ssl` is set to `true` and `use_acm_certificate_for_ssl` is set to `false`.
* use_acm_certificate_for_ssl(Optional) : A `boolean` that enables TLS/SSL using certificate and private key from AWS Certificate Manager (ACM). Default is `false`.
* acm_certificate_arn(Optional) : A `String` that represents the ACM certificate ARN. ACM certificate take preference over S3 or local file system certificate. Required if `use_acm_certificate_for_ssl` is set to `true`.
* acm_private_key_password(Optional): A `String` that represents the ACM private key password which that will be used to decrypt the private key. If it's not provided, a random password will be generated.
* acm_certificate_timeout_millis(Optional) : An `int` that represents the timeout in milliseconds for ACM to get certificates. Default value is `120000`.
* aws_region(Optional) : A `String` that represents the AWS region to use `ACM`, `S3`. Required if `use_acm_certificate_for_ssl` is set to `true` or `ssl_certificate_file` and `ssl_key_file` is `AWS S3`.

### Example to enable SSL using OpenSSL

Create the following OpenSearch API source configuration in your `pipeline.yaml`.

```yaml
source:
opensearch_api:
ssl: true
ssl_certificate_file: "/full/path/to/certfile.crt"
ssl_key_file: "/full/path/to/keyfile.key"
```

Generate a private key named `keyfile.key`, along with a self-signed certificate file named `certfile.crt`.

```
openssl req -nodes -new -x509 -keyout keyfile.key -out certfile.crt -subj "/L=test/O=Example Com Inc./OU=Example Com Inc. Root CA/CN=Example Com Inc. Root CA"
```
Make sure to replace the paths for the `ssl_certificate_file` and `ssl_key_file` for the OpenSearch API source configuration with the actual paths of the files.
- Use the following command to send a sample index action on the Bulk API request by setting the index `index = movies` in the body of the request.
```
curl -k -XPOST -H "Content-Type: application/json" -d '{ "index": { "_index": "movies", "_id": "tt1979320" } }
{ "title": "Rush", "year": 2013}'
http://localhost:9202/opensearch/_bulk
```
- Alternatively, use the following command to set the index `index = movies` in the path
```
curl -k -XPOST -H "Content-Type: application/json" -d '{ "index": { "_index": "movies", "_id": "tt1979320" } }
{ "title": "Rush", "year": 2013}'
http://localhost:9202/opensearch/movies/_bulk
```
# Metrics
### Counter
- `requestsReceived`: measures total number of requests received by `/opensearch` endpoint.
- `requestsRejected`: measures total number of requests rejected (429 response status code) by OpenSearch API source plugin.
- `successRequests`: measures total number of requests successfully processed (200 response status code) by OpenSearch API source plugin.
- `badRequests`: measures total number of requests with invalid content type or format processed by OpenSearch API source plugin (400 response status code).
- `requestTimeouts`: measures total number of requests that time out in the OpenSearch API source server (415 response status code).
- `requestsTooLarge`: measures total number of requests of which the events size in the content is larger than the buffer capacity (413 response status code).
- `internalServerError`: measures total number of requests processed by the OpenSearch API source with custom exception type (500 response status code).
### Timer
- `requestProcessDuration`: measures latency of requests processed by the OpenSearch API source plugin in seconds.
### Distribution Summary
- `payloadSize`: measures the distribution of incoming requests payload sizes in bytes.
## Developer Guide
This plugin is compatible with Java 14. See
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
38 changes: 38 additions & 0 deletions data-prepper-plugins/opensearch-api-source/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:blocking-buffer')
implementation project(':data-prepper-plugins:http-source-common')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:armeria-common')
implementation libs.armeria.core
implementation libs.commons.io
implementation 'software.amazon.awssdk:acm'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:apache-client'
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-api').sourceSets.test.output
testImplementation 'org.assertj:assertj-core:3.25.3'
testImplementation testLibs.mockito.inline
compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule { //in addition to core projects rule
limit {
minimum = 0.90
}
}
}
}
Loading

0 comments on commit 9f17acb

Please sign in to comment.