Skip to content

Commit

Permalink
Merge pull request #285 from TxnLab/txn-1908-add-gating-tracking-supp…
Browse files Browse the repository at this point in the history
…ort-in-nodemgr-unstaking-stakers

feat(nodemgr): Add gating eviction support to nodemgr
  • Loading branch information
pbennett authored Sep 27, 2024
2 parents e0c4699 + 2fc56d4 commit dfbdb0c
Show file tree
Hide file tree
Showing 41 changed files with 782 additions and 1,263 deletions.
2 changes: 1 addition & 1 deletion contracts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@
"eslint-plugin-prettier": "^5.1.3",
"prettier": "^3.2.5",
"typescript": "^5.4.5",
"vitest": "^2.0.5"
"vitest": "2.0.5"
}
}
12 changes: 10 additions & 2 deletions nodemgr/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/TxnLab/reti/internal/lib/algo"
"github.com/TxnLab/reti/internal/lib/misc"
"github.com/TxnLab/reti/internal/lib/nfdapi/swagger"
"github.com/TxnLab/reti/internal/lib/nfdonchain"
"github.com/TxnLab/reti/internal/lib/reti"
)

Expand Down Expand Up @@ -129,6 +130,8 @@ type RetiApp struct {
signer algo.MultipleWalletSigner
algoClient *algod.Client
nfdApi *swagger.APIClient
nfdOnChain *nfdonchain.NfdApi

retiClient *reti.Reti

// just here for flag bootstrapping destination
Expand All @@ -137,8 +140,8 @@ type RetiApp struct {
retiNodeNum uint64
}

// initClients initializes both an an algod client (to correct network - which it
// also validates) and an nfd nfdApi client - for nfd updates or fetches if caller
// initClients initializes both an algod client (to correct network - which it
// also validates) and a nfd nfdApi client - for nfd updates or fetches if caller
// desires
func (ac *RetiApp) initClients(ctx context.Context, cmd *cli.Command) error {
network := cmd.String("network")
Expand Down Expand Up @@ -213,6 +216,11 @@ func (ac *RetiApp) initClients(ctx context.Context, cmd *cli.Command) error {

ac.algoClient = algoClient
ac.nfdApi = api
nfdOnChain, err := nfdonchain.NewNfdApi(algoClient, cmd.String("network"))
if err != nil {
return fmt.Errorf("failed to initialize nfd onchain api client: %v", err)
}
ac.nfdOnChain = nfdOnChain

// Initialize the 'reti' client
retiClient, err := reti.New(ac.retiAppID, ac.logger, ac.algoClient, ac.signer, ac.retiValidatorID, ac.retiNodeNum)
Expand Down
38 changes: 34 additions & 4 deletions nodemgr/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,22 @@ type Daemon struct {
logger *slog.Logger
algoClient *algod.Client

listenPort int

// embed mutex for locking state for members below the mutex
sync.RWMutex
avgBlockTime time.Duration
}

func newDaemon() *Daemon {
func newDaemon(listenPort int) *Daemon {
return &Daemon{
logger: App.retiClient.Logger,
algoClient: App.algoClient,
listenPort: listenPort,
}
}

func (d *Daemon) start(ctx context.Context, wg *sync.WaitGroup, cancel context.CancelFunc, listenPort int) {
func (d *Daemon) start(ctx context.Context, wg *sync.WaitGroup, cancel context.CancelFunc) {
misc.Infof(d.logger, "Réti daemon, version:%s started", getVersionInfo())
wg.Add(1)
go func() {
Expand All @@ -66,13 +69,23 @@ func (d *Daemon) start(ctx context.Context, wg *sync.WaitGroup, cancel context.C
d.EpochUpdater(ctx)
}()

wg.Add(1)
go func() {
info := App.retiClient.Info()
if info.Config.EntryGatingType == reti.GatingTypeNone {
return
}
defer wg.Done()
d.StakerEvictor(ctx)
}()

wg.Add(1)
go func() {
defer wg.Done()
http.Handle("/ready", isReady())
http.Handle("/metrics", promhttp.Handler())

host := fmt.Sprintf(":%d", listenPort)
host := fmt.Sprintf(":%d", d.listenPort)
srv := &http.Server{Addr: host}
go func() {
misc.Infof(d.logger, "HTTP server listening on %q", host)
Expand Down Expand Up @@ -543,6 +556,7 @@ func (d *Daemon) EpochUpdater(ctx context.Context) {

misc.Infof(d.logger, "at round:%d, with epoch length:%d, first epoch check at %d", curRound, epochRoundLength, stopAtRound)

signerAddr, _ := types.DecodeAddress(App.retiClient.Info().Config.Manager)
for {
select {
case <-ctx.Done():
Expand All @@ -554,7 +568,6 @@ func (d *Daemon) EpochUpdater(ctx context.Context) {
}
stopAtRound = nextEpoch(blockWaitResult.atRound, epochRoundLength)

signerAddr, _ := types.DecodeAddress(App.retiClient.Info().Config.Manager)
var (
wg syncutil.WaitGroup
info = App.retiClient.Info()
Expand Down Expand Up @@ -670,6 +683,23 @@ func (d *Daemon) getFirstEligibleEpochRound(curRound uint64, epochRoundLength ui
return earliestEpochToUse
}

func (d *Daemon) StakerEvictor(ctx context.Context) {
d.logger.Info("StakerEvictor started")
defer d.logger.Info("StakerEvictor stopped")

for {
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Minute):
err := d.checkForEvictions(ctx)
if err != nil {
misc.Errorf(d.logger, "error in eviction check: checking for evictions, err:%v", err)
}
}
}
}

// accountHasAtLeast checks if an account has at least a certain amount of microAlgos (spendable)
// Errors are just treated as failures
func accountHasAtLeast(ctx context.Context, algoClient *algod.Client, accountAddr string, microAlgos uint64) bool {
Expand Down
206 changes: 206 additions & 0 deletions nodemgr/evictions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package main

import (
"context"
"fmt"
"iter"
"maps"
"slices"
"strings"

"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
"github.com/algorand/go-algorand-sdk/v2/types"
"github.com/antihax/optional"
"github.com/mailgun/holster/v4/syncutil"

"github.com/TxnLab/reti/internal/lib/misc"
"github.com/TxnLab/reti/internal/lib/nfdapi/swagger"
"github.com/TxnLab/reti/internal/lib/reti"
)

func (d *Daemon) checkForEvictions(ctx context.Context) error {
info := App.retiClient.Info()
if info.Config.EntryGatingType == reti.GatingTypeNone {
return nil
}
signer, err := App.signer.FindFirstSigner([]string{info.Config.Owner, info.Config.Manager})
if err != nil {
return fmt.Errorf("neither owner or manager address for your validator has local keys present")
}
signerAddr, _ := types.DecodeAddress(signer)

stakersAndPools, err := d.collectStakersAndPools(info)
if err != nil {
return err
}
ineligible, err := d.getIneligibleStakers(ctx, maps.Keys(stakersAndPools))
if err != nil {
return err
}
for _, staker := range ineligible {
for _, pool := range stakersAndPools[staker] {
stakerAddr, _ := types.DecodeAddress(staker)
err = App.retiClient.RemoveStake(pool, signerAddr, stakerAddr, 0 /* all stake */)
if err != nil {
return fmt.Errorf("error removing stake for pool %d, appid:%d: %v", pool.PoolId, pool.PoolAppId, err)
}
misc.Infof(d.logger, "[EVICTION] Staker:%s removed from pool %d because no longer meeting gating criteria", staker, pool.PoolId)
}
}
return nil
}

// collectStakersAndPools iterates through each pool, collecting all unique stakers (and their pools)
func (d *Daemon) collectStakersAndPools(info reti.ValidatorInfo) (map[string][]reti.ValidatorPoolKey, error) {
stakersAndPools := make(map[string][]reti.ValidatorPoolKey)

for poolIdx, pool := range info.Pools {
ledger, err := App.retiClient.GetLedgerForPool(pool.PoolAppId)
if err != nil {
if strings.Contains(err.Error(), "box not found") {
continue
}
return nil, fmt.Errorf("error getting ledger for pool #%d, appid:%d: %v", poolIdx+1, pool.PoolAppId, err)
}

for _, stakerData := range ledger {
if stakerData.Account == types.ZeroAddress {
continue
}

accountID := stakerData.Account.String()
stakersAndPools[accountID] = append(stakersAndPools[accountID],
reti.ValidatorPoolKey{
ID: info.Config.ID,
PoolId: uint64(poolIdx + 1),
PoolAppId: pool.PoolAppId,
})
}
}

return stakersAndPools, nil
}

func (d *Daemon) getIneligibleStakers(ctx context.Context, accounts iter.Seq[string]) ([]string, error) {
var (
fanOut = syncutil.NewFanOut(20)
ineligibleCh = make(chan string, 2)
)
for account := range accounts {
fanOut.Run(func(val any) error {
isEligible, err := d.isAccountEligible(ctx, account)
if err != nil {
return err
}
if !isEligible {
ineligibleCh <- account
}
return nil
}, account)
}
var errs []error
go func() {
errs = fanOut.Wait()
close(ineligibleCh)
}()
var ineligible []string
for account := range ineligibleCh {
ineligible = append(ineligible, account)
}
if len(errs) > 0 {
return nil, errs[0]
}
return ineligible, nil
}

func (d *Daemon) isAccountEligible(ctx context.Context, account string) (bool, error) {
info := App.retiClient.Info()
gatingMinBalance := info.Config.GatingAssetMinBalance

// get all assets held by the staking account first
accountInfo, err := d.algoClient.AccountInformation(account).Do(ctx)
if err != nil {
return false, fmt.Errorf("error getting account info for account %s: %v", account, err)
}
heldAssets := accountInfo.Assets

var (
valToVerify uint64
)

switch info.Config.EntryGatingType {
case reti.GatingTypeAssetsCreatedBy:
creatorAddress := info.Config.EntryGatingAddress
assetIds, err := d.collectCreatedAssets(ctx, []string{creatorAddress})
if err != nil {
return false, err
}
valToVerify = d.findValueToVerify(heldAssets, assetIds, gatingMinBalance)
case reti.GatingTypeAssetId:
gatingAssetIds := slices.DeleteFunc(info.Config.EntryGatingAssets, func(id uint64) bool {
return id == 0
})
valToVerify = d.findValueToVerify(heldAssets, gatingAssetIds, gatingMinBalance)
case reti.GatingTypeCreatedByNFDAddresses:
nfdAppId := info.Config.EntryGatingAssets[0]
nfd, err := App.nfdOnChain.GetNFD(ctx, nfdAppId, true)
if err != nil {
return false, fmt.Errorf("error getting nfd info for appid %d: %v", nfdAppId, err)
}
if len(nfd.Verified["caAlgo"]) == 0 {
return false, fmt.Errorf("nfd %d defined as gating for this validator has no verified addresses", nfdAppId)
}
createdAssetIds, err := d.collectCreatedAssets(ctx, strings.Split(nfd.Verified["caAlgo"], ","))
if err != nil {
return false, err
}
return d.findValueToVerify(heldAssets, createdAssetIds, gatingMinBalance) > 0, nil
case reti.GatingTypeSegmentOfNFD:
nfds, _, err := App.nfdApi.NfdApi.NfdSearchV2(ctx, &swagger.NfdApiNfdSearchV2Opts{
State: optional.NewInterface("owned"),
Owner: optional.NewString(account),
ParentAppID: optional.NewInt64(int64(info.Config.EntryGatingAssets[0])),
Limit: optional.NewInt64(1),
})
if err != nil {
return false, fmt.Errorf("error getting children nfds for parent appid %d: owned by %s: %v", info.Config.EntryGatingAssets[0], account, err)
}
return nfds.Total >= 1, nil

default:
return false, fmt.Errorf("unknown gating type")
}
if valToVerify == 0 {
return false, nil
}
return true, nil
}

func (d *Daemon) collectCreatedAssets(ctx context.Context, addresses []string) ([]uint64, error) {
assetIdMap := make(map[uint64]bool)
for _, address := range addresses {
creatorAccountInfo, err := d.algoClient.AccountInformation(address).Do(ctx)
if err != nil {
return nil, fmt.Errorf("error getting account info for creator address %s: %v", address, err)
}
for _, asset := range creatorAccountInfo.CreatedAssets {
if !assetIdMap[asset.Index] {
assetIdMap[asset.Index] = true
}
}
}
return slices.Collect(maps.Keys(assetIdMap)), nil
}

func (d *Daemon) findValueToVerify(heldAssets []models.AssetHolding, gatingAssets []uint64, minBalance uint64) uint64 {
// Find the first gating asset held in heldAssets that meets the minimum balance requirement
idx := slices.IndexFunc(heldAssets, func(heldAsset models.AssetHolding) bool {
return slices.ContainsFunc(gatingAssets, func(asset uint64) bool {
return asset == heldAsset.AssetId && heldAsset.Amount >= minBalance
})
})
if idx == -1 {
return 0
}
return heldAssets[idx].AssetId
}
4 changes: 2 additions & 2 deletions nodemgr/go.mod
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
module github.com/TxnLab/reti

go 1.22.2
go 1.23

require (
github.com/algorand/go-algorand-sdk/v2 v2.6.0
github.com/antihax/optional v1.0.0
github.com/joho/godotenv v1.5.1
github.com/mailgun/holster/v4 v4.20.2
github.com/mailgun/holster/v4 v4.20.3
github.com/manifoldco/promptui v0.9.0
github.com/prometheus/client_golang v1.20.3
github.com/ssgreg/repeat v1.5.1
Expand Down
Loading

0 comments on commit dfbdb0c

Please sign in to comment.