From 025f212482d762bc11b53a2a12535ef44562c6b8 Mon Sep 17 00:00:00 2001 From: Paul Kramme Date: Mon, 29 Jul 2024 22:16:22 +0200 Subject: [PATCH] fix: shopware messengers are orphaned on crash or SIGINT --- cmd/project/project_worker.go | 39 +++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/cmd/project/project_worker.go b/cmd/project/project_worker.go index a75783f..98001b6 100644 --- a/cmd/project/project_worker.go +++ b/cmd/project/project_worker.go @@ -2,6 +2,7 @@ package project import ( "context" + "errors" "fmt" "os" "os/signal" @@ -9,6 +10,7 @@ import ( "strings" "sync" "syscall" + "time" "github.com/FriendsOfShopware/shopware-cli/internal/phpexec" "github.com/FriendsOfShopware/shopware-cli/shop" @@ -30,6 +32,7 @@ var projectWorkerCmd = &cobra.Command{ queuesToConsume, _ := cobraCmd.Flags().GetString("queue") memoryLimit, _ := cobraCmd.Flags().GetString("memory-limit") timeLimit, _ := cobraCmd.Flags().GetString("time-limit") + gracefulStopLimit, _ := cobraCmd.Flags().GetUint("graceful-stop-limit") if projectRoot, err = findClosestShopwareProject(); err != nil { return err @@ -37,7 +40,6 @@ var projectWorkerCmd = &cobra.Command{ if len(args) > 0 { workerAmount, err = strconv.Atoi(args[0]) - if err != nil { return err } @@ -76,15 +78,38 @@ var projectWorkerCmd = &cobra.Command{ for a := 0; a < workerAmount; a++ { wg.Add(1) go func(ctx context.Context, index int) { + defer wg.Done() + for { cmd := phpexec.ConsoleCommand(cancelCtx, consumeArgs...) cmd.Dir = projectRoot cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr cmd.Env = append(os.Environ(), fmt.Sprintf("MESSENGER_CONSUMER_NAME=%s-%d", baseName, index)) + cmd.WaitDelay = time.Second + cmd.Cancel = func() error { + if gracefulStopLimit > 0 { + if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { + return err + } + + now := time.Now() + + for time.Since(now) < time.Second*time.Duration(gracefulStopLimit) { + if isProcessStopped(cmd.Process) { + return os.ErrProcessDone + } + time.Sleep(time.Millisecond * 250) + } + } + return cmd.Process.Kill() + } if err := cmd.Run(); err != nil { - logging.FromContext(ctx).Fatal(err) + if errors.Is(err, context.Canceled) { + break + } + logging.FromContext(ctx).Error(err) } } }(cancelCtx, a) @@ -102,14 +127,20 @@ func init() { projectWorkerCmd.PersistentFlags().String("queue", "", "Queues to consume") projectWorkerCmd.PersistentFlags().String("memory-limit", "", "Memory Limit") projectWorkerCmd.PersistentFlags().String("time-limit", "", "Time Limit") + projectWorkerCmd.PersistentFlags().Uint("graceful-stop-limit", 0, "Graceful Stop Limit") } func cancelOnTermination(ctx context.Context, cancel context.CancelFunc) { logging.FromContext(ctx).Infof("setting up a signal handler") s := make(chan os.Signal, 1) - signal.Notify(s, syscall.SIGTERM) + signal.Notify(s, syscall.SIGTERM, syscall.SIGINT) go func() { - logging.FromContext(ctx).Infof("received SIGTERM %v\n", <-s) + sig := <-s + logging.FromContext(ctx).Infof("received signal %v\n", sig.String()) cancel() }() } + +func isProcessStopped(p *os.Process) bool { + return errors.Is(p.Signal(syscall.Signal(0)), os.ErrProcessDone) +}