Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PMM-13292 Add INITIALIZATION_ERROR status for process initialization failure #2935

Merged
merged 21 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
7ca52dc
add process init fail status
GuanqunYang193 Mar 29, 2024
86b79a2
Merge branch 'main' into report_process_init_error
GuanqunYang193 Mar 29, 2024
7f99acb
minor fix
GuanqunYang193 Apr 2, 2024
e280e5d
Merge branch 'main' into report_process_init_error
GuanqunYang193 Apr 2, 2024
bd8647f
Merge branch 'main' into report_process_init_error
JiriCtvrtka May 7, 2024
125bae9
Merge branch 'main' into report_process_init_error
JiriCtvrtka Jun 7, 2024
01debf7
Merge branch 'percona:main' into report_process_init_error
GuanqunYang193 Jun 13, 2024
8c081b4
fixed the test
GuanqunYang193 Jun 13, 2024
9059aff
lint and add timer
GuanqunYang193 Jun 14, 2024
4da031c
Merge branch 'main' into report_process_init_error
GuanqunYang193 Jun 23, 2024
b950486
lint
GuanqunYang193 Jun 23, 2024
ab545e8
lint
GuanqunYang193 Jun 25, 2024
b863017
Merge branch 'main' into report_process_init_error
rnovikovP Jun 26, 2024
b861c25
Merge branch 'main' into report_process_init_error
GuanqunYang193 Jun 26, 2024
c54add9
Merge branch 'main' into report_process_init_error
rnovikovP Jul 1, 2024
3431132
Merge branch 'main' into report_process_init_error
JiriCtvrtka Jul 8, 2024
296fdb5
Merge branch 'main' into report_process_init_error
rnovikovP Jul 16, 2024
72fad58
Merge branch 'main' into report_process_init_error
JiriCtvrtka Jul 29, 2024
a2ccbae
Merge branch 'main' into report_process_init_error
BupycHuk Aug 3, 2024
9796faf
Merge branch 'main' into report_process_init_error
JiriCtvrtka Aug 7, 2024
9055176
Merge branch 'main' into report_process_init_error
ademidoff Aug 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions agent/agents/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package process

