From 0c355293a7723724954b93aae401b68e5f9e2fae Mon Sep 17 00:00:00 2001 From: Ed Felten Date: Tue, 26 Nov 2019 07:00:04 -0500 Subject: [PATCH 1/2] Improve polling and subscription shutdown in arb-provider-go --- packages/arb-provider-go/connection.go | 56 +++++++++++++++----------- packages/arb-provider-go/go.mod | 2 - 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/packages/arb-provider-go/connection.go b/packages/arb-provider-go/connection.go index 3988880101..19b9ca42a7 100644 --- a/packages/arb-provider-go/connection.go +++ b/packages/arb-provider-go/connection.go @@ -224,12 +224,13 @@ const subscriptionPollingInterval = 5 * time.Second type subscription struct { proxy ValidatorProxy firstBlockUnseen uint64 - active bool logChan chan<- types.Log errChan chan error address common.Address topics [][32]byte unsubOnce *sync.Once + closeChan chan interface{} + wg sync.WaitGroup } func _extractAddrTopics(query ethereum.FilterQuery) (addr common.Address, topics [][32]byte) { @@ -309,44 +310,50 @@ func newSubscription(conn *ArbConnection, query ethereum.FilterQuery, ch chan<- sub := &subscription{ conn.proxy, 0, - true, ch, make(chan error, 1), address, topics, &sync.Once{}, + make(chan interface{}), + sync.WaitGroup{}, } + sub.wg.Add(1) go func() { + defer sub.wg.Done() defer sub.Unsubscribe() + ticker := time.NewTicker(subscriptionPollingInterval) + defer ticker.Stop() for { - time.Sleep(subscriptionPollingInterval) - if !sub.active { + select { + case <- sub.closeChan: return - } - logInfos, err := sub.proxy.FindLogs(int64(sub.firstBlockUnseen), math.MaxInt32, sub.address[:], sub.topics) - if err != nil { - sub.errChan <- err - return - } - for _, logInfo := range logInfos { - outs, err := _decodeLogInfo(logInfo) + case <- ticker.C: + logInfos, err := sub.proxy.FindLogs(int64(sub.firstBlockUnseen), math.MaxInt32, sub.address[:], sub.topics) if err != nil { sub.errChan <- err return } - ok := true - for i, targetTopic := range topics { - if targetTopic != outs.Topics[i] { + for _, logInfo := range logInfos { + outs, err := _decodeLogInfo(logInfo) + if err != nil { + sub.errChan <- err + return + } + ok := true + for i, targetTopic := range topics { + if targetTopic != outs.Topics[i] { + ok = false + } + } + if outs.BlockNumber < sub.firstBlockUnseen { ok = false } - } - if outs.BlockNumber < sub.firstBlockUnseen { - ok = false - } - if ok { - sub.logChan <- *outs - if sub.firstBlockUnseen <= outs.BlockNumber { - sub.firstBlockUnseen = outs.BlockNumber + 1 + if ok { + sub.logChan <- *outs + if sub.firstBlockUnseen <= outs.BlockNumber { + sub.firstBlockUnseen = outs.BlockNumber + 1 + } } } } @@ -359,7 +366,8 @@ func newSubscription(conn *ArbConnection, query ethereum.FilterQuery, ch chan<- // and closes the error channel. func (sub *subscription) Unsubscribe() { sub.unsubOnce.Do(func() { - sub.active = false + close(sub.closeChan) + sub.wg.Wait() close(sub.errChan) }) } diff --git a/packages/arb-provider-go/go.mod b/packages/arb-provider-go/go.mod index 8b28be621d..6afdb1587f 100644 --- a/packages/arb-provider-go/go.mod +++ b/packages/arb-provider-go/go.mod @@ -8,8 +8,6 @@ require ( github.com/gorilla/mux v1.7.0 github.com/gorilla/rpc v1.2.0 github.com/miguelmota/go-solidity-sha3 v0.1.0 - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect github.com/offchainlabs/arbitrum/packages/arb-util v0.2.0 github.com/offchainlabs/arbitrum/packages/arb-validator v0.2.0 ) From 421be44db0cc93bc1f005b16eebd10a583073c46 Mon Sep 17 00:00:00 2001 From: Ed Felten Date: Tue, 26 Nov 2019 07:07:22 -0500 Subject: [PATCH 2/2] Implement FilterLogs in arb-provider-go --- packages/arb-provider-go/connection.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/packages/arb-provider-go/connection.go b/packages/arb-provider-go/connection.go index 19b9ca42a7..5bc33c6d17 100644 --- a/packages/arb-provider-go/connection.go +++ b/packages/arb-provider-go/connection.go @@ -206,7 +206,28 @@ func (conn *ArbConnection) SendTransaction(ctx context.Context, tx *types.Transa // // TODO(karalabe): Deprecate when the subscription one can return past data too. func (conn *ArbConnection) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { - return nil, _nyiError("FilterLogs") + var ret []types.Log + address, topics := _extractAddrTopics(query) + logInfos, err := conn.proxy.FindLogs(0, math.MaxInt32, address[:], topics) + if err != nil { + return nil, err + } + for _, logInfo := range logInfos { + outs, err := _decodeLogInfo(logInfo) + if err != nil { + return nil, err + } + ok := true + for i, targetTopic := range topics { + if targetTopic != outs.Topics[i] { + ok = false + } + } + if ok { + ret = append(ret, *outs) + } + } + return ret, nil } // SubscribeFilterLogs creates a background log filtering operation, returning