Skip to content

Commit

Permalink
Stage events and process in Precommitter
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyding committed Oct 2, 2024
1 parent 9fecfc5 commit 3b92b79
Show file tree
Hide file tree
Showing 7 changed files with 504 additions and 75 deletions.
13 changes: 13 additions & 0 deletions proto/dydxprotocol/clob/finalize_block.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";
package dydxprotocol.clob;

import "dydxprotocol/clob/clob_pair.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/x/clob/types";
3
// ClobStagedFinalizeBlockEvent defines a CLOB event staged during FinalizeBlock.
message ClobStagedFinalizeBlockEvent {
oneof event {
ClobPair create_clob_pair = 1;
}
}
2 changes: 2 additions & 0 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func Precommit(
ctx sdk.Context,
keeper keeper.Keeper,
) {
keeper.ProcessStagedFinalizeBlockEvents(ctx)

if streamingManager := keeper.GetFullNodeStreamingManager(); !streamingManager.Enabled() {
return
}
Expand Down
96 changes: 52 additions & 44 deletions protocol/x/clob/keeper/clob_pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,42 @@ func (k Keeper) CreatePerpetualClobPair(
// Write the `ClobPair` to state.
k.SetClobPair(ctx, clobPair)

err := k.CreateClobPairStructures(ctx, clobPair)
if lib.IsDeliverTxMode(ctx) {
if err := k.StageNewClobPairSideEffects(ctx, clobPair); err != nil {
return clobPair, err
}
}

perpetualId, err := clobPair.GetPerpetualId()
if err != nil {
return clobPair, err
panic(err)
}
perpetual, err := k.perpetualsKeeper.GetPerpetual(ctx, perpetualId)
if err != nil {
return types.ClobPair{}, err
}

k.GetIndexerEventManager().AddTxnEvent(
ctx,
indexerevents.SubtypePerpetualMarket,
indexerevents.PerpetualMarketEventVersion,
indexer_manager.GetBytes(
indexerevents.NewPerpetualMarketCreateEvent(
perpetualId,
clobPair.Id,
perpetual.Params.Ticker,
perpetual.Params.MarketId,
clobPair.Status,
clobPair.QuantumConversionExponent,
perpetual.Params.AtomicResolution,
clobPair.SubticksPerTick,
clobPair.StepBaseQuantums,
perpetual.Params.LiquidityTier,
perpetual.Params.MarketType,
),
),
)

return clobPair, nil
}

Expand Down Expand Up @@ -159,51 +190,29 @@ func (k Keeper) maybeCreateOrderbook(ctx sdk.Context, clobPair types.ClobPair) {
k.MemClob.MaybeCreateOrderbook(clobPair)
}

// createOrderbook creates a new orderbook in the memclob.
func (k Keeper) createOrderbook(ctx sdk.Context, clobPair types.ClobPair) {
// Create the corresponding orderbook in the memclob.
func (k Keeper) ApplySideEffectsForCNewlobPair(
ctx sdk.Context,
clobPair types.ClobPair,
) {
k.MemClob.CreateOrderbook(clobPair)
k.SetClobPairIdForPerpetual(clobPair)
}

// CreateClobPair performs all non stateful operations to create a CLOB pair.
// These include creating the corresponding orderbook in the memclob, the mapping between
// the CLOB pair and the perpetual and the indexer event.
// This function returns an error if a value for the ClobPair's id already exists in state.
func (k Keeper) CreateClobPairStructures(ctx sdk.Context, clobPair types.ClobPair) error {
// Create the corresponding orderbook in the memclob.
k.createOrderbook(ctx, clobPair)

// Create the mapping between clob pair and perpetual.
k.SetClobPairIdForPerpetual(ctx, clobPair)
// StageNewClobPairSideEffects stages a ClobPair creation event, so that any in-memory side effects
// can happen later when the transaction and block is committed.
// Note the staged event will be processed only if below are both true:
// - The current transaction is committed.
// - The current block is agreed upon and committed by consensus.
func (k Keeper) StageNewClobPairSideEffects(ctx sdk.Context, clobPair types.ClobPair) error {
lib.AssertDeliverTxMode(ctx)

perpetualId, err := clobPair.GetPerpetualId()
if err != nil {
panic(err)
}
perpetual, err := k.perpetualsKeeper.GetPerpetual(ctx, perpetualId)
if err != nil {
return err
}

k.GetIndexerEventManager().AddTxnEvent(
k.finalizeBlockEventStager.StageFinalizeBlockEvent(
ctx,
indexerevents.SubtypePerpetualMarket,
indexerevents.PerpetualMarketEventVersion,
indexer_manager.GetBytes(
indexerevents.NewPerpetualMarketCreateEvent(
perpetualId,
clobPair.Id,
perpetual.Params.Ticker,
perpetual.Params.MarketId,
clobPair.Status,
clobPair.QuantumConversionExponent,
perpetual.Params.AtomicResolution,
clobPair.SubticksPerTick,
clobPair.StepBaseQuantums,
perpetual.Params.LiquidityTier,
perpetual.Params.MarketType,
),
),
&types.ClobStagedFinalizeBlockEvent{
Event: &types.ClobStagedFinalizeBlockEvent_CreateClobPair{
CreateClobPair: &clobPair,
},
},
)

return nil
Expand Down Expand Up @@ -234,14 +243,13 @@ func (k Keeper) HydrateClobPairAndPerpetualMapping(ctx sdk.Context) {
for _, clobPair := range clobPairs {
// Create the corresponding mapping between clob pair and perpetual.
k.SetClobPairIdForPerpetual(
ctx,
clobPair,
)
}
}

// SetClobPairIdForPerpetual sets the mapping between clob pair and perpetual.
func (k Keeper) SetClobPairIdForPerpetual(ctx sdk.Context, clobPair types.ClobPair) {
func (k Keeper) SetClobPairIdForPerpetual(clobPair types.ClobPair) {
// If this `ClobPair` is for a perpetual, add the `clobPairId` to the list of CLOB pair IDs
// that facilitate trading of this perpetual.
if perpetualClobMetadata := clobPair.GetPerpetualClobMetadata(); perpetualClobMetadata != nil {
Expand Down
41 changes: 39 additions & 2 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"sync/atomic"

"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
Expand All @@ -15,6 +16,7 @@ import (
liquidationtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/liquidations"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
dydxlog "github.com/dydxprotocol/v4-chain/protocol/lib/log"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/types"
flags "github.com/dydxprotocol/v4-chain/protocol/x/clob/flags"
Expand Down Expand Up @@ -46,8 +48,9 @@ type (
revshareKeeper types.RevShareKeeper
accountPlusKeeper types.AccountPlusKeeper

indexerEventManager indexer_manager.IndexerEventManager
streamingManager streamingtypes.FullNodeStreamingManager
indexerEventManager indexer_manager.IndexerEventManager
streamingManager streamingtypes.FullNodeStreamingManager
finalizeBlockEventStager finalizeblock.EventStager[*types.ClobStagedFinalizeBlockEvent]

initialized *atomic.Bool
memStoreInitialized *atomic.Bool
Expand Down Expand Up @@ -199,6 +202,40 @@ func (k Keeper) Initialize(ctx sdk.Context) {
k.HydrateClobPairAndPerpetualMapping(checkCtx)
}

func (k Keeper) ProcessStagedFinalizeBlockEvents(ctx sdk.Context) {
stagedEvents := k.finalizeBlockEventStager.GetStagedFinalizeBlockEvents(
ctx,
func() *types.ClobStagedFinalizeBlockEvent {
return &types.ClobStagedFinalizeBlockEvent{}
},
)

for _, stagedEvent := range stagedEvents {
if stagedEvent == nil {
// We don't ever expect this. However, should not panic since we are in Precommit.
dydxlog.ErrorLog(
ctx,
"got nil ClobStagedFinalizeBlockEvent, skipping",
"staged_events",
stagedEvents,
)
continue
}

switch event := stagedEvent.Event.(type) {
case *types.ClobStagedFinalizeBlockEvent_CreateClobPair:
k.ApplySideEffectsForCNewlobPair(ctx, *event.CreateClobPair)
default:
dydxlog.ErrorLog(
ctx,
"got unknown ClobStagedFinalizeBlockEvent",
"event",
event,
)
}
}
}

// InitMemStore initializes the memstore of the `clob` keeper.
// This is called during app initialization in `app.go`, before any ABCI calls are received.
func (k Keeper) InitMemStore(ctx sdk.Context) {
Expand Down
Loading

0 comments on commit 3b92b79

Please sign in to comment.