Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: support plugin loading in conifg #4974

3 changes: 2 additions & 1 deletion data-prepper-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
implementation libs.reflections.core
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need this anymore.

implementation libs.parquet.common
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation libs.commons.lang3
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-test-common')
testImplementation 'org.skyscreamer:jsonassert:1.5.3'
testImplementation libs.commons.io
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.opensearch.dataprepper.model.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotates a field that uses Data Prepper plugin config as its value.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface UsesDataPrepperPlugin {
/**
* The class type for this plugin.
*
* @return The Java class
* @since 1.2
*/
Class<?> pluginType();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't need this if we support it on the target itself.

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.dataprepper.model.annotations.DataPrepperPlugin.DEFAULT_ALTERNATE_NAME;
import static org.opensearch.dataprepper.model.annotations.DataPrepperPlugin.DEFAULT_DEPRECATED_NAME;
Expand Down Expand Up @@ -60,6 +61,20 @@ public <T> Optional<Class<? extends T>> findPluginClass(final Class<T> pluginTyp
return Optional.ofNullable((Class<? extends T>) supportedTypesMap.get(pluginType));
}

@Override
public <T> Set<Class<? extends T>> findPluginClasses(Class<T> pluginType) {
if (nameToSupportedTypeToPluginType == null) {
nameToSupportedTypeToPluginType = scanForPlugins();
}

return nameToSupportedTypeToPluginType.values().stream()
.flatMap(supportedTypeToPluginType ->
supportedTypeToPluginType.entrySet().stream()
.filter(entry -> pluginType.equals(entry.getKey()))
.flatMap(entry -> Stream.of((Class<? extends T>) entry.getValue())))
.collect(Collectors.toSet());
}

private Map<String, Map<Class<?>, Class<?>>> scanForPlugins() {
final Set<Class<?>> dataPrepperPluginClasses =
reflections.getTypesAnnotatedWith(DataPrepperPlugin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugin;

import java.util.Collection;
import java.util.Optional;

/**
Expand All @@ -27,4 +28,15 @@ public interface PluginProvider {
* @since 1.2
*/
<T> Optional<Class<? extends T>> findPluginClass(Class<T> pluginType, String pluginName);

/**
* Finds the Java classes for a specific pluginType.
*
* @param pluginType The type of plugin which is being supported.
* e.g. {@link org.opensearch.dataprepper.model.sink.Sink}.
* @param <T> The type
* @return An {@link Collection} of Java classes for plugins
* @since 1.2
*/
<T> Collection<Class<? extends T>> findPluginClasses(Class<T> pluginType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.test.TestSink;
import org.opensearch.dataprepper.plugins.test.TestSource;
import org.reflections.Reflections;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.BDDMockito.given;
Expand Down Expand Up @@ -61,6 +65,22 @@ void findPlugin_should_scan_for_plugins() {
.getTypesAnnotatedWith(DataPrepperPlugin.class);
}

@Test
void findPlugins_should_scan_for_plugins() {
final ClasspathPluginProvider objectUnderTest = createObjectUnderTest();

then(reflections).shouldHaveNoInteractions();

given(reflections.getTypesAnnotatedWith(DataPrepperPlugin.class))
.willReturn(Collections.emptySet());

objectUnderTest.findPluginClasses(Sink.class);

then(reflections)
.should()
.getTypesAnnotatedWith(DataPrepperPlugin.class);
}

@Test
void findPlugin_should_scan_for_plugins_only_once() {
final ClasspathPluginProvider objectUnderTest = createObjectUnderTest();
Expand All @@ -76,6 +96,21 @@ void findPlugin_should_scan_for_plugins_only_once() {
.getTypesAnnotatedWith(DataPrepperPlugin.class);
}

@Test
void findPlugins_should_scan_for_plugins_only_once() {
final ClasspathPluginProvider objectUnderTest = createObjectUnderTest();

given(reflections.getTypesAnnotatedWith(DataPrepperPlugin.class))
.willReturn(Collections.emptySet());

for (int i = 0; i < 10; i++)
objectUnderTest.findPluginClasses(Sink.class);

then(reflections)
.should()
.getTypesAnnotatedWith(DataPrepperPlugin.class);
}

@Test
void findPlugin_should_return_empty_if_no_plugins_found() {
given(reflections.getTypesAnnotatedWith(DataPrepperPlugin.class))
Expand Down Expand Up @@ -130,6 +165,17 @@ void findPlugin_should_return_plugin_if_found_for_alternate_name_and_type_using_
assertThat(optionalPlugin.get(), equalTo(TestSource.class));
}

@Test
void findPlugins_should_return_empty_if_no_plugins_found() {
given(reflections.getTypesAnnotatedWith(DataPrepperPlugin.class))
.willReturn(Collections.emptySet());

final Collection<Class<? extends PluginSetting>> foundPlugins = createObjectUnderTest().findPluginClasses(
PluginSetting.class);
assertThat(foundPlugins, notNullValue());
assertThat(foundPlugins.isEmpty(), is(true));
}

@Nested
class WithPredefinedPlugins {

Expand Down Expand Up @@ -161,5 +207,13 @@ void findPlugin_should_return_plugin_if_found_for_name_and_type_using_pluginType
assertThat(optionalPlugin.isPresent(), equalTo(true));
assertThat(optionalPlugin.get(), equalTo(TestSink.class));
}

@Test
void findPlugins_should_return_plugins_if_plugin_found_for_specified_type() {
final Set<Class<? extends Source>> foundPlugins = createObjectUnderTest().findPluginClasses(Source.class);
assertThat(foundPlugins, notNullValue());
assertThat(foundPlugins.size(), equalTo(1));
assertThat(foundPlugins.stream().iterator().next(), equalTo(TestSource.class));
}
}
}
1 change: 1 addition & 0 deletions data-prepper-plugin-schema-cli/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ application {

dependencies {
implementation project(':data-prepper-plugins')
implementation project(':data-prepper-plugin-framework')
implementation project(':data-prepper-plugin-schema')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.reflections:reflections:0.10.2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
import com.github.victools.jsonschema.generator.SchemaVersion;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationModule;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationOption;
import org.opensearch.dataprepper.plugin.ClasspathPluginProvider;
import org.opensearch.dataprepper.plugin.PluginProvider;
import org.opensearch.dataprepper.schemas.module.CustomJacksonModule;
import org.reflections.Reflections;
import org.reflections.scanners.Scanners;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
Expand Down Expand Up @@ -58,11 +56,9 @@ public void run() {
new JakartaValidationModule(JakartaValidationOption.NOT_NULLABLE_FIELD_IS_REQUIRED,
JakartaValidationOption.INCLUDE_PATTERN_EXPRESSIONS)
);
final Reflections reflections = new Reflections(new ConfigurationBuilder()
.setUrls(ClasspathHelper.forPackage(DEFAULT_PLUGINS_CLASSPATH))
.setScanners(Scanners.TypesAnnotated, Scanners.SubTypes));
final PluginProvider pluginProvider = new ClasspathPluginProvider();
final PluginConfigsJsonSchemaConverter pluginConfigsJsonSchemaConverter = new PluginConfigsJsonSchemaConverter(
reflections, new JsonSchemaConverter(modules), siteUrl, siteBaseUrl);
pluginProvider, new JsonSchemaConverter(modules, pluginProvider), siteUrl, siteBaseUrl);
final Class<?> pluginType = pluginConfigsJsonSchemaConverter.pluginTypeNameToPluginType(pluginTypeName);
final Map<String, String> pluginNameToJsonSchemaMap = pluginConfigsJsonSchemaConverter.convertPluginConfigsIntoJsonSchemas(
SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON, pluginType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,27 @@
import com.github.victools.jsonschema.generator.SchemaGeneratorConfig;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigPart;
import com.github.victools.jsonschema.generator.SchemaGeneratorGeneralConfigPart;
import com.github.victools.jsonschema.generator.SchemaVersion;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin;
import org.opensearch.dataprepper.plugin.PluginProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class JsonSchemaConverter {
private static final Logger LOG = LoggerFactory.getLogger(JsonSchemaConverter.class);
static final String DEPRECATED_SINCE_KEY = "deprecated";
private final List<Module> jsonSchemaGeneratorModules;
private final PluginProvider pluginProvider;

public JsonSchemaConverter(final List<Module> jsonSchemaGeneratorModules) {
public JsonSchemaConverter(final List<Module> jsonSchemaGeneratorModules, final PluginProvider pluginProvider) {
this.jsonSchemaGeneratorModules = jsonSchemaGeneratorModules;
this.pluginProvider = pluginProvider;
}

public ObjectNode convertIntoJsonSchema(
Expand All @@ -30,7 +41,9 @@ public ObjectNode convertIntoJsonSchema(
loadJsonSchemaGeneratorModules(configBuilder);
final SchemaGeneratorConfigPart<FieldScope> scopeSchemaGeneratorConfigPart = configBuilder.forFields();
overrideInstanceAttributeWithDeprecated(scopeSchemaGeneratorConfigPart);
overrideTargetTypeWithUsesDataPrepperPlugin(scopeSchemaGeneratorConfigPart);
resolveDefaultValueFromJsonProperty(scopeSchemaGeneratorConfigPart);
overrideDataPrepperPluginTypeAttribute(configBuilder.forTypesInGeneral(), schemaVersion, optionPreset);

final SchemaGeneratorConfig config = configBuilder.build();
final SchemaGenerator generator = new SchemaGenerator(config);
Expand All @@ -52,6 +65,37 @@ private void overrideInstanceAttributeWithDeprecated(
});
}

private void overrideTargetTypeWithUsesDataPrepperPlugin(
final SchemaGeneratorConfigPart<FieldScope> scopeSchemaGeneratorConfigPart) {
scopeSchemaGeneratorConfigPart.withTargetTypeOverridesResolver(field -> Optional
.ofNullable(field.getAnnotationConsideringFieldAndGetterIfSupported(UsesDataPrepperPlugin.class))
.map(usesDataPrepperPlugin ->
pluginProvider.findPluginClasses(usesDataPrepperPlugin.pluginType()).stream())
.map(stream -> stream.map(specificSubtype -> field.getContext().resolve(specificSubtype)))
.map(stream -> stream.collect(Collectors.toList()))
.orElse(null));
}

private void overrideDataPrepperPluginTypeAttribute(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scanning for plugins should only be done in the plugin framework. Make modifications there and use those.

final SchemaGeneratorGeneralConfigPart schemaGeneratorGeneralConfigPart,
final SchemaVersion schemaVersion, final OptionPreset optionPreset) {
schemaGeneratorGeneralConfigPart.withTypeAttributeOverride((node, scope, context) -> {
final DataPrepperPlugin dataPrepperPlugin = scope.getType().getErasedType()
.getAnnotation(DataPrepperPlugin.class);
if (dataPrepperPlugin != null) {
final ObjectNode propertiesNode = node.putObject("properties");
try {
final ObjectNode schemaNode = this.convertIntoJsonSchema(
schemaVersion, optionPreset, dataPrepperPlugin.pluginConfigurationType());
propertiesNode.set(dataPrepperPlugin.name(), schemaNode);
} catch (JsonProcessingException e) {
LOG.error("Encountered error retrieving JSON schema for {}", dataPrepperPlugin.name(), e);
throw new RuntimeException(e);
}
}
});
}

private void resolveDefaultValueFromJsonProperty(
final SchemaGeneratorConfigPart<FieldScope> scopeSchemaGeneratorConfigPart) {
scopeSchemaGeneratorConfigPart.withDefaultResolver(field -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.opensearch.dataprepper.schemas;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaVersion;
Expand All @@ -10,7 +9,7 @@
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import org.reflections.Reflections;
import org.opensearch.dataprepper.plugin.PluginProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -51,15 +50,15 @@ public class PluginConfigsJsonSchemaConverter {

private final String siteUrl;
private final String siteBaseUrl;
private final Reflections reflections;
private final PluginProvider pluginProvider;
private final JsonSchemaConverter jsonSchemaConverter;

public PluginConfigsJsonSchemaConverter(
final Reflections reflections,
final PluginProvider pluginProvider,
final JsonSchemaConverter jsonSchemaConverter,
final String siteUrl,
final String siteBaseUrl) {
this.reflections = reflections;
this.pluginProvider = pluginProvider;
this.jsonSchemaConverter = jsonSchemaConverter;
this.siteUrl = siteUrl == null ? SITE_URL_PLACEHOLDER : siteUrl;
this.siteBaseUrl = siteBaseUrl == null ? SITE_BASE_URL_PLACEHOLDER : siteBaseUrl;
Expand Down Expand Up @@ -90,8 +89,8 @@ public Map<String, String> convertPluginConfigsIntoJsonSchemas(
addPluginName(jsonSchemaNode, pluginName);
addDocumentationLink(jsonSchemaNode, pluginName, pluginType);
value = jsonSchemaNode.toPrettyString();
} catch (JsonProcessingException e) {
LOG.error("Encountered error retrieving JSON schema for {}", pluginName);
} catch (final Exception e) {
LOG.error("Encountered error retrieving JSON schema for {}", pluginName, e);
return Stream.empty();
}
return Stream.of(Map.entry(entry.getKey(), value));
Expand All @@ -107,7 +106,7 @@ private Map<String, Class<?>> scanForPluginConfigs(final Class<?> pluginType) {
if (ConditionalRoute.class.equals(pluginType)) {
return Map.of(CONDITIONAL_ROUTE_PROCESSOR_NAME, ConditionalRoute.class);
}
return reflections.getTypesAnnotatedWith(DataPrepperPlugin.class).stream()
return pluginProvider.findPluginClasses(pluginType).stream()
.map(clazz -> clazz.getAnnotation(DataPrepperPlugin.class))
.filter(dataPrepperPlugin -> pluginType.equals(dataPrepperPlugin.pluginType()))
.collect(Collectors.toMap(
Expand Down
Loading
Loading