Skip to content

Commit

Permalink
Feat/split table (#93)
Browse files Browse the repository at this point in the history
* feat: split table

* feat: update

* feat: update

* feat: update

* feat: change migrate sql

* feat: fix

* feat: update name

* feat: address comments

* feat: update log info

* feat: update sql

* feat: delete token_type

* fix check eth_amount_status 0 messages

* add l1 messenger start balance

* feat: fix

---------

Co-authored-by: colinlyguo <[email protected]>
  • Loading branch information
georgehao and colinlyguo authored Dec 1, 2023
1 parent d14fbc7 commit eae073e
Show file tree
Hide file tree
Showing 23 changed files with 1,289 additions and 934 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ build/bin
coverage.txt
*.integration.txt
conf/config.local.json
conf/config.sepolia.json

# Visual Studio Code
.vscode
Expand Down
3 changes: 2 additions & 1 deletion conf/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"scroll_messenger": "0x50c7d3e7f7c656493D1D76aaa1a836CedfCBB16A",
"message_queue": "0xF0B2293F5D834eAe920c6974D50957A1732de763",
"scroll_chain": "0x2D567EcE699Eabe5afCd141eDB7A4f2D0D6ce8a0"
}
},
"start_messenger_balance": 10000000000000000000
},
"l2_config": {
"l2_url": "<l2 node rpc url>",
Expand Down
9 changes: 5 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ type L1Contracts struct {

// L1Config l1 chain config.
type L1Config struct {
L1URL string `json:"l1_url"`
Confirm rpc.BlockNumber
L1Contracts *L1Contracts `json:"l1_contracts"`
StartNumber uint64 `json:"start_number"`
L1URL string `json:"l1_url"`
Confirm rpc.BlockNumber
L1Contracts *L1Contracts `json:"l1_contracts"`
StartNumber uint64 `json:"start_number"`
StartMessengerBalance uint64 `json:"start_messenger_balance"`
}

// L2Contracts l1chain config.
Expand Down
101 changes: 39 additions & 62 deletions internal/controller/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"gorm.io/gorm"

"github.com/scroll-tech/chain-monitor/internal/config"
"github.com/scroll-tech/chain-monitor/internal/logic/checker"
"github.com/scroll-tech/chain-monitor/internal/logic/assembler"
"github.com/scroll-tech/chain-monitor/internal/logic/contracts"
"github.com/scroll-tech/chain-monitor/internal/logic/events"
messagematch "github.com/scroll-tech/chain-monitor/internal/logic/message_match"
Expand All @@ -28,13 +28,13 @@ const maxBlockFetchSize uint64 = 49

// ContractController is a struct that manages the interaction with contracts on Layer 1 and Layer 2.
type ContractController struct {
l1Client *rpc.Client
l2Client *rpc.Client
conf *config.Config
eventGatherLogic *events.EventGather
contractsLogic *contracts.Contracts
checker *checker.Checker
messageMatchLogic *messagematch.LogicMessageMatch
l1Client *rpc.Client
l2Client *rpc.Client
conf *config.Config
eventGatherLogic *events.EventGather
contractsLogic *contracts.Contracts
messageMatchAssembler *assembler.MessageMatchAssembler
messageMatchLogic *messagematch.LogicMessageMatch

stopL1ContractChan chan struct{}
stopL2ContractChan chan struct{}
Expand All @@ -49,24 +49,26 @@ type ContractController struct {
contractControllerUpdateOrInsertMessageMatchFailureTotal *prometheus.CounterVec
contractControllerCheckWithdrawRootFailureTotal *prometheus.CounterVec

db *gorm.DB
messageMatchOrm *orm.MessageMatch
db *gorm.DB
messengerMessageMatchOrm *orm.MessengerMessageMatch
gatewayMessageMatchOrm *orm.GatewayMessageMatch
}

// NewContractController creates a new ContractController object.
func NewContractController(conf *config.Config, db *gorm.DB, l1Client, l2Client *rpc.Client) *ContractController {
c := &ContractController{
l1Client: l1Client,
l2Client: l2Client,
conf: conf,
eventGatherLogic: events.NewEventGather(),
contractsLogic: contracts.NewContracts(ethclient.NewClient(l1Client), ethclient.NewClient(l2Client)),
checker: checker.NewChecker(db),
messageMatchLogic: messagematch.NewMessageMatchLogic(conf, db),
stopL1ContractChan: make(chan struct{}),
stopL2ContractChan: make(chan struct{}),
db: db,
messageMatchOrm: orm.NewMessageMatch(db),
l1Client: l1Client,
l2Client: l2Client,
conf: conf,
eventGatherLogic: events.NewEventGather(),
contractsLogic: contracts.NewContracts(ethclient.NewClient(l1Client), ethclient.NewClient(l2Client)),
messageMatchAssembler: assembler.NewMessageMatchAssembler(db),
messageMatchLogic: messagematch.NewMessageMatchLogic(conf, db),
stopL1ContractChan: make(chan struct{}),
stopL2ContractChan: make(chan struct{}),
db: db,
messengerMessageMatchOrm: orm.NewMessengerMessageMatch(db),
gatewayMessageMatchOrm: orm.NewGatewayMessageMatch(db),
}

if err := c.contractsLogic.Register(c.conf); err != nil {
Expand Down Expand Up @@ -219,10 +221,10 @@ func (c *ContractController) watcherStart(ctx context.Context, client *ethclient
}

if loopEnd >= start {
var lastMessage *orm.MessageMatch
var lastMessage *orm.MessengerMessageMatch
if layer == types.Layer2 {
var checkErr error
lastMessage, checkErr = c.checker.CheckL2WithdrawRoots(ctx, start, loopEnd, c.l2Client, c.conf.L2Config.L2Contracts.MessageQueue)
lastMessage, checkErr = c.messageMatchAssembler.L2WithdrawRootsValidator(ctx, start, loopEnd, c.l2Client, c.conf.L2Config.L2Contracts.MessageQueue)
if checkErr != nil {
c.contractControllerCheckWithdrawRootFailureTotal.WithLabelValues(types.Layer2.String()).Inc()
log.Error("check withdraw roots failed", "layer", types.Layer2, "start", start, "end", loopEnd, "error", checkErr)
Expand All @@ -233,12 +235,16 @@ func (c *ContractController) watcherStart(ctx context.Context, client *ethclient
// Update last valid message's withdraw trie proof and block status after check.
err = c.db.Transaction(func(tx *gorm.DB) error {
if layer == types.Layer2 {
if err = c.messageMatchOrm.UpdateMsgProofAndStatus(ctx, lastMessage, tx); err != nil {
if err = c.messengerMessageMatchOrm.UpdateMsgProofAndStatus(ctx, lastMessage, tx); err != nil {
return fmt.Errorf("insert or update msg proof and status failed, err: %w, message: %+v", err, lastMessage)
}
}

if err = c.messageMatchOrm.UpdateBlockStatus(ctx, layer, start, loopEnd, tx); err != nil {
if err = c.messengerMessageMatchOrm.UpdateBlockStatus(ctx, layer, start, loopEnd, tx); err != nil {
return fmt.Errorf("update block status failed, err: %w", err)
}

if err = c.gatewayMessageMatchOrm.UpdateBlockStatus(ctx, layer, start, loopEnd, tx); err != nil {
return fmt.Errorf("update block status failed, err: %w", err)
}
return nil
Expand Down Expand Up @@ -269,7 +275,7 @@ func (c *ContractController) l1Watch(ctx context.Context, start uint64, end uint
return err
}
messengerEvents := c.eventGatherLogic.Dispatch(ctx, types.Layer1, types.MessengerEventCategory, messengerIterList)
messengerMessageMatches, err := c.checker.MessengerCheck(ctx, types.Layer1, messengerEvents)
messengerMessageMatches, err := c.messageMatchAssembler.MessageMatchAssembler(messengerEvents)
if err != nil {
log.Error("generate messenger message match failed", "layer", types.Layer2, "eventCategory", types.MessengerEventCategory, "error", err)
return err
Expand All @@ -279,7 +285,7 @@ func (c *ContractController) l1Watch(ctx context.Context, start uint64, end uint
return nil
}

var l1GatewayMessageMatches []orm.MessageMatch
var l1GatewayMessageMatches []orm.GatewayMessageMatch
for _, eventCategory := range c.l1EventCategoryList {
wrapIterList, err := c.contractsLogic.Iterator(ctx, &opts, types.Layer1, eventCategory)
if err != nil {
Expand All @@ -303,7 +309,7 @@ func (c *ContractController) l1Watch(ctx context.Context, start uint64, end uint
}

// match transfer event
retL1MessageMatches, checkErr := c.checker.GatewayCheck(ctx, eventCategory, gatewayEvents, messengerEvents, transferEvents)
retL1MessageMatches, checkErr := c.messageMatchAssembler.GatewayMessageAssembler(eventCategory, gatewayEvents, messengerEvents, transferEvents)
l1GatewayMessageMatches = append(l1GatewayMessageMatches, retL1MessageMatches...)
if checkErr != nil {
c.contractControllerGatewayCheckFailureTotal.WithLabelValues(types.Layer1.String()).Inc()
Expand All @@ -312,8 +318,7 @@ func (c *ContractController) l1Watch(ctx context.Context, start uint64, end uint
}
}

c.replaceGatewayEventInfo(types.Layer1, l1GatewayMessageMatches, messengerMessageMatches)
if err := c.messageMatchLogic.InsertOrUpdateMessageMatches(ctx, types.Layer1, messengerMessageMatches); err != nil {
if err := c.messageMatchLogic.InsertOrUpdateMessageMatches(ctx, types.Layer1, l1GatewayMessageMatches, messengerMessageMatches); err != nil {
c.contractControllerUpdateOrInsertMessageMatchFailureTotal.WithLabelValues(types.Layer1.String()).Inc()
log.Error("insert message events failed", "layer", types.Layer1, "error", err)
return err
Expand All @@ -336,7 +341,7 @@ func (c *ContractController) l2Watch(ctx context.Context, start uint64, end uint
return err
}
messengerEvents := c.eventGatherLogic.Dispatch(ctx, types.Layer2, types.MessengerEventCategory, messengerIterList)
messengerMessageMatches, err := c.checker.MessengerCheck(ctx, types.Layer2, messengerEvents)
messengerMessageMatches, err := c.messageMatchAssembler.MessageMatchAssembler(messengerEvents)
if err != nil {
log.Error("generate messenger message match failed", "layer", types.Layer2, "eventCategory", types.MessengerEventCategory, "error", err)
return err
Expand All @@ -346,7 +351,7 @@ func (c *ContractController) l2Watch(ctx context.Context, start uint64, end uint
return nil
}

var l2GatewayMessageMatches []orm.MessageMatch
var l2GatewayMessageMatches []orm.GatewayMessageMatch
for _, eventCategory := range c.l2EventCategoryList {
var wrapIterList []types.WrapIterator
wrapIterList, err = c.contractsLogic.Iterator(ctx, &opts, types.Layer2, eventCategory)
Expand All @@ -372,7 +377,7 @@ func (c *ContractController) l2Watch(ctx context.Context, start uint64, end uint
}

// match transfer event
retL2MessageMatches, checkErr := c.checker.GatewayCheck(ctx, eventCategory, gatewayEvents, messengerEvents, transferEvents)
retL2MessageMatches, checkErr := c.messageMatchAssembler.GatewayMessageAssembler(eventCategory, gatewayEvents, messengerEvents, transferEvents)
l2GatewayMessageMatches = append(l2GatewayMessageMatches, retL2MessageMatches...)
if checkErr != nil {
c.contractControllerGatewayCheckFailureTotal.WithLabelValues(types.Layer2.String()).Inc()
Expand All @@ -381,38 +386,10 @@ func (c *ContractController) l2Watch(ctx context.Context, start uint64, end uint
}
}

c.replaceGatewayEventInfo(types.Layer2, l2GatewayMessageMatches, messengerMessageMatches)
if err = c.messageMatchLogic.InsertOrUpdateMessageMatches(ctx, types.Layer2, messengerMessageMatches); err != nil {
if err = c.messageMatchLogic.InsertOrUpdateMessageMatches(ctx, types.Layer2, l2GatewayMessageMatches, messengerMessageMatches); err != nil {
c.contractControllerUpdateOrInsertMessageMatchFailureTotal.WithLabelValues(types.Layer2.String()).Inc()
log.Error("insert message events failed", "layer", types.Layer2, "error", err)
return err
}
return nil
}

func (c *ContractController) replaceGatewayEventInfo(layer types.LayerType, gatewayMessages, messengerMessages []orm.MessageMatch) {
messageHashGatewayMessageMatchMap := make(map[string]orm.MessageMatch)
for _, gatewayMessage := range gatewayMessages {
messageHashGatewayMessageMatchMap[gatewayMessage.MessageHash] = gatewayMessage
}

for i := 0; i < len(messengerMessages); i++ {
m := messengerMessages[i]
gatewayMessageMatch, ok := messageHashGatewayMessageMatchMap[m.MessageHash]
if !ok {
continue
}

messengerMessages[i].TokenType = gatewayMessageMatch.TokenType
switch layer {
case types.Layer1:
messengerMessages[i].L1EventType = gatewayMessageMatch.L1EventType
messengerMessages[i].L1TokenIds = gatewayMessageMatch.L1TokenIds
messengerMessages[i].L1Amounts = gatewayMessageMatch.L1Amounts
case types.Layer2:
messengerMessages[i].L2EventType = gatewayMessageMatch.L2EventType
messengerMessages[i].L2TokenIds = gatewayMessageMatch.L2TokenIds
messengerMessages[i].L2Amounts = gatewayMessageMatch.L2Amounts
}
}
}
15 changes: 9 additions & 6 deletions internal/controller/cross_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (

// CrossChainController is a struct that contains a reference to the Logic object.
type CrossChainController struct {
crossChainLogic *crosschain.LogicCrossChain
gatewayCrossChainLogic *crosschain.LogicGatewayCrossChain
messengerCrossChainLogic *crosschain.LogicMessengerCrossChain

stopL1CrossChainChan chan struct{}
stopL2CrossChainChan chan struct{}

Expand All @@ -29,9 +31,10 @@ func NewCrossChainController(cfg *config.Config, db *gorm.DB, l1Client, l2Client
l1MessengerAddr := cfg.L1Config.L1Contracts.ScrollMessenger
l2MessengerAddr := cfg.L2Config.L2Contracts.ScrollMessenger
return &CrossChainController{
stopL1CrossChainChan: make(chan struct{}),
stopL2CrossChainChan: make(chan struct{}),
crossChainLogic: crosschain.NewCrossChainLogic(db, l1Client, l2Client, l1MessengerAddr, l2MessengerAddr),
stopL1CrossChainChan: make(chan struct{}),
stopL2CrossChainChan: make(chan struct{}),
gatewayCrossChainLogic: crosschain.NewLogicGatewayCrossChain(db),
messengerCrossChainLogic: crosschain.NewLogicMessengerCrossChain(db, l1Client, l2Client, l1MessengerAddr, l2MessengerAddr, cfg.L1Config.StartMessengerBalance),
crossChainControllerRunningTotal: promauto.With(prometheus.DefaultRegisterer).NewCounterVec(prometheus.CounterOpts{
Name: "cross_chain_check_controller_running_total",
Help: "The total number of cross chain controller running.",
Expand Down Expand Up @@ -72,8 +75,8 @@ func (c *CrossChainController) watcherStart(ctx context.Context, layer types.Lay

c.crossChainControllerRunningTotal.WithLabelValues(layer.String()).Inc()

c.crossChainLogic.CheckCrossChainGatewayMessage(ctx, layer)
c.crossChainLogic.CheckETHBalance(ctx, layer)
c.gatewayCrossChainLogic.CheckCrossChainGatewayMessage(ctx, layer)
c.messengerCrossChainLogic.CheckETHBalance(ctx, layer)

// To prevent frequent database access, obtaining empty values.
time.Sleep(10 * time.Second)
Expand Down
90 changes: 90 additions & 0 deletions internal/logic/assembler/assembler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package assembler

import (
"context"
"math"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/rpc"
"gorm.io/gorm"

"github.com/scroll-tech/chain-monitor/internal/logic/events"
"github.com/scroll-tech/chain-monitor/internal/orm"
"github.com/scroll-tech/chain-monitor/internal/types"
)

type messageEventKey struct {
TxHash common.Hash
LogIndex uint
}

// MessageMatchAssembler is a structure that helps in verifying the data integrity
// in the blockchain by checking the message matches and the events.
type MessageMatchAssembler struct {
messengerMessageMatchOrm *orm.MessengerMessageMatch

transferMatcher *TransferEventMatcher
}

// NewMessageMatchAssembler returns a new message match instance.
func NewMessageMatchAssembler(db *gorm.DB) *MessageMatchAssembler {
return &MessageMatchAssembler{
messengerMessageMatchOrm: orm.NewMessengerMessageMatch(db),
transferMatcher: NewTransferEventMatcher(),
}
}

// GatewayMessageAssembler assemble the gateway events.
func (c *MessageMatchAssembler) GatewayMessageAssembler(eventCategory types.EventCategory, gatewayEvents, messengerEvents, transferEvents []events.EventUnmarshaler) ([]orm.GatewayMessageMatch, error) {
switch eventCategory {
case types.ERC20EventCategory:
return c.erc20EventMessageMatchAssembler(gatewayEvents, messengerEvents, transferEvents)
case types.ERC721EventCategory:
return c.erc721EventMessageMatchAssembler(gatewayEvents, messengerEvents, transferEvents)
case types.ERC1155EventCategory:
return c.erc1155EventMessageMatchAssembler(gatewayEvents, messengerEvents, transferEvents)
}
return nil, nil
}

// L2WithdrawRootsValidator the L2 withdraw roots validator.
func (c *MessageMatchAssembler) L2WithdrawRootsValidator(ctx context.Context, startBlockNumber, endBlockNumber uint64, client *rpc.Client, messageQueueAddr common.Address) (*orm.MessengerMessageMatch, error) {
return c.checkL2WithdrawRoots(ctx, startBlockNumber, endBlockNumber, client, messageQueueAddr)
}

// MessageMatchAssembler assemble the messenger events.
func (c *MessageMatchAssembler) MessageMatchAssembler(messengerEvents []events.EventUnmarshaler) ([]orm.MessengerMessageMatch, error) {
return c.messengerMessageMatchAssembler(messengerEvents)
}

func (c *MessageMatchAssembler) findNextMessageEvent(txHash common.Hash, logIndex uint, messageHashes map[messageEventKey]common.Hash) (common.Hash, bool) {
var nextMessageHash common.Hash
var found bool
var smallestDiff uint = math.MaxUint
for key, msgHash := range messageHashes {
if key.TxHash == txHash && key.LogIndex > logIndex {
if diff := key.LogIndex - logIndex; diff < smallestDiff {
smallestDiff = diff
nextMessageHash = msgHash
found = true
}
}
}
return nextMessageHash, found
}

func (c *MessageMatchAssembler) findPrevMessageEvent(txHash common.Hash, logIndex uint, messageHashes map[messageEventKey]common.Hash) (common.Hash, bool) {
var prevMessageHash common.Hash
var found bool
var smallestDiff uint = math.MaxUint
for key, msgHash := range messageHashes {
if key.TxHash == txHash && key.LogIndex < logIndex {
if diff := logIndex - key.LogIndex; diff < smallestDiff {
smallestDiff = diff
prevMessageHash = msgHash
found = true
}
}
}
return prevMessageHash, found
}
Loading

0 comments on commit eae073e

Please sign in to comment.