Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAMEL-21295: Add knative trait filters to camel jbang kubernetes plugin #15808

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion docs/user-manual/modules/ROOT/pages/camel-jbang-kubernetes.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,64 @@ Apache Camel also provides a Knative component that makes you easily interact wi

The Knative component enables you to exchange data with the Knative eventing broker and other Knative services deployed on Kubernetes.
The Camel JBang Kubernetes plugin provides some autoconfiguration options when connecting with the Knative component.
The export command assists you in configuring both the Knative component and the Kubernetes manifest for connecting to Knative resources on the Kubenretes cluster.
The export command assists you in configuring both the Knative component and the Kubernetes manifest for connecting to Knative resources on the Kubernetes cluster.

You can configure the Knative component with the Knative trait.

The trait offers following options for customization:

[cols="2m,1m,5a"]
|===
|Property | Type | Description

| knative.enabled
| bool
| Can be used to enable or disable a trait. (default: true)

| knative.configuration
| string
| Can be used to inject a Knative complete configuration in JSON format

| knative.channel-sinks
| []string
| List of channels used as destination of camel routes. Can contain simple channel names or full Camel URIs.

Refer to the Knative documentation for more information.

| knative.channel-sources
| []string
| List of channels used as source of camel routes. Can contain simple channel names or full Camel URIs.

| knative.endpoint-sinks
| []string
| List of endpoints used as destination of camel routes. Can contain simple endpoint names or full Camel URIs.

| knative.endpoint-sources
| []string
| List of endpoints used as sources of camel routes. Can contain simple endpoint names or full Camel URIs.

| knative.event-sinks
| []string
| List of endpoints used as destination of integration routes. Can contain simple endpoint names or full Camel URIs.

| knative.event-sources
| []string
| List of event types that the integration will be subscribed to. Can contain simple event types or full Camel URIs (to use a specific broker different from "default").

| knative.sink-binding
| bool
| Allows binding the integration to a sink via a Knative SinkBinding resource. This can be used when the integration targets a single sink. It’s enabled by default when the integration targets a single sink (except when the integration is owned by a Knative source).

| knative.filters
| []string
| Sets filter attributes on the event stream (such as event type, source, subject and so on). A list of key-value pairs that represent filter attributes and its values. The syntax is KEY=VALUE, e.g., source="my.source". Filter attributes get set on the Knative trigger that is being created as part of this integration.

| knative.filter-event-type
| bool
| Enables the default filtering for the Knative trigger using the event type If this is true, the created Knative trigger uses the event type as a filter on the event stream when no other filter criteria is given. (default: true)

|===


=== Knative trigger

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.fabric8.knative.internal.pkg.tracker.ReferenceBuilder;
import io.fabric8.knative.messaging.v1.SubscriptionBuilder;
import io.fabric8.knative.sources.v1.SinkBindingBuilder;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.dsl.jbang.core.commands.kubernetes.KubernetesHelper;
import org.apache.camel.dsl.jbang.core.commands.kubernetes.support.SourceMetadata;
import org.apache.camel.dsl.jbang.core.commands.kubernetes.traits.ServiceTrait;
Expand Down Expand Up @@ -119,7 +120,7 @@ public void apply(Traits traitConfig, TraitContext context) {
Knative knativeTrait = Optional.ofNullable(traitConfig.getKnative()).orElseGet(Knative::new);

configureChannels(knativeTrait, context);
configureEndpoints(knativeTrait, context);
configureEndpoints(knativeTrait);
configureEvents(knativeTrait, context);

if (knativeTrait.getSinkBinding() != null && knativeTrait.getSinkBinding()) {
Expand All @@ -145,7 +146,7 @@ public void apply(Traits traitConfig, TraitContext context) {

private void configureChannels(Knative knativeTrait, TraitContext context) {
for (String uri : knativeTrait.getChannelSources()) {
createSubscription(toKnativeUri(KnativeResourceType.CHANNEL, uri), knativeTrait, context);
createSubscription(toKnativeUri(KnativeResourceType.CHANNEL, uri), context);
}

for (String uri : knativeTrait.getChannelSinks()) {
Expand All @@ -161,7 +162,7 @@ private void configureChannels(Knative knativeTrait, TraitContext context) {
}
}

private void configureEndpoints(Knative knativeTrait, TraitContext context) {
private void configureEndpoints(Knative knativeTrait) {
for (String uri : knativeTrait.getEndpointSources()) {
String endpointName = extractKnativeResource(uri);
addKnativeResourceConfiguration(new KnativeResourceConfiguration(
Expand Down Expand Up @@ -207,7 +208,7 @@ private void configureEvents(Knative knativeTrait, TraitContext context) {
}
}

private void createSubscription(String uri, Knative knativeTrait, TraitContext context) {
private void createSubscription(String uri, TraitContext context) {
String channelName = extractKnativeResource(uri);

String subscriptionName = createSubscriptionName(context.getName(), channelName);
Expand Down Expand Up @@ -286,21 +287,23 @@ private void createTrigger(String uri, Knative knativeTrait, TraitContext contex

private Map<String, String> getFilterAttributes(Knative knativeTrait, String eventType) {
Map<String, String> filterAttributes = new HashMap<>();
filterAttributes.put("type", eventType);

// TODO: use this as soon as new Camel K CRD model has been released
// for (String filterExpression : knativeTrait.getFilters()) {
// String[] keyValue = filterExpression.split("=", 2);
// if (keyValue.length != 2) {
// throw new RuntimeCamelException("Invalid Knative trigger filter expression: %s".formatted(filterExpression));
// }
// filterAttributes.put(keyValue[0].trim(), keyValue[1].trim());
// }
//
// if (!filterAttributes.containsKey("type") && Optional.ofNullable(knativeTrait.getFilterEventType()).orElse(true) && ObjectHelper.isNotEmpty(eventType)) {
// // Apply default trigger filter attribute for the event type
// filterAttributes.put("type", eventType);
// }

if (knativeTrait.getFilters() != null) {
for (String filterExpression : knativeTrait.getFilters()) {
String[] keyValue = filterExpression.split("=", 2);
if (keyValue.length != 2) {
throw new RuntimeCamelException(
"Invalid Knative trigger filter expression: %s".formatted(filterExpression));
}
filterAttributes.put(keyValue[0].trim(), keyValue[1].trim());
}
}

if (!filterAttributes.containsKey("type") && Optional.ofNullable(knativeTrait.getFilterEventType()).orElse(true)
&& ObjectHelper.isNotEmpty(eventType)) {
// Apply default trigger filter attribute for the event type
filterAttributes.put("type", eventType);
}

return filterAttributes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"auto", "channelSinks", "channelSources", "config", "enabled", "endpointSinks", "endpointSources",
"auto", "channelSinks", "channelSources", "configuration", "enabled", "endpointSinks", "endpointSources",
"eventSinks", "eventSources", "filterEventType", "filterSourceChannels", "filters", "namespaceLabel", "sinkBinding" })
public class Knative {
@JsonProperty("auto")
Expand All @@ -50,7 +50,7 @@ public class Knative {
@JsonPropertyDescription("Can be used to inject a Knative complete configuration in JSON format.")
@JsonSetter(
nulls = Nulls.SKIP)
private String config;
private String configuration;
@JsonProperty("enabled")
@JsonPropertyDescription("Can be used to enable or disable a trait. All traits share this common property.")
@JsonSetter(
Expand Down Expand Up @@ -129,12 +129,12 @@ public void setChannelSources(List<String> channelSources) {
this.channelSources = channelSources;
}

public String getConfig() {
return this.config;
public String getConfiguration() {
return this.configuration;
}

public void setConfig(String config) {
this.config = config;
public void setConfiguration(String configuration) {
this.configuration = configuration;
}

public Boolean getEnabled() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.dsl.jbang.core.commands.kubernetes;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Service;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
import org.apache.camel.dsl.jbang.core.common.RuntimeType;
import org.apache.camel.util.IOHelper;
import org.junit.jupiter.api.BeforeEach;
import picocli.CommandLine;

public class KubernetesExportBaseTest extends KubernetesBaseTest {

protected File workingDir;
protected String[] defaultArgs;

@BeforeEach
public void setup() {
super.setup();

try {
Path base = Paths.get("target");
workingDir = Files.createTempDirectory(base, "camel-k8s-export").toFile();
workingDir.deleteOnExit();
} catch (IOException e) {
throw new RuntimeCamelException(e);
}

defaultArgs = new String[] { "--dir=" + workingDir, "--quiet" };
}

protected KubernetesExport createCommand(String[] files, String... args) {
var argsArr = Optional.ofNullable(args).orElse(new String[0]);
var argsLst = new ArrayList<>(Arrays.asList(argsArr));
argsLst.addAll(Arrays.asList(defaultArgs));
KubernetesExport command = new KubernetesExport(new CamelJBangMain(), files);
CommandLine.populateCommand(command, argsLst.toArray(new String[0]));
return command;
}

protected boolean hasService(RuntimeType rt) throws IOException {
return getResource(rt, Service.class).isPresent();
}

protected boolean hasKnativeService(RuntimeType rt) throws IOException {
return getResource(rt, io.fabric8.knative.serving.v1.Service.class).isPresent();
}

protected <T extends HasMetadata> Optional<T> getResource(RuntimeType rt, Class<T> type) throws IOException {
if (rt == RuntimeType.quarkus) {
try (FileInputStream fis
= new FileInputStream(
KubernetesHelper.getKubernetesManifest(ClusterType.KUBERNETES.name(),
new File(workingDir, "/src/main/kubernetes")))) {
List<HasMetadata> resources = kubernetesClient.load(fis).items();
return resources.stream()
.filter(it -> type.isAssignableFrom(it.getClass()))
.map(type::cast)
.findFirst();
}
}
if (rt == RuntimeType.springBoot || rt == RuntimeType.main) {
var kind = type.getSimpleName().toLowerCase();
File file = new File(workingDir, "src/main/jkube/%s.yml".formatted(kind));
if (file.isFile()) {
try (FileInputStream fis = new FileInputStream(file)) {
List<HasMetadata> resources = kubernetesClient.load(fis).items();
return resources.stream()
.filter(it -> type.isAssignableFrom(it.getClass()))
.map(type::cast)
.findFirst();
}
}
}
return Optional.empty();
}

protected String readResource(File workingDir, String path) throws IOException {
try (FileInputStream fis = new FileInputStream(workingDir.toPath().resolve(path).toFile())) {
return IOHelper.loadText(fis);
}
}

protected Properties getApplicationProperties(File workingDir) throws IOException {
String content = readResource(workingDir, "src/main/resources/application.properties");
Properties applicationProperties = new Properties();
applicationProperties.load(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)));
return applicationProperties;
}
}
Loading
Loading