Skip to content

Commit

Permalink
chore: extra checks in step 6 migration and small changes to transact…
Browse files Browse the repository at this point in the history
…ion op steps (#4803)
  • Loading branch information
istae authored Sep 9, 2024
1 parent 3ebd780 commit 8c390f6
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 11 deletions.
22 changes: 12 additions & 10 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
"new_chunk", chunk.Address(),
"batch_id", hex.EncodeToString(chunk.Stamp().BatchID()),
)
// remove index items and chunk data
err = r.removeChunk(ctx, s, oldStampIndex.ChunkAddress, oldStampIndex.BatchID, oldStampIndex.StampHash)
if err != nil {
return fmt.Errorf("failed removing older chunk %s: %w", oldStampIndex.ChunkAddress, err)
Expand All @@ -183,13 +184,23 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
return err
}

// delete old chunk index items
err = errors.Join(
s.IndexStore().Delete(oldBatchRadiusItem),
s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}),
stampindex.DeleteWithStamp(s.IndexStore(), reserveNamespace, sameAddressOldStamp),
chunkstamp.DeleteWithStamp(s.IndexStore(), reserveNamespace, oldBatchRadiusItem.Address, sameAddressOldStamp),
)
if err != nil {
return err
}

binID, err := r.IncBinID(s.IndexStore(), bin)
if err != nil {
return err
}

err = errors.Join(
r.deleteWithStamp(s, oldBatchRadiusItem, sameAddressOldStamp),
stampindex.Store(s.IndexStore(), reserveNamespace, chunk),
chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk),
s.IndexStore().Put(&BatchRadiusItem{
Expand Down Expand Up @@ -299,15 +310,6 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
return nil
}

func (r *Reserve) deleteWithStamp(s transaction.Store, oldBatchRadiusItem *BatchRadiusItem, sameAddressOldChunkStamp swarm.Stamp) error {
return errors.Join(
s.IndexStore().Delete(oldBatchRadiusItem),
s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}),
stampindex.Delete(s.IndexStore(), reserveNamespace, swarm.NewChunk(oldBatchRadiusItem.Address, nil).WithStamp(sameAddressOldChunkStamp)),
chunkstamp.DeleteWithStamp(s.IndexStore(), reserveNamespace, oldBatchRadiusItem.Address, sameAddressOldChunkStamp),
)
}

func (r *Reserve) Has(addr swarm.Address, batchID []byte, stampHash []byte) (bool, error) {
item := &BatchRadiusItem{Bin: swarm.Proximity(r.baseAddr.Bytes(), addr.Bytes()), BatchID: batchID, Address: addr, StampHash: stampHash}
return r.st.IndexStore().Has(item)
Expand Down
13 changes: 13 additions & 0 deletions pkg/storer/internal/stampindex/stampindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,16 @@ func Delete(s storage.Writer, namespace string, chunk swarm.Chunk) error {
}
return nil
}

// DeleteWithStamp removes the related stamp index record from the storage.
func DeleteWithStamp(s storage.Writer, namespace string, stamp swarm.Stamp) error {
item := &Item{
namespace: []byte(namespace),
BatchID: stamp.BatchID(),
StampIndex: stamp.Index(),
}
if err := s.Delete(item); err != nil {
return fmt.Errorf("failed to delete stampindex.Item %s: %w", item, err)
}
return nil
}
97 changes: 97 additions & 0 deletions pkg/storer/internal/stampindex/stampindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,103 @@ func TestStoreLoadDelete(t *testing.T) {
}
}

func TestStoreLoadDeleteWithStamp(t *testing.T) {
t.Parallel()

ts := newTestStorage(t)
chunks := chunktest.GenerateTestRandomChunks(10)

for i, chunk := range chunks {
ns := fmt.Sprintf("namespace_%d", i)
t.Run(ns, func(t *testing.T) {
t.Run("store new stamp index", func(t *testing.T) {

err := ts.Run(context.Background(), func(s transaction.Store) error {
return stampindex.Store(s.IndexStore(), ns, chunk)

})
if err != nil {
t.Fatalf("Store(...): unexpected error: %v", err)
}

stampHash, err := chunk.Stamp().Hash()
if err != nil {
t.Fatal(err)
}
want := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash)
want.StampTimestamp = chunk.Stamp().Timestamp()
want.ChunkAddress = chunk.Address()

have := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash)
err = ts.IndexStore().Get(have)
if err != nil {
t.Fatalf("Get(...): unexpected error: %v", err)
}

if diff := cmp.Diff(want, have, cmp.AllowUnexported(stampindex.Item{})); diff != "" {
t.Fatalf("Get(...): mismatch (-want +have):\n%s", diff)
}
})

t.Run("load stored stamp index", func(t *testing.T) {
stampHash, err := chunk.Stamp().Hash()
if err != nil {
t.Fatal(err)
}
want := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash)
want.StampTimestamp = chunk.Stamp().Timestamp()
want.ChunkAddress = chunk.Address()

have, err := stampindex.Load(ts.IndexStore(), ns, chunk)
if err != nil {
t.Fatalf("Load(...): unexpected error: %v", err)
}

if diff := cmp.Diff(want, have, cmp.AllowUnexported(stampindex.Item{})); diff != "" {
t.Fatalf("Load(...): mismatch (-want +have):\n%s", diff)
}
})

t.Run("delete stored stamp index", func(t *testing.T) {

err := ts.Run(context.Background(), func(s transaction.Store) error {
return stampindex.DeleteWithStamp(s.IndexStore(), ns, chunk.Stamp())
})
if err != nil {
t.Fatalf("Delete(...): unexpected error: %v", err)
}

have, err := stampindex.Load(ts.IndexStore(), ns, chunk)
if have != nil {
t.Fatalf("Load(...): unexpected item %v", have)
}
if !errors.Is(err, storage.ErrNotFound) {
t.Fatalf("Load(...): unexpected error: %v", err)
}

cnt := 0
err = ts.IndexStore().Iterate(
storage.Query{
Factory: func() storage.Item {
return new(stampindex.Item)
},
},
func(result storage.Result) (bool, error) {
cnt++
return false, nil
},
)
if err != nil {
t.Fatalf("Store().Iterate(...): unexpected error: %v", err)
}
if want, have := 0, cnt; want != have {
t.Fatalf("Store().Iterate(...): chunk count mismatch:\nwant: %d\nhave: %d", want, have)
}
})
})
}
}

func TestLoadOrStore(t *testing.T) {
t.Parallel()

Expand Down
39 changes: 38 additions & 1 deletion pkg/storer/migration/step_06.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
package migration

import (
"bytes"
"context"
"errors"
"fmt"
"os"
"runtime"
Expand Down Expand Up @@ -186,7 +188,42 @@ func addStampHash(logger log.Logger, st transaction.Storage) (int64, int64, erro
}

if postBatchRadiusCnt != postChunkBinCnt || preBatchRadiusCnt != postBatchRadiusCnt || preChunkBinCnt != postChunkBinCnt {
return 0, 0, fmt.Errorf("post-migration check: index counts do not match, %d vs %d. It's recommended that the nuke is run to reset the node", postBatchRadiusCnt, postChunkBinCnt)
return 0, 0, fmt.Errorf("post-migration check: index counts do not match, %d vs %d. It's recommended that the nuke cmd is run to reset the node", postBatchRadiusCnt, postChunkBinCnt)
}

err = st.IndexStore().Iterate(storage.Query{
Factory: func() storage.Item { return new(reserve.ChunkBinItem) },
}, func(result storage.Result) (bool, error) {
item := result.Entry.(*reserve.ChunkBinItem)

batchRadiusItem := &reserve.BatchRadiusItem{BatchID: item.BatchID, Bin: item.Bin, Address: item.Address, StampHash: item.StampHash}
if err := st.IndexStore().Get(batchRadiusItem); err != nil {
return false, fmt.Errorf("batch radius item get: %w", err)
}

stamp, err := chunkstamp.LoadWithBatchID(st.IndexStore(), "reserve", item.Address, item.BatchID)
if err != nil {
return false, fmt.Errorf("stamp item get: %w", err)
}

stampIndex, err := stampindex.LoadWithStamp(st.IndexStore(), "reserve", stamp)
if err != nil {
return false, fmt.Errorf("stamp index get: %w", err)
}

if !bytes.Equal(item.StampHash, batchRadiusItem.StampHash) {
return false, fmt.Errorf("batch radius item stamp hash, %x vs %x", item.StampHash, batchRadiusItem.StampHash)
}

if !bytes.Equal(item.StampHash, stampIndex.StampHash) {
return false, fmt.Errorf("stamp index item stamp hash, %x vs %x", item.StampHash, stampIndex.StampHash)
}

return false, nil
})

if err != nil {
return 0, 0, errors.New("post-migration check: items fields not match. It's recommended that the nuke cmd is run to reset the node")
}

return seenCount, doneCount.Load(), nil
Expand Down

0 comments on commit 8c390f6

Please sign in to comment.