Skip to content

Commit

Permalink
improve /health, refactor error handling
Browse files Browse the repository at this point in the history
fixes #1
  • Loading branch information
YaroShkvorets committed Feb 28, 2024
1 parent 86af520 commit 52deb6e
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 97 deletions.
87 changes: 21 additions & 66 deletions controllers/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,59 +3,28 @@ package controllers
import (
"blob-service/dto"
"blob-service/internal"
pbbl "blob-service/pb/pinax/ethereum/blobs/v1"
"blob-service/services"
"context"
"encoding/binary"
"fmt"
"strconv"
"strings"
"time"

pbkv "github.com/streamingfast/substreams-sink-kv/pb/substreams/sink/kv/v1"

"github.com/eosnationftw/eosn-base-api/helper"
"github.com/eosnationftw/eosn-base-api/response"
"github.com/gin-gonic/gin"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
NOT_FOUND_SLOT = "slot_not_found" // no slot found
INVALID_SLOT = "invalid_slot" // invalid slot
)

type BlobsController struct {
sinkClient pbkv.KvClient
blobsService *services.BlobsService
}

func NewBlobsController(sinkClient pbkv.KvClient) *BlobsController {
return &BlobsController{sinkClient: sinkClient}
func NewBlobsController(blobsService *services.BlobsService) *BlobsController {
return &BlobsController{blobsService: blobsService}
}

type blobsBySlotRetType []*dto.Blob

func (bc *BlobsController) parseBlockId(ctx context.Context, block_id string) (uint64, error) {
if block_id == "head" {
resp, err := bc.sinkClient.Get(ctx, &pbkv.GetRequest{Key: "head"})
if err != nil {
return 0, err
}
return binary.BigEndian.Uint64(resp.GetValue()), nil
}

if block_id[:2] == "0x" {
resp, err := bc.sinkClient.Get(ctx, &pbkv.GetRequest{Key: "block_root:" + block_id})
if err != nil {
return 0, err
}
return binary.BigEndian.Uint64(resp.GetValue()), nil
}

return strconv.ParseUint(block_id, 10, 64)
}

// BlobsByBlockId
//
// @Summary Get Blobs by block id
Expand All @@ -64,61 +33,47 @@ func (bc *BlobsController) parseBlockId(ctx context.Context, block_id string) (u
// @Param block_id path string true "Block identifier. Can be one of: 'head', slot number, hex encoded blockRoot with 0x prefix"
// @Param indices query []string false "Array of indices for blob sidecars to request for in the specified block. Returns all blob sidecars in the block if not specified."
// @Success 200 {object} response.ApiDataResponse{data=blobsBySlotRetType} "Successful response"
// @Failure 400 {object} response.ApiErrorResponse "invalid_slot" "Invalid block id
// @Failure 400 {object} response.ApiErrorResponse "invalid_slot" "Invalid block id"
// @Failure 404 {object} response.ApiErrorResponse "slot_not_found" "Slot not found"
// @Failure 500 {object} response.ApiErrorResponse
// @Router /eth/v1/beacon/blob_sidecars/{block_id} [get]
func (bc *BlobsController) BlobsByBlockId(c *gin.Context) {

blockId := c.Param("block_id")
indices := strings.Split(c.Query("indices"), ",")
if len(indices) == 1 && indices[0] == "" {
indices = []string{}
}

ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Second)
defer cancel()

slotNum, err := bc.parseBlockId(ctx, blockId)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
helper.ReportPublicErrorAndAbort(c, response.GatewayTimeout, err)
return
indices := []uint32{}
for _, str := range strings.Split(c.Query("indices"), ",") {
if str == "" {
continue
}
st, ok := status.FromError(err)
if ok && st.Code() == codes.NotFound {
helper.ReportPublicErrorAndAbort(c, response.NewApiErrorNotFound(NOT_FOUND_SLOT), err)
i, err := strconv.ParseUint(str, 10, 32)
if err != nil {
internal.WriteErrorResponse(c, internal.ErrInvalidIndex)
return
}
helper.ReportPublicErrorAndAbort(c, response.BadGateway, err)
return
indices = append(indices, uint32(i))
}

resp, err := bc.sinkClient.Get(ctx, &pbkv.GetRequest{Key: fmt.Sprintf("slot:%d", slotNum)})
ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Second)
defer cancel()

slot, err := bc.blobsService.GetSlotByBlockId(ctx, blockId)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
helper.ReportPublicErrorAndAbort(c, response.GatewayTimeout, err)
internal.WriteErrorResponse(c, internal.ErrSinkTimeout)
return
}
st, ok := status.FromError(err)
if ok && st.Code() == codes.NotFound {
helper.ReportPublicErrorAndAbort(c, response.NewApiErrorNotFound(NOT_FOUND_SLOT), err)
internal.WriteErrorResponse(c, internal.ErrSlotNotFound)
return
}
helper.ReportPublicErrorAndAbort(c, response.BadGateway, err)
return
}

