From 701d0b7ca9a3cedb30b9d883bb73e5ec1352e1aa Mon Sep 17 00:00:00 2001 From: Linus Gasser Date: Tue, 28 Nov 2023 10:28:56 +0100 Subject: [PATCH 1/2] Adding more filters --- mino/option.go | 28 ++++++++++++++++++++++++++++ mino/option_test.go | 34 +++++++++++++++++++++++++++++----- 2 files changed, 57 insertions(+), 5 deletions(-) diff --git a/mino/option.go b/mino/option.go index cc9858964..01a1a9d24 100644 --- a/mino/option.go +++ b/mino/option.go @@ -5,6 +5,7 @@ package mino import ( + "math/rand" "sort" ) @@ -68,6 +69,19 @@ func IndexFilter(index int) FilterUpdater { } } +// RejectFilter removes the given index +func RejectFilter(index int) FilterUpdater { + return func(filters *Filter) { + arr := filters.Indices + i := sort.IntSlice(arr).Search(index) + // do nothing if the element is not there + if i == len(arr) || arr[i] != index { + return + } + filters.Indices = append(filters.Indices[0:i], filters.Indices[i+1:]...) + } +} + // RangeFilter is a filter to include a range of indices. func RangeFilter(start, end int) FilterUpdater { return func(filters *Filter) { @@ -95,3 +109,17 @@ func ListFilter(indices []int) FilterUpdater { filters.Indices = indices } } + +// RandomFilter chooses 'count' random elements. +func RandomFilter(count int) FilterUpdater { + return func(filters *Filter) { + if len(filters.Indices) >= count { + return + } + rand.Shuffle(len(filters.Indices), + func(i, j int) { + filters.Indices[i], filters.Indices[j] = filters.Indices[j], filters.Indices[i] + }) + filters.Indices = filters.Indices[:count] + } +} diff --git a/mino/option_test.go b/mino/option_test.go index 0c1b63e8a..0b10c5776 100644 --- a/mino/option_test.go +++ b/mino/option_test.go @@ -33,20 +33,44 @@ func TestFilter_RotateFilter(t *testing.T) { func TestFilter_IndexFilter(t *testing.T) { filters := &Filter{Indices: []int{}} - IndexFilter(1)(filters) - require.Equal(t, filters.Indices, []int{1}) + require.Equal(t, []int{1}, filters.Indices) + + IndexFilter(2)(filters) + require.Equal(t, []int{1, 2}, filters.Indices) IndexFilter(2)(filters) - require.Equal(t, filters.Indices, []int{1, 2}) + require.Equal(t, []int{1, 2}, filters.Indices) IndexFilter(0)(filters) - require.Equal(t, filters.Indices, []int{0, 1, 2}) + require.Equal(t, []int{0, 1, 2}, filters.Indices) IndexFilter(0)(filters) IndexFilter(1)(filters) IndexFilter(2)(filters) - require.Equal(t, filters.Indices, []int{0, 1, 2}) + require.Equal(t, []int{0, 1, 2}, filters.Indices) +} + +func TestFilter_RejectFilter(t *testing.T) { + filters := &Filter{Indices: []int{1, 2, 3, 4}} + + testCases := []struct { + filterVal int + expected []int + }{ + {0, []int{1, 2, 3, 4}}, + {5, []int{1, 2, 3, 4}}, + {2, []int{1, 3, 4}}, + {1, []int{3, 4}}, + {4, []int{3}}, + {4, []int{3}}, + {3, []int{}}, + } + + for _, tc := range testCases { + RejectFilter(tc.filterVal)(filters) + require.Equal(t, tc.expected, filters.Indices) + } } func TestFilter_RangeFilter(t *testing.T) { From e6318f6c7fcbd7a62b266ff0db8e93a3a5e42749 Mon Sep 17 00:00:00 2001 From: Linus Gasser Date: Tue, 28 Nov 2023 10:31:57 +0100 Subject: [PATCH 2/2] Adding a new synchronisation protocol This commit adds a faster synchronisation protocol which only gets called if a node thinks it's out of sync with the rest of the chain. The messages passed back and forth are kept minimal to avoid using too much bandwidth. The cosipbft can now be run either with the old blocksync, or using fastsync. 
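For illustration, enabling the new protocol is a single option on the service constructor. A minimal sketch, mirroring the controller change in this patch and assuming an already populated ServiceParam plus existing genesis and block stores (the helper name is made up):

    package example

    import (
        "go.dedis.ch/dela/core/ordering/cosipbft"
        "go.dedis.ch/dela/core/ordering/cosipbft/blockstore"
        "golang.org/x/xerrors"
    )

    // startWithFastSync builds the cosipbft service so that catching up is
    // done with fastsync instead of the per-round blocksync announcements.
    func startWithFastSync(param cosipbft.ServiceParam,
        genstore blockstore.GenesisStore, blocks blockstore.BlockStore) error {

        srvc, err := cosipbft.NewService(param,
            cosipbft.WithGenesisStore(genstore),
            cosipbft.WithBlockStore(blocks),
            cosipbft.WithFastSync(),
        )
        if err != nil {
            return xerrors.Errorf("service: %v", err)
        }

        // Everything else stays as before; only the synchronisation
        // strategy changes.
        _ = srvc

        return nil
    }

Without the option the service keeps the old blocksync behaviour; the default controller in this patch opts in.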
--- core/ordering/cosipbft/blocksync/default.go | 8 +- .../cosipbft/controller/controller.go | 2 +- core/ordering/cosipbft/cosipbft.go | 60 +++- core/ordering/cosipbft/cosipbft_test.go | 21 +- core/ordering/cosipbft/fastsync/default.go | 314 ++++++++++++++++++ .../cosipbft/fastsync/default_test.go | 162 +++++++++ core/ordering/cosipbft/fastsync/fastsync.go | 48 +++ core/ordering/cosipbft/fastsync/json/json.go | 114 +++++++ .../cosipbft/fastsync/json/json_test.go | 76 +++++ .../ordering/cosipbft/fastsync/types/types.go | 124 +++++++ .../cosipbft/fastsync/types/types_test.go | 90 +++++ core/ordering/cosipbft/proc.go | 83 ++++- core/ordering/cosipbft/proc_test.go | 2 +- core/store/hashtree/binprefix/disk.go | 7 +- mino/minoch/mod.go | 3 +- mino/minoch/rpc.go | 2 +- mino/option.go | 2 +- serde/json/json.go | 1 + 18 files changed, 1073 insertions(+), 46 deletions(-) create mode 100644 core/ordering/cosipbft/fastsync/default.go create mode 100644 core/ordering/cosipbft/fastsync/default_test.go create mode 100644 core/ordering/cosipbft/fastsync/fastsync.go create mode 100644 core/ordering/cosipbft/fastsync/json/json.go create mode 100644 core/ordering/cosipbft/fastsync/json/json_test.go create mode 100644 core/ordering/cosipbft/fastsync/types/types.go create mode 100644 core/ordering/cosipbft/fastsync/types/types_test.go diff --git a/core/ordering/cosipbft/blocksync/default.go b/core/ordering/cosipbft/blocksync/default.go index 7e9936421..e4ffec073 100644 --- a/core/ordering/cosipbft/blocksync/default.go +++ b/core/ordering/cosipbft/blocksync/default.go @@ -296,8 +296,10 @@ func (h *handler) Stream(out mino.Sender, in mino.Receiver) error { return h.ack(out, orch) } -func (h *handler) waitAnnounce(ctx context.Context, - in mino.Receiver) (*types.SyncMessage, mino.Address, error) { +func (h *handler) waitAnnounce( + ctx context.Context, + in mino.Receiver, +) (*types.SyncMessage, mino.Address, error) { for { orch, msg, err := in.Recv(ctx) @@ -326,7 +328,7 @@ func (h *handler) ack(out mino.Sender, orch mino.Address) error { } func iter2arr(iter mino.AddressIterator) []mino.Address { - addrs := []mino.Address{} + var addrs []mino.Address for iter.HasNext() { addrs = append(addrs, iter.GetNext()) } diff --git a/core/ordering/cosipbft/controller/controller.go b/core/ordering/cosipbft/controller/controller.go index a81184d5f..90d0b8d16 100644 --- a/core/ordering/cosipbft/controller/controller.go +++ b/core/ordering/cosipbft/controller/controller.go @@ -176,7 +176,7 @@ func (m miniController) OnStart(flags cli.Flags, inj node.Injector) error { } srvc, err := cosipbft.NewService(param, cosipbft.WithGenesisStore(genstore), - cosipbft.WithBlockStore(blocks)) + cosipbft.WithBlockStore(blocks), cosipbft.WithFastSync()) if err != nil { return xerrors.Errorf("service: %v", err) } diff --git a/core/ordering/cosipbft/cosipbft.go b/core/ordering/cosipbft/cosipbft.go index f11eeba43..955b61168 100644 --- a/core/ordering/cosipbft/cosipbft.go +++ b/core/ordering/cosipbft/cosipbft.go @@ -43,6 +43,7 @@ import ( "go.dedis.ch/dela/core/ordering/cosipbft/blockstore" "go.dedis.ch/dela/core/ordering/cosipbft/blocksync" "go.dedis.ch/dela/core/ordering/cosipbft/contracts/viewchange" + "go.dedis.ch/dela/core/ordering/cosipbft/fastsync" "go.dedis.ch/dela/core/ordering/cosipbft/pbft" "go.dedis.ch/dela/core/ordering/cosipbft/types" "go.dedis.ch/dela/core/store" @@ -80,6 +81,9 @@ const ( // RoundMaxWait is the maximum amount for the backoff. 
RoundMaxWait = 5 * time.Minute + // DefaultFastSyncMessageSize defines when a fast sync message will be split. + DefaultFastSyncMessageSize = 1e6 + rpcName = "cosipbft" ) @@ -115,9 +119,10 @@ type Service struct { } type serviceTemplate struct { - hashFac crypto.HashFactory - blocks blockstore.BlockStore - genesis blockstore.GenesisStore + hashFac crypto.HashFactory + blocks blockstore.BlockStore + genesis blockstore.GenesisStore + fastSync bool } // ServiceOption is the type of option to set some fields of the service. @@ -144,6 +149,13 @@ func WithHashFactory(fac crypto.HashFactory) ServiceOption { } } +// WithFastSync enables the new syncing algorithm in the cosipbft module. +func WithFastSync() ServiceOption { + return func(tmpl *serviceTemplate) { + tmpl.fastSync = true + } +} + // ServiceParam is the different components to provide to the service. All the // fields are mandatory and it will panic if any is nil. type ServiceParam struct { @@ -190,6 +202,7 @@ func NewServiceStruct(param ServiceParam, opts ...ServiceOption) (*Service, erro proc.tree = blockstore.NewTreeCache(param.Tree) proc.access = param.Access proc.logger = dela.Logger.With().Str("addr", param.Mino.GetAddress().String()).Logger() + proc.fastsync = tmpl.fastSync pcparam := pbft.StateMachineParam{ Logger: proc.logger, @@ -220,10 +233,11 @@ func NewServiceStruct(param ServiceParam, opts ...ServiceOption) (*Service, erro ChainFactory: chainFac, VerifierFactory: param.Cosi.GetVerifierFactory(), } - - bs := blocksync.NewSynchronizer(syncparam) - - proc.sync = bs + if proc.fastsync { + proc.fsync = fastsync.NewSynchronizer(syncparam) + } else { + proc.bsync = blocksync.NewSynchronizer(syncparam) + } fac := types.NewMessageFactory( types.NewGenesisFactory(proc.rosterFac), @@ -275,6 +289,20 @@ func NewServiceStart(s *Service) { go s.watchBlocks() if s.genesis.Exists() { + if s.fastsync { + ctx, done := context.WithCancel(context.Background()) + roster, err := s.readRoster(s.tree.Get()) + if err != nil { + panic("couldn't get roster of latest block: " + err.Error()) + } + err = s.fsync.Sync(ctx, roster, + fastsync.Config{SplitMessageSize: DefaultFastSyncMessageSize}) + if err != nil { + s.logger.Warn().Msgf("while syncing with other nodes: %+v", err) + } + done() + } + // If the genesis already exists, the service can start right away to // participate in the chain. close(s.started) @@ -541,17 +569,21 @@ func (s *Service) doLeaderRound( s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("round has started") - // Send a synchronization to the roster so that they can learn about the - // latest block of the chain. - err := s.sync.Sync(ctx, roster, - blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())}) - if err != nil { - return xerrors.Errorf("sync failed: %v", err) + // When using blocksync, the updates are sent before every new block, which + // uses a lot of bandwidth if there are more than just a few blocks. + if !s.fastsync { + // Send a synchronization to the roster so that they can learn about the + // latest block of the chain. 
+ err := s.bsync.Sync(ctx, roster, + blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())}) + if err != nil { + return xerrors.Errorf("sync failed: %v", err) + } } s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("pbft has started") - err = s.doPBFT(ctx) + err := s.doPBFT(ctx) if err != nil { return xerrors.Errorf("pbft failed: %v", err) } diff --git a/core/ordering/cosipbft/cosipbft_test.go b/core/ordering/cosipbft/cosipbft_test.go index 616983f00..bb0299c55 100644 --- a/core/ordering/cosipbft/cosipbft_test.go +++ b/core/ordering/cosipbft/cosipbft_test.go @@ -43,14 +43,25 @@ import ( "go.dedis.ch/dela/testing/fake" ) +func TestService_Scenario_Basic_Blocksync(t *testing.T) { + testserviceScenarioBasic(t, false) +} +func TestService_Scenario_Basic_Fastsync(t *testing.T) { + testserviceScenarioBasic(t, true) +} + // This test is known to be VERY flaky on Windows. // Further investigation is needed. -func TestService_Scenario_Basic(t *testing.T) { +func testserviceScenarioBasic(t *testing.T, fastSync bool) { if testing.Short() { t.Skip("Skipping flaky test") } - nodes, ro, clean := makeAuthority(t, 5) + var opts []ServiceOption + if fastSync { + opts = append(opts, WithFastSync()) + } + nodes, ro, clean := makeAuthority(t, 5, opts...) defer clean() signer := nodes[0].signer @@ -450,7 +461,7 @@ func TestService_DoRound(t *testing.T) { closing: make(chan struct{}), } srvc.blocks = blockstore.NewInMemory() - srvc.sync = fakeSync{} + srvc.bsync = fakeSync{} srvc.pool = mem.NewPool() srvc.tree = blockstore.NewTreeCache(fakeTree{}) srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{}) @@ -618,7 +629,7 @@ func TestService_FailSync_DoRound(t *testing.T) { srvc.tree = blockstore.NewTreeCache(fakeTree{}) srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{}) srvc.pbftsm = fakeSM{} - srvc.sync = fakeSync{err: fake.GetError()} + srvc.bsync = fakeSync{err: fake.GetError()} ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -641,7 +652,7 @@ func TestService_FailPBFT_DoRound(t *testing.T) { srvc.tree = blockstore.NewTreeCache(fakeTree{}) srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{}) srvc.pbftsm = fakeSM{} - srvc.sync = fakeSync{} + srvc.bsync = fakeSync{} require.NoError(t, srvc.pool.Add(makeTx(t, 0, fake.NewSigner()))) diff --git a/core/ordering/cosipbft/fastsync/default.go b/core/ordering/cosipbft/fastsync/default.go new file mode 100644 index 000000000..7758e69fd --- /dev/null +++ b/core/ordering/cosipbft/fastsync/default.go @@ -0,0 +1,314 @@ +package fastsync + +import ( + "context" + "io" + "sync" + "time" + + "github.com/rs/zerolog" + "go.dedis.ch/dela" + "go.dedis.ch/dela/core/ordering/cosipbft/blockstore" + "go.dedis.ch/dela/core/ordering/cosipbft/blocksync" + "go.dedis.ch/dela/core/ordering/cosipbft/fastsync/types" + "go.dedis.ch/dela/core/ordering/cosipbft/pbft" + otypes "go.dedis.ch/dela/core/ordering/cosipbft/types" + "go.dedis.ch/dela/crypto" + "go.dedis.ch/dela/internal/tracing" + "go.dedis.ch/dela/mino" + "go.dedis.ch/dela/serde/json" + "golang.org/x/xerrors" +) + +var timeoutSync = 20 * time.Second +var protocolName = "fastsync" + +// fastSync is a block synchronizer which quickly catches up to the +// latest block. 
+// +// - implements fastsync.Synchronizer +type fastSync struct { + logger zerolog.Logger + rpc mino.RPC + pbftsm pbft.StateMachine + blocks blockstore.BlockStore + + Mino mino.Mino + + latest *uint64 + catchUpLock *sync.Mutex + + // This is for debugging + syncMessages *int +} + +// NewSynchronizer creates a new block synchronizer. +func NewSynchronizer(param blocksync.SyncParam) Synchronizer { + latest := param.Blocks.Len() + + logger := dela.Logger.With().Str("addr", param.Mino.GetAddress().String()).Logger() + + h := &handler{ + latest: &latest, + catchUpLock: new(sync.Mutex), + logger: logger, + genesis: param.Genesis, + blocks: param.Blocks, + pbftsm: param.PBFT, + verifierFac: param.VerifierFactory, + } + + fac := types.NewMessageFactory(param.LinkFactory) + + s := fastSync{ + logger: logger, + rpc: mino.MustCreateRPC(param.Mino, "fastsync", h, fac), + pbftsm: param.PBFT, + blocks: param.Blocks, + latest: &latest, + catchUpLock: h.catchUpLock, + Mino: param.Mino, + } + + return s +} + +// Sync implements fastsync.Synchronizer. +// It asks the other nodes what their latest block is, and then chooses some +// nodes randomly to request catching up the missing blocks. +func (s fastSync) Sync(ctx context.Context, players mino.Players, config Config) error { + if players.Len() == 0 { + return xerrors.Errorf("need at least 1 node to contact") + } + ctx = context.WithValue(ctx, tracing.ProtocolKey, protocolName) + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(timeoutSync)) + defer cancel() + + // Make sure that the address of this node is at the beginning of the list. + addresses := []mino.Address{s.Mino.GetAddress()} + for iter := players.AddressIterator(); iter.HasNext(); { + addr := iter.GetNext() + if !s.Mino.GetAddress().Equal(addr) { + addresses = append(addresses, addr) + } + } + players = mino.NewAddresses(addresses...) + sender, rcvr, err := s.rpc.Stream(ctx, players) + if err != nil { + return xerrors.Errorf("stream failed: %v", err) + } + + // Send a catchup-request to f+1 nodes with our latest known block, + // but not to this node. + // This should be enough, because the protocol supposes there are only + // f byzantine nodes, so this should contact at least one healthy node. + f := (players.Len() - 1) / 3 + nodes := players.Take(mino.RangeFilter(1, players.Len()), + mino.RandomFilter(f+1)) + + // Send the request as many times as needed, because with a + // SplitMessageSize < size(all missing blocks), multiple requests are + // needed. + blockCount := s.blocks.Len() + for { + more, err := s.requestSync(ctx, sender, rcvr, nodes, config.SplitMessageSize) + if err != nil { + return xerrors.Errorf("error while requesting sync updates: %v", err) + } + if s.syncMessages != nil { + *s.syncMessages += 1 + } + + if !more { + break + } else if blockCount == s.blocks.Len() { + s.logger.Warn().Msgf("one of the nodes returned it has more blocks, "+ + "but didn't deliver: %v", nodes) + break + } + + blockCount = s.blocks.Len() + } + + return nil +} + +// requestSync asks all 'nodes' to send their block updates. +// The return is a boolean indicating whether at least one node indicated +// there are more blocks. +// This might be wrong, as there is no check whether the sending node is +// byzantine or not. 
+func (s fastSync) requestSync( + ctx context.Context, + sender mino.Sender, + rcvr mino.Receiver, + nodes mino.Players, + sms uint64, +) (bool, error) { + // Send the messages to all nodes + s.logger.Debug().Msgf("Sending catchup req to %+v", nodes) + errs := sender.Send(types.NewRequestCatchupMessage(sms, s.blocks.Len()), + iter2arr(nodes.AddressIterator())...) + if len(errs) == nodes.Len() { + return false, xerrors.Errorf("contacted %d nodes, but all failed: %v", len(errs), errs) + } + for err := range errs { + if err != nil { + s.logger.Warn().Err(err).Msgf("announcement failed to one node") + } + } + + // Wait for all replies, supposing that there are no more than f nodes + // not replying or replying with wrong blocks. + + replies := make(map[string]struct{}) + moreBlocks := false + for len(replies) < nodes.Len() { + s.logger.Debug().Msgf("Waiting for replies: %d < %d", len(replies), nodes.Len()) + from, msg, err := rcvr.Recv(ctx) + if err == context.Canceled || err == context.DeadlineExceeded || err == io.EOF { + return moreBlocks, nil + } + if err != nil { + s.logger.Debug().Err(err).Msg("sync finished with error") + return false, nil + } + + _, received := replies[from.String()] + if received { + s.logger.Warn().Msgf("received two fastsync messages from %s", from) + continue + } + + catchup, ok := msg.(types.CatchupMessage) + if ok { + s.logger.Trace().Msgf("Got %d blocks from %v", + len(catchup.GetBlockLinks()), from) + + replies[from.String()] = struct{}{} + moreBlocks = moreBlocks || catchup.GetSplitMessage() + + for _, bl := range catchup.GetBlockLinks() { + if bl.GetBlock().GetIndex() >= s.blocks.Len() { + err := s.pbftsm.CatchUp(bl) + if err != nil { + s.logger.Warn().Err(err).Msg("while using block to catchup") + } + } + } + } + } + + return moreBlocks, nil +} + +// handler is a Mino handler for the synchronization messages. +// +// - implements mino.Handler +type handler struct { + mino.UnsupportedHandler + + latest *uint64 + catchUpLock *sync.Mutex + + logger zerolog.Logger + blocks blockstore.BlockStore + genesis blockstore.GenesisStore + pbftsm pbft.StateMachine + verifierFac crypto.VerifierFactory +} + +// Stream implements mino.Handler. It waits for a request message and then +// replies with eventually missing BlockLinks of the requester. +func (h *handler) Stream(out mino.Sender, in mino.Receiver) error { + h.logger.Debug().Msg("Starting stream") + ctx := context.Background() + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(timeoutSync)) + defer cancel() + + for sentAllBlocks := false; !sentAllBlocks; { + m, orch, err := h.waitRequest(ctx, in) + if err != nil { + return xerrors.Errorf("no request: %v", err) + } + + blReply, err := h.getBlocks(m) + if err != nil { + return xerrors.Errorf("creating blocks to send failed: %v", err) + } + + sentAllBlocks = m.GetLatest()+uint64(len(blReply)) == h.blocks.Len() + err = <-out.Send(types.NewCatchupMessage( + !sentAllBlocks, blReply), orch) + if err != nil { + return xerrors.Errorf("sending request failed: %v", err) + } + } + + h.logger.Debug().Msg("done sending catchup blocks") + + return nil +} + +// getBlocks creates a reply that will only overflow the given message-size +// budget by at most one block. 
+func (h *handler) getBlocks(m *types.RequestCatchupMessage) ([]otypes.BlockLink, error) { + var blReply []otypes.BlockLink + + msgSize := uint64(0) + if h.blocks.Len() > m.GetLatest() { + for index := m.GetLatest(); index < h.blocks.Len(); index++ { + bl, err := h.blocks.GetByIndex(index) + if err != nil { + return blReply, xerrors.Errorf("failed to get block with index %d", index) + } + blReply = append(blReply, bl) + b, err := bl.Serialize(json.NewContext()) + if err != nil { + return blReply, xerrors.Errorf("failed to serialize block %d: %v", index, err) + } + msgSize += uint64(len(b)) + + if m.GetSplitMessageSize() > 0 && msgSize >= m.GetSplitMessageSize() { + h.logger.Debug().Msgf("splitting message because size %d >= %d", + msgSize, m.GetSplitMessageSize()) + break + } + } + h.logger.Debug().Msgf("Sending blocks %d..%d", m.GetLatest(), + m.GetLatest()+uint64(len(blReply))-1) + } else { + h.logger.Debug().Msgf("No new blocks to send") + } + + return blReply, nil +} + +func (h *handler) waitRequest( + ctx context.Context, + in mino.Receiver, +) (*types.RequestCatchupMessage, mino.Address, error) { + + for { + orch, msg, err := in.Recv(ctx) + if err != nil { + return nil, nil, xerrors.Errorf("receiver failed: %v", err) + } + + // The SyncMessage contains the chain to the latest block known by the + // leader which allows to verify if it is not lying. + m, ok := msg.(types.RequestCatchupMessage) + if ok { + return &m, orch, nil + } + } +} + +func iter2arr(iter mino.AddressIterator) []mino.Address { + var addrs []mino.Address + for iter.HasNext() { + addrs = append(addrs, iter.GetNext()) + } + + return addrs +} diff --git a/core/ordering/cosipbft/fastsync/default_test.go b/core/ordering/cosipbft/fastsync/default_test.go new file mode 100644 index 000000000..3848aa20f --- /dev/null +++ b/core/ordering/cosipbft/fastsync/default_test.go @@ -0,0 +1,162 @@ +package fastsync + +import ( + "context" + "fmt" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + "go.dedis.ch/dela/core/ordering/cosipbft/authority" + "go.dedis.ch/dela/core/ordering/cosipbft/blockstore" + "go.dedis.ch/dela/core/ordering/cosipbft/blocksync" + "go.dedis.ch/dela/core/ordering/cosipbft/pbft" + otypes "go.dedis.ch/dela/core/ordering/cosipbft/types" + "go.dedis.ch/dela/core/txn/signed" + "go.dedis.ch/dela/core/validation/simple" + "go.dedis.ch/dela/mino" + "go.dedis.ch/dela/mino/minoch" + "go.dedis.ch/dela/testing/fake" +) + +func TestDefaultSync_Basic(t *testing.T) { + n := 20 + f := (n - 1) / 3 + num := 10 + + syncs, genesis, roster := makeNodes(t, n) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := syncs[0].Sync(ctx, roster, Config{SplitMessageSize: 1}) + require.NoError(t, err) + + storeBlocks(t, syncs[0].blocks, num, genesis.GetHash().Bytes()...) + + // Test only a subset of the roster to prepare for the next test. + for node := 1; node < n; node++ { + // Send the sync call to the leader + contact := roster.Take(mino.IndexFilter(0)) + if node >= 2*f+1 { + // Now that there are 2f+1 nodes with the block, sync with + // the whole roster. 
+ contact = roster + } + err = syncs[node].Sync(ctx, contact, Config{}) + require.NoError(t, err) + } + + for i := 0; i < n; i++ { + require.Equal(t, uint64(num), syncs[i].blocks.Len(), strconv.Itoa(i)) + } +} + +func TestDefaultSync_SplitMessage(t *testing.T) { + num := 10 + + tests := []struct { + sms uint64 + msgs int + }{ + {0, 1}, + {1, 10}, + {255, 10}, + {256, 5}, + {1024, 2}, + {2550, 1}, + } + for _, test := range tests { + syncs, genesis, roster := makeNodes(t, 2) + + ctx, cancel := context.WithCancel(context.Background()) + + storeBlocks(t, syncs[0].blocks, num, genesis.GetHash().Bytes()...) + + syncsReceived := 0 + syncs[1].syncMessages = &syncsReceived + err := syncs[1].Sync(ctx, roster, Config{SplitMessageSize: test.sms}) + require.NoError(t, err) + require.Equal(t, test.msgs, syncsReceived) + require.Equal(t, uint64(num), syncs[1].blocks.Len()) + cancel() + } +} + +// ----------------------------------------------------------------------------- +// Utility functions + +func makeNodes(t *testing.T, n int) ([]fastSync, otypes.Genesis, mino.Players) { + manager := minoch.NewManager() + + syncs := make([]fastSync, n) + addrs := make([]mino.Address, n) + + ro := authority.FromAuthority(fake.NewAuthority(3, fake.NewSigner)) + + genesis, err := otypes.NewGenesis(ro) + require.NoError(t, err) + + for i := 0; i < n; i++ { + m := minoch.MustCreate(manager, fmt.Sprintf("node%d", i)) + + addrs[i] = m.GetAddress() + + genstore := blockstore.NewGenesisStore() + require.NoError(t, genstore.Set(genesis)) + + blocks := blockstore.NewInMemory() + blockFac := otypes.NewBlockFactory(simple.NewResultFactory(signed.NewTransactionFactory())) + csFac := authority.NewChangeSetFactory(m.GetAddressFactory(), fake.PublicKeyFactory{}) + linkFac := otypes.NewLinkFactory(blockFac, fake.SignatureFactory{}, csFac) + + param := blocksync.SyncParam{ + Mino: m, + Blocks: blocks, + Genesis: genstore, + LinkFactory: linkFac, + ChainFactory: otypes.NewChainFactory(linkFac), + PBFT: testSM{blocks: blocks}, + VerifierFactory: fake.VerifierFactory{}, + } + + syncs[i] = NewSynchronizer(param).(fastSync) + } + + return syncs, genesis, mino.NewAddresses(addrs...) +} + +// Create n new blocks and store them while creating appropriate links. +func storeBlocks(t *testing.T, blocks blockstore.BlockStore, n int, from ...byte) { + prev := otypes.Digest{} + copy(prev[:], from) + + for i := 0; i < n; i++ { + block, err := otypes.NewBlock(simple.NewResult(nil), otypes.WithIndex(uint64(i))) + require.NoError(t, err) + + link, err := otypes.NewBlockLink(prev, block, + otypes.WithSignatures(fake.Signature{}, fake.Signature{})) + require.NoError(t, err) + + err = blocks.Store(link) + require.NoError(t, err) + + prev = block.GetHash() + } +} + +type testSM struct { + pbft.StateMachine + + blocks blockstore.BlockStore +} + +func (sm testSM) CatchUp(link otypes.BlockLink) error { + err := sm.blocks.Store(link) + if err != nil { + return err + } + + return nil +} diff --git a/core/ordering/cosipbft/fastsync/fastsync.go b/core/ordering/cosipbft/fastsync/fastsync.go new file mode 100644 index 000000000..f46ebc4aa --- /dev/null +++ b/core/ordering/cosipbft/fastsync/fastsync.go @@ -0,0 +1,48 @@ +// Package fastsync defines a block synchronizer for the ordering service. 
+// +// The block synchronizer is to be called in two situations: +// - if a node is starting up, to make sure it's up-to-date with other nodes +// - if a node receives a request for a block it doesn't hold the parent of +// +// To make it really simple, the node sends a catchup request parallel to +// f+1 random nodes. +// As long as there are enough honest nodes, this will allow the block to +// catch up to the latest block. +// One optimization would be to send the requests serially, waiting for the +// reply before going on. +// But this would involve timeouts and would take much longer. +// So we suppose the node is not that much behind and thus will not waste too +// much bandwidth. +// +// Possible improvements: +// - make the protocol more efficient in the presence of byzantine nodes: +// The node broadcasts a request indicating which is the last block in storage. +// It receives offers from different nodes, and contacts the n nodes with the +// most recent block, where n must be bigger than the maximum number of +// byzantine nodes. +// +// Documentation Last Review: 22.11.2023 +package fastsync + +import ( + "context" + + "go.dedis.ch/dela/mino" +) + +// Config of the current run of the fastsync. +// For future expansion and to make it similar to blocksync, +// this is held as a struct. +type Config struct { + // The size at which the message will be split. + // If the encoding of all blocks is bigger than this value, the + // message is sent as-is. + SplitMessageSize uint64 +} + +// Synchronizer is an interface to synchronize a node with the participants. +type Synchronizer interface { + // Sync sends a synchronization request message to f+1 random participants, + // which will return BlockLinks to the latest block. + Sync(ctx context.Context, players mino.Players, config Config) error +} diff --git a/core/ordering/cosipbft/fastsync/json/json.go b/core/ordering/cosipbft/fastsync/json/json.go new file mode 100644 index 000000000..0c868561e --- /dev/null +++ b/core/ordering/cosipbft/fastsync/json/json.go @@ -0,0 +1,114 @@ +package json + +import ( + "encoding/json" + + "go.dedis.ch/dela/core/ordering/cosipbft/fastsync/types" + otypes "go.dedis.ch/dela/core/ordering/cosipbft/types" + "go.dedis.ch/dela/serde" + "golang.org/x/xerrors" +) + +func init() { + types.RegisterMessageFormat(serde.FormatJSON, msgFormat{}) +} + +// RequestCatchupMessageJSON is the JSON representation of a request catchup +// message. +type RequestCatchupMessageJSON struct { + SplitMessageSize uint64 + Latest uint64 +} + +// CatchupMessageJSON is the JSON representation of all the new BlockLinks. +type CatchupMessageJSON struct { + SplitMessage bool + BlockLinks []json.RawMessage +} + +// MessageJSON is the JSON representation of a sync message. +type MessageJSON struct { + Request *RequestCatchupMessageJSON `json:",omitempty"` + Catchup *CatchupMessageJSON `json:",omitempty"` +} + +// MsgFormat is the format engine to encode and decode sync messages. +// +// - implements serde.FormatEngine +type msgFormat struct{} + +// Encode implements serde.FormatEngine. It returns the JSON data of the message +// if appropriate, otherwise an error. 
+func (fmt msgFormat) Encode(ctx serde.Context, msg serde.Message) ([]byte, error) { + var m MessageJSON + + switch in := msg.(type) { + case types.RequestCatchupMessage: + request := RequestCatchupMessageJSON{ + SplitMessageSize: in.GetSplitMessageSize(), + Latest: in.GetLatest(), + } + + m.Request = &request + case types.CatchupMessage: + bls := in.GetBlockLinks() + catchup := CatchupMessageJSON{ + SplitMessage: in.GetSplitMessage(), + BlockLinks: make([]json.RawMessage, len(bls)), + } + + for i, bl := range bls { + blBuf, err := bl.Serialize(ctx) + if err != nil { + return nil, xerrors.Errorf("failed to encode blocklink: %v", err) + } + catchup.BlockLinks[i] = blBuf + } + + m.Catchup = &catchup + default: + return nil, xerrors.Errorf("unsupported message '%T'", msg) + } + + data, err := ctx.Marshal(m) + if err != nil { + return nil, xerrors.Errorf("marshal failed: %v", err) + } + + return data, nil +} + +// Decode implements serde.FormatEngine. It returns the message associated to +// the data if appropriate, otherwise an error. +func (fmt msgFormat) Decode(ctx serde.Context, data []byte) (serde.Message, error) { + m := MessageJSON{} + err := ctx.Unmarshal(data, &m) + if err != nil { + return nil, xerrors.Errorf("unmarshal failed: %v", err) + } + + if m.Request != nil { + return types.NewRequestCatchupMessage(m.Request.SplitMessageSize, m.Request.Latest), nil + } + + if m.Catchup != nil { + fac := ctx.GetFactory(types.LinkKey{}) + + factory, ok := fac.(otypes.LinkFactory) + if !ok { + return nil, xerrors.Errorf("invalid link factory '%T'", fac) + } + + var blockLinks = make([]otypes.BlockLink, len(m.Catchup.BlockLinks)) + for i, blBuf := range m.Catchup.BlockLinks { + blockLinks[i], err = factory.BlockLinkOf(ctx, blBuf) + if err != nil { + return nil, xerrors.Errorf("failed to decode blockLink: %v", err) + } + } + + return types.NewCatchupMessage(m.Catchup.SplitMessage, blockLinks), nil + } + + return nil, xerrors.New("message is empty") +} diff --git a/core/ordering/cosipbft/fastsync/json/json_test.go b/core/ordering/cosipbft/fastsync/json/json_test.go new file mode 100644 index 000000000..b6e2c06f6 --- /dev/null +++ b/core/ordering/cosipbft/fastsync/json/json_test.go @@ -0,0 +1,76 @@ +package json + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.dedis.ch/dela/core/ordering/cosipbft/fastsync/types" + otypes "go.dedis.ch/dela/core/ordering/cosipbft/types" + "go.dedis.ch/dela/serde" + "go.dedis.ch/dela/testing/fake" +) + +func TestMsgFormat_Encode(t *testing.T) { + format := msgFormat{} + + ctx := fake.NewContext() + + data, err := format.Encode(ctx, types.NewCatchupMessage(false, []otypes.BlockLink{fakeLink{}})) + require.NoError(t, err) + require.Equal(t, `{"Catchup":{"BlockLinks":[{}]}}`, string(data)) + + data, err = format.Encode(ctx, types.NewRequestCatchupMessage(1, 3)) + require.NoError(t, err) + require.Equal(t, `{"Request":{"Latest":3}}`, string(data)) + + _, err = format.Encode(ctx, fake.Message{}) + require.EqualError(t, err, "unsupported message 'fake.Message'") + + _, err = format.Encode(ctx, + types.NewCatchupMessage(false, []otypes.BlockLink{fakeLink{err: fake.GetError()}})) + require.EqualError(t, err, fake.Err("failed to encode blocklink")) +} + +func TestMsgFormat_Decode(t *testing.T) { + format := msgFormat{} + + ctx := fake.NewContext() + ctx = serde.WithFactory(ctx, types.LinkKey{}, fakeLinkFac{}) + + msg, err := format.Decode(ctx, []byte(`{"Catchup":{"BlockLinks":[{}]}}`)) + require.NoError(t, err) + require.Equal(t, 
types.NewCatchupMessage(true, []otypes.BlockLink{fakeLink{}}), msg) + + msg, err = format.Decode(ctx, []byte(`{"Request":{"Latest":3}}`)) + require.NoError(t, err) + require.Equal(t, types.NewRequestCatchupMessage(1, 3), msg) + + _, err = format.Decode(ctx, []byte(`{}`)) + require.EqualError(t, err, "message is empty") + + _, err = format.Decode(fake.NewBadContext(), []byte(`{}`)) + require.EqualError(t, err, fake.Err("unmarshal failed")) +} + +// ----------------------------------------------------------------------------- +// Utility functions + +type fakeLink struct { + otypes.BlockLink + + err error +} + +func (link fakeLink) Serialize(serde.Context) ([]byte, error) { + return []byte("{}"), link.err +} + +type fakeLinkFac struct { + otypes.LinkFactory + + err error +} + +func (fac fakeLinkFac) BlockLinkOf(serde.Context, []byte) (otypes.BlockLink, error) { + return fakeLink{}, fac.err +} diff --git a/core/ordering/cosipbft/fastsync/types/types.go b/core/ordering/cosipbft/fastsync/types/types.go new file mode 100644 index 000000000..6e4ae1fba --- /dev/null +++ b/core/ordering/cosipbft/fastsync/types/types.go @@ -0,0 +1,124 @@ +// Package types implements the network messages for a synchronization. +// +// The messages are implemented in a different package to prevent cycle imports +// when importing the serde formats. +// +// Documentation Last Review: 13.10.2020 +package types + +import ( + "go.dedis.ch/dela/core/ordering/cosipbft/types" + "go.dedis.ch/dela/serde" + "go.dedis.ch/dela/serde/registry" + "golang.org/x/xerrors" +) + +var msgFormats = registry.NewSimpleRegistry() + +// RegisterMessageFormat registers the engine for the given format. +func RegisterMessageFormat(f serde.Format, e serde.FormatEngine) { + msgFormats.Register(f, e) +} + +// RequestCatchupMessage is sent by a node which wants to catch up to the latest +// block. +type RequestCatchupMessage struct { + splitMessageSize uint64 + latest uint64 +} + +// NewRequestCatchupMessage creates a RequestCatchupMessage +func NewRequestCatchupMessage(splitMessageSize, latest uint64) RequestCatchupMessage { + return RequestCatchupMessage{splitMessageSize: splitMessageSize, latest: latest} +} + +// GetLatest returns the latest index requested by the sender. +func (m RequestCatchupMessage) GetLatest() uint64 { + return m.latest +} + +// GetSplitMessageSize returns the size at which a message should be split. +func (m RequestCatchupMessage) GetSplitMessageSize() uint64 { + return m.splitMessageSize +} + +// Serialize implements serde.Message. It returns the serialized data for this +// message. +func (m RequestCatchupMessage) Serialize(ctx serde.Context) ([]byte, error) { + format := msgFormats.Get(ctx.GetFormat()) + + data, err := format.Encode(ctx, m) + if err != nil { + return nil, xerrors.Errorf("encoding failed: %v", err) + } + + return data, nil +} + +// CatchupMessage returns all the blocks, not just the links, so that the +// node can re-create the correct global state. +// 'splitMessage' is true if the node knows about more nodes. +type CatchupMessage struct { + splitMessage bool + blockLinks []types.BlockLink +} + +// NewCatchupMessage creates a reply to RequestLatestMessage. +func NewCatchupMessage(splitMessage bool, blockLinks []types.BlockLink) CatchupMessage { + return CatchupMessage{splitMessage: splitMessage, blockLinks: blockLinks} +} + +// GetBlockLinks returns the BlockLinks of the catchup. 
+func (m CatchupMessage) GetBlockLinks() []types.BlockLink { + return m.blockLinks +} + +// GetSplitMessage returns if the sending node has more blocks. +func (m CatchupMessage) GetSplitMessage() bool { + return m.splitMessage +} + +// Serialize implements serde.Message. It returns the serialized data for this +// message. +func (m CatchupMessage) Serialize(ctx serde.Context) ([]byte, error) { + format := msgFormats.Get(ctx.GetFormat()) + + data, err := format.Encode(ctx, m) + if err != nil { + return nil, xerrors.Errorf("encoding failed: %v", err) + } + + return data, nil +} + +// LinkKey is the key of the block link factory. +type LinkKey struct{} + +// MessageFactory is a message factory for sync messages. +// +// - implements serde.Factory +type MessageFactory struct { + linkFac types.LinkFactory +} + +// NewMessageFactory creates new message factory. +func NewMessageFactory(fac types.LinkFactory) MessageFactory { + return MessageFactory{ + linkFac: fac, + } +} + +// Deserialize implements serde.Factory. It returns the message associated to +// the data if appropriate, otherwise an error. +func (fac MessageFactory) Deserialize(ctx serde.Context, data []byte) (serde.Message, error) { + format := msgFormats.Get(ctx.GetFormat()) + + ctx = serde.WithFactory(ctx, LinkKey{}, fac.linkFac) + + msg, err := format.Decode(ctx, data) + if err != nil { + return nil, xerrors.Errorf("decoding failed: %v", err) + } + + return msg, nil +} diff --git a/core/ordering/cosipbft/fastsync/types/types_test.go b/core/ordering/cosipbft/fastsync/types/types_test.go new file mode 100644 index 000000000..e73afa9a1 --- /dev/null +++ b/core/ordering/cosipbft/fastsync/types/types_test.go @@ -0,0 +1,90 @@ +package types + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.dedis.ch/dela/core/ordering/cosipbft/types" + "go.dedis.ch/dela/core/validation/simple" + "go.dedis.ch/dela/serde" + "go.dedis.ch/dela/testing/fake" +) + +var testCalls = &fake.Call{} + +func init() { + RegisterMessageFormat(fake.GoodFormat, + fake.Format{Msg: CatchupMessage{}, Call: testCalls}) + RegisterMessageFormat(fake.BadFormat, fake.NewBadFormat()) +} + +func TestRequestCatchupMessage_GetChain(t *testing.T) { + m := NewRequestCatchupMessage(1, 42) + + require.Equal(t, uint64(42), m.GetLatest()) + require.Equal(t, uint64(1), m.GetSplitMessageSize()) +} + +func TestRequestCatchupMessage_Serialize(t *testing.T) { + m := NewRequestCatchupMessage(1, 42) + + data, err := m.Serialize(fake.NewContext()) + require.NoError(t, err) + require.Equal(t, fake.GetFakeFormatValue(), data) + + _, err = m.Serialize(fake.NewBadContext()) + require.EqualError(t, err, fake.Err("encoding failed")) +} + +func TestCatchupMessage_GetBlockLinks(t *testing.T) { + m := NewCatchupMessage(false, makeChain(t, 0, 2)) + + require.Equal(t, 2, len(m.GetBlockLinks())) + require.Equal(t, false, m.GetSplitMessage()) +} + +func TestCatchupMessage_Serialize(t *testing.T) { + m := NewCatchupMessage(false, makeChain(t, 0, 2)) + + data, err := m.Serialize(fake.NewContext()) + require.NoError(t, err) + require.Equal(t, fake.GetFakeFormatValue(), data) + + _, err = m.Serialize(fake.NewBadContext()) + require.EqualError(t, err, fake.Err("encoding failed")) +} + +func TestMessageFactory_Deserialize(t *testing.T) { + testCalls.Clear() + + linkFac := types.NewLinkFactory(nil, nil, nil) + + fac := NewMessageFactory(linkFac) + + msg, err := fac.Deserialize(fake.NewContext(), nil) + require.NoError(t, err) + require.Equal(t, CatchupMessage{}, msg) + + factory := 
testCalls.Get(0, 0).(serde.Context).GetFactory(LinkKey{}) + require.NotNil(t, factory) + + _, err = fac.Deserialize(fake.NewBadContext(), nil) + require.EqualError(t, err, fake.Err("decoding failed")) +} + +// ----------------------------------------------------------------------------- +// Utility functions + +func makeChain(t *testing.T, start, count uint64) []types.BlockLink { + blocks := make([]types.BlockLink, count) + + for index := uint64(0); index < count; index++ { + block, err := types.NewBlock(simple.NewResult(nil), types.WithIndex(index)) + require.NoError(t, err) + + blocks[index-start], err = types.NewBlockLink(types.Digest{}, block) + require.NoError(t, err) + } + + return blocks +} diff --git a/core/ordering/cosipbft/proc.go b/core/ordering/cosipbft/proc.go index a703c8c2f..f87caf0a5 100644 --- a/core/ordering/cosipbft/proc.go +++ b/core/ordering/cosipbft/proc.go @@ -16,6 +16,7 @@ import ( "go.dedis.ch/dela/core/ordering/cosipbft/blockstore" "go.dedis.ch/dela/core/ordering/cosipbft/blocksync" "go.dedis.ch/dela/core/ordering/cosipbft/contracts/viewchange" + "go.dedis.ch/dela/core/ordering/cosipbft/fastsync" "go.dedis.ch/dela/core/ordering/cosipbft/pbft" "go.dedis.ch/dela/core/ordering/cosipbft/types" "go.dedis.ch/dela/core/store" @@ -38,31 +39,38 @@ type processor struct { logger zerolog.Logger pbftsm pbft.StateMachine - sync blocksync.Synchronizer + bsync blocksync.Synchronizer + fsync fastsync.Synchronizer tree blockstore.TreeCache pool pool.Pool watcher core.Observable rosterFac authority.Factory hashFactory crypto.HashFactory access access.Service + fastsync bool context serde.Context genesis blockstore.GenesisStore blocks blockstore.BlockStore + // catchup sends catchup requests to the players to get new blocks + catchup chan mino.Players started chan struct{} } func newProcessor() *processor { - return &processor{ + proc := &processor{ watcher: core.NewWatcher(), context: json.NewContext(), started: make(chan struct{}), + catchup: make(chan mino.Players), } + go proc.catchupHandler() + return proc } // Invoke implements cosi.Reactor. It processes the messages from the collective -// signature module. The messages are either from the the prepare or the commit +// signature module. The messages are either from the prepare or the commit // phase. func (h *processor) Invoke(from mino.Address, msg serde.Message) ([]byte, error) { switch in := msg.(type) { @@ -70,16 +78,39 @@ func (h *processor) Invoke(from mino.Address, msg serde.Message) ([]byte, error) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - blocks := h.blocks.Watch(ctx) - - // In case the node is falling behind the chain, it gives it a chance to - // catch up before moving forward. 
- latest := h.sync.GetLatest() + if h.fastsync { + // Check if a catchup is needed + var roster mino.Players + if h.blocks.Len() == 0 && in.GetBlock().GetIndex() > 0 { + h.logger.Info().Msgf("node joined an existing blockchain from %+v", from) + roster = mino.NewAddresses(from) + } else if in.GetBlock().GetIndex() > h.blocks.Len() { + h.logger.Warn().Msgf("node got asked to sign block-index %d, "+ + "but has only %d blocks", in.GetBlock().GetIndex(), + h.blocks.Len()) + var err error + roster, err = h.getCurrentRoster() + if err != nil { + return nil, xerrors.Errorf("failed to get roster: %v", err) + } + } - if latest > h.blocks.Len() { - for link := range blocks { - if link.GetBlock().GetIndex() >= latest { - cancel() + if roster != nil { + h.catchup <- roster + return nil, xerrors.Errorf("needed to catch up") + } + } else { + blocks := h.blocks.Watch(ctx) + + // In case the node is falling behind the chain, it gives it a chance to + // catch up before moving forward. + latest := h.bsync.GetLatest() + + if latest > h.blocks.Len() { + for link := range blocks { + if link.GetBlock().GetIndex() >= latest { + cancel() + } } } } @@ -144,9 +175,14 @@ func (h *processor) Process(req mino.Request) (serde.Message, error) { return nil, h.storeGenesis(msg.GetGenesis().GetRoster(), &root) case types.DoneMessage: - err := h.pbftsm.Finalize(msg.GetID(), msg.GetSignature()) - if err != nil { - return nil, xerrors.Errorf("pbftsm finalized failed: %v", err) + if h.pbftsm.GetState() == pbft.InitialState { + h.logger.Warn().Msgf("Got block without commit from %v - catching up", req.Address) + h.catchup <- mino.NewAddresses(req.Address) + } else { + err := h.pbftsm.Finalize(msg.GetID(), msg.GetSignature()) + if err != nil { + return nil, xerrors.Errorf("pbftsm finalized failed: %v", err) + } } case types.ViewMessage: param := pbft.ViewParam{ @@ -250,3 +286,20 @@ func (h *processor) makeAccess(store store.Snapshot, roster authority.Authority) return nil } + +// catchupHandler listens to incoming requests for potentially missing blocks. +// It is started as a go-routine +func (h *processor) catchupHandler() { + for players := range h.catchup { + if h.fastsync { + ctx, cancel := context.WithCancel(context.Background()) + err := h.fsync.Sync(ctx, players, + fastsync.Config{SplitMessageSize: DefaultFastSyncMessageSize}) + if err != nil { + h.logger.Err(err) + } + cancel() + } + } + panic("Should not get here") +} diff --git a/core/ordering/cosipbft/proc_test.go b/core/ordering/cosipbft/proc_test.go index 4f2728d4c..16ace5e93 100644 --- a/core/ordering/cosipbft/proc_test.go +++ b/core/ordering/cosipbft/proc_test.go @@ -24,7 +24,7 @@ func TestProcessor_BlockMessage_Invoke(t *testing.T) { proc := newProcessor() proc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{}) - proc.sync = fakeSync{latest: 1} + proc.bsync = fakeSync{latest: 1} proc.blocks = fakeStore{} proc.pbftsm = fakeSM{ state: pbft.InitialState, diff --git a/core/store/hashtree/binprefix/disk.go b/core/store/hashtree/binprefix/disk.go index 5e45ef2a0..ca0e031df 100644 --- a/core/store/hashtree/binprefix/disk.go +++ b/core/store/hashtree/binprefix/disk.go @@ -104,8 +104,10 @@ func (n *DiskNode) Delete(key *big.Int, bucket kv.Bucket) (TreeNode, error) { // Prepare implements binprefix.TreeNode. It loads the node and calculates its // hash. The subtree might be loaded in-memory if deeper hashes have not been // computed yet. 
-func (n *DiskNode) Prepare(nonce []byte, prefix *big.Int, - bucket kv.Bucket, fac crypto.HashFactory) ([]byte, error) { +func (n *DiskNode) Prepare( + nonce []byte, prefix *big.Int, + bucket kv.Bucket, fac crypto.HashFactory, +) ([]byte, error) { if len(n.hash) > 0 { // Hash is already calculated so we can skip and return. @@ -178,7 +180,6 @@ func (n *DiskNode) store(index *big.Int, node TreeNode, b kv.Bucket) error { } key := n.prepareKey(index) - err = b.Set(key, data) if err != nil { return xerrors.Errorf("failed to set key: %v", err) diff --git a/mino/minoch/mod.go b/mino/minoch/mod.go index 65d152f27..b8af3b8b8 100644 --- a/mino/minoch/mod.go +++ b/mino/minoch/mod.go @@ -10,7 +10,6 @@ // instance should drop the message. // // Documentation Last Review: 06.10.2020 -// package minoch import ( @@ -85,7 +84,7 @@ func (m *Minoch) GetAddress() mino.Address { return address{id: m.identifier} } -// AddFilter adds the filter to all of the RPCs. This must be called before +// AddFilter adds the filter to all the RPCs. This must be called before // receiving requests. func (m *Minoch) AddFilter(filter Filter) { m.filters = append(m.filters, filter) diff --git a/mino/minoch/rpc.go b/mino/minoch/rpc.go index f2b4c67e4..1133b5714 100644 --- a/mino/minoch/rpc.go +++ b/mino/minoch/rpc.go @@ -211,7 +211,7 @@ func (c RPC) Stream(ctx context.Context, memship mino.Players) (mino.Sender, min case env := <-in: for _, to := range env.to { output := orchRecv.out - if !to.(address).orchestrator { + if !to.(address).orchestrator || !to.Equal(orchAddr) { output = outs[to.String()].out } diff --git a/mino/option.go b/mino/option.go index 01a1a9d24..c0649cc07 100644 --- a/mino/option.go +++ b/mino/option.go @@ -113,7 +113,7 @@ func ListFilter(indices []int) FilterUpdater { // RandomFilter chooses 'count' random elements. func RandomFilter(count int) FilterUpdater { return func(filters *Filter) { - if len(filters.Indices) >= count { + if len(filters.Indices) < count { return } rand.Shuffle(len(filters.Indices), diff --git a/serde/json/json.go b/serde/json/json.go index 880085e0e..470758f9e 100644 --- a/serde/json/json.go +++ b/serde/json/json.go @@ -11,6 +11,7 @@ import ( _ "go.dedis.ch/dela/core/access/darc/json" _ "go.dedis.ch/dela/core/ordering/cosipbft/authority/json" _ "go.dedis.ch/dela/core/ordering/cosipbft/blocksync/json" + _ "go.dedis.ch/dela/core/ordering/cosipbft/fastsync/json" _ "go.dedis.ch/dela/core/ordering/cosipbft/json" _ "go.dedis.ch/dela/core/txn/signed/json" _ "go.dedis.ch/dela/core/validation/simple/json"
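A note on the filters from the first patch: the synchronizer picks its catch-up targets with players.Take(mino.RangeFilter(1, players.Len()), mino.RandomFilter(f+1)), i.e. it drops its own index and keeps f+1 random peers. A minimal, self-contained sketch of how the updaters compose (the concrete indices are made up):

    package main

    import (
        "fmt"

        "go.dedis.ch/dela/mino"
    )

    func main() {
        // Filter updaters run in order on the same Filter. RejectFilter
        // binary-searches a sorted slice, while RandomFilter shuffles and
        // truncates it, so RandomFilter is best applied last.
        filters := &mino.Filter{Indices: []int{0, 1, 2, 3, 4, 5, 6}}

        mino.RejectFilter(0)(filters) // drop index 0, e.g. this node's own position
        mino.RandomFilter(3)(filters) // keep 3 of the remaining 6 indices, shuffled

        fmt.Println(filters.Indices) // e.g. [4 1 6]
    }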
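On the wire, a catch-up round is a single request/reply pair of the messages defined in fastsync/types. A sketch of the request side, assuming the serde JSON context used elsewhere in the module (the block count and budget are examples):

    package main

    import (
        "fmt"

        "go.dedis.ch/dela/core/ordering/cosipbft/fastsync/types"
        "go.dedis.ch/dela/serde/json"
    )

    func main() {
        // serde/json now registers the fastsync JSON format (see the last
        // hunk of this patch), so a plain JSON context is enough.
        ctx := json.NewContext()

        // A node holding 5 blocks asks for everything newer and wants the
        // replies split once they exceed roughly 1 MB.
        req := types.NewRequestCatchupMessage(1_000_000, 5)

        data, err := req.Serialize(ctx)
        if err != nil {
            panic(err)
        }

        // Expected to look like:
        // {"Request":{"SplitMessageSize":1000000,"Latest":5}}
        fmt.Println(string(data))
    }

The reply comes back as {"Catchup":{"SplitMessage":...,"BlockLinks":[...]}}; as long as SplitMessage is true the requester simply sends another request with its new Latest, which is why TestDefaultSync_SplitMessage counts 5 exchanges for 10 blocks with a 256-byte budget.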
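The splitting rule in handler.getBlocks is intentionally loose: links are appended until the serialized size reaches SplitMessageSize, so a reply may exceed the budget by at most one block, and a budget of 0 disables splitting. A simplified stand-in for that loop, useful for reasoning about the message counts in the test above (the 255-byte link size is an assumption inferred from the expected counts):

    // splitCount models handler.getBlocks: how many items of the given sizes
    // go into one reply when the budget may be exceeded by at most one item.
    // It is illustrative only and not part of the patch.
    func splitCount(sizes []uint64, budget uint64) int {
        if budget == 0 {
            return len(sizes) // no split requested: send everything at once
        }
        total := uint64(0)
        for i, s := range sizes {
            total += s
            if total >= budget {
                return i + 1
            }
        }
        return len(sizes)
    }

With ten links of 255 bytes each, splitCount yields 1 link per reply for a 255-byte budget, 2 for 256, 5 for 1024 and 10 for 2550, which is exactly the 10/5/2/1 round trips expected by TestDefaultSync_SplitMessage.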