Skip to content

Commit

Permalink
kafka:fix issue 7123 Kafka events fail to post
Browse files Browse the repository at this point in the history
Motivation

After we upgraded to 8.2 we no longer are getting events into Kafka. We have 3 dCache instances. One 7.2 remaining still publishing to Kafka with no issue. (dCache#7123).

The issue is that according to spring-projects/spring-kafka#2251,  the kafka-clients provide no hooks to determine that a send failed because the broker is down (spring-projects/spring-kafka#2250).
This is still not fixed so this should be fixed.

Modification

Change the LoggingProducerListener so that when TimeoutException will be catch, the error message will indicate that there is a connection issue or the broker is down.

Result

Log looks like this

24 Apr 2023 16:17:04 (pool_write) [] Producer failed to send the message, the broker is down or the connection was refused
24 Apr 2023 16:17:04 (pool_write) [] Producer failed to send the message, the broker is down or the connection was refused
24 Apr 2023 16:17:04 (NFS-localhost) [] Producer failed to send the message, the broker is down or the connection was refused

or

24 Apr 2023 17:27:51 (NFS-localhost) [door:NFS-localhost@dCacheDomain:AAX6FqLTdag pool_write DoorTransferFinished 0000C9CFA47686574B43B1EF9CF037A24780] Producer failed to send the message, the broker is down or the connection was refused
24 Apr 2023 17:27:51 (pool_write) [door:NFS-localhost@dCacheDomain:AAX6FqLTdag NFS-localhost PoolAcceptFile 0000C9CFA47686574B43B1EF9CF037A24780] Topic billing not present in metadata after 60000 ms.
24 Apr 2023 17:27:51 (NFS-localhost) [door:NFS-localhost@dCacheDomain:AAX6FqLTdag pool_write DoorTransferFinished 0000C9CFA47686574B43B1EF9CF037A24780] TEST Topic billing not present in metadata after 60000 ms. class org.springframework.kafka.KafkaException
24 Apr 2023 17:28:51 (NFS-localhost) [door:NFS-localhost@dCacheDomain:AAX6FqZqubA pool_write DoorTransferFinished 00002B30ED198C494F25A31F589AB91F903F] Producer failed to send the message, the broker is down or the co

Target: master
8.2, 9.0
 Require-book: no
 Require-notes: yes
 Patch: https://rb.dcache.org/r/13967/
 Acked-by: Lea Morschel, Abert Rossi, Tigran Mkrtchyan
  • Loading branch information
mksahakyan committed May 6, 2023
1 parent 0a28438 commit f27e328
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*/
package org.dcache.kafka;

import com.google.common.base.Throwables;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
Expand All @@ -35,7 +37,13 @@ public void onSuccess(ProducerRecord<K, V> record, RecordMetadata recordMetadata
@Override
public void onError(ProducerRecord<K, V> producerRecord,
@Nullable RecordMetadata recordMetadata, Exception exception) {
LOGGER.error("Producer exception occurred while publishing message : {}, exception : {}",
producerRecord, exception.toString());
if (exception instanceof TimeoutException && exception.getCause() == null) {
LOGGER.error("Producer failed to send the message,"
+ " the broker is down or the connection was refused ");
} else {
LOGGER.error(
"Producer exception occurred while publishing message : {}, exception : {}",
producerRecord, Throwables.getRootCause(exception).getMessage());
}
}
}

0 comments on commit f27e328

Please sign in to comment.