Skip to content

Commit

Permalink
NODE-2596 Optimised active leases (#3884)
Browse files Browse the repository at this point in the history
  • Loading branch information
xrtm000 authored Nov 30, 2023
1 parent c590007 commit 4f0cbb2
Show file tree
Hide file tree
Showing 22 changed files with 324 additions and 159 deletions.
11 changes: 7 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ Global / onChangedBuildSource := ReloadOnSourceChanges

enablePlugins(GitVersioning)

git.uncommittedSignifier := Some("DIRTY")
git.useGitDescribe := true
git.uncommittedSignifier := Some("DIRTY")
git.useGitDescribe := true
ThisBuild / git.useGitDescribe := true
ThisBuild / PB.protocVersion := "3.24.4" // https://protobuf.dev/support/version-support/#java
ThisBuild / PB.protocVersion := "3.24.4" // https://protobuf.dev/support/version-support/#java

lazy val lang =
crossProject(JSPlatform, JVMPlatform)
Expand Down Expand Up @@ -50,7 +50,10 @@ lazy val `lang-testkit` = project
.dependsOn(`lang-jvm`)
.in(file("lang/testkit"))
.settings(
libraryDependencies ++= Dependencies.test.map(_.withConfigurations(Some("compile"))) ++ Dependencies.qaseReportDeps
libraryDependencies ++=
Dependencies.test.map(_.withConfigurations(Some("compile"))) ++ Dependencies.qaseReportDeps ++ Dependencies.logDeps ++ Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5"
)
)

lazy val `lang-tests` = project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class AccountsApiGrpcSpec extends FreeSpec with BeforeAndAfterAll with DiffMatch

grpcApi.getActiveLeases(AccountRequest.of(ByteString.copyFrom(recipient.toAddress.bytes)), observer)

