From ce321af3142f4fbc7aed9d2d71d988de6c0a32c7 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Fri, 12 Jul 2024 20:33:08 -0700 Subject: [PATCH] feat(telemetry)_: send connection failure metric --- telemetry/client.go | 90 ++++++++++++++++++++++++++++++++------------- wakuv2/waku.go | 13 +++++++ 2 files changed, 77 insertions(+), 26 deletions(-) diff --git a/telemetry/client.go b/telemetry/client.go index 587e290fbf1..6ddd52400a1 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -30,6 +30,7 @@ const ( ReceivedMessagesMetric TelemetryType = "ReceivedMessages" ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" PeerCountMetric TelemetryType = "PeerCount" + PeerConnFailuresMetric TelemetryType = "PeerConnFailures" MaxRetryCache = 5000 ) @@ -60,6 +61,18 @@ func (c *Client) PushPeerCount(peerCount int) { c.processAndPushTelemetry(PeerCount{PeerCount: peerCount}) } +func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) { + for peerID, failures := range peerConnFailures { + if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists { + if failures == lastFailures { + continue + } + } + c.lastPeerConnFailures[peerID] = failures + c.processAndPushTelemetry(PeerConnFailure{PeerID: peerID}) + } +} + type ReceivedMessages struct { Filter transport.Filter SSHMessage *types.Message @@ -70,20 +83,25 @@ type PeerCount struct { PeerCount int } +type PeerConnFailure struct { + PeerID string +} + type Client struct { - serverURL string - httpClient *http.Client - logger *zap.Logger - keyUID string - nodeName string - version string - telemetryCh chan TelemetryRequest - telemetryCacheLock sync.Mutex - telemetryCache []TelemetryRequest - telemetryRetryCache []TelemetryRequest - nextIdLock sync.Mutex - nextId int - sendPeriod time.Duration + serverURL string + httpClient *http.Client + logger *zap.Logger + keyUID string + nodeName string + version string + telemetryCh chan TelemetryRequest + telemetryCacheLock sync.Mutex + telemetryCache []TelemetryRequest + telemetryRetryCache []TelemetryRequest + nextIdLock sync.Mutex + nextId int + sendPeriod time.Duration + lastPeerConnFailures map[string]int } type TelemetryClientOption func(*Client) @@ -96,19 +114,20 @@ func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption { func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client { client := &Client{ - serverURL: serverURL, - httpClient: &http.Client{Timeout: time.Minute}, - logger: logger, - keyUID: keyUID, - nodeName: nodeName, - version: version, - telemetryCh: make(chan TelemetryRequest), - telemetryCacheLock: sync.Mutex{}, - telemetryCache: make([]TelemetryRequest, 0), - telemetryRetryCache: make([]TelemetryRequest, 0), - nextId: 0, - nextIdLock: sync.Mutex{}, - sendPeriod: 10 * time.Second, // default value + serverURL: serverURL, + httpClient: &http.Client{Timeout: time.Minute}, + logger: logger, + keyUID: keyUID, + nodeName: nodeName, + version: version, + telemetryCh: make(chan TelemetryRequest), + telemetryCacheLock: sync.Mutex{}, + telemetryCache: make([]TelemetryRequest, 0), + telemetryRetryCache: make([]TelemetryRequest, 0), + nextId: 0, + nextIdLock: sync.Mutex{}, + sendPeriod: 10 * time.Second, // default value + lastPeerConnFailures: make(map[string]int), } for _, opt := range opts { @@ -198,6 +217,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) { TelemetryType: PeerCountMetric, TelemetryData: c.ProcessPeerCount(v), } + case PeerConnFailure: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: PeerConnFailuresMetric, + TelemetryData: c.ProcessPeerConnFailure(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -326,6 +351,19 @@ func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage { return &jsonRawMessage } +func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage { + postBody := map[string]interface{}{ + "peerID": peerConnFailure.PeerID, + "nodeName": c.nodeName, + "nodeKeyUID": c.keyUID, + "statusVersion": c.version, + "timestamp": time.Now().Unix(), + } + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} + func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) url := fmt.Sprintf("%s/update-envelope", c.serverURL) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 807ec0f5089..9e5544134e4 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -107,6 +107,7 @@ type ITelemetryClient interface { PushSentEnvelope(sentEnvelope SentEnvelope) PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope) PushPeerCount(peerCount int) + PushPeerConnFailures(peerConnFailures map[string]int) } // Waku represents a dark communication interface through the Ethereum @@ -1384,7 +1385,9 @@ func (w *Waku) Start() error { } if w.statusTelemetryClient != nil { + connFailures := FormatPeerConnFailures(w.node) w.statusTelemetryClient.PushPeerCount(w.PeerCount()) + w.statusTelemetryClient.PushPeerConnFailures(connFailures) } //TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled. @@ -2037,6 +2040,16 @@ func FormatPeerStats(wakuNode *node.WakuNode) map[string]types.WakuV2Peer { return p } +func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int { + p := make(map[string]int) + for _, peerID := range wakuNode.Host().Network().Peers() { + peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID) + connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo) + p[peerID.String()] = connFailures + } + return p +} + func (w *Waku) StoreNode() legacy_store.Store { return w.node.LegacyStore() }