Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add monitoring endpoint #34

Merged
merged 2 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions HadesAPI/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

var BuildQueue *queue.Queue
var MonitorClient *MonitoringClient

type HadesAPIConfig struct {
APIPort uint `env:"API_PORT,notEmpty" envDefault:"8080"`
Expand All @@ -35,12 +36,19 @@ func main() {
return
}

MonitorClient, err = NewMonitoringClient(cfg.RabbitMQConfig.Url, cfg.RabbitMQConfig.User, cfg.RabbitMQConfig.Password)
if err != nil {
log.WithError(err).Fatal("Failed to connect to RabbitMQ Management API")
return
}

log.Infof("Starting HadesAPI on port %d", cfg.APIPort)
gin.SetMode(gin.ReleaseMode)

r := gin.Default()
r.GET("/ping", ping)
r.POST("/build", AddBuildToQueue)
r.GET("/monitoring", MonitoringQueue)

log.Panic(r.Run(fmt.Sprintf(":%d", cfg.APIPort)))
}
130 changes: 130 additions & 0 deletions HadesAPI/queue_monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"

"github.com/Mtze/HadesCI/shared/payload"
log "github.com/sirupsen/logrus"
)

const monitoring_url = "http://%s:15672/api/queues/%%2F/%s" // %%2F is url encoded `/` for default vhost

type MonitoringValues struct {
MessageSize int `json:"message_size"`
ConsumerSize int `json:"consumer_size"`
Messages []payload.QueuePayload `json:"messages"`
}

type MonitoringClient struct {
host string
user string
pass string
}

func NewMonitoringClient(host, user, pass string) (*MonitoringClient, error) {
u, err := url.Parse(host)
if err != nil {
log.WithError(err).Error("error parsing RabbitMQ URL")
return nil, err
}
return &MonitoringClient{u.Host, user, pass}, nil
}

func (m *MonitoringClient) getSizes() (message_size, consumer_size int) {
url := fmt.Sprintf(monitoring_url, m.host, "builds")
log.Debug("Getting queue size from ", url)

req, _ := http.NewRequest("GET", url, nil)
req.SetBasicAuth(m.user, m.pass)

client := http.Client{}
resp, err := client.Do(req)
if err != nil {
log.WithError(err).Error("error getting queue size")
return -1, -1
}
defer resp.Body.Close()
// Decode the JSON response into a struct
var queueInfo struct {
Messages int `json:"messages"`
Consumers int `json:"consumers"`
}
if err := json.NewDecoder(resp.Body).Decode(&queueInfo); err != nil {
log.WithError(err).Error("error decoding queue information")
return -1, -1
}
return queueInfo.Messages, queueInfo.Consumers
}

func (m *MonitoringClient) GetQueueState() MonitoringValues {
msg_size, cons_size := m.getSizes()
// No messages in queue, save extra request
if msg_size < 1 {
return MonitoringValues{
MessageSize: msg_size,
ConsumerSize: cons_size,
}
}
url := fmt.Sprintf(monitoring_url+"/get", m.host, "builds")

req_payload := struct {
Count int `json:"count"`
AckMode string `json:"ackmode"`
Encoding string `json:"encoding"`
}{
Count: msg_size,
AckMode: "ack_requeue_true",
Encoding: "auto",
}

// Marshal the struct into JSON
jsonValue, err := json.Marshal(req_payload)
if err != nil {
log.Fatal(err)
}

req, _ := http.NewRequest("POST", url, bytes.NewBuffer(jsonValue))
req.SetBasicAuth(m.user, m.pass)
req.Header.Set("Content-Type", "application/json")

client := http.Client{}
resp, err := client.Do(req)
if err != nil {
log.WithError(err).Error("error getting queue size")
return MonitoringValues{}
}
defer resp.Body.Close()
// Decode the JSON response into a struct
var messages []struct {
PayloadString string `json:"payload"`
}
if err := json.NewDecoder(resp.Body).Decode(&messages); err != nil {
log.WithError(err).Error("error decoding queue information")
return MonitoringValues{}
}
// Extracting just the payloads
var payloads []payload.QueuePayload
for _, message := range messages {
var payload payload.QueuePayload
err := json.Unmarshal([]byte(message.PayloadString), &payload)
if err != nil {
log.WithError(err).Error("error decoding queue information")
continue
}
// Filter confidential information
payload.Metadata = map[string]string{}
for i := range payload.Steps {
payload.Steps[i].Metadata = map[string]string{}
}
payloads = append(payloads, payload)
}
return MonitoringValues{
MessageSize: msg_size,
ConsumerSize: cons_size,
Messages: payloads,
}
}
5 changes: 5 additions & 0 deletions HadesAPI/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ func AddBuildToQueue(c *gin.Context) {
log.Debug("Received build request ", payload)
BuildQueue.Enqueue(c.Request.Context(), payload.QueuePayload, uint8(payload.Priority))
}

func MonitoringQueue(c *gin.Context) {
state := MonitorClient.GetQueueState()
c.JSON(http.StatusOK, state)
}