Skip to content

Commit

Permalink
modify test to pass CD
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenxia committed Jan 16, 2025
2 parents fe289c4 + 3951482 commit 7827fdf
Show file tree
Hide file tree
Showing 97 changed files with 1,898 additions and 3,024 deletions.
883 changes: 184 additions & 699 deletions .gen/proto/matching/v1/service.pb.go

Large diffs are not rendered by default.

308 changes: 151 additions & 157 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions .github/workflows/docker_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
uses: docker/build-push-action@v6
with:
context: .
platforms: linux/amd64
platforms: linux/amd64, linux/arm64
build-args: TARGET=server
push: ${{ needs.meta.outputs.push_enabled == 'true' }}
tags: ubercadence/server:${{ needs.meta.outputs.image_tag }}
Expand Down Expand Up @@ -92,7 +92,7 @@ jobs:
uses: docker/build-push-action@v6
with:
context: .
platforms: linux/amd64
platforms: linux/amd64, linux/arm64
build-args: TARGET=auto-setup
push: ${{ needs.meta.outputs.push_enabled == 'true' }}
tags: ubercadence/server:${{ needs.meta.outputs.image_tag }}-auto-setup
Expand Down Expand Up @@ -122,7 +122,7 @@ jobs:
uses: docker/build-push-action@v6
with:
context: .
platforms: linux/amd64
platforms: linux/amd64, linux/arm64
build-args: TARGET=cli
push: ${{ needs.meta.outputs.push_enabled == 'true' }}
tags: ubercadence/cli:${{ needs.meta.outputs.image_tag }}
Expand Down Expand Up @@ -152,7 +152,7 @@ jobs:
uses: docker/build-push-action@v6
with:
context: .
platforms: linux/amd64
platforms: linux/amd64, linux/arm64
build-args: TARGET=bench
tags: ubercadence/bench:master
push: ${{ needs.meta.outputs.push_enabled == 'true' }}
Expand All @@ -179,7 +179,7 @@ jobs:
uses: docker/build-push-action@v6
with:
context: .
platforms: linux/amd64
platforms: linux/amd64, linux/arm64
build-args: TARGET=canary
tags: ubercadence/canary:master
push: ${{ needs.meta.outputs.push_enabled == 'true' }}
39 changes: 26 additions & 13 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/sharddistributor"
"github.com/uber/cadence/client/wrappers/timeout"
"github.com/uber/cadence/common/cluster"
)
Expand All @@ -44,20 +45,22 @@ type (
GetHistoryPeers() history.PeerResolver
GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error)
GetFrontendClient() frontend.Client
GetShardDistributorClient() sharddistributor.Client
GetRemoteAdminClient(cluster string) admin.Client
SetRemoteAdminClient(cluster string, client admin.Client)
GetRemoteFrontendClient(cluster string) frontend.Client
}

clientBeanImpl struct {
sync.Mutex
historyClient history.Client
historyPeers history.PeerResolver
matchingClient atomic.Value
frontendClient frontend.Client
remoteAdminClients map[string]admin.Client
remoteFrontendClients map[string]frontend.Client
factory Factory
historyClient history.Client
historyPeers history.PeerResolver
matchingClient atomic.Value
frontendClient frontend.Client
shardDistributorClient sharddistributor.Client
remoteAdminClients map[string]admin.Client
remoteFrontendClients map[string]frontend.Client
factory Factory
}
)

Expand Down Expand Up @@ -96,13 +99,19 @@ func NewClientBean(factory Factory, dispatcher *yarpc.Dispatcher, clusterMetadat
remoteFrontendClients[clusterName] = frontendClient
}

shardDistributorClient, err := factory.NewShardDistributorClient()
if err != nil {
return nil, err
}

