Skip to content

Commit

Permalink
feat(store/v2): Implement State Migration (#19327)
Browse files Browse the repository at this point in the history
Co-authored-by: Aleksandr Bezobchuk <[email protected]>
  • Loading branch information
cool-develope and alexanderbez authored Feb 13, 2024
1 parent 3461c64 commit 92eb6de
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 0 deletions.
61 changes: 61 additions & 0 deletions store/migration/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package migration

import (
"golang.org/x/sync/errgroup"

"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/snapshots"
)

const (
// defaultChannelBufferSize is the default buffer size for the migration stream.
defaultChannelBufferSize = 1024
// defaultStorageBufferSize is the default buffer size for the storage snapshotter.
defaultStorageBufferSize = 1024
)

// Manager manages the migration of the whole state from store/v1 to store/v2.
type Manager struct {
logger log.Logger
snapshotsManager *snapshots.Manager

storageSnapshotter snapshots.StorageSnapshotter
commitSnapshotter snapshots.CommitSnapshotter
}

// NewManager returns a new Manager.
func NewManager(sm *snapshots.Manager, ss snapshots.StorageSnapshotter, cs snapshots.CommitSnapshotter, logger log.Logger) *Manager {
return &Manager{
logger: logger,
snapshotsManager: sm,
storageSnapshotter: ss,
commitSnapshotter: cs,
}
}

// Migrate migrates the whole state at the given height to the new store/v2.
func (m *Manager) Migrate(height uint64) error {
// create the migration stream and snapshot,
// which acts as protoio.Reader and snapshots.WriteCloser.
ms := NewMigrationStream(defaultChannelBufferSize)

if err := m.snapshotsManager.CreateMigration(height, ms); err != nil {
return err
}

// restore the snapshot
chStorage := make(chan *store.KVPair, defaultStorageBufferSize)

eg := new(errgroup.Group)
eg.Go(func() error {
return m.storageSnapshotter.Restore(height, chStorage)
})
eg.Go(func() error {
defer close(chStorage)
_, err := m.commitSnapshotter.Restore(height, 0, ms, chStorage)
return err
})

return eg.Wait()
}
105 changes: 105 additions & 0 deletions store/migration/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package migration

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"

"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
"cosmossdk.io/store/v2/commitment/iavl"
dbm "cosmossdk.io/store/v2/db"
"cosmossdk.io/store/v2/snapshots"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/pebbledb"
)

var storeKeys = []string{"store1", "store2"}

func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {
t.Helper()

db := dbm.NewMemDB()
multiTrees := make(map[string]commitment.Tree)
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(db, []byte(storeKey))
multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}

commitStore, err := commitment.NewCommitStore(multiTrees, db, log.NewNopLogger())
require.NoError(t, err)

snapshotsStore, err := snapshots.NewStore(db, t.TempDir())
require.NoError(t, err)

snapshotsManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), commitStore, nil, nil, log.NewNopLogger())

storageDB, err := pebbledb.New(t.TempDir())
require.NoError(t, err)
newStorageStore := storage.NewStorageStore(storageDB) // for store/v2

db1 := dbm.NewMemDB()
multiTrees1 := make(map[string]commitment.Tree)
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(db1, []byte(storeKey))
multiTrees1[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}

newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, log.NewNopLogger()) // for store/v2
require.NoError(t, err)

return NewManager(snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore
}

func TestMigrateState(t *testing.T) {
m, orgCommitStore := setupMigrationManager(t)

// apply changeset
toVersion := uint64(100)
keyCount := 10
for version := uint64(1); version <= toVersion; version++ {
cs := store.NewChangeset()
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)))
}
}
require.NoError(t, orgCommitStore.WriteBatch(cs))
_, err := orgCommitStore.Commit(version)
require.NoError(t, err)
}

err := m.Migrate(toVersion - 1)
require.NoError(t, err)

