Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move FinalizeBlock event staging logic into a generic EventStager #2435

Merged
merged 2 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions protocol/finalizeblock/event_stager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package finalizeblock

import (
"encoding/binary"

"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
)

// EventStager supports staging and retrieval of events (of type T) from FinalizeBlock.
type EventStager[T proto.Message] struct {
transientStoreKey storetypes.StoreKey
cdc codec.BinaryCodec
stagedEventCountKey string
stagedEventKeyPrefix string
}

// NewEventStager creates a new EventStager.
func NewEventStager[T proto.Message](
transientStoreKey storetypes.StoreKey,
cdc codec.BinaryCodec,
stagedEventCountKey string,
stagedEventKeyPrefix string,
) EventStager[T] {
return EventStager[T]{
transientStoreKey: transientStoreKey,
cdc: cdc,
stagedEventCountKey: stagedEventCountKey,
stagedEventKeyPrefix: stagedEventKeyPrefix,
}
}

// GetStagedFinalizeBlockEvents retrieves all staged events from the store.
func (s EventStager[T]) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
newStagedEvent func() T,
) []T {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(s.transientStoreKey)

count := s.getStagedEventsCount(store)
events := make([]T, count)
store = prefix.NewStore(store, []byte(s.stagedEventKeyPrefix))
for i := uint32(0); i < count; i++ {
event := newStagedEvent()
bytes := store.Get(lib.Uint32ToKey(i))
s.cdc.MustUnmarshal(bytes, event)
events[i] = event
teddyding marked this conversation as resolved.
Show resolved Hide resolved
}
return events
}

func (s EventStager[T]) getStagedEventsCount(
store storetypes.KVStore,
) uint32 {
countsBytes := store.Get([]byte(s.stagedEventCountKey))
if countsBytes == nil {
return 0
}
return binary.BigEndian.Uint32(countsBytes)
teddyding marked this conversation as resolved.
Show resolved Hide resolved
}

// StageFinalizeBlockEvent stages an event in the transient store.
func (s EventStager[T]) StageFinalizeBlockEvent(
ctx sdk.Context,
stagedEvent T,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(s.transientStoreKey)

// Increment events count.
count := s.getStagedEventsCount(store)
store.Set([]byte(s.stagedEventCountKey), lib.Uint32ToKey(count+1))

// Store events keyed by index.
store = prefix.NewStore(store, []byte(s.stagedEventKeyPrefix))
store.Set(lib.Uint32ToKey(count), s.cdc.MustMarshal(stagedEvent))
}
Comment on lines +68 to +83
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve error handling and consider gas usage in StageFinalizeBlockEvent.

The method effectively stages events, but there are areas for improvement:

  1. The use of an infinite gas meter (NewFreeInfiniteGasMeter()) bypasses gas consumption checks. This could potentially lead to security vulnerabilities or unintended behaviors in gas accounting.

  2. The use of MustMarshal can cause panics if marshaling fails. This could lead to stability issues in production.

  3. There's no validation of the input event.

To address these concerns:

  1. Consider using a regular context with gas metering, or document why infinite gas is necessary here.

  2. Replace MustMarshal with regular Marshal and handle potential errors gracefully. For example:

bytes, err := s.cdc.Marshal(stagedEvent)
if err != nil {
    // Handle the error appropriately, such as logging or returning it
    return fmt.Errorf("failed to marshal event: %w", err)
}
store.Set(lib.Uint32ToKey(count), bytes)
  1. Add validation for the input event to ensure it's not nil:
if stagedEvent == nil {
    return fmt.Errorf("staged event cannot be nil")
}

These changes will improve the robustness and safety of the code.

107 changes: 30 additions & 77 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package streaming

import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -11,7 +10,6 @@ import (
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -22,6 +20,8 @@ import (
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"

ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"

"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
)

var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
Expand Down Expand Up @@ -62,6 +62,8 @@ type FullNodeStreamingManagerImpl struct {

// stores the staged FinalizeBlock events for full node streaming.
streamingManagerTransientStoreKey storetypes.StoreKey

finalizeBlockStager finalizeblock.EventStager[*clobtypes.StagedFinalizeBlockEvent]
}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
Expand Down Expand Up @@ -119,6 +121,12 @@ func NewFullNodeStreamingManager(

streamingManagerTransientStoreKey: streamingManagerTransientStoreKey,
cdc: cdc,
finalizeBlockStager: finalizeblock.NewEventStager[*clobtypes.StagedFinalizeBlockEvent](
streamingManagerTransientStoreKey,
cdc,
StagedEventsCountKey,
StagedEventsKeyPrefix,
),
}

// Start the goroutine for pushing order updates through.
Expand Down Expand Up @@ -381,14 +389,6 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates(
}
}

