From e2d5ef6db8d837182c01e7af5b62a0d59adf3070 Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Thu, 6 May 2021 18:06:21 -0700 Subject: [PATCH 01/19] Unregister subprocesses once they are no longer running Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index 016970e..483b5bc 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -286,6 +286,7 @@ func startMainProcess( processRegistry.Register(process) go func() { + defer processRegistry.Unregister(process) if err := process.Wait(); err != nil { errors <- &runErr{err} return @@ -330,6 +331,7 @@ func startPostStartProcesses( return } processRegistry.Register(postStartProcess) + defer processRegistry.Unregister(postStartProcess) if err := postStartProcess.Wait(); err != nil { errors <- &runErr{err} return @@ -522,6 +524,21 @@ func (pr *ProcessRegistry) Register(p Process) int { return len(pr.processes) } +// Unregister removes a process from the registry and returns how many processes are registered. +func (pr *ProcessRegistry) Unregister(p Process) int { + pr.Lock() + defer pr.Unlock() + + processes := make([]Process, 0) + for _, process := range pr.processes { + if p != process { + processes = append(processes, process) + } + } + pr.processes = processes + return len(pr.processes) +} + // SignalAll sends a signal to all registered processes. func (pr *ProcessRegistry) SignalAll(sig os.Signal) []error { pr.Lock() From 19b5b5df2afcd9fa729358e6be7038fa3674d360 Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Thu, 6 May 2021 18:07:48 -0700 Subject: [PATCH 02/19] Don't exit container-run until SIGTERM is received, or the main process fails To avoid terminating early and killing a still running drain script it is necessary to keep the container-run process running even if the main process has already exited. Once container-run receives the SIGTERM signal it will pass it on to the child processes and then wait until all direct children have exited before exiting itself. This should allow the processes to terminate cleanly as long as the grace period has not yet expired. container-run will also terminate immediately if the main process terminates with an error. This allows the container to fail and be restarted by k8s. Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun.go | 34 ++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index 483b5bc..91d75d0 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -67,7 +67,8 @@ func Run( } done := make(chan struct{}, 1) - errors := make(chan error) + sigterm := make(chan struct{}, 1) + errors := make(chan error, 1) sigs := make(chan os.Signal, 1) commands := make(chan processCommand) @@ -102,7 +103,7 @@ func Run( return err } - go processRegistry.HandleSignals(sigs, errors) + go processRegistry.HandleSignals(sigs, sigterm, errors) // This flag records the state of the system and its child // processes. It is set to true when the child processes are @@ -161,15 +162,24 @@ func Run( } } case <-done: - // Ignore a done process when we actively - // stopped the children via ProcessStop. - if active { - return nil + // When the main process returns without error, treat it the + // same as if it has been stopped. + active = false + case <-sigterm: + // Once we receive a SIGTERM we wait until all child processes have terminated + // because Kubernetes will kill the container once the main process exits. + for { + count := processRegistry.Count() + if count == 0 { + return nil + } + time.Sleep(1 * time.Second) } case err := <-errors: + fmt.Printf("Error: %v\n", err) // Ignore done signals when we actively // stopped the children via ProcessStop. - // Wait returns with !state.Sucess, `signal: killed` + // Wait returns with !state.Success, `signal: killed` if active { return err } @@ -516,6 +526,11 @@ func NewProcessRegistry() *ProcessRegistry { } } +// Count returns the number of processes in the registry +func (pr *ProcessRegistry) Count() int { + return len(pr.processes) +} + // Register registers a process in the registry and returns how many processes are registered. func (pr *ProcessRegistry) Register(p Process) int { pr.Lock() @@ -555,12 +570,15 @@ func (pr *ProcessRegistry) SignalAll(sig os.Signal) []error { // HandleSignals handles the signals channel and forwards them to the // registered processes. After a signal is handled it keeps running to // handle any future ones. -func (pr *ProcessRegistry) HandleSignals(sigs <-chan os.Signal, errors chan<- error) { +func (pr *ProcessRegistry) HandleSignals(sigs <-chan os.Signal, sigterm chan<- struct{}, errors chan<- error) { for { sig := <-sigs for _, err := range pr.SignalAll(sig) { errors <- err } + if sig == syscall.SIGTERM { + sigterm <- struct{}{} + } } } From f009f41a72d665b372e9e87b1db874cff2338694 Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Thu, 6 May 2021 19:03:20 -0700 Subject: [PATCH 03/19] Add QUIT command to send SIGQUIT to process This is used to trigger graceful shutdown of nginx. Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index 91d75d0..7d294b5 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -26,6 +26,8 @@ const ( ProcessStart = "START" // ProcessStop is the command to stop and suspend the child processes. ProcessStop = "STOP" + // SignalQuit is the command to send a QUIT signal to the child processes. + SignalQuit = "QUIT" ) type processCommand string @@ -123,7 +125,6 @@ func Run( // already stopped does nothing. Similarly for // demanding a start when the children are // started/up/active. - switch cmd { case ProcessStop: if active { @@ -160,6 +161,8 @@ func Run( active = true } + case SignalQuit: + processRegistry.SignalAll(syscall.SIGQUIT) } case <-done: // When the main process returns without error, treat it the @@ -232,7 +235,7 @@ func handlePacket( command := strings.TrimSpace(string(packet[:n])) switch command { - case ProcessStart, ProcessStop: + case ProcessStart, ProcessStop, SignalQuit: commands <- processCommand(command) default: // Bad commands are ignored. Else they could be used to DOS the runner. From 50be76f627847a2a9aff5edb755cefc1409b9527 Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Thu, 6 May 2021 19:56:21 -0700 Subject: [PATCH 04/19] Create status indicator on the shared volume while the process is running Can be used to wait for a process to stop after sending the STOP command. Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index 7d294b5..3085852 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -91,6 +91,8 @@ func Run( } if err := startProcesses( + jobName, + processName, runner, conditionRunner, commandChecker, @@ -145,6 +147,8 @@ func Run( case ProcessStart: if !active { err := startProcesses( + jobName, + processName, runner, conditionRunner, commandChecker, @@ -249,6 +253,8 @@ func stopProcesses(processRegistry *ProcessRegistry, errors chan<- error) { } func startProcesses( + jobName string, + processName string, runner Runner, conditionRunner Runner, commandChecker Checker, @@ -261,6 +267,8 @@ func startProcesses( done chan struct{}, ) error { if err := startMainProcess( + jobName, + processName, runner, command, stdio, @@ -285,6 +293,8 @@ func startProcesses( } func startMainProcess( + jobName string, + processName string, runner Runner, command Command, stdio Stdio, @@ -298,8 +308,13 @@ func startMainProcess( } processRegistry.Register(process) + sentinel := fmt.Sprintf("/var/vcap/data/%s/%s_containerrun.running", jobName, processName) + file, _ := os.Create(sentinel) + _ = file.Close() + go func() { defer processRegistry.Unregister(process) + defer os.Remove(sentinel) if err := process.Wait(); err != nil { errors <- &runErr{err} return From 804b25693084f3d8a97ea6b896b33ffdb061c9ae Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Thu, 6 May 2021 20:57:10 -0700 Subject: [PATCH 05/19] Write /var/vcap/jobs/bpm/bin/bpm script for drain script usage It only supports the "start" and "stop" commands from the real bpm, but has additional "quit" and "running" commands to help reimplementing drain scripts that make use of the pid files. Signed-off-by: Jan Dubois --- cmd/main.go | 6 +++-- pkg/containerrun/bpm.go | 53 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) create mode 100644 pkg/containerrun/bpm.go diff --git a/cmd/main.go b/cmd/main.go index a51abe6..dd45fc9 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,11 +3,13 @@ package main import ( "os" - "code.cloudfoundry.org/quarks-container-run/cmd/containerrun" + cmd "code.cloudfoundry.org/quarks-container-run/cmd/containerrun" + pkg "code.cloudfoundry.org/quarks-container-run/pkg/containerrun" ) func main() { - if err := containerrun.NewDefaultContainerRunCmd().Execute(); err != nil { + pkg.WriteBPMscript() + if err := cmd.NewDefaultContainerRunCmd().Execute(); err != nil { os.Exit(1) } } diff --git a/pkg/containerrun/bpm.go b/pkg/containerrun/bpm.go new file mode 100644 index 0000000..e613e00 --- /dev/null +++ b/pkg/containerrun/bpm.go @@ -0,0 +1,53 @@ +package containerrun + +import ( + "io/ioutil" + "os" + "path/filepath" +) + +// WriteBPMscript creates a bpm script for drain script compatibility. +func WriteBPMscript() error { + fileName := "/var/vcap/jobs/bpm/bin/bpm" + + script := `#!/bin/bash + +function usage { + echo "usage: $0 [start|stop|quit|running] JOBNAME [-p PROCESSNAME]" + exit 1 +} + +if [ $# != 2 -a $# != 4 ]; then + usage +fi +if [ "$1" != "start" -a "$1" != "stop" -a "$1" != "quit" -a "$1" != "running" ]; then + usage +fi + +CMD="$1" +JOB="$2" +PROCESS="$2" + +if [ $# == 4 ]; then + if [ "$3" != "-p" ]; then + usage + fi + PROCESS="$4" +fi + +CONTAINER_RUN="/var/vcap/data/${JOB}/${PROCESS}_containerrun" +if [ "$CMD" == "running" ]; then + test -f "${CONTAINER_RUN}.running" +else + echo "${CMD^^}" | nc -w 1 -uU "${CONTAINER_RUN}.sock" +fi +` + if _, err := os.Stat(fileName); !os.IsNotExist(err) { + // Nothing to do if the file already exists + return nil + } + if err := os.MkdirAll(filepath.Dir(fileName), 0755); err != nil { + return err + } + return ioutil.WriteFile(fileName, []byte(script), 0755) +} From 484a7fb83020cf2b423f7ae8cffa04f5502677fd Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Fri, 7 May 2021 17:29:56 -0700 Subject: [PATCH 06/19] Add --debug option Signed-off-by: Jan Dubois --- cmd/containerrun/cmd.go | 6 ++++++ go.mod | 1 + go.sum | 6 ++++++ pkg/containerrun/containerrun.go | 3 +++ 4 files changed, 16 insertions(+) diff --git a/cmd/containerrun/cmd.go b/cmd/containerrun/cmd.go index 774f010..ce7e51e 100644 --- a/cmd/containerrun/cmd.go +++ b/cmd/containerrun/cmd.go @@ -6,6 +6,7 @@ import ( "os/exec" "time" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" pkg "code.cloudfoundry.org/quarks-container-run/pkg/containerrun" @@ -26,6 +27,7 @@ func NewContainerRunCmd( var postStartCommandArgs []string var postStartConditionCommandName string var postStartConditionCommandArgs []string + var debug bool cmd := &cobra.Command{ Use: "container-run", @@ -33,6 +35,9 @@ func NewContainerRunCmd( SilenceUsage: true, SilenceErrors: true, RunE: func(cmd *cobra.Command, args []string) (err error) { + if debug { + log.SetLevel(log.DebugLevel) + } return run( runner, conditionRunner, @@ -56,6 +61,7 @@ func NewContainerRunCmd( cmd.Flags().StringArrayVar(&postStartCommandArgs, "post-start-arg", []string{}, "a post-start command arg") cmd.Flags().StringVar(&postStartConditionCommandName, "post-start-condition-name", "", "the post-start condition command name") cmd.Flags().StringArrayVar(&postStartConditionCommandArgs, "post-start-condition-arg", []string{}, "a post-start condition command arg") + cmd.Flags().BoolVar(&debug, "debug", false, "enable debug logging") return cmd } diff --git a/go.mod b/go.mod index f48d68a..d503662 100644 --- a/go.mod +++ b/go.mod @@ -6,5 +6,6 @@ require ( github.com/golang/mock v1.4.3 github.com/onsi/ginkgo v1.12.0 github.com/onsi/gomega v1.9.0 + github.com/sirupsen/logrus v1.2.0 github.com/spf13/cobra v0.0.7 ) diff --git a/go.sum b/go.sum index b4d4e24..8df619d 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,7 @@ github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= @@ -49,6 +50,7 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= @@ -70,6 +72,7 @@ github.com/onsi/gomega v1.9.0 h1:R1uwffexN6Pr340GtYRIdZmAiN4J+iw6WG4wog1DUXg= github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= @@ -83,6 +86,7 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -95,6 +99,7 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= @@ -105,6 +110,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index 3085852..f50cd82 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -6,6 +6,7 @@ package containerrun import ( "context" "fmt" + log "github.com/sirupsen/logrus" "io" "io/ioutil" "net" @@ -121,6 +122,7 @@ func Run( for { select { case cmd := <-commands: + log.Debugf("Received %s command\n", cmd) // Note: Commands are ignored if the system is // already in the requested state. I.e // demanding things to stop when things are @@ -173,6 +175,7 @@ func Run( // same as if it has been stopped. active = false case <-sigterm: + log.Debugln("Received SIGTERM; waiting for all children to stop") // Once we receive a SIGTERM we wait until all child processes have terminated // because Kubernetes will kill the container once the main process exits. for { From f822405b4c6723fc8e48c74bac7f6e5666aadb9e Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Fri, 7 May 2021 17:57:32 -0700 Subject: [PATCH 07/19] Make errors channel unbuffered again It is important to process errors synchronously because their handling depends on the value of the 'active' flag. To avoid deadlocks any code sending errors to the channel must run in a blockable goroutine. Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index f50cd82..fa609b0 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -71,7 +71,7 @@ func Run( done := make(chan struct{}, 1) sigterm := make(chan struct{}, 1) - errors := make(chan error, 1) + errors := make(chan error) sigs := make(chan os.Signal, 1) commands := make(chan processCommand) @@ -250,9 +250,15 @@ func handlePacket( } func stopProcesses(processRegistry *ProcessRegistry, errors chan<- error) { - for _, err := range processRegistry.SignalAll(os.Kill) { - errors <- err - } + var wg sync.WaitGroup + wg.Add(1) + go func() { + for _, err := range processRegistry.SignalAll(os.Kill) { + errors <- err + } + wg.Done() + }() + wg.Wait() } func startProcesses( @@ -594,6 +600,7 @@ func (pr *ProcessRegistry) SignalAll(sig os.Signal) []error { func (pr *ProcessRegistry) HandleSignals(sigs <-chan os.Signal, sigterm chan<- struct{}, errors chan<- error) { for { sig := <-sigs + log.Debugf("Sending '%s' signal to %d processes", sig, len(pr.processes)) for _, err := range pr.SignalAll(sig) { errors <- err } From 2b39e8ac813637805d581e7f68605d757842b34a Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Fri, 7 May 2021 18:10:07 -0700 Subject: [PATCH 08/19] Use SIGTERM instead of SIGKILL to STOP processes (and give them a chance to notify their child processes). If the processes haven't terminated in 20s, send SIGKILL. Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun.go | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index fa609b0..c218819 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -6,7 +6,6 @@ package containerrun import ( "context" "fmt" - log "github.com/sirupsen/logrus" "io" "io/ioutil" "net" @@ -17,11 +16,14 @@ import ( "sync" "syscall" "time" + + log "github.com/sirupsen/logrus" ) const ( postStartTimeout = time.Minute * 15 conditionSleepTime = time.Second * 3 + sigtermTimeout = time.Second * 20 // ProcessStart is the command to restart the suspended child processes. ProcessStart = "START" @@ -148,6 +150,9 @@ func Run( } case ProcessStart: if !active { + // Make sure any previous instance is gone now, and the kill timer is stopped. + processRegistry.KillAll() + err := startProcesses( jobName, processName, @@ -253,9 +258,16 @@ func stopProcesses(processRegistry *ProcessRegistry, errors chan<- error) { var wg sync.WaitGroup wg.Add(1) go func() { - for _, err := range processRegistry.SignalAll(os.Kill) { + log.Debugln("sending SIGTERM") + for _, err := range processRegistry.SignalAll(syscall.SIGTERM) { errors <- err } + // bpm would send a SIGQUIT signal to dump the stack before sending SIGKILL, + // but there doesn't seem to be a point to be doing it in this context. + processRegistry.timer = time.AfterFunc(sigtermTimeout, func() { + log.Debugln("timeout SIGTERM") + processRegistry.KillAll() + }) wg.Done() }() wg.Wait() @@ -543,6 +555,7 @@ type Stdio struct { // ProcessRegistry handles all the processes. type ProcessRegistry struct { processes []Process + timer *time.Timer sync.Mutex } @@ -550,6 +563,8 @@ type ProcessRegistry struct { func NewProcessRegistry() *ProcessRegistry { return &ProcessRegistry{ processes: make([]Process, 0), + // It should always be safe to call timer.Stop(), so it must not be nil. + timer: time.NewTimer(time.Millisecond), } } @@ -578,6 +593,9 @@ func (pr *ProcessRegistry) Unregister(p Process) int { } } pr.processes = processes + if len(pr.processes) == 0 { + pr.timer.Stop() + } return len(pr.processes) } @@ -594,6 +612,13 @@ func (pr *ProcessRegistry) SignalAll(sig os.Signal) []error { return errors } +// KillAll stops the timer and sends a kill signal to all registered processes. +func (pr *ProcessRegistry) KillAll() { + log.Debugln("KillAll") + pr.timer.Stop() + pr.SignalAll(os.Kill) +} + // HandleSignals handles the signals channel and forwards them to the // registered processes. After a signal is handled it keeps running to // handle any future ones. From 6cb69e371b94d822f3453ea28b3973360c41f301 Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Fri, 7 May 2021 21:55:55 -0700 Subject: [PATCH 09/19] Add some additional debug logging Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index c218819..b483146 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -180,7 +180,7 @@ func Run( // same as if it has been stopped. active = false case <-sigterm: - log.Debugln("Received SIGTERM; waiting for all children to stop") + log.Debugln("Waiting for all children to stop") // Once we receive a SIGTERM we wait until all child processes have terminated // because Kubernetes will kill the container once the main process exits. for { @@ -577,6 +577,8 @@ func (pr *ProcessRegistry) Count() int { func (pr *ProcessRegistry) Register(p Process) int { pr.Lock() defer pr.Unlock() + + log.Debugf("Registering process %s\n", p) pr.processes = append(pr.processes, p) return len(pr.processes) } @@ -586,6 +588,7 @@ func (pr *ProcessRegistry) Unregister(p Process) int { pr.Lock() defer pr.Unlock() + log.Debugf("Unregistering process %s\n", p) processes := make([]Process, 0) for _, process := range pr.processes { if p != process { @@ -603,6 +606,7 @@ func (pr *ProcessRegistry) Unregister(p Process) int { func (pr *ProcessRegistry) SignalAll(sig os.Signal) []error { pr.Lock() defer pr.Unlock() + log.Debugf("Sending '%s' signal to %d processes\n", sig, len(pr.processes)) errors := make([]error, 0) for _, p := range pr.processes { if err := p.Signal(sig); err != nil { From d72b104ec923dda64314b835bfc404c0bef26b11 Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Fri, 7 May 2021 21:56:44 -0700 Subject: [PATCH 10/19] Unregister process as soon as it quits Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index b483146..151763c 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -334,12 +334,16 @@ func startMainProcess( _ = file.Close() go func() { - defer processRegistry.Unregister(process) - defer os.Remove(sentinel) if err := process.Wait(); err != nil { + log.Debugf("Process has failed with error: %s\n", err) + processRegistry.Unregister(process) + os.Remove(sentinel) errors <- &runErr{err} return } + log.Debugln("Process has ended normally") + processRegistry.Unregister(process) + os.Remove(sentinel) done <- struct{}{} }() From f4213ceb82c47252bf499c92ba2142e8a59075e9 Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Fri, 7 May 2021 21:59:06 -0700 Subject: [PATCH 11/19] Call stopProcesses on SIGTERM (and SIGINT for testing) That way it also starts the timeout trigger if the process doesn't stop within 20s. Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index 151763c..6683a92 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -633,12 +633,15 @@ func (pr *ProcessRegistry) KillAll() { func (pr *ProcessRegistry) HandleSignals(sigs <-chan os.Signal, sigterm chan<- struct{}, errors chan<- error) { for { sig := <-sigs - log.Debugf("Sending '%s' signal to %d processes", sig, len(pr.processes)) - for _, err := range pr.SignalAll(sig) { - errors <- err - } - if sig == syscall.SIGTERM { + log.Debugf("Received '%s' signal\n", sig) + if sig == syscall.SIGTERM || sig == syscall.SIGINT { + stopProcesses(pr, errors) + log.Debugln("Write to sigterm channel") sigterm <- struct{}{} + } else { + for _, err := range pr.SignalAll(sig) { + errors <- err + } } } } From 2f616e577d0bb19b3dd02e6f64eef16cad5ba343 Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Fri, 7 May 2021 22:00:12 -0700 Subject: [PATCH 12/19] Only handle SIGINT, SIGQUIT, and SIGTERM signals The rest are just confusing the debug output, and passing on e.g. SIGCHLD to the child processes is neither correct nor useful. Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index 6683a92..8c33b4f 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -77,7 +77,7 @@ func Run( sigs := make(chan os.Signal, 1) commands := make(chan processCommand) - signal.Notify(sigs) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM) processRegistry := NewProcessRegistry() command := Command{ From 1029a22553cd0a21c5620b453f365fc11dc36f4e Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Fri, 7 May 2021 22:41:45 -0700 Subject: [PATCH 13/19] bpm stop now waits up to 30s for the process to stop container-run should send a SIGKILL if the process is still alive after 20s, so if it isn't gone after 30, then it probably will not stop. The `bpm running` subcommand will also now print "yes" or "no" in addition to setting the exit code. Signed-off-by: Jan Dubois --- pkg/containerrun/bpm.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/pkg/containerrun/bpm.go b/pkg/containerrun/bpm.go index e613e00..38cdecf 100644 --- a/pkg/containerrun/bpm.go +++ b/pkg/containerrun/bpm.go @@ -37,9 +37,23 @@ fi CONTAINER_RUN="/var/vcap/data/${JOB}/${PROCESS}_containerrun" if [ "$CMD" == "running" ]; then - test -f "${CONTAINER_RUN}.running" + if [ -f "${CONTAINER_RUN}.running" ]; then + echo "yes" + exit 0 + else + echo "no" + exit 1 + fi else echo "${CMD^^}" | nc -w 1 -uU "${CONTAINER_RUN}.sock" + if [ "${CMD}" == "stop" ]; then + for i in $(seq 30); do + test ! -f "${CONTAINER_RUN}.running" && exit 0 + sleep 1 + done + echo Process did not stop within 30 seconds + exit 1 + fi fi ` if _, err := os.Stat(fileName); !os.IsNotExist(err) { From 8480c609049a2ebf8259a6174e59cda66b8d03d0 Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Fri, 7 May 2021 22:44:10 -0700 Subject: [PATCH 14/19] Make the tests compile again (but they no longer pass because they have not been updated with the new application logic). Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/containerrun/containerrun_test.go b/pkg/containerrun/containerrun_test.go index 888f814..a807d41 100644 --- a/pkg/containerrun/containerrun_test.go +++ b/pkg/containerrun/containerrun_test.go @@ -871,6 +871,7 @@ var _ = Describe("ProcessRegistry", func() { It("receives an error on the errors channel when signaling a process fails", func() { expectedErr := fmt.Errorf("failed to signal") sigs := make(chan os.Signal, 1) + sigterm := make(chan struct{}, 1) errors := make(chan error) sig := syscall.SIGTERM @@ -881,7 +882,7 @@ var _ = Describe("ProcessRegistry", func() { p1.EXPECT().Signal(sig).Return(expectedErr) - go pr.HandleSignals(sigs, errors) + go pr.HandleSignals(sigs, sigterm, errors) sigs <- sig err := <-errors Expect(err).To(Equal(expectedErr)) @@ -889,6 +890,7 @@ var _ = Describe("ProcessRegistry", func() { It("receives no error when signaling a process succeeds", func() { sigs := make(chan os.Signal, 1) + sigterm := make(chan struct{}, 1) errors := make(chan error) sig := syscall.SIGTERM @@ -899,7 +901,7 @@ var _ = Describe("ProcessRegistry", func() { p1.EXPECT().Signal(sig).Return(nil) - go pr.HandleSignals(sigs, errors) + go pr.HandleSignals(sigs, sigterm, errors) sigs <- sig Consistently(errors).ShouldNot(Receive()) }) From f48b916c2000e07578086224d4ff7080742aa309 Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Sat, 8 May 2021 12:09:15 -0700 Subject: [PATCH 15/19] Add `bpm term` command; same as stop, but don't wait Also modify `bpm running` to only print yes/no when stdout is a tty. Signed-off-by: Jan Dubois --- pkg/containerrun/bpm.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/containerrun/bpm.go b/pkg/containerrun/bpm.go index 38cdecf..0db273a 100644 --- a/pkg/containerrun/bpm.go +++ b/pkg/containerrun/bpm.go @@ -13,14 +13,16 @@ func WriteBPMscript() error { script := `#!/bin/bash function usage { - echo "usage: $0 [start|stop|quit|running] JOBNAME [-p PROCESSNAME]" + echo "usage: $0 [start|stop|quit|term|running] JOBNAME [-p PROCESSNAME]" exit 1 } if [ $# != 2 -a $# != 4 ]; then usage fi -if [ "$1" != "start" -a "$1" != "stop" -a "$1" != "quit" -a "$1" != "running" ]; then +if [ "$1" != "start" -a "$1" != "stop" -a "$1" != "running" -a \ + "$1" != "quit" -a "$1" != "term" ] +then usage fi @@ -37,15 +39,18 @@ fi CONTAINER_RUN="/var/vcap/data/${JOB}/${PROCESS}_containerrun" if [ "$CMD" == "running" ]; then + # Print yes/no if stdout is a tty if [ -f "${CONTAINER_RUN}.running" ]; then - echo "yes" + test -t 1 && echo "yes" exit 0 else - echo "no" + test -t 1 && echo "no" exit 1 fi else - echo "${CMD^^}" | nc -w 1 -uU "${CONTAINER_RUN}.sock" + # "term" is the same as "stop", except we won't wait + ACTION="${CMD/term/stop}" + echo "${ACTION^^}" | nc -w 1 -uU "${CONTAINER_RUN}.sock" if [ "${CMD}" == "stop" ]; then for i in $(seq 30); do test ! -f "${CONTAINER_RUN}.running" && exit 0 From a3870a4dc4f3c541e69e4c7422e852454cfeaaaa Mon Sep 17 00:00:00 2001 From: Andrew Edgar Date: Mon, 10 May 2021 11:26:32 -0600 Subject: [PATCH 16/19] Refactor tests to work when Run does not exit --- pkg/containerrun/containerrun.go | 27 ++++++- pkg/containerrun/containerrun_test.go | 101 +++++++++++++++++++++++--- 2 files changed, 115 insertions(+), 13 deletions(-) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index 8c33b4f..84bf0da 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -51,7 +51,6 @@ type CmdRun func( postStartConditionCommandArgs []string, ) error -// Run implements the logic for the container-run CLI command. func Run( runner Runner, conditionRunner Runner, @@ -65,6 +64,28 @@ func Run( postStartCommandArgs []string, postStartConditionCommandName string, postStartConditionCommandArgs []string, +) error { + return RunWithTestChan(runner, conditionRunner, commandChecker, listener, + stdio, args, jobName, processName, + postStartCommandName, postStartCommandArgs, postStartConditionCommandName, + postStartConditionCommandArgs, nil) +} + +// Run implements the logic for the container-run CLI command. +func RunWithTestChan( + runner Runner, + conditionRunner Runner, + commandChecker Checker, + listener PacketListener, + stdio Stdio, + args []string, + jobName string, + processName string, + postStartCommandName string, + postStartCommandArgs []string, + postStartConditionCommandName string, + postStartConditionCommandArgs []string, + testSigTermChan chan struct{}, ) error { if len(args) == 0 { err := fmt.Errorf("a command is required") @@ -73,6 +94,9 @@ func Run( done := make(chan struct{}, 1) sigterm := make(chan struct{}, 1) + if testSigTermChan != nil { + sigterm = testSigTermChan + } errors := make(chan error) sigs := make(chan os.Signal, 1) commands := make(chan processCommand) @@ -191,7 +215,6 @@ func Run( time.Sleep(1 * time.Second) } case err := <-errors: - fmt.Printf("Error: %v\n", err) // Ignore done signals when we actively // stopped the children via ProcessStop. // Wait returns with !state.Success, `signal: killed` diff --git a/pkg/containerrun/containerrun_test.go b/pkg/containerrun/containerrun_test.go index a807d41..5330da0 100644 --- a/pkg/containerrun/containerrun_test.go +++ b/pkg/containerrun/containerrun_test.go @@ -135,7 +135,17 @@ var _ = Describe("Run", func() { Run(command, stdio). Return(process, nil). Times(1) - err := Run(runner, nil, nil, spinner, stdio, commandLine, "job", "process", "", []string{}, "", []string{}) + + sigTermChan := make(chan struct{}, 1) + resultChan := make(chan error) + go func() { + err := RunWithTestChan(runner, nil, nil, spinner, stdio, commandLine, "job", "process", "", []string{}, "", []string{}, sigTermChan) + resultChan <- err + }() + + sigTermChan <- struct{}{} + + err := <-resultChan Expect(err).ToNot(HaveOccurred()) }) @@ -159,7 +169,16 @@ var _ = Describe("Run", func() { Check(postStart.Name). Return(false). Times(1) - err := Run(runner, nil, checker, spinner, stdio, commandLine, "job", "process", postStart.Name, postStart.Arg, "", []string{}) + sigTermChan := make(chan struct{}, 1) + resultChan := make(chan error) + go func() { + err := RunWithTestChan(runner, nil, checker, spinner, stdio, commandLine, "job", "process", postStart.Name, postStart.Arg, "", []string{}, sigTermChan) + resultChan <- err + }() + + sigTermChan <- struct{}{} + + err := <-resultChan Expect(err).ToNot(HaveOccurred()) }) @@ -287,7 +306,17 @@ var _ = Describe("Run", func() { Return(true). Times(1) conditionRunner := NewMockRunner(ctrl) - err := Run(runner, conditionRunner, checker, spinner, stdio, commandLine, "job", "process", postStart.Name, postStart.Arg, "", []string{}) + + sigTermChan := make(chan struct{}, 1) + resultChan := make(chan error) + go func() { + err := RunWithTestChan(runner, conditionRunner, checker, spinner, stdio, commandLine, "job", "process", postStart.Name, postStart.Arg, "", []string{}, sigTermChan) + resultChan <- err + }() + + sigTermChan <- struct{}{} + + err := <-resultChan Expect(err).ToNot(HaveOccurred()) }) }) @@ -378,7 +407,17 @@ var _ = Describe("Run", func() { }). Return(nil, nil). Times(1) - err := Run(runner, conditionRunner, checker, spinner, stdio, commandLine, "job", "process", postStart.Name, postStart.Arg, postStartCondition.Name, postStartCondition.Arg) + + sigTermChan := make(chan struct{}, 1) + resultChan := make(chan error) + go func() { + err := RunWithTestChan(runner, conditionRunner, checker, spinner, stdio, commandLine, "job", "process", postStart.Name, postStart.Arg, postStartCondition.Name, postStartCondition.Arg, sigTermChan) + resultChan <- err + }() + + sigTermChan <- struct{}{} + + err := <-resultChan Expect(err).ToNot(HaveOccurred()) }) }) @@ -432,7 +471,16 @@ var _ = Describe("Run", func() { ListenPacket(gomock.Any(), gomock.Any()). Return(bogus, nil). AnyTimes() - err := Run(runner, nil, nil, emit_bogus, stdio, commandLine, "job", "process", "", []string{}, "", []string{}) + sigTermChan := make(chan struct{}, 1) + resultChan := make(chan error) + go func() { + err := RunWithTestChan(runner, nil, nil, emit_bogus, stdio, commandLine, "job", "process", "", []string{}, "", []string{}, sigTermChan) + resultChan <- err + }() + + sigTermChan <- struct{}{} + + err := <-resultChan Expect(err).ToNot(HaveOccurred()) }) @@ -556,7 +604,16 @@ var _ = Describe("Run", func() { Do(func(net, addr string) { <-trigger }). Return(packet_start, nil). AnyTimes() - err := Run(runner, nil, nil, emit_start, stdio, commandLine, "job", "process", "", []string{}, "", []string{}) + sigTermChan := make(chan struct{}, 1) + resultChan := make(chan error) + go func() { + err := RunWithTestChan(runner, nil, nil, emit_start, stdio, commandLine, "job", "process", "", []string{}, "", []string{}, sigTermChan) + resultChan <- err + }() + + sigTermChan <- struct{}{} + + err := <-resultChan Expect(err).ToNot(HaveOccurred()) }) @@ -586,7 +643,7 @@ var _ = Describe("Run", func() { Times(1), ) process.EXPECT(). - Signal(os.Kill). + Signal(syscall.SIGTERM). // Signal kill, then trigger 2nd // `stop`. The emitter contains the // delay giving main the time to @@ -660,7 +717,18 @@ var _ = Describe("Run", func() { Return(packet_start, nil). AnyTimes(), ) - err := Run(runner, nil, nil, emitter, stdio, commandLine, "job", "process", "", []string{}, "", []string{}) + sigTermChan := make(chan struct{}, 1) + resultChan := make(chan error) + go func() { + err := RunWithTestChan(runner, nil, nil, emitter, stdio, commandLine, "job", "process", "", []string{}, "", []string{}, sigTermChan) + + resultChan <- err + }() + + time.Sleep(3 * time.Second) + sigTermChan <- struct{}{} + + err := <-resultChan Expect(err).ToNot(HaveOccurred()) }) }) @@ -705,7 +773,7 @@ var _ = Describe("Run", func() { Times(1), ) process.EXPECT(). - Signal(os.Kill). + Signal(syscall.SIGTERM). // Signal kill, then trigger // `start`. The emitter contains the // delay giving main the time to @@ -772,7 +840,18 @@ var _ = Describe("Run", func() { Return(packet_start, nil). AnyTimes(), ) - err := Run(runner, nil, nil, emitter, stdio, commandLine, "job", "process", "", []string{}, "", []string{}) + sigTermChan := make(chan struct{}, 1) + resultChan := make(chan error) + go func() { + err := RunWithTestChan(runner, nil, nil, emitter, stdio, commandLine, "job", "process", "", []string{}, "", []string{}, sigTermChan) + + resultChan <- err + }() + + time.Sleep(3 * time.Second) + sigTermChan <- struct{}{} + + err := <-resultChan Expect(err).ToNot(HaveOccurred()) }) }) @@ -851,7 +930,7 @@ var _ = Describe("ProcessRegistry", func() { // Golang runtime internally raises SIGURG; we need to ignore them. p1.EXPECT().Signal(syscall.SIGURG).Return(nil).AnyTimes() p2.EXPECT().Signal(syscall.SIGURG).Return(nil).AnyTimes() - + errors := pr.SignalAll(sig) Expect(errors).To(Equal([]error{})) }) From 3b5b6bb9b6e22c702999ae7f333a0baa5baab476 Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Tue, 11 May 2021 09:30:11 -0700 Subject: [PATCH 17/19] Code review feedback Signed-off-by: Jan Dubois --- pkg/containerrun/bpm.go | 2 ++ pkg/containerrun/containerrun.go | 13 +++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/containerrun/bpm.go b/pkg/containerrun/bpm.go index 0db273a..90f6e7c 100644 --- a/pkg/containerrun/bpm.go +++ b/pkg/containerrun/bpm.go @@ -50,6 +50,8 @@ if [ "$CMD" == "running" ]; then else # "term" is the same as "stop", except we won't wait ACTION="${CMD/term/stop}" + # Send "START", "STOP", or "QUIT" over UDP to the unix socket + # with a 1 seconds timeout to establish the connection. echo "${ACTION^^}" | nc -w 1 -uU "${CONTAINER_RUN}.sock" if [ "${CMD}" == "stop" ]; then for i in $(seq 30); do diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index 84bf0da..60ece54 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -208,8 +208,7 @@ func RunWithTestChan( // Once we receive a SIGTERM we wait until all child processes have terminated // because Kubernetes will kill the container once the main process exits. for { - count := processRegistry.Count() - if count == 0 { + if processRegistry.Count() == 0 { return nil } time.Sleep(1 * time.Second) @@ -278,6 +277,9 @@ func handlePacket( } func stopProcesses(processRegistry *ProcessRegistry, errors chan<- error) { + // Run the code in a separate goroutine because the errors channel is + // unbuffered, so we can't write to it from the same goroutine that + // will be reading from it. var wg sync.WaitGroup wg.Add(1) go func() { @@ -597,6 +599,9 @@ func NewProcessRegistry() *ProcessRegistry { // Count returns the number of processes in the registry func (pr *ProcessRegistry) Count() int { + pr.Lock() + defer pr.Unlock() + return len(pr.processes) } @@ -610,13 +615,13 @@ func (pr *ProcessRegistry) Register(p Process) int { return len(pr.processes) } -// Unregister removes a process from the registry and returns how many processes are registered. +// Unregister removes a process from the registry and returns how many processes are still registered. func (pr *ProcessRegistry) Unregister(p Process) int { pr.Lock() defer pr.Unlock() log.Debugf("Unregistering process %s\n", p) - processes := make([]Process, 0) + processes := make([]Process, 0, len(pr.processes)) for _, process := range pr.processes { if p != process { processes = append(processes, process) From 27768225b550d589cdadff0f16984f3b620fe440 Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Tue, 11 May 2021 10:18:49 -0700 Subject: [PATCH 18/19] Run stopProcesses in a goroutine when called from the main goroutine The errors channel is unbuffered, so stopProcesses cannot deliver errors if it is running from the same goroutine as the receiver. Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index 60ece54..4edbfc3 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -170,7 +170,8 @@ func RunWithTestChan( // signals. active = false - stopProcesses(processRegistry, errors) + // Run asynchronously so we can receive errors + go stopProcesses(processRegistry, errors) } case ProcessStart: if !active { @@ -277,12 +278,6 @@ func handlePacket( } func stopProcesses(processRegistry *ProcessRegistry, errors chan<- error) { - // Run the code in a separate goroutine because the errors channel is - // unbuffered, so we can't write to it from the same goroutine that - // will be reading from it. - var wg sync.WaitGroup - wg.Add(1) - go func() { log.Debugln("sending SIGTERM") for _, err := range processRegistry.SignalAll(syscall.SIGTERM) { errors <- err @@ -293,9 +288,6 @@ func stopProcesses(processRegistry *ProcessRegistry, errors chan<- error) { log.Debugln("timeout SIGTERM") processRegistry.KillAll() }) - wg.Done() - }() - wg.Wait() } func startProcesses( From 87b8fed3af1f11a87e7c31e04f680443aea661fc Mon Sep 17 00:00:00 2001 From: Jan Dubois Date: Tue, 11 May 2021 10:24:47 -0700 Subject: [PATCH 19/19] Fix indentation Signed-off-by: Jan Dubois --- pkg/containerrun/containerrun.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/containerrun/containerrun.go b/pkg/containerrun/containerrun.go index 4edbfc3..45f17f4 100644 --- a/pkg/containerrun/containerrun.go +++ b/pkg/containerrun/containerrun.go @@ -278,16 +278,16 @@ func handlePacket( } func stopProcesses(processRegistry *ProcessRegistry, errors chan<- error) { - log.Debugln("sending SIGTERM") - for _, err := range processRegistry.SignalAll(syscall.SIGTERM) { - errors <- err - } - // bpm would send a SIGQUIT signal to dump the stack before sending SIGKILL, - // but there doesn't seem to be a point to be doing it in this context. - processRegistry.timer = time.AfterFunc(sigtermTimeout, func() { - log.Debugln("timeout SIGTERM") - processRegistry.KillAll() - }) + log.Debugln("sending SIGTERM") + for _, err := range processRegistry.SignalAll(syscall.SIGTERM) { + errors <- err + } + // bpm would send a SIGQUIT signal to dump the stack before sending SIGKILL, + // but there doesn't seem to be a point to be doing it in this context. + processRegistry.timer = time.AfterFunc(sigtermTimeout, func() { + log.Debugln("timeout SIGTERM") + processRegistry.KillAll() + }) } func startProcesses(