slot := &pbbl.Slot{}
err = proto.Unmarshal(resp.GetValue(), slot)
if err != nil {
helper.ReportPublicErrorAndAbort(c, response.InternalServerError, err)
internal.WriteErrorResponse(c, err)
return
}

resBlobs := []*dto.Blob{}
for _, blob := range slot.Blobs {
if len(indices) == 0 || internal.Contains(indices, fmt.Sprintf("%d", blob.Index)) {
if len(indices) == 0 || internal.Contains(indices, blob.Index) {
resBlobs = append(resBlobs, dto.NewBlob(blob, slot))
}
}
Expand Down
48 changes: 48 additions & 0 deletions controllers/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package controllers

import (
"blob-service/services"
"context"
"net/http"
"time"

"github.com/gin-gonic/gin"
)

type HealthController struct {
blobsService *services.BlobsService
}

func NewHealthController(blobsService *services.BlobsService) *HealthController {
return &HealthController{blobsService: blobsService}
}

type HealthResponse struct {
Status string `json:"status"`
Detail string `json:"detail,omitempty"`
Head uint64 `json:"head,omitempty"`
}

// Health
// @Summary Returns health status of this API.
// @Tags health
// @Produce json
// @Success 200 {object} HealthResponse
// @Failure 500 {object} response.ApiErrorResponse
// @Router /health [get]
func (hc *HealthController) Health(c *gin.Context) {

ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second)
defer cancel()

response := &HealthResponse{Status: "ok"}
slotNum, err := hc.blobsService.GetSlotNumber(ctx, "head")
if err != nil {
response.Status = "error"
response.Detail = err.Error()
} else {
response.Head = slotNum
}

c.JSON(http.StatusOK, response)
}
17 changes: 0 additions & 17 deletions controllers/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package controllers

import (
"blob-service/flags"
"net/http"

"github.com/eosnationftw/eosn-base-api/response"
"github.com/gin-gonic/gin"
Expand All @@ -28,19 +27,3 @@ func Version(c *gin.Context) {
Features: flags.GetEnabledFeatures(),
}})
}

type HealthResponse struct {
Status string `json:"status"`
}

// Version
// @Summary Returns health status of this API.
// @Tags version
// @Produce json
// @Success 200 {object} HealthResponse
// @Failure 500 {object} response.ApiErrorResponse
// @Router /health [get]
func Health(c *gin.Context) {
response := &HealthResponse{Status: "ok"}
c.JSON(http.StatusOK, response)
}
54 changes: 54 additions & 0 deletions internal/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package internal

import (
"net/http"

"github.com/eosnationftw/eosn-base-api/helper"
"github.com/eosnationftw/eosn-base-api/log"
"github.com/eosnationftw/eosn-base-api/response"
"github.com/friendsofgo/errors"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)

var (
ErrSinkTimeout = errors.New("timeout when trying to reach the sink service")
ErrSlotNotFound = errors.New("slot not found")
ErrInvalidSlot = errors.New("invalid slot")
ErrInvalidIndex = errors.New("invalid index")
)

const (
NOT_FOUND_SLOT = "slot_not_found"
INVALID_SLOT = "invalid_slot"
INVALID_INDEX = "invalid_index"
SINK_TIMEOUT = "sink_timeout"
)

