From f03388655f42dc2e7b5c5f029328ae08db97283c Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Wed, 11 Sep 2024 14:30:00 -0400 Subject: [PATCH] Address comments --- .../client/TestPscMetadataClient.java | 92 ++++++++----------- .../client/PscBackendMetadataClient.java | 21 ++--- .../metadata/client/PscMetadataClient.java | 30 +++--- .../client/kafka/PscKafkaMetadataClient.java | 40 ++++---- 4 files changed, 79 insertions(+), 104 deletions(-) diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index 212cee8..507f372 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -30,6 +30,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -39,7 +40,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -121,21 +121,21 @@ public void tearDown() throws ExecutionException, InterruptedException { } /** - * Tests that {@link PscMetadataClient#listTopicRns(TopicUri, long, TimeUnit)} returns the correct list of topic RNs + * Tests that {@link PscMetadataClient#listTopicRns(TopicUri, Duration)} returns the correct list of topic RNs * * @throws Exception */ @Test public void testListTopicRns() throws Exception { PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); - List topicRnList = client.listTopicRns(KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), 10000, TimeUnit.MILLISECONDS); + List topicRnList = client.listTopicRns(KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), Duration.ofMillis(10000)); List expectedTopicRnList = Arrays.asList(topic1Rn, topic2Rn, topic3Rn); assertEquals(expectedTopicRnList, topicRnList); client.close(); } /** - * Tests that {@link PscMetadataClient#describeTopicRns(TopicUri, java.util.Set, long, TimeUnit)} returns the correct + * Tests that {@link PscMetadataClient#describeTopicRns(TopicUri, java.util.Set, Duration)} returns the correct * metadata for the supplied topic RNs * * @throws Exception @@ -146,8 +146,7 @@ public void testDescribeTopicRns() throws Exception { Map topicRnDescriptionMap = client.describeTopicRns( KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), new HashSet<>(Arrays.asList(topic1Rn, topic2Rn, topic3Rn)), - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); assertEquals(3, topicRnDescriptionMap.size()); @@ -173,7 +172,7 @@ public void testDescribeTopicRns() throws Exception { } /** - * Tests that {@link PscMetadataClient#listOffsets(TopicUri, Map, long, TimeUnit)} returns the correct offsets for the + * Tests that {@link PscMetadataClient#listOffsets(TopicUri, Map, Duration)} returns the correct offsets for the * supplied topic partitions and specs * * @throws Exception @@ -185,35 +184,33 @@ public void testListOffsets() throws Exception { topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST); topicUriPartitionsAndOptions.put(new TopicUriPartition(topic2Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)); - Map offsets = client.listOffsets( + Map offsets = client.listOffsets( clusterUri, topicUriPartitionsAndOptions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); assertEquals(2, offsets.size()); // ensure that the offsets are 0 when the topic is empty - assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset()); - assertEquals(0, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset()); + assertEquals(0, (long) offsets.get(new TopicUriPartition(topic1Uri, 0))); + assertEquals(0, (long) offsets.get(new TopicUriPartition(topic2Uri, 0))); // send one message to partition 0 for both topics PscProducer pscProducer = getPscProducer(); - pscProducer.send(new PscProducerMessage(topicUriStr1, 0,0,0)); - pscProducer.send(new PscProducerMessage(topicUriStr2, 0,0,0)); + pscProducer.send(new PscProducerMessage<>(topicUriStr1, 0,0,0)); + pscProducer.send(new PscProducerMessage<>(topicUriStr2, 0,0,0)); pscProducer.flush(); offsets = client.listOffsets( clusterUri, topicUriPartitionsAndOptions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); // ensure sent offsets are captured in next metadataClient call assertEquals(2, offsets.size()); - assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset()); // earliest offset - assertEquals(1, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset()); // latest offset + assertEquals(0, (long) offsets.get(new TopicUriPartition(topic1Uri, 0))); // earliest offset + assertEquals(1, (long) offsets.get(new TopicUriPartition(topic2Uri, 0))); // latest offset // now change the spec to latest for both topics, and add topic1 partitions 5 and 10 to the query topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); @@ -233,23 +230,22 @@ public void testListOffsets() throws Exception { offsets = client.listOffsets( clusterUri, topicUriPartitionsAndOptions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); // ensure sent offsets are captured in next metadataClient call assertEquals(4, offsets.size()); - assertEquals(3, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset()); - assertEquals(1, offsets.get(new TopicUriPartition(topic1Uri, 5)).getOffset()); - assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 10)).getOffset()); - assertEquals(2, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset()); + assertEquals(3, (long) offsets.get(new TopicUriPartition(topic1Uri, 0))); + assertEquals(1, (long) offsets.get(new TopicUriPartition(topic1Uri, 5))); + assertEquals(0, (long) offsets.get(new TopicUriPartition(topic1Uri, 10))); + assertEquals(2, (long) offsets.get(new TopicUriPartition(topic2Uri, 0))); client.close(); pscProducer.close(); } /** - * Tests that {@link PscMetadataClient#listOffsetsForConsumerGroup(TopicUri, String, Collection, long, TimeUnit)} returns + * Tests that {@link PscMetadataClient#listOffsetsForConsumerGroup(TopicUri, String, Collection, Duration)} returns * the correct offsets for the supplied consumer group and topic partitions. Also tests correct behavior when supplied * with edge case scenarios such as non-existent consumerGroupId, non-existent partitions, etc. * @@ -286,12 +282,11 @@ public void testListOffsetsForConsumerGroup() throws Exception { pollNMessages(1, pscConsumer); pscConsumer.commitSync(); - Map offsets = client.listOffsetsForConsumerGroup( + Map offsets = client.listOffsetsForConsumerGroup( clusterUri, consumerGroupId, topicUriPartitions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); assertEquals(4, offsets.size()); @@ -299,8 +294,7 @@ public void testListOffsetsForConsumerGroup() throws Exception { assertTrue(offsets.containsKey(t1p1)); assertTrue(offsets.containsKey(t2p23)); assertTrue(offsets.containsKey(t3p0)); - assertEquals(t1p0, offsets.get(t1p0).getTopicUriPartition()); - assertEquals(1, offsets.get(t1p0).getOffset()); + assertEquals(1, (long) offsets.get(t1p0)); assertNull(offsets.get(t1p1)); assertNull(offsets.get(t2p23)); assertNull(offsets.get(t3p0)); @@ -314,11 +308,10 @@ public void testListOffsetsForConsumerGroup() throws Exception { clusterUri, consumerGroupId, topicUriPartitions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); - assertEquals(901, offsets.get(t1p0).getOffset()); + assertEquals(901, (long) offsets.get(t1p0)); assertNull(offsets.get(t1p1)); assertNull(offsets.get(t2p23)); assertNull(offsets.get(t3p0)); @@ -334,12 +327,11 @@ public void testListOffsetsForConsumerGroup() throws Exception { clusterUri, consumerGroupId, topicUriPartitions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); - assertEquals(901, offsets.get(t1p0).getOffset()); - assertEquals(500, offsets.get(t1p1).getOffset()); + assertEquals(901, (long) offsets.get(t1p0)); + assertEquals(500, (long) offsets.get(t1p1)); assertNull(offsets.get(t2p23)); assertNull(offsets.get(t3p0)); @@ -354,13 +346,12 @@ public void testListOffsetsForConsumerGroup() throws Exception { clusterUri, consumerGroupId, topicUriPartitions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); - assertEquals(901, offsets.get(t1p0).getOffset()); - assertEquals(500, offsets.get(t1p1).getOffset()); - assertEquals(1000, offsets.get(t2p23).getOffset()); + assertEquals(901, (long) offsets.get(t1p0)); + assertEquals(500, (long) offsets.get(t1p1)); + assertEquals(1000, (long) offsets.get(t2p23)); assertNull(offsets.get(t3p0)); // assign to t3p0, poll 999 messages, commit - this should set the offset to 999 @@ -374,22 +365,20 @@ public void testListOffsetsForConsumerGroup() throws Exception { clusterUri, consumerGroupId, topicUriPartitions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); - assertEquals(901, offsets.get(t1p0).getOffset()); - assertEquals(500, offsets.get(t1p1).getOffset()); - assertEquals(1000, offsets.get(t2p23).getOffset()); - assertEquals(999, offsets.get(t3p0).getOffset()); + assertEquals(901, (long) offsets.get(t1p0)); + assertEquals(500, (long) offsets.get(t1p1)); + assertEquals(1000, (long) offsets.get(t2p23)); + assertEquals(999, (long) offsets.get(t3p0)); // query a non-existent consumer group offsets = client.listOffsetsForConsumerGroup( clusterUri, "non-existent-consumer-group", topicUriPartitions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); assertEquals(4, offsets.size()); @@ -407,8 +396,7 @@ public void testListOffsetsForConsumerGroup() throws Exception { clusterUri, consumerGroupId, Collections.singleton(new TopicUriPartition(topic1Uri, 100)), - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); assertEquals(1, offsets.size()); diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java index 7f9390d..acb9d3e 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java @@ -12,11 +12,11 @@ import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.metadata.TopicRnMetadata; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** @@ -36,28 +36,23 @@ public void initialize(TopicUri topicUri, Environment env, PscConfigurationInter ServiceDiscoveryManager.getServiceDiscoveryConfig(env, pscConfigurationInternal.getDiscoveryConfiguration(), topicUri); } - public abstract List listTopicRns( - long timeout, - TimeUnit timeUnit - ) throws ExecutionException, InterruptedException, TimeoutException; + public abstract List listTopicRns(Duration duration) + throws ExecutionException, InterruptedException, TimeoutException; public abstract Map describeTopicRns( Collection topicRns, - long timeout, - TimeUnit timeUnit + Duration duration ) throws ExecutionException, InterruptedException, TimeoutException; - public abstract Map listOffsets( + public abstract Map listOffsets( Map topicRnsAndOptions, - long timeout, - TimeUnit timeUnit + Duration duration ) throws ExecutionException, InterruptedException, TimeoutException; - public abstract Map listOffsetsForConsumerGroup( + public abstract Map listOffsetsForConsumerGroup( String consumerGroupId, Collection topicUriPartitions, - long timeout, - TimeUnit timeUnit + Duration duration ) throws ExecutionException, InterruptedException, TimeoutException; public abstract void close() throws Exception; diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java index 3d8fec8..0dcebc3 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java @@ -13,13 +13,13 @@ import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator; import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** @@ -68,16 +68,15 @@ private void initialize() { * List all the {@link TopicRn}'s in the cluster. * * @param clusterUri - * @param timeout - * @param timeUnit + * @param duration * @return the list of {@link TopicRn}'s in the cluster * @throws ExecutionException * @throws InterruptedException * @throws TimeoutException */ - public List listTopicRns(TopicUri clusterUri, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + public List listTopicRns(TopicUri clusterUri, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); - return backendMetadataClient.listTopicRns(timeout, timeUnit); + return backendMetadataClient.listTopicRns(duration); } /** @@ -85,16 +84,15 @@ public List listTopicRns(TopicUri clusterUri, long timeout, TimeUnit ti * * @param clusterUri * @param topicRns - * @param timeout - * @param timeUnit + * @param duration * @return a map of {@link TopicRn} to {@link TopicRnMetadata} * @throws ExecutionException * @throws InterruptedException * @throws TimeoutException */ - public Map describeTopicRns(TopicUri clusterUri, Set topicRns, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + public Map describeTopicRns(TopicUri clusterUri, Set topicRns, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); - return backendMetadataClient.describeTopicRns(topicRns, timeout, timeUnit); + return backendMetadataClient.describeTopicRns(topicRns, duration); } /** @@ -108,17 +106,16 @@ public Map describeTopicRns(TopicUri clusterUri, Set listOffsets(TopicUri clusterUri, Map topicRnsAndOptions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + public Map listOffsets(TopicUri clusterUri, Map topicRnsAndOptions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); - return backendMetadataClient.listOffsets(topicRnsAndOptions, timeout, timeUnit); + return backendMetadataClient.listOffsets(topicRnsAndOptions, duration); } /** @@ -127,17 +124,16 @@ public Map listOffsets(TopicUri clusterUri, Map listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection topicUriPartitions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + public Map listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection topicUriPartitions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); - return backendMetadataClient.listOffsetsForConsumerGroup(consumerGroup, topicUriPartitions, timeout, timeUnit); + return backendMetadataClient.listOffsetsForConsumerGroup(consumerGroup, topicUriPartitions, duration); } @VisibleForTesting diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java index 9965bf2..2bfa0af 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -61,23 +62,20 @@ public void initialize( } @Override - public List listTopicRns( - long timeout, - TimeUnit timeUnit - ) throws ExecutionException, InterruptedException, TimeoutException { + public List listTopicRns(Duration duration) + throws ExecutionException, InterruptedException, TimeoutException { ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics(); - Collection topicListing = listTopicsResult.listings().get(timeout, timeUnit); + Collection topicListing = listTopicsResult.listings().get(duration.toMillis(), TimeUnit.MILLISECONDS); return topicListing.stream().map(tl -> MetadataUtils.createTopicRn(topicUri, tl.name())).collect(Collectors.toList()); } @Override public Map describeTopicRns( Collection topicRns, - long timeout, - TimeUnit timeUnit + Duration duration ) throws ExecutionException, InterruptedException, TimeoutException { Collection topicNames = topicRns.stream().map(TopicRn::getTopic).collect(Collectors.toSet()); - Map topicMetadata = kafkaAdminClient.describeTopics(topicNames).all().get(timeout, timeUnit); + Map topicMetadata = kafkaAdminClient.describeTopics(topicNames).all().get(duration.toMillis(), TimeUnit.MILLISECONDS); Map result = new HashMap<>(); for (Map.Entry entry : topicMetadata.entrySet()) { String topicName = entry.getKey(); @@ -95,10 +93,9 @@ public Map describeTopicRns( } @Override - public Map listOffsets( + public Map listOffsets( Map topicUriPartitionsAndOptions, - long timeout, - TimeUnit timeUnit + Duration duration ) throws ExecutionException, InterruptedException, TimeoutException { Map topicPartitionOffsets = new HashMap<>(); for (Map.Entry entry : topicUriPartitionsAndOptions.entrySet()) { @@ -115,25 +112,24 @@ else if (option == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST) new TopicPartition(topicUriPartition.getTopicUri().getTopic(), topicUriPartition.getPartition()), offsetSpec); } ListOffsetsResult listOffsetsResult = kafkaAdminClient.listOffsets(topicPartitionOffsets); - Map result = new HashMap<>(); - listOffsetsResult.all().get(timeout, timeUnit).entrySet().forEach(e -> { + Map result = new HashMap<>(); + listOffsetsResult.all().get(duration.toMillis(), TimeUnit.MILLISECONDS).entrySet().forEach(e -> { TopicPartition tp = e.getKey(); ListOffsetsResult.ListOffsetsResultInfo info = e.getValue(); TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, tp.topic()); result.put( createKafkaTopicUriPartition(topicRn, tp.partition()), - new MessageId(createKafkaTopicUriPartition(topicRn, tp.partition()), info.offset(), info.timestamp()) + info.offset() ); }); return result; } @Override - public Map listOffsetsForConsumerGroup( + public Map listOffsetsForConsumerGroup( String consumerGroupId, Collection topicUriPartitions, - long timeout, - TimeUnit timeUnit + Duration duration ) throws ExecutionException, InterruptedException, TimeoutException { ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions(); options.topicPartitions(topicUriPartitions.stream().map(tup -> @@ -141,15 +137,15 @@ public Map listOffsetsForConsumerGroup( Map offsets = kafkaAdminClient .listConsumerGroupOffsets(consumerGroupId, options) .partitionsToOffsetAndMetadata() - .get(timeout, timeUnit); - Map result = new HashMap<>(); + .get(duration.toMillis(), TimeUnit.MILLISECONDS); + Map result = new HashMap<>(); offsets.forEach((tp, offsetAndMetadata) -> { TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, tp.topic()); TopicUriPartition tup = createKafkaTopicUriPartition(topicRn, tp.partition()); - MessageId messageId = offsetAndMetadata == null ? null : new MessageId(tup, offsetAndMetadata.offset()); + Long offset = offsetAndMetadata == null ? null : offsetAndMetadata.offset(); result.put( - tup, - messageId + createKafkaTopicUriPartition(topicRn, tp.partition()), + offset ); }); return result;