diff --git a/cmd/devp2p/internal/ethtest/suite.go b/cmd/devp2p/internal/ethtest/suite.go index a3541df7fc35..223ec5acab77 100644 --- a/cmd/devp2p/internal/ethtest/suite.go +++ b/cmd/devp2p/internal/ethtest/suite.go @@ -755,17 +755,20 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) { } // generate 50 txs - hashMap, _, err := generateTxs(s, 50) + _, txs, err := generateTxs(s, 50) if err != nil { t.Fatalf("failed to generate transactions: %v", err) } // create new pooled tx hashes announcement - hashes := make([]common.Hash, 0) - for _, hash := range hashMap { - hashes = append(hashes, hash) + hashes := make([]common.Hash, len(txs)) + types := make([]byte, len(txs)) + sizes := make([]uint32, len(txs)) + for i, tx := range txs { + hashes[i] = tx.Hash() + types[i] = tx.Type() + sizes[i] = uint32(tx.Size()) } - announce := NewPooledTransactionHashes(hashes) // send announcement conn, err := s.dial66() @@ -776,6 +779,14 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) { if err = conn.peer(s.chain, nil); err != nil { t.Fatalf("peering failed: %v", err) } + + var announce Message + if conn.negotiatedProtoVersion == eth.ETH68 { + announce = NewPooledTransactionHashes68{Types: types} + } else { + announce = NewPooledTransactionHashes(hashes) + } + if err = conn.Write(announce); err != nil { t.Fatalf("failed to write to connection: %v", err) } @@ -792,6 +803,8 @@ func (s *Suite) TestNewPooledTxs66(t *utesting.T) { // ignore propagated txs from previous tests case *NewPooledTransactionHashes: continue + case *NewPooledTransactionHashes68: + continue // ignore block announcements from previous tests case *NewBlockHashes: continue diff --git a/cmd/devp2p/internal/ethtest/types.go b/cmd/devp2p/internal/ethtest/types.go index e92b54394067..becb1b5c5bcf 100644 --- a/cmd/devp2p/internal/ethtest/types.go +++ b/cmd/devp2p/internal/ethtest/types.go @@ -116,6 +116,10 @@ type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket func (nb NewPooledTransactionHashes) Code() int { return 24 } +type NewPooledTransactionHashes68 eth.NewPooledTransactionHashesPacket68 + +func (nb NewPooledTransactionHashes68) Code() int { return 24 } + type GetPooledTransactions eth.GetPooledTransactionsPacket func (gpt GetPooledTransactions) Code() int { return 25 } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 12e91ec7f534..63568c0c3959 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -70,6 +70,9 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { case *eth.NewPooledTransactionHashesPacket: return h.txFetcher.Notify(peer.ID(), *packet) + case *eth.NewPooledTransactionHashesPacket68: + return h.txFetcher.Notify(peer.ID(), packet.Hashes) + case *eth.TransactionsPacket: return h.txFetcher.Enqueue(peer.ID(), *packet, false) diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index e002160fbcef..ad7193f558c2 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -82,6 +82,7 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { // fork IDs in the protocol handshake. func TestForkIDSplit65(t *testing.T) { testForkIDSplit(t, eth.ETH65) } func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) } +func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) } func testForkIDSplit(t *testing.T, protocol uint) { t.Parallel() @@ -240,6 +241,7 @@ func testForkIDSplit(t *testing.T, protocol uint) { // Tests that received transactions are added to the local pool. func TestRecvTransactions65(t *testing.T) { testRecvTransactions(t, eth.ETH65) } func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) } +func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) } func testRecvTransactions(t *testing.T, protocol uint) { t.Parallel() @@ -297,6 +299,7 @@ func testRecvTransactions(t *testing.T, protocol uint) { // This test checks that pending transactions are sent. func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) } +func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) } func testSendTransactions(t *testing.T, protocol uint) { t.Parallel() @@ -383,6 +386,7 @@ func testSendTransactions(t *testing.T, protocol uint) { // broadcasts or via announcements/retrievals. func TestTransactionPropagation65(t *testing.T) { testTransactionPropagation(t, eth.ETH65) } func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) } +func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) } func testTransactionPropagation(t *testing.T, protocol uint) { t.Parallel() @@ -690,6 +694,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { // with the hashes in the header) gets discarded and not broadcast forward. func TestBroadcastMalformedBlock65(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH65) } func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) } +func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) } func testBroadcastMalformedBlock(t *testing.T, protocol uint) { t.Parallel() diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 09330cfdf320..8febd1c611d2 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -142,13 +142,17 @@ func (p *Peer) announceTransactions() { if done == nil && len(queue) > 0 { // Pile transaction hashes until we reach our allowed network limit var ( - count int - pending []common.Hash - size common.StorageSize + count int + pending []common.Hash + pendingTypes []byte + pendingSizes []uint32 + size common.StorageSize ) for count = 0; count < len(queue) && size < maxTxPacketSize; count++ { - if p.txpool.Get(queue[count]) != nil { + if tx := p.txpool.Get(queue[count]); tx != nil { pending = append(pending, queue[count]) + pendingTypes = append(pendingTypes, tx.Type()) + pendingSizes = append(pendingSizes, uint32(tx.Size())) size += common.HashLength } } @@ -159,9 +163,16 @@ func (p *Peer) announceTransactions() { if len(pending) > 0 { done = make(chan struct{}) go func() { - if err := p.sendPooledTransactionHashes(pending); err != nil { - fail <- err - return + if p.version >= ETH68 { + if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil { + fail <- err + return + } + } else { + if err := p.sendPooledTransactionHashes(pending); err != nil { + fail <- err + return + } } close(done) p.Log().Trace("Sent transaction announcements", "count", len(pending)) diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 27425ad16700..73fd3d044375 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -216,6 +216,21 @@ var eth66 = map[uint64]msgHandler{ TransactionsExMsg: handleTransactionsEx, } +var eth68 = map[uint64]msgHandler{ + NewBlockHashesMsg: handleNewBlockhashes, + NewBlockMsg: handleNewBlock, + TransactionsMsg: handleTransactions, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68, + GetBlockHeadersMsg: handleGetBlockHeaders66, + BlockHeadersMsg: handleBlockHeaders66, + GetBlockBodiesMsg: handleGetBlockBodies66, + BlockBodiesMsg: handleBlockBodies66, + GetReceiptsMsg: handleGetReceipts66, + ReceiptsMsg: handleReceipts66, + GetPooledTransactionsMsg: handleGetPooledTransactions66, + PooledTransactionsMsg: handlePooledTransactions66, +} + // handleMessage is invoked whenever an inbound message is received from a remote // peer. The remote connection is torn down upon returning any error. func handleMessage(backend Backend, peer *Peer) error { @@ -230,9 +245,12 @@ func handleMessage(backend Backend, peer *Peer) error { defer msg.Discard() var handlers = eth65 - if peer.Version() >= ETH66 { + if peer.Version() == ETH66 { handlers = eth66 } + if peer.Version() == ETH68 { + handlers = eth68 + } // Track the amount of time it takes to serve the request and run the handler if metrics.Enabled { diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index c3d189b95156..8fc9da487748 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -110,6 +110,7 @@ func (b *testBackend) Handle(*Peer, Packet) error { // Tests that block headers can be retrieved from a remote chain based on user queries. func TestGetBlockHeaders65(t *testing.T) { testGetBlockHeaders(t, ETH65) } func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) } +func TestGetBlockHeaders68(t *testing.T) { testGetBlockHeaders(t, ETH68) } func testGetBlockHeaders(t *testing.T, protocol uint) { t.Parallel() @@ -312,6 +313,7 @@ func testGetBlockHeaders(t *testing.T, protocol uint) { // Tests that block contents can be retrieved from a remote chain based on their hashes. func TestGetBlockBodies65(t *testing.T) { testGetBlockBodies(t, ETH65) } func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) } +func TestGetBlockBodies68(t *testing.T) { testGetBlockBodies(t, ETH68) } func testGetBlockBodies(t *testing.T, protocol uint) { t.Parallel() @@ -403,6 +405,7 @@ func testGetBlockBodies(t *testing.T, protocol uint) { // Tests that the state trie nodes can be retrieved based on hashes. func TestGetNodeData65(t *testing.T) { testGetNodeData(t, ETH65) } func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) } +func TestGetNodeData68(t *testing.T) { testGetNodeData(t, ETH68) } func testGetNodeData(t *testing.T, protocol uint) { t.Parallel() @@ -524,6 +527,7 @@ func testGetNodeData(t *testing.T, protocol uint) { // Tests that the transaction receipts can be retrieved based on hashes. func TestGetBlockReceipts65(t *testing.T) { testGetBlockReceipts(t, ETH65) } func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) } +func TestGetBlockReceipts68(t *testing.T) { testGetBlockReceipts(t, ETH68) } func testGetBlockReceipts(t *testing.T, protocol uint) { t.Parallel() diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 21ceb2a84be4..dc30835201c1 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -798,6 +798,33 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) return nil } +func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error { + // New transaction announcement arrived, make sure we have + // a valid and fresh chain to handle them + if !backend.AcceptTxs() { + return nil + } + ann := new(NewPooledTransactionHashesPacket68) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) { + return fmt.Errorf("%w: message %v: invalid len of fields: %v %v %v", errDecode, msg, len(ann.Hashes), len(ann.Types), len(ann.Sizes)) + } + f := func() error { + // Schedule all the unknown hashes for retrieval + for _, hash := range ann.Hashes { + peer.markTransaction(hash) + } + return backend.Handle(peer, ann) + } + if params.ConsensusMethod == params.ConsensusPoW { + return f() + } + go f() + return nil +} + func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error { // Decode the pooled transactions retrieval message var query GetPooledTransactionsPacket diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 8263fef4b60f..f521f1586bcd 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -227,6 +227,12 @@ func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error { return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes)) } +func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error { + // Mark all the transactions as known, but ensure we don't overflow our limits + p.knownTxs.Add(hashes...) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes}) +} + // AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually // announce to a remote peer. The number of pending sends are capped (new ones // will force old sends to be dropped) diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 40cad40301c2..aac1d505b3c1 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -33,6 +33,7 @@ import ( const ( ETH65 = 65 ETH66 = 66 + ETH68 = 68 ) // ProtocolName is the official short name of the `eth` protocol used during @@ -41,11 +42,11 @@ const ProtocolName = "mir" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH66, ETH65} +var ProtocolVersions = []uint{ETH68, ETH66, ETH65} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH66: 23, ETH65: 23} +var protocolLengths = map[uint]uint64{ETH66: 23, ETH65: 23, ETH68: 17} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 100 * 1024 * 1024 @@ -316,6 +317,13 @@ type ReceiptsRLPPacket66 struct { // NewPooledTransactionHashesPacket represents a transaction announcement packet. type NewPooledTransactionHashesPacket []common.Hash +// NewPooledTransactionHashesPacket68 represents a transaction announcement packet on eth/68 and newer. +type NewPooledTransactionHashesPacket68 struct { + Types []byte + Sizes []uint32 + Hashes []common.Hash +} + // GetPooledTransactionsPacket represents a transaction query. type GetPooledTransactionsPacket []common.Hash @@ -421,6 +429,9 @@ func (*ReceiptsPacket) Kind() byte { return ReceiptsMsg } func (*NewPooledTransactionHashesPacket) Name() string { return "NewPooledTransactionHashes" } func (*NewPooledTransactionHashesPacket) Kind() byte { return NewPooledTransactionHashesMsg } +func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" } +func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg } + func (*GetPooledTransactionsPacket) Name() string { return "GetPooledTransactions" } func (*GetPooledTransactionsPacket) Kind() byte { return GetPooledTransactionsMsg }