Support for kafka-sink
Signed-off-by: rajeshLovesToCode <[email protected]>
rajeshLovesToCode committed Aug 21, 2023
2 parents 6050fa6 + 2f2c047 commit 19da476
Showing 128 changed files with 3,807 additions and 2,602 deletions.
73 changes: 73 additions & 0 deletions .github/workflows/release.yml
@@ -2,6 +2,14 @@ name: Release Artifacts

 on:
   workflow_dispatch:
+    inputs:
+      release-major-tag:
+        description: 'Whether to create major tag of docker image or not. This will create a tag such as 2.3 which points to this version.'
+        required: true
+      release-latest-tag:
+        description: >
+          'Whether to create latest tag of docker image or not. This will update the latest tag to point to this version. You should set this when releasing the latest version, but not patches to old versions.'
+        required: true

 permissions:
   id-token: write
@@ -95,3 +103,68 @@ jobs:

       - name: Smoke Test Tarball Files
         run: ./release/smoke-tests/run-tarball-files-smoke-tests.sh -v ${{ env.version }} -u ${{ secrets.ARCHIVES_PUBLIC_URL }} -n ${{ github.run_number }} -i ${{ matrix.image }} -t ${{ matrix.archive }}

+  promote:
+    runs-on: ubuntu-latest
+    if: success() || failure()
+    needs: [build, validate-docker, validate-archive]
+    permissions:
+      contents: write
+      issues: write

+    steps:
+      - name: Checkout Data Prepper
+        uses: actions/checkout@v3
+      - name: Get Version
+        run: grep '^version=' gradle.properties >> $GITHUB_ENV

+      - name: Get Approvers
+        id: get_approvers
+        run: |
+          echo "approvers=$(cat .github/CODEOWNERS | grep @ | tr -d '* ' | sed 's/@/,/g' | sed 's/,//1')" >> $GITHUB_OUTPUT
+      - uses: trstringer/manual-approval@v1
+        with:
+          secret: ${{ github.TOKEN }}
+          approvers: ${{ steps.get_approvers.outputs.approvers }}
+          minimum-approvals: 2
+          issue-title: 'Release Data Prepper : ${{ env.version }}'
+          issue-body: >
+            Please approve or deny the release of Data Prepper.
+            **VERSION**: ${{ env.version }}
+            **BUILD NUMBER**: ${{ github.run_number }}
+            **RELEASE MAJOR TAG**: ${{ github.event.inputs.release-major-tag }}
+            **RELEASE LATEST TAG**: ${{ github.event.inputs.release-latest-tag }}
+          exclude-workflow-initiator-as-approver: false

+      - name: Create Release Description
+        run: |
+          echo 'version: ${{ env.version }}' > release-description.yaml
+          echo 'build_number: ${{ github.run_number }}' >> release-description.yaml
+          echo 'release_major_tag: ${{ github.event.inputs.release-major-tag }}' >> release-description.yaml
+          echo 'release_latest_tag: ${{ github.event.inputs.release-latest-tag }}' >> release-description.yaml
+      - name: Create tag
+        uses: actions/github-script@v6
+        with:
+          github-token: ${{ github.TOKEN }}
+          script: |
+            github.rest.git.createRef({
+              owner: context.repo.owner,
+              repo: context.repo.repo,
+              ref: 'refs/tags/${{ env.version }}',
+              sha: context.sha
+            })
+      - name: Draft release
+        uses: softprops/action-gh-release@v1
+        with:
+          draft: true
+          name: '${{ env.version }}'
+          tag_name: 'refs/tags/${{ env.version }}'
+          files: |
+            release-description.yaml
2 changes: 1 addition & 1 deletion build.gradle
@@ -196,7 +196,7 @@ subprojects {

 configure(subprojects.findAll {it.name != 'data-prepper-api'}) {
     dependencies {
-        implementation platform('software.amazon.awssdk:bom:2.17.264')
+        implementation platform('software.amazon.awssdk:bom:2.20.67')
         implementation 'jakarta.validation:jakarta.validation-api:3.0.2'
     }
 }
OutputCodec.java
@@ -58,6 +58,22 @@ public interface OutputCodec {
      */
     String getExtension();
 
+    /**
+     * Returns true if this codec has an internal compression. That is, the entire
+     * {@link OutputStream} should not be compressed.
+     * <p>
+     * When this value is true, sinks should not attempt to compress the final {@link OutputStream}
+     * at all.
+     * <p>
+     * For example, Parquet compression happens within the file. Each column chunk
+     * is compressed independently.
+     *
+     * @return True if the compression is internal to the codec; false if whole-file compression is ok.
+     */
+    default boolean isCompressionInternal() {
+        return false;
+    }
+
     default Event addTagsToEvent(Event event, String tagsTargetKey) throws JsonProcessingException {
         String eventJsonString = event.jsonBuilder().includeTags(tagsTargetKey).toJsonString();
         Map<String, Object> eventData = objectMapper.readValue(eventJsonString, new TypeReference<>() {
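For context, a sink is expected to consult this new flag before wrapping the codec's output. A minimal sketch of that contract (not part of this commit; the helper class and the GZIP choice are illustrative assumptions):

import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

// Illustrative sketch: skip whole-stream compression when the codec compresses
// internally (e.g. Parquet, which compresses each column chunk inside the file).
class SinkStreamSketch {
    OutputStream prepareStream(final OutputStream rawStream,
                               final boolean codecCompressionInternal) throws IOException {
        if (codecCompressionInternal) {
            return rawStream; // the codec owns compression; pass the stream through untouched
        }
        return new GZIPOutputStream(rawStream); // whole-file compression is acceptable here
    }
}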
SinkModel.java
@@ -12,13 +12,11 @@
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
  * Represents an extension of the {@link PluginModel} which is specific to Sink
@@ -120,10 +118,11 @@ private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes,
     private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
         super(pluginSettings);
         this.routes = routes != null ? routes : Collections.emptyList();
-        this.includeKeys = includeKeys != null ? preprocessingKeys(includeKeys) : Collections.emptyList();
-        this.excludeKeys = excludeKeys != null ? preprocessingKeys(excludeKeys) : Collections.emptyList();
+        this.includeKeys = includeKeys != null ? includeKeys : Collections.emptyList();
+        this.excludeKeys = excludeKeys != null ? excludeKeys : Collections.emptyList();
         this.tagsTargetKey = tagsTargetKey;
         validateConfiguration();
+        validateKeys();
     }

void validateConfiguration() {
@@ -132,24 +131,18 @@ void validateConfiguration() {
         }
     }
 
-
     /**
-     * Pre-processes a list of Keys and returns a sorted list
-     * The keys must start with `/` and not end with `/`
-     *
-     * @param keys a list of raw keys
-     * @return a sorted processed keys
+     * Validates that include and exclude keys do not contain /
      */
-    private List<String> preprocessingKeys(final List<String> keys) {
-        if (keys.contains("/")) {
-            return new ArrayList<>();
-        }
-        List<String> result = keys.stream()
-                .map(k -> k.startsWith("/") ? k : "/" + k)
-                .map(k -> k.endsWith("/") ? k.substring(0, k.length() - 1) : k)
-                .collect(Collectors.toList());
-        Collections.sort(result);
-        return result;
+    private void validateKeys() {
+        includeKeys.forEach(key -> {
+            if (key.contains("/"))
+                throw new InvalidPluginConfigurationException("include_keys cannot contain /");
+        });
+        excludeKeys.forEach(key -> {
+            if (key.contains("/"))
+                throw new InvalidPluginConfigurationException("exclude_keys cannot contain /");
+        });
     }
 }

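The behavioral change here: include_keys and exclude_keys are now taken as plain top-level key names, preserved as given, and any key containing / fails fast instead of being rewritten into a sorted /-prefixed path. A hedged sketch of the new contract, mirroring the constructor calls in the tests below (assumes the SinkModel constructor is accessible as it is from the tests, and that the class lives in org.opensearch.dataprepper.model.configuration):

import java.util.List;
import java.util.Map;

import org.opensearch.dataprepper.model.configuration.SinkModel;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;

class SinkKeySemanticsSketch {
    void demo() {
        // Accepted: flat key names, kept in the order given (no sorting, no '/' prefixing).
        new SinkModel("customSinkPlugin", List.of("routeA"), null,
                List.of("bcd", "abc", "efg"), List.of(), Map.of());

        try {
            // Rejected at configuration time: keys containing '/'.
            new SinkModel("customSinkPlugin", List.of("routeA"), null,
                    List.of("/bcd"), List.of(), Map.of());
        } catch (final InvalidPluginConfigurationException e) {
            // expected: "include_keys cannot contain /"
        }
    }
}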
OutputCodecContext.java
@@ -7,6 +7,7 @@

 import java.util.Collections;
 import java.util.List;
+import java.util.function.Predicate;

 /**
  * Data Prepper Output Codec Context class.
@@ -18,6 +19,7 @@ public class OutputCodecContext {

     private final List<String> includeKeys;
     private final List<String> excludeKeys;
+    private final Predicate<String> inclusionPredicate;

     public OutputCodecContext() {
         this(null, Collections.emptyList(), Collections.emptyList());
@@ -28,6 +30,14 @@ public OutputCodecContext(String tagsTargetKey, List<String> includeKeys, List<String> excludeKeys) {
         this.tagsTargetKey = tagsTargetKey;
         this.includeKeys = includeKeys;
         this.excludeKeys = excludeKeys;
 
+        if (includeKeys != null && !includeKeys.isEmpty()) {
+            inclusionPredicate = k -> includeKeys.contains(k);
+        } else if (excludeKeys != null && !excludeKeys.isEmpty()) {
+            inclusionPredicate = k -> !excludeKeys.contains(k);
+        } else {
+            inclusionPredicate = k -> true;
+        }
     }


@@ -49,4 +59,8 @@ public List<String> getIncludeKeys() {
     public List<String> getExcludeKeys() {
         return excludeKeys;
     }
 
+    public boolean shouldIncludeKey(String key) {
+        return inclusionPredicate.test(key);
+    }
 }
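With the predicate chosen once in the constructor, codecs can apply include/exclude filtering with a single call per key. A minimal sketch of how a writer might use it (the codec class and event-map shape are illustrative assumptions; only shouldIncludeKey comes from this change):

import java.util.LinkedHashMap;
import java.util.Map;

class FilteringWriterSketch {
    // Stand-in matching the behavior added to OutputCodecContext.
    interface KeyFilter {
        boolean shouldIncludeKey(String key);
    }

    Map<String, Object> filterEventData(final Map<String, Object> eventData, final KeyFilter context) {
        final Map<String, Object> filtered = new LinkedHashMap<>();
        eventData.forEach((key, value) -> {
            if (context.shouldIncludeKey(key)) { // include/exclude logic lives behind one call
                filtered.put(key, value);
            }
        });
        return filtered;
    }
}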
OutputCodecTest.java
@@ -1,8 +1,8 @@
 package org.opensearch.dataprepper.model.codec;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
 import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.event.EventMetadata;
@@ -19,12 +19,17 @@
 import java.util.Set;
 import java.util.UUID;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.mock;
 
 public class OutputCodecTest {
+    @Test
+    void isCompressionInternal_returns_false() {
+        OutputCodec objectUnderTest = mock(OutputCodec.class, InvocationOnMock::callRealMethod);
+
-    @BeforeEach
-    public void setUp() {
+        assertThat(objectUnderTest.isCompressionInternal(), equalTo(false));
     }
 
     @Test
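A note on the test idiom above: passing InvocationOnMock::callRealMethod as the default answer makes the mock invoke the real method for every unstubbed call, which is how the interface's new default isCompressionInternal() gets exercised. A standalone sketch of the same idiom (the Greeter interface is hypothetical):

import org.mockito.invocation.InvocationOnMock;
import static org.mockito.Mockito.mock;

interface Greeter {
    default String greeting() { return "hello"; }
}

class DefaultMethodMockSketch {
    void demo() {
        // Every unstubbed call falls through to the interface's real default method.
        final Greeter greeter = mock(Greeter.class, InvocationOnMock::callRealMethod);
        assert "hello".equals(greeter.greeting());
    }
}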
SinkModelTest.java
@@ -144,27 +144,41 @@ void serialize_with_just_pluginModel() throws IOException {
}

     @Test
-    void sinkModel_with_include_keys() throws IOException {
+    void sinkModel_with_include_keys() {
         final Map<String, Object> pluginSettings = new LinkedHashMap<>();
-        final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("bcd", "/abc", "efg/"), null, pluginSettings);
+        final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("bcd", "abc", "efg"), null, pluginSettings);
 
         assertThat(sinkModel.getExcludeKeys(), equalTo(new ArrayList<String>()));
-        assertThat(sinkModel.getIncludeKeys(), equalTo(Arrays.asList("/abc", "/bcd", "/efg")));
+        assertThat(sinkModel.getIncludeKeys(), equalTo(Arrays.asList("bcd", "abc", "efg")));
     }
 
     @Test
-    void sinkModel_with_exclude_keys() throws IOException {
+    void sinkModel_with_invalid_include_keys() {
         final Map<String, Object> pluginSettings = new LinkedHashMap<>();
-        final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("/"), Arrays.asList("bcd", "/abc", "efg/"), pluginSettings);
+        assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("/bcd"), List.of(), pluginSettings));
+    }
+
+    @Test
+    void sinkModel_with_exclude_keys() {
+        final Map<String, Object> pluginSettings = new LinkedHashMap<>();
+        final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), Arrays.asList("abc", "bcd", "efg"), pluginSettings);
 
         assertThat(sinkModel.getIncludeKeys(), equalTo(new ArrayList<String>()));
-        assertThat(sinkModel.getExcludeKeys(), equalTo(Arrays.asList("/abc", "/bcd", "/efg")));
+        assertThat(sinkModel.getExcludeKeys(), equalTo(Arrays.asList("abc", "bcd", "efg")));
     }
 
+    @Test
+    void sinkModel_with_invalid_exclude_keys() {
+        final Map<String, Object> pluginSettings = new LinkedHashMap<>();
+        assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), List.of("/bcd"), pluginSettings));
+    }
+
     @Test
-    void sinkModel_with_both_include_and_exclude_keys() throws IOException {
+    void sinkModel_with_both_include_and_exclude_keys() {
         final Map<String, Object> pluginSettings = new LinkedHashMap<>();
         assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("abc"), List.of("bcd"), pluginSettings));
     }
OutputCodecContextTest.java
@@ -10,14 +10,13 @@

 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 public class OutputCodecContextTest {
-
-
     @Test
     public void testOutputCodecContextBasic() {
         final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6);
@@ -32,8 +31,6 @@ public void testOutputCodecContextBasic() {
         assertNull(emptyContext.getTagsTargetKey());
         assertThat(emptyContext.getIncludeKeys(), equalTo(testIncludeKeys));
         assertThat(emptyContext.getExcludeKeys(), equalTo(testExcludeKeys));
-
-
     }

@Test
@@ -53,7 +50,43 @@ public void testOutputCodecContextAdapter() {
         assertNull(emptyContext.getTagsTargetKey());
         assertThat(emptyContext.getIncludeKeys(), equalTo(testIncludeKeys));
         assertThat(emptyContext.getExcludeKeys(), equalTo(testExcludeKeys));
     }
 
+    @Test
+    void shouldIncludeKey_returns_expected_when_no_include_exclude() {
+        OutputCodecContext objectUnderTest = new OutputCodecContext(null, null, null);
+        assertThat(objectUnderTest.shouldIncludeKey(UUID.randomUUID().toString()), equalTo(true));
+    }
+
+    @Test
+    void shouldIncludeKey_returns_expected_when_empty_lists_for_include_exclude() {
+        OutputCodecContext objectUnderTest = new OutputCodecContext(null, Collections.emptyList(), Collections.emptyList());
+        assertThat(objectUnderTest.shouldIncludeKey(UUID.randomUUID().toString()), equalTo(true));
+    }
+
+    @Test
+    void shouldIncludeKey_returns_expected_when_includeKey() {
+        String includeKey1 = UUID.randomUUID().toString();
+        String includeKey2 = UUID.randomUUID().toString();
+        final List<String> includeKeys = List.of(includeKey1, includeKey2);
+
+        OutputCodecContext objectUnderTest = new OutputCodecContext(null, includeKeys, null);
+
+        assertThat(objectUnderTest.shouldIncludeKey(includeKey1), equalTo(true));
+        assertThat(objectUnderTest.shouldIncludeKey(includeKey2), equalTo(true));
+        assertThat(objectUnderTest.shouldIncludeKey(UUID.randomUUID().toString()), equalTo(false));
+    }
+
+    @Test
+    void shouldIncludeKey_returns_expected_when_excludeKey() {
+        String excludeKey1 = UUID.randomUUID().toString();
+        String excludeKey2 = UUID.randomUUID().toString();
+        final List<String> excludeKeys = List.of(excludeKey1, excludeKey2);
+
+        OutputCodecContext objectUnderTest = new OutputCodecContext(null, null, excludeKeys);
+
+        assertThat(objectUnderTest.shouldIncludeKey(excludeKey1), equalTo(false));
+        assertThat(objectUnderTest.shouldIncludeKey(excludeKey2), equalTo(false));
+        assertThat(objectUnderTest.shouldIncludeKey(UUID.randomUUID().toString()), equalTo(true));
+    }
 }
HeapCircuitBreaker.java
@@ -66,7 +66,7 @@ class HeapCircuitBreaker implements InnerCircuitBreaker, AutoCloseable {
         scheduledExecutorService
                 .scheduleAtFixedRate(this::checkMemory, 0L, checkInterval.toMillis(), TimeUnit.MILLISECONDS);
 
-        LOG.info("Heap circuit breaker with usage of {} bytes.", usageBytes);
+        LOG.info("Circuit breaker heap limit is set to {} bytes.", usageBytes);
     }

@Override
Pipeline.java
@@ -274,7 +274,7 @@ public synchronized void shutdown() {
             stopRequested.set(true);
         } catch (Exception ex) {
             LOG.error("Pipeline [{}] - Encountered exception while stopping the source, " +
-                    "proceeding with termination of process workers", name);
+                    "proceeding with termination of process workers", name, ex);
         }
 
         shutdownExecutorService(processorExecutorService, processorShutdownTimeout.toMillis(), "processor");
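This fix works because SLF4J gives a trailing Throwable special treatment: it is not consumed by a {} placeholder but is logged with its full stack trace. A standalone illustration (not from this commit):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

    void demo(final String name, final Exception ex) {
        // 'name' fills the single {} placeholder; 'ex' is recognized as the
        // exception argument and its stack trace is appended to the log entry.
        LOG.error("Pipeline [{}] - Encountered exception while stopping the source", name, ex);
    }
}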