Skip to content

Commit

Permalink
feat: check blob key exist before payments
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Mar 7, 2025
1 parent 1e556fc commit b81a4a9
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 4 deletions.
11 changes: 10 additions & 1 deletion disperser/apiserver/disperse_blob_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
if onchainState == nil {
return nil, api.NewErrorInternal("onchain state is nil")
}
if err := s.validateDispersalRequest(req, onchainState); err != nil {
if err := s.validateDispersalRequest(ctx, req, onchainState); err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to validate the request: %v", err))
}

Expand Down Expand Up @@ -133,6 +133,7 @@ func (s *DispersalServerV2) checkPaymentMeter(ctx context.Context, req *pb.Dispe
}

func (s *DispersalServerV2) validateDispersalRequest(
ctx context.Context,
req *pb.DisperseBlobRequest,
onchainState *OnchainState) error {

Expand Down Expand Up @@ -220,5 +221,13 @@ func (s *DispersalServerV2) validateDispersalRequest(
return errors.New("invalid blob commitment")
}

blobKey, err := blobHeader.BlobKey()
if err != nil {
return fmt.Errorf("failed to get blob key: %w", err)
}
if s.blobStore.CheckBlobExists(ctx, blobKey) {
return fmt.Errorf("blob already exists: %s", blobKey.Hex())
}

return nil
}
32 changes: 32 additions & 0 deletions disperser/apiserver/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,38 @@ func TestV2DisperseBlob(t *testing.T) {
BlobHeader: blobHeaderProto,
})
assert.Nil(t, reply)
assert.ErrorContains(t, err, "blob already exists")

data = make([]byte, 50)
_, err = rand.Read(data)
assert.NoError(t, err)
data = codec.ConvertByPaddingEmptyByte(data)
commitments, err = prover.GetCommitmentsForPaddedLength(data)
assert.NoError(t, err)
accountID, err = c.Signer.GetAccountID()
assert.NoError(t, err)
commitmentProto, err = commitments.ToProtobuf()
assert.NoError(t, err)
blobHeader, err = corev2.BlobHeaderFromProtobuf(blobHeaderProto)
assert.NoError(t, err)
sig, err = signer.SignBlobRequest(blobHeader)
assert.NoError(t, err)
blobHeaderProto = &pbcommonv2.BlobHeader{
Version: 0,
QuorumNumbers: []uint32{0, 1},
Commitment: commitmentProto,
PaymentHeader: &pbcommonv2.PaymentHeader{
AccountId: accountID.Hex(),
Timestamp: 5,
CumulativePayment: big.NewInt(100).Bytes(),
},
}
reply, err = c.DispersalServerV2.DisperseBlob(ctx, &pbv2.DisperseBlobRequest{
Blob: data,
Signature: sig,
BlobHeader: blobHeaderProto,
})
assert.Nil(t, reply)
assert.ErrorContains(t, err, "payment already exists")
}

Expand Down
11 changes: 8 additions & 3 deletions disperser/common/v2/blobstore/s3_blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,25 @@ func NewBlobStore(s3BucketName string, s3Client s3.Client, logger logging.Logger

// StoreBlob adds a blob to the blob store
func (b *BlobStore) StoreBlob(ctx context.Context, key corev2.BlobKey, data []byte) error {
_, err := b.s3Client.HeadObject(ctx, b.bucketName, s3.ScopedBlobKey(key))
if err == nil {
if b.CheckBlobExists(ctx, key) {
b.logger.Warnf("blob already exists in bucket %s: %s", b.bucketName, key)
return common.ErrAlreadyExists
}

err = b.s3Client.UploadObject(ctx, b.bucketName, s3.ScopedBlobKey(key), data)
err := b.s3Client.UploadObject(ctx, b.bucketName, s3.ScopedBlobKey(key), data)
if err != nil {
b.logger.Errorf("failed to upload blob in bucket %s: %v", b.bucketName, err)
return err
}
return nil
}

// CheckBlobExists checks if a blob exists in the blob store
func (b *BlobStore) CheckBlobExists(ctx context.Context, key corev2.BlobKey) bool {
_, err := b.s3Client.HeadObject(ctx, b.bucketName, s3.ScopedBlobKey(key))
return err == nil
}

// GetBlob retrieves a blob from the blob store
func (b *BlobStore) GetBlob(ctx context.Context, key corev2.BlobKey) ([]byte, error) {
data, err := b.s3Client.DownloadObject(ctx, b.bucketName, s3.ScopedBlobKey(key))
Expand Down

0 comments on commit b81a4a9

Please sign in to comment.