Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](restore) Add synchronized to avoid concurrent modification #43172 #43277

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public enum BackupJobState {
// all objects which need backup
private List<TableRef> tableRefs = Lists.newArrayList();

private BackupJobState state;
private volatile BackupJobState state;

private long snapshotFinishedTime = -1;
private long snapshotUploadFinishedTime = -1;
Expand Down Expand Up @@ -976,7 +976,7 @@ private void cancelInternal() {
LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this);
}

public List<String> getInfo() {
public synchronized List<String> getInfo() {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(jobId));
info.add(label);
Expand Down
206 changes: 104 additions & 102 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
Expand Down Expand Up @@ -130,7 +129,8 @@ public class RestoreJob extends AbstractJob {
public enum RestoreJobState {
PENDING, // Job is newly created. Check and prepare meta in catalog. Create replica if necessary.
// Waiting for replica creation finished synchronously, then sending snapshot tasks.
// then transfer to SNAPSHOTING
// then transfer to CREATING.
CREATING, // Creating replica on BE. Transfer to SNAPSHOTING after all replicas created.
SNAPSHOTING, // Waiting for snapshot finished. Then transfer to DOWNLOAD.
DOWNLOAD, // Send download tasks.
DOWNLOADING, // Waiting for download finished.
Expand All @@ -147,7 +147,7 @@ public enum RestoreJobState {
private BackupJobInfo jobInfo;
private boolean allowLoad;

private RestoreJobState state;
private volatile RestoreJobState state;

private BackupMeta backupMeta;

Expand Down Expand Up @@ -197,6 +197,8 @@ public enum RestoreJobState {
// restore properties
private Map<String, String> properties = Maps.newHashMap();

private MarkedCountDownLatch<Long, Long> createReplicaTasksLatch = null;

public RestoreJob() {
super(JobType.RESTORE);
}
Expand Down Expand Up @@ -253,10 +255,6 @@ public RestoreJobState getState() {
return state;
}

public RestoreFileMapping getFileMapping() {
return fileMapping;
}

public int getMetaVersion() {
return metaVersion;
}
Expand Down Expand Up @@ -380,7 +378,7 @@ public boolean isCancelled() {
}

@Override
public void run() {
public synchronized void run() {
if (state == RestoreJobState.FINISHED || state == RestoreJobState.CANCELLED) {
return;
}
Expand All @@ -406,8 +404,8 @@ public void run() {
checkIfNeedCancel();

if (status.ok()) {
if (state != RestoreJobState.PENDING && label.equals(
DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB", ""))) {
if (state != RestoreJobState.PENDING && state != RestoreJobState.CREATING
&& label.equals(DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB", ""))) {
LOG.info("pause restore job by debug point: {}", this);
return;
}
Expand All @@ -416,6 +414,9 @@ public void run() {
case PENDING:
checkAndPrepareMeta();
break;
case CREATING:
waitingAllReplicasCreated();
break;
case SNAPSHOTING:
waitingAllSnapshotsFinished();
break;
Expand Down Expand Up @@ -445,7 +446,7 @@ public void run() {
* return true if some restored objs have been dropped.
*/
private void checkIfNeedCancel() {
if (state == RestoreJobState.PENDING) {
if (state == RestoreJobState.PENDING || state == RestoreJobState.CREATING) {
return;
}

Expand Down Expand Up @@ -910,121 +911,122 @@ private void checkAndPrepareMeta() {
LOG.debug("finished to restore resources. {}", this.jobId);

// Send create replica task to BE outside the db lock
boolean ok = false;
int numBatchTasks = batchTaskPerTable.values()
.stream()
.mapToInt(AgentBatchTask::getTaskNum)
.sum();
MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(numBatchTasks);
if (batchTaskPerTable.size() > 0) {
createReplicaTasksLatch = new MarkedCountDownLatch<>(numBatchTasks);
if (numBatchTasks > 0) {
LOG.info("begin to send create replica tasks to BE for restore. total {} tasks. {}", numBatchTasks, this);
for (AgentBatchTask batchTask : batchTaskPerTable.values()) {
for (AgentTask task : batchTask.getAllTasks()) {
latch.addMark(task.getBackendId(), task.getTabletId());
((CreateReplicaTask) task).setLatch(latch);
createReplicaTasksLatch.addMark(task.getBackendId(), task.getTabletId());
((CreateReplicaTask) task).setLatch(createReplicaTasksLatch);
AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);
}

// estimate timeout
long timeout = DbUtil.getCreateReplicasTimeoutMs(numBatchTasks) / 1000;
try {
LOG.info("begin to send create replica tasks to BE for restore. total {} tasks. timeout: {}s",
numBatchTasks, timeout);
for (long elapsed = 0; elapsed <= timeout; elapsed++) {
if (latch.await(1, TimeUnit.SECONDS)) {
ok = true;
break;
}
if (state != RestoreJobState.PENDING) { // user cancelled
return;
}
if (elapsed % 5 == 0) {
LOG.info("waiting {} create replica tasks for restore to finish, total {} tasks, elapsed {}s",
latch.getCount(), numBatchTasks, elapsed);
}
}
} catch (InterruptedException e) {
LOG.warn("InterruptedException: ", e);
ok = false;
}
} else {
ok = true;
}

if (ok && latch.getStatus().ok()) {
if (LOG.isDebugEnabled()) {
LOG.debug("finished to create all restored replicas. {}", this);
}
// add restored partitions.
// table should be in State RESTORE, so no other partitions can be
// added to or removed from this table during the restore process.
for (Pair<String, Partition> entry : restoredPartitions) {
OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first);
localTbl.writeLock();
try {
Partition restoredPart = entry.second;
OlapTable remoteTbl = (OlapTable) backupMeta.getTable(entry.first);
if (localTbl.getPartitionInfo().getType() == PartitionType.RANGE
|| localTbl.getPartitionInfo().getType() == PartitionType.LIST) {

PartitionInfo remotePartitionInfo = remoteTbl.getPartitionInfo();
PartitionInfo localPartitionInfo = localTbl.getPartitionInfo();
BackupPartitionInfo backupPartitionInfo
= jobInfo.getOlapTableInfo(entry.first).getPartInfo(restoredPart.getName());
long remotePartId = backupPartitionInfo.id;
PartitionItem remoteItem = remoteTbl.getPartitionInfo().getItem(remotePartId);
DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId);
ReplicaAllocation restoreReplicaAlloc = replicaAlloc;
if (reserveReplica) {
restoreReplicaAlloc = remotePartitionInfo.getReplicaAllocation(remotePartId);
}
localPartitionInfo.addPartition(restoredPart.getId(), false, remoteItem,
remoteDataProperty, restoreReplicaAlloc,
remotePartitionInfo.getIsInMemory(remotePartId),
remotePartitionInfo.getIsMutable(remotePartId));
}
localTbl.addPartition(restoredPart);
} finally {
localTbl.writeUnlock();
}
// No log here, PENDING state restore job will redo this method
state = RestoreJobState.CREATING;
}

private void waitingAllReplicasCreated() {
boolean ok = true;
try {
if (!createReplicaTasksLatch.await(0, TimeUnit.SECONDS)) {
LOG.info("waiting {} create replica tasks for restore to finish. {}",
createReplicaTasksLatch.getCount(), this);
return;
}
} catch (InterruptedException e) {
LOG.warn("InterruptedException, {}", this, e);
ok = false;
}

// add restored tables
for (Table tbl : restoredTbls) {
if (!db.writeLockIfExist()) {
status = new Status(ErrCode.COMMON_ERROR, "Database " + db.getFullName()
+ " has been dropped");
return;
}
tbl.writeLock();
try {
if (!db.createTable(tbl)) {
status = new Status(ErrCode.COMMON_ERROR, "Table " + tbl.getName()
+ " already exist in db: " + db.getFullName());
return;
}
} finally {
tbl.writeUnlock();
db.writeUnlock();
}
}
} else {
if (!(ok && createReplicaTasksLatch.getStatus().ok())) {
// only show at most 10 results
List<String> subList = latch.getLeftMarks().stream().limit(10)
List<String> subList = createReplicaTasksLatch.getLeftMarks().stream().limit(10)
.map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")")
.collect(Collectors.toList());
String idStr = Joiner.on(", ").join(subList);
String reason = "TIMEDOUT";
if (!latch.getStatus().ok()) {
reason = latch.getStatus().getErrorMsg();
if (!createReplicaTasksLatch.getStatus().ok()) {
reason = createReplicaTasksLatch.getStatus().getErrorMsg();
}
String errMsg = String.format(
"Failed to create replicas for restore: %s, unfinished marks: %s", reason, idStr);
status = new Status(ErrCode.COMMON_ERROR, errMsg);
return;
}

if (LOG.isDebugEnabled()) {
LOG.debug("finished to create all restored replicas. {}", this);
}
allReplicasCreated();
}

private void allReplicasCreated() {
Database db = env.getInternalCatalog().getDbNullable(dbId);
if (db == null) {
status = new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist");
return;
}

// add restored partitions.
// table should be in State RESTORE, so no other partitions can be
// added to or removed from this table during the restore process.
for (Pair<String, Partition> entry : restoredPartitions) {
OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first);
localTbl.writeLock();
try {
Partition restoredPart = entry.second;
OlapTable remoteTbl = (OlapTable) backupMeta.getTable(entry.first);
if (localTbl.getPartitionInfo().getType() == PartitionType.RANGE
|| localTbl.getPartitionInfo().getType() == PartitionType.LIST) {

PartitionInfo remotePartitionInfo = remoteTbl.getPartitionInfo();
PartitionInfo localPartitionInfo = localTbl.getPartitionInfo();
BackupPartitionInfo backupPartitionInfo
= jobInfo.getOlapTableInfo(entry.first).getPartInfo(restoredPart.getName());
long remotePartId = backupPartitionInfo.id;
PartitionItem remoteItem = remoteTbl.getPartitionInfo().getItem(remotePartId);
DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId);
ReplicaAllocation restoreReplicaAlloc = replicaAlloc;
if (reserveReplica) {
restoreReplicaAlloc = remotePartitionInfo.getReplicaAllocation(remotePartId);
}
localPartitionInfo.addPartition(restoredPart.getId(), false, remoteItem,
remoteDataProperty, restoreReplicaAlloc,
remotePartitionInfo.getIsInMemory(remotePartId),
remotePartitionInfo.getIsMutable(remotePartId));
}
localTbl.addPartition(restoredPart);
} finally {
localTbl.writeUnlock();
}
}

// add restored tables
for (Table tbl : restoredTbls) {
if (!db.writeLockIfExist()) {
status = new Status(ErrCode.COMMON_ERROR, "Database " + db.getFullName() + " has been dropped");
return;
}
tbl.writeLock();
try {
if (!db.createTable(tbl)) {
status = new Status(ErrCode.COMMON_ERROR, "Table " + tbl.getName()
+ " already exist in db: " + db.getFullName());
return;
}
} finally {
tbl.writeUnlock();
db.writeUnlock();
}
}

LOG.info("finished to prepare meta. {}", this);

if (jobInfo.content == null || jobInfo.content == BackupContent.ALL) {
Expand Down Expand Up @@ -2133,7 +2135,7 @@ public List<String> getFullInfo() {
return getInfo(false);
}

public List<String> getInfo(boolean isBrief) {
public synchronized List<String> getInfo(boolean isBrief) {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(jobId));
info.add(label);
Expand Down Expand Up @@ -2192,7 +2194,7 @@ public synchronized Status cancel() {
return Status.OK;
}

public void cancelInternal(boolean isReplay) {
private void cancelInternal(boolean isReplay) {
// We need to clean the residual due to current state
if (!isReplay) {
switch (state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ suite("test_backup_restore_atomic_with_alter", "backup_restore") {
boolean restore_paused = false
for (int k = 0; k < 60; k++) {
def records = sql_return_maparray """ SHOW RESTORE FROM ${dbName} WHERE Label = "${snapshotName}" """
if (records.size() == 1 && records[0].State != 'PENDING') {
if (records.size() == 1 && (records[0].State != 'PENDING' && records[0].State != 'CREATING')) {
restore_paused = true
break
}
Expand Down
Loading