Skip to content

Commit

Permalink
Merge branch 'version-1.5.x' into NODE-2622-delete-old-data
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot committed Jan 10, 2024
2 parents 30cfdc7 + 3533edc commit 2ebe1f9
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 32 deletions.
25 changes: 14 additions & 11 deletions node/src/main/scala/com/wavesplatform/Exporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object Exporter extends ScorexLogging {
// noinspection ScalaStyle
def main(args: Array[String]): Unit = {
OParser.parse(commandParser, args, ExporterOptions()).foreach {
case ExporterOptions(configFile, blocksOutputFileNamePrefix, snapshotsOutputFileNamePrefix, exportSnapshots, exportHeight, format) =>
case ExporterOptions(configFile, blocksOutputFileNamePrefix, snapshotsOutputFileNamePrefix, exportHeight, format) =>
val settings = Application.loadApplicationConfig(configFile)

Using.resources(
Expand All @@ -53,8 +53,9 @@ object Exporter extends ScorexLogging {
val blocksOutputFilename = s"$blocksOutputFileNamePrefix-$height"
log.info(s"Blocks output file: $blocksOutputFilename")

val exportSnapshots = snapshotsOutputFileNamePrefix.isDefined
val snapshotsOutputFilename = if (exportSnapshots) {
val filename = s"$snapshotsOutputFileNamePrefix-$height"
val filename = s"${snapshotsOutputFileNamePrefix.get}-$height"
log.info(s"Snapshots output file: $filename")
Some(filename)
} else None
Expand All @@ -75,7 +76,12 @@ object Exporter extends ScorexLogging {
var exportedSnapshotsBytes = 0L
val start = System.currentTimeMillis()

new BlockSnapshotIterator(rdb, height, settings.enableLightMode).asScala.foreach { case (h, block, txSnapshots) =>
new BlockSnapshotIterator(rdb, height, exportSnapshots).asScala.foreach { case (h, block, txSnapshots) =>
val txCount = block.transactionData.length
if (exportSnapshots && txCount != txSnapshots.length)
throw new RuntimeException(
s"${txSnapshots.length} snapshot(s) don't match $txCount transaction(s) on height $h, data is corrupted"
)
exportedBlocksBytes += IO.exportBlock(blocksStream, Some(block), format == Formats.Binary)
snapshotsStream.foreach { output =>
exportedSnapshotsBytes += IO.exportBlockTxSnapshots(output, txSnapshots)
Expand All @@ -101,7 +107,8 @@ object Exporter extends ScorexLogging {
}
}

private class BlockSnapshotIterator(rdb: RDB, targetHeight: Int, isLightMode: Boolean) extends AbstractIterator[(Int, Block, Seq[Array[Byte]])] {
private class BlockSnapshotIterator(rdb: RDB, targetHeight: Int, exportSnapshots: Boolean)
extends AbstractIterator[(Int, Block, Seq[Array[Byte]])] {
var nextTxEntry: Option[(Int, Transaction)] = None
var nextSnapshotEntry: Option[(Int, Array[Byte])] = None

Expand Down Expand Up @@ -159,7 +166,7 @@ object Exporter extends ScorexLogging {
case Some(_) => Seq.empty
case _ => loadTxData[Transaction](Seq.empty, h, txIterator, (h, tx) => nextTxEntry = Some(h -> tx))
}
val snapshots = if (isLightMode) {
val snapshots = if (exportSnapshots) {
nextSnapshotEntry match {
case Some((snapshotHeight, txSnapshot)) if snapshotHeight == h =>
nextSnapshotEntry = None
Expand Down Expand Up @@ -273,8 +280,7 @@ object Exporter extends ScorexLogging {
private[this] final case class ExporterOptions(
configFileName: Option[File] = None,
blocksOutputFileNamePrefix: String = "blockchain",
snapshotsFileNamePrefix: String = "snapshots",
exportSnapshots: Boolean = false,
snapshotsFileNamePrefix: Option[String] = None,
exportHeight: Option[Int] = None,
format: String = Formats.Binary
)
Expand All @@ -296,10 +302,7 @@ object Exporter extends ScorexLogging {
.action((p, c) => c.copy(blocksOutputFileNamePrefix = p)),
opt[String]('s', "snapshot-output-prefix")
.text("Snapshots output file name prefix")
.action((p, c) => c.copy(snapshotsFileNamePrefix = p)),
opt[Unit]('l', "export-snapshots")
.text("Export snapshots for light node")
.action((_, c) => c.copy(exportSnapshots = true)),
.action((p, c) => c.copy(snapshotsFileNamePrefix = Some(p))),
opt[Int]('h', "height")
.text("Export to height")
.action((h, c) => c.copy(exportHeight = Some(h)))
Expand Down
35 changes: 18 additions & 17 deletions node/src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object Importer extends ScorexLogging {
final case class ImportOptions(
configFile: Option[File] = None,
blockchainFile: String = "blockchain",
snapshotsFile: String = "snapshots",
snapshotsFile: Option[String] = None,
importHeight: Int = Int.MaxValue,
format: String = Formats.Binary,
verify: Boolean = true,
Expand All @@ -79,7 +79,7 @@ object Importer extends ScorexLogging {
.action((f, c) => c.copy(blockchainFile = f)),
opt[String]('s', "snapshots-file")
.text("Snapshots data file name")
.action((f, c) => c.copy(snapshotsFile = f)),
.action((f, c) => c.copy(snapshotsFile = Some(f))),
opt[Int]('h', "height")
.text("Import to height")
.action((h, c) => c.copy(importHeight = h))
Expand Down Expand Up @@ -357,10 +357,10 @@ object Importer extends ScorexLogging {
val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, rdb, actorSystem)
checkGenesis(settings, blockchainUpdater, Miner.Disabled)

val (blocksFileOffset, snapshotsFileOffset) =
val blocksFileOffset =
importOptions.format match {
case Formats.Binary =>
var blocksOffset = 0L
var blocksOffset = 0
rdb.db.iterateOver(KeyTags.BlockInfoAtHeight) { e =>
e.getKey match {
case Array(_, _, 0, 0, 0, 1) => // Skip genesis
Expand All @@ -369,22 +369,23 @@ object Importer extends ScorexLogging {
blocksOffset += meta.size + 4
}
}

var totalSize = 0L
rdb.db.iterateOver(KeyTags.NthTransactionStateSnapshotAtHeight) { e =>
totalSize += (e.getValue.length + 4)
}

val snapshotsOffset = totalSize

blocksOffset -> snapshotsOffset
case _ => 0L -> 0L
blocksOffset
case _ =>
0
}
val blocksInputStream = new BufferedInputStream(initFileStream(importOptions.blockchainFile, blocksFileOffset), 2 * 1024 * 1024)
val snapshotsInputStream =
if (settings.enableLightMode)
Some(new BufferedInputStream(initFileStream(importOptions.snapshotsFile, snapshotsFileOffset), 20 * 1024 * 1024))
else None
importOptions.snapshotsFile
.map { file =>
val inputStream = new BufferedInputStream(initFileStream(file, 0), 20 * 1024 * 1024)
val sizeBytes = new Array[Byte](Ints.BYTES)
(2 to blockchainUpdater.height).foreach { _ =>
ByteStreams.read(inputStream, sizeBytes, 0, 4)
val snapshotsSize = Ints.fromByteArray(sizeBytes)
ByteStreams.skipFully(inputStream, snapshotsSize)
}
inputStream
}

sys.addShutdownHook {
quit = true
Expand Down
8 changes: 4 additions & 4 deletions node/src/main/scala/com/wavesplatform/database/RDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ object RDB extends StrictLogging {
.setCfPaths(Seq(new DbPath(new File(dbDir, "tx-meta").toPath, 0L)).asJava)
),
new ColumnFamilyDescriptor(
"transactions".utf8Bytes,
"tx".utf8Bytes,
txCfOptions.options
.setCfPaths(Seq(new DbPath(new File(dbDir, "transactions").toPath, 0L)).asJava)
.setCfPaths(Seq(new DbPath(new File(dbDir, "tx").toPath, 0L)).asJava)
),
new ColumnFamilyDescriptor(
"transactions-snapshot".utf8Bytes,
"tx-snapshot".utf8Bytes,
txSnapshotCfOptions.options
.setCfPaths(Seq(new DbPath(new File(dbDir, "transactions-snapshot").toPath, 0L)).asJava)
.setCfPaths(Seq(new DbPath(new File(dbDir, "tx-snapshot").toPath, 0L)).asJava)
)
).asJava,
handles
Expand Down

0 comments on commit 2ebe1f9

Please sign in to comment.