Skip to content

Commit

Permalink
feat(telemetry)_: send connection failure metric
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Jul 13, 2024
1 parent 21101c9 commit ce321af
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 26 deletions.
90 changes: 64 additions & 26 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailures"

MaxRetryCache = 5000
)
Expand Down Expand Up @@ -60,6 +61,18 @@ func (c *Client) PushPeerCount(peerCount int) {
c.processAndPushTelemetry(PeerCount{PeerCount: peerCount})
}

func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) {
for peerID, failures := range peerConnFailures {
if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists {
if failures == lastFailures {
continue
}
}
c.lastPeerConnFailures[peerID] = failures
c.processAndPushTelemetry(PeerConnFailure{PeerID: peerID})
}
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Expand All @@ -70,20 +83,25 @@ type PeerCount struct {
PeerCount int
}

type PeerConnFailure struct {
PeerID string
}

type Client struct {
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex
telemetryCache []TelemetryRequest
telemetryRetryCache []TelemetryRequest
nextIdLock sync.Mutex
nextId int
sendPeriod time.Duration
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex
telemetryCache []TelemetryRequest
telemetryRetryCache []TelemetryRequest
nextIdLock sync.Mutex
nextId int
sendPeriod time.Duration
lastPeerConnFailures map[string]int
}

type TelemetryClientOption func(*Client)
Expand All @@ -96,19 +114,20 @@ func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption {

func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client {
client := &Client{
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
telemetryCh: make(chan TelemetryRequest),
telemetryCacheLock: sync.Mutex{},
telemetryCache: make([]TelemetryRequest, 0),
telemetryRetryCache: make([]TelemetryRequest, 0),
nextId: 0,
nextIdLock: sync.Mutex{},
sendPeriod: 10 * time.Second, // default value
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
telemetryCh: make(chan TelemetryRequest),
telemetryCacheLock: sync.Mutex{},
telemetryCache: make([]TelemetryRequest, 0),
telemetryRetryCache: make([]TelemetryRequest, 0),
nextId: 0,
nextIdLock: sync.Mutex{},
sendPeriod: 10 * time.Second, // default value
lastPeerConnFailures: make(map[string]int),
}

for _, opt := range opts {
Expand Down Expand Up @@ -198,6 +217,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) {
TelemetryType: PeerCountMetric,
TelemetryData: c.ProcessPeerCount(v),
}
case PeerConnFailure:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: PeerConnFailuresMetric,
TelemetryData: c.ProcessPeerConnFailure(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -326,6 +351,19 @@ func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
return &jsonRawMessage
}

func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage {
postBody := map[string]interface{}{
"peerID": peerConnFailure.PeerID,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL)
Expand Down
13 changes: 13 additions & 0 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type ITelemetryClient interface {
PushSentEnvelope(sentEnvelope SentEnvelope)
PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope)
PushPeerCount(peerCount int)
PushPeerConnFailures(peerConnFailures map[string]int)
}

// Waku represents a dark communication interface through the Ethereum
Expand Down Expand Up @@ -1384,7 +1385,9 @@ func (w *Waku) Start() error {
}

if w.statusTelemetryClient != nil {
connFailures := FormatPeerConnFailures(w.node)
w.statusTelemetryClient.PushPeerCount(w.PeerCount())
w.statusTelemetryClient.PushPeerConnFailures(connFailures)
}

//TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled.
Expand Down Expand Up @@ -2037,6 +2040,16 @@ func FormatPeerStats(wakuNode *node.WakuNode) map[string]types.WakuV2Peer {
return p
}

func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int {
p := make(map[string]int)
for _, peerID := range wakuNode.Host().Network().Peers() {
peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo)
p[peerID.String()] = connFailures
}
return p
}

func (w *Waku) StoreNode() legacy_store.Store {
return w.node.LegacyStore()
}

0 comments on commit ce321af

Please sign in to comment.