Skip to content

Commit

Permalink
Bugfix/time tick conversion (#137)
Browse files Browse the repository at this point in the history
* gofmt

* Remove dead code, add a test

* Add test for loader time parsing

* Fix docs to reflect current system

* Add test that catches broken interval ticks calculation

* Fix incorrect index to time conversion

* gofmt

* Revert

* Fix tests
  • Loading branch information
Notargets authored Nov 1, 2018
1 parent 2b55814 commit d97170e
Show file tree
Hide file tree
Showing 10 changed files with 435 additions and 1,390 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
working_directory: ~/github.com/alpacahq/marketstore
steps:
- checkout
- run: make vendor all plugins
- run: make all plugins

test:
docker:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ plugins:
$(MAKE) -C contrib/bitmexfeeder
$(MAKE) -C contrib/binancefeeder

unittest: all
unittest: install
go fmt ./...
go test ./...
$(MAKE) -C tests/integ test
Expand Down
30 changes: 30 additions & 0 deletions cmd/connect/loader/all_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package loader

import (
. "gopkg.in/check.v1"
"testing"
"time"
)

// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { TestingT(t) }

type LoaderTests struct{}

var _ = Suite(&LoaderTests{})

func (s *LoaderTests) SetUpSuite(c *C) {}
func (s *LoaderTests) TearDownSuite(c *C) {}

func (s *LoaderTests) TestParseTime(c *C) {
tt := time.Date(2016, 12, 30, 21, 59, 20, 383000000, time.UTC)
var fAdj int
timeFormat := "20060102 15:04:05"
dateTime := "20161230 21:59:20 383000"
tzLoc := time.UTC
tTest, err := parseTime(timeFormat, dateTime, tzLoc, fAdj)
c.Assert(err != nil, Equals, true)
formatAdj := len(dateTime) - len(timeFormat)
tTest, err = parseTime(timeFormat, dateTime, tzLoc, formatAdj)
c.Assert(tt == tTest, Equals, true)
}
47 changes: 1 addition & 46 deletions cmd/connect/loader/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,10 @@ package loader

import (
"fmt"
"strconv"
"time"

"github.com/alpacahq/marketstore/executor"
"github.com/alpacahq/marketstore/utils/io"
"strconv"
)

// WriteChunk writes data to the database.
func WriteChunk(dbWriter *executor.Writer, dataShapes []io.DataShape, dbKey io.TimeBucketKey, columnIndex []int, csvDataChunk [][]string, conf *CSVConfig) (start, end time.Time) {

epochCol, nanosCol := readTimeColumns(csvDataChunk, columnIndex, conf)
if epochCol == nil {
fmt.Println("Error building time columns from csv data")
return
}

csmInit := io.NewColumnSeriesMap()
csmInit.AddColumn(dbKey, "Epoch", epochCol)
csm := columnSeriesMapFromCSVData(csmInit, dbKey, csvDataChunk, columnIndex[2:], dataShapes)
csmInit.AddColumn(dbKey, "Nanoseconds", nanosCol)

dsMap := make(map[io.TimeBucketKey][]io.DataShape)
dsMap[dbKey] = dataShapes
rsMap := csm.ToRowSeriesMap(dsMap)
rs := rsMap[dbKey]
if rs.GetNumRows() != len(csvDataChunk) {
fmt.Println("Error obtaining rows from CSV file - not enough rows converted")
fmt.Println("Expected: ", len(csvDataChunk), " Got: ", rs.GetNumRows())
for _, cs := range csm {
fmt.Println("ColNames: ", cs.GetColumnNames())
}
return
}
fmt.Printf("beginning to write %d records...", rs.GetNumRows())
indexTime := make([]time.Time, 0)
for i := 0; i < rs.GetNumRows(); i++ {
indexTime = append(indexTime, time.Unix(epochCol[i], int64(nanosCol[i])).UTC())
}
dbWriter.WriteRecords(indexTime, rs.GetData())

executor.ThisInstance.WALFile.RequestFlush()
fmt.Printf("Done.\n")

start = time.Unix(epochCol[0], 0).UTC()
end = time.Unix(epochCol[len(epochCol)-1], 0).UTC()

return start, end
}

func columnSeriesMapFromCSVData(csmInit io.ColumnSeriesMap, key io.TimeBucketKey, csvRows [][]string, columnIndex []int,
dataShapes []io.DataShape) (csm io.ColumnSeriesMap) {

Expand Down
6 changes: 3 additions & 3 deletions docs/design/file_format_design.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ The timestamp for each data item must be obtainable, but we can avoid storing th

However, if we consider situations where the file contents might have been corrupted by a partial write, we would be greatly helped by having a known value appear more frequently. Ultimately, we must maintain every candle's data, so the best case is to have a per-entry validation of both position and value integrity. A compromise solution uses a key for each data entry that is the position within the time interval for that entry. If there is corruption in the file contents, we will likely see that value corrupted so it can act as a cheap form of validation that we have both(strong) position correctness and (weak) value integrity.

We can store a 64-bit integer value describing the location of each data item within the year, starting at 0 for the first time interval of the first day. This will provide for resolutions up to (us) and will provide enough surface for data corruption to manifest in the key.
We can store a 64-bit integer value describing the location of each data item within the year, starting at 1 for the first time interval of the first day. This will provide for resolutions up to (us) and will provide enough surface for data corruption to manifest in the key.

The index for the N-th time interval within the M-th day of the year will be:
intervals := (intervals/day)
index(N,M) = N + intervals*M
index(N,M) = 1 + N + intervals*M

For example, if intervals is in minutes, we will have 1440 intervals per day (intervals := 1440). If we are accessing the 244th day at 12:00 Noon, the index will be:
index(1220,244) = 1220 + 1440*244 = 352580
index(720,244) = 1 + 720 + 1440*244 = 352081

===> We store a 64-bit integer key for each data value that is the number of intervals since the start of the year

Expand Down
63 changes: 63 additions & 0 deletions executor/ticks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package executor

import (
"fmt"
"github.com/alpacahq/marketstore/utils"
"github.com/alpacahq/marketstore/utils/io"
. "gopkg.in/check.v1"
"math"
"time"
)

var _ = Suite(&TickTests{})

type TickTests struct{}

func (s *TickTests) SetUpSuite(c *C) {}
func (s *TickTests) TearDownSuite(c *C) {}

func (s *TickTests) TestTimeToIntervals(c *C) {
t2 := time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC)
index := io.TimeToIndex(t2, time.Minute)
c.Assert(index == 1, Equals, true)
t2 = time.Date(2016, 12, 31, 23, 59, 0, 0, time.UTC)
index = io.TimeToIndex(t2, time.Minute)
c.Assert(index == 366*1440, Equals, true)

//20161230 21:59:20 383000
t1 := time.Date(2016, 12, 30, 21, 59, 20, 383000000, time.UTC)
fmt.Println("LAL t1 = ", t1)

// Check the 1Min interval
utils.InstanceConfig.Timezone = time.UTC
index = io.TimeToIndex(t1, time.Minute)

o_t1 := io.IndexToTime(index, time.Minute, 2016)
//fmt.Println("Index Time: ", o_t1, " Minutes: ", o_t1.Minute(), " Seconds: ", o_t1.Second())
c.Assert(o_t1.Hour(), Equals, 21)
c.Assert(o_t1.Minute(), Equals, 59)
c.Assert(o_t1.Second(), Equals, 0)

o_t1 = io.IndexToTimeDepr(index, 1440, 2016)
fmt.Println("Index Time: ", o_t1, " Minutes: ", o_t1.Minute(), " Seconds: ", o_t1.Second())
c.Assert(o_t1.Hour(), Equals, 21)
c.Assert(o_t1.Minute(), Equals, 59)
c.Assert(o_t1.Second(), Equals, 0)

ticks := io.GetIntervalTicks32Bit(t1, index, 1440)
fmt.Printf("Interval ticks = \t\t\t\t %d\n", int(ticks))

seconds := t1.Second()
nanos := t1.Nanosecond()
fractionalSeconds := float64(seconds) + float64(nanos)/1000000000.
fractionalInterval := fractionalSeconds / 60.
intervalTicks := uint32(fractionalInterval * math.MaxUint32)
fmt.Printf("Manual calculation of interval ticks = \t\t %d\t%f\t%f\n", int(intervalTicks), fractionalSeconds, fractionalInterval)
// Now let's build up a timestamp from the interval ticks
fSec1 := 60. * (float64(intervalTicks) / float64(math.MaxUint32))
fSec := 60. * (float64(ticks) / float64(math.MaxUint32))
fmt.Printf("Fractional seconds from reconstruction: %f, from calc: %f\n", fSec1, fSec)
c.Assert(math.Abs(fSec-20.383) < 0.0000001, Equals, true)

c.Assert(math.Abs(float64(intervalTicks)-float64(ticks)) < 2., Equals, true)
}
52 changes: 25 additions & 27 deletions tests/integ/bin/runtests.sh
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
#!/bin/bash

rm -rf testdata test_ticks.csv && mkdir -p testdata/mktsdb

if [ $? -ne 0 ]; then
exit 1
fi
if [ $? -ne 0 ]; then exit 1; fi

marketstore connect -d `pwd`/testdata/mktsdb <<- EOF
\create TEST/1Min/TICK:Symbol/Timeframe/AttributeGroup Bid,Ask/float32 variable
Expand All @@ -13,39 +10,40 @@ marketstore connect -d `pwd`/testdata/mktsdb <<- EOF
\o test_ticks.csv
\show TEST/1Min/TICK 1970-01-01
EOF

if [ $? -ne 0 ]; then
exit 1
fi
if [ $? -ne 0 ]; then exit 1; fi

diff -q bin/ticks-example-1-output.csv test_ticks.csv && echo "Passed"

if [ $? -ne 0 ]; then
exit 1
fi
if [ $? -ne 0 ]; then exit 1; fi

rm -f test_ticks.csv

if [ $? -ne 0 ]; then
exit 1
fi
if [ $? -ne 0 ]; then exit 1; fi

marketstore connect -d `pwd`/testdata/mktsdb <<- EOF
\getinfo TEST/1Min/TICK
\load TEST/1Min/TICK bin/ticks-example-2.csv bin/ticks-example-2.yaml
\getinfo TEST2/1Min/TICK
\create TEST2/1Min/TICK:Symbol/Timeframe/AttributeGroup Bid,Ask/float32 variable
\load TEST2/1Min/TICK bin/ticks-example-2.csv bin/ticks-example-2.yaml
\o test_ticks.csv
\show TEST/1Min/TICK 1970-01-01
\show TEST2/1Min/TICK 1970-01-01
EOF

if [ $? -ne 0 ]; then
exit 1
fi
if [ $? -ne 0 ]; then exit 1; fi

diff -q bin/ticks-example-2-output.csv test_ticks.csv && echo "Passed"
if [ $? -ne 0 ]; then exit 1; fi

rm -f test_ticks.csv
if [ $? -ne 0 ]; then exit 1; fi

if [ $? -ne 0 ]; then
exit 1
fi
marketstore connect -d `pwd`/testdata/mktsdb <<- EOF
\getinfo TEST2/1Min/TICK
\create TEST2/1Min/TICK:Symbol/Timeframe/AttributeGroup Bid,Ask/float32 variable
\load TEST2/1Min/TICK bin/ticks-example-1.csv bin/ticks-example-1.yaml
\o test_ticks.csv
\show TEST2/1Min/TICK 1970-01-01
EOF
if [ $? -ne 0 ]; then exit 1; fi

rm -rf testdata test_ticks.csv
cat bin/ticks-example-1-output.csv bin/ticks-example-2-output.csv > tmp.csv
diff -q tmp.csv test_ticks.csv && echo "Passed"
if [ $? -ne 0 ]; then exit 1; fi

rm -rf testdata test_ticks.csv tmp.csv
Loading

0 comments on commit d97170e

Please sign in to comment.