Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstracting out logic for TransactionManager reflections and direct field access; Disallow >1 backendProducers to be created when PscProducer is transactional #43

Merged
merged 11 commits into from
Aug 27, 2024
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.pinterest.psc</groupId>
<artifactId>psc-java-oss</artifactId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>psc-java-oss</name>
<modules>
Expand Down
2 changes: 1 addition & 1 deletion psc-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
4 changes: 2 additions & 2 deletions psc-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>psc-examples</artifactId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<name>psc-examples</name>

<properties>
Expand Down
4 changes: 2 additions & 2 deletions psc-flink-logging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>psc-flink-logging</artifactId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>

<dependencies>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion psc-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.pinterest.psc</groupId>
<artifactId>psc-java-oss</artifactId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>psc-flink</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.pinterest.psc.producer.PscProducer;
import com.pinterest.psc.producer.PscProducerMessage;
import com.pinterest.psc.producer.PscProducerTransactionalProperties;
import com.pinterest.psc.producer.transaction.TransactionManagerUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
Expand Down Expand Up @@ -165,22 +166,20 @@ public String getTransactionalId() {

public long getProducerId(PscProducerMessage pscProducerMessage) throws ProducerException {
Object transactionManager = super.getTransactionManager(pscProducerMessage);
Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch");
return (long) getField(producerIdAndEpoch, "producerId");
return TransactionManagerUtils.getProducerId(transactionManager);
}

public short getEpoch(PscProducerMessage pscProducerMessage) throws ProducerException {
Object transactionManager = super.getTransactionManager(pscProducerMessage);
Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch");
return (short) getField(producerIdAndEpoch, "epoch");
return TransactionManagerUtils.getEpoch(transactionManager);
}

@VisibleForTesting
public Set<Integer> getTransactionCoordinatorIds() throws ProducerException {
Set<Integer> coordinatorIds = new HashSet<>();
super.getTransactionManagers().forEach(transactionManager ->
coordinatorIds.add(
((Node) invoke(transactionManager, "coordinator", FindCoordinatorRequest.CoordinatorType.TRANSACTION)).id()
TransactionManagerUtils.getTransactionCoordinatorId(transactionManager)
)
);
return coordinatorIds;
Expand All @@ -200,9 +199,15 @@ private void ensureNotClosed() {
*/
private void flushNewPartitions() throws ProducerException {
LOG.info("Flushing new partitions");
Set<TransactionalRequestResult> results = enqueueNewPartitions();
Set<Future<Boolean>> results = enqueueNewPartitions();
super.wakeup();
results.forEach(TransactionalRequestResult::await);
results.forEach(future -> {
try {
future.get();
} catch (Exception e) {
throw new RuntimeException("Error while flushing new partitions", e);
}
});
}

/**
Expand All @@ -212,97 +217,17 @@ private void flushNewPartitions() throws ProducerException {
* <p>If there are no new transactions we return a {@link TransactionalRequestResult} that is
* already done.
*/
private Set<TransactionalRequestResult> enqueueNewPartitions() throws ProducerException {
Set<TransactionalRequestResult> transactionalRequestResults = new HashSet<>();
private Set<Future<Boolean>> enqueueNewPartitions() throws ProducerException {
Set<Future<Boolean>> transactionalRequestResults = new HashSet<>();
Set<Object> transactionManagers = super.getTransactionManagers();
for (Object transactionManager : transactionManagers) {
synchronized (transactionManager) {
Object newPartitionsInTransaction = getField(transactionManager, "newPartitionsInTransaction");
Object newPartitionsInTransactionIsEmpty = invoke(newPartitionsInTransaction, "isEmpty");
TransactionalRequestResult result;
if (newPartitionsInTransactionIsEmpty instanceof Boolean && !((Boolean) newPartitionsInTransactionIsEmpty)) {
Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
result = (TransactionalRequestResult) getField(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
} else {
// we don't have an operation but this operation string is also used in
// addPartitionsToTransactionHandler.
result = new TransactionalRequestResult("AddPartitionsToTxn");
result.done();
}
transactionalRequestResults.add(result);
Future<Boolean> transactionalRequestResultFuture =
TransactionManagerUtils.enqueueInFlightTransactions(transactionManager);
transactionalRequestResults.add(transactionalRequestResultFuture);
}
}
return transactionalRequestResults;
}

protected static Enum<?> getEnum(String enumFullName) {
String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
if (x.length == 2) {
String enumClassName = x[0];
String enumName = x[1];
try {
Class<Enum> cl = (Class<Enum>) Class.forName(enumClassName);
return Enum.valueOf(cl, enumName);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Incompatible KafkaProducer version", e);
}
}
return null;
}

protected static Object invoke(Object object, String methodName, Object... args) {
Class<?>[] argTypes = new Class[args.length];
for (int i = 0; i < args.length; i++) {
argTypes[i] = args[i].getClass();
}
return invoke(object, methodName, argTypes, args);
}

private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
try {
Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
method.setAccessible(true);
return method.invoke(object, args);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException("Incompatible PscProducer version", e);
}
}

/**
* Gets and returns the field {@code fieldName} from the given Object {@code object} using
* reflection.
*/
protected static Object getField(Object object, String fieldName) {
return getField(object, object.getClass(), fieldName);
}

/**
* Gets and returns the field {@code fieldName} from the given Object {@code object} using
* reflection.
*/
private static Object getField(Object object, Class<?> clazz, String fieldName) {
try {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(object);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Incompatible KafkaProducer version", e);
}
}

/**
* Sets the field {@code fieldName} on the given Object {@code object} to {@code value} using
* reflection.
*/
protected static void setField(Object object, String fieldName, Object value) {
try {
Field field = object.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(object, value);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Incompatible KafkaProducer version", e);
}
}

}
2 changes: 1 addition & 1 deletion psc-integration-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ public class TestMultiKafkaClusterBackends {
private static final int partitions1 = 12;
private static final String topic2 = "topic2";
private static final int partitions2 = 24;
private static final String topic3 = "topic3";
private static final int partitions3 = 36;
private KafkaCluster kafkaCluster1, kafkaCluster2;
private String topicUriStr1, topicUriStr2;
private String topicUriStr1, topicUriStr2, topicUriStr3;

/**
* Initializes two Kafka clusters that are commonly used by all tests, and creates a single topic on each.
Expand All @@ -61,10 +63,14 @@ public void setup() throws IOException, InterruptedException {

kafkaCluster2 = new KafkaCluster("plaintext", "region2", "cluster2", 9092);
topicUriStr2 = String.format("%s:%s%s:kafka:env:cloud_%s::%s:%s",
kafkaCluster2.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster2.getRegion(), kafkaCluster2.getCluster(), topic1);
kafkaCluster2.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster2.getRegion(), kafkaCluster2.getCluster(), topic2);

topicUriStr3 = String.format("%s:%s%s:kafka:env:cloud_%s::%s:%s",
kafkaCluster1.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster1.getRegion(), kafkaCluster1.getCluster(), topic3);

PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic1, partitions1);
PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic2, partitions2);
PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic3, partitions3);
}

/**
Expand All @@ -78,12 +84,16 @@ public void setup() throws IOException, InterruptedException {
public void tearDown() throws ExecutionException, InterruptedException {
PscTestUtils.deleteTopicAndVerify(sharedKafkaTestResource1, topic1);
PscTestUtils.deleteTopicAndVerify(sharedKafkaTestResource1, topic2);
PscTestUtils.deleteTopicAndVerify(sharedKafkaTestResource1, topic3);
Thread.sleep(1000);
}

/**
* Verifies that backend producers each have their own transactional states that could be different at times.
*
* Also, verifies that the PscProducer throws the appropriate exception when trying to send messages via a
* new backend producer while the PscProducer is already transactional.
*
* @throws ConfigurationException
* @throws ProducerException
*/
Expand All @@ -100,19 +110,43 @@ public void testTransactionalProducersStates() throws ConfigurationException, Pr
PscBackendProducer<Integer, Integer> backendProducer1 = pscProducer.getBackendProducer(topicUriStr1);
assertEquals(PscProducer.TransactionalState.BEGUN, pscProducer.getBackendProducerState(backendProducer1));

Exception e = assertThrows(ProducerException.class, () -> pscProducer.beginTransaction());
assertEquals("Invalid transaction state: consecutive calls to beginTransaction().", e.getMessage());
PscProducerMessage<Integer, Integer> producerMessageTopic1 = new PscProducerMessage<>(topicUriStr1, 0);
pscProducer.send(producerMessageTopic1);
assertEquals(PscProducer.TransactionalState.IN_TRANSACTION, pscProducer.getBackendProducerState(backendProducer1));

PscProducerMessage<Integer, Integer> producerMessageTopic3 = new PscProducerMessage<>(topicUriStr3, 1);
pscProducer.send(producerMessageTopic3);
assertEquals(PscProducer.TransactionalState.IN_TRANSACTION, pscProducer.getBackendProducerState(backendProducer1));

assertEquals(1, pscProducer.getBackendProducers().size()); // topic1 and topic3 belong to same cluster so there should only be one backend producer at this point
assertEquals(backendProducer1, pscProducer.getBackendProducers().iterator().next());

assertEquals(PscProducer.TransactionalState.IN_TRANSACTION, pscProducer.getBackendProducerState(backendProducer1));
assertEquals(PscProducer.TransactionalState.INIT_AND_BEGUN, pscProducer.getTransactionalState());

pscProducer.commitTransaction();

assertEquals(PscProducer.TransactionalState.INIT_AND_BEGUN, pscProducer.getTransactionalState());
assertEquals(PscProducer.TransactionalState.READY, pscProducer.getBackendProducerState(backendProducer1));

pscProducer.beginTransaction();

assertEquals(PscProducer.TransactionalState.INIT_AND_BEGUN, pscProducer.getTransactionalState());
assertEquals(PscProducer.TransactionalState.BEGUN, pscProducer.getBackendProducerState(backendProducer1));

PscProducerMessage<Integer, Integer> producerMessage = new PscProducerMessage<>(topicUriStr2, 0);
pscProducer.send(producerMessage);
PscProducerMessage<Integer, Integer> producerMessageTopic2 = new PscProducerMessage<>(topicUriStr2, 0);
Exception e = assertThrows(ProducerException.class, () -> pscProducer.send(producerMessageTopic2));
assertEquals("Invalid call to send() which would have created a new backend producer. This is not allowed when the PscProducer is already transactional.", e.getMessage());

assertEquals(2, pscProducer.getBackendProducers().size());
assertEquals(1, pscProducer.getBackendProducers().size());

PscBackendProducer<Integer, Integer> backendProducer2 = pscProducer.getBackendProducer(topicUriStr2);
assertNotEquals(backendProducer1, backendProducer2);
pscProducer.send(producerMessageTopic1); // this should go through
assertEquals(PscProducer.TransactionalState.INIT_AND_BEGUN, pscProducer.getTransactionalState());
assertEquals(PscProducer.TransactionalState.IN_TRANSACTION, pscProducer.getBackendProducerState(backendProducer1));

pscProducer.commitTransaction();
assertEquals(PscProducer.TransactionalState.INIT_AND_BEGUN, pscProducer.getTransactionalState());
assertEquals(PscProducer.TransactionalState.READY, pscProducer.getBackendProducerState(backendProducer1));
assertEquals(PscProducer.TransactionalState.IN_TRANSACTION, pscProducer.getBackendProducerState(backendProducer2));

pscProducer.close();
}
Expand Down
2 changes: 1 addition & 1 deletion psc-logging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion psc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Loading
Loading