return &clientBeanImpl{
factory: factory,
historyClient: historyClient,
historyPeers: historyPeers,
frontendClient: remoteFrontendClients[clusterMetadata.GetCurrentClusterName()],
remoteAdminClients: remoteAdminClients,
remoteFrontendClients: remoteFrontendClients,
factory: factory,
historyClient: historyClient,
historyPeers: historyPeers,
frontendClient: remoteFrontendClients[clusterMetadata.GetCurrentClusterName()],
shardDistributorClient: shardDistributorClient,
remoteAdminClients: remoteAdminClients,
remoteFrontendClients: remoteFrontendClients,
}, nil
}

Expand All @@ -125,6 +134,10 @@ func (h *clientBeanImpl) GetFrontendClient() frontend.Client {
return h.frontendClient
}

func (h *clientBeanImpl) GetShardDistributorClient() sharddistributor.Client {
return h.shardDistributorClient
}

func (h *clientBeanImpl) GetRemoteAdminClient(cluster string) admin.Client {
client, ok := h.remoteAdminClients[cluster]
if !ok {
Expand Down
15 changes: 15 additions & 0 deletions client/clientBean_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package client

import (
"fmt"
"time"

adminv1 "github.com/uber/cadence-idl/go/proto/admin/v1"
Expand All @@ -33,10 +34,12 @@ import (
"github.com/uber/cadence/.gen/go/matching/matchingserviceclient"
historyv1 "github.com/uber/cadence/.gen/proto/history/v1"
matchingv1 "github.com/uber/cadence/.gen/proto/matching/v1"
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/sharddistributor"
"github.com/uber/cadence/client/wrappers/errorinjectors"
"github.com/uber/cadence/client/wrappers/grpc"
"github.com/uber/cadence/client/wrappers/metered"
Expand All @@ -61,6 +64,9 @@ type (

NewAdminClientWithTimeoutAndConfig(config transport.ClientConfig, timeout time.Duration, largeTimeout time.Duration) (admin.Client, error)
NewFrontendClientWithTimeoutAndConfig(config transport.ClientConfig, timeout time.Duration, longPollTimeout time.Duration) (frontend.Client, error)

NewShardDistributorClient() (sharddistributor.Client, error)
NewShardDistributorClientWithTimeout(timeout time.Duration) (sharddistributor.Client, error)
}

// DomainIDToNameFunc maps a domainID to domain name. Returns error when mapping is not possible.
Expand Down Expand Up @@ -229,3 +235,36 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeoutAndConfig(
}
return client, nil
}

func (cf *rpcClientFactory) NewShardDistributorClient() (sharddistributor.Client, error) {
return cf.NewShardDistributorClientWithTimeout(timeoutwrapper.ShardDistributorDefaultTimeout)
}

func (cf *rpcClientFactory) NewShardDistributorClientWithTimeout(
timeout time.Duration,
) (sharddistributor.Client, error) {
outboundConfig, ok := cf.rpcFactory.GetDispatcher().OutboundConfig(service.ShardDistributor)
// If no outbound config is found, it means the service is not enabled, we just return nil as we don't want to
// break existing configs.
if !ok {
return nil, nil
}

if !rpc.IsGRPCOutbound(outboundConfig) {
return nil, fmt.Errorf("shard distributor client does not support non-GRPC outbound")
}

client := grpc.NewShardDistributorClient(
sharddistributorv1.NewShardDistributorAPIYARPCClient(outboundConfig),
)

client = timeoutwrapper.NewShardDistributorClient(client, timeout)
if errorRate := cf.dynConfig.GetFloat64Property(dynamicconfig.ShardDistributorErrorInjectionRate)(); errorRate != 0 {
client = errorinjectors.NewShardDistributorClient(client, errorRate, cf.logger)
}
if cf.metricsClient != nil {
client = metered.NewShardDistributorClient(client, cf.metricsClient)
}

return client, nil
}
34 changes: 6 additions & 28 deletions client/matching/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,20 +604,9 @@ func testMatchingUpdateTaskListPartitionConfigRequest() *types.MatchingUpdateTas
DomainUUID: _testDomainUUID,
TaskList: &types.TaskList{Name: _testTaskList},
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
ReadPartitions: map[int]*types.TaskListPartition{
0: {},
1: {},
2: {
IsolationGroups: []string{"foo"},
},
},
WritePartitions: map[int]*types.TaskListPartition{
0: {},
1: {
IsolationGroups: []string{"bar"},
},
},
Version: 1,
NumReadPartitions: 3,
NumWritePartitions: 2,
},
}
}
Expand All @@ -627,20 +616,9 @@ func testMatchingRefreshTaskListPartitionConfigRequest() *types.MatchingRefreshT
DomainUUID: _testDomainUUID,
TaskList: &types.TaskList{Name: _testTaskList},
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
ReadPartitions: map[int]*types.TaskListPartition{
0: {},
1: {},
2: {
IsolationGroups: []string{"foo"},
},
},
WritePartitions: map[int]*types.TaskListPartition{
0: {},
1: {
IsolationGroups: []string{"bar"},
},
},
Version: 1,
NumReadPartitions: 3,
NumWritePartitions: 2,
},
}
}
12 changes: 5 additions & 7 deletions client/matching/partition_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (p *partitionConfigProviderImpl) GetNumberOfReadPartitions(domainID string,
}
c.RLock()
v := c.Version
w := len(c.WritePartitions)
r := len(c.ReadPartitions)
w := c.NumWritePartitions
r := c.NumReadPartitions
c.RUnlock()
scope := p.metricsClient.Scope(metrics.PartitionConfigProviderScope, metrics.DomainTag(domainName), metrics.TaskListRootPartitionTag(taskList.GetName()), getTaskListTypeTag(taskListType))
scope.UpdateGauge(metrics.TaskListPartitionConfigNumReadGauge, float64(r))
Expand Down Expand Up @@ -142,8 +142,8 @@ func (p *partitionConfigProviderImpl) GetNumberOfWritePartitions(domainID string
}
c.RLock()
v := c.Version
w := len(c.WritePartitions)
r := len(c.ReadPartitions)
w := c.NumWritePartitions
r := c.NumReadPartitions
c.RUnlock()
scope := p.metricsClient.Scope(metrics.PartitionConfigProviderScope, metrics.DomainTag(domainName), metrics.TaskListRootPartitionTag(taskList.GetName()), getTaskListTypeTag(taskListType))
scope.UpdateGauge(metrics.TaskListPartitionConfigNumReadGauge, float64(r))
Expand Down Expand Up @@ -180,9 +180,7 @@ func (p *partitionConfigProviderImpl) UpdatePartitionConfig(domainID string, tas
}
updated := c.updateConfig(*config)
if updated {
w := len(c.WritePartitions)
r := len(c.ReadPartitions)
p.logger.Info("tasklist partition config updated", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.Name), tag.WorkflowTaskListType(taskListType), tag.Dynamic("read-partition", r), tag.Dynamic("write-partition", w), tag.Dynamic("config-version", config.Version))
p.logger.Info("tasklist partition config updated", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.Name), tag.WorkflowTaskListType(taskListType), tag.Dynamic("read-partition", config.NumReadPartitions), tag.Dynamic("write-partition", config.NumWritePartitions), tag.Dynamic("config-version", config.Version))
}
}

