Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Oct 18, 2023
1 parent ad8faa3 commit fb82aca
Show file tree
Hide file tree
Showing 22 changed files with 188 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.io.Serializable;
import java.util.function.Consumer;

public interface ByteDecoder extends Serializable{
public interface ByteDecoder extends Serializable {
/**
* Parses an {@link InputStream}. Implementors should call the {@link Consumer} for each
* {@link Record} loaded from the {@link InputStream}.
Expand All @@ -23,6 +23,4 @@ public interface ByteDecoder extends Serializable{
*/
void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException;

// ByteDecoder getDecoder();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.codec;

public interface HasByteDecoder {
default ByteDecoder getDecoder() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@ public class JsonDecoder implements ByteDecoder {
private final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();

/*
public ByteDecoder getDecoder() {
return this;
}
*/

public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputStream);
Objects.requireNonNull(eventConsumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,6 @@ public Object getAttributeOrDefault(final String attribute, final Object default
return settings == null ? defaultValue : settings.getOrDefault(attribute, defaultValue);
}

public <T> T getTypedAttribute(final String attribute, final Class<T> type) {
Object object = getAttributeOrDefault(attribute, Collections.emptyList());
if (object == null) {
return null;
}

checkObjectForListType(attribute, object, type);
return (T)object;

}


/**
* Returns the value of the specified attribute as integer, or {@code defaultValue} if this settings contains no
* value for the attribute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,7 @@ public interface PluginFactory {
* @return A new instance of your plugin, configured
* @since 1.2
*/
<T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting);

/**
* Loads a new instance of a plugin.
*
* @param baseClass The class type that the plugin is supporting.
* @param pluginSetting The {@link PluginSetting} to configure this plugin
* @param argument generic object that can be used as argument in the construction of the new object
* @param <T> The type
* @return A new instance of your plugin, configured
* @since 2.6
*/
<T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, Object argument);
<T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, final Object ... args);

/**
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.codec.HasByteDecoder;

/**
* Data Prepper source interface. Source acts as receiver of the events that flow
* through the transformation pipeline.
*/
public interface Source<T extends Record<?>> {
public interface Source<T extends Record<?>> extends HasByteDecoder {

/**
* Notifies the source to start writing the records into the buffer
Expand All @@ -37,7 +37,4 @@ default boolean areAcknowledgementsEnabled() {
return false;
}

default ByteDecoder getDecoder() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ public void testCheckpointMetrics() throws Exception {
0.001));
}

@Test
public void testWriteBytes() throws TimeoutException {
final AbstractBuffer<Record<String>> abstractBuffer = new AbstractBufferTimeoutImpl(testPluginSetting);
byte[] bytes = new byte[2];
Assert.assertThrows(RuntimeException.class, () -> abstractBuffer.writeBytes(bytes, "", 10));
}

@Test
public void testWriteTimeoutMetric() throws TimeoutException {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,21 @@ public void testShutdown() {
final Buffer<Record<Event>> buffer = spy(Buffer.class);
buffer.shutdown();
}

@Test
public void testIsByteBuffer() {
final Buffer<Record<Event>> buffer = spy(Buffer.class);

Assert.assertEquals(false, buffer.isByteBuffer());
}

@Test
public void testWriteBytes() {
final Buffer<Record<Event>> buffer = spy(Buffer.class);

byte[] bytes = new byte[2];
Assert.assertThrows(RuntimeException.class, () -> buffer.writeBytes(bytes, "", 10));

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.codec;

import org.junit.Assert;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.spy;

public class HasByteDecoderTest {

@Test
public void testGetDecoder() {
final HasByteDecoder hasByteDecoder = spy(HasByteDecoder.class);

Assert.assertEquals(null, hasByteDecoder.getDecoder());
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.opensearch.dataprepper.model.codec;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.io.ByteArrayInputStream;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNotEquals;

import org.junit.jupiter.api.BeforeEach;

public class JsonDecoderTest {
private JsonDecoder jsonDecoder;
private Record<Event> receivedRecord;

private JsonDecoder createObjectUnderTest() {
return new JsonDecoder();
}

@BeforeEach
void setup() {
jsonDecoder = createObjectUnderTest();
receivedRecord = null;
}

@Test
void test_basicJsonDecoder() {
String stringValue = UUID.randomUUID().toString();
Random r = new Random();
int intValue = r.nextInt();
String inputString = "[{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}]";
try {
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), (record) -> {
receivedRecord = record;
});
} catch (Exception e){}

assertNotEquals(receivedRecord, null);
Map<String, Object> map = receivedRecord.getData().toMap();
assertThat(map.get("key1"), equalTo(stringValue));
assertThat(map.get("key2"), equalTo(intValue));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
class ComponentPluginArgumentsContext implements PluginArgumentsContext {
private static final String UNABLE_TO_CREATE_PLUGIN_PARAMETER = "Unable to create an argument for required plugin parameter type: ";
private final Map<Class<?>, Supplier<Object>> typedArgumentsSuppliers;
private Map<Class<?>, Supplier<Object>> optionalArgumentsSuppliers;

@Nullable
private final BeanFactory beanFactory;
Expand All @@ -45,7 +44,6 @@ private ComponentPluginArgumentsContext(final Builder builder) {
beanFactory = builder.beanFactory;

typedArgumentsSuppliers = new HashMap<>();
optionalArgumentsSuppliers = new HashMap<>();

typedArgumentsSuppliers.put(PluginSetting.class, () -> builder.pluginSetting);

Expand Down Expand Up @@ -81,20 +79,23 @@ private ComponentPluginArgumentsContext(final Builder builder) {
}

@Override
public Object[] createArguments(final Class<?>[] parameterTypes, Optional<Object> optionalArgument) {
if (optionalArgument.isPresent()) {
if (optionalArgument.get() instanceof ByteDecoder) {
ByteDecoder byteDecoder = (ByteDecoder) optionalArgument.get();
public Object[] createArguments(final Class<?>[] parameterTypes, final Object ... args) {
Map<Class<?>, Supplier<Object>> optionalArgumentsSuppliers = new HashMap<>();
Boolean byteDecoderInitialized = false;
for (final Object arg: args) {
if (arg instanceof ByteDecoder && !byteDecoderInitialized) {
ByteDecoder byteDecoder = (ByteDecoder) arg;
byteDecoderInitialized = true;
optionalArgumentsSuppliers.put(ByteDecoder.class, () -> byteDecoder);
}
}
return Arrays.stream(parameterTypes)
.map(this::getRequiredArgumentSupplier)
.map((parameterType) -> getRequiredArgumentSupplier(parameterType, optionalArgumentsSuppliers))
.map(Supplier::get)
.toArray();
}

private Supplier<Object> getRequiredArgumentSupplier(final Class<?> parameterType) {
private Supplier<Object> getRequiredArgumentSupplier(final Class<?> parameterType, Map<Class<?>, Supplier<Object>> optionalArgumentsSuppliers) {
if(typedArgumentsSuppliers.containsKey(parameterType)) {
return typedArgumentsSuppliers.get(parameterType);
} else if(optionalArgumentsSuppliers.containsKey(parameterType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,13 @@ public class DefaultPluginFactory implements PluginFactory {
}

@Override
public <T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, final Object argument) {
public <T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, final Object ... args) {
final String pluginName = pluginSetting.getName();
final Class<? extends T> pluginClass = getPluginClass(baseClass, pluginName);

final ComponentPluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass, null);

return pluginCreator.newPluginInstance(pluginClass, constructionContext, argument, pluginName);
}

@Override
public <T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting) {
final String pluginName = pluginSetting.getName();
final Class<? extends T> pluginClass = getPluginClass(baseClass, pluginName);

final ComponentPluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass, null);

return pluginCreator.newPluginInstance(pluginClass, constructionContext, null, pluginName);
return pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName, args);
}

@Override
Expand All @@ -98,7 +88,7 @@ public <T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSett

final ComponentPluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass, sinkContext);

return pluginCreator.newPluginInstance(pluginClass, constructionContext, null, pluginName);
return pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName);
}

@Override
Expand All @@ -118,7 +108,7 @@ public <T> List<T> loadPlugins(

final List<T> plugins = new ArrayList<>(numberOfInstances);
for (int i = 0; i < numberOfInstances; i++) {
plugins.add(pluginCreator.newPluginInstance(pluginClass, constructionContext, null, pluginName));
plugins.add(pluginCreator.newPluginInstance(pluginClass, constructionContext, pluginName));
}
return plugins;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ List<? extends ExtensionPlugin> loadExtensions() {
.stream()
.map(extensionClass -> {
final PluginArgumentsContext pluginArgumentsContext = getConstructionContext(extensionClass);
return pluginCreator.newPluginInstance(extensionClass, pluginArgumentsContext, null, convertClassToName(extensionClass));
return pluginCreator.newPluginInstance(extensionClass, pluginArgumentsContext, convertClassToName(extensionClass));
})
.collect(Collectors.toList());
}
Expand Down Expand Up @@ -75,7 +75,7 @@ static String classNameToPluginName(final String className) {

protected static class NoArgumentsArgumentsContext implements PluginArgumentsContext {
@Override
public Object[] createArguments(final Class<?>[] parameterTypes, Optional<Object> arg) {
public Object[] createArguments(final Class<?>[] parameterTypes, final Object ... args) {
if(parameterTypes.length != 0) {
throw new InvalidPluginDefinitionException("No arguments are permitted for extensions constructors.");
}
Expand All @@ -91,7 +91,7 @@ protected static class SingleConfigArgumentArgumentsContext implements PluginArg
}

@Override
public Object[] createArguments(Class<?>[] parameterTypes, Optional<Object> arg) {
public Object[] createArguments(Class<?>[] parameterTypes, final Object ... args) {
if (parameterTypes.length != 1 && (Objects.nonNull(extensionPluginConfiguration) &&
!parameterTypes[0].equals(extensionPluginConfiguration.getClass()))) {
throw new InvalidPluginDefinitionException(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.dataprepper.plugin;

import java.util.Optional;

interface PluginArgumentsContext {
Object[] createArguments(final Class<?>[] parameterTypes, final Optional<Object> optionalArgument);
Object[] createArguments(final Class<?>[] parameterTypes, final Object ... args);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ class PluginCreator {

<T> T newPluginInstance(final Class<T> pluginClass,
final PluginArgumentsContext pluginArgumentsContext,
final Object argument,
final String pluginName) {
final String pluginName,
final Object... args) {
Objects.requireNonNull(pluginClass);
Objects.requireNonNull(pluginArgumentsContext);
Objects.requireNonNull(pluginName);

final Constructor<?> constructor = getConstructor(pluginClass, pluginName);

final Object[] constructorArguments = pluginArgumentsContext.createArguments(constructor.getParameterTypes(), argument == null ? Optional.empty() : Optional.of(argument));
final Object[] constructorArguments = pluginArgumentsContext.createArguments(constructor.getParameterTypes(), args);

pluginConfigurationObservableRegister.registerPluginConfigurationObservables(constructorArguments);

Expand Down
Loading

0 comments on commit fb82aca

Please sign in to comment.