Skip to content

Commit

Permalink
Stage ClobPair creation
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyding committed Oct 3, 2024
1 parent 8fdf8c5 commit dc672ad
Show file tree
Hide file tree
Showing 14 changed files with 894 additions and 352 deletions.
516 changes: 259 additions & 257 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/bundle.ts

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { ClobPair, ClobPairSDKType } from "./clob_pair";
import * as _m0 from "protobufjs/minimal";
import { DeepPartial } from "../../helpers";
/**
* ClobStagedFinalizeBlockEvent defines a CLOB event staged during
* FinalizeBlock.
*/

export interface ClobStagedFinalizeBlockEvent {
/** create_clob_pair indicates a new CLOB pair creation. */
createClobPair?: ClobPair;
}
/**
* ClobStagedFinalizeBlockEvent defines a CLOB event staged during
* FinalizeBlock.
*/

export interface ClobStagedFinalizeBlockEventSDKType {
/** create_clob_pair indicates a new CLOB pair creation. */
create_clob_pair?: ClobPairSDKType;
}

function createBaseClobStagedFinalizeBlockEvent(): ClobStagedFinalizeBlockEvent {
return {
createClobPair: undefined
};
}

export const ClobStagedFinalizeBlockEvent = {
encode(message: ClobStagedFinalizeBlockEvent, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.createClobPair !== undefined) {
ClobPair.encode(message.createClobPair, writer.uint32(10).fork()).ldelim();
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): ClobStagedFinalizeBlockEvent {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseClobStagedFinalizeBlockEvent();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.createClobPair = ClobPair.decode(reader, reader.uint32());
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<ClobStagedFinalizeBlockEvent>): ClobStagedFinalizeBlockEvent {
const message = createBaseClobStagedFinalizeBlockEvent();
message.createClobPair = object.createClobPair !== undefined && object.createClobPair !== null ? ClobPair.fromPartial(object.createClobPair) : undefined;
return message;
}

};
4 changes: 2 additions & 2 deletions indexer/packages/v4-protos/src/codegen/gogoproto/bundle.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import * as _128 from "./gogo";
export const gogoproto = { ..._128
import * as _129 from "./gogo";
export const gogoproto = { ..._129
};
22 changes: 11 additions & 11 deletions indexer/packages/v4-protos/src/codegen/google/bundle.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import * as _129 from "./api/annotations";
import * as _130 from "./api/http";
import * as _131 from "./protobuf/descriptor";
import * as _132 from "./protobuf/duration";
import * as _133 from "./protobuf/timestamp";
import * as _134 from "./protobuf/any";
import * as _130 from "./api/annotations";
import * as _131 from "./api/http";
import * as _132 from "./protobuf/descriptor";
import * as _133 from "./protobuf/duration";
import * as _134 from "./protobuf/timestamp";
import * as _135 from "./protobuf/any";
export namespace google {
export const api = { ..._129,
..._130
export const api = { ..._130,
..._131
};
export const protobuf = { ..._131,
..._132,
export const protobuf = { ..._132,
..._133,
..._134
..._134,
..._135
};
}
16 changes: 16 additions & 0 deletions proto/dydxprotocol/clob/finalize_block.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";
package dydxprotocol.clob;

import "dydxprotocol/clob/clob_pair.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/x/clob/types";

// ClobStagedFinalizeBlockEvent defines a CLOB event staged during
// FinalizeBlock.
message ClobStagedFinalizeBlockEvent {
// event is the staged event.
oneof event {
// create_clob_pair indicates a new CLOB pair creation.
ClobPair create_clob_pair = 1;
}
}
5 changes: 4 additions & 1 deletion protocol/lib/tx_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ func AssertDeliverTxMode(ctx sdk.Context) {
}

func IsDeliverTxMode(ctx sdk.Context) bool {
return !ctx.IsCheckTx() && !ctx.IsReCheckTx()
if ctx.IsCheckTx() || ctx.IsReCheckTx() || ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck {
return false
}
return true
}

func AssertCheckTxMode(ctx sdk.Context) {
Expand Down
2 changes: 2 additions & 0 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func Precommit(
ctx sdk.Context,
keeper keeper.Keeper,
) {
keeper.ProcessStagedFinalizeBlockEvents(ctx)

if streamingManager := keeper.GetFullNodeStreamingManager(); !streamingManager.Enabled() {
return
}
Expand Down
97 changes: 53 additions & 44 deletions protocol/x/clob/keeper/clob_pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,43 @@ func (k Keeper) CreatePerpetualClobPair(
// Write the `ClobPair` to state.
k.SetClobPair(ctx, clobPair)

err := k.CreateClobPairStructures(ctx, clobPair)
// Don't stage events for the genesis block.
if lib.IsDeliverTxMode(ctx) {
if err := k.StageNewClobPairSideEffects(ctx, clobPair); err != nil {
return clobPair, err
}
}

perpetualId, err := clobPair.GetPerpetualId()
if err != nil {
return clobPair, err
panic(err)
}
perpetual, err := k.perpetualsKeeper.GetPerpetual(ctx, perpetualId)
if err != nil {
return types.ClobPair{}, err
}

k.GetIndexerEventManager().AddTxnEvent(
ctx,
indexerevents.SubtypePerpetualMarket,
indexerevents.PerpetualMarketEventVersion,
indexer_manager.GetBytes(
indexerevents.NewPerpetualMarketCreateEvent(
perpetualId,
clobPair.Id,
perpetual.Params.Ticker,
perpetual.Params.MarketId,
clobPair.Status,
clobPair.QuantumConversionExponent,
perpetual.Params.AtomicResolution,
clobPair.SubticksPerTick,
clobPair.StepBaseQuantums,
perpetual.Params.LiquidityTier,
perpetual.Params.MarketType,
),
),
)

return clobPair, nil
}

Expand Down Expand Up @@ -159,51 +191,29 @@ func (k Keeper) maybeCreateOrderbook(ctx sdk.Context, clobPair types.ClobPair) {
k.MemClob.MaybeCreateOrderbook(clobPair)
}

// createOrderbook creates a new orderbook in the memclob.
func (k Keeper) createOrderbook(ctx sdk.Context, clobPair types.ClobPair) {
// Create the corresponding orderbook in the memclob.
func (k Keeper) ApplySideEffectsForCNewlobPair(
ctx sdk.Context,
clobPair types.ClobPair,
) {
k.MemClob.CreateOrderbook(clobPair)
k.SetClobPairIdForPerpetual(clobPair)
}

// CreateClobPair performs all non stateful operations to create a CLOB pair.
// These include creating the corresponding orderbook in the memclob, the mapping between
// the CLOB pair and the perpetual and the indexer event.
// This function returns an error if a value for the ClobPair's id already exists in state.
func (k Keeper) CreateClobPairStructures(ctx sdk.Context, clobPair types.ClobPair) error {
// Create the corresponding orderbook in the memclob.
k.createOrderbook(ctx, clobPair)

// Create the mapping between clob pair and perpetual.
k.SetClobPairIdForPerpetual(ctx, clobPair)
// StageNewClobPairSideEffects stages a ClobPair creation event, so that any in-memory side effects
// can happen later when the transaction and block is committed.
// Note the staged event will be processed only if below are both true:
// - The current transaction is committed.
// - The current block is agreed upon and committed by consensus.
func (k Keeper) StageNewClobPairSideEffects(ctx sdk.Context, clobPair types.ClobPair) error {
lib.AssertDeliverTxMode(ctx)

perpetualId, err := clobPair.GetPerpetualId()
if err != nil {
panic(err)
}
perpetual, err := k.perpetualsKeeper.GetPerpetual(ctx, perpetualId)
if err != nil {
return err
}

k.GetIndexerEventManager().AddTxnEvent(
k.finalizeBlockEventStager.StageFinalizeBlockEvent(
ctx,
indexerevents.SubtypePerpetualMarket,
indexerevents.PerpetualMarketEventVersion,
indexer_manager.GetBytes(
indexerevents.NewPerpetualMarketCreateEvent(
perpetualId,
clobPair.Id,
perpetual.Params.Ticker,
perpetual.Params.MarketId,
clobPair.Status,
clobPair.QuantumConversionExponent,
perpetual.Params.AtomicResolution,
clobPair.SubticksPerTick,
clobPair.StepBaseQuantums,
perpetual.Params.LiquidityTier,
perpetual.Params.MarketType,
),
),
&types.ClobStagedFinalizeBlockEvent{
Event: &types.ClobStagedFinalizeBlockEvent_CreateClobPair{
CreateClobPair: &clobPair,
},
},
)

return nil
Expand Down Expand Up @@ -234,14 +244,13 @@ func (k Keeper) HydrateClobPairAndPerpetualMapping(ctx sdk.Context) {
for _, clobPair := range clobPairs {
// Create the corresponding mapping between clob pair and perpetual.
k.SetClobPairIdForPerpetual(
ctx,
clobPair,
)
}
}

// SetClobPairIdForPerpetual sets the mapping between clob pair and perpetual.
func (k Keeper) SetClobPairIdForPerpetual(ctx sdk.Context, clobPair types.ClobPair) {
func (k Keeper) SetClobPairIdForPerpetual(clobPair types.ClobPair) {
// If this `ClobPair` is for a perpetual, add the `clobPairId` to the list of CLOB pair IDs
// that facilitate trading of this perpetual.
if perpetualClobMetadata := clobPair.GetPerpetualClobMetadata(); perpetualClobMetadata != nil {
Expand Down
54 changes: 50 additions & 4 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"sync/atomic"

"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
Expand All @@ -15,6 +16,7 @@ import (
liquidationtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/liquidations"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
dydxlog "github.com/dydxprotocol/v4-chain/protocol/lib/log"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/types"
flags "github.com/dydxprotocol/v4-chain/protocol/x/clob/flags"
Expand Down Expand Up @@ -46,8 +48,9 @@ type (
revshareKeeper types.RevShareKeeper
accountPlusKeeper types.AccountPlusKeeper

indexerEventManager indexer_manager.IndexerEventManager
streamingManager streamingtypes.FullNodeStreamingManager
indexerEventManager indexer_manager.IndexerEventManager
streamingManager streamingtypes.FullNodeStreamingManager
finalizeBlockEventStager finalizeblock.EventStager[*types.ClobStagedFinalizeBlockEvent]

initialized *atomic.Bool
memStoreInitialized *atomic.Bool
Expand Down Expand Up @@ -76,7 +79,7 @@ func NewKeeper(
cdc codec.BinaryCodec,
storeKey storetypes.StoreKey,
memKey storetypes.StoreKey,
liquidationsStoreKey storetypes.StoreKey,
transientStoreKey storetypes.StoreKey,
authorities []string,
memClob types.MemClob,
subaccountsKeeper types.SubaccountsKeeper,
Expand All @@ -102,7 +105,7 @@ func NewKeeper(
cdc: cdc,
storeKey: storeKey,
memKey: memKey,
transientStoreKey: liquidationsStoreKey,
transientStoreKey: transientStoreKey,
authorities: lib.UniqueSliceToSet(authorities),
MemClob: memClob,
PerpetualIdToClobPairId: make(map[uint32][]types.ClobPairId),
Expand Down Expand Up @@ -131,6 +134,12 @@ func NewKeeper(
placeCancelOrderRateLimiter: placeCancelOrderRateLimiter,
DaemonLiquidationInfo: daemonLiquidationInfo,
revshareKeeper: revshareKeeper,
finalizeBlockEventStager: finalizeblock.NewEventStager[*types.ClobStagedFinalizeBlockEvent](
transientStoreKey,
cdc,
types.StagedEventsCountKey,
types.StagedEventsKeyPrefix,
),
}

// Provide the keeper to the MemClob.
Expand Down Expand Up @@ -199,6 +208,43 @@ func (k Keeper) Initialize(ctx sdk.Context) {
k.HydrateClobPairAndPerpetualMapping(checkCtx)
}

func (k Keeper) GetStagedClobFinalizeBlockEvents(ctx sdk.Context) []*types.ClobStagedFinalizeBlockEvent {
return k.finalizeBlockEventStager.GetStagedFinalizeBlockEvents(
ctx,
func() *types.ClobStagedFinalizeBlockEvent {
return &types.ClobStagedFinalizeBlockEvent{}
},
)
}

func (k Keeper) ProcessStagedFinalizeBlockEvents(ctx sdk.Context) {
stagedEvents := k.GetStagedClobFinalizeBlockEvents(ctx)
for _, stagedEvent := range stagedEvents {
if stagedEvent == nil {
// We don't ever expect this. However, should not panic since we are in Precommit.
dydxlog.ErrorLog(
ctx,
"got nil ClobStagedFinalizeBlockEvent, skipping",
"staged_events",
stagedEvents,
)
continue
}

switch event := stagedEvent.Event.(type) {
case *types.ClobStagedFinalizeBlockEvent_CreateClobPair:
k.ApplySideEffectsForCNewlobPair(ctx, *event.CreateClobPair)
default:
dydxlog.ErrorLog(
ctx,
"got unknown ClobStagedFinalizeBlockEvent",
"event",
event,
)
}
}
}

// InitMemStore initializes the memstore of the `clob` keeper.
// This is called during app initialization in `app.go`, before any ABCI calls are received.
func (k Keeper) InitMemStore(ctx sdk.Context) {
Expand Down
Loading

0 comments on commit dc672ad

Please sign in to comment.