diff --git a/lib/blockchain.go b/lib/blockchain.go
index 3551ee41e..570e06473 100644
--- a/lib/blockchain.go
+++ b/lib/blockchain.go
@@ -2558,17 +2558,15 @@ func (bc *Blockchain) processBlockPoW(desoBlock *MsgDeSoBlock, verifySignatures
 	}
 	bc.timer.End("Blockchain.ProcessBlock: Transactions Db snapshot & operations")
 	if innerErr = bc.blockView.FlushToDbWithTxn(txn, blockHeight); innerErr != nil {
+		// If we're in the middle of a sync, notify the event manager that we failed to sync the block.
+		if bc.eventManager != nil && !bc.eventManager.isMempoolManager {
+			bc.eventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
+				FlushId:   uuid.Nil,
+				Succeeded: false,
+			})
+		}
 		return errors.Wrapf(innerErr, "ProcessBlock: Problem writing utxo view to db on simple add to tip")
 	}
-	// Immediately after the utxo view is flushed to badger, emit a state syncer flushed event, so that
-	// state syncer maintains a consistent view of the blockchain.
-	// Note: We ignore the mempool manager here, as that process handles state syncer flush events itself.
-	if bc.eventManager != nil && !bc.eventManager.isMempoolManager {
-		bc.eventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
-			FlushId:   uuid.Nil,
-			Succeeded: innerErr == nil,
-		})
-	}
 	bc.timer.End("Blockchain.ProcessBlock: Transactions Db utxo flush")
 	bc.timer.Start("Blockchain.ProcessBlock: Transactions Db snapshot & operations")
@@ -2927,6 +2925,16 @@ func (bc *Blockchain) processBlockPoW(desoBlock *MsgDeSoBlock, verifySignatures
 	// Signal the server that we've accepted this block in some way.
 	if bc.eventManager != nil {
 		bc.eventManager.blockAccepted(&BlockEvent{Block: desoBlock})
+		// Now that the block has been accepted and the utxo view flushed to badger, emit a state syncer
+		// flushed event so that state syncer maintains a consistent view of the blockchain.
+		// Note: We ignore the mempool manager here, as that process handles state syncer flush events itself.
+		if !bc.eventManager.isMempoolManager {
+			glog.V(2).Infof("Emitting state syncer flushed event for synced block")
+			bc.eventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
+				FlushId:   uuid.Nil,
+				Succeeded: true,
+			})
+		}
 	}
 	bc.timer.Print("Blockchain.ProcessBlock: Initial")
diff --git a/lib/db_utils.go b/lib/db_utils.go
index 98bcf6666..64ad6366d 100644
--- a/lib/db_utils.go
+++ b/lib/db_utils.go
@@ -464,7 +464,7 @@ type DBPrefixes struct {
 	// The Minor/Major distinction is used to deterministically map the two accessGroupIds of message's sender/recipient
 	// into a single pair based on the lexicographical ordering of the two accessGroupIds. This is done to ensure that
 	// both sides of the conversation have the same key for the same conversation, and we can store just a single message.
-	PrefixDmMessagesIndex []byte `prefix_id:"[75]" is_state:"true"`
+	PrefixDmMessagesIndex []byte `prefix_id:"[75]" is_state:"true" core_state:"true"`
 	// PrefixDmThreadIndex is modified by the NewMessage transaction and is used to store a DmThreadEntry
 	// for each existing dm thread. It answers the question: "Give me all the threads for a particular user."
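With this change, processBlockPoW reports a flush failure at the failure site and reports success only after the block has been accepted, so a consumer can key its buffering off the Succeeded flag alone. Below is a minimal sketch of a consumer reacting to that event, assuming only the FlushId and Succeeded fields shown in the diff; the consumer type and its buffering are hypothetical, not code from this PR:

    // Hypothetical consumer that buffers encoded entries per flush ID until
    // the flush outcome is known, then commits or discards them.
    package main

    import (
    	"fmt"

    	"github.com/google/uuid"
    )

    // Mirrors the event emitted in the diff; only these two fields are used here.
    type StateSyncerFlushedEvent struct {
    	FlushId   uuid.UUID
    	Succeeded bool
    }

    type consumer struct {
    	pending map[uuid.UUID][][]byte
    }

    func (c *consumer) onFlushed(ev *StateSyncerFlushedEvent) {
    	if ev.Succeeded {
    		// Durably record the buffered entries (elided), then drop the buffer.
    		fmt.Printf("committing %d entries for flush %s\n", len(c.pending[ev.FlushId]), ev.FlushId)
    	} else {
    		// The badger flush failed, so the buffered entries never hit the db.
    		fmt.Printf("discarding entries for failed flush %s\n", ev.FlushId)
    	}
    	delete(c.pending, ev.FlushId)
    }

    func main() {
    	c := &consumer{pending: map[uuid.UUID][][]byte{uuid.Nil: {[]byte("entry")}}}
    	c.onFlushed(&StateSyncerFlushedEvent{FlushId: uuid.Nil, Succeeded: false})
    }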
@@ -1149,6 +1149,7 @@ func DBSetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, value []byte, eve
 			KeyBytes:             key,
 			EncoderBytes:         value,
 			AncestralRecordBytes: ancestralValue,
+			IsReverted:           false,
 		},
 		FlushId:      uuid.Nil,
 		IsMempoolTxn: eventManager.isMempoolManager,
@@ -1251,6 +1252,7 @@ func DBDeleteWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, eventManager *
 			KeyBytes:             key,
 			EncoderBytes:         nil,
 			AncestralRecordBytes: ancestralValue,
+			IsReverted:           false,
 		},
 		FlushId:      uuid.Nil,
 		IsMempoolTxn: eventManager.isMempoolManager,
diff --git a/lib/pos_blockchain.go b/lib/pos_blockchain.go
index 154268389..7be62f35a 100644
--- a/lib/pos_blockchain.go
+++ b/lib/pos_blockchain.go
@@ -1725,6 +1725,9 @@ func (bc *Blockchain) commitBlockPoS(blockHash *BlockHash, verifySignatures bool
 		return errors.Wrapf(err, "commitBlockPoS: Problem putting block in db: ")
 	}
+	if bc.snapshot != nil {
+		bc.snapshot.FinishProcessBlock(blockNode)
+	}
 	if bc.eventManager != nil {
 		bc.eventManager.blockCommitted(&BlockEvent{
 			Block: block,
@@ -1741,9 +1744,6 @@ func (bc *Blockchain) commitBlockPoS(blockHash *BlockHash, verifySignatures bool
 			})
 		}
 	}
-	if bc.snapshot != nil {
-		bc.snapshot.FinishProcessBlock(blockNode)
-	}
 	currentEpochNumber, err := utxoView.GetCurrentEpochNumber()
 	if err != nil {
 		return errors.Wrapf(err, "commitBlockPoS: Problem getting current epoch number")
diff --git a/lib/snapshot.go b/lib/snapshot.go
index 787a8f347..3ddeb7244 100644
--- a/lib/snapshot.go
+++ b/lib/snapshot.go
@@ -1175,6 +1175,7 @@ func (snap *Snapshot) SetSnapshotChunk(mainDb *badger.DB, mainDbMutex *deadlock.
 			OperationType: DbOperationTypeInsert,
 			KeyBytes:      dbEntry.Key,
 			EncoderBytes:  dbEntry.Value,
+			IsReverted:    false,
 		},
 		FlushId: dbFlushId,
 	})
diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go
index 00efd3137..62017e7f0 100644
--- a/lib/state_change_syncer.go
+++ b/lib/state_change_syncer.go
@@ -60,18 +60,26 @@ type StateChangeEntry struct {
 	BlockHeight uint64
 	// The block associated with this state change event. Only applicable to utxo operations.
 	Block *MsgDeSoBlock
+	// For mempool state changes, whether this operation has been booted from the mempool and should be reverted
+	// from the state change record.
+	IsReverted bool
 }

 // RawEncodeWithoutMetadata constructs the bytes to represent a StateChangeEntry.
 // The format is:
-// [operation type (varint)][encoder type (varint)][key length (varint)][key bytes]
+// [operation type (varint)][is reverted bool][encoder type (varint)][key length (varint)][key bytes]
 // [encoder length (varint)][encoder bytes][is mempool (1 byte)][utxo ops length (varint)][utxo ops bytes]
 func (stateChangeEntry *StateChangeEntry) RawEncodeWithoutMetadata(blockHeight uint64, skipMetadata ...bool) []byte {
 	// Get byte length of keyBytes (will be nil for mempool transactions)
 	var data []byte

+	// OperationType
 	data = append(data, UintToBuf(uint64(stateChangeEntry.OperationType))...)
+	// IsReverted
+	data = append(data, BoolToByte(stateChangeEntry.IsReverted))
+	// EncoderType
 	data = append(data, UintToBuf(uint64(stateChangeEntry.EncoderType))...)
+	// KeyBytes
 	data = append(data, EncodeByteArray(stateChangeEntry.KeyBytes)...)

 	// The encoder can either be represented in raw bytes or as an encoder. If it's represented in bytes, we use that.
@@ -115,8 +123,14 @@ func (stateChangeEntry *StateChangeEntry) RawEncodeWithoutMetadata(blockHeight u
 	// Encode the block height.
 	data = append(data, UintToBuf(blockHeight)...)
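The format comment above now places the IsReverted byte between the operation type and the encoder type. Here is a minimal decoding sketch for that prefix, assuming UintToBuf emits a standard unsigned varint, BoolToByte emits a single 0/1 byte, and EncodeByteArray length-prefixes the key with a varint, which is consistent with the helper names but should be verified against the real helpers in the DeSo codebase:

    // Minimal, self-contained decode of the new entry prefix under the
    // assumptions stated above.
    package main

    import (
    	"bytes"
    	"encoding/binary"
    	"fmt"
    	"io"
    )

    func decodePrefix(data []byte) (uint64, bool, uint64, []byte, error) {
    	rr := bytes.NewReader(data)
    	opType, err := binary.ReadUvarint(rr)
    	if err != nil {
    		return 0, false, 0, nil, err
    	}
    	revertedByte, err := rr.ReadByte() // IsReverted sits right after OperationType
    	if err != nil {
    		return 0, false, 0, nil, err
    	}
    	encoderType, err := binary.ReadUvarint(rr)
    	if err != nil {
    		return 0, false, 0, nil, err
    	}
    	keyLen, err := binary.ReadUvarint(rr) // key bytes are length-prefixed
    	if err != nil {
    		return 0, false, 0, nil, err
    	}
    	key := make([]byte, keyLen)
    	if _, err := io.ReadFull(rr, key); err != nil {
    		return 0, false, 0, nil, err
    	}
    	return opType, revertedByte != 0, encoderType, key, nil
    }

    func main() {
    	// [opType=2][isReverted=1][encoderType=5][keyLen=3][key bytes]
    	fmt.Println(decodePrefix([]byte{2, 1, 5, 3, 0xa, 0xb, 0xc}))
    }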
-	// Encode the transaction.
-	data = append(data, EncodeToBytes(blockHeight, stateChangeEntry.Block)...)
+	// Encode the block, only for utxo operations.
+	if stateChangeEntry.EncoderType == EncoderTypeUtxoOperation {
+		data = append(data, EncodeToBytes(blockHeight, stateChangeEntry.Block)...)
+	} else {
+		// If the encoder type is not a utxo operation, encode a nil value.
+		// We do this to simplify the decode logic and avoid an encoder migration.
+		data = append(data, EncodeToBytes(blockHeight, nil)...)
+	}

 	return data
 }
@@ -129,6 +143,13 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u
 	}
 	stateChangeEntry.OperationType = StateSyncerOperationType(operationType)

+	// Decode IsReverted
+	isReverted, err := ReadBoolByte(rr)
+	if err != nil {
+		return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding is reverted")
+	}
+	stateChangeEntry.IsReverted = isReverted
+
 	// Decode EncoderType
 	encoderType, err := ReadUvarint(rr)
 	if err != nil {
@@ -181,7 +202,6 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u
 	} else if err != nil {
 		return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding block")
 	}
-
 	return nil
 }
@@ -221,16 +241,18 @@ type StateChangeSyncer struct {
 	// we write the correct entries to the state change file.
 	// During blocksync, all flushes are synchronous, so we don't need to worry about this. As such, those flushes
 	// are given the uuid.Nil ID.
-	UnflushedBytes map[uuid.UUID]UnflushedStateSyncerBytes
+	UnflushedCommittedBytes map[uuid.UUID]UnflushedStateSyncerBytes
+	UnflushedMempoolBytes   map[uuid.UUID]UnflushedStateSyncerBytes

 	// This map is used to keep track of all the key and value pairs that state syncer is tracking (and therefore
 	// don't need to be re-emitted to the state change file).
-	// The key is the stringifyed key of the entry, plus the operation type.
+	// The key is the stringified key of the entry.
 	// The value is the badger entry that was flushed to the db.
-	MempoolKeyValueMap map[string][]byte
+	MempoolSyncedKeyValueMap map[string]*StateChangeEntry
+
+	// This map tracks the entries that were written (or overwritten) during the current mempool flush, mapping
+	// each key to the previously synced entry, or to nil if the key is new. If the flush fails, it is used to
+	// roll MempoolSyncedKeyValueMap back to its pre-flush state.
+	MempoolNewlyFlushedTxns map[string]*StateChangeEntry

 	// This map tracks the keys that were flushed to the mempool in a single flush.
-	// Every time a flush occurs, this map is cleared, as opposed to the MempoolKeyValueMap, which is only cleared
-	// when a new block is processed.
+	// Every time a flush occurs, this map is cleared, as opposed to the MempoolSyncedKeyValueMap, which is only
+	// cleared when a new block is processed.
 	// This is used to determine if there are any tracked mempool transactions that have been ejected from the current
 	// mempool state.
 	// When this occurs, the mempool is reset, and all tracked mempool transactions are re-emitted to the state change file.
@@ -238,6 +260,11 @@ type StateChangeSyncer struct {
 	// clear out any mempool entries that were ejected from the mempool.
 	MempoolFlushKeySet map[string]bool

+	// This cache stores the transactions and their associated utxo ops that are currently in the mempool.
+	// This allows us to reduce the number of connect transaction calls when syncing the mempool.
+	MempoolCachedTxns map[string][]*StateChangeEntry
+
+	// Caches the utxo view that results from connecting the cached mempool transactions, so that it can be
+	// reused when the mempool hasn't changed since the last sync.
+	MempoolCachedUtxoView *UtxoView
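The maps above cooperate: MempoolSyncedKeyValueMap holds what the consumer has already seen, and MempoolNewlyFlushedTxns remembers, for each key touched during the current flush, the previous entry (or nil for a brand-new key) so a failed flush can be undone exactly. A toy model of that rollback invariant follows; the types and names are illustrative stand-ins, not the real StateChangeEntry:

    // Toy model of the synced-map / newly-flushed-map pairing: record the
    // pre-flush value the first time a key changes, so rollback is exact.
    package main

    import "fmt"

    type tracker struct {
    	synced map[string]string  // stand-in for MempoolSyncedKeyValueMap
    	newly  map[string]*string // stand-in for MempoolNewlyFlushedTxns
    }

    func (t *tracker) set(key, val string) {
    	if old, ok := t.synced[key]; ok {
    		if _, seen := t.newly[key]; !seen {
    			prev := old
    			t.newly[key] = &prev // remember what to restore on failure
    		}
    	} else {
    		t.newly[key] = nil // nil means "delete on rollback"
    	}
    	t.synced[key] = val
    }

    func (t *tracker) rollback() {
    	for key, prev := range t.newly {
    		if prev != nil {
    			t.synced[key] = *prev
    		} else {
    			delete(t.synced, key)
    		}
    	}
    	t.newly = map[string]*string{}
    }

    func main() {
    	t := &tracker{synced: map[string]string{"a": "1"}, newly: map[string]*string{}}
    	t.set("a", "2") // overwrite: remembers "1"
    	t.set("b", "3") // new key: remembers nil
    	t.rollback()
    	fmt.Println(t.synced) // map[a:1]
    }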
 	// Tracks the flush IDs of the last block sync flush and the last mempool flush.
 	// These are not used during hypersync, as many flushes are being processed asynchronously.
 	BlockSyncFlushId uuid.UUID
@@ -320,9 +347,12 @@ func NewStateChangeSyncer(stateChangeDir string, nodeSyncType NodeSyncType, memp
 		StateChangeMempoolFile:      stateChangeMempoolFile,
 		StateChangeMempoolIndexFile: stateChangeMempoolIndexFile,
 		StateChangeMempoolFileSize:  uint64(stateChangeMempoolFileInfo.Size()),
-		UnflushedBytes:              make(map[uuid.UUID]UnflushedStateSyncerBytes),
-		MempoolKeyValueMap:          make(map[string][]byte),
+		UnflushedCommittedBytes:     make(map[uuid.UUID]UnflushedStateSyncerBytes),
+		UnflushedMempoolBytes:       make(map[uuid.UUID]UnflushedStateSyncerBytes),
+		MempoolSyncedKeyValueMap:    make(map[string]*StateChangeEntry),
+		MempoolNewlyFlushedTxns:     make(map[string]*StateChangeEntry),
 		MempoolFlushKeySet:          make(map[string]bool),
+		MempoolCachedTxns:           make(map[string][]*StateChangeEntry),
 		StateSyncerMutex:            &sync.Mutex{},
 		SyncType:                    nodeSyncType,
 		BlocksyncCompleteEntriesFlushed: blocksyncCompleteEntriesFlushed,
@@ -375,41 +405,13 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S
 	}

 	if event.IsMempoolTxn {
-		// Create a mempool flush ID if one doesn't already exist.
-		if event.FlushId == uuid.Nil && stateChangeSyncer.MempoolFlushId == uuid.Nil {
-			stateChangeSyncer.MempoolFlushId = uuid.New()
-		}
-
 		// If the event flush ID is nil, then we need to use the global mempool flush ID.
 		if flushId == uuid.Nil {
 			flushId = stateChangeSyncer.MempoolFlushId
 		}
-
-		// The current state of the tracked mempool is stored in the MempoolKeyValueMap. If this entry is already in there
-		// then we don't need to re-write it to the state change file.
-		// Create key for op + key map
-		txKey := createMempoolTxKey(stateChangeEntry.OperationType, stateChangeEntry.KeyBytes)
-
-		// Track the key in the MempoolFlushKeySet.
-		if stateChangeEntry.OperationType == DbOperationTypeDelete {
-			delete(stateChangeSyncer.MempoolFlushKeySet, txKey)
-		} else {
-			stateChangeSyncer.MempoolFlushKeySet[txKey] = true
-		}
-
-		// Check to see if the key is in the map, and if the value is the same as the value in the event.
-		if valueBytes, ok := stateChangeSyncer.MempoolKeyValueMap[txKey]; ok && bytes.Equal(valueBytes, event.StateChangeEntry.EncoderBytes) {
-			// If the key is in the map, and the entry bytes are the same as those that are already tracked by state syncer,
-			// then we don't need to write the state change entry to the state change file - it's already being tracked.
-			return
-		}
-
-		// Track the key and value if this is a new entry to the mempool.
-		if stateChangeEntry.OperationType == DbOperationTypeDelete {
-			delete(stateChangeSyncer.MempoolKeyValueMap, txKey)
-		} else {
-			stateChangeSyncer.MempoolKeyValueMap[txKey] = event.StateChangeEntry.EncoderBytes
-		}
 	} else {
 		// If the flush ID is nil, then we need to use the global block sync flush ID.
 		if flushId == uuid.Nil {
@@ -449,13 +451,44 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S
 	// Set the flush ID.
 	stateChangeEntry.FlushId = flushId

+	if event.IsMempoolTxn {
+		// The current state of the tracked mempool is stored in the MempoolSyncedKeyValueMap. If this entry is already in there
+		// then we don't need to re-write it to the state change file.
+		// Create the key used by the mempool tracking maps.
+		txKey := createMempoolTxKey(stateChangeEntry.KeyBytes)
+
+		// Track the key in the MempoolFlushKeySet.
+		stateChangeSyncer.MempoolFlushKeySet[txKey] = true
+
+		// Check to see if the key is in the map, and if the value is the same as the value in the event.
+		if cachedSCE, ok := stateChangeSyncer.MempoolSyncedKeyValueMap[txKey]; ok && bytes.Equal(cachedSCE.EncoderBytes, event.StateChangeEntry.EncoderBytes) && cachedSCE.OperationType == event.StateChangeEntry.OperationType {
+			// If the key is in the map, and the entry bytes are the same as those that are already tracked by state syncer,
+			// then we don't need to write the state change entry to the state change file - it's already being tracked.
+			return
+		} else if ok {
+			// If the key is in the map, and the entry bytes are different, then we need to track the new entry.
+			// Skip if the entry is already being tracked as a new flush.
+			if _, newFlushExists := stateChangeSyncer.MempoolNewlyFlushedTxns[txKey]; !newFlushExists {
+				// Store the previously synced entry, so that it can be restored if this flush fails.
+				stateChangeSyncer.MempoolNewlyFlushedTxns[txKey] = cachedSCE
+			}
+		} else {
+			// If the key is not in the map, track it as a brand-new entry (nil means "delete on rollback").
+			stateChangeSyncer.MempoolNewlyFlushedTxns[txKey] = nil
+		}
+
+		// Track the key and value if this is a new entry to the mempool, or if the encoder bytes or operation type
+		// changed since it was last synced.
+		stateChangeSyncer.MempoolSyncedKeyValueMap[txKey] = event.StateChangeEntry
+	}
+
 	// Encode the state change entry. We encode as a byte array, so the consumer can buffer just the bytes needed
 	// to decode this entry when reading from file.
 	entryBytes := EncodeToBytes(stateChangeSyncer.BlockHeight, stateChangeEntry, false)
 	writeBytes := EncodeByteArray(entryBytes)

 	// Add the StateChangeEntry bytes to the queue of bytes to be written to the state change file upon Badger db flush.
-	stateChangeSyncer.addTransactionToQueue(stateChangeEntry.FlushId, writeBytes)
+	stateChangeSyncer.addTransactionToQueue(stateChangeEntry.FlushId, writeBytes, event.IsMempoolTxn)
 }

 // _handleStateSyncerFlush is called when a Badger db flush takes place. It calls a helper function that takes the bytes that
@@ -464,11 +497,17 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State
 	stateChangeSyncer.StateSyncerMutex.Lock()
 	defer stateChangeSyncer.StateSyncerMutex.Unlock()

+	glog.V(2).Infof("Handling state syncer flush: %+v", event)
+
 	if event.IsMempoolFlush {
 		// If this is a mempool flush, make sure a block hasn't mined since the mempool entries were added to queue.
-		// If not, reset the mempool maps and file, and start from scratch. The consumer will revert the mempool transactions
+		// If one has, reset the mempool maps and file, and start from scratch. The consumer will revert the mempool transactions
 		// it currently has and sync from scratch.
-		if stateChangeSyncer.BlockSyncFlushId != event.BlockSyncFlushId {
+		if (stateChangeSyncer.BlockSyncFlushId != event.BlockSyncFlushId && event.BlockSyncFlushId != uuid.Nil) ||
+			stateChangeSyncer.BlockSyncFlushId != event.FlushId {
+			glog.V(2).Infof(
+				"The flush ID has changed, bailing now. Event: %v, Event block sync: %v, Global block sync: %v",
+				event.FlushId, event.BlockSyncFlushId, stateChangeSyncer.BlockSyncFlushId)
 			stateChangeSyncer.ResetMempool()
 			return
 		}
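Before the ejection handling below, note the shape of the dedup rule added to _handleStateSyncerOperation above: an entry is skipped only when both its encoded bytes and its operation type match the synced copy, which is why the cached value had to grow from raw bytes to a full *StateChangeEntry. A toy sketch of that comparison, with simplified stand-in types:

    // Toy sketch of the dedup rule: skip only if value bytes AND op type match.
    package main

    import (
    	"bytes"
    	"fmt"
    )

    type entry struct {
    	opType  int
    	encoded []byte
    }

    func shouldSkip(cached, incoming *entry) bool {
    	return cached != nil &&
    		bytes.Equal(cached.encoded, incoming.encoded) &&
    		cached.opType == incoming.opType
    }

    func main() {
    	cached := &entry{opType: 1, encoded: []byte{0xaa}}
    	fmt.Println(shouldSkip(cached, &entry{opType: 1, encoded: []byte{0xaa}})) // true: identical
    	fmt.Println(shouldSkip(cached, &entry{opType: 2, encoded: []byte{0xaa}})) // false: op changed
    }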
@@ -477,15 +516,55 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State
 		// This would mean that an entry was ejected from the mempool.
-		// When this happens, we need to reset the mempool and start from scratch, so that the consumer can revert the
-		// mempool transactions it currently has and sync the mempool from scratch.
-		for key := range stateChangeSyncer.MempoolKeyValueMap {
-			// If any of the keys that the mempool is currently tracking weren't included in the flush, the state syncer
-			// mempool is bad and needs to be reset.
-			if _, ok := stateChangeSyncer.MempoolFlushKeySet[key]; !ok {
-				stateChangeSyncer.ResetMempool()
-				return
+		// When this happens, we emit a revert entry for each ejected key, so that the consumer can revert just
+		// those entries rather than resetting the entire mempool.
+		//
+		// Example:
+		//
+		// Flush:
+		// Key: a
+		// Key: b
+		// Key: d
+		//
+		// Synced:
+		// Key: a
+		// Key: b
+		// Key: c <- Revert this one
+		// Key: d
+
+		if event.Succeeded {
+			for key, cachedSCE := range stateChangeSyncer.MempoolSyncedKeyValueMap {
+				// If any of the keys that the mempool is currently tracking weren't included in the flush, that entry
+				// needs to be reverted from the mempool.
+				if _, ok := stateChangeSyncer.MempoolFlushKeySet[key]; !ok {
+					// Confirm that the block sync ID hasn't shifted. If it has, bail now.
+					if cachedSCE.FlushId != stateChangeSyncer.BlockSyncFlushId {
+						glog.V(2).Infof("The flush ID has changed, inside key/value check, bailing now.")
+						stateChangeSyncer.ResetMempool()
+						return
+					}
+
+					cachedSCE.IsReverted = true
+
+					// Create a revert state change entry and add it to the queue. This will signal the state change
+					// consumer to revert the synced entry.
+					entryBytes := EncodeToBytes(stateChangeSyncer.BlockHeight, cachedSCE, false)
+					writeBytes := EncodeByteArray(entryBytes)
+
+					glog.V(2).Infof("Reverting entry with encoder type %d", cachedSCE.EncoderType)
+
+					// Add the StateChangeEntry bytes to the queue of bytes to be written to the state change file upon Badger db flush.
+					stateChangeSyncer.addTransactionToQueue(cachedSCE.FlushId, writeBytes, true)
+
+					// Remove this entry from the synced map.
+					delete(stateChangeSyncer.MempoolSyncedKeyValueMap, key)
+				}
 			}
 		}
+
+		// Reset the mempool flush set.
 		stateChangeSyncer.MempoolFlushKeySet = make(map[string]bool)
+	} else {
+		glog.V(2).Infof("Handling a committed (non-mempool) flush. Flush ID: %v, block sync flush ID: %v",
+			event.FlushId, event.BlockSyncFlushId)
 	}

 	err := stateChangeSyncer.FlushTransactionsToFile(event)
@@ -496,15 +575,19 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerFlush(event *State
 	if !event.IsMempoolFlush {
 		// After flushing blocksync transactions to file, reset the block sync flush ID, and reset the mempool.
 		stateChangeSyncer.BlockSyncFlushId = uuid.New()
+		glog.V(2).Infof("Setting a new blocksync flush ID: %v", stateChangeSyncer.BlockSyncFlushId)
 		stateChangeSyncer.ResetMempool()
 	}
 }

 func (stateChangeSyncer *StateChangeSyncer) ResetMempool() {
-	stateChangeSyncer.MempoolKeyValueMap = make(map[string][]byte)
+	glog.V(2).Info("Resetting mempool.")
+	stateChangeSyncer.MempoolSyncedKeyValueMap = make(map[string]*StateChangeEntry)
+	stateChangeSyncer.MempoolNewlyFlushedTxns = make(map[string]*StateChangeEntry)
 	stateChangeSyncer.MempoolFlushKeySet = make(map[string]bool)
-	delete(stateChangeSyncer.UnflushedBytes, stateChangeSyncer.MempoolFlushId)
+	delete(stateChangeSyncer.UnflushedMempoolBytes, stateChangeSyncer.MempoolFlushId)
 	stateChangeSyncer.MempoolFlushId = uuid.Nil
+	stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry)
 	// Truncate the mempool files.
 	stateChangeSyncer.StateChangeMempoolFile.Truncate(0)
 	stateChangeSyncer.StateChangeMempoolIndexFile.Truncate(0)
@@ -512,15 +595,23 @@ func (stateChangeSyncer *StateChangeSyncer) ResetMempool() {
 }
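The succeeded branch above is, in effect, a set difference: any synced key missing from the keys seen in this flush (the "Key: c" case in the example) is marked IsReverted and re-emitted. A standalone sketch of that difference, with the maps simplified to key sets:

    // Standalone sketch of the ejection check: synced keys absent from the
    // current flush set are the ones to revert.
    package main

    import "fmt"

    func findEjected(synced, flushed map[string]bool) []string {
    	var ejected []string
    	for key := range synced {
    		if !flushed[key] {
    			ejected = append(ejected, key) // emit an IsReverted entry for this key
    		}
    	}
    	return ejected
    }

    func main() {
    	synced := map[string]bool{"a": true, "b": true, "c": true, "d": true}
    	flushed := map[string]bool{"a": true, "b": true, "d": true}
    	fmt.Println(findEjected(synced, flushed)) // [c]
    }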
 // Add a transaction to the queue of transactions to be flushed to disk upon badger db flush.
-func (stateChangeSyncer *StateChangeSyncer) addTransactionToQueue(flushId uuid.UUID, writeBytes []byte) {
-	unflushedBytes, exists := stateChangeSyncer.UnflushedBytes[flushId]
+func (stateChangeSyncer *StateChangeSyncer) addTransactionToQueue(flushId uuid.UUID, writeBytes []byte, isMempool bool) {
+	var unflushedBytes UnflushedStateSyncerBytes
+	var exists bool
+
+	if isMempool {
+		unflushedBytes, exists = stateChangeSyncer.UnflushedMempoolBytes[flushId]
+	} else {
+		unflushedBytes, exists = stateChangeSyncer.UnflushedCommittedBytes[flushId]
+	}
+
 	if !exists {
 		unflushedBytes = UnflushedStateSyncerBytes{
 			StateChangeBytes:            []byte{},
 			StateChangeOperationIndexes: []uint64{},
 		}
 	}
-
 	// Get the byte index of where this transaction occurs in the unflushed bytes, and add it to the list of
 	// indexes that should be written to the index file.
 	dbOperationIndex := uint64(len(unflushedBytes.StateChangeBytes))
@@ -528,12 +619,18 @@ func (stateChangeSyncer *StateChangeSyncer) addTransactionToQueue(flushId uuid.U
 	unflushedBytes.StateChangeBytes = append(unflushedBytes.StateChangeBytes, writeBytes...)

-	stateChangeSyncer.UnflushedBytes[flushId] = unflushedBytes
+	if isMempool {
+		stateChangeSyncer.UnflushedMempoolBytes[flushId] = unflushedBytes
+	} else {
+		stateChangeSyncer.UnflushedCommittedBytes[flushId] = unflushedBytes
+	}
 }

 // FlushTransactionsToFile writes the bytes that have been cached on the StateChangeSyncer to the state change file.
 func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *StateSyncerFlushedEvent) error {
 	flushId := event.FlushId
+
+	glog.V(2).Infof("Flushing to file: %+v", event)
 	// Get the relevant global flush ID from the state change syncer if the flush ID is nil.
 	if event.FlushId == uuid.Nil {
 		if event.IsMempoolFlush {
@@ -559,12 +656,34 @@ func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *State
 	// If the flush failed, delete the unflushed bytes and associated metadata.
 	// Also delete any unconnected mempool txns from our cache.
 	if !event.Succeeded {
-		delete(stateChangeSyncer.UnflushedBytes, flushId)
+		glog.V(2).Infof("Deleting unflushed bytes for id: %s", flushId)
+		if event.IsMempoolFlush {
+			delete(stateChangeSyncer.UnflushedMempoolBytes, flushId)
+			// Roll back the synced entry map: restore any entries this flush overwrote, and delete any entries
+			// this flush newly added.
+			for key, sce := range stateChangeSyncer.MempoolNewlyFlushedTxns {
+				if sce != nil {
+					stateChangeSyncer.MempoolSyncedKeyValueMap[key] = sce
+				} else {
+					delete(stateChangeSyncer.MempoolSyncedKeyValueMap, key)
+					delete(stateChangeSyncer.MempoolFlushKeySet, key)
+				}
+			}
+		} else {
+			delete(stateChangeSyncer.UnflushedCommittedBytes, flushId)
+		}
+
 		return nil
 	}

-	unflushedBytes, exists := stateChangeSyncer.UnflushedBytes[flushId]
+	var unflushedBytes UnflushedStateSyncerBytes
+	var exists bool
+	if event.IsMempoolFlush {
+		unflushedBytes, exists = stateChangeSyncer.UnflushedMempoolBytes[flushId]
+	} else {
+		unflushedBytes, exists = stateChangeSyncer.UnflushedCommittedBytes[flushId]
+	}
 	if !exists {
+		glog.V(2).Infof("No unflushed bytes exist for flush ID: %s", flushId.String())
 		return nil
 	}
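addTransactionToQueue records the starting byte offset of each appended entry alongside the concatenated entry bytes; those offsets are what the index file stores so a reader can seek straight to an entry. A sketch of that bookkeeping, assuming UnflushedStateSyncerBytes is essentially the buffer-plus-offsets pair its field names suggest:

    // Sketch of the offset bookkeeping behind the index file, under the
    // assumption stated above.
    package main

    import "fmt"

    type unflushed struct {
    	bytes   []byte
    	indexes []uint64 // byte offset where each entry begins
    }

    func (u *unflushed) add(entry []byte) {
    	u.indexes = append(u.indexes, uint64(len(u.bytes)))
    	u.bytes = append(u.bytes, entry...)
    }

    func main() {
    	u := &unflushed{}
    	u.add([]byte("first"))
    	u.add([]byte("second"))
    	// The index file would receive 0 and 5: seek positions for each entry.
    	fmt.Println(u.indexes, len(u.bytes))
    }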
@@ -617,26 +736,39 @@ func (stateChangeSyncer *StateChangeSyncer) FlushTransactionsToFile(event *State
 	// Update unflushed bytes map to remove the flushed bytes.
-	delete(stateChangeSyncer.UnflushedBytes, flushId)
+	if event.IsMempoolFlush {
+		delete(stateChangeSyncer.UnflushedMempoolBytes, flushId)
+		stateChangeSyncer.MempoolNewlyFlushedTxns = make(map[string]*StateChangeEntry)
+	} else {
+		delete(stateChangeSyncer.UnflushedCommittedBytes, flushId)
+	}
+
 	return nil
 }

-func createMempoolTxKey(operationType StateSyncerOperationType, keyBytes []byte) string {
-	return fmt.Sprintf("%v%v", operationType, string(keyBytes))
+func createMempoolTxKey(keyBytes []byte) string {
+	return string(keyBytes)
 }

 // SyncMempoolToStateSyncer flushes all mempool transactions to the db, capturing those state changes
 // in the mempool state change file. It also loops through all unconnected transactions and their associated
 // utxo ops and adds them to the mempool state change file.
 func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Server) (bool, error) {
+	startTime := time.Now()
 	originalCommittedFlushId := stateChangeSyncer.BlockSyncFlushId

+	if originalCommittedFlushId == uuid.Nil {
+		return false, nil
+	}
+
 	if !server.GetMempool().IsRunning() {
 		return true, nil
 	}

 	blockHeight := uint64(server.blockchain.bestChain[len(server.blockchain.bestChain)-1].Height)

+	stateChangeSyncer.MempoolFlushId = originalCommittedFlushId
+
 	stateChangeSyncer.BlockHeight = blockHeight

 	mempoolUtxoView, err := server.GetMempool().GetAugmentedUniversalView()
@@ -669,13 +801,21 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser
 	// more than once in the mempool transactions.
 	txn := server.blockchain.db.NewTransaction(true)
 	defer txn.Discard()
-
+	glog.V(2).Infof("Time since mempool sync start: %v", time.Since(startTime))
+	startTime = time.Now()
 	err = mempoolUtxoView.FlushToDbWithTxn(txn, uint64(server.blockchain.bestChain[len(server.blockchain.bestChain)-1].Height))
-
-	mempoolTxUtxoView := NewUtxoView(server.blockchain.db, server.blockchain.params, server.blockchain.postgres, nil, &mempoolEventManager)
 	if err != nil {
-		return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: CreateMempoolTxUtxoView: ")
+		mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
+			FlushId:        originalCommittedFlushId,
+			Succeeded:      false,
+			IsMempoolFlush: true,
+		})
+		return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: FlushToDbWithTxn: ")
 	}
+	glog.V(2).Infof("Time since db flush: %v", time.Since(startTime))
+	mempoolTxUtxoView := NewUtxoView(server.blockchain.db, server.blockchain.params, server.blockchain.postgres, nil, &mempoolEventManager)
+	glog.V(2).Infof("Time since utxo view creation: %v", time.Since(startTime))
+	startTime = time.Now()

 	// Loop through all the transactions in the mempool and connect them and their utxo ops to the mempool view.
 	mempoolTxns := server.GetMempool().GetOrderedTransactions()
@@ -684,13 +824,22 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser
 	uncommittedBlocks, err := server.blockchain.GetUncommittedBlocks(mempoolUtxoView.TipHash)
 	if err != nil {
-		mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
-			FlushId:        uuid.Nil,
-			Succeeded:      false,
-			IsMempoolFlush: true,
-		})
+		flushEvent := &StateSyncerFlushedEvent{
+			FlushId:        originalCommittedFlushId,
+			Succeeded:      false,
+			IsMempoolFlush: true,
+		}
+		mempoolUtxoView.EventManager.stateSyncerFlushed(flushEvent)
+		glog.V(2).Infof("Emitted mempool flush failure event: %+v", flushEvent)
 		return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer: ")
 	}
+	glog.V(2).Infof("Time since getting transactions: %v", time.Since(startTime))
+	startTime = time.Now()
+
+	glog.V(2).Infof("Mempool synced len after flush: %d", len(stateChangeSyncer.MempoolSyncedKeyValueMap))
+
+	// TODO: Have Z look at if we need to do some caching in the uncommitted blocks logic.
 	// First connect the uncommitted blocks to the mempool view.
 	for _, uncommittedBlock := range uncommittedBlocks {
 		utxoViewAndOpsAtBlockHash, err := server.blockchain.getUtxoViewAndUtxoOpsAtBlockHash(*uncommittedBlock.Hash)
@@ -722,78 +871,103 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser
 		mempoolTxUtxoView.EventManager = &mempoolEventManager
 	}

-	currentTimestamp := time.Now().UnixNano()
+	// Check to see if every txn hash in our cached txns is in the first n txns in the mempool,
+	// where n is the length of our cached txn map.
 	for ii, mempoolTx := range mempoolTxns {
-		if server.params.IsPoSBlockHeight(blockHeight) && uint64(ii) > stateChangeSyncer.MempoolTxnSyncLimit {
+		if _, ok := stateChangeSyncer.MempoolCachedTxns[mempoolTx.Hash.String()]; !ok {
+			// If any of the transaction hashes in the first n transactions don't line up with our cache map, the mempool
+			// has changed since the last cache, and we need to reset it.
+			stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry)
+			stateChangeSyncer.MempoolCachedUtxoView = nil
+			glog.V(2).Infof("Txn not in cache, resetting")
 			break
 		}
-		var utxoOpsForTxn []*UtxoOperation
-		if server.params.IsPoSBlockHeight(blockHeight + 1) {
-			// We need to create a copy of the view in the event that the transaction fails to
-			// connect. If it fails to connect, we need to reset the view to its original state.
-			// and try to connect it as a failing transaction. If that fails as well, we just continue
-			// and the mempoolTxUtxoView is unmodified.
-			var copiedView *UtxoView
-			copiedView = mempoolTxUtxoView.CopyUtxoView()
-			utxoOpsForTxn, _, _, _, err = copiedView.ConnectTransaction(
-				mempoolTx.Tx, mempoolTx.Hash, uint32(blockHeight+1),
-				currentTimestamp, false, false /*ignoreUtxos*/)
-			// If the transaction successfully connected, we update mempoolTxUtxoView to the copied view.
-			if err == nil {
-				mempoolTxUtxoView = copiedView
-			} else {
-				glog.V(2).Infof(
-					"StateChangeSyncer.SyncMempoolToStateSyncer failed connecting mempool tx with (hash= %v): (err=%v)",
-					mempoolTx.Hash,
-					err,
-				)
-				// If the txn fails to connect, then we should not emit any state changes for it.
-				continue
+
+		// Once we're past the number of cached txns, we have confirmed that nothing in our cache is out of date, and can break.
+		if ii >= len(stateChangeSyncer.MempoolCachedTxns)-1 {
+			if stateChangeSyncer.MempoolCachedUtxoView != nil {
+				// If we know that all our transactions are good, set the state of the utxo view to the cached one, and exit.
+				mempoolUtxoView = stateChangeSyncer.MempoolCachedUtxoView
+			}
+			glog.V(2).Infof("All txns match, continuing: %v", ii)
+			break
+		}
+	}
+
+	currentTimestamp := time.Now().UnixNano()
+	for _, mempoolTx := range mempoolTxns {
+		var txnStateChangeEntry *StateChangeEntry
+		var utxoOpStateChangeEntry *StateChangeEntry
+
+		// Check if the transaction is already in the cache. If so, reuse its cached state change entries
+		// instead of reconnecting it.
+		txHash := mempoolTx.Hash.String()
+		if stateChangeEntries, ok := stateChangeSyncer.MempoolCachedTxns[txHash]; ok {
+			txnStateChangeEntry = stateChangeEntries[0]
+			utxoOpStateChangeEntry = stateChangeEntries[1]
 		} else {
-			// For PoW block heights, we can just connect the transaction to the mempool view.
-			utxoOpsForTxn, _, _, _, err = mempoolTxUtxoView.ConnectTransaction(
-				mempoolTx.Tx, mempoolTx.Hash, uint32(blockHeight+1),
-				currentTimestamp, false, false /*ignoreUtxos*/)
+			utxoOpsForTxn, _, _, _, err := mempoolTxUtxoView.ConnectTransaction(
+				mempoolTx.Tx, mempoolTx.Hash, uint32(blockHeight+1), currentTimestamp, false, false /*ignoreUtxos*/)
 			if err != nil {
+				mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
+					FlushId:        originalCommittedFlushId,
+					Succeeded:      false,
+					IsMempoolFlush: true,
+				})
+				stateChangeSyncer.MempoolCachedTxns = make(map[string][]*StateChangeEntry)
+				stateChangeSyncer.MempoolCachedUtxoView = nil
 				return false, errors.Wrapf(err, "StateChangeSyncer.SyncMempoolToStateSyncer ConnectTransaction: ")
 			}
-		}
-
-		// Emit transaction state change.
-		mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{
-			StateChangeEntry: &StateChangeEntry{
+			txnStateChangeEntry = &StateChangeEntry{
 				OperationType: DbOperationTypeUpsert,
 				KeyBytes:      TxnHashToTxnKey(mempoolTx.Hash),
 				EncoderBytes:  EncodeToBytes(blockHeight, mempoolTx.Tx, false),
-			},
-			FlushId:      uuid.Nil,
-			IsMempoolTxn: true,
-		})
+				IsReverted:    false,
+			}

-		// Capture the utxo ops for the transaction in a UTXOOp bundle.
-		utxoOpBundle := &UtxoOperationBundle{
-			UtxoOpBundle: [][]*UtxoOperation{},
-		}
+			// Capture the utxo ops for the transaction in a UTXOOp bundle.
+			utxoOpBundle := &UtxoOperationBundle{
+				UtxoOpBundle: [][]*UtxoOperation{},
+			}

-		utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn)
+			utxoOpBundle.UtxoOpBundle = append(utxoOpBundle.UtxoOpBundle, utxoOpsForTxn)

-		// Emit UTXOOp bundle event
-		mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{
-			StateChangeEntry: &StateChangeEntry{
+			utxoOpStateChangeEntry = &StateChangeEntry{
 				OperationType: DbOperationTypeUpsert,
 				KeyBytes:      _DbKeyForTxnUtxoOps(mempoolTx.Hash),
 				EncoderBytes:  EncodeToBytes(blockHeight, utxoOpBundle, false),
-			},
-			FlushId:      uuid.Nil,
-			IsMempoolTxn: true,
+				IsReverted:    false,
+			}
+
+			// Add both state change entries to the mempool txn cache.
+			stateChangeSyncer.MempoolCachedTxns[txHash] = []*StateChangeEntry{txnStateChangeEntry, utxoOpStateChangeEntry}
+		}
+
+		// Emit transaction state change.
+		mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{
+			StateChangeEntry: txnStateChangeEntry,
+			FlushId:          originalCommittedFlushId,
+			IsMempoolTxn:     true,
+		})
+
+		// Emit UTXOOp bundle event.
+		mempoolUtxoView.EventManager.stateSyncerOperation(&StateSyncerOperationEvent{
+			StateChangeEntry: utxoOpStateChangeEntry,
+			FlushId:          originalCommittedFlushId,
+			IsMempoolTxn:     true,
 		})
 	}
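The loop above implements a get-or-compute cache keyed by transaction hash: a hit reuses the pair of entries built last time, and only a miss pays for ConnectTransaction. A toy sketch of the pattern; the connect function is a stand-in for the expensive call:

    // Toy get-or-compute sketch of the MempoolCachedTxns pattern.
    package main

    import "fmt"

    type sce struct{ key string }

    func getOrConnect(cache map[string][2]*sce, txHash string, connect func() [2]*sce) [2]*sce {
    	if entries, ok := cache[txHash]; ok {
    		return entries // cache hit: skip the expensive reconnect
    	}
    	entries := connect()
    	cache[txHash] = entries
    	return entries
    }

    func main() {
    	cache := map[string][2]*sce{}
    	connects := 0
    	connect := func() [2]*sce {
    		connects++
    		return [2]*sce{{key: "txn"}, {key: "utxoOps"}}
    	}
    	getOrConnect(cache, "abc", connect)
    	getOrConnect(cache, "abc", connect)
    	fmt.Println(connects) // 1: second call served from cache
    }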
 	// Update the cached utxo view to represent the new cached state.
+	stateChangeSyncer.MempoolCachedUtxoView = mempoolTxUtxoView.CopyUtxoView()
+
+	glog.V(2).Infof("Time to connect all %d txns: %v", len(mempoolTxns), time.Since(startTime))
+	startTime = time.Now()
+
+	glog.V(2).Infof("Mempool flushed len: %d", len(stateChangeSyncer.MempoolFlushKeySet))
+	glog.V(2).Infof("Mempool synced len after all: %d", len(stateChangeSyncer.MempoolSyncedKeyValueMap))

 	// Before flushing the mempool to the state change file, check if a block has mined. If so, abort the flush.
-	if err != nil || originalCommittedFlushId != stateChangeSyncer.BlockSyncFlushId {
+	if originalCommittedFlushId != stateChangeSyncer.BlockSyncFlushId {
 		mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
-			FlushId:        uuid.Nil,
+			FlushId:        originalCommittedFlushId,
 			Succeeded:      false,
 			IsMempoolFlush: true,
 		})
@@ -801,11 +975,12 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser
 	}

 	mempoolUtxoView.EventManager.stateSyncerFlushed(&StateSyncerFlushedEvent{
-		FlushId:          uuid.Nil,
+		FlushId:          originalCommittedFlushId,
 		Succeeded:        true,
 		IsMempoolFlush:   true,
 		BlockSyncFlushId: originalCommittedFlushId,
 	})
+	glog.V(2).Infof("Time to flush: %v", time.Since(startTime))

 	return false, nil
 }
@@ -815,9 +990,10 @@ func (stateChangeSyncer *StateChangeSyncer) StartMempoolSyncRoutine(server *Serv
 	// Wait for mempool to be initialized.
 	for server.GetMempool() == nil || server.blockchain.chainState() != SyncStateFullyCurrent {
 		time.Sleep(15000 * time.Millisecond)
+		glog.V(2).Infof("Mempool: %v", server.mempool)
+		glog.V(2).Infof("Chain state: %v", server.blockchain.chainState())
 	}

 	if !stateChangeSyncer.BlocksyncCompleteEntriesFlushed && stateChangeSyncer.SyncType == NodeSyncTypeBlockSync {
-		fmt.Printf("Flushing to file")
 		err := stateChangeSyncer.FlushAllEntriesToFile(server)
 		if err != nil {
 			fmt.Printf("StateChangeSyncer.StartMempoolSyncRoutine: Error flushing all entries to file: %v", err)
@@ -886,6 +1062,7 @@ func (stateChangeSyncer *StateChangeSyncer) FlushAllEntriesToFile(server *Server
 			OperationType: DbOperationTypeInsert,
 			KeyBytes:      dbEntry.Key,
 			EncoderBytes:  dbEntry.Value,
+			IsReverted:    false,
 		}

 		// If this prefix is the prefix for UTXO Ops, fetch the transaction for each UTXO Op and attach it to the UTXO Op.
diff --git a/lib/state_change_syncer_test.go b/lib/state_change_syncer_test.go
index 083c851bd..a77559c4c 100644
--- a/lib/state_change_syncer_test.go
+++ b/lib/state_change_syncer_test.go
@@ -44,6 +44,7 @@ func TestStateChangeEntryEncoder(t *testing.T) {
 		KeyBytes:    []byte{1, 2, 3},
 		Encoder:     postEntry,
 		EncoderType: postEntry.GetEncoderType(),
+		IsReverted:  false,
 	}

 	stateChangeEntryBytes := EncodeToBytes(0, stateChangeEntry)
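The test change above only sets IsReverted explicitly on the existing fixture. Below is a hedged sketch of a round-trip assertion that could additionally cover the new flag, modeled on TestStateChangeEntryEncoder and assuming the codebase's EncodeToBytes/DecodeFromBytes helpers are symmetric for StateChangeEntry; this is illustrative, not code from the PR:

    func TestStateChangeEntryIsRevertedRoundTrip(t *testing.T) {
    	require := require.New(t)

    	// Minimal entry exercising the new flag; EncoderBytes keeps the encoder
    	// side of the round trip trivial (raw bytes rather than a full encoder).
    	stateChangeEntry := &StateChangeEntry{
    		OperationType: DbOperationTypeUpsert,
    		KeyBytes:      []byte{1, 2, 3},
    		EncoderBytes:  []byte{4, 5, 6},
    		IsReverted:    true,
    	}

    	stateChangeEntryBytes := EncodeToBytes(0, stateChangeEntry)

    	decodedEntry := &StateChangeEntry{}
    	exists, err := DecodeFromBytes(decodedEntry, bytes.NewReader(stateChangeEntryBytes))
    	require.NoError(err)
    	require.True(exists)
    	require.True(decodedEntry.IsReverted)
    }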