From 2eb26c189b67b539c01803272ea5cc26ee211a94 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Fri, 18 Oct 2024 17:01:40 +0800 Subject: [PATCH 1/2] setup --- .../pipe/AbstractOperatePipeProcedureV2.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java index 1963d520d3f2..5333d407bc2c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java @@ -83,6 +83,9 @@ public abstract class AbstractOperatePipeProcedureV2 // Only used in rollback to reduce the number of network calls protected boolean isRollbackFromOperateOnDataNodesSuccessful = false; + // Only used in rollback to avoid executing rollbackFromValidateTask multiple times + protected boolean isRollbackFromValidateTaskSuccessful = false; + // This variable should not be serialized into procedure store, // putting it here is just for convenience protected AtomicReference pipeTaskInfo; @@ -298,10 +301,13 @@ protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeTaskState st switch (state) { case VALIDATE_TASK: - try { - rollbackFromValidateTask(env); - } catch (Exception e) { - LOGGER.warn("ProcedureId {}: Failed to rollback from validate task.", getProcId(), e); + if (!isRollbackFromValidateTaskSuccessful) { + try { + rollbackFromValidateTask(env); + isRollbackFromValidateTaskSuccessful = true; + } catch (Exception e) { + LOGGER.warn("ProcedureId {}: Failed to rollback from validate task.", getProcId(), e); + } } break; case CALCULATE_INFO_FOR_TASK: @@ -330,7 +336,7 @@ protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeTaskState st break; case OPERATE_ON_DATA_NODES: try { - // We have to make sure that rollbackFromOperateOnDataNodes is executed before + // We have to make sure that rollbackFromOperateOnDataNodes is executed after // rollbackFromWriteConfigNodeConsensus, because rollbackFromOperateOnDataNodes is // executed based on the consensus of config nodes that is written by // rollbackFromWriteConfigNodeConsensus From f23a670d966f89c1a01be6c736871fa0cb5dd80c Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Fri, 18 Oct 2024 17:05:52 +0800 Subject: [PATCH 2/2] fix for sub --- .../AbstractOperateSubscriptionProcedure.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java index 4566738cd8d3..184969173c94 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java @@ -59,6 +59,9 @@ public abstract class AbstractOperateSubscriptionProcedure private static final int RETRY_THRESHOLD = 1; + // Only used in rollback to avoid executing rollbackFromValidate multiple times + protected boolean isRollbackFromValidateSuccessful = false; + protected AtomicReference subscriptionInfo; protected AtomicReference acquireLockInternal( @@ -250,15 +253,18 @@ protected void rollbackState(ConfigNodeProcedureEnv env, OperateSubscriptionStat switch (state) { case VALIDATE: - try { - rollbackFromValidate(env); - } catch (Exception e) { - LOGGER.warn( - "ProcedureId {}: Failed to rollback from state [{}], because {}", - getProcId(), - state, - e.getMessage(), - e); + if (!isRollbackFromValidateSuccessful) { + try { + rollbackFromValidate(env); + isRollbackFromValidateSuccessful = true; + } catch (Exception e) { + LOGGER.warn( + "ProcedureId {}: Failed to rollback from state [{}], because {}", + getProcId(), + state, + e.getMessage(), + e); + } } break; case OPERATE_ON_CONFIG_NODES: