diff --git a/src/Network/Ethereum/Web3/Contract/Events.purs b/src/Network/Ethereum/Web3/Contract/Events.purs index b22be46..a156716 100644 --- a/src/Network/Ethereum/Web3/Contract/Events.purs +++ b/src/Network/Ethereum/Web3/Contract/Events.purs @@ -32,7 +32,7 @@ import Data.Array (sort) import Data.Either (Either(..)) import Data.Functor.Tagged (Tagged, tagged, untagged) import Data.Lens ((.~), (^.)) -import Data.Maybe (Maybe(..)) +import Data.Maybe (Maybe(..), maybe) import Data.Newtype (over) import Data.Symbol (class IsSymbol) import Data.Traversable (for_, traverse) @@ -178,48 +178,45 @@ filterProducer => MapRecordWithIndex fsList (ConstMapping ModifyFilter) fs fs => MultiFilterStreamState fs -> Transducer Void (Record fs) Web3 (MultiFilterStreamState fs) -filterProducer cs@(MultiFilterStreamState currentState) = do - let -- hang out until the chain makes progress - waitForMoreBlocks = do - lift $ liftAff $ delay (Milliseconds 3000.0) - filterProducer cs - - -- resume the filter production - continueTo maxEndBlock = do - let - endBlock = newTo maxEndBlock currentState.currentBlock currentState.windowSize - - modify :: forall (k :: Type) (e :: k). Filter e -> Filter e - modify fltr = - fltr # _fromBlock .~ BN currentState.currentBlock - # _toBlock - .~ BN endBlock - - fs' = hmap (ModifyFilter modify) currentState.filters - yieldT fs' - filterProducer $ MultiFilterStreamState currentState { currentBlock = succ endBlock } +filterProducer cs@(MultiFilterStreamState currentState@{ windowSize, currentBlock, filters: currentFilters }) = do chainHead <- lift eth_blockNumber - -- if the chain head is less than the current block we want to process - -- then wait until the chain progresses - if chainHead < currentState.currentBlock then - waitForMoreBlocks - -- otherwise try make progress - else case hfoldlWithIndex MultiFilterMinToBlock Latest currentState.filters of - -- consume as many as possible up to the chain head - Latest -> continueTo $ over BlockNumber (_ - fromInt currentState.trailBy) chainHead - -- if the original fitler ends at a specific block, consume as many as possible up to that block - -- or terminate if we're already past it - BN targetEnd -> - let - targetEnd' = min targetEnd $ over BlockNumber (_ - fromInt currentState.trailBy) chainHead - in - if currentState.currentBlock <= targetEnd' then - continueTo targetEnd' - else - pure cs + let + { nextEndBlock, finalBlock } = case hfoldlWithIndex MultiFilterMinToBlock Latest currentFilters of + Latest -> + { nextEndBlock: over BlockNumber (_ - fromInt currentState.trailBy) chainHead + , finalBlock: Nothing + } + BN targetEnd -> + let + nextAvailableBlock = over BlockNumber (_ - fromInt currentState.trailBy) chainHead + in + { nextEndBlock: min targetEnd nextAvailableBlock, finalBlock: Just targetEnd } + isFinished = maybe false (\final -> currentBlock > final) finalBlock + if isFinished then pure cs + else if chainHead < currentBlock then waitForMoreBlocks + else continueTo nextEndBlock + where - newTo :: BlockNumber -> BlockNumber -> Int -> BlockNumber - newTo upper current window = min upper $ over BlockNumber (_ + fromInt window) current + + waitForMoreBlocks = do + lift $ liftAff $ delay (Milliseconds 3000.0) + filterProducer cs + + -- resume the filter production + continueTo maxEndBlock = do + let + endBlock = min maxEndBlock $ over BlockNumber (_ + fromInt windowSize) currentBlock + + modify :: forall (k :: Type) (e :: k). Filter e -> Filter e + modify fltr = + fltr # _fromBlock .~ BN currentBlock + # _toBlock .~ BN endBlock + + fs' = hmap (ModifyFilter modify) currentFilters + yieldT fs' + filterProducer $ MultiFilterStreamState currentState + { currentBlock = succ endBlock + } succ :: BlockNumber -> BlockNumber succ = over BlockNumber (_ + one) @@ -456,6 +453,7 @@ stagger -> Transducer i o m a stagger osT = let - trickle = awaitForever \os -> for_ os yieldT + trickle = awaitForever \os -> + for_ os yieldT in fst <$> (osT =>= trickle) diff --git a/src/Network/Ethereum/Web3/Solidity/Event.purs b/src/Network/Ethereum/Web3/Solidity/Event.purs index 8e4239b..b32ed8c 100644 --- a/src/Network/Ethereum/Web3/Solidity/Event.purs +++ b/src/Network/Ethereum/Web3/Solidity/Event.purs @@ -80,7 +80,7 @@ parseChange parseChange (Change change) anonymous = do topics <- if anonymous then pure change.topics - else note (ParserError "no topics found") (_.tail <$> uncons change.topics) + else note (ParserError "No topics found") (_.tail <$> uncons change.topics) Tuple a _ <- arrayParser topics b <- lmap (ParserError <<< show) $ abiDecode change.data pure $ Event a b diff --git a/src/Network/Ethereum/Web3/Solidity/Internal.purs b/src/Network/Ethereum/Web3/Solidity/Internal.purs index 01a8a1c..96761b8 100644 --- a/src/Network/Ethereum/Web3/Solidity/Internal.purs +++ b/src/Network/Ethereum/Web3/Solidity/Internal.purs @@ -12,10 +12,10 @@ import Prelude import Data.Functor.Tagged (Tagged, untagged, tagged) import Data.Generic.Rep (class Generic, Argument(..), Constructor(..), NoArguments(..), Product(..), from, to) -import Network.Ethereum.Web3.Solidity.Vector (Vector) import Data.Identity (Identity(..)) import Data.Newtype (un) import Data.Symbol (class IsSymbol) +import Network.Ethereum.Web3.Solidity.Vector (Vector) import Prim.Row as Row import Record (disjointUnion) import Record as Record @@ -24,7 +24,7 @@ import Record.Builder as Builder import Type.Proxy (Proxy(..)) import Unsafe.Coerce (unsafeCoerce) -class GRecordFieldsIso rep from to | rep -> to, to rep -> from where +class GRecordFieldsIso rep from to | from rep -> to, to rep -> from where gToRecord :: rep -> Builder { | from } { | to } gFromRecord :: Record to -> rep @@ -56,8 +56,8 @@ else instance gFromRecord r = let - as = gFromRecord (unsafeCoerce r) - bs = gFromRecord (unsafeCoerce r) + as = gFromRecord (unsafeCoerce r :: Record ato) + bs = gFromRecord (unsafeCoerce r :: Record bto) in Product as bs @@ -138,4 +138,5 @@ toRecord . RecordFieldsIso a () fields => a -> Record fields -toRecord a = Builder.buildFromScratch $ _toRecord a +toRecord a = + Builder.buildFromScratch $ _toRecord a