Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Middleware #435

Merged
merged 11 commits into from
Jan 16, 2024
15 changes: 8 additions & 7 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import (
// Config is the top level configuration for the caduceus service. Everything
// is contained in this structure or it will intentially cause a failure.
type Config struct {
Logging sallust.Config
Tracing candlelight.Config
Prometheus touchstone.Config
Servers Servers
ArgusClientTimeout HttpClientTimeout
JWTValidator JWTValidator
// Webhook ancla.Config //@TODO: need to fix the ancla/argus dependency issue
Logging sallust.Config
Tracing candlelight.Config
Prometheus touchstone.Config
Servers Servers
ArgusClientTimeout HttpClientTimeout
JWTValidator JWTValidator
Sender SenderConfig
Service Service
AuthHeader []string
Expand All @@ -36,6 +35,8 @@ type Config struct {
Flavor string
PreviousVersionSupport bool
Region string
// Webhook ancla.Config //@TODO: need to fix the ancla/argus dependency issue

}

type Service struct {
Expand Down
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/alecthomas/kong v0.8.1
github.com/go-chi/chi/v5 v5.0.10
github.com/go-kit/kit v0.13.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/mux v1.8.1
github.com/goschtalt/goschtalt v0.22.1
github.com/goschtalt/yaml-decoder v0.0.1
github.com/goschtalt/yaml-encoder v0.0.3
Expand All @@ -27,6 +27,7 @@ require (
github.com/xmidt-org/touchstone v0.1.3
github.com/xmidt-org/webpa-common/v2 v2.2.2
github.com/xmidt-org/wrp-go/v3 v3.2.1
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.40.0
go.uber.org/fx v1.20.1
go.uber.org/zap v1.26.0
gopkg.in/dealancer/validate.v2 v2.1.0
Expand All @@ -41,6 +42,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
Expand Down Expand Up @@ -85,16 +87,16 @@ require (
github.com/subosito/gotenv v1.6.0 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/xmidt-org/chronon v0.1.1 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/zipkin v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/sdk v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
18 changes: 11 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,8 @@ github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8Wlg
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
Expand Down Expand Up @@ -1027,8 +1029,9 @@ github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/schema v1.0.3-0.20180614150749-e0e4b92809ac/go.mod h1:kgLaKoK1FELgZqMAVxx/5cbj0kT+57qxUrAlIO2eleU=
github.com/gorilla/schema v1.2.0/go.mod h1:kgLaKoK1FELgZqMAVxx/5cbj0kT+57qxUrAlIO2eleU=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
Expand Down Expand Up @@ -1762,6 +1765,7 @@ go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib v0.19.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc=
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.19.0/go.mod h1:ze4w2zyQP+FvZjaahHaUVD7h4razLhDOsZD3qFKXc3c=
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.38.0/go.mod h1:iUSPEXZM7sckWSTCtzog1lU42Qaiu9U2WY6vdqwFHDI=
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.40.0 h1:KToMJH0+5VxWBGtfeluRmWR3wLtE7nP+80YrxNI5FGs=
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.40.0/go.mod h1:RK3vgddjxVcF1q7IBVppzG6k2cW/NBnZHQ3X4g+EYBQ=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.19.0/go.mod h1:7RDsakVbjb124lYDEjKuHTuzdqf04hLMEvPv/ufmqMs=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.38.0/go.mod h1:w6xNm+kC506KNs5cknSHal6dtdRnc4uema0uN9GSQc0=
Expand All @@ -1772,8 +1776,8 @@ go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9
go.opentelemetry.io/otel v1.12.0/go.mod h1:geaoz0L0r1BEOR81k7/n9W4TCXYCJ7bPO7K374jQHG0=
go.opentelemetry.io/otel v1.13.0/go.mod h1:FH3RtdZCzRkJYFTCsAKDy9l/XYjMdNv6QrkFFB8DvVg=
go.opentelemetry.io/otel v1.14.0/go.mod h1:o4buv+dJzx8rohcUeRmWUZhqupFvzWis188WlggnNeU=
go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/exporters/jaeger v1.11.2/go.mod h1:nwcF/DK4Hk0auZ/a5vw20uMsaJSXbzeeimhN5f9d0Lc=
go.opentelemetry.io/otel/exporters/jaeger v1.14.0/go.mod h1:4Ay9kk5vELRrbg5z4cpP9EtmQRFap2Wb0woPG4lujZA=
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 h1:D7UpUy2Xc2wsi1Ras6V40q806WM07rqoCWzXu7Sqy+4=
Expand Down Expand Up @@ -1807,8 +1811,8 @@ go.opentelemetry.io/otel/exporters/zipkin v1.19.0/go.mod h1:JQgTGJP11yi3o4GHzIWY
go.opentelemetry.io/otel/metric v0.19.0/go.mod h1:8f9fglJPRnXuskQmKpnad31lcLJ2VmNNqIsx/uIwBSc=
go.opentelemetry.io/otel/metric v0.35.0/go.mod h1:qAcbhaTRFU6uG8QM7dDo7XvFsWcugziq/5YI065TokQ=
go.opentelemetry.io/otel/metric v0.37.0/go.mod h1:DmdaHfGt54iV6UKxsV9slj2bBRJcKC1B1uvDLIioc1s=
go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/oteltest v0.19.0/go.mod h1:tI4yxwh8U21v7JD6R3BcA/2+RBoTKFexE/PJ/nSO7IA=
go.opentelemetry.io/otel/sdk v0.19.0/go.mod h1:ouO7auJYMivDjywCHA6bqTI7jJMVQV1HdKR5CmH8DGo=
go.opentelemetry.io/otel/sdk v1.11.2/go.mod h1:wZ1WxImwpq+lVRo4vsmSOxdd+xwoUJ6rqyLc3SyX9aU=
Expand All @@ -1824,8 +1828,8 @@ go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06O
go.opentelemetry.io/otel/trace v1.12.0/go.mod h1:pHlgBynn6s25qJ2szD+Bv+iwKJttjHSI3lUAyf0GNuQ=
go.opentelemetry.io/otel/trace v1.13.0/go.mod h1:muCvmmO9KKpvuXSf3KKAXXB2ygNYHQ+ZfI5X08d3tds=
go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8=
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
Expand Down
105 changes: 81 additions & 24 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,53 @@ import (
"sync/atomic"
"time"

"go.uber.org/fx"
"go.uber.org/zap"

"github.com/go-kit/kit/metrics"
"github.com/prometheus/client_golang/prometheus"
uuid "github.com/satori/go.uuid"

"github.com/xmidt-org/sallust"
"github.com/xmidt-org/webpa-common/v2/adapter"
"github.com/xmidt-org/wrp-go/v3"
)

type ServerHandlerIn struct {
fx.In
Logger *zap.Logger
Telemetry *HandlerTelemetry
}

type ServerHandlerOut struct {
fx.Out
Handler *ServerHandler
}

// Below is the struct that will implement our ServeHTTP method
type ServerHandler struct {
*zap.Logger
caduceusHandler RequestHandler
errorRequests metrics.Counter
emptyRequests metrics.Counter
invalidCount metrics.Counter
incomingQueueDepthMetric metrics.Gauge
modifiedWRPCount metrics.Counter
incomingQueueDepth int64
maxOutstanding int64
incomingQueueLatency metrics.Histogram
now func() time.Time
log *zap.Logger
// caduceusHandler RequestHandler
telemetry *HandlerTelemetry
incomingQueueDepth int64
maxOutstanding int64
now func() time.Time
}
type HandlerTelemetryIn struct {
fx.In
ErrorRequests prometheus.Counter `name:"error_request_body_counter"`
EmptyRequests prometheus.Counter `name:"empty_request_boyd_counter"`
InvalidCount prometheus.Counter `name:"drops_due_to_invalid_payload"`
IncomingQueueDepthMetric prometheus.Gauge `name:"incoming_queue_depth"`
ModifiedWRPCount prometheus.CounterVec `name:"modified_wrp_count"`
IncomingQueueLatency prometheus.HistogramVec `name:"incoming_queue_latency_histogram_seconds"`
}
type HandlerTelemetry struct {
errorRequests prometheus.Counter
emptyRequests prometheus.Counter
invalidCount prometheus.Counter
incomingQueueDepthMetric prometheus.Gauge
modifiedWRPCount prometheus.CounterVec
incomingQueueLatency prometheus.HistogramVec
}

func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
Expand All @@ -42,7 +66,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R

logger := sallust.Get(request.Context())
if logger == adapter.DefaultLogger().Logger {
logger = sh.Logger
logger = sh.log
}

logger.Info("Receiving incoming request...")
Expand All @@ -66,19 +90,19 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
return
}

sh.incomingQueueDepthMetric.Add(1.0)
defer sh.incomingQueueDepthMetric.Add(-1.0)
sh.telemetry.incomingQueueDepthMetric.Add(1.0)
defer sh.telemetry.incomingQueueDepthMetric.Add(-1.0)

payload, err := io.ReadAll(request.Body)
if err != nil {
sh.errorRequests.Add(1.0)
sh.telemetry.errorRequests.Add(1.0)
logger.Error("Unable to retrieve the request body.", zap.Error(err))
response.WriteHeader(http.StatusBadRequest)
return
}

if len(payload) == 0 {
sh.emptyRequests.Add(1.0)
sh.telemetry.emptyRequests.Add(1.0)
logger.Error("Empty payload.")
response.WriteHeader(http.StatusBadRequest)
response.Write([]byte("Empty payload.\n"))
Expand All @@ -91,7 +115,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
err = decoder.Decode(msg)
if err != nil || msg.MessageType() != 4 {
// return a 400
sh.invalidCount.Add(1.0)
sh.telemetry.invalidCount.Add(1.0)
response.WriteHeader(http.StatusBadRequest)
if err != nil {
response.Write([]byte("Invalid payload format.\n"))
Expand All @@ -106,15 +130,15 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
err = wrp.UTF8(msg)
if err != nil {
// return a 400
sh.invalidCount.Add(1.0)
sh.telemetry.invalidCount.Add(1.0)
response.WriteHeader(http.StatusBadRequest)
response.Write([]byte("Strings must be UTF-8.\n"))
logger.Debug("Strings must be UTF-8.")
return
}
eventType = msg.FindEventStringSubMatch()

sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg))
// sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg))

// return a 202
response.WriteHeader(http.StatusAccepted)
Expand All @@ -125,7 +149,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R

func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) {
endTime := sh.now()
sh.incomingQueueLatency.With("event", eventType).Observe(endTime.Sub(startTime).Seconds())
sh.telemetry.incomingQueueLatency.With(prometheus.Labels{"event": eventType}).Observe(float64(endTime.Sub(startTime).Seconds()))
}

func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message {
Expand All @@ -134,13 +158,13 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message {

// Default to "application/json" if there is no content type, otherwise
// use the one the source specified.
if "" == msg.ContentType {
if msg.ContentType == "" {
msg.ContentType = wrp.MimeTypeJson
reason = emptyContentTypeReason
}

// Ensure there is a transaction id even if we make one up
if "" == msg.TransactionUUID {
if msg.TransactionUUID == "" {
msg.TransactionUUID = uuid.NewV4().String()
if reason == "" {
reason = emptyUUIDReason
Expand All @@ -150,8 +174,41 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message {
}

if reason != "" {
sh.modifiedWRPCount.With("reason", reason).Add(1.0)
sh.telemetry.modifiedWRPCount.With(prometheus.Labels{"reason": reason}).Add(1.0)
}

return msg
}

var HandlerModule = fx.Module("server",
fx.Provide(
func(in HandlerTelemetryIn) *HandlerTelemetry {
return &HandlerTelemetry{
errorRequests: in.ErrorRequests,
emptyRequests: in.EmptyRequests,
invalidCount: in.InvalidCount,
incomingQueueDepthMetric: in.IncomingQueueDepthMetric,
modifiedWRPCount: in.ModifiedWRPCount,
incomingQueueLatency: in.IncomingQueueLatency,
}
}),
fx.Provide(
func(in ServerHandlerIn) (ServerHandlerOut, error) {
//Hard coding maxOutstanding and incomingQueueDepth for now
handler, err := New(in.Logger, in.Telemetry, 0.0, 0.0)
return ServerHandlerOut{
Handler: handler,
}, err
},
),
)

func New(log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) {
return &ServerHandler{
log: log,
telemetry: t,
maxOutstanding: maxOutstanding,
incomingQueueDepth: incomingQueueDepth,
now: time.Now,
}, nil
}
Loading
Loading