From 46119f2e0811a605087e44be07f22ae1d508424d Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 29 Dec 2024 21:04:04 +0800 Subject: [PATCH] Refactor ProcessListChangedSubscriber --- .../infra/executor/sql/process/Process.java | 13 ++++++ .../executor/sql/process/ProcessRegistry.java | 15 ++++++- .../divided/ProcessPersistService.java | 16 +++++++ .../type/ProcessListChangedSubscriber.java | 45 +++++-------------- .../service/ClusterProcessPersistService.java | 17 +++++++ .../StandaloneProcessPersistService.java | 17 +++---- 6 files changed, 81 insertions(+), 42 deletions(-) diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java index f715b1af99b54..a1eae30a7ec5b 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java @@ -27,6 +27,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; import org.apache.shardingsphere.infra.metadata.user.Grantee; +import java.sql.SQLException; import java.sql.Statement; import java.util.LinkedHashMap; import java.util.Map; @@ -151,4 +152,16 @@ public boolean isIdle() { public void removeProcessStatement(final ExecutionUnit executionUnit) { processStatements.remove(System.identityHashCode(executionUnit)); } + + /** + * Kill process. + * + * @throws SQLException SQL exception + */ + public void kill() throws SQLException { + setInterrupted(true); + for (Statement each : processStatements.values()) { + each.cancel(); + } + } } diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java index 53a5c18f6c98f..b28660f0bf7e2 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java @@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.exception.kernel.connection.SQLExecutionInterruptedException; +import java.sql.SQLException; import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -94,11 +95,23 @@ public void remove(final String id) { } /** - * List all process. + * List all processes. * * @return all processes */ public Collection listAll() { return processes.values(); } + + /** + * Kill process. + * + * @throws SQLException SQL exception + */ + public void kill(final String processId) throws SQLException { + Process process = ProcessRegistry.getInstance().get(processId); + if (null != process) { + process.kill(); + } + } } diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java index 0794364a6b819..91be78ef9784f 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/persist/service/divided/ProcessPersistService.java @@ -27,6 +27,14 @@ */ public interface ProcessPersistService { + /** + * Report local processes. + * + * @param instanceId instance ID + * @param taskId task ID + */ + void reportLocalProcesses(final String instanceId, final String taskId); + /** * Get process list. * @@ -41,4 +49,12 @@ public interface ProcessPersistService { * @throws SQLException SQL exception */ void killProcess(String processId) throws SQLException; + + /** + * Clean process. + * + * @param instanceId instance ID + * @param processId process ID + */ + void cleanProcess(String instanceId, String processId); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java index d4e9c65924b56..32f2243491924 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java @@ -18,57 +18,42 @@ package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type; import com.google.common.eventbus.Subscribe; -import org.apache.shardingsphere.infra.executor.sql.process.Process; import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry; -import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper; -import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.apache.shardingsphere.metadata.persist.node.ComputeNode; -import org.apache.shardingsphere.metadata.persist.node.ProcessNode; +import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessCompletedEvent; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessEvent; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesCompletedEvent; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesEvent; -import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber; -import org.apache.shardingsphere.mode.spi.PersistRepository; +import org.apache.shardingsphere.mode.persist.service.divided.ProcessPersistService; import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collection; /** * Process list changed subscriber. */ public final class ProcessListChangedSubscriber implements DispatchEventSubscriber { - private final String instanceMetaDataId; - - private final PersistRepository repository; + private final String instanceId; - private final YamlProcessListSwapper swapper; + private final ProcessPersistService processPersistService; public ProcessListChangedSubscriber(final ContextManager contextManager) { - instanceMetaDataId = contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId(); - repository = contextManager.getPersistServiceFacade().getRepository(); - swapper = new YamlProcessListSwapper(); + instanceId = contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId(); + processPersistService = contextManager.getPersistServiceFacade().getProcessPersistService(); } /** * Report local processes. * - * @param event show process list trigger event + * @param event report local processes event */ @Subscribe public void reportLocalProcesses(final ReportLocalProcessesEvent event) { - if (!event.getInstanceId().equals(instanceMetaDataId)) { - return; - } - Collection processes = ProcessRegistry.getInstance().listAll(); - if (!processes.isEmpty()) { - repository.persist(ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes))); + if (event.getInstanceId().equals(instanceId)) { + processPersistService.reportLocalProcesses(instanceId, event.getTaskId()); } - repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId())); } /** @@ -89,17 +74,11 @@ public synchronized void completeToReportLocalProcesses(final ReportLocalProcess */ @Subscribe public synchronized void killLocalProcess(final KillLocalProcessEvent event) throws SQLException { - if (!event.getInstanceId().equals(instanceMetaDataId)) { + if (!event.getInstanceId().equals(instanceId)) { return; } - Process process = ProcessRegistry.getInstance().get(event.getProcessId()); - if (null != process) { - process.setInterrupted(true); - for (Statement each : process.getProcessStatements().values()) { - each.cancel(); - } - } - repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId())); + ProcessRegistry.getInstance().kill(event.getProcessId()); + processPersistService.cleanProcess(instanceId, event.getProcessId()); } /** diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java index 3bc48b500c103..9bcc9470062c1 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/service/ClusterProcessPersistService.java @@ -19,6 +19,7 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.executor.sql.process.Process; +import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry; import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList; import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper; @@ -43,6 +44,17 @@ public final class ClusterProcessPersistService implements ProcessPersistService private final PersistRepository repository; + private final YamlProcessListSwapper swapper = new YamlProcessListSwapper(); + + @Override + public void reportLocalProcesses(final String instanceId, final String taskId) { + Collection processes = ProcessRegistry.getInstance().listAll(); + if (!processes.isEmpty()) { + repository.persist(ProcessNode.getProcessListInstancePath(taskId, instanceId), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes))); + } + repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(instanceId, taskId)); + } + @Override public Collection getProcessList() { String taskId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", ""); @@ -98,4 +110,9 @@ private Collection getKillProcessTriggerPaths(final String processId) { private boolean isReady(final Collection paths) { return paths.stream().noneMatch(each -> null != repository.query(each)); } + + @Override + public void cleanProcess(final String instanceId, final String processId) { + repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(instanceId, processId)); + } } diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java index 135bc0a1a55a5..d7c0e6e50576b 100644 --- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java +++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/persist/service/StandaloneProcessPersistService.java @@ -22,7 +22,6 @@ import org.apache.shardingsphere.mode.persist.service.divided.ProcessPersistService; import java.sql.SQLException; -import java.sql.Statement; import java.util.Collection; /** @@ -30,6 +29,10 @@ */ public final class StandaloneProcessPersistService implements ProcessPersistService { + @Override + public void reportLocalProcesses(final String instanceId, final String taskId) { + } + @Override public Collection getProcessList() { return ProcessRegistry.getInstance().listAll(); @@ -37,12 +40,10 @@ public Collection getProcessList() { @Override public void killProcess(final String processId) throws SQLException { - Process process = ProcessRegistry.getInstance().get(processId); - if (null == process) { - return; - } - for (Statement each : process.getProcessStatements().values()) { - each.cancel(); - } + ProcessRegistry.getInstance().kill(processId); + } + + @Override + public void cleanProcess(final String instanceId, final String processId) { } }