Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry)_: send connection failure metric #5518

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 74 additions & 28 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailure"

MaxRetryCache = 5000
)
Expand Down Expand Up @@ -58,7 +59,22 @@ func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendi
}

func (c *Client) PushPeerCount(peerCount int) {
c.processAndPushTelemetry(PeerCount{PeerCount: peerCount})
if peerCount != c.lastPeerCount {
c.lastPeerCount = peerCount
c.processAndPushTelemetry(PeerCount{PeerCount: peerCount})
}
}

// PushPeerConnFailures reports per-peer connection failure counts to telemetry.
//
// peerConnFailures maps a peer ID to its failure count. A peer is reported
// only when it has no previously recorded count or when its count differs
// from the recorded one; the recorded count is updated before the metric is
// handed to processAndPushTelemetry.
func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) {
	for id, count := range peerConnFailures {
		previous, seen := c.lastPeerConnFailures[id]
		if seen && previous == count {
			// Nothing changed for this peer since the last push.
			continue
		}

		c.lastPeerConnFailures[id] = count
		c.processAndPushTelemetry(PeerConnFailure{
			FailedPeerId: id,
			FailureCount: count,
		})
	}
}

type ReceivedMessages struct {
Expand All @@ -71,21 +87,28 @@ type PeerCount struct {
PeerCount int
}

// PeerConnFailure is the telemetry payload describing the connection
// failures recorded for a single peer.
type PeerConnFailure struct {
// FailedPeerId is the ID of the peer the failures were recorded for.
FailedPeerId string
// FailureCount is the failure count reported for that peer.
FailureCount int
}

type Client struct {
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
peerId string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex
telemetryCache []TelemetryRequest
telemetryRetryCache []TelemetryRequest
nextIdLock sync.Mutex
nextId int
sendPeriod time.Duration
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
peerId string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex
telemetryCache []TelemetryRequest
telemetryRetryCache []TelemetryRequest
nextIdLock sync.Mutex
nextId int
Comment on lines +107 to +108
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a suggestion, outside the scope of this PR

Suggested change
nextIdLock sync.Mutex
nextId int
nextId atomic.Int32

sendPeriod time.Duration
lastPeerCount int
lastPeerConnFailures map[string]int
}

type TelemetryClientOption func(*Client)
Expand All @@ -105,19 +128,21 @@ func WithPeerID(peerId string) TelemetryClientOption {
func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client {
serverURL = strings.TrimRight(serverURL, "/")
client := &Client{
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
telemetryCh: make(chan TelemetryRequest),
telemetryCacheLock: sync.Mutex{},
telemetryCache: make([]TelemetryRequest, 0),
telemetryRetryCache: make([]TelemetryRequest, 0),
nextId: 0,
nextIdLock: sync.Mutex{},
sendPeriod: 10 * time.Second, // default value
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
telemetryCh: make(chan TelemetryRequest),
telemetryCacheLock: sync.Mutex{},
telemetryCache: make([]TelemetryRequest, 0),
telemetryRetryCache: make([]TelemetryRequest, 0),
nextId: 0,
nextIdLock: sync.Mutex{},
sendPeriod: 10 * time.Second, // default value
lastPeerCount: 0,
lastPeerConnFailures: make(map[string]int),
}

for _, opt := range opts {
Expand Down Expand Up @@ -207,6 +232,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) {
TelemetryType: PeerCountMetric,
TelemetryData: c.ProcessPeerCount(v),
}
case PeerConnFailure:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: PeerConnFailuresMetric,
TelemetryData: c.ProcessPeerConnFailure(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -340,6 +371,21 @@ func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
return &jsonRawMessage
}

func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage {
postBody := map[string]interface{}{
"failedPeerId": peerConnFailure.FailedPeerId,
"failureCount": peerConnFailure.FailureCount,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"peerId": c.peerId,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
Comment on lines +384 to +385
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We marshal and unmarshal here 🤔. And then we marshal again here:

body, err := json.Marshal(c.telemetryRetryCache)

It would be nice to reduce these steps. We could change the type of TelemetryRequest.TelemetryData to interface{} and pass the postBody pointer directly, and then just call json.Marshal on it.

This can of course be done outside of this PR.

return &jsonRawMessage
}

func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL)
Expand Down
5 changes: 3 additions & 2 deletions wakuv2/message_publishing.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ func (w *Waku) broadcast() {
}
}

fn = w.limiter.ThrottlePublishFn(w.ctx, fn)

// Wraps the publish function with a call to the telemetry client
if w.statusTelemetryClient != nil {
sendFn := fn
Expand All @@ -125,6 +123,9 @@ func (w *Waku) broadcast() {
}
}

// Wraps the publish function with rate limiter
fn = w.limiter.ThrottlePublishFn(w.ctx, fn)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@richard-ramos I moved the rate limit wrapper after the telemetry wrapper so that rate limiting doesn't get caught as a publish error


w.wg.Add(1)
go w.publishEnvelope(envelope, fn, logger)
}
Expand Down
15 changes: 15 additions & 0 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type ITelemetryClient interface {
PushSentEnvelope(sentEnvelope SentEnvelope)
PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope)
PushPeerCount(peerCount int)
PushPeerConnFailures(peerConnFailures map[string]int)
}

// Waku represents a dark communication interface through the Ethereum
Expand Down Expand Up @@ -1275,7 +1276,9 @@ func (w *Waku) Start() error {
}

if w.statusTelemetryClient != nil {
connFailures := FormatPeerConnFailures(w.node)
w.statusTelemetryClient.PushPeerCount(w.PeerCount())
w.statusTelemetryClient.PushPeerConnFailures(connFailures)
}

w.ConnectionChanged(connection.State{
Expand Down Expand Up @@ -1953,6 +1956,18 @@ func (w *Waku) StoreNode() *store.WakuStore {
return w.node.Store()
}

// FormatPeerConnFailures builds a map from peer ID (in its String form) to
// connection failure count for the peers returned by the node host's
// Network().Peers().
//
// Counts are read from the host's peerstore, which is asserted to be a
// wps.WakuPeerstore. Peers whose count is not positive are left out, so the
// result may be empty but is never nil.
func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int {
	failuresByPeer := make(map[string]int)
	h := wakuNode.Host()

	for _, id := range h.Network().Peers() {
		info := h.Peerstore().PeerInfo(id)
		count := h.Peerstore().(wps.WakuPeerstore).ConnFailures(info)
		if count <= 0 {
			continue
		}
		failuresByPeer[id.String()] = count
	}

	return failuresByPeer
}

// LegacyStoreNode returns the legacy store exposed by the underlying node.
func (w *Waku) LegacyStoreNode() legacy_store.Store {
	legacyStore := w.node.LegacyStore()
	return legacyStore
}