Skip to content

Commit

Permalink
[improve](restore) Compress restore job to reduce editlog size
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Oct 24, 2024
1 parent 82e5fd7 commit cb3f14a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,14 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int max_backup_restore_job_num_per_db = 10;

/**
* A internal config, to reduce the restore job size during serialization by compress.
*
* WARNING: Once this option is enabled and a restore is performed, the FE version cannot be rolled back.
*/
@ConfField(mutable = false)
public static boolean restore_job_compressed_serialization = false;

/**
* Control the max num of tablets per backup job involved.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public abstract class AbstractJob implements Writable {

public enum JobType {
BACKUP, RESTORE
BACKUP, RESTORE, RESTORE_COMPRESSED
}

protected JobType type;
Expand Down Expand Up @@ -160,7 +160,7 @@ public static AbstractJob read(DataInput in) throws IOException {
JobType type = JobType.valueOf(Text.readString(in));
if (type == JobType.BACKUP) {
job = new BackupJob();
} else if (type == JobType.RESTORE) {
} else if (type == JobType.RESTORE || type == JobType.RESTORE_COMPRESSED) {
job = new RestoreJob();
} else {
throw new IOException("Unknown job type: " + type.name());
Expand Down
48 changes: 48 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,21 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class RestoreJob extends AbstractJob {
private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA;
Expand Down Expand Up @@ -2421,8 +2427,30 @@ public static RestoreJob read(DataInput in) throws IOException {

@Override
public void write(DataOutput out) throws IOException {
if (Config.restore_job_compressed_serialization) {
type = JobType.RESTORE_COMPRESSED;
}
super.write(out);
if (Config.restore_job_compressed_serialization) {
type = JobType.RESTORE;

ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
try (GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) {
try (DataOutputStream stream = new DataOutputStream(gzipStream)) {
writeOthers(stream);
}
}
Text text = new Text(bytesStream.toByteArray());
if (text.getLength() > (100 << 20)) {
LOG.info("restore job serialized size {}", text.getLength());
}
text.write(out);
} else {
writeOthers(out);
}
}

private void writeOthers(DataOutput out) throws IOException {
Text.writeString(out, backupTimestamp);
jobInfo.write(out);
out.writeBoolean(allowLoad);
Expand Down Expand Up @@ -2495,7 +2523,27 @@ public void write(DataOutput out) throws IOException {
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
if (type == JobType.RESTORE_COMPRESSED) {
type = JobType.RESTORE;

Text text = new Text();
text.readFields(in);
if (text.getLength() > (100 << 20)) {
LOG.info("read restore job compressed size {}", text.getLength());
}

ByteArrayInputStream bytesStream = new ByteArrayInputStream(text.getBytes());
try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) {
try (DataInputStream stream = new DataInputStream(gzipStream)) {
readOthers(stream);
}
}
} else {
readOthers(in);
}
}

private void readOthers(DataInput in) throws IOException {
backupTimestamp = Text.readString(in);
jobInfo = BackupJobInfo.read(in);
allowLoad = in.readBoolean();
Expand Down

0 comments on commit cb3f14a

Please sign in to comment.