From 2b55ae777572d5cd28fad02263b1ecfb361aff50 Mon Sep 17 00:00:00 2001 From: Peter Dekkers Date: Mon, 27 May 2024 08:16:50 +0200 Subject: [PATCH] Made timestamp in Avro a string --- .../kotlin/org/roboquant/avro/AvroFeed.kt | 43 +++++++------------ .../org/roboquant/avro/PriceItemSerializer.kt | 2 +- .../org/roboquant/samples/AvroSamples.kt | 9 ++-- .../kotlin/org/roboquant/common/TimeSpan.kt | 17 ++++++++ .../kotlin/org/roboquant/common/Timeframe.kt | 5 ++- 5 files changed, 42 insertions(+), 34 deletions(-) diff --git a/roboquant-avro/src/main/kotlin/org/roboquant/avro/AvroFeed.kt b/roboquant-avro/src/main/kotlin/org/roboquant/avro/AvroFeed.kt index 743714af..1941bf4e 100644 --- a/roboquant-avro/src/main/kotlin/org/roboquant/avro/AvroFeed.kt +++ b/roboquant-avro/src/main/kotlin/org/roboquant/avro/AvroFeed.kt @@ -45,11 +45,11 @@ internal const val SCHEMA = """{ "type": "record", "name": "PriceItemV2", "fields": [ - {"name": "timestamp_nanos", "type" : "long"}, + {"name": "timestamp", "type" : "string"}, {"name": "symbol", "type": "string"}, - {"name": "type", "type": { "type": "enum", "name": "item_type", "symbols" : ["BAR", "TRADE", "QUOTE", "BOOK"]}}, + {"name": "type", "type": { "type": "enum", "name": "type", "symbols" : ["BAR", "TRADE", "QUOTE", "BOOK"]}}, {"name": "values", "type": {"type": "array", "items" : "double"}}, - {"name": "other", "type": ["null", "string"], "default": null} + {"name": "meta", "type": ["null", "string"], "default": null} ] }""" @@ -90,13 +90,6 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP return DataFileReader(path.toFile(), GenericDatumReader()) } - private fun ofEpochNano(value: Long): Instant { - return if (value >= 0L) - Instant.ofEpochSecond(value / 1_000_000_000L, value % 1_000_000_000L) - else - Instant.ofEpochSecond(value / 1_000_000_000L, -value % 1_000_000_000L) - } - /** * (Re)play the events of the feed using the provided [EventChannel] * @@ -117,7 +110,7 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP val rec = it.next() // Optimize unnecessary parsing of the whole record - val now = ofEpochNano(rec[0] as Long) + val now = Instant.parse(rec[0] as Utf8) if (now < timeframe) continue if (now != last) { @@ -135,8 +128,8 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP @Suppress("UNCHECKED_CAST") val values = rec.get(3) as List - val other = rec.get("other") as Utf8? - val item = serializer.deserialize(asset, priceItemType, values, other?.toString()) + val meta = rec.get(4) as Utf8? + val item = serializer.deserialize(asset, priceItemType, values, meta?.toString()) items.add(item) } channel.sendNotEmpty(Event(last, items)) @@ -153,14 +146,15 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP getReader().use { while (it.hasNext()) { val position = it.tell() - val t = ofEpochNano(it.next().get(0) as Long) + val t = it.next().get(0) as Utf8 it.seek(position) if (it.hasNext()) { - index.putIfAbsent(t,position) + index.putIfAbsent(Instant.parse(t),position) it.nextBlock() } } } + logger.info { "Index created" } return index } @@ -169,20 +163,15 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP val start = index.firstKey() getReader().use { position(it, index.lastKey()) - var timestamp = index.lastKey().toEpochNano() + var timestamp = index.lastKey() while (it.hasNext()) { - timestamp = it.next().get(0) as Long + timestamp = Instant.parse(it.next().get(0) as Utf8) } - return Timeframe(start, ofEpochNano(timestamp), true) + logger.info { "Timeframe calculated" } + return Timeframe(start, timestamp, true) } } - private fun Instant.toEpochNano(): Long { - var currentTimeNano = epochSecond * 1_000_000_000L - currentTimeNano += if (currentTimeNano > 0) nano else -nano - return currentTimeNano - } - /** * Record the price-actions in a [feed] and store them in an Avro file that can be later used as input for * an AvroFeed. The provided [feed] needs to implement the [AssetFeed] interface. @@ -211,6 +200,7 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP require(exists()) {"File $file doesn't exist yet, cannot append"} dataFileWriter.appendTo(file) } else { + if (exists()) logger.info { "Overwriting existing Avro file $file" } if (compress) dataFileWriter.setCodec(CodecFactory.snappyCodec()) dataFileWriter.setSyncInterval(syncInterval) dataFileWriter.create(schema, file) @@ -229,12 +219,11 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP while (true) { val event = channel.receive() - val now = event.time.toEpochNano() for (action in event.items.filterIsInstance()) { val asset = action.asset - record.put(0, now) + record.put(0, event.time.toString()) record.put(1, asset.symbol) val serialization = serializer.serialize(action) @@ -245,7 +234,7 @@ class AvroFeed(private val path: Path, private val template: Asset = Asset("TEMP arr.addAll(serialization.values) record.put(3, arr) - record.put(4, serialization.other) + record.put(4, serialization.meta) dataFileWriter.append(record) } diff --git a/roboquant-avro/src/main/kotlin/org/roboquant/avro/PriceItemSerializer.kt b/roboquant-avro/src/main/kotlin/org/roboquant/avro/PriceItemSerializer.kt index 7fa85bfc..a5ee44b9 100644 --- a/roboquant-avro/src/main/kotlin/org/roboquant/avro/PriceItemSerializer.kt +++ b/roboquant-avro/src/main/kotlin/org/roboquant/avro/PriceItemSerializer.kt @@ -26,7 +26,7 @@ import org.roboquant.feeds.* */ internal class PriceItemSerializer { - internal class Serialization(val type: PriceItemType, val values: List, val other: String? = null) + internal class Serialization(val type: PriceItemType, val values: List, val meta: String? = null) private val timeSpans = mutableMapOf() diff --git a/roboquant-avro/src/test/kotlin/org/roboquant/samples/AvroSamples.kt b/roboquant-avro/src/test/kotlin/org/roboquant/samples/AvroSamples.kt index da5cde80..34efa867 100644 --- a/roboquant-avro/src/test/kotlin/org/roboquant/samples/AvroSamples.kt +++ b/roboquant-avro/src/test/kotlin/org/roboquant/samples/AvroSamples.kt @@ -55,15 +55,16 @@ internal class AvroSamples { @Test @Ignore - internal fun quotes() = runBlocking { - val tf = Timeframe.parse("2021-01-01", "2022-01-01", true) - val f = RandomWalkFeed(tf, 1.minutes, nAssets = 1, priceType = PriceItemType.QUOTE) + internal fun quotes_hft() = runBlocking { + val tf = Timeframe.parse("2021-01-01T21:00:00Z", "2021-01-01T22:00:00Z", true) + val f = RandomWalkFeed(tf, 10.millis, nAssets = 1, priceType = PriceItemType.QUOTE) val feed = AvroFeed("/tmp/test.avro") feed.record(f, compress = false) assertTrue(feed.exists()) assertEquals(tf, feed.timeframe) - feed.timeframe.sample(10.days, 5).forEach { + feed.timeframe.sample(1.minutes, 5).forEach { + println(it) val account = run(feed, EMAStrategy(), timeframe = it) println(account) } diff --git a/roboquant/src/main/kotlin/org/roboquant/common/TimeSpan.kt b/roboquant/src/main/kotlin/org/roboquant/common/TimeSpan.kt index 49396562..0b0cc609 100644 --- a/roboquant/src/main/kotlin/org/roboquant/common/TimeSpan.kt +++ b/roboquant/src/main/kotlin/org/roboquant/common/TimeSpan.kt @@ -19,6 +19,8 @@ package org.roboquant.common import java.time.* +import java.time.temporal.ChronoUnit +import java.time.temporal.TemporalUnit /** * Deprecated, use [TimeSpan] instead @@ -128,6 +130,21 @@ class TimeSpan internal constructor(internal val period: Period, internal val du override fun hashCode(): Int { return period.hashCode() + duration.hashCode() } + + /** + * Resolution based on contained values + * @return ChronoUnit + */ + fun resolution() : ChronoUnit { + return when { + duration.toNanosPart() != 0 -> ChronoUnit.NANOS + duration.toMillisPart() != 0 -> ChronoUnit.MILLIS + duration.toSecondsPart() != 0 -> ChronoUnit.SECONDS + duration.toMinutesPart() != 0 -> ChronoUnit.MINUTES + duration.toHoursPart() != 0 -> ChronoUnit.HOURS + else -> ChronoUnit.DAYS + } + } } diff --git a/roboquant/src/main/kotlin/org/roboquant/common/Timeframe.kt b/roboquant/src/main/kotlin/org/roboquant/common/Timeframe.kt index 444bd0ed..4054968c 100644 --- a/roboquant/src/main/kotlin/org/roboquant/common/Timeframe.kt +++ b/roboquant/src/main/kotlin/org/roboquant/common/Timeframe.kt @@ -326,7 +326,7 @@ data class Timeframe(val start: Instant, val end: Instant, val inclusive: Boolea fun sample( period: TimeSpan, samples: Int = 1, - resolution: TemporalUnit = ChronoUnit.DAYS, + resolution: TemporalUnit? = period.resolution(), random: Random = Config.random, ): List { require(samples >= 1) { "samples need to be >= 1" } @@ -336,7 +336,8 @@ data class Timeframe(val start: Instant, val end: Instant, val inclusive: Boolea val result = mutableSetOf() while (result.size < samples) { val offset = random.nextLong(duration) - val newStart = start.plusMillis(offset).truncatedTo(resolution) + var newStart = start.plusMillis(offset) + if (resolution != null) newStart = newStart.truncatedTo(resolution) result.add(Timeframe(newStart, newStart + period)) } return result.toList()