Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instanciate shard distributor client #6620

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/sharddistributor"
"github.com/uber/cadence/client/wrappers/timeout"
"github.com/uber/cadence/common/cluster"
)
Expand All @@ -44,20 +45,22 @@ type (
GetHistoryPeers() history.PeerResolver
GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error)
GetFrontendClient() frontend.Client
GetShardDistributorClient() sharddistributor.Client
GetRemoteAdminClient(cluster string) admin.Client
SetRemoteAdminClient(cluster string, client admin.Client)
GetRemoteFrontendClient(cluster string) frontend.Client
}

clientBeanImpl struct {
sync.Mutex
historyClient history.Client
historyPeers history.PeerResolver
matchingClient atomic.Value
frontendClient frontend.Client
remoteAdminClients map[string]admin.Client
remoteFrontendClients map[string]frontend.Client
factory Factory
historyClient history.Client
historyPeers history.PeerResolver
matchingClient atomic.Value
frontendClient frontend.Client
shardDistributorClient sharddistributor.Client
remoteAdminClients map[string]admin.Client
remoteFrontendClients map[string]frontend.Client
factory Factory
}
)

Expand Down Expand Up @@ -96,13 +99,19 @@ func NewClientBean(factory Factory, dispatcher *yarpc.Dispatcher, clusterMetadat
remoteFrontendClients[clusterName] = frontendClient
}

shardDistributorClient, err := factory.NewShardDistributorClient()
if err != nil {
return nil, err
}

return &clientBeanImpl{
factory: factory,
historyClient: historyClient,
historyPeers: historyPeers,
frontendClient: remoteFrontendClients[clusterMetadata.GetCurrentClusterName()],
remoteAdminClients: remoteAdminClients,
remoteFrontendClients: remoteFrontendClients,
factory: factory,
historyClient: historyClient,
historyPeers: historyPeers,
frontendClient: remoteFrontendClients[clusterMetadata.GetCurrentClusterName()],
shardDistributorClient: shardDistributorClient,
remoteAdminClients: remoteAdminClients,
remoteFrontendClients: remoteFrontendClients,
}, nil
}

Expand All @@ -125,6 +134,10 @@ func (h *clientBeanImpl) GetFrontendClient() frontend.Client {
return h.frontendClient
}

func (h *clientBeanImpl) GetShardDistributorClient() sharddistributor.Client {
return h.shardDistributorClient
}

