Skip to content

Commit

Permalink
work on comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ychebotarev committed Feb 4, 2025
1 parent 9e5d173 commit 55dbac0
Showing 1 changed file with 29 additions and 20 deletions.
49 changes: 29 additions & 20 deletions tests/activity_api_batch_unpause_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ import (
"github.com/stretchr/testify/suite"
batchpb "go.temporal.io/api/batch/v1"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/testing/testvars"
"go.temporal.io/server/tests/testcore"
"google.golang.org/grpc/codes"
)

type ActivityApiBatchUnpauseClientTestSuite struct {
testcore.FunctionalTestSdkSuite
tv *testvars.TestVars
}

func TestActivityApiBatchUnpauseClientTestSuite(t *testing.T) {
Expand All @@ -56,8 +56,6 @@ func TestActivityApiBatchUnpauseClientTestSuite(t *testing.T) {

func (s *ActivityApiBatchUnpauseClientTestSuite) SetupTest() {
s.FunctionalTestSdkSuite.SetupTest()

s.tv = testvars.New(s.T()).WithTaskQueue(s.TaskQueue()).WithNamespaceName(s.Namespace())
}

type internalTestWorkflow struct {
Expand Down Expand Up @@ -86,14 +84,13 @@ func newInternalWorkflow() *internalTestWorkflow {
}

func (w *internalTestWorkflow) WorkflowFunc(ctx workflow.Context) error {
var ret string
err := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
ActivityID: "activity-id",
DisableEagerExecution: true,
StartToCloseTimeout: w.startToCloseTimeout,
ScheduleToCloseTimeout: w.scheduleToCloseTimeout,
RetryPolicy: w.activityRetryPolicy,
}), w.ActivityFunc).Get(ctx, &ret)
}), w.ActivityFunc).Get(ctx, nil)
return err
}

Expand All @@ -118,7 +115,7 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) createWorkflow(ctx context.Cont
return workflowRun
}

func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Acceptance() {
func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Success() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

Expand All @@ -134,16 +131,12 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Accept
s.EventuallyWithT(func(t *assert.CollectT) {
description, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun1.GetID(), workflowRun1.GetRunID())
assert.NoError(t, err)
if description.GetPendingActivities() != nil {
assert.Len(t, description.PendingActivities, 1)
}
assert.Len(t, description.GetPendingActivities(), 1)
assert.Greater(t, internalWorkflow.startedActivityCount.Load(), int32(0))

description, err = s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun2.GetID(), workflowRun2.GetRunID())
assert.NoError(t, err)
if description.GetPendingActivities() != nil {
assert.Len(t, description.PendingActivities, 1)
}
assert.Len(t, description.GetPendingActivities(), 1)
assert.Greater(t, internalWorkflow.startedActivityCount.Load(), int32(0))
}, 5*time.Second, 100*time.Millisecond)

Expand All @@ -168,12 +161,11 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Accept
description, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun1.GetID(), workflowRun1.GetRunID())
assert.NoError(t, err)
if description.GetPendingActivities() != nil {
assert.Len(t, description.PendingActivities, 1)
assert.Len(t, description.GetPendingActivities(), 1)
assert.True(t, description.PendingActivities[0].Paused)
}
}, 5*time.Second, 100*time.Millisecond)

// yes, both workflow type and activity type are "func1". There is no way to specify them.
workflowTypeName := "WorkflowFunc"
activityTypeName := "ActivityFunc"
// Make sure the activity is in visibility
Expand All @@ -189,9 +181,7 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Accept
})
assert.NoError(t, err)
assert.NotNil(t, listResp)
if listResp != nil {
assert.Len(t, listResp.Executions, 2)
}
assert.Len(t, listResp.GetExecutions(), 2)
}, 5*time.Second, 500*time.Millisecond)

// unpause the activities in both workflows with batch unpause
Expand Down Expand Up @@ -235,15 +225,34 @@ func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Accept
s.NoError(err)
}

func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Negative() {
func (s *ActivityApiBatchUnpauseClientTestSuite) TestActivityBatchUnpause_Failed() {
// neither activity type not "match all" is provided
_, err := s.SdkClient().WorkflowService().StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
Namespace: s.Namespace().String(),
Operation: &workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation{
UnpauseActivitiesOperation: &batchpb.BatchOperationUnpauseActivities{},
},
VisibilityQuery: fmt.Sprintf("WorkflowType='%s'", "workflowTypeName"),
VisibilityQuery: fmt.Sprintf("WorkflowType='%s'", "WorkflowFunc"),
JobId: uuid.New(),
Reason: "test",
})
s.Error(err)
s.Equal(codes.InvalidArgument, serviceerror.ToStatus(err).Code())
s.ErrorAs(err, new(*serviceerror.InvalidArgument))

// neither activity type not "match all" is provided
_, err = s.SdkClient().WorkflowService().StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
Namespace: s.Namespace().String(),
Operation: &workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation{
UnpauseActivitiesOperation: &batchpb.BatchOperationUnpauseActivities{
Activity: &batchpb.BatchOperationUnpauseActivities_Type{Type: ""},
},
},
VisibilityQuery: fmt.Sprintf("WorkflowType='%s'", "WorkflowFunc"),
JobId: uuid.New(),
Reason: "test",
})
s.Error(err)
s.Equal(codes.InvalidArgument, serviceerror.ToStatus(err).Code())
s.ErrorAs(err, new(*serviceerror.InvalidArgument))
}

0 comments on commit 55dbac0

Please sign in to comment.