// check the migrated state
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.commitSnapshotter.(*commitment.CommitStore).Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
}
}
}
// check the latest state
val, err := m.commitSnapshotter.(*commitment.CommitStore).Get("store1", toVersion-1, []byte("key-100-1"))
require.NoError(t, err)
require.Nil(t, val)
val, err = m.commitSnapshotter.(*commitment.CommitStore).Get("store2", toVersion-1, []byte("key-100-0"))
require.NoError(t, err)
require.Nil(t, val)

// check the storage
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.storageSnapshotter.(*storage.StorageStore).Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
}
}
}
}
79 changes: 79 additions & 0 deletions store/migration/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package migration

import (
"fmt"
"io"
"sync/atomic"

protoio "github.com/cosmos/gogoproto/io"
"github.com/cosmos/gogoproto/proto"

"cosmossdk.io/store/v2/snapshots"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
)

var (
_ snapshots.WriteCloser = (*MigrationStream)(nil)
_ protoio.ReadCloser = (*MigrationStream)(nil)
)

// MigrationStream is a stream for migrating the whole IAVL state as a snapshot.
// It's used to sync the whole state from the store/v1 to store/v2.
// The main idea is to use the same snapshotter interface without writing to disk.
type MigrationStream struct {
chBuffer chan proto.Message
err atomic.Value // atomic error
}

// NewMigrationStream returns a new MigrationStream.
func NewMigrationStream(chBufferSize int) *MigrationStream {
return &MigrationStream{
chBuffer: make(chan proto.Message, chBufferSize),
}
}

// WriteMsg implements protoio.Write interface.
func (ms *MigrationStream) WriteMsg(msg proto.Message) error {
ms.chBuffer <- msg
return nil
}

// CloseWithError implements snapshots.WriteCloser interface.
func (ms *MigrationStream) CloseWithError(err error) {
ms.err.Store(err)
close(ms.chBuffer)
}

// ReadMsg implements the protoio.Read interface.
//
// NOTE: It we follow the pattern of snapshot.Restore, however, the migration is done in memory.
// It doesn't require any deserialization -- just passing the pointer to the <msg>.
func (ms *MigrationStream) ReadMsg(msg proto.Message) error {
// msg should be a pointer to the same type as the one written to the stream
snapshotsItem, ok := msg.(*snapshotstypes.SnapshotItem)
if !ok {
return fmt.Errorf("unexpected message type: %T", msg)
}

// It doesn't require any deserialization, just a type assertion.
item := <-ms.chBuffer
if item == nil {
return io.EOF
}

*snapshotsItem = *(item.(*snapshotstypes.SnapshotItem))

// check if there is an error from the writer.
err := ms.err.Load()
if err != nil {
return err.(error)
}

return nil
}

// Close implements io.Closer interface.
func (ms *MigrationStream) Close() error {
close(ms.chBuffer)
return nil
}
24 changes: 24 additions & 0 deletions store/snapshots/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,30 @@ func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) {
}
}

// CreateMigration creates a migration snapshot and writes it to the given writer.
// It is used to migrate the state from the original store to the store/v2.
func (m *Manager) CreateMigration(height uint64, protoWriter WriteCloser) error {
if m == nil {
return errorsmod.Wrap(store.ErrLogic, "Snapshot Manager is nil")
}

err := m.begin(opSnapshot)
if err != nil {
return err
}
defer m.end()

go func() {
if err := m.commitSnapshotter.Snapshot(height, protoWriter); err != nil {
protoWriter.CloseWithError(err)
return
}
_ = protoWriter.Close() // always return nil
}()

return nil
}

// List lists snapshots, mirroring ABCI ListSnapshots. It can be concurrent with other operations.
func (m *Manager) List() ([]*types.Snapshot, error) {
return m.store.List()
Expand Down
7 changes: 7 additions & 0 deletions store/snapshots/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ const (
snapshotCompressionLevel = 7
)

type WriteCloser interface {
protoio.WriteCloser

// CloseWithError closes the writer and sends an error to the reader.
CloseWithError(err error)
}

// StreamWriter set up a stream pipeline to serialize snapshot nodes:
// Exported Items -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser
type StreamWriter struct {
Expand Down

0 comments on commit 92eb6de

Please sign in to comment.