Expand Down
20 changes: 6 additions & 14 deletions client/matching/partition_config_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestGetNumberOfReadPartitions(t *testing.T) {
if tc.enableReadFromCache && tc.taskListKind == types.TaskListKindNormal {
if tc.cachedConfigExists {
mockCache.EXPECT().Get(gomock.Any()).Return(&syncedTaskListPartitionConfig{
TaskListPartitionConfig: types.TaskListPartitionConfig{ReadPartitions: partitions(4)},
TaskListPartitionConfig: types.TaskListPartitionConfig{NumReadPartitions: 4},
}).Times(1)
} else {
mockCache.EXPECT().Get(gomock.Any()).Return(nil).Times(1)
Expand All @@ -145,10 +145,10 @@ func TestGetNumberOfReadPartitions(t *testing.T) {

kind := tc.taskListKind
taskList := types.TaskList{Name: "test-task-list", Kind: &kind}
p := partitionProvider.GetNumberOfReadPartitions("test-domain-id", taskList, 0)
partitions := partitionProvider.GetNumberOfReadPartitions("test-domain-id", taskList, 0)

// Validate result
assert.Equal(t, tc.expectedPartitions, p)
assert.Equal(t, tc.expectedPartitions, partitions)
})
}
}
Expand Down Expand Up @@ -196,18 +196,18 @@ func TestGetNumberOfWritePartitions(t *testing.T) {
if tc.enableReadFromCache && tc.taskListKind == types.TaskListKindNormal {
if tc.cachedConfigExists {
mockCache.EXPECT().Get(gomock.Any()).Return(&syncedTaskListPartitionConfig{
TaskListPartitionConfig: types.TaskListPartitionConfig{ReadPartitions: partitions(2), WritePartitions: partitions(5)},
TaskListPartitionConfig: types.TaskListPartitionConfig{NumReadPartitions: 2, NumWritePartitions: 5},
}).Times(1)
} else {
mockCache.EXPECT().Get(gomock.Any()).Return(nil).Times(1)
}
}
kind := tc.taskListKind
taskList := types.TaskList{Name: "test-task-list", Kind: &kind}
p := partitionProvider.GetNumberOfWritePartitions("test-domain-id", taskList, 0)
partitions := partitionProvider.GetNumberOfWritePartitions("test-domain-id", taskList, 0)

// Validate result
assert.Equal(t, tc.expectedPartitions, p)
assert.Equal(t, tc.expectedPartitions, partitions)
})
}
}
Expand Down Expand Up @@ -253,11 +253,3 @@ func TestUpdatePartitionConfig(t *testing.T) {
})
}
}

