Skip to content

Commit

Permalink
move SenderInfo and ClaimedReserve to background refreshes in cleanup…
Browse files Browse the repository at this point in the history
… loop
  • Loading branch information
ad-astra-video authored and stronk-dev committed Aug 30, 2023
1 parent f350fba commit 951f6e6
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 3 deletions.
4 changes: 2 additions & 2 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ var (
txCostMultiplier = 100

// The interval at which to clean up cached max float values for PM senders and balances per stream
cleanupInterval = 1 * time.Minute
cleanupInterval = 10 * time.Minute
// The time to live for cached max float values for PM senders (else they will be cleaned up) in seconds
smTTL = 60 // 1 minute
smTTL = 604800 // 1 week
)

const RtmpPort = "1935"
Expand Down
18 changes: 18 additions & 0 deletions eth/watchers/senderwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/golang/glog"
Expand Down Expand Up @@ -50,6 +51,17 @@ func NewSenderWatcher(ticketBrokerAddr ethcommon.Address, watcher BlockWatcher,
}, nil
}

func (sw *SenderWatcher) UpdateSenderInfos() {
for addr, _ := range sw.senders {
info, err := sw.GetSenderInfo(addr)
if err == nil {
sw.setSenderInfo(addr, info)
} else {
glog.Errorf("Failed to get sender info for %v", hexutil.Encode(addr.Bytes()))
}
}
}

// GetSenderInfo returns information about a sender's deposit and reserve
// if values for a sender are not cached an RPC call to a remote ethereum node will be made to initialize the cache
func (sw *SenderWatcher) GetSenderInfo(addr ethcommon.Address) (*pm.SenderInfo, error) {
Expand Down Expand Up @@ -78,6 +90,12 @@ func (sw *SenderWatcher) setSenderInfo(addr ethcommon.Address, info *pm.SenderIn
}
}

func (sw *SenderWatcher) UpdateClaimedReserves(claimaint ethcommon.Address) {
for addr, _ := range sw.senders {
sw.ClaimedReserve(addr, claimaint)
}
}

// ClaimedReserve returns the amount claimed from a sender's reserve by the node operator
func (sw *SenderWatcher) ClaimedReserve(reserveHolder ethcommon.Address, claimant ethcommon.Address) (*big.Int, error) {
sw.mu.RLock()
Expand Down
4 changes: 4 additions & 0 deletions pm/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,8 @@ type SenderManager interface {
Clear(addr ethcommon.Address)
// SubscribeReserveChange notifies a subcriber when the senderInfo for a sender changes
SubscribeReserveChange(sink chan<- ethcommon.Address) event.Subscription
//update SenderInfos in background
UpdateSenderInfos()
//update ClaimedReserves in background
UpdateClaimedReserves(claimaint ethcommon.Address)
}
5 changes: 4 additions & 1 deletion pm/sendermonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/golang/glog"
Expand Down Expand Up @@ -272,7 +273,6 @@ func (sm *LocalSenderMonitor) cache(addr ethcommon.Address) {
queue.Start()
done := make(chan struct{})
go sm.startTicketQueueConsumerLoop(queue, done)

sm.senders[addr] = &remoteSender{
pendingAmount: big.NewInt(0),
queue: queue,
Expand Down Expand Up @@ -327,6 +327,8 @@ func (sm *LocalSenderMonitor) startCleanupLoop() {
for {
select {
case <-ticker.C:
sm.smgr.UpdateSenderInfos()
sm.smgr.UpdateClaimedReserves(sm.cfg.Claimant)
sm.cleanup()
case <-sm.quit:
return
Expand All @@ -347,6 +349,7 @@ func (sm *LocalSenderMonitor) cleanup() {
v.subScope.Close() // close the maxfloat subscriptions
delete(sm.senders, k)
sm.smgr.Clear(k)
glog.Infof("sender cleared from cache addr: %v last access: %v ttl: %vs", hexutil.Encode(k.Bytes()), v.lastAccess, sm.cfg.TTL)
}
}
}
Expand Down

0 comments on commit 951f6e6

Please sign in to comment.