Skip to content

Commit

Permalink
Merge pull request #13 from venkataanil/netpolproxy
Browse files Browse the repository at this point in the history
netpol proxy for kube-burner
  • Loading branch information
krishvoor authored Oct 16, 2024
2 parents 18d66a2 + 5227ccc commit 6bcc8e3
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ ENGINE ?= podman
ORG ?= cloud-bulldozer
REGISTRY ?= quay.io
REG = $(REGISTRY)/$(ORG)
REPOS = perfapp etcd-perf nginx frr netpol-scraper nginxecho eipvalidator sampleapp netpolvalidator
REPOS = perfapp etcd-perf nginx frr netpol-scraper nginxecho eipvalidator sampleapp netpolvalidator netpolproxy

all: build push

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ Container Images for OCP Perf&Scale
- eipvalidator: quay.io/cloud-bulldozer/eipvalidator:latest
- nginxecho: quay.io/cloud-bulldozer/nginxecho:latest
- netpolvalidator: quay.io/cloud-bulldozer/netpolvalidator:latest
- netpolproxy: quay.io/cloud-bulldozer/netpolproxy:latest
12 changes: 12 additions & 0 deletions netpolproxy/Containerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM registry.access.redhat.com/ubi8/ubi-minimal:latest

RUN microdnf install golang -y \
&& microdnf clean all

WORKDIR /app
COPY go.mod *.go ./
RUN go mod download
RUN go mod tidy
RUN CGO_ENABLED=0 GOOS=linux go build -o /netpolproxy
EXPOSE 9002
CMD ["/netpolproxy"]
34 changes: 34 additions & 0 deletions netpolproxy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Kube Burner Network Policy Proxy Pod for Connection Testing and Latency Measurement
Kube-burner employs a proxy pod to interact with client pods, which helps streamline communication and avoid the need for direct routes or executing commands on each client pod. This is particularly beneficial during large-scale tests, where a significant number of client pods are created. The proxy pod facilitates both the delivery of connection information to client pods and the retrieval of results, reducing overhead and complexity.

The proxy pod is built using a specific image and listens on port 9002. This port is enabled on worker nodes by default via AWS security groups. The proxy pod is equipped with 5 handlers and operates across two primary flows:

- Sending connection information to client pods
- Retrieving connection results from client pods

### Workflow:
- Initialization and Connection Setup:
+ The proxy pod initially waits to receive connection information from Kube-burner.
+ Once Kube-burner sends the connection information via the `/initiate` endpoint, the proxy pod uses 20 parallel Goroutines to distribute this information to all the client pods efficiently.
+ Kube-burner waits for up to 30 minutes, periodically checking every 5 seconds using the `/checkConnectionsStatus` endpoint to confirm whether the proxy pod has successfully delivered the connection details to all client pods.

- Retrieving Test Results:
+ After the testing phase is complete, Kube-burner triggers the `/stop` endpoint on the proxy pod. This signals the proxy pod to begin retrieving results from all the client pods.
+ Similar to the connection phase, the proxy pod employs 20 parallel Goroutines to gather results from the client pods.
+ Kube-burner again waits for a maximum of 30 minutes, checking every 5 seconds via the `/checkStopStatus` endpoint to ensure that the proxy pod has retrieved results from all client pods.
+ Once all results are collected, Kube-burner retrieves the final data by querying the `/results` endpoint on the proxy pod.


Log from one of the client pods

```shell
$ oc logs -n network-policy-proxy network-policy-proxy -f
2024/10/01 11:18:02 Client server started on :9002
2024/10/01 11:18:02 Wait for Connections from kube-burner..
2024/10/01 11:18:25 Number of connections got from kube-burner 2
2024/10/01 11:18:25 Got connections from kube-burner, sending them to 2 pods
2024/10/01 11:18:25 Connections sent to http://10.128.2.50:9001/check successfully
2024/10/01 11:18:25 Connections sent to http://10.128.2.51:9001/check successfully
2024/10/01 11:18:47 Address: 10.128.2.50, Port: 8080, IngressIdx: 0, NpName: ingress-0-1 Timestamp: 2024-10-01 11:18:33.247061614 +0000 UTC
2024/10/01 11:18:47 Address: 10.128.2.51, Port: 8080, IngressIdx: 0, NpName: ingress-1-1 Timestamp: 2024-10-01 11:18:33.248359122 +0000 UTC
```
3 changes: 3 additions & 0 deletions netpolproxy/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module example.com/netpolproxy

go 1.21.10
234 changes: 234 additions & 0 deletions netpolproxy/netpolproxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
"time"
)

type connection struct {
Addresses []string `json:"addresses"`
Ports []int32 `json:"ports"`
Netpol string `json:"netpol"`
}

type connTest struct {
Address string `json:"address"`
Port int `json:"port"`
IngressIdx int `json:"ingressidx"`
NpName string `json:"npname"`
Timestamp time.Time `json:"timestamp"`
}

const (
podPort = 9001
parallelConnections = 20
)

var (
connections = make(map[string][]connection)
connWg sync.WaitGroup
sendConnectionsDone bool
checkStopDone bool
sendConnMutex sync.Mutex
checkStopMutex sync.Mutex
resWg sync.WaitGroup
clusterResults = make(map[string][]connTest)
resultsMutex sync.Mutex
doneInitiate = make(chan bool)
)

