Skip to content

Commit

Permalink
Merge branch 'main' into rp/bump-celestia-app
Browse files Browse the repository at this point in the history
  • Loading branch information
renaynay authored Jan 20, 2025
2 parents e6c86be + e11cd71 commit 3f43d8e
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 37 deletions.
19 changes: 16 additions & 3 deletions header/headertest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/proto/tendermint/version"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"

"github.com/celestiaorg/celestia-app/v3/pkg/da"
libhead "github.com/celestiaorg/go-header"
Expand All @@ -40,6 +39,7 @@ type TestSuite struct {
// blockTime is optional - if set, the test suite will generate
// blocks timestamped at the specified interval
blockTime time.Duration
startTime time.Time
}

func NewStore(t *testing.T) libhead.Store[*header.ExtendedHeader] {
Expand All @@ -62,6 +62,18 @@ func NewTestSuite(t *testing.T, numValidators int, blockTime time.Duration) *Tes
vals: vals,
valSet: valSet,
blockTime: blockTime,
startTime: time.Now(),
}
}

func NewTestSuiteWithGenesisTime(t *testing.T, startTime time.Time, blockTime time.Duration) *TestSuite {
valSet, vals := RandValidatorSet(3, 1)
return &TestSuite{
t: t,
vals: vals,
valSet: valSet,
blockTime: blockTime,
startTime: startTime,
}
}

Expand All @@ -74,10 +86,11 @@ func (s *TestSuite) genesis() *header.ExtendedHeader {
gen.ValidatorsHash = s.valSet.Hash()
gen.NextValidatorsHash = s.valSet.Hash()
gen.Height = 1
gen.Time = s.startTime
voteSet := types.NewVoteSet(gen.ChainID, gen.Height, 0, tmproto.PrecommitType, s.valSet)
blockID := RandBlockID(s.t)
blockID.Hash = gen.Hash()
commit, err := MakeCommit(blockID, gen.Height, 0, voteSet, s.vals, time.Now())
commit, err := MakeCommit(blockID, gen.Height, 0, voteSet, s.vals, s.startTime)
require.NoError(s.t, err)

eh := &header.ExtendedHeader{
Expand Down Expand Up @@ -199,7 +212,7 @@ func (s *TestSuite) Commit(h *header.RawHeader) *types.Commit {
ValidatorIndex: int32(i),
Height: h.Height,
Round: round,
Timestamp: tmtime.Now().UTC(),
Timestamp: h.Time,
Type: tmproto.PrecommitType,
BlockID: bid,
}
Expand Down
74 changes: 74 additions & 0 deletions nodebuilder/p2p/cmd/p2p.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cmd

import (
"time"

"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -9,6 +11,7 @@ import (
"github.com/spf13/cobra"

cmdnode "github.com/celestiaorg/celestia-node/cmd"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
)

type peerInfo struct {
Expand All @@ -35,6 +38,8 @@ func init() {
bandwidthForProtocolCmd,
pubsubPeersCmd,
pubsubTopicsCmd,
connectionInfoCmd,
pingCmd,
)
}

Expand Down Expand Up @@ -599,3 +604,72 @@ var pubsubTopicsCmd = &cobra.Command{
return cmdnode.PrintOutput(topics, err, formatter)
},
}

var connectionInfoCmd = &cobra.Command{
Use: "connection-state [peerID]",
Short: "Gets connection info for a given peer ID",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
client, err := cmdnode.ParseClientFromCtx(cmd.Context())
if err != nil {
return err
}
defer client.Close()

pid, err := peer.Decode(args[0])
if err != nil {
return err
}

infos, err := client.P2P.ConnectionState(cmd.Context(), pid)
return cmdnode.PrintOutput(infos, err, func(i interface{}) interface{} {
type state struct {
Info network.ConnectionState
NumStreams int
Direction string
Opened string
Limited bool
}

states := i.([]p2p.ConnectionState)
infos := make([]state, len(states))
for i, s := range states {
infos[i] = state{
Info: s.Info,
NumStreams: s.NumStreams,
Direction: s.Direction.String(),
Opened: s.Opened.Format("2006-01-02 15:04:05"),
Limited: s.Limited,
}
}

if len(infos) == 1 {
return infos[0]
}
return infos
})
},
}

