Skip to content

Commit

Permalink
🔥 improve tcp
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaultleouay committed Oct 3, 2024
1 parent 16f2459 commit 6a4c06e
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 79 deletions.
2 changes: 1 addition & 1 deletion apps/checker/handlers/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (h Handler) HTTPCheckerHandler(c *gin.Context) {
log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird")
}

if req.Status == "active" {
if req.Status != "error" {
checker.UpdateStatus(ctx, checker.UpdateData{
MonitorId: req.MonitorID,
Status: "error",
Expand Down
1 change: 1 addition & 0 deletions apps/checker/handlers/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (h Handler) PingRegionHandler(c *gin.Context) {
}

res = r
res.Region = h.Region

if tbData.RequestId != 0 {
if err := h.TbClient.SendEvent(ctx, tbData, dataSourceName); err != nil {
Expand Down
134 changes: 103 additions & 31 deletions apps/checker/handlers/tcp.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handlers

import (
"encoding/json"
"fmt"
"net/http"
"strconv"
Expand All @@ -22,6 +23,21 @@ type TCPResponse struct {
Timing checker.TCPResponseTiming `json:"timing"`
}

// Only used for Tinybird
type TCPData struct {
Timing string `json:"timing"`
ErrorMessage string `json:"error"`
Region string `json:"region"`

RequestId int64 `json:"requestId,omitempty"`
WorkspaceID int64 `json:"workspaceId"`
MonitorID int64 `json:"monitorId"`
Timestamp int64 `json:"timestamp"`
Latency int64 `json:"latency"`

Error uint8 `json:"errorMessage"`
}

func (h Handler) TCPHandler(c *gin.Context) {
ctx := c.Request.Context()
dataSourceName := "tcp_response__v0"
Expand Down Expand Up @@ -50,13 +66,15 @@ func (h Handler) TCPHandler(c *gin.Context) {

return
}

workspaceId, err := strconv.ParseInt(req.WorkspaceID, 10, 64)

if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})

return
}

monitorId, err := strconv.ParseInt(req.MonitorID, 10, 64)

if err != nil {
Expand All @@ -66,6 +84,7 @@ func (h Handler) TCPHandler(c *gin.Context) {
}

var called int

op := func() error {
called++
res, err := checker.PingTcp(int(req.Timeout), req.URL)
Expand All @@ -74,18 +93,24 @@ func (h Handler) TCPHandler(c *gin.Context) {
return fmt.Errorf("unable to check tcp %s", err)
}

r := TCPResponse{
WorkspaceID: workspaceId,
Timestamp: req.CronTimestamp,
Timing: checker.TCPResponseTiming{
TCPStart: res.TCPStart,
TCPDone: res.TCPDone,
},
Region: h.Region,
MonitorID: monitorId,
timingAsString, err := json.Marshal(res)
if err != nil {
return fmt.Errorf("error while parsing timing data %s: %w", req.URL, err)
}

latency := res.TCPDone - res.TCPStart

data := TCPData{
WorkspaceID: workspaceId,
Timestamp: req.CronTimestamp,
Error: 0,
ErrorMessage: "",
Region: h.Region,
MonitorID: monitorId,
Timing: string(timingAsString),
Latency: latency,
}

if req.Status == "active" && req.DegradedAfter > 0 && latency > req.DegradedAfter {
checker.UpdateStatus(ctx, checker.UpdateData{
MonitorId: req.MonitorID,
Expand All @@ -105,33 +130,46 @@ func (h Handler) TCPHandler(c *gin.Context) {
}

if req.Status == "error" {
checker.UpdateStatus(ctx, checker.UpdateData{
MonitorId: req.MonitorID,
Status: "active",
Region: h.Region,
CronTimestamp: req.CronTimestamp,
})
if req.DegradedAfter == 0 || (req.DegradedAfter > 0 && latency < req.DegradedAfter) {
checker.UpdateStatus(ctx, checker.UpdateData{
MonitorId: req.MonitorID,
Status: "active",
Region: h.Region,
CronTimestamp: req.CronTimestamp,
})
}

if req.DegradedAfter > 0 && latency > req.DegradedAfter {
checker.UpdateStatus(ctx, checker.UpdateData{
MonitorId: req.MonitorID,
Status: "degraded",
Region: h.Region,
CronTimestamp: req.CronTimestamp,
})
}

}

if err := h.TbClient.SendEvent(ctx, r, dataSourceName); err != nil {
if err := h.TbClient.SendEvent(ctx, data, dataSourceName); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird")
}

return nil
}

if err := backoff.Retry(op, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3)); err != nil {
if err := h.TbClient.SendEvent(ctx, TCPResponse{
WorkspaceID: workspaceId,
Timestamp: req.CronTimestamp,
Error: err.Error(),
Region: h.Region,
MonitorID: monitorId,
if err := h.TbClient.SendEvent(ctx, TCPData{
WorkspaceID: workspaceId,
Timestamp: req.CronTimestamp,
ErrorMessage: err.Error(),
Region: h.Region,
MonitorID: monitorId,
Error: 1,
}, dataSourceName); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird")
}

if req.Status == "active" {
if req.Status != "error" {
checker.UpdateStatus(ctx, checker.UpdateData{
MonitorId: req.MonitorID,
Status: "error",
Expand All @@ -152,11 +190,13 @@ func (h Handler) TCPHandlerRegion(c *gin.Context) {
region := c.Param("region")
if region == "" {
c.String(http.StatusBadRequest, "region is required")

return
}

if c.GetHeader("Authorization") != fmt.Sprintf("Basic %s", h.Secret) {
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})

return
}

Expand All @@ -166,32 +206,42 @@ func (h Handler) TCPHandlerRegion(c *gin.Context) {
if region != "" && region != h.Region {
c.Header("fly-replay", fmt.Sprintf("region=%s", region))
c.String(http.StatusAccepted, "Forwarding request to %s", region)

return
}
}

var req request.TCPCheckerRequest

if err := c.ShouldBindJSON(&req); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to decode checker request")
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})

return
}

workspaceId, err := strconv.ParseInt(req.WorkspaceID, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})

return
}

monitorId, err := strconv.ParseInt(req.MonitorID, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})

return
}

var called int

var response TCPResponse

op := func() error {
called++
res, err := checker.PingTcp(int(req.Timeout), req.URL)

if err != nil {
return fmt.Errorf("unable to check tcp %s", err)
}
Expand All @@ -207,25 +257,47 @@ func (h Handler) TCPHandlerRegion(c *gin.Context) {
MonitorID: monitorId,
}

timingAsString, err := json.Marshal(res)
if err != nil {
return fmt.Errorf("error while parsing timing data %s: %w", req.URL, err)
}

latency := res.TCPDone - res.TCPStart

data := TCPData{
WorkspaceID: workspaceId,
Timestamp: req.CronTimestamp,
Error: 0,
ErrorMessage: "",
Region: h.Region,
MonitorID: monitorId,
Timing: string(timingAsString),
Latency: latency,
RequestId: req.RequestId,
}

if req.RequestId != 0 {
if err := h.TbClient.SendEvent(ctx, response, dataSourceName); err != nil {
if err := h.TbClient.SendEvent(ctx, data, dataSourceName); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird")
}
}

return nil
}

if err := backoff.Retry(op, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3)); err != nil {
if err := h.TbClient.SendEvent(ctx, TCPResponse{
WorkspaceID: workspaceId,
Timestamp: req.CronTimestamp,
Error: err.Error(),
Region: h.Region,
MonitorID: monitorId,
if err := backoff.Retry(op, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3)); err != nil && req.RequestId != 0 {
if err := h.TbClient.SendEvent(ctx, TCPData{
WorkspaceID: workspaceId,
Timestamp: req.CronTimestamp,
ErrorMessage: err.Error(),
Region: h.Region,
MonitorID: monitorId,
Error: 1,
RequestId: req.RequestId,
}, dataSourceName); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird")
}
}

c.JSON(http.StatusOK, response)
}
1 change: 1 addition & 0 deletions apps/checker/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Response struct {
Headers map[string]string `json:"headers,omitempty"`
Body string `json:"body,omitempty"`
Error string `json:"error,omitempty"`
Region string `json:"region"`
Latency int64 `json:"latency"`
Timestamp int64 `json:"timestamp"`
Status int `json:"status,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect, test } from "bun:test";

import { api } from "../index";
import { api } from "../../index";

import { afterEach, mock } from "bun:test";

Expand All @@ -25,9 +25,9 @@ test("Create a single check ", async () => {
Promise.resolve(
new Response(
'{"status":200,"latency":100,"body":"Hello World","headers":{"Content-Type":"application/json"},"timestamp":1234567890,"timing":{"dnsStart":1,"dnsDone":2,"connectStart":3,"connectDone":4,"tlsHandshakeStart":5,"tlsHandshakeDone":6,"firstByteStart":7,"firstByteDone":8,"transferStart":9,"transferDone":10},"region":"ams"}',
{ status: 200, headers: { "content-type": "application/json" } },
),
),
{ status: 200, headers: { "content-type": "application/json" } }
)
)
);

const res = await api.request("/check", {
Expand Down Expand Up @@ -94,9 +94,9 @@ test.todo("Create a multiple check ", async () => {
Promise.resolve(
new Response(
'{"status":200,"latency":100,"body":"Hello World","headers":{"Content-Type":"application/json"},"timestamp":1234567890,"timing":{"dnsStart":1,"dnsDone":2,"connectStart":3,"connectDone":4,"tlsHandshakeStart":5,"tlsHandshakeDone":6,"firstByteStart":7,"firstByteDone":8,"transferStart":9,"transferDone":10},"region":"ams"}',
{ status: 200, headers: { "content-type": "application/json" } },
),
),
{ status: 200, headers: { "content-type": "application/json" } }
)
)
);

const res = await api.request("/check", {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import { createRoute, type z } from "@hono/zod-openapi";
import { db } from "@openstatus/db";
import { check } from "@openstatus/db/src/schema/check";
import percentile from "percentile";
import { env } from "../../env";
import { openApiErrorResponses } from "../../libs/errors/openapi-error-responses";
import type { checkAPI } from "./index";
import { env } from "../../../env";
import { openApiErrorResponses } from "../../../libs/errors/openapi-error-responses";
import type { checkAPI } from "../index";
import {
AggregatedResponseSchema,
AggregatedResult,
Expand All @@ -18,7 +18,7 @@ const postRoute = createRoute({
method: "post",
tags: ["page"],
description: "Run a single check",
path: "/",
path: "/http",
request: {
body: {
description: "The run request to create",
Expand All @@ -42,7 +42,7 @@ const postRoute = createRoute({
},
});

export function registerPostCheck(api: typeof checkAPI) {
export function registerHTTPPostCheck(api: typeof checkAPI) {
return api.openapi(postRoute, async (c) => {
const data = c.req.valid("json");
const workspaceId = Number(c.get("workspaceId"));
Expand Down Expand Up @@ -157,10 +157,10 @@ function getTiming(data: z.infer<typeof ResponseSchema>[]): ReturnGetTiming {
prev.dns.push(curr.timing.dnsDone - curr.timing.dnsStart);
prev.connect.push(curr.timing.connectDone - curr.timing.connectStart);
prev.tls.push(
curr.timing.tlsHandshakeDone - curr.timing.tlsHandshakeStart,
curr.timing.tlsHandshakeDone - curr.timing.tlsHandshakeStart
);
prev.firstByte.push(
curr.timing.firstByteDone - curr.timing.firstByteStart,
curr.timing.firstByteDone - curr.timing.firstByteStart
);
prev.transfer.push(curr.timing.transferDone - curr.timing.transferStart);
prev.latency.push(curr.latency);
Expand All @@ -173,7 +173,7 @@ function getTiming(data: z.infer<typeof ResponseSchema>[]): ReturnGetTiming {
firstByte: [],
transfer: [],
latency: [],
} as ReturnGetTiming,
} as ReturnGetTiming
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { z } from "zod";
import { MonitorSchema } from "../monitors/schema";
import { MonitorSchema } from "../../monitors/schema";

export const CheckSchema = MonitorSchema.pick({
url: true,
Expand Down
4 changes: 2 additions & 2 deletions apps/server/src/v1/check/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ import { OpenAPIHono } from "@hono/zod-openapi";
import type { Variables } from "../index";

import { handleZodError } from "../../libs/errors";
import { registerPostCheck } from "./post";
import { registerHTTPPostCheck } from "./http/post";

const checkAPI = new OpenAPIHono<{ Variables: Variables }>({
defaultHook: handleZodError,
});

registerPostCheck(checkAPI);
registerHTTPPostCheck(checkAPI);

export { checkAPI };
Loading

0 comments on commit 6a4c06e

Please sign in to comment.