Skip to content

Commit

Permalink
feat: add status neighborhoods endpoint (#4853)
Browse files Browse the repository at this point in the history
Co-authored-by: istae <[email protected]>
  • Loading branch information
martinconic and istae authored Oct 10, 2024
1 parent d52e3fc commit 646f02d
Show file tree
Hide file tree
Showing 23 changed files with 255 additions and 46 deletions.
19 changes: 18 additions & 1 deletion openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ paths:
$ref: "SwarmCommon.yaml#/components/schemas/FeedType"
required: false
description: "Feed indexing scheme (default: sequence)"
- $ref: "SwarmCommon.yaml#/components/headers/SwarmOnlyRootChunkParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter"
Expand Down Expand Up @@ -2448,6 +2448,23 @@ paths:
default:
description: Default response.

"/status/neighborhoods":
get:
summary: Get the current neighborhoods status of this node.
tags:
- Node Status
responses:
"200":
description: Returns the neighborhoods status of this node
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/StatusNeighborhoodsResponse"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
default:
description: Default response.

components:
securitySchemes:
basicAuth:
Expand Down
24 changes: 24 additions & 0 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,30 @@ components:
items:
$ref: "#/components/schemas/StatusSnapshotResponse"

StatusNeighborhoodResponse:
type: object
properties:
neighborhood:
$ref: "#/components/schemas/Neighborhood"
reserveSizeWithinRadius:
type: integer
proximity:
type: integer

Neighborhood:
type: string
description: Swarm address of a neighborhood in string binary format, usually limited to as many bits as the current storage radius.
example: "011010111"

StatusNeighborhoodsResponse:
type: object
properties:
stamps:
type: array
nullable: false
items:
$ref: "#/components/schemas/StatusNeighborhoodResponse"

ApiChunkInclusionProof:
type: object
properties:
Expand Down
1 change: 1 addition & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type Storer interface {
storer.LocalStore
storer.RadiusChecker
storer.Debugger
storer.NeighborhoodStats
}

type PinIntegrity interface {
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,14 @@ func (s *Service) mountBusinessDebug() {
),
})

handle("/status/neighborhoods", jsonhttp.MethodHandler{
"GET": web.ChainHandlers(
httpaccess.NewHTTPAccessSuppressLogHandler(),
s.statusAccessHandler,
web.FinalHandlerFunc(s.statusGetNeighborhoods),
),
})

handle("/rchash/{depth}/{anchor1}/{anchor2}", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.rchash),
Expand Down
41 changes: 41 additions & 0 deletions pkg/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ type statusResponse struct {
Snapshots []statusSnapshotResponse `json:"snapshots"`
}

type statusNeighborhoodResponse struct {
Neighborhood string `json:"neighborhood"`
ReserveSizeWithinRadius int `json:"reserveSizeWithinRadius"`
Proximity uint8 `json:"proximity"`
}

type neighborhoodsResponse struct {
Neighborhoods []statusNeighborhoodResponse `json:"neighborhoods"`
}

// statusAccessHandler is a middleware that limits the number of simultaneous
// status requests.
func (s *Service) statusAccessHandler(h http.Handler) http.Handler {
Expand Down Expand Up @@ -159,3 +169,34 @@ func (s *Service) statusGetPeersHandler(w http.ResponseWriter, r *http.Request)
})
jsonhttp.OK(w, statusResponse{Snapshots: snapshots})
}

// statusGetHandler returns the current node status.
func (s *Service) statusGetNeighborhoods(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("get_status_neighborhoods").Build()

if s.beeMode == DevMode {
logger.Warning("status neighborhoods endpoint is disabled in dev mode")
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
return
}

neighborhoods := make([]statusNeighborhoodResponse, 0)

nhoods, err := s.storer.NeighborhoodsStat(r.Context())
if err != nil {
logger.Debug("unable to get neighborhoods status", "error", err)
logger.Error(nil, "unable to get neighborhoods status")
jsonhttp.InternalServerError(w, "unable to get neighborhoods status")
return
}

for _, n := range nhoods {
neighborhoods = append(neighborhoods, statusNeighborhoodResponse{
Neighborhood: n.Neighborhood.String(),
ReserveSizeWithinRadius: n.ReserveSizeWithinRadius,
Proximity: swarm.Proximity(s.overlay.Bytes(), n.Neighborhood.Bytes()),
})
}

jsonhttp.OK(w, neighborhoodsResponse{Neighborhoods: neighborhoods})
}
6 changes: 6 additions & 0 deletions pkg/api/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package api_test