var pingCmd = &cobra.Command{
Use: "ping [peerID]",
Short: "Pings given peer and tell how much time that took or errors",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
client, err := cmdnode.ParseClientFromCtx(cmd.Context())
if err != nil {
return err
}
defer client.Close()

pid, err := peer.Decode(args[0])
if err != nil {
return err
}

pingDuration, err := client.P2P.Ping(cmd.Context(), pid)
return cmdnode.PrintOutput(pingDuration, err, func(i interface{}) interface{} {
return i.(time.Duration).String()
})
},
}
32 changes: 32 additions & 0 deletions nodebuilder/p2p/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 59 additions & 3 deletions nodebuilder/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
libhost "github.com/libp2p/go-libp2p/core/host"
Expand All @@ -14,14 +15,29 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/autonat"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

var _ Module = (*API)(nil)

// ConnectionState holds information about a connection.
type ConnectionState struct {
Info network.ConnectionState
// NumStreams is the number of streams on the connection.
NumStreams int
// Direction specifies whether this is an inbound or an outbound connection.
Direction network.Direction
// Opened is the timestamp when this connection was opened.
Opened time.Time
// Limited indicates that this connection is Limited. It maybe limited by
// bytes or time. In practice, this is a connection formed over a circuit v2
// relay.
Limited bool
}

