Skip to content

Commit

Permalink
Made timestamp in Avro a string
Browse files Browse the repository at this point in the history
  • Loading branch information
jbaron committed May 27, 2024
1 parent eb8a5c4 commit 2b55ae7
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 34 deletions.
43 changes: 16 additions & 27 deletions roboquant-avro/src/main/kotlin/org/roboquant/avro/AvroFeed.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ internal const val SCHEMA = """{
"type": "record",
"name": "PriceItemV2",
"fields": [
{"name": "timestamp_nanos", "type" : "long"},
{"name": "timestamp", "type" : "string"},
{"name": "symbol", "type": "string"},
{"name": "type", "type": { "type": "enum", "name": "item_type", "symbols" : ["BAR", "TRADE", "QUOTE", "BOOK"]}},
{"name": "type", "type": { "type": "enum", "name": "type", "symbols" : ["BAR", "TRADE", "QUOTE", "BOOK"]}},
{"name": "values", "type": {"type": "array", "items" : "double"}},
{"name": "other", "type": ["null", "string"], "default": null}
{"name": "meta", "type": ["null", "string"], "default": null}
]
}"""

Expand Down Expand Up @@ -90,13 +90,6 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP
return DataFileReader(path.toFile(), GenericDatumReader())
}

/**
 * Decode a stored nanosecond [value] into an [Instant].
 *
 * The whole-second part is `value / 1_000_000_000` (truncating division). The nanosecond
 * adjustment is the remainder of `value` for non-negative input and the remainder of `-value`
 * for negative input.
 *
 * NOTE(review): the negative branch is not the conventional "nanoseconds since epoch" decoding;
 * it appears to be the inverse of the `toEpochNano` extension in this file. Confirm before changing
 * either one independently.
 */
private fun ofEpochNano(value: Long): Instant {
    val nanosPerSecond = 1_000_000_000L
    val seconds = value / nanosPerSecond
    // For negative input the remainder is taken from the negated value.
    val adjustment = if (value < 0L) -value % nanosPerSecond else value % nanosPerSecond
    return Instant.ofEpochSecond(seconds, adjustment)
}

/**
* (Re)play the events of the feed using the provided [EventChannel]
*
Expand All @@ -117,7 +110,7 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP
val rec = it.next()

// Read only the timestamp first, so records before the timeframe are skipped without parsing the rest
val now = ofEpochNano(rec[0] as Long)
val now = Instant.parse(rec[0] as Utf8)
if (now < timeframe) continue

if (now != last) {
Expand All @@ -135,8 +128,8 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP

@Suppress("UNCHECKED_CAST")
val values = rec.get(3) as List<Double>
val other = rec.get("other") as Utf8?
val item = serializer.deserialize(asset, priceItemType, values, other?.toString())
val meta = rec.get(4) as Utf8?
val item = serializer.deserialize(asset, priceItemType, values, meta?.toString())
items.add(item)
}
channel.sendNotEmpty(Event(last, items))
Expand All @@ -153,14 +146,15 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP
getReader().use {
while (it.hasNext()) {
val position = it.tell()
val t = ofEpochNano(it.next().get(0) as Long)
val t = it.next().get(0) as Utf8
it.seek(position)
if (it.hasNext()) {
index.putIfAbsent(t,position)
index.putIfAbsent(Instant.parse(t),position)
it.nextBlock()
}
}
}
logger.info { "Index created" }
return index
}

Expand All @@ -169,20 +163,15 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP
val start = index.firstKey()
getReader().use {
position(it, index.lastKey())
var timestamp = index.lastKey().toEpochNano()
var timestamp = index.lastKey()
while (it.hasNext()) {
timestamp = it.next().get(0) as Long
timestamp = Instant.parse(it.next().get(0) as Utf8)
}
return Timeframe(start, ofEpochNano(timestamp), true)
logger.info { "Timeframe calculated" }
return Timeframe(start, timestamp, true)
}
}

/**
 * Encode this [Instant] as a single [Long] number of nanoseconds.
 *
 * The result is `epochSecond * 1_000_000_000` with the `nano` field added when that product is
 * strictly positive and subtracted otherwise (including when the product is zero).
 *
 * NOTE(review): subtracting `nano` for non-positive seconds is not the conventional
 * nanoseconds-since-epoch value; it appears to be paired with the decoding in `ofEpochNano`.
 * Confirm before changing either one independently.
 */
private fun Instant.toEpochNano(): Long {
    val wholeSecondsInNanos = epochSecond * 1_000_000_000L
    val signedFraction = if (wholeSecondsInNanos > 0) nano else -nano
    return wholeSecondsInNanos + signedFraction
}

