From fca78fbb3d1430675c3274fa9a0e64ffd60398d7 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 16 Oct 2024 14:06:31 +0800 Subject: [PATCH 1/5] Fix the cache invalidation logic in RouteBalancer --- .../iotdb/confignode/client/CnToDnRequestType.java | 1 + .../CnToDnInternalServiceAsyncRequestManager.java | 4 ++++ .../rpc/DataNodeAsyncRequestRPCHandler.java | 2 ++ .../manager/load/balancer/RouteBalancer.java | 13 ++++++------- .../thrift/impl/DataNodeInternalRPCServiceImpl.java | 11 +++++++++-- .../thrift-datanode/src/main/thrift/datanode.thrift | 7 +++++++ 6 files changed, 29 insertions(+), 9 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/CnToDnRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/CnToDnRequestType.java index bc7307282573..4b7ebd57dbf7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/CnToDnRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/CnToDnRequestType.java @@ -56,6 +56,7 @@ public enum CnToDnRequestType { INVALIDATE_PARTITION_CACHE, INVALIDATE_PERMISSION_CACHE, INVALIDATE_SCHEMA_CACHE, + INVALIDATE_LAST_CACHE, CLEAR_CACHE, // Function diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java index 01240b0dab80..1908427c59c5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java @@ -270,6 +270,10 @@ protected void initActionMapBuilder() { (req, client, handler) -> client.invalidateMatchedSchemaCache( (TInvalidateMatchedSchemaCacheReq) req, (DataNodeTSStatusRPCHandler) handler)); + actionMapBuilder.put( + CnToDnRequestType.INVALIDATE_LAST_CACHE, + (req, client, handler) -> + client.invalidateLastCache((String) req, (DataNodeTSStatusRPCHandler) handler)); actionMapBuilder.put( CnToDnRequestType.DELETE_DATA_FOR_DELETE_SCHEMA, (req, client, handler) -> diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java index 0cd2013a705d..a56eb9e6dbdb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java @@ -195,6 +195,8 @@ public static DataNodeAsyncRequestRPCHandler buildHandler( case FULL_MERGE: case FLUSH: case CLEAR_CACHE: + case INVALIDATE_LAST_CACHE: + case STOP_DATA_NODE: case START_REPAIR_DATA: case STOP_REPAIR_DATA: case LOAD_CONFIGURATION: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index 8cf4275906ac..e12845e9f88d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -256,7 +256,7 @@ private void balanceRegionLeader( private void invalidateSchemaCacheOfOldLeaders( Map oldLeaderMap, Set successTransferSet) { DataNodeAsyncRequestContext invalidateSchemaCacheRequestHandler = - new DataNodeAsyncRequestContext<>(CnToDnRequestType.INVALIDATE_SCHEMA_CACHE); + new DataNodeAsyncRequestContext<>(CnToDnRequestType.INVALIDATE_LAST_CACHE); AtomicInteger requestIndex = new AtomicInteger(0); oldLeaderMap.entrySet().stream() .filter(entry -> TConsensusGroupType.DataRegion == entry.getKey().getType()) @@ -264,8 +264,8 @@ private void invalidateSchemaCacheOfOldLeaders( .forEach( entry -> { // set target - Integer dataNodeId = entry.getValue(); - TDataNodeLocation dataNodeLocation = + final Integer dataNodeId = entry.getValue(); + final TDataNodeLocation dataNodeLocation = getNodeManager().getRegisteredDataNode(dataNodeId).getLocation(); if (dataNodeLocation == null) { LOGGER.warn("DataNodeLocation is null, datanodeId {}", dataNodeId); @@ -274,10 +274,9 @@ private void invalidateSchemaCacheOfOldLeaders( invalidateSchemaCacheRequestHandler.putNodeLocation( requestIndex.get(), dataNodeLocation); // set req - TConsensusGroupId consensusGroupId = entry.getKey(); - String database = getPartitionManager().getRegionStorageGroup(consensusGroupId); - invalidateSchemaCacheRequestHandler.putRequest( - requestIndex.get(), new TInvalidateCacheReq(true, database)); + final TConsensusGroupId consensusGroupId = entry.getKey(); + final String database = getPartitionManager().getRegionStorageGroup(consensusGroupId); + invalidateSchemaCacheRequestHandler.putRequest(requestIndex.get(), database); requestIndex.incrementAndGet(); }); CnToDnInternalServiceAsyncRequestManager.getInstance() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 1aea93a3a02f..464c660623b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -133,6 +133,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager; import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler; import org.apache.iotdb.db.queryengine.plan.statement.component.WhereCondition; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; @@ -497,13 +498,19 @@ public TSStatus createDataRegion(TCreateDataRegionReq req) { } @Override - public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) { + public TSStatus invalidatePartitionCache(final TInvalidateCacheReq req) { ClusterPartitionFetcher.getInstance().invalidAllCache(); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } @Override - public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) { + public TSStatus invalidateLastCache(final String database) { + DataNodeSchemaCache.getInstance().invalidateLastCacheInDataRegion(database); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + + @Override + public TSStatus invalidateSchemaCache(final TInvalidateCacheReq req) { DataNodeSchemaCache.getInstance().takeWriteLock(); try { // req.getFullPath() is a database path diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 8c6c88480dd5..d8b9461cb4f5 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -702,6 +702,13 @@ service IDataNodeRPCService { */ common.TSStatus invalidatePartitionCache(TInvalidateCacheReq req) + /** + * Config node will invalidate last cache. + * + * @param string:database(without root) + */ + common.TSStatus invalidateLastCache(string database) + /** * Config node will invalidate Schema Info cache. * From 217f5d36a7f89711ea5e365adaeb0ed68a7409be Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 21 Oct 2024 17:38:42 +0800 Subject: [PATCH 2/5] Update DataNodeInternalRPCServiceImpl.java --- .../db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 464c660623b4..40ac005458e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -133,7 +133,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; -import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager; import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler; import org.apache.iotdb.db.queryengine.plan.statement.component.WhereCondition; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; From f91dcc6f1f4fe0605ec3fbf0b53660d8b317829c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 21 Oct 2024 17:39:44 +0800 Subject: [PATCH 3/5] Update CnToDnInternalServiceAsyncRequestManager.java --- .../client/async/CnToDnInternalServiceAsyncRequestManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java index 1908427c59c5..6a47a7ca4333 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java @@ -270,7 +270,7 @@ protected void initActionMapBuilder() { (req, client, handler) -> client.invalidateMatchedSchemaCache( (TInvalidateMatchedSchemaCacheReq) req, (DataNodeTSStatusRPCHandler) handler)); - actionMapBuilder.put( + actionMapBuilder.put( CnToDnRequestType.INVALIDATE_LAST_CACHE, (req, client, handler) -> client.invalidateLastCache((String) req, (DataNodeTSStatusRPCHandler) handler)); From a735f45892894b6616359748a6ee8cdadcfd2b43 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 21 Oct 2024 17:41:08 +0800 Subject: [PATCH 4/5] Update DataNodeAsyncRequestRPCHandler.java --- .../async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java index a56eb9e6dbdb..19be87ef0681 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java @@ -196,7 +196,6 @@ public static DataNodeAsyncRequestRPCHandler buildHandler( case FLUSH: case CLEAR_CACHE: case INVALIDATE_LAST_CACHE: - case STOP_DATA_NODE: case START_REPAIR_DATA: case STOP_REPAIR_DATA: case LOAD_CONFIGURATION: From 63f2e9647cad6ab2e160c6cc87c36a0b49cd0592 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 21 Oct 2024 18:38:06 +0800 Subject: [PATCH 5/5] Update RouteBalancer.java --- .../iotdb/confignode/manager/load/balancer/RouteBalancer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index e12845e9f88d..ad52faa62497 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -47,7 +47,6 @@ import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.consensus.ConsensusFactory; -import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq; import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq; import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; @@ -255,7 +254,7 @@ private void balanceRegionLeader( private void invalidateSchemaCacheOfOldLeaders( Map oldLeaderMap, Set successTransferSet) { - DataNodeAsyncRequestContext invalidateSchemaCacheRequestHandler = + DataNodeAsyncRequestContext invalidateSchemaCacheRequestHandler = new DataNodeAsyncRequestContext<>(CnToDnRequestType.INVALIDATE_LAST_CACHE); AtomicInteger requestIndex = new AtomicInteger(0); oldLeaderMap.entrySet().stream()