func getStagedEventsCount(store storetypes.KVStore) uint32 {
countsBytes := store.Get([]byte(StagedEventsCountKey))
if countsBytes == nil {
return 0
}
return binary.BigEndian.Uint32(countsBytes)
}

// Send a subaccount update event.
func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
ctx sdk.Context,
Expand All @@ -405,51 +405,30 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
SubaccountUpdate: &subaccountUpdate,
},
}
sm.stageFinalizeBlockEvent(
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
&stagedEvent,
)
}

func getStagedFinalizeBlockEventsFromStore(
store storetypes.KVStore,
cdc codec.BinaryCodec,
) []clobtypes.StagedFinalizeBlockEvent {
count := getStagedEventsCount(store)
events := make([]clobtypes.StagedFinalizeBlockEvent, count)
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
for i := uint32(0); i < count; i++ {
var event clobtypes.StagedFinalizeBlockEvent
bytes := store.Get(lib.Uint32ToKey(i))
cdc.MustUnmarshal(bytes, &event)
events[i] = event
}
return events
}

// Retrieve all events staged during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)
return getStagedFinalizeBlockEventsFromStore(store, sm.cdc)
}

func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent(
ctx sdk.Context,
eventBytes []byte,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)

// Increment events count.
count := getStagedEventsCount(store)
store.Set([]byte(StagedEventsCountKey), lib.Uint32ToKey(count+1))

// Store events keyed by index.
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
store.Set(lib.Uint32ToKey(count), eventBytes)
events := sm.finalizeBlockStager.GetStagedFinalizeBlockEvents(
ctx,
func() *clobtypes.StagedFinalizeBlockEvent {
return &clobtypes.StagedFinalizeBlockEvent{}
},
)
results := make([]clobtypes.StagedFinalizeBlockEvent, len(events))
for i, event := range events {
if event == nil {
panic("Got nil event from finalizeBlockStager")
}
results[i] = *event
}
return results
}

// SendCombinedSnapshot sends messages to a particular subscriber without buffering.
Expand Down Expand Up @@ -574,9 +553,9 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
},
},
}
sm.stageFinalizeBlockEvent(
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
&stagedEvent,
)
}

Expand Down Expand Up @@ -649,9 +628,9 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
},
}

sm.stageFinalizeBlockEvent(
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
&stagedEvent,
)
}

Expand Down Expand Up @@ -710,32 +689,6 @@ func getStreamUpdatesForSubaccountUpdates(
return streamUpdates, subaccountIds
}

// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
Copy link
Contributor Author

@teddyding teddyding Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method and below interface are unused and should've been deleted (as part of internalize PR)

subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendFinalizedSubaccountUpdatesLatency,
time.Now(),
)

if execMode != sdk.ExecModeFinalize {
panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize")
}

streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
subaccountUpdates,
blockHeight,
execMode,
)

sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds)
}

// AddOrderUpdatesToCache adds a series of updates to the full node streaming cache.
// Clob pair ids are the clob pair id each update is relevant to.
func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(
Expand Down
7 changes: 0 additions & 7 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(
) {
}

func (sm *NoopGrpcStreamingManager) SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
}

func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) bool {
return false
}
Expand Down
5 changes: 0 additions & 5 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ type FullNodeStreamingManager interface {
takerOrder clobtypes.StreamTakerOrder,
ctx sdk.Context,
)
SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
)
SendSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
Expand Down
4 changes: 0 additions & 4 deletions protocol/x/clob/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ type SubaccountsKeeper interface {
revSharesForFill revsharetypes.RevSharesForFill,
fillForProcess FillForProcess,
) error
SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []satypes.StreamSubaccountUpdate,
)
}

type AssetsKeeper interface {
Expand Down
16 changes: 0 additions & 16 deletions protocol/x/subaccounts/keeper/subaccount.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,19 +825,3 @@ func (k Keeper) GetAllRelevantPerpetuals(
func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager {
return k.streamingManager
}

// SendFinalizedSubaccountUpdates sends the subaccount updates to the gRPC streaming manager.
func (k Keeper) SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []types.StreamSubaccountUpdate,
) {
lib.AssertDeliverTxMode(ctx)
if len(subaccountUpdates) == 0 {
return
}
k.GetFullNodeStreamingManager().SendFinalizedSubaccountUpdates(
subaccountUpdates,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}
4 changes: 0 additions & 4 deletions protocol/x/subaccounts/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,4 @@ type SubaccountsKeeper interface {
perpetualId uint32,
blockHeight uint32,
) error
SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []StreamSubaccountUpdate,
)
}
Loading