Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move FinalizeBlock event staging logic into a generic EventStager #2435

Merged
merged 2 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions protocol/finalizeblock/event_stager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package finalizeblock

import (
"encoding/binary"

"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
)

// EventStager supports staging and retrieval of events (of type T) from FinalizeBlock.
type EventStager[T proto.Message] struct {
transientStoreKey storetypes.StoreKey
cdc codec.BinaryCodec
stagedEventCountKey string
stagedEventKeyPrefix string
}

// NewEventStager creates a new EventStager.
func NewEventStager[T proto.Message](
transientStoreKey storetypes.StoreKey,
cdc codec.BinaryCodec,
stagedEventCountKey string,
stagedEventKeyPrefix string,
) EventStager[T] {
return EventStager[T]{
transientStoreKey: transientStoreKey,
cdc: cdc,
stagedEventCountKey: stagedEventCountKey,
stagedEventKeyPrefix: stagedEventKeyPrefix,
}
}

// GetStagedFinalizeBlockEventsFromStore retrieves all staged events from the store.
func (s EventStager[T]) GetStagedFinalizeBlockEventsFromStore(
store storetypes.KVStore,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also meganit: take in ctx and move the free gas meter here for consistency w setter method

newStagedEvent func() T,
) []T {
count := s.getStagedEventsCount(store)
events := make([]T, count)
store = prefix.NewStore(store, []byte(s.stagedEventKeyPrefix))
for i := uint32(0); i < count; i++ {
event := newStagedEvent()
bytes := store.Get(lib.Uint32ToKey(i))
s.cdc.MustUnmarshal(bytes, event)
events[i] = event
teddyding marked this conversation as resolved.
Show resolved Hide resolved
}
return events
}

func (s EventStager[T]) getStagedEventsCount(
store storetypes.KVStore,
) uint32 {
countsBytes := store.Get([]byte(s.stagedEventCountKey))
if countsBytes == nil {
return 0
}
return binary.BigEndian.Uint32(countsBytes)
teddyding marked this conversation as resolved.
Show resolved Hide resolved
}

// StageFinalizeBlockEvent stages an event in the transient store.
func (s EventStager[T]) StageFinalizeBlockEvent(
ctx sdk.Context,
eventBytes []byte,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())

store := noGasCtx.TransientStore(s.transientStoreKey)
teddyding marked this conversation as resolved.
Show resolved Hide resolved

// Increment events count.
count := s.getStagedEventsCount(store)
store.Set([]byte(s.stagedEventCountKey), lib.Uint32ToKey(count+1))

// Store events keyed by index.
store = prefix.NewStore(store, []byte(s.stagedEventKeyPrefix))
store.Set(lib.Uint32ToKey(count), eventBytes)
}
99 changes: 27 additions & 72 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package streaming

import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -11,7 +10,6 @@ import (
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -22,6 +20,8 @@ import (
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"

ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"

"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
)

var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
Expand Down Expand Up @@ -62,6 +62,8 @@ type FullNodeStreamingManagerImpl struct {

// stores the staged FinalizeBlock events for full node streaming.
streamingManagerTransientStoreKey storetypes.StoreKey

finalizeBlockStager finalizeblock.EventStager[*clobtypes.StagedFinalizeBlockEvent]
}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
Expand Down Expand Up @@ -119,6 +121,12 @@ func NewFullNodeStreamingManager(

streamingManagerTransientStoreKey: streamingManagerTransientStoreKey,
cdc: cdc,
finalizeBlockStager: finalizeblock.NewEventStager[*clobtypes.StagedFinalizeBlockEvent](
streamingManagerTransientStoreKey,
cdc,
StagedEventsCountKey,
StagedEventsKeyPrefix,
),
}

// Start the goroutine for pushing order updates through.
Expand Down Expand Up @@ -381,14 +389,6 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates(
}
}

func getStagedEventsCount(store storetypes.KVStore) uint32 {
countsBytes := store.Get([]byte(StagedEventsCountKey))
if countsBytes == nil {
return 0
}
return binary.BigEndian.Uint32(countsBytes)
}

// Send a subaccount update event.
func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
ctx sdk.Context,
Expand All @@ -405,51 +405,32 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
SubaccountUpdate: &subaccountUpdate,
},
}
sm.stageFinalizeBlockEvent(
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

meganit: the Getter returns []T so let's also pass in T instead of []byte for consistency

}

