From 8343882a38bd76649dd2d2f9b3e755f4927b3739 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sun, 5 Jan 2025 23:14:39 +0800 Subject: [PATCH] Refactor ComputeNodePath (#34257) --- .../metadata/persist/node/ComputeNode.java | 178 ----------------- .../persist/node/ComputeNodePath.java | 179 ++++++++++++++++++ .../persist/node/ComputeNodePathTest.java | 101 ++++++++++ .../persist/node/ComputeNodeTest.java | 84 -------- .../unified/ComputeNodePersistService.java | 28 +-- .../global/ComputeNodeOnlineHandler.java | 6 +- .../ComputeNodeStateChangedHandler.java | 20 +- .../handler/global/KillProcessHandler.java | 6 +- .../global/ShowProcessListHandler.java | 6 +- .../ClusterProcessPersistCoordinator.java | 6 +- .../service/ClusterProcessPersistService.java | 6 +- .../ClusterProcessPersistServiceTest.java | 10 +- 12 files changed, 324 insertions(+), 306 deletions(-) delete mode 100644 kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java create mode 100644 kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodePath.java create mode 100644 kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodePathTest.java delete mode 100644 kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java diff --git a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java deleted file mode 100644 index dff3b04abd165..0000000000000 --- a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.metadata.persist.node; - -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import org.apache.shardingsphere.infra.instance.metadata.InstanceType; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Compute node. - */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class ComputeNode { - - private static final String ROOT_NODE = "nodes"; - - private static final String COMPUTE_NODE = "compute_nodes"; - - private static final String ONLINE_NODE = "online"; - - private static final String LABELS_NODE = "labels"; - - private static final String SHOW_PROCESS_LIST_TRIGGER = "show_process_list_trigger"; - - private static final String KILL_PROCESS_TRIGGER = "kill_process_trigger"; - - private static final String STATUS_NODE = "status"; - - private static final String WORKER_ID = "worker_id"; - - /** - * Get online compute node path. - * - * @param instanceType instance type - * @return path of online compute node - */ - public static String getOnlineNodePath(final InstanceType instanceType) { - return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ONLINE_NODE, instanceType.name().toLowerCase()); - } - - /** - * Get online compute node instance path. - * - * @param instanceId instance id - * @param instanceType instance type - * @return path of online compute node instance - */ - public static String getOnlineInstanceNodePath(final String instanceId, final InstanceType instanceType) { - return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ONLINE_NODE, instanceType.name().toLowerCase(), instanceId); - } - - /** - * Get online compute node path. - * - * @return path of online compute node - */ - public static String getOnlineInstanceNodePath() { - return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ONLINE_NODE); - } - - /** - * Get show process list trigger node path. - * - * @return show process list trigger node path - */ - public static String getShowProcessListTriggerNodePath() { - return String.join("/", "", ROOT_NODE, COMPUTE_NODE, SHOW_PROCESS_LIST_TRIGGER); - } - - /** - * Get kill process trigger node path. - * - * @return kill process trigger node path - */ - public static String getKillProcessTriggerNodePath() { - return String.join("/", "", ROOT_NODE, COMPUTE_NODE, KILL_PROCESS_TRIGGER); - } - - /** - * Get process trigger instance node path. - * - * @param instanceId instance id - * @param taskId show process list task id - * @return path of process trigger instance node path - */ - public static String getProcessTriggerInstanceNodePath(final String instanceId, final String taskId) { - return String.join("/", "", ROOT_NODE, COMPUTE_NODE, SHOW_PROCESS_LIST_TRIGGER, String.join(":", instanceId, taskId)); - } - - /** - * Get process kill instance id node path. - * - * @param instanceId instance id - * @param processId process id - * @return path of process kill instance id node path - */ - public static String getProcessKillInstanceIdNodePath(final String instanceId, final String processId) { - return String.join("/", "", ROOT_NODE, COMPUTE_NODE, KILL_PROCESS_TRIGGER, String.join(":", instanceId, processId)); - } - - /** - * Get compute node instance labels path. - * - * @param instanceId instance id - * @return path of compute node instance labels - */ - public static String getInstanceLabelsNodePath(final String instanceId) { - return String.join("/", "", ROOT_NODE, COMPUTE_NODE, LABELS_NODE, instanceId); - } - - /** - * Get compute node path. - * - * @return compute node path - */ - public static String getComputeNodePath() { - return String.join("/", "", ROOT_NODE, COMPUTE_NODE); - } - - /** - * Get instance worker id node path. - * - * @param instanceId instance id - * @return worker id path - */ - public static String getInstanceWorkerIdNodePath(final String instanceId) { - return String.join("/", "", ROOT_NODE, COMPUTE_NODE, WORKER_ID, instanceId); - } - - /** - * Get instance worker id root node path. - * - * @return worker id root node path - */ - public static String getInstanceWorkerIdRootNodePath() { - return String.join("/", "", ROOT_NODE, COMPUTE_NODE, WORKER_ID); - } - - /** - * Get instance id by compute node path. - * - * @param computeNodePath compute node path - * @return instance id - */ - public static String getInstanceIdByComputeNode(final String computeNodePath) { - Pattern pattern = Pattern.compile(getComputeNodePath() + "(/status|/worker_id|/labels)" + "/([\\S]+)$", Pattern.CASE_INSENSITIVE); - Matcher matcher = pattern.matcher(computeNodePath); - return matcher.find() ? matcher.group(2) : ""; - } - - /** - * Get compute node state node path. - * - * @param instanceId instance id - * @return compute node state node path - */ - public static String getComputeNodeStateNodePath(final String instanceId) { - return String.join("/", "", ROOT_NODE, COMPUTE_NODE, STATUS_NODE, instanceId); - } -} diff --git a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodePath.java b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodePath.java new file mode 100644 index 0000000000000..4a2f536ff52ec --- /dev/null +++ b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodePath.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.metadata.persist.node; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.shardingsphere.infra.instance.metadata.InstanceType; + +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Compute node path. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class ComputeNodePath { + + private static final String ROOT_NODE = "nodes/compute_nodes"; + + private static final String ONLINE_NODE = "online"; + + private static final String SHOW_PROCESS_LIST_TRIGGER_NODE = "show_process_list_trigger"; + + private static final String KILL_PROCESS_TRIGGER_NODE = "kill_process_trigger"; + + private static final String STATUS_NODE = "status"; + + private static final String WORKER_ID_NODE = "worker_id"; + + private static final String LABELS_NODE = "labels"; + + private static final String INSTANCE_ID_PATTERN = "([\\S]+)"; + + /** + * Get compute node root path. + * + * @return compute node root path + */ + public static String getRootPath() { + return String.join("/", "", ROOT_NODE); + } + + /** + * Get online root path. + * + * @return online root path + */ + public static String getOnlineRootPath() { + return String.join("/", "", ROOT_NODE, ONLINE_NODE); + } + + /** + * Get online path. + * + * @param instanceType instance type + * @return online path + */ + public static String getOnlinePath(final InstanceType instanceType) { + return String.join("/", getOnlineRootPath(), instanceType.name().toLowerCase()); + } + + /** + * Get online path. + * + * @param instanceId instance ID + * @param instanceType instance type + * @return online path + */ + public static String getOnlinePath(final String instanceId, final InstanceType instanceType) { + return String.join("/", getOnlinePath(instanceType), instanceId); + } + + /** + * Get show process list trigger root path. + * + * @return show process list trigger root path + */ + public static String getShowProcessListTriggerRootPath() { + return String.join("/", "", ROOT_NODE, SHOW_PROCESS_LIST_TRIGGER_NODE); + } + + /** + * Get show process list trigger path. + * + * @param instanceId instance ID + * @param taskId show process list task ID + * @return show process list trigger path + */ + public static String getShowProcessListTriggerPath(final String instanceId, final String taskId) { + return String.join("/", getShowProcessListTriggerRootPath(), String.join(":", instanceId, taskId)); + } + + /** + * Get kill process trigger root path. + * + * @return kill process trigger root path + */ + public static String getKillProcessTriggerRootPath() { + return String.join("/", "", ROOT_NODE, KILL_PROCESS_TRIGGER_NODE); + } + + /** + * Get kill process trigger path. + * + * @param instanceId instance ID + * @param processId process ID + * @return kill process trigger path + */ + public static String getKillProcessTriggerPath(final String instanceId, final String processId) { + return String.join("/", getKillProcessTriggerRootPath(), String.join(":", instanceId, processId)); + } + + /** + * Get state path. + * + * @param instanceId instance ID + * @return state path + */ + public static String getStatePath(final String instanceId) { + return String.join("/", "", ROOT_NODE, STATUS_NODE, instanceId); + } + + /** + * Get worker ID root path. + * + * @return worker ID root path + */ + public static String getWorkerIdRootPath() { + return String.join("/", "", ROOT_NODE, WORKER_ID_NODE); + } + + /** + * Get worker ID path. + * + * @param instanceId instance ID + * @return worker ID path + */ + public static String getWorkerIdPath(final String instanceId) { + return String.join("/", getWorkerIdRootPath(), instanceId); + } + + /** + * Get labels path. + * + * @param instanceId instance ID + * @return labels path + */ + public static String getLabelsPath(final String instanceId) { + return String.join("/", "", ROOT_NODE, LABELS_NODE, instanceId); + } + + /** + * Find instance id by compute node path. + * + * @param computeNodePath compute node path + * @return found instance ID + */ + public static Optional findInstanceId(final String computeNodePath) { + Pattern pattern = Pattern.compile(getRootPath() + "(/status|/worker_id|/labels)" + "/" + INSTANCE_ID_PATTERN + "$", Pattern.CASE_INSENSITIVE); + Matcher matcher = pattern.matcher(computeNodePath); + return matcher.find() ? Optional.of(matcher.group(2)) : Optional.empty(); + } +} diff --git a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodePathTest.java b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodePathTest.java new file mode 100644 index 0000000000000..b75ac6269b36e --- /dev/null +++ b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodePathTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.metadata.persist.node; + +import org.apache.shardingsphere.infra.instance.metadata.InstanceType; +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; + +class ComputeNodePathTest { + + @Test + void assertGetRootPath() { + assertThat(ComputeNodePath.getRootPath(), is("/nodes/compute_nodes")); + } + + @Test + void assertGetOnlineRootPath() { + assertThat(ComputeNodePath.getOnlineRootPath(), is("/nodes/compute_nodes/online")); + } + + @Test + void assertGetOnlinePathWithInstanceType() { + assertThat(ComputeNodePath.getOnlinePath(InstanceType.PROXY), is("/nodes/compute_nodes/online/proxy")); + assertThat(ComputeNodePath.getOnlinePath(InstanceType.JDBC), is("/nodes/compute_nodes/online/jdbc")); + } + + @Test + void assertGetOnlinePathWithInstanceId() { + assertThat(ComputeNodePath.getOnlinePath("foo_instance_1", InstanceType.PROXY), is("/nodes/compute_nodes/online/proxy/foo_instance_1")); + assertThat(ComputeNodePath.getOnlinePath("foo_instance_2", InstanceType.JDBC), is("/nodes/compute_nodes/online/jdbc/foo_instance_2")); + } + + @Test + void assertGetShowProcessListTriggerRootPath() { + assertThat(ComputeNodePath.getShowProcessListTriggerRootPath(), is("/nodes/compute_nodes/show_process_list_trigger")); + } + + @Test + void assertGetShowProcessListTriggerPathWithInstanceId() { + assertThat(ComputeNodePath.getShowProcessListTriggerPath("foo_instance", "foo_process_id"), is("/nodes/compute_nodes/show_process_list_trigger/foo_instance:foo_process_id")); + assertThat(ComputeNodePath.getShowProcessListTriggerPath("foo_instance", "foo_process_id"), is("/nodes/compute_nodes/show_process_list_trigger/foo_instance:foo_process_id")); + } + + @Test + void assertGetKillProcessTriggerRootPath() { + assertThat(ComputeNodePath.getKillProcessTriggerRootPath(), is("/nodes/compute_nodes/kill_process_trigger")); + } + + @Test + void assertGetKillProcessTriggerPathWithInstanceId() { + assertThat(ComputeNodePath.getKillProcessTriggerPath("foo_instance", "foo_process_id"), is("/nodes/compute_nodes/kill_process_trigger/foo_instance:foo_process_id")); + } + + @Test + void assertGetStatePath() { + assertThat(ComputeNodePath.getStatePath("foo_instance"), is("/nodes/compute_nodes/status/foo_instance")); + } + + @Test + void assertGetWorkerIdRootPath() { + assertThat(ComputeNodePath.getWorkerIdRootPath(), is("/nodes/compute_nodes/worker_id")); + } + + @Test + void assertGetWorkerIdPathWithInstanceId() { + assertThat(ComputeNodePath.getWorkerIdPath("foo_instance"), is("/nodes/compute_nodes/worker_id/foo_instance")); + } + + @Test + void assertGetLabelsPath() { + assertThat(ComputeNodePath.getLabelsPath("foo_instance"), is("/nodes/compute_nodes/labels/foo_instance")); + } + + @Test + void assertFindInstanceId() { + assertThat(ComputeNodePath.findInstanceId("/nodes/compute_nodes/status/foo_instance_1"), is(Optional.of("foo_instance_1"))); + assertThat(ComputeNodePath.findInstanceId("/nodes/compute_nodes/worker_id/foo_instance_2"), is(Optional.of("foo_instance_2"))); + assertThat(ComputeNodePath.findInstanceId("/nodes/compute_nodes/labels/foo_instance_3"), is(Optional.of("foo_instance_3"))); + assertFalse(ComputeNodePath.findInstanceId("/nodes/compute_nodes/invalid/foo_instance_4").isPresent()); + } +} diff --git a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java deleted file mode 100644 index 4cbbe4009222a..0000000000000 --- a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.metadata.persist.node; - -import org.apache.shardingsphere.infra.instance.metadata.InstanceType; -import org.junit.jupiter.api.Test; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -class ComputeNodeTest { - - @Test - void assertGetOnlineNodePath() { - assertThat(ComputeNode.getOnlineNodePath(InstanceType.PROXY), is("/nodes/compute_nodes/online/proxy")); - assertThat(ComputeNode.getOnlineNodePath(InstanceType.JDBC), is("/nodes/compute_nodes/online/jdbc")); - } - - @Test - void assertGetOnlineInstanceNodePath() { - assertThat(ComputeNode.getOnlineInstanceNodePath("foo_instance_1", InstanceType.PROXY), is("/nodes/compute_nodes/online/proxy/foo_instance_1")); - assertThat(ComputeNode.getOnlineInstanceNodePath("foo_instance_2", InstanceType.JDBC), is("/nodes/compute_nodes/online/jdbc/foo_instance_2")); - } - - @Test - void assertGetProcessTriggerNodePatch() { - assertThat(ComputeNode.getShowProcessListTriggerNodePath(), is("/nodes/compute_nodes/show_process_list_trigger")); - } - - @Test - void assertGetProcessTriggerInstanceIdNodePath() { - assertThat(ComputeNode.getProcessTriggerInstanceNodePath("foo_instance", "foo_process_id"), - is("/nodes/compute_nodes/show_process_list_trigger/foo_instance:foo_process_id")); - assertThat(ComputeNode.getProcessTriggerInstanceNodePath("foo_instance", "foo_process_id"), - is("/nodes/compute_nodes/show_process_list_trigger/foo_instance:foo_process_id")); - } - - @Test - void assertGetInstanceLabelsNodePath() { - assertThat(ComputeNode.getInstanceLabelsNodePath("foo_instance"), is("/nodes/compute_nodes/labels/foo_instance")); - } - - @Test - void assertGetInstanceWorkerIdNodePath() { - assertThat(ComputeNode.getInstanceWorkerIdNodePath("foo_instance"), is("/nodes/compute_nodes/worker_id/foo_instance")); - } - - @Test - void assertGetInstanceWorkerIdRootNodePath() { - assertThat(ComputeNode.getInstanceWorkerIdRootNodePath(), is("/nodes/compute_nodes/worker_id")); - } - - @Test - void assertGetInstanceIdByComputeNodePath() { - assertThat(ComputeNode.getInstanceIdByComputeNode("/nodes/compute_nodes/status/foo_instance_1"), is("foo_instance_1")); - assertThat(ComputeNode.getInstanceIdByComputeNode("/nodes/compute_nodes/worker_id/foo_instance_2"), is("foo_instance_2")); - assertThat(ComputeNode.getInstanceIdByComputeNode("/nodes/compute_nodes/labels/foo_instance_3"), is("foo_instance_3")); - } - - @Test - void assertGetComputeNodeStateNodePath() { - assertThat(ComputeNode.getComputeNodeStateNodePath("foo_instance"), is("/nodes/compute_nodes/status/foo_instance")); - } - - @Test - void assertGetComputeNodePath() { - assertThat(ComputeNode.getComputeNodePath(), is("/nodes/compute_nodes")); - } -} diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/unified/ComputeNodePersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/unified/ComputeNodePersistService.java index 9c37b094ad2af..38af55fcd4e96 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/unified/ComputeNodePersistService.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/unified/ComputeNodePersistService.java @@ -29,7 +29,7 @@ import org.apache.shardingsphere.infra.instance.yaml.YamlComputeNodeDataSwapper; import org.apache.shardingsphere.infra.state.instance.InstanceState; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; +import org.apache.shardingsphere.metadata.persist.node.ComputeNodePath; import org.apache.shardingsphere.mode.spi.PersistRepository; import java.util.Arrays; @@ -55,10 +55,10 @@ public final class ComputeNodePersistService { */ public void registerOnline(final ComputeNodeInstance computeNodeInstance) { String instanceId = computeNodeInstance.getMetaData().getId(); - repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceId, computeNodeInstance.getMetaData().getType()), YamlEngine.marshal( + repository.persistEphemeral(ComputeNodePath.getOnlinePath(instanceId, computeNodeInstance.getMetaData().getType()), YamlEngine.marshal( new YamlComputeNodeDataSwapper().swapToYamlConfiguration(new ComputeNodeData(computeNodeInstance.getMetaData().getAttributes(), computeNodeInstance.getMetaData().getVersion(), computeNodeInstance.getMetaData().getDatabaseName())))); - repository.persistEphemeral(ComputeNode.getComputeNodeStateNodePath(instanceId), computeNodeInstance.getState().getCurrentState().name()); + repository.persistEphemeral(ComputeNodePath.getStatePath(instanceId), computeNodeInstance.getState().getCurrentState().name()); persistInstanceLabels(instanceId, computeNodeInstance.getLabels()); } @@ -69,7 +69,7 @@ public void registerOnline(final ComputeNodeInstance computeNodeInstance) { * @param labels instance labels */ public void persistInstanceLabels(final String instanceId, final Collection labels) { - repository.persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId), YamlEngine.marshal(labels)); + repository.persistEphemeral(ComputeNodePath.getLabelsPath(instanceId), YamlEngine.marshal(labels)); } /** @@ -79,7 +79,7 @@ public void persistInstanceLabels(final String instanceId, final Collection loadInstanceLabels(final String instanceId) { - String yamlContent = repository.query(ComputeNode.getInstanceLabelsNodePath(instanceId)); + String yamlContent = repository.query(ComputeNodePath.getLabelsPath(instanceId)); return Strings.isNullOrEmpty(yamlContent) ? new LinkedList<>() : YamlEngine.unmarshal(yamlContent, Collection.class); } @@ -101,7 +101,7 @@ public Collection loadInstanceLabels(final String instanceId) { * @return state */ public String loadComputeNodeState(final String instanceId) { - return repository.query(ComputeNode.getComputeNodeStateNodePath(instanceId)); + return repository.query(ComputeNodePath.getStatePath(instanceId)); } /** @@ -112,7 +112,7 @@ public String loadComputeNodeState(final String instanceId) { */ public Optional loadInstanceWorkerId(final String instanceId) { try { - String workerId = repository.query(ComputeNode.getInstanceWorkerIdNodePath(instanceId)); + String workerId = repository.query(ComputeNodePath.getWorkerIdPath(instanceId)); return Strings.isNullOrEmpty(workerId) ? Optional.empty() : Optional.of(Integer.valueOf(workerId)); } catch (final NumberFormatException ex) { log.error("Invalid worker id for instance: {}", instanceId); @@ -131,8 +131,8 @@ public Collection loadAllComputeNodeInstances() { private Collection loadComputeNodeInstances(final InstanceType instanceType) { Collection result = new LinkedList<>(); - for (String each : repository.getChildrenKeys(ComputeNode.getOnlineNodePath(instanceType))) { - String value = repository.query(ComputeNode.getOnlineInstanceNodePath(each, instanceType)); + for (String each : repository.getChildrenKeys(ComputeNodePath.getOnlinePath(instanceType))) { + String value = repository.query(ComputeNodePath.getOnlinePath(each, instanceType)); if (Strings.isNullOrEmpty(value)) { continue; } @@ -162,10 +162,10 @@ public ComputeNodeInstance loadComputeNodeInstance(final InstanceMetaData instan * @return assigned worker IDs */ public Collection getAssignedWorkerIds() { - Collection childrenKeys = repository.getChildrenKeys(ComputeNode.getInstanceWorkerIdRootNodePath()); + Collection childrenKeys = repository.getChildrenKeys(ComputeNodePath.getWorkerIdRootPath()); Collection result = new LinkedHashSet<>(childrenKeys.size(), 1F); for (String each : childrenKeys) { - String workerId = repository.query(ComputeNode.getInstanceWorkerIdNodePath(each)); + String workerId = repository.query(ComputeNodePath.getWorkerIdPath(each)); if (null != workerId) { result.add(Integer.parseInt(workerId)); } @@ -180,7 +180,7 @@ public Collection getAssignedWorkerIds() { * @param instanceState instance state */ public void updateComputeNodeState(final String instanceId, final InstanceState instanceState) { - repository.persistEphemeral(ComputeNode.getComputeNodeStateNodePath(instanceId), instanceState.name()); + repository.persistEphemeral(ComputeNodePath.getStatePath(instanceId), instanceState.name()); } /** @@ -189,6 +189,6 @@ public void updateComputeNodeState(final String instanceId, final InstanceState * @param computeNodeInstance compute node instance */ public void offline(final ComputeNodeInstance computeNodeInstance) { - repository.delete(ComputeNode.getOnlineInstanceNodePath(computeNodeInstance.getMetaData().getId(), computeNodeInstance.getMetaData().getType())); + repository.delete(ComputeNodePath.getOnlinePath(computeNodeInstance.getMetaData().getId(), computeNodeInstance.getMetaData().getType())); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/ComputeNodeOnlineHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/ComputeNodeOnlineHandler.java index 7f1110a80711a..569ef53dcd213 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/ComputeNodeOnlineHandler.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/ComputeNodeOnlineHandler.java @@ -25,7 +25,7 @@ import org.apache.shardingsphere.infra.instance.yaml.YamlComputeNodeData; import org.apache.shardingsphere.infra.instance.yaml.YamlComputeNodeDataSwapper; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; +import org.apache.shardingsphere.metadata.persist.node.ComputeNodePath; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.manager.ContextManager; @@ -43,7 +43,7 @@ public final class ComputeNodeOnlineHandler implements DataChangedEventHandler { @Override public String getSubscribedKey() { - return ComputeNode.getOnlineInstanceNodePath(); + return ComputeNodePath.getOnlineRootPath(); } @Override @@ -69,6 +69,6 @@ public void handle(final ContextManager contextManager, final DataChangedEvent e } private Matcher getInstanceOnlinePathMatcher(final String onlineInstancePath) { - return Pattern.compile(ComputeNode.getOnlineInstanceNodePath() + "/([\\S]+)/([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(onlineInstancePath); + return Pattern.compile(ComputeNodePath.getOnlineRootPath() + "/([\\S]+)/([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(onlineInstancePath); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/ComputeNodeStateChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/ComputeNodeStateChangedHandler.java index 99e2fe6d1220e..1db5d8b76178f 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/ComputeNodeStateChangedHandler.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/ComputeNodeStateChangedHandler.java @@ -20,7 +20,7 @@ import com.google.common.base.Strings; import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; +import org.apache.shardingsphere.metadata.persist.node.ComputeNodePath; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.manager.ContextManager; @@ -37,7 +37,7 @@ public final class ComputeNodeStateChangedHandler implements DataChangedEventHan @Override public String getSubscribedKey() { - return ComputeNode.getComputeNodePath(); + return ComputeNodePath.getRootPath(); } @Override @@ -45,20 +45,20 @@ public Collection getSubscribedTypes() { return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED); } - @SuppressWarnings("unchecked") @Override public void handle(final ContextManager contextManager, final DataChangedEvent event) { - String instanceId = ComputeNode.getInstanceIdByComputeNode(event.getKey()); - if (Strings.isNullOrEmpty(instanceId)) { - return; - } + ComputeNodePath.findInstanceId(event.getKey()).ifPresent(optional -> handle(contextManager, event, optional)); + } + + @SuppressWarnings("unchecked") + private void handle(final ContextManager contextManager, final DataChangedEvent event, final String instanceId) { ComputeNodeInstanceContext computeNodeInstanceContext = contextManager.getComputeNodeInstanceContext(); - if (event.getKey().equals(ComputeNode.getComputeNodeStateNodePath(instanceId)) && Type.DELETED != event.getType()) { + if (event.getKey().equals(ComputeNodePath.getStatePath(instanceId)) && Type.DELETED != event.getType()) { computeNodeInstanceContext.updateStatus(instanceId, event.getValue()); - } else if (event.getKey().equals(ComputeNode.getInstanceLabelsNodePath(instanceId)) && Type.DELETED != event.getType()) { + } else if (event.getKey().equals(ComputeNodePath.getLabelsPath(instanceId)) && Type.DELETED != event.getType()) { // TODO labels may be empty computeNodeInstanceContext.updateLabels(instanceId, Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class)); - } else if (event.getKey().equals(ComputeNode.getInstanceWorkerIdNodePath(instanceId))) { + } else if (event.getKey().equals(ComputeNodePath.getWorkerIdPath(instanceId))) { computeNodeInstanceContext.updateWorkerId(instanceId, Strings.isNullOrEmpty(event.getValue()) ? null : Integer.valueOf(event.getValue())); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/KillProcessHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/KillProcessHandler.java index abb05a42980cb..66deaa8df3241 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/KillProcessHandler.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/KillProcessHandler.java @@ -20,7 +20,7 @@ import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException; import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; +import org.apache.shardingsphere.metadata.persist.node.ComputeNodePath; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.manager.ContextManager; @@ -39,7 +39,7 @@ public final class KillProcessHandler implements DataChangedEventHandler { @Override public String getSubscribedKey() { - return ComputeNode.getKillProcessTriggerNodePath(); + return ComputeNodePath.getKillProcessTriggerRootPath(); } @Override @@ -71,6 +71,6 @@ public void handle(final ContextManager contextManager, final DataChangedEvent e } private Matcher getKillProcessTriggerMatcher(final DataChangedEvent event) { - return Pattern.compile(ComputeNode.getKillProcessTriggerNodePath() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(event.getKey()); + return Pattern.compile(ComputeNodePath.getKillProcessTriggerRootPath() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(event.getKey()); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/ShowProcessListHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/ShowProcessListHandler.java index 8a0652730fa23..230700cb89b2b 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/ShowProcessListHandler.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/dispatch/handler/global/ShowProcessListHandler.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.mode.manager.cluster.dispatch.handler.global; import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; +import org.apache.shardingsphere.metadata.persist.node.ComputeNodePath; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.manager.ContextManager; @@ -36,7 +36,7 @@ public final class ShowProcessListHandler implements DataChangedEventHandler { @Override public String getSubscribedKey() { - return ComputeNode.getShowProcessListTriggerNodePath(); + return ComputeNodePath.getShowProcessListTriggerRootPath(); } @Override @@ -62,6 +62,6 @@ public void handle(final ContextManager contextManager, final DataChangedEvent e } private Matcher getShowProcessListTriggerMatcher(final DataChangedEvent event) { - return Pattern.compile(ComputeNode.getShowProcessListTriggerNodePath() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(event.getKey()); + return Pattern.compile(ComputeNodePath.getShowProcessListTriggerRootPath() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(event.getKey()); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/coordinator/ClusterProcessPersistCoordinator.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/coordinator/ClusterProcessPersistCoordinator.java index 083cb50f1cfe9..bc71eb39fa4b6 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/coordinator/ClusterProcessPersistCoordinator.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/coordinator/ClusterProcessPersistCoordinator.java @@ -22,7 +22,7 @@ import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; +import org.apache.shardingsphere.metadata.persist.node.ComputeNodePath; import org.apache.shardingsphere.metadata.persist.node.ProcessNode; import org.apache.shardingsphere.mode.persist.coordinator.ProcessPersistCoordinator; import org.apache.shardingsphere.mode.spi.PersistRepository; @@ -45,11 +45,11 @@ public void reportLocalProcesses(final String instanceId, final String taskId) { if (!processes.isEmpty()) { repository.persist(ProcessNode.getProcessListInstancePath(taskId, instanceId), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes))); } - repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(instanceId, taskId)); + repository.delete(ComputeNodePath.getShowProcessListTriggerPath(instanceId, taskId)); } @Override public void cleanProcess(final String instanceId, final String processId) { - repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(instanceId, processId)); + repository.delete(ComputeNodePath.getKillProcessTriggerPath(instanceId, processId)); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java index 3bc48b500c103..f368b9a326761 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java @@ -24,7 +24,7 @@ import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; +import org.apache.shardingsphere.metadata.persist.node.ComputeNodePath; import org.apache.shardingsphere.metadata.persist.node.ProcessNode; import org.apache.shardingsphere.mode.persist.service.divided.ProcessPersistService; import org.apache.shardingsphere.mode.spi.PersistRepository; @@ -71,7 +71,7 @@ private Collection getShowProcessListData(final String taskId) { private Collection getShowProcessListTriggerPaths(final String taskId) { return Stream.of(InstanceType.values()) - .flatMap(each -> repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath -> ComputeNode.getProcessTriggerInstanceNodePath(onlinePath, taskId))) + .flatMap(each -> repository.getChildrenKeys(ComputeNodePath.getOnlinePath(each)).stream().map(instanceId -> ComputeNodePath.getShowProcessListTriggerPath(instanceId, taskId))) .collect(Collectors.toList()); } @@ -91,7 +91,7 @@ public void killProcess(final String processId) { private Collection getKillProcessTriggerPaths(final String processId) { return Stream.of(InstanceType.values()) - .flatMap(each -> repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath -> ComputeNode.getProcessKillInstanceIdNodePath(onlinePath, processId))) + .flatMap(each -> repository.getChildrenKeys(ComputeNodePath.getOnlinePath(each)).stream().map(onlinePath -> ComputeNodePath.getKillProcessTriggerPath(onlinePath, processId))) .collect(Collectors.toList()); } diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java index df844197ba69d..2db1365b971f2 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistServiceTest.java @@ -24,7 +24,7 @@ import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; +import org.apache.shardingsphere.metadata.persist.node.ComputeNodePath; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.test.mock.AutoMockExtension; import org.apache.shardingsphere.test.mock.StaticMockSettings; @@ -73,8 +73,8 @@ void assertGetUncompletedProcessList() { } private void assertGetProcessList() { - when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList()); - when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc")); + when(repository.getChildrenKeys(ComputeNodePath.getOnlinePath(InstanceType.JDBC))).thenReturn(Collections.emptyList()); + when(repository.getChildrenKeys(ComputeNodePath.getOnlinePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc")); when(repository.getChildrenKeys(contains("/execution_nodes/"))).thenReturn(Collections.singletonList("abc")); when(repository.query(contains("/execution_nodes/"))).thenReturn(YamlEngine.marshal(createYamlProcessList())); Collection actual = processPersistService.getProcessList(); @@ -107,8 +107,8 @@ void assertKillUncompletedProcess() { } private void assertKillProcess() { - when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList()); - when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc")); + when(repository.getChildrenKeys(ComputeNodePath.getOnlinePath(InstanceType.JDBC))).thenReturn(Collections.emptyList()); + when(repository.getChildrenKeys(ComputeNodePath.getOnlinePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc")); processPersistService.killProcess("foo_process_id"); verify(repository).persist("/nodes/compute_nodes/kill_process_trigger/abc:foo_process_id", ""); }