Skip to content

Commit

Permalink
Merge pull request distribution#2631 from whoshuu/feature/improve-gcs…
Browse files Browse the repository at this point in the history
…-driver

Improve gcs driver
  • Loading branch information
dmcgowan authored Sep 6, 2018
2 parents 15de837 + 7a195dd commit 642075f
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 38 deletions.
22 changes: 22 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ storage:
gcs:
bucket: bucketname
keyfile: /path/to/keyfile
credentials:
type: service_account
project_id: project_id_string
private_key_id: private_key_id_string
private_key: private_key_string
client_email: [email protected]
client_id: client_id_string
auth_uri: http://example.com/auth_uri
token_uri: http://example.com/token_uri
auth_provider_x509_cert_url: http://example.com/provider_cert_url
client_x509_cert_url: http://example.com/client_cert_url
rootdirectory: /gcs/object/name/prefix
chunksize: 5242880
s3:
Expand Down Expand Up @@ -389,6 +400,17 @@ storage:
gcs:
bucket: bucketname
keyfile: /path/to/keyfile
credentials:
type: service_account
project_id: project_id_string
private_key_id: private_key_id_string
private_key: private_key_string
client_email: [email protected]
client_id: client_id_string
auth_uri: http://example.com/auth_uri
token_uri: http://example.com/token_uri
auth_provider_x509_cert_url: http://example.com/provider_cert_url
client_x509_cert_url: http://example.com/client_cert_url
rootdirectory: /gcs/object/name/prefix
s3:
accesskey: awsaccesskey
Expand Down
43 changes: 43 additions & 0 deletions registry/storage/driver/base/regulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package base

import (
"context"
"fmt"
"io"
"reflect"
"strconv"
"sync"

storagedriver "github.com/docker/distribution/registry/storage/driver"
Expand All @@ -15,6 +18,46 @@ type regulator struct {
available uint64
}

// GetLimitFromParameter takes an interface type as decoded from the YAML
// configuration and returns a uint64 representing the maximum number of
// concurrent calls given a minimum limit and default.
//
// If the parameter supplied is of an invalid type this returns an error.
func GetLimitFromParameter(param interface{}, min, def uint64) (uint64, error) {
limit := def

switch v := param.(type) {
case string:
var err error
if limit, err = strconv.ParseUint(v, 0, 64); err != nil {
return limit, fmt.Errorf("parameter must be an integer, '%v' invalid", param)
}
case uint64:
limit = v
case int, int32, int64:
val := reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Int()
// if param is negative casting to uint64 will wrap around and
// give you the hugest thread limit ever. Let's be sensible, here
if val > 0 {
limit = uint64(val)
} else {
limit = min
}
case uint, uint32:
limit = reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Uint()
case nil:
// use the default
default:
return 0, fmt.Errorf("invalid value '%#v'", param)
}

if limit < min {
return min, nil
}

return limit, nil
}

// NewRegulator wraps the given driver and is used to regulate concurrent calls
// to the given storage driver to a maximum of the given limit. This is useful
// for storage drivers that would otherwise create an unbounded number of OS
Expand Down
31 changes: 31 additions & 0 deletions registry/storage/driver/base/regulator_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package base

import (
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -65,3 +66,33 @@ func TestRegulatorEnterExit(t *testing.T) {
}
}
}

func TestGetLimitFromParameter(t *testing.T) {
tests := []struct {
Input interface{}
Expected uint64
Min uint64
Default uint64
Err error
}{
{"foo", 0, 5, 5, fmt.Errorf("parameter must be an integer, 'foo' invalid")},
{"50", 50, 5, 5, nil},
{"5", 25, 25, 50, nil}, // lower than Min returns Min
{nil, 50, 25, 50, nil}, // nil returns default
{812, 812, 25, 50, nil},
}

for _, item := range tests {
t.Run(fmt.Sprint(item.Input), func(t *testing.T) {
actual, err := GetLimitFromParameter(item.Input, item.Min, item.Default)

if err != nil && item.Err != nil && err.Error() != item.Err.Error() {
t.Fatalf("GetLimitFromParameter error, expected %#v got %#v", item.Err, err)
}

if actual != item.Expected {
t.Fatalf("GetLimitFromParameter result error, expected %d got %d", item.Expected, actual)
}
})
}
}
32 changes: 3 additions & 29 deletions registry/storage/driver/filesystem/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"io/ioutil"
"os"
"path"
"reflect"
"strconv"
"time"

