Skip to content

Commit

Permalink
Merge pull request #436 from xmidt-org/senderwrapper
Browse files Browse the repository at this point in the history
Senderwrapper
  • Loading branch information
maurafortino authored Jan 28, 2024
2 parents afade6d + 234c7de commit b1caa05
Show file tree
Hide file tree
Showing 12 changed files with 444 additions and 437 deletions.
8 changes: 0 additions & 8 deletions caduceus_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (

"go.uber.org/zap"

"github.com/go-kit/kit/metrics"

"github.com/xmidt-org/wrp-go/v3"
)

Expand Down Expand Up @@ -38,12 +36,6 @@ type SenderConfig struct {
DisablePartnerIDs bool
}

type CaduceusMetricsRegistry interface {
NewCounter(name string) metrics.Counter
NewGauge(name string) metrics.Gauge
NewHistogram(name string, buckets int) metrics.Histogram
}

type RequestHandler interface {
HandleRequest(workerID int, msg *wrp.Message)
}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
emperror.dev/emperror v0.33.0
github.com/alecthomas/kong v0.8.1
github.com/go-chi/chi/v5 v5.0.10
github.com/go-kit/kit v0.13.0
github.com/gorilla/mux v1.8.1
github.com/goschtalt/goschtalt v0.22.1
github.com/goschtalt/yaml-decoder v0.0.1
Expand All @@ -28,6 +27,7 @@ require (
github.com/xmidt-org/webpa-common/v2 v2.2.2
github.com/xmidt-org/wrp-go/v3 v3.2.3
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46.1
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1
go.uber.org/fx v1.20.1
go.uber.org/zap v1.26.0
gopkg.in/dealancer/validate.v2 v2.1.0
Expand All @@ -44,6 +44,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-kit/kit v0.13.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1772,6 +1772,8 @@ go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.46
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.19.0/go.mod h1:7RDsakVbjb124lYDEjKuHTuzdqf04hLMEvPv/ufmqMs=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.38.0/go.mod h1:w6xNm+kC506KNs5cknSHal6dtdRnc4uema0uN9GSQc0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0/go.mod h1:pcQ3MM3SWvrA71U4GDqv9UFDJ3HQsW7y5ZO3tDTlUdI=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo=
go.opentelemetry.io/contrib/propagators v0.19.0/go.mod h1:4QOdZClXISU5S43xZxk5tYaWcpb+lehqfKtE6PK6msE=
go.opentelemetry.io/otel v0.19.0/go.mod h1:j9bF567N9EfomkSidSfmMwIwIBuP37AMAIzVW85OxSg=
go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI=
Expand Down
31 changes: 17 additions & 14 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (

type ServerHandlerIn struct {
fx.In
Logger *zap.Logger
Telemetry *HandlerTelemetry
CaduceusSenderWrapper *CaduceusSenderWrapper
Logger *zap.Logger
Telemetry *HandlerTelemetry
}

type ServerHandlerOut struct {
Expand All @@ -32,8 +33,8 @@ type ServerHandlerOut struct {

// Below is the struct that will implement our ServeHTTP method
type ServerHandler struct {
log *zap.Logger
// caduceusHandler RequestHandler
log *zap.Logger
caduceusHandler RequestHandler
telemetry *HandlerTelemetry
incomingQueueDepth int64
maxOutstanding int64
Expand Down Expand Up @@ -138,7 +139,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
}
eventType = msg.FindEventStringSubMatch()

// sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg))
sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg))

// return a 202
response.WriteHeader(http.StatusAccepted)
Expand Down Expand Up @@ -180,8 +181,8 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message {
return msg
}

var HandlerModule = fx.Module("server",
fx.Provide(
func ProvideHandler() fx.Option {
return fx.Provide(
func(in HandlerTelemetryIn) *HandlerTelemetry {
return &HandlerTelemetry{
errorRequests: in.ErrorRequests,
Expand All @@ -191,20 +192,22 @@ var HandlerModule = fx.Module("server",
modifiedWRPCount: in.ModifiedWRPCount,
incomingQueueLatency: in.IncomingQueueLatency,
}
}),
fx.Provide(
},
func(in ServerHandlerIn) (ServerHandlerOut, error) {
//Hard coding maxOutstanding and incomingQueueDepth for now
handler, err := New(in.Logger, in.Telemetry, 0.0, 0.0)
handler, err := New(in.CaduceusSenderWrapper, in.Logger, in.Telemetry, 0.0, 0.0)
return ServerHandlerOut{
Handler: handler,
}, err
},
),
)

func New(log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) {
)
}
func New(senderWrapper *CaduceusSenderWrapper, log *zap.Logger, t *HandlerTelemetry, maxOutstanding, incomingQueueDepth int64) (*ServerHandler, error) {
return &ServerHandler{
caduceusHandler: &CaduceusHandler{
senderWrapper: senderWrapper,
Logger: log,
},
log: log,
telemetry: t,
maxOutstanding: maxOutstanding,
Expand Down
12 changes: 7 additions & 5 deletions httpClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strconv"
"time"

"github.com/go-kit/kit/metrics"
"github.com/prometheus/client_golang/prometheus"
)

var (
Expand All @@ -33,19 +33,21 @@ func (d doerFunc) Do(req *http.Request) (*http.Response, error) {

type metricWrapper struct {
now func() time.Time
queryLatency metrics.Histogram
queryLatency prometheus.HistogramVec
id string
}

func newMetricWrapper(now func() time.Time, queryLatency metrics.Histogram) (*metricWrapper, error) {
func newMetricWrapper(now func() time.Time, queryLatency prometheus.HistogramVec, id string) (*metricWrapper, error) {
if now == nil {
now = time.Now
}
if queryLatency == nil {
if queryLatency.MetricVec == nil {
return nil, errNilHistogram
}
return &metricWrapper{
now: now,
queryLatency: queryLatency,
id: id,
}, nil
}

Expand All @@ -62,7 +64,7 @@ func (m *metricWrapper) roundTripper(next httpClient) httpClient {

// find time difference, add to metric
var latency = endTime.Sub(startTime)
m.queryLatency.With("code", code).Observe(latency.Seconds())
m.queryLatency.With(prometheus.Labels{UrlLabel: m.id, CodeLabel: code}).Observe(latency.Seconds())

return resp, err
})
Expand Down
56 changes: 56 additions & 0 deletions listenerStub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package main

import "time"

//This is a stub for the ancla listener. This will be removed once we can add ancla back into caduceus

type ListenerStub struct {
PartnerIds []string
Webhook Webhook
}

type Webhook struct {
// Address is the subscription request origin HTTP Address.
Address string `json:"registered_from_address"`

// Config contains data to inform how events are delivered.
Config DeliveryConfig `json:"config"`

// FailureURL is the URL used to notify subscribers when they've been cut off due to event overflow.
// Optional, set to "" to disable notifications.
FailureURL string `json:"failure_url"`

// Events is the list of regular expressions to match an event type against.
Events []string `json:"events"`

// Matcher type contains values to match against the metadata.
Matcher MetadataMatcherConfig `json:"matcher,omitempty"`

// Duration describes how long the subscription lasts once added.
Duration time.Duration `json:"duration"`

// Until describes the time this subscription expires.
Until time.Time `json:"until"`
}

// DeliveryConfig is a Webhook substructure with data related to event delivery.
type DeliveryConfig struct {
// URL is the HTTP URL to deliver messages to.
URL string `json:"url"`

// ContentType is content type value to set WRP messages to (unless already specified in the WRP).
ContentType string `json:"content_type"`

// Secret is the string value for the SHA1 HMAC.
// (Optional, set to "" to disable behavior).
Secret string `json:"secret,omitempty"`

// AlternativeURLs is a list of explicit URLs that should be round robin through on failure cases to the main URL.
AlternativeURLs []string `json:"alt_urls,omitempty"`
}

// MetadataMatcherConfig is Webhook substructure with config to match event metadata.
type MetadataMatcherConfig struct {
// DeviceID is the list of regular expressions to match device id type against.
DeviceID []string `json:"device_id"`
}
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ func caduceus(arguments []string, run bool) error {
arrangehttp.ProvideServer("servers.primary"),
arrangehttp.ProvideServer("servers.alternate"),

HandlerModule,
ProvideHandler(),
ProvideSenderWrapper(),
touchstone.Provide(),
touchhttp.Provide(),
ProvideMetrics(),
Expand Down
69 changes: 45 additions & 24 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package main

import (
"github.com/go-kit/kit/metrics"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/fx"

Expand Down Expand Up @@ -49,29 +48,22 @@ const (
CodeLabel = "code"
)

func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSender) {
c.deliveryCounter = m.NewCounter(DeliveryCounter)
c.deliveryRetryCounter = m.NewCounter(DeliveryRetryCounter)
c.deliveryRetryMaxGauge = m.NewGauge(DeliveryRetryMaxGauge).With("url", c.id)
c.cutOffCounter = m.NewCounter(SlowConsumerCounter).With("url", c.id)
c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full")
c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired")
c.droppedExpiredBeforeQueueCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing")

c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off")
c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config")
c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError)
c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id)
c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id)
c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With("url", c.id)
c.deliverUntilGauge = m.NewGauge(ConsumerDeliverUntilGauge).With("url", c.id)
c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id)
c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id)
c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id)
}

func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram {
return m.NewHistogram(QueryDurationHistogram, 11)
type SenderMetricsIn struct {
fx.In
QueryLatency prometheus.HistogramVec `name:"query_duration_histogram_seconds"`
EventType prometheus.CounterVec `name:"incoming_event_type_count"`
DeliveryCounter prometheus.CounterVec `name:"delivery_count"`
DeliveryRetryCounter prometheus.CounterVec `name:"DeliveryRetryCounter"`
DeliveryRetryMaxGauge prometheus.GaugeVec `name:"delivery_retry_max"`
CutOffCounter prometheus.CounterVec `name:"slow_consumer_cut_off_count"`
SlowConsumerDroppedMsgCounter prometheus.CounterVec `name:"slow_consumer_dropped_message_count"`
DropsDueToPanic prometheus.CounterVec `name:"drops_due_to_panic"`
ConsumerDeliverUntilGauge prometheus.GaugeVec `name:"consumer_deliver_until"`
ConsumerDropUntilGauge prometheus.GaugeVec `name:"consumer_drop_until"`
ConsumerDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers"`
ConsumerMaxDeliveryWorkersGauge prometheus.GaugeVec `name:"consumer_delivery_workers_max"`
OutgoingQueueDepth prometheus.GaugeVec `name:"outgoing_queue_depths"`
ConsumerRenewalTimeGauge prometheus.GaugeVec `name:"consumer_renewal_time"`
}

// TODO: do these need to be annonated/broken into groups based on where the metrics are being used/called
Expand Down Expand Up @@ -161,3 +153,32 @@ func ProvideMetrics() fx.Option {
}, EventLabel),
)
}

