diff --git a/disperser/apiserver/disperse_blob_v2.go b/disperser/apiserver/disperse_blob_v2.go index 39ed0b4c19..eb0a2155fa 100644 --- a/disperser/apiserver/disperse_blob_v2.go +++ b/disperser/apiserver/disperse_blob_v2.go @@ -29,7 +29,7 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl if onchainState == nil { return nil, api.NewErrorInternal("onchain state is nil") } - if err := s.validateDispersalRequest(req, onchainState); err != nil { + if err := s.validateDispersalRequest(ctx, req, onchainState); err != nil { return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to validate the request: %v", err)) } @@ -133,6 +133,7 @@ func (s *DispersalServerV2) checkPaymentMeter(ctx context.Context, req *pb.Dispe } func (s *DispersalServerV2) validateDispersalRequest( + ctx context.Context, req *pb.DisperseBlobRequest, onchainState *OnchainState) error { @@ -220,5 +221,13 @@ func (s *DispersalServerV2) validateDispersalRequest( return errors.New("invalid blob commitment") } + blobKey, err := blobHeader.BlobKey() + if err != nil { + return fmt.Errorf("failed to get blob key: %w", err) + } + if s.blobStore.CheckBlobExists(ctx, blobKey) { + return fmt.Errorf("blob already exists: %s", blobKey.Hex()) + } + return nil } diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go index 8dfb3802ed..84f16e2f94 100644 --- a/disperser/apiserver/server_v2_test.go +++ b/disperser/apiserver/server_v2_test.go @@ -111,14 +111,45 @@ func TestV2DisperseBlob(t *testing.T) { assert.Greater(t, blobMetadata.RequestedAt, uint64(now.UnixNano())) assert.Equal(t, blobMetadata.RequestedAt, blobMetadata.UpdatedAt) - // Try dispersing the same blob; if payment is different, blob will be considered as a differernt blob - // payment will cause failure before commitment check + // Try dispersing the same blob; blob key check will fail if the blob is already stored reply, err = c.DispersalServerV2.DisperseBlob(ctx, &pbv2.DisperseBlobRequest{ Blob: data, Signature: sig, BlobHeader: blobHeaderProto, }) assert.Nil(t, reply) + assert.ErrorContains(t, err, "blob already exists") + + data2 := make([]byte, 50) + _, err = rand.Read(data) + assert.NoError(t, err) + + data2 = codec.ConvertByPaddingEmptyByte(data2) + commitments, err = prover.GetCommitmentsForPaddedLength(data2) + assert.NoError(t, err) + commitmentProto, err = commitments.ToProtobuf() + assert.NoError(t, err) + blobHeaderProto2 := &pbcommonv2.BlobHeader{ + Version: 0, + QuorumNumbers: []uint32{0, 1}, + Commitment: commitmentProto, + PaymentHeader: &pbcommonv2.PaymentHeader{ + AccountId: accountID.Hex(), + Timestamp: 5, + CumulativePayment: big.NewInt(100).Bytes(), + }, + } + blobHeader2, err := corev2.BlobHeaderFromProtobuf(blobHeaderProto2) + assert.NoError(t, err) + sig2, err := signer.SignBlobRequest(blobHeader2) + assert.NoError(t, err) + + reply, err = c.DispersalServerV2.DisperseBlob(ctx, &pbv2.DisperseBlobRequest{ + Blob: data2, + Signature: sig2, + BlobHeader: blobHeaderProto2, + }) + assert.Nil(t, reply) assert.ErrorContains(t, err, "payment already exists") } diff --git a/disperser/common/v2/blobstore/s3_blob_store.go b/disperser/common/v2/blobstore/s3_blob_store.go index 433af0a765..ef06fc0631 100644 --- a/disperser/common/v2/blobstore/s3_blob_store.go +++ b/disperser/common/v2/blobstore/s3_blob_store.go @@ -26,13 +26,12 @@ func NewBlobStore(s3BucketName string, s3Client s3.Client, logger logging.Logger // StoreBlob adds a blob to the blob store func (b *BlobStore) StoreBlob(ctx context.Context, key corev2.BlobKey, data []byte) error { - _, err := b.s3Client.HeadObject(ctx, b.bucketName, s3.ScopedBlobKey(key)) - if err == nil { + if b.CheckBlobExists(ctx, key) { b.logger.Warnf("blob already exists in bucket %s: %s", b.bucketName, key) return common.ErrAlreadyExists } - err = b.s3Client.UploadObject(ctx, b.bucketName, s3.ScopedBlobKey(key), data) + err := b.s3Client.UploadObject(ctx, b.bucketName, s3.ScopedBlobKey(key), data) if err != nil { b.logger.Errorf("failed to upload blob in bucket %s: %v", b.bucketName, err) return err @@ -40,6 +39,12 @@ func (b *BlobStore) StoreBlob(ctx context.Context, key corev2.BlobKey, data []by return nil } +// CheckBlobExists checks if a blob exists in the blob store +func (b *BlobStore) CheckBlobExists(ctx context.Context, key corev2.BlobKey) bool { + _, err := b.s3Client.HeadObject(ctx, b.bucketName, s3.ScopedBlobKey(key)) + return err == nil +} + // GetBlob retrieves a blob from the blob store func (b *BlobStore) GetBlob(ctx context.Context, key corev2.BlobKey) ([]byte, error) { data, err := b.s3Client.DownloadObject(ctx, b.bucketName, s3.ScopedBlobKey(key))