diff --git a/lib/blockchain.go b/lib/blockchain.go
index 621f90636..d32bb5ab9 100644
--- a/lib/blockchain.go
+++ b/lib/blockchain.go
@@ -2185,17 +2185,15 @@ func (bc *Blockchain) ProcessBlock(desoBlock *MsgDeSoBlock, verifySignatures boo
 		}
 		bc.timer.End("Blockchain.ProcessBlock: Transactions Db snapshot & operations")
 		if innerErr = bc.blockView.FlushToDbWithTxn(txn, blockHeight); innerErr != nil {
+			// If we're in the middle of a sync, we should notify the event manager that we failed to sync the block.
+			if bc.eventManager != nil && !bc.eventManager.isMempoolManager {
+				bc.eventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
+					FlushId:   uuid.Nil,
+					Succeeded: false,
+				})
+			}
 			return errors.Wrapf(innerErr, "ProcessBlock: Problem writing utxo view to db on simple add to tip")
 		}
-		// Immediately after the utxo view is flushed to badger, emit a state syncer flushed event, so that
-		// state syncer maintains a consistent view of the blockchain.
-		// Note: We ignore the mempool manager here, as that process handles state syncer flush events itself.
-		if bc.eventManager != nil && !bc.eventManager.isMempoolManager {
-			bc.eventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
-				FlushId:   uuid.Nil,
-				Succeeded: innerErr == nil,
-			})
-		}
 		bc.timer.End("Blockchain.ProcessBlock: Transactions Db utxo flush")
 		bc.timer.Start("Blockchain.ProcessBlock: Transactions Db snapshot & operations")
@@ -2548,6 +2546,15 @@ func (bc *Blockchain) ProcessBlock(desoBlock *MsgDeSoBlock, verifySignatures boo
 	// Signal the server that we've accepted this block in some way.
 	if bc.eventManager != nil {
 		bc.eventManager.blockAccepted(&BlockEvent{Block: desoBlock})
+		// Immediately after the utxo view is flushed to badger, emit a state syncer flushed event, so that
+		// state syncer maintains a consistent view of the blockchain.
+		// Note: We ignore the mempool manager here, as that process handles state syncer flush events itself.
+		if !bc.eventManager.isMempoolManager {
+			bc.eventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
+				FlushId:   uuid.Nil,
+				Succeeded: true,
+			})
+		}
 	}

 	bc.timer.Print("Blockchain.ProcessBlock: Initial")

diff --git a/lib/db_utils.go b/lib/db_utils.go
index 6789e79eb..37e9d4ce3 100644
--- a/lib/db_utils.go
+++ b/lib/db_utils.go
@@ -460,7 +460,7 @@ type DBPrefixes struct {
 	// The Minor/Major distinction is used to deterministically map the two accessGroupIds of the message's sender/recipient
 	// into a single pair based on the lexicographical ordering of the two accessGroupIds. This is done to ensure that
 	// both sides of the conversation have the same key for the same conversation, and we can store just a single message.
-	PrefixDmMessagesIndex []byte `prefix_id:"[75]" is_state:"true"`
+	PrefixDmMessagesIndex []byte `prefix_id:"[75]" is_state:"true" core_state:"true"`

 	// PrefixDmThreadIndex is modified by the NewMessage transaction and is used to store a DmThreadEntry
 	// for each existing dm thread. It answers the question: "Give me all the threads for a particular user."
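The substantive change in blockchain.go is *when* the state syncer flush event fires: a failed DB flush now reports `Succeeded: false` from inside the error path, and the success event is deferred until after `blockAccepted`. The sketch below models just that ordering contract; the types and callbacks (`FlushedEvent`, `eventManager`, `processBlock`) are simplified stand-ins, not the actual deso APIs.

```go
package main

import "fmt"

// FlushedEvent mirrors the shape of StateSyncerFlushedEvent for illustration:
// a flush either succeeded, or the consumer must treat it as failed.
type FlushedEvent struct {
	Succeeded bool
}

type eventManager struct {
	isMempoolManager bool
	onFlushed        func(FlushedEvent)
}

// processBlock sketches the ordering the diff enforces: a failed DB flush
// emits Succeeded=false immediately, while the success event is only emitted
// after the block has been fully accepted.
func processBlock(em *eventManager, flushToDb func() error, acceptBlock func()) error {
	if err := flushToDb(); err != nil {
		if em != nil && !em.isMempoolManager {
			em.onFlushed(FlushedEvent{Succeeded: false})
		}
		return err
	}
	acceptBlock()
	if em != nil && !em.isMempoolManager {
		em.onFlushed(FlushedEvent{Succeeded: true})
	}
	return nil
}

func main() {
	em := &eventManager{onFlushed: func(e FlushedEvent) { fmt.Printf("flushed: %+v\n", e) }}
	_ = processBlock(em, func() error { return nil }, func() { fmt.Println("block accepted") })
}
```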
@@ -989,6 +989,7 @@ func DBSetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, value []byte, eve
 				KeyBytes:             key,
 				EncoderBytes:         value,
 				AncestralRecordBytes: ancestralValue,
+				IsReverted:           false,
 			},
 			FlushId:      uuid.Nil,
 			IsMempoolTxn: eventManager.isMempoolManager,
@@ -1091,6 +1092,7 @@ func DBDeleteWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, eventManager *
 				KeyBytes:             key,
 				EncoderBytes:         nil,
 				AncestralRecordBytes: ancestralValue,
+				IsReverted:           false,
 			},
 			FlushId:      uuid.Nil,
 			IsMempoolTxn: eventManager.isMempoolManager,

diff --git a/lib/event_manager.go b/lib/event_manager.go
index 55c4b7bd1..51a7e6f09 100644
--- a/lib/event_manager.go
+++ b/lib/event_manager.go
@@ -1,6 +1,8 @@
 package lib

-import "github.com/google/uuid"
+import (
+	"github.com/google/uuid"
+)

 type TransactionEventFunc func(event *TransactionEvent)
 type StateSyncerOperationEventFunc func(event *StateSyncerOperationEvent)

diff --git a/lib/snapshot.go b/lib/snapshot.go
index 1c3f96adb..ec118fe8c 100644
--- a/lib/snapshot.go
+++ b/lib/snapshot.go
@@ -1265,6 +1265,7 @@ func (snap *Snapshot) SetSnapshotChunk(mainDb *badger.DB, mainDbMutex *deadlock.
 				OperationType: DbOperationTypeInsert,
 				KeyBytes:      dbEntry.Key,
 				EncoderBytes:  dbEntry.Value,
+				IsReverted:    false,
 			},
 			FlushId: dbFlushId,
 		})

diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go
index 5443aae30..e0a8a09b0 100644
--- a/lib/state_change_syncer.go
+++ b/lib/state_change_syncer.go
@@ -36,6 +37,9 @@ const (
 type StateChangeEntry struct {
 	// The type of operation that should be performed on the database.
 	OperationType StateSyncerOperationType
+	// For mempool state changes, whether this operation has been booted from the mempool and should be reverted
+	// from the state change record.
+	IsReverted bool
 	// The key that should be used for the operation.
 	KeyBytes []byte
 	// The encoder that is being captured by this state change.
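Every emission site above sets the new `IsReverted` field explicitly to `false`; only the mempool-ejection path later in this diff emits `true`. Below is a minimal sketch of how a downstream consumer might interpret the flag, assuming a simple key/value view of consumer state — the `applyEntry` helper and map-backed state are hypothetical, not the real state change consumer.

```go
package main

import "fmt"

// Simplified stand-in for the syncer's entry type; field names follow the
// diff, everything else here is hypothetical consumer-side code.
type StateChangeEntry struct {
	KeyBytes     []byte
	EncoderBytes []byte
	IsReverted   bool
}

// applyEntry sketches how a consumer might honor the new flag: entries are
// upserted as before, but IsReverted=true means "undo the previously synced
// version of this key" rather than "apply this value".
func applyEntry(state map[string][]byte, entry *StateChangeEntry) {
	key := string(entry.KeyBytes)
	if entry.IsReverted {
		delete(state, key)
		return
	}
	state[key] = entry.EncoderBytes
}

func main() {
	state := map[string][]byte{}
	applyEntry(state, &StateChangeEntry{KeyBytes: []byte("a"), EncoderBytes: []byte{1}})
	applyEntry(state, &StateChangeEntry{KeyBytes: []byte("a"), IsReverted: true})
	fmt.Println(len(state)) // 0: the reverted entry was removed
}
```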
@@ -63,14 +67,19 @@ type StateChangeEntry struct {

 // RawEncodeWithoutMetadata constructs the bytes to represent a StateChangeEntry.
 // The format is:
-// [operation type (varint)][encoder type (varint)][key length (varint)][key bytes]
+// [operation type (varint)][is reverted (1 byte)][encoder type (varint)][key length (varint)][key bytes]
 // [encoder length (varint)][encoder bytes][is mempool (1 byte)][utxo ops length (varint)][utxo ops bytes]
 func (stateChangeEntry *StateChangeEntry) RawEncodeWithoutMetadata(blockHeight uint64, skipMetadata ...bool) []byte {
 	// Get byte length of keyBytes (will be nil for mempool transactions)
 	var data []byte

+	// OperationType
 	data = append(data, UintToBuf(uint64(stateChangeEntry.OperationType))...)
+	// IsReverted
+	data = append(data, BoolToByte(stateChangeEntry.IsReverted))
+	// EncoderType
 	data = append(data, UintToBuf(uint64(stateChangeEntry.EncoderType))...)
+	// KeyBytes
 	data = append(data, EncodeByteArray(stateChangeEntry.KeyBytes)...)

 	// The encoder can either be represented in raw bytes or as an encoder. If it's represented in bytes, we use that.
@@ -114,8 +123,10 @@ func (stateChangeEntry *StateChangeEntry) RawEncodeWithoutMetadata(blockHeight u
 	// Encode the block height.
 	data = append(data, UintToBuf(blockHeight)...)

-	// Encode the transaction.
-	data = append(data, EncodeToBytes(blockHeight, stateChangeEntry.Block)...)
+	// Only utxo operation entries carry an associated block.
+	if stateChangeEntry.EncoderType == EncoderTypeUtxoOperation || stateChangeEntry.EncoderType == EncoderTypeUtxoOperationBundle {
+		data = append(data, EncodeToBytes(blockHeight, stateChangeEntry.Block)...)
+	}

 	return data
 }
@@ -128,6 +139,13 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u
 	}
 	stateChangeEntry.OperationType = StateSyncerOperationType(operationType)

+	// Decode IsReverted
+	isReverted, err := ReadBoolByte(rr)
+	if err != nil {
+		return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding is reverted")
+	}
+	stateChangeEntry.IsReverted = isReverted
+
 	// Decode EncoderType
 	encoderType, err := ReadUvarint(rr)
 	if err != nil {
@@ -170,17 +188,23 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u
 	// Decode the block height.
 	entryBlockHeight, err := ReadUvarint(rr)
 	if err != nil {
-		return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding block height")
+		// Older entries may not include a block height; fall back to the balance model fork height.
+		entryBlockHeight = DeSoMainnetParams.EncoderMigrationHeights.BalanceModel.Height
+		glog.Errorf("StateChangeEntry.RawDecodeWithoutMetadata: error decoding block height: %v", err)
 	}

 	stateChangeEntry.BlockHeight = entryBlockHeight

+	// Don't decode the block if the encoder type is not a utxo operation.
+	if stateChangeEntry.EncoderType != EncoderTypeUtxoOperation && stateChangeEntry.EncoderType != EncoderTypeUtxoOperationBundle {
+		return nil
+	}
+
 	block := &MsgDeSoBlock{}
 	if exist, err := DecodeFromBytes(block, rr); exist && err == nil {
 		stateChangeEntry.Block = block
 	} else if err != nil {
 		return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding block")
 	}
-
 	return nil
 }
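The new wire layout places the `IsReverted` byte immediately after the operation type varint. Below is a self-contained round trip of just the header portion, using stdlib varints in place of deso's `UintToBuf`/`BoolToByte`/`EncodeByteArray` helpers; the field order matches the format comment above, but the encoding details are simplified.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// encodeHeader sketches the updated prefix of the wire format:
// [operation type (varint)][is reverted (1 byte)][encoder type (varint)][key length (varint)][key bytes]
func encodeHeader(opType, encoderType uint64, isReverted bool, key []byte) []byte {
	var buf bytes.Buffer
	var tmp [binary.MaxVarintLen64]byte
	buf.Write(tmp[:binary.PutUvarint(tmp[:], opType)])
	if isReverted {
		buf.WriteByte(1)
	} else {
		buf.WriteByte(0)
	}
	buf.Write(tmp[:binary.PutUvarint(tmp[:], encoderType)])
	buf.Write(tmp[:binary.PutUvarint(tmp[:], uint64(len(key)))])
	buf.Write(key)
	return buf.Bytes()
}

// decodeHeader reads the fields back in the same order they were written.
func decodeHeader(data []byte) (opType, encoderType uint64, isReverted bool, key []byte, err error) {
	rr := bytes.NewReader(data)
	if opType, err = binary.ReadUvarint(rr); err != nil {
		return
	}
	b, errByte := rr.ReadByte()
	if errByte != nil {
		err = errByte
		return
	}
	isReverted = b != 0
	if encoderType, err = binary.ReadUvarint(rr); err != nil {
		return
	}
	keyLen, errLen := binary.ReadUvarint(rr)
	if errLen != nil {
		err = errLen
		return
	}
	key = make([]byte, keyLen)
	_, err = io.ReadFull(rr, key)
	return
}

func main() {
	data := encodeHeader(2, 17, true, []byte{0xAB, 0xCD})
	op, enc, rev, key, _ := decodeHeader(data)
	fmt.Println(op, enc, rev, key) // 2 17 true [171 205]
}
```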
@@ -220,15 +244,20 @@ type StateChangeSyncer struct {
 	// we write the correct entries to the state change file.
 	// During blocksync, all flushes are synchronous, so we don't need to worry about this. As such, those flushes
 	// are given the uuid.Nil ID.
-	UnflushedBytes map[uuid.UUID]UnflushedStateSyncerBytes
+	UnflushedCommittedBytes map[uuid.UUID]UnflushedStateSyncerBytes
+	UnflushedMempoolBytes   map[uuid.UUID]UnflushedStateSyncerBytes

 	// This map is used to keep track of all the key and value pairs that state syncer is tracking (and therefore
 	// don't need to be re-emitted to the state change file).
 	// The key is the stringified key of the entry.
 	// The value is the StateChangeEntry that was most recently emitted for that key.
-	MempoolKeyValueMap map[string][]byte
+	MempoolSyncedKeyValueMap map[string]*StateChangeEntry

+	// Tracks entries that were newly added or overwritten in MempoolSyncedKeyValueMap during the current flush,
+	// mapped to the entry they replaced (nil if the key is new). Used to roll the synced map back if the flush fails.
+	MempoolNewlyFlushedTxns map[string]*StateChangeEntry

 	// This map tracks the keys that were flushed to the mempool in a single flush.
-	// Every time a flush occurs, this map is cleared, as opposed to the MempoolKeyValueMap, which is only cleared
+	// Every time a flush occurs, this map is cleared, as opposed to the MempoolSyncedKeyValueMap, which is only cleared
 	// when a new block is processed.
 	// This is used to determine if there are any tracked mempool transactions that have been ejected from the current
 	// mempool state.
@@ -237,6 +266,12 @@ type StateChangeSyncer struct {
 	// clear out any mempool entries that were ejected from the mempool.
 	MempoolFlushKeySet map[string]bool

+	// This cache stores the transactions and their associated utxo ops that are currently in the mempool.
+	// This allows us to reduce the number of connect transaction calls when syncing the mempool.
+	MempoolCachedTxns map[string][]*StateChangeEntry

+	// Caches the utxo view produced by connecting the cached mempool transactions, so subsequent sync passes
+	// can reuse it instead of reconnecting every transaction.
+	MempoolCachedUtxoView *UtxoView

 	// Tracks the flush IDs of the last block sync flush and the last mempool flush.
 	// These are not used during hypersync, as many flushes are being processed asynchronously.
 	BlockSyncFlushId uuid.UUID
@@ -316,9 +351,12 @@ func NewStateChangeSyncer(stateChangeDir string, nodeSyncType NodeSyncType) *Sta
 		StateChangeMempoolFile:          stateChangeMempoolFile,
 		StateChangeMempoolIndexFile:     stateChangeMempoolIndexFile,
 		StateChangeMempoolFileSize:      uint64(stateChangeMempoolFileInfo.Size()),
-		UnflushedBytes:                  make(map[uuid.UUID]UnflushedStateSyncerBytes),
-		MempoolKeyValueMap:              make(map[string][]byte),
+		UnflushedCommittedBytes:         make(map[uuid.UUID]UnflushedStateSyncerBytes),
+		UnflushedMempoolBytes:           make(map[uuid.UUID]UnflushedStateSyncerBytes),
+		MempoolSyncedKeyValueMap:        make(map[string]*StateChangeEntry),
+		MempoolNewlyFlushedTxns:         make(map[string]*StateChangeEntry),
 		MempoolFlushKeySet:              make(map[string]bool),
+		MempoolCachedTxns:               make(map[string][]*StateChangeEntry),
 		StateSyncerMutex:                &sync.Mutex{},
 		SyncType:                        nodeSyncType,
 		BlocksyncCompleteEntriesFlushed: blocksyncCompleteEntriesFlushed,
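The new maps work together: `MempoolSyncedKeyValueMap` holds what the consumer is believed to have synced, and `MempoolNewlyFlushedTxns` remembers, for each key touched in the current flush, the entry it displaced (nil for brand-new keys), so a failed flush can be unwound. A toy model of that bookkeeping, with strings standing in for `*StateChangeEntry`:

```go
package main

import "fmt"

// mempoolTracker models the rollback bookkeeping: synced mirrors
// MempoolSyncedKeyValueMap, newlyFlushed mirrors MempoolNewlyFlushedTxns.
type mempoolTracker struct {
	synced       map[string]string
	newlyFlushed map[string]*string // prior value, nil if the key is new
}

// track records an entry and remembers what it replaced, so a failed flush
// can be undone later.
func (t *mempoolTracker) track(key, value string) {
	if prev, ok := t.synced[key]; ok {
		if _, seen := t.newlyFlushed[key]; !seen {
			p := prev
			t.newlyFlushed[key] = &p
		}
	} else {
		t.newlyFlushed[key] = nil
	}
	t.synced[key] = value
}

// rollback mirrors the failure path in FlushTransactionsToFile: restore the
// previous entry where one existed, delete keys that were brand new.
func (t *mempoolTracker) rollback() {
	for key, prev := range t.newlyFlushed {
		if prev != nil {
			t.synced[key] = *prev
		} else {
			delete(t.synced, key)
		}
	}
	t.newlyFlushed = map[string]*string{}
}

func main() {
	t := &mempoolTracker{synced: map[string]string{"a": "1"}, newlyFlushed: map[string]*string{}}
	t.track("a", "2") // overwrite: remembers "1"
	t.track("b", "1") // new key: remembers nil
	t.rollback()
	fmt.Println(t.synced) // map[a:1]
}
```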
@@ -367,44 +405,16 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S
 	// Create a block sync flush ID if one doesn't already exist.
 	if event.FlushId == uuid.Nil && stateChangeSyncer.BlockSyncFlushId == uuid.Nil {
 		stateChangeSyncer.BlockSyncFlushId = uuid.New()
 	}

 	if event.IsMempoolTxn {
-		// Create a mempool flush ID if one doesn't already exist.
-		if event.FlushId == uuid.Nil && stateChangeSyncer.MempoolFlushId == uuid.Nil {
-			stateChangeSyncer.MempoolFlushId = uuid.New()
-		}
-
-		// If the event flush ID is nil, then we need to use the global mempool flush ID.
+		// Set the flushId to the mempool flush ID.
 		if flushId == uuid.Nil {
 			flushId = stateChangeSyncer.MempoolFlushId
 		}

-		// The current state of the tracked mempool is stored in the MempoolKeyValueMap. If this entry is already in there
-		// then we don't need to re-write it to the state change file.
-		// Create key for op + key map
-		txKey := createMempoolTxKey(stateChangeEntry.OperationType, stateChangeEntry.KeyBytes)
-
-		// Track the key in the MempoolFlushKeySet.
-		if stateChangeEntry.OperationType == DbOperationTypeDelete {
-			delete(stateChangeSyncer.MempoolFlushKeySet, txKey)
-		} else {
-			stateChangeSyncer.MempoolFlushKeySet[txKey] = true
-		}
-
-		// Check to see if the key is in the map, and if the value is the same as the value in the event.
-		if valueBytes, ok := stateChangeSyncer.MempoolKeyValueMap[txKey]; ok && bytes.Equal(valueBytes, event.StateChangeEntry.EncoderBytes) {
-			// If the key is in the map, and the entry bytes are the same as those that are already tracked by state syncer,
-			// then we don't need to write the state change entry to the state change file - it's already being tracked.
-			return
-		}
-
-		// Track the key and value if this is a new entry to the mempool.
-		if stateChangeEntry.OperationType == DbOperationTypeDelete {
-			delete(stateChangeSyncer.MempoolKeyValueMap, txKey)
-		} else {
-			stateChangeSyncer.MempoolKeyValueMap[txKey] = event.StateChangeEntry.EncoderBytes
-		}
 	} else {
 		// If the flush ID is nil, then we need to use the global block sync flush ID.
 		if flushId == uuid.Nil {
@@ -444,13 +468,44 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S

 	// Set the flush ID.
 	stateChangeEntry.FlushId = flushId

+	if event.IsMempoolTxn {
+		// The current state of the tracked mempool is stored in the MempoolSyncedKeyValueMap. If this entry is already in there
+		// then we don't need to re-write it to the state change file.
+		// Create the key used to track this entry.
+		txKey := createMempoolTxKey(stateChangeEntry.KeyBytes)
+
+		// Track the key in the MempoolFlushKeySet.
+		stateChangeSyncer.MempoolFlushKeySet[txKey] = true
+
+		// Check to see if the key is in the map, and if the value is the same as the value in the event.
+		if cachedSCE, ok := stateChangeSyncer.MempoolSyncedKeyValueMap[txKey]; ok &&
+			bytes.Equal(cachedSCE.EncoderBytes, event.StateChangeEntry.EncoderBytes) &&
+			cachedSCE.OperationType == event.StateChangeEntry.OperationType {
+			// If the key is in the map, and the entry bytes are the same as those that are already tracked by state syncer,
+			// then we don't need to write the state change entry to the state change file - it's already being tracked.
+			return
+		} else if ok {
+			// If the key is in the map but the entry bytes are different, track the previously synced entry so it can
+			// be restored if this flush fails. Skip if the entry is already being tracked as a new flush.
+			if _, newFlushExists := stateChangeSyncer.MempoolNewlyFlushedTxns[txKey]; !newFlushExists {
+				stateChangeSyncer.MempoolNewlyFlushedTxns[txKey] = cachedSCE
+			}
+		} else {
+			// If the key is not in the map, then we need to track the new entry.
+			stateChangeSyncer.MempoolNewlyFlushedTxns[txKey] = nil
+		}
+
+		// Track the key and value if this is a new entry to the mempool, or if the encoder bytes or operation type
+		// changed since it was last synced.
+		stateChangeSyncer.MempoolSyncedKeyValueMap[txKey] = event.StateChangeEntry
+	}
+
 	// Encode the state change entry. We encode as a byte array, so the consumer can buffer just the bytes needed
 	// to decode this entry when reading from file.
 	entryBytes := EncodeToBytes(stateChangeSyncer.BlockHeight, stateChangeEntry, false)
 	writeBytes := EncodeByteArray(entryBytes)

 	// Add the StateChangeEntry bytes to the queue of bytes to be written to the state change file upon Badger db flush.
-	stateChangeSyncer.addTransactionToQueue(stateChangeEntry.FlushId, writeBytes)
+	stateChangeSyncer.addTransactionToQueue(stateChangeEntry.FlushId, writeBytes, event.IsMempoolTxn)
 }

 // _handleStateSyncerFlush is called when a Badger db flush takes place. It calls a helper function that takes the bytes that
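The dedup check above only short-circuits when both the encoder bytes and the operation type match what is already tracked; anything else re-emits the entry and records the one it displaced. A sketch of that decision with a trimmed-down entry type:

```go
package main

import (
	"bytes"
	"fmt"
)

type cachedEntry struct {
	EncoderBytes  []byte
	OperationType int
}

// shouldEmit sketches the dedup check: an entry is only re-written to the
// state change file when its bytes or operation type differ from what the
// syncer already tracks for that key.
func shouldEmit(cache map[string]*cachedEntry, key string, entry *cachedEntry) bool {
	cached, ok := cache[key]
	if ok && bytes.Equal(cached.EncoderBytes, entry.EncoderBytes) && cached.OperationType == entry.OperationType {
		return false // identical to what is already synced
	}
	cache[key] = entry
	return true
}

func main() {
	cache := map[string]*cachedEntry{}
	e := &cachedEntry{EncoderBytes: []byte{1}, OperationType: 1}
	fmt.Println(shouldEmit(cache, "k", e)) // true: first sighting
	fmt.Println(shouldEmit(cache, "k", e)) // false: unchanged
	fmt.Println(shouldEmit(cache, "k", &cachedEntry{EncoderBytes: []byte{2}, OperationType: 1})) // true: bytes changed
}
```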
@@ -459,11 +514,14 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State
 	stateChangeSyncer.StateSyncerMutex.Lock()
 	defer stateChangeSyncer.StateSyncerMutex.Unlock()

 	if event.IsMempoolFlush {
 		// If this is a mempool flush, make sure a block hasn't mined since the mempool entries were added to queue.
 		// If one has, reset the mempool maps and file, and start from scratch. The consumer will revert the mempool
 		// transactions it currently has and sync from scratch.
-		if stateChangeSyncer.BlockSyncFlushId != event.BlockSyncFlushId {
+		if (stateChangeSyncer.BlockSyncFlushId != event.BlockSyncFlushId && event.BlockSyncFlushId != uuid.Nil) || stateChangeSyncer.BlockSyncFlushId != event.FlushId {
+			glog.V(2).Infof("StateChangeSyncer._handleStateSyncerFlush: Flush ID changed, resetting mempool. "+
+				"Event: %v, Event block sync: %v, Global block sync: %v",
+				event.FlushId, event.BlockSyncFlushId, stateChangeSyncer.BlockSyncFlushId)
 			stateChangeSyncer.ResetMempool()
 			return
 		}
@@ -472,17 +530,57 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State
 		// This would mean that an entry was ejected from the mempool.
 		// When this happens, we need to reset the mempool and start from scratch, so that the consumer can revert the
 		// mempool transactions it currently has and sync the mempool from scratch.
-		for key := range stateChangeSyncer.MempoolKeyValueMap {
-			// If any of the keys that the mempool is currently tracking weren't included in the flush, the state syncer
-			// mempool is bad and needs to be reset.
-			if _, ok := stateChangeSyncer.MempoolFlushKeySet[key]; !ok {
-				stateChangeSyncer.ResetMempool()
-				return
+		//
+		// Example:
+		//
+		// Flush:
+		// Key: a
+		// Key: b
+		// Key: d
+		//
+		// Synced:
+		// Key: a
+		// Key: b
+		// Key: c <- Revert this one
+		// Key: d
+
+		if event.Succeeded {
+			for key, cachedSCE := range stateChangeSyncer.MempoolSyncedKeyValueMap {
+				// If any of the keys that the mempool is currently tracking weren't included in the flush, that entry
+				// needs to be reverted from the mempool.
+				if _, ok := stateChangeSyncer.MempoolFlushKeySet[key]; !ok {
+					// Confirm that the block sync ID hasn't shifted. If it has, bail now.
+					if cachedSCE.FlushId != stateChangeSyncer.BlockSyncFlushId {
+						glog.V(2).Infof("StateChangeSyncer._handleStateSyncerFlush: Flush ID changed during key/value check, resetting mempool.")
+						stateChangeSyncer.ResetMempool()
+						return
+					}
+
+					cachedSCE.IsReverted = true
+
+					// Create a revert state change entry and add it to the queue. This will signal the state change
+					// consumer to revert the synced entry.
+					entryBytes := EncodeToBytes(stateChangeSyncer.BlockHeight, cachedSCE, false)
+					writeBytes := EncodeByteArray(entryBytes)
+
+					// Add the StateChangeEntry bytes to the queue of bytes to be written to the state change file upon Badger db flush.
+					stateChangeSyncer.addTransactionToQueue(cachedSCE.FlushId, writeBytes, true)
+
+					// Remove this entry from the synced map.
+					delete(stateChangeSyncer.MempoolSyncedKeyValueMap, key)
+				}
 			}
 		}

 		// Reset the mempool flush set.
 		stateChangeSyncer.MempoolFlushKeySet = make(map[string]bool)
 	}
-
 	err := stateChangeSyncer.FlushTransactionsToFile(event)
 	if err != nil {
 		glog.Errorf("StateChangeSyncer._handleStateSyncerFlush: Error flushing transactions to file: %v", err)
@@ -491,15 +589,19 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State
 	if !event.IsMempoolFlush {
 		// After flushing blocksync transactions to file, reset the block sync flush ID, and reset the mempool.
 		stateChangeSyncer.BlockSyncFlushId = uuid.New()
 		stateChangeSyncer.ResetMempool()
 	}
 }

 func (stateChangeSyncer *StateChangeSyncer) ResetMempool() {
-	stateChangeSyncer.MempoolKeyValueMap = make(map[string][]byte)
+	stateChangeSyncer.MempoolSyncedKeyValueMap = make(map[string]*StateChangeEntry)
+	stateChangeSyncer.MempoolNewlyFlushedTxns = make(map[string]*StateChangeEntry)
 	stateChangeSyncer.MempoolFlushKeySet = make(map[string]bool)
-	delete(stateChangeSyncer.UnflushedBytes, stateChangeSyncer.MempoolFlushId)
+	delete(stateChangeSyncer.UnflushedMempoolBytes, stateChangeSyncer.MempoolFlushId)
 	stateChangeSyncer.MempoolFlushId = uuid.Nil
+	stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry)

 	// Truncate the mempool files.
 	stateChangeSyncer.StateChangeMempoolFile.Truncate(0)
 	stateChangeSyncer.StateChangeMempoolIndexFile.Truncate(0)
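The revert pass above is a set difference: keys present in `MempoolSyncedKeyValueMap` but absent from the latest `MempoolFlushKeySet` (key `c` in the example comment) get a revert entry and are dropped from the synced map. A compact model, with a plain callback standing in for queueing the revert bytes:

```go
package main

import "fmt"

// revertEjected sketches the ejection pass: any key the syncer is tracking
// that did not appear in the latest mempool flush is emitted as a revert and
// dropped from the synced map. Deleting while ranging is safe in Go.
func revertEjected(synced map[string]bool, flushed map[string]bool, emitRevert func(key string)) {
	for key := range synced {
		if !flushed[key] {
			emitRevert(key)
			delete(synced, key)
		}
	}
}

func main() {
	synced := map[string]bool{"a": true, "b": true, "c": true, "d": true}
	flushed := map[string]bool{"a": true, "b": true, "d": true}
	revertEjected(synced, flushed, func(key string) { fmt.Println("revert", key) }) // revert c
	fmt.Println(len(synced))                                                        // 3
}
```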
@@ -507,8 +609,17 @@ func (stateChangeSyncer *StateChangeSyncer) ResetMempool() {
 }

 // Add a transaction to the queue of transactions to be flushed to disk upon badger db flush.
-func (stateChangeSyncer *StateChangeSyncer) addTransactionToQueue(flushId uuid.UUID, writeBytes []byte) {
-	unflushedBytes, exists := stateChangeSyncer.UnflushedBytes[flushId]
+func (stateChangeSyncer *StateChangeSyncer) addTransactionToQueue(flushId uuid.UUID, writeBytes []byte, isMempool bool) {
+	var unflushedBytes UnflushedStateSyncerBytes
+	var exists bool
+
+	if isMempool {
+		unflushedBytes, exists = stateChangeSyncer.UnflushedMempoolBytes[flushId]
+	} else {
+		unflushedBytes, exists = stateChangeSyncer.UnflushedCommittedBytes[flushId]
+	}
+
 	if !exists {
 		unflushedBytes = UnflushedStateSyncerBytes{
 			StateChangeBytes:     []byte{},
@@ -523,12 +634,24 @@ func (stateChangeSyncer *StateChangeSyncer) addTransactionToQueue(flushId uuid.U

 	unflushedBytes.StateChangeBytes = append(unflushedBytes.StateChangeBytes, writeBytes...)

-	stateChangeSyncer.UnflushedBytes[flushId] = unflushedBytes
+	if isMempool {
+		stateChangeSyncer.UnflushedMempoolBytes[flushId] = unflushedBytes
+	} else {
+		stateChangeSyncer.UnflushedCommittedBytes[flushId] = unflushedBytes
+	}
 }

 // FlushTransactionsToFile writes the bytes that have been cached on the StateChangeSyncer to the state change file.
 func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *StateSyncerFlushedEvent) error {
 	flushId := event.FlushId

 	// Get the relevant global flush ID from the state change syncer if the flush ID is nil.
 	if event.FlushId == uuid.Nil {
 		if event.IsMempoolFlush {
@@ -554,12 +677,34 @@ func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *State
 	// If the flush failed, delete the unflushed bytes and associated metadata.
 	// Also delete any unconnected mempool txns from our cache.
 	if !event.Succeeded {
-		delete(stateChangeSyncer.UnflushedBytes, flushId)
+		if event.IsMempoolFlush {
+			delete(stateChangeSyncer.UnflushedMempoolBytes, flushId)
+			// Roll back the synced map: restore the previously synced entry where one existed, and drop keys
+			// that were newly added by this flush.
+			for key, sce := range stateChangeSyncer.MempoolNewlyFlushedTxns {
+				if sce != nil {
+					stateChangeSyncer.MempoolSyncedKeyValueMap[key] = sce
+				} else {
+					delete(stateChangeSyncer.MempoolSyncedKeyValueMap, key)
+					delete(stateChangeSyncer.MempoolFlushKeySet, key)
+				}
+			}
+		} else {
+			delete(stateChangeSyncer.UnflushedCommittedBytes, flushId)
+		}
+
 		return nil
 	}

-	unflushedBytes, exists := stateChangeSyncer.UnflushedBytes[flushId]
+	var unflushedBytes UnflushedStateSyncerBytes
+	var exists bool
+	if event.IsMempoolFlush {
+		unflushedBytes, exists = stateChangeSyncer.UnflushedMempoolBytes[flushId]
+	} else {
+		unflushedBytes, exists = stateChangeSyncer.UnflushedCommittedBytes[flushId]
+	}
 	if !exists {
 		return nil
 	}
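`addTransactionToQueue` now routes bytes into one of two buffers keyed by flush ID, so a failed mempool flush can be discarded without disturbing committed block-sync bytes. A minimal model of that routing (string flush IDs stand in for `uuid.UUID`):

```go
package main

import "fmt"

// queue models the two unflushed-bytes maps: committed (block sync) writes
// and mempool writes are buffered separately so one can be discarded without
// touching the other.
type queue struct {
	committed map[string][]byte
	mempool   map[string][]byte
}

// enqueue mirrors addTransactionToQueue's routing on the new isMempool flag.
func (q *queue) enqueue(flushId string, writeBytes []byte, isMempool bool) {
	target := q.committed
	if isMempool {
		target = q.mempool
	}
	target[flushId] = append(target[flushId], writeBytes...)
}

func main() {
	q := &queue{committed: map[string][]byte{}, mempool: map[string][]byte{}}
	q.enqueue("f1", []byte{1, 2}, false)
	q.enqueue("f1", []byte{3}, true)
	fmt.Println(q.committed["f1"], q.mempool["f1"]) // [1 2] [3]
}
```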
@@ -612,12 +757,18 @@ func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *State
 	}

 	// Update unflushed bytes map to remove the flushed bytes.
-	delete(stateChangeSyncer.UnflushedBytes, flushId)
+	if event.IsMempoolFlush {
+		delete(stateChangeSyncer.UnflushedMempoolBytes, flushId)
+		stateChangeSyncer.MempoolNewlyFlushedTxns = make(map[string]*StateChangeEntry)
+	} else {
+		delete(stateChangeSyncer.UnflushedCommittedBytes, flushId)
+	}
+
 	return nil
 }

-func createMempoolTxKey(operationType StateSyncerOperationType, keyBytes []byte) string {
-	return fmt.Sprintf("%v%v", operationType, string(keyBytes))
+func createMempoolTxKey(keyBytes []byte) string {
+	return string(keyBytes)
 }

 // SyncMempoolToStateSyncer flushes all mempool transactions to the db, capturing those state changes
@@ -625,8 +776,13 @@ func createMempoolTxKey(operationType StateSyncerOperationType, keyBytes []byte)
 // utxo ops and adds them to the mempool state change file.
 func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Server) (bool, error) {
+	startTime := time.Now()
 	originalCommittedFlushId := stateChangeSyncer.BlockSyncFlushId

+	// If no block sync flush has happened yet, there is nothing to anchor the mempool sync to.
+	if originalCommittedFlushId == uuid.Nil {
+		return false, nil
+	}
+
 	if server.mempool.stopped {
 		return true, nil
 	}
@@ -635,6 +791,10 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser

 	stateChangeSyncer.BlockHeight = blockHeight

+	stateChangeSyncer.MempoolFlushId = originalCommittedFlushId
+
 	mempoolUtxoView, err := server.GetMempool().GetAugmentedUniversalView()
 	if err != nil {
 		return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: ")
@@ -663,14 +823,32 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser
 	// more than once in the mempool transactions.
 	txn := server.blockchain.db.NewTransaction(true)
 	defer txn.Discard()
-
+	glog.V(2).Infof("StateChangeSyncer.SyncMempoolToStateSyncer: Time since sync start: %v", time.Since(startTime))
+	startTime = time.Now()
 	err = mempoolUtxoView.FlushToDbWithTxn(txn, uint64(server.blockchain.bestChain[len(server.blockchain.bestChain)-1].Height))
-
+	if err != nil {
+		mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
+			FlushId:        originalCommittedFlushId,
+			Succeeded:      false,
+			IsMempoolFlush: true,
+		})
+		return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: FlushToDbWithTxn: ")
+	}
+	glog.V(2).Infof("StateChangeSyncer.SyncMempoolToStateSyncer: Time since db flush: %v", time.Since(startTime))
+	startTime = time.Now()
 	mempoolTxUtxoView, err := NewUtxoView(server.blockchain.db, server.blockchain.params, server.blockchain.postgres, nil, &mempoolEventManager)
 	if err != nil {
+		mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
+			FlushId:        originalCommittedFlushId,
+			Succeeded:      false,
+			IsMempoolFlush: true,
+		})
 		return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: CreateMempoolTxUtxoView: ")
 	}
-
+	glog.V(2).Infof("StateChangeSyncer.SyncMempoolToStateSyncer: Time since utxo view creation: %v", time.Since(startTime))
+	startTime = time.Now()

 	// Loop through all the transactions in the mempool and connect them and their utxo ops to the mempool view.
 	server.mempool.mtx.RLock()
 	mempoolTxns, _, err := server.mempool._getTransactionsOrderedByTimeAdded()
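The loop that follows avoids reconnecting mempool transactions it has already processed: a prefix check validates that the cached txn hashes still lead the mempool ordering, and on a hit the memoized state change entries (and cached utxo view) are reused. A self-contained sketch of both pieces; the `txnCache` type and string entries are stand-ins for `MempoolCachedTxns` and `ConnectTransaction`:

```go
package main

import "fmt"

// txnCache models the connect cache: entries per txn hash are memoized, and
// a prefix check decides whether the cache is still consistent with the
// current mempool ordering.
type txnCache struct {
	entries  map[string][]string // per-hash state change entries (stand-ins)
	connects int                 // counts expensive "connect" calls
}

// connect stands in for ConnectTransaction: only invoked on cache misses.
func (c *txnCache) connect(hash string) []string {
	c.connects++
	return []string{"txn:" + hash, "ops:" + hash}
}

// cacheIsValid mirrors the prefix check: every one of the first len(entries)
// mempool hashes must already be cached, otherwise the cache is stale.
func (c *txnCache) cacheIsValid(mempoolHashes []string) bool {
	for ii, hash := range mempoolHashes {
		if _, ok := c.entries[hash]; !ok {
			return false
		}
		if ii >= len(c.entries)-1 {
			break // every cached txn is still at the front of the mempool
		}
	}
	return true
}

func (c *txnCache) sync(mempoolHashes []string) {
	if !c.cacheIsValid(mempoolHashes) {
		c.entries = map[string][]string{} // mempool diverged; rebuild from scratch
	}
	for _, hash := range mempoolHashes {
		if _, ok := c.entries[hash]; !ok {
			c.entries[hash] = c.connect(hash)
		}
		// Cached or fresh, the entries are (re-)emitted to the state change file here.
	}
}

func main() {
	c := &txnCache{entries: map[string][]string{}}
	c.sync([]string{"tx1", "tx2"})
	c.sync([]string{"tx1", "tx2", "tx3"}) // only tx3 is connected
	fmt.Println(c.connects)               // 3, not 5
}
```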
@@ -678,54 +856,123 @@
 	if err != nil {
 		mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
-			FlushId:        uuid.Nil,
+			FlushId:        originalCommittedFlushId,
 			Succeeded:      false,
 			IsMempoolFlush: true,
 		})
 		return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: ")
 	}
-
-	for _, mempoolTx := range mempoolTxns {
-		utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction(
-			mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/)
-		if err != nil {
-			return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ")
+	glog.V(2).Infof("StateChangeSyncer.SyncMempoolToStateSyncer: Time since getting transactions: %v", time.Since(startTime))
+	startTime = time.Now()
+
+	// Check to see if every txn hash in our cached txns is in the first n txns in the mempool.
+	// N represents the length of our cached txn map.
+	for ii, mempoolTx := range mempoolTxns {
+		if _, ok := stateChangeSyncer.MempoolCachedTxns[mempoolTx.Hash.String()]; !ok {
+			// If any of the transaction hashes in the first n transactions don't line up with our cache map, the mempool
+			// has changed since the last cache, and we need to reset it.
+			stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry)
+			stateChangeSyncer.MempoolCachedUtxoView = nil
+			glog.V(2).Infof("StateChangeSyncer.SyncMempoolToStateSyncer: Txn not in cache, resetting cache")
+			break
 		}
+		// Once we're past the number of cached txns, we have confirmed that nothing in our cache is out of date and can break.
+		if ii >= len(stateChangeSyncer.MempoolCachedTxns)-1 {
+			if stateChangeSyncer.MempoolCachedUtxoView != nil {
+				// If we know that all our transactions are good, set the state of the utxo view to the cached one, and exit.
+				mempoolUtxoView = stateChangeSyncer.MempoolCachedUtxoView
+			}
+			glog.V(2).Infof("StateChangeSyncer.SyncMempoolToStateSyncer: All cached txns match, continuing at index %v", ii)
+			break
+		}
+	}

-		// Emit transaction state change.
-		mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{
-			StateChangeEntry: &StateChangeEntry{
+	for _, mempoolTx := range mempoolTxns {
+		var txnStateChangeEntry *StateChangeEntry
+		var utxoOpStateChangeEntry *StateChangeEntry
+		// Check if the transaction is already in the cache. If so, reuse the cached entries instead of reconnecting it.
+		txHash := mempoolTx.Hash.String()
+		if stateChangeEntries, ok := stateChangeSyncer.MempoolCachedTxns[txHash]; ok {
+			txnStateChangeEntry = stateChangeEntries[0]
+			utxoOpStateChangeEntry = stateChangeEntries[1]
+		} else {
+			utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction(
+				mempoolTx.Tx, mempoolTx.Hash, 0, uint32(blockHeight+1), false, false /*ignoreUtxos*/)
+			if err != nil {
+				mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
+					FlushId:        originalCommittedFlushId,
+					Succeeded:      false,
+					IsMempoolFlush: true,
+				})
+				stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry)
+				stateChangeSyncer.MempoolCachedUtxoView = nil
+				return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ")
+			}
+			txnStateChangeEntry = &StateChangeEntry{
 				OperationType: DbOperationTypeUpsert,
 				KeyBytes:      TxnHashToTxnKey(mempoolTx.Hash),
 				EncoderBytes:  EncodeToBytes(blockHeight, mempoolTx.Tx, false),
-			},
-			FlushId:      uuid.Nil,
-			IsMempoolTxn: true,
-		})
+				IsReverted:    false,
+			}

-		// Capture the utxo ops for the transaction in a UTXOOp bundle.
-		utxoOpBundle := &UtxoOperationBundle{
-			UtxoOpBundle: [][]*UtxoOperation{},
-		}
+			// Capture the utxo ops for the transaction in a UTXOOp bundle.
+			utxoOpBundle := &UtxoOperationBundle{
+				UtxoOpBundle: [][]*UtxoOperation{},
+			}

-		utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn)
+			utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn)

-		// Emit UTXOOp bundle event
-		mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{
-			StateChangeEntry: &StateChangeEntry{
+			utxoOpStateChangeEntry = &StateChangeEntry{
 				OperationType: DbOperationTypeUpsert,
 				KeyBytes:      _DbKeyForTxnUtxoOps(mempoolTx.Hash),
 				EncoderBytes:  EncodeToBytes(blockHeight, utxoOpBundle, false),
-			},
-			FlushId:      uuid.Nil,
-			IsMempoolTxn: true,
+				IsReverted:    false,
+			}
+
+			// Cache both state change entries for this transaction.
+			stateChangeSyncer.MempoolCachedTxns[txHash] = []*StateChangeEntry{txnStateChangeEntry, utxoOpStateChangeEntry}
+		}
+
+		// Emit transaction state change.
+		mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{
+			StateChangeEntry: txnStateChangeEntry,
+			FlushId:          originalCommittedFlushId,
+			IsMempoolTxn:     true,
+		})
+
+		// Emit UTXOOp bundle event.
+		mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{
+			StateChangeEntry: utxoOpStateChangeEntry,
+			FlushId:          originalCommittedFlushId,
+			IsMempoolTxn:     true,
 		})
 	}

+	// Update the cached utxo view to represent the new cached state.
+	stateChangeSyncer.MempoolCachedUtxoView, err = mempoolTxUtxoView.CopyUtxoView()
+	if err != nil {
+		return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: Error copying cached utxo view: ")
+	}
+
+	glog.V(2).Infof("StateChangeSyncer.SyncMempoolToStateSyncer: Time to connect all %d txns: %v", len(mempoolTxns), time.Since(startTime))
+	startTime = time.Now()

 	// Before flushing the mempool to the state change file, check if a block has mined. If so, abort the flush.
 	if err != nil || originalCommittedFlushId != stateChangeSyncer.BlockSyncFlushId {
 		mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
-			FlushId:        uuid.Nil,
+			FlushId:        originalCommittedFlushId,
 			Succeeded:      false,
 			IsMempoolFlush: true,
 		})
@@ -733,11 +980,12 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser
 	}

 	mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
-		FlushId:        uuid.Nil,
+		FlushId:          originalCommittedFlushId,
 		Succeeded:        true,
 		IsMempoolFlush:   true,
 		BlockSyncFlushId: originalCommittedFlushId,
 	})
+	glog.V(2).Infof("StateChangeSyncer.SyncMempoolToStateSyncer: Time to flush: %v", time.Since(startTime))

 	return false, nil
 }
@@ -747,9 +995,10 @@ func (stateChangeSyncer *StateChangeSyncer) StartMempoolSyncRoutine(server *Serv
 		// Wait for mempool to be initialized.
 		for server.mempool == nil || server.blockchain.chainState() != SyncStateFullyCurrent {
 			time.Sleep(15000 * time.Millisecond)
 		}
 		if !stateChangeSyncer.BlocksyncCompleteEntriesFlushed && stateChangeSyncer.SyncType == NodeSyncTypeBlockSync {
-			fmt.Printf("Flushing to file")
 			err := stateChangeSyncer.FlushAllEntriesToFile(server)
 			if err != nil {
 				fmt.Printf("StateChangeSyncer.StartMempoolSyncRoutine: Error flushing all entries to file: %v", err)
@@ -758,13 +1007,15 @@ func (stateChangeSyncer *StateChangeSyncer) StartMempoolSyncRoutine(server *Serv
 		mempoolClosed := server.mempool.stopped
 		for !mempoolClosed {
 			// Sleep for a short while to avoid a tight loop.
-			time.Sleep(100 * time.Millisecond)
+			time.Sleep(150 * time.Millisecond)
 			var err error
 			// If the mempool is not empty, sync the mempool to the state syncer.
 			mempoolClosed, err = stateChangeSyncer.SyncMempoolToStateSyncer(server)
 			if err != nil {
 				glog.Errorf("StateChangeSyncer.StartMempoolSyncRoutine: Error syncing mempool to state syncer: %v", err)
 			}
 		}
 	}()
 }
@@ -818,6 +1069,7 @@ func (stateChangeSyncer *StateChangeSyncer) FlushAllEntriesToFile(server *Server
 			OperationType: DbOperationTypeInsert,
 			KeyBytes:      dbEntry.Key,
 			EncoderBytes:  dbEntry.Value,
+			IsReverted:    false,
 		}

 		// If this prefix is the prefix for UTXO Ops, fetch the transaction for each UTXO Op and attach it to the UTXO Op.

diff --git a/lib/state_change_syncer_test.go b/lib/state_change_syncer_test.go
index 083c851bd..a77559c4c 100644
--- a/lib/state_change_syncer_test.go
+++ b/lib/state_change_syncer_test.go
@@ -44,6 +44,7 @@ func TestStateChangeEntryEncoder(t *testing.T) {
 		KeyBytes:    []byte{1, 2, 3},
 		Encoder:     postEntry,
 		EncoderType: postEntry.GetEncoderType(),
+		IsReverted:  false,
 	}

 	stateChangeEntryBytes := EncodeToBytes(0, stateChangeEntry)
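The updated test constructs a `StateChangeEntry` with `IsReverted` set explicitly, so the encoder round trip covers the new field. In the same spirit, here is a tiny standalone check that a bool byte survives encode/decode; the `boolToByte`/`readBoolByte` helpers are inlined stand-ins for the library's `BoolToByte`/`ReadBoolByte`.

```go
package main

import (
	"bytes"
	"fmt"
)

// boolToByte/readBoolByte mimic the one-byte bool encoding used for the
// IsReverted field in the wire format above.
func boolToByte(b bool) byte {
	if b {
		return 1
	}
	return 0
}

func readBoolByte(rr *bytes.Reader) (bool, error) {
	b, err := rr.ReadByte()
	return b != 0, err
}

func main() {
	for _, isReverted := range []bool{false, true} {
		encoded := []byte{boolToByte(isReverted)}
		decoded, err := readBoolByte(bytes.NewReader(encoded))
		if err != nil || decoded != isReverted {
			fmt.Println("round trip failed for", isReverted)
			continue
		}
		fmt.Println("round trip ok:", isReverted)
	}
}
```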