Skip to content

Commit

Permalink
Merge pull request #1559 from dwertent/fix-duplicate-metrics-call
Browse files Browse the repository at this point in the history
Refactor `maybePersistBlockchainEvent` to Return a Boolean Indicating Event Creation Status
  • Loading branch information
EnriqueL8 authored Aug 29, 2024
2 parents a983877 + a6ae64c commit 74e2cbd
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 14 deletions.
9 changes: 5 additions & 4 deletions internal/events/blockchain_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,21 @@ func (em *eventManager) getChainListenerByProtocolIDCached(ctx context.Context,
return l, nil
}

func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) error {
// handleBlockchainBatchPinEvent handles a blockchain event, returning true if the event was created, false if it was a duplicate along with an error if any failures occur
func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) (bool, error) {
existing, err := em.txHelper.InsertOrGetBlockchainEvent(ctx, chainEvent)
if err != nil {
return err
return false, err
}
if existing != nil {
log.L(ctx).Debugf("Ignoring duplicate blockchain event %s", chainEvent.ProtocolID)
// Return the ID of the existing event
chainEvent.ID = existing.ID
return nil
return false, nil
}
topic := em.getTopicForChainListener(listener)
ffEvent := core.NewEvent(core.EventTypeBlockchainEventReceived, chainEvent.Namespace, chainEvent.ID, chainEvent.TX.ID, topic)
return em.database.InsertEvent(ctx, ffEvent)
return true, em.database.InsertEvent(ctx, ffEvent)
}

func (em *eventManager) getChainListenerCached(cacheKey string, getter func() (*core.ContractListener, error)) (*core.ContractListener, error) {
Expand Down
4 changes: 3 additions & 1 deletion internal/events/blockchain_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func TestContractEventWrongNS(t *testing.T) {

}

// TODO: Add test case for event not existing
func TestPersistBlockchainEventDuplicate(t *testing.T) {
em := newTestEventManager(t)
defer em.cleanup(t)
Expand All @@ -173,9 +174,10 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) {
em.mth.On("InsertOrGetBlockchainEvent", mock.Anything, ev).
Return(&core.BlockchainEvent{ID: existingID}, nil)

err := em.maybePersistBlockchainEvent(em.ctx, ev, nil)
created, err := em.maybePersistBlockchainEvent(em.ctx, ev, nil)
assert.NoError(t, err)
assert.Equal(t, existingID, ev.ID)
assert.False(t, created)

}

Expand Down
2 changes: 1 addition & 1 deletion internal/events/event_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ func TestEventFilterOnSubscriptionMatchesEventType(t *testing.T) {
filteredEvents, _ = em.FilterHistoricalEventsOnSubscription(context.Background(), events, subscription)
assert.NotNil(t, filteredEvents)
assert.Equal(t, 1, len(filteredEvents))

listenerUuid := fftypes.NewUUID()

events[0].Event.Topic = ""
Expand Down
7 changes: 5 additions & 2 deletions internal/events/token_pool_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ func (em *eventManager) confirmPool(ctx context.Context, pool *core.TokenPool, e
Type: pool.TX.Type,
BlockchainID: blockchainID,
})
if err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil); err != nil {
created, err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil)
if err != nil {
return err
}
em.emitBlockchainEventMetric(ev)
if created {
em.emitBlockchainEventMetric(ev)
}
}
if _, err := em.txHelper.PersistTransaction(ctx, pool.TX.ID, pool.TX.Type, blockchainID); err != nil {
return err
Expand Down
9 changes: 6 additions & 3 deletions internal/events/tokens_approved.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -97,10 +97,13 @@ func (em *eventManager) persistTokenApproval(ctx context.Context, approval *toke
Type: approval.TX.Type,
BlockchainID: approval.Event.BlockchainTXID,
})
if err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil); err != nil {
created, err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil)
if err != nil {
return false, err
}
em.emitBlockchainEventMetric(approval.Event)
if created {
em.emitBlockchainEventMetric(approval.Event)
}
approval.BlockchainEvent = chainEvent.ID

fb := database.TokenApprovalQueryFactory.NewFilter(ctx)
Expand Down
9 changes: 6 additions & 3 deletions internal/events/tokens_transferred.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -89,10 +89,13 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke
Type: transfer.TX.Type,
BlockchainID: transfer.Event.BlockchainTXID,
})
if err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil); err != nil {
created, err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil)
if err != nil {
return false, err
}
em.emitBlockchainEventMetric(transfer.Event)
if created {
em.emitBlockchainEventMetric(transfer.Event)
}
transfer.BlockchainEvent = chainEvent.ID

// This is a no-op if we've already persisted this token transfer
Expand Down

0 comments on commit 74e2cbd

Please sign in to comment.