Skip to content

Commit

Permalink
feat(storage): introduce batch for txns
Browse files Browse the repository at this point in the history
  • Loading branch information
mrekucci authored and alok committed Jul 9, 2023
1 parent 6fb07f3 commit 947e82e
Show file tree
Hide file tree
Showing 23 changed files with 893 additions and 437 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ BEEKEEPER_USE_SUDO ?= false
BEEKEEPER_CLUSTER ?= local
BEELOCAL_BRANCH ?= main
BEEKEEPER_BRANCH ?= master
REACHABILITY_OVERRIDE_PUBLIC ?= false
REACHABILITY_OVERRIDE_PUBLIC ?= true
BATCHFACTOR_OVERRIDE_PUBLIC ?= 5

BEE_API_VERSION ?= "$(shell grep '^ version:' openapi/Swarm.yaml | awk '{print $$2}')"
Expand Down
4 changes: 2 additions & 2 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameDataDir, filepath.Join(c.homeDir, ".bee"), "data directory")
cmd.Flags().Uint64(optionNameCacheCapacity, 1000000, fmt.Sprintf("cache capacity in chunks, multiply by %d to get approximate capacity in bytes", swarm.ChunkSize))
cmd.Flags().Uint64(optionNameDBOpenFilesLimit, 200, "number of open files allowed by database")
cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 16*1024*1024, "size of block cache of the database in bytes")
cmd.Flags().Uint64(optionNameDBWriteBufferSize, 16*1024*1024, "size of the database write buffer in bytes")
cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 32*1024*1024, "size of block cache of the database in bytes")
cmd.Flags().Uint64(optionNameDBWriteBufferSize, 32*1024*1024, "size of the database write buffer in bytes")
cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, true, "disables db compactions triggered by seeks")
cmd.Flags().String(optionNamePassword, "", "password for decrypting keys")
cmd.Flags().String(optionNamePasswordFile, "", "path to a file that contains password for decrypting keys")
Expand Down
172 changes: 132 additions & 40 deletions pkg/storage/inmemstore/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,69 @@
package inmemstore

import (
"context"
"errors"
"fmt"
"sync"

"github.com/ethersphere/bee/pkg/storage"
)

var _ storage.TxStore = (*TxStore)(nil)

func put(
reader storage.Reader,
writer storage.Writer,
item storage.Item,
) (*storage.TxRevertOp[storage.Key, storage.Item], error) {
prev := item.Clone()
var reverseOp *storage.TxRevertOp[storage.Key, storage.Item]
switch err := reader.Get(prev); {
case errors.Is(err, storage.ErrNotFound):
reverseOp = &storage.TxRevertOp[storage.Key, storage.Item]{
Origin: storage.PutCreateOp,
ObjectID: item.String(),
Val: item,
}
case err != nil:
return nil, err
default:
reverseOp = &storage.TxRevertOp[storage.Key, storage.Item]{
Origin: storage.PutUpdateOp,
ObjectID: prev.String(),
Val: prev,
}
}

err := writer.Put(item)
if err == nil {
return reverseOp, nil
}
return nil, err
}

func del(
reader storage.Reader,
writer storage.Writer,
item storage.Item,
) (*storage.TxRevertOp[storage.Key, storage.Item], error) {
prev := item.Clone()
var reverseOp *storage.TxRevertOp[storage.Key, storage.Item]
if err := reader.Get(prev); err == nil {
reverseOp = &storage.TxRevertOp[storage.Key, storage.Item]{
Origin: storage.DeleteOp,
ObjectID: item.String(),
Val: prev,
}
}

err := writer.Delete(item)
if err == nil {
return reverseOp, nil
}
return nil, err
}

