From 999acbe1b77c95843978b5d00a5e4a726faf333b Mon Sep 17 00:00:00 2001 From: aguzovatii Date: Tue, 16 May 2023 14:50:55 +0300 Subject: [PATCH] Handle async response from CC /state endpoint (#56) * Handle the case when the request to CC STATE endpoint returns the async task instead of the result When the request on /state endpoint of CC takes longer than webserver.request.maxBlockTimeMs, then CC will convert the request into an async task and will respond with the taskID instead of the result. The Koperator should handle this case by waiting for CC to complete the task. --- api/v1alpha1/common_types.go | 2 + .../cruisecontroloperation_controller.go | 140 +++++++++++++++++- .../cruisecontroloperation_controller_test.go | 127 ++++++++++++---- controllers/tests/mocks/scale.go | 19 ++- pkg/scale/scale.go | 131 +++++++++++++--- pkg/scale/types.go | 8 +- 6 files changed, 364 insertions(+), 63 deletions(-) diff --git a/api/v1alpha1/common_types.go b/api/v1alpha1/common_types.go index 0a948c9a8..e3132cc70 100644 --- a/api/v1alpha1/common_types.go +++ b/api/v1alpha1/common_types.go @@ -48,6 +48,8 @@ const ( OperationRemoveDisks CruiseControlTaskOperation = "remove_disks" // OperationRebalance means a Cruise Control rebalance operation OperationRebalance CruiseControlTaskOperation = "rebalance" + // OperationStatus means a Cruise Control status operation + OperationStatus CruiseControlTaskOperation = "status" // KafkaAccessTypeRead states that a user wants consume access to a topic KafkaAccessTypeRead KafkaAccessType = "read" // KafkaAccessTypeWrite states that a user wants produce access to a topic diff --git a/controllers/cruisecontroloperation_controller.go b/controllers/cruisecontroloperation_controller.go index d52846421..56eaedb1c 100644 --- a/controllers/cruisecontroloperation_controller.go +++ b/controllers/cruisecontroloperation_controller.go @@ -19,6 +19,7 @@ import ( "fmt" "reflect" "sort" + "strings" "time" "emperror.dev/errors" @@ -34,6 +35,7 @@ import ( "github.com/banzaicloud/go-cruise-control/pkg/types" + apiutil "github.com/banzaicloud/koperator/api/util" banzaiv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1" banzaiv1beta1 "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/scale" @@ -41,12 +43,13 @@ import ( ) const ( - defaultFailedTasksHistoryMaxLength = 50 - ccOperationFinalizerGroup = "finalizer.cruisecontroloperations.kafka.banzaicloud.io" - ccOperationForStopExecution = "ccOperationStopExecution" - ccOperationFirstExecution = "ccOperationFirstExecution" - ccOperationRetryExecution = "ccOperationRetryExecution" - ccOperationInProgress = "ccOperationInProgress" + defaultFailedTasksHistoryMaxLength = 50 + ccOperationFinalizerGroup = "finalizer.cruisecontroloperations.kafka.banzaicloud.io" + ccOperationForStopExecution = "ccOperationStopExecution" + ccOperationFirstExecution = "ccOperationFirstExecution" + ccOperationRetryExecution = "ccOperationRetryExecution" + ccOperationInProgress = "ccOperationInProgress" + defaultCruiseControlStatusOperationMaxDuration = time.Duration(5) * time.Minute ) var ( @@ -98,6 +101,12 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques return reconciled() } + // Skip reconciliation for Cruise Control Status operation + if currentCCOperation.CurrentTaskOperation() == banzaiv1alpha1.OperationStatus { + log.V(1).Info("skipping reconciliation for Cruise Control Status operation") + return reconciled() + } + // When the task is done we can remove the finalizer instantly thus we can return fast here. if isFinalizerNeeded(currentCCOperation) && currentCCOperation.IsDone() { controllerutil.RemoveFinalizer(currentCCOperation, ccOperationFinalizerGroup) @@ -145,7 +154,7 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques } // Checking Cruise Control health - status, err := r.scaler.Status(ctx) + status, err := r.getStatus(ctx, log, kafkaCluster, kafkaClusterRef, ccOperationListClusterWide) if err != nil { log.Error(err, "could not get Cruise Control status") return requeueAfter(defaultRequeueIntervalInSeconds) @@ -274,6 +283,8 @@ func (r *CruiseControlOperationReconciler) executeOperation(ctx context.Context, cruseControlTaskResult, err = r.scaler.RemoveDisksWithParams(ctx, ccOperationExecution.CurrentTaskParameters()) case banzaiv1alpha1.OperationStopExecution: cruseControlTaskResult, err = r.scaler.StopExecution(ctx) + case banzaiv1alpha1.OperationStatus: + err = errors.NewWithDetails("Cruise Control operation not supported", "name", ccOperationExecution.GetName(), "namespace", ccOperationExecution.GetNamespace(), "operation", ccOperationExecution.CurrentTaskOperation(), "parameters", ccOperationExecution.CurrentTaskParameters()) default: err = errors.NewWithDetails("Cruise Control operation not supported", "name", ccOperationExecution.GetName(), "namespace", ccOperationExecution.GetNamespace(), "operation", ccOperationExecution.CurrentTaskOperation(), "parameters", ccOperationExecution.CurrentTaskParameters()) } @@ -502,6 +513,121 @@ func (r *CruiseControlOperationReconciler) updateCurrentTasks(ctx context.Contex return nil } +// getStatus returns the internal state of Cruise Control. +// +// The logic is the following: +// - If the Cruise Control makes the Status request sync, then the result will be returned. +// - If the Cruise Control makes the Status request async, then a new Status CruiseControlOperation +// will be created and an error will be returned to indicate that Cruise Control is not ready yet. +// - If there is already a Status CruiseControlOperation in progress, then it will be updated and +// the result will be returned. +func (r *CruiseControlOperationReconciler) getStatus( + ctx context.Context, + log logr.Logger, + kafkaCluster *banzaiv1beta1.KafkaCluster, + kafkaClusterRef client.ObjectKey, + ccOperationListClusterWide banzaiv1alpha1.CruiseControlOperationList, +) (scale.CruiseControlStatus, error) { + var statusOperation *banzaiv1alpha1.CruiseControlOperation + for i := range ccOperationListClusterWide.Items { + ccOperation := &ccOperationListClusterWide.Items[i] + // ignoring the error here to continue processing the operations, + // even if the user does not provide a KafkaClusterRef label on the CCOperation then the ref will be an empty object (not nil) and the filter will skip it. + ref, _ := kafkaClusterReference(ccOperation) + if ref.Name == kafkaClusterRef.Name && ref.Namespace == kafkaClusterRef.Namespace && ccOperation.Status.CurrentTask != nil && + ccOperation.Status.CurrentTask.Operation == banzaiv1alpha1.OperationStatus && ccOperation.IsCurrentTaskRunning() { + statusOperation = ccOperation + break + } + } + + if statusOperation != nil { + res, err := r.scaler.StatusTask(ctx, statusOperation.CurrentTaskID()) + if err != nil { + return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not get the latest state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace()) + } + if err := updateResult(log, res.TaskResult, statusOperation, false); err != nil { + return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not update the state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace()) + } + + err = r.Status().Update(ctx, statusOperation) + if err != nil { + return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not update the state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace()) + } + + if statusOperation.CurrentTask().Finished != nil && + statusOperation.CurrentTask().Finished.Time.Sub(statusOperation.CurrentTask().Started.Time) > defaultCruiseControlStatusOperationMaxDuration { + return scale.CruiseControlStatus{}, errors.New("the Cruise Control status operation took too long to finish") + } + + if res.Status == nil { + return scale.CruiseControlStatus{}, errors.New("could not get Cruise Control status") + } + + return *res.Status, nil + } + + res, err := r.scaler.Status(ctx) + if err != nil { + return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not get Cruise Control status") + } + + if res.Status == nil { + operationTTLSecondsAfterFinished := kafkaCluster.Spec.CruiseControlConfig.CruiseControlOperationSpec.GetTTLSecondsAfterFinished() + operation, err := r.createCCOperation(ctx, kafkaCluster, operationTTLSecondsAfterFinished, banzaiv1alpha1.OperationStatus) + if err != nil { + return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not create a new Status CruiseControlOperation") + } + if err = updateResult(log, res.TaskResult, operation, true); err != nil { + return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not update the state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace()) + } + err = r.Status().Update(ctx, operation) + if err != nil { + return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not update the state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace()) + } + + return scale.CruiseControlStatus{}, errors.New("could not get Cruise Control status, the operation is still in progress") + } + + return *res.Status, nil +} + +func (r *CruiseControlOperationReconciler) createCCOperation( + ctx context.Context, + kafkaCluster *banzaiv1beta1.KafkaCluster, + ttlSecondsAfterFinished *int, + operationType banzaiv1alpha1.CruiseControlTaskOperation, +) (*banzaiv1alpha1.CruiseControlOperation, error) { + operation := &banzaiv1alpha1.CruiseControlOperation{ + ObjectMeta: v1.ObjectMeta{ + GenerateName: fmt.Sprintf("%s-%s-", kafkaCluster.Name, strings.ReplaceAll(string(operationType), "_", "")), + Namespace: kafkaCluster.Namespace, + Labels: apiutil.LabelsForKafka(kafkaCluster.Name), + }, + } + + if ttlSecondsAfterFinished != nil { + operation.Spec.TTLSecondsAfterFinished = ttlSecondsAfterFinished + } + + if err := controllerutil.SetControllerReference(kafkaCluster, operation, r.Scheme); err != nil { + return nil, err + } + if err := r.Client.Create(ctx, operation); err != nil { + return nil, err + } + + operation.Status.CurrentTask = &banzaiv1alpha1.CruiseControlTask{ + Operation: operationType, + } + + if err := r.Status().Update(ctx, operation); err != nil { + return nil, err + } + + return operation, nil +} + func isWaitingForFinalization(ccOperation *banzaiv1alpha1.CruiseControlOperation) bool { return ccOperation.IsCurrentTaskRunning() && !ccOperation.ObjectMeta.DeletionTimestamp.IsZero() && controllerutil.ContainsFinalizer(ccOperation, ccOperationFinalizerGroup) } diff --git a/controllers/tests/cruisecontroloperation_controller_test.go b/controllers/tests/cruisecontroloperation_controller_test.go index 9f99b4994..edc8caa40 100644 --- a/controllers/tests/cruisecontroloperation_controller_test.go +++ b/controllers/tests/cruisecontroloperation_controller_test.go @@ -359,6 +359,34 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }, 10*time.Second, 500*time.Millisecond).Should(BeTrue()) }) }) + When("Cruise Control makes the Status operation async", Serial, func() { + JustBeforeEach(func(ctx SpecContext) { + cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock7()) + operation := generateCruiseControlOperation("add-broker-operation", namespace, kafkaCluster.GetName()) + err := k8sClient.Create(ctx, &operation) + Expect(err).NotTo(HaveOccurred()) + + operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ + Operation: v1alpha1.OperationAddBroker, + ID: "11111", + } + err = k8sClient.Status().Update(ctx, &operation) + Expect(err).NotTo(HaveOccurred()) + }) + It("should create status CruiseControlOperation and retry", func(ctx SpecContext) { + Eventually(ctx, func() v1beta1.CruiseControlUserTaskState { + operation := v1alpha1.CruiseControlOperation{} + err := k8sClient.Get(ctx, client.ObjectKey{ + Namespace: kafkaCluster.Namespace, + Name: "add-broker-operation", + }, &operation) + if err != nil { + return "" + } + return operation.CurrentTaskState() + }, 15*time.Second, 500*time.Millisecond).Should(Equal(v1beta1.CruiseControlTaskCompleted)) + }) + }) }) func generateCruiseControlOperation(name, namespace, kafkaRef string) v1alpha1.CruiseControlOperation { @@ -382,11 +410,12 @@ func getScaleMock2() *mocks.MockCruiseControlScaler { State: v1beta1.CruiseControlTaskCompletedWithError, })} scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes() - scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{ - ExecutorReady: true, - MonitorReady: true, - AnalyzerReady: true, - }, nil).AnyTimes() + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{ + Status: &scale.CruiseControlStatus{ + ExecutorReady: true, + MonitorReady: true, + AnalyzerReady: true, + }}, nil).AnyTimes() scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ TaskID: "12345", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", @@ -405,11 +434,12 @@ func getScaleMock1() *mocks.MockCruiseControlScaler { State: v1beta1.CruiseControlTaskCompleted, })} scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes() - scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{ - ExecutorReady: true, - MonitorReady: true, - AnalyzerReady: true, - }, nil).AnyTimes() + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{ + Status: &scale.CruiseControlStatus{ + ExecutorReady: true, + MonitorReady: true, + AnalyzerReady: true, + }}, nil).AnyTimes() scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ TaskID: "12345", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", @@ -433,11 +463,12 @@ func getScaleMock3() *mocks.MockCruiseControlScaler { State: v1beta1.CruiseControlTaskCompleted, })} scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes() - scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{ - ExecutorReady: true, - MonitorReady: true, - AnalyzerReady: true, - }, nil).AnyTimes() + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{ + Status: &scale.CruiseControlStatus{ + ExecutorReady: true, + MonitorReady: true, + AnalyzerReady: true, + }}, nil).AnyTimes() scaleMock.EXPECT().RemoveBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ TaskID: "12345", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", @@ -467,11 +498,12 @@ func getScaleMock4() *mocks.MockCruiseControlScaler { State: v1beta1.CruiseControlTaskCompletedWithError, })} scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes() - scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{ - ExecutorReady: true, - MonitorReady: true, - AnalyzerReady: true, - }, nil).AnyTimes() + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{ + Status: &scale.CruiseControlStatus{ + ExecutorReady: true, + MonitorReady: true, + AnalyzerReady: true, + }}, nil).AnyTimes() scaleMock.EXPECT().RemoveBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ TaskID: "1", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", @@ -497,11 +529,12 @@ func getScaleMock5() *mocks.MockCruiseControlScaler { })} first := scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).Times(1) scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult2, nil).After(first).AnyTimes() - scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{ - ExecutorReady: true, - MonitorReady: true, - AnalyzerReady: true, - }, nil).AnyTimes() + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{ + Status: &scale.CruiseControlStatus{ + ExecutorReady: true, + MonitorReady: true, + AnalyzerReady: true, + }}, nil).AnyTimes() scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ TaskID: "12345", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", @@ -521,11 +554,12 @@ func getScaleMock6() *mocks.MockCruiseControlScaler { State: v1beta1.CruiseControlTaskCompletedWithError, })} scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes() - scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{ - ExecutorReady: true, - MonitorReady: true, - AnalyzerReady: true, - }, nil).AnyTimes() + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{ + Status: &scale.CruiseControlStatus{ + ExecutorReady: true, + MonitorReady: true, + AnalyzerReady: true, + }}, nil).AnyTimes() scaleMock.EXPECT().RebalanceWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ TaskID: "12346", StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", @@ -541,6 +575,39 @@ func getScaleMock6() *mocks.MockCruiseControlScaler { return scaleMock } +func getScaleMock7() *mocks.MockCruiseControlScaler { + mockCtrl := gomock.NewController(GinkgoT()) + scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl) + scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes() + + startTime := metav1.Now().Format(time.RFC1123) + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{ + TaskResult: &scale.Result{ + TaskID: "22222", + StartedAt: startTime, + State: v1beta1.CruiseControlTaskActive, + }}, nil).AnyTimes() + scaleMock.EXPECT().StatusTask(gomock.Any(), "22222").Return(scale.StatusTaskResult{ + TaskResult: &scale.Result{ + TaskID: "22222", + StartedAt: startTime, + State: v1beta1.CruiseControlTaskCompleted, + }, + Status: &scale.CruiseControlStatus{ + ExecutorReady: true, + MonitorReady: true, + AnalyzerReady: true, + }}, nil).AnyTimes() + + userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{ + TaskID: "11111", + StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", + State: v1beta1.CruiseControlTaskCompleted, + })} + scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes() + return scaleMock +} + func scaleResultPointer(res scale.Result) *scale.Result { return &res } diff --git a/controllers/tests/mocks/scale.go b/controllers/tests/mocks/scale.go index 947fbe1a4..fad1cac78 100644 --- a/controllers/tests/mocks/scale.go +++ b/controllers/tests/mocks/scale.go @@ -296,10 +296,10 @@ func (mr *MockCruiseControlScalerMockRecorder) RemoveBrokersWithParams(ctx, para } // Status mocks base method. -func (m *MockCruiseControlScaler) Status(ctx context.Context) (scale.CruiseControlStatus, error) { +func (m *MockCruiseControlScaler) Status(ctx context.Context) (scale.StatusTaskResult, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Status", ctx) - ret0, _ := ret[0].(scale.CruiseControlStatus) + ret0, _ := ret[0].(scale.StatusTaskResult) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -310,6 +310,21 @@ func (mr *MockCruiseControlScalerMockRecorder) Status(ctx interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Status", reflect.TypeOf((*MockCruiseControlScaler)(nil).Status), ctx) } +// StatusTask mocks base method. +func (m *MockCruiseControlScaler) StatusTask(ctx context.Context, taskID string) (scale.StatusTaskResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StatusTask", ctx, taskID) + ret0, _ := ret[0].(scale.StatusTaskResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StatusTask indicates an expected call of StatusTask. +func (mr *MockCruiseControlScalerMockRecorder) StatusTask(ctx interface{}, taskID string) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatusTask", reflect.TypeOf((*MockCruiseControlScaler)(nil).Status), ctx, taskID) +} + // StopExecution mocks base method. func (m *MockCruiseControlScaler) StopExecution(ctx context.Context) (*scale.Result, error) { m.ctrl.T.Helper() diff --git a/pkg/scale/scale.go b/pkg/scale/scale.go index d2d4b39b4..cb71fd4f1 100644 --- a/pkg/scale/scale.go +++ b/pkg/scale/scale.go @@ -16,17 +16,17 @@ package scale import ( "context" + "encoding/json" "fmt" "math" "strconv" "strings" "emperror.dev/errors" - "github.com/go-logr/logr" - "github.com/banzaicloud/go-cruise-control/pkg/api" "github.com/banzaicloud/go-cruise-control/pkg/client" "github.com/banzaicloud/go-cruise-control/pkg/types" + "github.com/go-logr/logr" "github.com/banzaicloud/koperator/api/v1alpha1" "github.com/banzaicloud/koperator/api/v1beta1" @@ -107,18 +107,102 @@ type cruiseControlScaler struct { client *client.Client } -// Status returns a CruiseControlStatus describing the internal state of Cruise Control. -func (cc *cruiseControlScaler) Status(ctx context.Context) (CruiseControlStatus, error) { +// Status returns a StatusTaskResult describing the internal state of Cruise Control. +func (cc *cruiseControlScaler) Status(ctx context.Context) (StatusTaskResult, error) { req := api.StateRequestWithDefaults() req.Verbose = true resp, err := cc.client.State(ctx, req) if err != nil { - return CruiseControlStatus{}, err + return StatusTaskResult{}, err + } + + // if the execution takes too much time, then Cruise Control converts the request into async + // and returns the taskID + if resp.Result == nil { + return StatusTaskResult{ + TaskResult: &Result{ + TaskID: resp.TaskID, + StartedAt: resp.Date, + ResponseStatusCode: resp.StatusCode, + RequestURL: resp.RequestURL, + State: v1beta1.CruiseControlTaskActive, + }, + }, nil + } + + status := convert(resp.Result) + + return StatusTaskResult{ + TaskResult: &Result{ + TaskID: resp.TaskID, + StartedAt: resp.Date, + ResponseStatusCode: resp.StatusCode, + RequestURL: resp.RequestURL, + State: v1beta1.CruiseControlTaskActive, + }, + Status: &status, + }, nil +} + +// StatusTask returns the latest state of the Status Cruise Control task. +func (cc *cruiseControlScaler) StatusTask(ctx context.Context, taskID string) (StatusTaskResult, error) { + req := &api.UserTasksRequest{ + UserTaskIDs: []string{taskID}, + FetchCompletedTasks: true, + } + + resp, err := cc.client.UserTasks(ctx, req) + if err != nil { + return StatusTaskResult{}, err + } + + //CC no longer has the info about the task, mark it as completed with error + if len(resp.Result.UserTasks) == 0 { + return StatusTaskResult{ + TaskResult: &Result{ + TaskID: taskID, + State: v1beta1.CruiseControlTaskCompletedWithError, + }, + }, nil } + if len(resp.Result.UserTasks) != 1 { + return StatusTaskResult{}, fmt.Errorf("could not get the Cruise Control state, expected the response for 1 task (%s), but got %d responses", taskID, len(resp.Result.UserTasks)) + } + + taskInfo := resp.Result.UserTasks[0] + status := taskInfo.Status + if status == types.UserTaskStatusCompleted { + result := &types.StateResult{} + if err = json.Unmarshal([]byte(taskInfo.OriginalResponse), result); err != nil { + return StatusTaskResult{}, err + } + + status := convert(result) + + return StatusTaskResult{ + TaskResult: &Result{ + TaskID: taskInfo.UserTaskID, + StartedAt: taskInfo.StartMs.UTC().String(), + State: v1beta1.CruiseControlUserTaskState(taskInfo.Status.String()), + }, + Status: &status, + }, nil + } + + return StatusTaskResult{ + TaskResult: &Result{ + TaskID: taskInfo.UserTaskID, + StartedAt: taskInfo.StartMs.UTC().String(), + State: v1beta1.CruiseControlUserTaskState(taskInfo.Status.String()), + }, + }, nil +} + +func convert(result *types.StateResult) CruiseControlStatus { goalsReady := true - if len(resp.Result.AnalyzerState.GoalReadiness) > 0 { - for _, goal := range resp.Result.AnalyzerState.GoalReadiness { + if len(result.AnalyzerState.GoalReadiness) > 0 { + for _, goal := range result.AnalyzerState.GoalReadiness { if goal.Status != types.GoalReadinessStatusReady { goalsReady = false break @@ -126,32 +210,33 @@ func (cc *cruiseControlScaler) Status(ctx context.Context) (CruiseControlStatus, } } - return CruiseControlStatus{ - MonitorReady: resp.Result.MonitorState.State == types.MonitorStateRunning, - ExecutorReady: resp.Result.ExecutorState.State == types.ExecutorStateTypeNoTaskInProgress, - AnalyzerReady: resp.Result.AnalyzerState.IsProposalReady && goalsReady, - ProposalReady: resp.Result.AnalyzerState.IsProposalReady, + status := CruiseControlStatus{ + MonitorReady: result.MonitorState.State == types.MonitorStateRunning, + ExecutorReady: result.ExecutorState.State == types.ExecutorStateTypeNoTaskInProgress, + AnalyzerReady: result.AnalyzerState.IsProposalReady && goalsReady, + ProposalReady: result.AnalyzerState.IsProposalReady, GoalsReady: goalsReady, - MonitoredWindows: resp.Result.MonitorState.NumMonitoredWindows, - MonitoringCoverage: resp.Result.MonitorState.MonitoringCoveragePercentage, - }, nil + MonitoredWindows: result.MonitorState.NumMonitoredWindows, + MonitoringCoverage: result.MonitorState.MonitoringCoveragePercentage, + } + return status } // IsReady returns true if the Analyzer and Monitor components of Cruise Control are in ready state. func (cc *cruiseControlScaler) IsReady(ctx context.Context) bool { status, err := cc.Status(ctx) - if err != nil { + if err != nil || status.Status == nil { cc.log.Error(err, "could not get Cruise Control status") return false } cc.log.Info("cruise control readiness", - "analyzer", status.AnalyzerReady, - "monitor", status.MonitorReady, - "executor", status.ExecutorReady, - "goals ready", status.GoalsReady, - "monitored windows", status.MonitoredWindows, - "monitoring coverage percentage", status.MonitoringCoverage) - return status.IsReady() + "analyzer", status.Status.AnalyzerReady, + "monitor", status.Status.MonitorReady, + "executor", status.Status.ExecutorReady, + "goals ready", status.Status.GoalsReady, + "monitored windows", status.Status.MonitoredWindows, + "monitoring coverage percentage", status.Status.MonitoringCoverage) + return status.Status.IsReady() } // IsUp returns true if Cruise Control is online. diff --git a/pkg/scale/types.go b/pkg/scale/types.go index 700623a27..92b9d8f63 100644 --- a/pkg/scale/types.go +++ b/pkg/scale/types.go @@ -25,7 +25,8 @@ import ( type CruiseControlScaler interface { IsReady(ctx context.Context) bool - Status(ctx context.Context) (CruiseControlStatus, error) + Status(ctx context.Context) (StatusTaskResult, error) + StatusTask(ctx context.Context, taskId string) (StatusTaskResult, error) UserTasks(ctx context.Context, taskIDs ...string) ([]*Result, error) IsUp(ctx context.Context) bool AddBrokers(ctx context.Context, brokerIDs ...string) (*Result, error) @@ -61,6 +62,11 @@ const ( LogDirStateOffline ) +type StatusTaskResult struct { + TaskResult *Result + Status *CruiseControlStatus +} + // CruiseControlStatus struct is used to describe internal state of Cruise Control. type CruiseControlStatus struct { MonitorReady bool