diff --git a/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala b/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala index bfbc56bd26..4fb8afb8d8 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala @@ -135,7 +135,7 @@ object AddressTransactions { types: Set[Transaction.Type] ) extends AbstractIterator[Seq[(TxMeta, Transaction, Option[TxNum])]] { private val seqNr = db.get(Keys.addressTransactionSeqNr(addressId, apiHandle)) - db.withSafePrefixIterator(_.seekForPrev(Keys.addressTransactionHN(addressId, seqNr, apiHandle).keyBytes))() + db.withSafePrefixIterator(_.seekForPrev(Keys.addressTransactionHN(addressId, seqNr, apiHandle).keyBytes))(()) final override def computeNext(): Seq[(TxMeta, Transaction, Option[TxNum])] = db.withSafePrefixIterator { dbIterator => val keysBuffer = new ArrayBuffer[Key[Option[(TxMeta, Transaction)]]]() diff --git a/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala b/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala index 4faaf1b6c1..299d738c27 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala @@ -141,7 +141,7 @@ object CommonAccountsApi { private val length: Int = entriesFromDiff.length - db.withSafePrefixIterator(_.seek(prefix))() + db.withSafePrefixIterator(_.seek(prefix))(()) private var nextIndex = 0 private var nextDbEntry: Option[DataEntry[?]] = None diff --git a/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala b/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala index f9821a7d21..97211b5b57 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala @@ -9,7 +9,7 @@ import com.wavesplatform.state.LeaseDetails private class LeaseByAddressIterator(resource: DBResource, apiHandle: RDB.ApiHandle, addressId: AddressId) extends AbstractIterator[Seq[(ByteStr, LeaseDetails)]] { private val seqNr = resource.get(Keys.addressLeaseSeqNr(addressId, apiHandle)) - resource.withSafePrefixIterator(_.seekForPrev(Keys.addressLeaseSeq(addressId, seqNr, apiHandle).keyBytes))() + resource.withSafePrefixIterator(_.seekForPrev(Keys.addressLeaseSeq(addressId, seqNr, apiHandle).keyBytes))(()) final override def computeNext(): Seq[(ByteStr, LeaseDetails)] = resource.withSafePrefixIterator { iterator => diff --git a/node/src/main/scala/com/wavesplatform/database/DBResource.scala b/node/src/main/scala/com/wavesplatform/database/DBResource.scala index 7577b8047c..0d5afe026a 100644 --- a/node/src/main/scala/com/wavesplatform/database/DBResource.scala +++ b/node/src/main/scala/com/wavesplatform/database/DBResource.scala @@ -5,56 +5,53 @@ import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB, RocksIterator} import scala.collection.View import scala.collection.mutable.ArrayBuffer -trait DBResource extends AutoCloseable { - def get[V](key: Key[V]): V - def get(key: Array[Byte]): Array[Byte] - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A] - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] - def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A] - def prefixIterator: RocksIterator // Should have a single instance - def fullIterator: RocksIterator // Should have a single instance - def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A = ()): A - def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A = ()): A -} - -object DBResource { - def apply(db: RocksDB, iteratorCfHandle: Option[ColumnFamilyHandle] = None): DBResource = new DBResource { - private[this] val snapshot = db.getSnapshot - // checksum may be verification is **very** expensive, so it's explicitly disabled - private[this] val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false) +class DBResource(db: RocksDB, iteratorCfHandle: Option[ColumnFamilyHandle] = None) extends AutoCloseable { + private[this] val snapshot = db.getSnapshot + // checksum verification is **very** expensive, so it's explicitly disabled + private[this] val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false) - override def get[V](key: Key[V]): V = key.parse(db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), readOptions, key.keyBytes)) + def get[V](key: Key[V]): V = key.parse(db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), readOptions, key.keyBytes)) - override def get(key: Array[Byte]): Array[Byte] = db.get(readOptions, key) + def get(key: Array[Byte]): Array[Byte] = db.get(readOptions, key) - override def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A] = - db.multiGetFlat(readOptions, keys, valBufferSizes) + def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A] = + db.multiGetFlat(readOptions, keys, valBufferSizes) - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A] = - db.multiGet(readOptions, keys, valBufferSizes) + def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A] = + db.multiGet(readOptions, keys, valBufferSizes) - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] = - db.multiGet(readOptions, keys, valBufferSize) + def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] = + db.multiGet(readOptions, keys, valBufferSize) - override lazy val prefixIterator: RocksIterator = - db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true)) + @volatile private var prefixIteratorWasOpened = false + /** + * Finds the exact key for iter.seek(key) if key.length < 10 and becomes invalid on iter.next(). + * Works as intended if prefix(key).length >= 10. + * @see RDB.newColumnFamilyOptions + */ + lazy val prefixIterator: RocksIterator = { + prefixIteratorWasOpened = true + db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true)) + } - override lazy val fullIterator: RocksIterator = - db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(true)) + @volatile private var fullIteratorWasOpened = false + lazy val fullIterator: RocksIterator = { + fullIteratorWasOpened = true + db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(true)) + } - override def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = prefixIterator.synchronized { - if (prefixIterator.isOwningHandle) ifNotClosed(prefixIterator) else ifClosed - } + def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = prefixIterator.synchronized { + if (prefixIterator.isOwningHandle) ifNotClosed(prefixIterator) else ifClosed + } - override def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = fullIterator.synchronized { - if (fullIterator.isOwningHandle) ifNotClosed(fullIterator) else ifClosed - } + def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = fullIterator.synchronized { + if (fullIterator.isOwningHandle) ifNotClosed(fullIterator) else ifClosed + } - override def close(): Unit = { - prefixIterator.synchronized(prefixIterator.close()) - fullIterator.synchronized(fullIterator.close()) - db.releaseSnapshot(snapshot) - readOptions.close() - } + override def close(): Unit = { + if (prefixIteratorWasOpened) prefixIterator.synchronized(prefixIterator.close()) + if (fullIteratorWasOpened) fullIterator.synchronized(fullIterator.close()) + db.releaseSnapshot(snapshot) + readOptions.close() } } diff --git a/node/src/main/scala/com/wavesplatform/database/package.scala b/node/src/main/scala/com/wavesplatform/database/package.scala index 36d6ce8631..59a768b73f 100644 --- a/node/src/main/scala/com/wavesplatform/database/package.scala +++ b/node/src/main/scala/com/wavesplatform/database/package.scala @@ -540,19 +540,19 @@ package object database { } def resourceObservable: Observable[DBResource] = - Observable.resource(Task(DBResource(db, None)))(r => Task(r.close())) + Observable.resource(Task(new DBResource(db, None)))(r => Task(r.close())) def resourceObservable(iteratorCfHandle: ColumnFamilyHandle): Observable[DBResource] = - Observable.resource(Task(DBResource(db, Some(iteratorCfHandle))))(r => Task(r.close())) + Observable.resource(Task(new DBResource(db, Some(iteratorCfHandle))))(r => Task(r.close())) def withResource[A](f: DBResource => A): A = { - val resource = DBResource(db) + val resource = new DBResource(db) try f(resource) finally resource.close() } def withResource[A](iteratorCfHandle: ColumnFamilyHandle)(f: DBResource => A): A = { - val resource = DBResource(db, Some(iteratorCfHandle)) + val resource = new DBResource(db, Some(iteratorCfHandle)) try f(resource) finally resource.close() } diff --git a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/DBResource.scala b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/DBResource.scala deleted file mode 100644 index a020084d2d..0000000000 --- a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/DBResource.scala +++ /dev/null @@ -1,70 +0,0 @@ -package com.wavesplatform.database.rocksdb - -import org.rocksdb.{ReadOptions, RocksDB, RocksIterator} - -import scala.collection.View -import scala.collection.mutable.ArrayBuffer - -trait DBResource extends AutoCloseable { - def get[V](key: Key[V]): V - def get(key: Array[Byte]): Array[Byte] - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A] - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] - def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A] - def prefixIterator: RocksIterator // Should have a single instance - def fullIterator: RocksIterator - def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A = ()): A - def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A = ()): A -} - -object DBResource { - def apply(db: RocksDB): DBResource = new DBResource { - private[this] val snapshot = db.getSnapshot - private[this] val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false) - - override def get[V](key: Key[V]): V = key.parse(db.get(readOptions, key.keyBytes)) - - override def get(key: Array[Byte]): Array[Byte] = db.get(readOptions, key) - - override def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A] = - db.multiGetFlat(readOptions, keys, valBufferSizes) - - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A] = - db.multiGet(readOptions, keys, valBufferSizes) - - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] = - db.multiGet(readOptions, keys, valBufferSize) - - @volatile private var prefixIteratorWasOpened = false - /** - * Finds the exact key for iter.seek(key) if key.length < 10 and becomes invalid on iter.next(). - * Works as intended if prefix(key).length >= 10. - * @see RDB.newColumnFamilyOptions - */ - override lazy val prefixIterator: RocksIterator = { - prefixIteratorWasOpened = true - db.newIterator(readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true)) - } - - @volatile private var fullIteratorWasOpened = false - override lazy val fullIterator: RocksIterator = { - fullIteratorWasOpened = true - db.newIterator(readOptions.setTotalOrderSeek(true)) - } - - override def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = prefixIterator.synchronized { - if (prefixIterator.isOwningHandle) ifNotClosed(prefixIterator) else ifClosed - } - - override def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = fullIterator.synchronized { - if (fullIterator.isOwningHandle) ifNotClosed(fullIterator) else ifClosed - } - - override def close(): Unit = { - if (prefixIteratorWasOpened) prefixIterator.synchronized(prefixIterator.close()) - if (fullIteratorWasOpened) fullIterator.synchronized(fullIterator.close()) - snapshot.close() - readOptions.close() - } - } -} diff --git a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/package.scala b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/package.scala index b05dbdf68e..7c0f1bdc9e 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/package.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/package.scala @@ -1,10 +1,10 @@ package com.wavesplatform.database import com.google.common.base.Charsets.UTF_8 -import com.google.common.collect.{Interners, Maps} +import com.google.common.collect.Interners import com.google.common.io.ByteStreams.{newDataInput, newDataOutput} import com.google.common.io.{ByteArrayDataInput, ByteArrayDataOutput} -import com.google.common.primitives.{Bytes, Ints, Longs} +import com.google.common.primitives.{Ints, Longs} import com.google.protobuf.ByteString import com.wavesplatform.account.AddressScheme import com.wavesplatform.common.state.ByteStr @@ -18,15 +18,12 @@ import com.wavesplatform.state.* import com.wavesplatform.transaction import com.wavesplatform.transaction.TxPositiveAmount import com.wavesplatform.utils.* -import monix.eval.Task -import monix.reactive.Observable import org.rocksdb.* import sun.nio.ch.Util import java.nio.ByteBuffer import java.util import java.util.Map as JMap -import scala.annotation.tailrec import scala.collection.View import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters.* @@ -152,19 +149,18 @@ package object rocksdb { Right( LeaseDetails( LeaseStaticInfo( - d.senderPublicKey.toPublicKey, - PBRecipients.toAddress(d.recipient.get, AddressScheme.current.chainId).explicitGet(), - TxPositiveAmount.unsafeFrom(d.amount), + d.senderPublicKey.toPublicKey, + PBRecipients.toAddress(d.recipient.get, AddressScheme.current.chainId).explicitGet(), + TxPositiveAmount.unsafeFrom(d.amount), d.sourceId.toByteStr, d.height ), d.cancelReason match { - case pb.LeaseDetails.CancelReason.Empty => LeaseDetails.Status.Active + case pb.LeaseDetails.CancelReason.Empty => LeaseDetails.Status.Active case pb.LeaseDetails.CancelReason.Expired(pb.LeaseDetails.Expired(height, _)) => LeaseDetails.Status.Expired(height) case pb.LeaseDetails.CancelReason.Cancelled(pb.LeaseDetails.Cancelled(height, transactionId, _)) => LeaseDetails.Status.Cancelled(height, Some(transactionId.toByteStr).filter(!_.isEmpty)) } - ) ) } @@ -465,35 +461,6 @@ package object rocksdb { def get[A](key: Key[A], readOptions: ReadOptions): A = key.parse(db.get(readOptions, key.keyBytes)) def has(key: Key[?]): Boolean = db.get(key.keyBytes) != null - def iterateOver(tag: KeyTags.KeyTag)(f: DBEntry => Unit): Unit = iterateOver(tag.prefixBytes)(f) - - def iterateOver(prefix: Array[Byte], seekPrefix: Array[Byte] = Array.emptyByteArray, cfh: Option[ColumnFamilyHandle] = None)( - f: DBEntry => Unit - ): Unit = { - @tailrec - def loop(iter: RocksIterator): Unit = { - if (iter.isValid && iter.key().startsWith(prefix)) { - f(Maps.immutableEntry(iter.key(), iter.value())) - iter.next() - loop(iter) - } else () - } - - val iterator = db.newIterator(cfh.getOrElse(db.getDefaultColumnFamily), new ReadOptions().setTotalOrderSeek(true)) - try { - iterator.seek(Bytes.concat(prefix, seekPrefix)) - loop(iterator) - } finally iterator.close() - } - - def resourceObservable: Observable[DBResource] = Observable.resource(Task(DBResource(db)))(r => Task(r.close())) - - def withResource[A](f: DBResource => A): A = { - val resource = DBResource(db) - try f(resource) - finally resource.close() - } - private def getKeyBuffersFromKeys(keys: collection.Seq[Key[?]]): util.List[ByteBuffer] = keys.map { k => val arr = k.keyBytes