diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
index 672a92c59e31b9..375aa789a773b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
@@ -155,6 +155,8 @@ public void setTypeRead(boolean isTypeRead) {
public abstract boolean isCancelled();
+ public abstract boolean isFinished();
+
public static AbstractJob read(DataInput in) throws IOException {
AbstractJob job = null;
JobType type = JobType.valueOf(Text.readString(in));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 496727c1e00086..a3fd66692a2928 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -108,10 +108,10 @@ public class BackupHandler extends MasterDaemon implements Writable {
private Env env;
- // map to store backup info, key is label name, value is Pair, meta && info is bytes
- // this map not present in persist && only in fe master memory
+ // map to store backup info, key is label name, value is the BackupJob
+ // this map not present in persist && only in fe memory
// one table only keep one snapshot info, only keep last
- private final Map localSnapshots = new HashMap<>();
+ private final Map localSnapshots = new HashMap<>();
private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock();
public BackupHandler() {
@@ -166,6 +166,7 @@ private boolean init() {
return false;
}
}
+
isInit = true;
return true;
}
@@ -484,11 +485,15 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
return;
}
+ List removedLabels = Lists.newArrayList();
jobLock.lock();
try {
Deque jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList());
while (jobs.size() >= Config.max_backup_restore_job_num_per_db) {
- jobs.removeFirst();
+ AbstractJob removedJob = jobs.removeFirst();
+ if (removedJob instanceof BackupJob && ((BackupJob) removedJob).isLocalSnapshot()) {
+ removedLabels.add(removedJob.getLabel());
+ }
}
AbstractJob lastJob = jobs.peekLast();
@@ -501,6 +506,17 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
} finally {
jobLock.unlock();
}
+
+ if (job.isFinished() && job instanceof BackupJob) {
+ // Save snapshot to local repo, when reload backupHandler from image.
+ BackupJob backupJob = (BackupJob) job;
+ if (backupJob.isLocalSnapshot()) {
+ addSnapshot(backupJob.getLabel(), backupJob);
+ }
+ }
+ for (String label : removedLabels) {
+ removeSnapshot(label);
+ }
}
private List getAllCurrentJobs() {
@@ -737,22 +753,42 @@ public boolean report(TTaskType type, long jobId, long taskId, int finishedNum,
return false;
}
- public void addSnapshot(String labelName, Snapshot snapshot) {
+ public void addSnapshot(String labelName, BackupJob backupJob) {
+ assert backupJob.isFinished();
+
+ LOG.info("add snapshot {} to local repo", labelName);
localSnapshotsLock.writeLock().lock();
try {
- localSnapshots.put(labelName, snapshot);
+ localSnapshots.put(labelName, backupJob);
+ } finally {
+ localSnapshotsLock.writeLock().unlock();
+ }
+ }
+
+ public void removeSnapshot(String labelName) {
+ LOG.info("remove snapshot {} from local repo", labelName);
+ localSnapshotsLock.writeLock().lock();
+ try {
+ localSnapshots.remove(labelName);
} finally {
localSnapshotsLock.writeLock().unlock();
}
}
public Snapshot getSnapshot(String labelName) {
+ BackupJob backupJob;
localSnapshotsLock.readLock().lock();
try {
- return localSnapshots.get(labelName);
+ backupJob = localSnapshots.get(labelName);
} finally {
localSnapshotsLock.readLock().unlock();
}
+
+ if (backupJob == null) {
+ return null;
+ }
+
+ return backupJob.getSnapshot();
}
public static BackupHandler read(DataInput in) throws IOException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 0ed342a57cdcdc..dc92e9a07c3c1f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -123,9 +123,6 @@ public enum BackupJobState {
// backup properties && table commit seq with table id
private Map properties = Maps.newHashMap();
- private byte[] metaInfoBytes = null;
- private byte[] jobInfoBytes = null;
-
public BackupJob() {
super(JobType.BACKUP);
}
@@ -333,11 +330,7 @@ public synchronized boolean finishSnapshotUploadTask(UploadTask task, TFinishTas
@Override
public synchronized void replayRun() {
- LOG.info("replay run backup job: {}", this);
- if (state == BackupJobState.FINISHED && repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
- Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
- env.getBackupHandler().addSnapshot(label, snapshot);
- }
+ // nothing to do
}
@Override
@@ -355,6 +348,11 @@ public boolean isCancelled() {
return state == BackupJobState.CANCELLED;
}
+ @Override
+ public boolean isFinished() {
+ return state == BackupJobState.FINISHED;
+ }
+
// Polling the job state and do the right things.
@Override
public synchronized void run() {
@@ -792,8 +790,6 @@ private void saveMetaInfo() {
}
backupMeta.writeToFile(metaInfoFile);
localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
- // read meta info to metaInfoBytes
- metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
// 3. save job info file
Map tableCommitSeqMap = Maps.newHashMap();
@@ -818,8 +814,6 @@ private void saveMetaInfo() {
}
jobInfo.writeToFile(jobInfoFile);
localJobInfoFilePath = jobInfoFile.getAbsolutePath();
- // read job info to jobInfoBytes
- jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
} catch (Exception e) {
status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage());
return;
@@ -873,7 +867,6 @@ private void uploadMetaAndJobInfoFile() {
}
}
-
finishedTime = System.currentTimeMillis();
state = BackupJobState.FINISHED;
@@ -882,8 +875,7 @@ private void uploadMetaAndJobInfoFile() {
LOG.info("job is finished. {}", this);
if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
- Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
- env.getBackupHandler().addSnapshot(label, snapshot);
+ env.getBackupHandler().addSnapshot(label, this);
return;
}
}
@@ -976,6 +968,29 @@ private void cancelInternal() {
LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this);
}
+ public boolean isLocalSnapshot() {
+ return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
+ }
+
+ // read meta and job info bytes from disk, and return the snapshot
+ public synchronized Snapshot getSnapshot() {
+ if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
+ return null;
+ }
+
+ try {
+ File metaInfoFile = new File(localMetaInfoFilePath);
+ File jobInfoFile = new File(localJobInfoFilePath);
+ byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
+ byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
+ return new Snapshot(label, metaInfoBytes, jobInfoBytes);
+ } catch (IOException e) {
+ LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ",
+ localMetaInfoFilePath, localJobInfoFilePath, e);
+ return null;
+ }
+ }
+
public synchronized List getInfo() {
List info = Lists.newArrayList();
info.add(String.valueOf(jobId));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 36def1738d4397..69bcb3f7941f9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -377,6 +377,11 @@ public boolean isCancelled() {
return state == RestoreJobState.CANCELLED;
}
+ @Override
+ public boolean isFinished() {
+ return state == RestoreJobState.FINISHED;
+ }
+
@Override
public synchronized void run() {
if (state == RestoreJobState.FINISHED || state == RestoreJobState.CANCELLED) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 9dbd0afd1c319f..25fa5e1524c219 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2803,15 +2803,18 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c
}
// Step 3: get snapshot
+ String label = request.getLabelName();
TGetSnapshotResult result = new TGetSnapshotResult();
result.setStatus(new TStatus(TStatusCode.OK));
- Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName());
+ Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(label);
if (snapshot == null) {
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
- result.getStatus().addToErrorMsgs("snapshot not exist");
+ result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label));
} else {
result.setMeta(snapshot.getMeta());
result.setJobInfo(snapshot.getJobInfo());
+ LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}",
+ label, snapshot.getMeta().length, snapshot.getJobInfo().length);
}
return result;