Skip to content

Commit

Permalink
test: add 'piko test workload'
Browse files Browse the repository at this point in the history
Adds 'piko test workload', which contains tools for generating test piko
workloads, including adding upstream listeners and generating HTTP and
TCP traffic.
  • Loading branch information
andydunstall committed Jul 13, 2024
1 parent b045a40 commit 8432480
Show file tree
Hide file tree
Showing 14 changed files with 893 additions and 523 deletions.
4 changes: 2 additions & 2 deletions cli/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/andydunstall/piko/cli/agent"
"github.com/andydunstall/piko/cli/forward"
"github.com/andydunstall/piko/cli/server"
"github.com/andydunstall/piko/cli/workload"
"github.com/andydunstall/piko/cli/test"
workloadv2 "github.com/andydunstall/piko/cli/workloadv2"
"github.com/andydunstall/piko/pkg/build"
)
Expand Down Expand Up @@ -64,7 +64,7 @@ to an upstream listener via Piko. Such as to forward port 3000 to endpoint
cmd.AddCommand(server.NewCommand())
cmd.AddCommand(agent.NewCommand())
cmd.AddCommand(forward.NewCommand())
cmd.AddCommand(workload.NewCommand())
cmd.AddCommand(test.NewCommand())
cmd.AddCommand(workloadv2.NewCommand())

return cmd
Expand Down
19 changes: 19 additions & 0 deletions cli/test/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package test

import (
"github.com/spf13/cobra"

"github.com/andydunstall/piko/cli/test/workload"
)

func NewCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "test",
Short: "tools for testing piko clusters",
Long: `Tools for testing Piko clusters.`,
}

cmd.AddCommand(workload.NewCommand())

return cmd
}
37 changes: 37 additions & 0 deletions cli/test/workload/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package workload

import (
"github.com/spf13/cobra"

"github.com/andydunstall/piko/cli/test/workload/traffic"
"github.com/andydunstall/piko/cli/test/workload/upstreams"
)

func NewCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "workload",
Short: "tools for generating test piko workloads",
Long: `Tools for generating test Piko workloads.
This includes adding upstream listeners and generating HTTP and TCP traffic.
Examples:
# Register HTTP upstreams.
piko test workload upstreams --protocol http
# Register TCP upstreams.
piko test workload upstreams --protocol tcp
# Generate HTTP traffic.
piko test workload traffic --protocol http
# Generate TCP traffic.
piko test workload traffic --protocol tcp
`,
}

cmd.AddCommand(upstreams.NewCommand())
cmd.AddCommand(traffic.NewCommand())

return cmd
}
122 changes: 122 additions & 0 deletions cli/test/workload/traffic/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package traffic

import (
"fmt"
"os"
"os/signal"
"syscall"

"github.com/spf13/cobra"
"go.uber.org/zap"

agentconfig "github.com/andydunstall/piko/agent/config"
"github.com/andydunstall/piko/pikotest/workload/traffic"
"github.com/andydunstall/piko/pikotest/workload/traffic/config"
"github.com/andydunstall/piko/pkg/build"
pikoconfig "github.com/andydunstall/piko/pkg/config"
"github.com/andydunstall/piko/pkg/log"
)

func NewCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "traffic",
Short: "generate downstream traffic",
Long: `Generates downstream traffic.
Starts the configured number of clients, which connect to upstream listeners,
send random traffic, then verify the request is echoed back.
You can generate either HTTP or TCP traffic. HTTP workloads send HTTP requests
with random bodies and expect the request body to be echoed back. TCP workloads
open connections to upstreams then send random data and expect the data to be
echoed back.
Each request/connection selects a random endpoint ID from the configured number
of endpoints.
Examples:
# Generate HTTP traffic.
piko test workload traffic --protocol http
# Generate TCP traffic.
piko test workload traffic --protocol tcp
# Start 10 HTTP clients, each sending 5 requests per second to a random
# endpoint from 0 to 9999.
piko test workload traffic --protocol http --endpoints 1000 --rate 5 --clients 10
# Start 10 TCP clients, each opening a connection and sending a request 5
# times per second.
piko test workload traffic --protocol tcp --rate 5 --clients 10
`,
}

conf := config.Default()
var loadConf pikoconfig.Config

// Register flags and set default values.
conf.RegisterFlags(cmd.PersistentFlags())
loadConf.RegisterFlags(cmd.PersistentFlags())

var logger log.Logger

cmd.PersistentPreRun = func(_ *cobra.Command, _ []string) {
if err := loadConf.Load(conf); err != nil {
fmt.Println(err.Error())
os.Exit(1)
}

if err := conf.Validate(); err != nil {
fmt.Printf("config: %s\n", err.Error())
os.Exit(1)
}

var err error
logger, err = log.NewLogger(conf.Log.Level, conf.Log.Subsystems)
if err != nil {
fmt.Printf("failed to setup logger: %s\n", err.Error())
os.Exit(1)
}
}

