Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into tabularschema
Browse files Browse the repository at this point in the history
  • Loading branch information
omkarmmore95 authored Aug 8, 2023
2 parents c2553cb + 5262eea commit d82ad55
Show file tree
Hide file tree
Showing 157 changed files with 7,786 additions and 1,141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run basic grok end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:log:${{ matrix.test }}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run raw-span latest release compatibility end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:peerforwarder:localAggregateEndToEndTest
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run raw-span latest release compatibility end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:peerforwarder:${{ matrix.test }}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run raw-span latest release compatibility end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:rawSpanLatestReleaseCompatibilityEndToEndTest
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run raw-span end-to-end tests with Gradle
run: ./gradlew -PopenTelemetryVersion=${{ matrix.otelVersion }} -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:rawSpanEndToEndTest
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run raw-span latest release compatibility end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:rawSpanPeerForwarderEndToEndTest
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run service-map end-to-end tests with Gradle
run: ./gradlew -PendToEndJavaVersion=${{ matrix.java }} :e2e-test:trace:serviceMapPeerForwarderEndToEndTest
2 changes: 1 addition & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Build with Gradle
run: ./gradlew build
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run Open Distro docker
run: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Run OpenSearch docker
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/performance-test-compile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Build performance tests with Gradle
run: ./gradlew :performance-test:compileGatlingJava
6 changes: 3 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Get Version
run: grep '^version=' gradle.properties >> $GITHUB_ENV
Expand Down Expand Up @@ -62,7 +62,7 @@ jobs:
timeout-minutes: 30

steps:
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Get Version
run: grep '^version=' gradle.properties >> $GITHUB_ENV
Expand All @@ -88,7 +88,7 @@ jobs:
timeout-minutes: 8

steps:
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2
- name: Get Version
run: grep '^version=' gradle.properties >> $GITHUB_ENV
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/staging-resources-cdk-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
with:
node-version: '16'

- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2

- name: Install NPM Dependencies
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/third-party-generate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: 11
- name: Checkout Data-Prepper
- name: Checkout Data Prepper
uses: actions/checkout@v2

