Skip to content

Commit

Permalink
CAMEL-21295: Add Knative Trigger filter to camel-jbang-kubernetes plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
gansheer committed Oct 2, 2024
1 parent d7a8788 commit 0b06a7f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.fabric8.knative.internal.pkg.tracker.ReferenceBuilder;
import io.fabric8.knative.messaging.v1.SubscriptionBuilder;
import io.fabric8.knative.sources.v1.SinkBindingBuilder;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.dsl.jbang.core.commands.kubernetes.KubernetesHelper;
import org.apache.camel.dsl.jbang.core.commands.kubernetes.support.SourceMetadata;
import org.apache.camel.dsl.jbang.core.commands.kubernetes.traits.ServiceTrait;
Expand Down Expand Up @@ -119,7 +120,7 @@ public void apply(Traits traitConfig, TraitContext context) {
Knative knativeTrait = Optional.ofNullable(traitConfig.getKnative()).orElseGet(Knative::new);

configureChannels(knativeTrait, context);
configureEndpoints(knativeTrait, context);
configureEndpoints(knativeTrait);
configureEvents(knativeTrait, context);

if (knativeTrait.getSinkBinding() != null && knativeTrait.getSinkBinding()) {
Expand All @@ -145,7 +146,7 @@ public void apply(Traits traitConfig, TraitContext context) {

private void configureChannels(Knative knativeTrait, TraitContext context) {
for (String uri : knativeTrait.getChannelSources()) {
createSubscription(toKnativeUri(KnativeResourceType.CHANNEL, uri), knativeTrait, context);
createSubscription(toKnativeUri(KnativeResourceType.CHANNEL, uri), context);
}

for (String uri : knativeTrait.getChannelSinks()) {
Expand All @@ -161,7 +162,7 @@ private void configureChannels(Knative knativeTrait, TraitContext context) {
}
}

private void configureEndpoints(Knative knativeTrait, TraitContext context) {
private void configureEndpoints(Knative knativeTrait) {
for (String uri : knativeTrait.getEndpointSources()) {
String endpointName = extractKnativeResource(uri);
addKnativeResourceConfiguration(new KnativeResourceConfiguration(
Expand Down Expand Up @@ -207,7 +208,7 @@ private void configureEvents(Knative knativeTrait, TraitContext context) {
}
}

private void createSubscription(String uri, Knative knativeTrait, TraitContext context) {
private void createSubscription(String uri, TraitContext context) {
String channelName = extractKnativeResource(uri);

String subscriptionName = createSubscriptionName(context.getName(), channelName);
Expand Down Expand Up @@ -286,21 +287,23 @@ private void createTrigger(String uri, Knative knativeTrait, TraitContext contex

private Map<String, String> getFilterAttributes(Knative knativeTrait, String eventType) {
Map<String, String> filterAttributes = new HashMap<>();
filterAttributes.put("type", eventType);

// TODO: use this as soon as new Camel K CRD model has been released
// for (String filterExpression : knativeTrait.getFilters()) {
// String[] keyValue = filterExpression.split("=", 2);
// if (keyValue.length != 2) {
// throw new RuntimeCamelException("Invalid Knative trigger filter expression: %s".formatted(filterExpression));
// }
// filterAttributes.put(keyValue[0].trim(), keyValue[1].trim());
// }
//
// if (!filterAttributes.containsKey("type") && Optional.ofNullable(knativeTrait.getFilterEventType()).orElse(true) && ObjectHelper.isNotEmpty(eventType)) {
// // Apply default trigger filter attribute for the event type
// filterAttributes.put("type", eventType);
// }

if (knativeTrait.getFilters() != null) {
for (String filterExpression : knativeTrait.getFilters()) {
String[] keyValue = filterExpression.split("=", 2);
if (keyValue.length != 2) {
throw new RuntimeCamelException(
"Invalid Knative trigger filter expression: %s".formatted(filterExpression));
}
filterAttributes.put(keyValue[0].trim(), keyValue[1].trim());
}
}

if (!filterAttributes.containsKey("type") && Optional.ofNullable(knativeTrait.getFilterEventType()).orElse(true)
&& ObjectHelper.isNotEmpty(eventType)) {
// Apply default trigger filter attribute for the event type
filterAttributes.put("type", eventType);
}

return filterAttributes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"auto", "channelSinks", "channelSources", "config", "enabled", "endpointSinks", "endpointSources",
"auto", "channelSinks", "channelSources", "configuration", "enabled", "endpointSinks", "endpointSources",
"eventSinks", "eventSources", "filterEventType", "filterSourceChannels", "filters", "namespaceLabel", "sinkBinding" })
public class Knative {
@JsonProperty("auto")
Expand All @@ -50,7 +50,7 @@ public class Knative {
@JsonPropertyDescription("Can be used to inject a Knative complete configuration in JSON format.")
@JsonSetter(
nulls = Nulls.SKIP)
private String config;
private String configuration;
@JsonProperty("enabled")
@JsonPropertyDescription("Can be used to enable or disable a trait. All traits share this common property.")
@JsonSetter(
Expand Down Expand Up @@ -129,12 +129,12 @@ public void setChannelSources(List<String> channelSources) {
this.channelSources = channelSources;
}

public String getConfig() {
return this.config;
public String getConfiguration() {
return this.configuration;
}

public void setConfig(String config) {
this.config = config;
public void setConfiguration(String configuration) {
this.configuration = configuration;
}

public Boolean getEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public void shouldAddKnativeServiceSpec(RuntimeType rt) throws Exception {
public void shouldAddKnativeTrigger(RuntimeType rt) throws Exception {
KubernetesExport command = createCommand(new String[] { "classpath:knative-event-source.yaml" },
"--image-group=camel-test", "--runtime=" + rt.runtime());
command.traits = new String[] {
"knative.filters=source=my-source" };
command.doCall();

Assertions.assertTrue(hasService(rt));
Expand All @@ -104,6 +106,7 @@ public void shouldAddKnativeTrigger(RuntimeType rt) throws Exception {
Assertions.assertEquals("my-broker", trigger.getSpec().getBroker());
Assertions.assertEquals(1, trigger.getSpec().getFilter().getAttributes().size());
Assertions.assertEquals("camel-event", trigger.getSpec().getFilter().getAttributes().get("type"));
Assertions.assertEquals("my-source", trigger.getSpec().getFilter().getAttributes().get("source"));
Assertions.assertEquals("knative-event-source", trigger.getSpec().getSubscriber().getRef().getName());
Assertions.assertEquals("Service", trigger.getSpec().getSubscriber().getRef().getKind());
Assertions.assertEquals("v1", trigger.getSpec().getSubscriber().getRef().getApiVersion());
Expand Down

0 comments on commit 0b06a7f

Please sign in to comment.