From 41e576529973378e7913ba6baae869850da7be97 Mon Sep 17 00:00:00 2001 From: Vyatcheslav Suharnikov Date: Wed, 8 Nov 2023 07:04:29 +0400 Subject: [PATCH] Implementation with multiGet --- .../com/wavesplatform/database/Keys.scala | 9 ++--- .../wavesplatform/database/ReadOnlyDB.scala | 8 ++--- .../database/RocksDBWriter.scala | 35 +++++++++++++------ .../com/wavesplatform/database/package.scala | 28 ++++++--------- 4 files changed, 41 insertions(+), 39 deletions(-) diff --git a/node/src/main/scala/com/wavesplatform/database/Keys.scala b/node/src/main/scala/com/wavesplatform/database/Keys.scala index ef26a8b36a..204c4c5f1c 100644 --- a/node/src/main/scala/com/wavesplatform/database/Keys.scala +++ b/node/src/main/scala/com/wavesplatform/database/Keys.scala @@ -21,6 +21,7 @@ object CurrentBalance { case class BalanceNode(balance: Long, prevHeight: Height) object BalanceNode { val Empty: BalanceNode = BalanceNode(0, Height(0)) + val SizeInBytes: Int = 12 } case class CurrentVolumeAndFee(volume: Long, fee: Long, height: Height, prevHeight: Height) @@ -55,13 +56,7 @@ object DataNode { object Keys { import KeyHelpers.* - import KeyTags.{ - AddressId as AddressIdTag, - EthereumTransactionMeta as EthereumTransactionMetaTag, - InvokeScriptResult as InvokeScriptResultTag, - LeaseDetails as LeaseDetailsTag, - * - } + import KeyTags.{AddressId as AddressIdTag, EthereumTransactionMeta as EthereumTransactionMetaTag, InvokeScriptResult as InvokeScriptResultTag, LeaseDetails as LeaseDetailsTag, *} val version: Key[Int] = intKey(Version, default = 1) val height: Key[Height] = diff --git a/node/src/main/scala/com/wavesplatform/database/ReadOnlyDB.scala b/node/src/main/scala/com/wavesplatform/database/ReadOnlyDB.scala index 3c152b0bbb..b75541116c 100644 --- a/node/src/main/scala/com/wavesplatform/database/ReadOnlyDB.scala +++ b/node/src/main/scala/com/wavesplatform/database/ReadOnlyDB.scala @@ -15,16 +15,16 @@ class ReadOnlyDB(db: RocksDB, readOptions: ReadOptions) { key.parse(bytes) } - def multiGetOpt[V](keys: Seq[Key[Option[V]]], valBufferSize: Int): Seq[Option[V]] = + def multiGetOpt[V](keys: collection.Seq[Key[Option[V]]], valBufferSize: Int): Seq[Option[V]] = db.multiGetOpt(readOptions, keys, valBufferSize) - def multiGet[V](keys: Seq[Key[V]], valBufferSize: Int): Seq[Option[V]] = + def multiGet[V](keys: collection.Seq[Key[V]], valBufferSize: Int): Seq[Option[V]] = db.multiGet(readOptions, keys, valBufferSize) - def multiGetOpt[V](keys: Seq[Key[Option[V]]], valBufSizes: Seq[Int]): Seq[Option[V]] = + def multiGetOpt[V](keys: collection.Seq[Key[Option[V]]], valBufSizes: Seq[Int]): Seq[Option[V]] = db.multiGetOpt(readOptions, keys, valBufSizes) - def multiGetInts(keys: Seq[Key[Int]]): Seq[Option[Int]] = + def multiGetInts(keys: collection.Seq[Key[Int]]): Seq[Option[Int]] = db.multiGetInts(readOptions, keys) def has[V](key: Key[V]): Boolean = { diff --git a/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala b/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala index fa4b9d4881..c8435c2f6d 100644 --- a/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala +++ b/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala @@ -633,13 +633,13 @@ class RocksDBWriter( private def deleteOldEntries(height: Height, rw: RW): Unit = { val changedAddressesKey = Keys.changedAddresses(height) + val wavesAddressIds = new ArrayBuffer[AddressId]() + val wavesBalanceAtKeys = new ArrayBuffer[Key[BalanceNode]]() + rw.get(changedAddressesKey).foreach { addressId => // WAVES balances - val wavesBalanceAtKey = Keys.wavesBalanceAt(addressId, height) - val wavesBalanceAt = rw.get(wavesBalanceAtKey) - - // DB won't complain about a non-existed key with height = 0 - rw.delete(Keys.wavesBalanceAt(addressId, wavesBalanceAt.prevHeight)) + wavesAddressIds.addOne(addressId) + wavesBalanceAtKeys.addOne(Keys.wavesBalanceAt(addressId, height)) // Account data val changedDataKeysAtKey = Keys.changedDataKeys(height, addressId) @@ -651,21 +651,36 @@ class RocksDBWriter( } rw.delete(changedDataKeysAtKey) } - rw.delete(changedAddressesKey) + wavesAddressIds.view + .zip(rw.multiGet(wavesBalanceAtKeys, BalanceNode.SizeInBytes)) + .foreach { + // DB won't complain about a non-existed key with height = 0 + case (addressId, Some(wavesBalanceAt)) => rw.delete(Keys.wavesBalanceAt(addressId, wavesBalanceAt.prevHeight)) + case _ => + } + // Asset balances + val addressIdAndAssets = new ArrayBuffer[(AddressId, IssuedAsset)]() + val assetBalanceAtKeys = new ArrayBuffer[Key[BalanceNode]] + rw.iterateOver(KeyTags.ChangedAssetBalances.prefixBytes ++ KeyHelpers.h(height)) { e => val asset = IssuedAsset(ByteStr(e.getKey.takeRight(AssetIdLength))) val changedBalancesKey = Keys.changedBalances(height, asset) rw.get(changedBalancesKey).foreach { addressId => - val assetBalanceAtKey = Keys.assetBalanceAt(addressId, asset, height) - val assetBalanceAt = rw.get(assetBalanceAtKey) - - rw.delete(Keys.assetBalanceAt(addressId, asset, assetBalanceAt.prevHeight)) + addressIdAndAssets.addOne((addressId, asset)) + assetBalanceAtKeys.addOne(Keys.assetBalanceAt(addressId, asset, height)) } rw.delete(changedBalancesKey) } + + addressIdAndAssets.view + .zip(rw.multiGet(assetBalanceAtKeys, BalanceNode.SizeInBytes)) + .foreach { + case ((addressId, asset), Some(assetBalanceAt)) => rw.delete(Keys.assetBalanceAt(addressId, asset, assetBalanceAt.prevHeight)) + case _ => + } } override protected def doRollback(targetHeight: Int): DiscardedBlocks = { diff --git a/node/src/main/scala/com/wavesplatform/database/package.scala b/node/src/main/scala/com/wavesplatform/database/package.scala index 9ca8e410bc..162326a995 100644 --- a/node/src/main/scala/com/wavesplatform/database/package.scala +++ b/node/src/main/scala/com/wavesplatform/database/package.scala @@ -1,8 +1,5 @@ package com.wavesplatform -import java.nio.ByteBuffer -import java.util -import java.util.Map as JMap import com.google.common.base.Charsets.UTF_8 import com.google.common.collect.{Interners, Maps} import com.google.common.io.ByteStreams.{newDataInput, newDataOutput} @@ -28,15 +25,7 @@ import com.wavesplatform.state.StateHash.SectionId import com.wavesplatform.state.reader.LeaseDetails import com.wavesplatform.transaction.Asset.IssuedAsset import com.wavesplatform.transaction.lease.LeaseTransaction -import com.wavesplatform.transaction.{ - EthereumTransaction, - GenesisTransaction, - PBSince, - PaymentTransaction, - Transaction, - TransactionParsers, - TxValidationError -} +import com.wavesplatform.transaction.{EthereumTransaction, GenesisTransaction, PBSince, PaymentTransaction, Transaction, TransactionParsers, TxValidationError} import com.wavesplatform.utils.* import monix.eval.Task import monix.reactive.Observable @@ -44,6 +33,9 @@ import org.rocksdb.* import sun.nio.ch.Util import supertagged.TaggedType +import java.nio.ByteBuffer +import java.util +import java.util.Map as JMap import scala.annotation.tailrec import scala.collection.mutable.ArrayBuffer import scala.collection.{View, mutable} @@ -371,7 +363,7 @@ package object database { def writeCurrentBalance(balance: CurrentBalance): Array[Byte] = Longs.toByteArray(balance.balance) ++ Ints.toByteArray(balance.height) ++ Ints.toByteArray(balance.prevHeight) - def readBalanceNode(bs: Array[Byte]): BalanceNode = if (bs != null && bs.length == 12) + def readBalanceNode(bs: Array[Byte]): BalanceNode = if (bs != null && bs.length == BalanceNode.SizeInBytes) BalanceNode(Longs.fromByteArray(bs.take(8)), Height(Ints.fromByteArray(bs.takeRight(4)))) else BalanceNode.Empty @@ -409,10 +401,10 @@ package object database { } } - def multiGetOpt[A](readOptions: ReadOptions, keys: Seq[Key[Option[A]]], valBufSize: Int): Seq[Option[A]] = + def multiGetOpt[A](readOptions: ReadOptions, keys: collection.Seq[Key[Option[A]]], valBufSize: Int): Seq[Option[A]] = multiGetOpt(readOptions, keys, getKeyBuffersFromKeys(keys), getValueBuffers(keys.size, valBufSize)) - def multiGetOpt[A](readOptions: ReadOptions, keys: Seq[Key[Option[A]]], valBufSizes: Seq[Int]): Seq[Option[A]] = + def multiGetOpt[A](readOptions: ReadOptions, keys: collection.Seq[Key[Option[A]]], valBufSizes: Seq[Int]): Seq[Option[A]] = multiGetOpt(readOptions, keys, getKeyBuffersFromKeys(keys), getValueBuffers(valBufSizes)) def multiGet[A](readOptions: ReadOptions, keys: ArrayBuffer[Key[A]], valBufSizes: ArrayBuffer[Int]): View[A] = @@ -421,7 +413,7 @@ package object database { def multiGet[A](readOptions: ReadOptions, keys: ArrayBuffer[Key[A]], valBufSize: Int): View[A] = multiGet(readOptions, keys, getKeyBuffersFromKeys(keys), getValueBuffers(keys.size, valBufSize)) - def multiGet[A](readOptions: ReadOptions, keys: Seq[Key[A]], valBufSize: Int): Seq[Option[A]] = { + def multiGet[A](readOptions: ReadOptions, keys: collection.Seq[Key[A]], valBufSize: Int): Seq[Option[A]] = { val keyBufs = getKeyBuffersFromKeys(keys) val valBufs = getValueBuffers(keys.size, valBufSize) @@ -442,7 +434,7 @@ package object database { result } - def multiGetInts(readOptions: ReadOptions, keys: Seq[Key[Int]]): Seq[Option[Int]] = { + def multiGetInts(readOptions: ReadOptions, keys: collection.Seq[Key[Int]]): Seq[Option[Int]] = { val keyBytes = keys.map(_.keyBytes) val keyBufs = getKeyBuffers(keyBytes) val valBufs = getValueBuffers(keyBytes.size, 4) @@ -551,7 +543,7 @@ package object database { private def multiGetOpt[A]( readOptions: ReadOptions, - keys: Seq[Key[Option[A]]], + keys: collection.Seq[Key[Option[A]]], keyBufs: util.List[ByteBuffer], valBufs: util.List[ByteBuffer] ): Seq[Option[A]] = {