Skip to content

Commit

Permalink
Trends
Browse files Browse the repository at this point in the history
wip

add seed txns

make balances faster
  • Loading branch information
maebeam committed Nov 10, 2021
1 parent ce1f79d commit f9de04f
Show file tree
Hide file tree
Showing 6 changed files with 442 additions and 14 deletions.
2 changes: 2 additions & 0 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Config struct {
TXIndex bool
Regtest bool
PostgresURI string
Trends bool

// Peers
ConnectIPs []string
Expand Down Expand Up @@ -84,6 +85,7 @@ func LoadConfig() *Config {
config.TXIndex = viper.GetBool("txindex")
config.Regtest = viper.GetBool("regtest")
config.PostgresURI = viper.GetString("postgres-uri")
config.Trends = viper.GetBool("trends")

// Peers
config.ConnectIPs = viper.GetStringSlice("connect-ips")
Expand Down
1 change: 1 addition & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func SetupRunFlags(cmd *cobra.Command) {
cmd.PersistentFlags().String("postgres-uri", "", "BETA: Use Postgres as the backing store for chain data."+
"When enabled, most data is stored in postgres although badger is still currently used for some state. Run your "+
"Postgres instance on the same machine as your node for optimal performance.")
cmd.PersistentFlags().Bool("trends", false, "BETA: Record user trends in postgres. Makes syncing slower.")

// Peers
cmd.PersistentFlags().StringSlice("connect-ips", []string{},
Expand Down
134 changes: 132 additions & 2 deletions lib/block_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,18 @@ func (pe *ProfileEntry) IsDeleted() bool {
return pe.isDeleted
}

type TrendKey struct {
PKID PKID
BlockHeight uint32
}

func MakeTrendKey(pkid *PKID, blockHeight uint32) TrendKey {
return TrendKey{
PKID: *pkid,
BlockHeight: roundBlockHeight(blockHeight),
}
}

type UtxoView struct {
// Utxo data
NumUtxoEntries uint64
Expand Down Expand Up @@ -758,13 +770,17 @@ type UtxoView struct {
// Derived Key entries. Map key is a combination of owner and derived public keys.
DerivedKeyToDerivedEntry map[DerivedKeyMapKey]*DerivedKeyEntry

// Trends data
TrendsMap map[TrendKey]*PGTrend

// The hash of the tip the view is currently referencing. Mainly used
// for error-checking when doing a bulk operation on the view.
TipHash *BlockHash

Handle *badger.DB
Postgres *Postgres
Params *DeSoParams
Trends bool
}

type OperationType uint
Expand Down Expand Up @@ -1028,6 +1044,9 @@ func (bav *UtxoView) _ResetViewMappingsAfterFlush() {

// Derived Key entries
bav.DerivedKeyToDerivedEntry = make(map[DerivedKeyMapKey]*DerivedKeyEntry)

// Trends data
bav.TrendsMap = make(map[TrendKey]*PGTrend)
}

func (bav *UtxoView) CopyUtxoView() (*UtxoView, error) {
Expand Down Expand Up @@ -1196,6 +1215,12 @@ func (bav *UtxoView) CopyUtxoView() (*UtxoView, error) {
newView.DerivedKeyToDerivedEntry[entryKey] = &newEntry
}

newView.TrendsMap = make(map[TrendKey]*PGTrend, len(bav.TrendsMap))
for trendKey, trend := range bav.TrendsMap {
newTrend := *trend
newView.TrendsMap[trendKey] = &newTrend
}

return newView, nil
}

Expand Down Expand Up @@ -1228,6 +1253,7 @@ func NewUtxoView(
// info on that).
if view.Postgres != nil {
view.TipHash = view.Postgres.GetChain(MAIN_CHAIN).TipHash
view.Trends = view.Postgres.trends
} else {
view.TipHash = DbGetBestHash(view.Handle, ChainTypeDeSoBlock /* don't get the header chain */)
}
Expand Down Expand Up @@ -3629,6 +3655,9 @@ func (bav *UtxoView) _connectBasicTransfer(
PrevPostEntry: previousDiamondPostEntry,
PrevDiamondEntry: previousDiamondEntry,
})

// Update trends
bav.addTrendDiamond(receiverPKID.PKID, blockHeight, diamondRecipientTotal)
}

// If signature verification is requested then do that as well.
Expand Down Expand Up @@ -3666,6 +3695,89 @@ func (bav *UtxoView) _connectBasicTransfer(
return totalInput, totalOutput, utxoOpsForTxn, nil
}

//
// BEGIN TRENDS
//

func roundBlockHeight(blockHeight uint32) uint32 {
return blockHeight / BlocksPerDay * BlocksPerDay
}

// Helper method for getting and setting a trend
func (bav *UtxoView) getAndSetTrend(trendKey TrendKey) *PGTrend {
trend := bav.getTrend(trendKey)
bav.setTrendMappings(trend)
return trend
}

func (bav *UtxoView) getTrend(trendKey TrendKey) *PGTrend {
mapValue, existsMapValue := bav.TrendsMap[trendKey]
if existsMapValue {
return mapValue
}

blockHeight := roundBlockHeight(trendKey.BlockHeight)
trend := bav.Postgres.GetTrend(&trendKey.PKID, blockHeight)
if trend == nil {
trend = &PGTrend{
PKID: trendKey.PKID.NewPKID(),
BlockHeight: blockHeight,
}
}

return trend
}

func (bav *UtxoView) setTrendMappings(trend *PGTrend) {
// This function shouldn't be called with nil.
if trend == nil {
glog.Errorf("setTrend: Called with nil trend")
return
}

bav.TrendsMap[MakeTrendKey(trend.PKID, trend.BlockHeight)] = trend
}

func (bav *UtxoView) addTrendDiamond(pkid *PKID, blockHeight uint32, totalNanos uint64) {
if !bav.Trends {
return
}

trend := bav.getAndSetTrend(MakeTrendKey(pkid, blockHeight))
trend.DiamondNanos += totalNanos
}

func (bav *UtxoView) addTrendFounderReward(pkid *PKID, blockHeight uint32, totalNanos uint64) {
if !bav.Trends {
return
}

trend := bav.getAndSetTrend(MakeTrendKey(pkid, blockHeight))
trend.FounderRewardNanos += totalNanos
}

func (bav *UtxoView) addTrendNFTSeller(pkid *PKID, blockHeight uint32, totalNanos uint64) {
if !bav.Trends {
return
}

trend := bav.getAndSetTrend(MakeTrendKey(pkid, blockHeight))
trend.NFTSellerNanos += totalNanos
}

func (bav *UtxoView) addTrendNFTRoyalty(pkid *PKID, blockHeight uint32, totalNanos uint64) {
if !bav.Trends {
return
}

trend := bav.getAndSetTrend(MakeTrendKey(pkid, blockHeight))
trend.NFTRoyaltyNanos += totalNanos
}

//
// END TRENDS
//

func (bav *UtxoView) _getMessageEntryForMessageKey(messageKey *MessageKey) *MessageEntry {
// If an entry exists in the in-memory map, return the value of that mapping.
mapValue, existsMapValue := bav.MessageKeyToMessageEntry[*messageKey]
Expand Down Expand Up @@ -7274,6 +7386,9 @@ func (bav *UtxoView) _connectAcceptNFTBid(

// Rosetta uses this UtxoOperation to provide INPUT amounts
utxoOpsForTxn = append(utxoOpsForTxn, utxoOp)

// Update trends
bav.addTrendNFTSeller(updaterPKID.PKID, blockHeight, bidAmountMinusRoyalties)
}

// (4) Pay royalties to the original artist.
Expand Down Expand Up @@ -7303,6 +7418,10 @@ func (bav *UtxoView) _connectAcceptNFTBid(

// Rosetta uses this UtxoOperation to provide INPUT amounts
utxoOpsForTxn = append(utxoOpsForTxn, utxoOp)

// Update trends
artistPkid := bav.GetPKIDForPublicKey(nftPostEntry.PosterPublicKey).PKID
bav.addTrendNFTRoyalty(artistPkid, blockHeight, bidAmountMinusRoyalties)
}

// (5) Give any change back to the bidder.
Expand Down Expand Up @@ -8693,6 +8812,9 @@ func (bav *UtxoView) HelpConnectCreatorCoinBuy(

// Rosetta uses this UtxoOperation to provide INPUT amounts
utxoOpsForTxn = append(utxoOpsForTxn, utxoOp)

// Update trends
bav.addTrendFounderReward(creatorPKID, blockHeight, desoFounderRewardNanos)
}
}

Expand Down Expand Up @@ -9394,6 +9516,9 @@ func (bav *UtxoView) _connectCreatorCoinTransfer(

// Now set the diamond entry mappings on the view so they are flushed to the DB.
bav._setDiamondEntryMappings(newDiamondEntry)

// TODO: Update trends, how do we calculate CLOUT nanos?
// bav.addTrendDiamond(receiverPKID.PKID, blockHeight, totalNanos)
}

// Add an operation to the list at the end indicating we've executed a
Expand Down Expand Up @@ -9732,10 +9857,15 @@ func (bav *UtxoView) Preload(desoBlock *MsgDeSoBlock) error {
}

// Set real entries for all the profiles that actually exist
result := bav.Postgres.GetProfilesForPublicKeys(publicKeys)
for _, profile := range result {
profiles := bav.Postgres.GetProfilesForPublicKeys(publicKeys)
for _, profile := range profiles {
bav.setProfileMappings(profile)
}

balances := bav.Postgres.GetBalancesBatch(publicKeys)
for _, balance := range balances {
bav.PublicKeyToDeSoBalanceNanos[*NewPublicKey(balance.PublicKey.ToBytes())] = balance.BalanceNanos
}
}

// One iteration for everything else
Expand Down
120 changes: 119 additions & 1 deletion lib/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func (bc *Blockchain) _initChain() error {
var err error

if bc.postgres != nil {
err = bc.postgres.InitGenesisBlock(bc.params)
err = bc.postgres.InitGenesisBlock(bc.params, bc.db)
} else {
err = InitDbWithDeSoGenesisBlock(bc.params, bc.db, bc.eventManager)
}
Expand Down Expand Up @@ -1994,6 +1994,12 @@ func (bc *Blockchain) ProcessBlock(desoBlock *MsgDeSoBlock, verifySignatures boo
return false, false, errors.Wrapf(err, "ProcessBlock: Problem writing block info to db on simple add to tip")
}

// If we've enabled trends on this node then we do some extra processing
err = bc.ProcessTrends(utxoView, uint32(desoBlock.Header.Height))
if err != nil {
return false, false, errors.Wrapf(err, "ProcessBlock: Problem processing trends")
}

// Now that we've set the best chain in the db, update our in-memory data
// structure to reflect this. Do a quick check first to make sure it's consistent.
lastIndex := len(bc.bestChain) - 1
Expand Down Expand Up @@ -2337,6 +2343,118 @@ func (bc *Blockchain) ProcessBlock(desoBlock *MsgDeSoBlock, verifySignatures boo
return isMainChain, false, nil
}

func (bc *Blockchain) ProcessTrends(utxoView *UtxoView, blockHeight uint32) error {
// Only process trends if enabled
if !utxoView.Trends {
return nil
}

// Only process trends every 288 blocks
if blockHeight%BlocksPerDay != 0 {
return nil
}

glog.Info("Trends: Beginning")

// We read straight from the database instead of the view to optimize for performance.
// This method is called immediately after a flush and we don't worry about what's still
// siting around in the mempool.
//
// We only calculate these metrics for public keys that have profiles
//
// TODO: Do we need to select these in small batches?

// Load all the profiles
profiles := bc.postgres.GetProfilesBatch()
numProfiles := len(profiles)
profilesMap := make(map[PKID]*PGProfile, numProfiles)
pkids := make([]*PKID, numProfiles)
publicKeys := make([]*PublicKey, numProfiles)
for _, profile := range profiles {
profilesMap[*profile.PKID] = profile
pkids = append(pkids, profile.PKID)
publicKeys = append(publicKeys, profile.PublicKey)
}

glog.Infof("Trends: Loaded %d profiles", numProfiles)

// Load all the creator coin balances
creatorCoinBalances := bc.postgres.GetCreatorCoinBalancesBatch(pkids)
numCreatorCoinBalances := len(creatorCoinBalances)
holdersMap := make(map[PKID][]*PGCreatorCoinBalance, numCreatorCoinBalances)
holdingsMap := make(map[PKID][]*PGCreatorCoinBalance, numCreatorCoinBalances)
for _, creatorCoin := range creatorCoinBalances {
holdersMap[*creatorCoin.CreatorPKID] = append(holdersMap[*creatorCoin.HolderPKID], creatorCoin)
holdingsMap[*creatorCoin.HolderPKID] = append(holdingsMap[*creatorCoin.CreatorPKID], creatorCoin)
}

glog.Infof("Trends: Loaded %d creator coin balances", numCreatorCoinBalances)

// Load all the trends
roundHeight := roundBlockHeight(blockHeight)
trends := bc.postgres.GetTrendsBatch(pkids, roundHeight)
numTrends := len(trends)
trendsMap := make(map[PKID]*PGTrend, numTrends)
for _, trend := range trends {
trendsMap[*trend.PKID] = trend
}

glog.Infof("Trends: Loaded %d trends", numTrends)

// Load all the balances
balances := bc.postgres.GetBalancesBatch(publicKeys)
numBalances := len(balances)
balanceMap := make(map[PublicKey]*PGBalance, numBalances)
for _, balance := range balances {
balanceMap[*balance.PublicKey] = balance
}

glog.Infof("Trends: Loaded %d balances", numBalances)

var upsertTrends []*PGTrend
for _, profile := range profiles {
balance := balanceMap[*profile.PublicKey]
holdings := holdersMap[*profile.PKID]
holders := holdingsMap[*profile.PKID]
trend := trendsMap[*profile.PKID]
if trend == nil {
trend = &PGTrend{
PKID: profile.PKID.NewPKID(),
BlockHeight: roundHeight,
}
}

holdingNanos := uint64(0)
for _, holding := range holdings {
holdingNanos += CalculateDeSoToReturn(holding.BalanceNanos,
profile.CoinsInCirculationNanos, profile.DESOLockedNanos, bc.params)
}

trend.HoldingNanos = holdingNanos
trend.NumHolders = uint64(len(holders))
trend.LockedNanos = profile.DESOLockedNanos
trend.CoinsInCirculation = profile.CoinsInCirculationNanos
if balance != nil {
trend.BalanceNanos = balance.BalanceNanos
}

upsertTrends = append(upsertTrends, trend)
}

glog.Infof("Trends: Calculated %d trends", len(upsertTrends))

if len(upsertTrends) > 0 {
err := bc.postgres.UpsertTrends(upsertTrends)
if err != nil {
return errors.Wrap(err, "ProcessTrends: Upsert trends failed")
}
}

glog.Info("Processing trends complete")

return nil
}

// ValidateTransaction creates a UtxoView and sees if the transaction can be connected
// to it. If a mempool is provided, this function tries to find dependencies of the
// passed-in transaction in the pool and connect them before trying to connect the
Expand Down
Loading

0 comments on commit f9de04f

Please sign in to comment.