- name: Generate Third Party Report
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ public class SourcePartition<T> {

private final String partitionKey;
private final T partitionState;
private final Long partitionClosedCount;

private SourcePartition(final Builder<T> builder) {
Objects.requireNonNull(builder.partitionKey);

this.partitionKey = builder.partitionKey;
this.partitionState = builder.partitionState;
this.partitionClosedCount = builder.partitionClosedCount;
}

public String getPartitionKey() {
Expand All @@ -34,6 +36,10 @@ public Optional<T> getPartitionState() {
return Optional.ofNullable(partitionState);
}

public Long getPartitionClosedCount() {
return partitionClosedCount;
}

public static <T> Builder<T> builder(Class<T> clazz) {
return new Builder<>(clazz);
}
Expand All @@ -42,6 +48,7 @@ public static class Builder<T> {

private String partitionKey;
private T partitionState;
private Long partitionClosedCount;

public Builder(Class<T> clazz) {

Expand All @@ -57,6 +64,11 @@ public Builder<T> withPartitionState(final T partitionState) {
return this;
}

public Builder<T> withPartitionClosedCount(final Long partitionClosedCount) {
this.partitionClosedCount = partitionClosedCount;
return this;
}

public SourcePartition<T> build() {
return new SourcePartition<T>(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.junit.jupiter.api.Test;

import java.util.Random;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -31,15 +32,18 @@ void sourcePartitionBuilderWithNullPartitionThrowsNullPointerException() {
void sourcePartitionBuilder_returns_expected_SourcePartition() {
final String partitionKey = UUID.randomUUID().toString();
final String partitionState = UUID.randomUUID().toString();
final Long partitionClosedCount = new Random().nextLong();

final SourcePartition<String> sourcePartition = SourcePartition.builder(String.class)
.withPartitionKey(partitionKey)
.withPartitionState(partitionState)
.withPartitionClosedCount(partitionClosedCount)
.build();

assertThat(sourcePartition, notNullValue());
assertThat(sourcePartition.getPartitionKey(), equalTo(partitionKey));
assertThat(sourcePartition.getPartitionState().isPresent(), equalTo(true));
assertThat(sourcePartition.getPartitionState().get(), equalTo(partitionState));
assertThat(sourcePartition.getPartitionClosedCount(), equalTo(partitionClosedCount));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public Optional<SourcePartition<T>> getNextPartition(final Function<Map<String,
final SourcePartition<T> sourcePartition = SourcePartition.builder(partitionProgressStateClass)
.withPartitionKey(ownedPartitions.get().getSourcePartitionKey())
.withPartitionState(convertStringToPartitionProgressStateClass(ownedPartitions.get().getPartitionProgressState()))
.withPartitionClosedCount(ownedPartitions.get().getClosedCount())
.build();

partitionManager.setActivePartition(sourcePartition);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper;

import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
import io.grpc.Status;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.exceptions.BadRequestException;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.opensearch.dataprepper.exceptions.RequestCancelledException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.util.concurrent.TimeoutException;

public class GrpcRequestExceptionHandler implements GrpcStatusFunction {
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";

public static final String REQUEST_TIMEOUTS = "requestTimeouts";
public static final String BAD_REQUESTS = "badRequests";
public static final String REQUESTS_TOO_LARGE = "requestsTooLarge";
public static final String INTERNAL_SERVER_ERROR = "internalServerError";

private final Counter requestTimeoutsCounter;
private final Counter badRequestsCounter;
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;

public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
}

@Override
public @Nullable Status apply(final RequestContext context, final Throwable exception, final Metadata metadata) {
final Throwable exceptionCause = exception instanceof BufferWriteException ? exception.getCause() : exception;

return handleExceptions(exceptionCause);
}

private Status handleExceptions(final Throwable e) {
if (e instanceof RequestTimeoutException || e instanceof TimeoutException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
} else if (e instanceof SizeOverflowException) {
requestsTooLargeCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
} else if (e instanceof BadRequestException) {
badRequestsCounter.increment();
return createStatus(e, Status.INVALID_ARGUMENT);
} else if (e instanceof RequestCancelledException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.CANCELLED);
}

internalServerErrorCounter.increment();
return createStatus(e, Status.INTERNAL);
}

private Status createStatus(final Throwable e, final Status status) {
final String message;
if (e instanceof RequestTimeoutException) {
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE;
} else {
message = e.getMessage() == null ? status.getCode().name() : e.getMessage();
}

return status.withDescription(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.loghttp;
package org.opensearch.dataprepper;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.server.RequestTimeoutException;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import com.linecorp.armeria.common.HttpResponse;
Expand All @@ -13,10 +17,11 @@
import io.micrometer.core.instrument.Counter;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

public class RequestExceptionHandler {
public class HttpRequestExceptionHandler implements ExceptionHandlerFunction {
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";

public static final String REQUEST_TIMEOUTS = "requestTimeouts";
public static final String BAD_REQUESTS = "badRequests";
public static final String REQUESTS_TOO_LARGE = "requestsTooLarge";
Expand All @@ -27,31 +32,39 @@ public class RequestExceptionHandler {
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;

public RequestExceptionHandler(final PluginMetrics pluginMetrics) {
public HttpRequestExceptionHandler(final PluginMetrics pluginMetrics) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
}

public HttpResponse handleException(final Exception e) {
final String message = e.getMessage() == null? "" : e.getMessage();
return handleException(e, message);
@Override
public HttpResponse handleException(final ServiceRequestContext ctx, final HttpRequest req, final Throwable cause) {
final HttpStatus status = handleException(cause);
final String message;
if (cause instanceof RequestTimeoutException) {
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE;
} else {
message = cause.getMessage() == null ? status.reasonPhrase() : cause.getMessage();
}

return HttpResponse.of(status, MediaType.ANY_TYPE, message);
}

public HttpResponse handleException(final Exception e, final String message) {
Objects.requireNonNull(message);
private HttpStatus handleException(final Throwable e) {
if (e instanceof IOException) {
badRequestsCounter.increment();
return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.ANY_TYPE, message);
} else if (e instanceof TimeoutException) {
return HttpStatus.BAD_REQUEST;
} else if (e instanceof TimeoutException || e instanceof RequestTimeoutException) {
requestTimeoutsCounter.increment();
return HttpResponse.of(HttpStatus.REQUEST_TIMEOUT, MediaType.ANY_TYPE, message);
return HttpStatus.REQUEST_TIMEOUT;
} else if (e instanceof SizeOverflowException) {
requestsTooLargeCounter.increment();
return HttpResponse.of(HttpStatus.REQUEST_ENTITY_TOO_LARGE, MediaType.ANY_TYPE, message);
return HttpStatus.REQUEST_ENTITY_TOO_LARGE;
}

internalServerErrorCounter.increment();
return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR, MediaType.ANY_TYPE, message);
return HttpStatus.INTERNAL_SERVER_ERROR;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.exceptions;

public class BadRequestException extends RuntimeException {
public BadRequestException(final String message, final Throwable cause) {
super(message, cause);
}
}
Loading

0 comments on commit d82ad55

Please sign in to comment.