Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: check blob key exist before payments #1370

Merged
merged 2 commits into from
Mar 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion disperser/apiserver/disperse_blob_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
if onchainState == nil {
return nil, api.NewErrorInternal("onchain state is nil")
}
if err := s.validateDispersalRequest(req, onchainState); err != nil {
if err := s.validateDispersalRequest(ctx, req, onchainState); err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to validate the request: %v", err))
}

Expand Down Expand Up @@ -133,6 +133,7 @@ func (s *DispersalServerV2) checkPaymentMeter(ctx context.Context, req *pb.Dispe
}

func (s *DispersalServerV2) validateDispersalRequest(
ctx context.Context,
req *pb.DisperseBlobRequest,
onchainState *OnchainState) error {

Expand Down Expand Up @@ -220,5 +221,13 @@ func (s *DispersalServerV2) validateDispersalRequest(
return errors.New("invalid blob commitment")
}

blobKey, err := blobHeader.BlobKey()
if err != nil {
return fmt.Errorf("failed to get blob key: %w", err)
}
if s.blobStore.CheckBlobExists(ctx, blobKey) {
return fmt.Errorf("blob already exists: %s", blobKey.Hex())
}

return nil
}
35 changes: 33 additions & 2 deletions disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,45 @@ func TestV2DisperseBlob(t *testing.T) {
assert.Greater(t, blobMetadata.RequestedAt, uint64(now.UnixNano()))
assert.Equal(t, blobMetadata.RequestedAt, blobMetadata.UpdatedAt)

// Try dispersing the same blob; if payment is different, blob will be considered as a differernt blob
// payment will cause failure before commitment check
// Try dispersing the same blob; blob key check will fail if the blob is already stored
reply, err = c.DispersalServerV2.DisperseBlob(ctx, &pbv2.DisperseBlobRequest{
Blob: data,
Signature: sig,
BlobHeader: blobHeaderProto,
})
assert.Nil(t, reply)
assert.ErrorContains(t, err, "blob already exists")

data2 := make([]byte, 50)
_, err = rand.Read(data)
assert.NoError(t, err)

data2 = codec.ConvertByPaddingEmptyByte(data2)
commitments, err = prover.GetCommitmentsForPaddedLength(data2)
assert.NoError(t, err)
commitmentProto, err = commitments.ToProtobuf()
assert.NoError(t, err)
blobHeaderProto2 := &pbcommonv2.BlobHeader{
Version: 0,
QuorumNumbers: []uint32{0, 1},
Commitment: commitmentProto,
PaymentHeader: &pbcommonv2.PaymentHeader{
AccountId: accountID.Hex(),
Timestamp: 5,
CumulativePayment: big.NewInt(100).Bytes(),
},
}
blobHeader2, err := corev2.BlobHeaderFromProtobuf(blobHeaderProto2)
assert.NoError(t, err)
sig2, err := signer.SignBlobRequest(blobHeader2)
assert.NoError(t, err)

reply, err = c.DispersalServerV2.DisperseBlob(ctx, &pbv2.DisperseBlobRequest{
Blob: data2,
Signature: sig2,
BlobHeader: blobHeaderProto2,
})
assert.Nil(t, reply)
assert.ErrorContains(t, err, "payment already exists")
}

Expand Down
11 changes: 8 additions & 3 deletions disperser/common/v2/blobstore/s3_blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,25 @@ func NewBlobStore(s3BucketName string, s3Client s3.Client, logger logging.Logger

// StoreBlob adds a blob to the blob store
func (b *BlobStore) StoreBlob(ctx context.Context, key corev2.BlobKey, data []byte) error {
_, err := b.s3Client.HeadObject(ctx, b.bucketName, s3.ScopedBlobKey(key))
if err == nil {
if b.CheckBlobExists(ctx, key) {
b.logger.Warnf("blob already exists in bucket %s: %s", b.bucketName, key)
return common.ErrAlreadyExists
}

err = b.s3Client.UploadObject(ctx, b.bucketName, s3.ScopedBlobKey(key), data)
err := b.s3Client.UploadObject(ctx, b.bucketName, s3.ScopedBlobKey(key), data)
if err != nil {
b.logger.Errorf("failed to upload blob in bucket %s: %v", b.bucketName, err)
return err
}
return nil
}

// CheckBlobExists checks if a blob exists in the blob store
func (b *BlobStore) CheckBlobExists(ctx context.Context, key corev2.BlobKey) bool {
_, err := b.s3Client.HeadObject(ctx, b.bucketName, s3.ScopedBlobKey(key))
return err == nil
}

// GetBlob retrieves a blob from the blob store
func (b *BlobStore) GetBlob(ctx context.Context, key corev2.BlobKey) ([]byte, error) {
data, err := b.s3Client.DownloadObject(ctx, b.bucketName, s3.ScopedBlobKey(key))
Expand Down
Loading