From 511b113d1d96500024da46161ff50139fd2c45a5 Mon Sep 17 00:00:00 2001 From: mrekucci Date: Mon, 10 Jul 2023 09:59:51 +0200 Subject: [PATCH] refactor: cleanups --- pkg/postage/batchservice/batchservice.go | 2 +- pkg/postage/postagecontract/mock/contract.go | 4 +- pkg/storage/batch.go | 4 - pkg/storage/inmemstore/transaction.go | 109 +++++++++--------- pkg/storage/leveldbstore/transaction.go | 105 +++++++++-------- pkg/storage/storage.go | 6 +- pkg/storage/storagetest/transaction.go | 38 +++--- .../staking/mock/contract.go | 2 +- 8 files changed, 135 insertions(+), 135 deletions(-) diff --git a/pkg/postage/batchservice/batchservice.go b/pkg/postage/batchservice/batchservice.go index e04a35352b0..bd4a10d8d28 100644 --- a/pkg/postage/batchservice/batchservice.go +++ b/pkg/postage/batchservice/batchservice.go @@ -99,7 +99,7 @@ func New( } // Create will create a new batch with the given ID, owner value and depth and -// stores it in the BatchStore. +// stores it in the BatchedStore. func (svc *batchService) Create(id, owner []byte, totalAmout, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool, txHash common.Hash) error { // don't add batches which have value which equals total cumulative // payout or that are going to expire already within the next couple of blocks diff --git a/pkg/postage/postagecontract/mock/contract.go b/pkg/postage/postagecontract/mock/contract.go index b4d8494a1be..ff5b3a8220a 100644 --- a/pkg/postage/postagecontract/mock/contract.go +++ b/pkg/postage/postagecontract/mock/contract.go @@ -6,9 +6,9 @@ package mock import ( "context" - "github.com/ethereum/go-ethereum/common" "math/big" + "github.com/ethereum/go-ethereum/common" "github.com/ethersphere/bee/pkg/postage/postagecontract" ) @@ -38,7 +38,7 @@ func (c *contractMock) ExpireBatches(ctx context.Context) error { // Option is a an option passed to New type Option func(*contractMock) -// New creates a new mock BatchStore +// New creates a new mock BatchStore. func New(opts ...Option) postagecontract.Interface { bs := &contractMock{} diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index bbdcd0b8144..d159f7ad646 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -13,10 +13,6 @@ import ( // call when a batch has already been committed. var ErrBatchCommitted = errors.New("storage: batch has already been committed") -// ErrBatchNotSupported is returned by BatchedStore.Batch call when batching -// is not supported. -var ErrBatchNotSupported = errors.New("storage: batch operations not supported") - // Batch provides set of operations that are batched. type Batch interface { // Put adds a new item to the batch. diff --git a/pkg/storage/inmemstore/transaction.go b/pkg/storage/inmemstore/transaction.go index a0ad4929896..b0d600444bc 100644 --- a/pkg/storage/inmemstore/transaction.go +++ b/pkg/storage/inmemstore/transaction.go @@ -13,7 +13,10 @@ import ( "github.com/ethersphere/bee/pkg/storage" ) -var _ storage.TxStore = (*TxStore)(nil) +var ( + _ storage.TxStore = (*TxStore)(nil) + _ storage.Batcher = (*TxStore)(nil) +) func put( reader storage.Reader, @@ -68,6 +71,58 @@ func del( return nil, err } +// txBatch is a batch that is used in a transaction. +type txBatch struct { + batch storage.Batch + store *TxStore + revOpsMu sync.Mutex + revOps []*storage.TxRevertOp[storage.Key, storage.Item] + onCommit func(revOps ...*storage.TxRevertOp[storage.Key, storage.Item]) error +} + +// Put implements the Batch interface. +func (b *txBatch) Put(item storage.Item) error { + if err := b.store.IsDone(); err != nil { + return err + } + + reverseOp, err := put(b.store, b.batch, item) + if err == nil && reverseOp != nil { + b.revOpsMu.Lock() + b.revOps = append(b.revOps, reverseOp) + b.revOpsMu.Unlock() + } + return err +} + +// Delete implements the Batch interface. +func (b *txBatch) Delete(item storage.Item) error { + if err := b.store.IsDone(); err != nil { + return err + } + + reverseOp, err := del(b.store, b.batch, item) + if err == nil && reverseOp != nil { + b.revOpsMu.Lock() + b.revOps = append(b.revOps, reverseOp) + b.revOpsMu.Unlock() + } + return err +} + +// Commit implements the Batch interface. +func (b *txBatch) Commit() error { + if err := b.batch.Commit(); err != nil { + return err + } + b.revOpsMu.Lock() + defer b.revOpsMu.Unlock() + defer func() { + b.revOps = nil + }() + return b.onCommit(b.revOps...) +} + // TxStore is an implementation of in-memory Store // where all Store operations are done in a transaction. type TxStore struct { @@ -144,7 +199,7 @@ func (s *TxStore) Batch(ctx context.Context) (storage.Batch, error) { return nil, err } - return &txWrappedBatch{ + return &txBatch{ batch: batch, store: s, onCommit: func(revOps ...*storage.TxRevertOp[storage.Key, storage.Item]) error { @@ -153,56 +208,6 @@ func (s *TxStore) Batch(ctx context.Context) (storage.Batch, error) { }, nil } -type txWrappedBatch struct { - batch storage.Batch - store *TxStore - opsMu sync.Mutex - ops []*storage.TxRevertOp[storage.Key, storage.Item] - onCommit func(revOps ...*storage.TxRevertOp[storage.Key, storage.Item]) error -} - -// Put implements the Batch interface. -func (b *txWrappedBatch) Put(item storage.Item) error { - if err := b.store.IsDone(); err != nil { - return err - } - - reverseOp, err := put(b.store, b.batch, item) - if err == nil && reverseOp != nil { - b.opsMu.Lock() - b.ops = append(b.ops, reverseOp) - b.opsMu.Unlock() - } - return err -} - -// Delete implements the Batch interface. -func (b *txWrappedBatch) Delete(item storage.Item) error { - if err := b.store.IsDone(); err != nil { - return err - } - - reverseOp, err := del(b.store, b.batch, item) - if err == nil && reverseOp != nil { - b.opsMu.Lock() - b.ops = append(b.ops, reverseOp) - b.opsMu.Unlock() - } - return err -} - -func (b *txWrappedBatch) Commit() error { - if err := b.batch.Commit(); err != nil { - return err - } - b.opsMu.Lock() - defer b.opsMu.Unlock() - defer func() { - b.ops = nil - }() - return b.onCommit(b.ops...) -} - // NewTx implements the TxStore interface. func (s *TxStore) NewTx(state *storage.TxState) storage.TxStore { if s.BatchedStore == nil { diff --git a/pkg/storage/leveldbstore/transaction.go b/pkg/storage/leveldbstore/transaction.go index 6080b66d765..9777b305765 100644 --- a/pkg/storage/leveldbstore/transaction.go +++ b/pkg/storage/leveldbstore/transaction.go @@ -86,8 +86,61 @@ func (s *txRevertOpStore) Clean() error { return s.db.Delete(s.id, nil) } +// txBatch is a batch that is used in a transaction. +type txBatch struct { + batch storage.Batch + store *TxStore + revOpsMu sync.Mutex + revOps []*storage.TxRevertOp[[]byte, []byte] + onCommit func(revOps ...*storage.TxRevertOp[[]byte, []byte]) error +} + +// Put implements the Batch interface. +func (b *txBatch) Put(item storage.Item) error { + if err := b.store.IsDone(); err != nil { + return err + } + + reverseOp, err := put(b.store, b.batch, item) + if err == nil && reverseOp != nil { + b.revOpsMu.Lock() + b.revOps = append(b.revOps, reverseOp) + b.revOpsMu.Unlock() + } + return err +} + +// Delete implements the Batch interface. +func (b *txBatch) Delete(item storage.Item) error { + if err := b.store.IsDone(); err != nil { + return err + } + + reverseOp, err := del(b.store, b.batch, item) + if err == nil && reverseOp != nil { + b.revOpsMu.Lock() + b.revOps = append(b.revOps, reverseOp) + b.revOpsMu.Unlock() + } + return err +} + +// Commit implements the Batch interface. +func (b *txBatch) Commit() error { + if err := b.batch.Commit(); err != nil { + return err + } + b.revOpsMu.Lock() + defer b.revOpsMu.Unlock() + defer func() { + b.revOps = nil + }() + return b.onCommit(b.revOps...) +} + var ( _ storage.TxStore = (*TxStore)(nil) + _ storage.Batcher = (*TxStore)(nil) _ storage.Recoverer = (*TxStore)(nil) ) @@ -230,7 +283,7 @@ func (s *TxStore) Batch(ctx context.Context) (storage.Batch, error) { return nil, err } - return &txWrappedBatch{ + return &txBatch{ batch: batch, store: s, onCommit: func(revOps ...*storage.TxRevertOp[[]byte, []byte]) error { @@ -239,56 +292,6 @@ func (s *TxStore) Batch(ctx context.Context) (storage.Batch, error) { }, nil } -type txWrappedBatch struct { - batch storage.Batch - store *TxStore - opsMu sync.Mutex - ops []*storage.TxRevertOp[[]byte, []byte] - onCommit func(revOps ...*storage.TxRevertOp[[]byte, []byte]) error -} - -// Put implements the Batch interface. -func (b *txWrappedBatch) Put(item storage.Item) error { - if err := b.store.IsDone(); err != nil { - return err - } - - reverseOp, err := put(b.store, b.batch, item) - if err == nil && reverseOp != nil { - b.opsMu.Lock() - b.ops = append(b.ops, reverseOp) - b.opsMu.Unlock() - } - return err -} - -// Delete implements the Batch interface. -func (b *txWrappedBatch) Delete(item storage.Item) error { - if err := b.store.IsDone(); err != nil { - return err - } - - reverseOp, err := del(b.store, b.batch, item) - if err == nil && reverseOp != nil { - b.opsMu.Lock() - b.ops = append(b.ops, reverseOp) - b.opsMu.Unlock() - } - return err -} - -func (b *txWrappedBatch) Commit() error { - if err := b.batch.Commit(); err != nil { - return err - } - b.opsMu.Lock() - defer b.opsMu.Unlock() - defer func() { - b.ops = nil - }() - return b.onCommit(b.ops...) -} - // pendingTxNamespace exist for cashing the namespace of pendingTx var pendingTxNamespace = new(pendingTx).Namespace() diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d57aa70afbf..f61099b4188 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -155,7 +155,7 @@ type Store interface { Writer } -// Reader contains the read-only methods required for the Data Abstraction Layer. +// Reader groups methods that read from the store. type Reader interface { // Get unmarshalls object with the given Item.Key.ID into the given Item. Get(Item) error @@ -175,7 +175,7 @@ type Reader interface { Count(Key) (int, error) } -// Writer contains the write-only methods required for the Data Abstraction Layer. +// Writer groups methods that change the state of the store. type Writer interface { // Put inserts or updates the given Item identified by its Key.ID. Put(Item) error @@ -185,7 +185,7 @@ type Writer interface { Delete(Item) error } -// BatchedStore is a store that supports batching. +// BatchedStore is a store that supports batching of Writer method calls. type BatchedStore interface { Store Batcher diff --git a/pkg/storage/storagetest/transaction.go b/pkg/storage/storagetest/transaction.go index 28cf43bc41a..60a80c934d4 100644 --- a/pkg/storage/storagetest/transaction.go +++ b/pkg/storage/storagetest/transaction.go @@ -12,7 +12,7 @@ import ( "testing" "time" - storage "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage/storageutil" chunktest "github.com/ethersphere/bee/pkg/storage/testing" "github.com/ethersphere/bee/pkg/swarm" @@ -63,12 +63,12 @@ func (o object) String() string { } // initStore initializes the given store with the given objects. -func initStore(t *testing.T, store storage.BatchedStore, batch bool, objects ...*object) { +func initStore(t *testing.T, store storage.BatchedStore, batched bool, objects ...*object) { t.Helper() - var writer storage.Writer + var writer storage.Writer = store - if batch { + if batched { b, err := store.Batch(context.Background()) if err != nil { t.Fatalf("Batch(): unexpected error: %v", err) @@ -79,8 +79,6 @@ func initStore(t *testing.T, store storage.BatchedStore, batch bool, objects ... } }() writer = b - } else { - writer = store } for _, o := range objects { @@ -90,12 +88,12 @@ func initStore(t *testing.T, store storage.BatchedStore, batch bool, objects ... } } -func deleteStore(t *testing.T, store storage.BatchedStore, batch bool, objects ...*object) { +func deleteStore(t *testing.T, store storage.BatchedStore, batched bool, objects ...*object) { t.Helper() - var writer storage.Writer + var writer storage.Writer = store - if batch { + if batched { b, err := store.Batch(context.Background()) if err != nil { t.Fatalf("Batch(): unexpected error: %v", err) @@ -106,8 +104,6 @@ func deleteStore(t *testing.T, store storage.BatchedStore, batch bool, objects . } }() writer = b - } else { - writer = store } for _, o := range objects { @@ -192,11 +188,11 @@ func TestTxStore(t *testing.T, store storage.TxStore) { }) tCases := []struct { - name string - batch bool + name string + batched bool }{ {"single", false}, - {"batchd", true}, + {"batched", true}, } for _, tCase := range tCases { @@ -214,7 +210,7 @@ func TestTxStore(t *testing.T, store storage.TxStore) { t.Run("add new objects", func(t *testing.T) { tx := store.NewTx(storage.NewTxState(ctx)) - initStore(t, tx, tCase.batch, objects...) + initStore(t, tx, tCase.batched, objects...) if err := tx.Commit(); err != nil { t.Fatalf("Commit(): unexpected error: %v", err) @@ -233,7 +229,7 @@ func TestTxStore(t *testing.T, store storage.TxStore) { t.Run("delete existing objects", func(t *testing.T) { tx := store.NewTx(storage.NewTxState(ctx)) - deleteStore(t, tx, tCase.batch, objects...) + deleteStore(t, tx, tCase.batched, objects...) if err := tx.Commit(); err != nil { t.Fatalf("Commit(): unexpected error: %v", err) } @@ -290,7 +286,7 @@ func TestTxStore(t *testing.T, store storage.TxStore) { {id: "0002" + tCase.name, data: []byte("data2")}, {id: "0003" + tCase.name, data: []byte("data3")}, } - initStore(t, tx, tCase.batch, objects...) + initStore(t, tx, tCase.batched, objects...) if err := tx.Rollback(); err != nil { t.Fatalf("Rollback(): unexpected error: %v", err) @@ -317,7 +313,7 @@ func TestTxStore(t *testing.T, store storage.TxStore) { {id: "0002" + tCase.name, data: []byte("data2")}, {id: "0003" + tCase.name, data: []byte("data3")}, } - initStore(t, tx, tCase.batch, oldObjects...) + initStore(t, tx, tCase.batched, oldObjects...) if err := tx.Commit(); err != nil { t.Fatalf("Commit(): unexpected error: %v", err) } @@ -328,7 +324,7 @@ func TestTxStore(t *testing.T, store storage.TxStore) { {id: "0002" + tCase.name, data: []byte("data22")}, {id: "0003" + tCase.name, data: []byte("data33")}, } - initStore(t, tx, tCase.batch, newObjects...) + initStore(t, tx, tCase.batched, newObjects...) if err := tx.Rollback(); err != nil { t.Fatalf("Rollback(): unexpected error: %v", err) } @@ -357,13 +353,13 @@ func TestTxStore(t *testing.T, store storage.TxStore) { {id: "0002" + tCase.name, data: []byte("data2")}, {id: "0003" + tCase.name, data: []byte("data3")}, } - initStore(t, tx, tCase.batch, objects...) + initStore(t, tx, tCase.batched, objects...) if err := tx.Commit(); err != nil { t.Fatalf("Commit(): unexpected error: %v", err) } tx = store.NewTx(storage.NewTxState(ctx)) - deleteStore(t, tx, tCase.batch, objects...) + deleteStore(t, tx, tCase.batched, objects...) if err := tx.Rollback(); err != nil { t.Fatalf("Rollback(): unexpected error: %v", err) } diff --git a/pkg/storageincentives/staking/mock/contract.go b/pkg/storageincentives/staking/mock/contract.go index c87564a374a..ab85d5cb519 100644 --- a/pkg/storageincentives/staking/mock/contract.go +++ b/pkg/storageincentives/staking/mock/contract.go @@ -39,7 +39,7 @@ func (s *stakingContractMock) IsOverlayFrozen(ctx context.Context, block uint64) // Option is a an option passed to New type Option func(mock *stakingContractMock) -// New creates a new mock BatchStore +// New creates a new mock BatchStore. func New(opts ...Option) staking.Contract { bs := &stakingContractMock{}