Skip to content

Commit

Permalink
Add a MetricRecord interface, introduce CounterStartTime.
Browse files Browse the repository at this point in the history
MetricRecord is now an interface that is implemented by
DatastoreMetricRecord. This makes testing easier, and will also make it
easier to add alternative data stores for metric records.

I've not changed existing tests to use now-mockable MetricRecord, since
running tests against local datastore emulator works well enough.

This commit also adds a new field to the metric record
(CounterStartTime) that is necessary for google#13.
  • Loading branch information
knyar committed Nov 23, 2018
1 parent bfc2012 commit 64204b9
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 42 deletions.
95 changes: 95 additions & 0 deletions mocks/mock_metric_record.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/mock_source_metric.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 50 additions & 10 deletions record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,50 @@ import (
"google.golang.org/appengine/log"
)

//go:generate mockgen -destination=../mocks/mock_metric_record.go -package=mocks github.com/google/ts-bridge/record MetricRecord

// MetricRecord is an interface implemented by DatastoreMetricRecord.
type MetricRecord interface {
UpdateError(ctx context.Context, e error) error
UpdateSuccess(ctx context.Context, points int, msg string) error
GetLastUpdate() time.Time
GetCounterStartTime() time.Time
SetCounterStartTime(ctx context.Context, start time.Time) error
}

// Name of the Datastore kind where metric records are stored.
const kindName = "MetricRecords"

