Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve sync #11

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions core/ordering/cosipbft/blocksync/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,10 @@ func (h *handler) Stream(out mino.Sender, in mino.Receiver) error {
return h.ack(out, orch)
}

func (h *handler) waitAnnounce(ctx context.Context,
in mino.Receiver) (*types.SyncMessage, mino.Address, error) {
func (h *handler) waitAnnounce(
ctx context.Context,
in mino.Receiver,
) (*types.SyncMessage, mino.Address, error) {

for {
orch, msg, err := in.Recv(ctx)
Expand Down Expand Up @@ -326,7 +328,7 @@ func (h *handler) ack(out mino.Sender, orch mino.Address) error {
}

func iter2arr(iter mino.AddressIterator) []mino.Address {
addrs := []mino.Address{}
var addrs []mino.Address
for iter.HasNext() {
addrs = append(addrs, iter.GetNext())
}
Expand Down
2 changes: 1 addition & 1 deletion core/ordering/cosipbft/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (m miniController) OnStart(flags cli.Flags, inj node.Injector) error {
}

srvc, err := cosipbft.NewService(param, cosipbft.WithGenesisStore(genstore),
cosipbft.WithBlockStore(blocks))
cosipbft.WithBlockStore(blocks), cosipbft.WithFastSync())
if err != nil {
return xerrors.Errorf("service: %v", err)
}
Expand Down
60 changes: 46 additions & 14 deletions core/ordering/cosipbft/cosipbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"go.dedis.ch/dela/core/ordering/cosipbft/blockstore"
"go.dedis.ch/dela/core/ordering/cosipbft/blocksync"
"go.dedis.ch/dela/core/ordering/cosipbft/contracts/viewchange"
"go.dedis.ch/dela/core/ordering/cosipbft/fastsync"
"go.dedis.ch/dela/core/ordering/cosipbft/pbft"
"go.dedis.ch/dela/core/ordering/cosipbft/types"
"go.dedis.ch/dela/core/store"
Expand Down Expand Up @@ -80,6 +81,9 @@ const (
// RoundMaxWait is the maximum amount for the backoff.
RoundMaxWait = 5 * time.Minute

// DefaultFastSyncMessageSize is the default message size at which a fast sync
// message is split; it is passed as fastsync.Config.SplitMessageSize.
DefaultFastSyncMessageSize = 1e6

rpcName = "cosipbft"
)

Expand Down Expand Up @@ -115,9 +119,10 @@ type Service struct {
}

type serviceTemplate struct {
hashFac crypto.HashFactory
blocks blockstore.BlockStore
genesis blockstore.GenesisStore
hashFac crypto.HashFactory
blocks blockstore.BlockStore
genesis blockstore.GenesisStore
fastSync bool
}

// ServiceOption is the type of option to set some fields of the service.
Expand All @@ -144,6 +149,13 @@ func WithHashFactory(fac crypto.HashFactory) ServiceOption {
}
}

// WithFastSync returns a service option that switches the cosipbft module to
// the new (fast) syncing algorithm. The option only flips the fastSync flag of
// the service template; the template's other fields are left untouched.
func WithFastSync() ServiceOption {
	enable := func(tmpl *serviceTemplate) {
		tmpl.fastSync = true
	}

	return enable
}

// ServiceParam is the different components to provide to the service. All the
// fields are mandatory and it will panic if any is nil.
type ServiceParam struct {
Expand Down Expand Up @@ -190,6 +202,7 @@ func NewServiceStruct(param ServiceParam, opts ...ServiceOption) (*Service, erro
proc.tree = blockstore.NewTreeCache(param.Tree)
proc.access = param.Access
proc.logger = dela.Logger.With().Str("addr", param.Mino.GetAddress().String()).Logger()
proc.fastsync = tmpl.fastSync

pcparam := pbft.StateMachineParam{
Logger: proc.logger,
Expand Down Expand Up @@ -220,10 +233,11 @@ func NewServiceStruct(param ServiceParam, opts ...ServiceOption) (*Service, erro
ChainFactory: chainFac,
VerifierFactory: param.Cosi.GetVerifierFactory(),
}

bs := blocksync.NewSynchronizer(syncparam)

proc.sync = bs
if proc.fastsync {
proc.fsync = fastsync.NewSynchronizer(syncparam)
} else {
proc.bsync = blocksync.NewSynchronizer(syncparam)
}

fac := types.NewMessageFactory(
types.NewGenesisFactory(proc.rosterFac),
Expand Down Expand Up @@ -275,6 +289,20 @@ func NewServiceStart(s *Service) {
go s.watchBlocks()

if s.genesis.Exists() {
if s.fastsync {
ctx, done := context.WithCancel(context.Background())
roster, err := s.readRoster(s.tree.Get())
if err != nil {
panic("couldn't get roster of latest block: " + err.Error())
}
err = s.fsync.Sync(ctx, roster,
fastsync.Config{SplitMessageSize: DefaultFastSyncMessageSize})
if err != nil {
s.logger.Warn().Msgf("while syncing with other nodes: %+v", err)
}
done()
}

// If the genesis already exists, the service can start right away to
// participate in the chain.
close(s.started)
Expand Down Expand Up @@ -541,17 +569,21 @@ func (s *Service) doLeaderRound(

s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("round has started")

// Send a synchronization to the roster so that they can learn about the
// latest block of the chain.
err := s.sync.Sync(ctx, roster,
blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())})
if err != nil {
return xerrors.Errorf("sync failed: %v", err)
// When using blocksync, the updates are sent before every new block, which
// uses a lot of bandwidth if there are more than just a few blocks.
if !s.fastsync {
// Send a synchronization to the roster so that they can learn about the
// latest block of the chain.
err := s.bsync.Sync(ctx, roster,
blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())})
if err != nil {
return xerrors.Errorf("sync failed: %v", err)
}
}

s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("pbft has started")

err = s.doPBFT(ctx)
err := s.doPBFT(ctx)
if err != nil {
return xerrors.Errorf("pbft failed: %v", err)
}
Expand Down
21 changes: 16 additions & 5 deletions core/ordering/cosipbft/cosipbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,25 @@ import (
"go.dedis.ch/dela/testing/fake"
)

// TestService_Scenario_Basic_Blocksync runs the basic service scenario with
// fast sync turned off, so no WithFastSync option is given to the nodes.
func TestService_Scenario_Basic_Blocksync(t *testing.T) {
	const fastSync = false

	testserviceScenarioBasic(t, fastSync)
}
// TestService_Scenario_Basic_Fastsync runs the basic service scenario with
// fast sync turned on, so the nodes are created with the WithFastSync option.
func TestService_Scenario_Basic_Fastsync(t *testing.T) {
	const fastSync = true

	testserviceScenarioBasic(t, fastSync)
}

// This test is known to be VERY flaky on Windows.
// Further investigation is needed.
func TestService_Scenario_Basic(t *testing.T) {
func testserviceScenarioBasic(t *testing.T, fastSync bool) {
if testing.Short() {
t.Skip("Skipping flaky test")
}

nodes, ro, clean := makeAuthority(t, 5)
var opts []ServiceOption
if fastSync {
opts = append(opts, WithFastSync())
}
nodes, ro, clean := makeAuthority(t, 5, opts...)
defer clean()

signer := nodes[0].signer
Expand Down Expand Up @@ -450,7 +461,7 @@ func TestService_DoRound(t *testing.T) {
closing: make(chan struct{}),
}
srvc.blocks = blockstore.NewInMemory()
srvc.sync = fakeSync{}
srvc.bsync = fakeSync{}
srvc.pool = mem.NewPool()
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
Expand Down Expand Up @@ -618,7 +629,7 @@ func TestService_FailSync_DoRound(t *testing.T) {
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
srvc.pbftsm = fakeSM{}
srvc.sync = fakeSync{err: fake.GetError()}
srvc.bsync = fakeSync{err: fake.GetError()}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -641,7 +652,7 @@ func TestService_FailPBFT_DoRound(t *testing.T) {
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
srvc.pbftsm = fakeSM{}
srvc.sync = fakeSync{}
srvc.bsync = fakeSync{}

require.NoError(t, srvc.pool.Add(makeTx(t, 0, fake.NewSigner())))

Expand Down
Loading
Loading