Skip to content

Commit

Permalink
Merge branch 'version-1.5.x' into node-2652-lazy-grpc-api-response
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot authored Mar 11, 2024
2 parents 4bf52d8 + 979d2ea commit a0c2f3e
Show file tree
Hide file tree
Showing 122 changed files with 2,536 additions and 704 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check-pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
sbt lang/assembly
git clone https://github.com/waves-exchange/neutrino-contract
git clone https://github.com/waves-exchange/contracts
git clone https://oauth2:${{ secrets.DUCKS_GITHUB_TOKEN }}@github.com/akharazyan/wavesducks-public
git clone https://github.com/waves-ducks-core/wavesducks-public
git clone https://oauth2:${{ secrets.SWOPFI_GITLAB_TOKEN }}@gitlabwp.wvservices.com/swopfi/swopfi-smart-contracts
find neutrino-contract/script -name "*.ride" -type f -exec java -jar lang/jvm/target/file-compiler.jar {} +;
find contracts/ride -name "*.ride" -type f -exec java -jar lang/jvm/target/file-compiler.jar {} +;
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-docker-node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:

- name: Build sources
run: |
sbt --mem 2048 -J-XX:+UseG1GC -Dcoursier.cache=~/.cache/coursier -Dsbt.boot.directory=~/.sbt buildTarballsForDocker
sbt --mem 2048 -J-XX:+UseG1GC -Dcoursier.cache=~/.cache/coursier -Dsbt.boot.directory=~/.sbt ;buildTarballsForDocker;buildRIDERunnerForDocker
- name: Setup Docker buildx
uses: docker/setup-buildx-action@v2
Expand Down
14 changes: 7 additions & 7 deletions benchmark/src/main/scala/com/wavesplatform/state/DBState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ abstract class DBState extends ScorexLogging {

lazy val rdb: RDB = RDB.open(settings.dbSettings)

lazy val rocksDBWriter: RocksDBWriter =
new RocksDBWriter(
rdb,
settings.blockchainSettings,
settings.dbSettings.copy(maxCacheSize = 1),
settings.enableLightMode
)
lazy val rocksDBWriter: RocksDBWriter = RocksDBWriter(
rdb,
settings.blockchainSettings,
settings.dbSettings.copy(maxCacheSize = 1),
settings.enableLightMode
)

AddressScheme.current = new AddressScheme { override val chainId: Byte = 'W' }

Expand All @@ -44,6 +43,7 @@ abstract class DBState extends ScorexLogging {

@TearDown
def close(): Unit = {
rocksDBWriter.close()
rdb.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object RollbackBenchmark extends ScorexLogging {
val settings = Application.loadApplicationConfig(Some(new File(args(0))))
val rdb = RDB.open(settings.dbSettings)
val time = new NTP(settings.ntpServer)
val rocksDBWriter = new RocksDBWriter(rdb, settings.blockchainSettings, settings.dbSettings, settings.enableLightMode)
val rocksDBWriter = RocksDBWriter(rdb, settings.blockchainSettings, settings.dbSettings, settings.enableLightMode)

val issuer = KeyPair(new Array[Byte](32))

Expand Down Expand Up @@ -111,6 +111,7 @@ object RollbackBenchmark extends ScorexLogging {
rocksDBWriter.rollbackTo(1)
val end = System.nanoTime()
log.info(f"Rollback took ${(end - start) * 1e-6}%.3f ms")
rocksDBWriter.close()
rdb.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ trait BaseState {

@TearDown
def close(): Unit = {
state.close()
rdb.close()
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.wavesplatform.state

import java.nio.file.Files
import java.util.concurrent.TimeUnit

import com.google.common.primitives.Ints
import com.typesafe.config.ConfigFactory
import com.wavesplatform.database.RDB
Expand All @@ -12,6 +9,10 @@ import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.infra.Blackhole
import org.rocksdb.{ReadOptions, WriteBatch, WriteOptions}

import java.nio.file.Files
import java.util.concurrent.TimeUnit
import scala.util.Using

@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Array(Mode.AverageTime))
@Threads(1)
Expand Down Expand Up @@ -60,7 +61,7 @@ object RocksDBIteratorBenchmark {
RDB.open(wavesSettings.dbSettings.copy(directory = dir))
}

val keysPrefix = "keysPrefix"
val keysPrefix = "keysPrefix" // Must have 10 or more bytes, see RDB.newColumnFamilyOptions
val firstKey: Array[Byte] = keysPrefix.getBytes ++ Ints.toByteArray(1)
val lastKey: Array[Byte] = keysPrefix.getBytes ++ Ints.toByteArray(10000)

Expand All @@ -70,14 +71,18 @@ object RocksDBIteratorBenchmark {

val readOptions: ReadOptions = new ReadOptions().setTotalOrderSeek(false).setPrefixSameAsStart(true)

private val wb: WriteBatch = new WriteBatch()
kvs.foreach { case (key, value) =>
wb.put(key, value)
Using.Manager { use =>
val wb = use(new WriteBatch())
val wo = use(new WriteOptions())
kvs.foreach { case (key, value) =>
wb.put(key, value)
}
rdb.db.write(wo, wb)
}
rdb.db.write(new WriteOptions(), wb)

@TearDown
def close(): Unit = {
readOptions.close()
rdb.close()
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package com.wavesplatform.state

import java.nio.file.Files
import java.util.concurrent.TimeUnit

import com.google.common.primitives.{Bytes, Shorts}
import com.typesafe.config.ConfigFactory
import com.wavesplatform.account.Address
import com.wavesplatform.database.{
AddressId,
CurrentData,
Expand All @@ -24,6 +20,10 @@ import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.infra.Blackhole
import org.rocksdb.{ReadOptions, WriteBatch, WriteOptions}

import java.nio.file.Files
import java.util.concurrent.TimeUnit
import scala.util.Using

@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Array(Mode.AverageTime))
@Threads(1)
Expand Down Expand Up @@ -63,27 +63,29 @@ object RocksDBSeekForPrevBenchmark {
RDB.open(wavesSettings.dbSettings.copy(directory = dir))
}

val address: Address = Address(Array.fill(20)(1.toByte))
val addressId: AddressId = AddressId(1L)

val keyString = "key"
val currentDataKey: Array[Byte] = Keys.data(address, keyString).keyBytes
val currentDataKey: Array[Byte] = Keys.data(addressId, keyString).keyBytes
val dataNodeKey: Height => Array[Byte] = Keys.dataAt(addressId, "key")(_).keyBytes
val dataNodeKeyPrefix: Array[Byte] = Bytes.concat(Shorts.toByteArray(KeyTags.DataHistory.id.toShort), addressId.toByteArray, keyString.getBytes)

private val dataEntry: StringDataEntry = StringDataEntry(keyString, "value")

val readOptions: ReadOptions = new ReadOptions()

private val wb: WriteBatch = new WriteBatch()
wb.put(currentDataKey, writeCurrentData(CurrentData(dataEntry, Height(10000), Height(9999))))
(1 to 1000).foreach { h =>
wb.put(dataNodeKey(Height(h)), writeDataNode(DataNode(dataEntry, Height(h - 1))))
Using.Manager { use =>
val wb = use(new WriteBatch())
wb.put(currentDataKey, writeCurrentData(CurrentData(dataEntry, Height(10000), Height(9999))))
(1 to 1000).foreach { h =>
wb.put(dataNodeKey(Height(h)), writeDataNode(DataNode(dataEntry, Height(h - 1))))
}
rdb.db.write(use(new WriteOptions()), wb)
}
rdb.db.write(new WriteOptions(), wb)

@TearDown
def close(): Unit = {
readOptions.close()
rdb.close()
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.wavesplatform.state

import java.io.File
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}

import com.typesafe.config.ConfigFactory
import com.wavesplatform.account.*
import com.wavesplatform.api.BlockMeta
Expand All @@ -17,7 +14,10 @@ import com.wavesplatform.transaction.Transaction
import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.infra.Blackhole

import java.io.File
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
import scala.io.Codec
import scala.util.Using

/** Tests over real database. How to test:
* 1. Download a database 2. Import it:
Expand Down Expand Up @@ -87,7 +87,7 @@ object RocksDBWriterBenchmark {
RDB.open(wavesSettings.dbSettings)
}

val db = new RocksDBWriter(rawDB, wavesSettings.blockchainSettings, wavesSettings.dbSettings, wavesSettings.enableLightMode)
val db = RocksDBWriter(rawDB, wavesSettings.blockchainSettings, wavesSettings.dbSettings, wavesSettings.enableLightMode)

def loadBlockInfoAt(height: Int): Option[(BlockMeta, Seq[(TxMeta, Transaction)])] =
loadBlockMetaAt(height).map { meta =>
Expand All @@ -102,15 +102,12 @@ object RocksDBWriterBenchmark {

@TearDown
def close(): Unit = {
db.close()
rawDB.close()
}

protected def load[T](label: String, absolutePath: String)(f: String => T): Vector[T] = {
scala.io.Source
.fromFile(absolutePath)(Codec.UTF8)
.getLines()
.map(f)
.toVector
Using.resource(scala.io.Source.fromFile(absolutePath)(Codec.UTF8))(_.getLines().map(f).toVector)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scodec.bits.BitVector
import java.io.File
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
import scala.io.Codec
import scala.util.Using

/** Tests over real database. How to test:
* 1. Download a database 2. Import it:
Expand Down Expand Up @@ -134,8 +135,8 @@ object WavesEnvironmentBenchmark {
RDB.open(wavesSettings.dbSettings)
}

val state = RocksDBWriter(rdb, wavesSettings.blockchainSettings, wavesSettings.dbSettings, wavesSettings.enableLightMode)
val environment: Environment[Id] = {
val state = new RocksDBWriter(rdb, wavesSettings.blockchainSettings, wavesSettings.dbSettings, wavesSettings.enableLightMode)
WavesEnvironment(
AddressScheme.current.chainId,
Coeval.raiseError(new NotImplementedError("`tx` is not implemented")),
Expand All @@ -149,15 +150,12 @@ object WavesEnvironmentBenchmark {

@TearDown
def close(): Unit = {
state.close()
rdb.close()
}

protected def load[T](label: String, absolutePath: String)(f: String => T): Vector[T] = {
scala.io.Source
.fromFile(absolutePath)(Codec.UTF8)
.getLines()
.map(f)
.toVector
Using.resource(scala.io.Source.fromFile(absolutePath)(Codec.UTF8))(_.getLines().map(f).toVector)
}
}

Expand Down
15 changes: 9 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ lazy val repl = crossProject(JSPlatform, JVMPlatform)
libraryDependencies ++=
Dependencies.protobuf.value ++
Dependencies.langCompilerPlugins.value ++
Dependencies.circe.value ++
Seq(
"org.scala-js" %%% "scala-js-macrotask-executor" % "1.0.0"
),
Dependencies.circe.value,
inConfig(Compile)(
Seq(
PB.targets += scalapb.gen(flatPackage = true) -> sourceManaged.value,
Expand All @@ -109,6 +106,9 @@ lazy val `repl-jvm` = repl.jvm
)

lazy val `repl-js` = repl.js.dependsOn(`lang-js`)
.settings(
libraryDependencies += "org.scala-js" %%% "scala-js-macrotask-executor" % "1.1.1"
)

lazy val `curve25519-test` = project.dependsOn(node)

Expand Down Expand Up @@ -198,6 +198,10 @@ buildTarballsForDocker := {
(`grpc-server` / Universal / packageZipTarball).value,
baseDirectory.value / "docker" / "target" / "waves-grpc-server.tgz"
)
}

lazy val buildRIDERunnerForDocker = taskKey[Unit]("Package RIDE Runner tarball and copy it to docker/target")
buildRIDERunnerForDocker := {
IO.copyFile(
(`ride-runner` / Universal / packageZipTarball).value,
(`ride-runner` / baseDirectory).value / "docker" / "target" / s"${(`ride-runner` / name).value}.tgz"
Expand All @@ -213,7 +217,7 @@ checkPRRaw := Def
(`repl-jvm` / Test / test).value
(`lang-js` / Compile / fastOptJS).value
(`lang-tests-js` / Test / test).value
(`grpc-server` / Test / test).value
// (`grpc-server` / Test / test).value
(node / Test / test).value
(`repl-js` / Compile / fastOptJS).value
(`node-it` / Test / compile).value
Expand Down Expand Up @@ -244,7 +248,6 @@ lazy val buildDebPackages = taskKey[Unit]("Build debian packages")
buildDebPackages := {
(`grpc-server` / Debian / packageBin).value
(node / Debian / packageBin).value
(`ride-runner` / Debian / packageBin).value
}

def buildPackages: Command = Command("buildPackages")(_ => Network.networkParser) { (state, args) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package com.wavesplatform.api.grpc

import scala.concurrent.Future
import com.wavesplatform.account.AddressScheme
import com.wavesplatform.api.common.{CommonTransactionsApi, TransactionMeta}
import com.wavesplatform.api.grpc.TransactionsApiGrpcImpl.applicationStatusFromTxStatus
import com.wavesplatform.protobuf.*
import com.wavesplatform.protobuf.transaction.*
import com.wavesplatform.protobuf.utils.PBImplicitConversions.PBRecipientImplicitConversionOps
import com.wavesplatform.state.{Blockchain, TxMeta, InvokeScriptResult as VISR}
import com.wavesplatform.transaction.{Authorized, EthereumTransaction}
import com.wavesplatform.transaction.TxValidationError.GenericError
import io.grpc.{Status, StatusRuntimeException}
import com.wavesplatform.transaction.{Authorized, EthereumTransaction}
import io.grpc.stub.StreamObserver
import io.grpc.{Status, StatusRuntimeException}
import monix.execution.Scheduler
import monix.reactive.Observable

import scala.concurrent.Future

class TransactionsApiGrpcImpl(blockchain: Blockchain, commonApi: CommonTransactionsApi)(implicit sc: Scheduler)
extends TransactionsApiGrpc.TransactionsApi {

Expand Down Expand Up @@ -67,6 +68,20 @@ class TransactionsApiGrpcImpl(blockchain: Blockchain, commonApi: CommonTransacti
)
}

override def getTransactionSnapshots(
request: TransactionSnapshotsRequest,
responseObserver: StreamObserver[TransactionSnapshotResponse]
): Unit =
responseObserver.interceptErrors {
val snapshots =
for {
id <- Observable.fromIterable(request.transactionIds)
(snapshot, status) <- Observable.fromIterable(blockchain.transactionSnapshot(id.toByteStr))
pbSnapshot = PBSnapshots.toProtobuf(snapshot, status)
} yield TransactionSnapshotResponse(id, Some(pbSnapshot))
responseObserver.completeWith(snapshots)
}

override def getUnconfirmed(request: TransactionsRequest, responseObserver: StreamObserver[TransactionResponse]): Unit =
responseObserver.interceptErrors {
val unconfirmedTransactions = if (!request.sender.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.wavesplatform.events

import com.wavesplatform.block.{Block, MicroBlock}
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.database.RDB
import com.wavesplatform.events.api.grpc.protobuf.BlockchainUpdatesApiGrpc
import com.wavesplatform.events.settings.BlockchainUpdatesSettings
import com.wavesplatform.extensions.{Context, Extension}
Expand All @@ -14,6 +13,7 @@ import io.grpc.{Metadata, Server, ServerStreamTracer, Status}
import monix.execution.schedulers.SchedulerService
import monix.execution.{ExecutionModel, Scheduler, UncaughtExceptionReporter}
import net.ceedubs.ficus.Ficus.*
import org.rocksdb.RocksDB

import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
Expand All @@ -31,9 +31,8 @@ class BlockchainUpdates(private val context: Context) extends Extension with Sco
)

private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("waves.blockchain-updates")
// todo: no need to open column families here
private[this] val rdb = RDB.open(context.settings.dbSettings.copy(directory = context.settings.directory + "/blockchain-updates"))
private[this] val repo = new Repo(rdb.db, context.blocksApi)
private[this] val rdb = RocksDB.open(context.settings.directory + "/blockchain-updates")
private[this] val repo = new Repo(rdb, context.blocksApi)

private[this] val grpcServer: Server = NettyServerBuilder
.forAddress(new InetSocketAddress("0.0.0.0", settings.grpcPort))
Expand Down
Loading

0 comments on commit a0c2f3e

Please sign in to comment.