From 951f6e637fb9323dde3b3add93464fbc26b14e4f Mon Sep 17 00:00:00 2001 From: 0xb79orch <0xb79orch@gmail.com> Date: Sat, 26 Aug 2023 04:50:16 +0000 Subject: [PATCH 1/4] move SenderInfo and ClaimedReserve to background refreshes in cleanup loop --- cmd/livepeer/starter/starter.go | 4 ++-- eth/watchers/senderwatcher.go | 18 ++++++++++++++++++ pm/broker.go | 4 ++++ pm/sendermonitor.go | 5 ++++- 4 files changed, 28 insertions(+), 3 deletions(-) diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index aa8c4176be..04ff91b706 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -53,9 +53,9 @@ var ( txCostMultiplier = 100 // The interval at which to clean up cached max float values for PM senders and balances per stream - cleanupInterval = 1 * time.Minute + cleanupInterval = 10 * time.Minute // The time to live for cached max float values for PM senders (else they will be cleaned up) in seconds - smTTL = 60 // 1 minute + smTTL = 604800 // 1 week ) const RtmpPort = "1935" diff --git a/eth/watchers/senderwatcher.go b/eth/watchers/senderwatcher.go index 37861075f5..144d957cca 100644 --- a/eth/watchers/senderwatcher.go +++ b/eth/watchers/senderwatcher.go @@ -6,6 +6,7 @@ import ( "sync" ethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/golang/glog" @@ -50,6 +51,17 @@ func NewSenderWatcher(ticketBrokerAddr ethcommon.Address, watcher BlockWatcher, }, nil } +func (sw *SenderWatcher) UpdateSenderInfos() { + for addr, _ := range sw.senders { + info, err := sw.GetSenderInfo(addr) + if err == nil { + sw.setSenderInfo(addr, info) + } else { + glog.Errorf("Failed to get sender info for %v", hexutil.Encode(addr.Bytes())) + } + } +} + // GetSenderInfo returns information about a sender's deposit and reserve // if values for a sender are not cached an RPC call to a remote ethereum node will be made to initialize the cache func (sw *SenderWatcher) GetSenderInfo(addr ethcommon.Address) (*pm.SenderInfo, error) { @@ -78,6 +90,12 @@ func (sw *SenderWatcher) setSenderInfo(addr ethcommon.Address, info *pm.SenderIn } } +func (sw *SenderWatcher) UpdateClaimedReserves(claimaint ethcommon.Address) { + for addr, _ := range sw.senders { + sw.ClaimedReserve(addr, claimaint) + } +} + // ClaimedReserve returns the amount claimed from a sender's reserve by the node operator func (sw *SenderWatcher) ClaimedReserve(reserveHolder ethcommon.Address, claimant ethcommon.Address) (*big.Int, error) { sw.mu.RLock() diff --git a/pm/broker.go b/pm/broker.go index fba8277227..8f165dd268 100644 --- a/pm/broker.go +++ b/pm/broker.go @@ -94,4 +94,8 @@ type SenderManager interface { Clear(addr ethcommon.Address) // SubscribeReserveChange notifies a subcriber when the senderInfo for a sender changes SubscribeReserveChange(sink chan<- ethcommon.Address) event.Subscription + //update SenderInfos in background + UpdateSenderInfos() + //update ClaimedReserves in background + UpdateClaimedReserves(claimaint ethcommon.Address) } diff --git a/pm/sendermonitor.go b/pm/sendermonitor.go index fb021002cc..01de0fbe7a 100644 --- a/pm/sendermonitor.go +++ b/pm/sendermonitor.go @@ -8,6 +8,7 @@ import ( "time" ethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/golang/glog" @@ -272,7 +273,6 @@ func (sm *LocalSenderMonitor) cache(addr ethcommon.Address) { queue.Start() done := make(chan struct{}) go sm.startTicketQueueConsumerLoop(queue, done) - sm.senders[addr] = &remoteSender{ pendingAmount: big.NewInt(0), queue: queue, @@ -327,6 +327,8 @@ func (sm *LocalSenderMonitor) startCleanupLoop() { for { select { case <-ticker.C: + sm.smgr.UpdateSenderInfos() + sm.smgr.UpdateClaimedReserves(sm.cfg.Claimant) sm.cleanup() case <-sm.quit: return @@ -347,6 +349,7 @@ func (sm *LocalSenderMonitor) cleanup() { v.subScope.Close() // close the maxfloat subscriptions delete(sm.senders, k) sm.smgr.Clear(k) + glog.Infof("sender cleared from cache addr: %v last access: %v ttl: %vs", hexutil.Encode(k.Bytes()), v.lastAccess, sm.cfg.TTL) } } } From 540af3b42d819d82ade340bccff87083434a60a2 Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Mon, 23 Oct 2023 22:41:42 +0200 Subject: [PATCH 2/4] Set timeout to clear idle B data to 2 days --- cmd/livepeer/starter/starter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index f8b712e02b..17444dcacc 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -56,7 +56,7 @@ var ( // The interval at which to clean up cached max float values for PM senders and balances per stream cleanupInterval = 10 * time.Minute // The time to live for cached max float values for PM senders (else they will be cleaned up) in seconds - smTTL = 604800 // 1 week + smTTL = 172800 // 2 days ) const RTMPPort = "1935" From 1b3e58300ed674976822e9744da41f0f4a1fec03 Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Fri, 3 Nov 2023 14:41:47 +0100 Subject: [PATCH 3/4] Remove update funcs which are already handled by `handleRoundEvent` --- eth/watchers/senderwatcher.go | 18 ------------------ pm/broker.go | 4 ---- pm/sendermonitor.go | 2 -- 3 files changed, 24 deletions(-) diff --git a/eth/watchers/senderwatcher.go b/eth/watchers/senderwatcher.go index 144d957cca..37861075f5 100644 --- a/eth/watchers/senderwatcher.go +++ b/eth/watchers/senderwatcher.go @@ -6,7 +6,6 @@ import ( "sync" ethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/golang/glog" @@ -51,17 +50,6 @@ func NewSenderWatcher(ticketBrokerAddr ethcommon.Address, watcher BlockWatcher, }, nil } -func (sw *SenderWatcher) UpdateSenderInfos() { - for addr, _ := range sw.senders { - info, err := sw.GetSenderInfo(addr) - if err == nil { - sw.setSenderInfo(addr, info) - } else { - glog.Errorf("Failed to get sender info for %v", hexutil.Encode(addr.Bytes())) - } - } -} - // GetSenderInfo returns information about a sender's deposit and reserve // if values for a sender are not cached an RPC call to a remote ethereum node will be made to initialize the cache func (sw *SenderWatcher) GetSenderInfo(addr ethcommon.Address) (*pm.SenderInfo, error) { @@ -90,12 +78,6 @@ func (sw *SenderWatcher) setSenderInfo(addr ethcommon.Address, info *pm.SenderIn } } -func (sw *SenderWatcher) UpdateClaimedReserves(claimaint ethcommon.Address) { - for addr, _ := range sw.senders { - sw.ClaimedReserve(addr, claimaint) - } -} - // ClaimedReserve returns the amount claimed from a sender's reserve by the node operator func (sw *SenderWatcher) ClaimedReserve(reserveHolder ethcommon.Address, claimant ethcommon.Address) (*big.Int, error) { sw.mu.RLock() diff --git a/pm/broker.go b/pm/broker.go index 8f165dd268..fba8277227 100644 --- a/pm/broker.go +++ b/pm/broker.go @@ -94,8 +94,4 @@ type SenderManager interface { Clear(addr ethcommon.Address) // SubscribeReserveChange notifies a subcriber when the senderInfo for a sender changes SubscribeReserveChange(sink chan<- ethcommon.Address) event.Subscription - //update SenderInfos in background - UpdateSenderInfos() - //update ClaimedReserves in background - UpdateClaimedReserves(claimaint ethcommon.Address) } diff --git a/pm/sendermonitor.go b/pm/sendermonitor.go index 01de0fbe7a..26b29f7672 100644 --- a/pm/sendermonitor.go +++ b/pm/sendermonitor.go @@ -327,8 +327,6 @@ func (sm *LocalSenderMonitor) startCleanupLoop() { for { select { case <-ticker.C: - sm.smgr.UpdateSenderInfos() - sm.smgr.UpdateClaimedReserves(sm.cfg.Claimant) sm.cleanup() case <-sm.quit: return From 26be254b0f0ff14ea0e2af52edb953b239a1e92d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Mon, 6 Nov 2023 10:48:14 +0100 Subject: [PATCH 4/4] Update pm/sendermonitor.go --- pm/sendermonitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pm/sendermonitor.go b/pm/sendermonitor.go index 26b29f7672..b91b24dbcc 100644 --- a/pm/sendermonitor.go +++ b/pm/sendermonitor.go @@ -347,7 +347,7 @@ func (sm *LocalSenderMonitor) cleanup() { v.subScope.Close() // close the maxfloat subscriptions delete(sm.senders, k) sm.smgr.Clear(k) - glog.Infof("sender cleared from cache addr: %v last access: %v ttl: %vs", hexutil.Encode(k.Bytes()), v.lastAccess, sm.cfg.TTL) + glog.V(6).Infof("sender cleared from cache addr: %v last access: %v ttl: %vs", hexutil.Encode(k.Bytes()), v.lastAccess, sm.cfg.TTL) } } }