Skip to content

Commit

Permalink
[fix](restore) Add synchronized to avoid concurrent modification #43172
Browse files Browse the repository at this point in the history
… (#43277)

cherry pick from #43172
  • Loading branch information
w41ter authored Nov 5, 2024
1 parent a4a3e7e commit 53af206
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public enum BackupJobState {
// all objects which need backup
private List<TableRef> tableRefs = Lists.newArrayList();

private BackupJobState state;
private volatile BackupJobState state;

private long snapshotFinishedTime = -1;
private long snapshotUploadFinishedTime = -1;
Expand Down Expand Up @@ -976,7 +976,7 @@ private void cancelInternal() {
LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this);
}

public List<String> getInfo() {
public synchronized List<String> getInfo() {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(jobId));
info.add(label);
Expand Down
206 changes: 104 additions & 102 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
Expand Down Expand Up @@ -130,7 +129,8 @@ public class RestoreJob extends AbstractJob {
public enum RestoreJobState {
PENDING, // Job is newly created. Check and prepare meta in catalog. Create replica if necessary.
// Waiting for replica creation finished synchronously, then sending snapshot tasks.
// then transfer to SNAPSHOTING
// then transfer to CREATING.
CREATING, // Creating replica on BE. Transfer to SNAPSHOTING after all replicas created.
SNAPSHOTING, // Waiting for snapshot finished. Then transfer to DOWNLOAD.
DOWNLOAD, // Send download tasks.
DOWNLOADING, // Waiting for download finished.
Expand All @@ -147,7 +147,7 @@ public enum RestoreJobState {
private BackupJobInfo jobInfo;
private boolean allowLoad;

private RestoreJobState state;
private volatile RestoreJobState state;

private BackupMeta backupMeta;

Expand Down Expand Up @@ -197,6 +197,8 @@ public enum RestoreJobState {
// restore properties
private Map<String, String> properties = Maps.newHashMap();

private MarkedCountDownLatch<Long, Long> createReplicaTasksLatch = null;

public RestoreJob() {
super(JobType.RESTORE);
}
Expand Down Expand Up @@ -253,10 +255,6 @@ public RestoreJobState getState() {
return state;
}

public RestoreFileMapping getFileMapping() {
return fileMapping;
}

public int getMetaVersion() {
return metaVersion;
}
Expand Down Expand Up @@ -380,7 +378,7 @@ public boolean isCancelled() {
}

@Override
public void run() {
public synchronized void run() {
if (state == RestoreJobState.FINISHED || state == RestoreJobState.CANCELLED) {
return;
}
Expand All @@ -406,8 +404,8 @@ public void run() {
checkIfNeedCancel();

if (status.ok()) {
if (state != RestoreJobState.PENDING && label.equals(
DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB", ""))) {
if (state != RestoreJobState.PENDING && state != RestoreJobState.CREATING
&& label.equals(DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB", ""))) {
LOG.info("pause restore job by debug point: {}", this);
return;
}
Expand All @@ -416,6 +414,9 @@ public void run() {
case PENDING:
checkAndPrepareMeta();
break;
case CREATING:
waitingAllReplicasCreated();
break;
case SNAPSHOTING:
waitingAllSnapshotsFinished();
break;
Expand Down Expand Up @@ -445,7 +446,7 @@ public void run() {
* Check whether some restored objs have been dropped, in which case the job may need to be cancelled.
*/
private void checkIfNeedCancel() {
if (state == RestoreJobState.PENDING) {
if (state == RestoreJobState.PENDING || state == RestoreJobState.CREATING) {
return;
}

Expand Down Expand Up @@ -910,121 +911,122 @@ private void checkAndPrepareMeta() {
LOG.debug("finished to restore resources. {}", this.jobId);

// Send create replica task to BE outside the db lock
boolean ok = false;
int numBatchTasks = batchTaskPerTable.values()
.stream()
.mapToInt(AgentBatchTask::getTaskNum)
.sum();
MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(numBatchTasks);
if (batchTaskPerTable.size() > 0) {
createReplicaTasksLatch = new MarkedCountDownLatch<>(numBatchTasks);
if (numBatchTasks > 0) {
LOG.info("begin to send create replica tasks to BE for restore. total {} tasks. {}", numBatchTasks, this);
for (AgentBatchTask batchTask : batchTaskPerTable.values()) {
for (AgentTask task : batchTask.getAllTasks()) {
latch.addMark(task.getBackendId(), task.getTabletId());
((CreateReplicaTask) task).setLatch(latch);
createReplicaTasksLatch.addMark(task.getBackendId(), task.getTabletId());
((CreateReplicaTask) task).setLatch(createReplicaTasksLatch);
AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);
}

// estimate timeout
long timeout = DbUtil.getCreateReplicasTimeoutMs(numBatchTasks) / 1000;
try {
LOG.info("begin to send create replica tasks to BE for restore. total {} tasks. timeout: {}s",
numBatchTasks, timeout);
for (long elapsed = 0; elapsed <= timeout; elapsed++) {
if (latch.await(1, TimeUnit.SECONDS)) {
ok = true;
break;
}
if (state != RestoreJobState.PENDING) { // user cancelled
return;
}
if (elapsed % 5 == 0) {
LOG.info("waiting {} create replica tasks for restore to finish, total {} tasks, elapsed {}s",
latch.getCount(), numBatchTasks, elapsed);
}
}
} catch (InterruptedException e) {
LOG.warn("InterruptedException: ", e);
ok = false;
}
} else {
ok = true;
}

if (ok && latch.getStatus().ok()) {
if (LOG.isDebugEnabled()) {
LOG.debug("finished to create all restored replicas. {}", this);
}
// add restored partitions.
// table should be in State RESTORE, so no other partitions can be
// added to or removed from this table during the restore process.
for (Pair<String, Partition> entry : restoredPartitions) {
OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first);
localTbl.writeLock();
try {
Partition restoredPart = entry.second;
OlapTable remoteTbl = (OlapTable) backupMeta.getTable(entry.first);
if (localTbl.getPartitionInfo().getType() == PartitionType.RANGE
|| localTbl.getPartitionInfo().getType() == PartitionType.LIST) {

PartitionInfo remotePartitionInfo = remoteTbl.getPartitionInfo();
PartitionInfo localPartitionInfo = localTbl.getPartitionInfo();
BackupPartitionInfo backupPartitionInfo
= jobInfo.getOlapTableInfo(entry.first).getPartInfo(restoredPart.getName());
long remotePartId = backupPartitionInfo.id;
PartitionItem remoteItem = remoteTbl.getPartitionInfo().getItem(remotePartId);
DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId);
ReplicaAllocation restoreReplicaAlloc = replicaAlloc;
if (reserveReplica) {
restoreReplicaAlloc = remotePartitionInfo.getReplicaAllocation(remotePartId);
}
localPartitionInfo.addPartition(restoredPart.getId(), false, remoteItem,
remoteDataProperty, restoreReplicaAlloc,
remotePartitionInfo.getIsInMemory(remotePartId),
remotePartitionInfo.getIsMutable(remotePartId));
}
localTbl.addPartition(restoredPart);
} finally {
localTbl.writeUnlock();
}
// No log here, PENDING state restore job will redo this method
state = RestoreJobState.CREATING;
}

private void waitingAllReplicasCreated() {
boolean ok = true;
try {
if (!createReplicaTasksLatch.await(0, TimeUnit.SECONDS)) {
LOG.info("waiting {} create replica tasks for restore to finish. {}",
createReplicaTasksLatch.getCount(), this);
return;
}
} catch (InterruptedException e) {
LOG.warn("InterruptedException, {}", this, e);
ok = false;
}

// add restored tables
for (Table tbl : restoredTbls) {
if (!db.writeLockIfExist()) {
status = new Status(ErrCode.COMMON_ERROR, "Database " + db.getFullName()
+ " has been dropped");
return;
}
tbl.writeLock();
try {
if (!db.createTable(tbl)) {
status = new Status(ErrCode.COMMON_ERROR, "Table " + tbl.getName()
+ " already exist in db: " + db.getFullName());
return;
}
} finally {
tbl.writeUnlock();
db.writeUnlock();
}
}
} else {
if (!(ok && createReplicaTasksLatch.getStatus().ok())) {
// only show at most 10 results
List<String> subList = latch.getLeftMarks().stream().limit(10)
List<String> subList = createReplicaTasksLatch.getLeftMarks().stream().limit(10)
.map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")")
.collect(Collectors.toList());
String idStr = Joiner.on(", ").join(subList);
String reason = "TIMEDOUT";
if (!latch.getStatus().ok()) {
reason = latch.getStatus().getErrorMsg();
if (!createReplicaTasksLatch.getStatus().ok()) {
reason = createReplicaTasksLatch.getStatus().getErrorMsg();
}
String errMsg = String.format(
"Failed to create replicas for restore: %s, unfinished marks: %s", reason, idStr);
status = new Status(ErrCode.COMMON_ERROR, errMsg);
return;
}

if (LOG.isDebugEnabled()) {
LOG.debug("finished to create all restored replicas. {}", this);
}
allReplicasCreated();
}

private void allReplicasCreated() {
Database db = env.getInternalCatalog().getDbNullable(dbId);
if (db == null) {
status = new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist");
return;
}

// add restored partitions.
// table should be in State RESTORE, so no other partitions can be
// added to or removed from this table during the restore process.
for (Pair<String, Partition> entry : restoredPartitions) {
OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first);
localTbl.writeLock();
try {
Partition restoredPart = entry.second;
OlapTable remoteTbl = (OlapTable) backupMeta.getTable(entry.first);
if (localTbl.getPartitionInfo().getType() == PartitionType.RANGE
|| localTbl.getPartitionInfo().getType() == PartitionType.LIST) {

PartitionInfo remotePartitionInfo = remoteTbl.getPartitionInfo();
PartitionInfo localPartitionInfo = localTbl.getPartitionInfo();
BackupPartitionInfo backupPartitionInfo
= jobInfo.getOlapTableInfo(entry.first).getPartInfo(restoredPart.getName());
long remotePartId = backupPartitionInfo.id;
PartitionItem remoteItem = remoteTbl.getPartitionInfo().getItem(remotePartId);
DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId);
ReplicaAllocation restoreReplicaAlloc = replicaAlloc;
if (reserveReplica) {
restoreReplicaAlloc = remotePartitionInfo.getReplicaAllocation(remotePartId);
}
localPartitionInfo.addPartition(restoredPart.getId(), false, remoteItem,
remoteDataProperty, restoreReplicaAlloc,
remotePartitionInfo.getIsInMemory(remotePartId),
remotePartitionInfo.getIsMutable(remotePartId));
}
localTbl.addPartition(restoredPart);
} finally {
localTbl.writeUnlock();
}
}

// add restored tables
for (Table tbl : restoredTbls) {
if (!db.writeLockIfExist()) {
status = new Status(ErrCode.COMMON_ERROR, "Database " + db.getFullName() + " has been dropped");
return;
}
tbl.writeLock();
try {
if (!db.createTable(tbl)) {
status = new Status(ErrCode.COMMON_ERROR, "Table " + tbl.getName()
+ " already exist in db: " + db.getFullName());
return;
}
} finally {
tbl.writeUnlock();
db.writeUnlock();
}
}

LOG.info("finished to prepare meta. {}", this);

if (jobInfo.content == null || jobInfo.content == BackupContent.ALL) {
Expand Down Expand Up @@ -2133,7 +2135,7 @@ public List<String> getFullInfo() {
return getInfo(false);
}

public List<String> getInfo(boolean isBrief) {
public synchronized List<String> getInfo(boolean isBrief) {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(jobId));
info.add(label);
Expand Down Expand Up @@ -2192,7 +2194,7 @@ public synchronized Status cancel() {
return Status.OK;
}

public void cancelInternal(boolean isReplay) {
private void cancelInternal(boolean isReplay) {
// We need to clean the residual due to current state
if (!isReplay) {
switch (state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ suite("test_backup_restore_atomic_with_alter", "backup_restore") {
boolean restore_paused = false
for (int k = 0; k < 60; k++) {
def records = sql_return_maparray """ SHOW RESTORE FROM ${dbName} WHERE Label = "${snapshotName}" """
if (records.size() == 1 && records[0].State != 'PENDING') {
if (records.size() == 1 && (records[0].State != 'PENDING' && records[0].State != 'CREATING')) {
restore_paused = true
break
}
Expand Down

0 comments on commit 53af206

Please sign in to comment.