diff --git a/internals/handlers/ingester_handlers.go b/internals/handlers/ingester_handlers.go index 8744d3b..6599d24 100644 --- a/internals/handlers/ingester_handlers.go +++ b/internals/handlers/ingester_handlers.go @@ -1,6 +1,7 @@ package handlers import ( + "errors" jsoniter "github.com/json-iterator/go" "net/http" @@ -53,10 +54,12 @@ func (handler *IngesterHandler) ReceiveData(w http.ResponseWriter, r *http.Reque err = handler.bulkIngester.Ingest(bir) if err != nil { - if err.Error() == "channel overload" { // Replace with custom error + if errors.Is(err, ingester.ErrChannelOverload) { w.WriteHeader(http.StatusTooManyRequests) } else if err.Error() == "elasticsearch healthcheck red" { // Replace with custom error w.WriteHeader(http.StatusInternalServerError) + } else if errors.Is(err, ingester.ErrDocumentTypeEmpty) { + w.WriteHeader(http.StatusBadRequest) } return } diff --git a/internals/ingester/bulk.go b/internals/ingester/bulk.go index 16b5212..6393157 100644 --- a/internals/ingester/bulk.go +++ b/internals/ingester/bulk.go @@ -9,6 +9,11 @@ import ( "go.uber.org/zap" ) +var ( + ErrChannelOverload = errors.New("channel overload") + ErrDocumentTypeEmpty = errors.New("document type is empty") +) + // BulkIngester is a component which split BulkIngestRequest and affect the resulting IngestRequests to dedicated TypedIngester // As a chokepoint, it doesn't do much processing and only acts as a request router type BulkIngester struct { @@ -44,6 +49,10 @@ func (ingester *BulkIngester) getTypedIngester(targetDocumentType string) *Typed func (ingester *BulkIngester) Ingest(bir BulkIngestRequest) error { zap.L().Debug("Processing BulkIngestRequest", zap.String("BulkUUID", bir.UUID)) + if bir.DocumentType == "" { + return ErrDocumentTypeEmpty + } + mergeConfig := bir.MergeConfig[0] typedIngester := ingester.getTypedIngester(bir.DocumentType) @@ -53,7 +62,7 @@ func (ingester *BulkIngester) Ingest(bir BulkIngestRequest) error { for _, worker := range typedIngester.Workers { worker.GetMetricWorkerQueueGauge().Set(float64(len(worker.GetData()))) } - return errors.New("channel overload") // Replace with custom error + return ErrChannelOverload } for i, doc := range bir.Docs {