Skip to content

Commit

Permalink
feat(telemetry)_: send connection failure metric
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Aug 2, 2024
1 parent 5212f33 commit 261dfa8
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 28 deletions.
99 changes: 71 additions & 28 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailures"

MaxRetryCache = 5000
)
Expand Down Expand Up @@ -58,7 +59,22 @@ func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendi
}

func (c *Client) PushPeerCount(peerCount int) {
c.processAndPushTelemetry(PeerCount{PeerCount: peerCount})
if peerCount != c.lastPeerCount {
c.lastPeerCount = peerCount
c.processAndPushTelemetry(PeerCount{PeerCount: peerCount})
}
}

func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) {
for peerID, failures := range peerConnFailures {
if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists {
if failures == lastFailures {
continue
}
}
c.lastPeerConnFailures[peerID] = failures
c.processAndPushTelemetry(PeerConnFailure{PeerID: peerID})
}
}

type ReceivedMessages struct {
Expand All @@ -71,21 +87,27 @@ type PeerCount struct {
PeerCount int
}

type PeerConnFailure struct {
PeerID string
}

type Client struct {
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
peerId string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex
telemetryCache []TelemetryRequest
telemetryRetryCache []TelemetryRequest
nextIdLock sync.Mutex
nextId int
sendPeriod time.Duration
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
peerId string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex
telemetryCache []TelemetryRequest
telemetryRetryCache []TelemetryRequest
nextIdLock sync.Mutex
nextId int
sendPeriod time.Duration
lastPeerCount int
lastPeerConnFailures map[string]int
}

type TelemetryClientOption func(*Client)
Expand All @@ -105,19 +127,21 @@ func WithPeerID(peerId string) TelemetryClientOption {
func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client {
serverURL = strings.TrimRight(serverURL, "/")
client := &Client{
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
telemetryCh: make(chan TelemetryRequest),
telemetryCacheLock: sync.Mutex{},
telemetryCache: make([]TelemetryRequest, 0),
telemetryRetryCache: make([]TelemetryRequest, 0),
nextId: 0,
nextIdLock: sync.Mutex{},
sendPeriod: 10 * time.Second, // default value
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
telemetryCh: make(chan TelemetryRequest),
telemetryCacheLock: sync.Mutex{},
telemetryCache: make([]TelemetryRequest, 0),
telemetryRetryCache: make([]TelemetryRequest, 0),
nextId: 0,
nextIdLock: sync.Mutex{},
sendPeriod: 10 * time.Second, // default value
lastPeerCount: 0,
lastPeerConnFailures: make(map[string]int),
}

for _, opt := range opts {
Expand Down Expand Up @@ -207,6 +231,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) {
TelemetryType: PeerCountMetric,
TelemetryData: c.ProcessPeerCount(v),
}
case PeerConnFailure:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: PeerConnFailuresMetric,
TelemetryData: c.ProcessPeerConnFailure(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -340,6 +370,19 @@ func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
return &jsonRawMessage
}

func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage {
postBody := map[string]interface{}{
"peerID": peerConnFailure.PeerID,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL)
Expand Down
13 changes: 13 additions & 0 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type ITelemetryClient interface {
PushSentEnvelope(sentEnvelope SentEnvelope)
PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope)
PushPeerCount(peerCount int)
PushPeerConnFailures(peerConnFailures map[string]int)
}

// Waku represents a dark communication interface through the Ethereum
Expand Down Expand Up @@ -1275,7 +1276,9 @@ func (w *Waku) Start() error {
}

if w.statusTelemetryClient != nil {
connFailures := FormatPeerConnFailures(w.node)
w.statusTelemetryClient.PushPeerCount(w.PeerCount())
w.statusTelemetryClient.PushPeerConnFailures(connFailures)
}

w.ConnectionChanged(connection.State{
Expand Down Expand Up @@ -1953,6 +1956,16 @@ func (w *Waku) StoreNode() *store.WakuStore {
return w.node.Store()
}

func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int {
p := make(map[string]int)
for _, peerID := range wakuNode.Host().Network().Peers() {
peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo)
p[peerID.String()] = connFailures
}
return p
}

func (w *Waku) LegacyStoreNode() legacy_store.Store {
return w.node.LegacyStore()
}

0 comments on commit 261dfa8

Please sign in to comment.