Skip to content

Commit

Permalink
workload: add workload tools
Browse files Browse the repository at this point in the history
  • Loading branch information
andydunstall committed May 10, 2024
1 parent 81c960e commit 23d4839
Show file tree
Hide file tree
Showing 7 changed files with 601 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cli/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/andydunstall/pico/cli/agent"
"github.com/andydunstall/pico/cli/server"
"github.com/andydunstall/pico/cli/status"
"github.com/andydunstall/pico/cli/workload"
"github.com/spf13/cobra"
)

Expand All @@ -22,6 +23,7 @@ func NewCommand() *cobra.Command {
cmd.AddCommand(agent.NewCommand())
cmd.AddCommand(server.NewCommand())
cmd.AddCommand(status.NewCommand())
cmd.AddCommand(workload.NewCommand())

return cmd
}
Expand Down
28 changes: 28 additions & 0 deletions cli/workload/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package workload

import "github.com/spf13/cobra"

func NewCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "workload",
Short: "generate test workloads",
Long: `Generate test workloads.
This tool can be used to register upstream endpoints that echo received
requests and generate proxy request traffic.
Examples:
# Register 1000 endpoints and upstream servers.
pico workload endpoints --endpoints 1000
# Start 10 clients, each sending 5 requests a second where each request is
# send to a random endpoint.
pico workload requests --endpoints 1000 --rate 5 --clients 10
`,
}

cmd.AddCommand(newEndpointsCommand())
cmd.AddCommand(newRequestsCommand())

return cmd
}
130 changes: 130 additions & 0 deletions cli/workload/endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package workload

import (
"context"
"fmt"
"os"
"os/signal"
"strconv"
"syscall"

picoconfig "github.com/andydunstall/pico/pkg/config"
"github.com/andydunstall/pico/pkg/log"
"github.com/andydunstall/pico/workload/config"
"github.com/andydunstall/pico/workload/upstream"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

func newEndpointsCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "endpoints",
Short: "add upstream endpoints",
Long: `Add upstream endpoints.
Starts the configured number of upstream HTTP servers that return status code
200 and echo the request body. Each upstream server has a corresponding agent
that registers an endpoint for that server.
Endpoint IDs will be assigned to upstreams from the number of endpoints. Such
as if you have 1000 upstreams and 100 endpoints, then you'll have 10 upstream
servers per endpoint.
Examples:
# Start 1000 upstream servers with 100 endpoints.
pico workload endpoints
# Start 5000 upstream servers with 5000 endpoints (so each upstream has a
# unique endpoint ID).
pico workload endpoints --upstreams 5000 --endpoints 5000
# Specify the Pico server address.
pico workload endpoints --server.url https://pico.example.com:8001
`,
}

var conf config.EndpointsConfig

var configPath string
cmd.Flags().StringVar(
&configPath,
"config.path",
"",
`
YAML config file path.`,
)

var configExpandEnv bool
cmd.Flags().BoolVar(
&configExpandEnv,
"config.expand-env",
false,
`
Whether to expand environment variables in the config file.
This will replaces references to ${VAR} or $VAR with the corresponding
environment variable. The replacement is case-sensitive.
References to undefined variables will be replaced with an empty string. A
default value can be given using form ${VAR:default}.`,
)

// Register flags and set default values.
conf.RegisterFlags(cmd.Flags())

cmd.Run = func(cmd *cobra.Command, args []string) {
if configPath != "" {
if err := picoconfig.Load(configPath, &conf, configExpandEnv); err != nil {
fmt.Printf("load config: %s\n", err.Error())
os.Exit(1)
}
}

if err := conf.Validate(); err != nil {
fmt.Printf("invalid config: %s\n", err.Error())
os.Exit(1)
}

logger, err := log.NewLogger(conf.Log.Level, conf.Log.Subsystems)
if err != nil {
fmt.Printf("failed to setup logger: %s\n", err.Error())
os.Exit(1)
}

if err := runEndpoints(&conf, logger); err != nil {
logger.Error("failed to run server", zap.Error(err))
os.Exit(1)
}
}

return cmd
}

