Skip to content

Commit

Permalink
Merge pull request #50 from MerzMax/feature/45_chat_rooms
Browse files Browse the repository at this point in the history
Feature/45 chat rooms
  • Loading branch information
martbock authored Feb 8, 2022
2 parents d9b66ac + db01b7b commit 4ddbdd6
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 48 deletions.
10 changes: 6 additions & 4 deletions grafana/dashboards/cadvisor-exporter.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"fiscalYearStartMonth": 0,
"gnetId": 14282,
"graphTooltip": 0,
"iteration": 1642865978052,
"iteration": 1643972821121,
"links": [],
"liveNow": false,
"panels": [
Expand Down Expand Up @@ -86,7 +86,7 @@
"targets": [
{
"exemplar": true,
"expr": "sum(rate(container_cpu_usage_seconds_total{instance=~\"$host\",name=~\".+server.+\",name=~\".+\"}[5m])) *100",
"expr": "sum(rate(container_cpu_usage_seconds_total{instance=~\"$host\",name=~\".+server.+\",name=~\".+\"}[60s])) *100",
"hide": false,
"interval": "",
"legendFormat": "Server",
Expand Down Expand Up @@ -431,7 +431,8 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(container_network_receive_bytes_total{instance=~\"$host\",name=~\"$container\",name=~\".+\"}[5m])) by (name)",
"exemplar": true,
"expr": "sum(rate(container_network_receive_bytes_total{instance=~\"$host\",name=~\"$container\",name=~\".+\"}[60s])) by (name)",
"hide": false,
"interval": "",
"legendFormat": "{{name}}",
Expand Down Expand Up @@ -525,7 +526,8 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(container_network_transmit_bytes_total{instance=~\"$host\",name=~\"$container\",name=~\".+\"}[5m])) by (name)",
"exemplar": true,
"expr": "sum(rate(container_network_transmit_bytes_total{instance=~\"$host\",name=~\"$container\",name=~\".+\"}[60s])) by (name)",
"interval": "",
"legendFormat": "{{name}}",
"refId": "A"
Expand Down
25 changes: 18 additions & 7 deletions src/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Client struct {
MsgSize int
MsgFrequency int
MsgEvents chan<- *MessageEventEntry
Room string
}

func (client *Client) Start() error {
Expand All @@ -34,18 +35,28 @@ func (client *Client) Start() error {
if client.IsLoadTestClient {
client.id = uuid.New().String()
} else {
log.Printf("Client started in loadtest mode. Please input your id: ")
log.Printf("Client started in loadtest mode.")
log.Printf("Please input your id:")
input, err := consoleReader.ReadString('\n')
// convert CRLF to LF
client.id = strings.Replace(input, "\n", "", -1)
if err != nil || len(client.id) < 1 {
log.Printf("Failed to read the name input. Using default id: MuM")
log.Printf("Failed to read the name input. Using default name: MuM")
client.id = "MuM"
}

log.Printf("Please input your chat room:")
input, err = consoleReader.ReadString('\n')
// convert CRLF to LF
client.Room = strings.Replace(input, "\n", "", -1)
if err != nil || len(client.Room) < 1 {
log.Printf("Failed to read the chat room input. Using default id: test")
client.Room = "test"
}
}

// Connection Establishment
wsConnection, _, err := websocket.DefaultDialer.Dial(client.ServerUrl, nil)
wsConnection, _, err := websocket.DefaultDialer.Dial(client.ServerUrl+"/"+client.Room, nil)
if err != nil {
log.Fatal("Error connecting to Websocket Server:", err)
}
Expand Down Expand Up @@ -127,7 +138,7 @@ func (client *Client) receiveHandler(ctx context.Context, waitGroup *sync.WaitGr
continue
}

// If the loadtest mode is activated, there will be added a new message event with the metadata of this message.
// If the load test mode is activated, there will be added a new message event with the metadata of this message.
if client.IsLoadTestClient {
var msgEventEntry = MessageEventEntry{
ClientId: client.id,
Expand All @@ -137,9 +148,9 @@ func (client *Client) receiveHandler(ctx context.Context, waitGroup *sync.WaitGr
Type: Received,
}
client.MsgEvents <- &msgEventEntry

log.Printf("%v", message)
}

log.Printf("%v", message)
}
}
}
Expand Down Expand Up @@ -193,7 +204,7 @@ func (client *Client) sendHandler(ctx context.Context, waitGroup *sync.WaitGroup
return
}

// If the loadtest mode is activated, there will be added a new message event with the metadata of this message.
// If the load test mode is activated, there will be added a new message event with the metadata of this message.
if client.IsLoadTestClient {
var msgEventEntry = MessageEventEntry{
ClientId: client.id,
Expand Down
1 change: 1 addition & 0 deletions src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.17
require (
github.com/go-echarts/go-echarts/v2 v2.2.4
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/montanaflynn/stats v0.6.6
github.com/prometheus/client_golang v1.11.0
Expand Down
2 changes: 2 additions & 0 deletions src/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
Expand Down
77 changes: 45 additions & 32 deletions src/load-test-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/csv"
"flag"
"github.com/google/uuid"
"log"
"os"
"os/signal"
Expand All @@ -26,20 +27,26 @@ func main() {
msgSize := flag.Int("msg-size", 256,
"The size of the messages in bytes (just for load test mode)")

numOfClients := flag.Int("clients", 1,
"Number of clients that will be started (just for load test mode")
numOfRooms := flag.Int("rooms", 1,
"Number of chat rooms that will be initialized (just for load test mode)")

roomSize := flag.Int("room-size", 1,
"Number of clients that will be started per room (just for load test mode")

flag.Parse()

var msgEvents chan *client.MessageEventEntry
var cancelFuncs []*context.CancelFunc
waitGroup := &sync.WaitGroup{}
numOfClients := *numOfRooms * *roomSize

if !*loadTest {
*numOfClients = 1
*numOfRooms = 1
*roomSize = 1
numOfClients = 1
}

waitGroup.Add(*numOfClients)
waitGroup.Add(numOfClients)

var csvWriterCancelFunc *context.CancelFunc
csvWriterWaitGroup := &sync.WaitGroup{}
Expand All @@ -54,34 +61,40 @@ func main() {
csvWriterWaitGroup.Add(1)
}

// Create numOfClients clients that can chat
for i := 1; i <= *numOfClients; i++ {
log.Printf("Creating client number: %v / %v", i, *numOfClients)

// Listen to system interrupts -> program will be stopped
closeConnection := make(chan os.Signal, 1)
signal.Notify(closeConnection, os.Interrupt)

ctx, cancelFunc := context.WithCancel(context.Background())
cancelFuncs = append(cancelFuncs, &cancelFunc)

go func() {
chatClient := client.Client{
Context: ctx,
WaitGroup: waitGroup,
ServerUrl: *serverUrl,
CloseConnection: closeConnection,
IsLoadTestClient: *loadTest,
MsgFrequency: *msgFrequency,
MsgSize: *msgSize,
MsgEvents: msgEvents,
}

err := chatClient.Start()
if err != nil {
log.Fatalf("%v", err)
}
}()
// Create rooms
for i := 0; i < *numOfRooms; i++ {
room := uuid.New().String()

// Create clients per room
for j := 1; j <= *roomSize; j++ {
log.Printf("Creating client number: %v / %v", j, *roomSize)

// Listen to system interrupts -> program will be stopped
closeConnection := make(chan os.Signal, 1)
signal.Notify(closeConnection, os.Interrupt)

ctx, cancelFunc := context.WithCancel(context.Background())
cancelFuncs = append(cancelFuncs, &cancelFunc)

go func() {
chatClient := client.Client{
Context: ctx,
WaitGroup: waitGroup,
ServerUrl: *serverUrl,
CloseConnection: closeConnection,
IsLoadTestClient: *loadTest,
MsgFrequency: *msgFrequency,
MsgSize: *msgSize,
MsgEvents: msgEvents,
Room: room,
}

err := chatClient.Start()
if err != nil {
log.Fatalf("%v", err)
}
}()
}
}

// Listen to system interrupts -> program will be stopped
Expand Down
4 changes: 3 additions & 1 deletion src/server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ type Client struct {
wsConn *websocket.Conn
outgoing chan *MessageWrapper
waitGroup *sync.WaitGroup
room string
}

type MessageWrapper struct {
message *chat.Message
processingTimer *prometheus.Timer
room string
}

//HandleOutgoing sends outgoing messages to the client's websocket connection
Expand Down Expand Up @@ -69,7 +71,7 @@ func (client *Client) HandleIncoming(incoming chan<- *MessageWrapper) {
continue
}

wrapper := MessageWrapper{message: &message, processingTimer: timer}
wrapper := MessageWrapper{message: &message, processingTimer: timer, room: client.room}

incoming <- &wrapper
}
Expand Down
7 changes: 6 additions & 1 deletion src/server/clientsManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var incoming = make(chan *MessageWrapper, messageBufferSize)

// StartClient starts a client's incoming and outgoing message handlers
// and waits until the connection breaks to remove the client
func StartClient(wsConn *websocket.Conn) {
func StartClient(wsConn *websocket.Conn, room string) {
outgoing := make(chan *MessageWrapper, messageBufferSize)

waitGroup := sync.WaitGroup{}
Expand All @@ -31,6 +31,7 @@ func StartClient(wsConn *websocket.Conn) {
wsConn: wsConn,
outgoing: outgoing,
waitGroup: &waitGroup,
room: room,
}
clients = append(clients, &client)

Expand All @@ -56,6 +57,10 @@ func BroadcastMessages() {
chatHistory = append(chatHistory, wrapper.message)

for _, client := range clients {
if wrapper.room != client.room {
continue
}

// By providing a default case, we avoid blocking the main broadcasting loop
// in case the buffer of the outgoing channel is full.
select {
Expand Down
3 changes: 2 additions & 1 deletion src/server/demo.html
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
return
}

const socket = new WebSocket(`ws://${document.location.host}/ws`)
const socket = new WebSocket(`ws://${document.location.host}/ws/demo`)
const userIdInput = document.getElementById('userIdInput')
const messageInput = document.getElementById('messageInput')

// RECEIVING MESSAGES
socket.onopen = function (event) {
displayStatusMessage('Connection established!', true)
displayStatusMessage('Automatically connected to the `demo` chat room.', true)
}

socket.onclose = function (event) {
Expand Down
12 changes: 10 additions & 2 deletions src/server/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus/promhttp"
"log"
Expand All @@ -18,12 +19,13 @@ func main() {
go BroadcastMessages()

// Register separate ServeMux instances for public endpoints and internal metrics
publicMux := http.NewServeMux()
publicMux := mux.NewRouter()
internalMux := http.NewServeMux()

// Register public endpoints
publicMux.HandleFunc("/", demoHandler)
publicMux.HandleFunc("/ws", wsHandler)
publicMux.HandleFunc("/ws/{room}", wsHandler)

// Register Prometheus endpoint
internalMux.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -60,13 +62,19 @@ func main() {

// Event handler for the /ws endpoint
func wsHandler(writer http.ResponseWriter, req *http.Request) {

log.Println("Got new connection")

vars := mux.Vars(req)
room := vars["room"]

wsConn, err := upgrader.Upgrade(writer, req, nil)
if err != nil {
log.Print("Cannot upgrade to websocket connection:", err)
return
}

StartClient(wsConn)
StartClient(wsConn, room)
}

// Handles the / endpoint and serves the demo html chat client
Expand Down

0 comments on commit 4ddbdd6

Please sign in to comment.