DefaultErrorHandler doesn't work with suspend methods #3618

Open
igloo12 opened this issue Nov 8, 2024 · 8 comments
igloo12 commented Nov 8, 2024

In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.2.4

Describe the bug

Suspend methods don't trigger the default error handler when they throw an exception.

To Reproduce
I have a bean defined like so:

    @Bean
    fun defaultErrorHandler(): DefaultErrorHandler {
        val defaultErrorHandler = DefaultErrorHandler()
        defaultErrorHandler.setBackOffFunction { _, exception ->
            if (exception is HttpRequestTimeoutException) {
                FixedBackOff(2000, FixedBackOff.UNLIMITED_ATTEMPTS)
            } else {
                FixedBackOff(2000, 10)
            }
        }
        return defaultErrorHandler
    }

If I declare the listener as a suspend function like this:

    @KafkaListener(
        id = "item_updates",
        groupId = "groupid",
        topics = [TOPIC_ITEM_UPDATED],
    )
    suspend fun syncItemListener(message: String) {
        throw HttpRequestTimeoutException("google.com", 100)
    }

the defaultErrorHandler is not triggered. If I remove the suspend modifier, it is triggered:

    @KafkaListener(
        id = "item_updates",
        groupId = "groupid",
        topics = [TOPIC_ITEM_UPDATED],
    )
    fun syncItemListener(message: String) {
        throw HttpRequestTimeoutException("google.com", 100)
    }

Expected behavior

The documentation seems to imply that this should work.

@sobychacko
Contributor

@igloo12 We will take a look at this issue soon. In the meantime, could you put your code in a runnable sample app and share it with us?

@igloo12
Author

igloo12 commented Nov 8, 2024

@sobychacko I have pushed the sample code here. I created two listeners: one that works and one that does not.

https://github.com/igloo12/kafka-suspend/blob/main/src/main/kotlin/com/chasecorp/kafkasuspend/KafkaSuspendApplication.kt

@sobychacko
Contributor

sobychacko commented Nov 9, 2024

When using suspend functions, the exceptions thrown from those functions might be wrapped inside a different exception type by the Kotlin coroutine machinery. You don't have that problem in the non-suspend function since it goes through the standard Java exception propagation mechanism, which Spring Kafka detects correctly. Before we dig into this issue, can you try changing your DefaultErrorHandler bean so that it also checks the exception's cause? Here is the rough idea.

@Bean
fun defaultErrorHandler(): DefaultErrorHandler {
    val defaultErrorHandler = DefaultErrorHandler()
    defaultErrorHandler.setBackOffFunction { _, exception ->
        val isTimeout = when {
            exception is HttpRequestTimeoutException -> true
            exception.cause is HttpRequestTimeoutException -> true
            else -> false
        }
        
        if (isTimeout) {
            FixedBackOff(2000, FixedBackOff.UNLIMITED_ATTEMPTS)
        } else {
            FixedBackOff(2000, 10)
        }
    }
    return defaultErrorHandler
}

As you can see, we first check whether the exception is of type HttpRequestTimeoutException; if not, we check the cause of the thrown exception. See if that has any effect on the situation, and let us know. Thanks!
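
The snippet above looks only one level down. If the wrapping turns out to be deeper than one level (an assumption, not something confirmed in this thread), the same idea can be extended to walk the whole cause chain. Below is a minimal, stdlib-only sketch; `TimeoutLikeException` and `hasCause` are hypothetical names used only for illustration, with `TimeoutLikeException` standing in for HttpRequestTimeoutException so the example compiles on its own.

```kotlin
// Hypothetical stand-in for HttpRequestTimeoutException (illustration only).
class TimeoutLikeException(message: String) : RuntimeException(message)

// Hypothetical helper: returns true if the receiver or any exception in its
// cause chain is of type T. The depth is bounded to guard against cyclic causes.
inline fun <reified T : Throwable> Throwable.hasCause(maxDepth: Int = 16): Boolean =
    generateSequence(this) { it.cause }
        .take(maxDepth)
        .any { it is T }

fun main() {
    // Two layers of wrapping around the exception we care about.
    val wrapped = IllegalStateException(
        "outer",
        RuntimeException("middle", TimeoutLikeException("inner"))
    )
    println(wrapped.hasCause<TimeoutLikeException>())                          // true
    println(IllegalArgumentException("other").hasCause<TimeoutLikeException>()) // false
}
```

Inside the back-off function, the `when` expression could then be replaced by a single call such as `exception.hasCause<HttpRequestTimeoutException>()`.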

@igloo12
Author

igloo12 commented Nov 11, 2024 via email

@igloo12
Author

igloo12 commented Nov 12, 2024

@sobychacko I did some poking around. I think the error is getting caught here:


	protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
			Throwable t, Message<?> source) {

		try {
			handleException(request, acknowledgment, consumer, source,
					new ListenerExecutionFailedException(createMessagingErrorMessage(
							"Async Fail", source.getPayload()), t));
		}
		catch (Throwable ex) {
			this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
			acknowledge(acknowledgment);
		}
	}

The DefaultErrorHandler, however, expects this method to throw the exception:

	protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
			final Message<?> message) { 
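
To illustrate the general mechanism (this is a plain JVM sketch, not Spring Kafka code): when work completes on another thread, a failure reaches only the completion callback, and the method that started the work has already returned normally by then. `invokeAsync` and `onFailure` below are hypothetical names, loosely standing in for `invoke` and `asyncFailure`.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for a listener invocation that returns an async result.
// The failure is delivered to the callback; nothing is thrown to the caller.
fun invokeAsync(onFailure: (Throwable) -> Unit): String =
    try {
        CompletableFuture
            .supplyAsync<String> { throw IllegalStateException("boom") }
            .whenComplete { _, t ->
                // The future may wrap the original exception, so unwrap one level.
                if (t != null) onFailure(t.cause ?: t)
            }
        "returned normally"
    } catch (e: Exception) {
        // Not reached: the async failure never surfaces here.
        "threw ${e.message}"
    }

fun main() {
    val seen = CompletableFuture<Throwable>()
    val outcome = invokeAsync { seen.complete(it) }
    println(outcome) // returned normally
    // The failure is only observable through the callback.
    println(seen.get(5, TimeUnit.SECONDS).message)
}
```

A caller that relies on an exception being thrown from the invocation, as a container-level error handler does, never sees the failure in this arrangement.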

@sobychacko
Contributor

@igloo12 That's a good catch. We will take a look at that today.

@sobychacko
Contributor

@igloo12 Unfortunately, this is not supported at the moment. For async return types, we cannot propagate the exception back to the container because the async processing happens on a separate thread. We will have to address this as a new feature in the next version of Spring for Apache Kafka.

For now, you have two options. First, you can use an error handler on the listener itself by adding an errorHandler property to your KafkaListener; see the note at the end here: https://docs.spring.io/spring-kafka/reference/3.3-SNAPSHOT/kafka/receiving-messages/async-returns.html. The downside of this approach is that you don't get features such as retries. Second, you can try a feature that was recently added for async retries. Docs are missing for this feature, but you can find more details here: #3276

Please let us know how those options work for you.

@igloo12
Author

igloo12 commented Nov 13, 2024

@sobychacko Thanks for the reply. I need a stable release and retries for production, so I will remove suspend from the functions and wait for the upcoming fix. Is there a ticket to track the new feature?
