Skip to content

Commit

Permalink
upgraded to aws sdk v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Sjuul Janssen committed Jan 3, 2023
1 parent 4d52255 commit 480e13d
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 183 deletions.
18 changes: 9 additions & 9 deletions basic/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ type Metric struct {
}

type Collector struct {
config *config.Config
sessions *sessions.Sessions
metrics []Metric
l log.Logger
config *config.Config
awsConfigs *sessions.Configs
metrics []Metric
l log.Logger
}

// New creates a new instance of a Collector.
func New(config *config.Config, sessions *sessions.Sessions) *Collector {
func New(config *config.Config, awsConfigs *sessions.Configs) *Collector {
return &Collector{
config: config,
sessions: sessions,
metrics: Metrics,
l: log.With("component", "basic"),
config: config,
awsConfigs: awsConfigs,
metrics: Metrics,
l: log.With("component", "basic"),
}
}

Expand Down
39 changes: 20 additions & 19 deletions basic/scraper.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package basic

import (
"context"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
cloudwatchtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/prometheus/client_golang/prometheus"

"github.com/percona/rds_exporter/config"
Expand All @@ -24,17 +26,17 @@ type Scraper struct {
ch chan<- prometheus.Metric

// internal
svc *cloudwatch.CloudWatch
svc *cloudwatch.Client
constLabels prometheus.Labels
}

func NewScraper(instance *config.Instance, collector *Collector, ch chan<- prometheus.Metric) *Scraper {
// Create CloudWatch client
sess, _ := collector.sessions.GetSession(instance.Region, instance.Instance)
if sess == nil {
awsConfig, _ := collector.awsConfigs.GetSession(instance.Region, instance.Instance)
if awsConfig == nil {
return nil
}
svc := cloudwatch.New(sess)
svc := cloudwatch.NewFromConfig(*awsConfig)

constLabels := prometheus.Labels{
"region": instance.Region,
Expand All @@ -60,12 +62,12 @@ func NewScraper(instance *config.Instance, collector *Collector, ch chan<- prome
}
}

func getLatestDatapoint(datapoints []*cloudwatch.Datapoint) *cloudwatch.Datapoint {
var latest *cloudwatch.Datapoint = nil
func getLatestDatapoint(datapoints []cloudwatchtypes.Datapoint) *cloudwatchtypes.Datapoint {
var latest *cloudwatchtypes.Datapoint = nil

for dp := range datapoints {
if latest == nil || latest.Timestamp.Before(*datapoints[dp].Timestamp) {
latest = datapoints[dp]
latest = &datapoints[dp]
}
}

Expand Down Expand Up @@ -94,26 +96,25 @@ func (s *Scraper) Scrape() {
func (s *Scraper) scrapeMetric(metric Metric) error {
now := time.Now()
end := now.Add(-Delay)
period := int32(Period.Seconds())

params := &cloudwatch.GetMetricStatisticsInput{
EndTime: aws.Time(end),
StartTime: aws.Time(end.Add(-Range)),

Period: aws.Int64(int64(Period.Seconds())),
EndTime: aws.Time(end),
StartTime: aws.Time(end.Add(-Range)),
Period: &period,
MetricName: aws.String(metric.cwName),
Namespace: aws.String("AWS/RDS"),
Dimensions: []*cloudwatch.Dimension{},
Statistics: aws.StringSlice([]string{"Average"}),
Unit: nil,
Dimensions: []cloudwatchtypes.Dimension{},
Statistics: []cloudwatchtypes.Statistic{"Average"},
}

params.Dimensions = append(params.Dimensions, &cloudwatch.Dimension{
params.Dimensions = append(params.Dimensions, cloudwatchtypes.Dimension{
Name: aws.String("DBInstanceIdentifier"),
Value: aws.String(s.instance.Instance),
})

// Call CloudWatch to gather the datapoints
resp, err := s.svc.GetMetricStatistics(params)
resp, err := s.svc.GetMetricStatistics(context.TODO(), params)
if err != nil {
return err
}
Expand All @@ -127,7 +128,7 @@ func (s *Scraper) scrapeMetric(metric Metric) error {
dp := getLatestDatapoint(resp.Datapoints)

// Get the metric.
v := aws.Float64Value(dp.Average)
v := *dp.Average
switch metric.cwName {
case "EngineUptime":
// "Fake EngineUptime -> node_boot_time with time.Now().Unix() - EngineUptime."
Expand Down
6 changes: 3 additions & 3 deletions enhanced/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// Collector collects enhanced RDS metrics by utilizing several scrapers.
type Collector struct {
sessions *sessions.Sessions
sessions *sessions.Configs
logger log.Logger

rw sync.RWMutex
Expand All @@ -27,14 +27,14 @@ const (
)

// NewCollector creates new collector and starts scrapers.
func NewCollector(sessions *sessions.Sessions) *Collector {
func NewCollector(sessions *sessions.Configs) *Collector {
c := &Collector{
sessions: sessions,
logger: log.With("component", "enhanced"),
metrics: make(map[string][]prometheus.Metric),
}

for session, instances := range sessions.AllSessions() {
for session, instances := range sessions.AllConfigs() {
s := newScraper(session, instances)

interval := maxInterval
Expand Down
109 changes: 53 additions & 56 deletions enhanced/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"

Expand All @@ -18,14 +17,14 @@ import (
type scraper struct {
instances []sessions.Instance
logStreamNames []string
svc *cloudwatchlogs.CloudWatchLogs
svc *cloudwatchlogs.Client
nextStartTime time.Time
logger log.Logger

testDisallowUnknownFields bool // for tests only
}

func newScraper(session *session.Session, instances []sessions.Instance) *scraper {
func newScraper(config *aws.Config, instances []sessions.Instance) *scraper {
logStreamNames := make([]string, 0, len(instances))
for _, instance := range instances {
logStreamNames = append(logStreamNames, instance.ResourceID)
Expand All @@ -34,7 +33,7 @@ func newScraper(session *session.Session, instances []sessions.Instance) *scrape
return &scraper{
instances: instances,
logStreamNames: logStreamNames,
svc: cloudwatchlogs.New(session),
svc: cloudwatchlogs.NewFromConfig(*config),
nextStartTime: time.Now().Add(-3 * time.Minute).Round(0), // strip monotonic clock reading
logger: log.With("component", "enhanced"),
}
Expand Down Expand Up @@ -78,68 +77,66 @@ func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, m

input := &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: aws.String("RDSOSMetrics"),
LogStreamNames: aws.StringSlice(s.logStreamNames[sliceStart:sliceEnd]),
StartTime: aws.Int64(aws.TimeUnixMilli(s.nextStartTime)),
LogStreamNames: s.logStreamNames[sliceStart:sliceEnd],
StartTime: aws.Int64(s.nextStartTime.UnixMilli()),
}

s.logger.With("next_start", s.nextStartTime.UTC()).With("since_last", time.Since(s.nextStartTime)).Debugf("Requesting metrics")

// collect all returned events and metrics/messages
collectAllMetrics := func(output *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
for _, event := range output.Events {
l := s.logger.With("EventId", *event.EventId).With("LogStreamName", *event.LogStreamName)
l = l.With("Timestamp", aws.MillisecondsTimeValue(event.Timestamp).UTC())
l = l.With("IngestionTime", aws.MillisecondsTimeValue(event.IngestionTime).UTC())

var instance *sessions.Instance
for _, i := range s.instances {
if i.ResourceID == *event.LogStreamName {
instance = &i
break
}
}
if instance == nil {
l.Errorf("Failed to find instance.")
continue
}
output, err := s.svc.FilterLogEvents(ctx, input)
if err != nil {
s.logger.Errorf("Failed to filter log events: %s.", err)
}

if instance.DisableEnhancedMetrics {
l.Debugf("Enhanced Metrics are disabled for instance %v.", instance)
continue
}
l = l.With("region", instance.Region).With("instance", instance.Instance)

// l.Debugf("Message:\n%s", *event.Message)
osMetrics, err := parseOSMetrics([]byte(*event.Message), s.testDisallowUnknownFields)
if err != nil {
// only for tests
if s.testDisallowUnknownFields {
panic(fmt.Sprintf("New metrics should be added: %s", err))
}

l.Errorf("Failed to parse metrics: %s.", err)
continue
}
// l.Debugf("OS Metrics:\n%#v", osMetrics)
for _, event := range output.Events {
l := s.logger.With("EventId", *event.EventId).With("LogStreamName", *event.LogStreamName)

timestamp := aws.MillisecondsTimeValue(event.Timestamp).UTC()
l.Debugf("Timestamp from message: %s; from event: %s.", osMetrics.Timestamp.UTC(), timestamp)
l = l.With("Timestamp", time.Unix(*event.Timestamp, 0).UTC())
l = l.With("IngestionTime", time.Unix(*event.IngestionTime, 0).UTC())

if allMetrics[instance.ResourceID] == nil {
allMetrics[instance.ResourceID] = make(map[time.Time][]prometheus.Metric)
var instance *sessions.Instance
for _, i := range s.instances {
if i.ResourceID == *event.LogStreamName {
instance = &i
break
}
allMetrics[instance.ResourceID][timestamp] = osMetrics.makePrometheusMetrics(instance.Region, instance.Labels)
}
if instance == nil {
l.Errorf("Failed to find instance.")
continue
}

if allMessages[instance.ResourceID] == nil {
allMessages[instance.ResourceID] = make(map[time.Time]string)
if instance.DisableEnhancedMetrics {
l.Debugf("Enhanced Metrics are disabled for instance %v.", instance)
continue
}
l = l.With("region", instance.Region).With("instance", instance.Instance)

// l.Debugf("Message:\n%s", *event.Message)
osMetrics, err := parseOSMetrics([]byte(*event.Message), s.testDisallowUnknownFields)
if err != nil {
// only for tests
if s.testDisallowUnknownFields {
panic(fmt.Sprintf("New metrics should be added: %s", err))
}
allMessages[instance.ResourceID][timestamp] = *event.Message

l.Errorf("Failed to parse metrics: %s.", err)
continue
}
// l.Debugf("OS Metrics:\n%#v", osMetrics)

return true // continue pagination
}
if err := s.svc.FilterLogEventsPagesWithContext(ctx, input, collectAllMetrics); err != nil {
s.logger.Errorf("Failed to filter log events: %s.", err)
timestamp := time.Unix(*event.Timestamp, 0).UTC()
l.Debugf("Timestamp from message: %s; from event: %s.", osMetrics.Timestamp.UTC(), timestamp)

if allMetrics[instance.ResourceID] == nil {
allMetrics[instance.ResourceID] = make(map[time.Time][]prometheus.Metric)
}
allMetrics[instance.ResourceID][timestamp] = osMetrics.makePrometheusMetrics(instance.Region, instance.Labels)

if allMessages[instance.ResourceID] == nil {
allMessages[instance.ResourceID] = make(map[time.Time]string)
}
allMessages[instance.ResourceID][timestamp] = *event.Message
}
}
// get better times
Expand Down
12 changes: 6 additions & 6 deletions enhanced/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func TestScraper(t *testing.T) {
sess, err := sessions.New(cfg.Instances, client.HTTP(), false)
require.NoError(t, err)

for session, instances := range sess.AllSessions() {
session, instances := session, instances
for config, instances := range sess.AllConfigs() {
config, instances := config, instances
t.Run(fmt.Sprint(instances), func(t *testing.T) {
// test that there are no new metrics
s := newScraper(session, instances)
s := newScraper(config, instances)
s.testDisallowUnknownFields = true
metrics, messages := s.scrape(context.Background())
require.Len(t, metrics, len(instances))
Expand Down Expand Up @@ -159,10 +159,10 @@ func TestScraperDisableEnhancedMetrics(t *testing.T) {
return false
}

for session, instances := range sess.AllSessions() {
session, instances := session, instances
for config, instances := range sess.AllConfigs() {
config, instances := config, instances
t.Run(fmt.Sprint(instances), func(t *testing.T) {
s := newScraper(session, instances)
s := newScraper(config, instances)
s.testDisallowUnknownFields = true
metrics, _ := s.scrape(context.Background())

Expand Down
17 changes: 15 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ module github.com/percona/rds_exporter
go 1.17

require (
github.com/aws/aws-sdk-go v1.36.30
github.com/aws/aws-sdk-go-v2 v1.17.3
github.com/aws/aws-sdk-go-v2/config v1.18.7
github.com/aws/aws-sdk-go-v2/credentials v1.13.7
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.23.1
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.17.3
github.com/aws/aws-sdk-go-v2/service/rds v1.38.0
github.com/aws/aws-sdk-go-v2/service/sts v1.17.7
github.com/percona/exporter_shared v0.7.3
github.com/prometheus/client_golang v1.10.0
github.com/prometheus/common v0.24.0
Expand All @@ -15,6 +21,14 @@ require (
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.21 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.28 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.11 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -28,7 +42,6 @@ require (
github.com/prometheus/procfs v0.6.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
Expand Down
Loading

0 comments on commit 480e13d

Please sign in to comment.