From f3d92a1c2d4a9a0a9ba564925affaaf9c6a3ae11 Mon Sep 17 00:00:00 2001 From: mvanhalen Date: Tue, 11 Feb 2025 19:05:07 +0100 Subject: [PATCH 1/6] amqppush update separate logic. --- cmd/config.go | 13 +++-- cmd/node.go | 4 +- lib/server.go | 37 +++++++++++++- lib/state_amqp_push.go | 106 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 155 insertions(+), 5 deletions(-) create mode 100644 lib/state_amqp_push.go diff --git a/cmd/config.go b/cmd/config.go index 77ce94606..2aebca7bb 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -1,13 +1,14 @@ package cmd import ( - "github.com/deso-protocol/core/lib" - "github.com/golang/glog" - "github.com/spf13/viper" "net/url" "os" "path/filepath" "strings" + + "github.com/deso-protocol/core/lib" + "github.com/golang/glog" + "github.com/spf13/viper" ) type Config struct { @@ -84,6 +85,9 @@ type Config struct { StateChangeDir string StateSyncerMempoolTxnSyncLimit uint64 + //Amqp Push Destination + AmqpPushDest string + // PoS Checkpoint Syncing CheckpointSyncingProviders []string } @@ -197,6 +201,9 @@ func LoadConfig() *Config { config.StateChangeDir = viper.GetString("state-change-dir") config.StateSyncerMempoolTxnSyncLimit = viper.GetUint64("state-syncer-mempool-txn-sync-limit") + //AMQP Push Destination + config.AmqpPushDest = viper.GetString("amqp-push-dest") + // PoS Checkpoint Syncing config.CheckpointSyncingProviders = GetStringSliceWorkaround("checkpoint-syncing-providers") for _, provider := range config.CheckpointSyncingProviders { diff --git a/cmd/node.go b/cmd/node.go index b1501978b..38787eced 100644 --- a/cmd/node.go +++ b/cmd/node.go @@ -4,7 +4,6 @@ import ( "encoding/hex" "flag" "fmt" - "github.com/deso-protocol/go-deadlock" "net" "os" "os/signal" @@ -12,6 +11,8 @@ import ( "syscall" "time" + "github.com/deso-protocol/go-deadlock" + "github.com/DataDog/datadog-go/v5/statsd" "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/wire" @@ -237,6 +238,7 @@ func (node *Node) Start(exitChannels ...*chan struct{}) { node.nodeMessageChan, node.Config.ForceChecksum, node.Config.StateChangeDir, + node.Config.AmqpPushDest, node.Config.HypersyncMaxQueueSize, blsKeystore, node.Config.MempoolBackupIntervalMillis, diff --git a/lib/server.go b/lib/server.go index 78d72a8e3..f196ae14e 100644 --- a/lib/server.go +++ b/lib/server.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/hex" "fmt" - "github.com/deso-protocol/go-deadlock" "net" "reflect" "runtime" @@ -12,6 +11,8 @@ import ( "sync/atomic" "time" + "github.com/deso-protocol/go-deadlock" + "github.com/btcsuite/btcd/wire" "github.com/deso-protocol/core/collections" "github.com/deso-protocol/core/consensus" @@ -166,6 +167,9 @@ type Server struct { timer *Timer stateChangeSyncer *StateChangeSyncer + + stateAmqpPushDest string + // DbMutex protects the badger database from concurrent access when it's being closed & re-opened. // This is necessary because the database is closed & re-opened when the node finishes hypersyncing in order // to change the database options from Default options to Performance options. @@ -426,6 +430,7 @@ func NewServer( _nodeMessageChan chan NodeMessage, _forceChecksum bool, _stateChangeDir string, + _stateAmqpPushDest string, _hypersyncMaxQueueSize uint32, _blsKeystore *BLSKeystore, _mempoolBackupIntervalMillis uint64, @@ -497,6 +502,14 @@ func NewServer( srv.stateChangeSyncer = stateChangeSyncer } + //Enable amqp publisher if push destionation is set. + if _stateAmqpPushDest != "" { + srv.stateAmqpPushDest = _stateAmqpPushDest + //set amqp push enable via atomic flag + AmqpSetEnablePublisher() + + } + // The same timesource is used in the chain data structure and in the connection // manager. It just takes and keeps track of the median time among our peers so // we can keep a consistent clock. @@ -3208,6 +3221,8 @@ func (srv *Server) tryTransitionToFastHotStuffConsensus() { // to the FastHotStuffConsensus. srv.fastHotStuffConsensus.Start() + + srv.handleFullySyncedStateAMQP() } func (srv *Server) _startTransactionRelayer() { @@ -3430,3 +3445,23 @@ func (srv *Server) GetLatestView() uint64 { } return srv.fastHotStuffConsensus.fastHotStuffEventLoop.GetCurrentView() } + +func (srv *Server) handleFullySyncedStateAMQP() { + // Check if AMQP publisher is enabled. + if atomic.LoadInt32(&amqpPublisherEnabled) == 1 { + // Enable AMQP publisher and set up state change event handler if configured. + if atomic.LoadInt32(&amqpPublisherStarted) == 0 { + if srv.stateAmqpPushDest != "" { + + AmpqSetStartPublisher() + amqpDest := srv.stateAmqpPushDest + glog.Infof("AMQP publisher enabled. Node is fully synced.") + //add state change event handler + srv.eventManager.OnStateSyncerOperation(func(event *StateSyncerOperationEvent) { + //Run async + go PublishStateChangeEvent(event.StateChangeEntry, amqpDest) + }) + } + } + } +} diff --git a/lib/state_amqp_push.go b/lib/state_amqp_push.go new file mode 100644 index 000000000..5fe3783e0 --- /dev/null +++ b/lib/state_amqp_push.go @@ -0,0 +1,106 @@ +package lib + +import ( + "encoding/json" + "sync" + "sync/atomic" + "time" + + "github.com/golang/glog" + amqp "github.com/rabbitmq/amqp091-go" +) + +// persistent connection and mutex +var ( + amqpConn *amqp.Connection + connMutex sync.Mutex +) + +// Use an atomic flag where 0 means disabled and 1 means enabled. +var amqpPublisherEnabled int32 +var amqpPublisherStarted int32 + +// EnablePublisher enables AMQP publishing after the node is fully synced. +func AmqpSetEnablePublisher() { + atomic.StoreInt32(&amqpPublisherEnabled, 1) +} + +func AmpqSetStartPublisher() { + atomic.StoreInt32(&amqpPublisherStarted, 1) +} + +func getAMQPConnection(amqpDest string) (*amqp.Connection, error) { + connMutex.Lock() + defer connMutex.Unlock() + + // Check if connection exists and is open. + if amqpConn != nil && !amqpConn.IsClosed() { + return amqpConn, nil + } + + // Create a new connection. + conn, err := amqp.Dial(amqpDest) + if err != nil { + return nil, err + } + amqpConn = conn + return amqpConn, nil +} + +// PublishStateChangeEvent publishes a state change event to an AMQP broker. +// It returns an error if the publish fails. +func PublishStateChangeEvent(event *StateChangeEntry, amqpDest string) error { + + glog.Infoln("AMQP publish event") + if amqpDest == "" { + // AMQP integration is not enabled. + return nil + } + + if event == nil { + glog.Infoln("StateChangeEntry is nil, skipping AMQP publish.") + + return nil + } + + conn, err := getAMQPConnection(amqpDest) + if err != nil { + glog.Infoln("Failed to get AMQP connection: %v", err) + + return err + } + + ch, err := conn.Channel() + if err != nil { + glog.Infoln("Failed to open an AMQP channel: %v", err) + return err + } + defer ch.Close() + + // Check if we want to have another format if performance requires it. + // Marshal the state change event into JSON. + body, err := json.Marshal(event) + if err != nil { + glog.Infoln("Failed to marshal event to JSON: %v", err) + return err + } + + // Publish the message. Here, we’re using the default exchange and assuming the queue name is "state_changes". + err = ch.Publish( + "", // default exchange + "state_changes", // routing key (queue name) + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: body, + Timestamp: time.Now(), + }, + ) + if err != nil { + glog.Infoln("Failed to publish message to AMQP: %v", err) + + return err + } + return nil +} From 8fb42438ea87f1bcd91c20ee4b4c298ce6972e2f Mon Sep 17 00:00:00 2001 From: mvanhalen Date: Tue, 11 Feb 2025 20:56:55 +0100 Subject: [PATCH 2/6] Sync to Rabbit MQ works. Todo data encoding --- lib/server.go | 6 ++++-- lib/state_amqp_push.go | 14 +++++++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/lib/server.go b/lib/server.go index f196ae14e..dc91641ad 100644 --- a/lib/server.go +++ b/lib/server.go @@ -3449,18 +3449,20 @@ func (srv *Server) GetLatestView() uint64 { func (srv *Server) handleFullySyncedStateAMQP() { // Check if AMQP publisher is enabled. if atomic.LoadInt32(&amqpPublisherEnabled) == 1 { + glog.Infof("AMQP publisher enabled. Node is fully synced.") // Enable AMQP publisher and set up state change event handler if configured. if atomic.LoadInt32(&amqpPublisherStarted) == 0 { + glog.Infof("AMQP publisher not started yet.") if srv.stateAmqpPushDest != "" { - AmpqSetStartPublisher() amqpDest := srv.stateAmqpPushDest - glog.Infof("AMQP publisher enabled. Node is fully synced.") + glog.Infof("AMQP publisher started set.") //add state change event handler srv.eventManager.OnStateSyncerOperation(func(event *StateSyncerOperationEvent) { //Run async go PublishStateChangeEvent(event.StateChangeEntry, amqpDest) }) + glog.Infof("AMQP publisher event handling set.") } } } diff --git a/lib/state_amqp_push.go b/lib/state_amqp_push.go index 5fe3783e0..1fd554093 100644 --- a/lib/state_amqp_push.go +++ b/lib/state_amqp_push.go @@ -51,7 +51,7 @@ func getAMQPConnection(amqpDest string) (*amqp.Connection, error) { // It returns an error if the publish fails. func PublishStateChangeEvent(event *StateChangeEntry, amqpDest string) error { - glog.Infoln("AMQP publish event") + //glog.Infoln("AMQP publish event") if amqpDest == "" { // AMQP integration is not enabled. return nil @@ -70,6 +70,18 @@ func PublishStateChangeEvent(event *StateChangeEntry, amqpDest string) error { return err } + // --- Integrated encoder logic --- + if isEncoder, encoder := StateKeyToDeSoEncoder(event.KeyBytes); isEncoder && encoder != nil { + // Blocks are serialized in Badger as MsgDesoBlock. + // Convert these bytes to the appropriate format by appending metadata. + if encoder.GetEncoderType() == EncoderTypeBlock { + event.EncoderBytes = AddEncoderMetadataToMsgDeSoBlockBytes(event.EncoderBytes, event.BlockHeight) + } + if encoder.GetEncoderType() == EncoderTypeBlockNode { + event.EncoderBytes = AddEncoderMetadataToBlockNodeBytes(event.EncoderBytes, event.BlockHeight) + } + } + ch, err := conn.Channel() if err != nil { glog.Infoln("Failed to open an AMQP channel: %v", err) From 9cb77f0dbee6186c3329d770d046d4ec96e3e58f Mon Sep 17 00:00:00 2001 From: mvanhalen Date: Thu, 13 Feb 2025 13:37:41 +0100 Subject: [PATCH 3/6] using BlockEvents --- lib/server.go | 9 +- lib/state_amqp_push.go | 266 +++++++++++++++++--- lib/state_json_adapters.go | 498 +++++++++++++++++++++++++++++++++++++ 3 files changed, 737 insertions(+), 36 deletions(-) create mode 100644 lib/state_json_adapters.go diff --git a/lib/server.go b/lib/server.go index dc91641ad..93449af27 100644 --- a/lib/server.go +++ b/lib/server.go @@ -3458,10 +3458,13 @@ func (srv *Server) handleFullySyncedStateAMQP() { amqpDest := srv.stateAmqpPushDest glog.Infof("AMQP publisher started set.") //add state change event handler - srv.eventManager.OnStateSyncerOperation(func(event *StateSyncerOperationEvent) { - //Run async - go PublishStateChangeEvent(event.StateChangeEntry, amqpDest) + srv.eventManager.OnBlockAccepted(func(event *BlockEvent) { + go PublishBlockEvent(event, amqpDest) }) + // srv.eventManager.OnStateSyncerOperation(func(event *StateSyncerOperationEvent) { + // //Run async + // go PublishStateChangeEvent(event.StateChangeEntry, amqpDest) + // }) glog.Infof("AMQP publisher event handling set.") } } diff --git a/lib/state_amqp_push.go b/lib/state_amqp_push.go index 1fd554093..5cbcf4c98 100644 --- a/lib/state_amqp_push.go +++ b/lib/state_amqp_push.go @@ -1,6 +1,7 @@ package lib import ( + "bytes" "encoding/json" "sync" "sync/atomic" @@ -47,62 +48,261 @@ func getAMQPConnection(amqpDest string) (*amqp.Connection, error) { return amqpConn, nil } -// PublishStateChangeEvent publishes a state change event to an AMQP broker. -// It returns an error if the publish fails. -func PublishStateChangeEvent(event *StateChangeEntry, amqpDest string) error { - - //glog.Infoln("AMQP publish event") +func PublishBlockEvent(event *BlockEvent, amqpDest string) error { if amqpDest == "" { // AMQP integration is not enabled. return nil } - if event == nil { + glog.Infoln("event is nil, skipping AMQP publish.") + return nil + } + if event.Block != nil { + if event.Block.Txns != nil { + glog.Infoln("BlockEvent %v", len(event.Block.Txns)) + channelName := "block_txns" + push := true + //setup node + conn, err := getAMQPConnection(amqpDest) + if err != nil { + glog.Errorf("Failed to get AMQP connection: %v", err) + return err + } + ch, err := conn.Channel() + if err != nil { + glog.Errorf("Failed to open an AMQP channel: %v", err) + return err + } + defer ch.Close() + + for _, txn := range event.Block.Txns { + if txn == nil { + glog.Infoln("BlockEvent Tx is nil") + } else { + glog.Infoln("BlockEvent Tx %v", txn.Hash().String()) + + txnType := txn.TxnMeta.GetTxnType() + push = true + + //check tx, if it is a block reward or validator related, we skip it + if txnType == TxnTypeBlockReward || txnType == TxnTypeRegisterAsValidator || txnType == TxnTypeUnregisterAsValidator || txnType == TxnTypeUnjailValidator || txnType == TxnTypeUpdateBitcoinUSDExchangeRate || txnType == TxnTypeSwapIdentity || txnType == TxnTypeUnset { + push = false + } + + if push { + + err = ch.Publish( + "", // default exchange + channelName, // routing key (queue name) + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: txn.MarshalJSON(), + Timestamp: time.Now(), + }, + ) + if err != nil { + glog.Errorf("Failed to publish message to AMQP: %v", err) + return err + } + } + } + } + } else { + glog.Infoln("BlockEvent Txns is nil") + } + + } else { + glog.Infoln("BlockEvent block is nil") + } + + return nil +} + +// PublishStateChangeEvent publishes a state change event to AMQP. +// It uses the appropriate adapter based on the underlying encoder type. +func PublishStateChangeEvent(stateChangeEntry *StateChangeEntry, amqpDest string) error { + if amqpDest == "" { + // AMQP integration is not enabled. + return nil + } + if stateChangeEntry == nil { glog.Infoln("StateChangeEntry is nil, skipping AMQP publish.") + return nil + } + // Check to see if the index in question has a "core_state" annotation in its definition. + if !isCoreStateKey(stateChangeEntry.KeyBytes) { return nil } - conn, err := getAMQPConnection(amqpDest) - if err != nil { - glog.Infoln("Failed to get AMQP connection: %v", err) + channelName := "state_changes" + push := true - return err + // Check if the event’s key bytes indicate that the stored value + // is encoded using one of our DeSoEncoder types. + var encoderType EncoderType + + if isEncoder, encoder := StateKeyToDeSoEncoder(stateChangeEntry.KeyBytes); isEncoder && encoder != nil { + + encoderType = encoder.GetEncoderType() + //types we are not interested in + if encoderType == EncoderTypeValidatorEntry || encoderType == EncoderTypeStakeRewardStateChangeMetadata || encoderType == EncoderTypeStakeEntry { + return nil + } + // (For blocks, we may need to add extra metadata.) + if encoderType == EncoderTypeBlock { + stateChangeEntry.EncoderBytes = AddEncoderMetadataToMsgDeSoBlockBytes(stateChangeEntry.EncoderBytes, stateChangeEntry.BlockHeight) + } + if encoderType == EncoderTypeBlockNode { + stateChangeEntry.EncoderBytes = AddEncoderMetadataToBlockNodeBytes(stateChangeEntry.EncoderBytes, stateChangeEntry.BlockHeight) + } + + glog.Infof("State encoder event for %d", encoderType) + } else { + keyEncoder, err := DecodeStateKey(stateChangeEntry.KeyBytes, stateChangeEntry.EncoderBytes) + if err != nil { + glog.Infof("PublishStateChangeEvent: Error decoding state key: %v", err) + // Instead of panicking, skip this event. + return nil + } + if keyEncoder == nil { + glog.Infof("PublishStateChangeEvent: No key encoder found, skipping event") + return nil + } + encoderType = keyEncoder.GetEncoderType() + glog.Infof("State event for %d", encoderType) + stateChangeEntry.Encoder = keyEncoder + stateChangeEntry.EncoderBytes = nil } + stateChangeEntry.EncoderType = encoderType + stateChangeEntry.EncoderBytes = EncodeToBytes(stateChangeEntry.BlockHeight, stateChangeEntry, false) + var body []byte + var err error + + // Here we try to decode the EncoderBytes into a known type and then use its adapter. + if push { + + switch encoderType { + // ----- ProfileEntry Example ----- + case EncoderTypeProfileEntry: + var profile ProfileEntry + r := bytes.NewReader(stateChangeEntry.EncoderBytes) + if err = profile.RawDecodeWithoutMetadata(stateChangeEntry.BlockHeight, r); err != nil { + glog.Infoln("failed to decode PostEntry: %v", err) + } + // Call the adapter. + profileJSON := profile.ToJSON() + body, err = json.Marshal(profileJSON) + if err != nil { + glog.Infoln("failed to marshal ProfileEntry JSON: %v", err) + } else { + glog.Infof("ProfileEntry endcoded") + } + + // ----- PostEntry Example ----- + case EncoderTypePostEntry: + var post PostEntry + r := bytes.NewReader(stateChangeEntry.EncoderBytes) + if err = post.RawDecodeWithoutMetadata(stateChangeEntry.BlockHeight, r); err != nil { + glog.Infoln("failed to decode PostEntry: %v", err) + } + // Call the adapter. + postJSON := post.ToJSON() + body, err = json.Marshal(postJSON) + if err != nil { + glog.Infoln("failed to marshal PostEntry JSON: %v", err) + } else { + glog.Infof("PostEntry endcoded") + } - // --- Integrated encoder logic --- - if isEncoder, encoder := StateKeyToDeSoEncoder(event.KeyBytes); isEncoder && encoder != nil { - // Blocks are serialized in Badger as MsgDesoBlock. - // Convert these bytes to the appropriate format by appending metadata. - if encoder.GetEncoderType() == EncoderTypeBlock { - event.EncoderBytes = AddEncoderMetadataToMsgDeSoBlockBytes(event.EncoderBytes, event.BlockHeight) + // ----- NFTEntry Example ----- + case EncoderTypeNFTEntry: + var nft NFTEntry + r := bytes.NewReader(stateChangeEntry.EncoderBytes) + if err = nft.RawDecodeWithoutMetadata(stateChangeEntry.BlockHeight, r); err != nil { + glog.Infoln("failed to decode NFTEntry: %v", err) + } + nftJSON := nft.ToJSON() + body, err = json.Marshal(nftJSON) + if err != nil { + glog.Infoln("failed to marshal NFTEntry JSON: %v", err) + } + + // ----- DAO Coin Limit Order Example ----- + // case EncoderTypeDAOCoinLimitOrderEntry: + // var daoOrder DAOCoinLimitOrderEntry + // r := bytes.NewReader(event.EncoderBytes) + // if err = daoOrder.RawDecodeWithoutMetadata(event.BlockHeight, r); err != nil { + // glog.Infoln("failed to decode DAOCoinLimitOrderEntry: %v", err) + // } + // daoJSON := daoOrder.ToJSON() + // body, err = json.Marshal(daoJSON) + // if err != nil { + // glog.Infoln("failed to marshal DAOCoinLimitOrderEntry JSON: %v", err) + // } + + // ----- DESO CoinEntry Example ----- + case EncoderTypeCoinEntry: + var coinEntry CoinEntry + r := bytes.NewReader(stateChangeEntry.EncoderBytes) + if err = coinEntry.RawDecodeWithoutMetadata(stateChangeEntry.BlockHeight, r); err != nil { + glog.Infoln("failed to decode CoinEntry: %v", err) + } + coinJSON := coinEntry.ToJSON() + body, err = json.Marshal(coinJSON) + if err != nil { + glog.Infoln("failed to marshal CoinEntry JSON: %v", err) + } + + // ----- DeSoBalanceEntry Example ----- + case EncoderTypeDeSoBalanceEntry: + var balance DeSoBalanceEntry + r := bytes.NewReader(stateChangeEntry.EncoderBytes) + if err = balance.RawDecodeWithoutMetadata(stateChangeEntry.BlockHeight, r); err != nil { + glog.Infoln("failed to decode DeSoBalanceEntry: %v", err) + } + balanceJSON := balance.ToJSON() + body, err = json.Marshal(balanceJSON) + if err != nil { + glog.Infoln("failed to marshal DeSoBalanceEntry JSON: %v", err) + } + + // ----- Fallback: Use raw bytes (or add more cases as needed) ----- + default: + glog.Warningf("No adapter for encoder type %v; falling back to raw bytes", encoderType) + body, err = json.Marshal(stateChangeEntry.EncoderBytes) + if err != nil { + glog.Infoln("failed to marshal raw event bytes: %v", err) + } } - if encoder.GetEncoderType() == EncoderTypeBlockNode { - event.EncoderBytes = AddEncoderMetadataToBlockNodeBytes(event.EncoderBytes, event.BlockHeight) + } else { + // If no encoder was detected, just marshal the raw EncoderBytes. + body, err = json.Marshal(stateChangeEntry.EncoderBytes) + if err != nil { + glog.Infoln("failed to marshal raw event bytes: %v", err) } } - ch, err := conn.Channel() + // Now publish the JSON-encoded event. + conn, err := getAMQPConnection(amqpDest) if err != nil { - glog.Infoln("Failed to open an AMQP channel: %v", err) + glog.Errorf("Failed to get AMQP connection: %v", err) return err } - defer ch.Close() - - // Check if we want to have another format if performance requires it. - // Marshal the state change event into JSON. - body, err := json.Marshal(event) + ch, err := conn.Channel() if err != nil { - glog.Infoln("Failed to marshal event to JSON: %v", err) + glog.Errorf("Failed to open an AMQP channel: %v", err) return err } + defer ch.Close() - // Publish the message. Here, we’re using the default exchange and assuming the queue name is "state_changes". err = ch.Publish( - "", // default exchange - "state_changes", // routing key (queue name) - false, // mandatory - false, // immediate + "", // default exchange + channelName, // routing key (queue name) + false, // mandatory + false, // immediate amqp.Publishing{ ContentType: "application/json", Body: body, @@ -110,9 +310,9 @@ func PublishStateChangeEvent(event *StateChangeEntry, amqpDest string) error { }, ) if err != nil { - glog.Infoln("Failed to publish message to AMQP: %v", err) - + glog.Errorf("Failed to publish message to AMQP: %v", err) return err } + return nil } diff --git a/lib/state_json_adapters.go b/lib/state_json_adapters.go new file mode 100644 index 000000000..69bec74c4 --- /dev/null +++ b/lib/state_json_adapters.go @@ -0,0 +1,498 @@ +// json_adapters_extended.go +package lib + +import ( + "encoding/hex" + "sort" +) + +// -------------------------------------------------------------------- +// UtxoEntry adapter +// -------------------------------------------------------------------- + +// UtxoEntryJSON is a JSON-friendly view of UtxoEntry. +type UtxoEntryJSON struct { + AmountNanos uint64 `json:"amount_nanos"` + PublicKey string `json:"public_key"` + BlockHeight uint32 `json:"block_height"` + UtxoType string `json:"utxo_type"` +} + +// ToJSON converts a UtxoEntry to its JSON view. +func (ue *UtxoEntry) ToJSON() UtxoEntryJSON { + return UtxoEntryJSON{ + AmountNanos: ue.AmountNanos, + // Here we encode the public key as hex. (You might choose base58 or another encoding.) + PublicKey: hex.EncodeToString(ue.PublicKey), + BlockHeight: ue.BlockHeight, + UtxoType: ue.UtxoType.String(), + } +} + +// ===================================================================== +// ProfileEntry Adapter +// ===================================================================== + +// ProfileEntryJSON is a JSON‐friendly view of a ProfileEntry. +type ProfileEntryJSON struct { + PublicKey string `json:"public_key"` + Username string `json:"username"` + Description string `json:"description"` + ProfilePic string `json:"profile_pic"` + IsHidden bool `json:"is_hidden"` + CreatorCoinEntry *CoinEntryJSON `json:"creator_coin_entry"` + DAOCoinEntry *CoinEntryJSON `json:"dao_coin_entry"` + ExtraData map[string]string `json:"extra_data"` +} + +// ToJSON converts a ProfileEntry to its JSON view. +func (pe *ProfileEntry) ToJSON() ProfileEntryJSON { + // Convert extra data values (which are []byte) to hex strings. + extra := make(map[string]string) + keys := make([]string, 0, len(pe.ExtraData)) + for k := range pe.ExtraData { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + extra[k] = hex.EncodeToString(pe.ExtraData[k]) + } + + return ProfileEntryJSON{ + // Here we encode the public key as hex. You might choose to use a different encoding if desired. + PublicKey: hex.EncodeToString(pe.PublicKey), + Username: string(pe.Username), // assumes UTF-8 encoding + Description: string(pe.Description), // assumes UTF-8 encoding + //ProfilePic: string(pe.ProfilePic), // could be a data URL + IsHidden: pe.IsHidden, + // CreatorCoinEntry: pe.CreatorCoinEntry.ToJSON(), + // DAOCoinEntry: pe.DAOCoinEntry.ToJSON(), + ExtraData: extra, + } +} + +// ===================================================================== +// NFTEntry Adapter +// ===================================================================== + +// NFTEntryJSON is a JSON-friendly view of an NFTEntry. +type NFTEntryJSON struct { + LastOwnerPKID string `json:"last_owner_pkid"` + OwnerPKID string `json:"owner_pkid"` + NFTPostHash string `json:"nft_post_hash"` + SerialNumber uint64 `json:"serial_number"` + IsForSale bool `json:"is_for_sale"` + MinBidAmountNanos uint64 `json:"min_bid_amount_nanos"` + UnlockableText string `json:"unlockable_text"` + LastAcceptedBidAmountNanos uint64 `json:"last_accepted_bid_amount_nanos"` + IsPending bool `json:"is_pending"` + IsBuyNow bool `json:"is_buy_now"` + BuyNowPriceNanos uint64 `json:"buy_now_price_nanos"` + ExtraData map[string]string `json:"extra_data"` +} + +// ToJSON converts an NFTEntry to its JSON view. +func (nft *NFTEntry) ToJSON() NFTEntryJSON { + extra := make(map[string]string) + // For each extra data key, encode the value as hex. + for k, v := range nft.ExtraData { + extra[k] = hex.EncodeToString(v) + } + var postHashStr string + if nft.NFTPostHash != nil { + postHashStr = hex.EncodeToString((*nft.NFTPostHash)[:]) + } else { + postHashStr = "" + } + + var lastOwnerPKID string + if nft.LastOwnerPKID != nil { + lastOwnerPKID = nft.LastOwnerPKID.ToString() + } + var ownerPKID string + if nft.OwnerPKID != nil { + ownerPKID = nft.OwnerPKID.ToString() + } + + return NFTEntryJSON{ + LastOwnerPKID: lastOwnerPKID, // Assumes PKID has a String() method. + OwnerPKID: ownerPKID, + NFTPostHash: postHashStr, + SerialNumber: nft.SerialNumber, + IsForSale: nft.IsForSale, + MinBidAmountNanos: nft.MinBidAmountNanos, + UnlockableText: string(nft.UnlockableText), + LastAcceptedBidAmountNanos: nft.LastAcceptedBidAmountNanos, + IsPending: nft.IsPending, + IsBuyNow: nft.IsBuyNow, + BuyNowPriceNanos: nft.BuyNowPriceNanos, + ExtraData: extra, + } +} + +// ===================================================================== +// NFTBidEntry Adapter +// ===================================================================== + +// NFTBidEntryJSON is a JSON-friendly view of an NFTBidEntry. +type NFTBidEntryJSON struct { + BidderPKID string `json:"bidder_pkid"` + NFTPostHash string `json:"nft_post_hash"` + SerialNumber uint64 `json:"serial_number"` + BidAmountNanos uint64 `json:"bid_amount_nanos"` + AcceptedBlockHeight *uint32 `json:"accepted_block_height,omitempty"` +} + +// ToJSON converts an NFTBidEntry to its JSON view. +func (bid *NFTBidEntry) ToJSON() NFTBidEntryJSON { + + var bidderPKID string + if bid.BidderPKID != nil { + bidderPKID = bid.BidderPKID.ToString() + } + var nftPostHash string + if bid.NFTPostHash != nil { + nftPostHash = hex.EncodeToString((*bid.NFTPostHash)[:]) + } + + return NFTBidEntryJSON{ + BidderPKID: bidderPKID, + NFTPostHash: nftPostHash, + SerialNumber: bid.SerialNumber, + BidAmountNanos: bid.BidAmountNanos, + AcceptedBlockHeight: bid.AcceptedBlockHeight, + } +} + +// ===================================================================== +// PostEntry Adapter +// ===================================================================== + +// PostEntryJSON is a JSON-friendly view of a PostEntry. +type PostEntryJSON struct { + PostHash string `json:"post_hash"` + PosterPublicKey string `json:"poster_public_key"` + ParentStakeID string `json:"parent_stake_id,omitempty"` + Body string `json:"body"` + RepostedPostHash string `json:"reposted_post_hash,omitempty"` + IsQuotedRepost bool `json:"is_quoted_repost"` + CreatorBasisPoints uint64 `json:"creator_basis_points"` + StakeMultipleBasisPoints uint64 `json:"stake_multiple_basis_points"` + ConfirmationBlockHeight uint32 `json:"confirmation_block_height"` + TimestampNanos uint64 `json:"timestamp_nanos"` + IsHidden bool `json:"is_hidden"` + LikeCount uint64 `json:"like_count"` + RepostCount uint64 `json:"repost_count"` + QuoteRepostCount uint64 `json:"quote_repost_count"` + DiamondCount uint64 `json:"diamond_count"` + CommentCount uint64 `json:"comment_count"` + IsPinned bool `json:"is_pinned"` + IsNFT bool `json:"is_nft"` + NumNFTCopies uint64 `json:"num_nft_copies"` + NumNFTCopiesForSale uint64 `json:"num_nft_copies_for_sale"` + NumNFTCopiesBurned uint64 `json:"num_nft_copies_burned"` + HasUnlockable bool `json:"has_unlockable"` + NFTRoyaltyToCreatorBasisPoints uint64 `json:"nft_royalty_to_creator_basis_points"` + NFTRoyaltyToCoinBasisPoints uint64 `json:"nft_royalty_to_coin_basis_points"` + AdditionalNFTRoyaltiesToCreators map[string]uint64 `json:"additional_nft_royalties_to_creators_basis_points"` + AdditionalNFTRoyaltiesToCoins map[string]uint64 `json:"additional_nft_royalties_to_coins_basis_points"` + PostExtraData map[string]string `json:"post_extra_data"` + IsFrozen bool `json:"is_frozen"` +} + +// ToJSON converts a PostEntry to its JSON view. +func (pe *PostEntry) ToJSON() PostEntryJSON { + extra := make(map[string]string) + // Sort keys to have deterministic JSON. + keys := make([]string, 0, len(pe.PostExtraData)) + for k := range pe.PostExtraData { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + extra[k] = hex.EncodeToString(pe.PostExtraData[k]) + } + + creators := make(map[string]uint64) + for pkid, val := range pe.AdditionalNFTRoyaltiesToCreatorsBasisPoints { + creators[pkid.ToString()] = val + } + coins := make(map[string]uint64) + for pkid, val := range pe.AdditionalNFTRoyaltiesToCoinsBasisPoints { + coins[pkid.ToString()] = val + } + var repostHash string + if pe.RepostedPostHash != nil { + repostHash = hex.EncodeToString((*pe.RepostedPostHash)[:]) + } + var parentStake string + if len(pe.ParentStakeID) > 0 { + parentStake = hex.EncodeToString(pe.ParentStakeID) + } + var postHashStr string + if pe.PostHash != nil { + postHashStr = hex.EncodeToString((*pe.PostHash)[:]) + } else { + postHashStr = "" + } + return PostEntryJSON{ + PostHash: postHashStr, + PosterPublicKey: hex.EncodeToString(pe.PosterPublicKey), + ParentStakeID: parentStake, + Body: string(pe.Body), + RepostedPostHash: repostHash, + IsQuotedRepost: pe.IsQuotedRepost, + CreatorBasisPoints: pe.CreatorBasisPoints, + StakeMultipleBasisPoints: pe.StakeMultipleBasisPoints, + ConfirmationBlockHeight: pe.ConfirmationBlockHeight, + TimestampNanos: pe.TimestampNanos, + IsHidden: pe.IsHidden, + LikeCount: pe.LikeCount, + RepostCount: pe.RepostCount, + QuoteRepostCount: pe.QuoteRepostCount, + DiamondCount: pe.DiamondCount, + CommentCount: pe.CommentCount, + IsPinned: pe.IsPinned, + IsNFT: pe.IsNFT, + NumNFTCopies: pe.NumNFTCopies, + NumNFTCopiesForSale: pe.NumNFTCopiesForSale, + NumNFTCopiesBurned: pe.NumNFTCopiesBurned, + HasUnlockable: pe.HasUnlockable, + NFTRoyaltyToCreatorBasisPoints: pe.NFTRoyaltyToCreatorBasisPoints, + NFTRoyaltyToCoinBasisPoints: pe.NFTRoyaltyToCoinBasisPoints, + AdditionalNFTRoyaltiesToCreators: creators, + AdditionalNFTRoyaltiesToCoins: coins, + PostExtraData: extra, + IsFrozen: pe.IsFrozen, + } +} + +// ===================================================================== +// DAO Operations Adapter: DAOCoinLimitOrderEntry +// ===================================================================== + +// DAOCoinLimitOrderEntryJSON is a JSON view of a DAO coin limit order. +type DAOCoinLimitOrderEntryJSON struct { + OrderID string `json:"order_id"` + TransactorPKID string `json:"transactor_pkid"` + BuyingDAOCoinCreatorPKID string `json:"buying_dao_coin_creator_pkid"` + SellingDAOCoinCreatorPKID string `json:"selling_dao_coin_creator_pkid"` + ScaledExchangeRateCoinsToSellPerCoin string `json:"scaled_exchange_rate_coins_to_sell_per_coin_to_buy"` + QuantityToFillInBaseUnits string `json:"quantity_to_fill_in_base_units"` + OperationType string `json:"operation_type"` + FillType string `json:"fill_type"` + BlockHeight uint32 `json:"block_height"` +} + +// ToJSON converts a DAOCoinLimitOrderEntry to its JSON view. +func (order *DAOCoinLimitOrderEntry) ToJSON() DAOCoinLimitOrderEntryJSON { + opType := "" + switch order.OperationType { + case DAOCoinLimitOrderOperationTypeASK: + opType = "ASK" + case DAOCoinLimitOrderOperationTypeBID: + opType = "BID" + default: + opType = "UNKNOWN" + } + fillType := "" + switch order.FillType { + case DAOCoinLimitOrderFillTypeGoodTillCancelled: + fillType = "GoodTillCancelled" + case DAOCoinLimitOrderFillTypeImmediateOrCancel: + fillType = "ImmediateOrCancel" + case DAOCoinLimitOrderFillTypeFillOrKill: + fillType = "FillOrKill" + default: + fillType = "UNKNOWN" + } + var scaledRate, quantity string + if order.ScaledExchangeRateCoinsToSellPerCoinToBuy != nil { + scaledRate = order.ScaledExchangeRateCoinsToSellPerCoinToBuy.String() + } + if order.QuantityToFillInBaseUnits != nil { + quantity = order.QuantityToFillInBaseUnits.String() + } + orderIDStr := "" + if order.OrderID != nil { + orderIDStr = order.OrderID.String() + } + + transactorPKID := "" + if order.TransactorPKID != nil { + transactorPKID = order.TransactorPKID.ToString() + } + sellingDAOCoinCreatorPKID := "" + if order.SellingDAOCoinCreatorPKID != nil { + sellingDAOCoinCreatorPKID = order.SellingDAOCoinCreatorPKID.ToString() + } + buyingDAOCoinCreatorPKID := "" + if order.BuyingDAOCoinCreatorPKID != nil { + buyingDAOCoinCreatorPKID = order.BuyingDAOCoinCreatorPKID.ToString() + } + return DAOCoinLimitOrderEntryJSON{ + OrderID: orderIDStr, + TransactorPKID: transactorPKID, + BuyingDAOCoinCreatorPKID: buyingDAOCoinCreatorPKID, + SellingDAOCoinCreatorPKID: sellingDAOCoinCreatorPKID, + ScaledExchangeRateCoinsToSellPerCoin: scaledRate, + QuantityToFillInBaseUnits: quantity, + OperationType: opType, + FillType: fillType, + BlockHeight: order.BlockHeight, + } +} + +// ===================================================================== +// DAO Operations Adapter: FilledDAOCoinLimitOrder +// ===================================================================== + +// FilledDAOCoinLimitOrderJSON is a JSON view of a filled DAO coin limit order. +type FilledDAOCoinLimitOrderJSON struct { + OrderID string `json:"order_id"` + TransactorPKID string `json:"transactor_pkid"` + BuyingDAOCoinCreatorPKID string `json:"buying_dao_coin_creator_pkid"` + SellingDAOCoinCreatorPKID string `json:"selling_dao_coin_creator_pkid"` + CoinQuantityInBaseUnitsBought string `json:"coin_quantity_in_base_units_bought"` + CoinQuantityInBaseUnitsSold string `json:"coin_quantity_in_base_units_sold"` + IsFulfilled bool `json:"is_fulfilled"` +} + +// ToJSON converts a FilledDAOCoinLimitOrder to its JSON view. +func (filledOrder *FilledDAOCoinLimitOrder) ToJSON() FilledDAOCoinLimitOrderJSON { + var qtyBought, qtySold string + if filledOrder.CoinQuantityInBaseUnitsBought != nil { + qtyBought = filledOrder.CoinQuantityInBaseUnitsBought.String() + } + if filledOrder.CoinQuantityInBaseUnitsSold != nil { + qtySold = filledOrder.CoinQuantityInBaseUnitsSold.String() + } + orderIDStr := "" + if filledOrder.OrderID != nil { + orderIDStr = filledOrder.OrderID.String() + } + transactorPKID := "" + if filledOrder.TransactorPKID != nil { + transactorPKID = filledOrder.TransactorPKID.ToString() + } + sellingDAOCoinCreatorPKID := "" + if filledOrder.SellingDAOCoinCreatorPKID != nil { + sellingDAOCoinCreatorPKID = filledOrder.SellingDAOCoinCreatorPKID.ToString() + } + buyingDAOCoinCreatorPKID := "" + if filledOrder.BuyingDAOCoinCreatorPKID != nil { + buyingDAOCoinCreatorPKID = filledOrder.BuyingDAOCoinCreatorPKID.ToString() + } + return FilledDAOCoinLimitOrderJSON{ + OrderID: orderIDStr, + TransactorPKID: transactorPKID, + BuyingDAOCoinCreatorPKID: buyingDAOCoinCreatorPKID, + SellingDAOCoinCreatorPKID: sellingDAOCoinCreatorPKID, + CoinQuantityInBaseUnitsBought: qtyBought, + CoinQuantityInBaseUnitsSold: qtySold, + IsFulfilled: filledOrder.IsFulfilled, + } +} + +// ===================================================================== +// DESO Operations Adapter: CoinEntry +// ===================================================================== + +// CoinEntryJSON is a JSON-friendly view of a CoinEntry. +type CoinEntryJSON struct { + CreatorBasisPoints uint64 `json:"creator_basis_points"` + DeSoLockedNanos uint64 `json:"deso_locked_nanos"` + NumberOfHolders uint64 `json:"number_of_holders"` + CoinsInCirculationNanos string `json:"coins_in_circulation_nanos"` + CoinWatermarkNanos uint64 `json:"coin_watermark_nanos"` + MintingDisabled bool `json:"minting_disabled"` + TransferRestrictionStatus string `json:"transfer_restriction_status"` + LockupTransferRestrictionStatus string `json:"lockup_transfer_restriction_status,omitempty"` +} + +// ToJSON converts a CoinEntry to its JSON view. +func (ce *CoinEntry) ToJSON() CoinEntryJSON { + var transStatus string + switch ce.TransferRestrictionStatus { + case TransferRestrictionStatusUnrestricted: + transStatus = "Unrestricted" + case TransferRestrictionStatusProfileOwnerOnly: + transStatus = "ProfileOwnerOnly" + case TransferRestrictionStatusDAOMembersOnly: + transStatus = "DAOMembersOnly" + case TransferRestrictionStatusPermanentlyUnrestricted: + transStatus = "PermanentlyUnrestricted" + default: + transStatus = "Unknown" + } + var lockupStatus string + switch ce.LockupTransferRestrictionStatus { + case TransferRestrictionStatusUnrestricted: + lockupStatus = "Unrestricted" + case TransferRestrictionStatusProfileOwnerOnly: + lockupStatus = "ProfileOwnerOnly" + case TransferRestrictionStatusDAOMembersOnly: + lockupStatus = "DAOMembersOnly" + case TransferRestrictionStatusPermanentlyUnrestricted: + lockupStatus = "PermanentlyUnrestricted" + default: + lockupStatus = "" + } + return CoinEntryJSON{ + CreatorBasisPoints: ce.CreatorBasisPoints, + DeSoLockedNanos: ce.DeSoLockedNanos, + NumberOfHolders: ce.NumberOfHolders, + CoinsInCirculationNanos: ce.CoinsInCirculationNanos.String(), + CoinWatermarkNanos: ce.CoinWatermarkNanos, + MintingDisabled: ce.MintingDisabled, + TransferRestrictionStatus: transStatus, + LockupTransferRestrictionStatus: lockupStatus, + } +} + +// ===================================================================== +// DESO Operations Adapter: DeSoBalanceEntry +// ===================================================================== + +// DeSoBalanceEntryJSON is a JSON view of a DeSoBalanceEntry. +type DeSoBalanceEntryJSON struct { + PublicKey string `json:"public_key"` + BalanceNanos uint64 `json:"balance_nanos"` +} + +// ToJSON converts a DeSoBalanceEntry to its JSON view. +func (de *DeSoBalanceEntry) ToJSON() DeSoBalanceEntryJSON { + return DeSoBalanceEntryJSON{ + PublicKey: hex.EncodeToString(de.PublicKey), + BalanceNanos: de.BalanceNanos, + } +} + +// ===================================================================== +// (Optional) Custom JSON Marshaling Example +// ===================================================================== + +// You can have the types themselves implement json.Marshaler so that they marshal automatically. +// For instance: +// +// func (nft *NFTEntry) MarshalJSON() ([]byte, error) { +// return json.Marshal(nft.ToJSON()) +// } +// +// And similarly for the other types. +// +// ===================================================================== +// Example Usage: +// +// func main() { +// // For example, converting an NFTEntry to JSON. +// nft := &NFTEntry{ +// // … initialize fields … +// } +// jsonData, err := json.MarshalIndent(nft.ToJSON(), "", " ") +// if err != nil { +// panic(err) +// } +// fmt.Println(string(jsonData)) +// } +// From c302497edb8b6f178185808d8a02758b83c86a53 Mon Sep 17 00:00:00 2001 From: mvanhalen Date: Thu, 13 Feb 2025 14:02:05 +0100 Subject: [PATCH 4/6] Blockevent TX push working --- lib/state_amqp_push.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/state_amqp_push.go b/lib/state_amqp_push.go index 5cbcf4c98..174c949a3 100644 --- a/lib/state_amqp_push.go +++ b/lib/state_amqp_push.go @@ -90,6 +90,11 @@ func PublishBlockEvent(event *BlockEvent, amqpDest string) error { } if push { + body, err := txn.MarshalJSON() + if err != nil { + glog.Errorf("Failed to marshal tx to json %v", err) + return err + } err = ch.Publish( "", // default exchange @@ -98,7 +103,7 @@ func PublishBlockEvent(event *BlockEvent, amqpDest string) error { false, // immediate amqp.Publishing{ ContentType: "application/json", - Body: txn.MarshalJSON(), + Body: body, Timestamp: time.Now(), }, ) From 7f6dcfc5d1e4231315b36e5140dee76a30c5fa55 Mon Sep 17 00:00:00 2001 From: mvanhalen Date: Fri, 14 Feb 2025 10:54:42 +0100 Subject: [PATCH 5/6] Cleanup and remove state approach --- lib/publisher_amqp_push.go | 128 ++++++++++ lib/server.go | 4 - lib/state_amqp_push.go | 323 ------------------------ lib/state_json_adapters.go | 498 ------------------------------------- 4 files changed, 128 insertions(+), 825 deletions(-) create mode 100644 lib/publisher_amqp_push.go delete mode 100644 lib/state_amqp_push.go delete mode 100644 lib/state_json_adapters.go diff --git a/lib/publisher_amqp_push.go b/lib/publisher_amqp_push.go new file mode 100644 index 000000000..629f8448f --- /dev/null +++ b/lib/publisher_amqp_push.go @@ -0,0 +1,128 @@ +package lib + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/golang/glog" + amqp "github.com/rabbitmq/amqp091-go" +) + +// persistent connection and mutex +var ( + amqpConn *amqp.Connection + connMutex sync.Mutex +) + +// Use an atomic flag where 0 means disabled and 1 means enabled. +var amqpPublisherEnabled int32 +var amqpPublisherStarted int32 + +// EnablePublisher enables AMQP publishing after the node is fully synced. +func AmqpSetEnablePublisher() { + atomic.StoreInt32(&amqpPublisherEnabled, 1) +} + +func AmpqSetStartPublisher() { + atomic.StoreInt32(&amqpPublisherStarted, 1) +} + +func getAMQPConnection(amqpDest string) (*amqp.Connection, error) { + connMutex.Lock() + defer connMutex.Unlock() + + // Check if connection exists and is open. + if amqpConn != nil && !amqpConn.IsClosed() { + return amqpConn, nil + } + + // Create a new connection. + conn, err := amqp.Dial(amqpDest) + if err != nil { + return nil, err + } + amqpConn = conn + return amqpConn, nil +} + +func PublishBlockEvent(event *BlockEvent, amqpDest string) error { + if amqpDest == "" { + // AMQP integration is not enabled. + return nil + } + if event == nil { + glog.Infoln("event is nil, skipping AMQP publish.") + return nil + } + + if event.Block != nil { + if event.Block.Txns != nil { + //glog.Infoln("BlockEvent %v", len(event.Block.Txns)) + channelName := "block_txns" + push := true + + //setup connection + conn, err := getAMQPConnection(amqpDest) + if err != nil { + glog.Errorf("Failed to get AMQP connection: %v", err) + return err + } + ch, err := conn.Channel() + if err != nil { + glog.Errorf("Failed to open an AMQP channel: %v", err) + return err + } + defer ch.Close() + + for _, txn := range event.Block.Txns { + if txn == nil { + glog.Infoln("BlockEvent Tx is nil") + } else { + //glog.Infoln("BlockEvent Tx %v", txn.Hash().String()) + + txnType := txn.TxnMeta.GetTxnType() + push = true + + //check tx, if it is a block reward or validator related, we skip it + if txnType == TxnTypeBlockReward || txnType == TxnTypeRegisterAsValidator || txnType == TxnTypeUnregisterAsValidator || txnType == TxnTypeUnjailValidator || txnType == TxnTypeUpdateBitcoinUSDExchangeRate || txnType == TxnTypeSwapIdentity || txnType == TxnTypeUnset { + push = false + } + + if push { + + body, err := txn.MarshalJSON() + if err != nil { + glog.Errorf("Failed to marshal tx to json %v", err) + return err + } + + err = ch.Publish( + "", // default exchange + channelName, // routing key (queue name) + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: body, + Timestamp: time.Now(), + MessageId: txn.Hash().String(), + }, + ) + if err != nil { + glog.Errorf("Failed to publish message to AMQP: %v", err) + return err + } + } + } + } + } else { + glog.Infoln("BlockEvent Txns is nil") + } + + } else { + glog.Infoln("BlockEvent block is nil") + } + + return nil +} diff --git a/lib/server.go b/lib/server.go index 93449af27..f5f74d1a4 100644 --- a/lib/server.go +++ b/lib/server.go @@ -3461,10 +3461,6 @@ func (srv *Server) handleFullySyncedStateAMQP() { srv.eventManager.OnBlockAccepted(func(event *BlockEvent) { go PublishBlockEvent(event, amqpDest) }) - // srv.eventManager.OnStateSyncerOperation(func(event *StateSyncerOperationEvent) { - // //Run async - // go PublishStateChangeEvent(event.StateChangeEntry, amqpDest) - // }) glog.Infof("AMQP publisher event handling set.") } } diff --git a/lib/state_amqp_push.go b/lib/state_amqp_push.go deleted file mode 100644 index 174c949a3..000000000 --- a/lib/state_amqp_push.go +++ /dev/null @@ -1,323 +0,0 @@ -package lib - -import ( - "bytes" - "encoding/json" - "sync" - "sync/atomic" - "time" - - "github.com/golang/glog" - amqp "github.com/rabbitmq/amqp091-go" -) - -// persistent connection and mutex -var ( - amqpConn *amqp.Connection - connMutex sync.Mutex -) - -// Use an atomic flag where 0 means disabled and 1 means enabled. -var amqpPublisherEnabled int32 -var amqpPublisherStarted int32 - -// EnablePublisher enables AMQP publishing after the node is fully synced. -func AmqpSetEnablePublisher() { - atomic.StoreInt32(&amqpPublisherEnabled, 1) -} - -func AmpqSetStartPublisher() { - atomic.StoreInt32(&amqpPublisherStarted, 1) -} - -func getAMQPConnection(amqpDest string) (*amqp.Connection, error) { - connMutex.Lock() - defer connMutex.Unlock() - - // Check if connection exists and is open. - if amqpConn != nil && !amqpConn.IsClosed() { - return amqpConn, nil - } - - // Create a new connection. - conn, err := amqp.Dial(amqpDest) - if err != nil { - return nil, err - } - amqpConn = conn - return amqpConn, nil -} - -func PublishBlockEvent(event *BlockEvent, amqpDest string) error { - if amqpDest == "" { - // AMQP integration is not enabled. - return nil - } - if event == nil { - glog.Infoln("event is nil, skipping AMQP publish.") - return nil - } - if event.Block != nil { - if event.Block.Txns != nil { - glog.Infoln("BlockEvent %v", len(event.Block.Txns)) - channelName := "block_txns" - push := true - //setup node - conn, err := getAMQPConnection(amqpDest) - if err != nil { - glog.Errorf("Failed to get AMQP connection: %v", err) - return err - } - ch, err := conn.Channel() - if err != nil { - glog.Errorf("Failed to open an AMQP channel: %v", err) - return err - } - defer ch.Close() - - for _, txn := range event.Block.Txns { - if txn == nil { - glog.Infoln("BlockEvent Tx is nil") - } else { - glog.Infoln("BlockEvent Tx %v", txn.Hash().String()) - - txnType := txn.TxnMeta.GetTxnType() - push = true - - //check tx, if it is a block reward or validator related, we skip it - if txnType == TxnTypeBlockReward || txnType == TxnTypeRegisterAsValidator || txnType == TxnTypeUnregisterAsValidator || txnType == TxnTypeUnjailValidator || txnType == TxnTypeUpdateBitcoinUSDExchangeRate || txnType == TxnTypeSwapIdentity || txnType == TxnTypeUnset { - push = false - } - - if push { - body, err := txn.MarshalJSON() - if err != nil { - glog.Errorf("Failed to marshal tx to json %v", err) - return err - } - - err = ch.Publish( - "", // default exchange - channelName, // routing key (queue name) - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/json", - Body: body, - Timestamp: time.Now(), - }, - ) - if err != nil { - glog.Errorf("Failed to publish message to AMQP: %v", err) - return err - } - } - } - } - } else { - glog.Infoln("BlockEvent Txns is nil") - } - - } else { - glog.Infoln("BlockEvent block is nil") - } - - return nil -} - -// PublishStateChangeEvent publishes a state change event to AMQP. -// It uses the appropriate adapter based on the underlying encoder type. -func PublishStateChangeEvent(stateChangeEntry *StateChangeEntry, amqpDest string) error { - if amqpDest == "" { - // AMQP integration is not enabled. - return nil - } - if stateChangeEntry == nil { - glog.Infoln("StateChangeEntry is nil, skipping AMQP publish.") - return nil - } - - // Check to see if the index in question has a "core_state" annotation in its definition. - if !isCoreStateKey(stateChangeEntry.KeyBytes) { - return nil - } - - channelName := "state_changes" - push := true - - // Check if the event’s key bytes indicate that the stored value - // is encoded using one of our DeSoEncoder types. - var encoderType EncoderType - - if isEncoder, encoder := StateKeyToDeSoEncoder(stateChangeEntry.KeyBytes); isEncoder && encoder != nil { - - encoderType = encoder.GetEncoderType() - //types we are not interested in - if encoderType == EncoderTypeValidatorEntry || encoderType == EncoderTypeStakeRewardStateChangeMetadata || encoderType == EncoderTypeStakeEntry { - return nil - } - // (For blocks, we may need to add extra metadata.) - if encoderType == EncoderTypeBlock { - stateChangeEntry.EncoderBytes = AddEncoderMetadataToMsgDeSoBlockBytes(stateChangeEntry.EncoderBytes, stateChangeEntry.BlockHeight) - } - if encoderType == EncoderTypeBlockNode { - stateChangeEntry.EncoderBytes = AddEncoderMetadataToBlockNodeBytes(stateChangeEntry.EncoderBytes, stateChangeEntry.BlockHeight) - } - - glog.Infof("State encoder event for %d", encoderType) - } else { - keyEncoder, err := DecodeStateKey(stateChangeEntry.KeyBytes, stateChangeEntry.EncoderBytes) - if err != nil { - glog.Infof("PublishStateChangeEvent: Error decoding state key: %v", err) - // Instead of panicking, skip this event. - return nil - } - if keyEncoder == nil { - glog.Infof("PublishStateChangeEvent: No key encoder found, skipping event") - return nil - } - encoderType = keyEncoder.GetEncoderType() - glog.Infof("State event for %d", encoderType) - stateChangeEntry.Encoder = keyEncoder - stateChangeEntry.EncoderBytes = nil - } - stateChangeEntry.EncoderType = encoderType - stateChangeEntry.EncoderBytes = EncodeToBytes(stateChangeEntry.BlockHeight, stateChangeEntry, false) - var body []byte - var err error - - // Here we try to decode the EncoderBytes into a known type and then use its adapter. - if push { - - switch encoderType { - // ----- ProfileEntry Example ----- - case EncoderTypeProfileEntry: - var profile ProfileEntry - r := bytes.NewReader(stateChangeEntry.EncoderBytes) - if err = profile.RawDecodeWithoutMetadata(stateChangeEntry.BlockHeight, r); err != nil { - glog.Infoln("failed to decode PostEntry: %v", err) - } - // Call the adapter. - profileJSON := profile.ToJSON() - body, err = json.Marshal(profileJSON) - if err != nil { - glog.Infoln("failed to marshal ProfileEntry JSON: %v", err) - } else { - glog.Infof("ProfileEntry endcoded") - } - - // ----- PostEntry Example ----- - case EncoderTypePostEntry: - var post PostEntry - r := bytes.NewReader(stateChangeEntry.EncoderBytes) - if err = post.RawDecodeWithoutMetadata(stateChangeEntry.BlockHeight, r); err != nil { - glog.Infoln("failed to decode PostEntry: %v", err) - } - // Call the adapter. - postJSON := post.ToJSON() - body, err = json.Marshal(postJSON) - if err != nil { - glog.Infoln("failed to marshal PostEntry JSON: %v", err) - } else { - glog.Infof("PostEntry endcoded") - } - - // ----- NFTEntry Example ----- - case EncoderTypeNFTEntry: - var nft NFTEntry - r := bytes.NewReader(stateChangeEntry.EncoderBytes) - if err = nft.RawDecodeWithoutMetadata(stateChangeEntry.BlockHeight, r); err != nil { - glog.Infoln("failed to decode NFTEntry: %v", err) - } - nftJSON := nft.ToJSON() - body, err = json.Marshal(nftJSON) - if err != nil { - glog.Infoln("failed to marshal NFTEntry JSON: %v", err) - } - - // ----- DAO Coin Limit Order Example ----- - // case EncoderTypeDAOCoinLimitOrderEntry: - // var daoOrder DAOCoinLimitOrderEntry - // r := bytes.NewReader(event.EncoderBytes) - // if err = daoOrder.RawDecodeWithoutMetadata(event.BlockHeight, r); err != nil { - // glog.Infoln("failed to decode DAOCoinLimitOrderEntry: %v", err) - // } - // daoJSON := daoOrder.ToJSON() - // body, err = json.Marshal(daoJSON) - // if err != nil { - // glog.Infoln("failed to marshal DAOCoinLimitOrderEntry JSON: %v", err) - // } - - // ----- DESO CoinEntry Example ----- - case EncoderTypeCoinEntry: - var coinEntry CoinEntry - r := bytes.NewReader(stateChangeEntry.EncoderBytes) - if err = coinEntry.RawDecodeWithoutMetadata(stateChangeEntry.BlockHeight, r); err != nil { - glog.Infoln("failed to decode CoinEntry: %v", err) - } - coinJSON := coinEntry.ToJSON() - body, err = json.Marshal(coinJSON) - if err != nil { - glog.Infoln("failed to marshal CoinEntry JSON: %v", err) - } - - // ----- DeSoBalanceEntry Example ----- - case EncoderTypeDeSoBalanceEntry: - var balance DeSoBalanceEntry - r := bytes.NewReader(stateChangeEntry.EncoderBytes) - if err = balance.RawDecodeWithoutMetadata(stateChangeEntry.BlockHeight, r); err != nil { - glog.Infoln("failed to decode DeSoBalanceEntry: %v", err) - } - balanceJSON := balance.ToJSON() - body, err = json.Marshal(balanceJSON) - if err != nil { - glog.Infoln("failed to marshal DeSoBalanceEntry JSON: %v", err) - } - - // ----- Fallback: Use raw bytes (or add more cases as needed) ----- - default: - glog.Warningf("No adapter for encoder type %v; falling back to raw bytes", encoderType) - body, err = json.Marshal(stateChangeEntry.EncoderBytes) - if err != nil { - glog.Infoln("failed to marshal raw event bytes: %v", err) - } - } - } else { - // If no encoder was detected, just marshal the raw EncoderBytes. - body, err = json.Marshal(stateChangeEntry.EncoderBytes) - if err != nil { - glog.Infoln("failed to marshal raw event bytes: %v", err) - } - } - - // Now publish the JSON-encoded event. - conn, err := getAMQPConnection(amqpDest) - if err != nil { - glog.Errorf("Failed to get AMQP connection: %v", err) - return err - } - ch, err := conn.Channel() - if err != nil { - glog.Errorf("Failed to open an AMQP channel: %v", err) - return err - } - defer ch.Close() - - err = ch.Publish( - "", // default exchange - channelName, // routing key (queue name) - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/json", - Body: body, - Timestamp: time.Now(), - }, - ) - if err != nil { - glog.Errorf("Failed to publish message to AMQP: %v", err) - return err - } - - return nil -} diff --git a/lib/state_json_adapters.go b/lib/state_json_adapters.go deleted file mode 100644 index 69bec74c4..000000000 --- a/lib/state_json_adapters.go +++ /dev/null @@ -1,498 +0,0 @@ -// json_adapters_extended.go -package lib - -import ( - "encoding/hex" - "sort" -) - -// -------------------------------------------------------------------- -// UtxoEntry adapter -// -------------------------------------------------------------------- - -// UtxoEntryJSON is a JSON-friendly view of UtxoEntry. -type UtxoEntryJSON struct { - AmountNanos uint64 `json:"amount_nanos"` - PublicKey string `json:"public_key"` - BlockHeight uint32 `json:"block_height"` - UtxoType string `json:"utxo_type"` -} - -// ToJSON converts a UtxoEntry to its JSON view. -func (ue *UtxoEntry) ToJSON() UtxoEntryJSON { - return UtxoEntryJSON{ - AmountNanos: ue.AmountNanos, - // Here we encode the public key as hex. (You might choose base58 or another encoding.) - PublicKey: hex.EncodeToString(ue.PublicKey), - BlockHeight: ue.BlockHeight, - UtxoType: ue.UtxoType.String(), - } -} - -// ===================================================================== -// ProfileEntry Adapter -// ===================================================================== - -// ProfileEntryJSON is a JSON‐friendly view of a ProfileEntry. -type ProfileEntryJSON struct { - PublicKey string `json:"public_key"` - Username string `json:"username"` - Description string `json:"description"` - ProfilePic string `json:"profile_pic"` - IsHidden bool `json:"is_hidden"` - CreatorCoinEntry *CoinEntryJSON `json:"creator_coin_entry"` - DAOCoinEntry *CoinEntryJSON `json:"dao_coin_entry"` - ExtraData map[string]string `json:"extra_data"` -} - -// ToJSON converts a ProfileEntry to its JSON view. -func (pe *ProfileEntry) ToJSON() ProfileEntryJSON { - // Convert extra data values (which are []byte) to hex strings. - extra := make(map[string]string) - keys := make([]string, 0, len(pe.ExtraData)) - for k := range pe.ExtraData { - keys = append(keys, k) - } - sort.Strings(keys) - for _, k := range keys { - extra[k] = hex.EncodeToString(pe.ExtraData[k]) - } - - return ProfileEntryJSON{ - // Here we encode the public key as hex. You might choose to use a different encoding if desired. - PublicKey: hex.EncodeToString(pe.PublicKey), - Username: string(pe.Username), // assumes UTF-8 encoding - Description: string(pe.Description), // assumes UTF-8 encoding - //ProfilePic: string(pe.ProfilePic), // could be a data URL - IsHidden: pe.IsHidden, - // CreatorCoinEntry: pe.CreatorCoinEntry.ToJSON(), - // DAOCoinEntry: pe.DAOCoinEntry.ToJSON(), - ExtraData: extra, - } -} - -// ===================================================================== -// NFTEntry Adapter -// ===================================================================== - -// NFTEntryJSON is a JSON-friendly view of an NFTEntry. -type NFTEntryJSON struct { - LastOwnerPKID string `json:"last_owner_pkid"` - OwnerPKID string `json:"owner_pkid"` - NFTPostHash string `json:"nft_post_hash"` - SerialNumber uint64 `json:"serial_number"` - IsForSale bool `json:"is_for_sale"` - MinBidAmountNanos uint64 `json:"min_bid_amount_nanos"` - UnlockableText string `json:"unlockable_text"` - LastAcceptedBidAmountNanos uint64 `json:"last_accepted_bid_amount_nanos"` - IsPending bool `json:"is_pending"` - IsBuyNow bool `json:"is_buy_now"` - BuyNowPriceNanos uint64 `json:"buy_now_price_nanos"` - ExtraData map[string]string `json:"extra_data"` -} - -// ToJSON converts an NFTEntry to its JSON view. -func (nft *NFTEntry) ToJSON() NFTEntryJSON { - extra := make(map[string]string) - // For each extra data key, encode the value as hex. - for k, v := range nft.ExtraData { - extra[k] = hex.EncodeToString(v) - } - var postHashStr string - if nft.NFTPostHash != nil { - postHashStr = hex.EncodeToString((*nft.NFTPostHash)[:]) - } else { - postHashStr = "" - } - - var lastOwnerPKID string - if nft.LastOwnerPKID != nil { - lastOwnerPKID = nft.LastOwnerPKID.ToString() - } - var ownerPKID string - if nft.OwnerPKID != nil { - ownerPKID = nft.OwnerPKID.ToString() - } - - return NFTEntryJSON{ - LastOwnerPKID: lastOwnerPKID, // Assumes PKID has a String() method. - OwnerPKID: ownerPKID, - NFTPostHash: postHashStr, - SerialNumber: nft.SerialNumber, - IsForSale: nft.IsForSale, - MinBidAmountNanos: nft.MinBidAmountNanos, - UnlockableText: string(nft.UnlockableText), - LastAcceptedBidAmountNanos: nft.LastAcceptedBidAmountNanos, - IsPending: nft.IsPending, - IsBuyNow: nft.IsBuyNow, - BuyNowPriceNanos: nft.BuyNowPriceNanos, - ExtraData: extra, - } -} - -// ===================================================================== -// NFTBidEntry Adapter -// ===================================================================== - -// NFTBidEntryJSON is a JSON-friendly view of an NFTBidEntry. -type NFTBidEntryJSON struct { - BidderPKID string `json:"bidder_pkid"` - NFTPostHash string `json:"nft_post_hash"` - SerialNumber uint64 `json:"serial_number"` - BidAmountNanos uint64 `json:"bid_amount_nanos"` - AcceptedBlockHeight *uint32 `json:"accepted_block_height,omitempty"` -} - -// ToJSON converts an NFTBidEntry to its JSON view. -func (bid *NFTBidEntry) ToJSON() NFTBidEntryJSON { - - var bidderPKID string - if bid.BidderPKID != nil { - bidderPKID = bid.BidderPKID.ToString() - } - var nftPostHash string - if bid.NFTPostHash != nil { - nftPostHash = hex.EncodeToString((*bid.NFTPostHash)[:]) - } - - return NFTBidEntryJSON{ - BidderPKID: bidderPKID, - NFTPostHash: nftPostHash, - SerialNumber: bid.SerialNumber, - BidAmountNanos: bid.BidAmountNanos, - AcceptedBlockHeight: bid.AcceptedBlockHeight, - } -} - -// ===================================================================== -// PostEntry Adapter -// ===================================================================== - -// PostEntryJSON is a JSON-friendly view of a PostEntry. -type PostEntryJSON struct { - PostHash string `json:"post_hash"` - PosterPublicKey string `json:"poster_public_key"` - ParentStakeID string `json:"parent_stake_id,omitempty"` - Body string `json:"body"` - RepostedPostHash string `json:"reposted_post_hash,omitempty"` - IsQuotedRepost bool `json:"is_quoted_repost"` - CreatorBasisPoints uint64 `json:"creator_basis_points"` - StakeMultipleBasisPoints uint64 `json:"stake_multiple_basis_points"` - ConfirmationBlockHeight uint32 `json:"confirmation_block_height"` - TimestampNanos uint64 `json:"timestamp_nanos"` - IsHidden bool `json:"is_hidden"` - LikeCount uint64 `json:"like_count"` - RepostCount uint64 `json:"repost_count"` - QuoteRepostCount uint64 `json:"quote_repost_count"` - DiamondCount uint64 `json:"diamond_count"` - CommentCount uint64 `json:"comment_count"` - IsPinned bool `json:"is_pinned"` - IsNFT bool `json:"is_nft"` - NumNFTCopies uint64 `json:"num_nft_copies"` - NumNFTCopiesForSale uint64 `json:"num_nft_copies_for_sale"` - NumNFTCopiesBurned uint64 `json:"num_nft_copies_burned"` - HasUnlockable bool `json:"has_unlockable"` - NFTRoyaltyToCreatorBasisPoints uint64 `json:"nft_royalty_to_creator_basis_points"` - NFTRoyaltyToCoinBasisPoints uint64 `json:"nft_royalty_to_coin_basis_points"` - AdditionalNFTRoyaltiesToCreators map[string]uint64 `json:"additional_nft_royalties_to_creators_basis_points"` - AdditionalNFTRoyaltiesToCoins map[string]uint64 `json:"additional_nft_royalties_to_coins_basis_points"` - PostExtraData map[string]string `json:"post_extra_data"` - IsFrozen bool `json:"is_frozen"` -} - -// ToJSON converts a PostEntry to its JSON view. -func (pe *PostEntry) ToJSON() PostEntryJSON { - extra := make(map[string]string) - // Sort keys to have deterministic JSON. - keys := make([]string, 0, len(pe.PostExtraData)) - for k := range pe.PostExtraData { - keys = append(keys, k) - } - sort.Strings(keys) - for _, k := range keys { - extra[k] = hex.EncodeToString(pe.PostExtraData[k]) - } - - creators := make(map[string]uint64) - for pkid, val := range pe.AdditionalNFTRoyaltiesToCreatorsBasisPoints { - creators[pkid.ToString()] = val - } - coins := make(map[string]uint64) - for pkid, val := range pe.AdditionalNFTRoyaltiesToCoinsBasisPoints { - coins[pkid.ToString()] = val - } - var repostHash string - if pe.RepostedPostHash != nil { - repostHash = hex.EncodeToString((*pe.RepostedPostHash)[:]) - } - var parentStake string - if len(pe.ParentStakeID) > 0 { - parentStake = hex.EncodeToString(pe.ParentStakeID) - } - var postHashStr string - if pe.PostHash != nil { - postHashStr = hex.EncodeToString((*pe.PostHash)[:]) - } else { - postHashStr = "" - } - return PostEntryJSON{ - PostHash: postHashStr, - PosterPublicKey: hex.EncodeToString(pe.PosterPublicKey), - ParentStakeID: parentStake, - Body: string(pe.Body), - RepostedPostHash: repostHash, - IsQuotedRepost: pe.IsQuotedRepost, - CreatorBasisPoints: pe.CreatorBasisPoints, - StakeMultipleBasisPoints: pe.StakeMultipleBasisPoints, - ConfirmationBlockHeight: pe.ConfirmationBlockHeight, - TimestampNanos: pe.TimestampNanos, - IsHidden: pe.IsHidden, - LikeCount: pe.LikeCount, - RepostCount: pe.RepostCount, - QuoteRepostCount: pe.QuoteRepostCount, - DiamondCount: pe.DiamondCount, - CommentCount: pe.CommentCount, - IsPinned: pe.IsPinned, - IsNFT: pe.IsNFT, - NumNFTCopies: pe.NumNFTCopies, - NumNFTCopiesForSale: pe.NumNFTCopiesForSale, - NumNFTCopiesBurned: pe.NumNFTCopiesBurned, - HasUnlockable: pe.HasUnlockable, - NFTRoyaltyToCreatorBasisPoints: pe.NFTRoyaltyToCreatorBasisPoints, - NFTRoyaltyToCoinBasisPoints: pe.NFTRoyaltyToCoinBasisPoints, - AdditionalNFTRoyaltiesToCreators: creators, - AdditionalNFTRoyaltiesToCoins: coins, - PostExtraData: extra, - IsFrozen: pe.IsFrozen, - } -} - -// ===================================================================== -// DAO Operations Adapter: DAOCoinLimitOrderEntry -// ===================================================================== - -// DAOCoinLimitOrderEntryJSON is a JSON view of a DAO coin limit order. -type DAOCoinLimitOrderEntryJSON struct { - OrderID string `json:"order_id"` - TransactorPKID string `json:"transactor_pkid"` - BuyingDAOCoinCreatorPKID string `json:"buying_dao_coin_creator_pkid"` - SellingDAOCoinCreatorPKID string `json:"selling_dao_coin_creator_pkid"` - ScaledExchangeRateCoinsToSellPerCoin string `json:"scaled_exchange_rate_coins_to_sell_per_coin_to_buy"` - QuantityToFillInBaseUnits string `json:"quantity_to_fill_in_base_units"` - OperationType string `json:"operation_type"` - FillType string `json:"fill_type"` - BlockHeight uint32 `json:"block_height"` -} - -// ToJSON converts a DAOCoinLimitOrderEntry to its JSON view. -func (order *DAOCoinLimitOrderEntry) ToJSON() DAOCoinLimitOrderEntryJSON { - opType := "" - switch order.OperationType { - case DAOCoinLimitOrderOperationTypeASK: - opType = "ASK" - case DAOCoinLimitOrderOperationTypeBID: - opType = "BID" - default: - opType = "UNKNOWN" - } - fillType := "" - switch order.FillType { - case DAOCoinLimitOrderFillTypeGoodTillCancelled: - fillType = "GoodTillCancelled" - case DAOCoinLimitOrderFillTypeImmediateOrCancel: - fillType = "ImmediateOrCancel" - case DAOCoinLimitOrderFillTypeFillOrKill: - fillType = "FillOrKill" - default: - fillType = "UNKNOWN" - } - var scaledRate, quantity string - if order.ScaledExchangeRateCoinsToSellPerCoinToBuy != nil { - scaledRate = order.ScaledExchangeRateCoinsToSellPerCoinToBuy.String() - } - if order.QuantityToFillInBaseUnits != nil { - quantity = order.QuantityToFillInBaseUnits.String() - } - orderIDStr := "" - if order.OrderID != nil { - orderIDStr = order.OrderID.String() - } - - transactorPKID := "" - if order.TransactorPKID != nil { - transactorPKID = order.TransactorPKID.ToString() - } - sellingDAOCoinCreatorPKID := "" - if order.SellingDAOCoinCreatorPKID != nil { - sellingDAOCoinCreatorPKID = order.SellingDAOCoinCreatorPKID.ToString() - } - buyingDAOCoinCreatorPKID := "" - if order.BuyingDAOCoinCreatorPKID != nil { - buyingDAOCoinCreatorPKID = order.BuyingDAOCoinCreatorPKID.ToString() - } - return DAOCoinLimitOrderEntryJSON{ - OrderID: orderIDStr, - TransactorPKID: transactorPKID, - BuyingDAOCoinCreatorPKID: buyingDAOCoinCreatorPKID, - SellingDAOCoinCreatorPKID: sellingDAOCoinCreatorPKID, - ScaledExchangeRateCoinsToSellPerCoin: scaledRate, - QuantityToFillInBaseUnits: quantity, - OperationType: opType, - FillType: fillType, - BlockHeight: order.BlockHeight, - } -} - -// ===================================================================== -// DAO Operations Adapter: FilledDAOCoinLimitOrder -// ===================================================================== - -// FilledDAOCoinLimitOrderJSON is a JSON view of a filled DAO coin limit order. -type FilledDAOCoinLimitOrderJSON struct { - OrderID string `json:"order_id"` - TransactorPKID string `json:"transactor_pkid"` - BuyingDAOCoinCreatorPKID string `json:"buying_dao_coin_creator_pkid"` - SellingDAOCoinCreatorPKID string `json:"selling_dao_coin_creator_pkid"` - CoinQuantityInBaseUnitsBought string `json:"coin_quantity_in_base_units_bought"` - CoinQuantityInBaseUnitsSold string `json:"coin_quantity_in_base_units_sold"` - IsFulfilled bool `json:"is_fulfilled"` -} - -// ToJSON converts a FilledDAOCoinLimitOrder to its JSON view. -func (filledOrder *FilledDAOCoinLimitOrder) ToJSON() FilledDAOCoinLimitOrderJSON { - var qtyBought, qtySold string - if filledOrder.CoinQuantityInBaseUnitsBought != nil { - qtyBought = filledOrder.CoinQuantityInBaseUnitsBought.String() - } - if filledOrder.CoinQuantityInBaseUnitsSold != nil { - qtySold = filledOrder.CoinQuantityInBaseUnitsSold.String() - } - orderIDStr := "" - if filledOrder.OrderID != nil { - orderIDStr = filledOrder.OrderID.String() - } - transactorPKID := "" - if filledOrder.TransactorPKID != nil { - transactorPKID = filledOrder.TransactorPKID.ToString() - } - sellingDAOCoinCreatorPKID := "" - if filledOrder.SellingDAOCoinCreatorPKID != nil { - sellingDAOCoinCreatorPKID = filledOrder.SellingDAOCoinCreatorPKID.ToString() - } - buyingDAOCoinCreatorPKID := "" - if filledOrder.BuyingDAOCoinCreatorPKID != nil { - buyingDAOCoinCreatorPKID = filledOrder.BuyingDAOCoinCreatorPKID.ToString() - } - return FilledDAOCoinLimitOrderJSON{ - OrderID: orderIDStr, - TransactorPKID: transactorPKID, - BuyingDAOCoinCreatorPKID: buyingDAOCoinCreatorPKID, - SellingDAOCoinCreatorPKID: sellingDAOCoinCreatorPKID, - CoinQuantityInBaseUnitsBought: qtyBought, - CoinQuantityInBaseUnitsSold: qtySold, - IsFulfilled: filledOrder.IsFulfilled, - } -} - -// ===================================================================== -// DESO Operations Adapter: CoinEntry -// ===================================================================== - -// CoinEntryJSON is a JSON-friendly view of a CoinEntry. -type CoinEntryJSON struct { - CreatorBasisPoints uint64 `json:"creator_basis_points"` - DeSoLockedNanos uint64 `json:"deso_locked_nanos"` - NumberOfHolders uint64 `json:"number_of_holders"` - CoinsInCirculationNanos string `json:"coins_in_circulation_nanos"` - CoinWatermarkNanos uint64 `json:"coin_watermark_nanos"` - MintingDisabled bool `json:"minting_disabled"` - TransferRestrictionStatus string `json:"transfer_restriction_status"` - LockupTransferRestrictionStatus string `json:"lockup_transfer_restriction_status,omitempty"` -} - -// ToJSON converts a CoinEntry to its JSON view. -func (ce *CoinEntry) ToJSON() CoinEntryJSON { - var transStatus string - switch ce.TransferRestrictionStatus { - case TransferRestrictionStatusUnrestricted: - transStatus = "Unrestricted" - case TransferRestrictionStatusProfileOwnerOnly: - transStatus = "ProfileOwnerOnly" - case TransferRestrictionStatusDAOMembersOnly: - transStatus = "DAOMembersOnly" - case TransferRestrictionStatusPermanentlyUnrestricted: - transStatus = "PermanentlyUnrestricted" - default: - transStatus = "Unknown" - } - var lockupStatus string - switch ce.LockupTransferRestrictionStatus { - case TransferRestrictionStatusUnrestricted: - lockupStatus = "Unrestricted" - case TransferRestrictionStatusProfileOwnerOnly: - lockupStatus = "ProfileOwnerOnly" - case TransferRestrictionStatusDAOMembersOnly: - lockupStatus = "DAOMembersOnly" - case TransferRestrictionStatusPermanentlyUnrestricted: - lockupStatus = "PermanentlyUnrestricted" - default: - lockupStatus = "" - } - return CoinEntryJSON{ - CreatorBasisPoints: ce.CreatorBasisPoints, - DeSoLockedNanos: ce.DeSoLockedNanos, - NumberOfHolders: ce.NumberOfHolders, - CoinsInCirculationNanos: ce.CoinsInCirculationNanos.String(), - CoinWatermarkNanos: ce.CoinWatermarkNanos, - MintingDisabled: ce.MintingDisabled, - TransferRestrictionStatus: transStatus, - LockupTransferRestrictionStatus: lockupStatus, - } -} - -// ===================================================================== -// DESO Operations Adapter: DeSoBalanceEntry -// ===================================================================== - -// DeSoBalanceEntryJSON is a JSON view of a DeSoBalanceEntry. -type DeSoBalanceEntryJSON struct { - PublicKey string `json:"public_key"` - BalanceNanos uint64 `json:"balance_nanos"` -} - -// ToJSON converts a DeSoBalanceEntry to its JSON view. -func (de *DeSoBalanceEntry) ToJSON() DeSoBalanceEntryJSON { - return DeSoBalanceEntryJSON{ - PublicKey: hex.EncodeToString(de.PublicKey), - BalanceNanos: de.BalanceNanos, - } -} - -// ===================================================================== -// (Optional) Custom JSON Marshaling Example -// ===================================================================== - -// You can have the types themselves implement json.Marshaler so that they marshal automatically. -// For instance: -// -// func (nft *NFTEntry) MarshalJSON() ([]byte, error) { -// return json.Marshal(nft.ToJSON()) -// } -// -// And similarly for the other types. -// -// ===================================================================== -// Example Usage: -// -// func main() { -// // For example, converting an NFTEntry to JSON. -// nft := &NFTEntry{ -// // … initialize fields … -// } -// jsonData, err := json.MarshalIndent(nft.ToJSON(), "", " ") -// if err != nil { -// panic(err) -// } -// fmt.Println(string(jsonData)) -// } -// From 0dc5bfbf1b268671bdf37f2092b300f280828d27 Mon Sep 17 00:00:00 2001 From: mvanhalen Date: Mon, 17 Feb 2025 20:22:46 +0100 Subject: [PATCH 6/6] github.com/rabbitmq/amqp091-go added --- go.mod | 1 + go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/go.mod b/go.mod index 0b5355f47..936a03c96 100644 --- a/go.mod +++ b/go.mod @@ -117,6 +117,7 @@ require ( github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/richardartoul/molecule v1.0.1-0.20240531184615-7ca0df43c0b3 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect diff --git a/go.sum b/go.sum index a21ccd144..a3c1bc655 100644 --- a/go.sum +++ b/go.sum @@ -322,6 +322,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/richardartoul/molecule v1.0.1-0.20240531184615-7ca0df43c0b3 h1:4+LEVOB87y175cLJC/mbsgKmoDOjrBldtXvioEy96WY=