Skip to content

Commit

Permalink
🚀 TCP New checker type (#982)
Browse files Browse the repository at this point in the history
* 🚧 wip

* 🚧 new checker type

* 🚀 tcp checker

* 🚀 tcp checker

* 🚧 tcp tests

* 🚧 tcp test

* 🚧 tcp test

* ci: apply automated fixes

* 🚀 tcp machine

* ci: apply automated fixes

* 🚀

* 😱 major refactor

* 🤣 fix build

* ci: apply automated fixes

* chore: tcp request form input

* 🔥 tcp

* 🔥 remove useEffect

* 🚀 file upload

* ci: apply automated fixes

* 😱 slowly start migration

* ci: apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Maximilian Kaske <[email protected]>
  • Loading branch information
3 people authored Sep 16, 2024
1 parent 57c316f commit 7c889a5
Show file tree
Hide file tree
Showing 34 changed files with 4,221 additions and 1,156 deletions.
4 changes: 4 additions & 0 deletions apps/checker/.golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
linters:
enable-all: true
disable-all: false
fast: true
302 changes: 15 additions & 287 deletions apps/checker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
Expand All @@ -12,35 +11,13 @@ import (
"time"

"github.com/gin-gonic/gin"
"github.com/openstatushq/openstatus/apps/checker"
"github.com/openstatushq/openstatus/apps/checker/pkg/assertions"
"github.com/openstatushq/openstatus/apps/checker/handlers"

"github.com/openstatushq/openstatus/apps/checker/pkg/logger"
"github.com/openstatushq/openstatus/apps/checker/pkg/tinybird"
"github.com/openstatushq/openstatus/apps/checker/request"
"github.com/rs/zerolog/log"

backoff "github.com/cenkalti/backoff/v4"
)

type statusCode int

// We should export it
type PingResponse struct {
Body string `json:"body,omitempty"`
Headers string `json:"headers,omitempty"`
Region string `json:"region"`
RequestId int64 `json:"requestId,omitempty"`
WorkspaceId int64 `json:"workspaceId,omitempty"`
Latency int64 `json:"latency"`
Time int64 `json:"time"`
Timing checker.Timing `json:"timing"`
Status int `json:"status,omitempty"`
}

func (s statusCode) IsSuccessful() bool {
return s >= 200 && s < 300
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -71,274 +48,24 @@ func main() {

tinybirdClient := tinybird.NewClient(httpClient, tinyBirdToken)

router := gin.New()
router.POST("/checker", func(c *gin.Context) {
ctx := c.Request.Context()
dataSourceName := "ping_response__v8"
if c.GetHeader("Authorization") != fmt.Sprintf("Basic %s", cronSecret) {
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
return
}

if cloudProvider == "fly" {
// if the request has been routed to a wrong region, we forward it to the correct one.
region := c.GetHeader("fly-prefer-region")
if region != "" && region != flyRegion {
c.Header("fly-replay", fmt.Sprintf("region=%s", region))
c.String(http.StatusAccepted, "Forwarding request to %s", region)
return
}
}

var req request.CheckerRequest
if err := c.ShouldBindJSON(&req); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to decode checker request")
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})
return
}
// We need a new client for each request to avoid connection reuse.
requestClient := &http.Client{
Timeout: time.Duration(req.Timeout) * time.Millisecond,
}
defer requestClient.CloseIdleConnections()

// Might be a more efficient way to do it
var i interface{} = req.RawAssertions
jsonBytes, _ := json.Marshal(i)
assertionAsString := string(jsonBytes)
if assertionAsString == "null" {
assertionAsString = ""
}

var called int
op := func() error {
called++
res, err := checker.Ping(ctx, requestClient, req)
if err != nil {
return fmt.Errorf("unable to ping: %w", err)
}
statusCode := statusCode(res.StatusCode)

var isSuccessfull bool = true
if len(req.RawAssertions) > 0 {
for _, a := range req.RawAssertions {
var assert request.Assertion
err = json.Unmarshal(a, &assert)
if err != nil {
// handle error
return fmt.Errorf("unable to unmarshal assertion: %w", err)
}
switch assert.AssertionType {
case request.AssertionHeader:
var target assertions.HeaderTarget
if err := json.Unmarshal(a, &target); err != nil {
return fmt.Errorf("unable to unmarshal IntTarget: %w", err)
}
isSuccessfull = isSuccessfull && target.HeaderEvaluate(res.Headers)

case request.AssertionTextBody:
var target assertions.StringTargetType
if err := json.Unmarshal(a, &target); err != nil {
return fmt.Errorf("unable to unmarshal IntTarget: %w", err)
}
isSuccessfull = isSuccessfull && target.StringEvaluate(res.Body)

case request.AssertionStatus:
var target assertions.StatusTarget
if err := json.Unmarshal(a, &target); err != nil {
return fmt.Errorf("unable to unmarshal IntTarget: %w", err)
}
isSuccessfull = isSuccessfull && target.StatusEvaluate(int64(res.StatusCode))
case request.AssertionJsonBody:
fmt.Println("assertion type", assert.AssertionType)
default:
fmt.Println("⚠️ Not Handled assertion type", assert.AssertionType)
}
}
} else {
isSuccessfull = statusCode.IsSuccessful()
}

// let's retry at least once if the status code is not successful.
if !isSuccessfull && called < 2 {
return fmt.Errorf("unable to ping: %v with status %v", res, res.StatusCode)
}

// it's in error if not successful
if isSuccessfull {
res.Error = 0
// Small trick to avoid sending the body at the moment to TB
res.Body = ""
} else {
res.Error = 1
}

res.Assertions = assertionAsString
// That part could be refactored
if !isSuccessfull && req.Status == "active" {
// Q: Why here we do not check if the status was previously active?
checker.UpdateStatus(ctx, checker.UpdateData{
MonitorId: req.MonitorID,
Status: "error",
StatusCode: res.StatusCode,
Region: flyRegion,
Message: res.Message,
CronTimestamp: req.CronTimestamp,
})
}
// Check if the status is degraded
if isSuccessfull && req.Status == "active" {
if req.DegradedAfter > 0 && res.Latency > req.DegradedAfter {
checker.UpdateStatus(ctx, checker.UpdateData{
MonitorId: req.MonitorID,
Status: "degraded",
Region: flyRegion,
StatusCode: res.StatusCode,
CronTimestamp: req.CronTimestamp,
})
}
}
// We were in error and now we are successful don't check for degraded
if isSuccessfull && req.Status == "error" {
// Q: Why here we check the data before updating the status in this scenario?
checker.UpdateStatus(ctx, checker.UpdateData{
MonitorId: req.MonitorID,
Status: "active",
Region: flyRegion,
StatusCode: res.StatusCode,
CronTimestamp: req.CronTimestamp,
})
}
// if we were in degraded and now we are successful, we should update the status to active
if isSuccessfull && req.Status == "degraded" {
if req.DegradedAfter > 0 && res.Latency <= req.DegradedAfter {
checker.UpdateStatus(ctx, checker.UpdateData{
MonitorId: req.MonitorID,
Status: "active",
Region: flyRegion,
StatusCode: res.StatusCode,
CronTimestamp: req.CronTimestamp,
})
}
}

if err := tinybirdClient.SendEvent(ctx, res, dataSourceName); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird")
}

return nil
}

if err := backoff.Retry(op, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3)); err != nil {
if err := tinybirdClient.SendEvent(ctx, checker.PingData{
URL: req.URL,
Region: flyRegion,
Message: err.Error(),
CronTimestamp: req.CronTimestamp,
Timestamp: req.CronTimestamp,
MonitorID: req.MonitorID,
WorkspaceID: req.WorkspaceID,
Error: 1,
Assertions: assertionAsString,
Body: "",
}, dataSourceName); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird")
}

