diff --git a/service/frontend/admin_handler.go b/service/frontend/admin_handler.go index 1fbec4dba27..d74b08e002f 100644 --- a/service/frontend/admin_handler.go +++ b/service/frontend/admin_handler.go @@ -1890,7 +1890,7 @@ func (adh *AdminHandler) StreamWorkflowReplicationMessages( if errors.As(err, &solErr) || errors.As(err, &suErr) { ctx, cl := context.WithTimeout(context.Background(), 2*time.Second) // getShard here to make sure we will talk to correct host when stream is retrying - _, err := adh.historyClient.GetShard(ctx, &historyservice.GetShardRequest{ShardId: serverClusterShardID.ShardID}) + _, err := adh.historyClient.DescribeHistoryHost(ctx, &historyservice.DescribeHistoryHostRequest{ShardId: serverClusterShardID.ShardID}) if err != nil { logger.Error("failed to get shard", tag.Error(err)) } diff --git a/service/frontend/admin_handler_test.go b/service/frontend/admin_handler_test.go index 13153581e6e..1bdf5392e78 100644 --- a/service/frontend/admin_handler_test.go +++ b/service/frontend/admin_handler_test.go @@ -1099,7 +1099,7 @@ func (s *adminHandlerSuite) TestStreamWorkflowReplicationMessages_ServerToClient return nil, serviceerror.NewUnavailable("random error") }) - s.mockHistoryClient.EXPECT().GetShard(gomock.Any(), &historyservice.GetShardRequest{ShardId: serverClusterShardID.ShardID}).Return(&historyservice.GetShardResponse{}, nil) + s.mockHistoryClient.EXPECT().DescribeHistoryHost(gomock.Any(), &historyservice.DescribeHistoryHostRequest{ShardId: serverClusterShardID.ShardID}).Return(&historyservice.DescribeHistoryHostResponse{}, nil) serverCluster.EXPECT().Recv().DoAndReturn(func() (*historyservice.StreamWorkflowReplicationMessagesResponse, error) { waitGroupStart.Done() waitGroupStart.Wait() diff --git a/service/history/handler.go b/service/history/handler.go index 4daced93c88..c24b9debce5 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -675,12 +675,24 @@ func (h *Handler) ExecuteMultiOperation( } // DescribeHistoryHost returns information about the internal states of a history host -func (h *Handler) DescribeHistoryHost(_ context.Context, _ *historyservice.DescribeHistoryHostRequest) (_ *historyservice.DescribeHistoryHostResponse, retError error) { +func (h *Handler) DescribeHistoryHost(_ context.Context, req *historyservice.DescribeHistoryHostRequest) (_ *historyservice.DescribeHistoryHostResponse, retError error) { defer metrics.CapturePanic(h.logger, h.metricsHandler, &retError) h.startWG.Wait() - itemsInCacheByIDCount, itemsInCacheByNameCount := h.namespaceRegistry.GetCacheSize() + // This API supports describe history host by 1. address 2. shard id 3. namespace id + workflow id + // if option 2/3 is provided, we want to check on the shard ownership to return the correct host address. + shardID := req.GetShardId() + if len(req.GetNamespaceId()) != 0 && req.GetWorkflowExecution() != nil { + shardID = common.WorkflowIDToHistoryShard(req.GetNamespaceId(), req.GetWorkflowExecution().GetWorkflowId(), h.config.NumberOfShards) + } + if shardID > 0 { + _, err := h.controller.GetShardByID(shardID) + if err != nil { + return nil, err + } + } + itemsInCacheByIDCount, itemsInCacheByNameCount := h.namespaceRegistry.GetCacheSize() ownedShardIDs := h.controller.ShardIDs() resp := &historyservice.DescribeHistoryHostResponse{ ShardsNumber: int32(len(ownedShardIDs)), diff --git a/service/history/handler_test.go b/service/history/handler_test.go new file mode 100644 index 00000000000..9d38374f78e --- /dev/null +++ b/service/history/handler_test.go @@ -0,0 +1,95 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +package history + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "go.temporal.io/server/api/historyservice/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/membership" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/serviceerror" + "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tests" + "go.uber.org/mock/gomock" +) + +func TestDescribeHistoryHost(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + controller := shard.NewMockController(ctrl) + namespaceRegistry := namespace.NewMockRegistry(ctrl) + hostInfoProvider := membership.NewMockHostInfoProvider(ctrl) + h := Handler{ + config: &configs.Config{ + NumberOfShards: 10, + }, + metricsHandler: metrics.NoopMetricsHandler, + logger: log.NewNoopLogger(), + controller: controller, + namespaceRegistry: namespaceRegistry, + hostInfoProvider: hostInfoProvider, + } + + mockShard1 := shard.NewTestContext( + ctrl, + &persistencespb.ShardInfo{ + ShardId: 1, + RangeId: 1, + }, + tests.NewDynamicConfig(), + ) + controller.EXPECT().GetShardByID(int32(1)).Return(mockShard1, serviceerror.NewShardOwnershipLost("", "")) + + _, err := h.DescribeHistoryHost(context.Background(), &historyservice.DescribeHistoryHostRequest{ + ShardId: 1, + }) + assert.Error(t, err) + var sol *serviceerror.ShardOwnershipLost + assert.True(t, errors.As(err, &sol)) + + mockShard2 := shard.NewTestContext( + ctrl, + &persistencespb.ShardInfo{ + ShardId: 2, + RangeId: 1, + }, + tests.NewDynamicConfig(), + ) + controller.EXPECT().GetShardByID(int32(2)).Return(mockShard2, nil) + controller.EXPECT().ShardIDs().Return([]int32{2}) + namespaceRegistry.EXPECT().GetCacheSize().Return(int64(0), int64(0)) + hostInfoProvider.EXPECT().HostInfo().Return(membership.NewHostInfoFromAddress("0.0.0.0")) + _, err = h.DescribeHistoryHost(context.Background(), &historyservice.DescribeHistoryHostRequest{ + ShardId: 2, + }) + assert.NoError(t, err) +}