func partitions(num int) map[int]*types.TaskListPartition {
result := make(map[int]*types.TaskListPartition, num)
for i := 0; i < num; i++ {
result[i] = &types.TaskListPartition{}
}
return result
}
42 changes: 42 additions & 0 deletions client/sharddistributor/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package sharddistributor

import (
"context"

"go.uber.org/yarpc"

"github.com/uber/cadence/common/types"
)

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go -self_package github.com/uber/cadence/client/sharddistributor
//go:generate gowrap gen -g -p . -i Client -t ../templates/retry.tmpl -o ../wrappers/retryable/sharddistributor_generated.go -v client=ShardDistributor
//go:generate gowrap gen -g -p . -i Client -t ../templates/metered.tmpl -o ../wrappers/metered/sharddistributor_generated.go -v client=ShardDistributor
//go:generate gowrap gen -g -p . -i Client -t ../templates/errorinjectors.tmpl -o ../wrappers/errorinjectors/sharddistributor_generated.go -v client=ShardDistributor
//go:generate gowrap gen -g -p . -i Client -t ../templates/grpc.tmpl -o ../wrappers/grpc/sharddistributor_generated.go -v client=ShardDistributor -v package=apiv1 -v path=github.com/uber/cadence/proto/internal/uber/cadence/sharddistributor/v1 -v prefix=ShardDistributor
//go:generate gowrap gen -g -p . -i Client -t ../templates/timeout.tmpl -o ../wrappers/timeout/sharddistributor_generated.go -v client=ShardDistributor

type Client interface {
GetShardOwner(context.Context, *types.GetShardOwnerRequest, ...yarpc.CallOption) (*types.GetShardOwnerResponse, error)
}
Loading

0 comments on commit 7827fdf

Please sign in to comment.