import (
"context"
"net/http"
"testing"

Expand All @@ -14,6 +15,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/status"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/topology"
)

Expand Down Expand Up @@ -119,6 +121,7 @@ type statusSnapshotMock struct {
storageRadius uint8
commitment uint64
chainState *postage.ChainState
neighborhoods []*storer.NeighborhoodStat
}

func (m *statusSnapshotMock) SyncRate() float64 { return m.syncRate }
Expand All @@ -129,3 +132,6 @@ func (m *statusSnapshotMock) GetChainState() *postage.ChainState { return m.chai
func (m *statusSnapshotMock) ReserveSizeWithinRadius() uint64 {
return m.reserveSizeWithinRadius
}
func (m *statusSnapshotMock) NeighborhoodsStat(ctx context.Context) ([]*storer.NeighborhoodStat, error) {
return m.neighborhoods, nil
}
10 changes: 5 additions & 5 deletions pkg/storer/migration/all_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ func AfterInitSteps(
1: step_01,
2: step_02(st),
3: ReserveRepairer(st, storage.ChunkType, logger),
4: step_04(sharkyPath, sharkyNoOfShards, st),
5: step_05(st),
6: step_06(st),
4: step_04(sharkyPath, sharkyNoOfShards, st, logger),
5: step_05(st, logger),
6: step_06(st, logger),
}
}

// BeforeInitSteps lists all migration steps for localstore IndexStore before the localstore is initiated.
func BeforeInitSteps(st storage.BatchStore) migration.Steps {
func BeforeInitSteps(st storage.BatchStore, logger log.Logger) migration.Steps {
return map[uint64]migration.StepFn{
1: RefCountSizeInc(st),
1: RefCountSizeInc(st, logger),
}
}
6 changes: 3 additions & 3 deletions pkg/storer/migration/all_steps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ func TestPostSteps(t *testing.T) {

st := inmemstore.New()

assert.NotEmpty(t, localmigration.BeforeInitSteps(st))
assert.NotEmpty(t, localmigration.BeforeInitSteps(st, log.Noop))

t.Run("version numbers", func(t *testing.T) {
t.Parallel()

err := migration.ValidateVersions(localmigration.BeforeInitSteps(st))
err := migration.ValidateVersions(localmigration.BeforeInitSteps(st, log.Noop))
assert.NoError(t, err)
})

Expand All @@ -63,7 +63,7 @@ func TestPostSteps(t *testing.T) {

store := inmemstore.New()

err := migration.Migrate(store, "migration", localmigration.BeforeInitSteps(store))
err := migration.Migrate(store, "migration", localmigration.BeforeInitSteps(store, log.Noop))
assert.NoError(t, err)
})
}
6 changes: 3 additions & 3 deletions pkg/storer/migration/refCntSize.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"encoding/binary"
"errors"
"os"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
Expand Down Expand Up @@ -101,9 +100,10 @@ func (r OldRetrievalIndexItem) String() string {
return storageutil.JoinFields(r.Namespace(), r.ID())
}