func runEndpoints(conf *config.EndpointsConfig, logger log.Logger) error {
logger.Info("starting endpoints workload", zap.Any("conf", conf))

ctx, cancel := signal.NotifyContext(
context.Background(), syscall.SIGINT, syscall.SIGTERM,
)
defer cancel()

g, ctx := errgroup.WithContext(ctx)

nextEndpointID := 0
for i := 0; i != conf.Upstreams; i++ {
upstream := upstream.NewUpstream(
strconv.Itoa(nextEndpointID),
conf.Server.URL,
logger,
)
g.Go(func() error {
return upstream.Run(ctx)
})

nextEndpointID++
nextEndpointID %= conf.Endpoints
}

return g.Wait()
}
152 changes: 152 additions & 0 deletions cli/workload/requests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package workload

import (
"context"
"fmt"
"math/rand"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

picoconfig "github.com/andydunstall/pico/pkg/config"
"github.com/andydunstall/pico/pkg/log"
"github.com/andydunstall/pico/workload/config"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

func newRequestsCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "requests",
Short: "generate proxy request traffic",
Long: `Generate proxy request traffic.
Starts the configured number of clients sending HTTP requests to Pico which
are proxied to the upstream endpoints.
Each request selects a random endpoint ID from the configured number of
endpoints.
By default requests are empty, though you can configure the payload size of
each message.
Examples:
# Run 50 clients with 10 requests per second using 100 endpoints.
pico workload requests
# Run 100 clients with 2 requests per second using 50 endpoints.
pico workload requests --clients 100 --rate 2 --endpoints 50
# Specify the Pico server address.
pico workload requests --server.url https://pico.example.com:8000
# Specify the request payload size.
pico workload requests --request.size 1024
`,
}

var conf config.RequestsConfig

var configPath string
cmd.Flags().StringVar(
&configPath,
"config.path",
"",
`
YAML config file path.`,
)

var configExpandEnv bool
cmd.Flags().BoolVar(
&configExpandEnv,
"config.expand-env",
false,
`
Whether to expand environment variables in the config file.
This will replaces references to ${VAR} or $VAR with the corresponding
environment variable. The replacement is case-sensitive.
References to undefined variables will be replaced with an empty string. A
default value can be given using form ${VAR:default}.`,
)

// Register flags and set default values.
conf.RegisterFlags(cmd.Flags())

cmd.Run = func(cmd *cobra.Command, args []string) {
if configPath != "" {
if err := picoconfig.Load(configPath, &conf, configExpandEnv); err != nil {
fmt.Printf("load config: %s\n", err.Error())
os.Exit(1)
}
}

if err := conf.Validate(); err != nil {
fmt.Printf("invalid config: %s\n", err.Error())
os.Exit(1)
}

logger, err := log.NewLogger(conf.Log.Level, conf.Log.Subsystems)
if err != nil {
fmt.Printf("failed to setup logger: %s\n", err.Error())
os.Exit(1)
}

if err := runRequests(&conf, logger); err != nil {
logger.Error("failed to run server", zap.Error(err))
os.Exit(1)
}
}

return cmd
}

func runRequests(conf *config.RequestsConfig, logger log.Logger) error {
logger.Info("starting requests workload", zap.Any("conf", conf))

ctx, cancel := signal.NotifyContext(
context.Background(), syscall.SIGINT, syscall.SIGTERM,
)
defer cancel()

g, ctx := errgroup.WithContext(ctx)

for i := 0; i != conf.Clients; i++ {
g.Go(func() error {
return runClient(ctx, conf, logger)
})
}

return g.Wait()
}

func runClient(ctx context.Context, conf *config.RequestsConfig, logger log.Logger) error {
ticker := time.NewTicker(time.Duration(int(time.Second) / conf.Rate))
defer ticker.Stop()

client := &http.Client{}
for {
select {
case <-ticker.C:
endpointID := rand.Int() % conf.Endpoints
req, _ := http.NewRequest("GET", conf.Server.URL, nil)
req.Header.Set("x-pico-endpoint", strconv.Itoa(endpointID))
resp, err := client.Do(req)
if err != nil {
logger.Warn("request", zap.Error(err))
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
logger.Warn("bad status", zap.Int("status", resp.StatusCode))
}
case <-ctx.Done():
return nil
}
}
}
Loading

0 comments on commit 23d4839

Please sign in to comment.