Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe/Subscription: avoid executing rollbackFromValidateTask or rollbackFromValidate multiple times in retry with rollback scenarios #13825

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public abstract class AbstractOperatePipeProcedureV2
// Only used in rollback to reduce the number of network calls
protected boolean isRollbackFromOperateOnDataNodesSuccessful = false;

// Only used in rollback to avoid executing rollbackFromValidateTask multiple times
protected boolean isRollbackFromValidateTaskSuccessful = false;

// This variable should not be serialized into procedure store,
// putting it here is just for convenience
protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
Expand Down Expand Up @@ -298,10 +301,13 @@ protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeTaskState st

switch (state) {
case VALIDATE_TASK:
try {
rollbackFromValidateTask(env);
} catch (Exception e) {
LOGGER.warn("ProcedureId {}: Failed to rollback from validate task.", getProcId(), e);
if (!isRollbackFromValidateTaskSuccessful) {
try {
rollbackFromValidateTask(env);
isRollbackFromValidateTaskSuccessful = true;
} catch (Exception e) {
LOGGER.warn("ProcedureId {}: Failed to rollback from validate task.", getProcId(), e);
}
}
break;
case CALCULATE_INFO_FOR_TASK:
Expand Down Expand Up @@ -330,7 +336,7 @@ protected void rollbackState(ConfigNodeProcedureEnv env, OperatePipeTaskState st
break;
case OPERATE_ON_DATA_NODES:
try {
// We have to make sure that rollbackFromOperateOnDataNodes is executed before
// We have to make sure that rollbackFromOperateOnDataNodes is executed after
// rollbackFromWriteConfigNodeConsensus, because rollbackFromOperateOnDataNodes is
// executed based on the consensus of config nodes that is written by
// rollbackFromWriteConfigNodeConsensus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public abstract class AbstractOperateSubscriptionProcedure

private static final int RETRY_THRESHOLD = 1;

// Only used in rollback to avoid executing rollbackFromValidate multiple times
protected boolean isRollbackFromValidateSuccessful = false;

protected AtomicReference<SubscriptionInfo> subscriptionInfo;

protected AtomicReference<SubscriptionInfo> acquireLockInternal(
Expand Down Expand Up @@ -250,15 +253,18 @@ protected void rollbackState(ConfigNodeProcedureEnv env, OperateSubscriptionStat

switch (state) {
case VALIDATE:
try {
rollbackFromValidate(env);
} catch (Exception e) {
LOGGER.warn(
"ProcedureId {}: Failed to rollback from state [{}], because {}",
getProcId(),
state,
e.getMessage(),
e);
if (!isRollbackFromValidateSuccessful) {
try {
rollbackFromValidate(env);
isRollbackFromValidateSuccessful = true;
} catch (Exception e) {
LOGGER.warn(
"ProcedureId {}: Failed to rollback from state [{}], because {}",
getProcId(),
state,
e.getMessage(),
e);
}
}
break;
case OPERATE_ON_CONFIG_NODES:
Expand Down
Loading