From ecdfbbfe58cd648c2efada77d1536aec6f8d929f Mon Sep 17 00:00:00 2001
From: Otavio Rodolfo Piske
Date: Fri, 19 Apr 2024 10:06:01 +0200
Subject: [PATCH] CAMEL-20701: added a custom deserializer to be used when interoperating with JMS

---
 .../src/main/docs/kafka-component.adoc        | 30 ++++++-
 .../support/interop/JMSDeserializer.java      | 78 +++++++++++++++++++
 2 files changed, 107 insertions(+), 1 deletion(-)
 create mode 100644 components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/interop/JMSDeserializer.java

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index ae1662424893a..a00f0e78863e6 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -656,13 +656,41 @@ public class CustomSubscribeAdapter implements SubscribeAdapter {
 
 Then, it is necessary to add it as named bean instance to the registry:
-
 [source,java]
 .Add to registry example
 ----
 context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new CustomSubscribeAdapter());
 ----
 
+== Interoperability
+
+=== JMS
+
+When interoperating between Kafka and JMS, it may be necessary to coerce the JMS headers into their expected types.
+
+For instance, when consuming messages from Kafka carrying JMS headers and then sending them to a JMS broker, those headers are
+first deserialized into a byte array. Then, the `camel-jms` component tries to coerce this byte array into the
+specific type expected for each header.
+However, both the origin endpoint and the way the code itself was set up may affect how the data is serialized and
+deserialized. As such, it is not feasible to naively assume the data type of the byte array.
+
+To address this issue, we provide a custom header deserializer that forces Kafka to deserialize the JMS data according to
+the JMS specification. This approach ensures that the headers are properly interpreted and processed by the `camel-jms` component.
+
+Due to the inherent complexity of each possible system and endpoint, it may not be possible for this deserializer to cover all
+possible scenarios. As such, it is provided as a model that can be modified and/or adapted to the specific needs of each application.
+
+To use this solution, modify the route URI on the consumer end of the pipeline by including the
+`headerDeserializer` option.
+For example:
+
+[source,java]
+.Route snippet
+----
+from("kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer")
+    .to("...");
+----
+
 include::spring-boot:partial$starter.adoc[]

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/interop/JMSDeserializer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/interop/JMSDeserializer.java
new file mode 100644
index 0000000000000..61a489d81af42
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/interop/JMSDeserializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support.interop;
+
+import java.nio.ByteBuffer;
+
+import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
+
+public class JMSDeserializer implements KafkaHeaderDeserializer {
+
+    public boolean isLong(byte[] bytes) {
+        return bytes.length == Long.BYTES;
+    }
+
+    private static long bytesToLong(byte[] bytes) {
+        final ByteBuffer buffer = toByteBuffer(bytes, Long.BYTES);
+        return buffer.getLong();
+    }
+
+    private static int bytesToInt(byte[] bytes) {
+        final ByteBuffer buffer = toByteBuffer(bytes, Integer.BYTES);
+        return buffer.getInt();
+    }
+
+    private static ByteBuffer toByteBuffer(byte[] bytes, int size) {
+        ByteBuffer buffer = ByteBuffer.allocate(size);
+        buffer.put(bytes);
+        buffer.flip();
+        return buffer;
+    }
+
+    @Override
+    public Object deserialize(String key, byte[] value) {
+        if (key.startsWith("JMS")) {
+            switch (key) {
+                case "JMSDestination":
+                    return new String(value);
+                case "JMSDeliveryMode":
+                    return bytesToInt(value);
+                case "JMSTimestamp":
+                    return bytesToLong(value);
+                case "JMSCorrelationID":
+                    return value;
+                case "JMSReplyTo":
+                    return new String(value);
+                case "JMSRedelivered":
+                    return bytesToInt(value);
+                case "JMSType":
+                    return new String(value);
+                case "JMSExpiration":
+                    return isLong(value) ? bytesToLong(value) : bytesToInt(value);
+                case "JMSPriority":
+                    return bytesToInt(value);
+                case "JMSMessageID":
+                    return new String(value);
+                default:
+                    return value;
+            }
+        }
+
+        return value;
+    }
+}
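
For illustration, the following standalone sketch shows what the deserializer returns for a few headers. Only `JMSDeserializer` and its `deserialize(String, byte[])` method come from the patch above; the `JMSDeserializerExample` class, its `main` method, and the sample values are hypothetical, and the sketch assumes the numeric header values were written big-endian (the `java.nio.ByteBuffer` default), as the deserializer itself does.

[source,java]
.Usage sketch (hypothetical, not part of the patch)
----
import java.nio.ByteBuffer;

import org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer;

public class JMSDeserializerExample {

    public static void main(String[] args) {
        final JMSDeserializer deserializer = new JMSDeserializer();

        // Hypothetical JMSTimestamp header value: 8 big-endian bytes holding a long
        final byte[] timestampBytes = ByteBuffer.allocate(Long.BYTES).putLong(1713513961000L).array();
        final Object timestamp = deserializer.deserialize("JMSTimestamp", timestampBytes);
        System.out.println("JMSTimestamp -> " + timestamp); // prints 1713513961000

        // Hypothetical JMSDeliveryMode header value: 4 big-endian bytes holding an int (2 = PERSISTENT)
        final byte[] deliveryModeBytes = ByteBuffer.allocate(Integer.BYTES).putInt(2).array();
        final Object deliveryMode = deserializer.deserialize("JMSDeliveryMode", deliveryModeBytes);
        System.out.println("JMSDeliveryMode -> " + deliveryMode); // prints 2

        // Keys that do not start with "JMS" are passed through as the raw byte array
        final Object other = deserializer.deserialize("myHeader", new byte[] { 1, 2, 3 });
        System.out.println("myHeader -> " + other);
    }
}
----

As the `default` branch of the switch shows, any `JMS`-prefixed key not listed above is likewise returned as the raw byte array.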