Skip to content

Commit

Permalink
Refactor ProcessListChangedSubscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Dec 29, 2024
1 parent d1e6c60 commit 46119f2
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.metadata.user.Grantee;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -151,4 +152,16 @@ public boolean isIdle() {
public void removeProcessStatement(final ExecutionUnit executionUnit) {
processStatements.remove(System.identityHashCode(executionUnit));
}

/**
* Kill process.
*
* @throws SQLException SQL exception
*/
public void kill() throws SQLException {
setInterrupted(true);
for (Statement each : processStatements.values()) {
each.cancel();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.exception.kernel.connection.SQLExecutionInterruptedException;

import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -94,11 +95,23 @@ public void remove(final String id) {
}

/**
* List all process.
* List all processes.
*
* @return all processes
*/
public Collection<Process> listAll() {
return processes.values();
}

/**
* Kill process.
*
* @throws SQLException SQL exception
*/
public void kill(final String processId) throws SQLException {
Process process = ProcessRegistry.getInstance().get(processId);
if (null != process) {
process.kill();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
*/
public interface ProcessPersistService {

/**
* Report local processes.
*
* @param instanceId instance ID
* @param taskId task ID
*/
void reportLocalProcesses(final String instanceId, final String taskId);

/**
* Get process list.
*
Expand All @@ -41,4 +49,12 @@ public interface ProcessPersistService {
* @throws SQLException SQL exception
*/
void killProcess(String processId) throws SQLException;

/**
* Clean process.
*
* @param instanceId instance ID
* @param processId process ID
*/
void cleanProcess(String instanceId, String processId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,57 +18,42 @@
package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessCompletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.KillLocalProcessEvent;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesCompletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.event.state.compute.ReportLocalProcessesEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.mode.persist.service.divided.ProcessPersistService;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;

/**
* Process list changed subscriber.
*/
public final class ProcessListChangedSubscriber implements DispatchEventSubscriber {

private final String instanceMetaDataId;

private final PersistRepository repository;
private final String instanceId;

private final YamlProcessListSwapper swapper;
private final ProcessPersistService processPersistService;

public ProcessListChangedSubscriber(final ContextManager contextManager) {
instanceMetaDataId = contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId();
repository = contextManager.getPersistServiceFacade().getRepository();
swapper = new YamlProcessListSwapper();
instanceId = contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId();
processPersistService = contextManager.getPersistServiceFacade().getProcessPersistService();
}

/**
* Report local processes.
*
* @param event show process list trigger event
* @param event report local processes event
*/
@Subscribe
public void reportLocalProcesses(final ReportLocalProcessesEvent event) {
if (!event.getInstanceId().equals(instanceMetaDataId)) {
return;
}
Collection<Process> processes = ProcessRegistry.getInstance().listAll();
if (!processes.isEmpty()) {
repository.persist(ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
if (event.getInstanceId().equals(instanceId)) {
processPersistService.reportLocalProcesses(instanceId, event.getTaskId());
}
repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId()));
}

/**
Expand All @@ -89,17 +74,11 @@ public synchronized void completeToReportLocalProcesses(final ReportLocalProcess
*/
@Subscribe
public synchronized void killLocalProcess(final KillLocalProcessEvent event) throws SQLException {
if (!event.getInstanceId().equals(instanceMetaDataId)) {
if (!event.getInstanceId().equals(instanceId)) {
return;
}
Process process = ProcessRegistry.getInstance().get(event.getProcessId());
if (null != process) {
process.setInterrupted(true);
for (Statement each : process.getProcessStatements().values()) {
each.cancel();
}
}
repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId()));
ProcessRegistry.getInstance().kill(event.getProcessId());
processPersistService.cleanProcess(instanceId, event.getProcessId());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
Expand All @@ -43,6 +44,17 @@ public final class ClusterProcessPersistService implements ProcessPersistService

private final PersistRepository repository;

private final YamlProcessListSwapper swapper = new YamlProcessListSwapper();

@Override
public void reportLocalProcesses(final String instanceId, final String taskId) {
Collection<Process> processes = ProcessRegistry.getInstance().listAll();
if (!processes.isEmpty()) {
repository.persist(ProcessNode.getProcessListInstancePath(taskId, instanceId), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
}
repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(instanceId, taskId));
}

@Override
public Collection<Process> getProcessList() {
String taskId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
Expand Down Expand Up @@ -98,4 +110,9 @@ private Collection<String> getKillProcessTriggerPaths(final String processId) {
private boolean isReady(final Collection<String> paths) {
return paths.stream().noneMatch(each -> null != repository.query(each));
}

@Override
public void cleanProcess(final String instanceId, final String processId) {
repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(instanceId, processId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,28 @@
import org.apache.shardingsphere.mode.persist.service.divided.ProcessPersistService;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;

/**
* Standalone process persist service.
*/
public final class StandaloneProcessPersistService implements ProcessPersistService {

@Override
public void reportLocalProcesses(final String instanceId, final String taskId) {
}

@Override
public Collection<Process> getProcessList() {
return ProcessRegistry.getInstance().listAll();
}

@Override
public void killProcess(final String processId) throws SQLException {
Process process = ProcessRegistry.getInstance().get(processId);
if (null == process) {
return;
}
for (Statement each : process.getProcessStatements().values()) {
each.cancel();
}
ProcessRegistry.getInstance().kill(processId);
}

@Override
public void cleanProcess(final String instanceId, final String processId) {
}
}

0 comments on commit 46119f2

Please sign in to comment.