Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send new DeSo transactions over AMQP to a message broker (aka a firehose) #1443

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions cmd/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package cmd

import (
"github.com/deso-protocol/core/lib"
"github.com/golang/glog"
"github.com/spf13/viper"
"net/url"
"os"
"path/filepath"
"strings"

"github.com/deso-protocol/core/lib"
"github.com/golang/glog"
"github.com/spf13/viper"
)

type Config struct {
Expand Down Expand Up @@ -84,6 +85,9 @@ type Config struct {
StateChangeDir string
StateSyncerMempoolTxnSyncLimit uint64

//Amqp Push Destination
AmqpPushDest string

// PoS Checkpoint Syncing
CheckpointSyncingProviders []string
}
Expand Down Expand Up @@ -197,6 +201,9 @@ func LoadConfig() *Config {
config.StateChangeDir = viper.GetString("state-change-dir")
config.StateSyncerMempoolTxnSyncLimit = viper.GetUint64("state-syncer-mempool-txn-sync-limit")

//AMQP Push Destination
config.AmqpPushDest = viper.GetString("amqp-push-dest")

// PoS Checkpoint Syncing
config.CheckpointSyncingProviders = GetStringSliceWorkaround("checkpoint-syncing-providers")
for _, provider := range config.CheckpointSyncingProviders {
Expand Down
4 changes: 3 additions & 1 deletion cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"encoding/hex"
"flag"
"fmt"
"github.com/deso-protocol/go-deadlock"
"net"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/deso-protocol/go-deadlock"

"github.com/DataDog/datadog-go/v5/statsd"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/wire"
Expand Down Expand Up @@ -237,6 +238,7 @@ func (node *Node) Start(exitChannels ...*chan struct{}) {
node.nodeMessageChan,
node.Config.ForceChecksum,
node.Config.StateChangeDir,
node.Config.AmqpPushDest,
node.Config.HypersyncMaxQueueSize,
blsKeystore,
node.Config.MempoolBackupIntervalMillis,
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ require (
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect
github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c // indirect
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
github.com/richardartoul/molecule v1.0.1-0.20240531184615-7ca0df43c0b3 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/richardartoul/molecule v1.0.1-0.20240531184615-7ca0df43c0b3 h1:4+LEVOB87y175cLJC/mbsgKmoDOjrBldtXvioEy96WY=
Expand Down
128 changes: 128 additions & 0 deletions lib/publisher_amqp_push.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package lib

import (
"sync"
"sync/atomic"
"time"

"github.com/golang/glog"
amqp "github.com/rabbitmq/amqp091-go"
)

// persistent connection and mutex
var (
amqpConn *amqp.Connection
connMutex sync.Mutex
)

// Use an atomic flag where 0 means disabled and 1 means enabled.
var amqpPublisherEnabled int32
var amqpPublisherStarted int32

// EnablePublisher enables AMQP publishing after the node is fully synced.
func AmqpSetEnablePublisher() {
atomic.StoreInt32(&amqpPublisherEnabled, 1)
}

func AmpqSetStartPublisher() {
atomic.StoreInt32(&amqpPublisherStarted, 1)
}

func getAMQPConnection(amqpDest string) (*amqp.Connection, error) {
connMutex.Lock()
defer connMutex.Unlock()

// Check if connection exists and is open.
if amqpConn != nil && !amqpConn.IsClosed() {
return amqpConn, nil
}

// Create a new connection.
conn, err := amqp.Dial(amqpDest)
if err != nil {
return nil, err
}
amqpConn = conn
return amqpConn, nil
}

func PublishBlockEvent(event *BlockEvent, amqpDest string) error {
if amqpDest == "" {
// AMQP integration is not enabled.
return nil
}
if event == nil {
glog.Infoln("event is nil, skipping AMQP publish.")
return nil
}

if event.Block != nil {
if event.Block.Txns != nil {
//glog.Infoln("BlockEvent %v", len(event.Block.Txns))
channelName := "block_txns"
push := true

//setup connection
conn, err := getAMQPConnection(amqpDest)
if err != nil {
glog.Errorf("Failed to get AMQP connection: %v", err)
return err
}
ch, err := conn.Channel()
if err != nil {
glog.Errorf("Failed to open an AMQP channel: %v", err)
return err
}
defer ch.Close()

for _, txn := range event.Block.Txns {
if txn == nil {
glog.Infoln("BlockEvent Tx is nil")
} else {
//glog.Infoln("BlockEvent Tx %v", txn.Hash().String())

txnType := txn.TxnMeta.GetTxnType()
push = true

//check tx, if it is a block reward or validator related, we skip it
if txnType == TxnTypeBlockReward || txnType == TxnTypeRegisterAsValidator || txnType == TxnTypeUnregisterAsValidator || txnType == TxnTypeUnjailValidator || txnType == TxnTypeUpdateBitcoinUSDExchangeRate || txnType == TxnTypeSwapIdentity || txnType == TxnTypeUnset {
push = false
}

if push {

body, err := txn.MarshalJSON()
if err != nil {
glog.Errorf("Failed to marshal tx to json %v", err)
return err
}

err = ch.Publish(
"", // default exchange
channelName, // routing key (queue name)
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
Timestamp: time.Now(),
MessageId: txn.Hash().String(),
},
)
if err != nil {
glog.Errorf("Failed to publish message to AMQP: %v", err)
return err
}
}
}
}
} else {
glog.Infoln("BlockEvent Txns is nil")
}

} else {
glog.Infoln("BlockEvent block is nil")
}

return nil
}
38 changes: 37 additions & 1 deletion lib/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"bytes"
"encoding/hex"
"fmt"
"github.com/deso-protocol/go-deadlock"
"net"
"reflect"
"runtime"
"strings"
"sync/atomic"
"time"

"github.com/deso-protocol/go-deadlock"

"github.com/btcsuite/btcd/wire"
"github.com/deso-protocol/core/collections"
"github.com/deso-protocol/core/consensus"
Expand Down Expand Up @@ -166,6 +167,9 @@ type Server struct {
timer *Timer

stateChangeSyncer *StateChangeSyncer

stateAmqpPushDest string

// DbMutex protects the badger database from concurrent access when it's being closed & re-opened.
// This is necessary because the database is closed & re-opened when the node finishes hypersyncing in order
// to change the database options from Default options to Performance options.
Expand Down Expand Up @@ -426,6 +430,7 @@ func NewServer(
_nodeMessageChan chan NodeMessage,
_forceChecksum bool,
_stateChangeDir string,
_stateAmqpPushDest string,
_hypersyncMaxQueueSize uint32,
_blsKeystore *BLSKeystore,
_mempoolBackupIntervalMillis uint64,
Expand Down Expand Up @@ -497,6 +502,14 @@ func NewServer(
srv.stateChangeSyncer = stateChangeSyncer
}

//Enable amqp publisher if push destionation is set.
if _stateAmqpPushDest != "" {
srv.stateAmqpPushDest = _stateAmqpPushDest
//set amqp push enable via atomic flag
AmqpSetEnablePublisher()

}

// The same timesource is used in the chain data structure and in the connection
// manager. It just takes and keeps track of the median time among our peers so
// we can keep a consistent clock.
Expand Down Expand Up @@ -3208,6 +3221,8 @@ func (srv *Server) tryTransitionToFastHotStuffConsensus() {
// to the FastHotStuffConsensus.

srv.fastHotStuffConsensus.Start()

srv.handleFullySyncedStateAMQP()
}

func (srv *Server) _startTransactionRelayer() {
Expand Down Expand Up @@ -3430,3 +3445,24 @@ func (srv *Server) GetLatestView() uint64 {
}
return srv.fastHotStuffConsensus.fastHotStuffEventLoop.GetCurrentView()
}

func (srv *Server) handleFullySyncedStateAMQP() {
// Check if AMQP publisher is enabled.
if atomic.LoadInt32(&amqpPublisherEnabled) == 1 {
glog.Infof("AMQP publisher enabled. Node is fully synced.")
// Enable AMQP publisher and set up state change event handler if configured.
if atomic.LoadInt32(&amqpPublisherStarted) == 0 {
glog.Infof("AMQP publisher not started yet.")
if srv.stateAmqpPushDest != "" {
AmpqSetStartPublisher()
amqpDest := srv.stateAmqpPushDest
glog.Infof("AMQP publisher started set.")
//add state change event handler
srv.eventManager.OnBlockAccepted(func(event *BlockEvent) {
go PublishBlockEvent(event, amqpDest)
})
glog.Infof("AMQP publisher event handling set.")
}
}
}
}
Loading