Skip to content

Commit

Permalink
removed duplicate DBResource
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot committed Oct 1, 2024
1 parent fc56992 commit 0a0268b
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ object AddressTransactions {
types: Set[Transaction.Type]
) extends AbstractIterator[Seq[(TxMeta, Transaction, Option[TxNum])]] {
private val seqNr = db.get(Keys.addressTransactionSeqNr(addressId, apiHandle))
db.withSafePrefixIterator(_.seekForPrev(Keys.addressTransactionHN(addressId, seqNr, apiHandle).keyBytes))()
db.withSafePrefixIterator(_.seekForPrev(Keys.addressTransactionHN(addressId, seqNr, apiHandle).keyBytes))(())

final override def computeNext(): Seq[(TxMeta, Transaction, Option[TxNum])] = db.withSafePrefixIterator { dbIterator =>
val keysBuffer = new ArrayBuffer[Key[Option[(TxMeta, Transaction)]]]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ object CommonAccountsApi {

private val length: Int = entriesFromDiff.length

db.withSafePrefixIterator(_.seek(prefix))()
db.withSafePrefixIterator(_.seek(prefix))(())

private var nextIndex = 0
private var nextDbEntry: Option[DataEntry[?]] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.wavesplatform.state.LeaseDetails
private class LeaseByAddressIterator(resource: DBResource, apiHandle: RDB.ApiHandle, addressId: AddressId)
extends AbstractIterator[Seq[(ByteStr, LeaseDetails)]] {
private val seqNr = resource.get(Keys.addressLeaseSeqNr(addressId, apiHandle))
resource.withSafePrefixIterator(_.seekForPrev(Keys.addressLeaseSeq(addressId, seqNr, apiHandle).keyBytes))()
resource.withSafePrefixIterator(_.seekForPrev(Keys.addressLeaseSeq(addressId, seqNr, apiHandle).keyBytes))(())

final override def computeNext(): Seq[(ByteStr, LeaseDetails)] =
resource.withSafePrefixIterator { iterator =>
Expand Down
79 changes: 38 additions & 41 deletions node/src/main/scala/com/wavesplatform/database/DBResource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,53 @@ import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB, RocksIterator}
import scala.collection.View
import scala.collection.mutable.ArrayBuffer

trait DBResource extends AutoCloseable {
def get[V](key: Key[V]): V
def get(key: Array[Byte]): Array[Byte]
def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A]
def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A]
def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A]
def prefixIterator: RocksIterator // Should have a single instance
def fullIterator: RocksIterator // Should have a single instance
def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A = ()): A
def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A = ()): A
}

object DBResource {
def apply(db: RocksDB, iteratorCfHandle: Option[ColumnFamilyHandle] = None): DBResource = new DBResource {
private[this] val snapshot = db.getSnapshot
// checksum may be verification is **very** expensive, so it's explicitly disabled
private[this] val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false)
class DBResource(db: RocksDB, iteratorCfHandle: Option[ColumnFamilyHandle] = None) extends AutoCloseable {
private[this] val snapshot = db.getSnapshot
// checksum verification is **very** expensive, so it's explicitly disabled
private[this] val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false)

override def get[V](key: Key[V]): V = key.parse(db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), readOptions, key.keyBytes))
def get[V](key: Key[V]): V = key.parse(db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), readOptions, key.keyBytes))

override def get(key: Array[Byte]): Array[Byte] = db.get(readOptions, key)
def get(key: Array[Byte]): Array[Byte] = db.get(readOptions, key)

override def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A] =
db.multiGetFlat(readOptions, keys, valBufferSizes)
def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A] =
db.multiGetFlat(readOptions, keys, valBufferSizes)

def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A] =
db.multiGet(readOptions, keys, valBufferSizes)
def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A] =
db.multiGet(readOptions, keys, valBufferSizes)

def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] =
db.multiGet(readOptions, keys, valBufferSize)
def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] =
db.multiGet(readOptions, keys, valBufferSize)

override lazy val prefixIterator: RocksIterator =
db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true))
@volatile private var prefixIteratorWasOpened = false
/**
* Finds the exact key for iter.seek(key) if key.length < 10 and becomes invalid on iter.next().
* Works as intended if prefix(key).length >= 10.
* @see RDB.newColumnFamilyOptions
*/
lazy val prefixIterator: RocksIterator = {
prefixIteratorWasOpened = true
db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true))
}

override lazy val fullIterator: RocksIterator =
db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(true))
@volatile private var fullIteratorWasOpened = false
lazy val fullIterator: RocksIterator = {
fullIteratorWasOpened = true
db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(true))
}

override def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = prefixIterator.synchronized {
if (prefixIterator.isOwningHandle) ifNotClosed(prefixIterator) else ifClosed
}
def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = prefixIterator.synchronized {
if (prefixIterator.isOwningHandle) ifNotClosed(prefixIterator) else ifClosed
}

override def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = fullIterator.synchronized {
if (fullIterator.isOwningHandle) ifNotClosed(fullIterator) else ifClosed
}
def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = fullIterator.synchronized {
if (fullIterator.isOwningHandle) ifNotClosed(fullIterator) else ifClosed
}

