Skip to content

Commit

Permalink
#989 Added first working in simul main of Virtualized table.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisjstevo committed Jan 3, 2024
1 parent 18bf20f commit 857a681
Show file tree
Hide file tree
Showing 52 changed files with 619 additions and 152 deletions.
6 changes: 6 additions & 0 deletions example/main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
<version>0.9.36-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.finos.vuu</groupId>
<artifactId>virtualized-table</artifactId>
<version>0.9.36-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.finos.vuu</groupId>
<artifactId>price</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions example/main/src/main/scala/org/finos/vuu/SimulMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import org.finos.vuu.core.module.price.PriceModule
import org.finos.vuu.core.module.simul.SimulationModule
import org.finos.vuu.core.module.typeahead.TypeAheadModule
import org.finos.vuu.core.module.vui.VuiStateModule
import org.finos.vuu.example.virtualtable.module.VirtualTableModule
import org.finos.vuu.net.auth.AlwaysHappyAuthenticator
import org.finos.vuu.net.http.VuuHttp2ServerOptions
import org.finos.vuu.net.{AlwaysHappyLoginValidator, Authenticator, LoggedInTokenValidator}
import org.finos.vuu.order.oms.OmsApi
import org.finos.vuu.plugin.virtualized.VirtualizedTablePlugin
import org.finos.vuu.state.MemoryBackedVuiStateStore

