Skip to content

Commit

Permalink
switch from period to forecast_time tag/label
Browse files Browse the repository at this point in the history
forecast_time is 0 for past data
  • Loading branch information
Ted Pearson committed Dec 16, 2021
1 parent 5a8aa75 commit d4ed3b6
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 49 deletions.
45 changes: 8 additions & 37 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
package main

import (
"fmt"
"log"
"net/http"
"net/url"
"strings"
"time"

"github.com/gregjones/httpcache"
"github.com/gregjones/httpcache/diskcache"
"github.com/pkg/errors"
"github.com/spf13/viper"
myhttp "github.com/tedpearson/weather2influxdb/http"
"github.com/tedpearson/weather2influxdb/influx"
Expand Down Expand Up @@ -55,8 +50,6 @@ func main() {
retryer: retryer,
config: config,
}
app.DeleteSeries(config.InfluxDB.Host, config.Forecast.MeasurementName,
config.Astronomy.MeasurementName)

for _, location := range config.Locations {
for _, src := range config.Sources.Enabled {
Expand Down Expand Up @@ -96,13 +89,13 @@ func (app App) RunForecast(src string, loc Location) {
MeasurementName: c.Forecast.MeasurementName,
Location: loc.Name,
Database: c.InfluxDB.Database,
Period: weather.Future,
ForecastTime: time.Now().Truncate(time.Hour).Format("2006-01-02:15"),
}

// write forecast

log.Printf(`Writing %d points {loc:"%s", src:"%s", measurement:"%s"`, len(records.Values),
loc.Name, src, c.Forecast.MeasurementName)
log.Printf(`Writing %d points {loc:"%s", src:"%s", measurement:"%s", forecast_time:"%s"}`,
len(records.Values), loc.Name, src, c.Forecast.MeasurementName, forecastOptions.ForecastTime)
err = app.writer.WriteMeasurements(
records.ToPoints(forecastOptions))
if err != nil {
Expand All @@ -116,7 +109,7 @@ func (app App) RunForecast(src string, loc Location) {
Values: []weather.Record{record},
}
nextHourOptions := forecastOptions
nextHourOptions.Period = weather.Past
nextHourOptions.ForecastTime = "0"
err = app.writer.WriteMeasurements(nextHourRecord.ToPoints(nextHourOptions))
break
}
Expand All @@ -129,29 +122,6 @@ func (app App) RunForecast(src string, loc Location) {
}
}

func (app App) DeleteSeries(host string, measurements ...string) {
// delete series from victoriametrics
path := fmt.Sprintf(`%s/api/v1/admin/tsdb/delete_series`, host)
joined := strings.Join(measurements, "|")
// note: kinda dangerous delete pattern. easy to accidentally delete everything
match := fmt.Sprintf(`{__name__=~"(%s).+",period="%s"}`, joined, weather.Future)
values := url.Values{
"match": {match},
}
client := &http.Client{}
req, err := http.NewRequest("POST", path, strings.NewReader(values.Encode()))
if err != nil {
panic(errors.Wrap(err, "Failed to create http request"))
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
ic := app.config.InfluxDB
req.SetBasicAuth(ic.User, ic.Password)
_, err = client.Do(req)
if err != nil {
panic(errors.Wrap(err, "Failed to delete series from victoriametrics."))
}
}

func (app App) RunAstrocast(forecaster weather.Forecaster, options weather.WriteOptions) {
astrocaster, ok := forecaster.(weather.Astrocaster)
if ok {
Expand All @@ -160,8 +130,9 @@ func (app App) RunAstrocast(forecaster weather.Forecaster, options weather.Write
log.Printf("%+v", err)
return
}
log.Printf(`Writing %d points {loc:"%s", src:"%s", measurement:"%s"`, len(events.Values),
options.Location, options.ForecastSource, options.MeasurementName)
log.Printf(`Writing %d points {loc:"%s", src:"%s", measurement:"%s", forecast_time:"%s"}`,
len(events.Values), options.Location, options.ForecastSource, options.MeasurementName,
options.ForecastTime)
err = app.writer.WriteMeasurements(events.ToPoints(options))
if err != nil {
log.Printf("%+v", err)
Expand All @@ -174,7 +145,7 @@ func (app App) RunAstrocast(forecaster weather.Forecaster, options weather.Write
Values: []weather.AstroEvent{event},
}
nextHourOptions := options
nextHourOptions.Period = weather.Past
nextHourOptions.ForecastTime = "0"
err = app.writer.WriteMeasurements(nextHourEvent.ToPoints(nextHourOptions))
break
}
Expand Down
17 changes: 5 additions & 12 deletions weather/forecast.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,12 @@ import (
"github.com/tedpearson/weather2influxdb/http"
)

type Period string

const (
Past Period = "past"
Future Period = "future"
)

type WriteOptions struct {
Database string
ForecastSource string
MeasurementName string
Location string
Period Period
ForecastTime string
}

type Record struct {
Expand Down Expand Up @@ -83,7 +76,7 @@ func toPoints(items []interface{}, options WriteOptions) (influxdb1.BatchPoints,
for _, item := range items {
t := reflect.ValueOf(item).FieldByName("Time").Interface().(time.Time)
// only send future datapoints.
if options.Period == Future && t.Before(time.Now().Add(time.Hour+1)) {
if options.ForecastTime != "0" && t.Before(time.Now().Add(time.Hour+1)) {
continue
}
point, err := toPoint(t, item, options)
Expand All @@ -98,9 +91,9 @@ func toPoints(items []interface{}, options WriteOptions) (influxdb1.BatchPoints,

func toPoint(t time.Time, i interface{}, options WriteOptions) (*influxdb1.Point, error) {
tags := map[string]string{
"source": options.ForecastSource,
"location": options.Location,
"period": string(options.Period),
"source": options.ForecastSource,
"location": options.Location,
"forecast_time": options.ForecastTime,
}
fields := make(map[string]interface{})
e := reflect.ValueOf(i)
Expand Down

0 comments on commit d4ed3b6

Please sign in to comment.