Skip to content

Commit

Permalink
Added new test, fixed an issue with excess deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
vsuharnikov committed Nov 7, 2023
1 parent 483bbc6 commit 08601d7
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ class RocksDBWriter(

if (previousSafeRollbackHeight < newSafeRollbackHeight) {
rw.put(Keys.safeRollbackHeight, newSafeRollbackHeight)
deleteOldRecords(Height(math.max(1, newSafeRollbackHeight - 1)), rw)
deleteOldEntries(Height(math.max(1, newSafeRollbackHeight)), rw)
}

rw.put(Keys.blockMetaAt(Height(height)), Some(blockMeta))
Expand Down Expand Up @@ -630,7 +630,7 @@ class RocksDBWriter(
log.trace(s"Finished persisting block ${blockMeta.id} at height $height")
}

private def deleteOldRecords(height: Height, rw: RW): Unit = {
private def deleteOldEntries(height: Height, rw: RW): Unit = {
val changedAddressesKey = Keys.changedAddresses(height)

rw.get(changedAddressesKey).foreach { addressId =>
Expand All @@ -640,7 +640,6 @@ class RocksDBWriter(

// DB won't complain about a non-existed key with height = 0
rw.delete(Keys.wavesBalanceAt(addressId, wavesBalanceAt.prevHeight))
rw.delete(wavesBalanceAtKey)

// Account data
val changedDataKeysAtKey = Keys.changedDataKeys(height, addressId)
Expand All @@ -649,7 +648,6 @@ class RocksDBWriter(
val dataKeyAt = rw.get(dataKeyAtKey)

rw.delete(Keys.dataAt(addressId, accountDataKey)(dataKeyAt.prevHeight))
rw.delete(dataKeyAtKey)
}
rw.delete(changedDataKeysAtKey)
}
Expand All @@ -665,7 +663,6 @@ class RocksDBWriter(
val assetBalanceAt = rw.get(assetBalanceAtKey)

rw.delete(Keys.assetBalanceAt(addressId, asset, assetBalanceAt.prevHeight))
rw.delete(assetBalanceAtKey)
}
rw.delete(changedBalancesKey)
}
Expand Down
184 changes: 146 additions & 38 deletions node/src/test/scala/com/wavesplatform/database/RocksDBWriterSpec.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.wavesplatform.database

import com.google.common.primitives.{Ints, Longs, Shorts}
import com.wavesplatform.TestValues
import com.wavesplatform.account.KeyPair
import com.wavesplatform.account.{Address, KeyPair}
import com.wavesplatform.common.utils.EitherExt2
import com.wavesplatform.database.KeyTags.KeyTag
import com.wavesplatform.db.WithDomain
import com.wavesplatform.db.WithState.AddrWithBalance
import com.wavesplatform.features.BlockchainFeatures
Expand Down Expand Up @@ -155,8 +157,8 @@ class RocksDBWriterSpec extends FreeSpec with WithDomain {
d.blockchain.resolveAlias(createAlias.alias) shouldEqual Left(AliasDoesNotExist(createAlias.alias))
}

"deleteOldRecords" - {
val maxRollbackDepth = 3
"deleteOldEntries" - {
val maxRollbackDepth = 2
val cleanupTestsSettings = {
val s = DomainPresets.RideV6
s.copy(dbSettings = s.dbSettings.copy(maxRollbackDepth = maxRollbackDepth))
Expand All @@ -172,57 +174,163 @@ class RocksDBWriterSpec extends FreeSpec with WithDomain {
val carl = TxHelpers.signer(3)
val carlAddress = carl.toAddress

val issueTx = TxHelpers.issue(issuer = alice)
def transferWavesTx = TxHelpers.transfer(from = alice, to = bobAddress)
def transferAssetTx = TxHelpers.transfer(from = alice, to = carlAddress, asset = issueTx.asset, amount = 123)
val dataKey = "test"
val dataTxFee = TxPositiveAmount.unsafeFrom(TestValues.fee)

def expectedValues(d: Domain): Unit = {
val blocksSinceStart = d.blockchain.height - 2 // genesis and a block with issueTx

markup("WAVES balance")
d.balance(aliceAddress) shouldBe (aliceInitBalance - issueTx.fee.value - blocksSinceStart * List(
transferWavesTx.amount,
transferWavesTx.fee,
transferAssetTx.fee,
dataTxFee
).map(_.value).sum)
d.balance(bobAddress) shouldBe (blocksSinceStart * transferWavesTx.amount.value)
d.balance(carlAddress) shouldBe 0

markup("Asset balance")
d.balance(aliceAddress, issueTx.asset) shouldBe (issueTx.quantity.value - blocksSinceStart * transferAssetTx.amount.value)
d.balance(bobAddress, issueTx.asset) shouldBe 0
d.balance(carlAddress, issueTx.asset) shouldBe (blocksSinceStart * transferAssetTx.amount.value)

markup("Data")
if (blocksSinceStart > 0) d.blockchain.accountData(aliceAddress, dataKey).get.value shouldBe s"test-$blocksSinceStart"
else d.blockchain.accountData(aliceAddress, dataKey) shouldBe empty
}
val issueTx = TxHelpers.issue(issuer = alice)
def transferWavesTx = TxHelpers.transfer(from = alice, to = bobAddress)
def transferAssetTx = TxHelpers.transfer(from = alice, to = carlAddress, asset = issueTx.asset, amount = 123)
val dataKey = "test"
val dataTxFee = TxPositiveAmount.unsafeFrom(TestValues.fee)
val defaultAddresses = Seq(aliceAddress, bobAddress, carlAddress, TxHelpers.defaultSigner.toAddress)

"doesn't affect entries required rollback" in withDomain(
cleanupTestsSettings,
Seq(AddrWithBalance(aliceAddress, aliceInitBalance))
) { d =>
def checkExpectedValues(): Unit = {
val blocksSinceStart = d.blockchain.height - 2 // genesis and a block with issueTx

// WAVES
d.balance(aliceAddress) shouldBe (aliceInitBalance - issueTx.fee.value - blocksSinceStart * List(
transferWavesTx.amount,
transferWavesTx.fee,
transferAssetTx.fee,
dataTxFee
).map(_.value).sum)
d.balance(bobAddress) shouldBe (blocksSinceStart * transferWavesTx.amount.value)
d.balance(carlAddress) shouldBe 0

// Asset
d.balance(aliceAddress, issueTx.asset) shouldBe (issueTx.quantity.value - blocksSinceStart * transferAssetTx.amount.value)
d.balance(bobAddress, issueTx.asset) shouldBe 0
d.balance(carlAddress, issueTx.asset) shouldBe (blocksSinceStart * transferAssetTx.amount.value)

// Data
if (blocksSinceStart > 0) d.blockchain.accountData(aliceAddress, dataKey).get.value shouldBe s"test-$blocksSinceStart"
else d.blockchain.accountData(aliceAddress, dataKey) shouldBe empty
}

"doesn't affect current and past values" in withDomain(cleanupTestsSettings, Seq(AddrWithBalance(aliceAddress, aliceInitBalance))) { d =>
d.appendBlock(issueTx)
expectedValues(d)

(1 to maxRollbackDepth + 1).foreach { i =>
withClue(s"Append ${d.blockchain.height} block: ") {
markup("Appending")
(1 to maxRollbackDepth + 3).foreach { i =>
withClue(s"Append ${d.blockchain.height + 1} block: ") {
d.appendBlock(
transferWavesTx,
transferAssetTx,
TxHelpers.dataSingle(account = alice, key = dataKey, value = s"test-$i", fee = dataTxFee.value)
)
expectedValues(d)
checkExpectedValues()
}
}

(1 to maxRollbackDepth + 1).foreach { _ => // + 1 because of liquid block
markup("Rolling back")
(1 to maxRollbackDepth + 1).foreach { _ =>
val rollbackHeight = d.blockchain.height - 1
withClue(s"Rollback to $rollbackHeight:") {
d.rollbackTo(rollbackHeight)
expectedValues(d)
checkExpectedValues()
}
}

withClue("No data before current height: ") {
checkDataOnlySinceHeight(d, defaultAddresses, d.blockchain.height)
}
}

"deletes old entries" in withDomain(
cleanupTestsSettings,
Seq(AddrWithBalance(aliceAddress, aliceInitBalance))
) { d =>
d.appendBlock(issueTx)

markup("Appending transactions")
d.appendBlock(
transferWavesTx,
transferAssetTx,
TxHelpers.dataSingle(account = alice, key = dataKey, value = "test-1", fee = dataTxFee.value)
)

d.appendBlock(
transferWavesTx,
transferAssetTx,
TxHelpers.dataSingle(account = alice, key = dataKey, value = "test-2", fee = dataTxFee.value)
)

markup("Appending empty")
(1 to maxRollbackDepth + 1).foreach { _ =>
withClue(s"Append ${d.blockchain.height + 1} block: ") {
d.appendBlock()
}
}

markup("Appending transactions - 2")
d.appendBlock(
transferWavesTx,
transferAssetTx,
TxHelpers.dataSingle(account = alice, key = dataKey, value = "test-2", fee = dataTxFee.value)
)

markup("Appending empty - 2")
(1 to maxRollbackDepth + 1).foreach { _ =>
withClue(s"Append ${d.blockchain.height + 1} block: ") {
d.appendBlock()
}
}

withClue("No data before current height: ") {
checkDataOnlySinceHeight(d, defaultAddresses, d.blockchain.height - maxRollbackDepth - 1)
}
}
}

private def checkDataOnlySinceHeight(d: Domain, addresses: Seq[Address], sinceHeight: Int): Unit = {
val addressIds = addresses.map(getAddressId(d, _))
Seq(
KeyTags.ChangedAssetBalances,
KeyTags.WavesBalanceHistory,
KeyTags.AssetBalanceHistory,
KeyTags.ChangedDataKeys,
KeyTags.DataHistory,
KeyTags.ChangedAddresses
).foreach { keyTag =>
withClue(s"$keyTag:") {
d.rdb.db.iterateOver(keyTag) { e =>
val (affectedHeight, affectedAddressIds) = getHeightAndAddressIds(keyTag, e)
if (affectedAddressIds.exists(addressIds.contains)) {
withClue(s"$addresses: ") {
affectedHeight should be >= sinceHeight
}
}
}
}
}
}

private def getHeightAndAddressIds(tag: KeyTag, bytes: DBEntry): (Int, Seq[AddressId]) = {
val (heightBytes, addresses) = tag match {
case KeyTags.ChangedAddresses | KeyTags.ChangedAssetBalances =>
(
bytes.getKey.drop(Shorts.BYTES),
readAddressIds(bytes.getValue)
)

case KeyTags.WavesBalanceHistory | KeyTags.AssetBalanceHistory | KeyTags.ChangedDataKeys =>
(
bytes.getKey.takeRight(Ints.BYTES),
Seq(AddressId.fromByteArray(bytes.getKey.dropRight(Ints.BYTES).takeRight(Longs.BYTES)))
)

case KeyTags.DataHistory =>
(
bytes.getKey.takeRight(Ints.BYTES),
Seq(AddressId.fromByteArray(bytes.getKey.drop(Shorts.BYTES)))
)

case _ => throw new IllegalArgumentException(s"$tag")
}

(Ints.fromByteArray(heightBytes), addresses)
}

private def getAddressId(d: Domain, address: Address): AddressId =
d.rdb.db.get(Keys.addressId(address)).getOrElse(throw new RuntimeException(s"Can't find address id for $address"))
}

0 comments on commit 08601d7

Please sign in to comment.