if req.Status == "active" {
checker.UpdateStatus(ctx, checker.UpdateData{
MonitorId: req.MonitorID,
Status: "error",
Message: err.Error(),
Region: flyRegion,
CronTimestamp: req.CronTimestamp,
})
}

}
h := &handlers.Handler{
Secret: cronSecret,
CloudProvider: cloudProvider,
Region: flyRegion,
TbClient: tinybirdClient,
}

c.JSON(http.StatusOK, gin.H{"message": "ok"})
})
router := gin.New()
router.POST("/checker", h.HTTPCheckerHandler)
router.POST("/checker/http", h.HTTPCheckerHandler)
router.POST("/checker/tcp", h.TCPHandler)
router.POST("/ping/:region", h.PingRegionHandler)
router.POST("/tcp/:region", h.TCPHandlerRegion)

router.GET("/health", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"message": "pong", "fly_region": flyRegion})
})

router.POST("/ping/:region", func(c *gin.Context) {
dataSourceName := "check_response__v1"
region := c.Param("region")
if region == "" {
c.String(http.StatusBadRequest, "region is required")
return
}
fmt.Printf("Start of /ping/%s\n", region)

if c.GetHeader("Authorization") != fmt.Sprintf("Basic %s", cronSecret) {
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
return
}

if cloudProvider == "fly" {

if region != flyRegion {
c.Header("fly-replay", fmt.Sprintf("region=%s", region))
c.String(http.StatusAccepted, "Forwarding request to %s", region)
return
}
}
// We need a new client for each request to avoid connection reuse.
requestClient := &http.Client{
Timeout: 45 * time.Second,
}
defer requestClient.CloseIdleConnections()

var req request.PingRequest
if err := c.ShouldBindJSON(&req); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to decode checker request")
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})
return
}
var res checker.Response
op := func() error {
r, err := checker.SinglePing(c.Request.Context(), requestClient, req)
if err != nil {
return fmt.Errorf("unable to ping: %w", err)
}

r.Region = flyRegion

headersAsString, err := json.Marshal(r.Headers)
if err != nil {
return err
}

tbData := PingResponse{
RequestId: req.RequestId,
WorkspaceId: req.WorkspaceId,
Status: r.Status,
Latency: r.Latency,
Body: r.Body,
Headers: string(headersAsString),
Time: r.Time,
Timing: r.Timing,
Region: r.Region,
}

res = r
if tbData.RequestId != 0 {
if err := tinybirdClient.SendEvent(ctx, tbData, dataSourceName); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird")
}
}
return nil
}
if err := backoff.Retry(op, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3)); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})
return
}
c.JSON(http.StatusOK, res)
})

httpServer := &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%s", env("PORT", "8080")),
Handler: router,
Expand All @@ -354,6 +81,7 @@ func main() {
<-ctx.Done()
if err := httpServer.Shutdown(ctx); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to shutdown http server")

return
}
}
Expand Down
Loading

0 comments on commit 7c889a5

Please sign in to comment.