Skip to content

Commit

Permalink
chore: incorrect responses of kbagent under different request situati…
Browse files Browse the repository at this point in the history
…ons (#8009)

Co-authored-by: Leon <[email protected]>
  • Loading branch information
Ursasi and leon-inf authored Aug 23, 2024
1 parent 83cfbf0 commit 9f5742c
Show file tree
Hide file tree
Showing 8 changed files with 635 additions and 23 deletions.
1 change: 0 additions & 1 deletion pkg/kbagent/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type Action struct {
type ExecAction struct {
Commands []string `json:"command,omitempty"`
Args []string `json:"args,omitempty"`
Env []string `json:"env,omitempty"`
}

type RetryPolicy struct {
Expand Down
12 changes: 10 additions & 2 deletions pkg/kbagent/service/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/go-logr/logr"
"github.com/pkg/errors"
Expand All @@ -41,6 +42,7 @@ func newActionService(logger logr.Logger, actions []proto.Action) (*actionServic
sa := &actionService{
logger: logger,
actions: make(map[string]*proto.Action),
mutex: sync.Mutex{},
runningActions: map[string]*runningAction{},
}
for i, action := range actions {
Expand All @@ -51,8 +53,10 @@ func newActionService(logger logr.Logger, actions []proto.Action) (*actionServic
}

type actionService struct {
logger logr.Logger
actions map[string]*proto.Action
logger logr.Logger
actions map[string]*proto.Action

mutex sync.Mutex
runningActions map[string]*runningAction
}

Expand Down Expand Up @@ -108,6 +112,9 @@ func (s *actionService) handleExecAction(ctx context.Context, req *proto.ActionR
}

func (s *actionService) handleExecActionNonBlocking(ctx context.Context, req *proto.ActionRequest, action *proto.Action) ([]byte, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

running, ok := s.runningActions[req.Action]
if !ok {
stdoutChan, stderrChan, errChan, err := runCommandNonBlocking(ctx, action.Exec, req.Parameters, req.TimeoutSeconds)
Expand All @@ -125,6 +132,7 @@ func (s *actionService) handleExecActionNonBlocking(ctx context.Context, req *pr
if err == nil {
return nil, ErrInProgress
}
delete(s.runningActions, req.Action)
if *err != nil {
return nil, *err
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/kbagent/service/action_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright (C) 2022-2024 ApeCloud Co., Ltd
This file is part of KubeBlocks project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package service

import (
. "github.com/onsi/ginkgo/v2"
)

var _ = Describe("action", func() {
Context("action", func() {
})
})
61 changes: 41 additions & 20 deletions pkg/kbagent/service/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,16 @@ func gather[T interface{}](ch chan T) *T {
}

func runCommand(ctx context.Context, action *proto.ExecAction, parameters map[string]string, timeout *int32) ([]byte, error) {
stdoutChan, _, errChan, err := runCommandNonBlocking(ctx, action, parameters, timeout)
stdoutChan, stderrChan, errChan, err := runCommandNonBlocking(ctx, action, parameters, timeout)
if err != nil {
return nil, err
}
err = <-errChan
if err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
err = errors.Wrap(ErrFailed, string(<-stderrChan))
}
return nil, err
}
return <-stdoutChan, nil
Expand Down Expand Up @@ -90,10 +94,14 @@ func runCommandNonBlocking(ctx context.Context, action *proto.ExecAction, parame

func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[string]string, timeout *int32,
stdinReader io.Reader, stdoutWriter, stderrWriter io.Writer) (chan error, error) {
var timeoutCancel context.CancelFunc
if timeout != nil && *timeout > 0 {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(*timeout)*time.Second)
defer cancel()
ctx = timeoutCtx
ctx, timeoutCancel = context.WithTimeout(ctx, time.Duration(*timeout)*time.Second)
}
cancelTimeout := func() {
if timeoutCancel != nil {
timeoutCancel()
}
}

mergedArgs := func() []string {
Expand All @@ -106,14 +114,11 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
}()

mergedEnv := func() []string {
// env order: parameters (action specific variables) | os env (defined by vars) | user-defined env in action
// order: parameters (action specific variables) | os env
env := util.EnvM2L(parameters)
if len(env) > 0 || len(action.Env) > 0 {
if len(env) > 0 {
env = append(env, os.Environ()...)
}
if len(action.Env) > 0 {
env = append(env, action.Env...)
}
return env
}()

Expand All @@ -130,26 +135,30 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
var stdinErr error
stdin, stdinErr = cmd.StdinPipe()
if stdinErr != nil {
cancelTimeout()
return nil, errors.Wrapf(ErrInternalError, "failed to create stdin pipe: %v", stdinErr)
}
}
if stdoutWriter != nil {
var stdoutErr error
stdout, stdoutErr = cmd.StdoutPipe()
if stdoutErr != nil {
cancelTimeout()
return nil, errors.Wrapf(ErrInternalError, "failed to create stdout pipe: %v", stdoutErr)
}
}
if stderrWriter != nil {
var stderrErr error
stderr, stderrErr = cmd.StderrPipe()
if stderrErr != nil {
cancelTimeout()
return nil, errors.Wrapf(ErrInternalError, "failed to create stderr pipe: %v", stderrErr)
}
}

errChan := make(chan error)
go func() {
defer cancelTimeout()
defer close(errChan)

if err := cmd.Start(); err != nil {
Expand All @@ -164,13 +173,17 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
var wg sync.WaitGroup
wg.Add(3)

var ioCopyError error
go func() {
defer wg.Done()
if stdinReader != nil {
defer stdin.Close()
_, copyErr := io.Copy(stdin, stdinReader)
if copyErr != nil {
errChan <- errors.Wrapf(ErrFailed, "failed to copy from input reader to stdin: %v", copyErr)
if errors.Is(copyErr, os.ErrClosed) {
return
}
ioCopyError = errors.Wrapf(ErrFailed, "failed to copy from input reader to stdin: %v", copyErr)
}
}
}()
Expand All @@ -179,7 +192,10 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
if stdoutWriter != nil {
_, copyErr := io.Copy(stdoutWriter, stdout)
if copyErr != nil {
errChan <- errors.Wrapf(ErrFailed, "failed to copy stdout to output writer: %v", copyErr)
if errors.Is(copyErr, os.ErrClosed) {
return
}
ioCopyError = errors.Wrapf(ErrFailed, "failed to copy stdout to output writer: %v", copyErr)
}
}
}()
Expand All @@ -188,25 +204,30 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
if stderrWriter != nil {
_, copyErr := io.Copy(stderrWriter, stderr)
if copyErr != nil {
errChan <- errors.Wrapf(ErrFailed, "failed to copy stderr to error writer: %v", copyErr)
if errors.Is(copyErr, os.ErrClosed) {
return
}
ioCopyError = errors.Wrapf(ErrFailed, "failed to copy stderr to error writer: %v", copyErr)
}
}
}()

wg.Wait()

// wait for the command to finish and the pipes to be closed
execErr := cmd.Wait()
if execErr != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
execErr = ErrTimeout
} else {
var exitErr *exec.ExitError
if errors.As(execErr, &exitErr) && stderrWriter == nil {
execErr = errors.Wrap(ErrFailed, string(exitErr.Stderr))
}
}
}
errChan <- execErr

// and then wait for the io copy goroutines to finish
wg.Wait()

if execErr != nil {
errChan <- execErr
} else {
errChan <- ioCopyError
}
}()
return errChan, nil
}
Loading

0 comments on commit 9f5742c

Please sign in to comment.