Skip to content

Commit

Permalink
feat(storage): introduce batch for txns (#4213)
Browse files Browse the repository at this point in the history
  • Loading branch information
aloknerurkar authored Jul 10, 2023
1 parent ca8f41e commit b4270fb
Show file tree
Hide file tree
Showing 26 changed files with 904 additions and 448 deletions.
4 changes: 2 additions & 2 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameDataDir, filepath.Join(c.homeDir, ".bee"), "data directory")
cmd.Flags().Uint64(optionNameCacheCapacity, 1000000, fmt.Sprintf("cache capacity in chunks, multiply by %d to get approximate capacity in bytes", swarm.ChunkSize))
cmd.Flags().Uint64(optionNameDBOpenFilesLimit, 200, "number of open files allowed by database")
cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 16*1024*1024, "size of block cache of the database in bytes")
cmd.Flags().Uint64(optionNameDBWriteBufferSize, 16*1024*1024, "size of the database write buffer in bytes")
cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 32*1024*1024, "size of block cache of the database in bytes")
cmd.Flags().Uint64(optionNameDBWriteBufferSize, 32*1024*1024, "size of the database write buffer in bytes")
cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, true, "disables db compactions triggered by seeks")
cmd.Flags().String(optionNamePassword, "", "password for decrypting keys")
cmd.Flags().String(optionNamePasswordFile, "", "path to a file that contains password for decrypting keys")
Expand Down
2 changes: 1 addition & 1 deletion pkg/postage/batchservice/batchservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func New(
}

