From f5a603a991aa66cd6184b3372e2d8991fa4a453f Mon Sep 17 00:00:00 2001 From: Kleonikos Kyriakis Date: Wed, 21 Feb 2024 12:02:52 +0200 Subject: [PATCH] Generate csv report when running example rpc client (#7) - add configurable flags for host/port/recipient --- examples/rpc/client.go | 134 ++++++++++++++++++++++++---- internal/matrix/matrix_messenger.go | 5 +- internal/metadata/metadata.go | 10 ++- scripts/printReport.sh | 42 +++++++++ scripts/sendXRequests.sh | 19 ++-- 5 files changed, 182 insertions(+), 28 deletions(-) mode change 100644 => 100755 examples/rpc/client.go create mode 100755 scripts/printReport.sh diff --git a/examples/rpc/client.go b/examples/rpc/client.go old mode 100644 new mode 100755 index b64381ed..68bed3f4 --- a/examples/rpc/client.go +++ b/examples/rpc/client.go @@ -3,9 +3,13 @@ package main import ( typesv1alpha "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/types/v1alpha" "context" + "encoding/csv" + "flag" "fmt" - "log" "os" + "sort" + "strconv" + "sync" "time" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha/accommodationv1alphagrpc" @@ -20,6 +24,8 @@ import ( ) func main() { + var mu sync.Mutex + var wg sync.WaitGroup var logger *zap.Logger cfg := zap.NewDevelopmentConfig() cfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) @@ -27,20 +33,43 @@ func main() { sLogger := logger.Sugar() logger.Sync() - argsWithoutProg := os.Args[1:] - unencrypted := len(argsWithoutProg) == 0 + times := flag.Int("requests", 1, "Repeat the request n times") + host := flag.String("host", "127.0.0.1", "Distributor bot host") + port := flag.Int("port", 9092, "Distributor bot port") + recipient := flag.String("recipient", "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network", "Recipient address (format: @t-kopernikus[...]:matrix.camino.network") + caCertFile := flag.String("ca-cert-file", "", "CA certificate file (optional)") + flag.Parse() + ppConfig := config.PartnerPluginConfig{ - Host: "localhost", - Port: 9092, - Unencrypted: unencrypted, + Host: *host, + Port: *port, + Unencrypted: *caCertFile == "", + } + ppConfig.CACertFile = *caCertFile + + loadTestData := make([][]string, *times) + for i := 0; i < *times; i++ { + loadTestData[i] = make([]string, 6) + wg.Add(1) + go func(counter int) { + defer wg.Done() + createClientAndRunRequest(counter, ppConfig, sLogger, *recipient, loadTestData, mu) + }(i) } - if !unencrypted { - ppConfig.CACertFile = argsWithoutProg[0] + + wg.Wait() + + if len(loadTestData) > 1 || len(loadTestData) == 1 && loadTestData[0][0] != "" { // otherwise no data have been recorded + persistToCSV(loadTestData) } +} + +func createClientAndRunRequest(i int, ppConfig config.PartnerPluginConfig, sLogger *zap.SugaredLogger, recipient string, loadTestData [][]string, mu sync.Mutex) { c := client.NewClient(&ppConfig, sLogger) err := c.Start() if err != nil { - panic(err) + fmt.Errorf("error starting client: %v", err) + return } request := &accommodationv1alpha.AccommodationSearchRequest{ Header: nil, @@ -59,12 +88,8 @@ func main() { }, } - err = c.Start() - if err != nil { - panic(err) - } md := metadata.New(map[string]string{ - "recipient": "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network", + "recipient": recipient, }) ctx := metadata.NewOutgoingContext(context.Background(), md) @@ -73,14 +98,91 @@ func main() { var header metadata.MD resp, err := ass.AccommodationSearch(ctx, request, grpc.Header(&header)) if err != nil { - log.Fatal(err) + sLogger.Errorf("error when performing search: %v", err) + return } + totalTime := time.Since(begin) + fmt.Printf("Total time|%s|%s\n", resp.Metadata.SearchId, totalTime) metadata := &internalmetadata.Metadata{} err = metadata.FromGrpcMD(header) if err != nil { - fmt.Print("error extracting metadata") + sLogger.Errorf("error extracting metadata: %v", err) } + + addToDataset(int64(i), totalTime.Milliseconds(), resp, metadata, loadTestData, mu) fmt.Printf("Received response after %s => ID: %s\n", time.Since(begin), resp.Metadata.SearchId) c.Shutdown() +} + +func addToDataset(counter int64, totalTime int64, resp *accommodationv1alpha.AccommodationSearchResponse, metadata *internalmetadata.Metadata, loadTestData [][]string, mu sync.Mutex) { + var data []string + var entries []struct { + Key string + Value int64 + } + // Populate the slice with map entries + for key, value := range metadata.Timestamps { + entries = append(entries, struct { + Key string + Value int64 + }{Key: key, Value: value}) + } + + // Sort the slice based on values + sort.Slice(entries, func(i, j int) bool { + return entries[i].Value < entries[j].Value + }) + lastValue := int64(0) + data = append(data, strconv.FormatInt(counter+1, 10)) + data = append(data, strconv.FormatInt(totalTime, 10)) + for _, entry := range entries { + + if entry.Key == "request-gateway-request" { + lastValue = entry.Value + continue //skip + } + if entry.Key == "processor-request" { + + //lastValue = entry.Value + continue //skip + } + fmt.Printf("%d|%s|%s|%d|%.2f\n", entry.Value, entry.Key, resp.Metadata.SearchId.GetValue(), entry.Value-lastValue, float32(entry.Value-lastValue)/float32(totalTime)) + + data = append(data, strconv.FormatInt(entry.Value-lastValue, 10)) + lastValue = entry.Value + } + + mu.Lock() + loadTestData[counter] = data + mu.Unlock() +} +func persistToCSV(dataset [][]string) { + // Open a new CSV file + file, err := os.Create("load_test_data.csv") + if err != nil { + fmt.Println("Error creating CSV file:", err) + return + } + defer file.Close() + + // Create a CSV writer + writer := csv.NewWriter(file) + defer writer.Flush() + + // Write the header row + header := []string{"Request ID", "Total Time", "distributor -> matrix", "matrix -> provider", "provider -> matrix", "matrix -> distributor", "process-response"} + if err := writer.Write(header); err != nil { + fmt.Println("Error writing header:", err) + return + } + + // Write the load test data rows + for _, dataRow := range dataset { + if err := writer.Write(dataRow); err != nil { + fmt.Println("Error writing data row:", err) + return + } + } + fmt.Println("CSV file created successfully.") } diff --git a/internal/matrix/matrix_messenger.go b/internal/matrix/matrix_messenger.go index 0de9f7d8..7ce3d913 100644 --- a/internal/matrix/matrix_messenger.go +++ b/internal/matrix/matrix_messenger.go @@ -6,6 +6,7 @@ import ( "fmt" "reflect" "sync" + "time" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/formatting" @@ -64,6 +65,7 @@ func (m *messenger) StartReceiver() (string, error) { syncer.OnEventType(C4TMessage, func(source mautrix.EventSource, evt *event.Event) { msg := evt.Content.Parsed.(*CaminoMatrixMessage) + t := time.Now() completeMsg, err, completed := m.msgAssembler.AssembleMessage(*msg) if err != nil { m.logger.Errorf("failed to assemble message: %v", err) @@ -72,7 +74,8 @@ func (m *messenger) StartReceiver() (string, error) { if !completed { return // partial messages are not passed down to the msgChannel } - completeMsg.Metadata.Stamp(fmt.Sprintf("%s-%s", m.Checkpoint(), "received")) + completeMsg.Metadata.StampOn(fmt.Sprintf("matrix-sent-%s", completeMsg.MsgType), evt.Timestamp) + completeMsg.Metadata.StampOn(fmt.Sprintf("%s-%s-%s", m.Checkpoint(), "received", completeMsg.MsgType), t.UnixMilli()) m.msgChannel <- messaging.Message{ Metadata: completeMsg.Metadata, Content: completeMsg.Content, diff --git a/internal/metadata/metadata.go b/internal/metadata/metadata.go index e39d0ddb..b0879e40 100644 --- a/internal/metadata/metadata.go +++ b/internal/metadata/metadata.go @@ -80,5 +80,13 @@ func (m *Metadata) Stamp(checkpoint string) { if m.Timestamps == nil { m.Timestamps = make(map[string]int64) } - m.Timestamps[checkpoint] = time.Now().UnixMilli() + idx := len(m.Timestamps) // for analysis' sake, we want to know the order of the checkpoints + m.Timestamps[fmt.Sprintf("%d-%s", idx, checkpoint)] = time.Now().UnixMilli() +} +func (m *Metadata) StampOn(checkpoint string, t int64) { + if m.Timestamps == nil { + m.Timestamps = make(map[string]int64) + } + idx := len(m.Timestamps) // for analysis' sake, we want to know the order of the checkpoints + m.Timestamps[fmt.Sprintf("%d-%s", idx, checkpoint)] = t } diff --git a/scripts/printReport.sh b/scripts/printReport.sh new file mode 100755 index 00000000..37fa31ca --- /dev/null +++ b/scripts/printReport.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +if [ $# -eq 0 ]; then + echo "Usage: $0 " + exit 1 +fi + +filename=$1 + +if [ ! -f "$filename" ]; then + echo "File not found: $filename" + exit 1 +fi + +# Read data from the file into an array +mapfile -t data < "$filename" + +# Function to calculate the average of an array +calculate_average() { + local sum=0 + local count=${#data[@]} + for value in "${data[@]}"; do + sum=$((sum + value)) + done + echo "scale=2; $sum / $count" | bc +} + +# Sort the array +sorted_data=($(for i in "${data[@]}"; do echo $i; done | sort -n)) + +# Calculate min, max, median, and average +min=${sorted_data[0]} +max=${sorted_data[-1]} +median=${sorted_data[${#sorted_data[@]}/2]} +average=$(calculate_average) +total=${#data[@]} +# Print the results +echo "Min: $min" +echo "Max: $max" +echo "Median: $median" +echo "Average: $average" +echo "Total: $total" diff --git a/scripts/sendXRequests.sh b/scripts/sendXRequests.sh index 71025558..9e1a4849 100755 --- a/scripts/sendXRequests.sh +++ b/scripts/sendXRequests.sh @@ -1,20 +1,19 @@ #!/bin/bash +if ! [[ "$0" =~ scripts/sendXRequests.sh ]]; then + echo "must be run from repository root" + exit 255 +fi + # Check if the number of arguments provided is correct if [ "$#" -ne 1 ]; then echo "Usage: $0 " exit 1 fi +# Store the argument in a variable times_to_run=$1 -go_file_path="examples/rpc/client.go" -# Loop to run the Go file X times in parallel -for ((i=1; i<=$times_to_run; i++)) -do - echo "Sending $i request..." - go run $go_file_path & -done - -# Wait for all background processes to finish -wait \ No newline at end of file +# Change the path to your Go file below +go_file_path="examples/rpc/client.go" +go run $go_file_path $times_to_run