Skip to content

Commit

Permalink
Modify regionMigrateService's ThreadPool to IoTThreadPoolFactory.cach…
Browse files Browse the repository at this point in the history
…edThreadPool (#13548)

* modify regionMigratePool to IoTThreadPool

* fix ut crash
  • Loading branch information
133tosakarin authored Sep 19, 2024
1 parent e5e4980 commit 5387a10
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private void initSchemaRegion() {
for (SchemaRegionId schemaRegionId : v) {
PartialPath database;
try {
database = new PartialPath(k);
database = PartialPath.getDatabasePath(k);
} catch (IllegalPathException e) {
logger.warn("Illegal database path: {}", k);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
import org.apache.iotdb.db.storageengine.rescon.memory.AbstractPoolManager;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq;
Expand All @@ -53,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

public class RegionMigrateService implements IService {
Expand All @@ -65,7 +65,7 @@ public class RegionMigrateService implements IService {

private static final int SLEEP_MILLIS = 5000;

private RegionMigratePool regionMigratePool;
private ExecutorService regionMigratePool;

// Map<taskId, taskStatus>
// TODO: Due to the use of procedureId as taskId, it is currently unable to handle the situation
Expand Down Expand Up @@ -199,15 +199,15 @@ private boolean addToTaskResultMap(long taskId) {

@Override
public void start() throws StartupException {
regionMigratePool = new RegionMigratePool();
regionMigratePool.start();
regionMigratePool =
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.REGION_MIGRATE.getName());
LOGGER.info("Region migrate service start");
}

@Override
public void stop() {
if (regionMigratePool != null) {
regionMigratePool.stop();
regionMigratePool.shutdown();
}
LOGGER.info("Region migrate service stop");
}
Expand All @@ -217,32 +217,6 @@ public ServiceType getID() {
return ServiceType.DATA_NODE_REGION_MIGRATE_SERVICE;
}

private static class RegionMigratePool extends AbstractPoolManager {

private final Logger poolLogger = LoggerFactory.getLogger(RegionMigratePool.class);

private RegionMigratePool() {
this.pool = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.REGION_MIGRATE.getName());
}

@Override
public Logger getLogger() {
return poolLogger;
}

@Override
public void start() {
if (this.pool != null) {
poolLogger.info("DataNode region migrate pool start");
}
}

@Override
public String getName() {
return "migrate region";
}
}

private static class AddRegionPeerTask implements Runnable {

private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);
Expand Down

0 comments on commit 5387a10

Please sign in to comment.