From 9340a83010f7f61bf14f7c8135c094d96776291d Mon Sep 17 00:00:00 2001 From: superzordon Date: Thu, 29 Feb 2024 21:37:17 -0500 Subject: [PATCH 01/26] Updates to encode and decode for state change entry --- lib/state_change_syncer.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 5443aae30..1425391cd 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -114,8 +114,10 @@ func (stateChangeEntry *StateChangeEntry) RawEncodeWithoutMetadata(blockHeight u // Encode the block height. data = append(data, UintToBuf(blockHeight)...) - // Encode the transaction. - data = append(data, EncodeToBytes(blockHeight, stateChangeEntry.Block)...) + if stateChangeEntry.EncoderType == EncoderTypeUtxoOperation { + // Encode the transaction. + data = append(data, EncodeToBytes(blockHeight, stateChangeEntry.Block)...) + } return data } @@ -174,11 +176,13 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u } stateChangeEntry.BlockHeight = entryBlockHeight - block := &MsgDeSoBlock{} - if exist, err := DecodeFromBytes(block, rr); exist && err == nil { - stateChangeEntry.Block = block - } else if err != nil { - return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding block") + if stateChangeEntry.EncoderType == EncoderTypeUtxoOperation { + block := &MsgDeSoBlock{} + if exist, err := DecodeFromBytes(block, rr); exist && err == nil { + stateChangeEntry.Block = block + } else if err != nil { + return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding block") + } } return nil From 871133425fcb0e3f169ece8e1226b695a30ba670 Mon Sep 17 00:00:00 2001 From: superzordon Date: Fri, 1 Mar 2024 12:29:32 -0500 Subject: [PATCH 02/26] Reduce sleep time --- lib/state_change_syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 1425391cd..d7d736a5a 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -762,7 +762,7 @@ func (stateChangeSyncer *StateChangeSyncer) StartMempoolSyncRoutine(server *Serv mempoolClosed := server.mempool.stopped for !mempoolClosed { // Sleep for a short while to avoid a tight loop. - time.Sleep(100 * time.Millisecond) + time.Sleep(1 * time.Millisecond) var err error // If the mempool is not empty, sync the mempool to the state syncer. mempoolClosed, err = stateChangeSyncer.SyncMempoolToStateSyncer(server) From 2ba32647b0addf6f3b50d033700d0822882243f6 Mon Sep 17 00:00:00 2001 From: superzordon Date: Tue, 5 Mar 2024 16:34:23 -0500 Subject: [PATCH 03/26] Update state syncer mempool reversion procedure, add temporary logging --- lib/db_utils.go | 2 + lib/snapshot.go | 1 + lib/state_change_syncer.go | 113 ++++++++++++++++++++++---------- lib/state_change_syncer_test.go | 1 + 4 files changed, 81 insertions(+), 36 deletions(-) diff --git a/lib/db_utils.go b/lib/db_utils.go index 6789e79eb..cdfd76831 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -989,6 +989,7 @@ func DBSetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, value []byte, eve KeyBytes: key, EncoderBytes: value, AncestralRecordBytes: ancestralValue, + IsReverted: false, }, FlushId: uuid.Nil, IsMempoolTxn: eventManager.isMempoolManager, @@ -1091,6 +1092,7 @@ func DBDeleteWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, eventManager * KeyBytes: key, EncoderBytes: nil, AncestralRecordBytes: ancestralValue, + IsReverted: false, }, FlushId: uuid.Nil, IsMempoolTxn: eventManager.isMempoolManager, diff --git a/lib/snapshot.go b/lib/snapshot.go index 1c3f96adb..ec118fe8c 100644 --- a/lib/snapshot.go +++ b/lib/snapshot.go @@ -1265,6 +1265,7 @@ func (snap *Snapshot) SetSnapshotChunk(mainDb *badger.DB, mainDbMutex *deadlock. OperationType: DbOperationTypeInsert, KeyBytes: dbEntry.Key, EncoderBytes: dbEntry.Value, + IsReverted: false, }, FlushId: dbFlushId, }) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index d7d736a5a..20f468b6a 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -36,6 +36,9 @@ const ( type StateChangeEntry struct { // The type of operation that should be performed on the database. OperationType StateSyncerOperationType + // For mempool state changes, whether this operation has been booted from the mempool and should be reverted + // from the state change record. + IsReverted bool // The key that should be used for the operation. KeyBytes []byte // The encoder that is being captured by this state change. @@ -63,14 +66,19 @@ type StateChangeEntry struct { // RawEncodeWithoutMetadata constructs the bytes to represent a StateChangeEntry. // The format is: -// [operation type (varint)][encoder type (varint)][key length (varint)][key bytes] +// [operation type (varint)][is reverted bool][encoder type (varint)][key length (varint)][key bytes] // [encoder length (varint)][encoder bytes][is mempool (1 byte)][utxo ops length (varint)][utxo ops bytes] func (stateChangeEntry *StateChangeEntry) RawEncodeWithoutMetadata(blockHeight uint64, skipMetadata ...bool) []byte { // Get byte length of keyBytes (will be nil for mempool transactions) var data []byte + // OperationType data = append(data, UintToBuf(uint64(stateChangeEntry.OperationType))...) + // IsReverted + data = append(data, BoolToByte(stateChangeEntry.IsReverted)) + // EncoderType data = append(data, UintToBuf(uint64(stateChangeEntry.EncoderType))...) + // KeyBytes data = append(data, EncodeByteArray(stateChangeEntry.KeyBytes)...) // The encoder can either be represented in raw bytes or as an encoder. If it's represented in bytes, we use that. @@ -130,6 +138,13 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u } stateChangeEntry.OperationType = StateSyncerOperationType(operationType) + // Decode IsReverted + isReverted, err := ReadBoolByte(rr) + if err != nil { + return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding is reverted") + } + stateChangeEntry.IsReverted = isReverted + // Decode EncoderType encoderType, err := ReadUvarint(rr) if err != nil { @@ -230,9 +245,10 @@ type StateChangeSyncer struct { // don't need to be re-emitted to the state change file). // The key is the stringifyed key of the entry, plus the operation type. // The value is the badger entry that was flushed to the db. - MempoolKeyValueMap map[string][]byte + //MempoolSyncedKeyValueMap map[string][]byte + MempoolSyncedKeyValueMap map[string]*StateChangeEntry // This map tracks the keys that were flushed to the mempool in a single flush. - // Every time a flush occurs, this map is cleared, as opposed to the MempoolKeyValueMap, which is only cleared + // Every time a flush occurs, this map is cleared, as opposed to the MempoolSyncedKeyValueMap, which is only cleared // when a new block is processed. // This is used to determine if there are any tracked mempool transactions that have been ejected from the current // mempool state. @@ -321,7 +337,7 @@ func NewStateChangeSyncer(stateChangeDir string, nodeSyncType NodeSyncType) *Sta StateChangeMempoolIndexFile: stateChangeMempoolIndexFile, StateChangeMempoolFileSize: uint64(stateChangeMempoolFileInfo.Size()), UnflushedBytes: make(map[uuid.UUID]UnflushedStateSyncerBytes), - MempoolKeyValueMap: make(map[string][]byte), + MempoolSyncedKeyValueMap: make(map[string]*StateChangeEntry), MempoolFlushKeySet: make(map[string]bool), StateSyncerMutex: &sync.Mutex{}, SyncType: nodeSyncType, @@ -374,41 +390,23 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S } if event.IsMempoolTxn { - // Create a mempool flush ID if one doesn't already exist. - if event.FlushId == uuid.Nil && stateChangeSyncer.MempoolFlushId == uuid.Nil { - stateChangeSyncer.MempoolFlushId = uuid.New() - } + // Set the flushId to the mempool flush ID. + flushId = stateChangeSyncer.MempoolFlushId - // If the event flush ID is nil, then we need to use the global mempool flush ID. - if flushId == uuid.Nil { - flushId = stateChangeSyncer.MempoolFlushId - } - - // The current state of the tracked mempool is stored in the MempoolKeyValueMap. If this entry is already in there + // The current state of the tracked mempool is stored in the MempoolSyncedKeyValueMap. If this entry is already in there // then we don't need to re-write it to the state change file. // Create key for op + key map - txKey := createMempoolTxKey(stateChangeEntry.OperationType, stateChangeEntry.KeyBytes) + txKey := createMempoolTxKey(stateChangeEntry.KeyBytes) // Track the key in the MempoolFlushKeySet. - if stateChangeEntry.OperationType == DbOperationTypeDelete { - delete(stateChangeSyncer.MempoolFlushKeySet, txKey) - } else { - stateChangeSyncer.MempoolFlushKeySet[txKey] = true - } + stateChangeSyncer.MempoolFlushKeySet[txKey] = true // Check to see if the key is in the map, and if the value is the same as the value in the event. - if valueBytes, ok := stateChangeSyncer.MempoolKeyValueMap[txKey]; ok && bytes.Equal(valueBytes, event.StateChangeEntry.EncoderBytes) { + if cachedSCE, ok := stateChangeSyncer.MempoolSyncedKeyValueMap[txKey]; ok && bytes.Equal(cachedSCE.EncoderBytes, event.StateChangeEntry.EncoderBytes) && cachedSCE.OperationType == event.StateChangeEntry.OperationType { // If the key is in the map, and the entry bytes are the same as those that are already tracked by state syncer, // then we don't need to write the state change entry to the state change file - it's already being tracked. return } - - // Track the key and value if this is a new entry to the mempool. - if stateChangeEntry.OperationType == DbOperationTypeDelete { - delete(stateChangeSyncer.MempoolKeyValueMap, txKey) - } else { - stateChangeSyncer.MempoolKeyValueMap[txKey] = event.StateChangeEntry.EncoderBytes - } } else { // If the flush ID is nil, then we need to use the global block sync flush ID. if flushId == uuid.Nil { @@ -448,6 +446,13 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S // Set the flush ID. stateChangeEntry.FlushId = flushId + if event.IsMempoolTxn { + txKey := createMempoolTxKey(stateChangeEntry.KeyBytes) + // Track the key and value if this is a new entry to the mempool, or if the encoder bytes or operation type + // changed since it was last synced. + stateChangeSyncer.MempoolSyncedKeyValueMap[txKey] = event.StateChangeEntry + } + // Encode the state change entry. We encode as a byte array, so the consumer can buffer just the bytes needed // to decode this entry when reading from file. entryBytes := EncodeToBytes(stateChangeSyncer.BlockHeight, stateChangeEntry, false) @@ -476,14 +481,43 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State // This would mean that an entry was ejected from the mempool. // When this happens, we need to reset the mempool and start from scratch, so that the consumer can revert the // mempool transactions it currently has and sync the mempool from scratch. - for key := range stateChangeSyncer.MempoolKeyValueMap { - // If any of the keys that the mempool is currently tracking weren't included in the flush, the state syncer - // mempool is bad and needs to be reset. + // + // Example: + // + // Flush: + // Key: a + // Key: b + // Key: d + + // Synced: + // Key: a + // Key: b + // Key: c <- Revert this one + // Key: d + + for key, cachedSCE := range stateChangeSyncer.MempoolSyncedKeyValueMap { + // If any of the keys that the mempool is currently tracking weren't included in the flush, that entry + // needs to be reverted from the mempool. if _, ok := stateChangeSyncer.MempoolFlushKeySet[key]; !ok { - stateChangeSyncer.ResetMempool() - return + cachedSCE.IsReverted = true + + // Create a revert state change entry and add it to the queue. This will signal the state change + // consumer to revert the synced entry. + entryBytes := EncodeToBytes(stateChangeSyncer.BlockHeight, cachedSCE, false) + writeBytes := EncodeByteArray(entryBytes) + + fmt.Printf("Reverting entry %d\n", cachedSCE.EncoderType) + + // Add the StateChangeEntry bytes to the queue of bytes to be written to the state change file upon Badger db flush. + stateChangeSyncer.addTransactionToQueue(cachedSCE.FlushId, writeBytes) + + // Remove this entry from the synced map + delete(stateChangeSyncer.MempoolSyncedKeyValueMap, key) } } + fmt.Printf("\nFlush len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) + fmt.Printf("\nSynced len: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) + // Reset the mempool flush set. stateChangeSyncer.MempoolFlushKeySet = make(map[string]bool) } @@ -500,7 +534,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State } func (stateChangeSyncer *StateChangeSyncer) ResetMempool() { - stateChangeSyncer.MempoolKeyValueMap = make(map[string][]byte) + stateChangeSyncer.MempoolSyncedKeyValueMap = make(map[string]*StateChangeEntry) stateChangeSyncer.MempoolFlushKeySet = make(map[string]bool) delete(stateChangeSyncer.UnflushedBytes, stateChangeSyncer.MempoolFlushId) stateChangeSyncer.MempoolFlushId = uuid.Nil @@ -620,8 +654,8 @@ func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *State return nil } -func createMempoolTxKey(operationType StateSyncerOperationType, keyBytes []byte) string { - return fmt.Sprintf("%v%v", operationType, string(keyBytes)) +func createMempoolTxKey(keyBytes []byte) string { + return fmt.Sprintf("%v", string(keyBytes)) } // SyncMempoolToStateSyncer flushes all mempool transactions to the db, capturing those state changes @@ -639,6 +673,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser stateChangeSyncer.BlockHeight = blockHeight + stateChangeSyncer.MempoolFlushId = originalCommittedFlushId + mempoolUtxoView, err := server.GetMempool().GetAugmentedUniversalView() if err != nil { return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: ") @@ -702,6 +738,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser OperationType: DbOperationTypeUpsert, KeyBytes: TxnHashToTxnKey(mempoolTx.Hash), EncoderBytes: EncodeToBytes(blockHeight, mempoolTx.Tx, false), + IsReverted: false, }, FlushId: uuid.Nil, IsMempoolTxn: true, @@ -720,6 +757,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser OperationType: DbOperationTypeUpsert, KeyBytes: _DbKeyForTxnUtxoOps(mempoolTx.Hash), EncoderBytes: EncodeToBytes(blockHeight, utxoOpBundle, false), + IsReverted: false, }, FlushId: uuid.Nil, IsMempoolTxn: true, @@ -764,11 +802,13 @@ func (stateChangeSyncer *StateChangeSyncer) StartMempoolSyncRoutine(server *Serv // Sleep for a short while to avoid a tight loop. time.Sleep(1 * time.Millisecond) var err error + start := time.Now() // If the mempool is not empty, sync the mempool to the state syncer. mempoolClosed, err = stateChangeSyncer.SyncMempoolToStateSyncer(server) if err != nil { glog.Errorf("StateChangeSyncer.StartMempoolSyncRoutine: Error syncing mempool to state syncer: %v", err) } + fmt.Printf("Synced mempool in %v", time.Since(start)) } }() } @@ -822,6 +862,7 @@ func (stateChangeSyncer *StateChangeSyncer) FlushAllEntriesToFile(server *Server OperationType: DbOperationTypeInsert, KeyBytes: dbEntry.Key, EncoderBytes: dbEntry.Value, + IsReverted: false, } // If this prefix is the prefix for UTXO Ops, fetch the transaction for each UTXO Op and attach it to the UTXO Op. diff --git a/lib/state_change_syncer_test.go b/lib/state_change_syncer_test.go index 083c851bd..a77559c4c 100644 --- a/lib/state_change_syncer_test.go +++ b/lib/state_change_syncer_test.go @@ -44,6 +44,7 @@ func TestStateChangeEntryEncoder(t *testing.T) { KeyBytes: []byte{1, 2, 3}, Encoder: postEntry, EncoderType: postEntry.GetEncoderType(), + IsReverted: false, } stateChangeEntryBytes := EncodeToBytes(0, stateChangeEntry) From 83b4596223d55692de96ca94956eb7f7ced8d21d Mon Sep 17 00:00:00 2001 From: superzordon Date: Wed, 6 Mar 2024 21:45:59 -0500 Subject: [PATCH 04/26] Update state syncer --- lib/state_change_syncer.go | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 20f468b6a..2dcb98668 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -387,11 +387,12 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S // Crate a block sync flush ID if one doesn't already exist. if event.FlushId == uuid.Nil && stateChangeSyncer.BlockSyncFlushId == uuid.Nil { stateChangeSyncer.BlockSyncFlushId = uuid.New() + fmt.Printf("SETTING NEW BLOCK SYNC FLUSH ID: %v\n", stateChangeSyncer.BlockSyncFlushId) } if event.IsMempoolTxn { // Set the flushId to the mempool flush ID. - flushId = stateChangeSyncer.MempoolFlushId + flushId = stateChangeSyncer.BlockSyncFlushId // The current state of the tracked mempool is stored in the MempoolSyncedKeyValueMap. If this entry is already in there // then we don't need to re-write it to the state change file. @@ -499,6 +500,13 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State // If any of the keys that the mempool is currently tracking weren't included in the flush, that entry // needs to be reverted from the mempool. if _, ok := stateChangeSyncer.MempoolFlushKeySet[key]; !ok { + // Confirm that the block sync ID hasn't shifted. If it has, bail now. + if cachedSCE.FlushId != stateChangeSyncer.BlockSyncFlushId { + fmt.Printf("The flush ID has changed, bailing now.\n") + //stateChangeSyncer.ResetMempool() + //return + } + cachedSCE.IsReverted = true // Create a revert state change entry and add it to the queue. This will signal the state change @@ -515,12 +523,14 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State delete(stateChangeSyncer.MempoolSyncedKeyValueMap, key) } } - fmt.Printf("\nFlush len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) - fmt.Printf("\nSynced len: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) + //fmt.Printf("\nFlush len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) + //fmt.Printf("\nSynced len: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) // Reset the mempool flush set. stateChangeSyncer.MempoolFlushKeySet = make(map[string]bool) + } else { + fmt.Printf("Here is the flush ID: %v\n", event.FlushId) + fmt.Printf("Here is the block sync flush ID: %v\n", event.BlockSyncFlushId) } - err := stateChangeSyncer.FlushTransactionsToFile(event) if err != nil { glog.Errorf("StateChangeSyncer._handleStateSyncerFlush: Error flushing transactions to file: %v", err) @@ -529,6 +539,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State if !event.IsMempoolFlush { // After flushing blocksync transactions to file, reset the block sync flush ID, and reset the mempool. stateChangeSyncer.BlockSyncFlushId = uuid.New() + fmt.Printf("Setting a new blocksync flush ID: %v\n", stateChangeSyncer.BlockSyncFlushId) stateChangeSyncer.ResetMempool() } } @@ -665,6 +676,10 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser originalCommittedFlushId := stateChangeSyncer.BlockSyncFlushId + if originalCommittedFlushId == uuid.Nil { + return false, nil + } + if server.mempool.stopped { return true, nil } @@ -791,7 +806,7 @@ func (stateChangeSyncer *StateChangeSyncer) StartMempoolSyncRoutine(server *Serv time.Sleep(15000 * time.Millisecond) } if !stateChangeSyncer.BlocksyncCompleteEntriesFlushed && stateChangeSyncer.SyncType == NodeSyncTypeBlockSync { - fmt.Printf("Flushing to file") + fmt.Printf("Flushing to file\n") err := stateChangeSyncer.FlushAllEntriesToFile(server) if err != nil { fmt.Printf("StateChangeSyncer.StartMempoolSyncRoutine: Error flushing all entries to file: %v", err) @@ -800,15 +815,15 @@ func (stateChangeSyncer *StateChangeSyncer) StartMempoolSyncRoutine(server *Serv mempoolClosed := server.mempool.stopped for !mempoolClosed { // Sleep for a short while to avoid a tight loop. - time.Sleep(1 * time.Millisecond) + time.Sleep(150 * time.Millisecond) var err error - start := time.Now() + //start := time.Now() // If the mempool is not empty, sync the mempool to the state syncer. mempoolClosed, err = stateChangeSyncer.SyncMempoolToStateSyncer(server) if err != nil { glog.Errorf("StateChangeSyncer.StartMempoolSyncRoutine: Error syncing mempool to state syncer: %v", err) } - fmt.Printf("Synced mempool in %v", time.Since(start)) + //fmt.Printf("Synced mempool in %v\n", time.Since(start)) } }() } From 7d4822d1b05b50592df421c2077d7e2f14ff82ca Mon Sep 17 00:00:00 2001 From: superzordon Date: Thu, 7 Mar 2024 22:27:12 -0500 Subject: [PATCH 05/26] Update reversion procedure --- lib/event_manager.go | 13 ++++++++++--- lib/server.go | 3 ++- lib/state_change_syncer.go | 32 +++++++++++++++++++++----------- 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/lib/event_manager.go b/lib/event_manager.go index 55c4b7bd1..d1c39ddaf 100644 --- a/lib/event_manager.go +++ b/lib/event_manager.go @@ -1,6 +1,8 @@ package lib -import "github.com/google/uuid" +import ( + "github.com/google/uuid" +) type TransactionEventFunc func(event *TransactionEvent) type StateSyncerOperationEventFunc func(event *StateSyncerOperationEvent) @@ -53,6 +55,7 @@ type BlockEvent struct { type EventManager struct { transactionConnectedHandlers []TransactionEventFunc stateSyncerOperationHandlers []StateSyncerOperationEventFunc + stateSyncerOperationFlushIds []uuid.UUID stateSyncerFlushedHandlers []StateSyncerFlushedEventFunc blockConnectedHandlers []BlockEventFunc blockDisconnectedHandlers []BlockEventFunc @@ -65,8 +68,9 @@ func NewEventManager() *EventManager { return &EventManager{} } -func (em *EventManager) OnStateSyncerOperation(handler StateSyncerOperationEventFunc) { +func (em *EventManager) OnStateSyncerOperation(handler StateSyncerOperationEventFunc, flushId uuid.UUID) { em.stateSyncerOperationHandlers = append(em.stateSyncerOperationHandlers, handler) + em.stateSyncerOperationFlushIds = append(em.stateSyncerOperationFlushIds, flushId) } func (em *EventManager) OnStateSyncerFlushed(handler StateSyncerFlushedEventFunc) { @@ -74,7 +78,10 @@ func (em *EventManager) OnStateSyncerFlushed(handler StateSyncerFlushedEventFunc } func (em *EventManager) stateSyncerOperation(event *StateSyncerOperationEvent) { - for _, handler := range em.stateSyncerOperationHandlers { + for ii, handler := range em.stateSyncerOperationHandlers { + if len(em.stateSyncerOperationFlushIds) > ii && em.stateSyncerOperationFlushIds[ii] != uuid.Nil { + event.FlushId = em.stateSyncerOperationFlushIds[ii] + } handler(event) } } diff --git a/lib/server.go b/lib/server.go index 1b614b2b8..3733206a0 100644 --- a/lib/server.go +++ b/lib/server.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/hex" "fmt" + "github.com/google/uuid" "net" "path/filepath" "reflect" @@ -385,7 +386,7 @@ func NewServer( // Create the state change syncer to handle syncing state changes to disk, and assign some of its methods // to the event manager. stateChangeSyncer = NewStateChangeSyncer(_stateChangeDir, _syncType) - eventManager.OnStateSyncerOperation(stateChangeSyncer._handleStateSyncerOperation) + eventManager.OnStateSyncerOperation(stateChangeSyncer._handleStateSyncerOperation, uuid.Nil) eventManager.OnStateSyncerFlushed(stateChangeSyncer._handleStateSyncerFlush) } diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 2dcb98668..0d514691b 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -392,7 +392,10 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S if event.IsMempoolTxn { // Set the flushId to the mempool flush ID. - flushId = stateChangeSyncer.BlockSyncFlushId + //flushId = stateChangeSyncer.BlockSyncFlushId + if flushId == uuid.Nil { + fmt.Printf("\n\n***** FLUSH ID IS NIL *****\n\n") + } // The current state of the tracked mempool is stored in the MempoolSyncedKeyValueMap. If this entry is already in there // then we don't need to re-write it to the state change file. @@ -473,7 +476,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State // If this is a mempool flush, make sure a block hasn't mined since the mempool entries were added to queue. // If not, reset the mempool maps and file, and start from scratch. The consumer will revert the mempool transactions // it currently has and sync from scratch. - if stateChangeSyncer.BlockSyncFlushId != event.BlockSyncFlushId { + if stateChangeSyncer.BlockSyncFlushId != event.BlockSyncFlushId || stateChangeSyncer.BlockSyncFlushId != event.FlushId { stateChangeSyncer.ResetMempool() return } @@ -503,8 +506,8 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State // Confirm that the block sync ID hasn't shifted. If it has, bail now. if cachedSCE.FlushId != stateChangeSyncer.BlockSyncFlushId { fmt.Printf("The flush ID has changed, bailing now.\n") - //stateChangeSyncer.ResetMempool() - //return + stateChangeSyncer.ResetMempool() + return } cachedSCE.IsReverted = true @@ -578,6 +581,12 @@ func (stateChangeSyncer *StateChangeSyncer) addTransactionToQueue(flushId uuid.U // FlushTransactionsToFile writes the bytes that have been cached on the StateChangeSyncer to the state change file. func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *StateSyncerFlushedEvent) error { flushId := event.FlushId + + if event.IsMempoolFlush && event.FlushId != stateChangeSyncer.BlockSyncFlushId { + fmt.Printf("Bailing on mempool flush because the block sync flush ID has changed.\n") + return nil + } + // Get the relevant global flush ID from the state change syncer if the flush ID is nil. if event.FlushId == uuid.Nil { if event.IsMempoolFlush { @@ -701,7 +710,9 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser // Reset event manager handlers mempoolEventManager.stateSyncerOperationHandlers = nil mempoolEventManager.stateSyncerFlushedHandlers = nil - mempoolEventManager.OnStateSyncerOperation(stateChangeSyncer._handleStateSyncerOperation) + mempoolEventManager.stateSyncerOperationFlushIds = []uuid.UUID{} + // TODO: Just use the stateChangeSyncer MempoolFlushId rather than this mess. + mempoolEventManager.OnStateSyncerOperation(stateChangeSyncer._handleStateSyncerOperation, originalCommittedFlushId) mempoolEventManager.OnStateSyncerFlushed(stateChangeSyncer._handleStateSyncerFlush) mempoolEventManager.isMempoolManager = true @@ -718,7 +729,6 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser // more than once in the mempool transactions. txn := server.blockchain.db.NewTransaction(true) defer txn.Discard() - err = mempoolUtxoView.FlushToDbWithTxn(txn, uint64(server.blockchain.bestChain[len(server.blockchain.bestChain)-1].Height)) mempoolTxUtxoView, err := NewUtxoView(server.blockchain.db, server.blockchain.params, server.blockchain.postgres, nil, &mempoolEventManager) @@ -733,7 +743,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser if err != nil { mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ - FlushId: uuid.Nil, + FlushId: originalCommittedFlushId, Succeeded: false, IsMempoolFlush: true, }) @@ -755,7 +765,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser EncoderBytes: EncodeToBytes(blockHeight, mempoolTx.Tx, false), IsReverted: false, }, - FlushId: uuid.Nil, + FlushId: originalCommittedFlushId, IsMempoolTxn: true, }) @@ -774,7 +784,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser EncoderBytes: EncodeToBytes(blockHeight, utxoOpBundle, false), IsReverted: false, }, - FlushId: uuid.Nil, + FlushId: originalCommittedFlushId, IsMempoolTxn: true, }) } @@ -782,7 +792,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser // Before flushing the mempool to the state change file, check if a block has mined. If so, abort the flush. if err != nil || originalCommittedFlushId != stateChangeSyncer.BlockSyncFlushId { mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ - FlushId: uuid.Nil, + FlushId: originalCommittedFlushId, Succeeded: false, IsMempoolFlush: true, }) @@ -790,7 +800,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser } mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ - FlushId: uuid.Nil, + FlushId: originalCommittedFlushId, Succeeded: true, IsMempoolFlush: true, BlockSyncFlushId: originalCommittedFlushId, From f7b9b0454c0b8997160ea0e0567f5d19b99a06e6 Mon Sep 17 00:00:00 2001 From: superzordon Date: Fri, 8 Mar 2024 14:02:23 -0500 Subject: [PATCH 06/26] Update decoder to handle block height error --- lib/event_manager.go | 9 ++---- lib/server.go | 3 +- lib/state_change_syncer.go | 64 +++++++++++++++++++++----------------- 3 files changed, 38 insertions(+), 38 deletions(-) diff --git a/lib/event_manager.go b/lib/event_manager.go index d1c39ddaf..51a7e6f09 100644 --- a/lib/event_manager.go +++ b/lib/event_manager.go @@ -55,7 +55,6 @@ type BlockEvent struct { type EventManager struct { transactionConnectedHandlers []TransactionEventFunc stateSyncerOperationHandlers []StateSyncerOperationEventFunc - stateSyncerOperationFlushIds []uuid.UUID stateSyncerFlushedHandlers []StateSyncerFlushedEventFunc blockConnectedHandlers []BlockEventFunc blockDisconnectedHandlers []BlockEventFunc @@ -68,9 +67,8 @@ func NewEventManager() *EventManager { return &EventManager{} } -func (em *EventManager) OnStateSyncerOperation(handler StateSyncerOperationEventFunc, flushId uuid.UUID) { +func (em *EventManager) OnStateSyncerOperation(handler StateSyncerOperationEventFunc) { em.stateSyncerOperationHandlers = append(em.stateSyncerOperationHandlers, handler) - em.stateSyncerOperationFlushIds = append(em.stateSyncerOperationFlushIds, flushId) } func (em *EventManager) OnStateSyncerFlushed(handler StateSyncerFlushedEventFunc) { @@ -78,10 +76,7 @@ func (em *EventManager) OnStateSyncerFlushed(handler StateSyncerFlushedEventFunc } func (em *EventManager) stateSyncerOperation(event *StateSyncerOperationEvent) { - for ii, handler := range em.stateSyncerOperationHandlers { - if len(em.stateSyncerOperationFlushIds) > ii && em.stateSyncerOperationFlushIds[ii] != uuid.Nil { - event.FlushId = em.stateSyncerOperationFlushIds[ii] - } + for _, handler := range em.stateSyncerOperationHandlers { handler(event) } } diff --git a/lib/server.go b/lib/server.go index 3733206a0..1b614b2b8 100644 --- a/lib/server.go +++ b/lib/server.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/hex" "fmt" - "github.com/google/uuid" "net" "path/filepath" "reflect" @@ -386,7 +385,7 @@ func NewServer( // Create the state change syncer to handle syncing state changes to disk, and assign some of its methods // to the event manager. stateChangeSyncer = NewStateChangeSyncer(_stateChangeDir, _syncType) - eventManager.OnStateSyncerOperation(stateChangeSyncer._handleStateSyncerOperation, uuid.Nil) + eventManager.OnStateSyncerOperation(stateChangeSyncer._handleStateSyncerOperation) eventManager.OnStateSyncerFlushed(stateChangeSyncer._handleStateSyncerFlush) } diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 0d514691b..aaaca501b 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -187,7 +187,8 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u // Decode the block height. entryBlockHeight, err := ReadUvarint(rr) if err != nil { - return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding block height") + //return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding block height") + fmt.Printf("StateChangeEntry.RawDecodeWithoutMetadata: error decoding block height: %v\n", err) } stateChangeEntry.BlockHeight = entryBlockHeight @@ -394,7 +395,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S // Set the flushId to the mempool flush ID. //flushId = stateChangeSyncer.BlockSyncFlushId if flushId == uuid.Nil { - fmt.Printf("\n\n***** FLUSH ID IS NIL *****\n\n") + flushId = stateChangeSyncer.MempoolFlushId } // The current state of the tracked mempool is stored in the MempoolSyncedKeyValueMap. If this entry is already in there @@ -499,31 +500,33 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State // Key: c <- Revert this one // Key: d - for key, cachedSCE := range stateChangeSyncer.MempoolSyncedKeyValueMap { - // If any of the keys that the mempool is currently tracking weren't included in the flush, that entry - // needs to be reverted from the mempool. - if _, ok := stateChangeSyncer.MempoolFlushKeySet[key]; !ok { - // Confirm that the block sync ID hasn't shifted. If it has, bail now. - if cachedSCE.FlushId != stateChangeSyncer.BlockSyncFlushId { - fmt.Printf("The flush ID has changed, bailing now.\n") - stateChangeSyncer.ResetMempool() - return - } + if event.Succeeded { + for key, cachedSCE := range stateChangeSyncer.MempoolSyncedKeyValueMap { + // If any of the keys that the mempool is currently tracking weren't included in the flush, that entry + // needs to be reverted from the mempool. + if _, ok := stateChangeSyncer.MempoolFlushKeySet[key]; !ok { + // Confirm that the block sync ID hasn't shifted. If it has, bail now. + if cachedSCE.FlushId != stateChangeSyncer.BlockSyncFlushId { + fmt.Printf("The flush ID has changed, bailing now.\n") + stateChangeSyncer.ResetMempool() + return + } - cachedSCE.IsReverted = true + cachedSCE.IsReverted = true - // Create a revert state change entry and add it to the queue. This will signal the state change - // consumer to revert the synced entry. - entryBytes := EncodeToBytes(stateChangeSyncer.BlockHeight, cachedSCE, false) - writeBytes := EncodeByteArray(entryBytes) + // Create a revert state change entry and add it to the queue. This will signal the state change + // consumer to revert the synced entry. + entryBytes := EncodeToBytes(stateChangeSyncer.BlockHeight, cachedSCE, false) + writeBytes := EncodeByteArray(entryBytes) - fmt.Printf("Reverting entry %d\n", cachedSCE.EncoderType) + fmt.Printf("Reverting entry %d\n", cachedSCE.EncoderType) - // Add the StateChangeEntry bytes to the queue of bytes to be written to the state change file upon Badger db flush. - stateChangeSyncer.addTransactionToQueue(cachedSCE.FlushId, writeBytes) + // Add the StateChangeEntry bytes to the queue of bytes to be written to the state change file upon Badger db flush. + stateChangeSyncer.addTransactionToQueue(cachedSCE.FlushId, writeBytes) - // Remove this entry from the synced map - delete(stateChangeSyncer.MempoolSyncedKeyValueMap, key) + // Remove this entry from the synced map + delete(stateChangeSyncer.MempoolSyncedKeyValueMap, key) + } } } //fmt.Printf("\nFlush len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) @@ -582,10 +585,10 @@ func (stateChangeSyncer *StateChangeSyncer) addTransactionToQueue(flushId uuid.U func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *StateSyncerFlushedEvent) error { flushId := event.FlushId - if event.IsMempoolFlush && event.FlushId != stateChangeSyncer.BlockSyncFlushId { - fmt.Printf("Bailing on mempool flush because the block sync flush ID has changed.\n") - return nil - } + //if event.IsMempoolFlush && event.FlushId != stateChangeSyncer.BlockSyncFlushId { + // fmt.Printf("Bailing on mempool flush because the block sync flush ID has changed.\n") + // return nil + //} // Get the relevant global flush ID from the state change syncer if the flush ID is nil. if event.FlushId == uuid.Nil { @@ -710,9 +713,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser // Reset event manager handlers mempoolEventManager.stateSyncerOperationHandlers = nil mempoolEventManager.stateSyncerFlushedHandlers = nil - mempoolEventManager.stateSyncerOperationFlushIds = []uuid.UUID{} - // TODO: Just use the stateChangeSyncer MempoolFlushId rather than this mess. - mempoolEventManager.OnStateSyncerOperation(stateChangeSyncer._handleStateSyncerOperation, originalCommittedFlushId) + mempoolEventManager.OnStateSyncerOperation(stateChangeSyncer._handleStateSyncerOperation) mempoolEventManager.OnStateSyncerFlushed(stateChangeSyncer._handleStateSyncerFlush) mempoolEventManager.isMempoolManager = true @@ -754,6 +755,11 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) if err != nil { + mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ + FlushId: originalCommittedFlushId, + Succeeded: false, + IsMempoolFlush: true, + }) return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") } From 13147ad5f25ec86d1e8afb84ed83565bbdc2cf30 Mon Sep 17 00:00:00 2001 From: superzordon Date: Tue, 12 Mar 2024 23:41:41 -0400 Subject: [PATCH 07/26] Add logging --- lib/state_change_syncer.go | 44 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index aaaca501b..a0aed9549 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -473,11 +473,14 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State stateChangeSyncer.StateSyncerMutex.Lock() defer stateChangeSyncer.StateSyncerMutex.Unlock() + fmt.Printf("Handling state syncer flush: %+v\n", event) + if event.IsMempoolFlush { // If this is a mempool flush, make sure a block hasn't mined since the mempool entries were added to queue. // If not, reset the mempool maps and file, and start from scratch. The consumer will revert the mempool transactions // it currently has and sync from scratch. if stateChangeSyncer.BlockSyncFlushId != event.BlockSyncFlushId || stateChangeSyncer.BlockSyncFlushId != event.FlushId { + fmt.Printf("The flush ID has changed, bailing now.\n") stateChangeSyncer.ResetMempool() return } @@ -507,7 +510,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State if _, ok := stateChangeSyncer.MempoolFlushKeySet[key]; !ok { // Confirm that the block sync ID hasn't shifted. If it has, bail now. if cachedSCE.FlushId != stateChangeSyncer.BlockSyncFlushId { - fmt.Printf("The flush ID has changed, bailing now.\n") + fmt.Printf("The flush ID has changed, inside key/value check, bailing now.\n") stateChangeSyncer.ResetMempool() return } @@ -551,6 +554,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State } func (stateChangeSyncer *StateChangeSyncer) ResetMempool() { + fmt.Printf("Resetting mempool.\n") stateChangeSyncer.MempoolSyncedKeyValueMap = make(map[string]*StateChangeEntry) stateChangeSyncer.MempoolFlushKeySet = make(map[string]bool) delete(stateChangeSyncer.UnflushedBytes, stateChangeSyncer.MempoolFlushId) @@ -585,6 +589,8 @@ func (stateChangeSyncer *StateChangeSyncer) addTransactionToQueue(flushId uuid.U func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *StateSyncerFlushedEvent) error { flushId := event.FlushId + fmt.Printf("Flushing to file: %+v\n", event) + //if event.IsMempoolFlush && event.FlushId != stateChangeSyncer.BlockSyncFlushId { // fmt.Printf("Bailing on mempool flush because the block sync flush ID has changed.\n") // return nil @@ -615,12 +621,18 @@ func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *State // If the flush failed, delete the unflushed bytes and associated metadata. // Also delete any unconnected mempool txns from our cache. if !event.Succeeded { + fmt.Printf("Deleting unflushed bytes for id: %s\n", flushId) delete(stateChangeSyncer.UnflushedBytes, flushId) + if event.IsMempoolFlush { + stateChangeSyncer.ResetMempool() + } + return nil } unflushedBytes, exists := stateChangeSyncer.UnflushedBytes[flushId] if !exists { + fmt.Printf("Unflushed bytes for flush ID doesn't exist: %s\n", flushId.String()) return nil } @@ -702,6 +714,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser stateChangeSyncer.MempoolFlushId = originalCommittedFlushId + fmt.Printf("Original committed flush ID: %v\n", originalCommittedFlushId) + mempoolUtxoView, err := server.GetMempool().GetAugmentedUniversalView() if err != nil { return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: ") @@ -730,10 +744,25 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser // more than once in the mempool transactions. txn := server.blockchain.db.NewTransaction(true) defer txn.Discard() + fmt.Printf("Mempool synced len before flush: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) + fmt.Printf("Mempool flushed len before flush: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) err = mempoolUtxoView.FlushToDbWithTxn(txn, uint64(server.blockchain.bestChain[len(server.blockchain.bestChain)-1].Height)) + if err != nil { + mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ + FlushId: originalCommittedFlushId, + Succeeded: false, + IsMempoolFlush: true, + }) + return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: FlushToDbWithTxn: ") + } mempoolTxUtxoView, err := NewUtxoView(server.blockchain.db, server.blockchain.params, server.blockchain.postgres, nil, &mempoolEventManager) if err != nil { + mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ + FlushId: originalCommittedFlushId, + Succeeded: false, + IsMempoolFlush: true, + }) return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: CreateMempoolTxUtxoView: ") } @@ -748,13 +777,23 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser Succeeded: false, IsMempoolFlush: true, }) + fmt.Printf("After the mempool flush: %+v\n", &StateSyncerFlushedEvent{ + FlushId: originalCommittedFlushId, + Succeeded: false, + IsMempoolFlush: true, + }) return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: ") } + if len(mempoolTxns) > 0 { + fmt.Printf("Mempool tx hash: %v\n", mempoolTxns[0].Hash.String()) + } + fmt.Printf("Mempool synced len after flush: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) for _, mempoolTx := range mempoolTxns { utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) if err != nil { + fmt.Printf("Right before the mempool flush error: %v\n", err) mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ FlushId: originalCommittedFlushId, Succeeded: false, @@ -794,6 +833,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser IsMempoolTxn: true, }) } + fmt.Printf("Mempool flushed len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) + fmt.Printf("Mempool synced len after all: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) // Before flushing the mempool to the state change file, check if a block has mined. If so, abort the flush. if err != nil || originalCommittedFlushId != stateChangeSyncer.BlockSyncFlushId { @@ -822,7 +863,6 @@ func (stateChangeSyncer *StateChangeSyncer) StartMempoolSyncRoutine(server *Serv time.Sleep(15000 * time.Millisecond) } if !stateChangeSyncer.BlocksyncCompleteEntriesFlushed && stateChangeSyncer.SyncType == NodeSyncTypeBlockSync { - fmt.Printf("Flushing to file\n") err := stateChangeSyncer.FlushAllEntriesToFile(server) if err != nil { fmt.Printf("StateChangeSyncer.StartMempoolSyncRoutine: Error flushing all entries to file: %v", err) From e4615863d286bd95cad09cfac6f41819433a8b54 Mon Sep 17 00:00:00 2001 From: superzordon Date: Wed, 13 Mar 2024 17:31:13 -0400 Subject: [PATCH 08/26] Updates to syncing logic --- lib/blockchain.go | 78 +++++++++++++++++++++----------------- lib/state_change_syncer.go | 4 ++ 2 files changed, 47 insertions(+), 35 deletions(-) diff --git a/lib/blockchain.go b/lib/blockchain.go index 621f90636..33f67457e 100644 --- a/lib/blockchain.go +++ b/lib/blockchain.go @@ -2185,17 +2185,15 @@ func (bc *Blockchain) ProcessBlock(desoBlock *MsgDeSoBlock, verifySignatures boo } bc.timer.End("Blockchain.ProcessBlock: Transactions Db snapshot & operations") if innerErr = bc.blockView.FlushToDbWithTxn(txn, blockHeight); innerErr != nil { + // If we're in the middle of a sync, we should notify the event manager that we failed to sync the block. + if bc.eventManager != nil && !bc.eventManager.isMempoolManager { + bc.eventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ + FlushId: uuid.Nil, + Succeeded: false, + }) + } return errors.Wrapf(innerErr, "ProcessBlock: Problem writing utxo view to db on simple add to tip") } - // Immediately after the utxo view is flushed to badger, emit a state syncer flushed event, so that - // state syncer maintains a consistent view of the blockchain. - // Note: We ignore the mempool manager here, as that process handles state syncer flush events itself. - if bc.eventManager != nil && !bc.eventManager.isMempoolManager { - bc.eventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ - FlushId: uuid.Nil, - Succeeded: innerErr == nil, - }) - } bc.timer.End("Blockchain.ProcessBlock: Transactions Db utxo flush") bc.timer.Start("Blockchain.ProcessBlock: Transactions Db snapshot & operations") @@ -2548,6 +2546,16 @@ func (bc *Blockchain) ProcessBlock(desoBlock *MsgDeSoBlock, verifySignatures boo // Signal the server that we've accepted this block in some way. if bc.eventManager != nil { bc.eventManager.blockAccepted(&BlockEvent{Block: desoBlock}) + // Immediately after the utxo view is flushed to badger, emit a state syncer flushed event, so that + // state syncer maintains a consistent view of the blockchain. + // Note: We ignore the mempool manager here, as that process handles state syncer flush events itself. + if !bc.eventManager.isMempoolManager { + fmt.Printf("Emitting state syncer flushed event for synced block\n") + bc.eventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ + FlushId: uuid.Nil, + Succeeded: true, + }) + } } bc.timer.Print("Blockchain.ProcessBlock: Initial") @@ -3168,7 +3176,7 @@ func (bc *Blockchain) CreateUpdateGlobalParamsTxn(updaterPublicKey []byte, minimumNetworkFeeNanosPerKb int64, forbiddenPubKey []byte, maxNonceExpirationBlockHeightOffset int64, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3220,10 +3228,10 @@ func (bc *Blockchain) CreateUpdateGlobalParamsTxn(updaterPublicKey []byte, } func (bc *Blockchain) CreateUpdateBitcoinUSDExchangeRateTxn( - // Exchange rate update fields +// Exchange rate update fields updaterPublicKey []byte, usdCentsPerbitcoin uint64, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3255,7 +3263,7 @@ func (bc *Blockchain) CreateUpdateBitcoinUSDExchangeRateTxn( } func (bc *Blockchain) CreateSubmitPostTxn( - // Post fields +// Post fields updaterPublicKey []byte, postHashToModify []byte, parentStakeID []byte, @@ -3265,7 +3273,7 @@ func (bc *Blockchain) CreateSubmitPostTxn( tstampNanos uint64, postExtraData map[string][]byte, isHidden bool, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3325,7 +3333,7 @@ func (bc *Blockchain) CreateSubmitPostTxn( func (bc *Blockchain) CreateUpdateProfileTxn( UpdaterPublicKeyBytes []byte, - // Optional. Only set when the owner of the profile is != to the updater. +// Optional. Only set when the owner of the profile is != to the updater. OptionalProfilePublicKeyBytes []byte, NewUsername string, NewDescription string, @@ -3335,7 +3343,7 @@ func (bc *Blockchain) CreateUpdateProfileTxn( IsHidden bool, AdditionalFees uint64, ExtraData map[string][]byte, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3378,7 +3386,7 @@ func (bc *Blockchain) CreateSwapIdentityTxn( FromPublicKeyBytes []byte, ToPublicKeyBytes []byte, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3412,7 +3420,7 @@ func (bc *Blockchain) CreateSwapIdentityTxn( func (bc *Blockchain) CreateCreatorCoinTxn( UpdaterPublicKey []byte, - // See CreatorCoinMetadataa for an explanation of these fields. +// See CreatorCoinMetadataa for an explanation of these fields. ProfilePublicKey []byte, OperationType CreatorCoinOperationType, DeSoToSellNanos uint64, @@ -3420,7 +3428,7 @@ func (bc *Blockchain) CreateCreatorCoinTxn( DeSoToAddNanos uint64, MinDeSoExpectedNanos uint64, MinCreatorCoinExpectedNanos uint64, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3467,7 +3475,7 @@ func (bc *Blockchain) CreateCreatorCoinTransferTxn( ProfilePublicKey []byte, CreatorCoinToTransferNanos uint64, RecipientPublicKey []byte, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3506,9 +3514,9 @@ func (bc *Blockchain) CreateCreatorCoinTransferTxn( func (bc *Blockchain) CreateDAOCoinTxn( UpdaterPublicKey []byte, - // See CreatorCoinMetadataa for an explanation of these fields. +// See CreatorCoinMetadataa for an explanation of these fields. metadata *DAOCoinMetadata, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3545,7 +3553,7 @@ func (bc *Blockchain) CreateDAOCoinTxn( func (bc *Blockchain) CreateDAOCoinTransferTxn( UpdaterPublicKey []byte, metadata *DAOCoinTransferMetadata, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3580,9 +3588,9 @@ func (bc *Blockchain) CreateDAOCoinTransferTxn( func (bc *Blockchain) CreateDAOCoinLimitOrderTxn( UpdaterPublicKey []byte, - // See DAOCoinLimitOrderMetadata for an explanation of these fields. +// See DAOCoinLimitOrderMetadata for an explanation of these fields. metadata *DAOCoinLimitOrderMetadata, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3786,7 +3794,7 @@ func (bc *Blockchain) CreateCreateNFTTxn( AdditionalDESORoyalties map[PublicKey]uint64, AdditionalCoinRoyalties map[PublicKey]uint64, ExtraData map[string][]byte, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3913,7 +3921,7 @@ func (bc *Blockchain) CreateNFTBidTxn( NFTPostHash *BlockHash, SerialNumber uint64, BidAmountNanos uint64, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { // Create a transaction containing the NFT bid fields. @@ -3983,7 +3991,7 @@ func (bc *Blockchain) CreateNFTTransferTxn( NFTPostHash *BlockHash, SerialNumber uint64, EncryptedUnlockableTextBytes []byte, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4022,7 +4030,7 @@ func (bc *Blockchain) CreateAcceptNFTTransferTxn( UpdaterPublicKey []byte, NFTPostHash *BlockHash, SerialNumber uint64, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4063,7 +4071,7 @@ func (bc *Blockchain) CreateBurnNFTTxn( UpdaterPublicKey []byte, NFTPostHash *BlockHash, SerialNumber uint64, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4103,7 +4111,7 @@ func (bc *Blockchain) CreateAcceptNFTBidTxn( BidderPKID *PKID, BidAmountNanos uint64, EncryptedUnlockableTextBytes []byte, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4187,7 +4195,7 @@ func (bc *Blockchain) CreateUpdateNFTTxn( MinBidAmountNanos uint64, IsBuyNow bool, BuyNowPriceNanos uint64, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4415,7 +4423,7 @@ func (bc *Blockchain) CreateCreatorCoinTransferTxnWithDiamonds( ReceiverPublicKey []byte, DiamondPostHash *BlockHash, DiamondLevel int64, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4499,7 +4507,7 @@ func (bc *Blockchain) CreateAuthorizeDerivedKeyTxn( extraData map[string][]byte, memo []byte, transactionSpendingLimitHex string, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4643,7 +4651,7 @@ func (bc *Blockchain) CreateBasicTransferTxnWithDiamonds( DiamondPostHash *BlockHash, DiamondLevel int64, ExtraData map[string][]byte, - // Standard transaction fields +// Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _spendAmount uint64, _changeAmount uint64, _fees uint64, _err error) { diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index a0aed9549..955e68ae5 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -712,6 +712,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser stateChangeSyncer.BlockHeight = blockHeight + fmt.Printf("Block height: %v\n", blockHeight) + stateChangeSyncer.MempoolFlushId = originalCommittedFlushId fmt.Printf("Original committed flush ID: %v\n", originalCommittedFlushId) @@ -721,6 +723,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: ") } + fmt.Printf("Mempool tip hash: %v\n", mempoolUtxoView.TipHash.String()) + // Create a copy of the event manager, assign it to this utxo view. mempoolEventManager := *mempoolUtxoView.EventManager From 54b30597d206a0c860fcbf592df9f033d4cd66c9 Mon Sep 17 00:00:00 2001 From: superzordon Date: Sun, 17 Mar 2024 19:38:33 -0400 Subject: [PATCH 09/26] Update state change unflushed bytes logic --- lib/state_change_syncer.go | 65 +++++++++++++++++++++++++++++++------- 1 file changed, 54 insertions(+), 11 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 955e68ae5..b04cb9bb9 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -3,6 +3,7 @@ package lib import ( "bytes" "encoding/binary" + "encoding/json" "fmt" "github.com/deso-protocol/go-deadlock" "github.com/golang/glog" @@ -240,7 +241,8 @@ type StateChangeSyncer struct { // we write the correct entries to the state change file. // During blocksync, all flushes are synchronous, so we don't need to worry about this. As such, those flushes // are given the uuid.Nil ID. - UnflushedBytes map[uuid.UUID]UnflushedStateSyncerBytes + UnflushedCommittedBytes map[uuid.UUID]UnflushedStateSyncerBytes + UnflushedMempoolBytes map[uuid.UUID]UnflushedStateSyncerBytes // This map is used to keep track of all the key and value pairs that state syncer is tracking (and therefore // don't need to be re-emitted to the state change file). @@ -337,7 +339,8 @@ func NewStateChangeSyncer(stateChangeDir string, nodeSyncType NodeSyncType) *Sta StateChangeMempoolFile: stateChangeMempoolFile, StateChangeMempoolIndexFile: stateChangeMempoolIndexFile, StateChangeMempoolFileSize: uint64(stateChangeMempoolFileInfo.Size()), - UnflushedBytes: make(map[uuid.UUID]UnflushedStateSyncerBytes), + UnflushedCommittedBytes: make(map[uuid.UUID]UnflushedStateSyncerBytes), + UnflushedMempoolBytes: make(map[uuid.UUID]UnflushedStateSyncerBytes), MempoolSyncedKeyValueMap: make(map[string]*StateChangeEntry), MempoolFlushKeySet: make(map[string]bool), StateSyncerMutex: &sync.Mutex{}, @@ -433,6 +436,20 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S } encoderType = encoder.GetEncoderType() + + if encoderType == EncoderTypePostEntry { + rr := bytes.NewReader(stateChangeEntry.EncoderBytes) + if exist, err := DecodeFromBytes(encoder, rr); exist && err == nil { + post := encoder.(*PostEntry) + if PkToString(post.PosterPublicKey, &DeSoMainnetParams) == "BC1YLiUs9hvLpLL679JMtvaueoBKPbqmrckGdv1GKK823fHrBM12YBR" { + fmt.Printf("Handling post entry for flush %v: %+v\n", flushId, event.StateChangeEntry) + fmt.Printf("Event: %+v\n", event) + var body DeSoBodySchema + json.Unmarshal(post.Body, &body) + fmt.Printf("Handling post entry body: %s\n", body.Body) + } + } + } } else { // If the value associated with the key is not an encoder, then we decode the encoder entirely from the key bytes. // Examples of this are FollowEntry, LikeEntry, DeSoBalanceEntry, etc. @@ -464,7 +481,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S writeBytes := EncodeByteArray(entryBytes) // Add the StateChangeEntry bytes to the queue of bytes to be written to the state change file upon Badger db flush. - stateChangeSyncer.addTransactionToQueue(stateChangeEntry.FlushId, writeBytes) + stateChangeSyncer.addTransactionToQueue(stateChangeEntry.FlushId, writeBytes, event.IsMempoolTxn) } // _handleStateSyncerFlush is called when a Badger db flush takes place. It calls a helper function that takes the bytes that @@ -525,7 +542,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State fmt.Printf("Reverting entry %d\n", cachedSCE.EncoderType) // Add the StateChangeEntry bytes to the queue of bytes to be written to the state change file upon Badger db flush. - stateChangeSyncer.addTransactionToQueue(cachedSCE.FlushId, writeBytes) + stateChangeSyncer.addTransactionToQueue(cachedSCE.FlushId, writeBytes, true) // Remove this entry from the synced map delete(stateChangeSyncer.MempoolSyncedKeyValueMap, key) @@ -557,7 +574,7 @@ func (stateChangeSyncer *StateChangeSyncer) ResetMempool() { fmt.Printf("Resetting mempool.\n") stateChangeSyncer.MempoolSyncedKeyValueMap = make(map[string]*StateChangeEntry) stateChangeSyncer.MempoolFlushKeySet = make(map[string]bool) - delete(stateChangeSyncer.UnflushedBytes, stateChangeSyncer.MempoolFlushId) + delete(stateChangeSyncer.UnflushedMempoolBytes, stateChangeSyncer.MempoolFlushId) stateChangeSyncer.MempoolFlushId = uuid.Nil // Truncate the mempool files. stateChangeSyncer.StateChangeMempoolFile.Truncate(0) @@ -566,8 +583,17 @@ func (stateChangeSyncer *StateChangeSyncer) ResetMempool() { } // Add a transaction to the queue of transactions to be flushed to disk upon badger db flush. -func (stateChangeSyncer *StateChangeSyncer) addTransactionToQueue(flushId uuid.UUID, writeBytes []byte) { - unflushedBytes, exists := stateChangeSyncer.UnflushedBytes[flushId] +func (stateChangeSyncer *StateChangeSyncer) addTransactionToQueue(flushId uuid.UUID, writeBytes []byte, isMempool bool) { + + var unflushedBytes UnflushedStateSyncerBytes + var exists bool + + if isMempool { + unflushedBytes, exists = stateChangeSyncer.UnflushedMempoolBytes[flushId] + } else { + unflushedBytes, exists = stateChangeSyncer.UnflushedCommittedBytes[flushId] + } + if !exists { unflushedBytes = UnflushedStateSyncerBytes{ StateChangeBytes: []byte{}, @@ -582,7 +608,11 @@ func (stateChangeSyncer *StateChangeSyncer) addTransactionToQueue(flushId uuid.U unflushedBytes.StateChangeBytes = append(unflushedBytes.StateChangeBytes, writeBytes...) - stateChangeSyncer.UnflushedBytes[flushId] = unflushedBytes + if isMempool { + stateChangeSyncer.UnflushedMempoolBytes[flushId] = unflushedBytes + } else { + stateChangeSyncer.UnflushedCommittedBytes[flushId] = unflushedBytes + } } // FlushTransactionsToFile writes the bytes that have been cached on the StateChangeSyncer to the state change file. @@ -622,14 +652,22 @@ func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *State // Also delete any unconnected mempool txns from our cache. if !event.Succeeded { fmt.Printf("Deleting unflushed bytes for id: %s\n", flushId) - delete(stateChangeSyncer.UnflushedBytes, flushId) if event.IsMempoolFlush { + delete(stateChangeSyncer.UnflushedMempoolBytes, flushId) stateChangeSyncer.ResetMempool() + } else { + delete(stateChangeSyncer.UnflushedCommittedBytes, flushId) } return nil } - unflushedBytes, exists := stateChangeSyncer.UnflushedBytes[flushId] + var unflushedBytes UnflushedStateSyncerBytes + var exists bool + if event.IsMempoolFlush { + unflushedBytes, exists = stateChangeSyncer.UnflushedMempoolBytes[flushId] + } else { + unflushedBytes, exists = stateChangeSyncer.UnflushedCommittedBytes[flushId] + } if !exists { fmt.Printf("Unflushed bytes for flush ID doesn't exist: %s\n", flushId.String()) @@ -685,7 +723,12 @@ func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *State } // Update unflushed bytes map to remove the flushed bytes. - delete(stateChangeSyncer.UnflushedBytes, flushId) + if event.IsMempoolFlush { + delete(stateChangeSyncer.UnflushedMempoolBytes, flushId) + } else { + delete(stateChangeSyncer.UnflushedCommittedBytes, flushId) + } + return nil } From ef3716194d3534937257e193f0b4b8f831020119 Mon Sep 17 00:00:00 2001 From: superzordon Date: Mon, 18 Mar 2024 21:32:32 -0400 Subject: [PATCH 10/26] Add caching to mempool, update logging --- lib/state_change_syncer.go | 81 ++++++++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index b04cb9bb9..a8ba259c7 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -260,6 +260,10 @@ type StateChangeSyncer struct { // clear out any mempool entries that were ejected from the mempool. MempoolFlushKeySet map[string]bool + // This cache stores the transactions and their associated utxo ops that are currently in the mempool. + // This allows us to reduce the number of connect transaction calls when syncing the mempool + MempoolCachedTxns map[string][]*StateChangeEntry + // Tracks the flush IDs of the last block sync flush and the last mempool flush. // These are not used during hypersync, as many flushes are being processed asynchronously. BlockSyncFlushId uuid.UUID @@ -343,6 +347,7 @@ func NewStateChangeSyncer(stateChangeDir string, nodeSyncType NodeSyncType) *Sta UnflushedMempoolBytes: make(map[uuid.UUID]UnflushedStateSyncerBytes), MempoolSyncedKeyValueMap: make(map[string]*StateChangeEntry), MempoolFlushKeySet: make(map[string]bool), + MempoolCachedTxns: make(map[string][]*StateChangeEntry), StateSyncerMutex: &sync.Mutex{}, SyncType: nodeSyncType, BlocksyncCompleteEntriesFlushed: blocksyncCompleteEntriesFlushed, @@ -576,6 +581,7 @@ func (stateChangeSyncer *StateChangeSyncer) ResetMempool() { stateChangeSyncer.MempoolFlushKeySet = make(map[string]bool) delete(stateChangeSyncer.UnflushedMempoolBytes, stateChangeSyncer.MempoolFlushId) stateChangeSyncer.MempoolFlushId = uuid.Nil + stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry) // Truncate the mempool files. stateChangeSyncer.StateChangeMempoolFile.Truncate(0) stateChangeSyncer.StateChangeMempoolIndexFile.Truncate(0) @@ -837,47 +843,62 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser } fmt.Printf("Mempool synced len after flush: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) for _, mempoolTx := range mempoolTxns { - utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( - mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) - if err != nil { - fmt.Printf("Right before the mempool flush error: %v\n", err) - mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ - FlushId: originalCommittedFlushId, - Succeeded: false, - IsMempoolFlush: true, - }) - return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") - } - - // Emit transaction state change. - mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ - StateChangeEntry: &StateChangeEntry{ + var txnStateChangeEntry *StateChangeEntry + var utxoOpStateChangeEntry *StateChangeEntry + // Check if the transaction is already in the cache. If so, skip it. + txHash := mempoolTx.Hash.String() + if stateChangeEntries, ok := stateChangeSyncer.MempoolCachedTxns[txHash]; ok { + txnStateChangeEntry = stateChangeEntries[0] + utxoOpStateChangeEntry = stateChangeEntries[1] + } else { + utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( + mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) + if err != nil { + fmt.Printf("Right before the mempool flush error: %v\n", err) + mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ + FlushId: originalCommittedFlushId, + Succeeded: false, + IsMempoolFlush: true, + }) + return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") + } + txnStateChangeEntry = &StateChangeEntry{ OperationType: DbOperationTypeUpsert, KeyBytes: TxnHashToTxnKey(mempoolTx.Hash), EncoderBytes: EncodeToBytes(blockHeight, mempoolTx.Tx, false), IsReverted: false, - }, - FlushId: originalCommittedFlushId, - IsMempoolTxn: true, - }) + } - // Capture the utxo ops for the transaction in a UTXOOp bundle. - utxoOpBundle := &UtxoOperationBundle{ - UtxoOpBundle: [][]*UtxoOperation{}, - } + // Capture the utxo ops for the transaction in a UTXOOp bundle. + utxoOpBundle := &UtxoOperationBundle{ + UtxoOpBundle: [][]*UtxoOperation{}, + } - utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn) + utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn) - // Emit UTXOOp bundle event - mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ - StateChangeEntry: &StateChangeEntry{ + utxoOpStateChangeEntry = &StateChangeEntry{ OperationType: DbOperationTypeUpsert, KeyBytes: _DbKeyForTxnUtxoOps(mempoolTx.Hash), EncoderBytes: EncodeToBytes(blockHeight, utxoOpBundle, false), IsReverted: false, - }, - FlushId: originalCommittedFlushId, - IsMempoolTxn: true, + } + + // Add both state change entries to the mempool sync map. + stateChangeSyncer.MempoolCachedTxns[txHash] = []*StateChangeEntry{txnStateChangeEntry, utxoOpStateChangeEntry} + } + + // Emit transaction state change. + mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ + StateChangeEntry: txnStateChangeEntry, + FlushId: originalCommittedFlushId, + IsMempoolTxn: true, + }) + + // Emit UTXOOp bundle event + mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ + StateChangeEntry: utxoOpStateChangeEntry, + FlushId: originalCommittedFlushId, + IsMempoolTxn: true, }) } fmt.Printf("Mempool flushed len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) From 381d4aef56a0f2d556bd3cb599a373e2981a1099 Mon Sep 17 00:00:00 2001 From: superzordon Date: Mon, 18 Mar 2024 21:52:14 -0400 Subject: [PATCH 11/26] Update revert logic --- lib/state_change_syncer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index a8ba259c7..97078dec6 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -501,8 +501,8 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State // If this is a mempool flush, make sure a block hasn't mined since the mempool entries were added to queue. // If not, reset the mempool maps and file, and start from scratch. The consumer will revert the mempool transactions // it currently has and sync from scratch. - if stateChangeSyncer.BlockSyncFlushId != event.BlockSyncFlushId || stateChangeSyncer.BlockSyncFlushId != event.FlushId { - fmt.Printf("The flush ID has changed, bailing now.\n") + if (stateChangeSyncer.BlockSyncFlushId != event.BlockSyncFlushId && event.BlockSyncFlushId != uuid.Nil) || stateChangeSyncer.BlockSyncFlushId != event.FlushId { + fmt.Printf("The flush ID has changed, bailing now. Event: %v, Event block sync: %v, Global block sync: %v\n", event.FlushId, event.BlockSyncFlushId, stateChangeSyncer.BlockSyncFlushId) stateChangeSyncer.ResetMempool() return } From 0c77aae8f40125e60ffa41bd57429f21a325cb10 Mon Sep 17 00:00:00 2001 From: superzordon Date: Wed, 20 Mar 2024 17:23:08 -0400 Subject: [PATCH 12/26] Updates to transaction caching --- lib/state_change_syncer.go | 118 ++++++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 47 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 97078dec6..183415c52 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -250,6 +250,9 @@ type StateChangeSyncer struct { // The value is the badger entry that was flushed to the db. //MempoolSyncedKeyValueMap map[string][]byte MempoolSyncedKeyValueMap map[string]*StateChangeEntry + + MempoolNewlyFlushedTxns map[string]*StateChangeEntry + // This map tracks the keys that were flushed to the mempool in a single flush. // Every time a flush occurs, this map is cleared, as opposed to the MempoolSyncedKeyValueMap, which is only cleared // when a new block is processed. @@ -264,6 +267,8 @@ type StateChangeSyncer struct { // This allows us to reduce the number of connect transaction calls when syncing the mempool MempoolCachedTxns map[string][]*StateChangeEntry + MempoolCachedUtxoView *UtxoView + // Tracks the flush IDs of the last block sync flush and the last mempool flush. // These are not used during hypersync, as many flushes are being processed asynchronously. BlockSyncFlushId uuid.UUID @@ -346,6 +351,7 @@ func NewStateChangeSyncer(stateChangeDir string, nodeSyncType NodeSyncType) *Sta UnflushedCommittedBytes: make(map[uuid.UUID]UnflushedStateSyncerBytes), UnflushedMempoolBytes: make(map[uuid.UUID]UnflushedStateSyncerBytes), MempoolSyncedKeyValueMap: make(map[string]*StateChangeEntry), + MempoolNewlyFlushedTxns: make(map[string]*StateChangeEntry), MempoolFlushKeySet: make(map[string]bool), MempoolCachedTxns: make(map[string][]*StateChangeEntry), StateSyncerMutex: &sync.Mutex{}, @@ -419,6 +425,16 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S // If the key is in the map, and the entry bytes are the same as those that are already tracked by state syncer, // then we don't need to write the state change entry to the state change file - it's already being tracked. return + } else if ok { + // If the key is in the map, and the entry bytes are different, then we need to track the new entry. + // Skip if the entry is already being tracked as a new flush. + if _, newFlushExists := stateChangeSyncer.MempoolNewlyFlushedTxns[txKey]; !newFlushExists { + // If the key is in the map, and the entry bytes are different, then we need to track the new entry. + stateChangeSyncer.MempoolNewlyFlushedTxns[txKey] = cachedSCE + } + } else { + // If the key is not in the map, then we need to track the new entry. + stateChangeSyncer.MempoolNewlyFlushedTxns[txKey] = nil } } else { // If the flush ID is nil, then we need to use the global block sync flush ID. @@ -475,6 +491,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S if event.IsMempoolTxn { txKey := createMempoolTxKey(stateChangeEntry.KeyBytes) + // Track the key and value if this is a new entry to the mempool, or if the encoder bytes or operation type // changed since it was last synced. stateChangeSyncer.MempoolSyncedKeyValueMap[txKey] = event.StateChangeEntry @@ -578,6 +595,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State func (stateChangeSyncer *StateChangeSyncer) ResetMempool() { fmt.Printf("Resetting mempool.\n") stateChangeSyncer.MempoolSyncedKeyValueMap = make(map[string]*StateChangeEntry) + stateChangeSyncer.MempoolNewlyFlushedTxns = make(map[string]*StateChangeEntry) stateChangeSyncer.MempoolFlushKeySet = make(map[string]bool) delete(stateChangeSyncer.UnflushedMempoolBytes, stateChangeSyncer.MempoolFlushId) stateChangeSyncer.MempoolFlushId = uuid.Nil @@ -660,7 +678,15 @@ func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *State fmt.Printf("Deleting unflushed bytes for id: %s\n", flushId) if event.IsMempoolFlush { delete(stateChangeSyncer.UnflushedMempoolBytes, flushId) - stateChangeSyncer.ResetMempool() + // Loop through the unflushed mempool transactions and delete them from the cache. + for key, sce := range stateChangeSyncer.MempoolNewlyFlushedTxns { + if sce != nil { + stateChangeSyncer.MempoolSyncedKeyValueMap[key] = sce + } else { + delete(stateChangeSyncer.MempoolSyncedKeyValueMap, key) + delete(stateChangeSyncer.MempoolFlushKeySet, key) + } + } } else { delete(stateChangeSyncer.UnflushedCommittedBytes, flushId) } @@ -731,6 +757,7 @@ func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *State // Update unflushed bytes map to remove the flushed bytes. if event.IsMempoolFlush { delete(stateChangeSyncer.UnflushedMempoolBytes, flushId) + stateChangeSyncer.MempoolNewlyFlushedTxns = make(map[string]*StateChangeEntry) } else { delete(stateChangeSyncer.UnflushedCommittedBytes, flushId) } @@ -761,19 +788,15 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser stateChangeSyncer.BlockHeight = blockHeight - fmt.Printf("Block height: %v\n", blockHeight) - stateChangeSyncer.MempoolFlushId = originalCommittedFlushId - fmt.Printf("Original committed flush ID: %v\n", originalCommittedFlushId) + //fmt.Printf("Original committed flush ID: %v\n", originalCommittedFlushId) mempoolUtxoView, err := server.GetMempool().GetAugmentedUniversalView() if err != nil { return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: ") } - fmt.Printf("Mempool tip hash: %v\n", mempoolUtxoView.TipHash.String()) - // Create a copy of the event manager, assign it to this utxo view. mempoolEventManager := *mempoolUtxoView.EventManager @@ -797,8 +820,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser // more than once in the mempool transactions. txn := server.blockchain.db.NewTransaction(true) defer txn.Discard() - fmt.Printf("Mempool synced len before flush: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) - fmt.Printf("Mempool flushed len before flush: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) + //fmt.Printf("Mempool synced len before flush: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) + //fmt.Printf("Mempool flushed len before flush: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) err = mempoolUtxoView.FlushToDbWithTxn(txn, uint64(server.blockchain.bestChain[len(server.blockchain.bestChain)-1].Height)) if err != nil { mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ @@ -839,54 +862,55 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser } if len(mempoolTxns) > 0 { - fmt.Printf("Mempool tx hash: %v\n", mempoolTxns[0].Hash.String()) + //fmt.Printf("Mempool tx hash: %v\n", mempoolTxns[0].Hash.String()) } - fmt.Printf("Mempool synced len after flush: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) + //fmt.Printf("Mempool synced len after flush: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) for _, mempoolTx := range mempoolTxns { var txnStateChangeEntry *StateChangeEntry var utxoOpStateChangeEntry *StateChangeEntry // Check if the transaction is already in the cache. If so, skip it. txHash := mempoolTx.Hash.String() - if stateChangeEntries, ok := stateChangeSyncer.MempoolCachedTxns[txHash]; ok { - txnStateChangeEntry = stateChangeEntries[0] - utxoOpStateChangeEntry = stateChangeEntries[1] - } else { - utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( - mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) - if err != nil { - fmt.Printf("Right before the mempool flush error: %v\n", err) - mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ - FlushId: originalCommittedFlushId, - Succeeded: false, - IsMempoolFlush: true, - }) - return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") - } - txnStateChangeEntry = &StateChangeEntry{ - OperationType: DbOperationTypeUpsert, - KeyBytes: TxnHashToTxnKey(mempoolTx.Hash), - EncoderBytes: EncodeToBytes(blockHeight, mempoolTx.Tx, false), - IsReverted: false, - } - - // Capture the utxo ops for the transaction in a UTXOOp bundle. - utxoOpBundle := &UtxoOperationBundle{ - UtxoOpBundle: [][]*UtxoOperation{}, - } + //if stateChangeEntries, ok := stateChangeSyncer.MempoolCachedTxns[txHash]; ok { + // txnStateChangeEntry = stateChangeEntries[0] + // utxoOpStateChangeEntry = stateChangeEntries[1] + //} else { + utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( + mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) + if err != nil { + fmt.Printf("Right before the mempool flush error: %v\n", err) + continue + //mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ + // FlushId: originalCommittedFlushId, + // Succeeded: false, + // IsMempoolFlush: true, + //}) + //return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") + } + txnStateChangeEntry = &StateChangeEntry{ + OperationType: DbOperationTypeUpsert, + KeyBytes: TxnHashToTxnKey(mempoolTx.Hash), + EncoderBytes: EncodeToBytes(blockHeight, mempoolTx.Tx, false), + IsReverted: false, + } - utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn) + // Capture the utxo ops for the transaction in a UTXOOp bundle. + utxoOpBundle := &UtxoOperationBundle{ + UtxoOpBundle: [][]*UtxoOperation{}, + } - utxoOpStateChangeEntry = &StateChangeEntry{ - OperationType: DbOperationTypeUpsert, - KeyBytes: _DbKeyForTxnUtxoOps(mempoolTx.Hash), - EncoderBytes: EncodeToBytes(blockHeight, utxoOpBundle, false), - IsReverted: false, - } + utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn) - // Add both state change entries to the mempool sync map. - stateChangeSyncer.MempoolCachedTxns[txHash] = []*StateChangeEntry{txnStateChangeEntry, utxoOpStateChangeEntry} + utxoOpStateChangeEntry = &StateChangeEntry{ + OperationType: DbOperationTypeUpsert, + KeyBytes: _DbKeyForTxnUtxoOps(mempoolTx.Hash), + EncoderBytes: EncodeToBytes(blockHeight, utxoOpBundle, false), + IsReverted: false, } + // Add both state change entries to the mempool sync map. + stateChangeSyncer.MempoolCachedTxns[txHash] = []*StateChangeEntry{txnStateChangeEntry, utxoOpStateChangeEntry} + //} + // Emit transaction state change. mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ StateChangeEntry: txnStateChangeEntry, @@ -901,8 +925,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser IsMempoolTxn: true, }) } - fmt.Printf("Mempool flushed len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) - fmt.Printf("Mempool synced len after all: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) + //fmt.Printf("Mempool flushed len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) + //fmt.Printf("Mempool synced len after all: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) // Before flushing the mempool to the state change file, check if a block has mined. If so, abort the flush. if err != nil || originalCommittedFlushId != stateChangeSyncer.BlockSyncFlushId { From 67e8113ea767186b4fb761f1e60f56eba76af48c Mon Sep 17 00:00:00 2001 From: superzordon Date: Thu, 21 Mar 2024 18:00:18 -0400 Subject: [PATCH 13/26] Add back erroring to mempool --- lib/state_change_syncer.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 183415c52..9c11567ff 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -877,14 +877,14 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) if err != nil { - fmt.Printf("Right before the mempool flush error: %v\n", err) - continue - //mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ - // FlushId: originalCommittedFlushId, - // Succeeded: false, - // IsMempoolFlush: true, - //}) - //return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") + //fmt.Printf("Right before the mempool flush error: %v\n", err) + //continue + mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ + FlushId: originalCommittedFlushId, + Succeeded: false, + IsMempoolFlush: true, + }) + return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") } txnStateChangeEntry = &StateChangeEntry{ OperationType: DbOperationTypeUpsert, From eee0bfef2827d0a15122fbb37e6af54deb782dd9 Mon Sep 17 00:00:00 2001 From: superzordon Date: Tue, 26 Mar 2024 22:03:18 -0400 Subject: [PATCH 14/26] Update block height issue --- lib/state_change_syncer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 9c11567ff..f39744dd0 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -188,6 +188,7 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u // Decode the block height. entryBlockHeight, err := ReadUvarint(rr) if err != nil { + entryBlockHeight = DeSoMainnetParams.EncoderMigrationHeights.BalanceModel.Height //return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding block height") fmt.Printf("StateChangeEntry.RawDecodeWithoutMetadata: error decoding block height: %v\n", err) } From d5a9d1cb1f6b7c888210b992cb3f3b2617008bff Mon Sep 17 00:00:00 2001 From: superzordon Date: Tue, 2 Apr 2024 19:42:07 -0400 Subject: [PATCH 15/26] Add timing logs --- lib/state_change_syncer.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index f39744dd0..716b38323 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -775,6 +775,7 @@ func createMempoolTxKey(keyBytes []byte) string { // utxo ops and adds them to the mempool state change file. func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Server) (bool, error) { + startTime := time.Now() originalCommittedFlushId := stateChangeSyncer.BlockSyncFlushId if originalCommittedFlushId == uuid.Nil { @@ -823,6 +824,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser defer txn.Discard() //fmt.Printf("Mempool synced len before flush: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) //fmt.Printf("Mempool flushed len before flush: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) + fmt.Printf("Time since mempool sync start: %v\n", time.Since(startTime)) + startTime = time.Now() err = mempoolUtxoView.FlushToDbWithTxn(txn, uint64(server.blockchain.bestChain[len(server.blockchain.bestChain)-1].Height)) if err != nil { mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ @@ -832,7 +835,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser }) return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: FlushToDbWithTxn: ") } - + fmt.Printf("Time since db flush: %v\n", time.Since(startTime)) + startTime = time.Now() mempoolTxUtxoView, err := NewUtxoView(server.blockchain.db, server.blockchain.params, server.blockchain.postgres, nil, &mempoolEventManager) if err != nil { mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ @@ -842,7 +846,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser }) return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: CreateMempoolTxUtxoView: ") } - + fmt.Printf("Time since utxo view: %v\n", time.Since(startTime)) + startTime = time.Now() // Loop through all the transactions in the mempool and connect them and their utxo ops to the mempool view. server.mempool.mtx.RLock() mempoolTxns, _, err := server.mempool._getTransactionsOrderedByTimeAdded() @@ -861,7 +866,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser }) return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: ") } - + fmt.Printf("Time since getting transactions: %v\n", time.Since(startTime)) + startTime = time.Now() if len(mempoolTxns) > 0 { //fmt.Printf("Mempool tx hash: %v\n", mempoolTxns[0].Hash.String()) } @@ -926,6 +932,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser IsMempoolTxn: true, }) } + fmt.Printf("Time to connect all txns: %v\n", time.Since(startTime)) + startTime = time.Now() //fmt.Printf("Mempool flushed len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) //fmt.Printf("Mempool synced len after all: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) @@ -945,6 +953,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser IsMempoolFlush: true, BlockSyncFlushId: originalCommittedFlushId, }) + fmt.Printf("Time to flush: %v\n", time.Since(startTime)) return false, nil } From b4ea9b18749726df09f484af333c6b64895b884a Mon Sep 17 00:00:00 2001 From: superzordon Date: Tue, 2 Apr 2024 21:36:17 -0400 Subject: [PATCH 16/26] Add caching to mempool routine --- lib/state_change_syncer.go | 93 +++++++++++++++++++++++--------------- 1 file changed, 57 insertions(+), 36 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 716b38323..304130a38 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -872,51 +872,72 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser //fmt.Printf("Mempool tx hash: %v\n", mempoolTxns[0].Hash.String()) } //fmt.Printf("Mempool synced len after flush: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) + + // Check to see if every txn hash in our cached txns is in the first n txns in the mempool. + // N represents the length of our cached txn map. + for ii, mempoolTx := range mempoolTxns { + if _, ok := stateChangeSyncer.MempoolCachedTxns[mempoolTx.Hash.String()]; !ok { + // If any of the transaction hashes in the first n transactions don't line up with our cache map, the mempool + // has changed since the last cache, and we need to reset it. + stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry) + stateChangeSyncer.MempoolCachedUtxoView = nil + break + } + // Once we're past the number of cached txns, we have confirmed that nothing in our cache is out of date and can break. + if ii >= len(stateChangeSyncer.MempoolCachedTxns)-1 { + // If we know that all our transactions are good, set the state of the utxo view to the cached one, and exit. + mempoolUtxoView = stateChangeSyncer.MempoolCachedUtxoView + break + } + } + for _, mempoolTx := range mempoolTxns { var txnStateChangeEntry *StateChangeEntry var utxoOpStateChangeEntry *StateChangeEntry // Check if the transaction is already in the cache. If so, skip it. txHash := mempoolTx.Hash.String() - //if stateChangeEntries, ok := stateChangeSyncer.MempoolCachedTxns[txHash]; ok { - // txnStateChangeEntry = stateChangeEntries[0] - // utxoOpStateChangeEntry = stateChangeEntries[1] - //} else { - utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( - mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) - if err != nil { - //fmt.Printf("Right before the mempool flush error: %v\n", err) - //continue - mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ - FlushId: originalCommittedFlushId, - Succeeded: false, - IsMempoolFlush: true, - }) - return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") - } - txnStateChangeEntry = &StateChangeEntry{ - OperationType: DbOperationTypeUpsert, - KeyBytes: TxnHashToTxnKey(mempoolTx.Hash), - EncoderBytes: EncodeToBytes(blockHeight, mempoolTx.Tx, false), - IsReverted: false, - } + if stateChangeEntries, ok := stateChangeSyncer.MempoolCachedTxns[txHash]; ok { + txnStateChangeEntry = stateChangeEntries[0] + utxoOpStateChangeEntry = stateChangeEntries[1] + } else { + utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( + mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) + if err != nil { + //fmt.Printf("Right before the mempool flush error: %v\n", err) + //continue + mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ + FlushId: originalCommittedFlushId, + Succeeded: false, + IsMempoolFlush: true, + }) + return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") + } + txnStateChangeEntry = &StateChangeEntry{ + OperationType: DbOperationTypeUpsert, + KeyBytes: TxnHashToTxnKey(mempoolTx.Hash), + EncoderBytes: EncodeToBytes(blockHeight, mempoolTx.Tx, false), + IsReverted: false, + } - // Capture the utxo ops for the transaction in a UTXOOp bundle. - utxoOpBundle := &UtxoOperationBundle{ - UtxoOpBundle: [][]*UtxoOperation{}, - } + // Capture the utxo ops for the transaction in a UTXOOp bundle. + utxoOpBundle := &UtxoOperationBundle{ + UtxoOpBundle: [][]*UtxoOperation{}, + } - utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn) + utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn) - utxoOpStateChangeEntry = &StateChangeEntry{ - OperationType: DbOperationTypeUpsert, - KeyBytes: _DbKeyForTxnUtxoOps(mempoolTx.Hash), - EncoderBytes: EncodeToBytes(blockHeight, utxoOpBundle, false), - IsReverted: false, - } + utxoOpStateChangeEntry = &StateChangeEntry{ + OperationType: DbOperationTypeUpsert, + KeyBytes: _DbKeyForTxnUtxoOps(mempoolTx.Hash), + EncoderBytes: EncodeToBytes(blockHeight, utxoOpBundle, false), + IsReverted: false, + } - // Add both state change entries to the mempool sync map. - stateChangeSyncer.MempoolCachedTxns[txHash] = []*StateChangeEntry{txnStateChangeEntry, utxoOpStateChangeEntry} - //} + // Add both state change entries to the mempool sync map. + stateChangeSyncer.MempoolCachedTxns[txHash] = []*StateChangeEntry{txnStateChangeEntry, utxoOpStateChangeEntry} + // Update the cached utxo view to represent the new cached state. + stateChangeSyncer.MempoolCachedUtxoView = mempoolTxUtxoView + } // Emit transaction state change. mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ From 29ccc73b2bd3f593c49c3154882b248d56ce9f5d Mon Sep 17 00:00:00 2001 From: superzordon Date: Tue, 2 Apr 2024 22:05:42 -0400 Subject: [PATCH 17/26] Update caching routine --- lib/state_change_syncer.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 304130a38..1370c0792 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -885,8 +885,10 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser } // Once we're past the number of cached txns, we have confirmed that nothing in our cache is out of date and can break. if ii >= len(stateChangeSyncer.MempoolCachedTxns)-1 { - // If we know that all our transactions are good, set the state of the utxo view to the cached one, and exit. - mempoolUtxoView = stateChangeSyncer.MempoolCachedUtxoView + if stateChangeSyncer.MempoolCachedUtxoView != nil { + // If we know that all our transactions are good, set the state of the utxo view to the cached one, and exit. + mempoolUtxoView = stateChangeSyncer.MempoolCachedUtxoView + } break } } @@ -935,8 +937,6 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser // Add both state change entries to the mempool sync map. stateChangeSyncer.MempoolCachedTxns[txHash] = []*StateChangeEntry{txnStateChangeEntry, utxoOpStateChangeEntry} - // Update the cached utxo view to represent the new cached state. - stateChangeSyncer.MempoolCachedUtxoView = mempoolTxUtxoView } // Emit transaction state change. @@ -953,6 +953,12 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser IsMempoolTxn: true, }) } + // Update the cached utxo view to represent the new cached state. + stateChangeSyncer.MempoolCachedUtxoView, err = mempoolTxUtxoView.CopyUtxoView() + if err != nil { + return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: Error copying cached utxo view: ") + } + fmt.Printf("Time to connect all txns: %v\n", time.Since(startTime)) startTime = time.Now() //fmt.Printf("Mempool flushed len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) From 0bafa598688cc2fb7dac0c056fc2d13a0774b23c Mon Sep 17 00:00:00 2001 From: superzordon Date: Tue, 2 Apr 2024 22:41:43 -0400 Subject: [PATCH 18/26] Add logging --- lib/state_change_syncer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 1370c0792..c86077c54 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -881,6 +881,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser // has changed since the last cache, and we need to reset it. stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry) stateChangeSyncer.MempoolCachedUtxoView = nil + fmt.Printf("Txn not in cache, resetting") break } // Once we're past the number of cached txns, we have confirmed that nothing in our cache is out of date and can break. @@ -889,6 +890,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser // If we know that all our transactions are good, set the state of the utxo view to the cached one, and exit. mempoolUtxoView = stateChangeSyncer.MempoolCachedUtxoView } + fmt.Printf("All txns match, continueing: %v\n", ii) break } } From a0cb66c036cae4b975fed54777cd37558bb3fb7e Mon Sep 17 00:00:00 2001 From: superzordon Date: Tue, 2 Apr 2024 22:57:02 -0400 Subject: [PATCH 19/26] Update error handling --- lib/state_change_syncer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index c86077c54..6c57b03c2 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -914,6 +914,8 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser Succeeded: false, IsMempoolFlush: true, }) + stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry) + stateChangeSyncer.MempoolCachedUtxoView = nil return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") } txnStateChangeEntry = &StateChangeEntry{ From 63cd51c3d92c3f97d30c385dc18d2816b0e5506d Mon Sep 17 00:00:00 2001 From: superzordon Date: Wed, 3 Apr 2024 16:50:38 -0400 Subject: [PATCH 20/26] Remove mempool transaction syncing to debug --- lib/state_change_syncer.go | 182 ++++++++++++++++++------------------- 1 file changed, 91 insertions(+), 91 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 6c57b03c2..b0effc2a2 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -837,7 +837,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser } fmt.Printf("Time since db flush: %v\n", time.Since(startTime)) startTime = time.Now() - mempoolTxUtxoView, err := NewUtxoView(server.blockchain.db, server.blockchain.params, server.blockchain.postgres, nil, &mempoolEventManager) + //mempoolTxUtxoView, err := NewUtxoView(server.blockchain.db, server.blockchain.params, server.blockchain.postgres, nil, &mempoolEventManager) if err != nil { mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ FlushId: originalCommittedFlushId, @@ -875,96 +875,96 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser // Check to see if every txn hash in our cached txns is in the first n txns in the mempool. // N represents the length of our cached txn map. - for ii, mempoolTx := range mempoolTxns { - if _, ok := stateChangeSyncer.MempoolCachedTxns[mempoolTx.Hash.String()]; !ok { - // If any of the transaction hashes in the first n transactions don't line up with our cache map, the mempool - // has changed since the last cache, and we need to reset it. - stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry) - stateChangeSyncer.MempoolCachedUtxoView = nil - fmt.Printf("Txn not in cache, resetting") - break - } - // Once we're past the number of cached txns, we have confirmed that nothing in our cache is out of date and can break. - if ii >= len(stateChangeSyncer.MempoolCachedTxns)-1 { - if stateChangeSyncer.MempoolCachedUtxoView != nil { - // If we know that all our transactions are good, set the state of the utxo view to the cached one, and exit. - mempoolUtxoView = stateChangeSyncer.MempoolCachedUtxoView - } - fmt.Printf("All txns match, continueing: %v\n", ii) - break - } - } - - for _, mempoolTx := range mempoolTxns { - var txnStateChangeEntry *StateChangeEntry - var utxoOpStateChangeEntry *StateChangeEntry - // Check if the transaction is already in the cache. If so, skip it. - txHash := mempoolTx.Hash.String() - if stateChangeEntries, ok := stateChangeSyncer.MempoolCachedTxns[txHash]; ok { - txnStateChangeEntry = stateChangeEntries[0] - utxoOpStateChangeEntry = stateChangeEntries[1] - } else { - utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( - mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) - if err != nil { - //fmt.Printf("Right before the mempool flush error: %v\n", err) - //continue - mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ - FlushId: originalCommittedFlushId, - Succeeded: false, - IsMempoolFlush: true, - }) - stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry) - stateChangeSyncer.MempoolCachedUtxoView = nil - return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") - } - txnStateChangeEntry = &StateChangeEntry{ - OperationType: DbOperationTypeUpsert, - KeyBytes: TxnHashToTxnKey(mempoolTx.Hash), - EncoderBytes: EncodeToBytes(blockHeight, mempoolTx.Tx, false), - IsReverted: false, - } - - // Capture the utxo ops for the transaction in a UTXOOp bundle. - utxoOpBundle := &UtxoOperationBundle{ - UtxoOpBundle: [][]*UtxoOperation{}, - } - - utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn) - - utxoOpStateChangeEntry = &StateChangeEntry{ - OperationType: DbOperationTypeUpsert, - KeyBytes: _DbKeyForTxnUtxoOps(mempoolTx.Hash), - EncoderBytes: EncodeToBytes(blockHeight, utxoOpBundle, false), - IsReverted: false, - } - - // Add both state change entries to the mempool sync map. - stateChangeSyncer.MempoolCachedTxns[txHash] = []*StateChangeEntry{txnStateChangeEntry, utxoOpStateChangeEntry} - } - - // Emit transaction state change. - mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ - StateChangeEntry: txnStateChangeEntry, - FlushId: originalCommittedFlushId, - IsMempoolTxn: true, - }) - - // Emit UTXOOp bundle event - mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ - StateChangeEntry: utxoOpStateChangeEntry, - FlushId: originalCommittedFlushId, - IsMempoolTxn: true, - }) - } - // Update the cached utxo view to represent the new cached state. - stateChangeSyncer.MempoolCachedUtxoView, err = mempoolTxUtxoView.CopyUtxoView() - if err != nil { - return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: Error copying cached utxo view: ") - } - - fmt.Printf("Time to connect all txns: %v\n", time.Since(startTime)) - startTime = time.Now() + //for ii, mempoolTx := range mempoolTxns { + // if _, ok := stateChangeSyncer.MempoolCachedTxns[mempoolTx.Hash.String()]; !ok { + // // If any of the transaction hashes in the first n transactions don't line up with our cache map, the mempool + // // has changed since the last cache, and we need to reset it. + // stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry) + // stateChangeSyncer.MempoolCachedUtxoView = nil + // fmt.Printf("Txn not in cache, resetting\n") + // break + // } + // // Once we're past the number of cached txns, we have confirmed that nothing in our cache is out of date and can break. + // if ii >= len(stateChangeSyncer.MempoolCachedTxns)-1 { + // if stateChangeSyncer.MempoolCachedUtxoView != nil { + // // If we know that all our transactions are good, set the state of the utxo view to the cached one, and exit. + // mempoolUtxoView = stateChangeSyncer.MempoolCachedUtxoView + // } + // fmt.Printf("All txns match, continueing: %v\n", ii) + // break + // } + //} + // + //for _, mempoolTx := range mempoolTxns { + // var txnStateChangeEntry *StateChangeEntry + // var utxoOpStateChangeEntry *StateChangeEntry + // // Check if the transaction is already in the cache. If so, skip it. + // txHash := mempoolTx.Hash.String() + // if stateChangeEntries, ok := stateChangeSyncer.MempoolCachedTxns[txHash]; ok { + // txnStateChangeEntry = stateChangeEntries[0] + // utxoOpStateChangeEntry = stateChangeEntries[1] + // } else { + // utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( + // mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) + // if err != nil { + // //fmt.Printf("Right before the mempool flush error: %v\n", err) + // //continue + // mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ + // FlushId: originalCommittedFlushId, + // Succeeded: false, + // IsMempoolFlush: true, + // }) + // stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry) + // stateChangeSyncer.MempoolCachedUtxoView = nil + // return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") + // } + // txnStateChangeEntry = &StateChangeEntry{ + // OperationType: DbOperationTypeUpsert, + // KeyBytes: TxnHashToTxnKey(mempoolTx.Hash), + // EncoderBytes: EncodeToBytes(blockHeight, mempoolTx.Tx, false), + // IsReverted: false, + // } + // + // // Capture the utxo ops for the transaction in a UTXOOp bundle. + // utxoOpBundle := &UtxoOperationBundle{ + // UtxoOpBundle: [][]*UtxoOperation{}, + // } + // + // utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn) + // + // utxoOpStateChangeEntry = &StateChangeEntry{ + // OperationType: DbOperationTypeUpsert, + // KeyBytes: _DbKeyForTxnUtxoOps(mempoolTx.Hash), + // EncoderBytes: EncodeToBytes(blockHeight, utxoOpBundle, false), + // IsReverted: false, + // } + // + // // Add both state change entries to the mempool sync map. + // stateChangeSyncer.MempoolCachedTxns[txHash] = []*StateChangeEntry{txnStateChangeEntry, utxoOpStateChangeEntry} + // } + // + // // Emit transaction state change. + // mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ + // StateChangeEntry: txnStateChangeEntry, + // FlushId: originalCommittedFlushId, + // IsMempoolTxn: true, + // }) + // + // // Emit UTXOOp bundle event + // mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ + // StateChangeEntry: utxoOpStateChangeEntry, + // FlushId: originalCommittedFlushId, + // IsMempoolTxn: true, + // }) + //} + //// Update the cached utxo view to represent the new cached state. + //stateChangeSyncer.MempoolCachedUtxoView, err = mempoolTxUtxoView.CopyUtxoView() + //if err != nil { + // return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: Error copying cached utxo view: ") + //} + // + //fmt.Printf("Time to connect all %d txns: %v\n", len(mempoolTxns), time.Since(startTime)) + //startTime = time.Now() //fmt.Printf("Mempool flushed len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) //fmt.Printf("Mempool synced len after all: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) From 4b2dfaddd64e98ddf8917af03582e3f840f78bf1 Mon Sep 17 00:00:00 2001 From: superzordon Date: Wed, 3 Apr 2024 19:30:48 -0400 Subject: [PATCH 21/26] Update mempool caching routine --- lib/state_change_syncer.go | 47 +++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index b0effc2a2..9e25509d2 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -413,30 +413,6 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S flushId = stateChangeSyncer.MempoolFlushId } - // The current state of the tracked mempool is stored in the MempoolSyncedKeyValueMap. If this entry is already in there - // then we don't need to re-write it to the state change file. - // Create key for op + key map - txKey := createMempoolTxKey(stateChangeEntry.KeyBytes) - - // Track the key in the MempoolFlushKeySet. - stateChangeSyncer.MempoolFlushKeySet[txKey] = true - - // Check to see if the key is in the map, and if the value is the same as the value in the event. - if cachedSCE, ok := stateChangeSyncer.MempoolSyncedKeyValueMap[txKey]; ok && bytes.Equal(cachedSCE.EncoderBytes, event.StateChangeEntry.EncoderBytes) && cachedSCE.OperationType == event.StateChangeEntry.OperationType { - // If the key is in the map, and the entry bytes are the same as those that are already tracked by state syncer, - // then we don't need to write the state change entry to the state change file - it's already being tracked. - return - } else if ok { - // If the key is in the map, and the entry bytes are different, then we need to track the new entry. - // Skip if the entry is already being tracked as a new flush. - if _, newFlushExists := stateChangeSyncer.MempoolNewlyFlushedTxns[txKey]; !newFlushExists { - // If the key is in the map, and the entry bytes are different, then we need to track the new entry. - stateChangeSyncer.MempoolNewlyFlushedTxns[txKey] = cachedSCE - } - } else { - // If the key is not in the map, then we need to track the new entry. - stateChangeSyncer.MempoolNewlyFlushedTxns[txKey] = nil - } } else { // If the flush ID is nil, then we need to use the global block sync flush ID. if flushId == uuid.Nil { @@ -491,8 +467,31 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S stateChangeEntry.FlushId = flushId if event.IsMempoolTxn { + // The current state of the tracked mempool is stored in the MempoolSyncedKeyValueMap. If this entry is already in there + // then we don't need to re-write it to the state change file. + // Create key for op + key map txKey := createMempoolTxKey(stateChangeEntry.KeyBytes) + // Track the key in the MempoolFlushKeySet. + stateChangeSyncer.MempoolFlushKeySet[txKey] = true + + // Check to see if the key is in the map, and if the value is the same as the value in the event. + if cachedSCE, ok := stateChangeSyncer.MempoolSyncedKeyValueMap[txKey]; ok && bytes.Equal(cachedSCE.EncoderBytes, event.StateChangeEntry.EncoderBytes) && cachedSCE.OperationType == event.StateChangeEntry.OperationType { + // If the key is in the map, and the entry bytes are the same as those that are already tracked by state syncer, + // then we don't need to write the state change entry to the state change file - it's already being tracked. + return + } else if ok { + // If the key is in the map, and the entry bytes are different, then we need to track the new entry. + // Skip if the entry is already being tracked as a new flush. + if _, newFlushExists := stateChangeSyncer.MempoolNewlyFlushedTxns[txKey]; !newFlushExists { + // If the key is in the map, and the entry bytes are different, then we need to track the new entry. + stateChangeSyncer.MempoolNewlyFlushedTxns[txKey] = cachedSCE + } + } else { + // If the key is not in the map, then we need to track the new entry. + stateChangeSyncer.MempoolNewlyFlushedTxns[txKey] = nil + } + // Track the key and value if this is a new entry to the mempool, or if the encoder bytes or operation type // changed since it was last synced. stateChangeSyncer.MempoolSyncedKeyValueMap[txKey] = event.StateChangeEntry From c0eb640b0b668df3b692949c5330e711bb1115b3 Mon Sep 17 00:00:00 2001 From: superzordon Date: Wed, 3 Apr 2024 20:32:16 -0400 Subject: [PATCH 22/26] Uncomment transaction connecting --- lib/state_change_syncer.go | 194 ++++++++++++++++++------------------- 1 file changed, 97 insertions(+), 97 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 9e25509d2..2bffd81b9 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -836,7 +836,7 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser } fmt.Printf("Time since db flush: %v\n", time.Since(startTime)) startTime = time.Now() - //mempoolTxUtxoView, err := NewUtxoView(server.blockchain.db, server.blockchain.params, server.blockchain.postgres, nil, &mempoolEventManager) + mempoolTxUtxoView, err := NewUtxoView(server.blockchain.db, server.blockchain.params, server.blockchain.postgres, nil, &mempoolEventManager) if err != nil { mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ FlushId: originalCommittedFlushId, @@ -870,102 +870,102 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser if len(mempoolTxns) > 0 { //fmt.Printf("Mempool tx hash: %v\n", mempoolTxns[0].Hash.String()) } - //fmt.Printf("Mempool synced len after flush: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) - - // Check to see if every txn hash in our cached txns is in the first n txns in the mempool. - // N represents the length of our cached txn map. - //for ii, mempoolTx := range mempoolTxns { - // if _, ok := stateChangeSyncer.MempoolCachedTxns[mempoolTx.Hash.String()]; !ok { - // // If any of the transaction hashes in the first n transactions don't line up with our cache map, the mempool - // // has changed since the last cache, and we need to reset it. - // stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry) - // stateChangeSyncer.MempoolCachedUtxoView = nil - // fmt.Printf("Txn not in cache, resetting\n") - // break - // } - // // Once we're past the number of cached txns, we have confirmed that nothing in our cache is out of date and can break. - // if ii >= len(stateChangeSyncer.MempoolCachedTxns)-1 { - // if stateChangeSyncer.MempoolCachedUtxoView != nil { - // // If we know that all our transactions are good, set the state of the utxo view to the cached one, and exit. - // mempoolUtxoView = stateChangeSyncer.MempoolCachedUtxoView - // } - // fmt.Printf("All txns match, continueing: %v\n", ii) - // break - // } - //} - // - //for _, mempoolTx := range mempoolTxns { - // var txnStateChangeEntry *StateChangeEntry - // var utxoOpStateChangeEntry *StateChangeEntry - // // Check if the transaction is already in the cache. If so, skip it. - // txHash := mempoolTx.Hash.String() - // if stateChangeEntries, ok := stateChangeSyncer.MempoolCachedTxns[txHash]; ok { - // txnStateChangeEntry = stateChangeEntries[0] - // utxoOpStateChangeEntry = stateChangeEntries[1] - // } else { - // utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( - // mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) - // if err != nil { - // //fmt.Printf("Right before the mempool flush error: %v\n", err) - // //continue - // mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ - // FlushId: originalCommittedFlushId, - // Succeeded: false, - // IsMempoolFlush: true, - // }) - // stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry) - // stateChangeSyncer.MempoolCachedUtxoView = nil - // return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") - // } - // txnStateChangeEntry = &StateChangeEntry{ - // OperationType: DbOperationTypeUpsert, - // KeyBytes: TxnHashToTxnKey(mempoolTx.Hash), - // EncoderBytes: EncodeToBytes(blockHeight, mempoolTx.Tx, false), - // IsReverted: false, - // } - // - // // Capture the utxo ops for the transaction in a UTXOOp bundle. - // utxoOpBundle := &UtxoOperationBundle{ - // UtxoOpBundle: [][]*UtxoOperation{}, - // } - // - // utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn) - // - // utxoOpStateChangeEntry = &StateChangeEntry{ - // OperationType: DbOperationTypeUpsert, - // KeyBytes: _DbKeyForTxnUtxoOps(mempoolTx.Hash), - // EncoderBytes: EncodeToBytes(blockHeight, utxoOpBundle, false), - // IsReverted: false, - // } - // - // // Add both state change entries to the mempool sync map. - // stateChangeSyncer.MempoolCachedTxns[txHash] = []*StateChangeEntry{txnStateChangeEntry, utxoOpStateChangeEntry} - // } - // - // // Emit transaction state change. - // mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ - // StateChangeEntry: txnStateChangeEntry, - // FlushId: originalCommittedFlushId, - // IsMempoolTxn: true, - // }) - // - // // Emit UTXOOp bundle event - // mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ - // StateChangeEntry: utxoOpStateChangeEntry, - // FlushId: originalCommittedFlushId, - // IsMempoolTxn: true, - // }) - //} - //// Update the cached utxo view to represent the new cached state. - //stateChangeSyncer.MempoolCachedUtxoView, err = mempoolTxUtxoView.CopyUtxoView() - //if err != nil { - // return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: Error copying cached utxo view: ") - //} - // - //fmt.Printf("Time to connect all %d txns: %v\n", len(mempoolTxns), time.Since(startTime)) - //startTime = time.Now() - //fmt.Printf("Mempool flushed len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) - //fmt.Printf("Mempool synced len after all: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) + fmt.Printf("Mempool synced len after flush: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) + + //Check to see if every txn hash in our cached txns is in the first n txns in the mempool. + //N represents the length of our cached txn map. + for ii, mempoolTx := range mempoolTxns { + if _, ok := stateChangeSyncer.MempoolCachedTxns[mempoolTx.Hash.String()]; !ok { + // If any of the transaction hashes in the first n transactions don't line up with our cache map, the mempool + // has changed since the last cache, and we need to reset it. + stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry) + stateChangeSyncer.MempoolCachedUtxoView = nil + fmt.Printf("Txn not in cache, resetting\n") + break + } + // Once we're past the number of cached txns, we have confirmed that nothing in our cache is out of date and can break. + if ii >= len(stateChangeSyncer.MempoolCachedTxns)-1 { + if stateChangeSyncer.MempoolCachedUtxoView != nil { + // If we know that all our transactions are good, set the state of the utxo view to the cached one, and exit. + mempoolUtxoView = stateChangeSyncer.MempoolCachedUtxoView + } + fmt.Printf("All txns match, continueing: %v\n", ii) + break + } + } + + for _, mempoolTx := range mempoolTxns { + var txnStateChangeEntry *StateChangeEntry + var utxoOpStateChangeEntry *StateChangeEntry + // Check if the transaction is already in the cache. If so, skip it. + txHash := mempoolTx.Hash.String() + if stateChangeEntries, ok := stateChangeSyncer.MempoolCachedTxns[txHash]; ok { + txnStateChangeEntry = stateChangeEntries[0] + utxoOpStateChangeEntry = stateChangeEntries[1] + } else { + utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction( + mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/) + if err != nil { + //fmt.Printf("Right before the mempool flush error: %v\n", err) + //continue + mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{ + FlushId: originalCommittedFlushId, + Succeeded: false, + IsMempoolFlush: true, + }) + stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry) + stateChangeSyncer.MempoolCachedUtxoView = nil + return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ") + } + txnStateChangeEntry = &StateChangeEntry{ + OperationType: DbOperationTypeUpsert, + KeyBytes: TxnHashToTxnKey(mempoolTx.Hash), + EncoderBytes: EncodeToBytes(blockHeight, mempoolTx.Tx, false), + IsReverted: false, + } + + // Capture the utxo ops for the transaction in a UTXOOp bundle. + utxoOpBundle := &UtxoOperationBundle{ + UtxoOpBundle: [][]*UtxoOperation{}, + } + + utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn) + + utxoOpStateChangeEntry = &StateChangeEntry{ + OperationType: DbOperationTypeUpsert, + KeyBytes: _DbKeyForTxnUtxoOps(mempoolTx.Hash), + EncoderBytes: EncodeToBytes(blockHeight, utxoOpBundle, false), + IsReverted: false, + } + + // Add both state change entries to the mempool sync map. + stateChangeSyncer.MempoolCachedTxns[txHash] = []*StateChangeEntry{txnStateChangeEntry, utxoOpStateChangeEntry} + } + + // Emit transaction state change. + mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ + StateChangeEntry: txnStateChangeEntry, + FlushId: originalCommittedFlushId, + IsMempoolTxn: true, + }) + + // Emit UTXOOp bundle event + mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{ + StateChangeEntry: utxoOpStateChangeEntry, + FlushId: originalCommittedFlushId, + IsMempoolTxn: true, + }) + } + // Update the cached utxo view to represent the new cached state. + stateChangeSyncer.MempoolCachedUtxoView, err = mempoolTxUtxoView.CopyUtxoView() + if err != nil { + return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: Error copying cached utxo view: ") + } + + fmt.Printf("Time to connect all %d txns: %v\n", len(mempoolTxns), time.Since(startTime)) + startTime = time.Now() + fmt.Printf("Mempool flushed len: %d\n", len(stateChangeSyncer.MempoolFlushKeySet)) + fmt.Printf("Mempool synced len after all: %d\n", len(stateChangeSyncer.MempoolSyncedKeyValueMap)) // Before flushing the mempool to the state change file, check if a block has mined. If so, abort the flush. if err != nil || originalCommittedFlushId != stateChangeSyncer.BlockSyncFlushId { From 0844bac0eb116ccf68dfbfa254a28c0288df76c6 Mon Sep 17 00:00:00 2001 From: superzordon Date: Thu, 4 Apr 2024 00:24:11 -0400 Subject: [PATCH 23/26] Add logging --- lib/server.go | 3 +++ lib/state_change_syncer.go | 2 ++ 2 files changed, 5 insertions(+) diff --git a/lib/server.go b/lib/server.go index 1b614b2b8..c36457c0c 100644 --- a/lib/server.go +++ b/lib/server.go @@ -381,6 +381,7 @@ func NewServer( // Only initialize state change syncer if the directories are defined. var stateChangeSyncer *StateChangeSyncer + fmt.Printf("State Change Dir: %v\n", _stateChangeDir) if _stateChangeDir != "" { // Create the state change syncer to handle syncing state changes to disk, and assign some of its methods // to the event manager. @@ -469,12 +470,14 @@ func NewServer( srv.stateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height) } + fmt.Printf("Before mempool\n") // Create a mempool to store transactions until they're ready to be mined into // blocks. _mempool := NewDeSoMempool(_chain, _rateLimitFeerateNanosPerKB, _minFeeRateNanosPerKB, _blockCypherAPIKey, _runReadOnlyUtxoViewUpdater, _dataDir, _mempoolDumpDir, false) + fmt.Printf("After mempool\n") // Initialize state syncer mempool job, if needed. if srv.stateChangeSyncer != nil { srv.stateChangeSyncer.StartMempoolSyncRoutine(srv) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 2bffd81b9..5f0152128 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -993,6 +993,8 @@ func (stateChangeSyncer *StateChangeSyncer) StartMempoolSyncRoutine(server *Serv // Wait for mempool to be initialized. for server.mempool == nil || server.blockchain.chainState() != SyncStateFullyCurrent { time.Sleep(15000 * time.Millisecond) + fmt.Printf("Mempool: %v\n", server.mempool) + fmt.Printf("Chain state: %v\n", server.blockchain.chainState()) } if !stateChangeSyncer.BlocksyncCompleteEntriesFlushed && stateChangeSyncer.SyncType == NodeSyncTypeBlockSync { err := stateChangeSyncer.FlushAllEntriesToFile(server) From ed3d4aa085290020e9ea7f4a4791e54cc7f09baa Mon Sep 17 00:00:00 2001 From: superzordon Date: Thu, 11 Apr 2024 17:00:49 -0400 Subject: [PATCH 24/26] Add additional messaging index to core state --- lib/db_utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/db_utils.go b/lib/db_utils.go index cdfd76831..37e9d4ce3 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -460,7 +460,7 @@ type DBPrefixes struct { // The Minor/Major distinction is used to deterministically map the two accessGroupIds of message's sender/recipient // into a single pair based on the lexicographical ordering of the two accessGroupIds. This is done to ensure that // both sides of the conversation have the same key for the same conversation, and we can store just a single message. - PrefixDmMessagesIndex []byte `prefix_id:"[75]" is_state:"true"` + PrefixDmMessagesIndex []byte `prefix_id:"[75]" is_state:"true" core_state:"true"` // PrefixDmThreadIndex is modified by the NewMessage transaction and is used to store a DmThreadEntry // for each existing dm thread. It answers the question: "Give me all the threads for a particular user." From 1fc31fef1e737a698d6b0a03d2265416107a3c55 Mon Sep 17 00:00:00 2001 From: superzordon Date: Thu, 11 Apr 2024 17:05:55 -0400 Subject: [PATCH 25/26] Go fmt --- lib/blockchain.go | 52 +++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/lib/blockchain.go b/lib/blockchain.go index 33f67457e..d32bb5ab9 100644 --- a/lib/blockchain.go +++ b/lib/blockchain.go @@ -3176,7 +3176,7 @@ func (bc *Blockchain) CreateUpdateGlobalParamsTxn(updaterPublicKey []byte, minimumNetworkFeeNanosPerKb int64, forbiddenPubKey []byte, maxNonceExpirationBlockHeightOffset int64, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3228,10 +3228,10 @@ func (bc *Blockchain) CreateUpdateGlobalParamsTxn(updaterPublicKey []byte, } func (bc *Blockchain) CreateUpdateBitcoinUSDExchangeRateTxn( -// Exchange rate update fields + // Exchange rate update fields updaterPublicKey []byte, usdCentsPerbitcoin uint64, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3263,7 +3263,7 @@ func (bc *Blockchain) CreateUpdateBitcoinUSDExchangeRateTxn( } func (bc *Blockchain) CreateSubmitPostTxn( -// Post fields + // Post fields updaterPublicKey []byte, postHashToModify []byte, parentStakeID []byte, @@ -3273,7 +3273,7 @@ func (bc *Blockchain) CreateSubmitPostTxn( tstampNanos uint64, postExtraData map[string][]byte, isHidden bool, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3333,7 +3333,7 @@ func (bc *Blockchain) CreateSubmitPostTxn( func (bc *Blockchain) CreateUpdateProfileTxn( UpdaterPublicKeyBytes []byte, -// Optional. Only set when the owner of the profile is != to the updater. + // Optional. Only set when the owner of the profile is != to the updater. OptionalProfilePublicKeyBytes []byte, NewUsername string, NewDescription string, @@ -3343,7 +3343,7 @@ func (bc *Blockchain) CreateUpdateProfileTxn( IsHidden bool, AdditionalFees uint64, ExtraData map[string][]byte, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3386,7 +3386,7 @@ func (bc *Blockchain) CreateSwapIdentityTxn( FromPublicKeyBytes []byte, ToPublicKeyBytes []byte, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3420,7 +3420,7 @@ func (bc *Blockchain) CreateSwapIdentityTxn( func (bc *Blockchain) CreateCreatorCoinTxn( UpdaterPublicKey []byte, -// See CreatorCoinMetadataa for an explanation of these fields. + // See CreatorCoinMetadataa for an explanation of these fields. ProfilePublicKey []byte, OperationType CreatorCoinOperationType, DeSoToSellNanos uint64, @@ -3428,7 +3428,7 @@ func (bc *Blockchain) CreateCreatorCoinTxn( DeSoToAddNanos uint64, MinDeSoExpectedNanos uint64, MinCreatorCoinExpectedNanos uint64, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3475,7 +3475,7 @@ func (bc *Blockchain) CreateCreatorCoinTransferTxn( ProfilePublicKey []byte, CreatorCoinToTransferNanos uint64, RecipientPublicKey []byte, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3514,9 +3514,9 @@ func (bc *Blockchain) CreateCreatorCoinTransferTxn( func (bc *Blockchain) CreateDAOCoinTxn( UpdaterPublicKey []byte, -// See CreatorCoinMetadataa for an explanation of these fields. + // See CreatorCoinMetadataa for an explanation of these fields. metadata *DAOCoinMetadata, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3553,7 +3553,7 @@ func (bc *Blockchain) CreateDAOCoinTxn( func (bc *Blockchain) CreateDAOCoinTransferTxn( UpdaterPublicKey []byte, metadata *DAOCoinTransferMetadata, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3588,9 +3588,9 @@ func (bc *Blockchain) CreateDAOCoinTransferTxn( func (bc *Blockchain) CreateDAOCoinLimitOrderTxn( UpdaterPublicKey []byte, -// See DAOCoinLimitOrderMetadata for an explanation of these fields. + // See DAOCoinLimitOrderMetadata for an explanation of these fields. metadata *DAOCoinLimitOrderMetadata, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3794,7 +3794,7 @@ func (bc *Blockchain) CreateCreateNFTTxn( AdditionalDESORoyalties map[PublicKey]uint64, AdditionalCoinRoyalties map[PublicKey]uint64, ExtraData map[string][]byte, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -3921,7 +3921,7 @@ func (bc *Blockchain) CreateNFTBidTxn( NFTPostHash *BlockHash, SerialNumber uint64, BidAmountNanos uint64, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { // Create a transaction containing the NFT bid fields. @@ -3991,7 +3991,7 @@ func (bc *Blockchain) CreateNFTTransferTxn( NFTPostHash *BlockHash, SerialNumber uint64, EncryptedUnlockableTextBytes []byte, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4030,7 +4030,7 @@ func (bc *Blockchain) CreateAcceptNFTTransferTxn( UpdaterPublicKey []byte, NFTPostHash *BlockHash, SerialNumber uint64, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4071,7 +4071,7 @@ func (bc *Blockchain) CreateBurnNFTTxn( UpdaterPublicKey []byte, NFTPostHash *BlockHash, SerialNumber uint64, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4111,7 +4111,7 @@ func (bc *Blockchain) CreateAcceptNFTBidTxn( BidderPKID *PKID, BidAmountNanos uint64, EncryptedUnlockableTextBytes []byte, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4195,7 +4195,7 @@ func (bc *Blockchain) CreateUpdateNFTTxn( MinBidAmountNanos uint64, IsBuyNow bool, BuyNowPriceNanos uint64, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4423,7 +4423,7 @@ func (bc *Blockchain) CreateCreatorCoinTransferTxnWithDiamonds( ReceiverPublicKey []byte, DiamondPostHash *BlockHash, DiamondLevel int64, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4507,7 +4507,7 @@ func (bc *Blockchain) CreateAuthorizeDerivedKeyTxn( extraData map[string][]byte, memo []byte, transactionSpendingLimitHex string, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _changeAmount uint64, _fees uint64, _err error) { @@ -4651,7 +4651,7 @@ func (bc *Blockchain) CreateBasicTransferTxnWithDiamonds( DiamondPostHash *BlockHash, DiamondLevel int64, ExtraData map[string][]byte, -// Standard transaction fields + // Standard transaction fields minFeeRateNanosPerKB uint64, mempool *DeSoMempool, additionalOutputs []*DeSoOutput) ( _txn *MsgDeSoTxn, _totalInput uint64, _spendAmount uint64, _changeAmount uint64, _fees uint64, _err error) { From 6cfaa1d6a5d8c677aeddbf87149ea16495d5dbde Mon Sep 17 00:00:00 2001 From: superzordon Date: Fri, 24 May 2024 18:40:14 -0400 Subject: [PATCH 26/26] Change state change entry encoder --- lib/state_change_syncer.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 5f0152128..e0a8a09b0 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -123,7 +123,7 @@ func (stateChangeEntry *StateChangeEntry) RawEncodeWithoutMetadata(blockHeight u // Encode the block height. data = append(data, UintToBuf(blockHeight)...) - if stateChangeEntry.EncoderType == EncoderTypeUtxoOperation { + if stateChangeEntry.EncoderType == EncoderTypeUtxoOperation || stateChangeEntry.EncoderType == EncoderTypeUtxoOperationBundle { // Encode the transaction. data = append(data, EncodeToBytes(blockHeight, stateChangeEntry.Block)...) } @@ -194,15 +194,17 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u } stateChangeEntry.BlockHeight = entryBlockHeight - if stateChangeEntry.EncoderType == EncoderTypeUtxoOperation { - block := &MsgDeSoBlock{} - if exist, err := DecodeFromBytes(block, rr); exist && err == nil { - stateChangeEntry.Block = block - } else if err != nil { - return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding block") - } + // Don't decode the block if the encoder type is not a utxo operation. + if stateChangeEntry.EncoderType != EncoderTypeUtxoOperation && stateChangeEntry.EncoderType != EncoderTypeUtxoOperationBundle { + return nil } + block := &MsgDeSoBlock{} + if exist, err := DecodeFromBytes(block, rr); exist && err == nil { + stateChangeEntry.Block = block + } else if err != nil { + return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding block") + } return nil }