// TxStore is an implementation of in-memory Store
// where all Store operations are done in a transaction.
type TxStore struct {
Expand All @@ -25,7 +80,7 @@ type TxStore struct {

// release releases the TxStore transaction associated resources.
func (s *TxStore) release() {
s.TxStoreBase.Store = nil
s.TxStoreBase.BatchedStore = nil
s.revOps = nil
}

Expand All @@ -35,26 +90,7 @@ func (s *TxStore) Put(item storage.Item) error {
return err
}

prev := item.Clone()
var reverseOp *storage.TxRevertOp[storage.Key, storage.Item]
switch err := s.TxStoreBase.Get(prev); {
case errors.Is(err, storage.ErrNotFound):
reverseOp = &storage.TxRevertOp[storage.Key, storage.Item]{
Origin: storage.PutCreateOp,
ObjectID: item.String(),
Val: item,
}
case err != nil:
return err
default:
reverseOp = &storage.TxRevertOp[storage.Key, storage.Item]{
Origin: storage.PutUpdateOp,
ObjectID: prev.String(),
Val: prev,
}
}

err := s.TxStoreBase.Put(item)
reverseOp, err := put(s.TxStoreBase, s.TxStoreBase, item)
if err == nil {
err = s.revOps.Append(reverseOp)
}
Expand All @@ -67,17 +103,7 @@ func (s *TxStore) Delete(item storage.Item) error {
return err
}

prev := item.Clone()
var reverseOp *storage.TxRevertOp[storage.Key, storage.Item]
if err := s.TxStoreBase.Get(prev); err == nil {
reverseOp = &storage.TxRevertOp[storage.Key, storage.Item]{
Origin: storage.DeleteOp,
ObjectID: prev.String(),
Val: prev,
}
}

err := s.TxStoreBase.Delete(item)
reverseOp, err := del(s.TxStoreBase, s.TxStoreBase, item)
if err == nil {
err = s.revOps.Append(reverseOp)
}
Expand Down Expand Up @@ -111,37 +137,103 @@ func (s *TxStore) Rollback() error {
return nil
}

// Batch implements the Batcher interface.
func (s *TxStore) Batch(ctx context.Context) (storage.Batch, error) {
batch, err := s.TxStoreBase.BatchedStore.Batch(ctx)
if err != nil {
return nil, err
}

return &txWrappedBatch{
batch: batch,
store: s,
onCommit: func(revOps ...*storage.TxRevertOp[storage.Key, storage.Item]) error {
return s.revOps.Append(revOps...)
},
}, nil
}

type txWrappedBatch struct {
batch storage.Batch
store *TxStore
opsMu sync.Mutex
ops []*storage.TxRevertOp[storage.Key, storage.Item]
onCommit func(revOps ...*storage.TxRevertOp[storage.Key, storage.Item]) error
}

// Put implements the Batch interface.
func (b *txWrappedBatch) Put(item storage.Item) error {
if err := b.store.IsDone(); err != nil {
return err
}

reverseOp, err := put(b.store, b.batch, item)
if err == nil && reverseOp != nil {
b.opsMu.Lock()
b.ops = append(b.ops, reverseOp)
b.opsMu.Unlock()
}
return err
}

// Delete implements the Batch interface.
func (b *txWrappedBatch) Delete(item storage.Item) error {
if err := b.store.IsDone(); err != nil {
return err
}

reverseOp, err := del(b.store, b.batch, item)
if err == nil && reverseOp != nil {
b.opsMu.Lock()
b.ops = append(b.ops, reverseOp)
b.opsMu.Unlock()
}
return err
}

func (b *txWrappedBatch) Commit() error {
if err := b.batch.Commit(); err != nil {
return err
}
b.opsMu.Lock()
defer b.opsMu.Unlock()
defer func() {
b.ops = nil
}()
return b.onCommit(b.ops...)
}

// NewTx implements the TxStore interface.
func (s *TxStore) NewTx(state *storage.TxState) storage.TxStore {
if s.Store == nil {
if s.BatchedStore == nil {
panic(errors.New("inmemstore: nil store"))
}

return &TxStore{
TxStoreBase: &storage.TxStoreBase{
TxState: state,
Store: s.Store,
TxState: state,
BatchedStore: s.BatchedStore,
},
revOps: storage.NewInMemTxRevertOpStore(
map[storage.TxOpCode]storage.TxRevertFn[storage.Key, storage.Item]{
storage.PutCreateOp: func(_ storage.Key, item storage.Item) error {
return s.Store.Delete(item)
return s.BatchedStore.Delete(item)
},
storage.PutUpdateOp: func(_ storage.Key, item storage.Item) error {
return s.Store.Put(item)
return s.BatchedStore.Put(item)
},
storage.DeleteOp: func(_ storage.Key, item storage.Item) error {
return s.Store.Put(item)
return s.BatchedStore.Put(item)
},
},
),
}
}

// NewTxStore returns a new TxStore instance backed by the given store.
func NewTxStore(store storage.Store) *TxStore {
func NewTxStore(store storage.BatchedStore) *TxStore {
return &TxStore{
TxStoreBase: &storage.TxStoreBase{Store: store},
TxStoreBase: &storage.TxStoreBase{BatchedStore: store},
revOps: new(storage.NoOpTxRevertOpStore[storage.Key, storage.Item]),
}
}
2 changes: 1 addition & 1 deletion pkg/storage/leveldbstore/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *TxStore) Recover() error {
return fmt.Errorf("leveldbstore: recovery: iteration failed: %w", err)
}

if err := s.Store.(*Store).db.Write(batch, &opt.WriteOptions{Sync: true}); err != nil {
if err := s.BatchedStore.(*Store).db.Write(batch, &opt.WriteOptions{Sync: true}); err != nil {
return fmt.Errorf("leveldbstore: recovery: unable to write batch: %w", err)
}
return nil
Expand Down
Loading

0 comments on commit 947e82e

Please sign in to comment.