/**
* Record the price-actions in a [feed] and store them in an Avro file that can be later used as input for
* an AvroFeed. The provided [feed] needs to implement the [AssetFeed] interface.
Expand Down Expand Up @@ -211,6 +200,7 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP
require(exists()) {"File $file doesn't exist yet, cannot append"}
dataFileWriter.appendTo(file)
} else {
if (exists()) logger.info { "Overwriting existing Avro file $file" }
if (compress) dataFileWriter.setCodec(CodecFactory.snappyCodec())
dataFileWriter.setSyncInterval(syncInterval)
dataFileWriter.create(schema, file)
Expand All @@ -229,12 +219,11 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP

while (true) {
val event = channel.receive()
val now = event.time.toEpochNano()

for (action in event.items.filterIsInstance<PriceItem>()) {

val asset = action.asset
record.put(0, now)
record.put(0, event.time.toString())
record.put(1, asset.symbol)

val serialization = serializer.serialize(action)
Expand All @@ -245,7 +234,7 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP
arr.addAll(serialization.values)
record.put(3, arr)

record.put(4, serialization.other)
record.put(4, serialization.meta)
dataFileWriter.append(record)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.roboquant.feeds.*
*/
internal class PriceItemSerializer {

internal class Serialization(val type: PriceItemType, val values: List<Double>, val other: String? = null)
/**
 * Holder for the serialized form of a price item.
 *
 * @property type the kind of price item that was serialized
 * @property values the numeric values of the item
 * @property meta optional extra information as a string; defaults to `null`
 */
internal class Serialization(
    val type: PriceItemType,
    val values: List<Double>,
    val meta: String? = null
)

private val timeSpans = mutableMapOf<String, TimeSpan>()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,16 @@ internal class AvroSamples {

@Test
@Ignore
internal fun quotes() = runBlocking {
val tf = Timeframe.parse("2021-01-01", "2022-01-01", true)
val f = RandomWalkFeed(tf, 1.minutes, nAssets = 1, priceType = PriceItemType.QUOTE)
internal fun quotes_hft() = runBlocking {
val tf = Timeframe.parse("2021-01-01T21:00:00Z", "2021-01-01T22:00:00Z", true)
val f = RandomWalkFeed(tf, 10.millis, nAssets = 1, priceType = PriceItemType.QUOTE)
val feed = AvroFeed("/tmp/test.avro")
feed.record(f, compress = false)
assertTrue(feed.exists())
assertEquals(tf, feed.timeframe)

feed.timeframe.sample(10.days, 5).forEach {
feed.timeframe.sample(1.minutes, 5).forEach {
println(it)
val account = run(feed, EMAStrategy(), timeframe = it)
println(account)
}
Expand Down
17 changes: 17 additions & 0 deletions roboquant/src/main/kotlin/org/roboquant/common/TimeSpan.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.roboquant.common

import java.time.*
import java.time.temporal.ChronoUnit
import java.time.temporal.TemporalUnit

/**
* Deprecated, use [TimeSpan] instead
Expand Down Expand Up @@ -128,6 +130,21 @@ class TimeSpan internal constructor(internal val period: Period, internal val du
override fun hashCode(): Int {
return period.hashCode() + duration.hashCode()
}

/**
 * Finest time unit needed to represent the duration part of this time span.
 *
 * The components of [duration] are inspected from the smallest to the largest, and the unit of the
 * first non-zero component is returned. The [period] part is not inspected; when every checked
 * duration component is zero, the result is [ChronoUnit.DAYS].
 *
 * @return one of NANOS, MILLIS, SECONDS, MINUTES, HOURS or DAYS
 */
fun resolution(): ChronoUnit {
    return when {
        // toNanosPart() is the whole sub-second fraction in nanoseconds, so it is also non-zero for
        // whole milliseconds. Only a remainder below one millisecond requires nanosecond resolution.
        duration.toNanosPart() % 1_000_000 != 0 -> ChronoUnit.NANOS
        duration.toMillisPart() != 0 -> ChronoUnit.MILLIS
        duration.toSecondsPart() != 0 -> ChronoUnit.SECONDS
        duration.toMinutesPart() != 0 -> ChronoUnit.MINUTES
        duration.toHoursPart() != 0 -> ChronoUnit.HOURS
        else -> ChronoUnit.DAYS
    }
}

}

Expand Down
5 changes: 3 additions & 2 deletions roboquant/src/main/kotlin/org/roboquant/common/Timeframe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ data class Timeframe(val start: Instant, val end: Instant, val inclusive: Boolea
fun sample(
period: TimeSpan,
samples: Int = 1,
resolution: TemporalUnit = ChronoUnit.DAYS,
resolution: TemporalUnit? = period.resolution(),
random: Random = Config.random,
): List<Timeframe> {
require(samples >= 1) { "samples need to be >= 1" }
Expand All @@ -336,7 +336,8 @@ data class Timeframe(val start: Instant, val end: Instant, val inclusive: Boolea
val result = mutableSetOf<Timeframe>()
while (result.size < samples) {
val offset = random.nextLong(duration)
val newStart = start.plusMillis(offset).truncatedTo(resolution)
var newStart = start.plusMillis(offset)
if (resolution != null) newStart = newStart.truncatedTo(resolution)
result.add(Timeframe(newStart, newStart + period))
}
return result.toList()
Expand Down

0 comments on commit 2b55ae7

Please sign in to comment.