// MetricRecord defines a Datastore entity that is used to store status information about an imported metric.
type MetricRecord struct {
// DatastoreMetricRecord defines a Datastore entity that is used to store status information about an imported metric.
type DatastoreMetricRecord struct {
Name string
Query string
LastUpdate time.Time // last time we wrote any points to SD.
LastAttempt time.Time // last time we attempted an update.
LastStatus string

// CounterStartTime is used to keep start timestamp for cumulative metrics.
CounterStartTime time.Time
}

// NewDatastoreMetricRecord returns a Datastore-based metric record for a given metric name.
func NewDatastoreMetricRecord(ctx context.Context, name, query string) (*DatastoreMetricRecord, error) {
r := &DatastoreMetricRecord{Name: name}
if err := r.load(ctx); err != nil {
return nil, err
}
r.Query = query
return r, nil
}

// Write metric data back to Datastore.
func (m *MetricRecord) Write(ctx context.Context) error {
func (m *DatastoreMetricRecord) write(ctx context.Context) error {
_, err := datastore.Put(ctx, m.key(ctx), m)
return err
}

// Load metric record state from Datastore.
func (m *MetricRecord) Load(ctx context.Context) error {
func (m *DatastoreMetricRecord) load(ctx context.Context) error {
err := datastore.Get(ctx, m.key(ctx), m)
if err != nil && err != datastore.ErrNoSuchEntity {
return err
Expand All @@ -52,27 +76,43 @@ func (m *MetricRecord) Load(ctx context.Context) error {
}

// key returns the Datastore key for a given metric record.
func (m *MetricRecord) key(ctx context.Context) *datastore.Key {
func (m *DatastoreMetricRecord) key(ctx context.Context) *datastore.Key {
return datastore.NewKey(ctx, kindName, m.Name, 0, nil)
}

// GetLastUpdate returns LastUpdate timestamp.
func (m *DatastoreMetricRecord) GetLastUpdate() time.Time {
return m.LastUpdate
}

// GetCounterStartTime returns CounterStartTime.
func (m *DatastoreMetricRecord) GetCounterStartTime() time.Time {
return m.CounterStartTime
}

// SetCounterStartTime sets CounterStartTime and persists metric data.
func (m *DatastoreMetricRecord) SetCounterStartTime(ctx context.Context, start time.Time) error {
m.CounterStartTime = start
return m.write(ctx)
}

// UpdateError updates metric status in Datastore with a given error message.
func (m *MetricRecord) UpdateError(ctx context.Context, e error) error {
func (m *DatastoreMetricRecord) UpdateError(ctx context.Context, e error) error {
log.Errorf(ctx, "%s: %s", m.Name, e)
m.LastStatus = fmt.Sprintf("ERROR: %s", e)
m.LastAttempt = time.Now()
return m.Write(ctx)
return m.write(ctx)
}

// UpdateSuccess updates metric status in Datastore with a given message.
func (m *MetricRecord) UpdateSuccess(ctx context.Context, points int, msg string) error {
func (m *DatastoreMetricRecord) UpdateSuccess(ctx context.Context, points int, msg string) error {
log.Infof(ctx, "%s: %s", m.Name, msg)
m.LastStatus = fmt.Sprintf("OK: %s", msg)
m.LastAttempt = time.Now()
if points > 0 {
m.LastUpdate = time.Now()
}
return m.Write(ctx)
return m.write(ctx)
}

// CleanupRecords removes obsolete metric records from Datastore.
Expand All @@ -82,7 +122,7 @@ func CleanupRecords(ctx context.Context, valid []string) error {
existing[m] = true
}
q := datastore.NewQuery(kindName)
var records []*MetricRecord
var records []*DatastoreMetricRecord
if _, err := q.GetAll(ctx, &records); err != nil {
return fmt.Errorf("could not list metric records: %v", err)
}
Expand Down
24 changes: 12 additions & 12 deletions record/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,21 @@ var metricRecordTests = []struct {
{"5 points written", true, 5, true},
}

func TestMetricRecords(t *testing.T) {
func TestDatastoreMetricRecords(t *testing.T) {
for _, tt := range metricRecordTests {
t.Run(tt.name, func(t *testing.T) {
var err error

// initialize the record with update time 1hr in the past.
r := MetricRecord{
r := DatastoreMetricRecord{
Name: "metricname",
Query: "query",
LastStatus: "OK: all good",
LastAttempt: time.Now().Add(-time.Hour),
LastUpdate: time.Now().Add(-time.Hour),
}
if err := r.Write(testCtx); err != nil {
t.Fatalf("error while initializing MetricRecord: %v", err)
if err := r.write(testCtx); err != nil {
t.Fatalf("error while initializing DatastoreMetricRecord: %v", err)
}

if tt.success {
Expand All @@ -80,12 +80,12 @@ func TestMetricRecords(t *testing.T) {
err = r.UpdateError(testCtx, fmt.Errorf("Test Message"))
}
if err != nil {
t.Fatalf("error while updating MetricRecord: %v", err)
t.Fatalf("error while updating DatastoreMetricRecord: %v", err)
}

rr := MetricRecord{}
rr := DatastoreMetricRecord{}
if err := datastore.Get(testCtx, r.key(testCtx), &rr); err != nil {
t.Fatalf("error while fetching MetricRecord: %v", err)
t.Fatalf("error while fetching DatastoreMetricRecord: %v", err)
}

if !strings.Contains(rr.LastStatus, "Test Message") {
Expand Down Expand Up @@ -113,17 +113,17 @@ func TestMetricRecords(t *testing.T) {
}
}

func TestCleanupMetricRecords(t *testing.T) {
func TestCleanupDatastoreMetricRecords(t *testing.T) {
for _, name := range []string{"metric1", "metric2"} {
r := MetricRecord{
r := DatastoreMetricRecord{
Name: name,
Query: "query",
LastStatus: "OK: all good",
LastAttempt: time.Now().Add(-time.Hour),
LastUpdate: time.Now().Add(-time.Hour),
}
if err := r.Write(testCtx); err != nil {
t.Fatalf("error while initializing MetricRecord: %v", err)
if err := r.write(testCtx); err != nil {
t.Fatalf("error while initializing DatastoreMetricRecord: %v", err)
}
}

Expand All @@ -134,7 +134,7 @@ func TestCleanupMetricRecords(t *testing.T) {
}

q := datastore.NewQuery(kindName)
var records []*MetricRecord
var records []*DatastoreMetricRecord
if _, err := q.GetAll(testCtx, &records); err != nil {
t.Fatalf("error while reading metric records: %v", err)
}
Expand Down
23 changes: 11 additions & 12 deletions tsbridge/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Metric struct {
Name string
Source SourceMetric
SDProject string
Record *record.MetricRecord
Record record.MetricRecord
}

//go:generate mockgen -destination=../mocks/mock_source_metric.go -package=mocks github.com/google/ts-bridge/tsbridge SourceMetric
Expand All @@ -45,7 +45,7 @@ type Metric struct {
type SourceMetric interface {
StackdriverName() string
Query() string
StackdriverData(ctx context.Context, since time.Time, record *record.MetricRecord) (*metricpb.MetricDescriptor, []*monitoringpb.TimeSeries, error)
StackdriverData(ctx context.Context, since time.Time, record record.MetricRecord) (*metricpb.MetricDescriptor, []*monitoringpb.TimeSeries, error)
}

//go:generate mockgen -destination=../mocks/mock_sd_adapter.go -package=mocks github.com/google/ts-bridge/tsbridge StackdriverAdapter
Expand Down Expand Up @@ -87,8 +87,8 @@ func UpdateAllMetrics(ctx context.Context, c *Config, sd StackdriverAdapter, par

// After all metrics are updated, find the oldest write timestamp.
for _, m := range c.Metrics() {
if m.Record.LastUpdate.Before(oldestWrite) {
oldestWrite = m.Record.LastUpdate
if m.Record.GetLastUpdate().Before(oldestWrite) {
oldestWrite = m.Record.GetLastUpdate()
}
}
for err := range errchan {
Expand All @@ -99,17 +99,16 @@ func UpdateAllMetrics(ctx context.Context, c *Config, sd StackdriverAdapter, par

// NewMetric creates a Metric based on a SourceMetric and the destination Stackdriver project.
func NewMetric(ctx context.Context, name string, s SourceMetric, sdProject string) (*Metric, error) {
m := &Metric{
r, err := record.NewDatastoreMetricRecord(ctx, name, s.Query())
if err != nil {
return nil, err
}
return &Metric{
Name: name,
Source: s,
SDProject: sdProject,
Record: &record.MetricRecord{Name: name},
}
if err := m.Record.Load(ctx); err != nil {
return nil, err
}
m.Record.Query = s.Query()
return m, nil
Record: r,
}, nil
}

// Update issues a configured query and imports new points to Stackdriver.
Expand Down
15 changes: 8 additions & 7 deletions tsbridge/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ func TestMetricUpdate(t *testing.T) {
if err != nil {
t.Fatalf("error while creating metric: %v", err)
}
m.Record.LastStatus = "OK: all good"
m.Record.LastAttempt = time.Now().Add(-time.Hour)
rec := m.Record.(*record.DatastoreMetricRecord)
rec.LastStatus = "OK: all good"
rec.LastAttempt = time.Now().Add(-time.Hour)

mockSD := mocks.NewMockStackdriverAdapter(mockCtrl)
tt.setup(mockSource, mockSD)
Expand All @@ -137,11 +138,11 @@ func TestMetricUpdate(t *testing.T) {
if err := m.Update(testCtx, mockSD, collector); err != nil {
t.Errorf("Metric.Update() returned error %v", err)
}
if time.Now().Sub(m.Record.LastAttempt) > time.Minute {
if time.Now().Sub(rec.LastAttempt) > time.Minute {
t.Errorf("expected to see LastAttempt updated")
}
if !strings.Contains(m.Record.LastStatus, tt.wantStatus) {
t.Errorf("expected to see LastStatus contain '%s'; got %s", tt.wantStatus, m.Record.LastStatus)
if !strings.Contains(rec.LastStatus, tt.wantStatus) {
t.Errorf("expected to see LastStatus contain '%s'; got %s", tt.wantStatus, rec.LastStatus)
}
collector.Close()
if got, ok := exporter.values["ts_bridge/metric_import_latencies:metricname"]; !ok {
Expand Down Expand Up @@ -219,7 +220,7 @@ func TestUpdateAllMetrics(t *testing.T) {
src.EXPECT().StackdriverName().MaxTimes(100).Return(name)
metric := &Metric{
Name: name,
Record: &record.MetricRecord{LastUpdate: time.Now().Add(-time.Hour)},
Record: &record.DatastoreMetricRecord{LastUpdate: time.Now().Add(-time.Hour)},
Source: src,
}
config.metrics = append(config.metrics, metric)
Expand Down Expand Up @@ -268,7 +269,7 @@ func TestUpdateAllMetricsErrors(t *testing.T) {
&Metric{
// Having an emoji symbol in metric name should produce an error while defining an OpenCensus tag.
Name: "invalid metric name 🥒",
Record: &record.MetricRecord{LastUpdate: time.Now().Add(-time.Hour)},
Record: &record.DatastoreMetricRecord{LastUpdate: time.Now().Add(-time.Hour)},
Source: src,
},
},
Expand Down

0 comments on commit 64204b9

Please sign in to comment.