func (h *clientBeanImpl) GetRemoteAdminClient(cluster string) admin.Client {
client, ok := h.remoteAdminClients[cluster]
if !ok {
Expand Down
15 changes: 15 additions & 0 deletions client/clientBean_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package client

import (
"fmt"
"time"

adminv1 "github.com/uber/cadence-idl/go/proto/admin/v1"
Expand All @@ -33,10 +34,12 @@ import (
"github.com/uber/cadence/.gen/go/matching/matchingserviceclient"
historyv1 "github.com/uber/cadence/.gen/proto/history/v1"
matchingv1 "github.com/uber/cadence/.gen/proto/matching/v1"
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/sharddistributor"
"github.com/uber/cadence/client/wrappers/errorinjectors"
"github.com/uber/cadence/client/wrappers/grpc"
"github.com/uber/cadence/client/wrappers/metered"
Expand All @@ -61,6 +64,9 @@ type (

NewAdminClientWithTimeoutAndConfig(config transport.ClientConfig, timeout time.Duration, largeTimeout time.Duration) (admin.Client, error)
NewFrontendClientWithTimeoutAndConfig(config transport.ClientConfig, timeout time.Duration, longPollTimeout time.Duration) (frontend.Client, error)

NewShardDistributorClient() (sharddistributor.Client, error)
NewShardDistributorClientWithTimeout(timeout time.Duration) (sharddistributor.Client, error)
}

// DomainIDToNameFunc maps a domainID to domain name. Returns error when mapping is not possible.
Expand Down Expand Up @@ -229,3 +235,36 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeoutAndConfig(
}
return client, nil
}

func (cf *rpcClientFactory) NewShardDistributorClient() (sharddistributor.Client, error) {
return cf.NewShardDistributorClientWithTimeout(timeoutwrapper.ShardDistributorDefaultTimeout)
}

func (cf *rpcClientFactory) NewShardDistributorClientWithTimeout(
timeout time.Duration,
) (sharddistributor.Client, error) {
outboundConfig, ok := cf.rpcFactory.GetDispatcher().OutboundConfig(service.ShardDistributor)
// If no outbound config is found, it means the service is not enabled, we just return nil as we don't want to
// break existing configs.
if !ok {
return nil, nil
}

if !rpc.IsGRPCOutbound(outboundConfig) {
return nil, fmt.Errorf("shard distributor client does not support non-GRPC outbound")
}

client := grpc.NewShardDistributorClient(
sharddistributorv1.NewShardDistributorAPIYARPCClient(outboundConfig),
)

client = timeoutwrapper.NewShardDistributorClient(client, timeout)
if errorRate := cf.dynConfig.GetFloat64Property(dynamicconfig.ShardDistributorErrorInjectionRate)(); errorRate != 0 {
client = errorinjectors.NewShardDistributorClient(client, errorRate, cf.logger)
}
if cf.metricsClient != nil {
client = metered.NewShardDistributorClient(client, cf.metricsClient)
}

return client, nil
}
52 changes: 36 additions & 16 deletions common/resource/resource_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/sharddistributor"
"github.com/uber/cadence/client/wrappers/retryable"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
Expand Down Expand Up @@ -108,14 +109,16 @@ type Impl struct {

// internal services clients

sdkClient workflowserviceclient.Interface
frontendRawClient frontend.Client
frontendClient frontend.Client
matchingRawClient matching.Client
matchingClient matching.Client
historyRawClient history.Client
historyClient history.Client
clientBean client.Bean
sdkClient workflowserviceclient.Interface
frontendRawClient frontend.Client
frontendClient frontend.Client
matchingRawClient matching.Client
matchingClient matching.Client
historyRawClient history.Client
historyClient history.Client
shardDistributorRawClient sharddistributor.Client
shardDistributorClient sharddistributor.Client
clientBean client.Bean

// persistence clients
persistenceBean persistenceClient.Bean
Expand Down Expand Up @@ -253,6 +256,21 @@ func New(
serviceConfig.IsErrorRetryableFunction,
)

shardDistributorRawClient := clientBean.GetShardDistributorClient()

// If the raw client is nil, then the client bean is not configured to provide a shard distributor client, so we
// do not wrap and provide a retryable client
var shardDistributorClient sharddistributor.Client
if shardDistributorRawClient == nil {
shardDistributorClient = nil
} else {
shardDistributorClient = retryable.NewShardDistributorClient(
shardDistributorRawClient,
common.CreateShardDistributorServiceRetryPolicy(),
serviceConfig.IsErrorRetryableFunction,
)
}

var historyRawClient history.Client
if params.HistoryClientFn != nil {
logger.Debug("Using history client from HistoryClientFn")
Expand Down Expand Up @@ -336,14 +354,16 @@ func New(

// internal services clients

sdkClient: params.PublicClient,
frontendRawClient: frontendRawClient,
frontendClient: frontendClient,
matchingRawClient: matchingRawClient,
matchingClient: matchingClient,
historyRawClient: historyRawClient,
historyClient: historyClient,
clientBean: clientBean,
sdkClient: params.PublicClient,
frontendRawClient: frontendRawClient,
frontendClient: frontendClient,
matchingRawClient: matchingRawClient,
matchingClient: matchingClient,
historyRawClient: historyRawClient,
historyClient: historyClient,
shardDistributorRawClient: shardDistributorRawClient,
shardDistributorClient: shardDistributorClient,
clientBean: clientBean,

// persistence clients
persistenceBean: persistenceBean,
Expand Down
12 changes: 12 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ const (
frontendServiceOperationMaxInterval = 5 * time.Second
frontendServiceOperationExpirationInterval = 15 * time.Second

shardDistributorServiceOperationInitialInterval = 200 * time.Millisecond
shardDistributorServiceOperationMaxInterval = 10 * time.Second
shardDistributorServiceOperationExpirationInterval = 15 * time.Second

adminServiceOperationInitialInterval = 200 * time.Millisecond
adminServiceOperationMaxInterval = 5 * time.Second
adminServiceOperationExpirationInterval = 15 * time.Second
Expand Down Expand Up @@ -171,6 +175,14 @@ func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy {
return policy
}

func CreateShardDistributorServiceRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(shardDistributorServiceOperationInitialInterval)
policy.SetMaximumInterval(shardDistributorServiceOperationMaxInterval)
policy.SetExpirationInterval(shardDistributorServiceOperationExpirationInterval)

return policy
}

// CreateAdminServiceRetryPolicy creates a retry policy for calls to matching service
func CreateAdminServiceRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(adminServiceOperationInitialInterval)
Expand Down
6 changes: 6 additions & 0 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,12 @@ func TestCreateXXXRetryPolicyWithSetExpirationInterval(t *testing.T) {
wantMaximumInterval: replicationServiceBusyMaxInterval,
wantSetExpirationInterval: replicationServiceBusyExpirationInterval,
},
"CreateShardDistributorServiceRetryPolicy": {
createFn: CreateShardDistributorServiceRetryPolicy,
wantInitialInterval: shardDistributorServiceOperationInitialInterval,
wantMaximumInterval: shardDistributorServiceOperationMaxInterval,
wantSetExpirationInterval: shardDistributorServiceOperationExpirationInterval,
},
} {
t.Run(name, func(t *testing.T) {
want := backoff.NewExponentialRetryPolicy(c.wantInitialInterval)
Expand Down
Loading