cmd.Run = func(_ *cobra.Command, _ []string) {
if err := run(conf, logger); err != nil {
logger.Error("failed to run upstreams", zap.Error(err))
os.Exit(1)
}
}

return cmd
}

func run(conf *config.Config, logger log.Logger) error {
logger.Info(
"starting traffic",
zap.String("version", build.Version),
)
logger.Debug("piko config", zap.Any("config", conf))

for i := 0; i != conf.Clients; i++ {
switch agentconfig.ListenerProtocol(conf.Protocol) {
case agentconfig.ListenerProtocolHTTP:
client := traffic.NewHTTPClient(conf, logger)
defer client.Close()
case agentconfig.ListenerProtocolTCP:
client := traffic.NewTCPClient(conf, logger)
defer client.Close()
default:
// Already verified so this won't happen.
panic("unsupported protocol: " + conf.Protocol)
}
}

// Termination handler.
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
sig := <-signalCh
logger.Info(
"received shutdown signal",
zap.String("signal", sig.String()),
)
return nil
}
131 changes: 131 additions & 0 deletions cli/test/workload/upstreams/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package upstreams

import (
"fmt"
"os"
"os/signal"
"syscall"

"github.com/spf13/cobra"
"go.uber.org/zap"

agentconfig "github.com/andydunstall/piko/agent/config"
"github.com/andydunstall/piko/pikotest/workload/upstreams"
"github.com/andydunstall/piko/pikotest/workload/upstreams/config"
"github.com/andydunstall/piko/pkg/build"
pikoconfig "github.com/andydunstall/piko/pkg/config"
"github.com/andydunstall/piko/pkg/log"
)

func NewCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "upstreams",
Short: "add upstream listeners",
Long: `Add upstream listeners.
Starts the configured number of upstream listeners, which can be either HTTP
or TCP.
HTTP upstreams listen for HTTP requests and echo the request body as a 200
response. TCP upstreams listen for connections and echo incoming bytes.
Endpoint IDs are evenly distributed among the listening upstreams, such as if
you configure 1000 upstreams and 100 endpoints, each endpoint will have 10
upstreams.
Examples:
# Start 1000 HTTP upstream servers with 100 endpoints.
piko workload upstreams
# Start 1000 TCP upstreams.
piko workload upstreams --protocol tcp
# Start 5000 HTTP upstream servers with 5000 endpoints (so each upstream has a
# unique endpoint ID).
piko workload upstreams --upstreams 5000 --endpoints 5000
# Configure the Piko server address.
piko workload upstreams --server.url https://piko.example.com:8001
`,
}

conf := config.Default()
var loadConf pikoconfig.Config

// Register flags and set default values.
conf.RegisterFlags(cmd.PersistentFlags())
loadConf.RegisterFlags(cmd.PersistentFlags())

var logger log.Logger

cmd.PersistentPreRun = func(_ *cobra.Command, _ []string) {
if err := loadConf.Load(conf); err != nil {
fmt.Println(err.Error())
os.Exit(1)
}

if err := conf.Validate(); err != nil {
fmt.Printf("config: %s\n", err.Error())
os.Exit(1)
}

var err error
logger, err = log.NewLogger(conf.Log.Level, conf.Log.Subsystems)
if err != nil {
fmt.Printf("failed to setup logger: %s\n", err.Error())
os.Exit(1)
}
}

cmd.Run = func(_ *cobra.Command, _ []string) {
if err := run(conf, logger); err != nil {
logger.Error("failed to run upstreams", zap.Error(err))
os.Exit(1)
}
}

return cmd
}

func run(conf *config.Config, logger log.Logger) error {
logger.Info(
"starting upstreams",
zap.String("version", build.Version),
)
logger.Debug("piko config", zap.Any("config", conf))

nextEndpointID := 0
for i := 0; i != conf.Upstreams; i++ {
endpointID := fmt.Sprintf("endpoint-%d", nextEndpointID)
switch agentconfig.ListenerProtocol(conf.Protocol) {
case agentconfig.ListenerProtocolHTTP:
upstream, err := upstreams.NewHTTPUpstream(endpointID, conf, logger)
if err != nil {
return fmt.Errorf("upstream: %w", err)
}
defer upstream.Close()
case agentconfig.ListenerProtocolTCP:
upstream, err := upstreams.NewTCPUpstream(endpointID, conf, logger)
if err != nil {
return fmt.Errorf("upstream: %w", err)
}
defer upstream.Close()
default:
// Already verified so this won't happen.
panic("unsupported protocol: " + conf.Protocol)
}

nextEndpointID++
nextEndpointID %= conf.Endpoints
}

// Termination handler.
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
sig := <-signalCh
logger.Info(
"received shutdown signal",
zap.String("signal", sig.String()),
)
return nil
}
28 changes: 0 additions & 28 deletions cli/workload/command.go

This file was deleted.

Loading

0 comments on commit 8432480

Please sign in to comment.