Skip to content

Commit

Permalink
Implementation with multiGet
Browse files Browse the repository at this point in the history
  • Loading branch information
vsuharnikov committed Nov 8, 2023
1 parent 08601d7 commit 41e5765
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 39 deletions.
9 changes: 2 additions & 7 deletions node/src/main/scala/com/wavesplatform/database/Keys.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ object CurrentBalance {
case class BalanceNode(balance: Long, prevHeight: Height)
object BalanceNode {
val Empty: BalanceNode = BalanceNode(0, Height(0))
val SizeInBytes: Int = 12
}

case class CurrentVolumeAndFee(volume: Long, fee: Long, height: Height, prevHeight: Height)
Expand Down Expand Up @@ -55,13 +56,7 @@ object DataNode {

object Keys {
import KeyHelpers.*
import KeyTags.{
AddressId as AddressIdTag,
EthereumTransactionMeta as EthereumTransactionMetaTag,
InvokeScriptResult as InvokeScriptResultTag,
LeaseDetails as LeaseDetailsTag,
*
}
import KeyTags.{AddressId as AddressIdTag, EthereumTransactionMeta as EthereumTransactionMetaTag, InvokeScriptResult as InvokeScriptResultTag, LeaseDetails as LeaseDetailsTag, *}

val version: Key[Int] = intKey(Version, default = 1)
val height: Key[Height] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ class ReadOnlyDB(db: RocksDB, readOptions: ReadOptions) {
key.parse(bytes)
}

def multiGetOpt[V](keys: Seq[Key[Option[V]]], valBufferSize: Int): Seq[Option[V]] =
def multiGetOpt[V](keys: collection.Seq[Key[Option[V]]], valBufferSize: Int): Seq[Option[V]] =
db.multiGetOpt(readOptions, keys, valBufferSize)

def multiGet[V](keys: Seq[Key[V]], valBufferSize: Int): Seq[Option[V]] =
def multiGet[V](keys: collection.Seq[Key[V]], valBufferSize: Int): Seq[Option[V]] =
db.multiGet(readOptions, keys, valBufferSize)

def multiGetOpt[V](keys: Seq[Key[Option[V]]], valBufSizes: Seq[Int]): Seq[Option[V]] =
def multiGetOpt[V](keys: collection.Seq[Key[Option[V]]], valBufSizes: Seq[Int]): Seq[Option[V]] =
db.multiGetOpt(readOptions, keys, valBufSizes)

def multiGetInts(keys: Seq[Key[Int]]): Seq[Option[Int]] =
def multiGetInts(keys: collection.Seq[Key[Int]]): Seq[Option[Int]] =
db.multiGetInts(readOptions, keys)

def has[V](key: Key[V]): Boolean = {
Expand Down
35 changes: 25 additions & 10 deletions node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -633,13 +633,13 @@ class RocksDBWriter(
private def deleteOldEntries(height: Height, rw: RW): Unit = {
val changedAddressesKey = Keys.changedAddresses(height)

val wavesAddressIds = new ArrayBuffer[AddressId]()
val wavesBalanceAtKeys = new ArrayBuffer[Key[BalanceNode]]()

rw.get(changedAddressesKey).foreach { addressId =>
// WAVES balances
val wavesBalanceAtKey = Keys.wavesBalanceAt(addressId, height)
val wavesBalanceAt = rw.get(wavesBalanceAtKey)

// DB won't complain about a non-existed key with height = 0
rw.delete(Keys.wavesBalanceAt(addressId, wavesBalanceAt.prevHeight))
wavesAddressIds.addOne(addressId)
wavesBalanceAtKeys.addOne(Keys.wavesBalanceAt(addressId, height))

// Account data
val changedDataKeysAtKey = Keys.changedDataKeys(height, addressId)
Expand All @@ -651,21 +651,36 @@ class RocksDBWriter(
}
rw.delete(changedDataKeysAtKey)
}

rw.delete(changedAddressesKey)

wavesAddressIds.view
.zip(rw.multiGet(wavesBalanceAtKeys, BalanceNode.SizeInBytes))
.foreach {
// DB won't complain about a non-existed key with height = 0
case (addressId, Some(wavesBalanceAt)) => rw.delete(Keys.wavesBalanceAt(addressId, wavesBalanceAt.prevHeight))
case _ =>
}

// Asset balances
val addressIdAndAssets = new ArrayBuffer[(AddressId, IssuedAsset)]()
val assetBalanceAtKeys = new ArrayBuffer[Key[BalanceNode]]

rw.iterateOver(KeyTags.ChangedAssetBalances.prefixBytes ++ KeyHelpers.h(height)) { e =>
val asset = IssuedAsset(ByteStr(e.getKey.takeRight(AssetIdLength)))
val changedBalancesKey = Keys.changedBalances(height, asset)
rw.get(changedBalancesKey).foreach { addressId =>
val assetBalanceAtKey = Keys.assetBalanceAt(addressId, asset, height)
val assetBalanceAt = rw.get(assetBalanceAtKey)

rw.delete(Keys.assetBalanceAt(addressId, asset, assetBalanceAt.prevHeight))
addressIdAndAssets.addOne((addressId, asset))
assetBalanceAtKeys.addOne(Keys.assetBalanceAt(addressId, asset, height))
}
rw.delete(changedBalancesKey)
}

addressIdAndAssets.view
.zip(rw.multiGet(assetBalanceAtKeys, BalanceNode.SizeInBytes))
.foreach {
case ((addressId, asset), Some(assetBalanceAt)) => rw.delete(Keys.assetBalanceAt(addressId, asset, assetBalanceAt.prevHeight))
case _ =>
}
}

override protected def doRollback(targetHeight: Int): DiscardedBlocks = {
Expand Down
28 changes: 10 additions & 18 deletions node/src/main/scala/com/wavesplatform/database/package.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.wavesplatform

import java.nio.ByteBuffer
import java.util
import java.util.Map as JMap
import com.google.common.base.Charsets.UTF_8
import com.google.common.collect.{Interners, Maps}
import com.google.common.io.ByteStreams.{newDataInput, newDataOutput}
Expand All @@ -28,22 +25,17 @@ import com.wavesplatform.state.StateHash.SectionId
import com.wavesplatform.state.reader.LeaseDetails
import com.wavesplatform.transaction.Asset.IssuedAsset
import com.wavesplatform.transaction.lease.LeaseTransaction
import com.wavesplatform.transaction.{
EthereumTransaction,
GenesisTransaction,
PBSince,
PaymentTransaction,
Transaction,
TransactionParsers,
TxValidationError
}
import com.wavesplatform.transaction.{EthereumTransaction, GenesisTransaction, PBSince, PaymentTransaction, Transaction, TransactionParsers, TxValidationError}
import com.wavesplatform.utils.*
import monix.eval.Task
import monix.reactive.Observable
import org.rocksdb.*
import sun.nio.ch.Util
import supertagged.TaggedType

import java.nio.ByteBuffer
import java.util
import java.util.Map as JMap
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.collection.{View, mutable}
Expand Down Expand Up @@ -371,7 +363,7 @@ package object database {
def writeCurrentBalance(balance: CurrentBalance): Array[Byte] =
Longs.toByteArray(balance.balance) ++ Ints.toByteArray(balance.height) ++ Ints.toByteArray(balance.prevHeight)

def readBalanceNode(bs: Array[Byte]): BalanceNode = if (bs != null && bs.length == 12)
def readBalanceNode(bs: Array[Byte]): BalanceNode = if (bs != null && bs.length == BalanceNode.SizeInBytes)
BalanceNode(Longs.fromByteArray(bs.take(8)), Height(Ints.fromByteArray(bs.takeRight(4))))
else BalanceNode.Empty

Expand Down Expand Up @@ -409,10 +401,10 @@ package object database {
}
}

def multiGetOpt[A](readOptions: ReadOptions, keys: Seq[Key[Option[A]]], valBufSize: Int): Seq[Option[A]] =
def multiGetOpt[A](readOptions: ReadOptions, keys: collection.Seq[Key[Option[A]]], valBufSize: Int): Seq[Option[A]] =
multiGetOpt(readOptions, keys, getKeyBuffersFromKeys(keys), getValueBuffers(keys.size, valBufSize))

def multiGetOpt[A](readOptions: ReadOptions, keys: Seq[Key[Option[A]]], valBufSizes: Seq[Int]): Seq[Option[A]] =
def multiGetOpt[A](readOptions: ReadOptions, keys: collection.Seq[Key[Option[A]]], valBufSizes: Seq[Int]): Seq[Option[A]] =
multiGetOpt(readOptions, keys, getKeyBuffersFromKeys(keys), getValueBuffers(valBufSizes))

def multiGet[A](readOptions: ReadOptions, keys: ArrayBuffer[Key[A]], valBufSizes: ArrayBuffer[Int]): View[A] =
Expand All @@ -421,7 +413,7 @@ package object database {
def multiGet[A](readOptions: ReadOptions, keys: ArrayBuffer[Key[A]], valBufSize: Int): View[A] =
multiGet(readOptions, keys, getKeyBuffersFromKeys(keys), getValueBuffers(keys.size, valBufSize))

def multiGet[A](readOptions: ReadOptions, keys: Seq[Key[A]], valBufSize: Int): Seq[Option[A]] = {
def multiGet[A](readOptions: ReadOptions, keys: collection.Seq[Key[A]], valBufSize: Int): Seq[Option[A]] = {
val keyBufs = getKeyBuffersFromKeys(keys)
val valBufs = getValueBuffers(keys.size, valBufSize)

Expand All @@ -442,7 +434,7 @@ package object database {
result
}

def multiGetInts(readOptions: ReadOptions, keys: Seq[Key[Int]]): Seq[Option[Int]] = {
def multiGetInts(readOptions: ReadOptions, keys: collection.Seq[Key[Int]]): Seq[Option[Int]] = {
val keyBytes = keys.map(_.keyBytes)
val keyBufs = getKeyBuffers(keyBytes)
val valBufs = getValueBuffers(keyBytes.size, 4)
Expand Down Expand Up @@ -551,7 +543,7 @@ package object database {

private def multiGetOpt[A](
readOptions: ReadOptions,
keys: Seq[Key[Option[A]]],
keys: collection.Seq[Key[Option[A]]],
keyBufs: util.List[ByteBuffer],
valBufs: util.List[ByteBuffer]
): Seq[Option[A]] = {
Expand Down

0 comments on commit 41e5765

Please sign in to comment.