Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support rpc JSON replay. #5064

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 17 additions & 19 deletions cmd/admin-scanner-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/charmbracelet/bubbles/spinner"
tea "github.com/charmbracelet/bubbletea"
"github.com/charmbracelet/lipgloss"
humanize "github.com/dustin/go-humanize"
"github.com/dustin/go-humanize"
"github.com/fatih/color"
"github.com/minio/cli"
json "github.com/minio/colorjson"
Expand Down Expand Up @@ -64,9 +64,8 @@ var adminScannerInfoFlags = []cli.Flag{
Value: -1,
},
cli.StringFlag{
Name: "in",
Hidden: true,
Usage: "read previously saved json from file and replay",
Name: "in",
Usage: "read previously saved json from file and replay",
},
cli.StringFlag{
Name: "bucket",
Expand Down Expand Up @@ -198,19 +197,6 @@ func mainAdminScannerInfo(ctx *cli.Context) error {

checkAdminScannerInfoSyntax(ctx)

aliasedURL := ctx.Args().Get(0)

// Create a new MinIO Admin Client
client, err := newAdminClient(aliasedURL)
fatalIf(err.Trace(aliasedURL), "Unable to initialize admin client.")

if bucket := ctx.String("bucket"); bucket != "" {
bucketStats, err := client.BucketScanInfo(globalContext, bucket)
fatalIf(probe.NewError(err).Trace(aliasedURL), "Unable to get bucket stats.")
printMsg(bucketScanMsg{Stats: bucketStats})
return nil
}

ui := tea.NewProgram(initScannerMetricsUI(ctx.Int("max-paths")))
ctxt, cancel := context.WithCancel(globalContext)
defer cancel()
Expand All @@ -220,11 +206,11 @@ func mainAdminScannerInfo(ctx *cli.Context) error {
go func() {
if _, e := ui.Run(); e != nil {
cancel()
fatalIf(probe.NewError(e).Trace(aliasedURL), "Unable to fetch scanner metrics")
fatalIf(probe.NewError(e), "Unable to fetch scanner metrics")
}
}()
f, e := os.Open(inFile)
fatalIf(probe.NewError(e).Trace(aliasedURL), "Unable to open input")
fatalIf(probe.NewError(e), "Unable to open input")
sc := bufio.NewReader(f)
var lastTime time.Time
for {
Expand All @@ -250,6 +236,18 @@ func mainAdminScannerInfo(ctx *cli.Context) error {
os.Exit(0)
}

// Create a new MinIO Admin Client
aliasedURL := ctx.Args().Get(0)
client, err := newAdminClient(aliasedURL)
fatalIf(err.Trace(aliasedURL), "Unable to initialize admin client.")

if bucket := ctx.String("bucket"); bucket != "" {
bucketStats, err := client.BucketScanInfo(globalContext, bucket)
fatalIf(probe.NewError(err).Trace(aliasedURL), "Unable to get bucket stats.")
printMsg(bucketScanMsg{Stats: bucketStats})
return nil
}

opts := madmin.MetricsOptions{
Type: madmin.MetricsScanner,
N: ctx.Int("n"),
Expand Down
64 changes: 55 additions & 9 deletions cmd/support-top-rcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package cmd

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"sort"
"strings"
"time"
Expand All @@ -29,6 +32,7 @@ import (
tea "github.com/charmbracelet/bubbletea"
"github.com/charmbracelet/lipgloss"
"github.com/minio/cli"
json "github.com/minio/colorjson"
"github.com/minio/madmin-go/v3"
"github.com/minio/mc/pkg/probe"
"github.com/olekukonko/tablewriter"
Expand All @@ -49,6 +53,10 @@ var supportTopRPCFlags = []cli.Flag{
Usage: "number of requests to run before exiting. 0 for endless (default)",
Value: 0,
},
cli.StringFlag{
Name: "in",
Usage: "read previously saved json from file and replay",
},
}

var supportTopRPCCmd = cli.Command{
Expand Down Expand Up @@ -77,6 +85,9 @@ EXAMPLES:

// checkSupportTopNetSyntax - validate all the passed arguments
func checkSupportTopRPCSyntax(ctx *cli.Context) {
if ctx.String("in") != "" {
return
}
if len(ctx.Args()) == 0 || len(ctx.Args()) > 1 {
showCommandHelpAndExit(ctx, 1) // last argument is exit code
}
Expand All @@ -85,6 +96,45 @@ func checkSupportTopRPCSyntax(ctx *cli.Context) {
func mainSupportTopRPC(ctx *cli.Context) error {
checkSupportTopRPCSyntax(ctx)

ui := tea.NewProgram(initTopRPCUI())
ctxt, cancel := context.WithCancel(globalContext)
defer cancel()

// Replay from file.
if inFile := ctx.String("in"); inFile != "" {
go func() {
if _, e := ui.Run(); e != nil {
cancel()
fatalIf(probe.NewError(e), "Unable to fetch scanner metrics")
}
}()
f, e := os.Open(inFile)
fatalIf(probe.NewError(e), "Unable to open input")
sc := bufio.NewReader(f)
var lastTime time.Time
for {
b, e := sc.ReadBytes('\n')
if e == io.EOF {
break
}
var metrics madmin.RealtimeMetrics
e = json.Unmarshal(b, &metrics)
if e != nil || metrics.Aggregated.RPC == nil {
continue
}
delay := metrics.Aggregated.RPC.CollectedAt.Sub(lastTime)
if !lastTime.IsZero() && delay > 0 {
if delay > 3*time.Second {
delay = 3 * time.Second
}
time.Sleep(delay)
}
ui.Send(metrics)
lastTime = metrics.Aggregated.RPC.CollectedAt
}
os.Exit(0)
}

aliasedURL := ctx.Args().Get(0)
alias, _ := url2Alias(aliasedURL)
validateClusterRegistered(alias, false)
Expand All @@ -96,9 +146,6 @@ func mainSupportTopRPC(ctx *cli.Context) error {
return nil
}

ctxt, cancel := context.WithCancel(globalContext)
defer cancel()

// MetricsOptions are options provided to Metrics call.
opts := madmin.MetricsOptions{
Type: madmin.MetricsRPC,
Expand All @@ -116,20 +163,19 @@ func mainSupportTopRPC(ctx *cli.Context) error {
}
return nil
}
p := tea.NewProgram(initTopRPCUI())
go func() {
out := func(m madmin.RealtimeMetrics) {
p.Send(m)
ui.Send(m)
}

e := client.Metrics(ctxt, opts, out)
if e != nil {
fatalIf(probe.NewError(e), "Unable to fetch top net events")
}
p.Quit()
ui.Quit()
}()

if _, e := p.Run(); e != nil {
if _, e := ui.Run(); e != nil {
cancel()
fatalIf(probe.NewError(e).Trace(aliasedURL), "Unable to fetch top net events")
}
Expand Down Expand Up @@ -242,7 +288,7 @@ func (m *topRPCUI) View() string {
fmt.Sprintf(" To %s", host),
fmt.Sprintf("%d", v.Connected),
fmt.Sprintf("%0.1fms", v.LastPingMS),
fmt.Sprintf("%ds ago", time.Since(v.LastPongTime)/time.Second),
fmt.Sprintf("%ds ago", v.CollectedAt.Sub(v.LastPongTime)/time.Second),
fmt.Sprintf("%d", v.OutQueue),
fmt.Sprintf("%d", v.ReconnectCount),
fmt.Sprintf("->%d", v.IncomingStreams),
Expand All @@ -256,7 +302,7 @@ func (m *topRPCUI) View() string {
fmt.Sprintf("From %s", host),
fmt.Sprintf("%d", v.Connected),
fmt.Sprintf("%0.1fms", v.LastPingMS),
fmt.Sprintf("%ds ago", time.Since(v.LastPongTime)/time.Second),
fmt.Sprintf("%ds ago", v.CollectedAt.Sub(v.LastPongTime)/time.Second),
fmt.Sprintf("%d", v.OutQueue),
fmt.Sprintf("%d", v.ReconnectCount),
fmt.Sprintf("->%d", v.IncomingStreams),
Expand Down
Loading