func ProvideSenderMetrics() fx.Option {
return fx.Provide(
func(in SenderMetricsIn) (SenderWrapperMetrics, OutboundSenderMetrics) {
outbounderMetrics := OutboundSenderMetrics{
DeliveryCounter: in.DeliveryCounter,
DeliveryRetryCounter: in.DeliveryRetryCounter,
DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge,
CutOffCounter: in.CutOffCounter,
SlowConsumerDroppedMsgCounter: in.SlowConsumerDroppedMsgCounter,
DropsDueToPanic: in.DropsDueToPanic,
ConsumerDeliverUntilGauge: in.ConsumerDeliverUntilGauge,
ConsumerDropUntilGauge: in.ConsumerDropUntilGauge,
ConsumerDeliveryWorkersGauge: in.ConsumerDeliveryWorkersGauge,
ConsumerMaxDeliveryWorkersGauge: in.ConsumerMaxDeliveryWorkersGauge,
OutgoingQueueDepth: in.OutgoingQueueDepth,
ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge,
}
wrapperMetrics := SenderWrapperMetrics{
QueryLatency: in.QueryLatency,
EventType: in.EventType,
}

return wrapperMetrics, outbounderMetrics
},
)
}


Loading

0 comments on commit b1caa05

Please sign in to comment.