func getStagedFinalizeBlockEventsFromStore(
store storetypes.KVStore,
cdc codec.BinaryCodec,
) []clobtypes.StagedFinalizeBlockEvent {
count := getStagedEventsCount(store)
events := make([]clobtypes.StagedFinalizeBlockEvent, count)
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
for i := uint32(0); i < count; i++ {
var event clobtypes.StagedFinalizeBlockEvent
bytes := store.Get(lib.Uint32ToKey(i))
cdc.MustUnmarshal(bytes, &event)
events[i] = event
}
return events
}

// Retrieve all events staged during `FinalizeBlock`.
func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)
return getStagedFinalizeBlockEventsFromStore(store, sm.cdc)
}

func (sm *FullNodeStreamingManagerImpl) stageFinalizeBlockEvent(
ctx sdk.Context,
eventBytes []byte,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(sm.streamingManagerTransientStoreKey)

// Increment events count.
count := getStagedEventsCount(store)
store.Set([]byte(StagedEventsCountKey), lib.Uint32ToKey(count+1))

// Store events keyed by index.
store = prefix.NewStore(store, []byte(StagedEventsKeyPrefix))
store.Set(lib.Uint32ToKey(count), eventBytes)
events := sm.finalizeBlockStager.GetStagedFinalizeBlockEventsFromStore(
store,
func() *clobtypes.StagedFinalizeBlockEvent {
return &clobtypes.StagedFinalizeBlockEvent{}
},
)
results := make([]clobtypes.StagedFinalizeBlockEvent, len(events))
for i, event := range events {
if event == nil {
panic("Got nil event from finalizeBlockStager")
}
results[i] = *event
}
return results
teddyding marked this conversation as resolved.
Show resolved Hide resolved
}

// SendCombinedSnapshot sends messages to a particular subscriber without buffering.
Expand Down Expand Up @@ -574,7 +555,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
},
},
}
sm.stageFinalizeBlockEvent(
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
Expand Down Expand Up @@ -649,7 +630,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
},
}

sm.stageFinalizeBlockEvent(
sm.finalizeBlockStager.StageFinalizeBlockEvent(
ctx,
sm.cdc.MustMarshal(&stagedEvent),
)
Expand Down Expand Up @@ -710,32 +691,6 @@ func getStreamUpdatesForSubaccountUpdates(
return streamUpdates, subaccountIds
}

// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
Copy link
Contributor Author

@teddyding teddyding Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method and below interface are unused and should've been deleted (as part of internalize PR)

subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendFinalizedSubaccountUpdatesLatency,
time.Now(),
)

if execMode != sdk.ExecModeFinalize {
panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize")
}

streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
subaccountUpdates,
blockHeight,
execMode,
)

sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds)
}

// AddOrderUpdatesToCache adds a series of updates to the full node streaming cache.
// Clob pair ids are the clob pair id each update is relevant to.
func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(
Expand Down
7 changes: 0 additions & 7 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,6 @@ func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(
) {
}

func (sm *NoopGrpcStreamingManager) SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
}

func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) bool {
return false
}
Expand Down
5 changes: 0 additions & 5 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ type FullNodeStreamingManager interface {
takerOrder clobtypes.StreamTakerOrder,
ctx sdk.Context,
)
SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
)
SendSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
Expand Down
4 changes: 0 additions & 4 deletions protocol/x/clob/types/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ type SubaccountsKeeper interface {
revSharesForFill revsharetypes.RevSharesForFill,
fillForProcess FillForProcess,
) error
SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []satypes.StreamSubaccountUpdate,
)
}

type AssetsKeeper interface {
Expand Down
16 changes: 0 additions & 16 deletions protocol/x/subaccounts/keeper/subaccount.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,19 +825,3 @@ func (k Keeper) GetAllRelevantPerpetuals(
func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager {
return k.streamingManager
}

// SendFinalizedSubaccountUpdates sends the subaccount updates to the gRPC streaming manager.
func (k Keeper) SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []types.StreamSubaccountUpdate,
) {
lib.AssertDeliverTxMode(ctx)
if len(subaccountUpdates) == 0 {
return
}
k.GetFullNodeStreamingManager().SendFinalizedSubaccountUpdates(
subaccountUpdates,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}
4 changes: 0 additions & 4 deletions protocol/x/subaccounts/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,4 @@ type SubaccountsKeeper interface {
perpetualId uint32,
blockHeight uint32,
) error
SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []StreamSubaccountUpdate,
)
}
Loading