Skip to content

Commit

Permalink
feat: refactor updateObjStatus for both RAG and workspace (#625)
Browse files Browse the repository at this point in the history
**Reason for Change**:
refactor updateObjStatus for both RAG and workspace

**Requirements**

- [ ] added unit tests and e2e tests (if applicable).

**Issue Fixed**:
<!-- If this PR fixes GitHub issue 4321, add "Fixes #4321" to the next
line. -->

**Notes for Reviewers**:

Signed-off-by: Bangqi Zhu <[email protected]>
Co-authored-by: Bangqi Zhu <[email protected]>
  • Loading branch information
bangqipropel and Bangqi Zhu authored Oct 13, 2024
1 parent 1818551 commit 920ada5
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 43 deletions.
29 changes: 1 addition & 28 deletions pkg/controllers/ragengine_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,12 @@ import (
"context"

kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (c *RAGEngineReconciler) updateRAGEngineStatus(ctx context.Context, name *client.ObjectKey, condition *metav1.Condition, workerNodes []string) error {
return retry.OnError(retry.DefaultRetry,
func(err error) bool {
return apierrors.IsServiceUnavailable(err) || apierrors.IsServerTimeout(err) || apierrors.IsTooManyRequests(err)
},
func() error {
// Read the latest version to avoid update conflict.
ragObj := &kaitov1alpha1.RAGEngine{}
if err := c.Client.Get(ctx, *name, ragObj); err != nil {
if !errors.IsNotFound(err) {
return err
}
return nil
}
if condition != nil {
meta.SetStatusCondition(&ragObj.Status.Conditions, *condition)
}
if workerNodes != nil {
ragObj.Status.WorkerNodes = workerNodes
}
return c.Client.Status().Update(ctx, ragObj)
})
}

func (c *RAGEngineReconciler) updateStatusConditionIfNotMatch(ctx context.Context, ragObj *kaitov1alpha1.RAGEngine, cType kaitov1alpha1.ConditionType,
cStatus metav1.ConditionStatus, cReason, cMessage string) error {
if curCondition := meta.FindStatusCondition(ragObj.Status.Conditions, string(cType)); curCondition != nil {
Expand All @@ -56,5 +29,5 @@ func (c *RAGEngineReconciler) updateStatusConditionIfNotMatch(ctx context.Contex
ObservedGeneration: ragObj.GetGeneration(),
Message: cMessage,
}
return c.updateRAGEngineStatus(ctx, &client.ObjectKey{Name: ragObj.Name, Namespace: ragObj.Namespace}, &cObj, nil)
return updateObjStatus(ctx, c.Client, &client.ObjectKey{Name: ragObj.Name, Namespace: ragObj.Namespace}, "RAGEngine", &cObj, nil)
}
6 changes: 3 additions & 3 deletions pkg/controllers/ragengine_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestUpdateRAGEngineStatus(t *testing.T) {
mockClient.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&kaitov1alpha1.RAGEngine{}), mock.Anything).Return(nil)
mockClient.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&kaitov1alpha1.RAGEngine{}), mock.Anything).Return(nil)

err := reconciler.updateRAGEngineStatus(ctx, &client.ObjectKey{Name: ragengine.Name, Namespace: ragengine.Namespace}, &condition, workerNodes)
err := updateObjStatus(ctx, reconciler.Client, &client.ObjectKey{Name: ragengine.Name, Namespace: ragengine.Namespace}, "RAGEngine", &condition, workerNodes)
assert.Nil(t, err)
})

Expand All @@ -60,7 +60,7 @@ func TestUpdateRAGEngineStatus(t *testing.T) {

mockClient.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&kaitov1alpha1.RAGEngine{}), mock.Anything).Return(errors.New("Get operation failed"))

err := reconciler.updateRAGEngineStatus(ctx, &client.ObjectKey{Name: ragengine.Name, Namespace: ragengine.Namespace}, &condition, workerNodes)
err := updateObjStatus(ctx, reconciler.Client, &client.ObjectKey{Name: ragengine.Name, Namespace: ragengine.Namespace}, "RAGEngine", &condition, workerNodes)
assert.NotNil(t, err)
})

Expand All @@ -82,7 +82,7 @@ func TestUpdateRAGEngineStatus(t *testing.T) {

mockClient.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&kaitov1alpha1.RAGEngine{}), mock.Anything).Return(apierrors.NewNotFound(schema.GroupResource{}, "ragengine"))

err := reconciler.updateRAGEngineStatus(ctx, &client.ObjectKey{Name: ragengine.Name, Namespace: ragengine.Namespace}, &condition, workerNodes)
err := updateObjStatus(ctx, reconciler.Client, &client.ObjectKey{Name: ragengine.Name, Namespace: ragengine.Namespace}, "RAGEngine", &condition, workerNodes)
assert.Nil(t, err)
})
}
Expand Down
35 changes: 26 additions & 9 deletions pkg/controllers/workspace_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package controllers

