Skip to content

Commit

Permalink
longer TerminationGracePeriodSeconds
Browse files Browse the repository at this point in the history
Signed-off-by: shaoyue.chen <[email protected]>
  • Loading branch information
haorenfsa committed Jan 15, 2024
1 parent bfcbe1f commit a3e7a5b
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 18 deletions.
38 changes: 36 additions & 2 deletions pkg/controllers/component_condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
"github.com/milvus-io/milvus-operator/pkg/util/rest"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -24,7 +26,7 @@ func (c ComponentConditionGetterImpl) GetMilvusInstanceCondition(ctx context.Con
if mc.Spec.IsStopping() {
reason := v1beta1.ReasonMilvusStopping
msg := MessageMilvusStopped
stopped, err := CheckMilvusStopped(ctx, cli, mc)
stopped, err := CheckMilvusStopped(ctx, cli, mc, false)
if err != nil {
return v1beta1.MilvusCondition{}, err
}
Expand Down Expand Up @@ -158,7 +160,7 @@ func GetComponentConditionGetter() ComponentConditionGetter {

var singletonComponentConditionGetter ComponentConditionGetter = ComponentConditionGetterImpl{}

var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) {
var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus, killIfTooLong bool) (bool, error) {
podList := &corev1.PodList{}
opts := &client.ListOptions{
Namespace: mc.Namespace,
Expand All @@ -171,7 +173,39 @@ var CheckMilvusStopped = func(ctx context.Context, cli client.Client, mc v1beta1
return false, err
}
if len(podList.Items) > 0 {
if killIfTooLong {
return false, ExecKillIfTerminatingTooLong(ctx, podList)
}
return false, nil
}
return true, nil
}

// gracefulStopTimeout is how long a pod may stay in the terminating state,
// measured from its DeletionTimestamp, before ExecKillIfTerminatingTooLong
// execs a kill command inside it.
var gracefulStopTimeout = 30 * time.Second

// ExecKillIfTerminatingTooLong execs a kill command in every pod of podList
// that has been terminating for at least gracefulStopTimeout.
//
// Pods without a DeletionTimestamp, and pods whose deletion was requested less
// than gracefulStopTimeout ago, are left untouched. For each remaining pod the
// command `kill -9 1 8` is executed through the shared rest client in the
// container named by the pod's AppLabelComponent label. A failure on one pod is
// logged and does not stop the loop; if any exec failed, the most recent
// failure is returned wrapped, otherwise nil.
func ExecKillIfTerminatingTooLong(ctx context.Context, podList *corev1.PodList) error {
	// The kill is issued by exec'ing into the container; per the original
	// author's note this is needed because tini ignores SIGKILL.
	execCli := rest.GetRestClient()
	logger := ctrl.LoggerFrom(ctx)

	var lastErr error
	for i := range podList.Items {
		pod := &podList.Items[i]
		// Skip pods that are not being deleted or are still within the grace window.
		if pod.DeletionTimestamp == nil || time.Since(pod.DeletionTimestamp.Time) < gracefulStopTimeout {
			continue
		}

		podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
		containerName := pod.Labels[AppLabelComponent]
		logger.Info("kill milvus process", "pod", podKey, "container", containerName)

		if _, _, execErr := execCli.Exec(ctx, pod.Namespace, pod.Name, containerName, []string{"kill", "-9", "1", "8"}); execErr != nil {
			logger.Error(execErr, "kill milvus process err", "pod", podKey, "container", containerName)
			lastErr = execErr
		}
	}

	if lastErr == nil {
		return nil
	}
	return errors.Wrap(lastErr, "failed to kill some milvus pod")
}
41 changes: 41 additions & 0 deletions pkg/controllers/component_condition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package controllers
import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/milvus-io/milvus-operator/apis/milvus.io/v1beta1"
"github.com/milvus-io/milvus-operator/pkg/util/rest"
"github.com/pkg/errors"
"github.com/prashantv/gostub"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -264,3 +266,42 @@ func TestGetComponentErrorDetail(t *testing.T) {
assert.Equal(t, "creating", ret.Deployment.Message)
})
}

// TestExecKillIfTerminatingTooLong checks ExecKillIfTerminatingTooLong against
// a mocked rest client: pods that are not being deleted or were deleted only
// just now must not trigger an exec, pods deleted an hour ago must trigger one
// exec each, and a single failing exec must surface as an error.
func TestExecKillIfTerminatingTooLong(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	restCli := rest.NewMockRestClient(mockCtrl)
	ctx := context.Background()
	rest.SetRestClient(restCli)

	// The same two pods are shared and re-stamped by the subtests below.
	pods := &corev1.PodList{Items: make([]corev1.Pod, 2)}

	// markDeleted stamps every pod as having had deletion requested `age` ago.
	markDeleted := func(age time.Duration) {
		for i := range pods.Items {
			pods.Items[i].DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-age)}
		}
	}
	// expectExec registers an expectation for one Exec call with any arguments.
	expectExec := func() *gomock.Call {
		return restCli.EXPECT().Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any())
	}

	t.Run("delete not sent yet", func(t *testing.T) {
		assert.NoError(t, ExecKillIfTerminatingTooLong(ctx, pods))
	})

	t.Run("delete sent, but not timeout", func(t *testing.T) {
		markDeleted(0)
		assert.NoError(t, ExecKillIfTerminatingTooLong(ctx, pods))
	})

	t.Run("kill ok", func(t *testing.T) {
		markDeleted(time.Hour)
		expectExec().Return("", "", nil).Times(2)
		assert.NoError(t, ExecKillIfTerminatingTooLong(ctx, pods))
	})

	t.Run("kill 1 ok,1 error", func(t *testing.T) {
		markDeleted(time.Hour)
		// Registration order is kept: the failing expectation first, then the successful one.
		expectExec().Return("", "", errors.New("test")).Times(1)
		expectExec().Return("", "", nil).Times(1)
		assert.Error(t, ExecKillIfTerminatingTooLong(ctx, pods))
	})
}
2 changes: 1 addition & 1 deletion pkg/controllers/deployment_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func updateSomeFieldsOnlyWhenRolling(template *corev1.PodTemplateSpec, updater d
},
}
}
template.Spec.TerminationGracePeriodSeconds = int64Ptr(300)
template.Spec.TerminationGracePeriodSeconds = int64Ptr(1800)
}

