New OpenSearch API source implementation #4603

Closed
wants to merge 0 commits into from

Member

@sb2k16 sb2k16 commented Jun 5, 2024

Description

For Data Prepper to support the OpenSearch Document APIs, we need a new source similar to the existing http source. This pull request implements that source, opensearch_api, modeled on the http source. The source supports the Document API Bulk operation.

This pull request includes the following:

  1. Path and HTTP methods:
    • POST _bulk
    • POST <index>/_bulk
  2. Optional URL parameters
    • pipeline and routing. (TODO: pipeline parameter handling on Sink side)
  3. Multi-line JSON Bulk Request payload
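
For reference, a bulk request body is newline-delimited JSON: each action line is followed by its document line, and the body ends with a newline. The sketch below builds such a payload and counts its documents. It is illustrative only: the class and method names are hypothetical, the index name and field values are made up, and it assumes every action is an index action (a delete action has no document line).

```java
import java.util.Arrays;

// Hypothetical helper for illustration; not part of the PR.
final class BulkPayloadExample {

    // Builds a two-document bulk body: one action line, then one document line, per document.
    static String sampleBulkBody() {
        return String.join("\n",
                "{\"index\":{\"_index\":\"test-index\",\"_id\":\"1\"}}",
                "{\"message\":\"first\"}",
                "{\"index\":{\"_index\":\"test-index\",\"_id\":\"2\"}}",
                "{\"message\":\"second\"}") + "\n";
    }

    // Counts action/document pairs, assuming index-style actions only.
    static long countDocuments(final String bulkBody) {
        final long nonBlankLines = Arrays.stream(bulkBody.split("\n"))
                .filter(line -> !line.trim().isEmpty())
                .count();
        return nonBlankLines / 2;
    }
}
```

A body like this would be sent with POST _bulk or POST <index>/_bulk; how the source parses it in the PR may differ from this sketch.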

An example pipeline configuration using the opensearch_api source:

simple-sample-pipeline:
  source:
    opensearch_api:
      path: "/opensearch"
      port: 9202
  sink:
   ...

Issues Resolved

Contributes to #248

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
  • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

List<Map<String, Object>> jsonListData = new ArrayList<>();

String requestBody = new String(httpData.toInputStream().readAllBytes(), StandardCharsets.UTF_8);
List<String> jsonLines = Arrays.asList(requestBody.split(REGEX));
Member

It may be better to create the Pattern in the constructor. Then you can call splitPattern.split(requestBody). This avoids compiling the regex on every request.
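
A minimal sketch of that suggestion, under stated assumptions: the class name is hypothetical, and the delimiter regex is a guess, since the value of REGEX in the PR is not shown here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical class for illustration; not the PR's actual codec.
final class BulkLineSplitter {
    // Assumed delimiter: a line break with an optional carriage return.
    private static final String LINE_BREAK_REGEX = "\\r?\\n";

    private final Pattern splitPattern;

    BulkLineSplitter() {
        // Compile once here instead of passing the regex string to String.split on each request.
        this.splitPattern = Pattern.compile(LINE_BREAK_REGEX);
    }

    List<String> split(final String requestBody) {
        return Arrays.asList(splitPattern.split(requestBody));
    }
}
```

Pattern instances are immutable and safe to share across threads, so a single field per service instance (or a static constant) is enough.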

Member Author

Thanks @dlvenable. I am going to address this.

violationRules {
rule { //in addition to core projects rule
limit {
minimum = 0.90
Member

This is a new project. Can we aim for 100% coverage?

Member Author

Thanks @dlvenable. I am going to address this.

Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}
LOG.info("Started OpenSearch API source on port " + sourceConfig.getPort() + "...");
Member

Suggested change
- LOG.info("Started OpenSearch API source on port " + sourceConfig.getPort() + "...");
+ LOG.info("Started OpenSearch API source on port {}.", sourceConfig.getPort());

Let's avoid these ellipses at the end. They are unclear and make it seem that more is coming.

Also, please use SLF4J interpolation over string concatenation.

Member Author

Thanks @dlvenable. I am going to address this.


public class OpenSearchAPISourceConfig extends BaseHttpServerConfig {

static final String DEFAULT_ENDPOINT_URI = "/opensearch";
Member

I think the default should just be / since this matches existing OpenSearch domains.

Member Author

Thanks @dlvenable. I am going to address this.

import java.util.Arrays;
import java.util.ArrayList;

/*
Member

Please run a code formatter over your changes. There is a lot of whitespace that is off.

Member Author

Thanks @dlvenable. I am going to address this.

if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}
if (server == null) {
Member

There is a lot here that is similar to the http source. Can we extend your work from #4570 to have more of this shared in common?

Member Author

Thanks @dlvenable. I am going to address this.


@Post("/_bulk")
public HttpResponse doPostBulk(final ServiceRequestContext serviceRequestContext, final AggregatedHttpRequest aggregatedHttpRequest,
@Param("pipeline") Optional<String> pipeline, @Param("routing") Optional<String> routing) throws Exception {
Member

Passing Optional in as a parameter is an anti-pattern. You don't actually know if it is null or not, so you have two checks to make: pipeline != null && pipeline.isPresent(). Just take in a String and expect it to possibly be null.
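
The difference can be sketched in plain Java. The class and method names below are hypothetical and the return strings are only there to make the behavior visible; the real handler methods take Armeria request types that are omitted here.

```java
import java.util.Optional;

// Hypothetical example class; not code from the PR.
final class BulkParamExample {

    // Anti-pattern: the Optional reference itself can be null, so two checks are needed.
    static String describeWithOptionalParam(final Optional<String> pipeline) {
        if (pipeline != null && pipeline.isPresent()) {
            return "pipeline=" + pipeline.get();
        }
        return "no pipeline";
    }

    // Suggested form: accept a possibly-null String and check once.
    static String describeWithNullableParam(final String pipeline) {
        return pipeline != null ? "pipeline=" + pipeline : "no pipeline";
    }
}
```

If an Optional is still convenient inside the method body, it can be created locally with Optional.ofNullable(pipeline).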

Member Author

Thanks @dlvenable. I am going to address this.

@Post("/{index}/_bulk")
public HttpResponse doPostBulkIndex(final ServiceRequestContext serviceRequestContext, final AggregatedHttpRequest aggregatedHttpRequest, @Param("index") Optional<String> index,
@Param("pipeline") Optional<String> pipeline, @Param("routing") Optional<String> routing) throws Exception {
requestsReceivedCounter.increment();
Member

Let's consolidate all of this logic in processBulkRequest. The only thing that each of these methods should do is create the BulkAPIRequestParams and then call processBulkRequest.
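
One possible shape for that consolidation, as a sketch only. BulkParamsSketch stands in for the PR's BulkAPIRequestParams, whose actual fields are not shown here; the Armeria annotations and request types are left out, and the returned string is a placeholder for the real HTTP response.

```java
// Stand-in for BulkAPIRequestParams; the fields are assumptions.
final class BulkParamsSketch {
    final String index;     // null for POST /_bulk
    final String pipeline;  // may be null
    final String routing;   // may be null

    BulkParamsSketch(final String index, final String pipeline, final String routing) {
        this.index = index;
        this.pipeline = pipeline;
        this.routing = routing;
    }
}

// Hypothetical service; each endpoint method only builds params and delegates.
final class BulkServiceSketch {
    private int requestsReceived;

    String doPostBulk(final String body, final String pipeline, final String routing) {
        return processBulkRequest(body, new BulkParamsSketch(null, pipeline, routing));
    }

    String doPostBulkIndex(final String body, final String index,
                           final String pipeline, final String routing) {
        return processBulkRequest(body, new BulkParamsSketch(index, pipeline, routing));
    }

    // All shared work (metrics, parsing, buffer writes) would live here.
    private String processBulkRequest(final String body, final BulkParamsSketch params) {
        requestsReceived++;
        final String target = params.index != null ? params.index : "<no default index>";
        return "index=" + target + ", bytes=" + body.length();
    }

    int getRequestsReceived() {
        return requestsReceived;
    }
}
```

With this layout the request counter is incremented in one place, so the two endpoints cannot drift apart.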

Member Author

Thanks @dlvenable. I am going to address this.

dlvenable previously approved these changes Jun 14, 2024
Member

@dlvenable dlvenable left a comment

Thanks for the refactoring work!


private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAPIService.class);

// TODO: support other data-types as request body, e.g. json_lines, msgpack
Member

Is this TODO really true? I think this is the only codec we need for _bulk.

.scheme(SessionProtocol.HTTP)
.authority(AUTHORITY)
.method(HttpMethod.GET)
.path("/health")
Member

Do we want a health check endpoint? This is not something that the OpenSearch API would normally have.

If anything, maybe we should support the root API?

GET /

We could add this in a follow-on PR. But, I think we want to remove the /health endpoint.

Do you want to create a follow-on PR for the root endpoint?

Member Author

Thanks @dlvenable. I will resolve this. The health check endpoint is currently /health because that is the default for the http source. I will modify the BaseHttpSource implementation so that the OpenSearch API source can control the health check endpoint and use / instead.

private static final ObjectMapper mapper = new ObjectMapper();
private final String PLUGIN_NAME = "opensearch_api";
private final String TEST_PIPELINE_NAME = "test_pipeline";
private final String TEST_INDEX = "test-index";
Member

We should have this be a variable generated in @BeforeEach.

private String testIndex;

...

@BeforeEach
void setUp() {
  testIndex = UUID.randomUUID().toString();
}

Member Author

Thanks @dlvenable. I will resolve this.

class OpenSearchAPISourceTest {
private static final ObjectMapper mapper = new ObjectMapper();
private final String PLUGIN_NAME = "opensearch_api";
private final String TEST_PIPELINE_NAME = "test_pipeline";
Member

Let's also make this a random value generated in the setup method.

Member Author

Thanks @dlvenable. I will resolve this.

integerHashMap.put("batch_size", 1);
final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap);
pluginSetting.setPipelineName(TEST_PIPELINE_NAME);
return new BlockingBuffer<>(pluginSetting);
Member

Can we use a mock buffer instead?

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testBulkRequestAPIJsonResponse413(boolean includeIndexInPath) throws JsonProcessingException {
testBulkRequestJsonResponse413(includeIndexInPath);
Member

Let's inline this. We don't need an extra method here.

Member Author

Thanks @dlvenable. I will resolve this.


// Fill in the buffer
WebClient.of().execute(testRequestHeaders, testHttpData).aggregate()
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();
Member

Why is this expecting an OK? Shouldn't it be the 408?

private final HttpData serializedRequestBadEmptyNewLines = HttpData.ofUtf8("\n\n\n\n\n\n\n \n");
private final HttpData serializedRequestBadInvalidJson = HttpData.ofUtf8("{\"text\":");

private final MultiLineJsonCodec multiLineJsonCodec = new MultiLineJsonCodec();
Member

Let's use the createObjectUnderTest() method pattern. It allows more flexibility as tests grow.
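
A sketch of that pattern, without JUnit so it stays self-contained. LineCodecStandIn is a made-up stand-in for MultiLineJsonCodec and its blank-line handling is an assumption; in a real test class the test method would carry @Test and use JUnit assertions.

```java
import java.util.ArrayList;
import java.util.List;

// Made-up stand-in for the codec under test; behavior is assumed for illustration.
final class LineCodecStandIn {
    List<String> parse(final String body) {
        final List<String> lines = new ArrayList<>();
        for (final String line : body.split("\n")) {
            if (!line.trim().isEmpty()) {
                lines.add(line.trim());
            }
        }
        return lines;
    }
}

final class LineCodecStandInTest {
    // Each test builds its own instance through this factory instead of sharing a field.
    // If the constructor later needs arguments or mocks, only this method changes.
    private LineCodecStandIn createObjectUnderTest() {
        return new LineCodecStandIn();
    }

    void parseSkipsBlankLines() {
        final LineCodecStandIn objectUnderTest = createObjectUnderTest();
        final List<String> lines = objectUnderTest.parse("a\n\n b \n");
        if (lines.size() != 2) {
            throw new AssertionError("expected 2 lines but got " + lines.size());
        }
    }
}
```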

Member Author

Thanks @dlvenable. I will resolve this.


try {
if (buffer.isByteBuffer()) {
buffer.writeBytes(content.array(), null, bufferWriteTimeoutInMillis);
Member

We will probably want to support chunking. But, we can do this in a follow-on PR. Can you create a PR?

Member Author

Thanks @dlvenable. I will create a separate PR to follow up on adding the chunking functionality.

* Bulk API supports query parameters "pipeline", "routing" and "refresh"
*/
@Blocking
public class OpenSearchAPIService implements BaseHttpService {
Member

We will need to support end-to-end acknowledgements, but we can do this in a follow-on PR. Can you create a GitHub issue to track this?

Member Author

@dlvenable
Member

Replaced by #5024.