func RefCountSizeInc(s storage.BatchStore) func() error {
func RefCountSizeInc(s storage.BatchStore, logger log.Logger) func() error {
return func() error {
logger := log.NewLogger("migration-RefCountSizeInc", log.WithSink(os.Stdout))

logger := logger.WithName("migration-RefCountSizeInc").Register()

logger.Info("starting migration of replacing chunkstore items to increase refCnt capacity")

Expand Down
3 changes: 2 additions & 1 deletion pkg/storer/migration/refCntSize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/rand"
"testing"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
"github.com/ethersphere/bee/v2/pkg/storage/inmemstore"
"github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore"
Expand Down Expand Up @@ -36,7 +37,7 @@ func Test_RefCntSize(t *testing.T) {
assert.NoError(t, err)
}

assert.NoError(t, stepFn(store)())
assert.NoError(t, stepFn(store, log.Noop)())

// check if all entries are migrated.
for _, entry := range oldItems {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storer/migration/step_04.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package migration

import (
"context"
"os"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
Expand All @@ -21,13 +20,14 @@ func step_04(
sharkyBasePath string,
sharkyNoOfShards int,
st transaction.Storage,
logger log.Logger,
) func() error {
return func() error {
// for in-mem store, skip this step
if sharkyBasePath == "" {
return nil
}
logger := log.NewLogger("migration-step-04", log.WithSink(os.Stdout))
logger := logger.WithName("migration-step-04").Register()

logger.Info("starting sharky recovery")
sharkyRecover, err := sharky.NewRecovery(sharkyBasePath, sharkyNoOfShards, swarm.SocMaxChunkSize)
Expand Down
3 changes: 2 additions & 1 deletion pkg/storer/migration/step_04_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"testing"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
"github.com/ethersphere/bee/v2/pkg/storage/inmemstore"
chunktest "github.com/ethersphere/bee/v2/pkg/storage/testing"
Expand Down Expand Up @@ -38,7 +39,7 @@ func Test_Step_04(t *testing.T) {
store := inmemstore.New()
storage := transaction.NewStorage(sharkyStore, store)

stepFn := localmigration.Step_04(sharkyDir, 1, storage)
stepFn := localmigration.Step_04(sharkyDir, 1, storage, log.Noop)

chunks := chunktest.GenerateTestRandomChunks(10)

Expand Down
7 changes: 4 additions & 3 deletions pkg/storer/migration/step_05.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package migration
import (
"context"
"fmt"
"os"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/storage"
Expand All @@ -16,9 +15,11 @@ import (
)

// step_05 is a migration step that removes all upload items from the store.
func step_05(st transaction.Storage) func() error {
func step_05(st transaction.Storage, logger log.Logger) func() error {
return func() error {
logger := log.NewLogger("migration-step-05", log.WithSink(os.Stdout))

logger := logger.WithName("migration-step-05").Register()

logger.Info("start removing upload items")

itemC := make(chan storage.Item)
Expand Down
3 changes: 2 additions & 1 deletion pkg/storer/migration/step_05_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"testing"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/leveldbstore"
Expand Down Expand Up @@ -98,7 +99,7 @@ func Test_Step_05(t *testing.T) {

wantCount(t, store.IndexStore(), 10)

err = localmigration.Step_05(store)()
err = localmigration.Step_05(store, log.Noop)()
if err != nil {
t.Fatalf("step 05: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/storer/migration/step_06.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"errors"
"fmt"
"os"
"runtime"
"sync/atomic"
"time"
Expand All @@ -24,9 +23,10 @@ import (
)

// step_06 is a migration step that adds a stampHash to all BatchRadiusItems, ChunkBinItems and StampIndexItems.
func step_06(st transaction.Storage) func() error {
func step_06(st transaction.Storage, logger log.Logger) func() error {
return func() error {
logger := log.NewLogger("migration-step-06", log.WithSink(os.Stdout))
logger := logger.WithName("migration-step-06").Register()

logger.Info("start adding stampHash to BatchRadiusItems, ChunkBinItems and StampIndexItems")

seenCount, doneCount, err := addStampHash(logger, st)
Expand Down
3 changes: 2 additions & 1 deletion pkg/storer/migration/step_06_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"testing"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/leveldbstore"
Expand Down Expand Up @@ -98,7 +99,7 @@ func Test_Step_06(t *testing.T) {
}

require.NoError(t, err)
err = localmigration.Step_06(store)()
err = localmigration.Step_06(store, log.Noop)()
require.NoError(t, err)

has, err := store.IndexStore().Has(&reserve.EpochItem{})
Expand Down
4 changes: 4 additions & 0 deletions pkg/storer/mock/mockstorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,7 @@ func (m *mockStorer) IsWithinStorageRadius(_ swarm.Address) bool { return true }
func (m *mockStorer) DebugInfo(_ context.Context) (storer.Info, error) {
return m.debugInfo, nil
}

func (m *mockStorer) NeighborhoodsStat(ctx context.Context) ([]*storer.NeighborhoodStat, error) {
return nil, nil
}
Loading

0 comments on commit 646f02d

Please sign in to comment.