Skip to content

Commit

Permalink
#1213 added example of rowbuilder interface
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisjstevo authored and chris committed Dec 13, 2024
1 parent 4236412 commit 3e31219
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.thread.RunOnceLifeCycleRunner
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.module.basket.BasketConstants
import org.finos.vuu.core.module.basket.BasketModule.BasketConstituentColumnNames.{BasketId, Change, Description, LastTrade, Ric, RicBasketId, Side, Volume, Weighting}
import org.finos.vuu.core.module.basket.csv.BasketLoader
import org.finos.vuu.core.table.{DataTable, RowWithData}
import org.finos.vuu.provider.DefaultProvider
Expand All @@ -13,6 +14,16 @@ class BasketConstituentProvider(val table: DataTable)(implicit lifecycle: Lifecy
private val runner = new RunOnceLifeCycleRunner("BasketConstituentProvider", runOnce)
private val basketLoader = new BasketLoader()

private val ricCol = table.getTableDef.columnForName(Ric)
private val basketIdCol = table.getTableDef.columnForName(BasketId)
private val ricBasketIdCol = table.getTableDef.columnForName(RicBasketId)
private val lastTradeCol = table.getTableDef.columnForName(LastTrade)
private val changeCol = table.getTableDef.columnForName(Change)
private val weightingCol = table.getTableDef.columnForName(Weighting)
private val volumeCol = table.getTableDef.columnForName(Volume)
private val sideCol = table.getTableDef.columnForName(Side)
private val descCol = table.getTableDef.columnForName(Description)

lifecycle(this).dependsOn(runner)

import org.finos.vuu.core.module.basket.BasketModule.BasketConstituentColumnNames._
Expand All @@ -23,10 +34,13 @@ class BasketConstituentProvider(val table: DataTable)(implicit lifecycle: Lifecy
}

def updateBasketConstituents(basketId: String): Unit = {

val list = basketLoader.loadConstituents(basketId)

list.foreach(row => {

if (row.nonEmpty) {

val symbol = row("Symbol").asInstanceOf[String]
val name = row("Name")
val lastTrade = row("Last Trade")
Expand All @@ -35,17 +49,20 @@ class BasketConstituentProvider(val table: DataTable)(implicit lifecycle: Lifecy
val weighting = row("Weighting")
val side = BasketConstants.Side.Buy
val ricBasketId = symbol + "." + basketId
table.processUpdate(ricBasketId, RowWithData(ricBasketId, Map(
Ric -> symbol,
BasketId -> basketId,
RicBasketId -> ricBasketId,
LastTrade -> lastTrade,
Change -> change,
Weighting -> weighting,
Volume -> volume,
Description -> name,
Side -> side
)), clock.now())

val rowData = table.newRow(ricBasketId)
.setString(ricCol, symbol)
.setString(basketIdCol, basketId)
.setString(ricBasketIdCol, ricBasketId)
.setString(lastTradeCol, Option(lastTrade).getOrElse("").toString)
.setString(changeCol, Option(change).getOrElse("").toString)
.setDouble(weightingCol, weighting.asInstanceOf[Double])
.setString(volumeCol, Option(volume).getOrElse("").toString)
.setString(descCol, name.toString)
.setString(sideCol, side)
.asRow

table.processUpdate(rowData, clock.now())
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.time.Clock
import org.finos.vuu.api.SessionTableDef
import org.finos.vuu.core.table.{ColumnValueProvider, InMemSessionDataTable, RowWithData, TableData, TablePrimaryKeys}
import org.finos.vuu.core.table.{ColumnValueProvider, InMemSessionDataTable, RowData, RowWithData, TableData, TablePrimaryKeys}
import org.finos.vuu.net.ClientSessionId
import org.finos.vuu.provider.{JoinTableProvider, VirtualizedProvider}

Expand All @@ -17,7 +17,7 @@ class VirtualizedSessionTable(clientSessionId: ClientSessionId,
@volatile private var dataSetSize: Int = 0
@volatile private var range = VirtualizedRange(0, 0)

override def toString: String = s"VirtualizedSessionTable(tableDef=${sessionTableDef.name}, name=${name})"
override def toString: String = s"VirtualizedSessionTable(tableDef=${sessionTableDef.name}, name=$name)"

override def primaryKeys: TablePrimaryKeys = super.primaryKeys

Expand Down Expand Up @@ -55,7 +55,7 @@ class VirtualizedSessionTable(clientSessionId: ClientSessionId,
logger.error("Trying to set range on non-virtualized data, something has gone bad.")
}
}
override def processUpdate(rowKey: String, rowData: RowWithData, timeStamp: Long): Unit = super.processUpdate(rowKey, rowData, timeStamp)
override def processUpdate(rowKey: String, rowData: RowData, timeStamp: Long): Unit = super.processUpdate(rowKey, rowData, timeStamp)

override def processDelete(rowKey: String): Unit = super.processDelete(rowKey)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class VirtualizedSessionTableData(cacheSize: Int)(implicit clock: Clock) extends
}
}

override def update(key: String, update: RowWithData): TableData = {
override def update(key: String, update: RowData): TableData = {
rowCache.put(key, update)
this
}
Expand Down
45 changes: 45 additions & 0 deletions vuu/src/main/scala/org/finos/vuu/core/row/InMemMapRowBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.finos.vuu.core.row
import org.finos.vuu.core.table.{Column, RowData, RowWithData}

import scala.collection.mutable

class InMemMapRowBuilder extends RowBuilder {

private val mutableMap = new mutable.HashMap[String, Any]()
private var key: String = ""
override def setLong(column: Column, v: Long): RowBuilder = {
mutableMap.put(column.name, v)
this
}

override def setDouble(column: Column, v: Double): RowBuilder = {
mutableMap.put(column.name, v)
this
}

override def setInt(column: Column, v: Int): RowBuilder = {
mutableMap.put(column.name, v)
this
}

override def setString(column: Column, v: String): RowBuilder = {
mutableMap.put(column.name, v)
this
}

override def setBoolean(column: Column, v: Boolean): RowBuilder = {
mutableMap.put(column.name, v)
this
}
override def setKey(key: String): RowBuilder = {
this.key = key
this
}
override def asRow: RowData = {
val immMap = mutableMap.toMap
val rowData = RowWithData(key, immMap)
mutableMap.clear()
key = ""
rowData
}
}
13 changes: 13 additions & 0 deletions vuu/src/main/scala/org/finos/vuu/core/row/RowBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.finos.vuu.core.row

import org.finos.vuu.core.table.{Column, RowData}

trait RowBuilder {
def setKey(key: String): RowBuilder
def setLong(column: Column, v: Long): RowBuilder
def setDouble(column: Column, v: Double): RowBuilder
def setInt(column: Column, v: Int): RowBuilder
def setString(column: Column, v: String): RowBuilder
def setBoolean(column: Column, v: Boolean): RowBuilder
def asRow: RowData
}
23 changes: 17 additions & 6 deletions vuu/src/main/scala/org/finos/vuu/core/table/InMemDataTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.finos.vuu.viewport.{RowProcessor, RowSource, ViewPortColumns}
import org.finos.toolbox.collection.array.ImmutableArray
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.text.AsciiUtil
import org.finos.vuu.core.row.{InMemMapRowBuilder, RowBuilder}
import org.finos.vuu.feature.inmem.InMemTablePrimaryKeys

import java.util
Expand All @@ -22,6 +23,8 @@ trait DataTable extends KeyedObservable[RowKeyUpdate] with RowSource {

def updateCounter: Long

def newRow(key: String): RowBuilder

def incrementUpdateCounter(): Unit

def indexForColumn(column: Column): Option[IndexedField[_]]
Expand All @@ -42,7 +45,11 @@ trait DataTable extends KeyedObservable[RowKeyUpdate] with RowSource {

def getTableDef: TableDef

def processUpdate(rowKey: String, rowUpdate: RowWithData, timeStamp: Long): Unit
def processUpdate(rowUpdate: RowData, timeStamp: Long): Unit = {
processUpdate(rowUpdate.key(), rowUpdate, timeStamp)
}

def processUpdate(rowKey: String, rowUpdate: RowData, timeStamp: Long): Unit

def hasRowChanged(row: RowWithData): Boolean = {
val existingRow = this.pullRow(row.key)
Expand Down Expand Up @@ -170,10 +177,10 @@ case class InMemDataTableData(data: ConcurrentHashMap[String, RowData], private

//protected def merge(update: RowUpdate, data: RowData): RowData = MergeFunctions.mergeLeftToRight(update, data)

protected def merge(update: RowWithData, data: RowWithData): RowWithData =
protected def merge(update: RowData, data: RowData): RowData =
MergeFunctions.mergeLeftToRight(update, data)

def update(key: String, update: RowWithData): TableData = {
def update(key: String, update: RowData): TableData = {

val table = data.synchronized {

Expand Down Expand Up @@ -220,6 +227,10 @@ class InMemDataTable(val tableDef: TableDef, val joinProvider: JoinTableProvider

private final val columnValueProvider = InMemColumnValueProvider(this)

override def newRow(key: String): RowBuilder = {
new InMemMapRowBuilder().setKey(key)
}

private def buildIndexForColumn(c: Column): IndexedField[_] = {
c.dataType match {
case DataType.StringDataType =>
Expand Down Expand Up @@ -333,7 +344,7 @@ class InMemDataTable(val tableDef: TableDef, val joinProvider: JoinTableProvider
def columns(): Array[Column] = tableDef.columns
lazy val viewPortColumns: ViewPortColumns = ViewPortColumnCreator.create(this, tableDef.columns.map(_.name).toList)

private def updateIndices(rowkey: String, rowUpdate: RowWithData): Unit = {
private def updateIndices(rowkey: String, rowUpdate: RowData): Unit = {
this.indices.foreach(colTup => {
val column = colTup._1
val index = colTup._2
Expand Down Expand Up @@ -375,7 +386,7 @@ class InMemDataTable(val tableDef: TableDef, val joinProvider: JoinTableProvider
})
}

def update(rowkey: String, rowUpdate: RowWithData): Unit = {
def update(rowkey: String, rowUpdate: RowData): Unit = {
data = data.update(rowkey, rowUpdate)
updateIndices(rowkey, rowUpdate)
}
Expand Down Expand Up @@ -446,7 +457,7 @@ class InMemDataTable(val tableDef: TableDef, val joinProvider: JoinTableProvider
}
}

def processUpdate(rowKey: String, rowData: RowWithData, timeStamp: Long): Unit = {
def processUpdate(rowKey: String, rowData: RowData, timeStamp: Long): Unit = {

onUpdateMeter.mark()

Expand Down
8 changes: 5 additions & 3 deletions vuu/src/main/scala/org/finos/vuu/core/table/JoinTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.finos.vuu.provider.JoinTableProvider
import org.finos.vuu.viewport.{RowProcessor, ViewPortColumns}
import org.finos.toolbox.collection.array.{ImmutableArray, ImmutableArrays}
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.vuu.core.row.RowBuilder
import org.finos.vuu.feature.inmem.InMemTablePrimaryKeys

import java.util
Expand Down Expand Up @@ -104,7 +105,7 @@ case class JoinDataTableData(tableDef: JoinTableDef, var keysByJoinIndex: Array[
map.toMap
}

def rowUpdateToArray(update: RowWithData): Array[Any] = {
def rowUpdateToArray(update: RowData): Array[Any] = {
//val data = columns.map(update.get(_))

var index = 0
Expand Down Expand Up @@ -189,7 +190,7 @@ case class JoinDataTableData(tableDef: JoinTableDef, var keysByJoinIndex: Array[
}
}

def processUpdate(rowKey: String, rowUpdate: RowWithData, joinTable: JoinTable, sourceTables: Map[String, DataTable]): JoinDataTableData = {
def processUpdate(rowKey: String, rowUpdate: RowData, joinTable: JoinTable, sourceTables: Map[String, DataTable]): JoinDataTableData = {

val updateByKeyIndex = rowUpdateToArray(rowUpdate)

Expand Down Expand Up @@ -347,7 +348,7 @@ class JoinTable(val tableDef: JoinTableDef, val sourceTables: Map[String, DataTa

override def incrementUpdateCounter(): Unit = updateCounterInternal +=1

override def processUpdate(rowKey: String, rowUpdate: RowWithData, timeStamp: Long): Unit = {
override def processUpdate(rowKey: String, rowUpdate: RowData, timeStamp: Long): Unit = {

onUpdateMeter.mark()

Expand Down Expand Up @@ -658,4 +659,5 @@ class JoinTable(val tableDef: JoinTableDef, val sourceTables: Map[String, DataTa
}

override def getColumnValueProvider: ColumnValueProvider = InMemColumnValueProvider(this)
override def newRow(key: String): RowBuilder = ???
}
13 changes: 7 additions & 6 deletions vuu/src/main/scala/org/finos/vuu/core/table/MergeFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package org.finos.vuu.core.table

object MergeFunctions {

def mergeLeftToRight(update: RowWithData, data: RowWithData): RowWithData = {
def mergeLeftToRight(update: RowData, data: RowData): RowData = {

assert(update.key == data.key, s"check we're updating the same row ${update.key} != ${data.key}")
assert(update.key == data.key, s"check we're updating the same row ${update.key()} != ${data.key()}")

var newData: RowWithData = data
var newData: RowData = data

update.data.foreach({ case (field, value) => newData = newData.set(field, value) })

//println(">" + newData)
update match {
case update: RowWithData =>
update.data.foreach({ case (field, value) => newData = newData.set(field, value) })
}

newData
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.finos.toolbox.collection.array.ImmutableArray

trait TableData {
def dataByKey(key: String): RowData
def update(key: String, update: RowWithData): TableData
def update(key: String, update: RowData): TableData
def delete(key: String): TableData
def deleteAll(): TableData
def primaryKeyValues: TablePrimaryKeys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class TreeSessionTableImpl(val source: RowSource, val session: ClientSessionId,
onRowDeleteFn = fn
}

override def processUpdate(rowKey: String, rowData: RowWithData, timeStamp: Long): Unit = {
override def processUpdate(rowKey: String, rowData: RowData, timeStamp: Long): Unit = {
logger.debug(s"ChrisChris>> GroupBySession processUpdate $rowKey $rowData")
super.processUpdate(rowKey, rowData, timeStamp)
incrementUpdateCounter()
Expand Down

0 comments on commit 3e31219

Please sign in to comment.