diff --git a/nodebuilder/p2p/dht_metrics.go b/nodebuilder/p2p/dht_metrics.go
new file mode 100644
index 0000000000..43b2b565cc
--- /dev/null
+++ b/nodebuilder/p2p/dht_metrics.go
@@ -0,0 +1,137 @@
+package p2p
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var (
+	// dhtMetrics groups the Prometheus collectors for the DHT server. Every
+	// collector is named "celestia_dht_<name>" and is created through promauto,
+	// which registers it with the default Prometheus registerer at package
+	// initialization. Gauges carry no "_total" suffix because Prometheus naming
+	// conventions reserve that suffix for counters.
+	dhtMetrics = struct {
+		// General DHT metrics
+		PeersTotal      *prometheus.GaugeVec
+		RequestsTotal   *prometheus.CounterVec
+		RequestDuration *prometheus.HistogramVec
+
+		// Metrics for search operations
+		FindPeerTotal      *prometheus.CounterVec
+		FindProvidersTotal *prometheus.CounterVec
+
+		// Metrics for storage operations
+		StoredRecordsTotal   *prometheus.GaugeVec
+		StoreOperationsTotal *prometheus.CounterVec
+
+		// Metrics for network operations
+		InboundConnectionsTotal  *prometheus.CounterVec
+		OutboundConnectionsTotal *prometheus.CounterVec
+
+		// Metrics for the routing table
+		RoutingTableSize      *prometheus.GaugeVec
+		RoutingTableRefreshes *prometheus.CounterVec
+	}{
+		PeersTotal: promauto.NewGaugeVec(
+			prometheus.GaugeOpts{
+				Namespace: "celestia",
+				Subsystem: "dht",
+				Name:      "peers",
+				Help:      "The total number of peers in the DHT",
+			},
+			[]string{networkLabel, nodeTypeLabel},
+		),
+		RequestsTotal: promauto.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: "celestia",
+				Subsystem: "dht",
+				Name:      "requests_total",
+				Help:      "The total number of DHT requests",
+			},
+			[]string{networkLabel, nodeTypeLabel, "type", "status"},
+		),
+		RequestDuration: promauto.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Namespace: "celestia",
+				Subsystem: "dht",
+				Name:      "request_duration_seconds",
+				Help:      "Duration of DHT requests",
+				Buckets:   prometheus.DefBuckets,
+			},
+			[]string{networkLabel, nodeTypeLabel, "type"},
+		),
+		FindPeerTotal: promauto.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: "celestia",
+				Subsystem: "dht",
+				Name:      "find_peer_total",
+				Help:      "The total number of peer search operations",
+			},
+			[]string{networkLabel, nodeTypeLabel, "status"},
+		),
+		FindProvidersTotal: promauto.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: "celestia",
+				Subsystem: "dht",
+				Name:      "find_providers_total",
+				Help:      "The total number of provider search operations",
+			},
+			[]string{networkLabel, nodeTypeLabel, "status"},
+		),
+		StoredRecordsTotal: promauto.NewGaugeVec(
+			prometheus.GaugeOpts{
+				Namespace: "celestia",
+				Subsystem: "dht",
+				Name:      "stored_records",
+				Help:      "The total number of stored records",
+			},
+			[]string{networkLabel, nodeTypeLabel},
+		),
+		StoreOperationsTotal: promauto.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: "celestia",
+				Subsystem: "dht",
+				Name:      "store_operations_total",
+				Help:      "The total number of storage operations",
+			},
+			[]string{networkLabel, nodeTypeLabel, "status"},
+		),
+		InboundConnectionsTotal: promauto.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: "celestia",
+				Subsystem: "dht",
+				Name:      "inbound_connections_total",
+				Help:      "The total number of inbound connections",
+			},
+			[]string{networkLabel, nodeTypeLabel},
+		),
+		OutboundConnectionsTotal: promauto.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: "celestia",
+				Subsystem: "dht",
+				Name:      "outbound_connections_total",
+				Help:      "The total number of outbound connections",
+			},
+			[]string{networkLabel, nodeTypeLabel},
+		),
+		RoutingTableSize: promauto.NewGaugeVec(
+			prometheus.GaugeOpts{
+				Namespace: "celestia",
+				Subsystem: "dht",
+				Name:      "routing_table_size",
+				Help:      "The size of the routing table",
+			},
+			[]string{networkLabel, nodeTypeLabel},
+		),
+		RoutingTableRefreshes: promauto.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: "celestia",
+				Subsystem: "dht",
+				Name:      "routing_table_refreshes_total",
+				Help:      "The total number of routing table refreshes",
+			},
+			[]string{networkLabel, nodeTypeLabel},
+		),
+	}
+)
diff --git a/share/shwap/p2p/discovery/dht.go b/share/shwap/p2p/discovery/dht.go
index 
b3f9351c7c..fca49c1478 100644
--- a/share/shwap/p2p/discovery/dht.go
+++ b/share/shwap/p2p/discovery/dht.go
@@ -10,6 +10,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/core/protocol"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 const (
@@ -26,13 +27,29 @@ func NewDHT(
 	dataStore datastore.Batching,
 	mode dht.ModeOpt,
 ) (*dht.IpfsDHT, error) {
+	// Label every collector the DHT registers with the network and node mode.
+	// NOTE(review): reg is created here and only reaches the DHT through
+	// wrappedReg; nothing in this function exposes it for scraping — confirm
+	// where these metrics are meant to be gathered.
+	reg := prometheus.NewRegistry()
+	labels := prometheus.Labels{
+		"network":   prefix,
+		"node_type": mode.String(), // NOTE(review): confirm dht.ModeOpt has a String method
+	}
+	wrappedReg := prometheus.WrapRegistererWith(labels, reg)
+
 	opts := []dht.Option{
 		dht.BootstrapPeers(bootsrappers...),
 		dht.ProtocolPrefix(protocol.ID(fmt.Sprintf("/celestia/%s", prefix))),
 		dht.Datastore(dataStore),
 		dht.RoutingTableRefreshPeriod(defaultRoutingRefreshPeriod),
 		dht.Mode(mode),
+		// NOTE(review): setting a validator is unrelated to metrics; confirm that
+		// dht.DefaultValidator exists and that replacing the default is intended.
+		dht.Validator(dht.DefaultValidator{}),
+		// Enable the DHT's built-in metrics on the labelled registerer.
+		dht.EnabledMetrics(wrappedReg),
 	}
 
 	return dht.New(ctx, host, opts...)
 }
