Skip to content

Commit

Permalink
Refuse empty doctype ingest requests & added custom errors (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
SchawnnDev authored Oct 11, 2024
1 parent 9a19eb5 commit 857abfd
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
5 changes: 4 additions & 1 deletion internals/handlers/ingester_handlers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handlers

import (
"errors"
jsoniter "github.com/json-iterator/go"
"net/http"

Expand Down Expand Up @@ -53,10 +54,12 @@ func (handler *IngesterHandler) ReceiveData(w http.ResponseWriter, r *http.Reque

err = handler.bulkIngester.Ingest(bir)
if err != nil {
if err.Error() == "channel overload" { // Replace with custom error
if errors.Is(err, ingester.ErrChannelOverload) {
w.WriteHeader(http.StatusTooManyRequests)
} else if err.Error() == "elasticsearch healthcheck red" { // Replace with custom error
w.WriteHeader(http.StatusInternalServerError)
} else if errors.Is(err, ingester.ErrDocumentTypeEmpty) {
w.WriteHeader(http.StatusBadRequest)
}
return
}
Expand Down
11 changes: 10 additions & 1 deletion internals/ingester/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import (
"go.uber.org/zap"
)

var (
ErrChannelOverload = errors.New("channel overload")
ErrDocumentTypeEmpty = errors.New("document type is empty")
)

// BulkIngester is a component which split BulkIngestRequest and affect the resulting IngestRequests to dedicated TypedIngester
// As a chokepoint, it doesn't do much processing and only acts as a request router
type BulkIngester struct {
Expand Down Expand Up @@ -44,6 +49,10 @@ func (ingester *BulkIngester) getTypedIngester(targetDocumentType string) *Typed
func (ingester *BulkIngester) Ingest(bir BulkIngestRequest) error {
zap.L().Debug("Processing BulkIngestRequest", zap.String("BulkUUID", bir.UUID))

if bir.DocumentType == "" {
return ErrDocumentTypeEmpty
}

mergeConfig := bir.MergeConfig[0]
typedIngester := ingester.getTypedIngester(bir.DocumentType)

Expand All @@ -53,7 +62,7 @@ func (ingester *BulkIngester) Ingest(bir BulkIngestRequest) error {
for _, worker := range typedIngester.Workers {
worker.GetMetricWorkerQueueGauge().Set(float64(len(worker.GetData())))
}
return errors.New("channel overload") // Replace with custom error
return ErrChannelOverload
}

for i, doc := range bir.Docs {
Expand Down

0 comments on commit 857abfd

Please sign in to comment.