[Feature Request] In KafkaMessageChannelBinder pass the extendedConsumerProperties as an argument to ListenerContainerWithDlqAndRetryCustomizer #2985

Open
LukeKynaston opened this issue Aug 8, 2024 · 7 comments

@LukeKynaston

LukeKynaston commented Aug 8, 2024

Dear devs,

In org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder#createConsumerEndpoint(), we have the following snippet:

	if (customizer instanceof ListenerContainerWithDlqAndRetryCustomizer) {
		BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver = createDestResolver(
				extendedConsumerProperties.getExtension());
		BackOff createBackOff = extendedConsumerProperties.getMaxAttempts() > 1
				? createBackOff(extendedConsumerProperties)
				: null;
		((ListenerContainerWithDlqAndRetryCustomizer) customizer)
				.configure(messageListenerContainer, destination.getName(), consumerGroup, destinationResolver,
						createBackOff);
	}
	else {
		((ListenerContainerCustomizer<Object>) customizer)
				.configure(messageListenerContainer, destination.getName(), consumerGroup);
	}

I would like to pass the extendedConsumerProperties as an argument to an overload of ListenerContainerWithDlqAndRetryCustomizer#configure().

It may be sensible to do the same for ListenerContainerCustomizer too.

This would give the customizer access to all of the available properties, including the bindingName, which would make it easier for users to configure a custom error-handling strategy.
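To make the request concrete, here is a minimal sketch of how such an overload could be added without breaking existing customizers. It uses simplified stand-in types rather than the real Spring Cloud Stream classes: `ConsumerPropsSketch`, `DlqAndRetryCustomizerSketch` and `OverloadDemo` are hypothetical names, and the DLQ resolver and back-off arguments are left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the binder's extended consumer properties.
final class ConsumerPropsSketch {
    final String bindingName;
    final int maxAttempts;

    ConsumerPropsSketch(String bindingName, int maxAttempts) {
        this.bindingName = bindingName;
        this.maxAttempts = maxAttempts;
    }
}

// Simplified stand-in for the customizer; C is the listener container type.
interface DlqAndRetryCustomizerSketch<C> {

    // Existing-style method (DLQ resolver and back-off omitted for brevity).
    void configure(C container, String destination, String group);

    // Proposed overload: delegates to the existing method by default, so
    // current implementations keep compiling and behave as before.
    default void configure(C container, String destination, String group,
            ConsumerPropsSketch properties) {
        configure(container, destination, group);
    }
}

final class OverloadDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // A customizer written before the overload existed.
        DlqAndRetryCustomizerSketch<Object> legacy =
                (container, destination, group) -> log.add("legacy:" + destination);

        // A customizer that overrides the overload and uses the binding name.
        DlqAndRetryCustomizerSketch<Object> byBinding = new DlqAndRetryCustomizerSketch<Object>() {
            @Override
            public void configure(Object container, String destination, String group) {
                log.add("legacy:" + destination);
            }

            @Override
            public void configure(Object container, String destination, String group,
                    ConsumerPropsSketch properties) {
                log.add("binding:" + properties.bindingName);
            }
        };

        // The binder would always call the four-argument overload.
        ConsumerPropsSketch props = new ConsumerPropsSketch("process-in-0", 3);
        legacy.configure(new Object(), "orders", null, props);
        byBinding.configure(new Object(), "orders", null, props);
        return log;
    }
}
```

With a default method, the binder could always call the new overload: customizers that do not override it fall through to the existing method, while those that do can branch on the binding name directly.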

Kind regards,

Luke

@olegz
Contributor

olegz commented Aug 8, 2024

If I understand you correctly, you can inject BindingServiceProperties and then call getBindings(), which gives you a map of BindingProperties.

@LukeKynaston
Author

Hi Oleg, thanks for getting back to me.

Yes, this is what we currently do. However, there are situations where a consumer cannot be uniquely identified by its destination and group, for example when more than one consumer is configured for a destination without a group specified (i.e. with an anonymous consumer group).

While we can filter by group and destination, as described above, this approach is cumbersome and error-prone.

It'd be a lot easier to have the extendedConsumerProperties available in the configure() method.
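The ambiguity can be illustrated with a small, self-contained sketch. It does not use the real BindingServiceProperties API: `BindingSketch` and `BindingLookupDemo` are hypothetical stand-ins, the binding names are made up, and a `null` group is only an assumed way of modelling an anonymous consumer group.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-in for a binding's destination and group.
final class BindingSketch {
    final String destination;
    final String group; // null models an anonymous consumer group (assumption)

    BindingSketch(String destination, String group) {
        this.destination = destination;
        this.group = group;
    }
}

final class BindingLookupDemo {

    // Three bindings on the same destination, two of them without a group.
    static Map<String, BindingSketch> sampleBindings() {
        Map<String, BindingSketch> bindings = new LinkedHashMap<>();
        bindings.put("audit-in-0", new BindingSketch("orders", null));
        bindings.put("metrics-in-0", new BindingSketch("orders", null));
        bindings.put("billing-in-0", new BindingSketch("orders", "billing"));
        return bindings;
    }

    // Returns the names of all bindings matching the destination and group.
    static List<String> findByDestinationAndGroup(Map<String, BindingSketch> bindings,
            String destination, String group) {
        return bindings.entrySet().stream()
                .filter(e -> e.getValue().destination.equals(destination))
                .filter(e -> Objects.equals(e.getValue().group, group))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```

Looking up `("orders", "billing")` yields a single binding, but looking up `("orders", null)` yields both `audit-in-0` and `metrics-in-0`, so the customizer cannot tell which binding it is configuring. A lookup keyed on the binding name would not have this problem.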

@olegz
Contributor

olegz commented Aug 8, 2024

We have AbstractExtendedBindingProperties.getExtendedConsumerProperties(bindingName). Is that what you are looking for?

@olegz
Contributor

olegz commented Aug 8, 2024

If you have access to the binder instance, there is an extendedBindingProperties field. It is currently not accessible, but we can discuss it.

@LukeKynaston
Author

That's correct, I'm looking for access to the extendedBindingProperties field.

I think an overload of the ListenerContainerWithDlqAndRetryCustomizer#configure() method containing this field would be a good approach.

@olegz
Contributor

olegz commented Aug 8, 2024

OK, how would you know which binder instance you are using? I mean, if we were to inject something or overload a method, we would need to know that.

@LukeKynaston
Author

The KafkaMessageChannelBinder appears to configure an error handler sequentially for every binding.

Looking through the code, org.springframework.cloud.stream.binding.AbstractBindingLifecycle#start() calls this.bindables.values().forEach(this::doStartWithBindable), so the bindables are started one after another.
We should therefore only ever have one binding available at a time in the extendedBindingProperties field.
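As a toy model of that sequential start (not the actual Spring Cloud Stream code), the sketch below walks a map of bindings in order and hands the customizer the properties of the binding currently being started. `SequentialStartSketch`, the binding names and the `maxAttempts` key are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

final class SequentialStartSketch {

    // Starts each binding in map order, passing the customizer the binding
    // name together with that binding's own properties.
    static void startAll(Map<String, Map<String, String>> bindables,
            BiConsumer<String, Map<String, String>> customizer) {
        bindables.forEach(customizer);
    }

    static List<String> demo() {
        Map<String, Map<String, String>> bindables = new LinkedHashMap<>();
        bindables.put("audit-in-0", Collections.singletonMap("maxAttempts", "1"));
        bindables.put("metrics-in-0", Collections.singletonMap("maxAttempts", "3"));

        // The customizer sees exactly one binding per call, so it needs no lookup.
        List<String> seen = new ArrayList<>();
        startAll(bindables, (name, props) -> seen.add(name + ":" + props.get("maxAttempts")));
        return seen;
    }
}
```

Because each call carries the properties of one binding only, the customizer does not need to know which binder instance it belongs to or search a shared field.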