// Module represents all accessible methods related to the node's p2p
// host / operations.
//
//nolint:dupl
//go:generate mockgen -destination=mocks/api.go -package=mocks . Module
type Module interface {
// Info returns address information about the host.
Expand All @@ -39,6 +55,9 @@ type Module interface {
ClosePeer(ctx context.Context, id peer.ID) error
// Connectedness returns a state signaling connection capabilities.
Connectedness(ctx context.Context, id peer.ID) (network.Connectedness, error)
// ConnectionState returns information about each *active* connection to the peer.
// NOTE: At most cases there should be only a single connection.
ConnectionState(ctx context.Context, id peer.ID) ([]ConnectionState, error)
// NATStatus returns the current NAT status.
NATStatus(context.Context) (network.Reachability, error)

Expand Down Expand Up @@ -80,6 +99,9 @@ type Module interface {
PubSubPeers(ctx context.Context, topic string) ([]peer.ID, error)
// PubSubTopics reports current PubSubTopics the node participates in.
PubSubTopics(ctx context.Context) ([]string, error)

// Ping pings the selected peer and returns time it took or error.
Ping(ctx context.Context, peer peer.ID) (time.Duration, error)
}

// module contains all components necessary to access information and
Expand Down Expand Up @@ -205,9 +227,33 @@ func (m *module) PubSubTopics(_ context.Context) ([]string, error) {
return m.ps.GetTopics(), nil
}

func (m *module) Ping(ctx context.Context, peer peer.ID) (time.Duration, error) {
res := <-ping.Ping(ctx, m.host, peer) // context is handled for us
return res.RTT, res.Error
}

func (m *module) ConnectionState(_ context.Context, peer peer.ID) ([]ConnectionState, error) {
cons := m.host.Network().ConnsToPeer(peer)
if len(cons) == 0 {
return nil, fmt.Errorf("no connections to peer %s", peer)
}

conInfos := make([]ConnectionState, len(cons))
for i, con := range cons {
stat := con.Stat()
conInfos[i] = ConnectionState{
Info: con.ConnState(),
NumStreams: stat.NumStreams,
Direction: stat.Direction,
Opened: stat.Opened,
Limited: stat.Limited,
}
}

return conInfos, nil
}

// API is a wrapper around Module for the RPC.
//
//nolint:dupl
type API struct {
Internal struct {
Info func(context.Context) (peer.AddrInfo, error) `perm:"admin"`
Expand All @@ -229,6 +275,8 @@ type API struct {
ResourceState func(context.Context) (rcmgr.ResourceManagerStat, error) `perm:"admin"`
PubSubPeers func(ctx context.Context, topic string) ([]peer.ID, error) `perm:"admin"`
PubSubTopics func(ctx context.Context) ([]string, error) `perm:"admin"`
Ping func(ctx context.Context, peer peer.ID) (time.Duration, error) `perm:"admin"`
ConnectionState func(context.Context, peer.ID) ([]ConnectionState, error) `perm:"admin"`
}
}

Expand Down Expand Up @@ -307,3 +355,11 @@ func (api *API) PubSubPeers(ctx context.Context, topic string) ([]peer.ID, error
func (api *API) PubSubTopics(ctx context.Context) ([]string, error) {
return api.Internal.PubSubTopics(ctx)
}

func (api *API) Ping(ctx context.Context, peer peer.ID) (time.Duration, error) {
return api.Internal.Ping(ctx, peer)
}

func (api *API) ConnectionState(ctx context.Context, peer peer.ID) ([]ConnectionState, error) {
return api.Internal.ConnectionState(ctx, peer)
}
5 changes: 5 additions & 0 deletions nodebuilder/p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func TestP2PModule_Host(t *testing.T) {

connectedness, err := mgr.Connectedness(ctx, peer.ID())
require.NoError(t, err)

infos, err := mgr.ConnectionState(ctx, peer.ID())
require.NoError(t, err)
require.GreaterOrEqual(t, len(infos), 1)

assert.Equal(t, host.Network().Connectedness(peer.ID()), connectedness)
// now disconnect using manager and check for connectedness match again
assert.NoError(t, mgr.ClosePeer(ctx, peer.ID()))
Expand Down
33 changes: 2 additions & 31 deletions pruner/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ func TestFindPruneableHeaders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

headerGenerator := NewSpacedHeaderGenerator(t, tc.startTime, tc.blockTime)
store := headertest.NewCustomStore(t, headerGenerator, tc.headerAmount)
suite := headertest.NewTestSuiteWithGenesisTime(t, tc.startTime, tc.blockTime)
store := headertest.NewCustomStore(t, suite, tc.headerAmount)

mp := &mockPruner{}

Expand Down Expand Up @@ -317,32 +317,3 @@ func (mp *mockPruner) Prune(_ context.Context, h *header.ExtendedHeader) error {
mp.deletedHeaderHashes = append(mp.deletedHeaderHashes, pruned{hash: h.Hash().String(), height: h.Height()})
return nil
}

// TODO @renaynay @distractedm1nd: Deduplicate via headertest utility.
// https://github.com/celestiaorg/celestia-node/issues/3278.
type SpacedHeaderGenerator struct {
t *testing.T
TimeBetweenHeaders time.Duration
currentTime time.Time
currentHeight int64
}

func NewSpacedHeaderGenerator(
t *testing.T, startTime time.Time, timeBetweenHeaders time.Duration,
) *SpacedHeaderGenerator {
return &SpacedHeaderGenerator{
t: t,
TimeBetweenHeaders: timeBetweenHeaders,
currentTime: startTime,
currentHeight: 1,
}
}

func (shg *SpacedHeaderGenerator) NextHeader() *header.ExtendedHeader {
h := headertest.RandExtendedHeaderAtTimestamp(shg.t, shg.currentTime)
h.RawHeader.Height = shg.currentHeight
h.RawHeader.Time = shg.currentTime
shg.currentHeight++
shg.currentTime = shg.currentTime.Add(shg.TimeBetweenHeaders)
return h
}

0 comments on commit 3f43d8e

Please sign in to comment.