func WriteErrorResponse(c *gin.Context, err error) {

// 404 NOT FOUND
if errors.Is(err, ErrSlotNotFound) {
helper.ReportPublicErrorAndAbort(c, response.NewApiErrorNotFound(NOT_FOUND_SLOT), err)
return
}

// 400 BAD REQUEST
if errors.Is(err, ErrInvalidSlot) {
helper.ReportPublicErrorAndAbort(c, response.NewApiErrorBadRequest(INVALID_SLOT), err)
return
}
if errors.Is(err, ErrInvalidIndex) {
helper.ReportPublicErrorAndAbort(c, response.NewApiErrorBadRequest(INVALID_INDEX), err)
return
}

// 504 GATEWAY TIMEOUT
if errors.Is(err, ErrSinkTimeout) {
helper.ReportPublicErrorAndAbort(c, response.NewApiError(http.StatusGatewayTimeout, SINK_TIMEOUT), err)
return
}

log.Error("unknown error received, writing internal_server_error instead", zap.Error(err))
helper.ReportPrivateErrorAndAbort(c, response.InternalServerError, err)
}
8 changes: 6 additions & 2 deletions server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"blob-service/controllers"
"blob-service/services"
"blob-service/swagger"
"context"
"fmt"
Expand Down Expand Up @@ -53,14 +54,17 @@ func (s *HttpServer) Initialize() {

s.Router.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerfiles.Handler))
s.Router.GET("/version", controllers.Version)
s.Router.GET("/health", controllers.Health)
s.Router.NoRoute(NoRoute)
s.Router.NoMethod(NoMethod)

blobsController := controllers.NewBlobsController(s.App.SinkClient)
blobsService := services.NewBlobsService(s.App.SinkClient)
blobsController := controllers.NewBlobsController(blobsService)
healthController := controllers.NewHealthController(blobsService)

v1 := s.Router.Group("/eth/v1")
v1.GET("beacon/blob_sidecars/:block_id", blobsController.BlobsByBlockId)

s.Router.GET("/health", healthController.Health)
}

func (s *HttpServer) Run(wg *sync.WaitGroup) {
Expand Down
68 changes: 68 additions & 0 deletions services/blobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package services

import (
"blob-service/internal"
pbbl "blob-service/pb/pinax/ethereum/blobs/v1"
"context"
"encoding/binary"
"fmt"
"strconv"

pbkv "github.com/streamingfast/substreams-sink-kv/pb/substreams/sink/kv/v1"

"github.com/golang/protobuf/proto"
)

type BlobsService struct {
sinkClient pbkv.KvClient
}

func NewBlobsService(sinkClient pbkv.KvClient) *BlobsService {
return &BlobsService{sinkClient: sinkClient}
}

func (bc *BlobsService) GetSlotNumber(ctx context.Context, block_id string) (uint64, error) {
if block_id == "head" {
resp, err := bc.sinkClient.Get(ctx, &pbkv.GetRequest{Key: "head"})
if err != nil {
return 0, err
}
return binary.BigEndian.Uint64(resp.GetValue()), nil
}

if len(block_id) > 2 && block_id[:2] == "0x" {
resp, err := bc.sinkClient.Get(ctx, &pbkv.GetRequest{Key: "block_root:" + block_id})
if err != nil {
return 0, err
}
return binary.BigEndian.Uint64(resp.GetValue()), nil
}

slot, err := strconv.ParseUint(block_id, 10, 64)
if err != nil {
return 0, internal.ErrInvalidSlot
}

return slot, nil
}

func (bs *BlobsService) GetSlotByBlockId(ctx context.Context, blockId string) (*pbbl.Slot, error) {

slotNum, err := bs.GetSlotNumber(ctx, blockId)
if err != nil {
return nil, err
}

resp, err := bs.sinkClient.Get(ctx, &pbkv.GetRequest{Key: fmt.Sprintf("slot:%d", slotNum)})
if err != nil {
return nil, err
}

slot := &pbbl.Slot{}
err = proto.Unmarshal(resp.GetValue(), slot)
if err != nil {
return nil, err
}

return slot, nil
}
Loading

0 comments on commit 52deb6e

Please sign in to comment.