Skip to content

Commit

Permalink
Generate csv report when running example rpc client (#7)
Browse files Browse the repository at this point in the history
- add configurable flags for host/port/recipient
  • Loading branch information
knikos authored Feb 21, 2024
1 parent 2d69718 commit f5a603a
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 28 deletions.
134 changes: 118 additions & 16 deletions examples/rpc/client.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package main
import (
typesv1alpha "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/types/v1alpha"
"context"
"encoding/csv"
"flag"
"fmt"
"log"
"os"
"sort"
"strconv"
"sync"
"time"

"buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha/accommodationv1alphagrpc"
Expand All @@ -20,27 +24,52 @@ import (
)

func main() {
var mu sync.Mutex
var wg sync.WaitGroup
var logger *zap.Logger
cfg := zap.NewDevelopmentConfig()
cfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
logger, _ = cfg.Build()
sLogger := logger.Sugar()
logger.Sync()

argsWithoutProg := os.Args[1:]
unencrypted := len(argsWithoutProg) == 0
times := flag.Int("requests", 1, "Repeat the request n times")
host := flag.String("host", "127.0.0.1", "Distributor bot host")
port := flag.Int("port", 9092, "Distributor bot port")
recipient := flag.String("recipient", "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network", "Recipient address (format: @t-kopernikus[...]:matrix.camino.network")
caCertFile := flag.String("ca-cert-file", "", "CA certificate file (optional)")
flag.Parse()

ppConfig := config.PartnerPluginConfig{
Host: "localhost",
Port: 9092,
Unencrypted: unencrypted,
Host: *host,
Port: *port,
Unencrypted: *caCertFile == "",
}
ppConfig.CACertFile = *caCertFile

loadTestData := make([][]string, *times)
for i := 0; i < *times; i++ {
loadTestData[i] = make([]string, 6)
wg.Add(1)
go func(counter int) {
defer wg.Done()
createClientAndRunRequest(counter, ppConfig, sLogger, *recipient, loadTestData, mu)
}(i)
}
if !unencrypted {
ppConfig.CACertFile = argsWithoutProg[0]

wg.Wait()

if len(loadTestData) > 1 || len(loadTestData) == 1 && loadTestData[0][0] != "" { // otherwise no data have been recorded
persistToCSV(loadTestData)
}
}

func createClientAndRunRequest(i int, ppConfig config.PartnerPluginConfig, sLogger *zap.SugaredLogger, recipient string, loadTestData [][]string, mu sync.Mutex) {
c := client.NewClient(&ppConfig, sLogger)
err := c.Start()
if err != nil {
panic(err)
fmt.Errorf("error starting client: %v", err)
return
}
request := &accommodationv1alpha.AccommodationSearchRequest{
Header: nil,
Expand All @@ -59,12 +88,8 @@ func main() {
},
}

err = c.Start()
if err != nil {
panic(err)
}
md := metadata.New(map[string]string{
"recipient": "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network",
"recipient": recipient,
})
ctx := metadata.NewOutgoingContext(context.Background(), md)

Expand All @@ -73,14 +98,91 @@ func main() {
var header metadata.MD
resp, err := ass.AccommodationSearch(ctx, request, grpc.Header(&header))
if err != nil {
log.Fatal(err)
sLogger.Errorf("error when performing search: %v", err)
return
}
totalTime := time.Since(begin)
fmt.Printf("Total time|%s|%s\n", resp.Metadata.SearchId, totalTime)
metadata := &internalmetadata.Metadata{}
err = metadata.FromGrpcMD(header)
if err != nil {
fmt.Print("error extracting metadata")
sLogger.Errorf("error extracting metadata: %v", err)
}

addToDataset(int64(i), totalTime.Milliseconds(), resp, metadata, loadTestData, mu)
fmt.Printf("Received response after %s => ID: %s\n", time.Since(begin), resp.Metadata.SearchId)
c.Shutdown()
}

func addToDataset(counter int64, totalTime int64, resp *accommodationv1alpha.AccommodationSearchResponse, metadata *internalmetadata.Metadata, loadTestData [][]string, mu sync.Mutex) {
var data []string
var entries []struct {
Key string
Value int64
}
// Populate the slice with map entries
for key, value := range metadata.Timestamps {
entries = append(entries, struct {
Key string
Value int64
}{Key: key, Value: value})
}

// Sort the slice based on values
sort.Slice(entries, func(i, j int) bool {
return entries[i].Value < entries[j].Value
})
lastValue := int64(0)
data = append(data, strconv.FormatInt(counter+1, 10))
data = append(data, strconv.FormatInt(totalTime, 10))
for _, entry := range entries {

if entry.Key == "request-gateway-request" {
lastValue = entry.Value
continue //skip
}
if entry.Key == "processor-request" {

//lastValue = entry.Value
continue //skip
}
fmt.Printf("%d|%s|%s|%d|%.2f\n", entry.Value, entry.Key, resp.Metadata.SearchId.GetValue(), entry.Value-lastValue, float32(entry.Value-lastValue)/float32(totalTime))

data = append(data, strconv.FormatInt(entry.Value-lastValue, 10))
lastValue = entry.Value
}

mu.Lock()
loadTestData[counter] = data
mu.Unlock()
}
func persistToCSV(dataset [][]string) {
// Open a new CSV file
file, err := os.Create("load_test_data.csv")
if err != nil {
fmt.Println("Error creating CSV file:", err)
return
}
defer file.Close()

// Create a CSV writer
writer := csv.NewWriter(file)
defer writer.Flush()

// Write the header row
header := []string{"Request ID", "Total Time", "distributor -> matrix", "matrix -> provider", "provider -> matrix", "matrix -> distributor", "process-response"}
if err := writer.Write(header); err != nil {
fmt.Println("Error writing header:", err)
return
}

// Write the load test data rows
for _, dataRow := range dataset {
if err := writer.Write(dataRow); err != nil {
fmt.Println("Error writing data row:", err)
return
}
}

fmt.Println("CSV file created successfully.")
}
5 changes: 4 additions & 1 deletion internal/matrix/matrix_messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"reflect"
"sync"
"time"

"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/utils/formatting"
Expand Down Expand Up @@ -64,6 +65,7 @@ func (m *messenger) StartReceiver() (string, error) {

syncer.OnEventType(C4TMessage, func(source mautrix.EventSource, evt *event.Event) {
msg := evt.Content.Parsed.(*CaminoMatrixMessage)
t := time.Now()
completeMsg, err, completed := m.msgAssembler.AssembleMessage(*msg)
if err != nil {
m.logger.Errorf("failed to assemble message: %v", err)
Expand All @@ -72,7 +74,8 @@ func (m *messenger) StartReceiver() (string, error) {
if !completed {
return // partial messages are not passed down to the msgChannel
}
completeMsg.Metadata.Stamp(fmt.Sprintf("%s-%s", m.Checkpoint(), "received"))
completeMsg.Metadata.StampOn(fmt.Sprintf("matrix-sent-%s", completeMsg.MsgType), evt.Timestamp)
completeMsg.Metadata.StampOn(fmt.Sprintf("%s-%s-%s", m.Checkpoint(), "received", completeMsg.MsgType), t.UnixMilli())
m.msgChannel <- messaging.Message{
Metadata: completeMsg.Metadata,
Content: completeMsg.Content,
Expand Down
10 changes: 9 additions & 1 deletion internal/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,13 @@ func (m *Metadata) Stamp(checkpoint string) {
if m.Timestamps == nil {
m.Timestamps = make(map[string]int64)
}
m.Timestamps[checkpoint] = time.Now().UnixMilli()
idx := len(m.Timestamps) // for analysis' sake, we want to know the order of the checkpoints
m.Timestamps[fmt.Sprintf("%d-%s", idx, checkpoint)] = time.Now().UnixMilli()
}
func (m *Metadata) StampOn(checkpoint string, t int64) {
if m.Timestamps == nil {
m.Timestamps = make(map[string]int64)
}
idx := len(m.Timestamps) // for analysis' sake, we want to know the order of the checkpoints
m.Timestamps[fmt.Sprintf("%d-%s", idx, checkpoint)] = t
}
42 changes: 42 additions & 0 deletions scripts/printReport.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/bin/bash

if [ $# -eq 0 ]; then
echo "Usage: $0 <filename>"
exit 1
fi

filename=$1

if [ ! -f "$filename" ]; then
echo "File not found: $filename"
exit 1
fi

# Read data from the file into an array
mapfile -t data < "$filename"

# Function to calculate the average of an array
calculate_average() {
local sum=0
local count=${#data[@]}
for value in "${data[@]}"; do
sum=$((sum + value))
done
echo "scale=2; $sum / $count" | bc
}

# Sort the array
sorted_data=($(for i in "${data[@]}"; do echo $i; done | sort -n))

# Calculate min, max, median, and average
min=${sorted_data[0]}
max=${sorted_data[-1]}
median=${sorted_data[${#sorted_data[@]}/2]}
average=$(calculate_average)
total=${#data[@]}
# Print the results
echo "Min: $min"
echo "Max: $max"
echo "Median: $median"
echo "Average: $average"
echo "Total: $total"
19 changes: 9 additions & 10 deletions scripts/sendXRequests.sh
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
#!/bin/bash

if ! [[ "$0" =~ scripts/sendXRequests.sh ]]; then
echo "must be run from repository root"
exit 255
fi

# Check if the number of arguments provided is correct
if [ "$#" -ne 1 ]; then
echo "Usage: $0 <number_of_requests>"
exit 1
fi

# Store the argument in a variable
times_to_run=$1
go_file_path="examples/rpc/client.go"

# Loop to run the Go file X times in parallel
for ((i=1; i<=$times_to_run; i++))
do
echo "Sending $i request..."
go run $go_file_path &
done

# Wait for all background processes to finish
wait
# Change the path to your Go file below
go_file_path="examples/rpc/client.go"
go run $go_file_path $times_to_run

0 comments on commit f5a603a

Please sign in to comment.