result.runSyncUnsafe() shouldBe List(
result.runSyncUnsafe() should contain theSameElementsAs List(
LeaseResponse.of(
ByteString.copyFrom(lease3.id().arr),
ByteString.copyFrom(lease3.id().arr),
Expand Down
14 changes: 14 additions & 0 deletions lang/testkit/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date %-5level [%.15thread] %logger{26} - %msg%n</pattern>
</encoder>
</appender>

<logger name="scorex.crypto.signatures.Curve25519$" level="INFO"/>

<root level="${logback.test.level:-DEBUG}">
<appender-ref ref="STDOUT" />
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.wavesplatform.report

import com.typesafe.scalalogging.StrictLogging
import com.wavesplatform.report.QaseReporter.{CheckPRRunIdKey, QaseProjects, TestResult}
import io.qase.api.QaseClient
import io.qase.api.config.QaseConfig.{PROJECT_CODE_KEY, RUN_ID_KEY}
import io.qase.api.exceptions.QaseException
import io.qase.api.services.impl.ReportersResultOperationsImpl
import io.qase.client.ApiClient
import io.qase.client.api.{CasesApi, ResultsApi, RunsApi}
Expand All @@ -15,8 +17,8 @@ import scala.annotation.tailrec
import scala.io.Source
import scala.util.Using

object QaseRunCompleter extends App {
if (QaseClient.getConfig.isEnabled) {
object QaseRunCompleter extends App with StrictLogging {
if (QaseClient.getConfig.isEnabled) try {

val apiClient: ApiClient = QaseClient.getApiClient
val runsApi = new RunsApi(apiClient)
Expand Down Expand Up @@ -78,6 +80,9 @@ object QaseRunCompleter extends App {
}
}(_.foreach(f => Files.delete(f.toPath)))
}
} catch {
case e: QaseException =>
logger.error(s"Qase error: ${e.getCode} ${e.getResponseBody}", e)
}

@tailrec
Expand Down
4 changes: 4 additions & 0 deletions node/src/main/protobuf/waves/database.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,7 @@ message LeaseDetails {
Expired expired = 12;
}
}

message LeaseIds {
repeated bytes ids = 1;
}
1 change: 1 addition & 0 deletions node/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ waves {
db {
directory = ${waves.directory}"/data"
store-transactions-by-address = true
store-lease-states-by-address = true
store-invoke-script-results = true
store-state-hashes = false
# Limits the size of caches which are used during block validation. Lower values slightly decrease memory consumption,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.wavesplatform.account.Address
import com.wavesplatform.api.common.AddressTransactions.TxByAddressIterator.BatchSize
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.database.protobuf.EthereumTransactionMeta
import com.wavesplatform.database.{AddressId, DBExt, DBResource, Key, KeyTags, Keys, RDB, readTransactionHNSeqAndType}
import com.wavesplatform.database.{AddressId, DBExt, DBResource, Key, Keys, RDB, readTransactionHNSeqAndType}
import com.wavesplatform.state.{Height, InvokeScriptResult, StateSnapshot, TransactionId, TxMeta, TxNum}
import com.wavesplatform.transaction.{Authorized, EthereumTransaction, GenesisTransaction, Transaction, TransactionType}
import monix.eval.Task
Expand Down Expand Up @@ -118,7 +118,7 @@ object AddressTransactions {
.filter { case (_, tx) => types.isEmpty || types.contains(tx.tpe) }
.collect { case (m, tx: Authorized) if sender.forall(_ == tx.sender.toAddress) => (m, tx, None) }

class TxByAddressIterator(
private class TxByAddressIterator(
db: DBResource,
txHandle: RDB.TxHandle,
addressId: AddressId,
Expand All @@ -127,9 +127,7 @@ object AddressTransactions {
sender: Option[Address],
types: Set[Transaction.Type]
) extends AbstractIterator[Seq[(TxMeta, Transaction, Option[TxNum])]] {
val prefix: Array[Byte] = KeyTags.AddressTransactionHeightTypeAndNums.prefixBytes ++ addressId.toByteArray
val seqNr: Int = db.get(Keys.addressTransactionSeqNr(addressId))

private val seqNr = db.get(Keys.addressTransactionSeqNr(addressId))
db.withSafePrefixIterator(_.seekForPrev(Keys.addressTransactionHN(addressId, seqNr).keyBytes))()

final override def computeNext(): Seq[(TxMeta, Transaction, Option[TxNum])] = db.withSafePrefixIterator { dbIterator =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,14 @@ import com.google.common.base.Charsets
import com.google.common.collect.AbstractIterator
import com.wavesplatform.account.{Address, Alias}
import com.wavesplatform.api.common.AddressPortfolio.{assetBalanceIterator, nftIterator}
import com.wavesplatform.api.common.TransactionMeta.Ethereum
import com.wavesplatform.api.common.lease.AddressLeaseInfo
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.common.utils.EitherExt2
import com.wavesplatform.database.{DBExt, DBResource, KeyTags, Keys, RDB}
import com.wavesplatform.features.BlockchainFeatures
import com.wavesplatform.lang.ValidationError
import com.wavesplatform.protobuf.transaction.PBRecipients
import com.wavesplatform.state.LeaseDetails.Status
import com.wavesplatform.state.{AccountScriptInfo, AssetDescription, Blockchain, DataEntry, Height, InvokeScriptResult, SnapshotBlockchain, TxMeta}
import com.wavesplatform.state.{AccountScriptInfo, AssetDescription, Blockchain, DataEntry, SnapshotBlockchain}
import com.wavesplatform.transaction.Asset.IssuedAsset
import com.wavesplatform.transaction.EthereumTransaction.Invocation
import com.wavesplatform.transaction.lease.LeaseTransaction
import com.wavesplatform.transaction.{EthereumTransaction, TransactionType}
import monix.eval.Task
import monix.reactive.Observable

Expand Down Expand Up @@ -127,75 +122,13 @@ object CommonAccountsApi {
override def resolveAlias(alias: Alias): Either[ValidationError, Address] = blockchain.resolveAlias(alias)

override def activeLeases(address: Address): Observable[LeaseInfo] =
addressTransactions(
rdb,
Some(Height(blockchain.height) -> compositeBlockchain().snapshot),
address,
None,
Set(TransactionType.Lease, TransactionType.InvokeScript, TransactionType.InvokeExpression, TransactionType.Ethereum),
None
).flatMapIterable {
case TransactionMeta(leaseHeight, lt: LeaseTransaction, TxMeta.Status.Succeeded) if leaseIsActive(lt.id()) =>
Seq(
LeaseInfo(
lt.id(),
lt.id(),
lt.sender.toAddress,
blockchain.resolveAlias(lt.recipient).explicitGet(),
lt.amount.value,
leaseHeight,
LeaseInfo.Status.Active
)
)
case TransactionMeta.Invoke(invokeHeight, originTransaction, TxMeta.Status.Succeeded, _, Some(scriptResult)) =>
extractLeases(address, scriptResult, originTransaction.id(), invokeHeight)
case Ethereum(height, tx @ EthereumTransaction(_: Invocation, _, _, _), TxMeta.Status.Succeeded, _, _, Some(scriptResult)) =>
extractLeases(address, scriptResult, tx.id(), height)
case _ => Seq()
}

private def extractLeases(subject: Address, result: InvokeScriptResult, txId: ByteStr, height: Height): Seq[LeaseInfo] = {
(for {
lease <- result.leases
details <- blockchain.leaseDetails(lease.id) if details.isActive
sender = details.sender.toAddress
recipient <- blockchain.resolveAlias(lease.recipient).toOption if subject == sender || subject == recipient
} yield LeaseInfo(
lease.id,
txId,
sender,
recipient,
lease.amount,
height,
LeaseInfo.Status.Active
)) ++ {
result.invokes.flatMap(i => extractLeases(subject, i.stateChanges, txId, height))
}
}

def leaseInfo(leaseId: ByteStr): Option[LeaseInfo] = blockchain.leaseDetails(leaseId) map { ld =>
LeaseInfo(
leaseId,
ld.sourceId,
ld.sender.toAddress,
ld.recipientAddress,
ld.amount.value,
ld.height,
ld.status match {
case Status.Active => LeaseInfo.Status.Active
case Status.Cancelled(_, _) => LeaseInfo.Status.Canceled
case Status.Expired(_) => LeaseInfo.Status.Expired
},
ld.status.cancelHeight,
ld.status.cancelTransactionId
)
}
AddressLeaseInfo.activeLeases(rdb, compositeBlockchain().snapshot, address)

private[this] def leaseIsActive(id: ByteStr): Boolean =
blockchain.leaseDetails(id).exists(_.isActive)
def leaseInfo(leaseId: ByteStr): Option[LeaseInfo] =
blockchain.leaseDetails(leaseId).map(LeaseInfo.fromLeaseDetails(leaseId, _))
}

class AddressDataIterator(
private class AddressDataIterator(
db: DBResource,
address: Address,
entriesFromDiff: Array[DataEntry[?]],
Expand Down
20 changes: 19 additions & 1 deletion node/src/main/scala/com/wavesplatform/api/common/LeaseInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,33 @@ package com.wavesplatform.api.common
import com.wavesplatform.account.Address
import com.wavesplatform.api.common.LeaseInfo.Status
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.state.LeaseDetails

object LeaseInfo {
type Status = Status.Value
//noinspection TypeAnnotation
// noinspection TypeAnnotation
object Status extends Enumeration {
val Active = Value(1)
val Canceled = Value(0)
val Expired = Value(2)
}

def fromLeaseDetails(id: ByteStr, details: LeaseDetails): LeaseInfo =
LeaseInfo(
id,
details.sourceId,
details.sender.toAddress,
details.recipientAddress,
details.amount.value,
details.height,
details.status match {
case LeaseDetails.Status.Active => LeaseInfo.Status.Active
case LeaseDetails.Status.Cancelled(_, _) => LeaseInfo.Status.Canceled
case LeaseDetails.Status.Expired(_) => LeaseInfo.Status.Expired
},
details.status.cancelHeight,
details.status.cancelTransactionId
)
}

case class LeaseInfo(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.wavesplatform.api.common.lease

import com.wavesplatform.account.Address
import com.wavesplatform.api.common.LeaseInfo
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.database.{AddressId, DBExt, DBResource, Keys, RDB}
import com.wavesplatform.state.{LeaseDetails, StateSnapshot}
import monix.eval.Task
import monix.reactive.Observable

import scala.jdk.CollectionConverters.IteratorHasAsScala

object AddressLeaseInfo {
def activeLeases(
rdb: RDB,
snapshot: StateSnapshot,
subject: Address
): Observable[LeaseInfo] = {
val snapshotLeases = leasesFromSnapshot(snapshot, subject)
val dbLeases = leasesFromDb(rdb, subject)
Observable.fromIterable(snapshotLeases) ++ dbLeases.filterNot(info => snapshot.cancelledLeases.contains(info.id))
}

private def leasesFromSnapshot(snapshot: StateSnapshot, subject: Address): Seq[LeaseInfo] =
snapshot.newLeases.collect {
case (id, leaseStatic)
if !snapshot.cancelledLeases.contains(id) &&
(subject == leaseStatic.sender.toAddress || subject == leaseStatic.recipientAddress) =>
LeaseInfo(
id,
leaseStatic.sourceId,
leaseStatic.sender.toAddress,
leaseStatic.recipientAddress,
leaseStatic.amount.value,
leaseStatic.height,
LeaseInfo.Status.Active
)
}.toSeq

private def leasesFromDb(rdb: RDB, subject: Address): Observable[LeaseInfo] =
for {
dbResource <- rdb.db.resourceObservable
(leaseId, details) <- dbResource
.get(Keys.addressId(subject))
.map(fromLeaseDbIterator(dbResource, _))
.getOrElse(Observable.empty)
} yield LeaseInfo.fromLeaseDetails(leaseId, details)

private def fromLeaseDbIterator(dbResource: DBResource, addressId: AddressId): Observable[(ByteStr, LeaseDetails)] =
Observable
.fromIterator(Task(new LeaseByAddressIterator(dbResource, addressId).asScala))
.concatMapIterable(identity)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.wavesplatform.api.common.lease

import com.google.common.collect.AbstractIterator
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.database
import com.wavesplatform.database.{AddressId, DBResource, Keys}
import com.wavesplatform.state.LeaseDetails

import scala.collection.mutable

private class LeaseByAddressIterator(resource: DBResource, addressId: AddressId) extends AbstractIterator[Seq[(ByteStr, LeaseDetails)]] {
private val seqNr = resource.get(Keys.addressLeaseSeqNr(addressId))
resource.withSafePrefixIterator(_.seekForPrev(Keys.addressLeaseSeq(addressId, seqNr).keyBytes))()

final override def computeNext(): Seq[(ByteStr, LeaseDetails)] =
resource.withSafePrefixIterator { iterator =>
val buffer = mutable.Map[ByteStr, LeaseDetails]()
while (iterator.isValid) {
for {
id <- database.readLeaseIdSeq(iterator.value())
details <- database.loadLease(resource, id) if details.isActive
} buffer.update(id, details)
iterator.prev()
}
if (buffer.nonEmpty)
buffer.toSeq
else
endOfData()
}(
endOfData()
)
}
4 changes: 3 additions & 1 deletion node/src/main/scala/com/wavesplatform/database/KeyTags.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ object KeyTags extends Enumeration {
EthereumTransactionMeta,
NthTransactionStateSnapshotAtHeight,
MaliciousMinerBanHeights,
BlockStateHash = Value
BlockStateHash,
AddressLeaseInfoSeqNr,
AddressLeaseInfoSeq = Value

final implicit class KeyTagExt(val t: KeyTag) extends AnyVal {
@inline def prefixBytes: Array[Byte] = Shorts.toByteArray(t.id.toShort)
Expand Down
19 changes: 12 additions & 7 deletions node/src/main/scala/com/wavesplatform/database/Keys.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,7 @@ object DataNode {

object Keys {
import KeyHelpers.*
import KeyTags.{
AddressId as AddressIdTag,
EthereumTransactionMeta as EthereumTransactionMetaTag,
InvokeScriptResult as InvokeScriptResultTag,
LeaseDetails as LeaseDetailsTag,
*
}
import KeyTags.{AddressId as AddressIdTag, EthereumTransactionMeta as EthereumTransactionMetaTag, InvokeScriptResult as InvokeScriptResultTag, LeaseDetails as LeaseDetailsTag, *}

val version: Key[Int] = intKey(Version, default = 1)
val height: Key[Height] =
Expand Down Expand Up @@ -190,6 +184,17 @@ object Keys {
writeTransactionHNSeqAndType
)

def addressLeaseSeqNr(addressId: AddressId): Key[Int] =
bytesSeqNr(AddressLeaseInfoSeqNr, addressId.toByteArray)

def addressLeaseSeq(addressId: AddressId, seqNr: Int): Key[Option[Seq[ByteStr]]] =
Key.opt(
AddressLeaseInfoSeq,
hBytes(addressId.toByteArray, seqNr),
readLeaseIdSeq,
writeLeaseIdSeq
)

def transactionMetaById(txId: TransactionId, cfh: RDB.TxMetaHandle): Key[Option[TransactionMeta]] =
Key.opt(
TransactionMetaById,
Expand Down
Loading

0 comments on commit 4f0cbb2

Please sign in to comment.