Skip to content

Commit

Permalink
fix: new neighborhood type and cleanup (#4855)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Oct 10, 2024
1 parent 3368d11 commit 518b2c1
Show file tree
Hide file tree
Showing 22 changed files with 171 additions and 82 deletions.
11 changes: 9 additions & 2 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -936,10 +936,17 @@ components:
StatusNeighborhoodResponse:
type: object
properties:
address:
type: string
neighborhood:
$ref: "#/components/schemas/Neighborhood"
reserveSizeWithinRadius:
type: integer
proximity:
type: integer

Neighborhood:
type: string
description: Swarm address of a neighborhood in string binary format, usually limited to as many bits as the current storage radius.
example: "011010111"

StatusNeighborhoodsResponse:
type: object
Expand Down
1 change: 1 addition & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type Storer interface {
storer.LocalStore
storer.RadiusChecker
storer.Debugger
storer.NeighborhoodStats
}

type PinIntegrity interface {
Expand Down
20 changes: 6 additions & 14 deletions pkg/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package api

import (
"context"
"fmt"
"net/http"
"sort"
"sync"
Expand Down Expand Up @@ -38,8 +37,9 @@ type statusResponse struct {
}

type statusNeighborhoodResponse struct {
Address string `json:"address"`
Neighborhood string `json:"neighborhood"`
ReserveSizeWithinRadius int `json:"reserveSizeWithinRadius"`
Proximity uint8 `json:"proximity"`
}

type neighborhoodsResponse struct {
Expand Down Expand Up @@ -171,7 +171,7 @@ func (s *Service) statusGetPeersHandler(w http.ResponseWriter, r *http.Request)
}

// statusGetHandler returns the current node status.
func (s *Service) statusGetNeighborhoods(w http.ResponseWriter, _ *http.Request) {
func (s *Service) statusGetNeighborhoods(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("get_status_neighborhoods").Build()

if s.beeMode == DevMode {
Expand All @@ -182,27 +182,19 @@ func (s *Service) statusGetNeighborhoods(w http.ResponseWriter, _ *http.Request)

neighborhoods := make([]statusNeighborhoodResponse, 0)

nhoods, err := s.statusService.NeighborhoodsSnapshot()
nhoods, err := s.storer.NeighborhoodsStat(r.Context())
if err != nil {
logger.Debug("unable to get neighborhoods status", "error", err)
logger.Error(nil, "unable to get neighborhoods status")
jsonhttp.InternalServerError(w, "unable to get neighborhoods status")
return
}

if len(nhoods) == 0 {
jsonhttp.NotFound(w, "neighborhoods not found")
return
}

for _, n := range nhoods {
binaryAddr := ""
for _, b := range n.Address.Bytes() {
binaryAddr += fmt.Sprintf("%08b ", b)
}
neighborhoods = append(neighborhoods, statusNeighborhoodResponse{
Address: binaryAddr,
Neighborhood: n.Neighborhood.String(),
ReserveSizeWithinRadius: n.ReserveSizeWithinRadius,
Proximity: swarm.Proximity(s.overlay.Bytes(), n.Neighborhood.Bytes()),
})
}

Expand Down
15 changes: 0 additions & 15 deletions pkg/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/ethersphere/bee/v2/pkg/p2p/protobuf"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/status/internal/pb"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/topology"
)
Expand All @@ -40,7 +39,6 @@ type Reserve interface {
ReserveSize() int
ReserveSizeWithinRadius() uint64
StorageRadius() uint8
NeighborhoodsStat(ctx context.Context) ([]*storer.NeighborhoodStat, error)
}

type topologyDriver interface {
Expand Down Expand Up @@ -133,19 +131,6 @@ func (s *Service) LocalSnapshot() (*Snapshot, error) {
}, nil
}

func (s *Service) NeighborhoodsSnapshot() ([]*storer.NeighborhoodStat, error) {
var err error
neighborhoods := make([]*storer.NeighborhoodStat, 0)

if s.reserve != nil {
neighborhoods, err = s.reserve.NeighborhoodsStat(context.Background())
if err != nil {
return neighborhoods, err
}
}
return neighborhoods, err
}

// PeerSnapshot sends request for status snapshot to the peer.
func (s *Service) PeerSnapshot(ctx context.Context, peer swarm.Address) (*Snapshot, error) {
stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
Expand Down
12 changes: 2 additions & 10 deletions pkg/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/status"
"github.com/ethersphere/bee/v2/pkg/status/internal/pb"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/topology"
"github.com/google/go-cmp/cmp"
Expand All @@ -36,7 +35,7 @@ func TestStatus(t *testing.T) {
LastSyncedBlock: 6092500,
}

sssMock := &statusSnapshotMock{want, nil}
sssMock := &statusSnapshotMock{want}

peersIterMock := new(topologyPeersIterNoopMock)

Expand Down Expand Up @@ -116,9 +115,7 @@ func TestStatusLightNode(t *testing.T) {
StorageRadius: 100, // should be ignored
BatchCommitment: 1024,
LastSyncedBlock: 6092500,
},
nil,
}
}}

peersIterMock := new(topologyPeersIterNoopMock)

Expand Down Expand Up @@ -194,7 +191,6 @@ func (m *topologyPeersIterNoopMock) IsReachable() bool {
// - SyncReporter
type statusSnapshotMock struct {
*pb.Snapshot
neighborhoods []*storer.NeighborhoodStat
}

func (m *statusSnapshotMock) SyncRate() float64 { return m.Snapshot.PullsyncRate }
Expand All @@ -207,7 +203,3 @@ func (m *statusSnapshotMock) GetChainState() *postage.ChainState {
func (m *statusSnapshotMock) ReserveSizeWithinRadius() uint64 {
return m.Snapshot.ReserveSizeWithinRadius
}

func (m *statusSnapshotMock) NeighborhoodsStat(ctx context.Context) ([]*storer.NeighborhoodStat, error) {
return m.neighborhoods, nil
}
10 changes: 5 additions & 5 deletions pkg/storer/migration/all_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ func AfterInitSteps(
1: step_01,
2: step_02(st),
3: ReserveRepairer(st, storage.ChunkType, logger),
4: step_04(sharkyPath, sharkyNoOfShards, st),
5: step_05(st),
6: step_06(st),
4: step_04(sharkyPath, sharkyNoOfShards, st, logger),
5: step_05(st, logger),
6: step_06(st, logger),
}
}

// BeforeInitSteps lists all migration steps for localstore IndexStore before the localstore is initiated.
func BeforeInitSteps(st storage.BatchStore) migration.Steps {
func BeforeInitSteps(st storage.BatchStore, logger log.Logger) migration.Steps {
return map[uint64]migration.StepFn{
1: RefCountSizeInc(st),
1: RefCountSizeInc(st, logger),
}
}
6 changes: 3 additions & 3 deletions pkg/storer/migration/all_steps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ func TestPostSteps(t *testing.T) {

st := inmemstore.New()

assert.NotEmpty(t, localmigration.BeforeInitSteps(st))
assert.NotEmpty(t, localmigration.BeforeInitSteps(st, log.Noop))

t.Run("version numbers", func(t *testing.T) {
t.Parallel()

err := migration.ValidateVersions(localmigration.BeforeInitSteps(st))
err := migration.ValidateVersions(localmigration.BeforeInitSteps(st, log.Noop))
assert.NoError(t, err)
})

Expand All @@ -63,7 +63,7 @@ func TestPostSteps(t *testing.T) {

store := inmemstore.New()

err := migration.Migrate(store, "migration", localmigration.BeforeInitSteps(store))
err := migration.Migrate(store, "migration", localmigration.BeforeInitSteps(store, log.Noop))
assert.NoError(t, err)
})
}
6 changes: 3 additions & 3 deletions pkg/storer/migration/refCntSize.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"encoding/binary"
"errors"
"os"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
Expand Down Expand Up @@ -101,9 +100,10 @@ func (r OldRetrievalIndexItem) String() string {
return storageutil.JoinFields(r.Namespace(), r.ID())
}

func RefCountSizeInc(s storage.BatchStore) func() error {
func RefCountSizeInc(s storage.BatchStore, logger log.Logger) func() error {
return func() error {
logger := log.NewLogger("migration-RefCountSizeInc", log.WithSink(os.Stdout))

logger := logger.WithName("migration-RefCountSizeInc").Register()

logger.Info("starting migration of replacing chunkstore items to increase refCnt capacity")

Expand Down
3 changes: 2 additions & 1 deletion pkg/storer/migration/refCntSize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/rand"
"testing"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
"github.com/ethersphere/bee/v2/pkg/storage/inmemstore"
"github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore"
Expand Down Expand Up @@ -36,7 +37,7 @@ func Test_RefCntSize(t *testing.T) {
assert.NoError(t, err)
}

assert.NoError(t, stepFn(store)())
assert.NoError(t, stepFn(store, log.Noop)())

// check if all entries are migrated.
for _, entry := range oldItems {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storer/migration/step_04.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package migration

import (
"context"
"os"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
Expand All @@ -21,13 +20,14 @@ func step_04(
sharkyBasePath string,
sharkyNoOfShards int,
st transaction.Storage,
logger log.Logger,
) func() error {
return func() error {
// for in-mem store, skip this step
if sharkyBasePath == "" {
return nil
}
logger := log.NewLogger("migration-step-04", log.WithSink(os.Stdout))
logger := logger.WithName("migration-step-04").Register()

logger.Info("starting sharky recovery")
sharkyRecover, err := sharky.NewRecovery(sharkyBasePath, sharkyNoOfShards, swarm.SocMaxChunkSize)
Expand Down
3 changes: 2 additions & 1 deletion pkg/storer/migration/step_04_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"testing"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
"github.com/ethersphere/bee/v2/pkg/storage/inmemstore"
chunktest "github.com/ethersphere/bee/v2/pkg/storage/testing"
Expand Down Expand Up @@ -38,7 +39,7 @@ func Test_Step_04(t *testing.T) {
store := inmemstore.New()
storage := transaction.NewStorage(sharkyStore, store)

stepFn := localmigration.Step_04(sharkyDir, 1, storage)
stepFn := localmigration.Step_04(sharkyDir, 1, storage, log.Noop)

chunks := chunktest.GenerateTestRandomChunks(10)

Expand Down
7 changes: 4 additions & 3 deletions pkg/storer/migration/step_05.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package migration
import (
"context"
"fmt"
"os"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/storage"
Expand All @@ -16,9 +15,11 @@ import (
)

// step_05 is a migration step that removes all upload items from the store.
func step_05(st transaction.Storage) func() error {
func step_05(st transaction.Storage, logger log.Logger) func() error {
return func() error {
logger := log.NewLogger("migration-step-05", log.WithSink(os.Stdout))

logger := logger.WithName("migration-step-05").Register()

logger.Info("start removing upload items")

itemC := make(chan storage.Item)
Expand Down
3 changes: 2 additions & 1 deletion pkg/storer/migration/step_05_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"testing"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/leveldbstore"
Expand Down Expand Up @@ -98,7 +99,7 @@ func Test_Step_05(t *testing.T) {

wantCount(t, store.IndexStore(), 10)

err = localmigration.Step_05(store)()
err = localmigration.Step_05(store, log.Noop)()
if err != nil {
t.Fatalf("step 05: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/storer/migration/step_06.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"errors"
"fmt"
"os"
"runtime"
"sync/atomic"
"time"
Expand All @@ -24,9 +23,10 @@ import (
)

// step_06 is a migration step that adds a stampHash to all BatchRadiusItems, ChunkBinItems and StampIndexItems.
func step_06(st transaction.Storage) func() error {
func step_06(st transaction.Storage, logger log.Logger) func() error {
return func() error {
logger := log.NewLogger("migration-step-06", log.WithSink(os.Stdout))
logger := logger.WithName("migration-step-06").Register()

logger.Info("start adding stampHash to BatchRadiusItems, ChunkBinItems and StampIndexItems")

seenCount, doneCount, err := addStampHash(logger, st)
Expand Down
3 changes: 2 additions & 1 deletion pkg/storer/migration/step_06_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"testing"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/leveldbstore"
Expand Down Expand Up @@ -98,7 +99,7 @@ func Test_Step_06(t *testing.T) {
}

require.NoError(t, err)
err = localmigration.Step_06(store)()
err = localmigration.Step_06(store, log.Noop)()
require.NoError(t, err)

has, err := store.IndexStore().Has(&reserve.EpochItem{})
Expand Down
4 changes: 4 additions & 0 deletions pkg/storer/mock/mockstorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,7 @@ func (m *mockStorer) IsWithinStorageRadius(_ swarm.Address) bool { return true }
func (m *mockStorer) DebugInfo(_ context.Context) (storer.Info, error) {
return m.debugInfo, nil
}

func (m *mockStorer) NeighborhoodsStat(ctx context.Context) ([]*storer.NeighborhoodStat, error) {
return nil, nil
}
6 changes: 3 additions & 3 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func (db *DB) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan
}

type NeighborhoodStat struct {
Address swarm.Address
Neighborhood swarm.Neighborhood
ReserveSizeWithinRadius int
}

Expand All @@ -511,12 +511,12 @@ func (db *DB) NeighborhoodsStat(ctx context.Context) ([]*NeighborhoodStat, error
prefixes := neighborhoodPrefixes(db.baseAddr, int(radius), db.reserveOptions.capacityDoubling)
neighs := make([]*NeighborhoodStat, len(prefixes))
for i, n := range prefixes {
neighs[i] = &NeighborhoodStat{Address: n}
neighs[i] = &NeighborhoodStat{swarm.NewNeighborhood(n, networkRadius), 0}
}

err := db.reserve.IterateChunksItems(0, func(ch *reserve.ChunkBinItem) (bool, error) {
for _, n := range neighs {
if swarm.Proximity(ch.Address.Bytes(), n.Address.Bytes()) >= networkRadius {
if swarm.Proximity(ch.Address.Bytes(), n.Neighborhood.Bytes()) >= networkRadius {
n.ReserveSizeWithinRadius++
break
}
Expand Down
Loading

0 comments on commit 518b2c1

Please sign in to comment.