Skip to content

Commit

Permalink
Abstracting out TransactionManager reflections and direct field acces…
Browse files Browse the repository at this point in the history
…s logic
  • Loading branch information
jeffxiang committed Aug 13, 2024
1 parent 6c36655 commit 8a728e4
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 58 deletions.
39 changes: 34 additions & 5 deletions psc/src/main/java/com/pinterest/psc/producer/PscProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.io.Closeable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -48,7 +50,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class PscProducer<K, V> implements AutoCloseable {
public class PscProducer<K, V> implements Closeable {
private static final PscLogger logger = PscLogger.getLogger(PscProducer.class);

static {
Expand Down Expand Up @@ -438,6 +440,22 @@ protected PscProducerTransactionalProperties initTransactions(String topicUriStr

TopicUri topicUri = validateTopicUri(topicUriString);
PscBackendProducer<K, V> backendProducer = getBackendProducerForTopicUri(topicUri);
initTransactions(backendProducer);
return backendProducer.getTransactionalProperties();
}

/**
* Initializes all backend producers of this PscProducer to be transactional ready.
* @throws ProducerException
*/
public void initTransactions() throws ProducerException {
ensureOpen();
for (PscBackendProducer<K, V> backendProducer : backendProducers) {
initTransactions(backendProducer);
}
}

private void initTransactions(PscBackendProducer<K, V> backendProducer) throws ProducerException {
if (!transactionalStateByBackendProducer.get(backendProducer).equals(TransactionalState.NON_TRANSACTIONAL) &&
!transactionalStateByBackendProducer.get(backendProducer).equals(TransactionalState.INIT_AND_BEGUN))
throw new ProducerException("Invalid transaction state: initializing transactions works only once for a PSC producer.");
Expand All @@ -456,7 +474,6 @@ protected PscProducerTransactionalProperties initTransactions(String topicUriStr
}

this.beginTransaction();
return backendProducer.getTransactionalProperties();
}

/**
Expand Down Expand Up @@ -671,6 +688,14 @@ public Set<Object> getTransactionManagers() throws ProducerException {
return transactionManagers;
}

@InterfaceStability.Evolving
protected Object getExactlyOneTransactionManager() throws ProducerException {
Set<Object> transactionManagers = getTransactionManagers();
if (transactionManagers.size() != 1)
throw new ProducerException("Expected exactly one transaction manager, but found " + transactionManagers.size());
return transactionManagers.iterator().next();
}

/**
* This API is added due to a dependency by Flink connector, and should not be normally used by a typical producer.
*
Expand Down Expand Up @@ -728,10 +753,14 @@ public void flush() throws ProducerException {
/**
* Closes this PscProducer instance.
*
* @throws ProducerException if closing some backend producer fails
*/
public void close() throws ProducerException {
close(Duration.ofMillis(Long.MAX_VALUE));
@Override
public void close() {
try {
close(Duration.ofMillis(Long.MAX_VALUE));
} catch (ProducerException e) {
throw new RuntimeException("Failed to close PscProducer", e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.pinterest.psc.producer.PscBackendProducer;
import com.pinterest.psc.producer.PscProducerMessage;
import com.pinterest.psc.producer.PscProducerTransactionalProperties;
import com.pinterest.psc.producer.transaction.TransactionManagerUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand Down Expand Up @@ -731,42 +732,9 @@ public void resumeTransaction(PscProducerTransactionalProperties pscProducerTran
handleUninitializedKafkaProducer("resumeTransaction()");

try {
Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager");
Object transactionManager = getTransactionManager();
synchronized (kafkaProducer) {
Object topicPartitionBookkeeper =
PscCommon.getField(transactionManager, "topicPartitionBookkeeper");

PscCommon.invoke(
transactionManager,
"transitionTo",
PscCommon.getEnum(
"org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"
)
);

PscCommon.invoke(topicPartitionBookkeeper, "reset");

Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch");
PscCommon.setField(producerIdAndEpoch, "producerId", pscProducerTransactionalProperties.getProducerId());
PscCommon.setField(producerIdAndEpoch, "epoch", pscProducerTransactionalProperties.getEpoch());

PscCommon.invoke(
transactionManager,
"transitionTo",
PscCommon.getEnum(
"org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"
)
);

PscCommon.invoke(
transactionManager,
"transitionTo",
PscCommon.getEnum(
"org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"
)
);

PscCommon.setField(transactionManager, "transactionStarted", true);
TransactionManagerUtils.resumeTransaction(transactionManager, pscProducerTransactionalProperties);
}
} catch (Exception exception) {
handleException(exception, true);
Expand All @@ -778,11 +746,10 @@ public PscProducerTransactionalProperties getTransactionalProperties() throws Pr
if (kafkaProducer == null)
handleUninitializedKafkaProducer("getTransactionalProperties()");

Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager");
Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch");
Object transactionManager = getTransactionManager();
return new PscProducerTransactionalProperties(
(long) PscCommon.getField(producerIdAndEpoch, "producerId"),
(short) PscCommon.getField(producerIdAndEpoch, "epoch")
TransactionManagerUtils.getProducerId(transactionManager),
TransactionManagerUtils.getEpoch(transactionManager)
);
}

Expand All @@ -798,30 +765,38 @@ public PscProducerTransactionalProperties getTransactionalProperties() throws Pr
}

private long getProducerId() {
Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager");
Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch");
return (long) PscCommon.getField(producerIdAndEpoch, "producerId");
try {
Object transactionManager = getTransactionManager();
return TransactionManagerUtils.getProducerId(transactionManager);
} catch (ProducerException e) {
throw new RuntimeException("Unable to get producerId", e);
}
}

private void setProducerId(long producerId) {
Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager");
if (transactionManager != null) {
Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch");
PscCommon.setField(producerIdAndEpoch, "producerId", producerId);
try {
Object transactionManager = getTransactionManager();
TransactionManagerUtils.setProducerId(transactionManager, producerId);
} catch (ProducerException e) {
throw new RuntimeException("Unable to set producerId", e);
}
}

public short getEpoch() {
Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager");
Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch");
return (short) PscCommon.getField(producerIdAndEpoch, "epoch");
try {
Object transactionManager = getTransactionManager();
return TransactionManagerUtils.getEpoch(transactionManager);
} catch (ProducerException e) {
throw new RuntimeException("Unable to get epoch", e);
}
}

private void setEpoch(short epoch) {
Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager");
if (transactionManager != null) {
Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch");
PscCommon.setField(producerIdAndEpoch, "epoch", epoch);
try {
Object transactionManager = getTransactionManager();
TransactionManagerUtils.setEpoch(transactionManager, epoch);
} catch (ProducerException e) {
throw new RuntimeException("Unable to set epoch", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package com.pinterest.psc.producer.transaction;

import com.pinterest.psc.common.PscCommon;
import com.pinterest.psc.producer.PscProducerTransactionalProperties;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class KafkaTransactionManagerOperator implements TransactionManagerOperator {

private static final String KAFKA_TXN_MANAGER_PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
private static final String TRANSACTION_MANAGER_STATE_ENUM =
"org.apache.kafka.clients.producer.internals.TransactionManager$State";

private ProducerIdAndEpoch getProducerIdAndEpoch(Object transactionManager) {
ProducerIdAndEpoch producerIdAndEpoch = (ProducerIdAndEpoch) PscCommon.getField(transactionManager, KAFKA_TXN_MANAGER_PRODUCER_ID_AND_EPOCH_FIELD_NAME);
if (producerIdAndEpoch == null) {
throw new IllegalStateException("ProducerIdAndEpoch is null");
}
return producerIdAndEpoch;
}

@Override
public short getEpoch(Object transactionManager) {
ProducerIdAndEpoch producerIdAndEpoch = getProducerIdAndEpoch(transactionManager);
return (short) PscCommon.getField(producerIdAndEpoch, "epoch");
}

@Override
public String getTransactionId(Object transactionManager) {
return (String) PscCommon.getField(transactionManager, "transactionalId");
}

@Override
public long getProducerId(Object transactionManager) {
ProducerIdAndEpoch producerIdAndEpoch = getProducerIdAndEpoch(transactionManager);
return (long) PscCommon.getField(producerIdAndEpoch, "producerId");
}

@Override
public void setEpoch(Object transactionManager, short epoch) {
ProducerIdAndEpoch producerIdAndEpoch = getProducerIdAndEpoch(transactionManager);
PscCommon.setField(producerIdAndEpoch, "epoch", epoch);
}

@Override
public void setTransactionId(Object transactionManager, String transactionId) {
PscCommon.setField(transactionManager, "transactionalId", transactionId);
PscCommon.setField(
transactionManager,
"currentState",
getTransactionManagerState("UNINITIALIZED"));
}

@Override
public void setProducerId(Object transactionManager, long producerId) {
ProducerIdAndEpoch producerIdAndEpoch = getProducerIdAndEpoch(transactionManager);
PscCommon.setField(producerIdAndEpoch, "producerId", producerId);
}

@Override
public Future<Boolean> enqueueInFlightTransactions(Object transactionManager) {
TransactionalRequestResult result = enqueueNewPartitions(transactionManager);
return new Future<Boolean>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return result.isCompleted();
}

@Override
public Boolean get() {
result.await();
return result.isSuccessful();
}

@Override
public Boolean get(long timeout, TimeUnit unit) {
result.await(timeout, unit);
return result.isSuccessful();
}
};
}

private Enum<?> getTransactionManagerState(String enumName) {
try {
Class<Enum> cl = (Class<Enum>) Class.forName(TRANSACTION_MANAGER_STATE_ENUM);
return Enum.valueOf(cl, enumName);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Incompatible KafkaProducer version", e);
}
}

/**
* Enqueues new transactions at the transaction manager and returns a {@link
* TransactionalRequestResult} that allows waiting on them.
*
* <p>If there are no new transactions we return a {@link TransactionalRequestResult} that is
* already done.
*/
private TransactionalRequestResult enqueueNewPartitions(Object transactionManager) {
Object newPartitionsInTransaction =
PscCommon.getField(transactionManager, "newPartitionsInTransaction");
Object newPartitionsInTransactionIsEmpty =
PscCommon.invoke(newPartitionsInTransaction, "isEmpty");
TransactionalRequestResult result;
if (newPartitionsInTransactionIsEmpty instanceof Boolean
&& !((Boolean) newPartitionsInTransactionIsEmpty)) {
Object txnRequestHandler =
PscCommon.invoke(transactionManager, "addPartitionsToTransactionHandler");
PscCommon.invoke(
transactionManager,
"enqueueRequest",
new Class[] {txnRequestHandler.getClass().getSuperclass()},
new Object[] {txnRequestHandler});
result =
(TransactionalRequestResult)
PscCommon.getField(
txnRequestHandler,
txnRequestHandler.getClass().getSuperclass(),
"result");
} else {
// we don't have an operation but this operation string is also used in
// addPartitionsToTransactionHandler.
result = new TransactionalRequestResult("AddPartitionsToTxn");
result.done();
}
return result;
}

@Override
public void resumeTransaction(Object transactionManager, PscProducerTransactionalProperties transactionalProperties) {
Object topicPartitionBookkeeper =
PscCommon.getField(transactionManager, "topicPartitionBookkeeper");

transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
PscCommon.invoke(topicPartitionBookkeeper, "reset");

PscCommon.setField(
transactionManager,
KAFKA_TXN_MANAGER_PRODUCER_ID_AND_EPOCH_FIELD_NAME,
createProducerIdAndEpoch(transactionalProperties.getProducerId(), transactionalProperties.getEpoch()));

transitionTransactionManagerStateTo(transactionManager, "READY");

transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");
PscCommon.setField(transactionManager, "transactionStarted", true);
}

private ProducerIdAndEpoch createProducerIdAndEpoch(long producerId, short epoch) {
try {
Field field =
TransactionManager.class.getDeclaredField(KAFKA_TXN_MANAGER_PRODUCER_ID_AND_EPOCH_FIELD_NAME);
Class<?> clazz = field.getType();
Constructor<?> constructor = clazz.getDeclaredConstructor(Long.TYPE, Short.TYPE);
constructor.setAccessible(true);
return (ProducerIdAndEpoch) constructor.newInstance(producerId, epoch);
} catch (InvocationTargetException
| InstantiationException
| IllegalAccessException
| NoSuchFieldException
| NoSuchMethodException e) {
throw new RuntimeException("Incompatible KafkaProducer version", e);
}
}

private void transitionTransactionManagerStateTo(
Object transactionManager, String state) {
PscCommon.invoke(transactionManager, "transitionTo", getTransactionManagerState(state));
}
}
Loading

0 comments on commit 8a728e4

Please sign in to comment.