Skip to content

Commit

Permalink
Merge pull request #202 from thingsboard/feature/kafka-integration
Browse files Browse the repository at this point in the history
Kafka integration
  • Loading branch information
dmytro-landiak authored Feb 28, 2025
2 parents f3c7658 + 145bb6f commit 5cb2c0f
Show file tree
Hide file tree
Showing 39 changed files with 1,429 additions and 267 deletions.
7 changes: 7 additions & 0 deletions application/src/main/resources/thingsboard-mqtt-broker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,13 @@ queue:
additional-consumer-config: "${TB_KAFKA_IE_DOWNLINK_HTTP_ADDITIONAL_CONSUMER_CONFIG:}"
# Additional Kafka producer configs separated by semicolon for `tbmq.ie.downlink.http` topic
additional-producer-config: "${TB_KAFKA_IE_DOWNLINK_HTTP_ADDITIONAL_PRODUCER_CONFIG:}"
kafka:
# Kafka topic properties separated by semicolon for `tbmq.ie.downlink.kafka` topic
topic-properties: "${TB_KAFKA_IE_DOWNLINK_KAFKA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:6;replication.factor:1}"
# Additional Kafka consumer configs separated by semicolon for `tbmq.ie.downlink.kafka` topic
additional-consumer-config: "${TB_KAFKA_IE_DOWNLINK_KAFKA_ADDITIONAL_CONSUMER_CONFIG:}"
# Additional Kafka producer configs separated by semicolon for `tbmq.ie.downlink.kafka` topic
additional-producer-config: "${TB_KAFKA_IE_DOWNLINK_KAFKA_ADDITIONAL_PRODUCER_CONFIG:}"
integration-uplink:
# Topic for sending messages/events from integration executors to tbmq
topic: "${TB_KAFKA_IE_UPLINK_TOPIC:tbmq.ie.uplink}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public enum IntegrationType {

HTTP,
MQTT,
// MQTT,
KAFKA,
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@
package org.thingsboard.mqtt.broker.integration.api;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.mqtt.broker.common.data.event.ErrorEvent;
import org.thingsboard.mqtt.broker.common.data.exception.ThingsboardException;
import org.thingsboard.mqtt.broker.common.data.integration.Integration;
import org.thingsboard.mqtt.broker.common.data.integration.IntegrationLifecycleMsg;
import org.thingsboard.mqtt.broker.common.data.util.StringUtils;
import org.thingsboard.mqtt.broker.common.util.JacksonUtil;
import org.thingsboard.mqtt.broker.gen.integration.PublishIntegrationMsgProto;
import org.thingsboard.mqtt.broker.gen.queue.PublishMsgProto;
import org.thingsboard.mqtt.broker.integration.api.data.ContentType;
import org.thingsboard.mqtt.broker.integration.api.data.UplinkMetaData;
import org.thingsboard.mqtt.broker.integration.api.util.ExceptionUtil;
import org.thingsboard.mqtt.broker.queue.util.IntegrationProtoConverter;

import java.net.InetAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -61,8 +67,15 @@ public void init(TbIntegrationInitParams params) throws Exception {
}
}