import (
"context"
"fmt"
"reflect"
"sort"

Expand All @@ -20,27 +21,43 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (c *WorkspaceReconciler) updateWorkspaceStatus(ctx context.Context, name *client.ObjectKey, condition *metav1.Condition, workerNodes []string) error {
func updateObjStatus(ctx context.Context, c client.Client, name *client.ObjectKey, objType string, condition *metav1.Condition, workerNodes []string) error {
return retry.OnError(retry.DefaultRetry,
func(err error) bool {
return apierrors.IsServiceUnavailable(err) || apierrors.IsServerTimeout(err) || apierrors.IsTooManyRequests(err)
},
func() error {
// Read the latest version to avoid update conflict.
wObj := &kaitov1alpha1.Workspace{}
if err := c.Client.Get(ctx, *name, wObj); err != nil {
var obj client.Object
var conditions *[]metav1.Condition
var workerNodesField *[]string
switch objType {
case "Workspace":
ragObj := &kaitov1alpha1.Workspace{}
obj = ragObj
conditions = &ragObj.Status.Conditions
workerNodesField = &ragObj.Status.WorkerNodes
case "RAGEngine":
wObj := &kaitov1alpha1.RAGEngine{}
obj = wObj
conditions = &wObj.Status.Conditions
workerNodesField = &wObj.Status.WorkerNodes
default:
return fmt.Errorf("unsupported object type: %s", objType)
}

if err := c.Get(ctx, *name, obj); err != nil {
if !errors.IsNotFound(err) {
return err
}
return nil
}
if condition != nil {
meta.SetStatusCondition(&wObj.Status.Conditions, *condition)
meta.SetStatusCondition(conditions, *condition)
}
if workerNodes != nil {
wObj.Status.WorkerNodes = workerNodes
*workerNodesField = workerNodes
}
return c.Client.Status().Update(ctx, wObj)
return c.Status().Update(ctx, obj)
})
}

Expand All @@ -60,7 +77,7 @@ func (c *WorkspaceReconciler) updateStatusConditionIfNotMatch(ctx context.Contex
ObservedGeneration: wObj.GetGeneration(),
Message: cMessage,
}
return c.updateWorkspaceStatus(ctx, &client.ObjectKey{Name: wObj.Name, Namespace: wObj.Namespace}, &cObj, nil)
return updateObjStatus(ctx, c.Client, &client.ObjectKey{Name: wObj.Name, Namespace: wObj.Namespace}, "Workspace", &cObj, nil)
}

func (c *WorkspaceReconciler) updateStatusNodeListIfNotMatch(ctx context.Context, wObj *kaitov1alpha1.Workspace, validNodeList []*corev1.Node) error {
Expand All @@ -73,5 +90,5 @@ func (c *WorkspaceReconciler) updateStatusNodeListIfNotMatch(ctx context.Context
return nil
}
klog.InfoS("updateStatusNodeList", "workspace", klog.KObj(wObj))
return c.updateWorkspaceStatus(ctx, &client.ObjectKey{Name: wObj.Name, Namespace: wObj.Namespace}, nil, nodeNameList)
return updateObjStatus(ctx, c.Client, &client.ObjectKey{Name: wObj.Name, Namespace: wObj.Namespace}, "Workspace", nil, nodeNameList)
}
6 changes: 3 additions & 3 deletions pkg/controllers/workspace_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestUpdateWorkspaceStatus(t *testing.T) {
mockClient.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&kaitov1alpha1.Workspace{}), mock.Anything).Return(nil)
mockClient.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&kaitov1alpha1.Workspace{}), mock.Anything).Return(nil)

err := reconciler.updateWorkspaceStatus(ctx, &client.ObjectKey{Name: workspace.Name, Namespace: workspace.Namespace}, &condition, workerNodes)
err := updateObjStatus(ctx, reconciler.Client, &client.ObjectKey{Name: workspace.Name, Namespace: workspace.Namespace}, "Workspace", &condition, workerNodes)
assert.Nil(t, err)
})

Expand All @@ -60,7 +60,7 @@ func TestUpdateWorkspaceStatus(t *testing.T) {

mockClient.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&kaitov1alpha1.Workspace{}), mock.Anything).Return(errors.New("Get operation failed"))

err := reconciler.updateWorkspaceStatus(ctx, &client.ObjectKey{Name: workspace.Name, Namespace: workspace.Namespace}, &condition, workerNodes)
err := updateObjStatus(ctx, reconciler.Client, &client.ObjectKey{Name: workspace.Name, Namespace: workspace.Namespace}, "Workspace", &condition, workerNodes)
assert.NotNil(t, err)
})

Expand All @@ -82,7 +82,7 @@ func TestUpdateWorkspaceStatus(t *testing.T) {

mockClient.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&kaitov1alpha1.Workspace{}), mock.Anything).Return(apierrors.NewNotFound(schema.GroupResource{}, "workspace"))

err := reconciler.updateWorkspaceStatus(ctx, &client.ObjectKey{Name: workspace.Name, Namespace: workspace.Namespace}, &condition, workerNodes)
err := updateObjStatus(ctx, reconciler.Client, &client.ObjectKey{Name: workspace.Name, Namespace: workspace.Namespace}, "Workspace", &condition, workerNodes)
assert.Nil(t, err)
})
}
Expand Down

0 comments on commit 920ada5

Please sign in to comment.