Skip to content

Commit

Permalink
Merge pull request #6 from NEU-SNS/e2e-metric
Browse files Browse the repository at this point in the history
Add event processing time metric
  • Loading branch information
kvermeul authored Oct 12, 2023
2 parents 0da2a74 + 0060cf7 commit 0948f4c
Showing 1 changed file with 28 additions and 24 deletions.
52 changes: 28 additions & 24 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ var (
Help: "Reverse Traceroute API request duration",
})

revtrEventSocketHandlingDurationHist = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "revtr_eventsocket_polling_duration_seconds",
Buckets: []float64{0.001, 0.01, 0.1, 0.2, 0.4, 1},
Help: "Reverse Traceroute eventsocket polling duration",
revtrEventProcessingDurationHist = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "revtr_event_processing_duration_seconds",
Buckets: []float64{2, 4, 6, 8, 10, 12, 14, 16},
Help: "Reverse Traceroute end-to-end event processing duration",
})

revtrEventsProcessedMetric = promauto.NewCounterVec(
Expand All @@ -90,6 +90,7 @@ type event struct {
timestamp time.Time
uuid string
id *inetdiag.SockID
*revtrpb.RevtrMeasurement
}

// handler implements the eventsocket.Handler interface.
Expand Down Expand Up @@ -121,7 +122,14 @@ func (h *handler) Close(ctx context.Context, timestamp time.Time, uuid string) {
log.Println("close", uuid, timestamp)
}

func callRevtr(client *revtrpb.RevtrClient, revtrMeasurements []*revtrpb.RevtrMeasurement, revtrAPIKey string, revtrSampling int) {
func callRevtr(client *revtrpb.RevtrClient, events []event, revtrAPIKey string, revtrSampling int) {
// Extract revtr measurements from events and record e2e processing time.
revtrMeasurements := make([]*revtrpb.RevtrMeasurement, len(events))
for i, event := range events {
revtrMeasurements[i] = event.RevtrMeasurement
revtrEventProcessingDurationHist.Observe(time.Since(event.timestamp).Seconds())
}

// Put a timeout in context
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
Expand Down Expand Up @@ -234,45 +242,41 @@ func (h *handler) ProcessOpenEvents(ctx context.Context, revtrAPIKey string, rev

t := time.NewTicker(15 * time.Second)

revtrsToSend := []*revtrpb.RevtrMeasurement{}
eventsToSend := []event{}
i := 0

for {
select {
case e := <-h.events:
start := time.Now()
log.Println("processing", e)
log.Println("processing", e.uuid)
// Call the gRPC API of Reverse Traceroute
// Match the sources with the mapping of the revtr sites / IP addresses
// Check if we have a source in the same site as the NDT
if site, ok := h.mlabIPtoSite[e.id.SrcIP]; ok {
if revtrSrc, ok := revtrSiteToIP[site]; ok {
if i%revtrSampling == 0 {
// Sample the revtrs
revtrMeasurementToSend := newRevtrMeasurement(revtrSrc, e.id.DstIP, e.uuid)
// Sample the event.
e.RevtrMeasurement = newRevtrMeasurement(revtrSrc, e.id.DstIP, e.uuid)
eventsToSend = append(eventsToSend, e)
revtrSamplesMetric.Inc()
logger.Debugf("Adding reverse traceroute with source %s and destination %s to send",
revtrMeasurementToSend.Src, revtrMeasurementToSend.Dst)
revtrsToSend = append(revtrsToSend, &revtrMeasurementToSend)
revtrSrc, e.id.DstIP)
}
i++
} else {
log.Infof("Site %s IP %s is not a revtr site", site, e.id.SrcIP)
}

} else {
log.Infof("No NDT site matching for IP %s", e.id.SrcIP)
}
elapsed := time.Since(start)
log.Debugf("eventsocket handling took %d us", elapsed.Microseconds())
revtrEventSocketHandlingDurationHist.Observe(elapsed.Seconds())
case <-t.C:
if len(revtrsToSend) > 0 {
if len(eventsToSend) > 0 {
// Flush what we can flush
log.Infof("Collected batch of %d revtrs to send (sampling 1/%d)", len(revtrsToSend), revtrSampling)
revtrs := make([]*revtrpb.RevtrMeasurement, len(revtrsToSend))
copy(revtrs, revtrsToSend)
go callRevtr(&client, revtrs, revtrAPIKey, revtrSampling)
revtrsToSend = nil
log.Infof("Collected batch of %d revtrs to send (sampling 1/%d)", len(eventsToSend), revtrSampling)
events := make([]event, len(eventsToSend))
copy(events, eventsToSend)
go callRevtr(&client, events, revtrAPIKey, revtrSampling)
eventsToSend = nil
}
case <-ctx.Done():
log.Println("shutdown")
Expand All @@ -282,8 +286,8 @@ func (h *handler) ProcessOpenEvents(ctx context.Context, revtrAPIKey string, rev
}

// newRevtrMeasurement creates a new revtr measurement with the given src, dst and uuid.
func newRevtrMeasurement(src, dst, uuid string) revtrpb.RevtrMeasurement {
return revtrpb.RevtrMeasurement{
func newRevtrMeasurement(src, dst, uuid string) *revtrpb.RevtrMeasurement {
return &revtrpb.RevtrMeasurement{
Src: src,
Dst: dst,
Uuid: uuid,
Expand Down

0 comments on commit 0948f4c

Please sign in to comment.