Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot committed Nov 19, 2023
1 parent 121b5f0 commit d731e01
Show file tree
Hide file tree
Showing 19 changed files with 527 additions and 417 deletions.
9 changes: 6 additions & 3 deletions node/src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import com.wavesplatform.features.BlockchainFeatures
import com.wavesplatform.history.StorageFactory
import com.wavesplatform.lang.ValidationError
import com.wavesplatform.mining.Miner
import com.wavesplatform.protobuf.PBSnapshots
import com.wavesplatform.protobuf.block.{PBBlocks, VanillaBlock}
import com.wavesplatform.protobuf.snapshot.TransactionStateSnapshot
import com.wavesplatform.settings.WavesSettings
Expand Down Expand Up @@ -255,10 +256,12 @@ object Importer extends ScorexLogging {
BlockSnapshot(
block.id(),
block.transactionData
.foldLeft((0, Seq.empty[(StateSnapshot, TxMeta.Status)])) { case ((offset, acc), _) =>
.foldLeft((0, Seq.empty[(StateSnapshot, TxMeta.Status)])) { case ((offset, acc), tx) =>
val txSnapshotSize = Ints.fromByteArray(bytes.slice(offset, offset + Ints.BYTES))
val txSnapshot = StateSnapshot.fromProtobuf(
TransactionStateSnapshot.parseFrom(bytes.slice(offset + Ints.BYTES, offset + Ints.BYTES + txSnapshotSize))
val txSnapshot = PBSnapshots.fromProtobuf(
TransactionStateSnapshot.parseFrom(bytes.slice(offset + Ints.BYTES, offset + Ints.BYTES + txSnapshotSize)),
tx.id(),
???
)
(offset + Ints.BYTES + txSnapshotSize, txSnapshot +: acc)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import com.wavesplatform.state.{StateSnapshot, TxMeta}
case class BlockSnapshot(blockId: BlockId, snapshots: Seq[(StateSnapshot, TxMeta.Status)])

object BlockSnapshot {
def fromResponse(response: BlockSnapshotResponse): BlockSnapshot =
BlockSnapshot(response.blockId, response.snapshots.map(StateSnapshot.fromProtobuf))
def fromResponse(response: BlockSnapshotResponse): BlockSnapshot = ???
// BlockSnapshot(response.blockId, response.snapshots.map(StateSnapshot.fromProtobuf))
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import com.wavesplatform.state.{StateSnapshot, TxMeta}
case class MicroBlockSnapshot(totalBlockId: BlockId, snapshots: Seq[(StateSnapshot, TxMeta.Status)])

object MicroBlockSnapshot {
def fromResponse(response: MicroBlockSnapshotResponse): MicroBlockSnapshot =
MicroBlockSnapshot(response.totalBlockId, response.snapshots.map(StateSnapshot.fromProtobuf))
def fromResponse(response: MicroBlockSnapshotResponse): MicroBlockSnapshot = ???
// MicroBlockSnapshot(response.totalBlockId, response.snapshots.map(StateSnapshot.fromProtobuf))
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.wavesplatform.features.BlockchainFeatures
import com.wavesplatform.lang.ValidationError
import com.wavesplatform.protobuf.block.PBBlocks
import com.wavesplatform.protobuf.snapshot.{TransactionStateSnapshot, TransactionStatus as PBStatus}
import com.wavesplatform.protobuf.{ByteStrExt, ByteStringExt}
import com.wavesplatform.protobuf.{ByteStrExt, ByteStringExt, PBSnapshots}
import com.wavesplatform.settings.{BlockchainSettings, DBSettings}
import com.wavesplatform.state.*
import com.wavesplatform.state.reader.LeaseDetails
Expand Down Expand Up @@ -298,7 +298,7 @@ class RocksDBWriter(

private def appendBalances(
balances: Map[(AddressId, Asset), (CurrentBalance, BalanceNode)],
assetStatics: Map[IssuedAsset, TransactionStateSnapshot.AssetStatic],
assetStatics: Map[IssuedAsset, AssetStaticInfo],
rw: RW
): Unit = {
val changedAssetBalances = MultimapBuilder.hashKeys().hashSetValues().build[IssuedAsset, java.lang.Long]()
Expand Down Expand Up @@ -452,8 +452,8 @@ class RocksDBWriter(

for ((asset, (assetStatic, assetNum)) <- snapshot.indexedAssetStatics) {
val pbAssetStatic = StaticAssetInfo(
assetStatic.sourceTransactionId,
assetStatic.issuerPublicKey,
assetStatic.source.toByteString,
assetStatic.issuer.toByteString,
assetStatic.decimals,
assetStatic.nft,
assetNum,
Expand Down Expand Up @@ -523,7 +523,7 @@ class RocksDBWriter(
val txId = TransactionId(id)

val size = rw.put(Keys.transactionAt(Height(height), num, rdb.txHandle), Some((meta, tx)))
rw.put(Keys.transactionStateSnapshotAt(Height(height), num, rdb.txSnapshotHandle), Some(txInfo.snapshot.toProtobuf(txInfo.status)))
rw.put(Keys.transactionStateSnapshotAt(Height(height), num, rdb.txSnapshotHandle), Some(PBSnapshots.toProtobuf(txInfo.snapshot, txInfo.status)))
rw.put(Keys.transactionMetaById(txId, rdb.txMetaHandle), Some(TransactionMeta(height, num, tx.tpe.id, meta.status.protobuf, 0, size)))
targetBf.put(id.arr)

Expand Down Expand Up @@ -798,7 +798,7 @@ class RocksDBWriter(
).explicitGet()

val snapshot = if (isLightMode) {
Some(BlockSnapshot(block.id(), loadTxStateSnapshotsWithStatus(currentHeight, rdb)))
Some(BlockSnapshot(block.id(), loadTxStateSnapshotsWithStatus(currentHeight, rdb, block.transactionData)))
} else None

(block, Caches.toHitSource(discardedMeta), snapshot)
Expand Down
22 changes: 7 additions & 15 deletions node/src/main/scala/com/wavesplatform/database/package.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.wavesplatform

import java.nio.ByteBuffer
import java.util
import java.util.Map as JMap
import com.google.common.base.Charsets.UTF_8
import com.google.common.collect.{Interners, Maps}
import com.google.common.io.ByteStreams.{newDataInput, newDataOutput}
Expand All @@ -19,31 +16,26 @@ import com.wavesplatform.database.protobuf as pb
import com.wavesplatform.database.protobuf.DataEntry.Value
import com.wavesplatform.database.protobuf.TransactionData.Transaction as TD
import com.wavesplatform.lang.script.ScriptReader
import com.wavesplatform.protobuf.ByteStringExt
import com.wavesplatform.protobuf.block.PBBlocks
import com.wavesplatform.protobuf.snapshot.TransactionStateSnapshot
import com.wavesplatform.protobuf.transaction.{PBRecipients, PBTransactions}
import com.wavesplatform.protobuf.{ByteStringExt, PBSnapshots}
import com.wavesplatform.state.*
import com.wavesplatform.state.StateHash.SectionId
import com.wavesplatform.state.reader.LeaseDetails
import com.wavesplatform.transaction.Asset.IssuedAsset
import com.wavesplatform.transaction.lease.LeaseTransaction
import com.wavesplatform.transaction.{
EthereumTransaction,
GenesisTransaction,
PBSince,
PaymentTransaction,
Transaction,
TransactionParsers,
TxValidationError
}
import com.wavesplatform.transaction.{EthereumTransaction, GenesisTransaction, PBSince, PaymentTransaction, Transaction, TransactionParsers, TxValidationError}
import com.wavesplatform.utils.*
import monix.eval.Task
import monix.reactive.Observable
import org.rocksdb.*
import sun.nio.ch.Util
import supertagged.TaggedType

import java.nio.ByteBuffer
import java.util
import java.util.Map as JMap
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.collection.{View, mutable}
Expand Down Expand Up @@ -672,8 +664,8 @@ package object database {
txSnapshots.result()
}

def loadTxStateSnapshotsWithStatus(height: Height, rdb: RDB): Seq[(StateSnapshot, TxMeta.Status)] =
loadTxStateSnapshots(height, rdb).map(StateSnapshot.fromProtobuf)
def loadTxStateSnapshotsWithStatus(height: Height, rdb: RDB, transactions: Seq[Transaction]): Seq[(StateSnapshot, TxMeta.Status)] =
loadTxStateSnapshots(height, rdb).zip(transactions).map { case (s, tx) => PBSnapshots.fromProtobuf(s, tx.id(), height) }

def loadBlock(height: Height, rdb: RDB): Option[Block] =
for {
Expand Down
5 changes: 3 additions & 2 deletions node/src/main/scala/com/wavesplatform/history/History.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.wavesplatform.block.{Block, MicroBlock}
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.database
import com.wavesplatform.database.RDB
import com.wavesplatform.protobuf.PBSnapshots
import com.wavesplatform.protobuf.snapshot.TransactionStateSnapshot
import com.wavesplatform.state.{Blockchain, Height, StateSnapshot}

Expand Down Expand Up @@ -43,15 +44,15 @@ object History {

override def loadBlockSnapshots(id: ByteStr): Option[Seq[TransactionStateSnapshot]] =
liquidBlockSnapshot(id)
.map(_.transactions.values.toSeq.map(txInfo => txInfo.snapshot.toProtobuf(txInfo.status)))
.map(_.transactions.values.toSeq.map(txInfo => PBSnapshots.toProtobuf(txInfo.snapshot, txInfo.status)))
.orElse(blockchain.heightOf(id).map { h =>
database.loadTxStateSnapshots(Height(h), rdb)
})

override def loadMicroBlockSnapshots(id: ByteStr): Option[Seq[TransactionStateSnapshot]] =
microBlockSnapshot(id)
.map(_.transactions.values.toSeq.map { txInfo =>
txInfo.snapshot.toProtobuf(txInfo.status)
PBSnapshots.toProtobuf(txInfo.snapshot, txInfo.status)
})
}
}
2 changes: 1 addition & 1 deletion node/src/main/scala/com/wavesplatform/mining/Miner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class MinerImpl(

def appendTask(block: Block, totalConstraint: MiningConstraint) =
BlockAppender(blockchainUpdater, timeService, utx, pos, appenderScheduler)(block, None).flatMap {
case Left(BlockFromFuture(_)) => // Time was corrected, retry
case Left(BlockFromFuture(_, _)) => // Time was corrected, retry
generateBlockTask(account, None)

case Left(err) =>
Expand Down
195 changes: 195 additions & 0 deletions node/src/main/scala/com/wavesplatform/protobuf/PBSnapshots.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package com.wavesplatform.protobuf

import com.google.protobuf.ByteString
import com.wavesplatform.account.{Address, Alias, PublicKey}
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.common.utils.EitherExt2
import com.wavesplatform.crypto.KeyLength
import com.wavesplatform.lang.script.ScriptReader
import com.wavesplatform.protobuf.snapshot.TransactionStateSnapshot
import com.wavesplatform.protobuf.snapshot.TransactionStateSnapshot.AssetStatic
import com.wavesplatform.protobuf.transaction.{PBAmounts, PBTransactions}
import com.wavesplatform.state.*
import com.wavesplatform.state.reader.LeaseDetails
import com.wavesplatform.state.reader.LeaseDetails.Status
import com.wavesplatform.transaction.Asset
import com.wavesplatform.transaction.Asset.IssuedAsset

import scala.collection.immutable.VectorMap

object PBSnapshots {

import com.wavesplatform.protobuf.snapshot.TransactionStateSnapshot as S

def toProtobuf(snapshot: StateSnapshot, txStatus: TxMeta.Status): TransactionStateSnapshot = {
import snapshot.*
TransactionStateSnapshot(
balances.map { case ((address, asset), balance) =>
S.Balance(address.toByteString, Some(PBAmounts.fromAssetAndAmount(asset, balance)))
}.toSeq,
leaseBalances.map { case (address, balance) =>
S.LeaseBalance(address.toByteString, balance.in, balance.out)
}.toSeq,
assetStatics.map { case (id, st) =>
AssetStatic(id.id.toByteString, st.issuer.toByteString, st.decimals, st.nft)
}.toSeq,
assetVolumes.map { case (asset, info) =>
S.AssetVolume(asset.id.toByteString, info.isReissuable, ByteString.copyFrom(info.volume.toByteArray))
}.toSeq,
assetNamesAndDescriptions.map { case (asset, info) =>
S.AssetNameAndDescription(asset.id.toByteString, info.name.toStringUtf8, info.description.toStringUtf8)
}.toSeq,
assetScripts.map { case (asset, script) =>
S.AssetScript(asset.id.toByteString, script.script.bytes().toByteString)
}.headOption,
aliases.map { case (alias, address) => S.Alias(address.toByteString, alias.name) }.headOption,
orderFills.map { case (orderId, VolumeAndFee(volume, fee)) =>
S.OrderFill(orderId.toByteString, volume, fee)
}.toSeq,
leaseStates.map { case (leaseId, LeaseSnapshot(sender, recipient, amount, status)) =>
val pbStatus = status match {
case Status.Active =>
S.LeaseState.Status.Active(S.LeaseState.Active(amount, sender.toByteString, recipient.asInstanceOf[Address].toByteString))
case _: Status.Cancelled | _: Status.Expired =>
S.LeaseState.Status.Cancelled(S.LeaseState.Cancelled())
}
S.LeaseState(leaseId.toByteString, pbStatus)
}.toSeq,
accountScripts.map { case (publicKey, scriptOpt) =>
scriptOpt.fold(
S.AccountScript(publicKey.toByteString)
)(script =>
S.AccountScript(
publicKey.toByteString,
script.script.bytes().toByteString,
script.verifierComplexity
)
)
}.headOption,
accountData.map { case (address, data) =>
S.AccountData(address.toByteString, data.values.map(PBTransactions.toPBDataEntry).toSeq)
}.toSeq,
sponsorships.collect { case (asset, SponsorshipValue(minFee)) =>
S.Sponsorship(asset.id.toByteString, minFee)
}.toSeq,
txStatus.protobuf
)
}

def fromProtobuf(pbSnapshot: TransactionStateSnapshot, txId: ByteStr, height: Int): (StateSnapshot, TxMeta.Status) = {
val balances: VectorMap[(Address, Asset), Long] =
VectorMap() ++ pbSnapshot.balances.map(b => (b.address.toAddress(), b.getAmount.assetId.toAssetId) -> b.getAmount.amount)

val leaseBalances: Map[Address, LeaseBalance] =
pbSnapshot.leaseBalances
.map(b => b.address.toAddress() -> LeaseBalance(b.in, b.out))
.toMap

val assetScripts: Map[IssuedAsset, AssetScriptInfo] =
pbSnapshot.assetScripts.map { s =>
s.assetId.toIssuedAssetId -> AssetScriptInfo(ScriptReader.fromBytes(s.script.toByteArray).explicitGet(), 0)
}.toMap

val assetStatics: VectorMap[IssuedAsset, AssetStaticInfo] =
VectorMap() ++ pbSnapshot.assetStatics.map(info =>
info.assetId.toIssuedAssetId -> AssetStaticInfo(
info.assetId.toByteStr,
TransactionId(txId),
PublicKey(info.issuerPublicKey.toByteStr),
info.decimals,
info.nft
)
)

val assetVolumes: Map[IssuedAsset, AssetVolumeInfo] =
pbSnapshot.assetVolumes
.map(v => v.assetId.toIssuedAssetId -> AssetVolumeInfo(v.reissuable, BigInt(v.volume.toByteArray)))
.toMap

val assetNamesAndDescriptions: Map[IssuedAsset, AssetInfo] =
pbSnapshot.assetNamesAndDescriptions
.map(i => i.assetId.toIssuedAssetId -> AssetInfo(i.name, i.description, Height @@ height))
.toMap

val sponsorships: Map[IssuedAsset, SponsorshipValue] =
pbSnapshot.sponsorships
.map(s => s.assetId.toIssuedAssetId -> SponsorshipValue(s.minFee))
.toMap

val leaseStates: Map[ByteStr, LeaseSnapshot] =
pbSnapshot.leaseStates.map { ls =>
ls.status match {
case TransactionStateSnapshot.LeaseState.Status.Active(value) =>
ls.leaseId.toByteStr -> LeaseSnapshot(
value.sender.toPublicKey,
value.recipient.toAddress(),
value.amount,
LeaseDetails.Status.Active
)
case _: TransactionStateSnapshot.LeaseState.Status.Cancelled | TransactionStateSnapshot.LeaseState.Status.Empty =>
ls.leaseId.toByteStr -> LeaseSnapshot(
PublicKey(ByteStr.fill(KeyLength)(0)),
Address(Array.fill(Address.HashLength)(0)),
0,
LeaseDetails.Status.Cancelled(0, None)
)
}
}.toMap

val aliases: Map[Alias, Address] =
pbSnapshot.aliases
.map(a => Alias.create(a.alias).explicitGet() -> a.address.toAddress())
.toMap

val orderFills: Map[ByteStr, VolumeAndFee] =
pbSnapshot.orderFills
.map(of => of.orderId.toByteStr -> VolumeAndFee(of.volume, of.fee))
.toMap

val accountScripts: Map[PublicKey, Option[AccountScriptInfo]] =
pbSnapshot.accountScripts.map { pbInfo =>
val info =
if (pbInfo.script.isEmpty)
None
else
Some(
AccountScriptInfo(
pbInfo.senderPublicKey.toPublicKey,
ScriptReader.fromBytes(pbInfo.script.toByteArray).explicitGet(),
pbInfo.verifierComplexity
)
)
pbInfo.senderPublicKey.toPublicKey -> info
}.toMap

val accountData: Map[Address, Map[String, DataEntry[?]]] =
pbSnapshot.accountData.map { data =>
val entries =
data.entries.map { pbEntry =>
val entry = PBTransactions.toVanillaDataEntry(pbEntry)
entry.key -> entry
}.toMap
data.address.toAddress() -> entries
}.toMap

(
StateSnapshot(
VectorMap(),
balances,
leaseBalances,
assetStatics,
assetVolumes,
assetNamesAndDescriptions,
assetScripts,
sponsorships,
leaseStates,
aliases,
orderFills,
accountScripts,
accountData
),
TxMeta.Status.fromProtobuf(pbSnapshot.transactionStatus)
)
}

}
Loading

0 comments on commit d731e01

Please sign in to comment.