func updateSidecars(template *corev1.PodTemplateSpec, updater deploymentUpdater) {
Expand Down
18 changes: 14 additions & 4 deletions pkg/controllers/milvus_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -82,6 +83,7 @@ type MilvusReconciler struct {
func (r *MilvusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.statusSyncer.RunIfNot()
globalCommonInfo.InitIfNot(r.Client)
logger := r.logger.WithValues("milvus", req.NamespacedName)
if !config.IsDebug() {
defer func() {
if err := recover(); err != nil {
Expand Down Expand Up @@ -111,20 +113,28 @@ func (r *MilvusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}
}
} else {
logger.Info("deleteing milvus")
if milvus.Status.Status != milvusv1beta1.StatusDeleting {
milvus.Status.Status = milvusv1beta1.StatusDeleting
if err := r.Status().Update(ctx, milvus); err != nil {
return ctrl.Result{}, err
}
}

if controllerutil.ContainsFinalizer(milvus, ForegroundDeletionFinalizer) {
stopped, err := CheckMilvusStopped(ctx, r.Client, *milvus)
if !stopped || err != nil {
return ctrl.Result{RequeueAfter: unhealthySyncInterval}, err
if !controllerutil.ContainsFinalizer(milvus, ForegroundDeletionFinalizer) {
// delete self again with foreground deletion
logger.Info("change background delete to foreground")
if err := r.Delete(ctx, milvus, client.PropagationPolicy(metav1.DeletePropagationForeground)); err != nil {
return ctrl.Result{}, err
}
}

stopped, err := CheckMilvusStopped(ctx, r.Client, *milvus, true)
if !stopped || err != nil {
logger.Info("deleting milvus: not all pod stopped, requeue")
return ctrl.Result{RequeueAfter: unhealthySyncInterval}, err
}

if controllerutil.ContainsFinalizer(milvus, MilvusFinalizerName) {
if err := Finalize(ctx, r, *milvus); err != nil {
return ctrl.Result{}, err
Expand Down
14 changes: 4 additions & 10 deletions pkg/controllers/milvus_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

var mockCheckMilvusStopRet = false
var mockCheckMilvusStopErr error = nil
var mockCheckMilvusStop = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus) (bool, error) {
var mockCheckMilvusStop = func(ctx context.Context, cli client.Client, mc v1beta1.Milvus, kill bool) (bool, error) {
return mockCheckMilvusStopRet, mockCheckMilvusStopErr
}

Expand Down Expand Up @@ -87,7 +87,7 @@ func TestClusterReconciler(t *testing.T) {
assert.Error(t, err)
})

t.Run("case delete background", func(t *testing.T) {
t.Run("case delete background, change to foreground failed", func(t *testing.T) {
defer ctrl.Finish()
m.Finalizers = []string{MilvusFinalizerName}
mockCheckMilvusStopRet = false
Expand All @@ -103,16 +103,10 @@ func TestClusterReconciler(t *testing.T) {
mockClient.EXPECT().Status().Return(mockClient)
mockClient.EXPECT().Update(gomock.Any(), gomock.Any()).Times(1)

mockClient.EXPECT().Update(gomock.Any(), gomock.Any()).Do(
func(ctx, obj interface{}, opts ...interface{}) {
// finalizer should be removed
u := obj.(*v1beta1.Milvus)
assert.Equal(t, []string{}, u.Finalizers)
},
).Return(nil)
mockClient.EXPECT().Delete(gomock.Any(), gomock.Any(), client.PropagationPolicy(metav1.DeletePropagationForeground)).Times(1).Return(errMock)

_, err := r.Reconcile(ctx, reconcile.Request{})
assert.NoError(t, err)
assert.Error(t, err)
})

t.Run("delete foreground deletion", func(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion pkg/util/k8s_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func NewK8sClientsForConfig(config *rest.Config) (*K8sClients, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to create dynamic client")
}

return &K8sClients{
ClientSet: clientSet,
ExtClientSet: extClientSet,
Expand Down
97 changes: 97 additions & 0 deletions pkg/util/rest/rest_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package rest

import (
"bytes"
"context"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
ctrl "sigs.k8s.io/controller-runtime"
)

//go:generate mockgen -source=./rest_client.go -destination=./rest_client_mock.go -package=rest RestClient

// RestClient abstracts running commands inside pod containers so that callers
// can be tested against a generated mock.
type RestClient interface {
// Exec runs cmd in the given container of pod namespace/pod and returns
// what the command wrote to stdout and stderr.
Exec(ctx context.Context, namespace, pod, container string, cmd []string) (stdout string, stderr string, err error)
}

// RestClientImpl is the RestClient implementation built by newRestClientImpl.
type RestClientImpl struct {
// restClient builds the pod "exec" request.
restClient rest.Interface
// config is handed to the SPDY executor that streams the command.
config *rest.Config
// scheme supplies the parameter codec used to encode PodExecOptions.
scheme *runtime.Scheme
}

// singletonRestClient is the package-wide RestClient returned by GetRestClient.
// It is assigned in init and can be replaced through SetRestClient.
var singletonRestClient RestClient

// GetRestClient returns the package-wide RestClient.
func GetRestClient() RestClient {
return singletonRestClient
}

// SetRestClient replaces the package-wide RestClient with r.
// It is intended for unit tests that need to inject a mock.
func SetRestClient(r RestClient) {
singletonRestClient = r
}

func init() {
config := ctrl.GetConfigOrDie()
restClient, err := newRestClientImpl(config)
if err != nil {
panic(err)
}
singletonRestClient = restClient
}

// newRestClientImpl builds a RestClientImpl that talks to the core/v1 API
// described by config.
//
// The caller's config is copied rather than modified. On the copy, the
// serializer and GroupVersion are set for core/v1, and APIPath defaults to
// "/api" when unset: client-go derives the versioned path from APIPath plus
// GroupVersion, so an empty APIPath would yield "/v1/..." instead of the
// "/api/v1/..." path under which core resources such as pods are served.
//
// It returns an error if corev1 cannot be registered in the scheme or the
// underlying REST client cannot be created.
func newRestClientImpl(config *rest.Config) (*RestClientImpl, error) {
	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		return nil, errors.Wrap(err, "failed to add corev1 to scheme")
	}

	// Work on a copy so the config owned by the caller stays untouched.
	cfg := rest.CopyConfig(config)
	cfg.NegotiatedSerializer = serializer.NewCodecFactory(scheme)
	cfg.GroupVersion = &corev1.SchemeGroupVersion
	if cfg.APIPath == "" {
		// The core ("legacy") API group is rooted at /api.
		cfg.APIPath = "/api"
	}

	restClient, err := rest.RESTClientFor(cfg)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create rest client")
	}

	return &RestClientImpl{
		restClient: restClient,
		config:     cfg,
		scheme:     scheme,
	}, nil
}

// Exec runs cmd inside the given container of pod namespace/pod through the
// pod "exec" subresource and returns what the command wrote to stdout and
// stderr. Stdin is not attached.
//
// If the executor cannot be created, empty output and a wrapped error are
// returned. If streaming fails, the error is wrapped and whatever output was
// captured up to that point is returned alongside it, so callers can inspect
// the command's own diagnostics.
//
// NOTE(review): ctx is accepted but not handed to the executor, so cancelling
// it does not appear to abort the remote command — confirm, and consider
// StreamWithContext if the client-go version in use provides it.
func (c RestClientImpl) Exec(ctx context.Context, namespace, pod, container string, cmd []string) (stdout string, stderr string, err error) {
	req := c.restClient.Post().
		Resource("pods").
		Namespace(namespace).
		Name(pod).
		SubResource("exec").
		VersionedParams(&corev1.PodExecOptions{
			Container: container,
			Command:   cmd,
			Stdin:     false,
			Stdout:    true,
			Stderr:    true,
		}, runtime.NewParameterCodec(c.scheme))

	exec, err := remotecommand.NewSPDYExecutor(c.config, "POST", req.URL())
	if err != nil {
		return "", "", errors.Wrap(err, "failed to create executor")
	}

	var stdoutBuf, stderrBuf bytes.Buffer
	err = exec.Stream(remotecommand.StreamOptions{
		Stdout: &stdoutBuf,
		Stderr: &stderrBuf,
	})
	if err != nil {
		// Keep the partial output: stderr usually explains why the command failed.
		return stdoutBuf.String(), stderrBuf.String(), errors.Wrap(err, "failed to exec command")
	}

	return stdoutBuf.String(), stderrBuf.String(), nil
}

0 comments on commit a3e7a5b

Please sign in to comment.