From eaf3fb42a2bd2047ae5745b8b6e0cf0640c49cf2 Mon Sep 17 00:00:00 2001 From: Vyatcheslav Suharnikov Date: Wed, 8 Nov 2023 08:00:42 +0400 Subject: [PATCH] Delete old entries with low priority writes --- .../database/RocksDBWriter.scala | 478 +++++++++--------- .../com/wavesplatform/database/package.scala | 46 +- 2 files changed, 268 insertions(+), 256 deletions(-) diff --git a/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala b/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala index c8435c2f6d..dced8cc922 100644 --- a/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala +++ b/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala @@ -31,7 +31,7 @@ import com.wavesplatform.transaction.lease.{LeaseCancelTransaction, LeaseTransac import com.wavesplatform.transaction.smart.{InvokeExpressionTransaction, InvokeScriptTransaction, SetScriptTransaction} import com.wavesplatform.transaction.transfer.* import com.wavesplatform.utils.{LoggerFacade, ScorexLogging} -import org.rocksdb.RocksDB +import org.rocksdb.{ReadOptions, RocksDB} import org.slf4j.LoggerFactory import java.util @@ -119,8 +119,6 @@ class RocksDBWriter( private[database] def readOnly[A](f: ReadOnlyDB => A): A = writableDB.readOnly(f) - private[this] def readWrite[A](f: RW => A): A = writableDB.readWrite(f) - override protected def loadMaxAddressId(): Long = writableDB.get(Keys.lastAddressId).getOrElse(0L) override protected def loadAddressId(address: Address): Option[AddressId] = @@ -399,288 +397,292 @@ class RocksDBWriter( stateHash: StateHashBuilder.Result ): Unit = { log.trace(s"Persisting block ${blockMeta.id} at height $height") - readWrite { rw => - val expiredKeys = new ArrayBuffer[Array[Byte]] + writableDB.withOptions { (ro, wo) => + writableDB.readWriteWithOptions(ro, wo) { rw => + val expiredKeys = new ArrayBuffer[Array[Byte]] - rw.put(Keys.height, Height(height)) + rw.put(Keys.height, Height(height)) - val previousSafeRollbackHeight = rw.get(Keys.safeRollbackHeight) - val newSafeRollbackHeight = height - dbSettings.maxRollbackDepth + val previousSafeRollbackHeight = rw.get(Keys.safeRollbackHeight) + val newSafeRollbackHeight = height - dbSettings.maxRollbackDepth - if (previousSafeRollbackHeight < newSafeRollbackHeight) { - rw.put(Keys.safeRollbackHeight, newSafeRollbackHeight) - deleteOldEntries(Height(math.max(1, newSafeRollbackHeight)), rw) - } + if (previousSafeRollbackHeight < newSafeRollbackHeight) { + rw.put(Keys.safeRollbackHeight, newSafeRollbackHeight) + deleteOldEntries(Height(math.max(1, newSafeRollbackHeight)), ro) + } - rw.put(Keys.blockMetaAt(Height(height)), Some(blockMeta)) - rw.put(Keys.heightOf(blockMeta.id), Some(height)) - blockHeightCache.put(blockMeta.id, Some(height)) + rw.put(Keys.blockMetaAt(Height(height)), Some(blockMeta)) + rw.put(Keys.heightOf(blockMeta.id), Some(height)) + blockHeightCache.put(blockMeta.id, Some(height)) - blockMeta.header.flatMap(_.challengedHeader.map(_.generator.toAddress)) match { - case Some(addr) => - val key = Keys.maliciousMinerBanHeights(addr.bytes) - val savedHeights = rw.get(key) - rw.put(key, height +: savedHeights) - case _ => () - } - - val lastAddressId = loadMaxAddressId() + newAddresses.size - rw.put(Keys.lastAddressId, Some(lastAddressId)) + blockMeta.header.flatMap(_.challengedHeader.map(_.generator.toAddress)) match { + case Some(addr) => + val key = Keys.maliciousMinerBanHeights(addr.bytes) + val savedHeights = rw.get(key) + rw.put(key, height +: savedHeights) + case _ => () + } - for ((address, id) <- newAddresses) { - val kaid = Keys.addressId(address) - rw.put(kaid, Some(id)) - rw.put(Keys.idToAddress(id), address) - } + val lastAddressId = loadMaxAddressId() + newAddresses.size + rw.put(Keys.lastAddressId, Some(lastAddressId)) - val threshold = newSafeRollbackHeight + for ((address, id) <- newAddresses) { + val kaid = Keys.addressId(address) + rw.put(kaid, Some(id)) + rw.put(Keys.idToAddress(id), address) + } - appendBalances(balances, snapshot.assetStatics, rw) - appendData(newAddresses, data, rw) + val threshold = newSafeRollbackHeight - val changedAddresses = (addressTransactions.asScala.keys ++ balances.keys.map(_._1)).toSet - rw.put(Keys.changedAddresses(height), changedAddresses.toSeq) + appendBalances(balances, snapshot.assetStatics, rw) + appendData(newAddresses, data, rw) - // leases - for ((addressId, (currentLeaseBalance, leaseBalanceNode)) <- leaseBalances) { - rw.put(Keys.leaseBalance(addressId), currentLeaseBalance) - rw.put(Keys.leaseBalanceAt(addressId, currentLeaseBalance.height), leaseBalanceNode) - } + val changedAddresses = (addressTransactions.asScala.keys ++ balances.keys.map(_._1)).toSet + rw.put(Keys.changedAddresses(height), changedAddresses.toSeq) - for ((orderId, (currentVolumeAndFee, volumeAndFeeNode)) <- filledQuantity) { - rw.put(Keys.filledVolumeAndFee(orderId), currentVolumeAndFee) - rw.put(Keys.filledVolumeAndFeeAt(orderId, currentVolumeAndFee.height), volumeAndFeeNode) - } + // leases + for ((addressId, (currentLeaseBalance, leaseBalanceNode)) <- leaseBalances) { + rw.put(Keys.leaseBalance(addressId), currentLeaseBalance) + rw.put(Keys.leaseBalanceAt(addressId, currentLeaseBalance.height), leaseBalanceNode) + } - for ((asset, (assetStatic, assetNum)) <- snapshot.indexedAssetStatics) { - val pbAssetStatic = StaticAssetInfo( - assetStatic.sourceTransactionId, - assetStatic.issuerPublicKey, - assetStatic.decimals, - assetStatic.nft, - assetNum, - height, - asset.id.toByteString - ) - rw.put(Keys.assetStaticInfo(asset), Some(pbAssetStatic)) - } + for ((orderId, (currentVolumeAndFee, volumeAndFeeNode)) <- filledQuantity) { + rw.put(Keys.filledVolumeAndFee(orderId), currentVolumeAndFee) + rw.put(Keys.filledVolumeAndFeeAt(orderId, currentVolumeAndFee.height), volumeAndFeeNode) + } - val updatedAssetSet = snapshot.assetVolumes.keySet ++ snapshot.assetNamesAndDescriptions.keySet - for (asset <- updatedAssetSet) { - lazy val dbInfo = rw.fromHistory(Keys.assetDetailsHistory(asset), Keys.assetDetails(asset)) - val volume = - snapshot.assetVolumes - .get(asset) - .map(v => AssetVolumeInfo(v.isReissuable, BigInt(v.volume.toByteArray))) - .orElse(dbInfo.map(_._2)) - val nameAndDescription = - snapshot.assetNamesAndDescriptions - .get(asset) - .map(nd => AssetInfo(nd.name, nd.description, nd.lastUpdatedAt)) - .orElse(dbInfo.map(_._1)) - (nameAndDescription, volume).bisequence - .foreach(rw.put(Keys.assetDetails(asset)(height), _)) - } + for ((asset, (assetStatic, assetNum)) <- snapshot.indexedAssetStatics) { + val pbAssetStatic = StaticAssetInfo( + assetStatic.sourceTransactionId, + assetStatic.issuerPublicKey, + assetStatic.decimals, + assetStatic.nft, + assetNum, + height, + asset.id.toByteString + ) + rw.put(Keys.assetStaticInfo(asset), Some(pbAssetStatic)) + } - for (asset <- snapshot.assetStatics.keySet ++ updatedAssetSet) { - expiredKeys ++= updateHistory(rw, Keys.assetDetailsHistory(asset), threshold, Keys.assetDetails(asset)) - } + val updatedAssetSet = snapshot.assetVolumes.keySet ++ snapshot.assetNamesAndDescriptions.keySet + for (asset <- updatedAssetSet) { + lazy val dbInfo = rw.fromHistory(Keys.assetDetailsHistory(asset), Keys.assetDetails(asset)) + val volume = + snapshot.assetVolumes + .get(asset) + .map(v => AssetVolumeInfo(v.isReissuable, BigInt(v.volume.toByteArray))) + .orElse(dbInfo.map(_._2)) + val nameAndDescription = + snapshot.assetNamesAndDescriptions + .get(asset) + .map(nd => AssetInfo(nd.name, nd.description, nd.lastUpdatedAt)) + .orElse(dbInfo.map(_._1)) + (nameAndDescription, volume).bisequence + .foreach(rw.put(Keys.assetDetails(asset)(height), _)) + } - for ((id, details) <- snapshot.leaseStates) { - rw.put(Keys.leaseDetails(id)(height), Some(Right(details))) - expiredKeys ++= updateHistory(rw, Keys.leaseDetailsHistory(id), threshold, Keys.leaseDetails(id)) - } + for (asset <- snapshot.assetStatics.keySet ++ updatedAssetSet) { + expiredKeys ++= updateHistory(rw, Keys.assetDetailsHistory(asset), threshold, Keys.assetDetails(asset)) + } - for ((addressId, script) <- accountScripts) { - expiredKeys ++= updateHistory(rw, Keys.addressScriptHistory(addressId), threshold, Keys.addressScript(addressId)) - if (script.isDefined) rw.put(Keys.addressScript(addressId)(height), script) - } + for ((id, details) <- snapshot.leaseStates) { + rw.put(Keys.leaseDetails(id)(height), Some(Right(details))) + expiredKeys ++= updateHistory(rw, Keys.leaseDetailsHistory(id), threshold, Keys.leaseDetails(id)) + } - for ((asset, script) <- snapshot.assetScripts) { - expiredKeys ++= updateHistory(rw, Keys.assetScriptHistory(asset), threshold, Keys.assetScript(asset)) - rw.put(Keys.assetScript(asset)(height), Some(script)) - } + for ((addressId, script) <- accountScripts) { + expiredKeys ++= updateHistory(rw, Keys.addressScriptHistory(addressId), threshold, Keys.addressScript(addressId)) + if (script.isDefined) rw.put(Keys.addressScript(addressId)(height), script) + } - if (height % BlockStep == 1) { - if ((height / BlockStep) % 2 == 0) { - bf0 = mkFilter() - } else { - bf1 = mkFilter() + for ((asset, script) <- snapshot.assetScripts) { + expiredKeys ++= updateHistory(rw, Keys.assetScriptHistory(asset), threshold, Keys.assetScript(asset)) + rw.put(Keys.assetScript(asset)(height), Some(script)) } - } - val targetBf = if ((height / BlockStep) % 2 == 0) bf0 else bf1 - - val transactionsWithSize = - snapshot.transactions.zipWithIndex.map { case ((id, txInfo), i) => - val tx = txInfo.transaction - val num = TxNum(i.toShort) - val meta = TxMeta(Height @@ blockMeta.height, txInfo.status, txInfo.spentComplexity) - val txId = TransactionId(id) - - val size = rw.put(Keys.transactionAt(Height(height), num, rdb.txHandle), Some((meta, tx))) - rw.put(Keys.transactionStateSnapshotAt(Height(height), num, rdb.txSnapshotHandle), Some(txInfo.snapshot.toProtobuf(txInfo.status))) - rw.put(Keys.transactionMetaById(txId, rdb.txMetaHandle), Some(TransactionMeta(height, num, tx.tpe.id, meta.status.protobuf, 0, size))) - targetBf.put(id.arr) - - txId -> (num, tx, size) - }.toMap - - if (dbSettings.storeTransactionsByAddress) { - val addressTxs = addressTransactions.asScala.toSeq.map { case (aid, txIds) => - (aid, txIds, Keys.addressTransactionSeqNr(aid)) + + if (height % BlockStep == 1) { + if ((height / BlockStep) % 2 == 0) { + bf0 = mkFilter() + } else { + bf1 = mkFilter() + } } - rw.multiGetInts(addressTxs.map(_._3)) - .zip(addressTxs) - .foreach { case (prevSeqNr, (addressId, txIds, txSeqNrKey)) => - val nextSeqNr = prevSeqNr.getOrElse(0) + 1 - val txTypeNumSeq = txIds.asScala.map { txId => - val (num, tx, size) = transactionsWithSize(txId) - (tx.tpe.id.toByte, num, size) - }.toSeq - rw.put(Keys.addressTransactionHN(addressId, nextSeqNr), Some((Height(height), txTypeNumSeq.sortBy(-_._2)))) - rw.put(txSeqNrKey, nextSeqNr) + val targetBf = if ((height / BlockStep) % 2 == 0) bf0 else bf1 + + val transactionsWithSize = + snapshot.transactions.zipWithIndex.map { case ((id, txInfo), i) => + val tx = txInfo.transaction + val num = TxNum(i.toShort) + val meta = TxMeta(Height @@ blockMeta.height, txInfo.status, txInfo.spentComplexity) + val txId = TransactionId(id) + + val size = rw.put(Keys.transactionAt(Height(height), num, rdb.txHandle), Some((meta, tx))) + rw.put(Keys.transactionStateSnapshotAt(Height(height), num, rdb.txSnapshotHandle), Some(txInfo.snapshot.toProtobuf(txInfo.status))) + rw.put(Keys.transactionMetaById(txId, rdb.txMetaHandle), Some(TransactionMeta(height, num, tx.tpe.id, meta.status.protobuf, 0, size))) + targetBf.put(id.arr) + + txId -> (num, tx, size) + }.toMap + + if (dbSettings.storeTransactionsByAddress) { + val addressTxs = addressTransactions.asScala.toSeq.map { case (aid, txIds) => + (aid, txIds, Keys.addressTransactionSeqNr(aid)) } - } + rw.multiGetInts(addressTxs.map(_._3)) + .zip(addressTxs) + .foreach { case (prevSeqNr, (addressId, txIds, txSeqNrKey)) => + val nextSeqNr = prevSeqNr.getOrElse(0) + 1 + val txTypeNumSeq = txIds.asScala.map { txId => + val (num, tx, size) = transactionsWithSize(txId) + (tx.tpe.id.toByte, num, size) + }.toSeq + rw.put(Keys.addressTransactionHN(addressId, nextSeqNr), Some((Height(height), txTypeNumSeq.sortBy(-_._2)))) + rw.put(txSeqNrKey, nextSeqNr) + } + } - for ((alias, address) <- snapshot.aliases) { - val key = Keys.addressIdOfAlias(alias) - val value = addressIdWithFallback(address, newAddresses) - rw.put(key, Some(value)) - } + for ((alias, address) <- snapshot.aliases) { + val key = Keys.addressIdOfAlias(alias) + val value = addressIdWithFallback(address, newAddresses) + rw.put(key, Some(value)) + } - for ((assetId, sponsorship) <- snapshot.sponsorships) { - rw.put(Keys.sponsorship(assetId)(height), sponsorship) - expiredKeys ++= updateHistory(rw, Keys.sponsorshipHistory(assetId), threshold, Keys.sponsorship(assetId)) - } + for ((assetId, sponsorship) <- snapshot.sponsorships) { + rw.put(Keys.sponsorship(assetId)(height), sponsorship) + expiredKeys ++= updateHistory(rw, Keys.sponsorshipHistory(assetId), threshold, Keys.sponsorship(assetId)) + } - val activationWindowSize = settings.functionalitySettings.activationWindowSize(height) - if (height % activationWindowSize == 0) { - val minVotes = settings.functionalitySettings.blocksForFeatureActivation(height) - val newlyApprovedFeatures = featureVotes(height) - .filterNot { case (featureId, _) => settings.functionalitySettings.preActivatedFeatures.contains(featureId) } - .collect { - case (featureId, voteCount) if voteCount + (if (blockMeta.getHeader.featureVotes.contains(featureId.toInt)) 1 else 0) >= minVotes => - featureId -> height - } + val activationWindowSize = settings.functionalitySettings.activationWindowSize(height) + if (height % activationWindowSize == 0) { + val minVotes = settings.functionalitySettings.blocksForFeatureActivation(height) + val newlyApprovedFeatures = featureVotes(height) + .filterNot { case (featureId, _) => settings.functionalitySettings.preActivatedFeatures.contains(featureId) } + .collect { + case (featureId, voteCount) if voteCount + (if (blockMeta.getHeader.featureVotes.contains(featureId.toInt)) 1 else 0) >= minVotes => + featureId -> height + } - if (newlyApprovedFeatures.nonEmpty) { - approvedFeaturesCache = newlyApprovedFeatures ++ rw.get(Keys.approvedFeatures) - rw.put(Keys.approvedFeatures, approvedFeaturesCache) + if (newlyApprovedFeatures.nonEmpty) { + approvedFeaturesCache = newlyApprovedFeatures ++ rw.get(Keys.approvedFeatures) + rw.put(Keys.approvedFeatures, approvedFeaturesCache) - val featuresToSave = (newlyApprovedFeatures.view.mapValues(_ + activationWindowSize) ++ rw.get(Keys.activatedFeatures)).toMap + val featuresToSave = (newlyApprovedFeatures.view.mapValues(_ + activationWindowSize) ++ rw.get(Keys.activatedFeatures)).toMap - activatedFeaturesCache = featuresToSave ++ settings.functionalitySettings.preActivatedFeatures - rw.put(Keys.activatedFeatures, featuresToSave) + activatedFeaturesCache = featuresToSave ++ settings.functionalitySettings.preActivatedFeatures + rw.put(Keys.activatedFeatures, featuresToSave) + } } - } - rw.put(Keys.issuedAssets(height), snapshot.assetStatics.keySet.toSeq) - rw.put(Keys.updatedAssets(height), updatedAssetSet.toSeq) - rw.put(Keys.sponsorshipAssets(height), snapshot.sponsorships.keySet.toSeq) - - rw.put(Keys.carryFee(height), carry) - expiredKeys += Keys.carryFee(threshold - 1).keyBytes - - rw.put(Keys.blockStateHash(height), computedBlockStateHash) - expiredKeys += Keys.blockStateHash(threshold - 1).keyBytes - - if (dbSettings.storeInvokeScriptResults) snapshot.scriptResults.foreach { case (txId, result) => - val (txHeight, txNum) = transactionsWithSize - .get(TransactionId @@ txId) - .map { case (txNum, _, _) => (height, txNum) } - .orElse(rw.get(Keys.transactionMetaById(TransactionId @@ txId, rdb.txMetaHandle)).map { tm => - (tm.height, TxNum(tm.num.toShort)) - }) - .getOrElse(throw new IllegalArgumentException(s"Couldn't find transaction height and num: $txId")) - - try rw.put(Keys.invokeScriptResult(txHeight, txNum), Some(result)) - catch { - case NonFatal(e) => - throw new RuntimeException(s"Error storing invoke script result for $txId: $result", e) + rw.put(Keys.issuedAssets(height), snapshot.assetStatics.keySet.toSeq) + rw.put(Keys.updatedAssets(height), updatedAssetSet.toSeq) + rw.put(Keys.sponsorshipAssets(height), snapshot.sponsorships.keySet.toSeq) + + rw.put(Keys.carryFee(height), carry) + expiredKeys += Keys.carryFee(threshold - 1).keyBytes + + rw.put(Keys.blockStateHash(height), computedBlockStateHash) + expiredKeys += Keys.blockStateHash(threshold - 1).keyBytes + + if (dbSettings.storeInvokeScriptResults) snapshot.scriptResults.foreach { case (txId, result) => + val (txHeight, txNum) = transactionsWithSize + .get(TransactionId @@ txId) + .map { case (txNum, _, _) => (height, txNum) } + .orElse(rw.get(Keys.transactionMetaById(TransactionId @@ txId, rdb.txMetaHandle)).map { tm => + (tm.height, TxNum(tm.num.toShort)) + }) + .getOrElse(throw new IllegalArgumentException(s"Couldn't find transaction height and num: $txId")) + + try rw.put(Keys.invokeScriptResult(txHeight, txNum), Some(result)) + catch { + case NonFatal(e) => + throw new RuntimeException(s"Error storing invoke script result for $txId: $result", e) + } } - } - for ((txId, pbMeta) <- snapshot.ethereumTransactionMeta) { - val txNum = transactionsWithSize(TransactionId @@ txId)._1 - val key = Keys.ethereumTransactionMeta(Height(height), txNum) - rw.put(key, Some(pbMeta)) - } + for ((txId, pbMeta) <- snapshot.ethereumTransactionMeta) { + val txNum = transactionsWithSize(TransactionId @@ txId)._1 + val key = Keys.ethereumTransactionMeta(Height(height), txNum) + rw.put(key, Some(pbMeta)) + } - expiredKeys.foreach(rw.delete) + expiredKeys.foreach(rw.delete) - if (DisableHijackedAliases.height == height) { - disabledAliases = DisableHijackedAliases(rw) - } + if (DisableHijackedAliases.height == height) { + disabledAliases = DisableHijackedAliases(rw) + } - if (dbSettings.storeStateHashes) { - val prevStateHash = - if (height == 1) ByteStr.empty - else - rw.get(Keys.stateHash(height - 1)) - .fold( - throw new IllegalStateException( - s"Couldn't load state hash for ${height - 1}. Please rebuild the state or disable db.store-state-hashes" - ) - )(_.totalHash) - - val newStateHash = stateHash.createStateHash(prevStateHash) - rw.put(Keys.stateHash(height), Some(newStateHash)) + if (dbSettings.storeStateHashes) { + val prevStateHash = + if (height == 1) ByteStr.empty + else + rw.get(Keys.stateHash(height - 1)) + .fold( + throw new IllegalStateException( + s"Couldn't load state hash for ${height - 1}. Please rebuild the state or disable db.store-state-hashes" + ) + )(_.totalHash) + + val newStateHash = stateHash.createStateHash(prevStateHash) + rw.put(Keys.stateHash(height), Some(newStateHash)) + } } } log.trace(s"Finished persisting block ${blockMeta.id} at height $height") } - private def deleteOldEntries(height: Height, rw: RW): Unit = { - val changedAddressesKey = Keys.changedAddresses(height) + private def deleteOldEntries(height: Height, readOptions: ReadOptions): Unit = writableDB.withWriteOptions { wo => + writableDB.readWriteWithOptions(readOptions, wo.setLowPri(true)) { rw => + val changedAddressesKey = Keys.changedAddresses(height) - val wavesAddressIds = new ArrayBuffer[AddressId]() - val wavesBalanceAtKeys = new ArrayBuffer[Key[BalanceNode]]() + val wavesAddressIds = new ArrayBuffer[AddressId]() + val wavesBalanceAtKeys = new ArrayBuffer[Key[BalanceNode]]() - rw.get(changedAddressesKey).foreach { addressId => - // WAVES balances - wavesAddressIds.addOne(addressId) - wavesBalanceAtKeys.addOne(Keys.wavesBalanceAt(addressId, height)) + rw.get(changedAddressesKey).foreach { addressId => + // WAVES balances + wavesAddressIds.addOne(addressId) + wavesBalanceAtKeys.addOne(Keys.wavesBalanceAt(addressId, height)) - // Account data - val changedDataKeysAtKey = Keys.changedDataKeys(height, addressId) - rw.get(changedDataKeysAtKey).foreach { accountDataKey => - val dataKeyAtKey = Keys.dataAt(addressId, accountDataKey)(height) - val dataKeyAt = rw.get(dataKeyAtKey) + // Account data + val changedDataKeysAtKey = Keys.changedDataKeys(height, addressId) + rw.get(changedDataKeysAtKey).foreach { accountDataKey => + val dataKeyAtKey = Keys.dataAt(addressId, accountDataKey)(height) + val dataKeyAt = rw.get(dataKeyAtKey) - rw.delete(Keys.dataAt(addressId, accountDataKey)(dataKeyAt.prevHeight)) - } - rw.delete(changedDataKeysAtKey) - } - rw.delete(changedAddressesKey) - - wavesAddressIds.view - .zip(rw.multiGet(wavesBalanceAtKeys, BalanceNode.SizeInBytes)) - .foreach { - // DB won't complain about a non-existed key with height = 0 - case (addressId, Some(wavesBalanceAt)) => rw.delete(Keys.wavesBalanceAt(addressId, wavesBalanceAt.prevHeight)) - case _ => + rw.delete(Keys.dataAt(addressId, accountDataKey)(dataKeyAt.prevHeight)) + } + rw.delete(changedDataKeysAtKey) } + rw.delete(changedAddressesKey) + + wavesAddressIds.view + .zip(rw.multiGet(wavesBalanceAtKeys, BalanceNode.SizeInBytes)) + .foreach { + // DB won't complain about a non-existed key with height = 0 + case (addressId, Some(wavesBalanceAt)) => rw.delete(Keys.wavesBalanceAt(addressId, wavesBalanceAt.prevHeight)) + case _ => + } - // Asset balances - val addressIdAndAssets = new ArrayBuffer[(AddressId, IssuedAsset)]() - val assetBalanceAtKeys = new ArrayBuffer[Key[BalanceNode]] + // Asset balances + val addressIdAndAssets = new ArrayBuffer[(AddressId, IssuedAsset)]() + val assetBalanceAtKeys = new ArrayBuffer[Key[BalanceNode]] - rw.iterateOver(KeyTags.ChangedAssetBalances.prefixBytes ++ KeyHelpers.h(height)) { e => - val asset = IssuedAsset(ByteStr(e.getKey.takeRight(AssetIdLength))) - val changedBalancesKey = Keys.changedBalances(height, asset) - rw.get(changedBalancesKey).foreach { addressId => - addressIdAndAssets.addOne((addressId, asset)) - assetBalanceAtKeys.addOne(Keys.assetBalanceAt(addressId, asset, height)) + rw.iterateOver(KeyTags.ChangedAssetBalances.prefixBytes ++ KeyHelpers.h(height)) { e => + val asset = IssuedAsset(ByteStr(e.getKey.takeRight(AssetIdLength))) + val changedBalancesKey = Keys.changedBalances(height, asset) + rw.get(changedBalancesKey).foreach { addressId => + addressIdAndAssets.addOne((addressId, asset)) + assetBalanceAtKeys.addOne(Keys.assetBalanceAt(addressId, asset, height)) + } + rw.delete(changedBalancesKey) } - rw.delete(changedBalancesKey) - } - addressIdAndAssets.view - .zip(rw.multiGet(assetBalanceAtKeys, BalanceNode.SizeInBytes)) - .foreach { - case ((addressId, asset), Some(assetBalanceAt)) => rw.delete(Keys.assetBalanceAt(addressId, asset, assetBalanceAt.prevHeight)) - case _ => - } + addressIdAndAssets.view + .zip(rw.multiGet(assetBalanceAtKeys, BalanceNode.SizeInBytes)) + .foreach { + case ((addressId, asset), Some(assetBalanceAt)) => rw.delete(Keys.assetBalanceAt(addressId, asset, assetBalanceAt.prevHeight)) + case _ => + } + } } override protected def doRollback(targetHeight: Int): DiscardedBlocks = { @@ -700,7 +702,7 @@ class RocksDBWriter( val aliasesToInvalidate = Seq.newBuilder[Alias] val blockHeightsToInvalidate = Seq.newBuilder[ByteStr] - val discardedBlock = readWrite { rw => + val discardedBlock = writableDB.readWrite { rw => rw.put(Keys.height, Height(currentHeight - 1)) val discardedMeta = rw diff --git a/node/src/main/scala/com/wavesplatform/database/package.scala b/node/src/main/scala/com/wavesplatform/database/package.scala index 162326a995..8aa654a759 100644 --- a/node/src/main/scala/com/wavesplatform/database/package.scala +++ b/node/src/main/scala/com/wavesplatform/database/package.scala @@ -40,7 +40,6 @@ import scala.annotation.tailrec import scala.collection.mutable.ArrayBuffer import scala.collection.{View, mutable} import scala.jdk.CollectionConverters.* -import scala.util.Using //noinspection UnstableApiUsage package object database { @@ -372,31 +371,42 @@ package object database { implicit class DBExt(val db: RocksDB) extends AnyVal { - def readOnly[A](f: ReadOnlyDB => A): A = { - Using.resource(db.getSnapshot) { s => - Using.resource(new ReadOptions().setSnapshot(s).setVerifyChecksums(false)) { ro => - f(new ReadOnlyDB(db, ro)) - } - }(db.releaseSnapshot(_)) - } + def readOnly[A](f: ReadOnlyDB => A): A = withReadOptions { ro => f(new ReadOnlyDB(db, ro)) } /** @note * Runs operations in batch, so keep in mind, that previous changes don't appear lately in f */ - def readWrite[A](f: RW => A): A = { - val snapshot = db.getSnapshot - val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false) - val batch = new WriteBatch() - val rw = new RW(db, readOptions, batch) - val writeOptions = new WriteOptions() + def readWrite[A](f: RW => A): A = withOptions { (ro, wo) => readWriteWithOptions(ro, wo)(f) } + + def readWriteWithOptions[A](readOptions: ReadOptions, writeOptions: WriteOptions)(f: RW => A): A = { + val batch = new WriteBatch() + val rw = new RW(db, readOptions, batch) try { val r = f(rw) db.write(writeOptions, batch) r - } finally { - readOptions.close() - writeOptions.close() - batch.close() + } finally batch.close() + } + + def withOptions[A](f: (ReadOptions, WriteOptions) => A): A = + withReadOptions { ro => + withWriteOptions { wo => + f(ro, wo) + } + } + + def withWriteOptions[A](f: WriteOptions => A): A = { + val wo = new WriteOptions() + try f(wo) + finally wo.close() + } + + def withReadOptions[A](f: ReadOptions => A): A = { + val snapshot = db.getSnapshot + val ro = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false) + try f(ro) + finally { + ro.close() db.releaseSnapshot(snapshot) } }