Skip to content

Commit

Permalink
added a metrics package and moved metrics files to package
Browse files Browse the repository at this point in the history
  • Loading branch information
maurafortino committed Nov 2, 2023
1 parent fbffc52 commit e984464
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 107 deletions.
7 changes: 0 additions & 7 deletions caduceus_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"go.uber.org/zap"

"github.com/go-kit/kit/metrics"
"github.com/xmidt-org/ancla"

"github.com/xmidt-org/wrp-go/v3"
Expand Down Expand Up @@ -55,12 +54,6 @@ type SenderConfig struct {
DisablePartnerIDs bool
}

type CaduceusMetricsRegistry interface {
NewCounter(name string) metrics.Counter
NewGauge(name string) metrics.Gauge
NewHistogram(name string, buckets int) metrics.Histogram
}

type RequestHandler interface {
HandleRequest(workerID int, msg *wrp.Message)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
emperror.dev/emperror v0.33.0
github.com/armon/go-metrics v0.4.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/go-kit/kit v0.13.0
github.com/gorilla/mux v1.8.0
Expand Down Expand Up @@ -31,7 +32,6 @@ require (
emperror.dev/errors v0.8.1 // indirect
github.com/SermoDigital/jose v0.9.2-0.20161205224733-f6df55f235c2 // indirect
github.com/VividCortex/gohistogram v1.0.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/billhathaway/consistentHash v0.0.0-20140718022140-addea16d2229 // indirect
github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 // indirect
Expand Down
8 changes: 8 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ import (
"github.com/xmidt-org/wrp-go/v3"
)

const (
emptyContentTypeReason = "empty_content_type"
emptyUUIDReason = "empty_uuid"
bothEmptyReason = "empty_uuid_and_content_type"
networkError = "network_err"
unknownEventType = "unknown"
)

// Below is the struct that will implement our ServeHTTP method
type ServerHandler struct {
*zap.Logger
Expand Down
17 changes: 9 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/spf13/viper"
"github.com/xmidt-org/ancla"
"github.com/xmidt-org/bascule/basculehelper"
"github.com/xmidt-org/caduceus/metrics"
"github.com/xmidt-org/candlelight"
"github.com/xmidt-org/httpaux/recovery"
"github.com/xmidt-org/sallust"
Expand Down Expand Up @@ -83,7 +84,7 @@ func caduceus(arguments []string) int {
f = pflag.NewFlagSet(applicationName, pflag.ContinueOnError)
v = viper.New()

logger, metricsRegistry, webPA, err = server.Initialize(applicationName, arguments, f, v, Metrics, AnclaHelperMetrics, basculehelper.AuthCapabilitiesMetrics, basculehelper.AuthValidationMetrics)
logger, metricsRegistry, webPA, err = server.Initialize(applicationName, arguments, f, v, metrics.Metrics, metrics.AnclaHelperMetrics, basculehelper.AuthCapabilitiesMetrics, basculehelper.AuthValidationMetrics)
)

if parseErr, done := printVersion(f, arguments); done {
Expand Down Expand Up @@ -157,19 +158,19 @@ func caduceus(arguments []string) int {
senderWrapper: caduceusSenderWrapper,
Logger: logger,
},
errorRequests: metricsRegistry.NewCounter(ErrorRequestBodyCounter),
emptyRequests: metricsRegistry.NewCounter(EmptyRequestBodyCounter),
invalidCount: metricsRegistry.NewCounter(DropsDueToInvalidPayload),
incomingQueueDepthMetric: metricsRegistry.NewGauge(IncomingQueueDepth),
modifiedWRPCount: metricsRegistry.NewCounter(ModifiedWRPCounter),
errorRequests: metricsRegistry.NewCounter(metrics.ErrorRequestBodyCounter),
emptyRequests: metricsRegistry.NewCounter(metrics.EmptyRequestBodyCounter),
invalidCount: metricsRegistry.NewCounter(metrics.DropsDueToInvalidPayload),
incomingQueueDepthMetric: metricsRegistry.NewGauge(metrics.IncomingQueueDepth),
modifiedWRPCount: metricsRegistry.NewCounter(metrics.ModifiedWRPCounter),
maxOutstanding: 0,
// 0 is for the unused `buckets` argument in xmetrics.Registry.NewHistogram
incomingQueueLatency: metricsRegistry.NewHistogram(IncomingQueueLatencyHistogram, 0),
incomingQueueLatency: metricsRegistry.NewHistogram(metrics.IncomingQueueLatencyHistogram, 0),
now: time.Now,
}

caduceusConfig.Webhook.Logger = logger
caduceusConfig.Listener.Measures = NewHelperMeasures(metricsRegistry)
caduceusConfig.Listener.Measures = metrics.NewHelperMeasures(metricsRegistry)
argusClientTimeout, err := newArgusClientTimeout(v)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to parse argus client timeout config values: %v \n", err)
Expand Down
2 changes: 1 addition & 1 deletion anclaHelper.go → metrics/anclaHelper.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package metrics

import (
"github.com/xmidt-org/ancla"
Expand Down
36 changes: 6 additions & 30 deletions metrics.go → metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package main
package metrics

import (
"github.com/go-kit/kit/metrics"
// nolint:staticcheck
"github.com/xmidt-org/webpa-common/v2/xmetrics"
)

Expand All @@ -29,13 +28,11 @@ const (
IncomingQueueLatencyHistogram = "incoming_queue_latency_histogram_seconds"
)

const (
emptyContentTypeReason = "empty_content_type"
emptyUUIDReason = "empty_uuid"
bothEmptyReason = "empty_uuid_and_content_type"
networkError = "network_err"
unknownEventType = "unknown"
)
type CaduceusMetricsRegistry interface {
NewCounter(name string) metrics.Counter
NewGauge(name string) metrics.Gauge
NewHistogram(name string, buckets int) metrics.Histogram
}

func Metrics() []xmetrics.Metric {
return []xmetrics.Metric{
Expand Down Expand Up @@ -160,27 +157,6 @@ func Metrics() []xmetrics.Metric {
}
}

func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSender) {
c.deliveryCounter = m.NewCounter(DeliveryCounter)
c.deliveryRetryCounter = m.NewCounter(DeliveryRetryCounter)
c.deliveryRetryMaxGauge = m.NewGauge(DeliveryRetryMaxGauge).With("url", c.id)
c.cutOffCounter = m.NewCounter(SlowConsumerCounter).With("url", c.id)
c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full")
c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired")
c.droppedExpiredBeforeQueueCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing")

c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off")
c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config")
c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError)
c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id)
c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id)
c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With("url", c.id)
c.deliverUntilGauge = m.NewGauge(ConsumerDeliverUntilGauge).With("url", c.id)
c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id)
c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id)
c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id)
}

