Merge branch 'spring-projects:main' into main
alograg authored Jun 9, 2023
2 parents 8a1d26b + 206e68f commit efa1472
Showing 8 changed files with 55 additions and 12 deletions.
2 changes: 2 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/appendix.adoc
@@ -4,6 +4,8 @@
When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot's dependency management.
If you wish to use a different version of `kafka-clients` or `kafka-streams`, and use the embedded kafka broker for testing, you need to override their version used by Spring Boot dependency management; set the `kafka.version` property.

+NOTE: Default `kafka-clients` dependencies for Spring Boot 3.0.x and 3.1.x are 3.3.2 and 3.4.0 respectively.

Or, to use a different Spring for Apache Kafka version with a supported Spring Boot version, set the `spring-kafka.version` property.

====
2 changes: 1 addition & 1 deletion spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
@@ -126,7 +126,7 @@ To achieve more fine-grained control over how to handle non-blocking retrials fo
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
-.fixedBackoff(3000)
+.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics("my-topic", "my-other-topic")

@@ -123,7 +123,7 @@ default void reset() {
* @since 2.5
*/
default Map<String, Object> getConfigurationProperties() {
-throw new UnsupportedOperationException("This implementation doesn't support this method");
+return Collections.emptyMap();
}

/**

@@ -1,5 +1,5 @@
/*
-* Copyright 2019-2020 the original author or authors.
+* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -49,6 +49,7 @@
* @param <V> the value type.
*
* @author Mark Norkin
+* @author Adrian Chlebosz
*
* @since 2.3.0
*/
@@ -61,6 +62,11 @@ public ReactiveKafkaConsumerTemplate(ReceiverOptions<K, V> receiverOptions) {
this.kafkaReceiver = KafkaReceiver.create(receiverOptions);
}

+public ReactiveKafkaConsumerTemplate(KafkaReceiver<K, V> kafkaReceiver) {
+Assert.notNull(kafkaReceiver, "Kafka receiver can not be null");
+this.kafkaReceiver = kafkaReceiver;
+}

public Flux<ReceiverRecord<K, V>> receive() {
return this.kafkaReceiver.receive();
}
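
A minimal sketch (not part of this commit) showing how the new KafkaReceiver-based constructor might be used; the broker address, group id, topic name, and class name are placeholders:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;

import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class ReceiverBasedConsumerSketch {

	public static void main(String[] args) {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group"); // placeholder group id
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

		ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
				.subscription(Collections.singleton("my-topic")); // placeholder topic

		// Build (and optionally customize) the KafkaReceiver up front, then hand it to the
		// new constructor instead of letting the template create it from ReceiverOptions.
		KafkaReceiver<String, String> receiver = KafkaReceiver.create(options);
		ReactiveKafkaConsumerTemplate<String, String> template =
				new ReactiveKafkaConsumerTemplate<>(receiver);

		template.receive()
				.doOnNext(received -> received.receiverOffset().acknowledge())
				.subscribe();
	}
}

This mirrors what the ReceiverOptions constructor already does internally (KafkaReceiver.create(receiverOptions) above), but lets the receiver be built or shared outside the template.
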
@@ -1,5 +1,5 @@
/*
-* Copyright 2019-2022 the original author or authors.
+* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
* @param <V> the value type.
*
* @author Mark Norkin
+* @author Adrian Chlebosz
*
* @since 2.3.0
*/
@@ -71,6 +72,17 @@ public ReactiveKafkaProducerTemplate(SenderOptions<K, V> senderOptions, RecordMe
this.messageConverter = messageConverter;
}

+public ReactiveKafkaProducerTemplate(KafkaSender<K, V> sender) {
+this(sender, new MessagingMessageConverter());
+}
+
+public ReactiveKafkaProducerTemplate(KafkaSender<K, V> sender, RecordMessageConverter messageConverter) {
+Assert.notNull(sender, "Sender can not be null");
+Assert.notNull(messageConverter, "Message converter can not be null");
+this.sender = sender;
+this.messageConverter = messageConverter;
+}

public <T> Flux<SenderResult<T>> sendTransactionally(Publisher<? extends SenderRecord<K, V, T>> records) {
Flux<Flux<SenderResult<T>>> sendTransactionally = this.sender.sendTransactionally(Flux.just(records));
return sendTransactionally.flatMap(Function.identity());
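
A minimal sketch (not part of this commit) of the new KafkaSender-based constructors; the broker address, topic, payload, and class name are placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;

import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

public class SenderBasedProducerSketch {

	public static void main(String[] args) {
		Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

		// Build the KafkaSender up front and pass it to the new constructor; the
		// single-argument variant falls back to a MessagingMessageConverter.
		SenderOptions<String, String> senderOptions = SenderOptions.create(props);
		KafkaSender<String, String> sender = KafkaSender.create(senderOptions);
		ReactiveKafkaProducerTemplate<String, String> template =
				new ReactiveKafkaProducerTemplate<>(sender);

		template.send("my-topic", "sample payload") // placeholder topic and value
				.doOnSuccess(result -> System.out.println("sent: " + result.recordMetadata()))
				.block();
	}
}

The two-argument variant shown in the diff can be used instead when a custom RecordMessageConverter is required.
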

@@ -38,10 +38,10 @@
public interface CommonErrorHandler extends DeliveryAttemptAware {

/**
-* Return false if this error handler should only receive the current failed record;
-* remaining records will be passed to the listener after the error handler returns.
-* When true (default), all remaining records including the failed record are passed
-* to the error handler.
+* Return false (default) if this error handler should only receive the current failed
+* record; remaining records will be passed to the listener after the error handler
+* returns. When true, all remaining records including the failed record are passed to
+* the error handler.
* @return false to receive only the failed record.
* @deprecated in favor of {@link #seeksAfterHandling()}.
* @see #handleRecord(Exception, ConsumerRecord, Consumer, MessageListenerContainer)
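
For illustration only (not part of this commit): a minimal sketch of a handler that relies on the default behavior described above, so it receives just the failed record through the handleRecord hook referenced in the @see tag; the class name and logging body are placeholders:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

public class LogAndSkipErrorHandler implements CommonErrorHandler {

	@Override
	public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> failedRecord,
			Consumer<?, ?> consumer, MessageListenerContainer container) {

		// With the default described above, only the failed record reaches this method;
		// the remaining records are handed back to the listener by the container.
		System.err.println("Skipping " + failedRecord.topic() + "-" + failedRecord.partition()
				+ "@" + failedRecord.offset() + ": " + thrownException.getMessage());
	}

}
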
@@ -1,5 +1,5 @@
/*
-* Copyright 2019-2022 the original author or authors.
+* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -60,6 +60,7 @@
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
+import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
@@ -69,6 +70,7 @@
/**
* @author Mark Norkin
* @author Gary Russell
+* @author Adrian Chlebosz
*
* @since 2.3.0
*/
@@ -132,17 +134,29 @@ public void tearDown() {
@Test
public void shouldNotCreateTemplateIfOptionsIsNull() {
assertThatIllegalArgumentException()
-.isThrownBy(() -> new ReactiveKafkaProducerTemplate<String, String>(null))
+.isThrownBy(() -> new ReactiveKafkaProducerTemplate<>((SenderOptions<String, String>) null))
.withMessage("Sender options can not be null");
}

+@Test
+public void shouldNotCreateTemplateIfSenderIsNull() {
+assertThatIllegalArgumentException()
+.isThrownBy(() -> new ReactiveKafkaProducerTemplate<>((KafkaSender<String, String>) null))
+.withMessage("Sender can not be null");
+}

@Test
@SuppressWarnings("unchecked")
public void shouldNotCreateTemplateIfConverterIsNull() {
assertThatIllegalArgumentException()
.isThrownBy(() ->
new ReactiveKafkaProducerTemplate<String, String>(Mockito.mock(SenderOptions.class), null))
.withMessage("Message converter can not be null");

+assertThatIllegalArgumentException()
+.isThrownBy(() ->
+new ReactiveKafkaProducerTemplate<String, String>(Mockito.mock(KafkaSender.class), null))
+.withMessage("Message converter can not be null");
}

@Test

@@ -1,5 +1,5 @@
/*
-* Copyright 2019-2022 the original author or authors.
+* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -53,6 +53,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
+import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.SenderOptions;
@@ -65,6 +66,7 @@
* @author Mark Norkin
* @author Gary Russell
* @author Will Kennedy
+* @author Adrian Chlebosz
*
* @since 2.3.0
*/
@@ -138,10 +140,17 @@ public void tearDown() {
@Test
public void shouldNotCreateTemplateIfOptionsIsNull() {
assertThatIllegalArgumentException()
-.isThrownBy(() -> new ReactiveKafkaConsumerTemplate<String, String>(null))
+.isThrownBy(() -> new ReactiveKafkaConsumerTemplate<>((ReceiverOptions<String, String>) null))
.withMessage("Receiver options can not be null");
}

+@Test
+public void shouldNotCreateTemplateIfReceiverIsNull() {
+assertThatIllegalArgumentException()
+.isThrownBy(() -> new ReactiveKafkaConsumerTemplate<>((KafkaReceiver<String, String>) null))
+.withMessage("Kafka receiver can not be null");
+}

@Test
public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiveIt() {
ProducerRecord<Integer, String> producerRecord =