From 3533edc1af86ffc530fe62580b66d60b9ef23f6f Mon Sep 17 00:00:00 2001
From: Artyom Sayadyan
Date: Wed, 10 Jan 2024 17:57:42 +0300
Subject: [PATCH] NODE-2639 Improved Exporter and Importer (#3926)

---
Reviewer note: this copy deviates from the recorded commit in Importer.scala only.
The resume offset `blocksOffset` stays a Long (an Int running sum wraps past 2 GiB),
and the 4-byte snapshot size prefix is read with ByteStreams.readFully so that a
truncated snapshots file fails with EOFException instead of reusing stale bytes.
Leading whitespace was reconstructed; apply with `git apply --ignore-whitespace`.

 .../scala/com/wavesplatform/Exporter.scala    | 28 ++++++++-------
 .../scala/com/wavesplatform/Importer.scala    | 35 ++++++++++---------
 .../com/wavesplatform/database/RDB.scala      |  8 ++---
 3 files changed, 37 insertions(+), 34 deletions(-)

diff --git a/node/src/main/scala/com/wavesplatform/Exporter.scala b/node/src/main/scala/com/wavesplatform/Exporter.scala
index 55b124e5df..f001dff3c6 100644
--- a/node/src/main/scala/com/wavesplatform/Exporter.scala
+++ b/node/src/main/scala/com/wavesplatform/Exporter.scala
@@ -1,8 +1,6 @@
 package com.wavesplatform
 
 import com.google.common.collect.AbstractIterator
-
-import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
 import com.google.common.primitives.Ints
 import com.wavesplatform.block.Block
 import com.wavesplatform.database.protobuf.BlockMeta
@@ -19,6 +17,7 @@ import kamon.Kamon
 import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB}
 import scopt.OParser
 
+import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
 import scala.concurrent.Await
 import scala.concurrent.duration.*
 import scala.jdk.CollectionConverters.*