override def close(): Unit = {
prefixIterator.synchronized(prefixIterator.close())
fullIterator.synchronized(fullIterator.close())
db.releaseSnapshot(snapshot)
readOptions.close()
}
override def close(): Unit = {
if (prefixIteratorWasOpened) prefixIterator.synchronized(prefixIterator.close())
if (fullIteratorWasOpened) fullIterator.synchronized(fullIterator.close())
db.releaseSnapshot(snapshot)
readOptions.close()
}
}
8 changes: 4 additions & 4 deletions node/src/main/scala/com/wavesplatform/database/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -540,19 +540,19 @@ package object database {
}

def resourceObservable: Observable[DBResource] =
Observable.resource(Task(DBResource(db, None)))(r => Task(r.close()))
Observable.resource(Task(new DBResource(db, None)))(r => Task(r.close()))

def resourceObservable(iteratorCfHandle: ColumnFamilyHandle): Observable[DBResource] =
Observable.resource(Task(DBResource(db, Some(iteratorCfHandle))))(r => Task(r.close()))
Observable.resource(Task(new DBResource(db, Some(iteratorCfHandle))))(r => Task(r.close()))

def withResource[A](f: DBResource => A): A = {
val resource = DBResource(db)
val resource = new DBResource(db)
try f(resource)
finally resource.close()
}

def withResource[A](iteratorCfHandle: ColumnFamilyHandle)(f: DBResource => A): A = {
val resource = DBResource(db, Some(iteratorCfHandle))
val resource = new DBResource(db, Some(iteratorCfHandle))
try f(resource)
finally resource.close()
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.wavesplatform.database

import com.google.common.base.Charsets.UTF_8
import com.google.common.collect.{Interners, Maps}
import com.google.common.collect.Interners
import com.google.common.io.ByteStreams.{newDataInput, newDataOutput}
import com.google.common.io.{ByteArrayDataInput, ByteArrayDataOutput}
import com.google.common.primitives.{Bytes, Ints, Longs}
import com.google.common.primitives.{Ints, Longs}
import com.google.protobuf.ByteString
import com.wavesplatform.account.AddressScheme
import com.wavesplatform.common.state.ByteStr
Expand All @@ -18,15 +18,12 @@ import com.wavesplatform.state.*
import com.wavesplatform.transaction
import com.wavesplatform.transaction.TxPositiveAmount
import com.wavesplatform.utils.*
import monix.eval.Task
import monix.reactive.Observable
import org.rocksdb.*
import sun.nio.ch.Util

import java.nio.ByteBuffer
import java.util
import java.util.Map as JMap
import scala.annotation.tailrec
import scala.collection.View
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters.*
Expand Down Expand Up @@ -152,19 +149,18 @@ package object rocksdb {
Right(
LeaseDetails(
LeaseStaticInfo(
d.senderPublicKey.toPublicKey,
PBRecipients.toAddress(d.recipient.get, AddressScheme.current.chainId).explicitGet(),
TxPositiveAmount.unsafeFrom(d.amount),
d.senderPublicKey.toPublicKey,
PBRecipients.toAddress(d.recipient.get, AddressScheme.current.chainId).explicitGet(),
TxPositiveAmount.unsafeFrom(d.amount),
d.sourceId.toByteStr,
d.height
),
d.cancelReason match {
case pb.LeaseDetails.CancelReason.Empty => LeaseDetails.Status.Active
case pb.LeaseDetails.CancelReason.Empty => LeaseDetails.Status.Active
case pb.LeaseDetails.CancelReason.Expired(pb.LeaseDetails.Expired(height, _)) => LeaseDetails.Status.Expired(height)
case pb.LeaseDetails.CancelReason.Cancelled(pb.LeaseDetails.Cancelled(height, transactionId, _)) =>
LeaseDetails.Status.Cancelled(height, Some(transactionId.toByteStr).filter(!_.isEmpty))
}

)
)
}
Expand Down Expand Up @@ -465,35 +461,6 @@ package object rocksdb {
def get[A](key: Key[A], readOptions: ReadOptions): A = key.parse(db.get(readOptions, key.keyBytes))
def has(key: Key[?]): Boolean = db.get(key.keyBytes) != null

def iterateOver(tag: KeyTags.KeyTag)(f: DBEntry => Unit): Unit = iterateOver(tag.prefixBytes)(f)

def iterateOver(prefix: Array[Byte], seekPrefix: Array[Byte] = Array.emptyByteArray, cfh: Option[ColumnFamilyHandle] = None)(
f: DBEntry => Unit
): Unit = {
@tailrec
def loop(iter: RocksIterator): Unit = {
if (iter.isValid && iter.key().startsWith(prefix)) {
f(Maps.immutableEntry(iter.key(), iter.value()))
iter.next()
loop(iter)
} else ()
}

val iterator = db.newIterator(cfh.getOrElse(db.getDefaultColumnFamily), new ReadOptions().setTotalOrderSeek(true))
try {
iterator.seek(Bytes.concat(prefix, seekPrefix))
loop(iterator)
} finally iterator.close()
}

def resourceObservable: Observable[DBResource] = Observable.resource(Task(DBResource(db)))(r => Task(r.close()))

def withResource[A](f: DBResource => A): A = {
val resource = DBResource(db)
try f(resource)
finally resource.close()
}

private def getKeyBuffersFromKeys(keys: collection.Seq[Key[?]]): util.List[ByteBuffer] =
keys.map { k =>
val arr = k.keyBytes
Expand Down

0 comments on commit 0a0268b

Please sign in to comment.