diff --git a/pkg/kbagent/proto/proto.go b/pkg/kbagent/proto/proto.go
index c8b70141fbd..08430e9a900 100644
--- a/pkg/kbagent/proto/proto.go
+++ b/pkg/kbagent/proto/proto.go
@@ -31,7 +31,6 @@ type Action struct {
type ExecAction struct {
Commands []string `json:"command,omitempty"`
Args []string `json:"args,omitempty"`
- Env []string `json:"env,omitempty"`
}
type RetryPolicy struct {
diff --git a/pkg/kbagent/service/action.go b/pkg/kbagent/service/action.go
index 7eea194726a..40706fc6766 100644
--- a/pkg/kbagent/service/action.go
+++ b/pkg/kbagent/service/action.go
@@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"strings"
+ "sync"
"github.com/go-logr/logr"
"github.com/pkg/errors"
@@ -41,6 +42,7 @@ func newActionService(logger logr.Logger, actions []proto.Action) (*actionServic
sa := &actionService{
logger: logger,
actions: make(map[string]*proto.Action),
+ mutex: sync.Mutex{},
runningActions: map[string]*runningAction{},
}
for i, action := range actions {
@@ -51,8 +53,10 @@ func newActionService(logger logr.Logger, actions []proto.Action) (*actionServic
}
type actionService struct {
- logger logr.Logger
- actions map[string]*proto.Action
+ logger logr.Logger
+ actions map[string]*proto.Action
+
+ mutex sync.Mutex
runningActions map[string]*runningAction
}
@@ -108,6 +112,9 @@ func (s *actionService) handleExecAction(ctx context.Context, req *proto.ActionR
}
func (s *actionService) handleExecActionNonBlocking(ctx context.Context, req *proto.ActionRequest, action *proto.Action) ([]byte, error) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
running, ok := s.runningActions[req.Action]
if !ok {
stdoutChan, stderrChan, errChan, err := runCommandNonBlocking(ctx, action.Exec, req.Parameters, req.TimeoutSeconds)
@@ -125,6 +132,7 @@ func (s *actionService) handleExecActionNonBlocking(ctx context.Context, req *pr
if err == nil {
return nil, ErrInProgress
}
+ delete(s.runningActions, req.Action)
if *err != nil {
return nil, *err
}
diff --git a/pkg/kbagent/service/action_test.go b/pkg/kbagent/service/action_test.go
new file mode 100644
index 00000000000..3a57a19a1ba
--- /dev/null
+++ b/pkg/kbagent/service/action_test.go
@@ -0,0 +1,29 @@
+/*
+Copyright (C) 2022-2024 ApeCloud Co., Ltd
+
+This file is part of KubeBlocks project
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package service
+
+import (
+ . "github.com/onsi/ginkgo/v2"
+)
+
+var _ = Describe("action", func() {
+ Context("action", func() {
+ })
+})
diff --git a/pkg/kbagent/service/command.go b/pkg/kbagent/service/command.go
index 04f799903e7..9bf82b36826 100644
--- a/pkg/kbagent/service/command.go
+++ b/pkg/kbagent/service/command.go
@@ -48,12 +48,16 @@ func gather[T interface{}](ch chan T) *T {
}
func runCommand(ctx context.Context, action *proto.ExecAction, parameters map[string]string, timeout *int32) ([]byte, error) {
- stdoutChan, _, errChan, err := runCommandNonBlocking(ctx, action, parameters, timeout)
+ stdoutChan, stderrChan, errChan, err := runCommandNonBlocking(ctx, action, parameters, timeout)
if err != nil {
return nil, err
}
err = <-errChan
if err != nil {
+ var exitErr *exec.ExitError
+ if errors.As(err, &exitErr) {
+ err = errors.Wrap(ErrFailed, string(<-stderrChan))
+ }
return nil, err
}
return <-stdoutChan, nil
@@ -90,10 +94,14 @@ func runCommandNonBlocking(ctx context.Context, action *proto.ExecAction, parame
func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[string]string, timeout *int32,
stdinReader io.Reader, stdoutWriter, stderrWriter io.Writer) (chan error, error) {
+ var timeoutCancel context.CancelFunc
if timeout != nil && *timeout > 0 {
- timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(*timeout)*time.Second)
- defer cancel()
- ctx = timeoutCtx
+ ctx, timeoutCancel = context.WithTimeout(ctx, time.Duration(*timeout)*time.Second)
+ }
+ cancelTimeout := func() {
+ if timeoutCancel != nil {
+ timeoutCancel()
+ }
}
mergedArgs := func() []string {
@@ -106,14 +114,11 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
}()
mergedEnv := func() []string {
- // env order: parameters (action specific variables) | os env (defined by vars) | user-defined env in action
+ // order: parameters (action specific variables) | os env
env := util.EnvM2L(parameters)
- if len(env) > 0 || len(action.Env) > 0 {
+ if len(env) > 0 {
env = append(env, os.Environ()...)
}
- if len(action.Env) > 0 {
- env = append(env, action.Env...)
- }
return env
}()
@@ -130,6 +135,7 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
var stdinErr error
stdin, stdinErr = cmd.StdinPipe()
if stdinErr != nil {
+ cancelTimeout()
return nil, errors.Wrapf(ErrInternalError, "failed to create stdin pipe: %v", stdinErr)
}
}
@@ -137,6 +143,7 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
var stdoutErr error
stdout, stdoutErr = cmd.StdoutPipe()
if stdoutErr != nil {
+ cancelTimeout()
return nil, errors.Wrapf(ErrInternalError, "failed to create stdout pipe: %v", stdoutErr)
}
}
@@ -144,12 +151,14 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
var stderrErr error
stderr, stderrErr = cmd.StderrPipe()
if stderrErr != nil {
+ cancelTimeout()
return nil, errors.Wrapf(ErrInternalError, "failed to create stderr pipe: %v", stderrErr)
}
}
errChan := make(chan error)
go func() {
+ defer cancelTimeout()
defer close(errChan)
if err := cmd.Start(); err != nil {
@@ -164,13 +173,17 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
var wg sync.WaitGroup
wg.Add(3)
+ var ioCopyError error
go func() {
defer wg.Done()
if stdinReader != nil {
defer stdin.Close()
_, copyErr := io.Copy(stdin, stdinReader)
if copyErr != nil {
- errChan <- errors.Wrapf(ErrFailed, "failed to copy from input reader to stdin: %v", copyErr)
+ if errors.Is(copyErr, os.ErrClosed) {
+ return
+ }
+ ioCopyError = errors.Wrapf(ErrFailed, "failed to copy from input reader to stdin: %v", copyErr)
}
}
}()
@@ -179,7 +192,10 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
if stdoutWriter != nil {
_, copyErr := io.Copy(stdoutWriter, stdout)
if copyErr != nil {
- errChan <- errors.Wrapf(ErrFailed, "failed to copy stdout to output writer: %v", copyErr)
+ if errors.Is(copyErr, os.ErrClosed) {
+ return
+ }
+ ioCopyError = errors.Wrapf(ErrFailed, "failed to copy stdout to output writer: %v", copyErr)
}
}
}()
@@ -188,25 +204,30 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
if stderrWriter != nil {
_, copyErr := io.Copy(stderrWriter, stderr)
if copyErr != nil {
- errChan <- errors.Wrapf(ErrFailed, "failed to copy stderr to error writer: %v", copyErr)
+ if errors.Is(copyErr, os.ErrClosed) {
+ return
+ }
+ ioCopyError = errors.Wrapf(ErrFailed, "failed to copy stderr to error writer: %v", copyErr)
}
}
}()
- wg.Wait()
-
+ // wait for the command to finish and the pipes to be closed
execErr := cmd.Wait()
if execErr != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
execErr = ErrTimeout
- } else {
- var exitErr *exec.ExitError
- if errors.As(execErr, &exitErr) && stderrWriter == nil {
- execErr = errors.Wrap(ErrFailed, string(exitErr.Stderr))
- }
}
}
- errChan <- execErr
+
+ // and then wait for the io copy goroutines to finish
+ wg.Wait()
+
+ if execErr != nil {
+ errChan <- execErr
+ } else {
+ errChan <- ioCopyError
+ }
}()
return errChan, nil
}
diff --git a/pkg/kbagent/service/command_test.go b/pkg/kbagent/service/command_test.go
new file mode 100644
index 00000000000..30147708a44
--- /dev/null
+++ b/pkg/kbagent/service/command_test.go
@@ -0,0 +1,292 @@
+/*
+Copyright (C) 2022-2024 ApeCloud Co., Ltd
+
+This file is part of KubeBlocks project
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package service
+
+import (
+ "bytes"
+ "os/exec"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+
+ "github.com/pkg/errors"
+
+ "github.com/apecloud/kubeblocks/pkg/kbagent/proto"
+)
+
+var _ = Describe("command", func() {
+ wait := func(errChan chan error) {
+ Expect(errChan).ShouldNot(BeNil())
+ err, ok := <-errChan
+ Expect(ok).Should(BeTrue())
+ Expect(err).Should(BeNil())
+ }
+
+ waitError := func(errChan chan error) error {
+ Expect(errChan).ShouldNot(BeNil())
+ err, ok := <-errChan
+ Expect(ok).Should(BeTrue())
+ return err
+ }
+
+ Context("runCommandX", func() {
+ It("simple", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "echo -n simple"},
+ }
+ execErrorChan, err := runCommandX(ctx, action, nil, nil, nil, nil, nil)
+ Expect(err).Should(BeNil())
+
+ wait(execErrorChan)
+ })
+
+ It("stdout", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "echo -n stdout"},
+ }
+ stdoutBuf := bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
+ execErrorChan, err := runCommandX(ctx, action, nil, nil, nil, stdoutBuf, nil)
+ Expect(err).Should(BeNil())
+
+ wait(execErrorChan)
+ Expect(stdoutBuf.String()).Should(Equal("stdout"))
+ })
+
+ It("stderr", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "echo -n stderr >&2"},
+ }
+ stdoutBuf := bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
+ stderrBuf := bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
+ execErrorChan, err := runCommandX(ctx, action, nil, nil, nil, stdoutBuf, stderrBuf)
+ Expect(err).Should(BeNil())
+
+ wait(execErrorChan)
+ Expect(stdoutBuf.String()).Should(HaveLen(0))
+ Expect(stderrBuf.String()).Should(Equal("stderr"))
+ })
+
+ It("stdin", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "xargs echo -n"},
+ }
+ stdinBuf := bytes.NewBuffer([]byte{'s', 't', 'd', 'i', 'n'})
+ stdoutBuf := bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
+ execErrorChan, err := runCommandX(ctx, action, nil, nil, stdinBuf, stdoutBuf, nil)
+ Expect(err).Should(BeNil())
+
+ wait(execErrorChan)
+ Expect(stdoutBuf.String()).Should(Equal("stdin"))
+ })
+
+ It("parameters", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "echo -n $PARAM"},
+ }
+ parameters := map[string]string{
+ "PARAM": "parameters",
+ "useless": "useless",
+ }
+ stdoutBuf := bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
+ execErrorChan, err := runCommandX(ctx, action, parameters, nil, nil, stdoutBuf, nil)
+ Expect(err).Should(BeNil())
+
+ wait(execErrorChan)
+ Expect(stdoutBuf.String()).Should(Equal("parameters"))
+ })
+
+ It("timeout", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "sleep 60"},
+ }
+ timeout := int32(1)
+ execErrorChan, err := runCommandX(ctx, action, nil, &timeout, nil, nil, nil)
+ Expect(err).Should(BeNil())
+
+ err = waitError(execErrorChan)
+ Expect(err).ShouldNot(BeNil())
+ Expect(errors.Is(err, ErrTimeout)).Should(BeTrue())
+ })
+
+ It("timeout and stdout", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "sleep 60 && echo -n timeout"},
+ }
+ stdoutBuf := bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
+ timeout := int32(1)
+ execErrorChan, err := runCommandX(ctx, action, nil, &timeout, nil, stdoutBuf, nil)
+ Expect(err).Should(BeNil())
+
+ err = waitError(execErrorChan)
+ Expect(err).ShouldNot(BeNil())
+ Expect(errors.Is(err, ErrTimeout)).Should(BeTrue())
+ Expect(stdoutBuf.String()).Should(HaveLen(0))
+ })
+
+ It("stdout and timeout", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "echo -n timeout && sleep 60"},
+ }
+ stdoutBuf := bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
+ timeout := int32(1)
+ execErrorChan, err := runCommandX(ctx, action, nil, &timeout, nil, stdoutBuf, nil)
+ Expect(err).Should(BeNil())
+
+ err = waitError(execErrorChan)
+ Expect(err).ShouldNot(BeNil())
+ Expect(errors.Is(err, ErrTimeout)).Should(BeTrue())
+ Expect(stdoutBuf.String()).Should(Equal("timeout"))
+ })
+
+ It("fail", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "command-not-exist"},
+ }
+ execErrorChan, err := runCommandX(ctx, action, nil, nil, nil, nil, nil)
+ Expect(err).Should(BeNil())
+
+ err = waitError(execErrorChan)
+ Expect(err).ShouldNot(BeNil())
+ var exitErr *exec.ExitError
+ Expect(errors.As(err, &exitErr)).Should(BeTrue())
+ })
+
+ It("fail with stderr writer", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "command-not-found"},
+ }
+ stderrBuf := bytes.NewBuffer(make([]byte, 0, defaultBufferSize))
+ execErrorChan, err := runCommandX(ctx, action, nil, nil, nil, nil, stderrBuf)
+ Expect(err).Should(BeNil())
+
+ err = waitError(execErrorChan)
+ Expect(err).ShouldNot(BeNil())
+ var exitErr *exec.ExitError
+ Expect(errors.As(err, &exitErr)).Should(BeTrue())
+ Expect(stderrBuf.String()).Should(ContainSubstring("command not found"))
+ })
+ })
+
+ Context("runCommandNonBlocking", func() {
+ It("ok", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "echo -n ok"},
+ }
+ stdoutChan, stderrChan, errChan, err := runCommandNonBlocking(ctx, action, nil, nil)
+ Expect(err).Should(BeNil())
+
+ execErr := <-errChan
+ Expect(execErr).Should(BeNil())
+ Expect(<-stdoutChan).Should(Equal([]byte("ok")))
+ Expect(<-stderrChan).Should(HaveLen(0))
+ })
+
+ It("parameters", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "echo -n $PARAM"},
+ }
+ parameters := map[string]string{
+ "PARAM": "parameters",
+ }
+ stdoutChan, stderrChan, errChan, err := runCommandNonBlocking(ctx, action, parameters, nil)
+ Expect(err).Should(BeNil())
+
+ execErr := <-errChan
+ Expect(execErr).Should(BeNil())
+ Expect(<-stdoutChan).Should(Equal([]byte("parameters")))
+ Expect(<-stderrChan).Should(HaveLen(0))
+ })
+
+ It("fail", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "command-not-found"},
+ }
+ stdoutChan, stderrChan, errChan, err := runCommandNonBlocking(ctx, action, nil, nil)
+ Expect(err).Should(BeNil())
+
+ execErr := <-errChan
+ Expect(execErr).ShouldNot(BeNil())
+ var exitErr *exec.ExitError
+ Expect(errors.As(execErr, &exitErr)).Should(BeTrue())
+ Expect(<-stdoutChan).Should(HaveLen(0))
+ Expect(<-stderrChan).Should(ContainSubstring("command not found"))
+ })
+
+ It("timeout", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "sleep 60"},
+ }
+ timeout := int32(1)
+ stdoutChan, stderrChan, errChan, err := runCommandNonBlocking(ctx, action, nil, &timeout)
+ Expect(err).Should(BeNil())
+
+ execErr := <-errChan
+ Expect(execErr).ShouldNot(BeNil())
+ Expect(errors.Is(execErr, ErrTimeout)).Should(BeTrue())
+ Expect(<-stdoutChan).Should(HaveLen(0))
+ Expect(<-stderrChan).Should(HaveLen(0))
+ })
+ })
+
+ Context("runCommand", func() {
+ It("ok", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "echo -n ok"},
+ }
+ output, err := runCommand(ctx, action, nil, nil)
+ Expect(err).Should(BeNil())
+ Expect(output).Should(Equal([]byte("ok")))
+ })
+
+ It("parameters", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "echo -n $PARAM"},
+ }
+ parameters := map[string]string{
+ "PARAM": "parameters",
+ }
+ output, err := runCommand(ctx, action, parameters, nil)
+ Expect(err).Should(BeNil())
+ Expect(output).Should(Equal([]byte("parameters")))
+ })
+
+ It("fail", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "command-not-found"},
+ }
+ output, err := runCommand(ctx, action, nil, nil)
+ Expect(err).ShouldNot(BeNil())
+ Expect(errors.Is(err, ErrFailed)).Should(BeTrue())
+ Expect(err.Error()).Should(ContainSubstring("command not found"))
+ Expect(output).Should(BeNil())
+ })
+
+ It("timeout", func() {
+ action := &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "sleep 60"},
+ }
+ timeout := int32(1)
+ output, err := runCommand(ctx, action, nil, &timeout)
+ Expect(err).ShouldNot(BeNil())
+ Expect(errors.Is(err, ErrTimeout)).Should(BeTrue())
+ Expect(output).Should(BeNil())
+ })
+ })
+})
diff --git a/pkg/kbagent/service/probe_test.go b/pkg/kbagent/service/probe_test.go
new file mode 100644
index 00000000000..2c507904558
--- /dev/null
+++ b/pkg/kbagent/service/probe_test.go
@@ -0,0 +1,113 @@
+/*
+Copyright (C) 2022-2024 ApeCloud Co., Ltd
+
+This file is part of KubeBlocks project
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package service
+
+import (
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+
+ "github.com/go-logr/logr"
+ "github.com/pkg/errors"
+
+ "github.com/apecloud/kubeblocks/pkg/kbagent/proto"
+)
+
+var _ = Describe("probe", func() {
+ Context("probe", func() {
+ var (
+ actions = []proto.Action{
+ {
+ Name: "roleProbe",
+ Exec: &proto.ExecAction{
+ Commands: []string{"/bin/bash", "-c", "echo -n leader"},
+ },
+ },
+ }
+ probes = []proto.Probe{
+ {
+ Action: "roleProbe",
+ InitialDelaySeconds: 0,
+ PeriodSeconds: 1,
+ SuccessThreshold: 1,
+ FailureThreshold: 1,
+ ReportPeriodSeconds: nil,
+ },
+ }
+
+ actionSvc *actionService
+ )
+
+ BeforeEach(func() {
+ var err error
+ actionSvc, err = newActionService(logr.New(nil), actions)
+ Expect(err).Should(BeNil())
+ })
+
+ // func newProbeService(logger logr.Logger, actionService *actionService, probes []proto.Probe) (*probeService, error) {
+ It("new", func() {
+ service, err := newProbeService(logr.New(nil), actionSvc, probes)
+ Expect(err).Should(BeNil())
+ Expect(service).ShouldNot(BeNil())
+ Expect(service.Kind()).Should(Equal(probeServiceName))
+ Expect(service.Version()).Should(Equal(probeServiceVersion))
+ })
+
+ It("start", func() {
+ service, err := newProbeService(logr.New(nil), actionSvc, probes)
+ Expect(err).Should(BeNil())
+ Expect(service).ShouldNot(BeNil())
+
+ Expect(service.Start()).Should(Succeed())
+ Expect(len(service.probes)).Should(Equal(len(service.runners)))
+ })
+
+ It("handle request", func() {
+ service, err := newProbeService(logr.New(nil), actionSvc, probes)
+ Expect(err).Should(BeNil())
+ Expect(service).ShouldNot(BeNil())
+
+ _, err = service.Decode([]byte{})
+ Expect(err).ShouldNot(BeNil())
+ Expect(errors.Is(err, ErrNotImplemented)).Should(BeTrue())
+
+ _, err = service.HandleRequest(ctx, nil)
+ Expect(err).ShouldNot(BeNil())
+ Expect(errors.Is(err, ErrNotImplemented)).Should(BeTrue())
+ })
+
+ It("initial delay seconds", func() {
+ probes[0].InitialDelaySeconds = 60
+ service, err := newProbeService(logr.New(nil), actionSvc, probes)
+ Expect(err).Should(BeNil())
+ Expect(service).ShouldNot(BeNil())
+
+ Expect(service.Start()).Should(Succeed())
+
+ time.Sleep(1 * time.Second)
+ r := service.runners["roleProbe"]
+ Expect(r).ShouldNot(BeNil())
+ Expect(r.ticker).Should(BeNil())
+ })
+
+ // TODO: more cases
+ })
+})
diff --git a/pkg/kbagent/service/service_test.go b/pkg/kbagent/service/service_test.go
new file mode 100644
index 00000000000..1f931f577f9
--- /dev/null
+++ b/pkg/kbagent/service/service_test.go
@@ -0,0 +1,90 @@
+/*
+Copyright (C) 2022-2024 ApeCloud Co., Ltd
+
+This file is part of KubeBlocks project
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package service
+
+import (
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+
+ "github.com/go-logr/logr"
+
+ "github.com/apecloud/kubeblocks/pkg/kbagent/proto"
+)
+
+var _ = Describe("service", func() {
+ Context("new", func() {
+ It("empty", func() {
+ services, err := New(logr.New(nil), nil, nil)
+ Expect(err).Should(BeNil())
+ Expect(services).Should(HaveLen(2))
+ Expect(services[0]).ShouldNot(BeNil())
+ Expect(services[1]).ShouldNot(BeNil())
+ })
+
+ It("action", func() {
+ actions := []proto.Action{
+ {
+ Name: "action",
+ },
+ }
+ services, err := New(logr.New(nil), actions, nil)
+ Expect(err).Should(BeNil())
+ Expect(services).Should(HaveLen(2))
+ Expect(services[0]).ShouldNot(BeNil())
+ Expect(services[1]).ShouldNot(BeNil())
+ })
+
+ It("probe", func() {
+ actions := []proto.Action{
+ {
+ Name: "action",
+ },
+ }
+ probes := []proto.Probe{
+ {
+ Action: "action",
+ },
+ }
+ services, err := New(logr.New(nil), actions, probes)
+ Expect(err).Should(BeNil())
+ Expect(services).Should(HaveLen(2))
+ Expect(services[0]).ShouldNot(BeNil())
+ Expect(services[1]).ShouldNot(BeNil())
+ })
+
+ It("probe which has no action", func() {
+ actions := []proto.Action{
+ {
+ Name: "action",
+ },
+ }
+ probes := []proto.Probe{
+ {
+ Action: "action",
+ },
+ {
+ Action: "not-defined",
+ },
+ }
+ _, err := New(logr.New(nil), actions, probes)
+ Expect(err).ShouldNot(BeNil())
+ })
+ })
+})
diff --git a/pkg/kbagent/service/suite_test.go b/pkg/kbagent/service/suite_test.go
new file mode 100644
index 00000000000..af1488fa324
--- /dev/null
+++ b/pkg/kbagent/service/suite_test.go
@@ -0,0 +1,60 @@
+/*
+Copyright (C) 2022-2024 ApeCloud Co., Ltd
+
+This file is part of KubeBlocks project
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package service
+
+import (
+ "context"
+ "testing"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+
+ viper "github.com/apecloud/kubeblocks/pkg/viperx"
+)
+
+// These tests use Ginkgo (BDD-style Go testing framework). Refer to
+// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
+
+var ctx context.Context
+var cancel context.CancelFunc
+
+func init() {
+ viper.AutomaticEnv()
+ // viper.Set("ENABLE_DEBUG_LOG", "true")
+}
+
+func TestAPIs(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Controller Suite")
+}
+
+var _ = BeforeSuite(func() {
+ ctx, cancel = context.WithCancel(context.TODO())
+
+ // +kubebuilder:scaffold:scheme
+
+ go func() {
+ defer GinkgoRecover()
+ }()
+})
+
+var _ = AfterSuite(func() {
+ cancel()
+})