@@ -39,7 +38,7 @@ object Exporter extends ScorexLogging {
   // noinspection ScalaStyle
   def main(args: Array[String]): Unit = {
     OParser.parse(commandParser, args, ExporterOptions()).foreach {
-      case ExporterOptions(configFile, blocksOutputFileNamePrefix, snapshotsOutputFileNamePrefix, exportSnapshots, exportHeight, format) =>
+      case ExporterOptions(configFile, blocksOutputFileNamePrefix, snapshotsOutputFileNamePrefix, exportHeight, format) =>
         val settings = Application.loadApplicationConfig(configFile)
 
         Using.resources(
@@ -53,8 +52,9 @@ object Exporter extends ScorexLogging {
           val blocksOutputFilename = s"$blocksOutputFileNamePrefix-$height"
           log.info(s"Blocks output file: $blocksOutputFilename")
 
+          val exportSnapshots = snapshotsOutputFileNamePrefix.isDefined
           val snapshotsOutputFilename = if (exportSnapshots) {
-            val filename = s"$snapshotsOutputFileNamePrefix-$height"
+            val filename = s"${snapshotsOutputFileNamePrefix.get}-$height"
             log.info(s"Snapshots output file: $filename")
             Some(filename)
           } else None
@@ -74,7 +74,12 @@ object Exporter extends ScorexLogging {
           var exportedSnapshotsBytes = 0L
           val start = System.currentTimeMillis()
 
-          new BlockSnapshotIterator(rdb, height, settings.enableLightMode).asScala.foreach { case (h, block, txSnapshots) =>
+          new BlockSnapshotIterator(rdb, height, exportSnapshots).asScala.foreach { case (h, block, txSnapshots) =>
+            val txCount = block.transactionData.length
+            if (exportSnapshots && txCount != txSnapshots.length)
+              throw new RuntimeException(
+                s"${txSnapshots.length} snapshot(s) don't match $txCount transaction(s) on height $h, data is corrupted"
+              )
             exportedBlocksBytes += IO.exportBlock(blocksStream, Some(block), format == Formats.Binary)
             snapshotsStream.foreach { output =>
               exportedSnapshotsBytes += IO.exportBlockTxSnapshots(output, txSnapshots)
@@ -100,7 +105,8 @@ object Exporter extends ScorexLogging {
     }
   }
 
-  private class BlockSnapshotIterator(rdb: RDB, targetHeight: Int, isLightMode: Boolean) extends AbstractIterator[(Int, Block, Seq[Array[Byte]])] {
+  private class BlockSnapshotIterator(rdb: RDB, targetHeight: Int, exportSnapshots: Boolean)
+      extends AbstractIterator[(Int, Block, Seq[Array[Byte]])] {
 
     var nextTxEntry: Option[(Int, Transaction)] = None
     var nextSnapshotEntry: Option[(Int, Array[Byte])] = None
@@ -156,7 +162,7 @@ object Exporter extends ScorexLogging {
         case Some(_) => Seq.empty
         case _ => loadTxData[Transaction](Seq.empty, h, txIterator, (h, tx) => nextTxEntry = Some(h -> tx))
       }
-      val snapshots = if (isLightMode) {
+      val snapshots = if (exportSnapshots) {
         nextSnapshotEntry match {
           case Some((snapshotHeight, txSnapshot)) if snapshotHeight == h =>
             nextSnapshotEntry = None
@@ -267,8 +273,7 @@ object Exporter extends ScorexLogging {
   private[this] final case class ExporterOptions(
       configFileName: Option[File] = None,
       blocksOutputFileNamePrefix: String = "blockchain",
-      snapshotsFileNamePrefix: String = "snapshots",
-      exportSnapshots: Boolean = false,
+      snapshotsFileNamePrefix: Option[String] = None,
       exportHeight: Option[Int] = None,
       format: String = Formats.Binary
   )
@@ -290,10 +295,7 @@ object Exporter extends ScorexLogging {
           .action((p, c) => c.copy(blocksOutputFileNamePrefix = p)),
         opt[String]('s', "snapshot-output-prefix")
           .text("Snapshots output file name prefix")
-          .action((p, c) => c.copy(snapshotsFileNamePrefix = p)),
-        opt[Unit]('l', "export-snapshots")
-          .text("Export snapshots for light node")
-          .action((_, c) => c.copy(exportSnapshots = true)),
+          .action((p, c) => c.copy(snapshotsFileNamePrefix = Some(p))),
         opt[Int]('h', "height")
           .text("Export to height")
           .action((h, c) => c.copy(exportHeight = Some(h)))
diff --git a/node/src/main/scala/com/wavesplatform/Importer.scala b/node/src/main/scala/com/wavesplatform/Importer.scala
index f254c24ccd..818191ef53 100644
--- a/node/src/main/scala/com/wavesplatform/Importer.scala
+++ b/node/src/main/scala/com/wavesplatform/Importer.scala
@@ -53,7 +53,7 @@ object Importer extends ScorexLogging {
   final case class ImportOptions(
       configFile: Option[File] = None,
       blockchainFile: String = "blockchain",
-      snapshotsFile: String = "snapshots",
+      snapshotsFile: Option[String] = None,
       importHeight: Int = Int.MaxValue,
       format: String = Formats.Binary,
       verify: Boolean = true,
@@ -79,7 +79,7 @@ object Importer extends ScorexLogging {
           .action((f, c) => c.copy(blockchainFile = f)),
         opt[String]('s', "snapshots-file")
           .text("Snapshots data file name")
-          .action((f, c) => c.copy(snapshotsFile = f)),
+          .action((f, c) => c.copy(snapshotsFile = Some(f))),
         opt[Int]('h', "height")
           .text("Import to height")
           .action((h, c) => c.copy(importHeight = h))
@@ -357,10 +357,10 @@ object Importer extends ScorexLogging {
     val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, rdb, actorSystem)
     checkGenesis(settings, blockchainUpdater, Miner.Disabled)
 
-    val (blocksFileOffset, snapshotsFileOffset) =
+    val blocksFileOffset =
       importOptions.format match {
         case Formats.Binary =>
-          var blocksOffset = 0L
+          var blocksOffset = 0L // Long on purpose: an Int running sum of block sizes wraps past 2 GiB
           rdb.db.iterateOver(KeyTags.BlockInfoAtHeight) { e =>
             e.getKey match {
               case Array(_, _, 0, 0, 0, 1) => // Skip genesis
@@ -369,22 +369,23 @@ object Importer extends ScorexLogging {
                 blocksOffset += meta.size + 4
             }
           }
-
-          var totalSize = 0L
-          rdb.db.iterateOver(KeyTags.NthTransactionStateSnapshotAtHeight) { e =>
-            totalSize += (e.getValue.length + 4)
-          }
-
-          val snapshotsOffset = totalSize
-
-          blocksOffset -> snapshotsOffset
-        case _ => 0L -> 0L
+          blocksOffset
+        case _ =>
+          0L
       }
     val blocksInputStream = new BufferedInputStream(initFileStream(importOptions.blockchainFile, blocksFileOffset), 2 * 1024 * 1024)
     val snapshotsInputStream =
-      if (settings.enableLightMode)
-        Some(new BufferedInputStream(initFileStream(importOptions.snapshotsFile, snapshotsFileOffset), 20 * 1024 * 1024))
-      else None
+      importOptions.snapshotsFile
+        .map { file =>
+          val inputStream = new BufferedInputStream(initFileStream(file, 0), 20 * 1024 * 1024)
+          val sizeBytes = new Array[Byte](Ints.BYTES)
+          (2 to blockchainUpdater.height).foreach { _ =>
+            ByteStreams.readFully(inputStream, sizeBytes) // fail fast on a truncated size prefix
+            val snapshotsSize = Ints.fromByteArray(sizeBytes)
+            ByteStreams.skipFully(inputStream, snapshotsSize)
+          }
+          inputStream
+        }
 
     sys.addShutdownHook {
       quit = true
diff --git a/node/src/main/scala/com/wavesplatform/database/RDB.scala b/node/src/main/scala/com/wavesplatform/database/RDB.scala
index f2c1ba1412..9f7501e63a 100644
--- a/node/src/main/scala/com/wavesplatform/database/RDB.scala
+++ b/node/src/main/scala/com/wavesplatform/database/RDB.scala
@@ -62,14 +62,14 @@ object RDB extends StrictLogging {
             .setCfPaths(Seq(new DbPath(new File(dbDir, "tx-meta").toPath, 0L)).asJava)
         ),
         new ColumnFamilyDescriptor(
-          "transactions".utf8Bytes,
+          "tx".utf8Bytes,
           txCfOptions.options
-            .setCfPaths(Seq(new DbPath(new File(dbDir, "transactions").toPath, 0L)).asJava)
+            .setCfPaths(Seq(new DbPath(new File(dbDir, "tx").toPath, 0L)).asJava)
         ),
         new ColumnFamilyDescriptor(
-          "transactions-snapshot".utf8Bytes,
+          "tx-snapshot".utf8Bytes,
           txSnapshotCfOptions.options
-            .setCfPaths(Seq(new DbPath(new File(dbDir, "transactions-snapshot").toPath, 0L)).asJava)
+            .setCfPaths(Seq(new DbPath(new File(dbDir, "tx-snapshot").toPath, 0L)).asJava)
         )
       ).asJava,
       handles