From 51677e1cf2eddd8fefae0aa9f81a5bcf389780d5 Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Fri, 17 Nov 2023 13:14:26 +0800 Subject: [PATCH] fix: lookup DB is leader address is not yet cached Signed-off-by: Li Zhanhui --- .../server/store/DefaultMetadataStore.java | 13 +++++++++- .../store/DefaultMetadataStoreTest.java | 24 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java index 7492cbda3..ca6fb14fd 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java @@ -729,7 +729,18 @@ public TopicManager topicManager() { public Optional getNode(int nodeId) { BrokerNode node = nodes.get(nodeId); if (null == node) { - return Optional.empty(); + try (SqlSession session = openSession()) { + NodeMapper mapper = session.getMapper(NodeMapper.class); + Node rawNode = mapper.get(nodeId, null, null, null); + if (null != rawNode) { + addBrokerNode(rawNode); + } + } + + node = nodes.get(nodeId); + if (null == node) { + return Optional.empty(); + } } return Optional.of(node); } diff --git a/controller/src/test/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStoreTest.java b/controller/src/test/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStoreTest.java index 82324a649..ce6585d81 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStoreTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStoreTest.java @@ -60,6 +60,7 @@ import java.util.concurrent.TimeUnit; import org.apache.ibatis.session.SqlSession; import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; @@ -1279,4 +1280,27 @@ public void testOnQueueClose_Remote() throws IOException { assertThrows(CompletionException.class, () -> spy.onQueueClosed(2, 1).join()); } } + + @Test + public void testGetNode() throws IOException { + try (MetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + metadataStore.start(); + awaitElectedAsLeader(metadataStore); + Optional brokerNode = metadataStore.getNode(config.nodeId()); + Assertions.assertTrue(brokerNode.isPresent()); + Assertions.assertTrue(metadataStore.leaderAddress().isPresent()); + + try (SqlSession session = metadataStore.openSession()) { + NodeMapper mapper = session.getMapper(NodeMapper.class); + Node node = new Node(); + node.setName("n1"); + node.setAddress("localhost:2345"); + node.setInstanceId("i-2345"); + node.setVolumeId("v-2345"); + mapper.create(node); + session.commit(); + Assertions.assertTrue(metadataStore.getNode(node.getId()).isPresent()); + } + } + } }