diff --git a/cmd/config.go b/cmd/config.go index 77ce94606..2aebca7bb 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -1,13 +1,14 @@ package cmd import ( - "github.com/deso-protocol/core/lib" - "github.com/golang/glog" - "github.com/spf13/viper" "net/url" "os" "path/filepath" "strings" + + "github.com/deso-protocol/core/lib" + "github.com/golang/glog" + "github.com/spf13/viper" ) type Config struct { @@ -84,6 +85,9 @@ type Config struct { StateChangeDir string StateSyncerMempoolTxnSyncLimit uint64 + //Amqp Push Destination + AmqpPushDest string + // PoS Checkpoint Syncing CheckpointSyncingProviders []string } @@ -197,6 +201,9 @@ func LoadConfig() *Config { config.StateChangeDir = viper.GetString("state-change-dir") config.StateSyncerMempoolTxnSyncLimit = viper.GetUint64("state-syncer-mempool-txn-sync-limit") + //AMQP Push Destination + config.AmqpPushDest = viper.GetString("amqp-push-dest") + // PoS Checkpoint Syncing config.CheckpointSyncingProviders = GetStringSliceWorkaround("checkpoint-syncing-providers") for _, provider := range config.CheckpointSyncingProviders { diff --git a/cmd/node.go b/cmd/node.go index b1501978b..38787eced 100644 --- a/cmd/node.go +++ b/cmd/node.go @@ -4,7 +4,6 @@ import ( "encoding/hex" "flag" "fmt" - "github.com/deso-protocol/go-deadlock" "net" "os" "os/signal" @@ -12,6 +11,8 @@ import ( "syscall" "time" + "github.com/deso-protocol/go-deadlock" + "github.com/DataDog/datadog-go/v5/statsd" "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/wire" @@ -237,6 +238,7 @@ func (node *Node) Start(exitChannels ...*chan struct{}) { node.nodeMessageChan, node.Config.ForceChecksum, node.Config.StateChangeDir, + node.Config.AmqpPushDest, node.Config.HypersyncMaxQueueSize, blsKeystore, node.Config.MempoolBackupIntervalMillis, diff --git a/go.mod b/go.mod index 0b5355f47..936a03c96 100644 --- a/go.mod +++ b/go.mod @@ -117,6 +117,7 @@ require ( github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/richardartoul/molecule v1.0.1-0.20240531184615-7ca0df43c0b3 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect diff --git a/go.sum b/go.sum index a21ccd144..a3c1bc655 100644 --- a/go.sum +++ b/go.sum @@ -322,6 +322,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/richardartoul/molecule v1.0.1-0.20240531184615-7ca0df43c0b3 h1:4+LEVOB87y175cLJC/mbsgKmoDOjrBldtXvioEy96WY= diff --git a/lib/publisher_amqp_push.go b/lib/publisher_amqp_push.go new file mode 100644 index 000000000..629f8448f --- /dev/null +++ b/lib/publisher_amqp_push.go @@ -0,0 +1,128 @@ +package lib + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/golang/glog" + amqp "github.com/rabbitmq/amqp091-go" +) + +// persistent connection and mutex +var ( + amqpConn *amqp.Connection + connMutex sync.Mutex +) + +// Use an atomic flag where 0 means disabled and 1 means enabled. +var amqpPublisherEnabled int32 +var amqpPublisherStarted int32 + +// EnablePublisher enables AMQP publishing after the node is fully synced. +func AmqpSetEnablePublisher() { + atomic.StoreInt32(&amqpPublisherEnabled, 1) +} + +func AmpqSetStartPublisher() { + atomic.StoreInt32(&amqpPublisherStarted, 1) +} + +func getAMQPConnection(amqpDest string) (*amqp.Connection, error) { + connMutex.Lock() + defer connMutex.Unlock() + + // Check if connection exists and is open. + if amqpConn != nil && !amqpConn.IsClosed() { + return amqpConn, nil + } + + // Create a new connection. + conn, err := amqp.Dial(amqpDest) + if err != nil { + return nil, err + } + amqpConn = conn + return amqpConn, nil +} + +func PublishBlockEvent(event *BlockEvent, amqpDest string) error { + if amqpDest == "" { + // AMQP integration is not enabled. + return nil + } + if event == nil { + glog.Infoln("event is nil, skipping AMQP publish.") + return nil + } + + if event.Block != nil { + if event.Block.Txns != nil { + //glog.Infoln("BlockEvent %v", len(event.Block.Txns)) + channelName := "block_txns" + push := true + + //setup connection + conn, err := getAMQPConnection(amqpDest) + if err != nil { + glog.Errorf("Failed to get AMQP connection: %v", err) + return err + } + ch, err := conn.Channel() + if err != nil { + glog.Errorf("Failed to open an AMQP channel: %v", err) + return err + } + defer ch.Close() + + for _, txn := range event.Block.Txns { + if txn == nil { + glog.Infoln("BlockEvent Tx is nil") + } else { + //glog.Infoln("BlockEvent Tx %v", txn.Hash().String()) + + txnType := txn.TxnMeta.GetTxnType() + push = true + + //check tx, if it is a block reward or validator related, we skip it + if txnType == TxnTypeBlockReward || txnType == TxnTypeRegisterAsValidator || txnType == TxnTypeUnregisterAsValidator || txnType == TxnTypeUnjailValidator || txnType == TxnTypeUpdateBitcoinUSDExchangeRate || txnType == TxnTypeSwapIdentity || txnType == TxnTypeUnset { + push = false + } + + if push { + + body, err := txn.MarshalJSON() + if err != nil { + glog.Errorf("Failed to marshal tx to json %v", err) + return err + } + + err = ch.Publish( + "", // default exchange + channelName, // routing key (queue name) + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: body, + Timestamp: time.Now(), + MessageId: txn.Hash().String(), + }, + ) + if err != nil { + glog.Errorf("Failed to publish message to AMQP: %v", err) + return err + } + } + } + } + } else { + glog.Infoln("BlockEvent Txns is nil") + } + + } else { + glog.Infoln("BlockEvent block is nil") + } + + return nil +} diff --git a/lib/server.go b/lib/server.go index 78d72a8e3..f5f74d1a4 100644 --- a/lib/server.go +++ b/lib/server.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/hex" "fmt" - "github.com/deso-protocol/go-deadlock" "net" "reflect" "runtime" @@ -12,6 +11,8 @@ import ( "sync/atomic" "time" + "github.com/deso-protocol/go-deadlock" + "github.com/btcsuite/btcd/wire" "github.com/deso-protocol/core/collections" "github.com/deso-protocol/core/consensus" @@ -166,6 +167,9 @@ type Server struct { timer *Timer stateChangeSyncer *StateChangeSyncer + + stateAmqpPushDest string + // DbMutex protects the badger database from concurrent access when it's being closed & re-opened. // This is necessary because the database is closed & re-opened when the node finishes hypersyncing in order // to change the database options from Default options to Performance options. @@ -426,6 +430,7 @@ func NewServer( _nodeMessageChan chan NodeMessage, _forceChecksum bool, _stateChangeDir string, + _stateAmqpPushDest string, _hypersyncMaxQueueSize uint32, _blsKeystore *BLSKeystore, _mempoolBackupIntervalMillis uint64, @@ -497,6 +502,14 @@ func NewServer( srv.stateChangeSyncer = stateChangeSyncer } + //Enable amqp publisher if push destionation is set. + if _stateAmqpPushDest != "" { + srv.stateAmqpPushDest = _stateAmqpPushDest + //set amqp push enable via atomic flag + AmqpSetEnablePublisher() + + } + // The same timesource is used in the chain data structure and in the connection // manager. It just takes and keeps track of the median time among our peers so // we can keep a consistent clock. @@ -3208,6 +3221,8 @@ func (srv *Server) tryTransitionToFastHotStuffConsensus() { // to the FastHotStuffConsensus. srv.fastHotStuffConsensus.Start() + + srv.handleFullySyncedStateAMQP() } func (srv *Server) _startTransactionRelayer() { @@ -3430,3 +3445,24 @@ func (srv *Server) GetLatestView() uint64 { } return srv.fastHotStuffConsensus.fastHotStuffEventLoop.GetCurrentView() } + +func (srv *Server) handleFullySyncedStateAMQP() { + // Check if AMQP publisher is enabled. + if atomic.LoadInt32(&amqpPublisherEnabled) == 1 { + glog.Infof("AMQP publisher enabled. Node is fully synced.") + // Enable AMQP publisher and set up state change event handler if configured. + if atomic.LoadInt32(&amqpPublisherStarted) == 0 { + glog.Infof("AMQP publisher not started yet.") + if srv.stateAmqpPushDest != "" { + AmpqSetStartPublisher() + amqpDest := srv.stateAmqpPushDest + glog.Infof("AMQP publisher started set.") + //add state change event handler + srv.eventManager.OnBlockAccepted(func(event *BlockEvent) { + go PublishBlockEvent(event, amqpDest) + }) + glog.Infof("AMQP publisher event handling set.") + } + } + } +}