diff --git a/share/availability/light/availability.go b/share/availability/light/availability.go index 0abab489c9..9bfe5fbc92 100644 --- a/share/availability/light/availability.go +++ b/share/availability/light/availability.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/autobatch" @@ -63,6 +64,7 @@ func NewShareAvailability( // ExtendedHeader. This way SharesAvailable subjectively verifies that Shares are available. func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header.ExtendedHeader) error { dah := header.DAH + // short-circuit if the given root is an empty data square if share.DataHash(dah.Hash()).IsEmptyEDS() { return nil @@ -75,62 +77,80 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header } defer release() - // load snapshot of the last sampling errors from disk key := datastoreKeyForRoot(dah) + samples := &SamplingResult{} + + // Attempt to load previous sampling results la.dsLk.RLock() - last, err := la.ds.Get(ctx, key) + data, err := la.ds.Get(ctx, key) la.dsLk.RUnlock() - - // Check for error cases - var samples []Sample - switch { - case err == nil && len(last) == 0: - // Availability has already been validated - return nil - case err != nil && !errors.Is(err, datastore.ErrNotFound): - // Other error occurred - return err - case errors.Is(err, datastore.ErrNotFound): - // No sampling result found, select new samples - samples = selectRandomSamples(len(dah.RowRoots), int(la.params.SampleAmount)) - default: - // Sampling result found, unmarshal it - err = json.Unmarshal(last, &samples) + if err != nil { + if !errors.Is(err, datastore.ErrNotFound) { + return err + } + // No previous results; create new samples + samples = NewSamplingResult(len(dah.RowRoots), int(la.params.SampleAmount)) + } else { + err = json.Unmarshal(data, samples) if err != nil { return err } + // Verify total samples count. + totalSamples := len(samples.Remaining) + len(samples.Available) + if totalSamples != int(la.params.SampleAmount) { + return fmt.Errorf("invalid sampling result:"+ + " expected %d samples, got %d", la.params.SampleAmount, totalSamples) + } + } + + if len(samples.Remaining) == 0 { + // All samples have been processed successfully + return nil } var ( - failedSamplesLock sync.Mutex - failedSamples []Sample + mutex sync.Mutex + failedSamples []Sample + wg sync.WaitGroup ) log.Debugw("starting sampling session", "height", header.Height()) - var wg sync.WaitGroup - for _, s := range samples { + + // remove one second from the deadline to ensure we have enough time to process the results + samplingCtx, cancel := context.WithCancel(ctx) + if deadline, ok := ctx.Deadline(); ok { + samplingCtx, cancel = context.WithDeadline(ctx, deadline.Add(-time.Second)) + } + defer cancel() + + // Concurrently sample shares + for _, s := range samples.Remaining { wg.Add(1) go func(s Sample) { defer wg.Done() - // check if the sample is available - _, err := la.getter.GetShare(ctx, header, s.Row, s.Col) + _, err := la.getter.GetShare(samplingCtx, header, s.Row, s.Col) + mutex.Lock() + defer mutex.Unlock() if err != nil { log.Debugw("error fetching share", "height", header.Height(), "row", s.Row, "col", s.Col) - failedSamplesLock.Lock() failedSamples = append(failedSamples, s) - failedSamplesLock.Unlock() + } else { + samples.Available = append(samples.Available, s) } }(s) } wg.Wait() - // store the result of the sampling session - bs, err := json.Marshal(failedSamples) + // Update remaining samples with failed ones + samples.Remaining = failedSamples + + // Store the updated sampling result + updatedData, err := json.Marshal(samples) if err != nil { - return fmt.Errorf("failed to marshal sampling result: %w", err) + return err } la.dsLk.Lock() - err = la.ds.Put(ctx, key, bs) + err = la.ds.Put(ctx, key, updatedData) la.dsLk.Unlock() if err != nil { return fmt.Errorf("failed to store sampling result: %w", err) diff --git a/share/availability/light/availability_test.go b/share/availability/light/availability_test.go index 97a6ef117c..5a1e289a48 100644 --- a/share/availability/light/availability_test.go +++ b/share/availability/light/availability_test.go @@ -58,13 +58,15 @@ func TestSharesAvailableSuccess(t *testing.T) { require.NoError(t, err) // Verify that the sampling result is stored with all samples marked as available - result, err := avail.ds.Get(ctx, datastoreKeyForRoot(roots)) + data, err := avail.ds.Get(ctx, datastoreKeyForRoot(roots)) require.NoError(t, err) - var failed []Sample - err = json.Unmarshal(result, &failed) + var result SamplingResult + err = json.Unmarshal(data, &result) require.NoError(t, err) - require.Empty(t, failed) + + require.Empty(t, result.Remaining) + require.Len(t, result.Available, int(avail.params.SampleAmount)) } func TestSharesAvailableSkipSampled(t *testing.T) { @@ -90,13 +92,16 @@ func TestSharesAvailableSkipSampled(t *testing.T) { require.ErrorIs(t, err, share.ErrNotAvailable) // Store a successful sampling result in the datastore - failed := []Sample{} - data, err := json.Marshal(failed) + samplingResult := &SamplingResult{ + Available: make([]Sample, avail.params.SampleAmount), + Remaining: []Sample{}, + } + data, err := json.Marshal(samplingResult) require.NoError(t, err) err = avail.ds.Put(ctx, datastoreKeyForRoot(roots), data) require.NoError(t, err) - // SharesAvailable should now return no error since the success sampling result is stored + // SharesAvailable should now return nil since the success sampling result is stored err = avail.SharesAvailable(ctx, eh) require.NoError(t, err) } @@ -138,25 +143,38 @@ func TestSharesAvailableFailed(t *testing.T) { require.ErrorIs(t, err, share.ErrNotAvailable) // The datastore should now contain the sampling result with all samples in Remaining - result, err := avail.ds.Get(ctx, datastoreKeyForRoot(roots)) + data, err := avail.ds.Get(ctx, datastoreKeyForRoot(roots)) require.NoError(t, err) - var failed []Sample - err = json.Unmarshal(result, &failed) + var failed SamplingResult + err = json.Unmarshal(data, &failed) require.NoError(t, err) - require.Len(t, failed, int(avail.params.SampleAmount)) + + require.Empty(t, failed.Available) + require.Len(t, failed.Remaining, int(avail.params.SampleAmount)) // Simulate a getter that now returns shares successfully - onceGetter := newOnceGetter() - avail.getter = onceGetter + successfulGetter := newOnceGetter() + avail.getter = successfulGetter // should be able to retrieve all the failed samples now err = avail.SharesAvailable(ctx, eh) require.NoError(t, err) + // The sampling result should now have all samples in Available + data, err = avail.ds.Get(ctx, datastoreKeyForRoot(roots)) + require.NoError(t, err) + + var result SamplingResult + err = json.Unmarshal(data, &result) + require.NoError(t, err) + + require.Empty(t, result.Remaining) + require.Len(t, result.Available, int(avail.params.SampleAmount)) + // onceGetter should have no more samples stored after the call - onceGetter.checkOnce(t) - require.ElementsMatch(t, failed, onceGetter.sampledList()) + successfulGetter.checkOnce(t) + require.ElementsMatch(t, failed.Remaining, successfulGetter.sampledList()) } func TestParallelAvailability(t *testing.T) { @@ -185,6 +203,17 @@ func TestParallelAvailability(t *testing.T) { } wg.Wait() require.Len(t, successfulGetter.sampledList(), int(avail.params.SampleAmount)) + + // Verify that the sampling result is stored with all samples marked as available + resultData, err := avail.ds.Get(ctx, datastoreKeyForRoot(roots)) + require.NoError(t, err) + + var samplingResult SamplingResult + err = json.Unmarshal(resultData, &samplingResult) + require.NoError(t, err) + + require.Empty(t, samplingResult.Remaining) + require.Len(t, samplingResult.Available, int(avail.params.SampleAmount)) } type onceGetter struct { @@ -199,39 +228,39 @@ func newOnceGetter() onceGetter { } } -func (m onceGetter) checkOnce(t *testing.T) { - m.Lock() - defer m.Unlock() - for s, count := range m.sampled { +func (g onceGetter) checkOnce(t *testing.T) { + g.Lock() + defer g.Unlock() + for s, count := range g.sampled { if count > 1 { t.Errorf("sample %v was called more than once", s) } } } -func (m onceGetter) sampledList() []Sample { - m.Lock() - defer m.Unlock() - samples := make([]Sample, 0, len(m.sampled)) - for s := range m.sampled { +func (g onceGetter) sampledList() []Sample { + g.Lock() + defer g.Unlock() + samples := make([]Sample, 0, len(g.sampled)) + for s := range g.sampled { samples = append(samples, s) } return samples } -func (m onceGetter) GetShare(_ context.Context, _ *header.ExtendedHeader, row, col int) (libshare.Share, error) { - m.Lock() - defer m.Unlock() +func (g onceGetter) GetShare(_ context.Context, _ *header.ExtendedHeader, row, col int) (libshare.Share, error) { + g.Lock() + defer g.Unlock() s := Sample{Row: row, Col: col} - m.sampled[s]++ + g.sampled[s]++ return libshare.Share{}, nil } -func (m onceGetter) GetEDS(_ context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { +func (g onceGetter) GetEDS(_ context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { panic("not implemented") } -func (m onceGetter) GetSharesByNamespace( +func (g onceGetter) GetSharesByNamespace( _ context.Context, _ *header.ExtendedHeader, _ libshare.Namespace, diff --git a/share/availability/light/sample.go b/share/availability/light/sample.go index 17f6b2a59f..6857ab5365 100644 --- a/share/availability/light/sample.go +++ b/share/availability/light/sample.go @@ -7,12 +7,31 @@ import ( "golang.org/x/exp/maps" ) +// SamplingResult holds the available and remaining samples. +type SamplingResult struct { + Available []Sample `json:"available"` + Remaining []Sample `json:"remaining"` +} + // Sample represents a coordinate in a 2D data square. type Sample struct { Row int `json:"row"` Col int `json:"col"` } +// NewSamplingResult creates a new SamplingResult with randomly selected samples. +func NewSamplingResult(squareSize, sampleCount int) *SamplingResult { + total := squareSize * squareSize + if sampleCount > total { + sampleCount = total + } + + samples := selectRandomSamples(squareSize, sampleCount) + return &SamplingResult{ + Remaining: samples, + } +} + // selectRandomSamples randomly picks unique coordinates from a square of given size. func selectRandomSamples(squareSize, sampleCount int) []Sample { total := squareSize * squareSize