diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java index 741e27487d9b..3740a7ef0dcb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java @@ -133,40 +133,27 @@ public void init() { } } - /** - * Scan the database and schema region directories to recover schema regions and return the - * collected local schema partition info for localSchemaPartitionTable recovery. - */ - @SuppressWarnings("java:S2142") - private void initSchemaRegion() { + public static Map> getLocalSchemaRegionInfo() { final File schemaDir = new File(config.getSchemaDir()); final File[] sgDirList = schemaDir.listFiles(); - + final Map> localSchemaPartitionTable = new HashMap<>(); if (sgDirList == null) { - return; + return localSchemaPartitionTable; } - - // recover SchemaRegion concurrently - final ExecutorService schemaRegionRecoverPools = - IoTDBThreadPoolFactory.newFixedThreadPool( - Runtime.getRuntime().availableProcessors(), - ThreadName.SCHEMA_REGION_RECOVER_TASK.getName()); - final List> futures = new ArrayList<>(); - for (File file : sgDirList) { if (!file.isDirectory()) { continue; } - final PartialPath storageGroup; + final PartialPath database; try { - storageGroup = PartialPath.getDatabasePath(file.getName()); + database = PartialPath.getDatabasePath(file.getName()); } catch (IllegalPathException illegalPathException) { // not a legal sg dir continue; } - final File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath()); + final File sgDir = new File(config.getSchemaDir(), database.getFullPath()); if (!sgDir.exists()) { continue; @@ -176,7 +163,7 @@ private void initSchemaRegion() { if (schemaRegionDirs == null) { continue; } - + List schemaRegionIds = new ArrayList<>(); for (final File schemaRegionDir : schemaRegionDirs) { final SchemaRegionId schemaRegionId; try { @@ -185,11 +172,38 @@ private void initSchemaRegion() { // the dir/file is not schemaRegionDir, ignore this. continue; } - futures.add( - schemaRegionRecoverPools.submit(recoverSchemaRegionTask(storageGroup, schemaRegionId))); + schemaRegionIds.add(schemaRegionId); } + localSchemaPartitionTable.put(database.getFullPath(), schemaRegionIds); } + return localSchemaPartitionTable; + } + /** + * Scan the database and schema region directories to recover schema regions and return the + * collected local schema partition info for localSchemaPartitionTable recovery. + */ + @SuppressWarnings("java:S2142") + private void initSchemaRegion() { + // recover SchemaRegion concurrently + Map> localSchemaRegionInfo = getLocalSchemaRegionInfo(); + final ExecutorService schemaRegionRecoverPools = + IoTDBThreadPoolFactory.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), + ThreadName.SCHEMA_REGION_RECOVER_TASK.getName()); + final List> futures = new ArrayList<>(); + localSchemaRegionInfo.forEach( + (k, v) -> { + for (SchemaRegionId schemaRegionId : v) { + try { + futures.add( + schemaRegionRecoverPools.submit( + recoverSchemaRegionTask(new PartialPath(k), schemaRegionId))); + } catch (IllegalPathException e) { + throw new RuntimeException(e); + } + } + }); for (final Future future : futures) { try { final ISchemaRegion schemaRegion = future.get(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 2dc24b8f78fd..7aa5eedd20cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -32,6 +32,8 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.StartupException; @@ -123,6 +125,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -215,7 +218,6 @@ protected void start() { // Pull and check system configurations from ConfigNode-leader pullAndCheckSystemConfigurations(); - if (isFirstStart) { sendRegisterRequestToConfigNode(true); IoTDBStartCheck.getInstance().generateOrOverwriteSystemPropertiesFile(); @@ -544,49 +546,92 @@ private void sendRegisterRequestToConfigNode(boolean isPreCheck) } private void removeInvalidRegions(List dataNodeConsensusGroupIds) { + removeInvalidConsensusDataRegions(dataNodeConsensusGroupIds); + removeInvalidDataRegions(dataNodeConsensusGroupIds); + removeInvalidConsensusSchemaRegions(dataNodeConsensusGroupIds); + removeInvalidSchemaRegions(dataNodeConsensusGroupIds); + } + + private void removeInvalidDataRegions(List dataNodeConsensusGroupIds) { + Map> localDataRegionInfo = + StorageEngine.getInstance().getLocalDataRegionInfo(); + List allLocalFilesFolders = TierManager.getInstance().getAllLocalFilesFolders(); + localDataRegionInfo.forEach( + (database, dataRegionIds) -> { + for (DataRegionId dataRegionId : dataRegionIds) { + if (!dataNodeConsensusGroupIds.contains(dataRegionId)) { + removeDataDirRegion(database, dataRegionId, allLocalFilesFolders); + } + } + }); + } + + private void removeInvalidConsensusDataRegions(List dataNodeConsensusGroupIds) { List invalidDataRegionConsensusGroupIds = DataRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream() .filter(consensusGroupId -> !dataNodeConsensusGroupIds.contains(consensusGroupId)) .collect(Collectors.toList()); - - List invalidSchemaRegionConsensusGroupIds = - SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream() - .filter(consensusGroupId -> !dataNodeConsensusGroupIds.contains(consensusGroupId)) - .collect(Collectors.toList()); - removeInvalidDataRegions(invalidDataRegionConsensusGroupIds); - removeInvalidSchemaRegions(invalidSchemaRegionConsensusGroupIds); - } - - private void removeInvalidDataRegions(List invalidConsensusGroupId) { - logger.info("Remove invalid dataRegion directories... {}", invalidConsensusGroupId); - for (ConsensusGroupId consensusGroupId : invalidConsensusGroupId) { + logger.info("Remove invalid dataRegion directories... {}", invalidDataRegionConsensusGroupIds); + for (ConsensusGroupId consensusGroupId : invalidDataRegionConsensusGroupIds) { File oldDir = new File( DataRegionConsensusImpl.getInstance() .getRegionDirFromConsensusGroupId(consensusGroupId)); - removeRegionsDir(oldDir); + removeDir(oldDir); } } - private void removeInvalidSchemaRegions(List invalidConsensusGroupId) { - logger.info("Remove invalid schemaRegion directories... {}", invalidConsensusGroupId); - for (ConsensusGroupId consensusGroupId : invalidConsensusGroupId) { + private void removeInvalidSchemaRegions(List schemaConsensusGroupIds) { + Map> localSchemaRegionInfo = + SchemaEngine.getLocalSchemaRegionInfo(); + localSchemaRegionInfo.forEach( + (database, schemaRegionIds) -> { + for (SchemaRegionId schemaRegionId : schemaRegionIds) { + if (!schemaConsensusGroupIds.contains(schemaRegionId)) { + removeInvalidSchemaDir(database, schemaRegionId); + } + } + }); + } + + private void removeDataDirRegion( + String database, DataRegionId dataRegionId, List fileFolders) { + fileFolders.forEach( + folder -> { + String regionDir = + folder + File.separator + database + File.separator + dataRegionId.getId(); + removeDir(new File(regionDir)); + }); + } + + private void removeInvalidConsensusSchemaRegions( + List dataNodeConsensusGroupIds) { + List invalidSchemaRegionConsensusGroupIds = + SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream() + .filter(consensusGroupId -> !dataNodeConsensusGroupIds.contains(consensusGroupId)) + .collect(Collectors.toList()); + logger.info( + "Remove invalid schemaRegion directories... {}", invalidSchemaRegionConsensusGroupIds); + + for (ConsensusGroupId consensusGroupId : invalidSchemaRegionConsensusGroupIds) { File oldDir = new File( SchemaRegionConsensusImpl.getInstance() .getRegionDirFromConsensusGroupId(consensusGroupId)); - removeRegionsDir(oldDir); + removeDir(oldDir); } } - private void removeRegionsDir(File regionDir) { + private void removeInvalidSchemaDir(String database, SchemaRegionId schemaRegionId) { + String systemSchemaDir = + config.getSystemDir() + File.separator + database + File.separator + schemaRegionId.getId(); + removeDir(new File(systemSchemaDir)); + } + + private void removeDir(File regionDir) { if (regionDir.exists()) { - try { - FileUtils.recursivelyDeleteFolder(regionDir.getPath()); - logger.info("delete {} succeed.", regionDir.getAbsolutePath()); - } catch (IOException e) { - logger.error("delete {} failed.", regionDir.getAbsolutePath()); - } + FileUtils.deleteDirectoryAndEmptyParent(regionDir); + logger.info("delete {} succeed.", regionDir.getAbsolutePath()); } else { logger.info("delete {} failed, because it does not exist.", regionDir.getAbsolutePath()); } @@ -645,12 +690,10 @@ private void sendRestartRequestToConfigNode() throws StartupException { (endTime - startTime)); List consensusGroupIds = dataNodeRestartResp.getConsensusGroupIds(); - List dataNodeConsensusGroupIds = + removeInvalidRegions( consensusGroupIds.stream() .map(ConsensusGroupId.Factory::createFromTConsensusGroupId) - .collect(Collectors.toList()); - - removeInvalidRegions(dataNodeConsensusGroupIds); + .collect(Collectors.toList())); } else { /* Throw exception when restart is rejected */ throw new StartupException(dataNodeRestartResp.getStatus().getMessage());