Skip to content

Commit

Permalink
Try to fix NPE in checkpoint recovery by deprecating KafkaProducerTra…
Browse files Browse the repository at this point in the history
…nsactionalProperties
  • Loading branch information
jeffxiang committed Dec 7, 2023
1 parent 2ac8e42 commit e6b4a15
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import com.pinterest.psc.producer.PscProducer;
import com.pinterest.psc.producer.PscProducerMessage;
import com.pinterest.psc.producer.PscProducerTransactionalProperties;
import com.pinterest.psc.producer.kafka.KafkaProducerTransactionalProperties;
import com.pinterest.psc.serde.ByteArraySerializer;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import com.pinterest.psc.producer.PscBackendProducer;
import com.pinterest.psc.producer.PscProducer;
import com.pinterest.psc.producer.PscProducerMessage;
import com.pinterest.psc.producer.PscProducerTransactionalProperties;
import com.pinterest.psc.producer.PscProducerUtils;
import com.pinterest.psc.producer.creation.PscKafkaProducerCreator;
import com.pinterest.psc.producer.kafka.KafkaProducerTransactionalProperties;
import com.pinterest.psc.serde.ByteArraySerializer;
import com.pinterest.psc.serde.IntegerDeserializer;
import com.pinterest.psc.serde.IntegerSerializer;
Expand Down Expand Up @@ -434,7 +434,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept
Collection<PscBackendProducer> backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer);
assertEquals(1, backendProducers.size());
PscBackendProducer backendProducer = backendProducers.iterator().next();
KafkaProducerTransactionalProperties transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties();
PscProducerTransactionalProperties transactionalProperties = backendProducer.getTransactionalProperties();
long producerId = transactionalProperties.getProducerId();
assertEquals(0, transactionalProperties.getEpoch());

Expand All @@ -450,7 +450,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept
backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer2);
assertEquals(1, backendProducers.size());
backendProducer = backendProducers.iterator().next();
transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties();
transactionalProperties = backendProducer.getTransactionalProperties();
// it should bump the epoch each time for the same producer id
assertEquals(producerId, transactionalProperties.getProducerId());
assertEquals(1, transactionalProperties.getEpoch());
Expand All @@ -463,7 +463,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept
backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer3);
assertEquals(1, backendProducers.size());
backendProducer = backendProducers.iterator().next();
transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties();
transactionalProperties = backendProducer.getTransactionalProperties();
assertEquals(producerId, transactionalProperties.getProducerId());
assertEquals(2, transactionalProperties.getEpoch());
pscProducer3.abortTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import com.pinterest.psc.producer.PscProducerTransactionalProperties;

/**
* Moved to PscProducerTransactionalProperties
*/
@Deprecated
public class KafkaProducerTransactionalProperties extends PscProducerTransactionalProperties {

public KafkaProducerTransactionalProperties(long producerId, short epoch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,25 +716,14 @@ public void resumeTransaction(PscBackendProducer otherBackendProducer) throws Pr
);
}

resumeTransaction(new KafkaProducerTransactionalProperties(producerId, epoch));
resumeTransaction(new PscProducerTransactionalProperties(producerId, epoch));
}

@Override
public void resumeTransaction(PscProducerTransactionalProperties pscProducerTransactionalProperties) throws ProducerException {
if (kafkaProducer == null)
handleUninitializedKafkaProducer("resumeTransaction()");

if (!(pscProducerTransactionalProperties instanceof KafkaProducerTransactionalProperties)) {
handleException(
new BackendProducerException(
"[Kafka] Unexpected producer transaction state type: " + pscProducerTransactionalProperties.getClass().getCanonicalName(),
PscUtils.BACKEND_TYPE_KAFKA
), true
);
}

KafkaProducerTransactionalProperties kafkaProducerTransactionalProperties = (KafkaProducerTransactionalProperties) pscProducerTransactionalProperties;

try {
Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager");
synchronized (kafkaProducer) {
Expand All @@ -752,8 +741,8 @@ public void resumeTransaction(PscProducerTransactionalProperties pscProducerTran
PscCommon.invoke(topicPartitionBookkeeper, "reset");

Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch");
PscCommon.setField(producerIdAndEpoch, "producerId", kafkaProducerTransactionalProperties.getProducerId());
PscCommon.setField(producerIdAndEpoch, "epoch", kafkaProducerTransactionalProperties.getEpoch());
PscCommon.setField(producerIdAndEpoch, "producerId", pscProducerTransactionalProperties.getProducerId());
PscCommon.setField(producerIdAndEpoch, "epoch", pscProducerTransactionalProperties.getEpoch());

PscCommon.invoke(
transactionManager,
Expand Down Expand Up @@ -785,7 +774,7 @@ public PscProducerTransactionalProperties getTransactionalProperties() throws Pr

Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager");
Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch");
return new KafkaProducerTransactionalProperties(
return new PscProducerTransactionalProperties(
(long) PscCommon.getField(producerIdAndEpoch, "producerId"),
(short) PscCommon.getField(producerIdAndEpoch, "epoch")
);
Expand Down

0 comments on commit e6b4a15

Please sign in to comment.