Skip to content

Commit

Permalink
refactor: cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
mrekucci authored and alok committed Jul 10, 2023
1 parent 90dd6eb commit dfb16e5
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 135 deletions.
2 changes: 1 addition & 1 deletion pkg/postage/batchservice/batchservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func New(
}

// Create will create a new batch with the given ID, owner value and depth and
// stores it in the BatchStore.
// stores it in the BatchedStore.
func (svc *batchService) Create(id, owner []byte, totalAmout, normalisedBalance *big.Int, depth, bucketDepth uint8, immutable bool, txHash common.Hash) error {
// don't add batches which have value which equals total cumulative
// payout or that are going to expire already within the next couple of blocks
Expand Down
4 changes: 2 additions & 2 deletions pkg/postage/postagecontract/mock/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ package mock

import (
"context"
"github.com/ethereum/go-ethereum/common"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/postage/postagecontract"
)

Expand Down Expand Up @@ -38,7 +38,7 @@ func (c *contractMock) ExpireBatches(ctx context.Context) error {
// Option is a an option passed to New
type Option func(*contractMock)

// New creates a new mock BatchStore
// New creates a new mock BatchStore.
func New(opts ...Option) postagecontract.Interface {
bs := &contractMock{}

Expand Down
4 changes: 0 additions & 4 deletions pkg/storage/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
// call when a batch has already been committed.
var ErrBatchCommitted = errors.New("storage: batch has already been committed")

// ErrBatchNotSupported is returned by BatchedStore.Batch call when batching
// is not supported.
var ErrBatchNotSupported = errors.New("storage: batch operations not supported")

// Batch provides set of operations that are batched.
type Batch interface {
// Put adds a new item to the batch.
Expand Down
109 changes: 57 additions & 52 deletions pkg/storage/inmemstore/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
"github.com/ethersphere/bee/pkg/storage"
)

var _ storage.TxStore = (*TxStore)(nil)
var (
_ storage.TxStore = (*TxStore)(nil)
_ storage.Batcher = (*TxStore)(nil)
)

func put(
reader storage.Reader,
Expand Down Expand Up @@ -68,6 +71,58 @@ func del(
return nil, err
}

// txBatch is a batch that is used in a transaction.
type txBatch struct {
batch storage.Batch
store *TxStore
revOpsMu sync.Mutex
revOps []*storage.TxRevertOp[storage.Key, storage.Item]
onCommit func(revOps ...*storage.TxRevertOp[storage.Key, storage.Item]) error
}

// Put implements the Batch interface.
func (b *txBatch) Put(item storage.Item) error {
if err := b.store.IsDone(); err != nil {
return err
}

reverseOp, err := put(b.store, b.batch, item)
if err == nil && reverseOp != nil {
b.revOpsMu.Lock()
b.revOps = append(b.revOps, reverseOp)
b.revOpsMu.Unlock()
}
return err
}

// Delete implements the Batch interface.
func (b *txBatch) Delete(item storage.Item) error {
if err := b.store.IsDone(); err != nil {
return err
}

reverseOp, err := del(b.store, b.batch, item)
if err == nil && reverseOp != nil {
b.revOpsMu.Lock()
b.revOps = append(b.revOps, reverseOp)
b.revOpsMu.Unlock()
}
return err
}

// Commit implements the Batch interface.
func (b *txBatch) Commit() error {
if err := b.batch.Commit(); err != nil {
return err
}
b.revOpsMu.Lock()
defer b.revOpsMu.Unlock()
defer func() {
b.revOps = nil
}()
return b.onCommit(b.revOps...)
}

// TxStore is an implementation of in-memory Store
// where all Store operations are done in a transaction.
type TxStore struct {
Expand Down Expand Up @@ -144,7 +199,7 @@ func (s *TxStore) Batch(ctx context.Context) (storage.Batch, error) {
return nil, err
}

return &txWrappedBatch{
return &txBatch{
batch: batch,
store: s,
onCommit: func(revOps ...*storage.TxRevertOp[storage.Key, storage.Item]) error {
Expand All @@ -153,56 +208,6 @@ func (s *TxStore) Batch(ctx context.Context) (storage.Batch, error) {
}, nil
}

type txWrappedBatch struct {
batch storage.Batch
store *TxStore
opsMu sync.Mutex
ops []*storage.TxRevertOp[storage.Key, storage.Item]
onCommit func(revOps ...*storage.TxRevertOp[storage.Key, storage.Item]) error
}

// Put implements the Batch interface.
func (b *txWrappedBatch) Put(item storage.Item) error {
if err := b.store.IsDone(); err != nil {
return err
}

reverseOp, err := put(b.store, b.batch, item)
if err == nil && reverseOp != nil {
b.opsMu.Lock()
b.ops = append(b.ops, reverseOp)
b.opsMu.Unlock()
}
return err
}

// Delete implements the Batch interface.
func (b *txWrappedBatch) Delete(item storage.Item) error {
if err := b.store.IsDone(); err != nil {
return err
}

reverseOp, err := del(b.store, b.batch, item)
if err == nil && reverseOp != nil {
b.opsMu.Lock()
b.ops = append(b.ops, reverseOp)
b.opsMu.Unlock()
}
return err
}

func (b *txWrappedBatch) Commit() error {
if err := b.batch.Commit(); err != nil {
return err
}
b.opsMu.Lock()
defer b.opsMu.Unlock()
defer func() {
b.ops = nil
}()
return b.onCommit(b.ops...)
}

// NewTx implements the TxStore interface.
func (s *TxStore) NewTx(state *storage.TxState) storage.TxStore {
if s.BatchedStore == nil {
Expand Down
105 changes: 54 additions & 51 deletions pkg/storage/leveldbstore/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,61 @@ func (s *txRevertOpStore) Clean() error {
return s.db.Delete(s.id, nil)
}

// txBatch is a batch that is used in a transaction.
type txBatch struct {
batch storage.Batch
store *TxStore
revOpsMu sync.Mutex
revOps []*storage.TxRevertOp[[]byte, []byte]
onCommit func(revOps ...*storage.TxRevertOp[[]byte, []byte]) error
}

// Put implements the Batch interface.
func (b *txBatch) Put(item storage.Item) error {
if err := b.store.IsDone(); err != nil {
return err
}

reverseOp, err := put(b.store, b.batch, item)
if err == nil && reverseOp != nil {
b.revOpsMu.Lock()
b.revOps = append(b.revOps, reverseOp)
b.revOpsMu.Unlock()
}
return err
}

// Delete implements the Batch interface.
func (b *txBatch) Delete(item storage.Item) error {
if err := b.store.IsDone(); err != nil {
return err
}

reverseOp, err := del(b.store, b.batch, item)
if err == nil && reverseOp != nil {
b.revOpsMu.Lock()
b.revOps = append(b.revOps, reverseOp)
b.revOpsMu.Unlock()
}
return err
}

// Commit implements the Batch interface.
func (b *txBatch) Commit() error {
if err := b.batch.Commit(); err != nil {
return err
}
b.revOpsMu.Lock()
defer b.revOpsMu.Unlock()
defer func() {
b.revOps = nil
}()
return b.onCommit(b.revOps...)
}

var (
_ storage.TxStore = (*TxStore)(nil)
_ storage.Batcher = (*TxStore)(nil)
_ storage.Recoverer = (*TxStore)(nil)
)

Expand Down Expand Up @@ -230,7 +283,7 @@ func (s *TxStore) Batch(ctx context.Context) (storage.Batch, error) {
return nil, err
}

return &txWrappedBatch{
return &txBatch{
batch: batch,
store: s,
onCommit: func(revOps ...*storage.TxRevertOp[[]byte, []byte]) error {
Expand All @@ -239,56 +292,6 @@ func (s *TxStore) Batch(ctx context.Context) (storage.Batch, error) {
}, nil
}

type txWrappedBatch struct {
batch storage.Batch
store *TxStore
opsMu sync.Mutex
ops []*storage.TxRevertOp[[]byte, []byte]
onCommit func(revOps ...*storage.TxRevertOp[[]byte, []byte]) error
}

// Put implements the Batch interface.
func (b *txWrappedBatch) Put(item storage.Item) error {
if err := b.store.IsDone(); err != nil {
return err
}

reverseOp, err := put(b.store, b.batch, item)
if err == nil && reverseOp != nil {
b.opsMu.Lock()
b.ops = append(b.ops, reverseOp)
b.opsMu.Unlock()
}
return err
}

// Delete implements the Batch interface.
func (b *txWrappedBatch) Delete(item storage.Item) error {
if err := b.store.IsDone(); err != nil {
return err
}

reverseOp, err := del(b.store, b.batch, item)
if err == nil && reverseOp != nil {
b.opsMu.Lock()
b.ops = append(b.ops, reverseOp)
b.opsMu.Unlock()
}
return err
}

func (b *txWrappedBatch) Commit() error {
if err := b.batch.Commit(); err != nil {
return err
}
b.opsMu.Lock()
defer b.opsMu.Unlock()
defer func() {
b.ops = nil
}()
return b.onCommit(b.ops...)
}

// pendingTxNamespace exist for cashing the namespace of pendingTx
var pendingTxNamespace = new(pendingTx).Namespace()

Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ type Store interface {
Writer
}

// Reader contains the read-only methods required for the Data Abstraction Layer.
// Reader groups methods that read from the store.
type Reader interface {
// Get unmarshalls object with the given Item.Key.ID into the given Item.
Get(Item) error
Expand All @@ -175,7 +175,7 @@ type Reader interface {
Count(Key) (int, error)
}

// Writer contains the write-only methods required for the Data Abstraction Layer.
// Writer groups methods that change the state of the store.
type Writer interface {
// Put inserts or updates the given Item identified by its Key.ID.
Put(Item) error
Expand All @@ -185,7 +185,7 @@ type Writer interface {
Delete(Item) error
}

// BatchedStore is a store that supports batching.
// BatchedStore is a store that supports batching of Writer method calls.
type BatchedStore interface {
Store
Batcher
Expand Down
Loading

0 comments on commit dfb16e5

Please sign in to comment.