diff --git a/.changelog/24340.txt b/.changelog/24340.txt new file mode 100644 index 00000000000..dfd0375e267 --- /dev/null +++ b/.changelog/24340.txt @@ -0,0 +1,3 @@ +```release-note:improvement +drivers: Move executor process out of task cgroup after task starts on cgroups v1 +``` diff --git a/drivers/shared/executor/executor.go b/drivers/shared/executor/executor.go index c537d9de52f..97e7e272ab7 100644 --- a/drivers/shared/executor/executor.go +++ b/drivers/shared/executor/executor.go @@ -379,7 +379,8 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error) } // setup containment (i.e. cgroups on linux) - if cleanup, err := e.configureResourceContainer(command, os.Getpid()); err != nil { + running, cleanup, err := e.configureResourceContainer(command, os.Getpid()) + if err != nil { e.logger.Error("failed to configure container, process isolation will not work", "error", err) if os.Geteuid() == 0 || e.usesCustomCgroup() { return nil, fmt.Errorf("unable to configure cgroups: %w", err) @@ -424,6 +425,12 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error) return nil, fmt.Errorf("failed to start command path=%q --- args=%q: %v", path, e.childCmd.Args, err) } + // Run the runningFunc hook after the process starts + if err := running(); err != nil { + return nil, err + } + + // Wait on the task process go e.wait() return &ProcessState{Pid: e.childCmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil } diff --git a/drivers/shared/executor/executor_basic.go b/drivers/shared/executor/executor_basic.go index bb0b42bde96..e96cdcff7ed 100644 --- a/drivers/shared/executor/executor_basic.go +++ b/drivers/shared/executor/executor_basic.go @@ -21,9 +21,10 @@ func NewExecutorWithIsolation(logger hclog.Logger, compute cpustats.Compute) Exe return NewExecutor(logger, compute) } -func (e *UniversalExecutor) configureResourceContainer(_ *ExecCommand, _ int) (func(), error) { - nothing := func() {} - return nothing, nil +func (e *UniversalExecutor) configureResourceContainer(_ *ExecCommand, _ int) (func() error, func(), error) { + cleanup := func() {} + running := func() error { return nil } + return running, cleanup, nil } func (e *UniversalExecutor) start(command *ExecCommand) error { diff --git a/drivers/shared/executor/executor_universal_linux.go b/drivers/shared/executor/executor_universal_linux.go index 89c5486038b..5010171dbef 100644 --- a/drivers/shared/executor/executor_universal_linux.go +++ b/drivers/shared/executor/executor_universal_linux.go @@ -114,53 +114,73 @@ func (e *UniversalExecutor) statCG(cgroup string) (int, func(), error) { return fd, cleanup, err } +// runningFunc is called after task startup and is running. +// +// its use case is for moving the executor process out of the task cgroup once +// the child task process has been started (cgroups v1 only) +type runningFunc func() error + +// cleanupFunc is called after task shutdown +// +// its use case is for removing the cgroup from the system once it is no longer +// being used for running the task +type cleanupFunc func() + // configureResourceContainer on Linux configures the cgroups to be used to track // pids created by the executor // // pid: pid of the executor (i.e. ourself) -func (e *UniversalExecutor) configureResourceContainer(command *ExecCommand, pid int) (func(), error) { +func (e *UniversalExecutor) configureResourceContainer( + command *ExecCommand, + pid int, +) (runningFunc, cleanupFunc, error) { cgroup := command.StatsCgroup() // ensure tasks get the desired oom_score_adj value set if err := e.setOomAdj(command.OOMScoreAdj); err != nil { - return nil, err + return nil, nil, err } - // cgCleanup will be called after the task has been launched + // deleteCgroup will be called after the task has been launched // v1: remove the executor process from the task's cgroups // v2: let go of the file descriptor of the task's cgroup - var cgCleanup func() + var ( + deleteCgroup cleanupFunc + moveProcess runningFunc + ) // manually configure cgroup for cpu / memory constraints switch cgroupslib.GetMode() { case cgroupslib.CG1: if err := e.configureCG1(cgroup, command); err != nil { - return nil, err + return nil, nil, err } - cgCleanup = e.enterCG1(cgroup, command.CpusetCgroup()) + moveProcess, deleteCgroup = e.enterCG1(cgroup, command.CpusetCgroup()) default: e.configureCG2(cgroup, command) // configure child process to spawn in the cgroup // get file descriptor of the cgroup made for this task fd, cleanup, err := e.statCG(cgroup) if err != nil { - return nil, err + return nil, nil, err } e.childCmd.SysProcAttr.UseCgroupFD = true e.childCmd.SysProcAttr.CgroupFD = fd - cgCleanup = cleanup + deleteCgroup = cleanup + moveProcess = func() error { return nil } } e.logger.Info("configured cgroup for executor", "pid", pid) - return cgCleanup, nil + return moveProcess, deleteCgroup, nil } // enterCG1 will write the executor PID (i.e. itself) into the cgroups we // created for the task - so that the task and its children will spawn in // those cgroups. The cleanup function moves the executor out of the task's // cgroups and into the nomad/ parent cgroups. -func (e *UniversalExecutor) enterCG1(statsCgroup, cpusetCgroup string) func() { +func (e *UniversalExecutor) enterCG1(statsCgroup, cpusetCgroup string) (runningFunc, cleanupFunc) { + ed := cgroupslib.OpenPath(cpusetCgroup) pid := strconv.Itoa(unix.Getpid()) // write pid to all the normal interfaces @@ -174,21 +194,27 @@ func (e *UniversalExecutor) enterCG1(statsCgroup, cpusetCgroup string) func() { } // write pid to the cpuset interface, which varies between reserve/share - ed := cgroupslib.OpenPath(cpusetCgroup) err := ed.Write("cgroup.procs", pid) if err != nil { e.logger.Warn("failed to write cpuset cgroup", "error", err) } - // cleanup func that moves executor back up to nomad cgroup - return func() { - for _, iface := range ifaces { + move := func() error { + // move the executor back out + for _, iface := range append(ifaces, "cpuset") { err := cgroupslib.WriteNomadCG1(iface, "cgroup.procs", pid) if err != nil { e.logger.Warn("failed to move executor cgroup", "interface", iface, "error", err) + return err } } + return nil } + + // cleanup func does nothing in cgroups v1 + cleanup := func() {} + + return move, cleanup } func (e *UniversalExecutor) configureCG1(cgroup string, command *ExecCommand) error { diff --git a/drivers/shared/executor/executor_universal_linux_test.go b/drivers/shared/executor/executor_universal_linux_test.go index 5ff033ef9e1..b3cafa549db 100644 --- a/drivers/shared/executor/executor_universal_linux_test.go +++ b/drivers/shared/executor/executor_universal_linux_test.go @@ -8,6 +8,7 @@ package executor import ( "fmt" "os" + "path/filepath" "strconv" "strings" "testing" @@ -127,3 +128,37 @@ func TestUniversalExecutor_setOomAdj(t *testing.T) { oomScoreInt, _ := strconv.Atoi(strings.TrimSuffix(string(oomScore), "\n")) must.Eq(t, execCmd.OOMScoreAdj, int32(oomScoreInt)) } + +func TestUniversalExecutor_cg1_no_executor_pid(t *testing.T) { + testutil.CgroupsCompatibleV1(t) + ci.Parallel(t) + + factory := universalFactory + testExecCmd := testExecutorCommand(t) + execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir + execCmd.Cmd = "sleep" + execCmd.Args = []string{"infinity"} + + factory.configureExecCmd(t, execCmd) + defer allocDir.Destroy() + executor := factory.new(testlog.HCLogger(t), compute) + defer executor.Shutdown("", 0) + + p, err := executor.Launch(execCmd) + must.NoError(t, err) + + alloc := filepath.Base(allocDir.AllocDirPath()) + + ifaces := []string{"cpu", "memory", "freezer"} + for _, iface := range ifaces { + cgroup := fmt.Sprintf("/sys/fs/cgroup/%s/nomad/%s.web/cgroup.procs", iface, alloc) + + content, err := os.ReadFile(cgroup) + must.NoError(t, err) + + // ensure only 1 pid (sleep) is present in this cgroup + pids := strings.Fields(string(content)) + must.SliceLen(t, 1, pids) + must.Eq(t, pids[0], strconv.Itoa(p.Pid)) + } +}