From b3c755ea1e802f24d5f3f23a301347aa9e2839b2 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Thu, 29 Jun 2023 16:13:32 -0400 Subject: [PATCH 1/6] `MerkleDB` -- remove codec version (#1671) --- x/merkledb/codec.go | 83 +++++++++++----------------------------- x/merkledb/codec_test.go | 23 +++++------ x/merkledb/db.go | 2 +- x/merkledb/node.go | 6 +-- 4 files changed, 37 insertions(+), 77 deletions(-) diff --git a/x/merkledb/codec.go b/x/merkledb/codec.go index 3d9959b3855..4aa7539664c 100644 --- a/x/merkledb/codec.go +++ b/x/merkledb/codec.go @@ -17,26 +17,14 @@ import ( ) const ( - codecVersion = 0 trueByte = 1 falseByte = 0 minVarIntLen = 1 - boolLen = 1 + minMaybeByteSliceLen = 1 idLen = hashing.HashLen - minCodecVersionLen = minVarIntLen minSerializedPathLen = minVarIntLen minByteSliceLen = minVarIntLen - minMaybeByteSliceLen = boolLen - minProofPathLen = minVarIntLen - minKeyValueLen = 2 * minByteSliceLen - minKeyChangeLen = minByteSliceLen + minMaybeByteSliceLen - minProofNodeLen = minSerializedPathLen + minMaybeByteSliceLen + minVarIntLen - minProofLen = minCodecVersionLen + minProofPathLen + minByteSliceLen - minChangeProofLen = minCodecVersionLen + boolLen + 2*minProofPathLen + minVarIntLen - minRangeProofLen = minCodecVersionLen + 2*minProofPathLen + minVarIntLen - minDBNodeLen = minCodecVersionLen + minMaybeByteSliceLen + minVarIntLen - minHashValuesLen = minCodecVersionLen + minVarIntLen + minMaybeByteSliceLen + minSerializedPathLen - minProofNodeChildLen = minVarIntLen + idLen + minDBNodeLen = minMaybeByteSliceLen + minVarIntLen minChildLen = minVarIntLen + minSerializedPathLen + idLen ) @@ -46,7 +34,6 @@ var ( trueBytes = []byte{trueByte} falseBytes = []byte{falseByte} - errUnknownVersion = errors.New("unknown codec version") errEncodeNil = errors.New("can't encode nil pointer or interface") errDecodeNil = errors.New("can't decode nil") errNegativeNumChildren = errors.New("number of children is negative") @@ -59,7 +46,6 @@ var ( errNonZeroNibblePadding = errors.New("nibbles should be padded with 0s") errExtraSpace = errors.New("trailing buffer space") errNegativeSliceLength = errors.New("negative slice length") - errInvalidCodecVersion = errors.New("invalid codec version") ) // encoderDecoder defines the interface needed by merkleDB to marshal @@ -70,41 +56,34 @@ type encoderDecoder interface { } type encoder interface { - encodeDBNode(version uint16, n *dbNode) ([]byte, error) - encodeHashValues(version uint16, hv *hashValues) ([]byte, error) + encodeDBNode(n *dbNode) ([]byte, error) + encodeHashValues(hv *hashValues) ([]byte, error) } type decoder interface { - decodeDBNode(bytes []byte, n *dbNode) (uint16, error) + decodeDBNode(bytes []byte, n *dbNode) error } -func newCodec() (encoderDecoder, uint16) { +func newCodec() encoderDecoder { return &codecImpl{ varIntPool: sync.Pool{ New: func() interface{} { return make([]byte, binary.MaxVarintLen64) }, }, - }, codecVersion + } } type codecImpl struct { varIntPool sync.Pool } -func (c *codecImpl) encodeDBNode(version uint16, n *dbNode) ([]byte, error) { +func (c *codecImpl) encodeDBNode(n *dbNode) ([]byte, error) { if n == nil { return nil, errEncodeNil } - if version != codecVersion { - return nil, fmt.Errorf("%w: %d", errUnknownVersion, version) - } - buf := &bytes.Buffer{} - if err := c.encodeInt(buf, int(version)); err != nil { - return nil, err - } if err := c.encodeMaybeByteSlice(buf, n.value); err != nil { return nil, err } @@ -129,21 +108,13 @@ func (c *codecImpl) encodeDBNode(version uint16, n *dbNode) ([]byte, error) { 
return buf.Bytes(), nil } -func (c *codecImpl) encodeHashValues(version uint16, hv *hashValues) ([]byte, error) { +func (c *codecImpl) encodeHashValues(hv *hashValues) ([]byte, error) { if hv == nil { return nil, errEncodeNil } - if version != codecVersion { - return nil, fmt.Errorf("%w: %d", errUnknownVersion, version) - } - buf := &bytes.Buffer{} - if err := c.encodeInt(buf, int(version)); err != nil { - return nil, err - } - length := len(hv.Children) if err := c.encodeInt(buf, length); err != nil { return nil, err @@ -170,12 +141,12 @@ func (c *codecImpl) encodeHashValues(version uint16, hv *hashValues) ([]byte, er return buf.Bytes(), nil } -func (c *codecImpl) decodeDBNode(b []byte, n *dbNode) (uint16, error) { +func (c *codecImpl) decodeDBNode(b []byte, n *dbNode) error { if n == nil { - return 0, errDecodeNil + return errDecodeNil } if minDBNodeLen > len(b) { - return 0, io.ErrUnexpectedEOF + return io.ErrUnexpectedEOF } var ( @@ -183,29 +154,21 @@ func (c *codecImpl) decodeDBNode(b []byte, n *dbNode) (uint16, error) { err error ) - gotCodecVersion, err := c.decodeInt(src) - if err != nil { - return 0, err - } - if codecVersion != gotCodecVersion { - return 0, fmt.Errorf("%w: %d", errInvalidCodecVersion, gotCodecVersion) - } - if n.value, err = c.decodeMaybeByteSlice(src); err != nil { - return 0, err + return err } numChildren, err := c.decodeInt(src) if err != nil { - return 0, err + return err } switch { case numChildren < 0: - return 0, errNegativeNumChildren + return errNegativeNumChildren case numChildren > NodeBranchFactor: - return 0, errTooManyChildren + return errTooManyChildren case numChildren > src.Len()/minChildLen: - return 0, io.ErrUnexpectedEOF + return io.ErrUnexpectedEOF } n.children = make(map[byte]child, NodeBranchFactor) @@ -213,20 +176,20 @@ func (c *codecImpl) decodeDBNode(b []byte, n *dbNode) (uint16, error) { for i := 0; i < numChildren; i++ { var index int if index, err = c.decodeInt(src); err != nil { - return 0, err + return err } if index <= previousChild || index > NodeBranchFactor-1 { - return 0, errChildIndexTooLarge + return errChildIndexTooLarge } previousChild = index var compressedPath SerializedPath if compressedPath, err = c.decodeSerializedPath(src); err != nil { - return 0, err + return err } var childID ids.ID if childID, err = c.decodeID(src); err != nil { - return 0, err + return err } n.children[byte(index)] = child{ compressedPath: compressedPath.deserialize(), @@ -234,9 +197,9 @@ func (c *codecImpl) decodeDBNode(b []byte, n *dbNode) (uint16, error) { } } if src.Len() != 0 { - return 0, errExtraSpace + return errExtraSpace } - return codecVersion, err + return err } func (*codecImpl) encodeBool(dst io.Writer, value bool) error { diff --git a/x/merkledb/codec_test.go b/x/merkledb/codec_test.go index 84850c3572d..cff38e51b63 100644 --- a/x/merkledb/codec_test.go +++ b/x/merkledb/codec_test.go @@ -205,13 +205,12 @@ func FuzzCodecDBNodeCanonical(f *testing.F) { codec := codec.(*codecImpl) node := &dbNode{} - got, err := codec.decodeDBNode(b, node) - if err != nil { + if err := codec.decodeDBNode(b, node); err != nil { return } // Encoding [node] should be the same as [b]. 
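	// A side-by-side sketch of the API change this patch makes (the
	// version argument and return value are gone):
	//
	//	// before: buf, err := codec.encodeDBNode(version, node)
	//	// after:  buf, err := codec.encodeDBNode(node)
	//	// before: gotVersion, err := codec.decodeDBNode(b, node)
	//	// after:  err := codec.decodeDBNode(b, node)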
- buf, err := codec.encodeDBNode(got, node) + buf, err := codec.encodeDBNode(node) require.NoError(err) require.Equal(b, buf) }, @@ -264,19 +263,17 @@ func FuzzCodecDBNodeDeterministic(f *testing.F) { children: children, } - nodeBytes, err := codec.encodeDBNode(version, &node) + nodeBytes, err := codec.encodeDBNode(&node) require.NoError(err) var gotNode dbNode - gotVersion, err := codec.decodeDBNode(nodeBytes, &gotNode) - require.NoError(err) - require.Equal(version, gotVersion) + require.NoError(codec.decodeDBNode(nodeBytes, &gotNode)) nilEmptySlices(&node) nilEmptySlices(&gotNode) require.Equal(node, gotNode) - nodeBytes2, err := codec.encodeDBNode(version, &gotNode) + nodeBytes2, err := codec.encodeDBNode(&gotNode) require.NoError(err) require.Equal(nodeBytes, nodeBytes2) }, @@ -286,14 +283,14 @@ func FuzzCodecDBNodeDeterministic(f *testing.F) { func TestCodec_DecodeDBNode(t *testing.T) { require := require.New(t) - _, err := codec.decodeDBNode([]byte{1}, nil) + err := codec.decodeDBNode([]byte{1}, nil) require.ErrorIs(err, errDecodeNil) var ( parsedDBNode dbNode tooShortBytes = make([]byte, minDBNodeLen-1) ) - _, err = codec.decodeDBNode(tooShortBytes, &parsedDBNode) + err = codec.decodeDBNode(tooShortBytes, &parsedDBNode) require.ErrorIs(err, io.ErrUnexpectedEOF) proof := dbNode{ @@ -301,7 +298,7 @@ func TestCodec_DecodeDBNode(t *testing.T) { children: map[byte]child{}, } - nodeBytes, err := codec.encodeDBNode(version, &proof) + nodeBytes, err := codec.encodeDBNode(&proof) require.NoError(err) // Remove num children (0) from end @@ -310,7 +307,7 @@ func TestCodec_DecodeDBNode(t *testing.T) { // Put num children -1 at end require.NoError(codec.(*codecImpl).encodeInt(proofBytesBuf, -1)) - _, err = codec.decodeDBNode(proofBytesBuf.Bytes(), &parsedDBNode) + err = codec.decodeDBNode(proofBytesBuf.Bytes(), &parsedDBNode) require.ErrorIs(err, errNegativeNumChildren) // Remove num children from end @@ -320,6 +317,6 @@ func TestCodec_DecodeDBNode(t *testing.T) { // Put num children NodeBranchFactor+1 at end require.NoError(codec.(*codecImpl).encodeInt(proofBytesBuf, NodeBranchFactor+1)) - _, err = codec.decodeDBNode(proofBytesBuf.Bytes(), &parsedDBNode) + err = codec.decodeDBNode(proofBytesBuf.Bytes(), &parsedDBNode) require.ErrorIs(err, errTooManyChildren) } diff --git a/x/merkledb/db.go b/x/merkledb/db.go index de51d5b59c4..74267a78aed 100644 --- a/x/merkledb/db.go +++ b/x/merkledb/db.go @@ -39,7 +39,7 @@ var ( _ TrieView = (*merkleDB)(nil) _ MerkleDB = (*merkleDB)(nil) - codec, version = newCodec() + codec = newCodec() rootKey []byte nodePrefix = []byte("node") diff --git a/x/merkledb/node.go b/x/merkledb/node.go index 8626e70cf06..c584b03a65e 100644 --- a/x/merkledb/node.go +++ b/x/merkledb/node.go @@ -60,7 +60,7 @@ func newNode(parent *node, key path) *node { // Parse [nodeBytes] to a node and set its key to [key]. 
func parseNode(key path, nodeBytes []byte) (*node, error) { n := dbNode{} - if _, err := codec.decodeDBNode(nodeBytes, &n); err != nil { + if err := codec.decodeDBNode(nodeBytes, &n); err != nil { return nil, err } result := &node{ @@ -84,7 +84,7 @@ func (n *node) marshal() ([]byte, error) { return n.nodeBytes, nil } - nodeBytes, err := codec.encodeDBNode(version, &(n.dbNode)) + nodeBytes, err := codec.encodeDBNode(&n.dbNode) if err != nil { return nil, err } @@ -111,7 +111,7 @@ func (n *node) calculateID(metrics merkleMetrics) error { Key: n.key.Serialize(), } - bytes, err := codec.encodeHashValues(version, hv) + bytes, err := codec.encodeHashValues(hv) if err != nil { return err } From aeb9ac5d49d82c48ec0b71ffbebfc678d29bf0ec Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Thu, 29 Jun 2023 16:53:08 -0400 Subject: [PATCH 2/6] `MerkleDB` -- use default config in all tests (#1590) --- x/merkledb/db.go | 3 +- x/merkledb/db_test.go | 4 +- x/merkledb/history_test.go | 106 ++++++++++++++----------------------- x/merkledb/metrics.go | 1 + x/merkledb/metrics_test.go | 20 +++---- x/merkledb/proof.go | 5 +- x/merkledb/proof_test.go | 6 +-- x/merkledb/trie_test.go | 6 +-- 8 files changed, 57 insertions(+), 94 deletions(-) diff --git a/x/merkledb/db.go b/x/merkledb/db.go index 74267a78aed..4616e51a871 100644 --- a/x/merkledb/db.go +++ b/x/merkledb/db.go @@ -29,7 +29,8 @@ import ( ) const ( - RootPath = EmptyPath + DefaultEvictionBatchSize = 100 + RootPath = EmptyPath // TODO: name better rebuildViewSizeFractionOfCacheSize = 50 minRebuildViewSizePerCommit = 1000 diff --git a/x/merkledb/db_test.go b/x/merkledb/db_test.go index 45bb808bef6..e93521a9aa8 100644 --- a/x/merkledb/db_test.go +++ b/x/merkledb/db_test.go @@ -22,8 +22,6 @@ import ( "github.com/ava-labs/avalanchego/utils/units" ) -const minCacheSize = 1000 - func newNoopTracer() trace.Tracer { tracer, _ := trace.New(trace.Config{Enabled: false}) return tracer @@ -32,7 +30,7 @@ func newNoopTracer() trace.Tracer { func newDefaultConfig() Config { return Config{ EvictionBatchSize: 100, - HistoryLength: 100, + HistoryLength: 300, NodeCacheSize: 1_000, Reg: prometheus.NewRegistry(), Tracer: newNoopTracer(), diff --git a/x/merkledb/history_test.go b/x/merkledb/history_test.go index 7052ec6d516..2905d1c51ee 100644 --- a/x/merkledb/history_test.go +++ b/x/merkledb/history_test.go @@ -21,11 +21,7 @@ func Test_History_Simple(t *testing.T) { db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 300, - NodeCacheSize: minCacheSize, - }, + newDefaultConfig(), ) require.NoError(err) batch := db.NewBatch() @@ -87,23 +83,27 @@ func Test_History_Simple(t *testing.T) { func Test_History_Large(t *testing.T) { require := require.New(t) + numIters := 250 + for i := 1; i < 10; i++ { - now := time.Now().UnixNano() - t.Logf("seed for iter %d: %d", i, now) - r := rand.New(rand.NewSource(now)) // #nosec G404 + config := newDefaultConfig() + // History must be large enough to get the change proof + // after this loop. Multiply by four because every loop + // iteration we do two puts and up to two deletes. 
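+		// (With numIters = 250, this sets HistoryLength to 4 * 250 = 1000.)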
+ config.HistoryLength = 4 * numIters db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 1500, - NodeCacheSize: 1000, - }, + config, ) require.NoError(err) roots := []ids.ID{} + + now := time.Now().UnixNano() + t.Logf("seed for iter %d: %d", i, now) + r := rand.New(rand.NewSource(now)) // #nosec G404 // make sure they stay in sync - for x := 0; x < 250; x++ { + for x := 0; x < numIters; x++ { addkey := make([]byte, r.Intn(50)) _, err := r.Read(addkey) require.NoError(err) @@ -144,16 +144,17 @@ func Test_History_Large(t *testing.T) { func Test_History_Bad_GetValueChanges_Input(t *testing.T) { require := require.New(t) + config := newDefaultConfig() + config.HistoryLength = 5 + db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 5, - NodeCacheSize: minCacheSize, - }, + config, ) require.NoError(err) + + // Do 5 puts (i.e. the history length) batch := db.NewBatch() require.NoError(batch.Put([]byte("key"), []byte("value"))) require.NoError(batch.Write()) @@ -210,16 +211,17 @@ func Test_History_Bad_GetValueChanges_Input(t *testing.T) { func Test_History_Trigger_History_Queue_Looping(t *testing.T) { require := require.New(t) + config := newDefaultConfig() + config.HistoryLength = 2 + db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 2, - NodeCacheSize: minCacheSize, - }, + config, ) require.NoError(err) + + // Do 2 puts (i.e. the history length) batch := db.NewBatch() require.NoError(batch.Put([]byte("key"), []byte("value"))) require.NoError(batch.Write()) @@ -264,16 +266,16 @@ func Test_History_Trigger_History_Queue_Looping(t *testing.T) { func Test_History_Values_Lookup_Over_Queue_Break(t *testing.T) { require := require.New(t) + config := newDefaultConfig() + config.HistoryLength = 4 db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 4, - NodeCacheSize: minCacheSize, - }, + config, ) require.NoError(err) + + // Do 4 puts (i.e. 
the history length) batch := db.NewBatch() require.NoError(batch.Put([]byte("key"), []byte("value"))) require.NoError(batch.Write()) @@ -317,11 +319,7 @@ func Test_History_RepeatedRoot(t *testing.T) { db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 100, - NodeCacheSize: minCacheSize, - }, + newDefaultConfig(), ) require.NoError(err) batch := db.NewBatch() @@ -365,11 +363,7 @@ func Test_History_ExcessDeletes(t *testing.T) { db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 100, - NodeCacheSize: minCacheSize, - }, + newDefaultConfig(), ) require.NoError(err) batch := db.NewBatch() @@ -401,11 +395,7 @@ func Test_History_DontIncludeAllNodes(t *testing.T) { db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 100, - NodeCacheSize: minCacheSize, - }, + newDefaultConfig(), ) require.NoError(err) batch := db.NewBatch() @@ -433,11 +423,7 @@ func Test_History_Branching2Nodes(t *testing.T) { db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 100, - NodeCacheSize: minCacheSize, - }, + newDefaultConfig(), ) require.NoError(err) batch := db.NewBatch() @@ -465,11 +451,7 @@ func Test_History_Branching3Nodes(t *testing.T) { db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 100, - NodeCacheSize: minCacheSize, - }, + newDefaultConfig(), ) require.NoError(err) batch := db.NewBatch() @@ -494,14 +476,12 @@ func Test_History_Branching3Nodes(t *testing.T) { func Test_History_MaxLength(t *testing.T) { require := require.New(t) + config := newDefaultConfig() + config.HistoryLength = 2 db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 2, - NodeCacheSize: 1000, - }, + config, ) require.NoError(err) @@ -519,7 +499,7 @@ func Test_History_MaxLength(t *testing.T) { require.Contains(db.history.lastChanges, oldRoot) batch = db.NewBatch() - require.NoError(batch.Put([]byte("k1"), []byte("v2"))) + require.NoError(batch.Put([]byte("k1"), []byte("v2"))) // Overwrites oldest element in history require.NoError(batch.Write()) require.NotContains(db.history.lastChanges, oldRoot) @@ -531,11 +511,7 @@ func Test_Change_List(t *testing.T) { db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 100, - NodeCacheSize: minCacheSize, - }, + newDefaultConfig(), ) require.NoError(err) batch := db.NewBatch() diff --git a/x/merkledb/metrics.go b/x/merkledb/metrics.go index ab672a871db..13edf32e875 100644 --- a/x/merkledb/metrics.go +++ b/x/merkledb/metrics.go @@ -117,6 +117,7 @@ type metrics struct { } func newMetrics(namespace string, reg prometheus.Registerer) (merkleMetrics, error) { + // TODO: Should we instead return an error if reg is nil? if reg == nil { return &mockMetrics{}, nil } diff --git a/x/merkledb/metrics_test.go b/x/merkledb/metrics_test.go index d66fb6fd315..f061faf0cd7 100644 --- a/x/merkledb/metrics_test.go +++ b/x/merkledb/metrics_test.go @@ -7,8 +7,6 @@ import ( "context" "testing" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/database" @@ -16,14 +14,15 @@ import ( ) func Test_Metrics_Basic_Usage(t *testing.T) { + config := newDefaultConfig() + // Set to nil so that we use a mockMetrics instead of the real one inside + // merkledb. 
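+	// (newMetrics, shown in metrics.go above, returns a mockMetrics rather
+	// than registering real Prometheus metrics when the registerer is nil.)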
+ config.Reg = nil + db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 300, - NodeCacheSize: minCacheSize, - }, + config, ) require.NoError(t, err) @@ -51,12 +50,7 @@ func Test_Metrics_Initialize(t *testing.T) { db, err := New( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 300, - Reg: prometheus.NewRegistry(), - NodeCacheSize: 1000, - }, + newDefaultConfig(), ) require.NoError(t, err) diff --git a/x/merkledb/proof.go b/x/merkledb/proof.go index a4ef3b5c3ed..4c20e390c0a 100644 --- a/x/merkledb/proof.go +++ b/x/merkledb/proof.go @@ -820,8 +820,9 @@ func getEmptyTrieView(ctx context.Context) (*trieView, error) { ctx, memdb.New(), Config{ - Tracer: tracer, - NodeCacheSize: verificationCacheSize, + EvictionBatchSize: DefaultEvictionBatchSize, + Tracer: tracer, + NodeCacheSize: verificationCacheSize, }, &mockMetrics{}, ) diff --git a/x/merkledb/proof_test.go b/x/merkledb/proof_test.go index 857b8c336ea..026afe55057 100644 --- a/x/merkledb/proof_test.go +++ b/x/merkledb/proof_test.go @@ -25,11 +25,7 @@ func getBasicDB() (*merkleDB, error) { return newDatabase( context.Background(), memdb.New(), - Config{ - Tracer: newNoopTracer(), - HistoryLength: 1000, - NodeCacheSize: 1000, - }, + newDefaultConfig(), &mockMetrics{}, ) } diff --git a/x/merkledb/trie_test.go b/x/merkledb/trie_test.go index 1a284634b42..23b853d59ba 100644 --- a/x/merkledb/trie_test.go +++ b/x/merkledb/trie_test.go @@ -785,11 +785,7 @@ func Test_Trie_MultipleStates(t *testing.T) { db, err := New( context.Background(), rdb, - Config{ - Tracer: newNoopTracer(), - HistoryLength: 100, - NodeCacheSize: 100, - }, + newDefaultConfig(), ) require.NoError(err) defer db.Close() From b3d9f90019369b560315e08266a5ffc8d47741a9 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Fri, 30 Jun 2023 09:27:46 -0400 Subject: [PATCH 3/6] `sync` -- reduce stuttering (#1672) --- proto/pb/sync/sync.pb.go | 106 +++++------ proto/pb/sync/sync_grpc.pb.go | 170 +++++++++--------- proto/sync/sync.proto | 6 +- x/sync/client.go | 4 +- x/sync/client_test.go | 10 +- x/sync/{syncable_db.go => db.go} | 2 +- .../db_client.go} | 26 +-- .../db_server.go} | 28 +-- x/sync/{syncmanager.go => manager.go} | 152 ++++++++-------- x/sync/mock_client.go | 2 +- x/sync/network_server.go | 4 +- x/sync/sync_test.go | 92 +++++----- x/sync/{syncworkheap.go => workheap.go} | 22 +-- ...{syncworkheap_test.go => workheap_test.go} | 62 +++---- 14 files changed, 343 insertions(+), 343 deletions(-) rename x/sync/{syncable_db.go => db.go} (90%) rename x/sync/{gsyncable_db/syncable_db_client.go => g_db/db_client.go} (78%) rename x/sync/{gsyncable_db/syncable_db_server.go => g_db/db_server.go} (85%) rename x/sync/{syncmanager.go => manager.go} (84%) rename x/sync/{syncworkheap.go => workheap.go} (92%) rename x/sync/{syncworkheap_test.go => workheap_test.go} (78%) diff --git a/proto/pb/sync/sync.pb.go b/proto/pb/sync/sync.pb.go index 31307e1d7c0..0657bb7c39d 100644 --- a/proto/pb/sync/sync.pb.go +++ b/proto/pb/sync/sync.pb.go @@ -308,7 +308,7 @@ func (x *Proof) GetProof() []*ProofNode { } // For use in sync client, which has a restriction on the size of -// the response. GetChangeProof in the SyncableDB service doesn't. +// the response. GetChangeProof in the DB service doesn't. 
type SyncGetChangeProofRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -723,7 +723,7 @@ func (x *CommitChangeProofRequest) GetProof() *ChangeProof { } // For use in sync client, which has a restriction on the size of -// the response. GetRangeProof in the SyncableDB service doesn't. +// the response. GetRangeProof in the DB service doesn't. type SyncGetRangeProofRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1564,43 +1564,43 @@ var file_sync_sync_proto_rawDesc = []byte{ 0x73, 0x4e, 0x6f, 0x74, 0x68, 0x69, 0x6e, 0x67, 0x22, 0x32, 0x0a, 0x08, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x87, 0x04, 0x0a, - 0x0a, 0x53, 0x79, 0x6e, 0x63, 0x61, 0x62, 0x6c, 0x65, 0x44, 0x42, 0x12, 0x44, 0x0a, 0x0d, 0x47, - 0x65, 0x74, 0x4d, 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x52, 0x6f, 0x6f, 0x74, 0x12, 0x16, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1b, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x4d, - 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x52, 0x6f, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x39, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x15, 0x2e, - 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x50, - 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x0e, - 0x47, 0x65, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x1b, - 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, - 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x11, 0x2e, 0x73, 0x79, - 0x6e, 0x63, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x54, - 0x0a, 0x11, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, - 0x6f, 0x6f, 0x66, 0x12, 0x1e, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x56, 0x65, 0x72, 0x69, 0x66, - 0x79, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x56, 0x65, 0x72, 0x69, 0x66, - 0x79, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4b, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x43, 0x68, - 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x1e, 0x2e, 0x73, 0x79, 0x6e, 0x63, - 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, - 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x12, 0x48, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, - 0x6f, 0x66, 0x12, 0x1a, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x61, 0x6e, - 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, - 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x61, 0x6e, 0x67, 
0x65, 0x50, 0x72, - 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x49, 0x0a, 0x10, 0x43, - 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, - 0x1d, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x52, 0x61, 0x6e, - 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x42, 0x2f, 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x76, 0x61, 0x2d, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x61, 0x76, - 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x68, 0x65, 0x67, 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, - 0x70, 0x62, 0x2f, 0x73, 0x79, 0x6e, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32, 0xff, 0x03, 0x0a, + 0x02, 0x44, 0x42, 0x12, 0x44, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x72, 0x6b, 0x6c, 0x65, + 0x52, 0x6f, 0x6f, 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1b, 0x2e, 0x73, + 0x79, 0x6e, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x52, 0x6f, 0x6f, + 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x39, 0x0a, 0x08, 0x47, 0x65, 0x74, + 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x15, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x47, 0x65, 0x74, + 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x73, + 0x79, 0x6e, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x40, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, + 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x1b, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x47, 0x65, + 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x11, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x67, + 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x54, 0x0a, 0x11, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, + 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x1e, 0x2e, 0x73, 0x79, + 0x6e, 0x63, 0x2e, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, + 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x73, 0x79, + 0x6e, 0x63, 0x2e, 0x56, 0x65, 0x72, 0x69, 0x66, 0x79, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, + 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4b, 0x0a, 0x11, + 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, + 0x66, 0x12, 0x1e, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x43, + 0x68, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x48, 0x0a, 0x0d, 0x47, 0x65, 0x74, + 0x52, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x1a, 0x2e, 0x73, 0x79, 0x6e, + 0x63, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x47, 0x65, + 0x74, 0x52, 
0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x49, 0x0a, 0x10, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x52, 0x61, 0x6e, + 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x1d, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x43, + 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x42, 0x2f, + 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x76, 0x61, + 0x2d, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x61, 0x76, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x68, 0x65, 0x67, + 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x2f, 0x73, 0x79, 0x6e, 0x63, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1664,20 +1664,20 @@ var file_sync_sync_proto_depIdxs = []int32{ 20, // 18: sync.ProofNode.value_or_hash:type_name -> sync.MaybeBytes 22, // 19: sync.ProofNode.children:type_name -> sync.ProofNode.ChildrenEntry 20, // 20: sync.KeyChange.value:type_name -> sync.MaybeBytes - 23, // 21: sync.SyncableDB.GetMerkleRoot:input_type -> google.protobuf.Empty - 2, // 22: sync.SyncableDB.GetProof:input_type -> sync.GetProofRequest - 7, // 23: sync.SyncableDB.GetChangeProof:input_type -> sync.GetChangeProofRequest - 8, // 24: sync.SyncableDB.VerifyChangeProof:input_type -> sync.VerifyChangeProofRequest - 10, // 25: sync.SyncableDB.CommitChangeProof:input_type -> sync.CommitChangeProofRequest - 12, // 26: sync.SyncableDB.GetRangeProof:input_type -> sync.GetRangeProofRequest - 14, // 27: sync.SyncableDB.CommitRangeProof:input_type -> sync.CommitRangeProofRequest - 1, // 28: sync.SyncableDB.GetMerkleRoot:output_type -> sync.GetMerkleRootResponse - 3, // 29: sync.SyncableDB.GetProof:output_type -> sync.GetProofResponse - 15, // 30: sync.SyncableDB.GetChangeProof:output_type -> sync.ChangeProof - 9, // 31: sync.SyncableDB.VerifyChangeProof:output_type -> sync.VerifyChangeProofResponse - 23, // 32: sync.SyncableDB.CommitChangeProof:output_type -> google.protobuf.Empty - 13, // 33: sync.SyncableDB.GetRangeProof:output_type -> sync.GetRangeProofResponse - 23, // 34: sync.SyncableDB.CommitRangeProof:output_type -> google.protobuf.Empty + 23, // 21: sync.DB.GetMerkleRoot:input_type -> google.protobuf.Empty + 2, // 22: sync.DB.GetProof:input_type -> sync.GetProofRequest + 7, // 23: sync.DB.GetChangeProof:input_type -> sync.GetChangeProofRequest + 8, // 24: sync.DB.VerifyChangeProof:input_type -> sync.VerifyChangeProofRequest + 10, // 25: sync.DB.CommitChangeProof:input_type -> sync.CommitChangeProofRequest + 12, // 26: sync.DB.GetRangeProof:input_type -> sync.GetRangeProofRequest + 14, // 27: sync.DB.CommitRangeProof:input_type -> sync.CommitRangeProofRequest + 1, // 28: sync.DB.GetMerkleRoot:output_type -> sync.GetMerkleRootResponse + 3, // 29: sync.DB.GetProof:output_type -> sync.GetProofResponse + 15, // 30: sync.DB.GetChangeProof:output_type -> sync.ChangeProof + 9, // 31: sync.DB.VerifyChangeProof:output_type -> sync.VerifyChangeProofResponse + 23, // 32: sync.DB.CommitChangeProof:output_type -> google.protobuf.Empty + 13, // 33: sync.DB.GetRangeProof:output_type -> sync.GetRangeProofResponse + 23, // 34: sync.DB.CommitRangeProof:output_type -> google.protobuf.Empty 28, // [28:35] is the sub-list for method output_type 21, // [21:28] is the sub-list for method input_type 21, // [21:21] is the sub-list 
for extension type_name diff --git a/proto/pb/sync/sync_grpc.pb.go b/proto/pb/sync/sync_grpc.pb.go index b6df42b5500..f708dc0c4b2 100644 --- a/proto/pb/sync/sync_grpc.pb.go +++ b/proto/pb/sync/sync_grpc.pb.go @@ -20,19 +20,19 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - SyncableDB_GetMerkleRoot_FullMethodName = "/sync.SyncableDB/GetMerkleRoot" - SyncableDB_GetProof_FullMethodName = "/sync.SyncableDB/GetProof" - SyncableDB_GetChangeProof_FullMethodName = "/sync.SyncableDB/GetChangeProof" - SyncableDB_VerifyChangeProof_FullMethodName = "/sync.SyncableDB/VerifyChangeProof" - SyncableDB_CommitChangeProof_FullMethodName = "/sync.SyncableDB/CommitChangeProof" - SyncableDB_GetRangeProof_FullMethodName = "/sync.SyncableDB/GetRangeProof" - SyncableDB_CommitRangeProof_FullMethodName = "/sync.SyncableDB/CommitRangeProof" + DB_GetMerkleRoot_FullMethodName = "/sync.DB/GetMerkleRoot" + DB_GetProof_FullMethodName = "/sync.DB/GetProof" + DB_GetChangeProof_FullMethodName = "/sync.DB/GetChangeProof" + DB_VerifyChangeProof_FullMethodName = "/sync.DB/VerifyChangeProof" + DB_CommitChangeProof_FullMethodName = "/sync.DB/CommitChangeProof" + DB_GetRangeProof_FullMethodName = "/sync.DB/GetRangeProof" + DB_CommitRangeProof_FullMethodName = "/sync.DB/CommitRangeProof" ) -// SyncableDBClient is the client API for SyncableDB service. +// DBClient is the client API for DB service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type SyncableDBClient interface { +type DBClient interface { GetMerkleRoot(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*GetMerkleRootResponse, error) GetProof(ctx context.Context, in *GetProofRequest, opts ...grpc.CallOption) (*GetProofResponse, error) GetChangeProof(ctx context.Context, in *GetChangeProofRequest, opts ...grpc.CallOption) (*ChangeProof, error) @@ -42,81 +42,81 @@ type SyncableDBClient interface { CommitRangeProof(ctx context.Context, in *CommitRangeProofRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) } -type syncableDBClient struct { +type dBClient struct { cc grpc.ClientConnInterface } -func NewSyncableDBClient(cc grpc.ClientConnInterface) SyncableDBClient { - return &syncableDBClient{cc} +func NewDBClient(cc grpc.ClientConnInterface) DBClient { + return &dBClient{cc} } -func (c *syncableDBClient) GetMerkleRoot(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*GetMerkleRootResponse, error) { +func (c *dBClient) GetMerkleRoot(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*GetMerkleRootResponse, error) { out := new(GetMerkleRootResponse) - err := c.cc.Invoke(ctx, SyncableDB_GetMerkleRoot_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, DB_GetMerkleRoot_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *syncableDBClient) GetProof(ctx context.Context, in *GetProofRequest, opts ...grpc.CallOption) (*GetProofResponse, error) { +func (c *dBClient) GetProof(ctx context.Context, in *GetProofRequest, opts ...grpc.CallOption) (*GetProofResponse, error) { out := new(GetProofResponse) - err := c.cc.Invoke(ctx, SyncableDB_GetProof_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, DB_GetProof_FullMethodName, in, out, opts...) 
if err != nil { return nil, err } return out, nil } -func (c *syncableDBClient) GetChangeProof(ctx context.Context, in *GetChangeProofRequest, opts ...grpc.CallOption) (*ChangeProof, error) { +func (c *dBClient) GetChangeProof(ctx context.Context, in *GetChangeProofRequest, opts ...grpc.CallOption) (*ChangeProof, error) { out := new(ChangeProof) - err := c.cc.Invoke(ctx, SyncableDB_GetChangeProof_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, DB_GetChangeProof_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *syncableDBClient) VerifyChangeProof(ctx context.Context, in *VerifyChangeProofRequest, opts ...grpc.CallOption) (*VerifyChangeProofResponse, error) { +func (c *dBClient) VerifyChangeProof(ctx context.Context, in *VerifyChangeProofRequest, opts ...grpc.CallOption) (*VerifyChangeProofResponse, error) { out := new(VerifyChangeProofResponse) - err := c.cc.Invoke(ctx, SyncableDB_VerifyChangeProof_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, DB_VerifyChangeProof_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *syncableDBClient) CommitChangeProof(ctx context.Context, in *CommitChangeProofRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { +func (c *dBClient) CommitChangeProof(ctx context.Context, in *CommitChangeProofRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, SyncableDB_CommitChangeProof_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, DB_CommitChangeProof_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *syncableDBClient) GetRangeProof(ctx context.Context, in *GetRangeProofRequest, opts ...grpc.CallOption) (*GetRangeProofResponse, error) { +func (c *dBClient) GetRangeProof(ctx context.Context, in *GetRangeProofRequest, opts ...grpc.CallOption) (*GetRangeProofResponse, error) { out := new(GetRangeProofResponse) - err := c.cc.Invoke(ctx, SyncableDB_GetRangeProof_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, DB_GetRangeProof_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *syncableDBClient) CommitRangeProof(ctx context.Context, in *CommitRangeProofRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { +func (c *dBClient) CommitRangeProof(ctx context.Context, in *CommitRangeProofRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, SyncableDB_CommitRangeProof_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, DB_CommitRangeProof_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -// SyncableDBServer is the server API for SyncableDB service. -// All implementations must embed UnimplementedSyncableDBServer +// DBServer is the server API for DB service. 
+// All implementations must embed UnimplementedDBServer // for forward compatibility -type SyncableDBServer interface { +type DBServer interface { GetMerkleRoot(context.Context, *emptypb.Empty) (*GetMerkleRootResponse, error) GetProof(context.Context, *GetProofRequest) (*GetProofResponse, error) GetChangeProof(context.Context, *GetChangeProofRequest) (*ChangeProof, error) @@ -124,207 +124,207 @@ type SyncableDBServer interface { CommitChangeProof(context.Context, *CommitChangeProofRequest) (*emptypb.Empty, error) GetRangeProof(context.Context, *GetRangeProofRequest) (*GetRangeProofResponse, error) CommitRangeProof(context.Context, *CommitRangeProofRequest) (*emptypb.Empty, error) - mustEmbedUnimplementedSyncableDBServer() + mustEmbedUnimplementedDBServer() } -// UnimplementedSyncableDBServer must be embedded to have forward compatible implementations. -type UnimplementedSyncableDBServer struct { +// UnimplementedDBServer must be embedded to have forward compatible implementations. +type UnimplementedDBServer struct { } -func (UnimplementedSyncableDBServer) GetMerkleRoot(context.Context, *emptypb.Empty) (*GetMerkleRootResponse, error) { +func (UnimplementedDBServer) GetMerkleRoot(context.Context, *emptypb.Empty) (*GetMerkleRootResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetMerkleRoot not implemented") } -func (UnimplementedSyncableDBServer) GetProof(context.Context, *GetProofRequest) (*GetProofResponse, error) { +func (UnimplementedDBServer) GetProof(context.Context, *GetProofRequest) (*GetProofResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetProof not implemented") } -func (UnimplementedSyncableDBServer) GetChangeProof(context.Context, *GetChangeProofRequest) (*ChangeProof, error) { +func (UnimplementedDBServer) GetChangeProof(context.Context, *GetChangeProofRequest) (*ChangeProof, error) { return nil, status.Errorf(codes.Unimplemented, "method GetChangeProof not implemented") } -func (UnimplementedSyncableDBServer) VerifyChangeProof(context.Context, *VerifyChangeProofRequest) (*VerifyChangeProofResponse, error) { +func (UnimplementedDBServer) VerifyChangeProof(context.Context, *VerifyChangeProofRequest) (*VerifyChangeProofResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method VerifyChangeProof not implemented") } -func (UnimplementedSyncableDBServer) CommitChangeProof(context.Context, *CommitChangeProofRequest) (*emptypb.Empty, error) { +func (UnimplementedDBServer) CommitChangeProof(context.Context, *CommitChangeProofRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method CommitChangeProof not implemented") } -func (UnimplementedSyncableDBServer) GetRangeProof(context.Context, *GetRangeProofRequest) (*GetRangeProofResponse, error) { +func (UnimplementedDBServer) GetRangeProof(context.Context, *GetRangeProofRequest) (*GetRangeProofResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetRangeProof not implemented") } -func (UnimplementedSyncableDBServer) CommitRangeProof(context.Context, *CommitRangeProofRequest) (*emptypb.Empty, error) { +func (UnimplementedDBServer) CommitRangeProof(context.Context, *CommitRangeProofRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method CommitRangeProof not implemented") } -func (UnimplementedSyncableDBServer) mustEmbedUnimplementedSyncableDBServer() {} +func (UnimplementedDBServer) mustEmbedUnimplementedDBServer() {} -// UnsafeSyncableDBServer may be embedded to opt out of forward 
compatibility for this service. -// Use of this interface is not recommended, as added methods to SyncableDBServer will +// UnsafeDBServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to DBServer will // result in compilation errors. -type UnsafeSyncableDBServer interface { - mustEmbedUnimplementedSyncableDBServer() +type UnsafeDBServer interface { + mustEmbedUnimplementedDBServer() } -func RegisterSyncableDBServer(s grpc.ServiceRegistrar, srv SyncableDBServer) { - s.RegisterService(&SyncableDB_ServiceDesc, srv) +func RegisterDBServer(s grpc.ServiceRegistrar, srv DBServer) { + s.RegisterService(&DB_ServiceDesc, srv) } -func _SyncableDB_GetMerkleRoot_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _DB_GetMerkleRoot_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(emptypb.Empty) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(SyncableDBServer).GetMerkleRoot(ctx, in) + return srv.(DBServer).GetMerkleRoot(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SyncableDB_GetMerkleRoot_FullMethodName, + FullMethod: DB_GetMerkleRoot_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SyncableDBServer).GetMerkleRoot(ctx, req.(*emptypb.Empty)) + return srv.(DBServer).GetMerkleRoot(ctx, req.(*emptypb.Empty)) } return interceptor(ctx, in, info, handler) } -func _SyncableDB_GetProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _DB_GetProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GetProofRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(SyncableDBServer).GetProof(ctx, in) + return srv.(DBServer).GetProof(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SyncableDB_GetProof_FullMethodName, + FullMethod: DB_GetProof_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SyncableDBServer).GetProof(ctx, req.(*GetProofRequest)) + return srv.(DBServer).GetProof(ctx, req.(*GetProofRequest)) } return interceptor(ctx, in, info, handler) } -func _SyncableDB_GetChangeProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _DB_GetChangeProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GetChangeProofRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(SyncableDBServer).GetChangeProof(ctx, in) + return srv.(DBServer).GetChangeProof(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SyncableDB_GetChangeProof_FullMethodName, + FullMethod: DB_GetChangeProof_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SyncableDBServer).GetChangeProof(ctx, req.(*GetChangeProofRequest)) + return srv.(DBServer).GetChangeProof(ctx, req.(*GetChangeProofRequest)) } return interceptor(ctx, in, info, handler) } 
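// A short usage sketch of the renamed service client, assuming a dialed
// grpc.ClientConnInterface named conn (not part of this patch):
//
//	client := NewDBClient(conn)
//	resp, err := client.GetMerkleRoot(ctx, &emptypb.Empty{})
//	if err == nil {
//		root := resp.RootHash // bytes of the current merkle root
//		_ = root
//	}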
-func _SyncableDB_VerifyChangeProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _DB_VerifyChangeProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(VerifyChangeProofRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(SyncableDBServer).VerifyChangeProof(ctx, in) + return srv.(DBServer).VerifyChangeProof(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SyncableDB_VerifyChangeProof_FullMethodName, + FullMethod: DB_VerifyChangeProof_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SyncableDBServer).VerifyChangeProof(ctx, req.(*VerifyChangeProofRequest)) + return srv.(DBServer).VerifyChangeProof(ctx, req.(*VerifyChangeProofRequest)) } return interceptor(ctx, in, info, handler) } -func _SyncableDB_CommitChangeProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _DB_CommitChangeProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(CommitChangeProofRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(SyncableDBServer).CommitChangeProof(ctx, in) + return srv.(DBServer).CommitChangeProof(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SyncableDB_CommitChangeProof_FullMethodName, + FullMethod: DB_CommitChangeProof_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SyncableDBServer).CommitChangeProof(ctx, req.(*CommitChangeProofRequest)) + return srv.(DBServer).CommitChangeProof(ctx, req.(*CommitChangeProofRequest)) } return interceptor(ctx, in, info, handler) } -func _SyncableDB_GetRangeProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _DB_GetRangeProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GetRangeProofRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(SyncableDBServer).GetRangeProof(ctx, in) + return srv.(DBServer).GetRangeProof(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SyncableDB_GetRangeProof_FullMethodName, + FullMethod: DB_GetRangeProof_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SyncableDBServer).GetRangeProof(ctx, req.(*GetRangeProofRequest)) + return srv.(DBServer).GetRangeProof(ctx, req.(*GetRangeProofRequest)) } return interceptor(ctx, in, info, handler) } -func _SyncableDB_CommitRangeProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _DB_CommitRangeProof_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(CommitRangeProofRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(SyncableDBServer).CommitRangeProof(ctx, in) + return srv.(DBServer).CommitRangeProof(ctx, in) } 
info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: SyncableDB_CommitRangeProof_FullMethodName, + FullMethod: DB_CommitRangeProof_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SyncableDBServer).CommitRangeProof(ctx, req.(*CommitRangeProofRequest)) + return srv.(DBServer).CommitRangeProof(ctx, req.(*CommitRangeProofRequest)) } return interceptor(ctx, in, info, handler) } -// SyncableDB_ServiceDesc is the grpc.ServiceDesc for SyncableDB service. +// DB_ServiceDesc is the grpc.ServiceDesc for DB service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) -var SyncableDB_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "sync.SyncableDB", - HandlerType: (*SyncableDBServer)(nil), +var DB_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "sync.DB", + HandlerType: (*DBServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "GetMerkleRoot", - Handler: _SyncableDB_GetMerkleRoot_Handler, + Handler: _DB_GetMerkleRoot_Handler, }, { MethodName: "GetProof", - Handler: _SyncableDB_GetProof_Handler, + Handler: _DB_GetProof_Handler, }, { MethodName: "GetChangeProof", - Handler: _SyncableDB_GetChangeProof_Handler, + Handler: _DB_GetChangeProof_Handler, }, { MethodName: "VerifyChangeProof", - Handler: _SyncableDB_VerifyChangeProof_Handler, + Handler: _DB_VerifyChangeProof_Handler, }, { MethodName: "CommitChangeProof", - Handler: _SyncableDB_CommitChangeProof_Handler, + Handler: _DB_CommitChangeProof_Handler, }, { MethodName: "GetRangeProof", - Handler: _SyncableDB_GetRangeProof_Handler, + Handler: _DB_GetRangeProof_Handler, }, { MethodName: "CommitRangeProof", - Handler: _SyncableDB_CommitRangeProof_Handler, + Handler: _DB_CommitRangeProof_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/proto/sync/sync.proto b/proto/sync/sync.proto index 57c2710c491..49be63b49a1 100644 --- a/proto/sync/sync.proto +++ b/proto/sync/sync.proto @@ -18,7 +18,7 @@ message Request { // Note this service definition only exists for use in tests. // A database shouldn't expose this over the internet, as it // allows for reading/writing to the database. -service SyncableDB { +service DB { rpc GetMerkleRoot(google.protobuf.Empty) returns (GetMerkleRootResponse); rpc GetProof(GetProofRequest) returns (GetProofResponse); @@ -50,7 +50,7 @@ message Proof { } // For use in sync client, which has a restriction on the size of -// the response. GetChangeProof in the SyncableDB service doesn't. +// the response. GetChangeProof in the DB service doesn't. message SyncGetChangeProofRequest { bytes start_root_hash = 1; bytes end_root_hash = 2; @@ -92,7 +92,7 @@ message CommitChangeProofRequest { } // For use in sync client, which has a restriction on the size of -// the response. GetRangeProof in the SyncableDB service doesn't. +// the response. GetRangeProof in the DB service doesn't. message SyncGetRangeProofRequest { bytes root_hash = 1; bytes start_key = 2; diff --git a/x/sync/client.go b/x/sync/client.go index cadfabaa8a5..3d6e0be6fe1 100644 --- a/x/sync/client.go +++ b/x/sync/client.go @@ -45,7 +45,7 @@ type Client interface { // GetChangeProof synchronously sends the given request, returning a parsed ChangesResponse or error // [verificationDB] is the local db that has all key/values in it for the proof's startroot within the proof's key range // Note: this verifies the response including the change proof. 
- GetChangeProof(ctx context.Context, request *pb.SyncGetChangeProofRequest, verificationDB SyncableDB) (*merkledb.ChangeProof, error) + GetChangeProof(ctx context.Context, request *pb.SyncGetChangeProofRequest, verificationDB DB) (*merkledb.ChangeProof, error) } type client struct { @@ -79,7 +79,7 @@ func NewClient(config *ClientConfig) Client { // GetChangeProof synchronously retrieves the change proof given by [req]. // Upon failure, retries until the context is expired. // The returned change proof is verified. -func (c *client) GetChangeProof(ctx context.Context, req *pb.SyncGetChangeProofRequest, db SyncableDB) (*merkledb.ChangeProof, error) { +func (c *client) GetChangeProof(ctx context.Context, req *pb.SyncGetChangeProofRequest, db DB) (*merkledb.ChangeProof, error) { parseFn := func(ctx context.Context, responseBytes []byte) (*merkledb.ChangeProof, error) { if len(responseBytes) > int(req.BytesLimit) { return nil, fmt.Errorf("%w: (%d) > %d)", errTooManyBytes, len(responseBytes), req.BytesLimit) diff --git a/x/sync/client_test.go b/x/sync/client_test.go index b6be46e31aa..0b118c704e4 100644 --- a/x/sync/client_test.go +++ b/x/sync/client_test.go @@ -40,7 +40,7 @@ func newDefaultDBConfig() merkledb.Config { func sendRangeRequest( t *testing.T, - db SyncableDB, + db DB, request *pb.SyncGetRangeProofRequest, maxAttempts uint32, modifyResponse func(*merkledb.RangeProof), @@ -144,7 +144,7 @@ func TestGetRangeProof(t *testing.T) { require.NoError(t, err) tests := map[string]struct { - db SyncableDB + db DB request *pb.SyncGetRangeProofRequest modifyResponse func(*merkledb.RangeProof) expectedErr error @@ -299,8 +299,8 @@ func TestGetRangeProof(t *testing.T) { func sendChangeRequest( t *testing.T, - db SyncableDB, - verificationDB SyncableDB, + db DB, + verificationDB DB, request *pb.SyncGetChangeProofRequest, maxAttempts uint32, modifyResponse func(*merkledb.ChangeProof), @@ -450,7 +450,7 @@ func TestGetChangeProof(t *testing.T) { require.NoError(t, err) tests := map[string]struct { - db SyncableDB + db DB request *pb.SyncGetChangeProofRequest modifyResponse func(*merkledb.ChangeProof) expectedErr error diff --git a/x/sync/syncable_db.go b/x/sync/db.go similarity index 90% rename from x/sync/syncable_db.go rename to x/sync/db.go index 86c04c275db..94b5542e34c 100644 --- a/x/sync/syncable_db.go +++ b/x/sync/db.go @@ -5,7 +5,7 @@ package sync import "github.com/ava-labs/avalanchego/x/merkledb" -type SyncableDB interface { +type DB interface { merkledb.MerkleRootGetter merkledb.ProofGetter merkledb.ChangeProofer diff --git a/x/sync/gsyncable_db/syncable_db_client.go b/x/sync/g_db/db_client.go similarity index 78% rename from x/sync/gsyncable_db/syncable_db_client.go rename to x/sync/g_db/db_client.go index 57564be95c6..358db39bd0b 100644 --- a/x/sync/gsyncable_db/syncable_db_client.go +++ b/x/sync/g_db/db_client.go @@ -1,7 +1,7 @@ // Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE for licensing terms. 
-package gsyncabledb +package gdb import ( "context" @@ -16,17 +16,17 @@ import ( pb "github.com/ava-labs/avalanchego/proto/pb/sync" ) -var _ sync.SyncableDB = (*SyncableDBClient)(nil) +var _ sync.DB = (*DBClient)(nil) -func NewSyncableDBClient(client pb.SyncableDBClient) *SyncableDBClient { - return &SyncableDBClient{client: client} +func NewDBClient(client pb.DBClient) *DBClient { + return &DBClient{client: client} } -type SyncableDBClient struct { - client pb.SyncableDBClient +type DBClient struct { + client pb.DBClient } -func (c *SyncableDBClient) GetMerkleRoot(ctx context.Context) (ids.ID, error) { +func (c *DBClient) GetMerkleRoot(ctx context.Context) (ids.ID, error) { resp, err := c.client.GetMerkleRoot(ctx, &emptypb.Empty{}) if err != nil { return ids.ID{}, err @@ -34,7 +34,7 @@ func (c *SyncableDBClient) GetMerkleRoot(ctx context.Context) (ids.ID, error) { return ids.ToID(resp.RootHash) } -func (c *SyncableDBClient) GetChangeProof( +func (c *DBClient) GetChangeProof( ctx context.Context, startRootID ids.ID, endRootID ids.ID, @@ -59,7 +59,7 @@ func (c *SyncableDBClient) GetChangeProof( return &proof, nil } -func (c *SyncableDBClient) VerifyChangeProof( +func (c *DBClient) VerifyChangeProof( ctx context.Context, proof *merkledb.ChangeProof, startKey []byte, @@ -83,14 +83,14 @@ func (c *SyncableDBClient) VerifyChangeProof( return errors.New(resp.Error) } -func (c *SyncableDBClient) CommitChangeProof(ctx context.Context, proof *merkledb.ChangeProof) error { +func (c *DBClient) CommitChangeProof(ctx context.Context, proof *merkledb.ChangeProof) error { _, err := c.client.CommitChangeProof(ctx, &pb.CommitChangeProofRequest{ Proof: proof.ToProto(), }) return err } -func (c *SyncableDBClient) GetProof(ctx context.Context, key []byte) (*merkledb.Proof, error) { +func (c *DBClient) GetProof(ctx context.Context, key []byte) (*merkledb.Proof, error) { resp, err := c.client.GetProof(ctx, &pb.GetProofRequest{ Key: key, }) @@ -105,7 +105,7 @@ func (c *SyncableDBClient) GetProof(ctx context.Context, key []byte) (*merkledb. return &proof, nil } -func (c *SyncableDBClient) GetRangeProofAtRoot( +func (c *DBClient) GetRangeProofAtRoot( ctx context.Context, rootID ids.ID, startKey []byte, @@ -129,7 +129,7 @@ func (c *SyncableDBClient) GetRangeProofAtRoot( return &proof, nil } -func (c *SyncableDBClient) CommitRangeProof( +func (c *DBClient) CommitRangeProof( ctx context.Context, startKey []byte, proof *merkledb.RangeProof, diff --git a/x/sync/gsyncable_db/syncable_db_server.go b/x/sync/g_db/db_server.go similarity index 85% rename from x/sync/gsyncable_db/syncable_db_server.go rename to x/sync/g_db/db_server.go index b4d25cac428..c2a300c668f 100644 --- a/x/sync/gsyncable_db/syncable_db_server.go +++ b/x/sync/g_db/db_server.go @@ -1,7 +1,7 @@ // Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE for licensing terms. 
-package gsyncabledb +package gdb import ( "context" @@ -15,19 +15,19 @@ import ( pb "github.com/ava-labs/avalanchego/proto/pb/sync" ) -var _ pb.SyncableDBServer = (*SyncableDBServer)(nil) +var _ pb.DBServer = (*DBServer)(nil) -func NewSyncableDBServer(db sync.SyncableDB) *SyncableDBServer { - return &SyncableDBServer{db: db} +func NewDBServer(db sync.DB) *DBServer { + return &DBServer{db: db} } -type SyncableDBServer struct { - pb.UnsafeSyncableDBServer +type DBServer struct { + pb.UnsafeDBServer - db sync.SyncableDB + db sync.DB } -func (s *SyncableDBServer) GetMerkleRoot( +func (s *DBServer) GetMerkleRoot( ctx context.Context, _ *emptypb.Empty, ) (*pb.GetMerkleRootResponse, error) { @@ -40,7 +40,7 @@ func (s *SyncableDBServer) GetMerkleRoot( }, nil } -func (s *SyncableDBServer) GetChangeProof( +func (s *DBServer) GetChangeProof( ctx context.Context, req *pb.GetChangeProofRequest, ) (*pb.ChangeProof, error) { @@ -66,7 +66,7 @@ func (s *SyncableDBServer) GetChangeProof( return changeProof.ToProto(), nil } -func (s *SyncableDBServer) VerifyChangeProof( +func (s *DBServer) VerifyChangeProof( ctx context.Context, req *pb.VerifyChangeProofRequest, ) (*pb.VerifyChangeProofResponse, error) { @@ -90,7 +90,7 @@ func (s *SyncableDBServer) VerifyChangeProof( }, nil } -func (s *SyncableDBServer) CommitChangeProof( +func (s *DBServer) CommitChangeProof( ctx context.Context, req *pb.CommitChangeProofRequest, ) (*emptypb.Empty, error) { @@ -103,7 +103,7 @@ func (s *SyncableDBServer) CommitChangeProof( return &emptypb.Empty{}, err } -func (s *SyncableDBServer) GetProof( +func (s *DBServer) GetProof( ctx context.Context, req *pb.GetProofRequest, ) (*pb.GetProofResponse, error) { @@ -117,7 +117,7 @@ func (s *SyncableDBServer) GetProof( }, nil } -func (s *SyncableDBServer) GetRangeProof( +func (s *DBServer) GetRangeProof( ctx context.Context, req *pb.GetRangeProofRequest, ) (*pb.GetRangeProofResponse, error) { @@ -154,7 +154,7 @@ func (s *SyncableDBServer) GetRangeProof( return protoProof, nil } -func (s *SyncableDBServer) CommitRangeProof( +func (s *DBServer) CommitRangeProof( ctx context.Context, req *pb.CommitRangeProofRequest, ) (*emptypb.Empty, error) { diff --git a/x/sync/syncmanager.go b/x/sync/manager.go similarity index 84% rename from x/sync/syncmanager.go rename to x/sync/manager.go index 98a335edc08..e3e2736270a 100644 --- a/x/sync/syncmanager.go +++ b/x/sync/manager.go @@ -25,8 +25,8 @@ const ( ) var ( - ErrAlreadyStarted = errors.New("cannot start a StateSyncManager that has already been started") - ErrAlreadyClosed = errors.New("StateSyncManager is closed") + ErrAlreadyStarted = errors.New("cannot start a Manager that has already been started") + ErrAlreadyClosed = errors.New("Manager is closed") ErrNoClientProvided = errors.New("client is a required field of the sync config") ErrNoDatabaseProvided = errors.New("sync database is a required field of the sync config") ErrNoLogProvided = errors.New("log is a required field of the sync config") @@ -48,16 +48,16 @@ const ( // nil [end] means there is no upper bound. // [LocalRootID] is the ID of the root of this range in our database. // If we have no local root for this range, [LocalRootID] is ids.Empty. 
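// Note (illustrative sketch, not part of this patch): to make the bound
// semantics above concrete, two hypothetical work items; [localRoot] stands
// for a root we have previously synced to:
//
//	all := newWorkItem(ids.Empty, nil, nil, lowPriority)           // no bounds, no local root: fetch all key/values
//	tail := newWorkItem(localRoot, []byte{0x80}, nil, medPriority) // keys >= 0x80, resume from [localRoot]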
-type syncWorkItem struct { +type workItem struct { start []byte end []byte priority priority LocalRootID ids.ID } -// TODO danlaine look into using a sync.Pool for syncWorkItems -func newWorkItem(localRootID ids.ID, start, end []byte, priority priority) *syncWorkItem { - return &syncWorkItem{ +// TODO danlaine look into using a sync.Pool for workItems +func newWorkItem(localRootID ids.ID, start, end []byte, priority priority) *workItem { + return &workItem{ LocalRootID: localRootID, start: start, end: end, @@ -65,10 +65,10 @@ func newWorkItem(localRootID ids.ID, start, end []byte, priority priority) *sync } } -type StateSyncManager struct { +type Manager struct { // Must be held when accessing [config.TargetRoot]. syncTargetLock sync.RWMutex - config StateSyncConfig + config ManagerConfig workLock sync.Mutex // The number of work items currently being processed. @@ -76,7 +76,7 @@ type StateSyncManager struct { // [workLock] must be held when accessing [processingWorkItems]. processingWorkItems int // [workLock] must be held while accessing [unprocessedWork]. - unprocessedWork *syncWorkHeap + unprocessedWork *workHeap // Signalled when: // - An item is added to [unprocessedWork]. // - An item is added to [processedWork]. @@ -84,13 +84,13 @@ type StateSyncManager struct { // [workLock] is its inner lock. unprocessedWorkCond sync.Cond // [workLock] must be held while accessing [processedWork]. - processedWork *syncWorkHeap + processedWork *workHeap // When this is closed: // - [closed] is true. // - [cancelCtx] was called. // - [workToBeDone] and [completedWork] are closed. - syncDoneChan chan struct{} + doneChan chan struct{} errLock sync.Mutex // If non-nil, there was a fatal error. @@ -105,19 +105,19 @@ type StateSyncManager struct { closeOnce sync.Once } -type StateSyncConfig struct { - SyncDB SyncableDB +type ManagerConfig struct { + DB DB Client Client SimultaneousWorkLimit int Log logging.Logger TargetRoot ids.ID } -func NewStateSyncManager(config StateSyncConfig) (*StateSyncManager, error) { +func NewManager(config ManagerConfig) (*Manager, error) { switch { case config.Client == nil: return nil, ErrNoClientProvided - case config.SyncDB == nil: + case config.DB == nil: return nil, ErrNoDatabaseProvided case config.Log == nil: return nil, ErrNoLogProvided @@ -125,18 +125,18 @@ func NewStateSyncManager(config StateSyncConfig) (*StateSyncManager, error) { return nil, ErrZeroWorkLimit } - m := &StateSyncManager{ + m := &Manager{ config: config, - syncDoneChan: make(chan struct{}), - unprocessedWork: newSyncWorkHeap(), - processedWork: newSyncWorkHeap(), + doneChan: make(chan struct{}), + unprocessedWork: newWorkHeap(), + processedWork: newWorkHeap(), } m.unprocessedWorkCond.L = &m.workLock return m, nil } -func (m *StateSyncManager) StartSyncing(ctx context.Context) error { +func (m *Manager) Start(ctx context.Context) error { m.workLock.Lock() defer m.workLock.Unlock() @@ -158,7 +158,7 @@ func (m *StateSyncManager) StartSyncing(ctx context.Context) error { // sync awaits signal on [m.unprocessedWorkCond], which indicates that there // is work to do or syncing completes. If there is work, sync will dispatch a goroutine to do // the work. -func (m *StateSyncManager) sync(ctx context.Context) { +func (m *Manager) sync(ctx context.Context) { defer func() { // Invariant: [m.workLock] is held when this goroutine begins. 
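// Note (illustrative sketch, not part of this patch): [m.unprocessedWorkCond]
// uses [m.workLock] as its inner lock (set in NewManager above), so Wait()
// atomically releases the lock while blocked and reacquires it before
// returning. The loop in sync() is this standard-library pattern in
// miniature; here stdsync aliases the standard library "sync" package and
// [ready] is a hypothetical predicate:
//
//	var mu stdsync.Mutex
//	cond := stdsync.NewCond(&mu)
//	mu.Lock()
//	for !ready() { // always re-check the predicate after waking
//		cond.Wait() // unlocks mu while blocked, relocks before returning
//	}
//	// ... consume work while still holding mu ...
//	mu.Unlock()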
m.close() @@ -192,17 +192,17 @@ func (m *StateSyncManager) sync(ctx context.Context) { continue } m.processingWorkItems++ - workItem := m.unprocessedWork.GetWork() + work := m.unprocessedWork.GetWork() // TODO danlaine: We won't release [m.workLock] until // we've started a goroutine for each available work item. // We can't apply proofs we receive until we release [m.workLock]. // Is this OK? Is it possible we end up with too many goroutines? - go m.doWork(ctx, workItem) + go m.doWork(ctx, work) } } // Close will stop the syncing process -func (m *StateSyncManager) Close() { +func (m *Manager) Close() { m.workLock.Lock() defer m.workLock.Unlock() m.close() @@ -210,7 +210,7 @@ func (m *StateSyncManager) Close() { // close is called when there is a fatal error or sync is complete. // [workLock] must be held -func (m *StateSyncManager) close() { +func (m *Manager) close() { m.closeOnce.Do(func() { // Don't process any more work items. // Drop currently processing work items. @@ -224,13 +224,13 @@ func (m *StateSyncManager) close() { m.processedWork.Close() // signal all code waiting on the sync to complete - close(m.syncDoneChan) + close(m.doneChan) }) } // Processes [item] by fetching and applying a change or range proof. // Assumes [m.workLock] is not held. -func (m *StateSyncManager) doWork(ctx context.Context, item *syncWorkItem) { +func (m *Manager) doWork(ctx context.Context, work *workItem) { defer func() { m.workLock.Lock() defer m.workLock.Unlock() @@ -239,37 +239,37 @@ func (m *StateSyncManager) doWork(ctx context.Context, item *syncWorkItem) { m.unprocessedWorkCond.Signal() }() - if item.LocalRootID == ids.Empty { + if work.LocalRootID == ids.Empty { // the keys in this range have not been downloaded, so get all key/values - m.getAndApplyRangeProof(ctx, item) + m.getAndApplyRangeProof(ctx, work) } else { // the keys in this range have already been downloaded, but the root changed, so get all changes - m.getAndApplyChangeProof(ctx, item) + m.getAndApplyChangeProof(ctx, work) } } -// Fetch and apply the change proof given by [workItem]. +// Fetch and apply the change proof given by [work]. // Assumes [m.workLock] is not held. -func (m *StateSyncManager) getAndApplyChangeProof(ctx context.Context, workItem *syncWorkItem) { +func (m *Manager) getAndApplyChangeProof(ctx context.Context, work *workItem) { rootID := m.getTargetRoot() - if workItem.LocalRootID == rootID { + if work.LocalRootID == rootID { // Start root is the same as the end root, so we're done. - m.completeWorkItem(ctx, workItem, workItem.end, rootID, nil) + m.completeWorkItem(ctx, work, work.end, rootID, nil) return } changeProof, err := m.config.Client.GetChangeProof( ctx, &pb.SyncGetChangeProofRequest{ - StartRootHash: workItem.LocalRootID[:], + StartRootHash: work.LocalRootID[:], EndRootHash: rootID[:], - StartKey: workItem.start, - EndKey: workItem.end, + StartKey: work.start, + EndKey: work.end, KeyLimit: defaultRequestKeyLimit, BytesLimit: defaultRequestByteSizeLimit, }, - m.config.SyncDB, + m.config.DB, ) if err != nil { m.setError(err) @@ -277,7 +277,7 @@ func (m *StateSyncManager) getAndApplyChangeProof(ctx context.Context, workItem } select { - case <-m.syncDoneChan: + case <-m.doneChan: // If we're closed, don't apply the proof. return default: @@ -287,33 +287,33 @@ func (m *StateSyncManager) getAndApplyChangeProof(ctx context.Context, workItem // Add this range as a fresh uncompleted work item to the work heap. 
// TODO danlaine send range proof instead of failure notification if !changeProof.HadRootsInHistory { - workItem.LocalRootID = ids.Empty - m.enqueueWork(workItem) + work.LocalRootID = ids.Empty + m.enqueueWork(work) return } - largestHandledKey := workItem.end + largestHandledKey := work.end // if the proof wasn't empty, apply changes to the sync DB if len(changeProof.KeyChanges) > 0 { - if err := m.config.SyncDB.CommitChangeProof(ctx, changeProof); err != nil { + if err := m.config.DB.CommitChangeProof(ctx, changeProof); err != nil { m.setError(err) return } largestHandledKey = changeProof.KeyChanges[len(changeProof.KeyChanges)-1].Key } - m.completeWorkItem(ctx, workItem, largestHandledKey, rootID, changeProof.EndProof) + m.completeWorkItem(ctx, work, largestHandledKey, rootID, changeProof.EndProof) } -// Fetch and apply the range proof given by [workItem]. +// Fetch and apply the range proof given by [work]. // Assumes [m.workLock] is not held. -func (m *StateSyncManager) getAndApplyRangeProof(ctx context.Context, workItem *syncWorkItem) { +func (m *Manager) getAndApplyRangeProof(ctx context.Context, work *workItem) { rootID := m.getTargetRoot() proof, err := m.config.Client.GetRangeProof(ctx, &pb.SyncGetRangeProofRequest{ RootHash: rootID[:], - StartKey: workItem.start, - EndKey: workItem.end, + StartKey: work.start, + EndKey: work.end, KeyLimit: defaultRequestKeyLimit, BytesLimit: defaultRequestByteSizeLimit, }, @@ -324,16 +324,16 @@ func (m *StateSyncManager) getAndApplyRangeProof(ctx context.Context, workItem * } select { - case <-m.syncDoneChan: + case <-m.doneChan: // If we're closed, don't apply the proof. return default: } - largestHandledKey := workItem.end + largestHandledKey := work.end if len(proof.KeyValues) > 0 { // Add all the key-value pairs we got to the database. - if err := m.config.SyncDB.CommitRangeProof(ctx, workItem.start, proof); err != nil { + if err := m.config.DB.CommitRangeProof(ctx, work.start, proof); err != nil { m.setError(err) return } @@ -341,7 +341,7 @@ func (m *StateSyncManager) getAndApplyRangeProof(ctx context.Context, workItem * largestHandledKey = proof.KeyValues[len(proof.KeyValues)-1].Key } - m.completeWorkItem(ctx, workItem, largestHandledKey, rootID, proof.EndProof) + m.completeWorkItem(ctx, work, largestHandledKey, rootID, proof.EndProof) } // findNextKey returns the start of the key range that should be fetched next. @@ -357,7 +357,7 @@ func (m *StateSyncManager) getAndApplyRangeProof(ctx context.Context, workItem * // Namely it's an inclusion/exclusion proof for [lastReceivedKey]. // // Invariant: [lastReceivedKey] < [rangeEnd]. -func (m *StateSyncManager) findNextKey( +func (m *Manager) findNextKey( ctx context.Context, lastReceivedKey []byte, rangeEnd []byte, @@ -392,7 +392,7 @@ func (m *StateSyncManager) findNextKey( } // get a proof for the same key as the received proof from the local db - localProofOfKey, err := m.config.SyncDB.GetProof(ctx, proofKeyPath.Value) + localProofOfKey, err := m.config.DB.GetProof(ctx, proofKeyPath.Value) if err != nil { return nil, err } @@ -518,7 +518,7 @@ func findChildDifference(node1, node2 *merkledb.ProofNode, startIndex byte) (byt return 0, false } -func (m *StateSyncManager) Error() error { +func (m *Manager) Error() error { m.errLock.Lock() defer m.errLock.Unlock() @@ -530,9 +530,9 @@ func (m *StateSyncManager) Error() error { // - sync fatally errored. // - [ctx] is canceled. // If [ctx] is canceled, returns [ctx].Err(). 
-func (m *StateSyncManager) Wait(ctx context.Context) error { +func (m *Manager) Wait(ctx context.Context) error { select { - case <-m.syncDoneChan: + case <-m.doneChan: case <-ctx.Done(): return ctx.Err() } @@ -542,7 +542,7 @@ func (m *StateSyncManager) Wait(ctx context.Context) error { return err } - root, err := m.config.SyncDB.GetMerkleRoot(ctx) + root, err := m.config.DB.GetMerkleRoot(ctx) if err != nil { m.config.Log.Info("completed with error", zap.Error(err)) return err @@ -555,12 +555,12 @@ func (m *StateSyncManager) Wait(ctx context.Context) error { return nil } -func (m *StateSyncManager) UpdateSyncTarget(syncTargetRoot ids.ID) error { +func (m *Manager) UpdateSyncTarget(syncTargetRoot ids.ID) error { m.workLock.Lock() defer m.workLock.Unlock() select { - case <-m.syncDoneChan: + case <-m.doneChan: return ErrAlreadyClosed default: } @@ -593,7 +593,7 @@ func (m *StateSyncManager) UpdateSyncTarget(syncTargetRoot ids.ID) error { return nil } -func (m *StateSyncManager) getTargetRoot() ids.ID { +func (m *Manager) getTargetRoot() ids.ID { m.syncTargetLock.RLock() defer m.syncTargetLock.RUnlock() @@ -601,7 +601,7 @@ func (m *StateSyncManager) getTargetRoot() ids.ID { } // Record that there was a fatal error and begin shutting down. -func (m *StateSyncManager) setError(err error) { +func (m *Manager) setError(err error) { m.errLock.Lock() defer m.errLock.Unlock() @@ -614,11 +614,11 @@ func (m *StateSyncManager) setError(err error) { // Mark the range [start, end] as synced up to [rootID]. // Assumes [m.workLock] is not held. -func (m *StateSyncManager) completeWorkItem(ctx context.Context, workItem *syncWorkItem, largestHandledKey []byte, rootID ids.ID, proofOfLargestKey []merkledb.ProofNode) { +func (m *Manager) completeWorkItem(ctx context.Context, work *workItem, largestHandledKey []byte, rootID ids.ID, proofOfLargestKey []merkledb.ProofNode) { // if the last key is equal to the end, then the full range is completed - if !bytes.Equal(largestHandledKey, workItem.end) { + if !bytes.Equal(largestHandledKey, work.end) { // find the next key to start querying by comparing the proofs for the last completed key - nextStartKey, err := m.findNextKey(ctx, largestHandledKey, workItem.end, proofOfLargestKey) + nextStartKey, err := m.findNextKey(ctx, largestHandledKey, work.end, proofOfLargestKey) if err != nil { m.setError(err) return @@ -626,27 +626,27 @@ func (m *StateSyncManager) completeWorkItem(ctx context.Context, workItem *syncW // nextStartKey being nil indicates that the entire range has been completed if nextStartKey == nil { - largestHandledKey = workItem.end + largestHandledKey = work.end } else { // the full range wasn't completed, so enqueue a new work item for the range [nextStartKey, workItem.end] - m.enqueueWork(newWorkItem(workItem.LocalRootID, nextStartKey, workItem.end, workItem.priority)) + m.enqueueWork(newWorkItem(work.LocalRootID, nextStartKey, work.end, work.priority)) largestHandledKey = nextStartKey } } - // completed the range [workItem.start, lastKey], log and record in the completed work heap + // completed the range [work.start, lastKey], log and record in the completed work heap m.config.Log.Info("completed range", - zap.Binary("start", workItem.start), + zap.Binary("start", work.start), zap.Binary("end", largestHandledKey), ) if m.getTargetRoot() == rootID { m.workLock.Lock() defer m.workLock.Unlock() - m.processedWork.MergeInsert(newWorkItem(rootID, workItem.start, largestHandledKey, workItem.priority)) + m.processedWork.MergeInsert(newWorkItem(rootID, 
work.start, largestHandledKey, work.priority))
 	} else {
 		// the root has changed, so reinsert with high priority
-		m.enqueueWork(newWorkItem(rootID, workItem.start, largestHandledKey, highPriority))
+		m.enqueueWork(newWorkItem(rootID, work.start, largestHandledKey, highPriority))
 	}
 }
 
@@ -654,7 +654,7 @@ func (m *StateSyncManager) completeWorkItem(ctx context.Context, workItem *syncW
 // If there are sufficiently few unprocessed/processing work items,
 // splits the range into two items and queues them both.
 // Assumes [m.workLock] is not held.
-func (m *StateSyncManager) enqueueWork(item *syncWorkItem) {
+func (m *Manager) enqueueWork(work *workItem) {
 	m.workLock.Lock()
 	defer func() {
 		m.workLock.Unlock()
@@ -663,18 +663,18 @@ func (m *StateSyncManager) enqueueWork(item *syncWorkItem) {
 
 	if m.processingWorkItems+m.unprocessedWork.Len() > 2*m.config.SimultaneousWorkLimit {
 		// There are too many work items already, don't split the range
-		m.unprocessedWork.Insert(item)
+		m.unprocessedWork.Insert(work)
 		return
 	}
 
 	// Split the remaining range into 2.
 	// Find the middle point.
-	mid := midPoint(item.start, item.end)
+	mid := midPoint(work.start, work.end)
 
 	// first item gets higher priority than the second to encourage finished ranges to grow
 	// rather than start a new range that is not contiguous with existing completed ranges
-	first := newWorkItem(item.LocalRootID, item.start, mid, medPriority)
-	second := newWorkItem(item.LocalRootID, mid, item.end, lowPriority)
+	first := newWorkItem(work.LocalRootID, work.start, mid, medPriority)
+	second := newWorkItem(work.LocalRootID, mid, work.end, lowPriority)
 
 	m.unprocessedWork.Insert(first)
 	m.unprocessedWork.Insert(second)
diff --git a/x/sync/mock_client.go b/x/sync/mock_client.go
index c5e5b9d57c3..5b55ce619dd 100644
--- a/x/sync/mock_client.go
+++ b/x/sync/mock_client.go
@@ -40,7 +40,7 @@ func (m *MockClient) EXPECT() *MockClientMockRecorder {
 }
 
 // GetChangeProof mocks base method.
-func (m *MockClient) GetChangeProof(arg0 context.Context, arg1 *sync.SyncGetChangeProofRequest, arg2 SyncableDB) (*merkledb.ChangeProof, error) {
+func (m *MockClient) GetChangeProof(arg0 context.Context, arg1 *sync.SyncGetChangeProofRequest, arg2 DB) (*merkledb.ChangeProof, error) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "GetChangeProof", arg0, arg1, arg2)
 	ret0, _ := ret[0].(*merkledb.ChangeProof)
diff --git a/x/sync/network_server.go b/x/sync/network_server.go
index 72d021ac144..ef4706b9dda 100644
--- a/x/sync/network_server.go
+++ b/x/sync/network_server.go
@@ -45,11 +45,11 @@ var ErrMinProofSizeIsTooLarge = errors.New("cannot generate any proof within the
 
 type NetworkServer struct {
 	appSender common.AppSender // Used to respond to peer requests via AppResponse.
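// Note (editorial annotation, not part of this patch): the [db] field renamed
// below is what NetworkServer reads proofs from when answering peers;
// HandleRangeProofRequest and HandleChangeProofRequest (which appear later in
// this series) build proofs from it and reply through [appSender].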
- db SyncableDB + db DB log logging.Logger } -func NewNetworkServer(appSender common.AppSender, db SyncableDB, log logging.Logger) *NetworkServer { +func NewNetworkServer(appSender common.AppSender, db DB, log logging.Logger) *NetworkServer { return &NetworkServer{ appSender: appSender, db: db, diff --git a/x/sync/sync_test.go b/x/sync/sync_test.go index 10ed874399d..8794c19e9dd 100644 --- a/x/sync/sync_test.go +++ b/x/sync/sync_test.go @@ -34,10 +34,10 @@ func newNoopTracer() trace.Tracer { } type mockClient struct { - db SyncableDB + db DB } -func (client *mockClient) GetChangeProof(ctx context.Context, request *pb.SyncGetChangeProofRequest, _ SyncableDB) (*merkledb.ChangeProof, error) { +func (client *mockClient) GetChangeProof(ctx context.Context, request *pb.SyncGetChangeProofRequest, _ DB) (*merkledb.ChangeProof, error) { startRoot, err := ids.ToID(request.StartRootHash) if err != nil { return nil, err @@ -67,8 +67,8 @@ func Test_Creation(t *testing.T) { ) require.NoError(err) - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: &mockClient{}, TargetRoot: ids.Empty, SimultaneousWorkLimit: 5, @@ -98,8 +98,8 @@ func Test_Completion(t *testing.T) { newDefaultDBConfig(), ) require.NoError(err) - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: &mockClient{db: emptyDB}, TargetRoot: emptyRoot, SimultaneousWorkLimit: 5, @@ -107,7 +107,7 @@ func Test_Completion(t *testing.T) { }) require.NoError(err) require.NotNil(syncer) - require.NoError(syncer.StartSyncing(context.Background())) + require.NoError(syncer.Start(context.Background())) require.NoError(syncer.Wait(context.Background())) syncer.workLock.Lock() require.Zero(syncer.unprocessedWork.Len()) @@ -199,8 +199,8 @@ func Test_Sync_FindNextKey_InSync(t *testing.T) { newDefaultDBConfig(), ) require.NoError(err) - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: &mockClient{db: dbToSync}, TargetRoot: syncRoot, SimultaneousWorkLimit: 5, @@ -209,7 +209,7 @@ func Test_Sync_FindNextKey_InSync(t *testing.T) { require.NoError(err) require.NotNil(syncer) - require.NoError(syncer.StartSyncing(context.Background())) + require.NoError(syncer.Start(context.Background())) require.NoError(syncer.Wait(context.Background())) proof, err := dbToSync.GetRangeProof(context.Background(), nil, nil, 500) @@ -272,8 +272,8 @@ func Test_Sync_FindNextKey_Deleted(t *testing.T) { syncRoot, err := db.GetMerkleRoot(context.Background()) require.NoError(err) - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: &mockClient{db: nil}, TargetRoot: syncRoot, SimultaneousWorkLimit: 5, @@ -318,8 +318,8 @@ func Test_Sync_FindNextKey_BranchInLocal(t *testing.T) { proof, err := db.GetProof(context.Background(), []byte{0x11, 0x11}) require.NoError(err) - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: &mockClient{db: nil}, TargetRoot: syncRoot, SimultaneousWorkLimit: 5, @@ -351,8 +351,8 @@ func Test_Sync_FindNextKey_BranchInReceived(t *testing.T) { proof, err := db.GetProof(context.Background(), []byte{0x11, 0x11}) require.NoError(err) - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: &mockClient{db: nil}, TargetRoot: 
syncRoot, SimultaneousWorkLimit: 5, @@ -384,8 +384,8 @@ func Test_Sync_FindNextKey_ExtraValues(t *testing.T) { newDefaultDBConfig(), ) require.NoError(err) - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: &mockClient{db: dbToSync}, TargetRoot: syncRoot, SimultaneousWorkLimit: 5, @@ -394,7 +394,7 @@ func Test_Sync_FindNextKey_ExtraValues(t *testing.T) { require.NoError(err) require.NotNil(syncer) - require.NoError(syncer.StartSyncing(context.Background())) + require.NoError(syncer.Start(context.Background())) require.NoError(syncer.Wait(context.Background())) proof, err := dbToSync.GetRangeProof(context.Background(), nil, nil, 500) @@ -443,8 +443,8 @@ func TestFindNextKeyEmptyEndProof(t *testing.T) { ) require.NoError(err) - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: &mockClient{db: nil}, TargetRoot: ids.Empty, SimultaneousWorkLimit: 5, @@ -504,8 +504,8 @@ func Test_Sync_FindNextKey_DifferentChild(t *testing.T) { newDefaultDBConfig(), ) require.NoError(err) - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: &mockClient{db: dbToSync}, TargetRoot: syncRoot, SimultaneousWorkLimit: 5, @@ -513,7 +513,7 @@ func Test_Sync_FindNextKey_DifferentChild(t *testing.T) { }) require.NoError(err) require.NotNil(syncer) - require.NoError(syncer.StartSyncing(context.Background())) + require.NoError(syncer.Start(context.Background())) require.NoError(syncer.Wait(context.Background())) proof, err := dbToSync.GetRangeProof(context.Background(), nil, nil, 100) @@ -715,8 +715,8 @@ func TestFindNextKeyRandom(t *testing.T) { } // Get the actual value from the syncer - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: localDB, + syncer, err := NewManager(ManagerConfig{ + DB: localDB, Client: &mockClient{db: nil}, TargetRoot: ids.GenerateTestID(), SimultaneousWorkLimit: 5, @@ -760,8 +760,8 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { newDefaultDBConfig(), ) require.NoError(err) - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: &mockClient{db: dbToSync}, TargetRoot: syncRoot, SimultaneousWorkLimit: 5, @@ -769,7 +769,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { }) require.NoError(err) require.NotNil(syncer) - require.NoError(syncer.StartSyncing(context.Background())) + require.NoError(syncer.Start(context.Background())) require.NoError(syncer.Wait(context.Background())) require.NoError(syncer.Error()) @@ -819,8 +819,8 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { ) require.NoError(err) - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: &mockClient{db: dbToSync}, TargetRoot: syncRoot, SimultaneousWorkLimit: 5, @@ -828,7 +828,7 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { }) require.NoError(err) require.NotNil(syncer) - require.NoError(syncer.StartSyncing(context.Background())) + require.NoError(syncer.Start(context.Background())) // Wait until we've processed some work // before updating the sync target. 
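// Note (illustrative sketch, not part of this patch): moving the target
// mid-sync, as these tests exercise; the new root value is hypothetical:
//
//	newRoot := ids.GenerateTestID()
//	if err := syncer.UpdateSyncTarget(newRoot); err != nil {
//		// ErrAlreadyClosed: the syncer finished or was closed first
//	}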
@@ -844,8 +844,8 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { ) syncer.Close() - newSyncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + newSyncer, err := NewManager(ManagerConfig{ + DB: db, Client: &mockClient{db: dbToSync}, TargetRoot: syncRoot, SimultaneousWorkLimit: 5, @@ -854,7 +854,7 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { require.NoError(err) require.NotNil(newSyncer) - require.NoError(newSyncer.StartSyncing(context.Background())) + require.NoError(newSyncer.Start(context.Background())) require.NoError(newSyncer.Error()) require.NoError(newSyncer.Wait(context.Background())) @@ -892,7 +892,7 @@ func Test_Sync_Error_During_Sync(t *testing.T) { }, ).AnyTimes() client.EXPECT().GetChangeProof(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, request *pb.SyncGetChangeProofRequest, _ SyncableDB) (*merkledb.ChangeProof, error) { + func(ctx context.Context, request *pb.SyncGetChangeProofRequest, _ DB) (*merkledb.ChangeProof, error) { startRoot, err := ids.ToID(request.StartRootHash) require.NoError(err) endRoot, err := ids.ToID(request.EndRootHash) @@ -901,8 +901,8 @@ func Test_Sync_Error_During_Sync(t *testing.T) { }, ).AnyTimes() - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: client, TargetRoot: syncRoot, SimultaneousWorkLimit: 5, @@ -911,7 +911,7 @@ func Test_Sync_Error_During_Sync(t *testing.T) { require.NoError(err) require.NotNil(syncer) - require.NoError(syncer.StartSyncing(context.Background())) + require.NoError(syncer.Start(context.Background())) err = syncer.Wait(context.Background()) require.ErrorIs(err, errInvalidRangeProof) @@ -980,7 +980,7 @@ func Test_Sync_Result_Correct_Root_Update_Root_During(t *testing.T) { }, ).AnyTimes() client.EXPECT().GetChangeProof(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, request *pb.SyncGetChangeProofRequest, _ SyncableDB) (*merkledb.ChangeProof, error) { + func(ctx context.Context, request *pb.SyncGetChangeProofRequest, _ DB) (*merkledb.ChangeProof, error) { <-updatedRootChan startRoot, err := ids.ToID(request.StartRootHash) require.NoError(err) @@ -990,8 +990,8 @@ func Test_Sync_Result_Correct_Root_Update_Root_During(t *testing.T) { }, ).AnyTimes() - syncer, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: db, + syncer, err := NewManager(ManagerConfig{ + DB: db, Client: client, TargetRoot: firstSyncRoot, SimultaneousWorkLimit: 5, @@ -1000,7 +1000,7 @@ func Test_Sync_Result_Correct_Root_Update_Root_During(t *testing.T) { require.NoError(err) require.NotNil(syncer) - require.NoError(syncer.StartSyncing(context.Background())) + require.NoError(syncer.Start(context.Background())) // Wait until we've processed some work // before updating the sync target. @@ -1031,8 +1031,8 @@ func Test_Sync_UpdateSyncTarget(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - m, err := NewStateSyncManager(StateSyncConfig{ - SyncDB: merkledb.NewMockMerkleDB(ctrl), // Not used + m, err := NewManager(ManagerConfig{ + DB: merkledb.NewMockMerkleDB(ctrl), // Not used Client: NewMockClient(ctrl), // Not used TargetRoot: ids.Empty, SimultaneousWorkLimit: 5, @@ -1042,7 +1042,7 @@ func Test_Sync_UpdateSyncTarget(t *testing.T) { // Populate [m.processWork] to ensure that UpdateSyncTarget // moves the work to [m.unprocessedWork]. 
- item := &syncWorkItem{ + item := &workItem{ start: []byte{1}, end: []byte{2}, LocalRootID: ids.GenerateTestID(), diff --git a/x/sync/syncworkheap.go b/x/sync/workheap.go similarity index 92% rename from x/sync/syncworkheap.go rename to x/sync/workheap.go index 5ed6d52ebd9..fed2bde4389 100644 --- a/x/sync/syncworkheap.go +++ b/x/sync/workheap.go @@ -15,7 +15,7 @@ import ( var _ heap.Interface = (*innerHeap)(nil) type heapItem struct { - workItem *syncWorkItem + workItem *workItem heapIndex int } @@ -25,7 +25,7 @@ type innerHeap []*heapItem // Note that work item ranges never overlap. // Supports range merging and priority updating. // Not safe for concurrent use. -type syncWorkHeap struct { +type workHeap struct { // Max heap of items by priority. // i.e. heap.Pop returns highest priority item. innerHeap innerHeap @@ -35,8 +35,8 @@ type syncWorkHeap struct { closed bool } -func newSyncWorkHeap() *syncWorkHeap { - return &syncWorkHeap{ +func newWorkHeap() *workHeap { + return &workHeap{ sortedItems: btree.NewG( 2, func(a, b *heapItem) bool { @@ -47,12 +47,12 @@ func newSyncWorkHeap() *syncWorkHeap { } // Marks the heap as closed. -func (wh *syncWorkHeap) Close() { +func (wh *workHeap) Close() { wh.closed = true } // Adds a new [item] into the heap. Will not merge items, unlike MergeInsert. -func (wh *syncWorkHeap) Insert(item *syncWorkItem) { +func (wh *workHeap) Insert(item *workItem) { if wh.closed { return } @@ -65,7 +65,7 @@ func (wh *syncWorkHeap) Insert(item *syncWorkItem) { // Pops and returns a work item from the heap. // Returns nil if no work is available or the heap is closed. -func (wh *syncWorkHeap) GetWork() *syncWorkItem { +func (wh *workHeap) GetWork() *workItem { if wh.closed || wh.Len() == 0 { return nil } @@ -81,14 +81,14 @@ func (wh *syncWorkHeap) GetWork() *syncWorkItem { // into a single work item with range [0,20]. // e.g. if the heap contains work items [0,10] and [20,30], // and we add [10,20], we will merge them into [0,30]. -func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) { +func (wh *workHeap) MergeInsert(item *workItem) { if wh.closed { return } var mergedBefore, mergedAfter *heapItem searchItem := &heapItem{ - workItem: &syncWorkItem{ + workItem: &workItem{ start: item.start, }, } @@ -145,13 +145,13 @@ func (wh *syncWorkHeap) MergeInsert(item *syncWorkItem) { } // Deletes [item] from the heap. -func (wh *syncWorkHeap) remove(item *heapItem) { +func (wh *workHeap) remove(item *heapItem) { heap.Remove(&wh.innerHeap, item.heapIndex) wh.sortedItems.Delete(item) } -func (wh *syncWorkHeap) Len() int { +func (wh *workHeap) Len() int { return wh.innerHeap.Len() } diff --git a/x/sync/syncworkheap_test.go b/x/sync/workheap_test.go similarity index 78% rename from x/sync/syncworkheap_test.go rename to x/sync/workheap_test.go index bf4387c75b3..86b86393d40 100644 --- a/x/sync/syncworkheap_test.go +++ b/x/sync/workheap_test.go @@ -12,11 +12,11 @@ import ( ) // Tests heap.Interface methods Push, Pop, Swap, Len, Less. 
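// Note (illustrative sketch, not part of this patch): the merge behavior
// documented above, mirroring its [0,10] + [20,30] + [10,20] example with
// illustrative byte values:
//
//	h := newWorkHeap()
//	h.MergeInsert(&workItem{start: nil, end: []byte{10}})
//	h.MergeInsert(&workItem{start: []byte{20}, end: []byte{30}})
//	h.MergeInsert(&workItem{start: []byte{10}, end: []byte{20}})
//	// the middle insert bridges its neighbors: h.Len() == 1, covering [nil, 30]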
-func Test_SyncWorkHeap_InnerHeap(t *testing.T) { +func Test_WorkHeap_InnerHeap(t *testing.T) { require := require.New(t) lowPriorityItem := &heapItem{ - workItem: &syncWorkItem{ + workItem: &workItem{ start: []byte{1}, end: []byte{2}, priority: lowPriority, @@ -25,7 +25,7 @@ func Test_SyncWorkHeap_InnerHeap(t *testing.T) { } mediumPriorityItem := &heapItem{ - workItem: &syncWorkItem{ + workItem: &workItem{ start: []byte{3}, end: []byte{4}, priority: medPriority, @@ -34,7 +34,7 @@ func Test_SyncWorkHeap_InnerHeap(t *testing.T) { } highPriorityItem := &heapItem{ - workItem: &syncWorkItem{ + workItem: &workItem{ start: []byte{5}, end: []byte{6}, priority: highPriority, @@ -108,23 +108,23 @@ func Test_SyncWorkHeap_InnerHeap(t *testing.T) { } // Tests Insert and GetWork -func Test_SyncWorkHeap_Insert_GetWork(t *testing.T) { +func Test_WorkHeap_Insert_GetWork(t *testing.T) { require := require.New(t) - h := newSyncWorkHeap() + h := newWorkHeap() - lowPriorityItem := &syncWorkItem{ + lowPriorityItem := &workItem{ start: []byte{4}, end: []byte{5}, priority: lowPriority, LocalRootID: ids.GenerateTestID(), } - mediumPriorityItem := &syncWorkItem{ + mediumPriorityItem := &workItem{ start: []byte{0}, end: []byte{1}, priority: medPriority, LocalRootID: ids.GenerateTestID(), } - highPriorityItem := &syncWorkItem{ + highPriorityItem := &workItem{ start: []byte{2}, end: []byte{3}, priority: highPriority, @@ -136,7 +136,7 @@ func Test_SyncWorkHeap_Insert_GetWork(t *testing.T) { require.Equal(3, h.Len()) // Ensure [sortedItems] is in right order. - got := []*syncWorkItem{} + got := []*workItem{} h.sortedItems.Ascend( func(i *heapItem) bool { got = append(got, i.workItem) @@ -144,7 +144,7 @@ func Test_SyncWorkHeap_Insert_GetWork(t *testing.T) { }, ) require.Equal( - []*syncWorkItem{mediumPriorityItem, highPriorityItem, lowPriorityItem}, + []*workItem{mediumPriorityItem, highPriorityItem, lowPriorityItem}, got, ) @@ -161,26 +161,26 @@ func Test_SyncWorkHeap_Insert_GetWork(t *testing.T) { require.Zero(h.Len()) } -func Test_SyncWorkHeap_remove(t *testing.T) { +func Test_WorkHeap_remove(t *testing.T) { require := require.New(t) - h := newSyncWorkHeap() + h := newWorkHeap() - lowPriorityItem := &syncWorkItem{ + lowPriorityItem := &workItem{ start: []byte{0}, end: []byte{1}, priority: lowPriority, LocalRootID: ids.GenerateTestID(), } - mediumPriorityItem := &syncWorkItem{ + mediumPriorityItem := &workItem{ start: []byte{2}, end: []byte{3}, priority: medPriority, LocalRootID: ids.GenerateTestID(), } - highPriorityItem := &syncWorkItem{ + highPriorityItem := &workItem{ start: []byte{4}, end: []byte{5}, priority: highPriority, @@ -226,46 +226,46 @@ func Test_SyncWorkHeap_remove(t *testing.T) { require.Zero(h.sortedItems.Len()) } -func Test_SyncWorkHeap_Merge_Insert(t *testing.T) { +func Test_WorkHeap_Merge_Insert(t *testing.T) { // merge with range before - syncHeap := newSyncWorkHeap() + syncHeap := newWorkHeap() - syncHeap.MergeInsert(&syncWorkItem{start: nil, end: []byte{63}}) + syncHeap.MergeInsert(&workItem{start: nil, end: []byte{63}}) require.Equal(t, 1, syncHeap.Len()) - syncHeap.MergeInsert(&syncWorkItem{start: []byte{127}, end: []byte{192}}) + syncHeap.MergeInsert(&workItem{start: []byte{127}, end: []byte{192}}) require.Equal(t, 2, syncHeap.Len()) - syncHeap.MergeInsert(&syncWorkItem{start: []byte{193}, end: nil}) + syncHeap.MergeInsert(&workItem{start: []byte{193}, end: nil}) require.Equal(t, 3, syncHeap.Len()) - syncHeap.MergeInsert(&syncWorkItem{start: []byte{63}, end: []byte{126}, priority: 
lowPriority}) + syncHeap.MergeInsert(&workItem{start: []byte{63}, end: []byte{126}, priority: lowPriority}) require.Equal(t, 3, syncHeap.Len()) // merge with range after - syncHeap = newSyncWorkHeap() + syncHeap = newWorkHeap() - syncHeap.MergeInsert(&syncWorkItem{start: nil, end: []byte{63}}) + syncHeap.MergeInsert(&workItem{start: nil, end: []byte{63}}) require.Equal(t, 1, syncHeap.Len()) - syncHeap.MergeInsert(&syncWorkItem{start: []byte{127}, end: []byte{192}}) + syncHeap.MergeInsert(&workItem{start: []byte{127}, end: []byte{192}}) require.Equal(t, 2, syncHeap.Len()) - syncHeap.MergeInsert(&syncWorkItem{start: []byte{193}, end: nil}) + syncHeap.MergeInsert(&workItem{start: []byte{193}, end: nil}) require.Equal(t, 3, syncHeap.Len()) - syncHeap.MergeInsert(&syncWorkItem{start: []byte{64}, end: []byte{127}, priority: lowPriority}) + syncHeap.MergeInsert(&workItem{start: []byte{64}, end: []byte{127}, priority: lowPriority}) require.Equal(t, 3, syncHeap.Len()) // merge both sides at the same time - syncHeap = newSyncWorkHeap() + syncHeap = newWorkHeap() - syncHeap.MergeInsert(&syncWorkItem{start: nil, end: []byte{63}}) + syncHeap.MergeInsert(&workItem{start: nil, end: []byte{63}}) require.Equal(t, 1, syncHeap.Len()) - syncHeap.MergeInsert(&syncWorkItem{start: []byte{127}, end: nil}) + syncHeap.MergeInsert(&workItem{start: []byte{127}, end: nil}) require.Equal(t, 2, syncHeap.Len()) - syncHeap.MergeInsert(&syncWorkItem{start: []byte{63}, end: []byte{127}, priority: lowPriority}) + syncHeap.MergeInsert(&workItem{start: []byte{63}, end: []byte{127}, priority: lowPriority}) require.Equal(t, 1, syncHeap.Len()) } From 67c434aa5114958313eb856a63b2481d89a7370c Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Fri, 30 Jun 2023 11:07:38 -0400 Subject: [PATCH 4/6] `Sync` -- unexport field (#1673) --- x/sync/manager.go | 22 +++++++++++----------- x/sync/sync_test.go | 2 +- x/sync/workheap.go | 4 ++-- x/sync/workheap_test.go | 18 +++++++++--------- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/x/sync/manager.go b/x/sync/manager.go index e3e2736270a..b446f1c382a 100644 --- a/x/sync/manager.go +++ b/x/sync/manager.go @@ -46,19 +46,19 @@ const ( // Signifies that we should sync the range [start, end]. // nil [start] means there is no lower bound. // nil [end] means there is no upper bound. -// [LocalRootID] is the ID of the root of this range in our database. -// If we have no local root for this range, [LocalRootID] is ids.Empty. +// [localRootID] is the ID of the root of this range in our database. +// If we have no local root for this range, [localRootID] is ids.Empty. 
type workItem struct { start []byte end []byte priority priority - LocalRootID ids.ID + localRootID ids.ID } // TODO danlaine look into using a sync.Pool for workItems func newWorkItem(localRootID ids.ID, start, end []byte, priority priority) *workItem { return &workItem{ - LocalRootID: localRootID, + localRootID: localRootID, start: start, end: end, priority: priority, @@ -239,7 +239,7 @@ func (m *Manager) doWork(ctx context.Context, work *workItem) { m.unprocessedWorkCond.Signal() }() - if work.LocalRootID == ids.Empty { + if work.localRootID == ids.Empty { // the keys in this range have not been downloaded, so get all key/values m.getAndApplyRangeProof(ctx, work) } else { @@ -253,7 +253,7 @@ func (m *Manager) doWork(ctx context.Context, work *workItem) { func (m *Manager) getAndApplyChangeProof(ctx context.Context, work *workItem) { rootID := m.getTargetRoot() - if work.LocalRootID == rootID { + if work.localRootID == rootID { // Start root is the same as the end root, so we're done. m.completeWorkItem(ctx, work, work.end, rootID, nil) return @@ -262,7 +262,7 @@ func (m *Manager) getAndApplyChangeProof(ctx context.Context, work *workItem) { changeProof, err := m.config.Client.GetChangeProof( ctx, &pb.SyncGetChangeProofRequest{ - StartRootHash: work.LocalRootID[:], + StartRootHash: work.localRootID[:], EndRootHash: rootID[:], StartKey: work.start, EndKey: work.end, @@ -287,7 +287,7 @@ func (m *Manager) getAndApplyChangeProof(ctx context.Context, work *workItem) { // Add this range as a fresh uncompleted work item to the work heap. // TODO danlaine send range proof instead of failure notification if !changeProof.HadRootsInHistory { - work.LocalRootID = ids.Empty + work.localRootID = ids.Empty m.enqueueWork(work) return } @@ -629,7 +629,7 @@ func (m *Manager) completeWorkItem(ctx context.Context, work *workItem, largestH largestHandledKey = work.end } else { // the full range wasn't completed, so enqueue a new work item for the range [nextStartKey, workItem.end] - m.enqueueWork(newWorkItem(work.LocalRootID, nextStartKey, work.end, work.priority)) + m.enqueueWork(newWorkItem(work.localRootID, nextStartKey, work.end, work.priority)) largestHandledKey = nextStartKey } } @@ -673,8 +673,8 @@ func (m *Manager) enqueueWork(work *workItem) { // first item gets higher priority than the second to encourage finished ranges to grow // rather than start a new range that is not contiguous with existing completed ranges - first := newWorkItem(work.LocalRootID, work.start, mid, medPriority) - second := newWorkItem(work.LocalRootID, mid, work.end, lowPriority) + first := newWorkItem(work.localRootID, work.start, mid, medPriority) + second := newWorkItem(work.localRootID, mid, work.end, lowPriority) m.unprocessedWork.Insert(first) m.unprocessedWork.Insert(second) diff --git a/x/sync/sync_test.go b/x/sync/sync_test.go index 8794c19e9dd..10ad6ff20c0 100644 --- a/x/sync/sync_test.go +++ b/x/sync/sync_test.go @@ -1045,7 +1045,7 @@ func Test_Sync_UpdateSyncTarget(t *testing.T) { item := &workItem{ start: []byte{1}, end: []byte{2}, - LocalRootID: ids.GenerateTestID(), + localRootID: ids.GenerateTestID(), } m.processedWork.Insert(item) diff --git a/x/sync/workheap.go b/x/sync/workheap.go index fed2bde4389..239728af9de 100644 --- a/x/sync/workheap.go +++ b/x/sync/workheap.go @@ -98,7 +98,7 @@ func (wh *workHeap) MergeInsert(item *workItem) { wh.sortedItems.DescendLessOrEqual( searchItem, func(beforeItem *heapItem) bool { - if item.LocalRootID == beforeItem.workItem.LocalRootID && 
bytes.Equal(beforeItem.workItem.end, item.start) { + if item.localRootID == beforeItem.workItem.localRootID && bytes.Equal(beforeItem.workItem.end, item.start) { // [beforeItem.start, beforeItem.end] and [item.start, item.end] are // merged into [beforeItem.start, item.end] beforeItem.workItem.end = item.end @@ -114,7 +114,7 @@ func (wh *workHeap) MergeInsert(item *workItem) { wh.sortedItems.AscendGreaterOrEqual( searchItem, func(afterItem *heapItem) bool { - if item.LocalRootID == afterItem.workItem.LocalRootID && bytes.Equal(afterItem.workItem.start, item.end) { + if item.localRootID == afterItem.workItem.localRootID && bytes.Equal(afterItem.workItem.start, item.end) { // [item.start, item.end] and [afterItem.start, afterItem.end] are merged into // [item.start, afterItem.end]. afterItem.workItem.start = item.start diff --git a/x/sync/workheap_test.go b/x/sync/workheap_test.go index 86b86393d40..b6456f4b825 100644 --- a/x/sync/workheap_test.go +++ b/x/sync/workheap_test.go @@ -20,7 +20,7 @@ func Test_WorkHeap_InnerHeap(t *testing.T) { start: []byte{1}, end: []byte{2}, priority: lowPriority, - LocalRootID: ids.GenerateTestID(), + localRootID: ids.GenerateTestID(), }, } @@ -29,7 +29,7 @@ func Test_WorkHeap_InnerHeap(t *testing.T) { start: []byte{3}, end: []byte{4}, priority: medPriority, - LocalRootID: ids.GenerateTestID(), + localRootID: ids.GenerateTestID(), }, } @@ -38,7 +38,7 @@ func Test_WorkHeap_InnerHeap(t *testing.T) { start: []byte{5}, end: []byte{6}, priority: highPriority, - LocalRootID: ids.GenerateTestID(), + localRootID: ids.GenerateTestID(), }, } @@ -116,19 +116,19 @@ func Test_WorkHeap_Insert_GetWork(t *testing.T) { start: []byte{4}, end: []byte{5}, priority: lowPriority, - LocalRootID: ids.GenerateTestID(), + localRootID: ids.GenerateTestID(), } mediumPriorityItem := &workItem{ start: []byte{0}, end: []byte{1}, priority: medPriority, - LocalRootID: ids.GenerateTestID(), + localRootID: ids.GenerateTestID(), } highPriorityItem := &workItem{ start: []byte{2}, end: []byte{3}, priority: highPriority, - LocalRootID: ids.GenerateTestID(), + localRootID: ids.GenerateTestID(), } h.Insert(highPriorityItem) h.Insert(mediumPriorityItem) @@ -170,21 +170,21 @@ func Test_WorkHeap_remove(t *testing.T) { start: []byte{0}, end: []byte{1}, priority: lowPriority, - LocalRootID: ids.GenerateTestID(), + localRootID: ids.GenerateTestID(), } mediumPriorityItem := &workItem{ start: []byte{2}, end: []byte{3}, priority: medPriority, - LocalRootID: ids.GenerateTestID(), + localRootID: ids.GenerateTestID(), } highPriorityItem := &workItem{ start: []byte{4}, end: []byte{5}, priority: highPriority, - LocalRootID: ids.GenerateTestID(), + localRootID: ids.GenerateTestID(), } h.Insert(lowPriorityItem) From 650dcbac3bb2c918965718561520e2c2000ce73d Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Fri, 30 Jun 2023 13:46:38 -0400 Subject: [PATCH 5/6] `sync` -- nits and cleanup (#1674) --- x/sync/manager.go | 95 ++++++++++++++++++++-------------------- x/sync/network_server.go | 27 ++++++------ 2 files changed, 60 insertions(+), 62 deletions(-) diff --git a/x/sync/manager.go b/x/sync/manager.go index b446f1c382a..992451312f0 100644 --- a/x/sync/manager.go +++ b/x/sync/manager.go @@ -169,16 +169,14 @@ func (m *Manager) sync(ctx context.Context) { m.workLock.Lock() for { // Invariant: [m.workLock] is held here. - if ctx.Err() != nil { // [m] is closed. + switch { + case ctx.Err() != nil: return // [m.workLock] released by defer. 
- } - if m.processingWorkItems >= m.config.SimultaneousWorkLimit { + case m.processingWorkItems >= m.config.SimultaneousWorkLimit: // We're already processing the maximum number of work items. // Wait until one of them finishes. m.unprocessedWorkCond.Wait() - continue - } - if m.unprocessedWork.Len() == 0 { + case m.unprocessedWork.Len() == 0: if m.processingWorkItems == 0 { // There's no work to do, and there are no work items being processed // which could cause work to be added, so we're done. @@ -189,15 +187,15 @@ func (m *Manager) sync(ctx context.Context) { // Close() will be called, which will broadcast on [m.unprocessedWorkCond], // which will cause Wait() to return, and this goroutine to exit. m.unprocessedWorkCond.Wait() - continue + default: + m.processingWorkItems++ + work := m.unprocessedWork.GetWork() + // TODO danlaine: We won't release [m.workLock] until + // we've started a goroutine for each available work item. + // We can't apply proofs we receive until we release [m.workLock]. + // Is this OK? Is it possible we end up with too many goroutines? + go m.doWork(ctx, work) } - m.processingWorkItems++ - work := m.unprocessedWork.GetWork() - // TODO danlaine: We won't release [m.workLock] until - // we've started a goroutine for each available work item. - // We can't apply proofs we receive until we release [m.workLock]. - // Is this OK? Is it possible we end up with too many goroutines? - go m.doWork(ctx, work) } } @@ -205,6 +203,7 @@ func (m *Manager) sync(ctx context.Context) { func (m *Manager) Close() { m.workLock.Lock() defer m.workLock.Unlock() + m.close() } @@ -251,11 +250,11 @@ func (m *Manager) doWork(ctx context.Context, work *workItem) { // Fetch and apply the change proof given by [work]. // Assumes [m.workLock] is not held. func (m *Manager) getAndApplyChangeProof(ctx context.Context, work *workItem) { - rootID := m.getTargetRoot() + targetRootID := m.getTargetRoot() - if work.localRootID == rootID { + if work.localRootID == targetRootID { // Start root is the same as the end root, so we're done. - m.completeWorkItem(ctx, work, work.end, rootID, nil) + m.completeWorkItem(ctx, work, work.end, targetRootID, nil) return } @@ -263,7 +262,7 @@ func (m *Manager) getAndApplyChangeProof(ctx context.Context, work *workItem) { ctx, &pb.SyncGetChangeProofRequest{ StartRootHash: work.localRootID[:], - EndRootHash: rootID[:], + EndRootHash: targetRootID[:], StartKey: work.start, EndKey: work.end, KeyLimit: defaultRequestKeyLimit, @@ -302,16 +301,16 @@ func (m *Manager) getAndApplyChangeProof(ctx context.Context, work *workItem) { largestHandledKey = changeProof.KeyChanges[len(changeProof.KeyChanges)-1].Key } - m.completeWorkItem(ctx, work, largestHandledKey, rootID, changeProof.EndProof) + m.completeWorkItem(ctx, work, largestHandledKey, targetRootID, changeProof.EndProof) } // Fetch and apply the range proof given by [work]. // Assumes [m.workLock] is not held. 
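// Note (illustrative sketch, not part of this patch): both fetch paths follow
// the same shape -- snapshot the target root once (getTargetRoot takes the
// read lock), fetch against the snapshot, then let completeWorkItem re-check
// the live target. In outline:
//
//	target := m.getTargetRoot() // snapshot under RLock
//	// ... fetch and commit a proof for [target] ...
//	if m.getTargetRoot() != target {
//		// target moved mid-fetch; completeWorkItem re-enqueues at highPriority
//	}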
func (m *Manager) getAndApplyRangeProof(ctx context.Context, work *workItem) { - rootID := m.getTargetRoot() + targetRootID := m.getTargetRoot() proof, err := m.config.Client.GetRangeProof(ctx, &pb.SyncGetRangeProofRequest{ - RootHash: rootID[:], + RootHash: targetRootID[:], StartKey: work.start, EndKey: work.end, KeyLimit: defaultRequestKeyLimit, @@ -341,7 +340,7 @@ func (m *Manager) getAndApplyRangeProof(ctx context.Context, work *workItem) { largestHandledKey = proof.KeyValues[len(proof.KeyValues)-1].Key } - m.completeWorkItem(ctx, work, largestHandledKey, rootID, proof.EndProof) + m.completeWorkItem(ctx, work, largestHandledKey, targetRootID, proof.EndProof) } // findNextKey returns the start of the key range that should be fetched next. @@ -494,30 +493,6 @@ func (m *Manager) findNextKey( return nextKey, nil } -// findChildDifference returns the first child index that is different between node 1 and node 2 if one exists and -// a bool indicating if any difference was found -func findChildDifference(node1, node2 *merkledb.ProofNode, startIndex byte) (byte, bool) { - var ( - child1, child2 ids.ID - ok1, ok2 bool - ) - for childIndex := startIndex; childIndex < merkledb.NodeBranchFactor; childIndex++ { - if node1 != nil { - child1, ok1 = node1.Children[childIndex] - } - if node2 != nil { - child2, ok2 = node2.Children[childIndex] - } - // if one node has a child and the other doesn't or the children ids don't match, - // return the current child index as the first difference - if (ok1 || ok2) && child1 != child2 { - return childIndex, true - } - } - // there were no differences found - return 0, false -} - func (m *Manager) Error() error { m.errLock.Lock() defer m.errLock.Unlock() @@ -547,9 +522,9 @@ func (m *Manager) Wait(ctx context.Context) error { m.config.Log.Info("completed with error", zap.Error(err)) return err } - if m.getTargetRoot() != root { + if targetRootID := m.getTargetRoot(); targetRootID != root { // This should never happen. 
- return fmt.Errorf("%w: expected %s, got %s", ErrFinishedWithUnexpectedRoot, m.getTargetRoot(), root) + return fmt.Errorf("%w: expected %s, got %s", ErrFinishedWithUnexpectedRoot, targetRootID, root) } m.config.Log.Info("completed", zap.String("new root", root.String())) return nil @@ -745,3 +720,27 @@ func midPoint(start, end []byte) []byte { } return midpoint } + +// findChildDifference returns the first child index that is different between node 1 and node 2 if one exists and +// a bool indicating if any difference was found +func findChildDifference(node1, node2 *merkledb.ProofNode, startIndex byte) (byte, bool) { + var ( + child1, child2 ids.ID + ok1, ok2 bool + ) + for childIndex := startIndex; childIndex < merkledb.NodeBranchFactor; childIndex++ { + if node1 != nil { + child1, ok1 = node1.Children[childIndex] + } + if node2 != nil { + child2, ok2 = node2.Children[childIndex] + } + // if one node has a child and the other doesn't or the children ids don't match, + // return the current child index as the first difference + if (ok1 || ok2) && child1 != child2 { + return childIndex, true + } + } + // there were no differences found + return 0, false +} diff --git a/x/sync/network_server.go b/x/sync/network_server.go index ef4706b9dda..7aba3729abb 100644 --- a/x/sync/network_server.go +++ b/x/sync/network_server.go @@ -14,7 +14,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" "github.com/ava-labs/avalanchego/ids" @@ -133,19 +132,6 @@ func (s *NetworkServer) AppRequest( return nil } -// isTimeout returns true if err is a timeout from a context cancellation -// or a context cancellation over grpc. -func isTimeout(err error) bool { - // handle grpc wrapped DeadlineExceeded - if e, ok := status.FromError(err); ok { - if e.Code() == codes.DeadlineExceeded { - return true - } - } - // otherwise, check for context.DeadlineExceeded directly - return errors.Is(err, context.DeadlineExceeded) -} - // Generates a change proof and sends it to [nodeID]. func (s *NetworkServer) HandleChangeProofRequest( ctx context.Context, @@ -290,3 +276,16 @@ func (s *NetworkServer) HandleRangeProofRequest( } return ErrMinProofSizeIsTooLarge } + +// isTimeout returns true if err is a timeout from a context cancellation +// or a context cancellation over grpc. +func isTimeout(err error) bool { + // handle grpc wrapped DeadlineExceeded + if e, ok := status.FromError(err); ok { + if e.Code() == codes.DeadlineExceeded { + return true + } + } + // otherwise, check for context.DeadlineExceeded directly + return errors.Is(err, context.DeadlineExceeded) +} From 62d99d7c1326480a46012eafee79d7ba676f54ad Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Fri, 30 Jun 2023 14:59:43 -0400 Subject: [PATCH 6/6] `sync` -- remove unused code (#1676) --- x/sync/request.go | 100 ---------------------------------------------- 1 file changed, 100 deletions(-) delete mode 100644 x/sync/request.go diff --git a/x/sync/request.go b/x/sync/request.go deleted file mode 100644 index 27de97535db..00000000000 --- a/x/sync/request.go +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package sync - -import ( - "context" - "encoding/hex" - "fmt" - - "github.com/ava-labs/avalanchego/ids" -) - -var ( - _ Request = (*RangeProofRequest)(nil) - _ Request = (*ChangeProofRequest)(nil) -) - -// A request to this node for a proof. 
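// Note (editorial annotation, not part of this patch): the hand-rolled
// Request/Handler types deleted below are unused because the series drives
// requests through protobuf messages instead; the equivalent of a
// RangeProofRequest is built directly, e.g. (field values illustrative):
//
//	req := &pb.SyncGetRangeProofRequest{
//		RootHash:   root[:],
//		KeyLimit:   defaultRequestKeyLimit,
//		BytesLimit: defaultRequestByteSizeLimit,
//	}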
-type Request interface { - fmt.Stringer - Handle(ctx context.Context, nodeID ids.NodeID, requestID uint32, h Handler) error -} - -type rangeProofHandler interface { - // Generates a range proof and sends it to [nodeID]. - // TODO danlaine how should we handle context cancellation? - HandleRangeProofRequest( - ctx context.Context, - nodeID ids.NodeID, - requestID uint32, - request *RangeProofRequest, - ) error -} - -type changeProofHandler interface { - // Generates a change proof and sends it to [nodeID]. - // TODO danlaine how should we handle context cancellation? - HandleChangeProofRequest( - ctx context.Context, - nodeID ids.NodeID, - requestID uint32, - request *ChangeProofRequest, - ) error -} - -type Handler interface { - rangeProofHandler - changeProofHandler -} - -type RangeProofRequest struct { - Root ids.ID `serialize:"true"` - Start []byte `serialize:"true"` - End []byte `serialize:"true"` - KeyLimit uint16 `serialize:"true"` - BytesLimit uint32 `serialize:"true"` -} - -func (r *RangeProofRequest) Handle(ctx context.Context, nodeID ids.NodeID, requestID uint32, h Handler) error { - return h.HandleRangeProofRequest(ctx, nodeID, requestID, r) -} - -func (r RangeProofRequest) String() string { - return fmt.Sprintf( - "RangeProofRequest(Root=%s, Start=%s, End=%s, KeyLimit=%d, BytesLimit=%d)", - r.Root, - hex.EncodeToString(r.Start), - hex.EncodeToString(r.End), - r.KeyLimit, - r.BytesLimit, - ) -} - -// ChangeProofRequest is a request to receive trie leaves at specified Root within Start and End byte range -// Limit outlines maximum number of leaves to returns starting at Start -type ChangeProofRequest struct { - StartingRoot ids.ID `serialize:"true"` - EndingRoot ids.ID `serialize:"true"` - Start []byte `serialize:"true"` - End []byte `serialize:"true"` - KeyLimit uint16 `serialize:"true"` - BytesLimit uint32 `serialize:"true"` -} - -func (r *ChangeProofRequest) Handle(ctx context.Context, nodeID ids.NodeID, requestID uint32, h Handler) error { - return h.HandleChangeProofRequest(ctx, nodeID, requestID, r) -} - -func (r ChangeProofRequest) String() string { - return fmt.Sprintf( - "ChangeProofRequest(StartRoot=%s, EndRoot=%s, Start=%s, End=%s, KeyLimit=%d, BytesLimit=%d)", - r.StartingRoot, - r.EndingRoot, - hex.EncodeToString(r.Start), - hex.EncodeToString(r.End), - r.KeyLimit, - r.BytesLimit, - ) -}
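// Closing sketch (editorial, not part of the patches): a minimal, hedged
// example of driving a sync with the API as renamed by this series. The
// database and client construction are assumed to happen elsewhere, and
// logging.NoLog{} is used as a stand-in logger.

package example

import (
	"context"

	"github.com/ava-labs/avalanchego/ids"
	"github.com/ava-labs/avalanchego/utils/logging"
	"github.com/ava-labs/avalanchego/x/sync"
)

// runSync syncs [db] to [target] and blocks until done, returning an error if
// the sync fails fatally or [ctx] is canceled.
func runSync(ctx context.Context, db sync.DB, client sync.Client, target ids.ID) error {
	m, err := sync.NewManager(sync.ManagerConfig{
		DB:                    db,
		Client:                client,
		TargetRoot:            target,
		SimultaneousWorkLimit: 5,
		Log:                   logging.NoLog{},
	})
	if err != nil {
		return err
	}
	defer m.Close()

	if err := m.Start(ctx); err != nil {
		return err
	}
	return m.Wait(ctx)
}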