Skip to content

Commit

Permalink
Merge pull request wal-g#1067 from mialinx/sqlserver_db_concurency
Browse files Browse the repository at this point in the history
sqlserver: limit concurrent database operations
  • Loading branch information
mialinx authored Aug 20, 2021
2 parents d443bc4 + 3ef1258 commit eaa57be
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 19 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/blang/semver v3.5.1+incompatible
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cyberdelia/lzo v0.0.0-20171006181345-d85071271a6f
github.com/denisenkom/go-mssqldb v0.9.0
github.com/denisenkom/go-mssqldb v0.10.0
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v1.13.1
github.com/docker/go-connections v0.4.0 // indirect
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ github.com/cyberdelia/lzo v0.0.0-20171006181345-d85071271a6f/go.mod h1:JlLvEOSII
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denisenkom/go-mssqldb v0.9.0 h1:RSohk2RsiZqLZ0zCjtfn3S4Gp4exhpBWHyQ7D0yGjAk=
github.com/denisenkom/go-mssqldb v0.9.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/denisenkom/go-mssqldb v0.10.0 h1:QykgLZBorFE95+gO3u9esLd0BmbvpWp0/waNNZfHBM8=
github.com/denisenkom/go-mssqldb v0.10.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
Expand Down Expand Up @@ -227,7 +227,6 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -329,7 +328,6 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxv
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
Expand Down Expand Up @@ -629,7 +627,6 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25 h1:OKbAoGs4fGM5cPLlVQLZGYkFC8OnOfgo6tt0Smf9XhM=
golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200828194041-157a740278f4 h1:kCCpuwSAoYJPkNc6x0xT9yTtV4oKtARo4RGBQWOfg9E=
golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -763,7 +760,6 @@ google.golang.org/protobuf v1.22.0 h1:cJv5/xdbk1NnMPR1VP9+HU6gupuG9MLBoH1r6RHZ2M
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
9 changes: 9 additions & 0 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ const (
SQLServerBlobKeyFile = "SQLSERVER_BLOB_KEY_FILE"
SQLServerBlobLockFile = "SQLSERVER_BLOB_LOCK_FILE"
SQLServerConnectionString = "SQLSERVER_CONNECTION_STRING"
SQLServerDBConcurrency = "SQLSERVER_DB_CONCURRENCY"

EndpointSourceSetting = "S3_ENDPOINT_SOURCE"
EndpointPortSetting = "S3_ENDPOINT_PORT"
Expand Down Expand Up @@ -156,6 +157,10 @@ var (
MongoDBLastWriteUpdateInterval: "3s",
}

SQLServerDefaultSettings = map[string]string{
SQLServerDBConcurrency: "10",
}

PGDefaultSettings = map[string]string{
PgWalSize: "16",
}
Expand Down Expand Up @@ -311,6 +316,7 @@ var (
SQLServerBlobKeyFile: true,
SQLServerBlobLockFile: true,
SQLServerConnectionString: true,
SQLServerDBConcurrency: true,
}

MysqlAllowedSettings = map[string]bool{
Expand All @@ -337,6 +343,7 @@ var (
Turbo bool
)

// nolint: gocyclo
func ConfigureSettings(currentType string) {
if len(defaultConfigValues) == 0 {
defaultConfigValues = commonDefaultConfigValues
Expand All @@ -346,6 +353,8 @@ func ConfigureSettings(currentType string) {
dbSpecificDefaultSettings = PGDefaultSettings
case MONGO:
dbSpecificDefaultSettings = MongoDefaultSettings
case SQLSERVER:
dbSpecificDefaultSettings = SQLServerDefaultSettings
}

for k, v := range dbSpecificDefaultSettings {
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/sqlserver/backup_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func HandleBackupPush(dbnames []string, updateLatest bool, compression bool) {
}
err = runParallel(func(i int) error {
return backupSingleDatabase(ctx, db, backupName, dbnames[i], compression)
}, len(dbnames))
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall backup failed: %v", err)

sentinel.StopLocalTime = utility.TimeNowCrossPlatformLocal()
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/sqlserver/backup_restore_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func HandleBackupRestore(backupName string, dbnames []string, fromnames []string
return recoverSingleDatabase(ctx, db, dbname)
}
return nil
}, len(dbnames))
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall restore failed: %v", err)

tracelog.InfoLogger.Printf("restore finished")
Expand Down
22 changes: 15 additions & 7 deletions internal/databases/sqlserver/blob/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"net/http/httputil"
"runtime/debug"
"strconv"
"strings"
"sync"
Expand All @@ -25,7 +26,7 @@ const ProxyStartTimeout = 10 * time.Second

const ReqIDHeader = "X-Ms-Request-Id"

const DefaultConcurency = 8
const DefaultConcurrency = 8

type Server struct {
folder storage.Folder
Expand Down Expand Up @@ -57,16 +58,16 @@ func NewServer(folder storage.Folder) (*Server, error) {
if err != nil {
return nil, err
}
downloadConcurency, err := internal.GetMaxDownloadConcurrency()
downloadConcurrency, err := internal.GetMaxDownloadConcurrency()
if err != nil {
downloadConcurency = DefaultConcurency
downloadConcurrency = DefaultConcurrency
}
bs.downloadSem = make(chan struct{}, downloadConcurency)
uploadConcurency, err := internal.GetMaxUploadConcurrency()
bs.downloadSem = make(chan struct{}, downloadConcurrency)
uploadConcurrency, err := internal.GetMaxUploadConcurrency()
if err != nil {
uploadConcurency = DefaultConcurency
uploadConcurrency = DefaultConcurrency
}
bs.uploadSem = make(chan struct{}, uploadConcurency)
bs.uploadSem = make(chan struct{}, uploadConcurrency)
bs.endpoint = fmt.Sprintf("%s:%d", hostname, 443)
bs.server = http.Server{Addr: bs.endpoint, Handler: bs}
bs.indexes = make(map[string]*Index)
Expand Down Expand Up @@ -145,6 +146,13 @@ func (bs *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

func (bs *Server) ServeHTTP2(w http.ResponseWriter, req *http.Request) {
defer func() {
if err := recover(); err != nil {
debug.PrintStack()
tracelog.ErrorLogger.Printf("proxy server goroutine panic: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
}()
// default headers
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", "0")
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/sqlserver/log_push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func HandleLogPush(dbnames []string, compression bool) {
logBackupName := generateLogBackupName()
err = runParallel(func(i int) error {
return backupSingleLog(ctx, db, logBackupName, dbnames[i], compression)
}, len(dbnames))
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall log backup failed: %v", err)

tracelog.InfoLogger.Printf("log backup finished")
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/sqlserver/log_restore_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func HandleLogRestore(backupName string, untilTS string, dbnames []string, fromn
return recoverSingleDatabase(ctx, db, dbname)
}
return nil
}, len(dbnames))
}, len(dbnames), getDBConcurrency())
tracelog.ErrorLogger.FatalfOnError("overall log restore failed: %v", err)

tracelog.InfoLogger.Printf("log restore finished")
Expand Down
18 changes: 17 additions & 1 deletion internal/databases/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,16 @@ func getLogsSinceBackup(folder storage.Folder, backupName string, stopAt time.Ti
return logNames, nil
}

func runParallel(f func(int) error, cnt int) error {
func runParallel(f func(int) error, cnt int, concurrency int) error {
if concurrency <= 0 {
concurrency = cnt
}
sem := make(chan struct{}, concurrency)
errs := make(chan error, cnt)
for i := 0; i < cnt; i++ {
go func(i int) {
sem <- struct{}{}
defer func() { <-sem }()
errs <- f(i)
}(i)
}
Expand All @@ -398,6 +404,16 @@ func runParallel(f func(int) error, cnt int) error {
return nil
}

func getDBConcurrency() int {
concurrency, err := internal.GetMaxConcurrency(internal.SQLServerDBConcurrency)
if err != nil {
tracelog.WarningLogger.Printf("config error: %v", err)
tracelog.WarningLogger.Printf("using default db concurrency: %d", blob.DefaultConcurrency)
return blob.DefaultConcurrency
}
return concurrency
}

func exclude(src, excl []string) []string {
var res []string
SRC:
Expand Down

0 comments on commit eaa57be

Please sign in to comment.