Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

Make addProcess() parallel #10

Open
wants to merge 2 commits into
base: 17.06.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ sudo: required
language: go

go:
- 1.8.x
- tip
- 1.10.x

go_import_path: github.com/containerd/containerd

Expand Down
10 changes: 10 additions & 0 deletions runtime/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -228,6 +229,7 @@ type container struct {
runtime string
runtimeArgs []string
shim string
pmu sync.Mutex
processes map[string]*process
labels []string
oomFds []int
Expand Down Expand Up @@ -278,19 +280,25 @@ func (c *container) Delete() error {

func (c *container) Processes() ([]Process, error) {
out := []Process{}
c.pmu.Lock()
for _, p := range c.processes {
out = append(out, p)
}
c.pmu.Unlock()
return out, nil
}

func (c *container) RemoveProcess(pid string) error {
c.pmu.Lock()
delete(c.processes, pid)
c.pmu.Unlock()
return os.RemoveAll(filepath.Join(c.root, c.id, pid))
}

func (c *container) State() State {
c.pmu.Lock()
proc := c.processes[InitProcessID]
c.pmu.Unlock()
if proc == nil {
return Stopped
}
Expand Down Expand Up @@ -532,7 +540,9 @@ func (c *container) createCmd(ctx context.Context, pid string, cmd *exec.Cmd, p
ch <- err
return
}
c.pmu.Lock()
c.processes[pid] = p
c.pmu.Unlock()
ch <- nil
}()
select {
Expand Down
47 changes: 26 additions & 21 deletions supervisor/add_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,30 @@ func (s *Supervisor) addProcess(t *AddProcessTask) error {
if !ok {
return ErrContainerNotFound
}
process, err := ci.container.Exec(t.Ctx(), t.PID, *t.ProcessSpec, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
if err != nil {
return err
}
s.newExecSyncChannel(t.ID, t.PID)
if err := s.monitorProcess(process); err != nil {
s.deleteExecSyncChannel(t.ID, t.PID)
// Kill process
process.Signal(os.Kill)
ci.container.RemoveProcess(t.PID)
return err
}
ExecProcessTimer.UpdateSince(start)
t.StartResponse <- StartResponse{ExecPid: process.SystemPid()}
s.notifySubscribers(Event{
Timestamp: time.Now(),
Type: StateStartProcess,
PID: t.PID,
ID: t.ID,
})
return nil
go func() {
process, err := ci.container.Exec(t.Ctx(), t.PID, *t.ProcessSpec, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
if err != nil {
t.errCh <- err
return
}
s.newExecSyncChannel(t.ID, t.PID)
if err := s.monitorProcess(process); err != nil {
s.deleteExecSyncChannel(t.ID, t.PID)
// Kill process
process.Signal(os.Kill)
ci.container.RemoveProcess(t.PID)
t.errCh <- err
return
}
ExecProcessTimer.UpdateSince(start)
t.errCh <- nil
t.StartResponse <- StartResponse{ExecPid: process.SystemPid()}
s.notifySubscribers(Event{
Timestamp: time.Now(),
Type: StateStartProcess,
PID: t.PID,
ID: t.ID,
})
}()
return errDeferredResponse
}