diff --git a/spring-kafka-docs/src/main/asciidoc/appendix.adoc b/spring-kafka-docs/src/main/asciidoc/appendix.adoc index 534d733844..867a41523a 100644 --- a/spring-kafka-docs/src/main/asciidoc/appendix.adoc +++ b/spring-kafka-docs/src/main/asciidoc/appendix.adoc @@ -4,6 +4,8 @@ When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot's dependency management. If you wish to use a different version of `kafka-clients` or `kafka-streams`, and use the embedded kafka broker for testing, you need to override their version used by Spring Boot dependency management; set the `kafka.version` property. +NOTE: Default `kafka-clients` dependencies for Spring Boot 3.0.x and 3.1.x are 3.3.2 and 3.4.0 respectively. + Or, to use a different Spring for Apache Kafka version with a supported Spring Boot version, set the `spring-kafka.version` property. ==== diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index a088b76d9c..3b0641e3af 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -126,7 +126,7 @@ To achieve more fine-grained control over how to handle non-blocking retrials fo public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { return RetryTopicConfigurationBuilder .newInstance() - .fixedBackoff(3000) + .fixedBackOff(3000) .maxAttempts(5) .concurrency(1) .includeTopics("my-topic", "my-other-topic") diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java index b65802ac88..bb3dcdc732 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java @@ -123,7 +123,7 @@ default void reset() { * @since 2.5 */ default Map getConfigurationProperties() { - throw new UnsupportedOperationException("This implementation doesn't support this method"); + return Collections.emptyMap(); } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.java index 320c6b7c11..0dc15d7b60 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ * @param the value type. * * @author Mark Norkin + * @author Adrian Chlebosz * * @since 2.3.0 */ @@ -61,6 +62,11 @@ public ReactiveKafkaConsumerTemplate(ReceiverOptions receiverOptions) { this.kafkaReceiver = KafkaReceiver.create(receiverOptions); } + public ReactiveKafkaConsumerTemplate(KafkaReceiver kafkaReceiver) { + Assert.notNull(kafkaReceiver, "Kafka receiver can not be null"); + this.kafkaReceiver = kafkaReceiver; + } + public Flux> receive() { return this.kafkaReceiver.receive(); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java index 904bedbbc4..e2e9af4771 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,6 +51,7 @@ * @param the value type. * * @author Mark Norkin + * @author Adrian Chlebosz * * @since 2.3.0 */ @@ -71,6 +72,17 @@ public ReactiveKafkaProducerTemplate(SenderOptions senderOptions, RecordMe this.messageConverter = messageConverter; } + public ReactiveKafkaProducerTemplate(KafkaSender sender) { + this(sender, new MessagingMessageConverter()); + } + + public ReactiveKafkaProducerTemplate(KafkaSender sender, RecordMessageConverter messageConverter) { + Assert.notNull(sender, "Sender can not be null"); + Assert.notNull(messageConverter, "Message converter can not be null"); + this.sender = sender; + this.messageConverter = messageConverter; + } + public Flux> sendTransactionally(Publisher> records) { Flux>> sendTransactionally = this.sender.sendTransactionally(Flux.just(records)); return sendTransactionally.flatMap(Function.identity()); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java index e6dc715b25..0174eb1658 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java @@ -38,10 +38,10 @@ public interface CommonErrorHandler extends DeliveryAttemptAware { /** - * Return false if this error handler should only receive the current failed record; - * remaining records will be passed to the listener after the error handler returns. - * When true (default), all remaining records including the failed record are passed - * to the error handler. + * Return false (default) if this error handler should only receive the current failed + * record; remaining records will be passed to the listener after the error handler + * returns. When true, all remaining records including the failed record are passed to + * the error handler. * @return false to receive only the failed record. * @deprecated in favor of {@link #seeksAfterHandling()}. * @see #handleRecord(Exception, ConsumerRecord, Consumer, MessageListenerContainer) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java index fd63140bce..bd9f1e9896 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -60,6 +60,7 @@ import reactor.core.publisher.Mono; import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverRecord; +import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderRecord; import reactor.kafka.sender.SenderResult; @@ -69,6 +70,7 @@ /** * @author Mark Norkin * @author Gary Russell + * @author Adrian Chlebosz * * @since 2.3.0 */ @@ -132,10 +134,17 @@ public void tearDown() { @Test public void shouldNotCreateTemplateIfOptionsIsNull() { assertThatIllegalArgumentException() - .isThrownBy(() -> new ReactiveKafkaProducerTemplate(null)) + .isThrownBy(() -> new ReactiveKafkaProducerTemplate<>((SenderOptions) null)) .withMessage("Sender options can not be null"); } + @Test + public void shouldNotCreateTemplateIfSenderIsNull() { + assertThatIllegalArgumentException() + .isThrownBy(() -> new ReactiveKafkaProducerTemplate<>((KafkaSender) null)) + .withMessage("Sender can not be null"); + } + @Test @SuppressWarnings("unchecked") public void shouldNotCreateTemplateIfConverterIsNull() { @@ -143,6 +152,11 @@ public void shouldNotCreateTemplateIfConverterIsNull() { .isThrownBy(() -> new ReactiveKafkaProducerTemplate(Mockito.mock(SenderOptions.class), null)) .withMessage("Message converter can not be null"); + + assertThatIllegalArgumentException() + .isThrownBy(() -> + new ReactiveKafkaProducerTemplate(Mockito.mock(KafkaSender.class), null)) + .withMessage("Message converter can not be null"); } @Test diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java index db7ae084ce..61d711724d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.receiver.ReceiverRecord; import reactor.kafka.sender.SenderOptions; @@ -65,6 +66,7 @@ * @author Mark Norkin * @author Gary Russell * @author Will Kennedy + * @author Adrian Chlebosz * * @since 2.3.0 */ @@ -138,10 +140,17 @@ public void tearDown() { @Test public void shouldNotCreateTemplateIfOptionsIsNull() { assertThatIllegalArgumentException() - .isThrownBy(() -> new ReactiveKafkaConsumerTemplate(null)) + .isThrownBy(() -> new ReactiveKafkaConsumerTemplate<>((ReceiverOptions) null)) .withMessage("Receiver options can not be null"); } + @Test + public void shouldNotCreateTemplateIfReceiverIsNull() { + assertThatIllegalArgumentException() + .isThrownBy(() -> new ReactiveKafkaConsumerTemplate<>((KafkaReceiver) null)) + .withMessage("Kafka receiver can not be null"); + } + @Test public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiveIt() { ProducerRecord producerRecord =