Skip to content

Commit

Permalink
store success sampling result
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Oct 31, 2024
1 parent 105ab1b commit cd5fe25
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 63 deletions.
87 changes: 54 additions & 33 deletions share/availability/light/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/autobatch"
Expand Down Expand Up @@ -62,9 +63,10 @@ func NewShareAvailability(
// SharesAvailable randomly samples `params.SampleAmount` amount of Shares committed to the given
// ExtendedHeader. This way SharesAvailable subjectively verifies that Shares are available.
func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header.ExtendedHeader) error {
dah := header.DAH
roots := header.DAH

// short-circuit if the given root is an empty data square
if share.DataHash(dah.Hash()).IsEmptyEDS() {
if share.DataHash(roots.Hash()).IsEmptyEDS() {
return nil
}

Expand All @@ -75,62 +77,81 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header
}
defer release()

// load snapshot of the last sampling errors from disk
key := datastoreKeyForRoot(dah)
key := datastoreKeyForRoot(roots)
var samples *SamplingResult

// Attempt to load previous sampling results
la.dsLk.RLock()
last, err := la.ds.Get(ctx, key)
data, err := la.ds.Get(ctx, key)
la.dsLk.RUnlock()

// Check for error cases
var samples []Sample
switch {
case err == nil && len(last) == 0:
// Availability has already been validated
return nil
case err != nil && !errors.Is(err, datastore.ErrNotFound):
// Other error occurred
return err
case errors.Is(err, datastore.ErrNotFound):
// No sampling result found, select new samples
samples = selectRandomSamples(len(dah.RowRoots), int(la.params.SampleAmount))
default:
// Sampling result found, unmarshal it
err = json.Unmarshal(last, &samples)
if err != nil {
if !errors.Is(err, datastore.ErrNotFound) {
return err
}
// No previous results; create new samples
samples = NewSamplingResult(len(roots.RowRoots), int(la.params.SampleAmount))
} else {
samples = &SamplingResult{}
err = json.Unmarshal(data, samples)
if err != nil {
return err
}
// Verify total samples count
totalSamples := len(samples.Remaining) + len(samples.Available)
if totalSamples != int(la.params.SampleAmount) {
return fmt.Errorf("invalid sampling result:"+
" expected %d samples, got %d", la.params.SampleAmount, totalSamples)
}
}

if len(samples.Remaining) == 0 {
// All samples have been processed successfully
return nil
}

var (
failedSamplesLock sync.Mutex
failedSamples []Sample
mutex sync.Mutex
failedSamples []Sample
wg sync.WaitGroup
)

log.Debugw("starting sampling session", "height", header.Height())
var wg sync.WaitGroup
for _, s := range samples {

// remove one second from the deadline to ensure we have enough time to process the results
samplingCtx, cancel := context.WithCancel(ctx)
if deadline, ok := ctx.Deadline(); ok {
samplingCtx, cancel = context.WithDeadline(ctx, deadline.Add(-time.Second))
}
defer cancel()

// Concurrently sample shares
for _, s := range samples.Remaining {
wg.Add(1)
go func(s Sample) {
defer wg.Done()
// check if the sample is available
_, err := la.getter.GetShare(ctx, header, s.Row, s.Col)
_, err := la.getter.GetShare(samplingCtx, header, s.Row, s.Col)
mutex.Lock()
defer mutex.Unlock()
if err != nil {
log.Debugw("error fetching share", "height", header.Height(), "row", s.Row, "col", s.Col)
failedSamplesLock.Lock()
failedSamples = append(failedSamples, s)
failedSamplesLock.Unlock()
} else {
samples.Available = append(samples.Available, s)
}
}(s)
}
wg.Wait()

// store the result of the sampling session
bs, err := json.Marshal(failedSamples)
// Update remaining samples with failed ones
samples.Remaining = failedSamples

// Store the updated sampling result
updatedData, err := json.Marshal(samples)
if err != nil {
return fmt.Errorf("failed to marshal sampling result: %w", err)
return err
}
la.dsLk.Lock()
err = la.ds.Put(ctx, key, bs)
err = la.ds.Put(ctx, key, updatedData)
la.dsLk.Unlock()
if err != nil {
return fmt.Errorf("failed to store sampling result: %w", err)
Expand Down
89 changes: 59 additions & 30 deletions share/availability/light/availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ func TestSharesAvailableSuccess(t *testing.T) {
require.NoError(t, err)

// Verify that the sampling result is stored with all samples marked as available
result, err := avail.ds.Get(ctx, datastoreKeyForRoot(roots))
data, err := avail.ds.Get(ctx, datastoreKeyForRoot(roots))
require.NoError(t, err)

var failed []Sample
err = json.Unmarshal(result, &failed)
var result SamplingResult
err = json.Unmarshal(data, &result)
require.NoError(t, err)
require.Empty(t, failed)

require.Empty(t, result.Remaining)
require.Len(t, result.Available, int(avail.params.SampleAmount))
}

func TestSharesAvailableSkipSampled(t *testing.T) {
Expand All @@ -90,13 +92,16 @@ func TestSharesAvailableSkipSampled(t *testing.T) {
require.ErrorIs(t, err, share.ErrNotAvailable)

// Store a successful sampling result in the datastore
failed := []Sample{}
data, err := json.Marshal(failed)
samplingResult := &SamplingResult{
Available: make([]Sample, avail.params.SampleAmount),
Remaining: []Sample{},
}
data, err := json.Marshal(samplingResult)
require.NoError(t, err)
err = avail.ds.Put(ctx, datastoreKeyForRoot(roots), data)
require.NoError(t, err)

// SharesAvailable should now return no error since the success sampling result is stored
// SharesAvailable should now return nil since the success sampling result is stored
err = avail.SharesAvailable(ctx, eh)
require.NoError(t, err)
}
Expand Down Expand Up @@ -138,25 +143,38 @@ func TestSharesAvailableFailed(t *testing.T) {
require.ErrorIs(t, err, share.ErrNotAvailable)

// The datastore should now contain the sampling result with all samples in Remaining
result, err := avail.ds.Get(ctx, datastoreKeyForRoot(roots))
data, err := avail.ds.Get(ctx, datastoreKeyForRoot(roots))
require.NoError(t, err)

var failed []Sample
err = json.Unmarshal(result, &failed)
var failed SamplingResult
err = json.Unmarshal(data, &failed)
require.NoError(t, err)
require.Len(t, failed, int(avail.params.SampleAmount))

require.Empty(t, failed.Available)
require.Len(t, failed.Remaining, int(avail.params.SampleAmount))

// Simulate a getter that now returns shares successfully
onceGetter := newOnceGetter()
avail.getter = onceGetter
successfulGetter := newOnceGetter()
avail.getter = successfulGetter

// should be able to retrieve all the failed samples now
err = avail.SharesAvailable(ctx, eh)
require.NoError(t, err)

// The sampling result should now have all samples in Available
data, err = avail.ds.Get(ctx, datastoreKeyForRoot(roots))
require.NoError(t, err)

var result SamplingResult
err = json.Unmarshal(data, &result)
require.NoError(t, err)

require.Empty(t, result.Remaining)
require.Len(t, result.Available, int(avail.params.SampleAmount))

// onceGetter should have no more samples stored after the call
onceGetter.checkOnce(t)
require.ElementsMatch(t, failed, onceGetter.sampledList())
successfulGetter.checkOnce(t)
require.ElementsMatch(t, failed.Remaining, successfulGetter.sampledList())
}

func TestParallelAvailability(t *testing.T) {
Expand Down Expand Up @@ -185,6 +203,17 @@ func TestParallelAvailability(t *testing.T) {
}
wg.Wait()
require.Len(t, successfulGetter.sampledList(), int(avail.params.SampleAmount))

// Verify that the sampling result is stored with all samples marked as available
resultData, err := avail.ds.Get(ctx, datastoreKeyForRoot(roots))
require.NoError(t, err)

var samplingResult SamplingResult
err = json.Unmarshal(resultData, &samplingResult)
require.NoError(t, err)

require.Empty(t, samplingResult.Remaining)
require.Len(t, samplingResult.Available, int(avail.params.SampleAmount))
}

type onceGetter struct {
Expand All @@ -199,39 +228,39 @@ func newOnceGetter() onceGetter {
}
}

func (m onceGetter) checkOnce(t *testing.T) {
m.Lock()
defer m.Unlock()
for s, count := range m.sampled {
func (g onceGetter) checkOnce(t *testing.T) {
g.Lock()
defer g.Unlock()
for s, count := range g.sampled {
if count > 1 {
t.Errorf("sample %v was called more than once", s)
}
}
}

func (m onceGetter) sampledList() []Sample {
m.Lock()
defer m.Unlock()
samples := make([]Sample, 0, len(m.sampled))
for s := range m.sampled {
func (g onceGetter) sampledList() []Sample {
g.Lock()
defer g.Unlock()
samples := make([]Sample, 0, len(g.sampled))
for s := range g.sampled {
samples = append(samples, s)
}
return samples
}

func (m onceGetter) GetShare(_ context.Context, _ *header.ExtendedHeader, row, col int) (libshare.Share, error) {
m.Lock()
defer m.Unlock()
func (g onceGetter) GetShare(_ context.Context, _ *header.ExtendedHeader, row, col int) (libshare.Share, error) {
g.Lock()
defer g.Unlock()
s := Sample{Row: row, Col: col}
m.sampled[s]++
g.sampled[s]++
return libshare.Share{}, nil
}

func (m onceGetter) GetEDS(_ context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) {
func (g onceGetter) GetEDS(_ context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) {
panic("not implemented")
}

func (m onceGetter) GetSharesByNamespace(
func (g onceGetter) GetSharesByNamespace(
_ context.Context,
_ *header.ExtendedHeader,
_ libshare.Namespace,
Expand Down
19 changes: 19 additions & 0 deletions share/availability/light/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,31 @@ import (
"golang.org/x/exp/maps"
)

// SamplingResult holds the available and remaining samples.
type SamplingResult struct {
Available []Sample `json:"available"`
Remaining []Sample `json:"remaining"`
}

// Sample represents a coordinate in a 2D data square.
type Sample struct {
Row int `json:"row"`
Col int `json:"col"`
}

// NewSamplingResult creates a new SamplingResult with randomly selected samples.
func NewSamplingResult(squareSize, sampleCount int) *SamplingResult {
total := squareSize * squareSize
if sampleCount > total {
sampleCount = total
}

samples := selectRandomSamples(squareSize, sampleCount)
return &SamplingResult{
Remaining: samples,
}
}

// selectRandomSamples randomly picks unique coordinates from a square of given size.
func selectRandomSamples(squareSize, sampleCount int) []Sample {
total := squareSize * squareSize
Expand Down

0 comments on commit cd5fe25

Please sign in to comment.