diff --git a/packages/arb-provider-go/connection.go b/packages/arb-provider-go/connection.go index f9e746c83b..e5bfeb0ede 100644 --- a/packages/arb-provider-go/connection.go +++ b/packages/arb-provider-go/connection.go @@ -225,7 +225,28 @@ func (conn *ArbConnection) SendTransaction(ctx context.Context, tx *types.Transa // // TODO(karalabe): Deprecate when the subscription one can return past data too. func (conn *ArbConnection) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { - return nil, _nyiError("FilterLogs") + var ret []types.Log + address, topics := _extractAddrTopics(query) + logInfos, err := conn.proxy.FindLogs(0, math.MaxInt32, address[:], topics) + if err != nil { + return nil, err + } + for _, logInfo := range logInfos { + outs, err := _decodeLogInfo(logInfo) + if err != nil { + return nil, err + } + ok := true + for i, targetTopic := range topics { + if targetTopic != outs.Topics[i] { + ok = false + } + } + if ok { + ret = append(ret, *outs) + } + } + return ret, nil } // SubscribeFilterLogs creates a background log filtering operation, returning @@ -243,12 +264,13 @@ const subscriptionPollingInterval = 5 * time.Second type subscription struct { proxy ValidatorProxy firstBlockUnseen uint64 - active bool logChan chan<- types.Log errChan chan error address common.Address topics [][32]byte unsubOnce *sync.Once + closeChan chan interface{} + wg sync.WaitGroup } func _extractAddrTopics(query ethereum.FilterQuery) (addr common.Address, topics [][32]byte) { @@ -328,44 +350,50 @@ func newSubscription(conn *ArbConnection, query ethereum.FilterQuery, ch chan<- sub := &subscription{ conn.proxy, 0, - true, ch, make(chan error, 1), address, topics, &sync.Once{}, + make(chan interface{}), + sync.WaitGroup{}, } + sub.wg.Add(1) go func() { + defer sub.wg.Done() defer sub.Unsubscribe() + ticker := time.NewTicker(subscriptionPollingInterval) + defer ticker.Stop() for { - time.Sleep(subscriptionPollingInterval) - if !sub.active { + select { + case <- sub.closeChan: return - } - logInfos, err := sub.proxy.FindLogs(int64(sub.firstBlockUnseen), math.MaxInt32, sub.address[:], sub.topics) - if err != nil { - sub.errChan <- err - return - } - for _, logInfo := range logInfos { - outs, err := _decodeLogInfo(logInfo) + case <- ticker.C: + logInfos, err := sub.proxy.FindLogs(int64(sub.firstBlockUnseen), math.MaxInt32, sub.address[:], sub.topics) if err != nil { sub.errChan <- err return } - ok := true - for i, targetTopic := range topics { - if targetTopic != outs.Topics[i] { + for _, logInfo := range logInfos { + outs, err := _decodeLogInfo(logInfo) + if err != nil { + sub.errChan <- err + return + } + ok := true + for i, targetTopic := range topics { + if targetTopic != outs.Topics[i] { + ok = false + } + } + if outs.BlockNumber < sub.firstBlockUnseen { ok = false } - } - if outs.BlockNumber < sub.firstBlockUnseen { - ok = false - } - if ok { - sub.logChan <- *outs - if sub.firstBlockUnseen <= outs.BlockNumber { - sub.firstBlockUnseen = outs.BlockNumber + 1 + if ok { + sub.logChan <- *outs + if sub.firstBlockUnseen <= outs.BlockNumber { + sub.firstBlockUnseen = outs.BlockNumber + 1 + } } } } @@ -378,7 +406,8 @@ func newSubscription(conn *ArbConnection, query ethereum.FilterQuery, ch chan<- // and closes the error channel. func (sub *subscription) Unsubscribe() { sub.unsubOnce.Do(func() { - sub.active = false + close(sub.closeChan) + sub.wg.Wait() close(sub.errChan) }) }