From b1aae87c2679e355a7cc9912e8370a25785d3f4e Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 31 Jan 2024 22:34:01 +0100 Subject: [PATCH] Add `cpu-profile` interactive command (#1358) * Add cpu-profile interactive command * better doc markdown Signed-off-by: Tim Vaillancourt * set block profile after isProfiling=1 Signed-off-by: Tim Vaillancourt * improve test Signed-off-by: Tim Vaillancourt * check isCPUProfiling later Signed-off-by: Tim Vaillancourt * Cleanup Signed-off-by: Tim Vaillancourt * Fix discrepancy Signed-off-by: Tim Vaillancourt * move base64 to .applyServerCommand(...) Signed-off-by: Tim Vaillancourt --------- Signed-off-by: Tim Vaillancourt Co-authored-by: meiji163 --- doc/interactive-commands.md | 1 + go/logic/server.go | 69 +++++++++++++++++++++++++++++++++++++ go/logic/server_test.go | 68 ++++++++++++++++++++++++++++++++++++ 3 files changed, 138 insertions(+) create mode 100644 go/logic/server_test.go diff --git a/doc/interactive-commands.md b/doc/interactive-commands.md index 7ad44f1ac..587c979bf 100644 --- a/doc/interactive-commands.md +++ b/doc/interactive-commands.md @@ -17,6 +17,7 @@ Both interfaces may serve at the same time. Both respond to simple text command, - `help`: shows a brief list of available commands - `status`: returns a detailed status summary of migration progress and configuration - `sup`: returns a brief status summary of migration progress +- `cpu-profile`: returns a base64-encoded [`runtime/pprof`](https://pkg.go.dev/runtime/pprof) CPU profile using a duration, default: `30s`. Comma-separated options `gzip` and/or `block` (blocked profile) may follow the profile duration - `coordinates`: returns recent (though not exactly up to date) binary log coordinates of the inspected server - `applier`: returns the hostname of the applier - `inspector`: returns the hostname of the inspector diff --git a/go/logic/server.go b/go/logic/server.go index 4b1b87023..4e41fd26b 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -7,17 +7,30 @@ package logic import ( "bufio" + "bytes" + "compress/gzip" + "encoding/base64" + "errors" "fmt" "io" "net" "os" + "runtime" + "runtime/pprof" "strconv" "strings" "sync/atomic" + "time" "github.com/github/gh-ost/go/base" ) +var ( + ErrCPUProfilingBadOption = errors.New("unrecognized cpu profiling option") + ErrCPUProfilingInProgress = errors.New("cpu profiling already in progress") + defaultCPUProfileDuration = time.Second * 30 +) + type printStatusFunc func(PrintStatusRule, io.Writer) // Server listens for requests on a socket file or via TCP @@ -27,6 +40,7 @@ type Server struct { tcpListener net.Listener hooksExecutor *HooksExecutor printStatus printStatusFunc + isCPUProfiling int64 } func NewServer(migrationContext *base.MigrationContext, hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server { @@ -37,6 +51,54 @@ func NewServer(migrationContext *base.MigrationContext, hooksExecutor *HooksExec } } +func (this *Server) runCPUProfile(args string) (io.Reader, error) { + duration := defaultCPUProfileDuration + + var err error + var blockProfile, useGzip bool + if args != "" { + s := strings.Split(args, ",") + // a duration string must be the 1st field, if any + if duration, err = time.ParseDuration(s[0]); err != nil { + return nil, err + } + for _, arg := range s[1:] { + switch arg { + case "block", "blocked", "blocking": + blockProfile = true + case "gzip": + useGzip = true + default: + return nil, ErrCPUProfilingBadOption + } + } + } + + if atomic.LoadInt64(&this.isCPUProfiling) > 0 { + return nil, ErrCPUProfilingInProgress + } + atomic.StoreInt64(&this.isCPUProfiling, 1) + defer atomic.StoreInt64(&this.isCPUProfiling, 0) + + var buf bytes.Buffer + var writer io.Writer = &buf + if blockProfile { + runtime.SetBlockProfileRate(1) + defer runtime.SetBlockProfileRate(0) + } + if useGzip { + writer = gzip.NewWriter(writer) + } + if err = pprof.StartCPUProfile(writer); err != nil { + return nil, err + } + + time.Sleep(duration) + pprof.StopCPUProfile() + this.migrationContext.Log.Infof("Captured %d byte runtime/pprof CPU profile (gzip=%v)", buf.Len(), useGzip) + return &buf, nil +} + func (this *Server) BindSocketFile() (err error) { if this.migrationContext.ServeSocketFile == "" { return nil @@ -144,6 +206,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr fmt.Fprint(writer, `available commands: status # Print a detailed status message sup # Print a short status message +cpu-profile= # Print a base64-encoded runtime/pprof CPU profile using a duration, default: 30s. Comma-separated options 'gzip' and/or 'block' (blocked profile) may follow the profile duration coordinates # Print the currently inspected coordinates applier # Print the hostname of the applier inspector # Print the hostname of the inspector @@ -169,6 +232,12 @@ help # This message return ForcePrintStatusOnlyRule, nil case "info", "status": return ForcePrintStatusAndHintRule, nil + case "cpu-profile": + cpuProfile, err := this.runCPUProfile(arg) + if err == nil { + fmt.Fprint(base64.NewEncoder(base64.StdEncoding, writer), cpuProfile) + } + return NoPrintStatusRule, err case "coordinates": { if argIsQuestion || arg == "" { diff --git a/go/logic/server_test.go b/go/logic/server_test.go new file mode 100644 index 000000000..5ddedc1f1 --- /dev/null +++ b/go/logic/server_test.go @@ -0,0 +1,68 @@ +package logic + +import ( + "testing" + "time" + + "github.com/github/gh-ost/go/base" + "github.com/openark/golib/tests" +) + +func TestServerRunCPUProfile(t *testing.T) { + t.Parallel() + + t.Run("failed already running", func(t *testing.T) { + s := &Server{isCPUProfiling: 1} + profile, err := s.runCPUProfile("15ms") + tests.S(t).ExpectEquals(err, ErrCPUProfilingInProgress) + tests.S(t).ExpectEquals(profile, nil) + }) + + t.Run("failed bad duration", func(t *testing.T) { + s := &Server{isCPUProfiling: 0} + profile, err := s.runCPUProfile("should-fail") + tests.S(t).ExpectNotNil(err) + tests.S(t).ExpectEquals(profile, nil) + }) + + t.Run("failed bad option", func(t *testing.T) { + s := &Server{isCPUProfiling: 0} + profile, err := s.runCPUProfile("10ms,badoption") + tests.S(t).ExpectEquals(err, ErrCPUProfilingBadOption) + tests.S(t).ExpectEquals(profile, nil) + }) + + t.Run("success", func(t *testing.T) { + s := &Server{ + isCPUProfiling: 0, + migrationContext: base.NewMigrationContext(), + } + defaultCPUProfileDuration = time.Millisecond * 10 + profile, err := s.runCPUProfile("") + tests.S(t).ExpectNil(err) + tests.S(t).ExpectNotEquals(profile, nil) + tests.S(t).ExpectEquals(s.isCPUProfiling, int64(0)) + }) + + t.Run("success with block", func(t *testing.T) { + s := &Server{ + isCPUProfiling: 0, + migrationContext: base.NewMigrationContext(), + } + profile, err := s.runCPUProfile("10ms,block") + tests.S(t).ExpectNil(err) + tests.S(t).ExpectNotEquals(profile, nil) + tests.S(t).ExpectEquals(s.isCPUProfiling, int64(0)) + }) + + t.Run("success with block and gzip", func(t *testing.T) { + s := &Server{ + isCPUProfiling: 0, + migrationContext: base.NewMigrationContext(), + } + profile, err := s.runCPUProfile("10ms,block,gzip") + tests.S(t).ExpectNil(err) + tests.S(t).ExpectNotEquals(profile, nil) + tests.S(t).ExpectEquals(s.isCPUProfiling, int64(0)) + }) +}