Skip to content

Commit

Permalink
Support gc pause refine
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Feb 6, 2024
1 parent 3ef8e81 commit d9130ef
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 256 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
go get
./gen_proto.sh
./gen_swag.sh
./gen_proto.sh
go build
9 changes: 4 additions & 5 deletions configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ milvus:
tlsMode: 0
user: "root"
password: "Milvus"

useSSL: false # Optional, http API protocol, used to pause/resume GC of the Milvus cluster
httpPort: 9091 # Optional, http API port, used to pause/resume GC of the Milvus cluster

# Related configuration of minio, which is responsible for data persistence for Milvus.
minio:
Expand Down Expand Up @@ -60,5 +57,7 @@ backup:
keepTempFiles: false

# Pause GC during backup through the Milvus HTTP API.
pauseGcWhenBackup: true
pauseGcSeconds: 7200
gcPause:
enable: true
seconds: 7200
address: http://localhost:9091
44 changes: 22 additions & 22 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,17 +539,11 @@ func (b *BackupContext) backupCollectionExecute(ctx context.Context, backupInfo
return nil
}

func (b *BackupContext) pauseMilvusGC(ctx context.Context) {
httpAddress := b.params.MilvusCfg.Address + ":" + b.params.MilvusCfg.HttpPort
if b.params.MilvusCfg.EnableSSL {
httpAddress = "https://" + httpAddress
} else {
httpAddress = "http://" + httpAddress
}
func (b *BackupContext) pauseMilvusGC(ctx context.Context, gcAddress string, pauseSeconds int) {
pauseAPI := "/management/datacoord/garbage_collection/pause"
params := url.Values{}
params.Add("pause_seconds", strconv.Itoa(b.params.BackupCfg.PauseGcSeconds))
fullURL := fmt.Sprintf("%s?%s", httpAddress+pauseAPI, params.Encode())
params.Add("pause_seconds", strconv.Itoa(pauseSeconds))
fullURL := fmt.Sprintf("%s?%s", gcAddress+pauseAPI, params.Encode())
response, err := http.Get(fullURL)
if err != nil {
log.Error("Pause Milvus GC Error:", zap.Error(err))
Expand All @@ -562,18 +556,12 @@ func (b *BackupContext) pauseMilvusGC(ctx context.Context) {
log.Error("Read response Error:", zap.Error(err))
return
}
log.Info("Pause Milvus GC response", zap.String("response", string(body)))
log.Info("Pause Milvus GC response", zap.String("response", string(body)), zap.String("address", gcAddress), zap.Int("pauseSeconds", pauseSeconds))
}

func (b *BackupContext) resumeMilvusGC(ctx context.Context) {
httpAddress := b.params.MilvusCfg.Address + ":" + b.params.MilvusCfg.HttpPort
if b.params.MilvusCfg.EnableSSL {
httpAddress = "https://" + httpAddress
} else {
httpAddress = "http://" + httpAddress
}
func (b *BackupContext) resumeMilvusGC(ctx context.Context, gcAddress string) {
pauseAPI := "/management/datacoord/garbage_collection/resume"
fullURL := httpAddress + pauseAPI
fullURL := gcAddress + pauseAPI
response, err := http.Get(fullURL)
if err != nil {
log.Error("Resume Milvus GC Error:", zap.Error(err))
Expand All @@ -585,17 +573,29 @@ func (b *BackupContext) resumeMilvusGC(ctx context.Context) {
log.Error("Read response Error:", zap.Error(err))
return
}
log.Info("Resume Milvus GC response", zap.String("response", string(body)))
log.Info("Resume Milvus GC response", zap.String("response", string(body)), zap.String("address", gcAddress))
}

func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backuppb.CreateBackupRequest, backupInfo *backuppb.BackupInfo) (*backuppb.BackupInfo, error) {
b.mu.Lock()
defer b.mu.Unlock()

// pause GC
if b.params.BackupCfg.PauseGcWhenBackup {
b.pauseMilvusGC(ctx)
defer b.resumeMilvusGC(ctx)
if request.GetGcPauseEnable() || b.params.BackupCfg.GcPauseEnable {
var pause = 0
if request.GetGcPauseSeconds() == 0 {
pause = b.params.BackupCfg.GcPauseSeconds
} else {
pause = int(request.GetGcPauseSeconds())
}
var gcAddress string = ""
if request.GetGcPauseAddress() == "" {
gcAddress = b.params.BackupCfg.GcPauseAddress
} else {
gcAddress = request.GetGcPauseAddress()
}
b.pauseMilvusGC(ctx, gcAddress, pause)
defer b.resumeMilvusGC(ctx, gcAddress)
}

backupInfo.BackupTimestamp = uint64(time.Now().UnixNano() / int64(time.Millisecond))
Expand Down
46 changes: 19 additions & 27 deletions core/paramtable/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ type BackupConfig struct {
BackupCopyDataParallelism int
RestoreParallelism int

KeepTempFiles bool
PauseGcWhenBackup bool
PauseGcSeconds int
KeepTempFiles bool

GcPauseEnable bool
GcPauseSeconds int
GcPauseAddress string
}

func (p *BackupConfig) init(base *BaseTable) {
Expand All @@ -51,8 +53,9 @@ func (p *BackupConfig) init(base *BaseTable) {
p.initRestoreParallelism()
p.initBackupCopyDataParallelism()
p.initKeepTempFiles()
p.initPauseGcWhenBackup()
p.initPauseGcSeconds()
p.initGcPauseEnable()
p.initGcPauseSeconds()
p.initGcPauseAddress()
}

func (p *BackupConfig) initMaxSegmentGroupSize() {
Expand Down Expand Up @@ -83,14 +86,19 @@ func (p *BackupConfig) initKeepTempFiles() {
p.KeepTempFiles, _ = strconv.ParseBool(keepTempFiles)
}

func (p *BackupConfig) initPauseGcWhenBackup() {
pauseGcWhenBackup := p.Base.LoadWithDefault("backup.pauseGcWhenBackup", "false")
p.PauseGcWhenBackup, _ = strconv.ParseBool(pauseGcWhenBackup)
// initGcPauseEnable populates GcPauseEnable from the "backup.gcPause.enable"
// setting, passing "false" to the loader as the default value.
func (p *BackupConfig) initGcPauseEnable() {
	raw := p.Base.LoadWithDefault("backup.gcPause.enable", "false")
	// The parse error is deliberately discarded: strconv.ParseBool yields
	// false for input it cannot parse, so a malformed value disables the pause.
	enabled, _ := strconv.ParseBool(raw)
	p.GcPauseEnable = enabled
}

// initGcPauseSeconds populates GcPauseSeconds from the
// "backup.gcPause.seconds" setting, passing 7200 to the loader as the default.
func (p *BackupConfig) initGcPauseSeconds() {
	p.GcPauseSeconds = p.Base.ParseIntWithDefault("backup.gcPause.seconds", 7200)
}

func (p *BackupConfig) initPauseGcSeconds() {
size := p.Base.ParseIntWithDefault("backup.pauseGcSeconds", 7200)
p.PauseGcSeconds = size
// initGcPauseAddress populates GcPauseAddress from the
// "backup.gcPause.address" setting, passing "http://localhost:9091" to the
// loader as the default.
func (p *BackupConfig) initGcPauseAddress() {
	p.GcPauseAddress = p.Base.LoadWithDefault("backup.gcPause.address", "http://localhost:9091")
}

type MilvusConfig struct {
Expand All @@ -102,8 +110,6 @@ type MilvusConfig struct {
Password string
AuthorizationEnabled bool
TLSMode int
HttpPort string
EnableSSL bool
}

func (p *MilvusConfig) init(base *BaseTable) {
Expand All @@ -115,8 +121,6 @@ func (p *MilvusConfig) init(base *BaseTable) {
p.initPassword()
p.initAuthorizationEnabled()
p.initTLSMode()
p.initHttpPort()
p.initSSLEnabled()
}

func (p *MilvusConfig) initAddress() {
Expand Down Expand Up @@ -159,18 +163,6 @@ func (p *MilvusConfig) initTLSMode() {
p.TLSMode = p.Base.ParseIntWithDefault("milvus.tlsMode", 0)
}

// initHttpPort populates HttpPort from the "milvus.httpPort" setting.
// It panics with the loader's error if the setting cannot be loaded.
func (p *MilvusConfig) initHttpPort() {
	httpPort, loadErr := p.Base.Load("milvus.httpPort")
	if loadErr != nil {
		// No fallback value is applied here; a load failure aborts initialization.
		panic(loadErr)
	}
	p.HttpPort = httpPort
}

// initSSLEnabled populates EnableSSL from the "milvus.useSSL" setting,
// passing false to the parser as the default.
func (p *MilvusConfig) initSSLEnabled() {
	useSSL := p.Base.ParseBool("milvus.useSSL", false)
	p.EnableSSL = useSSL
}

// /////////////////////////////////////////////////////////////////////////////
// --- minio ---
const (
Expand Down
6 changes: 4 additions & 2 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ message CreateBackupRequest {
// only backup meta, including collection schema and index info
bool meta_only = 7;
// if true, pause GC so that data is not compacted and garbage-collected during backup; use it when the data to back up is very large.
bool stop_gc = 8;
bool gc_pause_enable = 8;
// GC pause duration in seconds; set it larger than the expected duration of the backup
int64 gc_pause = 9;
int32 gc_pause_seconds = 9;
// address of the HTTP API used to pause and resume GC
string gc_pause_address = 10;
}

/**
Expand Down
Loading

0 comments on commit d9130ef

Please sign in to comment.