Skip to content

Commit

Permalink
Support for Event Json input and output codecs (opensearch-project#4436)
Browse files Browse the repository at this point in the history
* Event Json input and output codecs

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified test case to check for event metadata attributes

Signed-off-by: Krishna Kondaka <[email protected]>

* Modified the coverage to 0.9

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixes for failing coverage tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed test coverage

Signed-off-by: Krishna Kondaka <[email protected]>

* Added more tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Added more tests for coverage

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed code coverage failure

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Apr 22, 2024
1 parent e66dac1 commit df17cc5
Show file tree
Hide file tree
Showing 12 changed files with 725 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.model.event;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.Instant;
import java.util.HashSet;
import java.util.List;
Expand All @@ -23,16 +25,23 @@
*/
public class DefaultEventMetadata implements EventMetadata {

private final String eventType;
@JsonProperty("event_type")
private String eventType;

private final Instant timeReceived;
@JsonProperty("time_received")
private Instant timeReceived;

@JsonProperty("external_origination_time")
private Instant externalOriginationTime;

@JsonProperty("attributes")
private Map<String, Object> attributes;

@JsonProperty("tags")
private Set<String> tags;

private DefaultEventMetadata() {}

private DefaultEventMetadata(final Builder builder) {

checkNotNull(builder.eventType, "eventType cannot be null");
Expand All @@ -45,7 +54,8 @@ private DefaultEventMetadata(final Builder builder) {
this.attributes = builder.attributes == null ? new HashMap<>() : new HashMap<>(builder.attributes);

this.tags = builder.tags == null ? new HashSet<>() : new HashSet(builder.tags);
this.externalOriginationTime = null;

this.externalOriginationTime = builder.externalOriginationTime;
}

private DefaultEventMetadata(final EventMetadata eventMetadata) {
Expand Down Expand Up @@ -163,6 +173,7 @@ static EventMetadata fromEventMetadata(final EventMetadata eventMetadata) {
public static class Builder {
private String eventType;
private Instant timeReceived;
private Instant externalOriginationTime;
private Map<String, Object> attributes;
private Set<String> tags;

Expand All @@ -188,6 +199,17 @@ public Builder withTimeReceived(final Instant timeReceived) {
return this;
}

/**
* Sets the external origination Time.
* @param externalOriginationTime the time an event was received
* @return returns the builder
* @since 2.8
*/
public Builder withExternalOriginationTime(final Instant externalOriginationTime) {
this.externalOriginationTime = externalOriginationTime;
return this;
}

/**
* Sets the attributes. An empty immutable map is the default value.
* @param attributes a map of key-value pair attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ public void testSetAttribute(String key, final Object value) {
assertThat(eventMetadata.getAttribute(key), equalTo(value));
}

@Test
public void test_with_ExternalOriginationTime() {
Instant now = Instant.now();
eventMetadata = DefaultEventMetadata.builder()
.withEventType(testEventType)
.withTimeReceived(testTimeReceived)
.withExternalOriginationTime(now)
.build();
assertThat(eventMetadata.getExternalOriginationTime(), equalTo(now));
}

@Test
public void testAttributes_without_attributes_is_empty() {
eventMetadata = DefaultEventMetadata.builder()
Expand Down
38 changes: 38 additions & 0 deletions data-prepper-plugins/event-json-codecs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-ion'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.0'
implementation 'org.apache.parquet:parquet-common:1.13.1'
testImplementation project(':data-prepper-test-common')
}

test {
useJUnitPlatform()
}

jacocoTestCoverageVerification {
dependsOn jacocoTestReport
violationRules {
rule {
limit {
minimum = 0.9
}
}
}
}

check.dependsOn jacocoTestCoverageVerification

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.event_json;

public class EventJsonDefines {
public static final String VERSION = "version";
public static final String EVENTS = "events";
public static final String DATA = "data";
public static final String METADATA = "metadata";
public static final String ATTRIBUTES = "attributes";
public static final String TAGS = "tags";
public static final String TIME_RECEIVED = "timeReceived";
public static final String EXTERNAL_ORIGINATION_TIME = "externalOriginationTime";
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.codec.event_json;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.function.Consumer;
import java.util.Map;
import java.util.List;
import java.util.Objects;

/**
* An implementation of {@link InputCodec} which parses JSON Objects for arrays.
*/
@DataPrepperPlugin(name = "event_json", pluginType = InputCodec.class, pluginConfigurationType = EventJsonInputCodecConfig.class)
public class EventJsonInputCodec implements InputCodec {
private static final Logger LOG = LoggerFactory.getLogger(JacksonEvent.class);
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
private final JsonFactory jsonFactory = new JsonFactory();
private final Boolean overrideTimeReceived;

@DataPrepperPluginConstructor
public EventJsonInputCodec(final EventJsonInputCodecConfig config) {
this.overrideTimeReceived = config.getOverrideTimeReceived();
}

private boolean isCompatibleVersion(Map<String, Object> json) {
final String versionStr = (String)json.get(EventJsonDefines.VERSION);
final DataPrepperVersion version = DataPrepperVersion.parse(versionStr);

final DataPrepperVersion currentVersion = DataPrepperVersion.getCurrentVersion();
return currentVersion.compatibleWith(version);
}

public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputStream);
Objects.requireNonNull(eventConsumer);

final JsonParser jsonParser = jsonFactory.createParser(inputStream);

while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) {
if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) {
final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);
if (!isCompatibleVersion(innerJson)) {
return;
}
final List<Map<String, Object>> events = (List<Map<String, Object>>)innerJson.get(EventJsonDefines.EVENTS);
for (Map<String, Object> eventMap: events) {
final Record<Event> record = createRecord(eventMap);
if (record != null) {
eventConsumer.accept(record);
}
}
}
}
}

private Record<Event> createRecord(final Map<String, Object> innerJson) {
Map<String, Object> metadata = (Map<String, Object>)innerJson.get(EventJsonDefines.METADATA);
EventMetadata eventMetadata = objectMapper.convertValue(metadata, DefaultEventMetadata.class);
Map<String, Object> data = (Map<String, Object>)innerJson.get(EventJsonDefines.DATA);
if (data == null) {
return null;
}
if (!overrideTimeReceived) {
eventMetadata = new DefaultEventMetadata.Builder()
.withEventType(EventType.LOG.toString())
.withAttributes(eventMetadata.getAttributes())
.withTimeReceived(Instant.now())
.withTags(eventMetadata.getTags())
.withExternalOriginationTime(eventMetadata.getExternalOriginationTime())
.build();
}
final JacksonLog.Builder logBuilder = JacksonLog.builder()
.withData(data)
.withEventMetadata(eventMetadata)
.getThis();
final JacksonEvent event = (JacksonEvent)logBuilder.build();
final Record<Event> record = new Record<>(event);
return record;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.event_json;

import com.fasterxml.jackson.annotation.JsonProperty;

public class EventJsonInputCodecConfig {
@JsonProperty("override_time_received")
private Boolean overrideTimeReceived = false;

public Boolean getOverrideTimeReceived() {
return overrideTimeReceived;
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.event_json;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;

@DataPrepperPlugin(name = "event_json", pluginType = OutputCodec.class, pluginConfigurationType = EventJsonOutputCodecConfig.class)
public class EventJsonOutputCodec implements OutputCodec {
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
static final String EVENT_JSON = "event_json";
private static final JsonFactory factory = new JsonFactory();
private final EventJsonOutputCodecConfig config;
private JsonGenerator generator;
private OutputCodecContext codecContext;

@DataPrepperPluginConstructor
public EventJsonOutputCodec(final EventJsonOutputCodecConfig config) {
this.config = config;
}

@Override
public String getExtension() {
return EVENT_JSON;
}

@Override
public void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException {
Objects.requireNonNull(outputStream);
generator = factory.createGenerator(outputStream, JsonEncoding.UTF8);
generator.writeStartObject();
generator.writeFieldName(EventJsonDefines.VERSION);
objectMapper.writeValue(generator, DataPrepperVersion.getCurrentVersion().toString());
generator.writeFieldName(EventJsonDefines.EVENTS);
generator.writeStartArray();
}

@Override
public void complete(final OutputStream outputStream) throws IOException {
generator.writeEndArray();
generator.writeEndObject();
generator.close();
outputStream.flush();
outputStream.close();
}

@Override
public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
generator.writeStartObject();
Objects.requireNonNull(event);
getDataMapToSerialize(event);
generator.flush();
generator.writeEndObject();
}

private Map<String, Object> getDataMapToSerialize(Event event) throws IOException {
Map<String, Object> dataMap = event.toMap();
generator.writeFieldName(EventJsonDefines.DATA);
objectMapper.writeValue(generator, dataMap);
Map<String, Object> metadataMap = objectMapper.convertValue(event.getMetadata(), Map.class);
generator.writeFieldName(EventJsonDefines.METADATA);
objectMapper.writeValue(generator, metadataMap);
return dataMap;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.codec.event_json;

public class EventJsonOutputCodecConfig {
}

Loading

0 comments on commit df17cc5

Please sign in to comment.