Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[To rc/1.3.3] Fix the cache invalidation logic in RouteBalancer #13850

Open
wants to merge 5 commits into
base: rc/1.3.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public enum CnToDnRequestType {
INVALIDATE_PARTITION_CACHE,
INVALIDATE_PERMISSION_CACHE,
INVALIDATE_SCHEMA_CACHE,
INVALIDATE_LAST_CACHE,
CLEAR_CACHE,

// Function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ protected void initActionMapBuilder() {
(req, client, handler) ->
client.invalidateMatchedSchemaCache(
(TInvalidateMatchedSchemaCacheReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnRequestType.INVALIDATE_LAST_CACHE,
(req, client, handler) ->
client.invalidateLastCache((String) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnRequestType.DELETE_DATA_FOR_DELETE_SCHEMA,
(req, client, handler) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public static DataNodeAsyncRequestRPCHandler<?> buildHandler(
case FULL_MERGE:
case FLUSH:
case CLEAR_CACHE:
case INVALIDATE_LAST_CACHE:
case START_REPAIR_DATA:
case STOP_REPAIR_DATA:
case LOAD_CONFIGURATION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
Expand Down Expand Up @@ -255,17 +254,17 @@ private void balanceRegionLeader(

private void invalidateSchemaCacheOfOldLeaders(
Map<TConsensusGroupId, Integer> oldLeaderMap, Set<TConsensusGroupId> successTransferSet) {
DataNodeAsyncRequestContext<TInvalidateCacheReq, TSStatus> invalidateSchemaCacheRequestHandler =
new DataNodeAsyncRequestContext<>(CnToDnRequestType.INVALIDATE_SCHEMA_CACHE);
DataNodeAsyncRequestContext<String, TSStatus> invalidateSchemaCacheRequestHandler =
new DataNodeAsyncRequestContext<>(CnToDnRequestType.INVALIDATE_LAST_CACHE);
AtomicInteger requestIndex = new AtomicInteger(0);
oldLeaderMap.entrySet().stream()
.filter(entry -> TConsensusGroupType.DataRegion == entry.getKey().getType())
.filter(entry -> successTransferSet.contains(entry.getKey()))
.forEach(
entry -> {
// set target
Integer dataNodeId = entry.getValue();
TDataNodeLocation dataNodeLocation =
final Integer dataNodeId = entry.getValue();
final TDataNodeLocation dataNodeLocation =
getNodeManager().getRegisteredDataNode(dataNodeId).getLocation();
if (dataNodeLocation == null) {
LOGGER.warn("DataNodeLocation is null, datanodeId {}", dataNodeId);
Expand All @@ -274,10 +273,9 @@ private void invalidateSchemaCacheOfOldLeaders(
invalidateSchemaCacheRequestHandler.putNodeLocation(
requestIndex.get(), dataNodeLocation);
// set req
TConsensusGroupId consensusGroupId = entry.getKey();
String database = getPartitionManager().getRegionStorageGroup(consensusGroupId);
invalidateSchemaCacheRequestHandler.putRequest(
requestIndex.get(), new TInvalidateCacheReq(true, database));
final TConsensusGroupId consensusGroupId = entry.getKey();
final String database = getPartitionManager().getRegionStorageGroup(consensusGroupId);
invalidateSchemaCacheRequestHandler.putRequest(requestIndex.get(), database);
requestIndex.incrementAndGet();
});
CnToDnInternalServiceAsyncRequestManager.getInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,19 @@ public TSStatus createDataRegion(TCreateDataRegionReq req) {
}

@Override
public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) {
public TSStatus invalidatePartitionCache(final TInvalidateCacheReq req) {
ClusterPartitionFetcher.getInstance().invalidAllCache();
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

@Override
public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
public TSStatus invalidateLastCache(final String database) {
DataNodeSchemaCache.getInstance().invalidateLastCacheInDataRegion(database);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

@Override
public TSStatus invalidateSchemaCache(final TInvalidateCacheReq req) {
DataNodeSchemaCache.getInstance().takeWriteLock();
try {
// req.getFullPath() is a database path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,13 @@ service IDataNodeRPCService {
*/
common.TSStatus invalidatePartitionCache(TInvalidateCacheReq req)

/**
* Config node will invalidate last cache.
*
* @param string:database(without root)
*/
common.TSStatus invalidateLastCache(string database)

/**
* Config node will invalidate Schema Info cache.
*
Expand Down
Loading