diff --git a/share/shwap/p2p/discovery/dht_wrapper.go b/share/shwap/p2p/discovery/dht_wrapper.go
new file mode 100644
index 0000000000..655cc7d64c
--- /dev/null
+++ b/share/shwap/p2p/discovery/dht_wrapper.go
@@ -0,0 +1,85 @@
+package discovery
+
+import (
+	"context"
+
+	dht "github.com/libp2p/go-libp2p-kad-dht"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/core/routing" // NOTE(review): confirm routing.Cid (used below) exists
+)
+
+// MetricsDHT embeds an IpfsDHT and overrides selected methods so that each call
+// is reported to the metric trackers, labelled with prefix and mode.
+type MetricsDHT struct {
+	*dht.IpfsDHT
+	prefix string
+	mode   string
+}
+
+// NewMetricsDHT wraps d, labelling its metrics with prefix and mode.String().
+func NewMetricsDHT(d *dht.IpfsDHT, prefix string, mode dht.ModeOpt) *MetricsDHT {
+	return &MetricsDHT{
+		IpfsDHT: d,
+		prefix:  prefix,
+		mode:    mode.String(),
+	}
+}
+
+// FindPeer looks id up through the embedded DHT and reports the call's outcome.
+func (m *MetricsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
+	ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "find_peer")
+	finish := trackFindPeer(ctx, m.prefix, m.mode) // NOTE(review): assumed to return func(error) — confirm
+	info, err := m.IpfsDHT.FindPeer(ctx, id)
+	done(err)
+	// Forward the real outcome; this was deferred with a literal nil, hiding errors.
+	finish(err)
+	return info, err
+}
+
+// Provide announces key through the embedded DHT and reports the call's outcome.
+func (m *MetricsDHT) Provide(ctx context.Context, key routing.Cid, announce bool) error {
+	ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "provide")
+	finish := trackStoreOperation(ctx, m.prefix, m.mode) // NOTE(review): assumed to return func(error) — confirm
+	err := m.IpfsDHT.Provide(ctx, key, announce)
+	done(err)
+	finish(err) // forward the real outcome instead of a deferred literal nil
+	return err
+}
+
+// FindProvidersAsync starts a provider search on the embedded DHT and returns its channel.
+func (m *MetricsDHT) FindProvidersAsync(ctx context.Context, key routing.Cid, count int) <-chan peer.AddrInfo {
+	ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "find_providers")
+	defer trackFindProviders(ctx, m.prefix, m.mode)(nil)
+	ch := m.IpfsDHT.FindProvidersAsync(ctx, key, count)
+	done(nil) // asynchronous call: no error is available here, so nil is reported
+	return ch
+}
+
+// Bootstrap calls Bootstrap on the embedded DHT and reports the call's outcome.
+func (m *MetricsDHT) Bootstrap(ctx context.Context) error {
+	ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "bootstrap")
+	defer trackRoutingTableRefresh(m.prefix, m.mode) // runs when Bootstrap returns, whatever the outcome
+	err := m.IpfsDHT.Bootstrap(ctx)
+	done(err)
+	return err
+}
+
+// PutValue stores value under key through the embedded DHT and reports the call's outcome.
+func (m *MetricsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error {
+	ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "put_value")
+	finish := trackStoreOperation(ctx, m.prefix, m.mode)
+	err := m.IpfsDHT.PutValue(ctx, key, value, opts...)
+	done(err)
+	// Forward the real outcome: this callback was previously deferred with a
+	// literal nil. NOTE(review): assumes it accepts an error, like done — confirm.
+	finish(err)
+	return err
+}
+
+// GetValue looks key up through the embedded DHT and reports the call's outcome.
+func (m *MetricsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
+	ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "get_value")
+	val, err := m.IpfsDHT.GetValue(ctx, key, opts...)
+	done(err)
+	return val, err
+}