import (
"context"
"errors"
"fmt"
"os/exec"
"strings"
Expand Down Expand Up @@ -53,12 +54,14 @@ const (
// implements its own logic, and then switches to the next state via "go toXXX()". "go" statement is used
// only to avoid stack overflow; there are no extra goroutines for states.
type Process struct {
params *Params
l *logrus.Entry
pl *processLogger
changes chan inventorypb.AgentStatus
backoff *backoff.Backoff
ctxDone chan struct{}
params *Params
l *logrus.Entry
pl *processLogger
changes chan inventorypb.AgentStatus
backoff *backoff.Backoff
ctxDone chan struct{}
err chan error
GuanqunYang193 marked this conversation as resolved.
Show resolved Hide resolved
initialized chan bool

// recreated on each restart
cmd *exec.Cmd
Expand Down Expand Up @@ -88,15 +91,25 @@ func (p *Params) String() string {
// New creates new process.
func New(params *Params, redactWords []string, l *logrus.Entry) *Process {
return &Process{
params: params,
l: l,
pl: newProcessLogger(l, keepLogLines, redactWords),
changes: make(chan inventorypb.AgentStatus, 10),
backoff: backoff.New(backoffMinDelay, backoffMaxDelay),
ctxDone: make(chan struct{}),
params: params,
l: l,
pl: newProcessLogger(l, keepLogLines, redactWords),
changes: make(chan inventorypb.AgentStatus, 10),
backoff: backoff.New(backoffMinDelay, backoffMaxDelay),
ctxDone: make(chan struct{}),
err: make(chan error),
initialized: make(chan bool, 2),
}
}

// IsInitialized returns a receive-only view of the channel that reports the
// outcome of process initialization: toStarting sends true right before it
// switches to RUNNING, and toFailing sends false when initialization failed.
func (p *Process) IsInitialized() <-chan bool {
return p.initialized
}

// GetError returns a receive-only view of the channel on which toFailing
// sends the error that caused process initialization to fail.
func (p *Process) GetError() <-chan error {
return p.err
}

// Run starts process and runs until ctx is canceled.
func (p *Process) Run(ctx context.Context) {
go p.toStarting()
Expand All @@ -107,7 +120,7 @@ func (p *Process) Run(ctx context.Context) {
}

// STARTING -> RUNNING.
// STARTING -> WAITING.
// STARTING -> FAILING.
func (p *Process) toStarting() {
p.l.Tracef("Process: starting.")
p.changes <- inventorypb.AgentStatus_STARTING
Expand All @@ -128,7 +141,7 @@ func (p *Process) toStarting() {

if err := p.cmd.Start(); err != nil {
p.l.Warnf("Process: failed to start: %s.", err)
go p.toWaiting()
go p.toFailing(err)
return
}

Expand All @@ -142,10 +155,11 @@ func (p *Process) toStarting() {
defer t.Stop()
select {
case <-t.C:
p.initialized <- true
go p.toRunning()
case <-p.cmdDone:
p.l.Warnf("Process: exited early: %s.", p.cmd.ProcessState)
go p.toWaiting()
go p.toFailing(errors.New("exited early"))
}
}

Expand Down Expand Up @@ -192,6 +206,16 @@ func (p *Process) toWaiting() {
}
}

// toFailing reports a process initialization failure and then moves to DONE.
// FAILING -> DONE.
//
// It publishes AgentStatus_INITIALIZATION_ERROR on p.changes, starts the
// transition to DONE, and then signals the failure to whoever is waiting on
// initialization: false on p.initialized followed by err on p.err.
func (p *Process) toFailing(err error) {
p.l.Tracef("Process: failing")
p.changes <- inventorypb.AgentStatus_INITIALIZATION_ERROR
// NOTE(review): on the cmd.Start() failure path the process never ran, so
// p.cmd.ProcessState is presumably nil here — confirm the logged value is acceptable.
p.l.Infof("Process: exited: %s.", p.cmd.ProcessState)
// toDone is launched before the sends below, so it does not wait for them.
go p.toDone()
p.initialized <- false
// New creates p.err without a buffer, so this send blocks until something
// receives from the channel returned by GetError().
p.err <- err
}

// STOPPING -> DONE.
func (p *Process) toStopping() {
p.l.Tracef("Process: stopping (sending SIGTERM)...")
Expand Down
25 changes: 6 additions & 19 deletions agent/agents/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,35 +80,22 @@ func TestProcess(t *testing.T) {
})

t.Run("FailedToStart", func(t *testing.T) {
ctx, cancel, l := setup(t)
ctx, _, l := setup(t)
p := New(&Params{Path: "no_such_command"}, nil, l)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING)
cancel()
assertStates(t, p, inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_INITIALIZATION_ERROR,
inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
})

t.Run("ExitedEarly", func(t *testing.T) {
sleep := strconv.FormatFloat(runningT.Seconds()-0.5, 'f', -1, 64)
ctx, cancel, l := setup(t)
p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING)
cancel()
assertStates(t, p, inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
})

t.Run("CancelStarting", func(t *testing.T) {
sleep := strconv.FormatFloat(runningT.Seconds()-0.5, 'f', -1, 64)
ctx, cancel, l := setup(t)
ctx, _, l := setup(t)
p := New(&Params{Path: "sleep", Args: []string{sleep}}, nil, l)
go p.Run(ctx)

assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_STARTING)
cancel()
assertStates(t, p, inventorypb.AgentStatus_WAITING, inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
assertStates(t, p, inventorypb.AgentStatus_STARTING, inventorypb.AgentStatus_INITIALIZATION_ERROR,
inventorypb.AgentStatus_DONE, inventorypb.AgentStatus_AGENT_STATUS_INVALID)
})

t.Run("Exited", func(t *testing.T) {
Expand Down
44 changes: 32 additions & 12 deletions agent/agents/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (s *Supervisor) RestartAgents() {
agent.cancel()
<-agent.done

if err := s.startProcess(id, agent.requestedState, agent.listenPort); err != nil {
if err := s.tryStartProcess(id, agent.requestedState, agent.listenPort); err != nil {
s.l.Errorf("Failed to restart Agent: %s.", err)
}
}
Expand Down Expand Up @@ -310,22 +310,15 @@ func (s *Supervisor) setAgentProcesses(agentProcesses map[string]*agentpb.SetSta
agent.cancel()
<-agent.done

if err := s.startProcess(agentID, agentProcesses[agentID], agent.listenPort); err != nil {
if err := s.tryStartProcess(agentID, agentProcesses[agentID], agent.listenPort); err != nil {
s.l.Errorf("Failed to start Agent: %s.", err)
// TODO report that error to server
}
}

// start new agents
for _, agentID := range toStart {
port, err := s.portsRegistry.Reserve()
if err != nil {
s.l.Errorf("Failed to reserve port: %s.", err)
// TODO report that error to server
continue
}

if err := s.startProcess(agentID, agentProcesses[agentID], port); err != nil {
if err := s.tryStartProcess(agentID, agentProcesses[agentID], 0); err != nil {
s.l.Errorf("Failed to start Agent: %s.", err)
// TODO report that error to server
}
Expand Down Expand Up @@ -427,10 +420,32 @@ func filter(existing, ap map[string]agentpb.AgentParams) ([]string, []string, []

//nolint:golint,stylecheck,revive
const (
type_TEST_SLEEP inventorypb.AgentType = 998 // process
type_TEST_NOOP inventorypb.AgentType = 999 // built-in
type_TEST_SLEEP inventorypb.AgentType = 998 // process
type_TEST_NOOP inventorypb.AgentType = 999 // built-in
process_Retry_Time int = 3
)

// tryStartProcess starts the Agent's process, making up to process_Retry_Time
// attempts.
//
// When port is 0, a port is reserved from s.portsRegistry before the attempt.
// After a failed start the port is reset to 0 so that the next attempt
// reserves a new one. It returns nil as soon as one attempt succeeds;
// otherwise it returns the error of the last failed step (port reservation or
// process start).
//
// It calls startProcess, which must be called with s.rw held for writing, so
// the same requirement applies here.
func (s *Supervisor) tryStartProcess(agentID string, agentProcess *agentpb.SetStateRequest_AgentProcess, port uint16) error {
	var err error
	for i := 0; i < process_Retry_Time; i++ {
		if port == 0 {
			// Assign to the function-level err (no ":=") so that a reservation
			// failure is returned to the caller instead of being lost in a
			// shadowed variable, which previously made this function return nil.
			var reserved uint16
			if reserved, err = s.portsRegistry.Reserve(); err != nil {
				s.l.Errorf("Failed to reserve port: %s.", err)
				continue
			}
			port = reserved
		}

		if err = s.startProcess(agentID, agentProcess, port); err == nil {
			return nil
		}

		// Force a new port reservation on the next attempt.
		port = 0
	}
	return err
}

// startProcess starts Agent's process.
// Must be called with s.rw held for writing.
func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetStateRequest_AgentProcess, port uint16) error {
Expand Down Expand Up @@ -473,6 +488,11 @@ func (s *Supervisor) startProcess(agentID string, agentProcess *agentpb.SetState
close(done)
}()

if !<-process.IsInitialized() {
defer cancel()
return <-process.GetError()
}

//nolint:forcetypeassert
s.agentProcesses[agentID] = &agentProcessInfo{
cancel: cancel,
Expand Down
58 changes: 48 additions & 10 deletions agent/agents/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,16 @@ func TestSupervisor(t *testing.T) {

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "noop3", Status: inventorypb.AgentStatus_STARTING},
GuanqunYang193 marked this conversation as resolved.
Show resolved Hide resolved
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"})
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"})
expectedList = []*agentlocalpb.AgentInfo{
{AgentType: type_TEST_NOOP, AgentId: "noop3", Status: inventorypb.AgentStatus_STARTING},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
}
assert.Equal(t, expectedList, s.AgentsList())

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"})
&agentpb.StateChangedRequest{AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING})
expectedList = []*agentlocalpb.AgentInfo{
{AgentType: type_TEST_NOOP, AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
Expand Down Expand Up @@ -114,17 +114,17 @@ func TestSupervisor(t *testing.T) {

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep2", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65001, ProcessExecPath: "sleep"})
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep2", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65001, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep2", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65001, ProcessExecPath: "sleep"},
)
expectedList = []*agentlocalpb.AgentInfo{
{AgentType: type_TEST_NOOP, AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
{AgentType: type_TEST_SLEEP, AgentId: "sleep2", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65001, ProcessExecPath: "sleep"},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
{AgentType: type_TEST_SLEEP, AgentId: "sleep2", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65001, ProcessExecPath: "sleep"},
}
assert.Equal(t, expectedList, s.AgentsList())

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep2", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65001, ProcessExecPath: "sleep"})
expectedList = []*agentlocalpb.AgentInfo{
{AgentType: type_TEST_NOOP, AgentId: "noop3", Status: inventorypb.AgentStatus_RUNNING},
{AgentType: type_TEST_SLEEP, AgentId: "sleep1", Status: inventorypb.AgentStatus_RUNNING, ListenPort: 65000, ProcessExecPath: "sleep"},
Expand Down Expand Up @@ -259,6 +259,44 @@ func TestSupervisor(t *testing.T) {
})
}

func TestStartProcessFail(t *testing.T) {
GuanqunYang193 marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tempDir := t.TempDir()
cfgStorage := config.NewStorage(&config.Config{
Paths: config.Paths{TempDir: tempDir},
Ports: config.Ports{Min: 65000, Max: 65099},
Server: config.Server{Address: "localhost:443"},
LogLinesCount: 1,
})
s := NewSupervisor(ctx, nil, cfgStorage)
go s.Run(ctx)

t.Run("Start", func(t *testing.T) {
expectedList := []*agentlocalpb.AgentInfo{}
require.Equal(t, expectedList, s.AgentsList())

s.SetState(&agentpb.SetStateRequest{
AgentProcesses: map[string]*agentpb.SetStateRequest_AgentProcess{
"sleep1": {Type: type_TEST_SLEEP, Args: []string{"wrong format"}},
},
})

assertChanges(t, s,
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_INITIALIZATION_ERROR, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_DONE, ListenPort: 65000, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65001, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_INITIALIZATION_ERROR, ListenPort: 65001, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_DONE, ListenPort: 65001, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_STARTING, ListenPort: 65002, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_INITIALIZATION_ERROR, ListenPort: 65002, ProcessExecPath: "sleep"},
&agentpb.StateChangedRequest{AgentId: "sleep1", Status: inventorypb.AgentStatus_DONE, ListenPort: 65002, ProcessExecPath: "sleep"})
expectedList = []*agentlocalpb.AgentInfo{}
require.Equal(t, expectedList, s.AgentsList())
})
}

func TestFilter(t *testing.T) {
t.Parallel()

Expand Down
10 changes: 6 additions & 4 deletions api/agentlocalpb/json/agentlocalpb.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
"x-order": 4
},
"status": {
"description": "AgentStatus represents actual Agent status.\n\n - STARTING: Agent is starting.\n - RUNNING: Agent is running.\n - WAITING: Agent encountered error and will be restarted automatically soon.\n - STOPPING: Agent is stopping.\n - DONE: Agent finished.\n - UNKNOWN: Agent is not connected, we don't know anything about it's state.",
"description": "AgentStatus represents actual Agent status.\n\n - STARTING: Agent is starting.\n - RUNNING: Agent is running.\n - WAITING: Agent will be restarted automatically soon.\n - STOPPING: Agent is stopping.\n - DONE: Agent finished.\n - UNKNOWN: Agent is not connected, we don't know anything about it's state.\n - INITIALIZATION_ERROR: Agent encountered error when starting.",
"type": "string",
"default": "AGENT_STATUS_INVALID",
"enum": [
Expand All @@ -159,7 +159,8 @@
"WAITING",
"STOPPING",
"DONE",
"UNKNOWN"
"UNKNOWN",
"INITIALIZATION_ERROR"
],
"x-order": 2
}
Expand Down Expand Up @@ -342,7 +343,7 @@
"x-order": 4
},
"status": {
"description": "AgentStatus represents actual Agent status.\n\n - STARTING: Agent is starting.\n - RUNNING: Agent is running.\n - WAITING: Agent encountered error and will be restarted automatically soon.\n - STOPPING: Agent is stopping.\n - DONE: Agent finished.\n - UNKNOWN: Agent is not connected, we don't know anything about it's state.",
"description": "AgentStatus represents actual Agent status.\n\n - STARTING: Agent is starting.\n - RUNNING: Agent is running.\n - WAITING: Agent will be restarted automatically soon.\n - STOPPING: Agent is stopping.\n - DONE: Agent finished.\n - UNKNOWN: Agent is not connected, we don't know anything about it's state.\n - INITIALIZATION_ERROR: Agent encountered error when starting.",
"type": "string",
"default": "AGENT_STATUS_INVALID",
"enum": [
Expand All @@ -352,7 +353,8 @@
"WAITING",
"STOPPING",
"DONE",
"UNKNOWN"
"UNKNOWN",
"INITIALIZATION_ERROR"
],
"x-order": 2
}
Expand Down
10 changes: 7 additions & 3 deletions api/agentlocalpb/json/client/agent_local/status2_responses.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading