From 8c390f6346ecd5eb214449cc2bf6bf4c5e2455fd Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 9 Sep 2024 13:00:42 +0300 Subject: [PATCH] chore: extra checks in step 6 migration and small changes to transaction op steps (#4803) --- pkg/storer/internal/reserve/reserve.go | 22 +++-- pkg/storer/internal/stampindex/stampindex.go | 13 +++ .../internal/stampindex/stampindex_test.go | 97 +++++++++++++++++++ pkg/storer/migration/step_06.go | 39 +++++++- 4 files changed, 160 insertions(+), 11 deletions(-) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 7781498cd1f..8a26492114f 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -163,6 +163,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { "new_chunk", chunk.Address(), "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()), ) + // remove index items and chunk data err = r.removeChunk(ctx, s, oldStampIndex.ChunkAddress, oldStampIndex.BatchID, oldStampIndex.StampHash) if err != nil { return fmt.Errorf("failed removing older chunk %s: %w", oldStampIndex.ChunkAddress, err) @@ -183,13 +184,23 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return err } + // delete old chunk index items + err = errors.Join( + s.IndexStore().Delete(oldBatchRadiusItem), + s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}), + stampindex.DeleteWithStamp(s.IndexStore(), reserveNamespace, sameAddressOldStamp), + chunkstamp.DeleteWithStamp(s.IndexStore(), reserveNamespace, oldBatchRadiusItem.Address, sameAddressOldStamp), + ) + if err != nil { + return err + } + binID, err := r.IncBinID(s.IndexStore(), bin) if err != nil { return err } err = errors.Join( - r.deleteWithStamp(s, oldBatchRadiusItem, sameAddressOldStamp), stampindex.Store(s.IndexStore(), reserveNamespace, chunk), chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk), s.IndexStore().Put(&BatchRadiusItem{ @@ -299,15 +310,6 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return nil } -func (r *Reserve) deleteWithStamp(s transaction.Store, oldBatchRadiusItem *BatchRadiusItem, sameAddressOldChunkStamp swarm.Stamp) error { - return errors.Join( - s.IndexStore().Delete(oldBatchRadiusItem), - s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}), - stampindex.Delete(s.IndexStore(), reserveNamespace, swarm.NewChunk(oldBatchRadiusItem.Address, nil).WithStamp(sameAddressOldChunkStamp)), - chunkstamp.DeleteWithStamp(s.IndexStore(), reserveNamespace, oldBatchRadiusItem.Address, sameAddressOldChunkStamp), - ) -} - func (r *Reserve) Has(addr swarm.Address, batchID []byte, stampHash []byte) (bool, error) { item := &BatchRadiusItem{Bin: swarm.Proximity(r.baseAddr.Bytes(), addr.Bytes()), BatchID: batchID, Address: addr, StampHash: stampHash} return r.st.IndexStore().Has(item) diff --git a/pkg/storer/internal/stampindex/stampindex.go b/pkg/storer/internal/stampindex/stampindex.go index 3ead7c8d861..815cfacd0fb 100644 --- a/pkg/storer/internal/stampindex/stampindex.go +++ b/pkg/storer/internal/stampindex/stampindex.go @@ -224,3 +224,16 @@ func Delete(s storage.Writer, namespace string, chunk swarm.Chunk) error { } return nil } + +// DeleteWithStamp removes the related stamp index record from the storage. +func DeleteWithStamp(s storage.Writer, namespace string, stamp swarm.Stamp) error { + item := &Item{ + namespace: []byte(namespace), + BatchID: stamp.BatchID(), + StampIndex: stamp.Index(), + } + if err := s.Delete(item); err != nil { + return fmt.Errorf("failed to delete stampindex.Item %s: %w", item, err) + } + return nil +} diff --git a/pkg/storer/internal/stampindex/stampindex_test.go b/pkg/storer/internal/stampindex/stampindex_test.go index 7b951924bf4..94eb0493533 100644 --- a/pkg/storer/internal/stampindex/stampindex_test.go +++ b/pkg/storer/internal/stampindex/stampindex_test.go @@ -203,6 +203,103 @@ func TestStoreLoadDelete(t *testing.T) { } } +func TestStoreLoadDeleteWithStamp(t *testing.T) { + t.Parallel() + + ts := newTestStorage(t) + chunks := chunktest.GenerateTestRandomChunks(10) + + for i, chunk := range chunks { + ns := fmt.Sprintf("namespace_%d", i) + t.Run(ns, func(t *testing.T) { + t.Run("store new stamp index", func(t *testing.T) { + + err := ts.Run(context.Background(), func(s transaction.Store) error { + return stampindex.Store(s.IndexStore(), ns, chunk) + + }) + if err != nil { + t.Fatalf("Store(...): unexpected error: %v", err) + } + + stampHash, err := chunk.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + want := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash) + want.StampTimestamp = chunk.Stamp().Timestamp() + want.ChunkAddress = chunk.Address() + + have := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash) + err = ts.IndexStore().Get(have) + if err != nil { + t.Fatalf("Get(...): unexpected error: %v", err) + } + + if diff := cmp.Diff(want, have, cmp.AllowUnexported(stampindex.Item{})); diff != "" { + t.Fatalf("Get(...): mismatch (-want +have):\n%s", diff) + } + }) + + t.Run("load stored stamp index", func(t *testing.T) { + stampHash, err := chunk.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + want := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash) + want.StampTimestamp = chunk.Stamp().Timestamp() + want.ChunkAddress = chunk.Address() + + have, err := stampindex.Load(ts.IndexStore(), ns, chunk) + if err != nil { + t.Fatalf("Load(...): unexpected error: %v", err) + } + + if diff := cmp.Diff(want, have, cmp.AllowUnexported(stampindex.Item{})); diff != "" { + t.Fatalf("Load(...): mismatch (-want +have):\n%s", diff) + } + }) + + t.Run("delete stored stamp index", func(t *testing.T) { + + err := ts.Run(context.Background(), func(s transaction.Store) error { + return stampindex.DeleteWithStamp(s.IndexStore(), ns, chunk.Stamp()) + }) + if err != nil { + t.Fatalf("Delete(...): unexpected error: %v", err) + } + + have, err := stampindex.Load(ts.IndexStore(), ns, chunk) + if have != nil { + t.Fatalf("Load(...): unexpected item %v", have) + } + if !errors.Is(err, storage.ErrNotFound) { + t.Fatalf("Load(...): unexpected error: %v", err) + } + + cnt := 0 + err = ts.IndexStore().Iterate( + storage.Query{ + Factory: func() storage.Item { + return new(stampindex.Item) + }, + }, + func(result storage.Result) (bool, error) { + cnt++ + return false, nil + }, + ) + if err != nil { + t.Fatalf("Store().Iterate(...): unexpected error: %v", err) + } + if want, have := 0, cnt; want != have { + t.Fatalf("Store().Iterate(...): chunk count mismatch:\nwant: %d\nhave: %d", want, have) + } + }) + }) + } +} + func TestLoadOrStore(t *testing.T) { t.Parallel() diff --git a/pkg/storer/migration/step_06.go b/pkg/storer/migration/step_06.go index dc2604c3e29..9cd85273920 100644 --- a/pkg/storer/migration/step_06.go +++ b/pkg/storer/migration/step_06.go @@ -5,7 +5,9 @@ package migration import ( + "bytes" "context" + "errors" "fmt" "os" "runtime" @@ -186,7 +188,42 @@ func addStampHash(logger log.Logger, st transaction.Storage) (int64, int64, erro } if postBatchRadiusCnt != postChunkBinCnt || preBatchRadiusCnt != postBatchRadiusCnt || preChunkBinCnt != postChunkBinCnt { - return 0, 0, fmt.Errorf("post-migration check: index counts do not match, %d vs %d. It's recommended that the nuke is run to reset the node", postBatchRadiusCnt, postChunkBinCnt) + return 0, 0, fmt.Errorf("post-migration check: index counts do not match, %d vs %d. It's recommended that the nuke cmd is run to reset the node", postBatchRadiusCnt, postChunkBinCnt) + } + + err = st.IndexStore().Iterate(storage.Query{ + Factory: func() storage.Item { return new(reserve.ChunkBinItem) }, + }, func(result storage.Result) (bool, error) { + item := result.Entry.(*reserve.ChunkBinItem) + + batchRadiusItem := &reserve.BatchRadiusItem{BatchID: item.BatchID, Bin: item.Bin, Address: item.Address, StampHash: item.StampHash} + if err := st.IndexStore().Get(batchRadiusItem); err != nil { + return false, fmt.Errorf("batch radius item get: %w", err) + } + + stamp, err := chunkstamp.LoadWithBatchID(st.IndexStore(), "reserve", item.Address, item.BatchID) + if err != nil { + return false, fmt.Errorf("stamp item get: %w", err) + } + + stampIndex, err := stampindex.LoadWithStamp(st.IndexStore(), "reserve", stamp) + if err != nil { + return false, fmt.Errorf("stamp index get: %w", err) + } + + if !bytes.Equal(item.StampHash, batchRadiusItem.StampHash) { + return false, fmt.Errorf("batch radius item stamp hash, %x vs %x", item.StampHash, batchRadiusItem.StampHash) + } + + if !bytes.Equal(item.StampHash, stampIndex.StampHash) { + return false, fmt.Errorf("stamp index item stamp hash, %x vs %x", item.StampHash, stampIndex.StampHash) + } + + return false, nil + }) + + if err != nil { + return 0, 0, errors.New("post-migration check: items fields not match. It's recommended that the nuke cmd is run to reset the node") } return seenCount, doneCount.Load(), nil