From c4807f561a452cdbd4667f0205d8220f31838124 Mon Sep 17 00:00:00 2001 From: crStiv Date: Sun, 26 Jan 2025 16:28:58 +0100 Subject: [PATCH 1/4] Update dht.go --- share/shwap/p2p/discovery/dht.go | 105 +++++++++++++++++++++++++++++-- 1 file changed, 100 insertions(+), 5 deletions(-) diff --git a/share/shwap/p2p/discovery/dht.go b/share/shwap/p2p/discovery/dht.go index b3f9351c7c..6cb281017c 100644 --- a/share/shwap/p2p/discovery/dht.go +++ b/share/shwap/p2p/discovery/dht.go @@ -8,31 +8,126 @@ import ( "github.com/ipfs/go-datastore" dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/prometheus/client_golang/prometheus" ) const ( defaultRoutingRefreshPeriod = time.Minute ) -// PeerRouting provides constructor for PeerRouting over DHT. -// Basically, this provides a way to discover peer addresses by respecting public keys. +// PeerRouting provides a constructor for PeerRouting over DHT. +// Essentially, this offers a way to discover peer addresses by respecting public keys. func NewDHT( ctx context.Context, prefix string, - bootsrappers []peer.AddrInfo, + bootstrappers []peer.AddrInfo, host host.Host, dataStore datastore.Batching, mode dht.ModeOpt, ) (*dht.IpfsDHT, error) { opts := []dht.Option{ - dht.BootstrapPeers(bootsrappers...), + dht.BootstrapPeers(bootstrappers...), dht.ProtocolPrefix(protocol.ID(fmt.Sprintf("/celestia/%s", prefix))), dht.Datastore(dataStore), dht.RoutingTableRefreshPeriod(defaultRoutingRefreshPeriod), dht.Mode(mode), } - return dht.New(ctx, host, opts...) + d, err := dht.New(ctx, host, opts...) + if err != nil { + return nil, err + } + + // Create a metrics wrapper + metricsDHT := NewMetricsDHT(d, prefix, mode) + + // Add event handlers for metrics + metricsDHT.PeerConnected = func(id peer.ID) { + dhtMetrics.PeersTotal.WithLabelValues(prefix, mode.String()).Inc() + if metricsDHT.Host().Network().Connectedness(id) == network.DirInbound { + dhtMetrics.InboundConnectionsTotal.WithLabelValues(prefix, mode.String()).Inc() + } else { + dhtMetrics.OutboundConnectionsTotal.WithLabelValues(prefix, mode.String()).Inc() + } + } + + metricsDHT.PeerDisconnected = func(id peer.ID) { + dhtMetrics.PeersTotal.WithLabelValues(prefix, mode.String()).Dec() + } + + // Add metrics for the routing table + go func() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + size := len(metricsDHT.RoutingTable().ListPeers()) + dhtMetrics.RoutingTableSize.WithLabelValues(prefix, mode.String()).Set(float64(size)) + } + } + }() + + // Add metrics for storage operations + metricsDHT.Validator.RecordSize = func(key string, size int) { + dhtMetrics.StoredRecordsTotal.WithLabelValues(prefix, mode.String()).Set(float64(size)) + } + + return metricsDHT.IpfsDHT, nil +} + +// Add wrapper functions for tracking metrics + +func trackDHTRequest(ctx context.Context, prefix, mode, requestType string) (context.Context, func(error)) { + start := time.Now() + return ctx, func(err error) { + duration := time.Since(start) + status := "success" + if err != nil { + status = "error" + } + + dhtMetrics.RequestsTotal.WithLabelValues(prefix, mode, requestType, status).Inc() + dhtMetrics.RequestDuration.WithLabelValues(prefix, mode, requestType).Observe(duration.Seconds()) + } +} + +func trackFindPeer(ctx context.Context, prefix, mode string) func(error) { + return func(err error) { + status := "success" + if err != nil { + status = "error" + } + dhtMetrics.FindPeerTotal.WithLabelValues(prefix, mode, status).Inc() + } +} + +func trackFindProviders(ctx context.Context, prefix, mode string) func(error) { + return func(err error) { + status := "success" + if err != nil { + status = "error" + } + dhtMetrics.FindProvidersTotal.WithLabelValues(prefix, mode, status).Inc() + } +} + +func trackStoreOperation(ctx context.Context, prefix, mode string) func(error) { + return func(err error) { + status := "success" + if err != nil { + status = "error" + } + dhtMetrics.StoreOperationsTotal.WithLabelValues(prefix, mode, status).Inc() + } +} + +func trackRoutingTableRefresh(prefix, mode string) { + dhtMetrics.RoutingTableRefreshes.WithLabelValues(prefix, mode).Inc() } From 377231d7e73bff31941c0dedcee4043c13b0e433 Mon Sep 17 00:00:00 2001 From: crStiv Date: Sun, 26 Jan 2025 16:31:53 +0100 Subject: [PATCH 2/4] Create dht_wrapper.go --- share/shwap/p2p/discovery/dht_wrapper.go | 85 ++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 share/shwap/p2p/discovery/dht_wrapper.go diff --git a/share/shwap/p2p/discovery/dht_wrapper.go b/share/shwap/p2p/discovery/dht_wrapper.go new file mode 100644 index 0000000000..655cc7d64c --- /dev/null +++ b/share/shwap/p2p/discovery/dht_wrapper.go @@ -0,0 +1,85 @@ +package discovery + +import ( + "context" + "time" + + dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" +) + +// MetricsDHT wraps DHT for collecting metrics +type MetricsDHT struct { + *dht.IpfsDHT + prefix string + mode string +} + +// NewMetricsDHT creates a new DHT wrapper with metrics +func NewMetricsDHT(d *dht.IpfsDHT, prefix string, mode dht.ModeOpt) *MetricsDHT { + return &MetricsDHT{ + IpfsDHT: d, + prefix: prefix, + mode: mode.String(), + } +} + +// FindPeer wraps the original method for collecting metrics +func (m *MetricsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { + ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "find_peer") + defer trackFindPeer(ctx, m.prefix, m.mode)(nil) + + info, err := m.IpfsDHT.FindPeer(ctx, id) + done(err) + return info, err +} + +// Provide wraps the original method for collecting metrics +func (m *MetricsDHT) Provide(ctx context.Context, key routing.Cid, announce bool) error { + ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "provide") + defer trackStoreOperation(ctx, m.prefix, m.mode)(nil) + + err := m.IpfsDHT.Provide(ctx, key, announce) + done(err) + return err +} + +// FindProvidersAsync wraps the original method for collecting metrics +func (m *MetricsDHT) FindProvidersAsync(ctx context.Context, key routing.Cid, count int) <-chan peer.AddrInfo { + ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "find_providers") + defer trackFindProviders(ctx, m.prefix, m.mode)(nil) + + ch := m.IpfsDHT.FindProvidersAsync(ctx, key, count) + done(nil) // Here, we cannot track errors since the method is asynchronous + return ch +} + +// Bootstrap wraps the original method for collecting metrics +func (m *MetricsDHT) Bootstrap(ctx context.Context) error { + ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "bootstrap") + defer trackRoutingTableRefresh(m.prefix, m.mode) + + err := m.IpfsDHT.Bootstrap(ctx) + done(err) + return err +} + +// PutValue wraps the original method for collecting metrics +func (m *MetricsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error { + ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "put_value") + defer trackStoreOperation(ctx, m.prefix, m.mode)(nil) + + err := m.IpfsDHT.PutValue(ctx, key, value, opts...) + done(err) + return err +} + +// GetValue wraps the original method for collecting metrics +func (m *MetricsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { + ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "get_value") + + val, err := m.IpfsDHT.GetValue(ctx, key, opts...) + done(err) + return val, err +} From 2ade7a48c48c06c7805eed027307c82256dbc5aa Mon Sep 17 00:00:00 2001 From: crStiv Date: Sun, 26 Jan 2025 16:34:12 +0100 Subject: [PATCH 3/4] Create dht_metrics.go --- nodebuilder/p2p/dht_metrics.go | 133 +++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 nodebuilder/p2p/dht_metrics.go diff --git a/nodebuilder/p2p/dht_metrics.go b/nodebuilder/p2p/dht_metrics.go new file mode 100644 index 0000000000..43b2b565cc --- /dev/null +++ b/nodebuilder/p2p/dht_metrics.go @@ -0,0 +1,133 @@ +package p2p + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + // DHTMetrics contains all metrics for the DHT server + dhtMetrics = struct { + // General DHT metrics + PeersTotal *prometheus.GaugeVec + RequestsTotal *prometheus.CounterVec + RequestDuration *prometheus.HistogramVec + + // Metrics for search operations + FindPeerTotal *prometheus.CounterVec + FindProvidersTotal *prometheus.CounterVec + + // Metrics for storage operations + StoredRecordsTotal *prometheus.GaugeVec + StoreOperationsTotal *prometheus.CounterVec + + // Metrics for network operations + InboundConnectionsTotal *prometheus.CounterVec + OutboundConnectionsTotal *prometheus.CounterVec + + // Metrics for the routing table + RoutingTableSize *prometheus.GaugeVec + RoutingTableRefreshes *prometheus.CounterVec + }{ + PeersTotal: promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "celestia", + Subsystem: "dht", + Name: "peers_total", + Help: "The total number of peers in the DHT", + }, + []string{networkLabel, nodeTypeLabel}, + ), + RequestsTotal: promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "celestia", + Subsystem: "dht", + Name: "requests_total", + Help: "The total number of DHT requests", + }, + []string{networkLabel, nodeTypeLabel, "type", "status"}, + ), + RequestDuration: promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "celestia", + Subsystem: "dht", + Name: "request_duration_seconds", + Help: "Duration of DHT requests", + Buckets: prometheus.DefBuckets, + }, + []string{networkLabel, nodeTypeLabel, "type"}, + ), + FindPeerTotal: promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "celestia", + Subsystem: "dht", + Name: "find_peer_total", + Help: "The total number of peer search operations", + }, + []string{networkLabel, nodeTypeLabel, "status"}, + ), + FindProvidersTotal: promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "celestia", + Subsystem: "dht", + Name: "find_providers_total", + Help: "The total number of provider search operations", + }, + []string{networkLabel, nodeTypeLabel, "status"}, + ), + StoredRecordsTotal: promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "celestia", + Subsystem: "dht", + Name: "stored_records_total", + Help: "The total number of stored records", + }, + []string{networkLabel, nodeTypeLabel}, + ), + StoreOperationsTotal: promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "celestia", + Subsystem: "dht", + Name: "store_operations_total", + Help: "The total number of storage operations", + }, + []string{networkLabel, nodeTypeLabel, "status"}, + ), + InboundConnectionsTotal: promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "celestia", + Subsystem: "dht", + Name: "inbound_connections_total", + Help: "The total number of inbound connections", + }, + []string{networkLabel, nodeTypeLabel}, + ), + OutboundConnectionsTotal: promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "celestia", + Subsystem: "dht", + Name: "outbound_connections_total", + Help: "The total number of outbound connections", + }, + []string{networkLabel, nodeTypeLabel}, + ), + RoutingTableSize: promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "celestia", + Subsystem: "dht", + Name: "routing_table_size", + Help: "The size of the routing table", + }, + []string{networkLabel, nodeTypeLabel}, + ), + RoutingTableRefreshes: promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "celestia", + Subsystem: "dht", + Name: "routing_table_refreshes_total", + Help: "The total number of routing table refreshes", + }, + []string{networkLabel, nodeTypeLabel}, + ), + } +) From 832e635dfc5b9eb481f74091954c83d449233edc Mon Sep 17 00:00:00 2001 From: crStiv Date: Thu, 30 Jan 2025 01:24:36 +0100 Subject: [PATCH 4/4] Update dht.go --- share/shwap/p2p/discovery/dht.go | 116 +++++-------------------------- 1 file changed, 17 insertions(+), 99 deletions(-) diff --git a/share/shwap/p2p/discovery/dht.go b/share/shwap/p2p/discovery/dht.go index 6cb281017c..fca49c1478 100644 --- a/share/shwap/p2p/discovery/dht.go +++ b/share/shwap/p2p/discovery/dht.go @@ -8,7 +8,6 @@ import ( "github.com/ipfs/go-datastore" dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/prometheus/client_golang/prometheus" @@ -18,116 +17,35 @@ const ( defaultRoutingRefreshPeriod = time.Minute ) -// PeerRouting provides a constructor for PeerRouting over DHT. -// Essentially, this offers a way to discover peer addresses by respecting public keys. +// PeerRouting provides constructor for PeerRouting over DHT. +// Basically, this provides a way to discover peer addresses by respecting public keys. func NewDHT( ctx context.Context, prefix string, - bootstrappers []peer.AddrInfo, + bootsrappers []peer.AddrInfo, host host.Host, dataStore datastore.Batching, mode dht.ModeOpt, ) (*dht.IpfsDHT, error) { + // Create metrics registry with our labels + reg := prometheus.NewRegistry() + labels := prometheus.Labels{ + "network": prefix, + "node_type": mode.String(), + } + wrappedReg := prometheus.WrapRegistererWith(labels, reg) + opts := []dht.Option{ - dht.BootstrapPeers(bootstrappers...), + dht.BootstrapPeers(bootsrappers...), dht.ProtocolPrefix(protocol.ID(fmt.Sprintf("/celestia/%s", prefix))), dht.Datastore(dataStore), dht.RoutingTableRefreshPeriod(defaultRoutingRefreshPeriod), dht.Mode(mode), + dht.Validator(dht.DefaultValidator{}), + // Enable built-in DHT metrics + dht.EnabledMetrics(wrappedReg), } - d, err := dht.New(ctx, host, opts...) - if err != nil { - return nil, err - } - - // Create a metrics wrapper - metricsDHT := NewMetricsDHT(d, prefix, mode) - - // Add event handlers for metrics - metricsDHT.PeerConnected = func(id peer.ID) { - dhtMetrics.PeersTotal.WithLabelValues(prefix, mode.String()).Inc() - if metricsDHT.Host().Network().Connectedness(id) == network.DirInbound { - dhtMetrics.InboundConnectionsTotal.WithLabelValues(prefix, mode.String()).Inc() - } else { - dhtMetrics.OutboundConnectionsTotal.WithLabelValues(prefix, mode.String()).Inc() - } - } - - metricsDHT.PeerDisconnected = func(id peer.ID) { - dhtMetrics.PeersTotal.WithLabelValues(prefix, mode.String()).Dec() - } - - // Add metrics for the routing table - go func() { - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - size := len(metricsDHT.RoutingTable().ListPeers()) - dhtMetrics.RoutingTableSize.WithLabelValues(prefix, mode.String()).Set(float64(size)) - } - } - }() - - // Add metrics for storage operations - metricsDHT.Validator.RecordSize = func(key string, size int) { - dhtMetrics.StoredRecordsTotal.WithLabelValues(prefix, mode.String()).Set(float64(size)) - } - - return metricsDHT.IpfsDHT, nil -} - -// Add wrapper functions for tracking metrics - -func trackDHTRequest(ctx context.Context, prefix, mode, requestType string) (context.Context, func(error)) { - start := time.Now() - return ctx, func(err error) { - duration := time.Since(start) - status := "success" - if err != nil { - status = "error" - } - - dhtMetrics.RequestsTotal.WithLabelValues(prefix, mode, requestType, status).Inc() - dhtMetrics.RequestDuration.WithLabelValues(prefix, mode, requestType).Observe(duration.Seconds()) - } -} - -func trackFindPeer(ctx context.Context, prefix, mode string) func(error) { - return func(err error) { - status := "success" - if err != nil { - status = "error" - } - dhtMetrics.FindPeerTotal.WithLabelValues(prefix, mode, status).Inc() - } -} - -func trackFindProviders(ctx context.Context, prefix, mode string) func(error) { - return func(err error) { - status := "success" - if err != nil { - status = "error" - } - dhtMetrics.FindProvidersTotal.WithLabelValues(prefix, mode, status).Inc() - } -} - -func trackStoreOperation(ctx context.Context, prefix, mode string) func(error) { - return func(err error) { - status := "success" - if err != nil { - status = "error" - } - dhtMetrics.StoreOperationsTotal.WithLabelValues(prefix, mode, status).Inc() - } -} - -func trackRoutingTableRefresh(prefix, mode string) { - dhtMetrics.RoutingTableRefreshes.WithLabelValues(prefix, mode).Inc() + return dht.New(ctx, host, opts...) } +я