protected ContentType getDefaultUplinkContentType() {
return ContentType.JSON;
@Override
public void destroy() {
stopProcessingPersistedMessages();
}

@Override
public void destroyAndClearData() {
stopProcessingPersistedMessages();
clearIntegrationMessages();
}

@Override
Expand Down Expand Up @@ -109,6 +122,80 @@ public UUID getIntegrationUuid() {
return lifecycleMsg.getIntegrationId();
}

protected void startProcessingIntegrationMessages(TbPlatformIntegration integration) {
context.startProcessingIntegrationMessages(integration);
}

protected void stopProcessingPersistedMessages() {
doStopProcessingPersistedMessages();

if (lifecycleMsg == null) {
log.debug("[{}] Integration was not initialized properly. Skip stopProcessingPersistedMessages", this.getClass());
return;
}
context.stopProcessingPersistedMessages(lifecycleMsg.getIntegrationId().toString());
}

protected void clearIntegrationMessages() {
if (lifecycleMsg == null) {
log.debug("[{}] Integration was not initialized properly. Skip clearIntegrationMessages", this.getClass());
return;
}
context.clearIntegrationMessages(lifecycleMsg.getIntegrationId().toString());
}

protected ObjectNode constructBody(PublishIntegrationMsgProto msg) {
PublishMsgProto publishMsgProto = msg.getPublishMsgProto();

ObjectNode request = JacksonUtil.newObjectNode();
request.put("payload", publishMsgProto.getPayload().toByteArray());
request.put("topicName", publishMsgProto.getTopicName());
request.put("clientId", publishMsgProto.getClientId());
request.put("eventType", "PUBLISH_MSG");
request.put("qos", publishMsgProto.getQos());
request.put("retain", publishMsgProto.getRetain());
request.put("tbmqIeNode", context.getServiceId());
request.put("tbmqNode", msg.getTbmqNode());
request.put("ts", msg.getTimestamp());
request.set("props", IntegrationProtoConverter.fromProto(publishMsgProto.getUserPropertiesList()));
request.set("metadata", JacksonUtil.valueToTree(metadataTemplate.getKvMap()));

return request;
}

protected void handleMsgProcessingFailure(Throwable throwable) {
integrationStatistics.incErrorsOccurred();
context.saveErrorEvent(getErrorEvent(throwable));
}

private ErrorEvent getErrorEvent(Throwable throwable) {
return ErrorEvent
.builder()
.entityId(lifecycleMsg.getIntegrationId())
.serviceId(context.getServiceId())
.method("onMsgProcess")
.error(getError(throwable))
.build();
}

private String getError(Throwable throwable) {
return throwable == null ? "Unspecified server error" : getRealErrorMsg(throwable);
}

private String getRealErrorMsg(Throwable throwable) {
if (StringUtils.isNotEmpty(throwable.getMessage())) {
return throwable.getMessage();
}
if (StringUtils.isNotEmpty(throwable.getCause().getMessage())) {
return throwable.getCause().getMessage();
}
return throwable.getCause().toString();
}

protected ContentType getDefaultUplinkContentType() {
return ContentType.JSON;
}

protected <T> T getClientConfiguration(Integration configuration, Class<T> clazz) {
JsonNode clientConfiguration = configuration.getConfiguration().get("clientConfiguration");
return getClientConfiguration(clientConfiguration, clazz);
Expand All @@ -135,6 +222,10 @@ protected void doCheckConnection(Integration integration, IntegrationContext ctx

}

protected void doStopProcessingPersistedMessages() {

}

protected static boolean isLocalNetworkHost(String host) {
try {
InetAddress address = InetAddress.getByName(host);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.thingsboard.mqtt.broker.common.data.BasicCallback;
import org.thingsboard.mqtt.broker.common.data.event.ErrorEvent;
import org.thingsboard.mqtt.broker.common.data.integration.IntegrationLifecycleMsg;
import org.thingsboard.mqtt.broker.common.util.ListeningExecutor;

public interface IntegrationContext {

Expand All @@ -45,5 +46,7 @@ public interface IntegrationContext {

void saveErrorEvent(ErrorEvent errorEvent);

BasicCallback getCallback();
BasicCallback getCheckConnectionCallback();

ListeningExecutor getExternalCallExecutor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ public static TbPlatformIntegration createPlatformIntegration(IntegrationType ty
return switch (type) {
case HTTP ->
newInstance("org.thingsboard.mqtt.broker.integration.service.integration.http.HttpIntegration");
case MQTT ->
newInstance("org.thingsboard.mqtt.broker.integration.service.integration.mqtt.MqttIntegration");
// case MQTT ->
// newInstance("org.thingsboard.mqtt.broker.integration.service.integration.mqtt.MqttIntegration");
case KAFKA ->
newInstance("org.thingsboard.mqtt.broker.integration.service.integration.kafka.KafkaIntegration");
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.mqtt.broker.queue.kafka.settings.integration;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "queue.kafka.integration-downlink.kafka")
public class KafkaIntegrationDownlinkKafkaSettings extends AbstractIntegrationDownlinkKafkaSettings {

private String topicProperties;
private String additionalProducerConfig;
private String additionalConsumerConfig;

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.thingsboard.mqtt.broker.common.data.integration.IntegrationType;
import org.thingsboard.mqtt.broker.exception.ThingsboardRuntimeException;
import org.thingsboard.mqtt.broker.gen.integration.DownlinkIntegrationMsgProto;
import org.thingsboard.mqtt.broker.queue.TbQueueControlledOffsetConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
import org.thingsboard.mqtt.broker.queue.TbmqIntegrationExecutorComponent;
import org.thingsboard.mqtt.broker.queue.cluster.ServiceInfoProvider;
import org.thingsboard.mqtt.broker.queue.common.TbProtoQueueMsg;

import java.util.HashMap;
import java.util.Map;

import static org.thingsboard.mqtt.broker.queue.constants.QueueConstants.TBMQ_IE_NOT_IMPLEMENTED;

@Component
Expand All @@ -34,17 +42,48 @@
@TbmqIntegrationExecutorComponent
public class ExecutorIntegrationDownlinkQueueProvider implements IntegrationDownlinkQueueProvider {

private final ServiceInfoProvider serviceInfoProvider;
private final @Lazy HttpIntegrationDownlinkQueueFactory httpIntegrationDownlinkQueueFactory;
private final @Lazy KafkaIntegrationDownlinkQueueFactory kafkaIntegrationDownlinkQueueFactory;

private Map<IntegrationType, TbQueueControlledOffsetConsumer<TbProtoQueueMsg<DownlinkIntegrationMsgProto>>> consumers;

@PostConstruct
public void init() {
var supportedIntegrationTypes = serviceInfoProvider.getSupportedIntegrationTypes();
log.info("Initializing ExecutorIntegrationDownlinkQueueProvider: {}", supportedIntegrationTypes);
if (CollectionUtils.isEmpty(supportedIntegrationTypes)) {
return;
}
consumers = new HashMap<>();

String serviceId = serviceInfoProvider.getServiceId();
for (IntegrationType integrationType : supportedIntegrationTypes) {
consumers.put(integrationType, getConsumer(integrationType, serviceId));
}
}

private TbQueueControlledOffsetConsumer<TbProtoQueueMsg<DownlinkIntegrationMsgProto>> getConsumer(IntegrationType integrationType, String serviceId) {
return switch (integrationType) {
case HTTP -> httpIntegrationDownlinkQueueFactory.createConsumer(serviceId);
case KAFKA -> kafkaIntegrationDownlinkQueueFactory.createConsumer(serviceId);
// case MQTT -> throw new ThingsboardRuntimeException("MQTT integration type is not yet implemented!");
default -> throw new ThingsboardRuntimeException("Unsupported integration type: " + integrationType);
};
}

@PreDestroy
public void destroy() {
// No need to destroy consumers here!
}

@Override
public TbQueueProducer<TbProtoQueueMsg<DownlinkIntegrationMsgProto>> getIeDownlinkProducer(IntegrationType type) {
throw new RuntimeException(TBMQ_IE_NOT_IMPLEMENTED);
}

@Override
public Map<IntegrationType, TbQueueControlledOffsetConsumer<TbProtoQueueMsg<DownlinkIntegrationMsgProto>>> getIeDownlinkConsumers() {
return consumers == null ? Map.of() : consumers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

import org.thingsboard.mqtt.broker.common.data.integration.IntegrationType;
import org.thingsboard.mqtt.broker.gen.integration.DownlinkIntegrationMsgProto;
import org.thingsboard.mqtt.broker.queue.TbQueueControlledOffsetConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
import org.thingsboard.mqtt.broker.queue.common.TbProtoQueueMsg;

import java.util.Map;

public interface IntegrationDownlinkQueueProvider {

TbQueueProducer<TbProtoQueueMsg<DownlinkIntegrationMsgProto>> getIeDownlinkProducer(IntegrationType type);

Map<IntegrationType, TbQueueControlledOffsetConsumer<TbProtoQueueMsg<DownlinkIntegrationMsgProto>>> getIeDownlinkConsumers();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.mqtt.broker.queue.provider.integration;

import org.thingsboard.mqtt.broker.gen.integration.DownlinkIntegrationMsgProto;
import org.thingsboard.mqtt.broker.queue.TbQueueControlledOffsetConsumer;
import org.thingsboard.mqtt.broker.queue.TbQueueProducer;
import org.thingsboard.mqtt.broker.queue.common.TbProtoQueueMsg;

public interface KafkaIntegrationDownlinkQueueFactory {

TbQueueProducer<TbProtoQueueMsg<DownlinkIntegrationMsgProto>> createProducer(String serviceId);

TbQueueControlledOffsetConsumer<TbProtoQueueMsg<DownlinkIntegrationMsgProto>> createConsumer(String consumerId);

}
Loading

0 comments on commit 5cb2c0f

Please sign in to comment.