type ProxyResponse struct {
Result bool `json:"result"`
}

// Send connections to all pods
func sendNetpolInfo(pod string, connInfo []connection, semaphore chan struct{}) {
defer connWg.Done()
defer func() { <-semaphore }()
data, err := json.Marshal(connInfo)
if err != nil {
log.Fatalf("Failed to marshal payload: %v", err)
}

url := fmt.Sprintf("http://%s:%d/check", pod, podPort)
resp, err := http.Post(url, "application/json", bytes.NewBuffer(data))
if err != nil {
log.Fatalf("Failed to send request: %v", err)
}
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
log.Printf("Connections sent to %s successfully", url)
}
}

// Wait for connections from kube-burner. Once it recieves connections,
// send them to client pods using 20 threads.
func sendConnections() {
log.Printf("Wait for Connections from kube-burner..")
<-doneInitiate
log.Printf("Got connections from kube-burner, sending them to %d pods", len(connections))
semaphore := make(chan struct{}, parallelConnections)
for pod, connInfo := range connections {
semaphore <- struct{}{}
connWg.Add(1)
go sendNetpolInfo(pod, connInfo, semaphore)
}
connWg.Wait()
sendConnMutex.Lock()
sendConnectionsDone = true
sendConnMutex.Unlock()
}

// kube-burner periodically checks if this proxy pod sent connections to all the client pods or not.
// It replies with "true" once connections are succesfully sent to all client pods.
func handleCheckConnectionsStatus(w http.ResponseWriter, r *http.Request) {
sendConnMutex.Lock()
defer sendConnMutex.Unlock()
result := false

if sendConnectionsDone {
result = true
}
response := ProxyResponse{Result: result}
err := json.NewEncoder(w).Encode(response)
if err != nil {
log.Println("Error encoding response:", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
}

// kube-burner periodically checks if this proxy pod retrived results from all the client pods or not.
// It replies with "true" once it received results from all client pods.
func handleCheckStopStatus(w http.ResponseWriter, r *http.Request) {
checkStopMutex.Lock()
defer checkStopMutex.Unlock()

result := false

if checkStopDone {
result = true
}
response := ProxyResponse{Result: result}
err := json.NewEncoder(w).Encode(response)
if err != nil {
log.Println("Error encoding response:", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
}

// Get connections from kube-burner
func handleInitiate(w http.ResponseWriter, r *http.Request) {
// Send an immediate response to the client
fmt.Fprintln(w, "Initiate Request received, processing...")

// Read data from the request
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Unable to read response body", http.StatusBadRequest)
return
}
err = json.Unmarshal(body, &connections)
if err != nil {
http.Error(w, "Unable to parse response body", http.StatusBadRequest)
return
}
r.Body.Close()
log.Printf("Number of connections got from kube-burner %d", len(connections))
doneInitiate <- true
}

// kube-burner requested to collect results from client pods
func handleStop(w http.ResponseWriter, r *http.Request) {
// Send an immediate response to the client
fmt.Fprintln(w, "Stop Request received, processing...")

// Read data from the request
_, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Unable to read response body", http.StatusBadRequest)
return
}
r.Body.Close()

// Get results from all pods
go getResults(connections)
}

// Get results from a single pod
func getPodResult(pod string, semaphore chan struct{}) {
defer resWg.Done()
defer func() { <-semaphore }()

url := fmt.Sprintf("http://%s:%d/results", pod, podPort)
// Retrieve the results
resp, err := http.Get(url)
if err != nil {
log.Fatalf("Failed to retrieve results: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Fatalf("Unexpected status code: %d", resp.StatusCode)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatalf("Failed to read response body: %v", err)
}

var results []connTest
if err := json.Unmarshal(body, &results); err != nil {
log.Fatalf("Failed to unmarshal results: %v", err)
}
for _, res := range results {
log.Printf("Address: %s, Port: %d, IngressIdx: %v, NpName: %s Timestamp: %v\n", res.Address, res.Port, res.IngressIdx, res.NpName, res.Timestamp)
}
resultsMutex.Lock()
clusterResults[pod] = results
resultsMutex.Unlock()
}

// Get results from all pods
func getResults(cts map[string][]connection) {
semaphore := make(chan struct{}, parallelConnections)
for pod, _ := range cts {
semaphore <- struct{}{}
resWg.Add(1)
go getPodResult(pod, semaphore)
}
resWg.Wait()
checkStopMutex.Lock()
checkStopDone = true
checkStopMutex.Unlock()
}

func resultsHandler(w http.ResponseWriter, r *http.Request) {
resultsMutex.Lock()
defer resultsMutex.Unlock()
if err := json.NewEncoder(w).Encode(clusterResults); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

func main() {
// Send connections to all pods
go sendConnections()
go func() {
http.HandleFunc("/initiate", handleInitiate)
http.HandleFunc("/checkConnectionsStatus", handleCheckConnectionsStatus)
http.HandleFunc("/stop", handleStop)
http.HandleFunc("/checkStopStatus", handleCheckStopStatus)
http.HandleFunc("/results", resultsHandler)
log.Println("Client server started on :9002")
log.Fatal(http.ListenAndServe(":9002", nil))
}()

select {} // keep the client running
}

0 comments on commit 6bcc8e3

Please sign in to comment.