/*
Expand Down Expand Up @@ -86,6 +88,8 @@ object SimulMain extends App with StrictLogging {
.withModule(EditableModule())
.withModule(PermissionModule())
.withModule(BasketModule(omsApi))
.withModule(VirtualTableModule())
.withPlugin(VirtualizedTablePlugin)

val vuuServer = new VuuServer(config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ case class BigDataOrder(orderId: Long, quantity: Int, price: Long, side: String,
class FakeBigDataCache {

private val random = new Random()
private final val DATASET_SIZE = 1_000_000 //this would be dynamically loaded from the data source in a real example
private final val DATASET_SIZE = 1_000_000_000 //this would be dynamically loaded from the data source in a real example

def loadOrdersInRange(from: Int, to: Int): (Int, List[(Int, BigDataOrder)]) = {

//lets fake some processing time, 30 millis should do....
Thread.sleep(30)
// Thread.sleep(30)

val bigOrdersWithIndex = (from until to).map( i => {
random.setSeed(i)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object VirtualTableModule extends DefaultModule{
keyField = "orderId",
Columns.fromNames("orderId".string(), "quantity".int(), "price".long(), "side".string(), "trader".string())
),
(table, vs) => new ReallyBigVirtualizedDataProvider(table),
(table, vs) => new ReallyBigVirtualizedDataProvider(),
(table, _, _, _) => ViewPortDef(
columns = table.getTableDef.columns,
service = new VirtualService()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package org.finos.vuu.example.virtualtable.provider

import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.table.{DataTable, RowWithData}
import org.finos.vuu.example.virtualtable.bigdatacache.FakeBigDataCache
import org.finos.vuu.plugin.virtualized.table.{VirtualizedRange, VirtualizedSessionTable}
import org.finos.vuu.plugin.virtualized.table.{VirtualizedRange, VirtualizedSessionTable, VirtualizedViewPortKeys}
import org.finos.vuu.provider.VirtualizedProvider
import org.finos.vuu.viewport.ViewPort

class ReallyBigVirtualizedDataProvider(val table: DataTable)(implicit clock: Clock) extends VirtualizedProvider {
class ReallyBigVirtualizedDataProvider(implicit clock: Clock) extends VirtualizedProvider with StrictLogging {

final val cache = new FakeBigDataCache
final val internalTable = table.asInstanceOf[VirtualizedSessionTable]

override def runOnce(viewPort: ViewPort): Unit = {

logger.info("[ReallyBigVirtualizedDataProvider] Starting runOnce")

//if this were a real virtualized provider
//I would delegate these sorts and filters down into
//the provider itself, in this example, I'm going to cheat and ignore them :-)
Expand All @@ -24,32 +26,46 @@ class ReallyBigVirtualizedDataProvider(val table: DataTable)(implicit clock: Clo

//typically we would want to get a bigger data set than the viewport is specifically looking at
//as is probably more efficient, in this case we'll get just what they are asking for....
val startIndex = range.from
val endIndex = range.to
val startIndex = Math.min((range.from - 5000), 0)
val endIndex = range.to + 5000

val (totalSize, bigOrders) = cache.loadOrdersInRange(startIndex, endIndex)
logger.info("[ReallyBigVirtualizedDataProvider] Loading orders from Big Data Cache")

internalTable.setRange(VirtualizedRange(startIndex, endIndex))
internalTable.setSize(totalSize)
val (totalSize, bigOrders) = cache.loadOrdersInRange(startIndex, endIndex)

bigOrders.foreach({case(index, order) => {
val rowWithData = RowWithData(order.orderId.toString,
Map("orderId" -> order.orderId.toString, "quantity" -> order.quantity, "price" -> order.price,
"side" -> order.side, "trader" -> order.trader)
)
internalTable.processUpdateForIndex(index, order.orderId.toString, rowWithData, clock.now())
}})
viewPort.table.asTable match {
case tbl: VirtualizedSessionTable =>
logger.info("[ReallyBigVirtualizedDataProvider] Set Range")
tbl.setRange(VirtualizedRange(startIndex, endIndex))
logger.info("[ReallyBigVirtualizedDataProvider] Set Size")
tbl.setSize(totalSize)
logger.info("[ReallyBigVirtualizedDataProvider] Adding rows ")
bigOrders.foreach({case(index, order) => {
val rowWithData = RowWithData(order.orderId.toString,
Map("orderId" -> order.orderId.toString, "quantity" -> order.quantity, "price" -> order.price,
"side" -> order.side, "trader" -> order.trader)
)
tbl.processUpdateForIndex(index, order.orderId.toString, rowWithData, clock.now())
}})

logger.info("[ReallyBigVirtualizedDataProvider] Getting Primary Keys")
val tableKeys = tbl.primaryKeys

logger.info("[ReallyBigVirtualizedDataProvider] Setting Primary Keys")
viewPort.setKeys(new VirtualizedViewPortKeys(tableKeys))
}
logger.info("[ReallyBigVirtualizedDataProvider] Complete runOnce")
}

override def subscribe(key: String): Unit = ???
override def subscribe(key: String): Unit = {}

override def doStart(): Unit = ???
override def doStart(): Unit = {}

override def doStop(): Unit = ???
override def doStop(): Unit = {}

override def doInitialize(): Unit = ???
override def doInitialize(): Unit = {}

override def doDestroy(): Unit = ???
override def doDestroy(): Unit = {}

override val lifecycleId: String = ???
override val lifecycleId: String = "org.finos.vuu.example.virtualtable.provider.ReallyBigVirtualizedDataProvider"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.finos.vuu.example.virtualized.table

import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl}
import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.time.{Clock, TestFriendlyClock}
import org.finos.vuu.core.module.TableDefContainer
import org.finos.vuu.example.virtualtable.module.VirtualTableModule
import org.finos.vuu.plugin.virtualized.VirtualizedTablePlugin
import org.finos.vuu.provider.VirtualizedProvider
import org.finos.vuu.test.VuuServerTestCase
import org.finos.vuu.util.table.TableAsserts.assertVpEq
import org.finos.vuu.viewport.ViewPortRange
import org.scalatest.prop.Tables.Table

class VirtualizedViewPortTest extends VuuServerTestCase {

Feature("Virtualized Table Viewport") {

Scenario("Check creation of a virtualized viewport") {

implicit val clock: Clock = new TestFriendlyClock(10001L)
implicit val lifecycle: LifecycleContainer = new LifecycleContainer()
implicit val tableDefContainer: TableDefContainer = new TableDefContainer(Map())
implicit val metricsProvider: MetricsProvider = new MetricsProviderImpl

withVuuServer(VirtualTableModule()) {
vuuServer =>

vuuServer.registerPlugin(new VirtualizedTablePlugin())

vuuServer.login("testUser", "testToken")

val viewport = vuuServer.createViewPort(VirtualTableModule.NAME, "bigOrders", ViewPortRange(0, 10))

val virtualizedProvider = viewport.table.asTable.getProvider.asInstanceOf[VirtualizedProvider]

virtualizedProvider.runOnce(viewport)

assertVpEq(combineQsForVp(viewport)) {
Table(
("orderId" ,"quantity","price" ,"side" ,"trader" ),
("0" ,-1155484576,-3109364765729502342L,"Buy" ,"trader1" ),
("1" ,-1155869325,1853403699951111791L,"Sell" ,"trader1" ),
("2" ,-1154715079,5411842376618821008L,"Sell" ,"trader1" ),
("3" ,-1155099828,-8072133231410116475L,"Buy" ,"trader1" ),
("4" ,-1157023572,-1705034981011564721L,"Buy" ,"trader1" ),
("5" ,-1157408321,3257733484669049412L,"Buy" ,"trader1" ),
("6" ,-1156254074,6816172161336758629L,"Sell" ,"trader1" ),
("7" ,-1156638823,-6667803446692178854L,"Sell" ,"trader1" ),
("8" ,-1158562568,-300705196293627100L,"Sell" ,"trader1" ),
("9" ,-1158947317,4662063269386987033L,"Buy" ,"trader1" )
)
}

viewport.setRange(ViewPortRange(5, 15))

assertVpEq(combineQsForVp(viewport)) {
Table(
("orderId" ,"quantity","price" ,"side" ,"trader" ),
(null ,null ,null ,null ,null )
)
}

virtualizedProvider.runOnce(viewport)

assertVpEq(combineQsForVp(viewport)) {
Table(
("orderId" ,"quantity","price" ,"side" ,"trader" ),
("10" ,-1157793070,8220501950349663546L,"Sell" ,"trader1" ),
("11" ,-1158177819,-5263473657679273937L,"Sell" ,"trader1" ),
("12" ,-1160101563,1103624592719277817L,"Sell" ,"trader1" ),
("13" ,-1160486312,6066393058399891950L,"Sell" ,"trader1" ),
("14" ,-1159332065,-8821912338641950449L,"Buy" ,"trader1" ),
("5" ,-1157408321,3257733484669049412L,"Buy" ,"trader1" ),
("6" ,-1156254074,6816172161336758629L,"Sell" ,"trader1" ),
("7" ,-1156638823,-6667803446692178854L,"Sell" ,"trader1" ),
("8" ,-1158562568,-300705196293627100L,"Sell" ,"trader1" ),
("9" ,-1158947317,4662063269386987033L,"Buy" ,"trader1" )
)
}
}

}

}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.finos.vuu.feature.ignite

import org.finos.toolbox.jmx.MetricsProvider
import org.finos.vuu.feature.{FilterFactory, JoinTableFactory, SessionTableFactory, SortFactory, TableFactory, ViewPortCallableFactory, ViewPortFactory, ViewPortKeysCreator, ViewPortTreeCallableFactory}
import org.finos.vuu.feature.{FilterFactory, JoinTableFactory, SessionTableFactory, SortFactory, TableFactory, ViewPortCallableFactory, ViewPortFactory, ViewPortKeysCreator, ViewPortTableCreator, ViewPortTreeCallableFactory}
import org.finos.vuu.plugin.{DefaultPlugin, PluginType}

object VuuIgnitePluginType extends PluginType
Expand Down Expand Up @@ -29,4 +29,6 @@ object VuuIgnitePlugin extends DefaultPlugin {
override def viewPortCallableFactory: ViewPortCallableFactory = ???

override def viewPortTreeCallableFactory: ViewPortTreeCallableFactory = ???

override def viewPortTableCreator: ViewPortTableCreator = ???
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.finos.vuu.plugin.virtualized

import org.finos.toolbox.jmx.MetricsProvider
import org.finos.vuu.api.TableDef
import org.finos.vuu.core.table.InMemDataTable
import org.finos.vuu.feature.{FilterFactory, JoinTableFactory, SessionTableFactory, SortFactory, TableFactory, ViewPortCallableFactory, ViewPortFactory, ViewPortKeysCreator, ViewPortTableCreator, ViewPortTreeCallableFactory}
import org.finos.vuu.plugin.virtualized.plugin.ViewPortVirtualizedTableCreator
import org.finos.vuu.plugin.virtualized.viewport.VirtualizedViewPortCallableFactory
import org.finos.vuu.plugin.{DefaultPlugin, PluginType}
import org.finos.vuu.provider.JoinTableProvider


object VirtualizedTablePlugin extends DefaultPlugin {

final val callableFactory = new VirtualizedViewPortCallableFactory

override def tableFactory(implicit metrics: MetricsProvider): TableFactory = (tableDef: TableDef, joinTableProvider: JoinTableProvider) => {
new InMemDataTable(tableDef, joinTableProvider)
}

override def pluginType: PluginType = VirtualizedTablePluginType

override def joinTableFactory(implicit metrics: MetricsProvider): JoinTableFactory = ???

override def sessionTableFactory: SessionTableFactory = ???

override def viewPortKeysCreator: ViewPortKeysCreator = ???

override def viewPortFactory: ViewPortFactory = ???

override def filterFactory: FilterFactory = ???

override def sortFactory: SortFactory = ???

override def viewPortCallableFactory: ViewPortCallableFactory = callableFactory

override def viewPortTreeCallableFactory: ViewPortTreeCallableFactory = ???

override def viewPortTableCreator: ViewPortTableCreator = ViewPortVirtualizedTableCreator
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.finos.vuu.plugin.virtualized

import org.finos.vuu.plugin.PluginType

object VirtualizedTablePluginType extends PluginType
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package org.finos.vuu.plugin.virtualized.api

import org.finos.vuu.api.{Indices, SessionTableDef, VisualLinks}
import org.finos.vuu.core.table.Column
import org.finos.vuu.plugin.PluginType
import org.finos.vuu.plugin.virtualized.VirtualizedTablePluginType

case class VirtualizedSessionTableDef (override val name: String, override val keyField: String,
override val columns: Array[Column]) extends SessionTableDef(name, keyField, columns, Seq(), false, VisualLinks(), Indices()){

override def pluginType: PluginType = VirtualizedTablePluginType
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.finos.vuu.plugin.virtualized.plugin

import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.table.TableContainer
import org.finos.vuu.feature.ViewPortTableCreator
import org.finos.vuu.net.ClientSessionId
import org.finos.vuu.plugin.virtualized.api.VirtualizedSessionTableDef
import org.finos.vuu.plugin.virtualized.table.VirtualizedSessionTable
import org.finos.vuu.viewport.{GroupBy, RowSource}

object ViewPortVirtualizedTableCreator extends ViewPortTableCreator{

override def create(table: RowSource, clientSession: ClientSessionId, groupBy: GroupBy, tableContainer: TableContainer)(implicit metrics: MetricsProvider, clock: Clock): RowSource = {

assert(table.asTable.getTableDef.isInstanceOf[VirtualizedSessionTableDef])

val sessionTableDef = table.asTable.getTableDef.asInstanceOf[VirtualizedSessionTableDef]

val sessionTable = new VirtualizedSessionTable(clientSession, sessionTableDef, tableContainer.joinTableProvider, cacheSize = 20_000)

val archetypeTable = tableContainer.getTable(sessionTableDef.name)

val provider = archetypeTable.getProvider

sessionTable.setProvider(provider)

sessionTable
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class VirtualizedSessionTable(clientSessionId: ClientSessionId,
@volatile private var dataSetSize: Int = 0
@volatile private var range = VirtualizedRange(0, 0)

override def toString: String = s"VirtualizedSessionTable(tableDef=${sessionTableDef.name}, name=${name})"

override def primaryKeys: TablePrimaryKeys = super.primaryKeys

override protected def createDataTableData(): TableData = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package org.finos.vuu.plugin.virtualized.table

import org.finos.toolbox.collection.window.MovingWindow
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.table._

class VirtualizedSessionTableData(cacheSize: Int) extends TableData {
class VirtualizedSessionTableData(cacheSize: Int)(implicit clock: Clock) extends TableData {

final val rowCache: RollingCache[String, RowData] = RowDataCache(cacheSize)
final val keysWindow: MovingWindow[String] = RollingKeysWindow(cacheSize)
final val rowCache: WindowedCache[String, RowData] = RowDataCache(cacheSize)
final val keysWindow: MovingWindow[String] = WindowedTableKeys(cacheSize)

@volatile var length: Int = 0

Expand Down
Loading

0 comments on commit 857a681

Please sign in to comment.