The p2p network-discovery protocol uses the Kademlia protocol to handle node discovery, node lookups and node-table updates. Kademlia communicates over UDP.
This section walks through the code. If you are not familiar with Kademlia, read the introduction in the references first.
Start with the data structures. Four kinds of packets travel over the network (UDP is message-based, so each message is one packet): ping, pong, findnode and neighbors. Their formats are defined below.
// RPC packet types
const (
pingPacket = iota + 1 // zero is 'reserved'
pongPacket
findnodePacket
neighborsPacket
)
// RPC request structures
type (
ping struct {
Version uint // protocol version
From, To rpcEndpoint // source and destination endpoints (IP and ports)
Expiration uint64 // absolute timestamp after which the packet is invalid
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
// pong is the reply to ping.
pong struct {
// This field should mirror the UDP envelope address
// of the ping packet, which provides a way to discover the
// external address (after NAT).
To rpcEndpoint
// ReplyTok identifies which ping this pong answers.
ReplyTok []byte // contains the hash of the corresponding ping packet
Expiration uint64 // Absolute timestamp at which the packet becomes invalid.
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
// findnode is a query for nodes close to the given target.
findnode struct {
// the node ID being looked up
Target NodeID // doesn't need to be an actual public key
Expiration uint64
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
// reply to findnode
neighbors struct {
// Nodes close to the requested target.
Nodes []rpcNode
Expiration uint64
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
rpcNode struct {
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP uint16 // for discovery protocol
TCP uint16 // for RLPx protocol
ID NodeID
}
rpcEndpoint struct {
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP uint16 // for discovery protocol
TCP uint16 // for RLPx protocol
}
)
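The Rest []rlp.RawValue fields tagged rlp:"tail" are what the "Ignore additional fields" comments refer to: any trailing list elements beyond the known fields are collected into Rest instead of failing the decode, which keeps older nodes compatible with newer packet versions. A minimal, self-contained sketch of that behavior (the two structs here are made up for illustration, not taken from the source):
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/rlp"
)

// newPing plays the role of a future protocol version that appends a field.
type newPing struct {
	Version    uint
	Expiration uint64
	Extra      uint // field an older node knows nothing about
}

// oldPing plays the role of the current struct: unknown trailing
// fields end up in Rest instead of causing a decode error.
type oldPing struct {
	Version    uint
	Expiration uint64
	Rest       []rlp.RawValue `rlp:"tail"`
}

func main() {
	enc, _ := rlp.EncodeToBytes(&newPing{Version: 4, Expiration: 1234, Extra: 42})
	var p oldPing
	if err := rlp.DecodeBytes(enc, &p); err != nil {
		panic(err)
	}
	fmt.Println(p.Version, p.Expiration, len(p.Rest)) // 4 1234 1
}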
Two interface types are defined. The packet interface is implemented by the four packet types, each providing its own handle method. The conn interface abstracts the functionality of a UDP connection.
type packet interface {
handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
name() string
}
type conn interface {
ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
Close() error
LocalAddr() net.Addr
}
Next is the udp structure. Note that the last field, *Table, is an embedded (anonymous) field in Go, which means the methods of Table can be called directly on a udp value.
// udp implements the RPC protocol.
type udp struct {
conn conn // connection
netrestrict *netutil.Netlist
priv *ecdsa.PrivateKey // the private key of the node
ourEndpoint rpcEndpoint
addpending chan *pending // channel for registering a new pending reply
gotreply chan reply // channel on which incoming replies are delivered
closing chan struct{}
nat nat.Interface
*Table
}
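As a reminder of how an embedded field behaves, here is a toy example (unrelated to the discover code): methods of the embedded type are promoted to the outer type, which is why udp can call Table's methods directly.
package main

import "fmt"

type Table struct{ name string }

func (t *Table) Self() string { return t.name }

type udp struct {
	*Table // embedded ("anonymous") field
}

func main() {
	u := udp{Table: &Table{name: "node-1"}}
	fmt.Println(u.Self()) // Self is promoted from *Table to udp
}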
The pending and reply structures. Both are used to pass messages between the goroutines inside udp.
// pending represents a pending reply.
// some implementations of the protocol wish to send more than one
// reply packet to findnode. in general, any neighbors packet cannot
// be matched up with a specific findnode packet.
// our implementation handles this by storing a callback function for
// each pending reply. incoming packets from a node are dispatched
// to all the callback functions for that node.
type pending struct {
// these fields must match in the reply.
from NodeID
ptype byte
// time when the request must complete
deadline time.Time
// callback is called when a matching reply arrives. if it returns
// true, the callback is removed from the pending reply queue.
// if it returns false, the reply is considered incomplete and
// the callback will be invoked again for the next matching reply.
callback func(resp interface{}) (done bool)
// errc receives nil when the callback indicates completion or an
// error if no further reply is received within the timeout.
errc chan<- error
}
type reply struct {
from NodeID
ptype byte
data interface{}
// loop indicates whether there was
// a matching request by sending on this channel.
matched chan<- bool
}
UDP creation
// ListenUDP returns a new table that listens for UDP packets on laddr.
func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBPath string, netrestrict *netutil.Netlist) (*Table, error) {
addr, err := net.ResolveUDPAddr("udp", laddr)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
tab, _, err := newUDP(priv, conn, natm, nodeDBPath, netrestrict)
if err != nil {
return nil, err
}
log.Info("UDP listener up", "self", tab.self)
return tab, nil
}
func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath string, netrestrict *netutil.Netlist) (*Table, *udp, error) {
udp := &udp{
conn: c,
priv: priv,
netrestrict: netrestrict,
closing: make(chan struct{}),
gotreply: make(chan reply),
addpending: make(chan *pending),
}
realaddr := c.LocalAddr().(*net.UDPAddr)
if natm != nil { // a NAT mapper is configured; use it to obtain the external address
if !realaddr.IP.IsLoopback() { // only set up port mapping if we are not bound to a loopback address
go nat.Map(natm, udp.closing, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
}
// TODO: react to external IP changes over time.
if ext, err := natm.ExternalIP(); err == nil {
realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
}
}
// TODO: separate TCP port
udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
// newTable is covered later; the main Kademlia logic lives in the Table type.
tab, err := newTable(udp, PubkeyID(&priv.PublicKey), realaddr, nodeDBPath)
if err != nil {
return nil, nil, err
}
udp.Table = tab // assign the embedded field
go udp.loop() // goroutine that tracks pending replies and timeouts
go udp.readLoop() // goroutine that reads packets from the network
return udp.Table, udp, nil
}
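For orientation, a caller might start the discovery table with this API roughly as follows. This is only a sketch against the ListenUDP signature shown above; the address string and the nil nat/nodeDB/netrestrict arguments are illustrative, and later versions of the package changed this API.
package main

import (
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/p2p/discover"
)

func main() {
	key, err := crypto.GenerateKey() // the node's private key
	if err != nil {
		panic(err)
	}
	// NAT mapper, node DB path and netrestrict list are left empty here.
	tab, err := discover.ListenUDP(key, ":30303", nil, "", nil)
	if err != nil {
		panic(err)
	}
	defer tab.Close()
	// tab is now listening for discovery packets on UDP port 30303.
}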
The ping method is built on the pending mechanism, so before looking at ping itself, let's see how waiting for a reply is implemented.
The pending method sends a pending structure on the addpending channel and returns a channel on which the eventual result (nil or an error) will be delivered.
// ping sends a ping message to the given node and waits for a reply.
func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
// TODO: maybe check for ReplyTo field in callback to measure RTT
errc := t.pending(toid, pongPacket, func(interface{}) bool { return true })
t.send(toaddr, pingPacket, &ping{
Version: Version,
From: t.ourEndpoint,
To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
return <-errc
}
// pending adds a reply callback to the pending reply queue.
// see the documentation of type pending for a detailed explanation.
func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-chan error {
ch := make(chan error, 1)
p := &pending{from: id, ptype: ptype, callback: callback, errc: ch}
select {
case t.addpending <- p:
// loop will handle it
case <-t.closing:
ch <- errClosed
}
return ch
}
Handling of the addpending channel. newUDP started two goroutines when the udp was created; loop() is the one that tracks pending replies.
// loop runs in its own goroutine. it keeps track of
// the refresh timer and the pending reply queue.
func (t *udp) loop() {
var (
plist = list.New()
timeout = time.NewTimer(0)
nextTimeout *pending // head of plist when timeout was last reset
contTimeouts = 0 // number of continuous timeouts to do NTP checks
ntpWarnTime = time.Unix(0, 0)
)
<-timeout.C // ignore first timeout
defer timeout.Stop()
resetTimeout := func() {
// Check whether any pending reply in the queue still needs a timeout. If so,
// re-arm the timer to fire at the deadline of the first (earliest) entry.
if plist.Front() == nil || nextTimeout == plist.Front().Value {
return
}
// Start the timer so it fires when the next pending reply has expired.
now := time.Now()
for el := plist.Front(); el != nil; el = el.Next() {
nextTimeout = el.Value.(*pending)
if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout {
timeout.Reset(dist)
return
}
// Remove pending replies whose deadline is too far in the
// future. These can occur if the system clock jumped
// backwards after the deadline was assigned.
nextTimeout.errc <- errClockWarp
plist.Remove(el)
}
nextTimeout = nil
timeout.Stop()
}
for {
resetTimeout() // re-arm the timer for the earliest pending deadline
select {
case <-t.closing: // shutting down: fail all pending replies with errClosed
for el := plist.Front(); el != nil; el = el.Next() {
el.Value.(*pending).errc <- errClosed
}
return
case p := <-t.addpending: // a new pending reply: set its deadline and queue it
p.deadline = time.Now().Add(respTimeout)
plist.PushBack(p)
case r := <-t.gotreply: // an incoming reply: look for matching pending entries
var matched bool
for el := plist.Front(); el != nil; el = el.Next() {
p := el.Value.(*pending)
if p.from == r.from && p.ptype == r.ptype { // same sender and same packet type
matched = true
// Remove the matcher if its callback indicates
// that all replies have been received. This is
// required for packet types that expect multiple
// reply packets.
if p.callback(r.data) { // the callback reports completion: deliver nil on errc and drop the entry
p.errc <- nil
plist.Remove(el)
}
// Reset the continuous timeout counter (time drift detection)
contTimeouts = 0
}
}
r.matched <- matched // report back whether the reply matched any pending entry
case now := <-timeout.C: // the timer fired: expire overdue pending replies
nextTimeout = nil
// Notify and remove callbacks whose deadline is in the past.
for el := plist.Front(); el != nil; el = el.Next() {
p := el.Value.(*pending)
if now.After(p.deadline) || now.Equal(p.deadline) { // past its deadline: deliver errTimeout and remove it
p.errc <- errTimeout
plist.Remove(el)
contTimeouts++
}
}
// If we've accumulated too many timeouts, do an NTP time sync check
if contTimeouts > ntpFailureThreshold {
if time.Since(ntpWarnTime) >= ntpWarningCooldown {
ntpWarnTime = time.Now()
go checkClockDrift()
}
contTimeouts = 0
}
}
}
}
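To see the addpending / gotreply pattern in isolation, here is a self-contained miniature of the same idea. All names are hypothetical and the real code's deadlines and packet types are stripped out; matching is done on a plain string key.
package main

import (
	"errors"
	"fmt"
)

type miniPending struct {
	key      string
	callback func(data interface{}) bool
	errc     chan<- error
}

type miniReply struct {
	key     string
	data    interface{}
	matched chan<- bool
}

// dispatcher plays the role of loop(): it owns the pending list and
// matches incoming replies against it.
func dispatcher(addpending <-chan *miniPending, gotreply <-chan miniReply, closing <-chan struct{}) {
	var plist []*miniPending
	for {
		select {
		case p := <-addpending:
			plist = append(plist, p)
		case r := <-gotreply:
			matched := false
			kept := plist[:0]
			for _, p := range plist {
				if p.key == r.key {
					matched = true
					if p.callback(r.data) {
						p.errc <- nil // completed, drop it from the list
						continue
					}
				}
				kept = append(kept, p)
			}
			plist = kept
			r.matched <- matched
		case <-closing:
			for _, p := range plist {
				p.errc <- errors.New("closed")
			}
			return
		}
	}
}

func main() {
	addpending := make(chan *miniPending)
	gotreply := make(chan miniReply)
	closing := make(chan struct{})
	go dispatcher(addpending, gotreply, closing)

	// "ping": register interest in one reply keyed "node-A/pong", then wait on errc.
	errc := make(chan error, 1)
	addpending <- &miniPending{
		key:      "node-A/pong",
		callback: func(interface{}) bool { return true }, // one reply is enough
		errc:     errc,
	}

	// "handleReply": the reader side delivers a matching reply.
	matched := make(chan bool, 1)
	gotreply <- miniReply{key: "node-A/pong", data: "pong!", matched: matched}

	fmt.Println("matched:", <-matched, "err:", <-errc) // matched: true err: <nil>
	close(closing)
}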
We have now seen how pending entries are processed. The loop() method also consumes gotreply, which is fed by handleReply; that call ultimately happens in the goroutine started by readLoop().
// readLoop runs in its own goroutine. it handles incoming UDP packets.
func (t *udp) readLoop() {
defer t.conn.Close()
// Discovery packets are defined to be no larger than 1280 bytes.
// Packets larger than this size will be cut at the end and treated
// as invalid because their hash won't match.
buf := make([]byte, 1280)
for {
nbytes, from, err := t.conn.ReadFromUDP(buf)
if netutil.IsTemporaryError(err) {
// Ignore temporary read errors.
log.Debug("Temporary UDP read error", "err", err)
continue
} else if err != nil {
// Shut down the loop for permanent errors.
log.Debug("UDP read error", "err", err)
return
}
t.handlePacket(from, buf[:nbytes])
}
}
func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
packet, fromID, hash, err := decodePacket(buf)
if err != nil {
log.Debug("Bad discv4 packet", "addr", from, "err", err)
return err
}
err = packet.handle(t, from, fromID, hash)
log.Trace("<< "+packet.name(), "addr", from, "err", err)
return err
}
func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
t.send(from, pongPacket, &pong{
To: makeEndpoint(from, req.From.TCP),
ReplyTok: mac,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
if !t.handleReply(fromID, pingPacket, req) {
// Note: we're ignoring the provided IP address right now
go t.bond(true, fromID, from, req.From.TCP)
}
return nil
}
func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
matched := make(chan bool, 1)
select {
case t.gotreply <- reply{from, ptype, req, matched}:
// loop will handle it
return <-matched
case <-t.closing:
return false
}
}
That covers the general plumbing of udp. Now for its main business: udp sends two kinds of requests (ping and findnode), receives those same two kinds of requests from other nodes, and produces the two corresponding responses (pong and neighbors).
The ping request: it registers a pending entry that expects a pong, sends the ping packet, and then blocks until the reply arrives or the pending entry times out.
// ping sends a ping message to the given node and waits for a reply.
func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
// TODO: maybe check for ReplyTo field in callback to measure RTT
errc := t.pending(toid, pongPacket, func(interface{}) bool { return true })
t.send(toaddr, pingPacket, &ping{
Version: Version,
From: t.ourEndpoint,
To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
return <-errc
}
The pong handler: if the pong does not match an outstanding ping request, it returns the errUnsolicitedReply error.
func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
if !t.handleReply(fromID, pongPacket, req) {
return errUnsolicitedReply
}
return nil
}
The findnode request: send a findnode packet and wait until the remote node has returned up to k (bucketSize) neighbors.
// findnode sends a findnode request to the given node and waits until
// the node has sent up to k neighbors.
func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
nodes := make([]*Node, 0, bucketSize)
nreceived := 0
errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
reply := r.(*neighbors)
for _, rn := range reply.Nodes {
nreceived++
n, err := t.nodeFromRPC(toaddr, rn)
if err != nil {
log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err)
continue
}
nodes = append(nodes, n)
}
return nreceived >= bucketSize
})
t.send(toaddr, findnodePacket, &findnode{
Target: target,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
err := <-errc
return nodes, err
}
The neighbors handler is simple: it pushes the response onto the gotreply queue; if no matching findnode request is found, it returns the errUnsolicitedReply error.
func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
if !t.handleReply(fromID, neighborsPacket, req) {
return errUnsolicitedReply
}
return nil
}
Receiving a ping request from another node: send back a pong. If the ping did not match a pending entry (meaning it was not something we were waiting for), the bond method is called to consider adding the sender to our bucket cache (this mechanism is described in detail with table.go).
func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
t.send(from, pongPacket, &pong{
To: makeEndpoint(from, req.From.TCP),
ReplyTok: mac,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
if !t.handleReply(fromID, pingPacket, req) {
// Note: we're ignoring the provided IP address right now
go t.bond(true, fromID, from, req.From.TCP)
}
return nil
}
Receiving a findnode request from another node: the request asks us to send back the k nodes closest to the given target. For the details of the distance metric and the algorithm, see the pdf documentation in the references directory.
func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
if t.db.node(fromID) == nil {
// No bond exists, we don't process the packet. This prevents
// an attack vector where the discovery protocol could be used
// to amplify traffic in a DDOS attack. A malicious actor
// would send a findnode request with the IP address and UDP
// port of the target as the source address. The recipient of
// the findnode packet would then send a neighbors packet
// (which is a much bigger packet than findnode) to the victim.
return errUnknownNode
}
target := crypto.Keccak256Hash(req.Target[:])
t.mutex.Lock()
// Get the bucketSize nodes closest to the target. Implemented in table.go; covered later.
closest := t.closest(target, bucketSize).entries
t.mutex.Unlock()
p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
// Send neighbors in chunks with at most maxNeighbors per packet
// to stay below the 1280 byte limit.
for i, n := range closest {
if netutil.CheckRelayIP(from.IP, n.IP) != nil {
continue
}
p.Nodes = append(p.Nodes, nodeToRPC(n))
if len(p.Nodes) == maxNeighbors || i == len(closest)-1 {
t.send(from, neighborsPacket, &p)
p.Nodes = p.Nodes[:0]
}
}
return nil
}
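The chunking at the end of the handler is the part to notice: a neighbors packet is flushed whenever maxNeighbors entries have been collected or the last candidate has been appended, so no single reply exceeds the 1280-byte limit. The same pattern in a standalone sketch (the chunk size here is made up; the real maxNeighbors is derived from the packet size limit):
package main

import "fmt"

const maxPerPacket = 12 // illustrative only

// sendInChunks flushes a batch whenever it is full or the input is exhausted,
// mirroring the loop at the end of the findnode handler.
func sendInChunks(items []int, send func(chunk []int)) {
	chunk := make([]int, 0, maxPerPacket)
	for i, it := range items {
		chunk = append(chunk, it)
		if len(chunk) == maxPerPacket || i == len(items)-1 {
			send(chunk)
			chunk = chunk[:0]
		}
	}
}

func main() {
	items := make([]int, 16)
	for i := range items {
		items[i] = i
	}
	sendInChunks(items, func(chunk []int) {
		fmt.Println("packet carrying", len(chunk), "entries")
	})
	// packet carrying 12 entries
	// packet carrying 4 entries
}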
The discovery protocol does not carry sensitive data, so packets are transmitted in clear text; to guarantee integrity and prevent tampering, each packet carries a digital signature in its header.
func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte, error) {
b := new(bytes.Buffer)
b.Write(headSpace)
b.WriteByte(ptype)
if err := rlp.Encode(b, req); err != nil {
log.Error("Can't encode discv4 packet", "err", err)
return nil, err
}
packet := b.Bytes()
sig, err := crypto.Sign(crypto.Keccak256(packet[headSize:]), priv)
if err != nil {
log.Error("Can't sign discv4 packet", "err", err)
return nil, err
}
copy(packet[macSize:], sig)
// add the hash to the front. Note: this doesn't protect the
// packet in any way. Our public key will be part of this hash in
// the future.
copy(packet, crypto.Keccak256(packet[macSize:]))
return packet, nil
}
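So the wire format produced here is: a 32-byte hash, then a 65-byte recoverable signature, then a 1-byte packet type, then the RLP-encoded payload. The size constants this code relies on (defined near the top of udp.go) are:
const (
	macSize  = 256 / 8           // 32-byte Keccak256 hash over everything that follows it
	sigSize  = 520 / 8           // 65-byte recoverable ECDSA signature
	headSize = macSize + sigSize // 97 bytes of header before the packet type and payload
)
decodePacket below slices the incoming buffer at exactly these offsets before verifying the hash and recovering the sender's NodeID from the signature.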
func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
if len(buf) < headSize+1 {
return nil, NodeID{}, nil, errPacketTooSmall
}
hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:]
shouldhash := crypto.Keccak256(buf[macSize:])
if !bytes.Equal(hash, shouldhash) {
return nil, NodeID{}, nil, errBadHash
}
fromID, err := recoverNodeID(crypto.Keccak256(buf[headSize:]), sig)
if err != nil {
return nil, NodeID{}, hash, err
}
var req packet
switch ptype := sigdata[0]; ptype {
case pingPacket:
req = new(ping)
case pongPacket:
req = new(pong)
case findnodePacket:
req = new(findnode)
case neighborsPacket:
req = new(neighbors)
default:
return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype)
}
s := rlp.NewStream(bytes.NewReader(sigdata[1:]), 0)
err = s.Decode(req)
return req, fromID, hash, err
}
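A hypothetical round-trip test placed inside the discover package (a sketch for illustration, not the package's real test file) shows how the two functions fit together: the recovered fromID must equal the sender's public-key ID, and the packet-type byte selects the struct to decode into.
package discover

import (
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/crypto"
)

// TestPacketRoundTrip is a sketch: encode a ping, decode it back, and
// check that the sender identity and packet type survive the trip.
func TestPacketRoundTrip(t *testing.T) {
	key, err := crypto.GenerateKey()
	if err != nil {
		t.Fatal(err)
	}
	req := &ping{Version: 4, Expiration: uint64(time.Now().Add(time.Minute).Unix())}

	buf, err := encodePacket(key, pingPacket, req)
	if err != nil {
		t.Fatal(err)
	}
	decoded, fromID, _, err := decodePacket(buf)
	if err != nil {
		t.Fatal(err)
	}
	if fromID != PubkeyID(&key.PublicKey) {
		t.Fatal("sender ID did not survive the round trip")
	}
	if _, ok := decoded.(*ping); !ok {
		t.Fatalf("expected *ping, got %T", decoded)
	}
}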