Skip to content

Commit

Permalink
fix: lint issues (#570)
Browse files Browse the repository at this point in the history
* fix(lint): gocritic issues
  • Loading branch information
dakimura authored Feb 28, 2022
1 parent 13b5577 commit c02fca1
Show file tree
Hide file tree
Showing 16 changed files with 112 additions and 179 deletions.
2 changes: 1 addition & 1 deletion contrib/alpaca/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func quoteHandler(q *api.Quote) {
// and stores it to the cache.
func aggregateToMinuteHandler(agg *api.AggregateToMinute) {
writeAggregateToMinute(agg)
updateMetrics("minute_bar", time.Unix(0, int64(1e6*agg.EndTime)))
updateMetrics("minute_bar", time.Unix(0, 1e6*agg.EndTime))
}

func updateMetrics(msgType string, msgTimestamp time.Time) {
Expand Down
39 changes: 17 additions & 22 deletions contrib/alpacabkfeeder/writer/bar_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ type BarWriterImpl struct {
// Write converts the Response of the ListBars API to a ColumnSeriesMap and write it to the local marketstore server.
func (b BarWriterImpl) Write(symbol string, bars []alpaca.Bar) error {
// convert Bar Data to CSM (ColumnSeriesMap)
csm, err := b.convertToCSM(symbol, bars)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("failed to create CSM from Bar Data. %v", bars))
}
csm := b.convertToCSM(symbol, bars)

// write CSM to marketstore
if err := b.MarketStoreWriter.Write(csm); err != nil {
Expand All @@ -42,18 +39,16 @@ func (b BarWriterImpl) Write(symbol string, bars []alpaca.Bar) error {
return nil
}

func (b *BarWriterImpl) convertToCSM(symbol string, bars []alpaca.Bar) (io.ColumnSeriesMap, error) {
var (
epochs []int64
opens []float32
closes []float32
highs []float32
lows []float32
volumes []int32
)
func (b *BarWriterImpl) convertToCSM(symbol string, bars []alpaca.Bar) io.ColumnSeriesMap {
epochs := make([]int64, len(bars))
opens := make([]float32, len(bars))
closes := make([]float32, len(bars))
highs := make([]float32, len(bars))
lows := make([]float32, len(bars))
volumes := make([]int32, len(bars))
csm := io.NewColumnSeriesMap()

for _, bar := range bars {
for i := range bars {
// // skip the symbol which timestamp is empty string and cannot be parsed,
// // which means the symbols have never been executed
// if time.Time(bar.Timestamp) == (time.Time{}) {
Expand All @@ -72,24 +67,24 @@ func (b *BarWriterImpl) convertToCSM(symbol string, bars []alpaca.Bar) (io.Colum

// Start time of each bar is used for "epoch"
// to align with the 1-day chart backfill. ("00:00:00"(starting time of a day) is used for epoch)
epochs = append(epochs, bar.Time)
opens = append(opens, bar.Open)
closes = append(closes, bar.Close)
highs = append(highs, bar.High)
lows = append(lows, bar.Low)
volumes = append(volumes, bar.Volume)
epochs = append(epochs, bars[i].Time)
opens = append(opens, bars[i].Open)
closes = append(closes, bars[i].Close)
highs = append(highs, bars[i].High)
lows = append(lows, bars[i].Low)
volumes = append(volumes, bars[i].Volume)
}

// to avoid that empty array is added to csm when all data are Volume=0 and there is no data to write
if len(epochs) == 0 {
// no data to write.
return csm, nil
return csm
}

cs := b.newColumnSeries(epochs, opens, closes, highs, lows, volumes)
tbk := io.NewTimeBucketKey(symbol + "/" + b.Timeframe + "/OHLCV")
csm.AddColumnSeries(*tbk, cs)
return csm, nil
return csm
}

func (b BarWriterImpl) newColumnSeries(epochs []int64, opens, closes, highs, lows []float32, volumes []int32,
Expand Down
12 changes: 8 additions & 4 deletions contrib/candler/tickcandler/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/alpacahq/marketstore/v4/catalog"
"github.com/alpacahq/marketstore/v4/contrib/candler/tickcandler"
Expand Down Expand Up @@ -55,7 +56,7 @@ func TestTickCandler(t *testing.T) {
/*
Create some tick data with symbol "TEST"
*/
createTickBucket("TEST", rootDir, metadata.CatalogDir, metadata.WALFile)
createTickBucket(t, "TEST", rootDir, metadata.CatalogDir, metadata.WALFile)

/*
Read some tick data
Expand Down Expand Up @@ -112,15 +113,17 @@ func TestTickCandler(t *testing.T) {
/*
Utility functions.
*/
func createTickBucket(symbol, rootDir string, catalogDir *catalog.Directory, wf *executor.WALFileType) {
func createTickBucket(t *testing.T, symbol, rootDir string, catalogDir *catalog.Directory, wf *executor.WALFileType) {
t.Helper()
// Create a new variable data bucket
tbk := io.NewTimeBucketKey(symbol + "/1Min/TICK")
tf := utils.NewTimeframe("1Min")
eTypes := []io.EnumElementType{io.FLOAT32, io.FLOAT32}
eNames := []string{"Bid", "Ask"}
dsv := io.NewDataShapeVector(eNames, eTypes)
tbinfo := io.NewTimeBucketInfo(*tf, tbk.GetPathToYearFiles(rootDir), "Test", int16(2016), dsv, io.VARIABLE)
catalogDir.AddTimeBucket(tbk, tbinfo)
err := catalogDir.AddTimeBucket(tbk, tbinfo)
require.Nil(t, err)

/*
Write some data
Expand All @@ -138,7 +141,8 @@ func createTickBucket(symbol, rootDir string, catalogDir *catalog.Directory, wf
ts = ts.Add(time.Second)
row.Epoch = ts.Unix()
buffer, _ := io.Serialize([]byte{}, row)
w.WriteRecords([]time.Time{ts}, buffer, dsv, tbinfo)
err = w.WriteRecords([]time.Time{ts}, buffer, dsv, tbinfo)
require.Nil(t, err)
}
wf.RequestFlush()
}
13 changes: 8 additions & 5 deletions contrib/iex/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/alpacahq/marketstore/v4/executor"
"github.com/alpacahq/marketstore/v4/plugins/trigger"
"github.com/alpacahq/marketstore/v4/utils"
. "github.com/alpacahq/marketstore/v4/utils/io"
utilsio "github.com/alpacahq/marketstore/v4/utils/io"
"github.com/alpacahq/marketstore/v4/utils/log"
)

Expand All @@ -46,7 +46,10 @@ func init() {
}

func main() {
initWriter()
if err := initWriter(); err != nil {
log.Error(err.Error())
os.Exit(1)
}

start, err := time.Parse(format, from)
if err != nil {
Expand Down Expand Up @@ -179,13 +182,13 @@ func makeBars(trades []*tops.TradeReportMessage, openTime, closeTime time.Time)
}

func writeBars(bars []*consolidator.Bar) error {
csm := NewColumnSeriesMap()
csm := utilsio.NewColumnSeriesMap()

for i := range bars {
batch, index := nextBatch(bars, i)

if len(batch) > 0 {
tbk := NewTimeBucketKeyFromString(fmt.Sprintf("%s/1Min/OHLCV", batch[0].Symbol))
tbk := utilsio.NewTimeBucketKeyFromString(fmt.Sprintf("%s/1Min/OHLCV", batch[0].Symbol))

epoch := make([]int64, len(batch))
open := make([]float32, len(batch))
Expand All @@ -203,7 +206,7 @@ func writeBars(bars []*consolidator.Bar) error {
volume[j] = int32(bar.Volume)
}

cs := NewColumnSeries()
cs := utilsio.NewColumnSeries()
cs.AddColumn("Epoch", epoch)
cs.AddColumn("Open", open)
cs.AddColumn("High", high)
Expand Down
79 changes: 39 additions & 40 deletions contrib/ondiskagg/aggtrigger/aggtrigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,50 +317,49 @@ func aggregate(cs *io.ColumnSeries, aggTbk, baseTbk *io.TimeBucketKey, symbol st
cs2 := bar.GetCs()

return cs2, nil
} else {
// bars to bars
params = []accumParam{
{"Open", "first", "Open"},
{"High", "max", "High"},
{"Low", "min", "Low"},
{"Close", "last", "Close"},
}
if cs.Exists("Volume") {
params = append(params, accumParam{"Volume", "sum", "Volume"})
}
}
// bars to bars
params = []accumParam{
{"Open", "first", "Open"},
{"High", "max", "High"},
{"Low", "min", "Low"},
{"Close", "last", "Close"},
}
if cs.Exists("Volume") {
params = append(params, accumParam{"Volume", "sum", "Volume"})
}

accumGroup := newAccumGroup(cs, params)

ts, _ := cs.GetTime()
outEpoch := make([]int64, 0)

groupKey := timeWindow.Truncate(ts[0])
groupStart := 0
// accumulate inputs. Since the input is ordered by
// time, it is just to slice by correct boundaries
for i, t := range ts {
if !timeWindow.IsWithin(t, groupKey) {
// Emit new row and re-init aggState
outEpoch = append(outEpoch, groupKey.Unix())
if err := accumGroup.apply(groupStart, i); err != nil {
return nil, fmt.Errorf("apply to group. groupStart=%d, i=%d:%w", groupStart, i, err)
}
groupKey = timeWindow.Truncate(t)
groupStart = i
accumGroup := newAccumGroup(cs, params)

ts, _ := cs.GetTime()
outEpoch := make([]int64, 0)

groupKey := timeWindow.Truncate(ts[0])
groupStart := 0
// accumulate inputs. Since the input is ordered by
// time, it is just to slice by correct boundaries
for i, t := range ts {
if !timeWindow.IsWithin(t, groupKey) {
// Emit new row and re-init aggState
outEpoch = append(outEpoch, groupKey.Unix())
if err := accumGroup.apply(groupStart, i); err != nil {
return nil, fmt.Errorf("apply to group. groupStart=%d, i=%d:%w", groupStart, i, err)
}
groupKey = timeWindow.Truncate(t)
groupStart = i
}
// accumulate any remaining values if not yet
outEpoch = append(outEpoch, groupKey.Unix())
if err := accumGroup.apply(groupStart, len(ts)); err != nil {
return nil, fmt.Errorf("apply to group. groupStart=%d, i=%d:%w", groupStart, len(ts), err)
}

// finalize output
outCs := io.NewColumnSeries()
outCs.AddColumn("Epoch", outEpoch)
accumGroup.addColumns(outCs)
return outCs, nil
}
// accumulate any remaining values if not yet
outEpoch = append(outEpoch, groupKey.Unix())
if err := accumGroup.apply(groupStart, len(ts)); err != nil {
return nil, fmt.Errorf("apply to group. groupStart=%d, i=%d:%w", groupStart, len(ts), err)
}

// finalize output
outCs := io.NewColumnSeries()
outCs.AddColumn("Epoch", outEpoch)
accumGroup.addColumns(outCs)
return outCs, nil
}

func (s *OnDiskAggTrigger) query(
Expand Down
2 changes: 1 addition & 1 deletion contrib/ondiskagg/aggtrigger/aggtrigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func getConfig(t *testing.T, data string) (ret map[string]interface{}) {

err := json.Unmarshal([]byte(data), &ret)
require.Nil(t, err)
return
return ret
}

func TestNew(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions contrib/ondiskagg/aggtrigger/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (tfs *timeframes) UpperBound() (tf *utils.Timeframe) {
}
}

return
return tf
}

func (tfs *timeframes) LowerBound() (tf *utils.Timeframe) {
Expand All @@ -41,5 +41,5 @@ func (tfs *timeframes) LowerBound() (tf *utils.Timeframe) {
}
}

return
return tf
}
1 change: 1 addition & 0 deletions contrib/ondiskagg/ondiskagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
)

// NewTrigger returns a new on-disk aggregate trigger based on the configuration.
// nolint:deadcode // plugin interface
func NewTrigger(conf map[string]interface{}) (trigger.Trigger, error) {
return aggtrigger.NewTrigger(conf)
}
Expand Down
24 changes: 13 additions & 11 deletions contrib/polygon/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type ConsolidatedUpdateInfo struct {

var (
WriteTime time.Duration
ApiCallTime time.Duration
APICallTime time.Duration
WaitTime time.Duration
NoIngest bool
)
Expand Down Expand Up @@ -117,7 +117,7 @@ func Bars(client *http.Client, symbol string, from, to time.Time,
if err != nil {
return err
}
ApiCallTime += time.Since(t)
APICallTime += time.Since(t)

if NoIngest {
return nil
Expand Down Expand Up @@ -192,9 +192,11 @@ func tradesToBars(ticks []api.TradeTick, model *models.Bar, exchangeIDs []int) {
return
}

var epoch int64
var open, high, low, close_ float64
var volume int
var (
epoch int64
open, high, low, clos float64
volume int
)

lastBucketTimestamp := time.Time{}

Expand Down Expand Up @@ -229,7 +231,7 @@ func tradesToBars(ticks []api.TradeTick, model *models.Bar, exchangeIDs []int) {
modelsenum.Price(open),
modelsenum.Price(high),
modelsenum.Price(low),
modelsenum.Price(close_),
modelsenum.Price(clos),
modelsenum.Size(volume))
}

Expand All @@ -238,7 +240,7 @@ func tradesToBars(ticks []api.TradeTick, model *models.Bar, exchangeIDs []int) {
open = 0
high = 0
low = math.MaxFloat64
close_ = 0
clos = 0
volume = 0
}

Expand All @@ -261,7 +263,7 @@ func tradesToBars(ticks []api.TradeTick, model *models.Bar, exchangeIDs []int) {
if open == 0 {
open = price
}
close_ = price
clos = price
}

if updateInfo.UpdateVolume {
Expand All @@ -275,7 +277,7 @@ func tradesToBars(ticks []api.TradeTick, model *models.Bar, exchangeIDs []int) {
modelsenum.Price(open),
modelsenum.Price(high),
modelsenum.Price(low),
modelsenum.Price(close_),
modelsenum.Price(clos),
modelsenum.Size(volume))
}
}
Expand All @@ -293,7 +295,7 @@ func Trades(client *http.Client, symbol string, from, to time.Time, batchSize in
trades = append(trades, resp.Results...)
}
}
ApiCallTime += time.Since(t)
APICallTime += time.Since(t)

if NoIngest {
return nil
Expand Down Expand Up @@ -343,7 +345,7 @@ func Quotes(client *http.Client, symbol string, from, to time.Time, batchSize in
quotes = append(quotes, resp.Ticks...)
}
}
ApiCallTime += time.Since(t)
APICallTime += time.Since(t)

if NoIngest {
return nil
Expand Down
Loading

0 comments on commit c02fca1

Please sign in to comment.