storagedriver "github.com/docker/distribution/registry/storage/driver"
Expand Down Expand Up @@ -85,33 +83,9 @@ func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, e
rootDirectory = fmt.Sprint(rootDir)
}

// Get maximum number of threads for blocking filesystem operations,
// if specified
threads := parameters["maxthreads"]
switch v := threads.(type) {
case string:
if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil {
return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads)
}
case uint64:
maxThreads = v
case int, int32, int64:
val := reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int()
// If threads is negative casting to uint64 will wrap around and
// give you the hugest thread limit ever. Let's be sensible, here
if val > 0 {
maxThreads = uint64(val)
}
case uint, uint32:
maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint()
case nil:
// do nothing
default:
return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads)
}

if maxThreads < minThreads {
maxThreads = minThreads
maxThreads, err = base.GetLimitFromParameter(parameters["maxthreads"], minThreads, defaultMaxThreads)
if err != nil {
return nil, fmt.Errorf("maxthreads config error: %s", err.Error())
}
}

Expand Down
72 changes: 63 additions & 9 deletions registry/storage/driver/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package gcs
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -49,6 +50,8 @@ const (
uploadSessionContentType = "application/x-docker-upload-session"
minChunkSize = 256 * 1024
defaultChunkSize = 20 * minChunkSize
defaultMaxConcurrency = 50
minConcurrency = 25

maxTries = 5
)
Expand All @@ -64,6 +67,12 @@ type driverParameters struct {
client *http.Client
rootDirectory string
chunkSize int

// maxConcurrency limits the number of concurrent driver operations
// to GCS, which ultimately increases reliability of many simultaneous
// pushes by ensuring we aren't DoSing our own server with many
// connections.
maxConcurrency uint64
}

func init() {
Expand All @@ -89,6 +98,16 @@ type driver struct {
chunkSize int
}

// Wrapper wraps `driver` with a throttler, ensuring that no more than N
// GCS actions can occur concurrently. The default limit is 75.
type Wrapper struct {
baseEmbed
}

type baseEmbed struct {
base.Base
}

// FromParameters constructs a new Driver with a given parameters map
// Required parameters:
// - bucket
Expand Down Expand Up @@ -140,6 +159,31 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri
return nil, err
}
ts = jwtConf.TokenSource(context.Background())
} else if credentials, ok := parameters["credentials"]; ok {
credentialMap, ok := credentials.(map[interface{}]interface{})
if !ok {
return nil, fmt.Errorf("The credentials were not specified in the correct format")
}

stringMap := map[string]interface{}{}
for k, v := range credentialMap {
key, ok := k.(string)
if !ok {
return nil, fmt.Errorf("One of the credential keys was not a string: %s", fmt.Sprint(k))
}
stringMap[key] = v
}

data, err := json.Marshal(stringMap)
if err != nil {
return nil, fmt.Errorf("Failed to marshal gcs credentials to json")
}

jwtConf, err = google.JWTConfigFromJSON(data, storage.ScopeFullControl)
if err != nil {
return nil, err
}
ts = jwtConf.TokenSource(context.Background())
} else {
var err error
ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl)
Expand All @@ -148,13 +192,19 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri
}
}

maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency)
if err != nil {
return nil, fmt.Errorf("maxconcurrency config error: %s", err)
}

params := driverParameters{
bucket: fmt.Sprint(bucket),
rootDirectory: fmt.Sprint(rootDirectory),
email: jwtConf.Email,
privateKey: jwtConf.PrivateKey,
client: oauth2.NewClient(context.Background(), ts),
chunkSize: chunkSize,
bucket: fmt.Sprint(bucket),
rootDirectory: fmt.Sprint(rootDirectory),
email: jwtConf.Email,
privateKey: jwtConf.PrivateKey,
client: oauth2.NewClient(context.Background(), ts),
chunkSize: chunkSize,
maxConcurrency: maxConcurrency,
}

return New(params)
Expand All @@ -178,8 +228,12 @@ func New(params driverParameters) (storagedriver.StorageDriver, error) {
chunkSize: params.chunkSize,
}

return &base.Base{
StorageDriver: d,
return &Wrapper{
baseEmbed: baseEmbed{
Base: base.Base{
StorageDriver: base.NewRegulator(d, params.maxConcurrency),
},
},
}, nil
}

Expand Down Expand Up @@ -864,7 +918,7 @@ func (d *driver) context(context context.Context) context.Context {
}

func (d *driver) pathToKey(path string) string {
return strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/")
return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/"))
}

func (d *driver) pathToDirKey(path string) string {
Expand Down

0 comments on commit 642075f

Please sign in to comment.