// Create will create a new batch with the given ID, owner value and depth and
// stores it in the BatchStore.
// stores it in the BatchedStore.
func (svc *batchService) Create(id, owner []byte, totalAmout, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool, txHash common.Hash) error {
// don't add batches which have value which equals total cumulative
// payout or that are going to expire already within the next couple of blocks
Expand Down
4 changes: 2 additions & 2 deletions pkg/postage/postagecontract/mock/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ package mock

import (
"context"
"github.com/ethereum/go-ethereum/common"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/postage/postagecontract"
)

Expand Down Expand Up @@ -38,7 +38,7 @@ func (c *contractMock) ExpireBatches(ctx context.Context) error {
// Option is a an option passed to New
type Option func(*contractMock)

// New creates a new mock BatchStore
// New creates a new mock BatchStore.
func New(opts ...Option) postagecontract.Interface {
bs := &contractMock{}

Expand Down
4 changes: 0 additions & 4 deletions pkg/storage/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
// call when a batch has already been committed.
var ErrBatchCommitted = errors.New("storage: batch has already been committed")

// ErrBatchNotSupported is returned by BatchedStore.Batch call when batching
// is not supported.
var ErrBatchNotSupported = errors.New("storage: batch operations not supported")

// Batch provides set of operations that are batched.
type Batch interface {
// Put adds a new item to the batch.
Expand Down
179 changes: 138 additions & 41 deletions pkg/storage/inmemstore/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,123 @@
package inmemstore

import (
"context"
"errors"
"fmt"
"sync"

"github.com/ethersphere/bee/pkg/storage"
)

var _ storage.TxStore = (*TxStore)(nil)
var (
_ storage.TxStore = (*TxStore)(nil)
_ storage.Batcher = (*TxStore)(nil)
)

func put(
reader storage.Reader,
writer storage.Writer,
item storage.Item,
) (*storage.TxRevertOp[storage.Key, storage.Item], error) {
prev := item.Clone()
var reverseOp *storage.TxRevertOp[storage.Key, storage.Item]
switch err := reader.Get(prev); {
case errors.Is(err, storage.ErrNotFound):
reverseOp = &storage.TxRevertOp[storage.Key, storage.Item]{
Origin: storage.PutCreateOp,
ObjectID: item.String(),
Val: item,
}
case err != nil:
return nil, err
default:
reverseOp = &storage.TxRevertOp[storage.Key, storage.Item]{
Origin: storage.PutUpdateOp,
ObjectID: prev.String(),
Val: prev,
}
}

err := writer.Put(item)
if err == nil {
return reverseOp, nil
}
return nil, err
}

func del(
reader storage.Reader,
writer storage.Writer,
item storage.Item,
) (*storage.TxRevertOp[storage.Key, storage.Item], error) {
prev := item.Clone()
var reverseOp *storage.TxRevertOp[storage.Key, storage.Item]
if err := reader.Get(prev); err == nil {
reverseOp = &storage.TxRevertOp[storage.Key, storage.Item]{
Origin: storage.DeleteOp,
ObjectID: item.String(),
Val: prev,
}
}

err := writer.Delete(item)
if err == nil {
return reverseOp, nil
}
return nil, err
}

// txBatch is a batch that is used in a transaction.
type txBatch struct {
batch storage.Batch
store *TxStore
revOpsMu sync.Mutex
revOps []*storage.TxRevertOp[storage.Key, storage.Item]
onCommit func(revOps ...*storage.TxRevertOp[storage.Key, storage.Item]) error
}

// Put implements the Batch interface.
func (b *txBatch) Put(item storage.Item) error {
if err := b.store.IsDone(); err != nil {
return err
}

reverseOp, err := put(b.store, b.batch, item)
if err == nil && reverseOp != nil {
b.revOpsMu.Lock()
b.revOps = append(b.revOps, reverseOp)
b.revOpsMu.Unlock()
}
return err
}

// Delete implements the Batch interface.
func (b *txBatch) Delete(item storage.Item) error {
if err := b.store.IsDone(); err != nil {
return err
}

reverseOp, err := del(b.store, b.batch, item)
if err == nil && reverseOp != nil {
b.revOpsMu.Lock()
b.revOps = append(b.revOps, reverseOp)
b.revOpsMu.Unlock()
}
return err
}

// Commit implements the Batch interface.
func (b *txBatch) Commit() error {
if err := b.batch.Commit(); err != nil {
return err
}
b.revOpsMu.Lock()
defer b.revOpsMu.Unlock()
defer func() {
b.revOps = nil
}()
return b.onCommit(b.revOps...)
}

// TxStore is an implementation of in-memory Store
// where all Store operations are done in a transaction.
Expand All @@ -25,7 +135,7 @@ type TxStore struct {

// release releases the TxStore transaction associated resources.
func (s *TxStore) release() {
s.TxStoreBase.Store = nil
s.TxStoreBase.BatchedStore = nil
s.revOps = nil
}

Expand All @@ -35,26 +145,7 @@ func (s *TxStore) Put(item storage.Item) error {
return err
}

prev := item.Clone()
var reverseOp *storage.TxRevertOp[storage.Key, storage.Item]
switch err := s.TxStoreBase.Get(prev); {
case errors.Is(err, storage.ErrNotFound):
reverseOp = &storage.TxRevertOp[storage.Key, storage.Item]{
Origin: storage.PutCreateOp,
ObjectID: item.String(),
Val: item,
}
case err != nil:
return err
default:
reverseOp = &storage.TxRevertOp[storage.Key, storage.Item]{
Origin: storage.PutUpdateOp,
ObjectID: prev.String(),
Val: prev,
}
}

err := s.TxStoreBase.Put(item)
reverseOp, err := put(s.TxStoreBase, s.TxStoreBase, item)
if err == nil {
err = s.revOps.Append(reverseOp)
}
Expand All @@ -67,17 +158,7 @@ func (s *TxStore) Delete(item storage.Item) error {
return err
}

prev := item.Clone()
var reverseOp *storage.TxRevertOp[storage.Key, storage.Item]
if err := s.TxStoreBase.Get(prev); err == nil {
reverseOp = &storage.TxRevertOp[storage.Key, storage.Item]{
Origin: storage.DeleteOp,
ObjectID: prev.String(),
Val: prev,
}
}

err := s.TxStoreBase.Delete(item)
reverseOp, err := del(s.TxStoreBase, s.TxStoreBase, item)
if err == nil {
err = s.revOps.Append(reverseOp)
}
Expand Down Expand Up @@ -111,37 +192,53 @@ func (s *TxStore) Rollback() error {
return nil
}

// Batch implements the Batcher interface.
func (s *TxStore) Batch(ctx context.Context) (storage.Batch, error) {
batch, err := s.TxStoreBase.BatchedStore.Batch(ctx)
if err != nil {
return nil, err
}

return &txBatch{
batch: batch,
store: s,
onCommit: func(revOps ...*storage.TxRevertOp[storage.Key, storage.Item]) error {
return s.revOps.Append(revOps...)
},
}, nil
}

// NewTx implements the TxStore interface.
func (s *TxStore) NewTx(state *storage.TxState) storage.TxStore {
if s.Store == nil {
if s.BatchedStore == nil {
panic(errors.New("inmemstore: nil store"))
}

return &TxStore{
TxStoreBase: &storage.TxStoreBase{
TxState: state,
Store: s.Store,
TxState: state,
BatchedStore: s.BatchedStore,
},
revOps: storage.NewInMemTxRevertOpStore(
map[storage.TxOpCode]storage.TxRevertFn[storage.Key, storage.Item]{
storage.PutCreateOp: func(_ storage.Key, item storage.Item) error {
return s.Store.Delete(item)
return s.BatchedStore.Delete(item)
},
storage.PutUpdateOp: func(_ storage.Key, item storage.Item) error {
return s.Store.Put(item)
return s.BatchedStore.Put(item)
},
storage.DeleteOp: func(_ storage.Key, item storage.Item) error {
return s.Store.Put(item)
return s.BatchedStore.Put(item)
},
},
),
}
}

// NewTxStore returns a new TxStore instance backed by the given store.
func NewTxStore(store storage.Store) *TxStore {
func NewTxStore(store storage.BatchedStore) *TxStore {
return &TxStore{
TxStoreBase: &storage.TxStoreBase{Store: store},
TxStoreBase: &storage.TxStoreBase{BatchedStore: store},
revOps: new(storage.NoOpTxRevertOpStore[storage.Key, storage.Item]),
}
}
2 changes: 1 addition & 1 deletion pkg/storage/leveldbstore/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *TxStore) Recover() error {
return fmt.Errorf("leveldbstore: recovery: iteration failed: %w", err)
}

if err := s.Store.(*Store).db.Write(batch, &opt.WriteOptions{Sync: true}); err != nil {
if err := s.BatchedStore.(*Store).db.Write(batch, &opt.WriteOptions{Sync: true}); err != nil {
return fmt.Errorf("leveldbstore: recovery: unable to write batch: %w", err)
}
return nil
Expand Down
Loading

0 comments on commit b4270fb

Please sign in to comment.