From 0d08de486fff7f04f38bedca0bb2f21bb8e115b7 Mon Sep 17 00:00:00 2001 From: walter Date: Fri, 8 Nov 2024 21:22:15 +0800 Subject: [PATCH 1/2] [feat](restore) Support compressed snapshot meta and job info (#43516) Related PR: https://github.com/selectdb/ccr-syncer/pull/223 The backup meta and snapshot info are larger and might exceed the thrift max message size limitation. This PR compresses the backup meta and job info in both the `GetSnapshot` and `RestoreSnapshot` methods to avoid this. To keep compatibility, the field `compressed` is added to `TGetSnapshotResult` to indicate whether the meta and job info are compressed; config `enable_restore_snapshot_rpc_compression` indicates whether the FE support reads the compressed meta and job info, the ccr-syncer should read this config before issuing a restore snapshot request with compressed meta and job info. Support compressing large backup meta and snapshot job info, to avoid exceeding the thrift server limitation. --- .../java/org/apache/doris/common/Config.java | 9 ++++ .../org/apache/doris/common/GZIPUtils.java | 48 +++++++++++++++++++ .../doris/service/FrontendServiceImpl.java | 48 ++++++++++++++++--- gensrc/thrift/FrontendService.thrift | 3 ++ 4 files changed, 101 insertions(+), 7 deletions(-) create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 954dde05005b08..7fbb4745ac73f5 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1506,6 +1506,15 @@ public class Config extends ConfigBase { @ConfField(mutable = false) public static boolean backup_job_compressed_serialization = false; + /** + * A internal config, to indicate whether to enable the restore snapshot rpc compression. + * + * The ccr syncer will depends this config to decide whether to compress the meta and job + * info of the restore snapshot request. + */ + @ConfField(mutable = false) + public static boolean enable_restore_snapshot_rpc_compression = true; + /** * Control the max num of tablets per backup job involved. */ diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java new file mode 100644 index 00000000000000..7408e2888cc3a5 --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import org.apache.commons.io.IOUtils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class GZIPUtils { + public static boolean isGZIPCompressed(byte[] data) { + // From RFC 1952: 3.2. Members with a deflate compressed data stream (ID1 = 8, ID2 = 8) + return data.length >= 2 && data[0] == (byte) 0x1F && data[1] == (byte) 0x8B; + } + + public static byte[] compress(byte[] data) throws IOException { + ByteArrayOutputStream bytesStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) { + gzipStream.write(data); + } + return bytesStream.toByteArray(); + } + + public static byte[] decompress(byte[] data) throws IOException { + ByteArrayInputStream bytesStream = new ByteArrayInputStream(data); + try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) { + return IOUtils.toByteArray(gzipStream); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 25fa5e1524c219..5aa242b33d9b8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -49,6 +49,7 @@ import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.Config; import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.GZIPUtils; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; @@ -211,6 +212,7 @@ import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -2765,8 +2767,9 @@ public TGetSnapshotResult getSnapshot(TGetSnapshotRequest request) throws TExcep // getSnapshotImpl private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp) - throws UserException { - // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name, snapshot_type + throws UserException, IOException { + // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name, + // snapshot_type if (!request.isSetUser()) { throw new UserException("user is not set"); } @@ -2811,10 +2814,22 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST); result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label)); } else { - result.setMeta(snapshot.getMeta()); - result.setJobInfo(snapshot.getJobInfo()); + byte[] meta = snapshot.getMeta(); + byte[] jobInfo = snapshot.getJobInfo(); + LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}", - label, snapshot.getMeta().length, snapshot.getJobInfo().length); + label, meta.length, jobInfo.length); + if (request.isEnableCompress()) { + meta = GZIPUtils.compress(meta); + jobInfo = GZIPUtils.compress(jobInfo); + result.setCompressed(true); + if (LOG.isDebugEnabled()) { + LOG.debug("get snapshot info with compress, snapshot: {}, compressed meta " + + "size {}, compressed job info size {}", label, meta.length, jobInfo.length); + } + } + result.setMeta(meta); + result.setJobInfo(jobInfo); } return result; @@ -2928,8 +2943,27 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque restoreTableRefClause = new AbstractBackupTableRefClause(isExclude, tableRefs); } } - RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, request.getMeta(), - request.getJobInfo()); + + byte[] meta = request.getMeta(); + byte[] jobInfo = request.getJobInfo(); + if (Config.enable_restore_snapshot_rpc_compression && request.isCompressed()) { + if (LOG.isDebugEnabled()) { + LOG.debug("decompress meta and job info, compressed meta size {}, compressed job info size {}", + meta.length, jobInfo.length); + } + try { + meta = GZIPUtils.decompress(meta); + jobInfo = GZIPUtils.decompress(jobInfo); + } catch (Exception e) { + LOG.warn("decompress meta and job info failed", e); + throw new UserException("decompress meta and job info failed", e); + } + } else if (GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta)) { + throw new UserException("The request is compressed, but the config " + + "`enable_restore_snapshot_rpc_compressed` is not enabled."); + } + + RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, meta, jobInfo); restoreStmt.setIsBeingSynced(); LOG.debug("restore snapshot info, restoreStmt: {}", restoreStmt); try { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 0140eeff5e113a..575643030e11ca 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1077,6 +1077,7 @@ struct TGetSnapshotRequest { 7: optional string label_name 8: optional string snapshot_name 9: optional TSnapshotType snapshot_type + 10: optional bool enable_compress; } struct TGetSnapshotResult { @@ -1084,6 +1085,7 @@ struct TGetSnapshotResult { 2: optional binary meta 3: optional binary job_info 4: optional Types.TNetworkAddress master_address + 5: optional bool compressed; } struct TTableRef { @@ -1107,6 +1109,7 @@ struct TRestoreSnapshotRequest { 13: optional bool clean_tables 14: optional bool clean_partitions 15: optional bool atomic_restore + 16: optional bool compressed; } struct TRestoreSnapshotResult { From 87d61f5d6b48f41abf6b78b873971b3de257d1f9 Mon Sep 17 00:00:00 2001 From: w41ter Date: Mon, 11 Nov 2024 07:44:35 +0000 Subject: [PATCH 2/2] fixup --- fe/fe-common/pom.xml | 4 ++++ .../java/org/apache/doris/service/FrontendServiceImpl.java | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/fe/fe-common/pom.xml b/fe/fe-common/pom.xml index 9ca13a628577e0..0f6fca30048fd4 100644 --- a/fe/fe-common/pom.xml +++ b/fe/fe-common/pom.xml @@ -89,6 +89,10 @@ under the License. org.aspectj aspectjrt + + commons-io + commons-io + doris-fe-common diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 5aa242b33d9b8d..d83ff7e08156d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -209,10 +209,10 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import java.io.IOException; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections;