Skip to content

Commit

Permalink
Handle async response from CC /state endpoint (#56)
Browse files Browse the repository at this point in the history
* Handle the case when the request to CC STATE endpoint returns the async task instead of the result

When the request on /state endpoint of CC takes longer than webserver.request.maxBlockTimeMs,
then CC will convert the request into an async task and will respond with the taskID instead
of the result. The Koperator should handle this case by waiting for CC to complete the task.
  • Loading branch information
aguzovatii authored May 16, 2023
1 parent 59a7e61 commit 999acbe
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 63 deletions.
2 changes: 2 additions & 0 deletions api/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
OperationRemoveDisks CruiseControlTaskOperation = "remove_disks"
// OperationRebalance means a Cruise Control rebalance operation
OperationRebalance CruiseControlTaskOperation = "rebalance"
// OperationStatus means a Cruise Control status operation
OperationStatus CruiseControlTaskOperation = "status"
// KafkaAccessTypeRead states that a user wants consume access to a topic
KafkaAccessTypeRead KafkaAccessType = "read"
// KafkaAccessTypeWrite states that a user wants produce access to a topic
Expand Down
140 changes: 133 additions & 7 deletions controllers/cruisecontroloperation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"sort"
"strings"
"time"

"emperror.dev/errors"
Expand All @@ -34,19 +35,21 @@ import (

"github.com/banzaicloud/go-cruise-control/pkg/types"

apiutil "github.com/banzaicloud/koperator/api/util"
banzaiv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1"
banzaiv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/scale"
"github.com/banzaicloud/koperator/pkg/util"
)

const (
defaultFailedTasksHistoryMaxLength = 50
ccOperationFinalizerGroup = "finalizer.cruisecontroloperations.kafka.banzaicloud.io"
ccOperationForStopExecution = "ccOperationStopExecution"
ccOperationFirstExecution = "ccOperationFirstExecution"
ccOperationRetryExecution = "ccOperationRetryExecution"
ccOperationInProgress = "ccOperationInProgress"
defaultFailedTasksHistoryMaxLength = 50
ccOperationFinalizerGroup = "finalizer.cruisecontroloperations.kafka.banzaicloud.io"
ccOperationForStopExecution = "ccOperationStopExecution"
ccOperationFirstExecution = "ccOperationFirstExecution"
ccOperationRetryExecution = "ccOperationRetryExecution"
ccOperationInProgress = "ccOperationInProgress"
defaultCruiseControlStatusOperationMaxDuration = time.Duration(5) * time.Minute
)

var (
Expand Down Expand Up @@ -98,6 +101,12 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques
return reconciled()
}

// Skip reconciliation for Cruise Control Status operation
if currentCCOperation.CurrentTaskOperation() == banzaiv1alpha1.OperationStatus {
log.V(1).Info("skipping reconciliation for Cruise Control Status operation")
return reconciled()
}

// When the task is done we can remove the finalizer instantly thus we can return fast here.
if isFinalizerNeeded(currentCCOperation) && currentCCOperation.IsDone() {
controllerutil.RemoveFinalizer(currentCCOperation, ccOperationFinalizerGroup)
Expand Down Expand Up @@ -145,7 +154,7 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques
}

// Checking Cruise Control health
status, err := r.scaler.Status(ctx)
status, err := r.getStatus(ctx, log, kafkaCluster, kafkaClusterRef, ccOperationListClusterWide)
if err != nil {
log.Error(err, "could not get Cruise Control status")
return requeueAfter(defaultRequeueIntervalInSeconds)
Expand Down Expand Up @@ -274,6 +283,8 @@ func (r *CruiseControlOperationReconciler) executeOperation(ctx context.Context,
cruseControlTaskResult, err = r.scaler.RemoveDisksWithParams(ctx, ccOperationExecution.CurrentTaskParameters())
case banzaiv1alpha1.OperationStopExecution:
cruseControlTaskResult, err = r.scaler.StopExecution(ctx)
case banzaiv1alpha1.OperationStatus:
err = errors.NewWithDetails("Cruise Control operation not supported", "name", ccOperationExecution.GetName(), "namespace", ccOperationExecution.GetNamespace(), "operation", ccOperationExecution.CurrentTaskOperation(), "parameters", ccOperationExecution.CurrentTaskParameters())
default:
err = errors.NewWithDetails("Cruise Control operation not supported", "name", ccOperationExecution.GetName(), "namespace", ccOperationExecution.GetNamespace(), "operation", ccOperationExecution.CurrentTaskOperation(), "parameters", ccOperationExecution.CurrentTaskParameters())
}
Expand Down Expand Up @@ -502,6 +513,121 @@ func (r *CruiseControlOperationReconciler) updateCurrentTasks(ctx context.Contex
return nil
}

// getStatus returns the internal state of Cruise Control.
//
// The logic is the following:
// - If the Cruise Control makes the Status request sync, then the result will be returned.
// - If the Cruise Control makes the Status request async, then a new Status CruiseControlOperation
// will be created and an error will be returned to indicate that Cruise Control is not ready yet.
// - If there is already a Status CruiseControlOperation in progress, then it will be updated and
// the result will be returned.
func (r *CruiseControlOperationReconciler) getStatus(
ctx context.Context,
log logr.Logger,
kafkaCluster *banzaiv1beta1.KafkaCluster,
kafkaClusterRef client.ObjectKey,
ccOperationListClusterWide banzaiv1alpha1.CruiseControlOperationList,
) (scale.CruiseControlStatus, error) {
var statusOperation *banzaiv1alpha1.CruiseControlOperation
for i := range ccOperationListClusterWide.Items {
ccOperation := &ccOperationListClusterWide.Items[i]
// ignoring the error here to continue processing the operations,
// even if the user does not provide a KafkaClusterRef label on the CCOperation then the ref will be an empty object (not nil) and the filter will skip it.
ref, _ := kafkaClusterReference(ccOperation)
if ref.Name == kafkaClusterRef.Name && ref.Namespace == kafkaClusterRef.Namespace && ccOperation.Status.CurrentTask != nil &&
ccOperation.Status.CurrentTask.Operation == banzaiv1alpha1.OperationStatus && ccOperation.IsCurrentTaskRunning() {
statusOperation = ccOperation
break
}
}

if statusOperation != nil {
res, err := r.scaler.StatusTask(ctx, statusOperation.CurrentTaskID())
if err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not get the latest state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace())
}
if err := updateResult(log, res.TaskResult, statusOperation, false); err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not update the state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace())
}

err = r.Status().Update(ctx, statusOperation)
if err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not update the state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace())
}

if statusOperation.CurrentTask().Finished != nil &&
statusOperation.CurrentTask().Finished.Time.Sub(statusOperation.CurrentTask().Started.Time) > defaultCruiseControlStatusOperationMaxDuration {
return scale.CruiseControlStatus{}, errors.New("the Cruise Control status operation took too long to finish")
}

if res.Status == nil {
return scale.CruiseControlStatus{}, errors.New("could not get Cruise Control status")
}

return *res.Status, nil
}

res, err := r.scaler.Status(ctx)
if err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not get Cruise Control status")
}

if res.Status == nil {
operationTTLSecondsAfterFinished := kafkaCluster.Spec.CruiseControlConfig.CruiseControlOperationSpec.GetTTLSecondsAfterFinished()
operation, err := r.createCCOperation(ctx, kafkaCluster, operationTTLSecondsAfterFinished, banzaiv1alpha1.OperationStatus)
if err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not create a new Status CruiseControlOperation")
}
if err = updateResult(log, res.TaskResult, operation, true); err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not update the state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace())
}
err = r.Status().Update(ctx, operation)
if err != nil {
return scale.CruiseControlStatus{}, errors.WrapIfWithDetails(err, "could not update the state of Status CruiseControlOperation", "name", statusOperation.GetName(), "namespace", statusOperation.GetNamespace())
}

return scale.CruiseControlStatus{}, errors.New("could not get Cruise Control status, the operation is still in progress")
}

return *res.Status, nil
}

func (r *CruiseControlOperationReconciler) createCCOperation(
ctx context.Context,
kafkaCluster *banzaiv1beta1.KafkaCluster,
ttlSecondsAfterFinished *int,
operationType banzaiv1alpha1.CruiseControlTaskOperation,
) (*banzaiv1alpha1.CruiseControlOperation, error) {
operation := &banzaiv1alpha1.CruiseControlOperation{
ObjectMeta: v1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-%s-", kafkaCluster.Name, strings.ReplaceAll(string(operationType), "_", "")),
Namespace: kafkaCluster.Namespace,
Labels: apiutil.LabelsForKafka(kafkaCluster.Name),
},
}

if ttlSecondsAfterFinished != nil {
operation.Spec.TTLSecondsAfterFinished = ttlSecondsAfterFinished
}

if err := controllerutil.SetControllerReference(kafkaCluster, operation, r.Scheme); err != nil {
return nil, err
}
if err := r.Client.Create(ctx, operation); err != nil {
return nil, err
}

operation.Status.CurrentTask = &banzaiv1alpha1.CruiseControlTask{
Operation: operationType,
}

if err := r.Status().Update(ctx, operation); err != nil {
return nil, err
}

return operation, nil
}

func isWaitingForFinalization(ccOperation *banzaiv1alpha1.CruiseControlOperation) bool {
return ccOperation.IsCurrentTaskRunning() && !ccOperation.ObjectMeta.DeletionTimestamp.IsZero() && controllerutil.ContainsFinalizer(ccOperation, ccOperationFinalizerGroup)
}
Expand Down
127 changes: 97 additions & 30 deletions controllers/tests/cruisecontroloperation_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,34 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
}, 10*time.Second, 500*time.Millisecond).Should(BeTrue())
})
})
When("Cruise Control makes the Status operation async", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock7())
operation := generateCruiseControlOperation("add-broker-operation", namespace, kafkaCluster.GetName())
err := k8sClient.Create(ctx, &operation)
Expect(err).NotTo(HaveOccurred())

operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{
Operation: v1alpha1.OperationAddBroker,
ID: "11111",
}
err = k8sClient.Status().Update(ctx, &operation)
Expect(err).NotTo(HaveOccurred())
})
It("should create status CruiseControlOperation and retry", func(ctx SpecContext) {
Eventually(ctx, func() v1beta1.CruiseControlUserTaskState {
operation := v1alpha1.CruiseControlOperation{}
err := k8sClient.Get(ctx, client.ObjectKey{
Namespace: kafkaCluster.Namespace,
Name: "add-broker-operation",
}, &operation)
if err != nil {
return ""
}
return operation.CurrentTaskState()
}, 15*time.Second, 500*time.Millisecond).Should(Equal(v1beta1.CruiseControlTaskCompleted))
})
})
})

func generateCruiseControlOperation(name, namespace, kafkaRef string) v1alpha1.CruiseControlOperation {
Expand All @@ -382,11 +410,12 @@ func getScaleMock2() *mocks.MockCruiseControlScaler {
State: v1beta1.CruiseControlTaskCompletedWithError,
})}
scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{
Status: &scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}}, nil).AnyTimes()
scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{
TaskID: "12345",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
Expand All @@ -405,11 +434,12 @@ func getScaleMock1() *mocks.MockCruiseControlScaler {
State: v1beta1.CruiseControlTaskCompleted,
})}
scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{
Status: &scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}}, nil).AnyTimes()
scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{
TaskID: "12345",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
Expand All @@ -433,11 +463,12 @@ func getScaleMock3() *mocks.MockCruiseControlScaler {
State: v1beta1.CruiseControlTaskCompleted,
})}
scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{
Status: &scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}}, nil).AnyTimes()
scaleMock.EXPECT().RemoveBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{
TaskID: "12345",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
Expand Down Expand Up @@ -467,11 +498,12 @@ func getScaleMock4() *mocks.MockCruiseControlScaler {
State: v1beta1.CruiseControlTaskCompletedWithError,
})}
scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{
Status: &scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}}, nil).AnyTimes()
scaleMock.EXPECT().RemoveBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{
TaskID: "1",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
Expand All @@ -497,11 +529,12 @@ func getScaleMock5() *mocks.MockCruiseControlScaler {
})}
first := scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).Times(1)
scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult2, nil).After(first).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{
Status: &scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}}, nil).AnyTimes()
scaleMock.EXPECT().AddBrokersWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{
TaskID: "12345",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
Expand All @@ -521,11 +554,12 @@ func getScaleMock6() *mocks.MockCruiseControlScaler {
State: v1beta1.CruiseControlTaskCompletedWithError,
})}
scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}, nil).AnyTimes()
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{
Status: &scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}}, nil).AnyTimes()
scaleMock.EXPECT().RebalanceWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{
TaskID: "12346",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
Expand All @@ -541,6 +575,39 @@ func getScaleMock6() *mocks.MockCruiseControlScaler {
return scaleMock
}

func getScaleMock7() *mocks.MockCruiseControlScaler {
mockCtrl := gomock.NewController(GinkgoT())
scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl)
scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes()

startTime := metav1.Now().Format(time.RFC1123)
scaleMock.EXPECT().Status(gomock.Any()).Return(scale.StatusTaskResult{
TaskResult: &scale.Result{
TaskID: "22222",
StartedAt: startTime,
State: v1beta1.CruiseControlTaskActive,
}}, nil).AnyTimes()
scaleMock.EXPECT().StatusTask(gomock.Any(), "22222").Return(scale.StatusTaskResult{
TaskResult: &scale.Result{
TaskID: "22222",
StartedAt: startTime,
State: v1beta1.CruiseControlTaskCompleted,
},
Status: &scale.CruiseControlStatus{
ExecutorReady: true,
MonitorReady: true,
AnalyzerReady: true,
}}, nil).AnyTimes()

userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{
TaskID: "11111",
StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT",
State: v1beta1.CruiseControlTaskCompleted,
})}
scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes()
return scaleMock
}

func scaleResultPointer(res scale.Result) *scale.Result {
return &res
}
Loading

0 comments on commit 999acbe

Please sign in to comment.