From e1d19361d60462f61f781c51de631f58fee523e4 Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Thu, 20 Jun 2024 10:19:11 -0700 Subject: [PATCH] Add an internal subscribe call to MemQ consumer's assign() --- .../psc/consumer/memq/TestPscMemqConsumer.java | 15 +++++++++++++++ .../psc/consumer/memq/PscMemqConsumer.java | 2 ++ 2 files changed, 17 insertions(+) diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/consumer/memq/TestPscMemqConsumer.java b/psc-integration-test/src/test/java/com/pinterest/psc/consumer/memq/TestPscMemqConsumer.java index 9199393..b1a1e64 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/consumer/memq/TestPscMemqConsumer.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/consumer/memq/TestPscMemqConsumer.java @@ -168,6 +168,21 @@ void unsubscribe() throws Exception { pscMemqConsumer.close(); } + @Test + void testAssignDoesNotRequireSubscribe() throws Exception { + PscMemqConsumer pscMemqConsumer = getPscMemqConsumer( + "testAssignDoesNotRequireSubscribe"); + + pscMemqConsumer.assign(Collections.emptySet()); + + MemqTopicUri uri1 = MemqTopicUri.validate(TopicUri.validate(testMemqTopic1)); + TopicUriPartition topicUriPartition = TestUtils.getFinalizedTopicUriPartition(uri1, 0); + + pscMemqConsumer.assign(Sets.newHashSet(topicUriPartition)); + + pscMemqConsumer.close(); + } + @Test void close() throws Exception { PscMemqConsumer pscMemqConsumer = getPscMemqConsumer( diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java index edd1b3b..155706b 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java @@ -181,6 +181,8 @@ public void assign(Set topicUriPartitions) throws ConsumerExc currentAssignment.retainAll(topicUriPartitions); currentAssignment.addAll(topicUriPartitions); try { + memqConsumer.subscribe(topicUriPartitions.stream().map(TopicUriPartition::getTopicUri) + .map(TopicUri::getTopic).collect(Collectors.toSet())); memqConsumer.assign(topicUriPartitions.stream().map(TopicUriPartition::getPartition) .collect(Collectors.toList())); } catch (Exception exception) {