func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram {
return m.NewHistogram(QueryDurationHistogram, 11)
}
2 changes: 1 addition & 1 deletion metrics_test.go → metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

package main
package metrics

// Using Caduceus's test suite:
//
Expand Down
64 changes: 43 additions & 21 deletions outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ import (

"go.uber.org/zap"

"github.com/go-kit/kit/metrics"
kitmetrics "github.com/go-kit/kit/metrics"
"github.com/xmidt-org/ancla"
metrics "github.com/xmidt-org/caduceus/metrics"
"github.com/xmidt-org/webpa-common/v2/device"

"github.com/xmidt-org/webpa-common/v2/semaphore"
Expand Down Expand Up @@ -95,7 +96,7 @@ type OutboundSenderFactory struct {
DeliveryInterval time.Duration

// Metrics registry.
MetricsRegistry CaduceusMetricsRegistry
MetricsRegistry metrics.CaduceusMetricsRegistry

// The logger to use.
Logger *zap.Logger
Expand All @@ -107,7 +108,7 @@ type OutboundSenderFactory struct {
// DisablePartnerIDs dictates whether or not to enforce the partner ID check.
DisablePartnerIDs bool

QueryLatency metrics.Histogram
QueryLatency kitmetrics.Histogram
}

type OutboundSender interface {
Expand All @@ -130,23 +131,23 @@ type CaduceusOutboundSender struct {
queueSize int
deliveryRetries int
deliveryInterval time.Duration
deliveryCounter metrics.Counter
deliveryRetryCounter metrics.Counter
droppedQueueFullCounter metrics.Counter
droppedCutoffCounter metrics.Counter
droppedExpiredCounter metrics.Counter
droppedExpiredBeforeQueueCounter metrics.Counter
droppedNetworkErrCounter metrics.Counter
droppedInvalidConfig metrics.Counter
droppedPanic metrics.Counter
cutOffCounter metrics.Counter
queueDepthGauge metrics.Gauge
renewalTimeGauge metrics.Gauge
deliverUntilGauge metrics.Gauge
dropUntilGauge metrics.Gauge
maxWorkersGauge metrics.Gauge
currentWorkersGauge metrics.Gauge
deliveryRetryMaxGauge metrics.Gauge
deliveryCounter kitmetrics.Counter
deliveryRetryCounter kitmetrics.Counter
droppedQueueFullCounter kitmetrics.Counter
droppedCutoffCounter kitmetrics.Counter
droppedExpiredCounter kitmetrics.Counter
droppedExpiredBeforeQueueCounter kitmetrics.Counter
droppedNetworkErrCounter kitmetrics.Counter
droppedInvalidConfig kitmetrics.Counter
droppedPanic kitmetrics.Counter
cutOffCounter kitmetrics.Counter
queueDepthGauge kitmetrics.Gauge
renewalTimeGauge kitmetrics.Gauge
deliverUntilGauge kitmetrics.Gauge
dropUntilGauge kitmetrics.Gauge
maxWorkersGauge kitmetrics.Gauge
currentWorkersGauge kitmetrics.Gauge
deliveryRetryMaxGauge kitmetrics.Gauge
wg sync.WaitGroup
cutOffPeriod time.Duration
workers semaphore.Interface
Expand Down Expand Up @@ -471,7 +472,7 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti
// a fresh one, counting any current messages in the queue as dropped.
// It should never close a queue, as a queue not referenced anywhere will be
// cleaned up by the garbage collector without needing to be closed.
func (obs *CaduceusOutboundSender) Empty(droppedCounter metrics.Counter) {
func (obs *CaduceusOutboundSender) Empty(droppedCounter kitmetrics.Counter) {
droppedMsgs := obs.queue.Load().(chan *wrp.Message)
obs.queue.Store(make(chan *wrp.Message, obs.queueSize))
droppedCounter.Add(float64(len(droppedMsgs)))
Expand Down Expand Up @@ -737,3 +738,24 @@ func (obs *CaduceusOutboundSender) queueOverflow() {

}
}

func CreateOutbounderMetrics(m metrics.CaduceusMetricsRegistry, c *CaduceusOutboundSender) {
c.deliveryCounter = m.NewCounter(metrics.DeliveryCounter)
c.deliveryRetryCounter = m.NewCounter(metrics.DeliveryRetryCounter)
c.deliveryRetryMaxGauge = m.NewGauge(metrics.DeliveryRetryMaxGauge).With("url", c.id)
c.cutOffCounter = m.NewCounter(metrics.SlowConsumerCounter).With("url", c.id)
c.droppedQueueFullCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full")
c.droppedExpiredCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired")
c.droppedExpiredBeforeQueueCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing")

c.droppedCutoffCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off")
c.droppedInvalidConfig = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config")
c.droppedNetworkErrCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError)
c.droppedPanic = m.NewCounter(metrics.DropsDueToPanic).With("url", c.id)
c.queueDepthGauge = m.NewGauge(metrics.OutgoingQueueDepth).With("url", c.id)
c.renewalTimeGauge = m.NewGauge(metrics.ConsumerRenewalTimeGauge).With("url", c.id)
c.deliverUntilGauge = m.NewGauge(metrics.ConsumerDeliverUntilGauge).With("url", c.id)
c.dropUntilGauge = m.NewGauge(metrics.ConsumerDropUntilGauge).With("url", c.id)
c.currentWorkersGauge = m.NewGauge(metrics.ConsumerDeliveryWorkersGauge).With("url", c.id)
c.maxWorkersGauge = m.NewGauge(metrics.ConsumerMaxDeliveryWorkersGauge).With("url", c.id)
}
29 changes: 15 additions & 14 deletions outboundSender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/xmidt-org/ancla"
"github.com/xmidt-org/caduceus/metrics"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -163,20 +164,20 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []
//
// If a new metric within outboundsender is created it must be added here
fakeRegistry := new(mockCaduceusMetricsRegistry)
fakeRegistry.On("NewCounter", DeliveryRetryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", OutgoingQueueDepth).Return(fakeDC)
fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeSlow)
fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow)
fakeRegistry.On("NewCounter", DropsDueToPanic).Return(fakePanicDrop)
fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeQdepth)
fakeRegistry.On("NewGauge", DeliveryRetryMaxGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerRenewalTimeGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerDeliverUntilGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth)
fakeRegistry.On("NewHistogram", QueryDurationHistogram).Return(fakeLatency)
fakeRegistry.On("NewCounter", metrics.DeliveryRetryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", metrics.DeliveryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", metrics.OutgoingQueueDepth).Return(fakeDC)
fakeRegistry.On("NewCounter", metrics.SlowConsumerCounter).Return(fakeSlow)
fakeRegistry.On("NewCounter", metrics.SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow)
fakeRegistry.On("NewCounter", metrics.DropsDueToPanic).Return(fakePanicDrop)
fakeRegistry.On("NewGauge", metrics.OutgoingQueueDepth).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.DeliveryRetryMaxGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerRenewalTimeGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerDeliverUntilGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerDropUntilGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerDeliveryWorkersGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth)
fakeRegistry.On("NewHistogram", metrics.QueryDurationHistogram).Return(fakeLatency)

return &OutboundSenderFactory{
Listener: w,
Expand Down
19 changes: 10 additions & 9 deletions senderWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
"sync"
"time"

"github.com/go-kit/kit/metrics"
kitmetrics "github.com/go-kit/kit/metrics"
"github.com/xmidt-org/ancla"
"github.com/xmidt-org/caduceus/metrics"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
)
Expand All @@ -49,12 +50,12 @@ type SenderWrapperFactory struct {
Linger time.Duration

// Metrics registry.
MetricsRegistry CaduceusMetricsRegistry
MetricsRegistry metrics.CaduceusMetricsRegistry

// The metrics counter for dropped messages due to invalid payloads
DroppedMsgCounter metrics.Counter
DroppedMsgCounter kitmetrics.Counter

EventType metrics.Counter
EventType kitmetrics.Counter

// The logger implementation to share with OutboundSenders.
Logger *zap.Logger
Expand Down Expand Up @@ -88,9 +89,9 @@ type CaduceusSenderWrapper struct {
logger *zap.Logger
mutex sync.RWMutex
senders map[string]OutboundSender
metricsRegistry CaduceusMetricsRegistry
eventType metrics.Counter
queryLatency metrics.Histogram
metricsRegistry metrics.CaduceusMetricsRegistry
eventType kitmetrics.Counter
queryLatency kitmetrics.Histogram
wg sync.WaitGroup
shutdown chan struct{}
customPIDs []string
Expand Down Expand Up @@ -120,8 +121,8 @@ func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) {
return
}

caduceusSenderWrapper.queryLatency = NewMetricWrapperMeasures(swf.MetricsRegistry)
caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(IncomingEventTypeCounter)
caduceusSenderWrapper.queryLatency = metrics.NewMetricWrapperMeasures(swf.MetricsRegistry)
caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(metrics.IncomingEventTypeCounter)

caduceusSenderWrapper.senders = make(map[string]OutboundSender)
caduceusSenderWrapper.shutdown = make(chan struct{})
Expand Down
Loading

0 comments on commit e984464

Please sign in to comment.