Skip to content

Commit

Permalink
FEAT: AWS secret extension (opensearch-project#3340)
Browse files Browse the repository at this point in the history
Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored and asifsmohammed committed Sep 27, 2023
1 parent 14c6759 commit 62e99ef
Show file tree
Hide file tree
Showing 51 changed files with 1,372 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,22 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotates a Data Prepper extension plugin which includes a configuration model class.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface DataPrepperExtensionPlugin {
/**
* @return extension plugin configuration class.
*/
Class<?> modelType();

String rootKey();
/**
* @return valid JSON path string starts with "/" pointing towards the configuration block.
*/
String rootKeyJsonPath();

boolean allowInPipelineConfigurations() default false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class PluginSetting implements PipelineDescription {
private static final String UNEXPECTED_ATTRIBUTE_TYPE_MSG = "Unexpected type [%s] for attribute [%s]";

private final String name;
private final Map<String, Object> settings;
private Map<String, Object> settings;
private int processWorkers;
private String pipelineName;

Expand All @@ -31,6 +31,10 @@ public Map<String, Object> getSettings() {
return settings;
}

public void setSettings(final Map<String, Object> settings) {
this.settings = settings;
}

/**
* Returns the number of process workers the pipeline is using; This is only required for special plugin use-cases
* where plugin implementation depends on the number of process workers. For example, Trace analytics service map
Expand Down Expand Up @@ -99,7 +103,7 @@ public Object getAttributeOrDefault(final String attribute, final Object default
* @return the value of the specified attribute, or {@code defaultValue} if this settings contains no value for
* the attribute. If the value is null, null will be returned.
*/
public Integer getIntegerOrDefault(final String attribute, final int defaultValue) {
public Integer getIntegerOrDefault(final String attribute, final Integer defaultValue) {
Object object = getAttributeOrDefault(attribute, defaultValue);
if (object == null) {
return null;
Expand Down Expand Up @@ -218,7 +222,7 @@ public <K, V> Map<K, List<V>> getTypedListMap(final String attribute, final Clas
* @return the value of the specified attribute, or {@code defaultValue} if this settings contains no value for
* the attribute
*/
public Boolean getBooleanOrDefault(final String attribute, final boolean defaultValue) {
public Boolean getBooleanOrDefault(final String attribute, final Boolean defaultValue) {
Object object = getAttributeOrDefault(attribute, defaultValue);
if (object == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.dataprepper.model.plugin;

public interface PluginConfigValueTranslator {
Object translate(final String value);

String getPrefix();
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,4 +537,13 @@ public void testGetLongOrDefault_UnsupportedType() {

assertThrows(IllegalArgumentException.class, () -> pluginSetting.getLongOrDefault(TEST_LONG_ATTRIBUTE, TEST_LONG_DEFAULT_VALUE));
}

@Test
public void testSetSettings() {
final PluginSetting pluginSetting = new PluginSetting(TEST_PLUGIN_NAME, null);

final Map<String, Object> settings = Map.of("test", 1);
pluginSetting.setSettings(settings);
assertThat(pluginSetting.getSettings(), equalTo(settings));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@
public class DataPrepperExtensionPoints implements ExtensionPoints {
private static final ExtensionProvider.Context EMPTY_CONTEXT = new EmptyContext();
private final GenericApplicationContext sharedApplicationContext;
private final GenericApplicationContext coreApplicationContext;

@Inject
public DataPrepperExtensionPoints(
final PluginBeanFactoryProvider pluginBeanFactoryProvider) {
Objects.requireNonNull(pluginBeanFactoryProvider);
Objects.requireNonNull(pluginBeanFactoryProvider.getCoreApplicationContext());
Objects.requireNonNull(pluginBeanFactoryProvider.getSharedPluginApplicationContext());
this.sharedApplicationContext = pluginBeanFactoryProvider.getSharedPluginApplicationContext();
this.coreApplicationContext = pluginBeanFactoryProvider.getCoreApplicationContext();
}

@Override
public void addExtensionProvider(final ExtensionProvider extensionProvider) {
coreApplicationContext.registerBean(
extensionProvider.supportedClass(),
() -> extensionProvider.provideInstance(EMPTY_CONTEXT).orElse(null),
b -> b.setScope(BeanDefinition.SCOPE_PROTOTYPE));
sharedApplicationContext.registerBean(
extensionProvider.supportedClass(),
() -> extensionProvider.provideInstance(EMPTY_CONTEXT),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.opensearch.dataprepper.plugin;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;

import java.io.IOException;

public class DataPrepperScalarTypeDeserializer<T> extends JsonDeserializer<T> {
private final VariableExpander variableExpander;
private final Class<T> scalarType;

public DataPrepperScalarTypeDeserializer(final VariableExpander variableExpander, final Class<T> scalarType) {
this.variableExpander = variableExpander;
this.scalarType = scalarType;
}

@Override
public T deserialize(final JsonParser jsonParser, final DeserializationContext ctxt) throws IOException {
return variableExpander.translate(jsonParser, this.scalarType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ private PluginArgumentsContext getConstructionContext(final Class<?> extensionPl
return new NoArgumentsArgumentsContext();
} else {
final Class<?> pluginConfigurationType = pluginAnnotation.modelType();
final String rootKey = pluginAnnotation.rootKey();
final String rootKey = pluginAnnotation.rootKeyJsonPath();
final Object configuration = extensionPluginConfigurationConverter.convert(
pluginAnnotation.allowInPipelineConfigurations(),
pluginConfigurationType, rootKey);
return new SingleConfigArgumentArgumentsContext(configuration);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.opensearch.dataprepper.plugin;

import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.Validator;
Expand All @@ -8,12 +11,14 @@
import javax.inject.Inject;
import javax.inject.Named;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

@Named
public class ExtensionPluginConfigurationConverter {
static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
private final ExtensionPluginConfigurationResolver extensionPluginConfigurationResolver;
private final ObjectMapper objectMapper;
private final Validator validator;
Expand All @@ -22,19 +27,25 @@ public class ExtensionPluginConfigurationConverter {
public ExtensionPluginConfigurationConverter(
final ExtensionPluginConfigurationResolver extensionPluginConfigurationResolver,
final Validator validator,
@Named("pluginConfigObjectMapper")
@Named("extensionPluginConfigObjectMapper")
final ObjectMapper objectMapper) {
this.extensionPluginConfigurationResolver = extensionPluginConfigurationResolver;
this.objectMapper = objectMapper;
this.validator = validator;
}

public Object convert(final Class<?> extensionPluginConfigurationType, final String rootKey) {
public Object convert(final boolean configAllowedInPipelineConfigurations,
final Class<?> extensionPluginConfigurationType, final String rootKey) {
Objects.requireNonNull(extensionPluginConfigurationType);
Objects.requireNonNull(rootKey);

final Object configuration = convertSettings(extensionPluginConfigurationType,
extensionPluginConfigurationResolver.getExtensionMap().get(rootKey));
final Object configuration = configAllowedInPipelineConfigurations ?
convertSettings(extensionPluginConfigurationType,
getExtensionPluginConfigMap(
extensionPluginConfigurationResolver.getCombinedExtensionMap(), rootKey)) :
convertSettings(extensionPluginConfigurationType,
getExtensionPluginConfigMap(
extensionPluginConfigurationResolver.getDataPrepperConfigExtensionMap(), rootKey));

final Set<ConstraintViolation<Object>> constraintViolations = configuration == null ? Collections.emptySet() :
validator.validate(configuration);
Expand All @@ -54,4 +65,12 @@ public Object convert(final Class<?> extensionPluginConfigurationType, final Str
private Object convertSettings(final Class<?> extensionPluginConfigurationType, final Object extensionPlugin) {
return objectMapper.convertValue(extensionPlugin, extensionPluginConfigurationType);
}

private Map<String, Object> getExtensionPluginConfigMap(
final Map<String, Object> extensionMap, final String rootKey) {
final JsonNode jsonNode = objectMapper.valueToTree(extensionMap);
final JsonPointer jsonPointer = JsonPointer.compile(rootKey);
final JsonNode extensionPluginConfigNode = jsonNode.at(jsonPointer);
return objectMapper.convertValue(extensionPluginConfigNode, MAP_TYPE_REFERENCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,26 @@

@Named
public class ExtensionPluginConfigurationResolver {
private final Map<String, Object> extensionMap;
private final Map<String, Object> combinedExtensionMap;

private final Map<String, Object> dataPrepperConfigExtensionMap;

@Inject
public ExtensionPluginConfigurationResolver(final DataPrepperConfiguration dataPrepperConfiguration,
final PipelinesDataFlowModel pipelinesDataFlowModel) {
extensionMap = dataPrepperConfiguration.getPipelineExtensions() == null?
this.dataPrepperConfigExtensionMap = dataPrepperConfiguration.getPipelineExtensions() == null?
new HashMap<>() : new HashMap<>(dataPrepperConfiguration.getPipelineExtensions().getExtensionMap());
combinedExtensionMap = new HashMap<>(dataPrepperConfigExtensionMap);
if (pipelinesDataFlowModel.getPipelineExtensions() != null) {
extensionMap.putAll(pipelinesDataFlowModel.getPipelineExtensions().getExtensionMap());
combinedExtensionMap.putAll(pipelinesDataFlowModel.getPipelineExtensions().getExtensionMap());
}
}

public Map<String, Object> getExtensionMap() {
return Collections.unmodifiableMap(extensionMap);
public Map<String, Object> getDataPrepperConfigExtensionMap() {
return Collections.unmodifiableMap(dataPrepperConfigExtensionMap);
}

public Map<String, Object> getCombinedExtensionMap() {
return Collections.unmodifiableMap(combinedExtensionMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,35 @@

import javax.inject.Named;
import java.time.Duration;
import java.util.Set;

/**
* Application context for internal plugin framework beans.
*/
@Named
public class ObjectMapperConfiguration {
@Bean(name = "pluginConfigObjectMapper")
ObjectMapper objectMapper() {
static final Set<Class> TRANSLATE_VALUE_SUPPORTED_JAVA_TYPES = Set.of(
String.class, Number.class, Long.class, Short.class, Integer.class, Double.class, Float.class,
Boolean.class, Duration.class, Enum.class, Character.class);

@Bean(name = "extensionPluginConfigObjectMapper")
ObjectMapper extensionPluginConfigObjectMapper() {
final SimpleModule simpleModule = new SimpleModule();
simpleModule.addDeserializer(Duration.class, new DataPrepperDurationDeserializer());

return new ObjectMapper()
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
.registerModule(simpleModule);
}

@Bean(name = "pluginConfigObjectMapper")
ObjectMapper pluginConfigObjectMapper(final VariableExpander variableExpander) {
final SimpleModule simpleModule = new SimpleModule();
TRANSLATE_VALUE_SUPPORTED_JAVA_TYPES.stream().forEach(clazz -> simpleModule.addDeserializer(
clazz, new DataPrepperScalarTypeDeserializer<>(variableExpander, clazz)));

return new ObjectMapper()
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
.registerModule(simpleModule);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
@Named
class PluginBeanFactoryProvider implements Provider<BeanFactory> {
private final GenericApplicationContext sharedPluginApplicationContext;
private final GenericApplicationContext coreApplicationContext;

@Inject
PluginBeanFactoryProvider(final ApplicationContext coreContext) {
final ApplicationContext publicContext = Objects.requireNonNull(coreContext.getParent());
PluginBeanFactoryProvider(final GenericApplicationContext coreApplicationContext) {
final ApplicationContext publicContext = Objects.requireNonNull(coreApplicationContext.getParent());
sharedPluginApplicationContext = new GenericApplicationContext(publicContext);
this.coreApplicationContext = coreApplicationContext;
}

/**
Expand All @@ -43,6 +45,10 @@ GenericApplicationContext getSharedPluginApplicationContext() {
return sharedPluginApplicationContext;
}

GenericApplicationContext getCoreApplicationContext() {
return coreApplicationContext;
}

/**
* @since 1.3
* Creates a new isolated application context that inherits from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@

package org.opensearch.dataprepper.plugin;

import com.fasterxml.jackson.core.type.TypeReference;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.Validator;
import org.springframework.context.annotation.DependsOn;

import javax.inject.Named;
import java.util.Collections;
Expand All @@ -24,7 +26,9 @@
* and converting it to the plugin model type which should be denoted by {@link DataPrepperPlugin#pluginConfigurationType()}
*/
@Named
@DependsOn({"extensionsApplier"})
class PluginConfigurationConverter {
static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
private final ObjectMapper objectMapper;
private final Validator validator;

Expand All @@ -48,8 +52,12 @@ public Object convert(final Class<?> pluginConfigurationType, final PluginSettin
Objects.requireNonNull(pluginConfigurationType);
Objects.requireNonNull(pluginSetting);

if (pluginConfigurationType.equals(PluginSetting.class))
if (pluginConfigurationType.equals(PluginSetting.class)) {
final Map<String, Object> settings = pluginSetting.getSettings();
final Map<String, Object> convertedSettings = objectMapper.convertValue(settings, MAP_TYPE_REFERENCE);
pluginSetting.setSettings(convertedSettings);
return pluginSetting;
}

final Object configuration = convertSettings(pluginConfigurationType, pluginSetting);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.opensearch.dataprepper.plugin;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
* This annotation can be added to any plugin configuration field that is supposed to support secret string value.
*/
@Documented
@Retention(RUNTIME)
@Target({ElementType.FIELD})
public @interface SupportSecretString {
}
Loading

0 comments on commit 62e99ef

Please sign in to comment.