From cbc11975648e20bfd0aaa4d22e62184447e4c068 Mon Sep 17 00:00:00 2001
From: Baha Aiman
Date: Fri, 31 May 2024 20:55:34 +0000
Subject: [PATCH] feat(bigtable): Client side metrics

---
 bigtable/bigtable.go                         | 178 ++++--
 bigtable/bigtable_test.go                    |   5 +-
 bigtable/go.mod                              |  15 +-
 bigtable/go.sum                              |   8 +-
 bigtable/integration_test.go                 |  63 ++
 bigtable/metrics.go                          | 479 +++++++++++++++
 bigtable/metrics_monitoring_exporter.go      | 367 ++++++++++++
 bigtable/metrics_monitoring_exporter_test.go | 580 +++++++++++++++++++
 bigtable/metrics_test.go                     | 293 ++++++++++
 bigtable/retry_test.go                       |   6 +-
 10 files changed, 1939 insertions(+), 55 deletions(-)
 create mode 100644 bigtable/metrics.go
 create mode 100644 bigtable/metrics_monitoring_exporter.go
 create mode 100644 bigtable/metrics_monitoring_exporter_test.go
 create mode 100644 bigtable/metrics_test.go

diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go
index b3fbefa8aa8f..1ffd79ebd0fb 100644
--- a/bigtable/bigtable.go
+++ b/bigtable/bigtable.go
@@ -29,6 +29,7 @@ import (
 	btopt "cloud.google.com/go/bigtable/internal/option"
 	"cloud.google.com/go/internal/trace"
 	gax "github.com/googleapis/gax-go/v2"
+	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
 	"google.golang.org/api/option"
 	"google.golang.org/api/option/internaloption"
 	gtransport "google.golang.org/api/transport/grpc"
@@ -56,6 +57,14 @@ type Client struct {
 	client            btpb.BigtableClient
 	project, instance string
 	appProfile        string
+	metricsConfig     *metricsConfigInternal
+}
+
+// metricsConfig represents user-provided metrics configuration.
+// It is not configurable yet; only a default instance is used.
+type metricsConfig struct {
+	builtInEnabled bool
+	meterProviders []*sdkmetric.MeterProvider
 }
 
 // ClientConfig has configurations for the client.
@@ -95,12 +104,19 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
 		return nil, fmt.Errorf("dialing: %w", err)
 	}
 
+	// Create an OpenTelemetry metrics configuration
+	metricsConfig, err := newMetricsConfigInternal(ctx, project, instance, nil, opts...)
+ if err != nil { + return nil, err + } + return &Client{ - connPool: connPool, - client: btpb.NewBigtableClient(connPool), - project: project, - instance: instance, - appProfile: config.AppProfile, + connPool: connPool, + client: btpb.NewBigtableClient(connPool), + project: project, + instance: instance, + appProfile: config.AppProfile, + metricsConfig: metricsConfig, }, nil } @@ -166,19 +182,21 @@ func init() { } // Convert error to grpc status error -func convertToGrpcStatusErr(err error) error { - if err != nil { - if errStatus, ok := status.FromError(err); ok { - return status.Error(errStatus.Code(), errStatus.Message()) - } +func convertToGrpcStatusErr(err error) (codes.Code, error) { + if err == nil { + return codes.OK, nil + } - ctxStatus := status.FromContextError(err) - if ctxStatus.Code() != codes.Unknown { - return status.Error(ctxStatus.Code(), ctxStatus.Message()) - } + if errStatus, ok := status.FromError(err); ok { + return errStatus.Code(), status.Error(errStatus.Code(), errStatus.Message()) } - return err + ctxStatus := status.FromContextError(err) + if ctxStatus.Code() != codes.Unknown { + return ctxStatus.Code(), status.Error(ctxStatus.Code(), ctxStatus.Message()) + } + + return codes.Unknown, err } func (c *Client) fullTableName(table string) string { @@ -285,6 +303,10 @@ func (ti *tableImpl) ApplyReadModifyWrite(ctx context.Context, row string, m *Re return ti.Table.ApplyReadModifyWrite(ctx, row, m) } +func (ti *tableImpl) getOperationRecorder(ctx context.Context, method string, isStreaming bool) (*builtinMetricsTracer, func()) { + return ti.Table.getOperationRecorder(ctx, method, isStreaming) +} + // TODO(dsymonds): Read method that returns a sequence of ReadItems. // ReadRows reads rows from a table. f is called for each row. @@ -295,13 +317,22 @@ func (ti *tableImpl) ApplyReadModifyWrite(ctx context.Context, row string, m *Re // By default, the yielded rows will contain all values in all cells. // Use RowFilter to limit the cells returned. func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) (err error) { + method := "cloud.google.com/go/bigtable.ReadRows" ctx = mergeOutgoingMetadata(ctx, t.md) - ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.ReadRows") + ctx = trace.StartSpan(ctx, method) defer func() { trace.EndSpan(ctx, err) }() + metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, true) + defer opRecorder() + + err = t.readRows(ctx, arg, f, metricsTracer, opts...) + return metricsTracer.recordAndConvertErr(err) +} + +func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) { var prevRowKey string attrMap := make(map[string]interface{}) - err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { + err = t.c.metricsConfig.gaxInvokeWithRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error { req := &btpb.ReadRowsRequest{ AppProfileId: t.c.appProfile, } @@ -340,12 +371,18 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts cr = newChunkReader() } + *mt.headerMD, err = stream.Header() + if err != nil { + return err + } for { res, err := stream.Recv() if err == io.EOF { + *mt.trailerMD = stream.Trailer() break } if err != nil { + *mt.trailerMD = stream.Trailer() // Reset arg for next Invoke call. 
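+			// Note: the attempt that just failed has already been recorded by
+			// gaxInvokeWithRecorder, which wraps every invocation of this closure
+			// with recordAttemptCompletion.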
if arg == nil { // Should be lowest possible key value, an empty byte array @@ -381,6 +418,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts cancel() for { if _, err := stream.Recv(); err != nil { + *mt.trailerMD = stream.Trailer() // The stream has ended. We don't return an error // because the caller has intentionally interrupted the scan. return nil @@ -407,7 +445,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts return err }, retryOptions...) - return convertToGrpcStatusErr(err) + return err } // ReadRow is a convenience implementation of a single-row reader. @@ -919,10 +957,19 @@ const maxMutations = 100000 // Apply mutates a row atomically. A mutation must contain at least one // operation and at most 100000 operations. func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) (err error) { + method := "cloud.google.com/go/bigtable/Apply" ctx = mergeOutgoingMetadata(ctx, t.md) - ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply") + ctx = trace.StartSpan(ctx, method) defer func() { trace.EndSpan(ctx, err) }() + metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, false) + defer opRecorder() + + err = t.apply(ctx, metricsTracer, row, m, opts...) + return metricsTracer.recordAndConvertErr(err) +} + +func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string, m *Mutation, opts ...ApplyOption) (err error) { after := func(res proto.Message) { for _, o := range opts { o.after(res) @@ -945,15 +992,15 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl callOptions = retryOptions } var res *btpb.MutateRowResponse - err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { + err := t.c.metricsConfig.gaxInvokeWithRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error { var err error - res, err = t.c.client.MutateRow(ctx, req) + res, err = t.c.client.MutateRow(ctx, req, grpc.Header(mt.headerMD), grpc.Trailer(mt.trailerMD)) return err }, callOptions...) if err == nil { after(res) } - return convertToGrpcStatusErr(err) + return err } req := &btpb.CheckAndMutateRowRequest{ @@ -982,15 +1029,15 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl callOptions = retryOptions } var cmRes *btpb.CheckAndMutateRowResponse - err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { + err = t.c.metricsConfig.gaxInvokeWithRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error { var err error - cmRes, err = t.c.client.CheckAndMutateRow(ctx, req) + cmRes, err = t.c.client.CheckAndMutateRow(ctx, req, grpc.Header(mt.headerMD), grpc.Trailer(mt.trailerMD)) return err }, callOptions...) if err == nil { after(cmRes) } - return convertToGrpcStatusErr(err) + return err } // An ApplyOption is an optional argument to Apply. @@ -1118,10 +1165,18 @@ type entryErr struct { // // Conditional mutations cannot be applied in bulk and providing one will result in an error. 
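+//
+// When built-in metrics are enabled, ApplyBulk records a single
+// operation_latencies point that spans all retries, plus one
+// attempt_latencies point per underlying MutateRows attempt.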
 func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) {
+	method := "cloud.google.com/go/bigtable/ApplyBulk"
 	ctx = mergeOutgoingMetadata(ctx, t.md)
-	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/ApplyBulk")
+	ctx = trace.StartSpan(ctx, method)
 	defer func() { trace.EndSpan(ctx, err) }()
+	metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, true)
+	defer opRecorder()
+	errs, err = t.applyBulk(ctx, metricsTracer, rowKeys, muts, opts...)
+	return errs, metricsTracer.recordAndConvertErr(err)
+}
+
+func (t *Table) applyBulk(ctx context.Context, metricsTracer *builtinMetricsTracer, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) {
 	if len(rowKeys) != len(muts) {
 		return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts))
 	}
@@ -1137,10 +1192,10 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio
 	for _, group := range groupEntries(origEntries, maxMutations) {
 		attrMap := make(map[string]interface{})
-		err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
+		err = t.c.metricsConfig.gaxInvokeWithRecorder(ctx, metricsTracer, func(ctx context.Context, _ gax.CallSettings) error {
 			attrMap["rowCount"] = len(group)
 			trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk")
-			err := t.doApplyBulk(ctx, group, opts...)
+			err := t.doApplyBulk(ctx, group, metricsTracer, opts...)
 			if err != nil {
 				// We want to retry the entire request with the current group
 				return err
 			}
@@ -1187,7 +1242,7 @@ func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr {
 }
 
 // doApplyBulk does the work of a single ApplyBulk invocation
-func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...ApplyOption) error {
+func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, mt *builtinMetricsTracer, opts ...ApplyOption) error {
 	after := func(res proto.Message) {
 		for _, o := range opts {
 			o.after(res)
 		}
 	}
@@ -1207,16 +1262,20 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...
 	} else {
 		req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
 	}
+
 	stream, err := t.c.client.MutateRows(ctx, req)
 	if err != nil {
 		return err
 	}
+	*mt.headerMD, err = stream.Header()
 	for {
 		res, err := stream.Recv()
 		if err == io.EOF {
+			*mt.trailerMD = stream.Trailer()
 			break
 		}
 		if err != nil {
+			*mt.trailerMD = stream.Trailer()
 			return err
 		}
@@ -1288,6 +1347,16 @@ func (ts Timestamp) TruncateToMilliseconds() Timestamp {
 // It returns the newly written cells.
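+//
+// ApplyReadModifyWrite is instrumented as a non-streaming operation: the RPC is
+// issued through gaxInvokeWithRecorder so that its latency and final status are
+// captured by the built-in metrics.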
func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) { ctx = mergeOutgoingMetadata(ctx, t.md) + + method := "cloud.google.com/go/bigtable/ApplyReadModifyWrite" + metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, false) + defer opRecorder() + + updatedRow, err := t.applyReadModifyWrite(ctx, metricsTracer, row, m) + return updatedRow, metricsTracer.recordAndConvertErr(err) +} + +func (t *Table) applyReadModifyWrite(ctx context.Context, mt *builtinMetricsTracer, row string, m *ReadModifyWrite) (Row, error) { req := &btpb.ReadModifyWriteRowRequest{ AppProfileId: t.c.appProfile, RowKey: []byte(row), @@ -1298,18 +1367,23 @@ func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadMod } else { req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView) } - res, err := t.c.client.ReadModifyWriteRow(ctx, req) - if err != nil { - return nil, err - } - if res.Row == nil { - return nil, errors.New("unable to apply ReadModifyWrite: res.Row=nil") - } - r := make(Row) - for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family - decodeFamilyProto(r, row, fam) - } - return r, nil + + var r Row + err := t.c.metricsConfig.gaxInvokeWithRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error { + res, err := t.c.client.ReadModifyWriteRow(ctx, req, grpc.Header(mt.headerMD), grpc.Trailer(mt.trailerMD)) + if err != nil { + return err + } + if res.Row == nil { + return errors.New("unable to apply ReadModifyWrite: res.Row=nil") + } + r = make(Row) + for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family + decodeFamilyProto(r, row, fam) + } + return nil + }) + return r, err } // ReadModifyWrite represents a set of operations on a single row of a table. @@ -1352,9 +1426,19 @@ func (m *ReadModifyWrite) Increment(family, column string, delta int64) { // SampleRowKeys returns a sample of row keys in the table. The returned row keys will delimit contiguous sections of // the table of approximately equal size, which can be used to break up the data for distributed tasks like mapreduces. func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) { + method := "cloud.google.com/go/bigtable/SampleRowKeys" ctx = mergeOutgoingMetadata(ctx, t.md) + + metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, true) + defer opRecorder() + + rowKeys, err := t.sampleRowKeys(ctx, metricsTracer) + return rowKeys, metricsTracer.recordAndConvertErr(err) +} + +func (t *Table) sampleRowKeys(ctx context.Context, mt *builtinMetricsTracer) ([]string, error) { var sampledRowKeys []string - err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { + err := t.c.metricsConfig.gaxInvokeWithRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error { sampledRowKeys = nil req := &btpb.SampleRowKeysRequest{ AppProfileId: t.c.appProfile, @@ -1371,12 +1455,15 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) { if err != nil { return err } + *mt.headerMD, err = stream.Header() for { res, err := stream.Recv() if err == io.EOF { + *mt.trailerMD = stream.Trailer() break } if err != nil { + *mt.trailerMD = stream.Trailer() return err } @@ -1389,5 +1476,10 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) { } return nil }, retryOptions...) 
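+	// The raw error is returned here; SampleRowKeys converts it into a gRPC
+	// status error (and records the final operation status) via recordAndConvertErr.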
- return sampledRowKeys, convertToGrpcStatusErr(err) + + return sampledRowKeys, err +} + +func (t *Table) getOperationRecorder(ctx context.Context, method string, isStreaming bool) (*builtinMetricsTracer, func()) { + return t.c.metricsConfig.getOperationRecorder(ctx, t.table, t.c.appProfile, method, isStreaming) } diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index 0f63b6346f16..2dad0381e1c4 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -253,8 +253,9 @@ func TestApplyErrors(t *testing.T) { ctx := context.Background() table := &Table{ c: &Client{ - project: "P", - instance: "I", + project: "P", + instance: "I", + metricsConfig: &metricsConfigInternal{}, }, table: "t", } diff --git a/bigtable/go.mod b/bigtable/go.mod index 67174904e415..f5da9d2ffcbc 100644 --- a/bigtable/go.mod +++ b/bigtable/go.mod @@ -6,18 +6,27 @@ require ( cloud.google.com/go v0.114.0 cloud.google.com/go/iam v1.1.8 cloud.google.com/go/longrunning v0.5.7 + cloud.google.com/go/monitoring v1.18.2 github.com/google/btree v1.1.2 github.com/google/go-cmp v0.6.0 + github.com/google/uuid v1.6.0 github.com/googleapis/cloud-bigtable-clients-test v0.0.2 github.com/googleapis/gax-go/v2 v2.12.4 + go.opentelemetry.io/otel v1.24.0 // Use older version compatible with Go 1.20 + go.opentelemetry.io/otel/metric v1.24.0 // Use older version compatible with Go 1.20 + go.opentelemetry.io/otel/sdk v1.24.0 // Use older version compatible with Go 1.20 + go.opentelemetry.io/otel/sdk/metric v1.24.0 // Use older version compatible with Go 1.20 google.golang.org/api v0.182.0 google.golang.org/genproto v0.0.0-20240429193739-8cf5692501f6 + google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 google.golang.org/grpc v1.64.0 google.golang.org/protobuf v1.34.1 rsc.io/binaryregexp v0.2.0 ) +require github.com/golang/protobuf v1.5.4 // indirect + require ( cloud.google.com/go/auth v0.4.2 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect @@ -31,16 +40,11 @@ require ( github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.4 // indirect github.com/google/s2a-go v0.1.7 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect - go.opentelemetry.io/otel v1.24.0 // indirect - go.opentelemetry.io/otel/metric v1.24.0 // indirect - go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect golang.org/x/crypto v0.23.0 // indirect golang.org/x/net v0.25.0 // indirect @@ -49,5 +53,4 @@ require ( golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect golang.org/x/time v0.5.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 // indirect ) diff --git a/bigtable/go.sum b/bigtable/go.sum index d5f6186ec1dd..5224cc1be4b6 100644 --- a/bigtable/go.sum +++ b/bigtable/go.sum @@ -11,6 +11,8 @@ cloud.google.com/go/iam v1.1.8 h1:r7umDwhj+BQyz0ScZMp4QrGXjSTI3ZINnpgU2nlB/K0= cloud.google.com/go/iam v1.1.8/go.mod h1:GvE6lyMmfxXauzNq8NbgJbeVQNspG+tcdL/W8QO1+zE= cloud.google.com/go/longrunning v0.5.7 
h1:WLbHekDbjK1fVFD3ibpFFVoyizlLRl73I7YKuAKilhU= cloud.google.com/go/longrunning v0.5.7/go.mod h1:8GClkudohy1Fxm3owmBGid8W0pSgodEMwEAztp38Xng= +cloud.google.com/go/monitoring v1.18.2 h1:nIQdZdf1/9M0cEmXSlLB2jMq/k3CRh9p3oUzS06VDG8= +cloud.google.com/go/monitoring v1.18.2/go.mod h1:MuL95M6d9HtXQOaWP9JxhFZJKP+fdTF0Gt5xl4IDsew= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= @@ -98,6 +100,8 @@ go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGX go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= +go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8= +go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0= go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -151,8 +155,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20240429193739-8cf5692501f6 h1:MTmrc2F5TZKDKXigcZetYkH04YwqtOPEQJwh4PPOgfk= google.golang.org/genproto v0.0.0-20240429193739-8cf5692501f6/go.mod h1:2ROWwqCIx97Y7CSyp11xB8fori0wzvD6+gbacaf5c8I= -google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 h1:W5Xj/70xIA4x60O/IFyXivR5MGqblAb8R3w26pnD6No= -google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8/go.mod h1:vPrPUTsDCYxXWjP7clS81mZ6/803D8K4iM9Ma27VKas= +google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw= +google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU= google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index e87b343e6c88..a5d53ab4631f 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -38,6 +38,8 @@ import ( "cloud.google.com/go/internal/optional" "cloud.google.com/go/internal/testutil" "cloud.google.com/go/internal/uid" + monitoring "cloud.google.com/go/monitoring/apiv3/v2" + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/google/go-cmp/cmp" gax "github.com/googleapis/gax-go/v2" "google.golang.org/api/iterator" @@ -46,6 +48,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -274,6 +277,7 @@ func 
TestIntegration_ReadRowList(t *testing.T) {
 		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
 	}
 }
+
 func TestIntegration_ReadRowListReverse(t *testing.T) {
 	ctx := context.Background()
 	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
@@ -748,6 +752,18 @@ func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
 func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
 	ctx := context.Background()
+	origSamplePeriod := defaultSamplePeriod
+	defaultSamplePeriod = time.Minute
+	defer func() {
+		defaultSamplePeriod = origSamplePeriod
+	}()
+	testStartTime := time.Now()
+
+	tsListStart := &timestamppb.Timestamp{
+		Seconds: testStartTime.Unix(),
+		Nanos:   int32(testStartTime.Nanosecond()),
+	}
+
 	testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -899,6 +915,53 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
 	} else if status[0] == nil || status[1] == nil {
 		t.Fatalf("No error for bad bulk mutation")
 	}
+
+	if !testEnv.Config().UseProd {
+		t.Logf("Skipping metrics validation while using emulator")
+		return
+	}
+
+	// Validate that metrics are exported
+	elapsedTime := time.Since(testStartTime)
+	if elapsedTime < 2*defaultSamplePeriod {
+		// Ensure at least 2 datapoints are recorded
+		time.Sleep(2*defaultSamplePeriod - elapsedTime)
+	}
+
+	timeListEnd := time.Now()
+	tsListEnd := &timestamppb.Timestamp{
+		Seconds: timeListEnd.Unix(),
+		Nanos:   int32(timeListEnd.Nanosecond()),
+	}
+
+	monitoringClient, err := monitoring.NewMetricClient(ctx)
+	if err != nil {
+		t.Fatalf("Failed to create metric client: %v", err)
+	}
+
+	metricNamesValidate := []string{
+		metricNameOperationLatencies,
+		metricNameAttemptLatencies,
+		metricNameServerLatencies,
+	}
+	for _, metricName := range metricNamesValidate {
+		// ListTimeSeries can list only one metric type at a time,
+		// so call ListTimeSeries once per metric name.
+		iter := monitoringClient.ListTimeSeries(ctx, &monitoringpb.ListTimeSeriesRequest{
+			Name: fmt.Sprintf("projects/%s", testEnv.Config().Project),
+			Interval: &monitoringpb.TimeInterval{
+				StartTime: tsListStart,
+				EndTime:   tsListEnd,
+			},
+			Filter: fmt.Sprintf("metric.type = starts_with(\"bigtable.googleapis.com/client/%v\")", metricName),
+		})
+
+		// Assert at least 1 datapoint was exported
+		_, err := iter.Next()
+		if err != nil {
+			t.Errorf("%v not exported", metricName)
+		}
+	}
 }
 
 func TestIntegration_Read(t *testing.T) {
diff --git a/bigtable/metrics.go b/bigtable/metrics.go
new file mode 100644
index 000000000000..48c59ea34565
--- /dev/null
+++ b/bigtable/metrics.go
@@ -0,0 +1,479 @@
+/*
+Copyright 2024 Google LLC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package bigtable + +import ( + "context" + "fmt" + "log" + "os" + "strconv" + "strings" + "time" + + "cloud.google.com/go/bigtable/internal" + "github.com/google/uuid" + gax "github.com/googleapis/gax-go/v2" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + otelmetric "go.opentelemetry.io/otel/sdk/metric" + "google.golang.org/api/option" + btpb "google.golang.org/genproto/googleapis/bigtable/v2" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/proto" +) + +const ( + // instrumentationScope is the instrumentation name that will be associated with the emitted telemetry. + instrumentationScope = "cloud.google.com/go" + + metricsPrefix = "bigtable/" + locationMDKey = "x-goog-ext-425905942-bin" + serverTimingMDKey = "server-timing" + + // Monitored resource labels + monitoredResLabelKeyProject = "project_id" + monitoredResLabelKeyInstance = "instance" + monitoredResLabelKeyTable = "table" + monitoredResLabelKeyCluster = "cluster" + monitoredResLabelKeyZone = "zone" + + // Metric labels + metricLabelKeyAppProfile = "app_profile" + metricLabelKeyMethod = "method" + metricLabelKeyOperationStatus = "status" + metricLabelKeyStreamingOperation = "streaming" + metricLabelKeyClientName = "client_name" + metricLabelKeyClientUID = "client_uid" + + // Metric names + metricNameOperationLatencies = "operation_latencies" + metricNameAttemptLatencies = "attempt_latencies" + metricNameRetryCount = "retry_count" + metricNameServerLatencies = "server_latencies" +) + +var ( + // duration between two metric exports + defaultSamplePeriod = 5 * time.Minute + + clientName = fmt.Sprintf("cloud.google.com/go/bigtable v%v", internal.Version) + builtInEnabledDefault = true + + // All the built-in metrics have same attributes except 'status' and 'streaming' + // These attributes need to be added to only few of the metrics + builtinMetrics = map[string]metricInfo{ + metricNameOperationLatencies: { + desc: "Total time until final operation success or failure, including retries and backoff.", + metricType: otelmetric.InstrumentKindHistogram, + unit: "ns", + additionalAttributes: []string{ + metricLabelKeyOperationStatus, + metricLabelKeyStreamingOperation, + }, + }, + metricNameAttemptLatencies: { + desc: "Client observed latency per RPC attempt.", + metricType: otelmetric.InstrumentKindHistogram, + unit: "ns", + additionalAttributes: []string{ + metricLabelKeyOperationStatus, + metricLabelKeyStreamingOperation, + }, + }, + metricNameServerLatencies: { + desc: "The latency measured from the moment that the RPC entered the Google data center until the RPC was completed.", + metricType: otelmetric.InstrumentKindHistogram, + unit: "ns", + additionalAttributes: []string{ + metricLabelKeyOperationStatus, + metricLabelKeyStreamingOperation, + }, + }, + metricNameRetryCount: { + desc: "The number of additional RPCs sent after the initial attempt.", + metricType: otelmetric.InstrumentKindCounter, + additionalAttributes: []string{ + metricLabelKeyOperationStatus, + }, + }, + } + + noOpRecordFn = func() {} +) + +type metricInfo struct { + desc string + metricType otelmetric.InstrumentKind + unit string + additionalAttributes []string +} + +type builtInMetricInstruments struct { + distributions map[string]metric.Float64Histogram // Key is metric name e.g. operation_latencies + counters map[string]metric.Int64Counter // Key is metric name e.g. 
retry_count
+}
+
+// generateClientUID generates a unique client ID in the format go-<uuid>@<hostname>
+func generateClientUID() (string, error) {
+	hostname, err := os.Hostname()
+	if err != nil {
+		return "", err
+	}
+	return "go-" + uuid.NewString() + "@" + hostname, nil
+}
+
+type metricsConfigInternal struct {
+	exporter otelmetric.Exporter
+
+	project string
+
+	// Flag to indicate whether built-in metrics should be recorded and exported to GCM
+	builtInEnabled bool
+
+	// attributes that are specific to a client instance and
+	// do not change across different function calls on client
+	clientAttributes []attribute.KeyValue
+
+	// Contains one entry per meter provider
+	instruments []builtInMetricInstruments
+}
+
+func newMetricsConfigInternal(ctx context.Context, project, instance string, userProvidedConfig *metricsConfig, opts ...option.ClientOption) (*metricsConfigInternal, error) {
+	clientUID, err := generateClientUID()
+	if err != nil {
+		log.Printf("built-in metrics: generateClientUID failed: %v. Using empty string in the %v metric attribute", err, metricLabelKeyClientUID)
+	}
+
+	// Create the internal metrics config with client-level attributes
+	internalMetricsConfig := &metricsConfigInternal{
+		project: project,
+		clientAttributes: []attribute.KeyValue{
+			attribute.String(monitoredResLabelKeyProject, project),
+			attribute.String(monitoredResLabelKeyInstance, instance),
+			attribute.String(metricLabelKeyClientUID, clientUID),
+			attribute.String(metricLabelKeyClientName, clientName),
+		},
+		instruments:    []builtInMetricInstruments{},
+		builtInEnabled: builtInEnabledDefault,
+	}
+
+	defaultExporter, err := newMonitoringExporter(ctx, internalMetricsConfig.project, opts...)
+	if err != nil {
+		return nil, err
+	}
+	internalMetricsConfig.exporter = defaultExporter
+
+	// Create default meter provider
+	defaultMp := otelmetric.NewMeterProvider(
+		otelmetric.WithReader(
+			otelmetric.NewPeriodicReader(
+				defaultExporter,
+				otelmetric.WithInterval(defaultSamplePeriod),
+			),
+		),
+	)
+
+	userProvidedMeterProviders := []*otelmetric.MeterProvider{}
+	if addr := os.Getenv("BIGTABLE_EMULATOR_HOST"); addr != "" {
+		// Do not emit metrics when emulator is being used
+		internalMetricsConfig.builtInEnabled = false
+	} else if userProvidedConfig != nil {
+		internalMetricsConfig.builtInEnabled = userProvidedConfig.builtInEnabled
+		userProvidedMeterProviders = userProvidedConfig.meterProviders
+	}
+
+	if !internalMetricsConfig.builtInEnabled {
+		return internalMetricsConfig, nil
+	}
+
+	// Create instruments on all meter providers
+	allMeterProviders := append(userProvidedMeterProviders, defaultMp)
+	for _, mp := range allMeterProviders {
+		builtInMetricInstruments := builtInMetricInstruments{
+			distributions: make(map[string]metric.Float64Histogram),
+			counters:      make(map[string]metric.Int64Counter),
+		}
+
+		// Create meter
+		meter := mp.Meter(instrumentationScope, metric.WithInstrumentationVersion(internal.Version))
+
+		// Create instruments
+		for metricName, metricDetails := range builtinMetrics {
+			if metricDetails.metricType == otelmetric.InstrumentKindHistogram {
+				builtInMetricInstruments.distributions[metricName], err = meter.Float64Histogram(
+					metricName,
+					metric.WithDescription(metricDetails.desc),
+					metric.WithUnit(metricDetails.unit),
+				)
+				if err != nil {
+					return internalMetricsConfig, err
+				}
+			} else if metricDetails.metricType == otelmetric.InstrumentKindCounter {
+				builtInMetricInstruments.counters[metricName], err = meter.Int64Counter(
+					metricName,
+					metric.WithDescription(metricDetails.desc),
+					metric.WithUnit(metricDetails.unit),
+				)
+				if err != nil {
+					return internalMetricsConfig, err
+				}
+			}
+		}
+		// Append once per meter provider, after all instruments have been created,
+		// so that the recording loops see exactly one instrument set per provider.
+		internalMetricsConfig.instruments = append(internalMetricsConfig.instruments, builtInMetricInstruments)
+	}
+	return internalMetricsConfig, nil
+}
+
+func (config *metricsConfigInternal) getOperationRecorder(ctx context.Context, table string, appProfile string, method string, isStreaming bool) (*builtinMetricsTracer, func()) {
+	mt := newBuiltinMetricsTracer(method, table, appProfile, isStreaming)
+	return &mt, config.recordOperationCompletion(ctx, &mt)
+}
+
+func (config *metricsConfigInternal) gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer,
+	f func(ctx context.Context, _ gax.CallSettings) error, opts ...gax.CallOption) error {
+
+	callWrapper := func(ctx context.Context, callSettings gax.CallSettings) error {
+		// Increment number of attempts
+		mt.attemptCount++
+
+		recorder := config.recordAttemptCompletion(ctx, mt)
+		defer recorder()
+
+		err := f(ctx, callSettings)
+
+		// Record attempt status
+		statusCode, _ := convertToGrpcStatusErr(err)
+		mt.status = statusCode.String()
+		return err
+	}
+	return gax.Invoke(ctx, callWrapper, opts...)
+}
+
+// recordAttemptCompletion returns a function that should be executed to record attempt-specific metrics.
+// It records as many metrics as it can and does not stop at the first failure.
+func (config *metricsConfigInternal) recordAttemptCompletion(ctx context.Context, mt *builtinMetricsTracer) func() {
+	if !config.builtInEnabled {
+		return noOpRecordFn
+	}
+	startTime := time.Now()
+
+	return func() {
+		// Calculate elapsed time
+		elapsedTime := time.Since(startTime).Nanoseconds()
+
+		// Attributes for attempt_latencies
+		attemptLatCurrCallAttrs, attemptLatErr := mt.toOtelMetricAttrs(metricNameAttemptLatencies)
+		attemptLatAllAttrs := metric.WithAttributes(append(config.clientAttributes, attemptLatCurrCallAttrs...)...)
+
+		// Attributes for server_latencies
+		serverLatCurrCallAttrs, serverLatErr := mt.toOtelMetricAttrs(metricNameServerLatencies)
+		serverLatAllAttrs := metric.WithAttributes(append(config.clientAttributes, serverLatCurrCallAttrs...)...)
+
+		for _, builtInMetricInstruments := range config.instruments {
+			if attemptLatErr == nil {
+				builtInMetricInstruments.distributions[metricNameAttemptLatencies].Record(ctx, float64(elapsedTime), attemptLatAllAttrs)
+			}
+
+			if serverLatErr == nil {
+				serverLatency, serverLatErr := mt.getServerLatency()
+				if serverLatErr == nil {
+					builtInMetricInstruments.distributions[metricNameServerLatencies].Record(ctx, float64(serverLatency), serverLatAllAttrs)
+				}
+			}
+		}
+	}
+}
+
+// recordOperationCompletion returns a function that should be executed to record total operation metrics.
+// It records as many metrics as it can and does not stop at the first failure.
+func (config *metricsConfigInternal) recordOperationCompletion(ctx context.Context, mt *builtinMetricsTracer) func() {
+	if !config.builtInEnabled {
+		return noOpRecordFn
+	}
+	startTime := time.Now()
+
+	return func() {
+		// Calculate elapsed time
+		elapsedTime := time.Since(startTime).Nanoseconds()
+
+		// Attributes for operation_latencies
+		opLatCurrCallAttrs, opLatErr := mt.toOtelMetricAttrs(metricNameOperationLatencies)
+		opLatAllAttrs := metric.WithAttributes(append(config.clientAttributes, opLatCurrCallAttrs...)...)
+
+		// Attributes for retry_count
+		retryCntCurrCallAttrs, retryCountErr := mt.toOtelMetricAttrs(metricNameRetryCount)
+		retryCntAllAttrs := metric.WithAttributes(append(config.clientAttributes, retryCntCurrCallAttrs...)...)
+
+		for _, builtInMetricInstruments := range config.instruments {
+			if opLatErr == nil {
+				builtInMetricInstruments.distributions[metricNameOperationLatencies].Record(ctx, float64(elapsedTime), opLatAllAttrs)
+			}
+
+			// Only record when retry count is greater than 0 so the retry
+			// graph will be less confusing
+			if mt.attemptCount > 1 {
+				if retryCountErr == nil {
+					builtInMetricInstruments.counters[metricNameRetryCount].Add(ctx, mt.attemptCount-1, retryCntAllAttrs)
+				}
+			}
+		}
+	}
+}
+
+// builtinMetricsTracer is created once per operation (one client method call).
+// It is used to store metric attribute values and other data required to obtain them
+type builtinMetricsTracer struct {
+	tableName    string
+	appProfileID string
+	method       string
+	isStreaming  bool
+
+	// gRPC status code
+	status string
+
+	// Contains the header response metadata which is used to extract cluster and zone
+	headerMD *metadata.MD
+
+	// Contains the trailer response metadata which is used to extract cluster and zone
+	trailerMD *metadata.MD
+
+	attemptCount int64
+}
+
+func newBuiltinMetricsTracer(method, tableName, appProfile string, isStreaming bool) builtinMetricsTracer {
+	headerMD := metadata.New(nil)
+	trailerMD := metadata.New(nil)
+	return builtinMetricsTracer{
+		tableName:    tableName,
+		appProfileID: appProfile,
+		method:       method,
+		isStreaming:  isStreaming,
+		status:       "",
+		headerMD:     &headerMD,
+		trailerMD:    &trailerMD,
+		attemptCount: 0,
+	}
+}
+
+func (mt *builtinMetricsTracer) recordAndConvertErr(err error) error {
+	statusCode, statusErr := convertToGrpcStatusErr(err)
+	mt.status = statusCode.String()
+	return statusErr
+}
+
+// toOtelMetricAttrs converts recorded metric attribute values to the OpenTelemetry attribute format
+func (mt *builtinMetricsTracer) toOtelMetricAttrs(metricName string) ([]attribute.KeyValue, error) {
+	clusterID, zoneID, _ := mt.getLocation()
+
+	// Create attribute key value pairs for attributes common to all metrics
+	attrKeyValues := []attribute.KeyValue{
+		attribute.String(metricLabelKeyAppProfile, mt.appProfileID),
+		attribute.String(metricLabelKeyMethod, mt.method),
+
+		// Add resource labels to otel metric labels.
+ // These will be used for creating the monitored resource but exporter + // will not add them to Google Cloud Monitoring metric labels + attribute.String(monitoredResLabelKeyTable, mt.tableName), + attribute.String(monitoredResLabelKeyCluster, clusterID), + attribute.String(monitoredResLabelKeyZone, zoneID), + } + + metricDetails, found := builtinMetrics[metricName] + if !found { + return nil, fmt.Errorf("Unable to create attributes list for unknown metric: %v", metricName) + } + + // Add additional attributes to metrics + for _, attrKey := range metricDetails.additionalAttributes { + switch attrKey { + case metricLabelKeyOperationStatus: + attrKeyValues = append(attrKeyValues, attribute.String(metricLabelKeyOperationStatus, mt.status)) + case metricLabelKeyStreamingOperation: + attrKeyValues = append(attrKeyValues, attribute.Bool(metricLabelKeyStreamingOperation, mt.isStreaming)) + default: + return nil, fmt.Errorf("Unknown additional attribute: %v", attrKey) + } + } + + return attrKeyValues, nil +} + +// get GFE latency from response metadata +func (mt *builtinMetricsTracer) getServerLatency() (int, error) { + serverLatencyNano := 0 + serverTimingStr := "" + + // Check whether server latency available in response header metadata + if mt.headerMD != nil { + headerMDValues := mt.headerMD.Get(serverTimingMDKey) + if len(headerMDValues) != 0 { + serverTimingStr = headerMDValues[0] + } + } + + if len(serverTimingStr) == 0 { + // Check whether server latency available in response trailer metadata + if mt.trailerMD != nil { + trailerMDValues := mt.trailerMD.Get(serverTimingMDKey) + if len(trailerMDValues) != 0 { + serverTimingStr = trailerMDValues[0] + } + } + } + + serverTimingValPrefix := "gfet4t7; dur=" + serverLatencyMillis, err := strconv.Atoi(strings.TrimPrefix(serverTimingStr, serverTimingValPrefix)) + if !strings.HasPrefix(serverTimingStr, serverTimingValPrefix) || err != nil { + return serverLatencyNano, err + } + + serverLatencyNano = serverLatencyMillis * 1000000 + return serverLatencyNano, nil +} + +// Obtain cluster and zone from response metadata +func (mt *builtinMetricsTracer) getLocation() (string, string, error) { + var locationMetadata []string + + // Check whether location metadata available in response header metadata + if mt.headerMD != nil { + locationMetadata = mt.headerMD.Get(locationMDKey) + } + + if locationMetadata == nil { + // Check whether location metadata available in response trailer metadata + // if none found in response header metadata + if mt.trailerMD != nil { + locationMetadata = mt.trailerMD.Get(locationMDKey) + } + } + + if len(locationMetadata) < 1 { + return "", "", fmt.Errorf("Failed to get location metadata") + } + + // Unmarshal binary location metadata + responseParams := &btpb.ResponseParams{} + err := proto.Unmarshal([]byte(locationMetadata[0]), responseParams) + if err != nil { + return "", "", err + } + + return responseParams.GetClusterId(), responseParams.GetZoneId(), nil +} diff --git a/bigtable/metrics_monitoring_exporter.go b/bigtable/metrics_monitoring_exporter.go new file mode 100644 index 000000000000..6e6c37ce1dc9 --- /dev/null +++ b/bigtable/metrics_monitoring_exporter.go @@ -0,0 +1,367 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bigtable + +import ( + "context" + "errors" + "fmt" + "math" + "reflect" + "sync" + "time" + + monitoring "cloud.google.com/go/monitoring/apiv3/v2" + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/sdk/metric" + otelmetricdata "go.opentelemetry.io/otel/sdk/metric/metricdata" + "google.golang.org/api/option" + "google.golang.org/genproto/googleapis/api/distribution" + googlemetricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + meterName = "bigtable.googleapis.com/internal/client/" + bigtableResourceType = "bigtable_client_raw" + + // The number of timeserieses to send to GCM in a single request. This + // is a hard limit in the GCM API, so we never want to exceed 200. + sendBatchSize = 200 +) + +var ( + monitoredResLabelsSet = map[string]bool{ + monitoredResLabelKeyProject: true, + monitoredResLabelKeyInstance: true, + monitoredResLabelKeyCluster: true, + monitoredResLabelKeyTable: true, + monitoredResLabelKeyZone: true, + } + + errShutdown = fmt.Errorf("exporter is shutdown") +) + +type errUnexpectedAggregationKind struct { + kind string +} + +func (e errUnexpectedAggregationKind) Error() string { + return fmt.Sprintf("the metric kind is unexpected: %v", e.kind) +} + +// monitoringExporter is the implementation of OpenTelemetry metric exporter for +// Google Cloud Monitoring. +// Default exporter for built-in metrics +type monitoringExporter struct { + shutdown chan struct{} + client *monitoring.MetricClient + shutdownOnce sync.Once + projectID string +} + +func newMonitoringExporter(ctx context.Context, project string, opts ...option.ClientOption) (*monitoringExporter, error) { + client, err := monitoring.NewMetricClient(ctx, opts...) + if err != nil { + return nil, err + } + return &monitoringExporter{ + client: client, + shutdown: make(chan struct{}), + projectID: project, + }, nil +} + +// ForceFlush does nothing, the exporter holds no state. +func (e *monitoringExporter) ForceFlush(ctx context.Context) error { return ctx.Err() } + +// Shutdown shuts down the client connections. +func (e *monitoringExporter) Shutdown(ctx context.Context) error { + err := errShutdown + e.shutdownOnce.Do(func() { + close(e.shutdown) + err = errors.Join(ctx.Err(), e.client.Close()) + }) + return err +} + +// Export exports OpenTelemetry Metrics to Google Cloud Monitoring. +func (me *monitoringExporter) Export(ctx context.Context, rm *otelmetricdata.ResourceMetrics) error { + select { + case <-me.shutdown: + return errShutdown + default: + } + + return me.exportTimeSeries(ctx, rm) +} + +// Temporality returns the Temporality to use for an instrument kind. +func (me *monitoringExporter) Temporality(ik otelmetric.InstrumentKind) otelmetricdata.Temporality { + return otelmetric.DefaultTemporalitySelector(ik) +} + +// Aggregation returns the Aggregation to use for an instrument kind. 
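+// For the built-in Bigtable metrics this keeps the SDK defaults: explicit-bucket
+// histograms for the latency instruments and a monotonic sum for retry_count.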
+func (me *monitoringExporter) Aggregation(ik otelmetric.InstrumentKind) otelmetric.Aggregation {
+	return otelmetric.DefaultAggregationSelector(ik)
+}
+
+// exportTimeSeries creates TimeSeries from the records in rm and sends them to
+// Google Cloud Monitoring in batches of at most sendBatchSize.
+func (me *monitoringExporter) exportTimeSeries(ctx context.Context, rm *otelmetricdata.ResourceMetrics) error {
+	tss, err := me.recordsToTspbs(rm)
+	if len(tss) == 0 {
+		return err
+	}
+
+	name := fmt.Sprintf("projects/%s", me.projectID)
+
+	errs := []error{err}
+	for i := 0; i < len(tss); i += sendBatchSize {
+		j := i + sendBatchSize
+		if j >= len(tss) {
+			j = len(tss)
+		}
+
+		req := &monitoringpb.CreateTimeSeriesRequest{
+			Name:       name,
+			TimeSeries: tss[i:j],
+		}
+		errs = append(errs, me.client.CreateServiceTimeSeries(ctx, req))
+	}
+
+	return errors.Join(errs...)
+}
+
+// recordToMpb converts data from records to Metric and Monitored resource proto type for Cloud Monitoring.
+func (me *monitoringExporter) recordToMpb(metrics otelmetricdata.Metrics, attributes attribute.Set) (*googlemetricpb.Metric, *monitoredrespb.MonitoredResource) {
+	mr := &monitoredrespb.MonitoredResource{
+		Type:   bigtableResourceType,
+		Labels: map[string]string{},
+	}
+	labels := make(map[string]string)
+	addAttributes := func(attr *attribute.Set) {
+		iter := attr.Iter()
+		for iter.Next() {
+			kv := iter.Attribute()
+			labelKey := string(kv.Key)
+
+			if _, isResLabel := monitoredResLabelsSet[labelKey]; isResLabel {
+				// Add labels to monitored resource
+				mr.Labels[labelKey] = kv.Value.Emit()
+			} else {
+				// Add labels to metric
+				labels[labelKey] = kv.Value.Emit()
+			}
+		}
+	}
+	addAttributes(&attributes)
+	return &googlemetricpb.Metric{
+		Type:   fmt.Sprintf("%v%s", meterName, metrics.Name),
+		Labels: labels,
+	}, mr
+}
+
+// recordToTspb converts record to TimeSeries proto type with common resource.
+// ref. https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries
+func (me *monitoringExporter) recordToTspb(m otelmetricdata.Metrics) ([]*monitoringpb.TimeSeries, error) {
+	var tss []*monitoringpb.TimeSeries
+	var errs []error
+	if m.Data == nil {
+		return nil, nil
+	}
+	switch a := m.Data.(type) {
+	case otelmetricdata.Histogram[float64]:
+		for _, point := range a.DataPoints {
+			metric, mr := me.recordToMpb(m, point.Attributes)
+			ts, err := histogramToTimeSeries(point, m, mr)
+			if err != nil {
+				errs = append(errs, err)
+				continue
+			}
+			ts.Metric = metric
+			tss = append(tss, ts)
+		}
+	case otelmetricdata.Sum[int64]:
+		for _, point := range a.DataPoints {
+			metric, mr := me.recordToMpb(m, point.Attributes)
+			var ts *monitoringpb.TimeSeries
+			var err error
+			if a.IsMonotonic {
+				ts, err = sumToTimeSeries[int64](point, m, mr)
+			} else {
+				// Send non-monotonic sums as gauges
+				ts, err = gaugeToTimeSeries[int64](point, m, mr)
+			}
+			if err != nil {
+				errs = append(errs, err)
+				continue
+			}
+			ts.Metric = metric
+			tss = append(tss, ts)
+		}
+	default:
+		errs = append(errs, errUnexpectedAggregationKind{kind: reflect.TypeOf(m.Data).String()})
+	}
+	return tss, errors.Join(errs...)
+} + +func (me *monitoringExporter) recordsToTspbs(rm *otelmetricdata.ResourceMetrics) ([]*monitoringpb.TimeSeries, error) { + var ( + tss []*monitoringpb.TimeSeries + errs []error + ) + for _, scope := range rm.ScopeMetrics { + for _, metrics := range scope.Metrics { + ts, err := me.recordToTspb(metrics) + errs = append(errs, err) + tss = append(tss, ts...) + } + } + + return tss, errors.Join(errs...) +} + +func gaugeToTimeSeries[N int64 | float64](point otelmetricdata.DataPoint[N], metrics otelmetricdata.Metrics, mr *monitoredrespb.MonitoredResource) (*monitoringpb.TimeSeries, error) { + value, valueType := numberDataPointToValue(point) + timestamp := timestamppb.New(point.Time) + if err := timestamp.CheckValid(); err != nil { + return nil, err + } + return &monitoringpb.TimeSeries{ + Resource: mr, + Unit: string(metrics.Unit), + MetricKind: googlemetricpb.MetricDescriptor_GAUGE, + ValueType: valueType, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + EndTime: timestamp, + }, + Value: value, + }}, + }, nil +} + +func sumToTimeSeries[N int64 | float64](point otelmetricdata.DataPoint[N], metrics otelmetricdata.Metrics, mr *monitoredrespb.MonitoredResource) (*monitoringpb.TimeSeries, error) { + interval, err := toNonemptyTimeIntervalpb(point.StartTime, point.Time) + if err != nil { + return nil, err + } + value, valueType := numberDataPointToValue[N](point) + return &monitoringpb.TimeSeries{ + Resource: mr, + Unit: string(metrics.Unit), + MetricKind: googlemetricpb.MetricDescriptor_CUMULATIVE, + ValueType: valueType, + Points: []*monitoringpb.Point{{ + Interval: interval, + Value: value, + }}, + }, nil +} + +func histogramToTimeSeries[N int64 | float64](point otelmetricdata.HistogramDataPoint[N], metrics otelmetricdata.Metrics, mr *monitoredrespb.MonitoredResource) (*monitoringpb.TimeSeries, error) { + interval, err := toNonemptyTimeIntervalpb(point.StartTime, point.Time) + if err != nil { + return nil, err + } + distributionValue := histToDistribution(point) + return &monitoringpb.TimeSeries{ + Resource: mr, + Unit: string(metrics.Unit), + MetricKind: googlemetricpb.MetricDescriptor_CUMULATIVE, + ValueType: googlemetricpb.MetricDescriptor_DISTRIBUTION, + Points: []*monitoringpb.Point{{ + Interval: interval, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: distributionValue, + }, + }, + }}, + }, nil +} + +func toNonemptyTimeIntervalpb(start, end time.Time) (*monitoringpb.TimeInterval, error) { + // The end time of a new interval must be at least a millisecond after the end time of the + // previous interval, for all non-gauge types. 
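+	// For example, a cumulative point whose start and end were captured within
+	// the same millisecond would otherwise be rejected by CreateServiceTimeSeries.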
+ // https://cloud.google.com/monitoring/api/ref_v3/rpc/google.monitoring.v3#timeinterval + if end.Sub(start).Milliseconds() <= 1 { + end = start.Add(time.Millisecond) + } + startpb := timestamppb.New(start) + endpb := timestamppb.New(end) + err := errors.Join( + startpb.CheckValid(), + endpb.CheckValid(), + ) + if err != nil { + return nil, err + } + + return &monitoringpb.TimeInterval{ + StartTime: startpb, + EndTime: endpb, + }, nil +} + +func histToDistribution[N int64 | float64](hist otelmetricdata.HistogramDataPoint[N]) *distribution.Distribution { + counts := make([]int64, len(hist.BucketCounts)) + for i, v := range hist.BucketCounts { + counts[i] = int64(v) + } + var mean float64 + if !math.IsNaN(float64(hist.Sum)) && hist.Count > 0 { // Avoid divide-by-zero + mean = float64(hist.Sum) / float64(hist.Count) + } + return &distribution.Distribution{ + Count: int64(hist.Count), + Mean: mean, + BucketCounts: counts, + BucketOptions: &distribution.Distribution_BucketOptions{ + Options: &distribution.Distribution_BucketOptions_ExplicitBuckets{ + ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{ + Bounds: hist.Bounds, + }, + }, + }, + } +} + +func numberDataPointToValue[N int64 | float64]( + point otelmetricdata.DataPoint[N], +) (*monitoringpb.TypedValue, googlemetricpb.MetricDescriptor_ValueType) { + switch v := any(point.Value).(type) { + case int64: + return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: v, + }}, + googlemetricpb.MetricDescriptor_INT64 + case float64: + return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{ + DoubleValue: v, + }}, + googlemetricpb.MetricDescriptor_DOUBLE + } + // It is impossible to reach this statement + return nil, googlemetricpb.MetricDescriptor_INT64 +} diff --git a/bigtable/metrics_monitoring_exporter_test.go b/bigtable/metrics_monitoring_exporter_test.go new file mode 100644 index 000000000000..e1a21a1bd469 --- /dev/null +++ b/bigtable/metrics_monitoring_exporter_test.go @@ -0,0 +1,580 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +package bigtable + +import ( + "context" + "errors" + "fmt" + "net" + "reflect" + "strings" + "sync" + "testing" + "time" + + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" + + "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "google.golang.org/api/option" + googlemetricpb "google.golang.org/genproto/googleapis/api/metric" + metricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/emptypb" +) + +type MetricsTestServer struct { + lis net.Listener + srv *grpc.Server + Endpoint string + userAgent string + createMetricDescriptorReqs []*monitoringpb.CreateMetricDescriptorRequest + createServiceTimeSeriesReqs []*monitoringpb.CreateTimeSeriesRequest + RetryCount int + mu sync.Mutex +} + +func (m *MetricsTestServer) Shutdown() { + // this will close mts.lis + m.srv.GracefulStop() +} + +// Pops out the CreateMetricDescriptorRequests which the test server has received so far. +func (m *MetricsTestServer) CreateMetricDescriptorRequests() []*monitoringpb.CreateMetricDescriptorRequest { + m.mu.Lock() + defer m.mu.Unlock() + reqs := m.createMetricDescriptorReqs + m.createMetricDescriptorReqs = nil + return reqs +} + +// Pops out the UserAgent from the most recent CreateTimeSeriesRequests or CreateServiceTimeSeriesRequests. +func (m *MetricsTestServer) UserAgent() string { + m.mu.Lock() + defer m.mu.Unlock() + ua := m.userAgent + m.userAgent = "" + return ua +} + +// Pops out the CreateServiceTimeSeriesRequests which the test server has received so far. 
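+// Tests drain this slice to assert on the exact TimeSeries protos that the
+// exporter sent.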
+func (m *MetricsTestServer) CreateServiceTimeSeriesRequests() []*monitoringpb.CreateTimeSeriesRequest { + m.mu.Lock() + defer m.mu.Unlock() + reqs := m.createServiceTimeSeriesReqs + m.createServiceTimeSeriesReqs = nil + return reqs +} + +func (m *MetricsTestServer) appendCreateMetricDescriptorReq(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest) { + m.mu.Lock() + defer m.mu.Unlock() + m.createMetricDescriptorReqs = append(m.createMetricDescriptorReqs, req) +} + +func (m *MetricsTestServer) appendCreateServiceTimeSeriesReq(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) { + m.mu.Lock() + defer m.mu.Unlock() + m.createServiceTimeSeriesReqs = append(m.createServiceTimeSeriesReqs, req) + if md, ok := metadata.FromIncomingContext(ctx); ok { + m.userAgent = strings.Join(md.Get("User-Agent"), ";") + } +} + +func (m *MetricsTestServer) Serve() error { + return m.srv.Serve(m.lis) +} + +type fakeMetricServiceServer struct { + monitoringpb.UnimplementedMetricServiceServer + metricsTestServer *MetricsTestServer +} + +func (f *fakeMetricServiceServer) CreateServiceTimeSeries( + ctx context.Context, + req *monitoringpb.CreateTimeSeriesRequest, +) (*emptypb.Empty, error) { + f.metricsTestServer.appendCreateServiceTimeSeriesReq(ctx, req) + return &emptypb.Empty{}, nil +} + +func (f *fakeMetricServiceServer) CreateMetricDescriptor( + ctx context.Context, + req *monitoringpb.CreateMetricDescriptorRequest, +) (*metricpb.MetricDescriptor, error) { + f.metricsTestServer.appendCreateMetricDescriptorReq(ctx, req) + return &metricpb.MetricDescriptor{}, nil +} + +func NewMetricTestServer() (*MetricsTestServer, error) { + srv := grpc.NewServer() + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + return nil, err + } + testServer := &MetricsTestServer{ + Endpoint: lis.Addr().String(), + lis: lis, + srv: srv, + } + + monitoringpb.RegisterMetricServiceServer( + srv, + &fakeMetricServiceServer{metricsTestServer: testServer}, + ) + + return testServer, nil +} + +func requireNoError(t *testing.T, err error) { + if err != nil { + t.Fatalf("Received unexpected error: \n%v", err) + } +} + +func assertNoError(t *testing.T, err error) { + if err != nil { + t.Errorf("Received unexpected error: \n%v", err) + } +} + +func assertErrorIs(t *testing.T, gotErr error, wantErr error) { + if !errors.Is(gotErr, wantErr) { + t.Errorf("error got: %v, want: %v", gotErr, wantErr) + } +} + +func assertEqual(t *testing.T, got, want interface{}) { + if !testutil.Equal(got, want) { + t.Errorf("got: %+v, want: %+v", got, want) + } + +} + +func TestExportMetrics(t *testing.T) { + ctx := context.Background() + testServer, err := NewMetricTestServer() + //nolint:errcheck + go testServer.Serve() + defer testServer.Shutdown() + assertNoError(t, err) + + res := &resource.Resource{} + + clientOpts := []option.ClientOption{ + option.WithEndpoint(testServer.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } + exporter, err := newMonitoringExporter(ctx, "PROJECT_ID_NOT_REAL", clientOpts...) 
+	if err != nil {
+		t.Fatalf("Error occurred when creating exporter: %v", err)
+	}
+	provider := metric.NewMeterProvider(
+		metric.WithReader(metric.NewPeriodicReader(exporter)),
+		metric.WithResource(res),
+	)
+
+	defer func() {
+		err = provider.Shutdown(ctx)
+		assertNoError(t, err)
+	}()
+
+	meter := provider.Meter("test")
+	counter, err := meter.Int64Counter("name.lastvalue")
+	requireNoError(t, err)
+
+	counter.Add(ctx, 1)
+}
+
+func TestExportCounter(t *testing.T) {
+	ctx := context.Background()
+	testServer, err := NewMetricTestServer()
+	requireNoError(t, err)
+	//nolint:errcheck
+	go testServer.Serve()
+	defer testServer.Shutdown()
+
+	clientOpts := []option.ClientOption{
+		option.WithEndpoint(testServer.Endpoint),
+		option.WithoutAuthentication(),
+		option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
+	}
+	exporter, err := newMonitoringExporter(ctx, "PROJECT_ID_NOT_REAL", clientOpts...)
+	requireNoError(t, err)
+	provider := metric.NewMeterProvider(
+		metric.WithReader(metric.NewPeriodicReader(exporter)),
+		metric.WithResource(
+			resource.NewWithAttributes(
+				semconv.SchemaURL,
+				attribute.String("test_id", "abc123"),
+			)),
+	)
+
+	defer func() {
+		err = provider.Shutdown(ctx)
+		assertNoError(t, err)
+	}()
+
+	// Start meter
+	meter := provider.Meter("cloudmonitoring/test")
+
+	// Register counter and record a value
+	counter, err := meter.Int64Counter("counter-a")
+	requireNoError(t, err)
+	clabels := []attribute.KeyValue{attribute.Key("key").String("value")}
+	counter.Add(ctx, 100, otelmetric.WithAttributes(clabels...))
+}
+
+func TestExportHistogram(t *testing.T) {
+	ctx := context.Background()
+	testServer, err := NewMetricTestServer()
+	requireNoError(t, err)
+	//nolint:errcheck
+	go testServer.Serve()
+	defer testServer.Shutdown()
+
+	clientOpts := []option.ClientOption{
+		option.WithEndpoint(testServer.Endpoint),
+		option.WithoutAuthentication(),
+		option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
+	}
+	exporter, err := newMonitoringExporter(ctx, "PROJECT_ID_NOT_REAL", clientOpts...)
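+	// Note: the periodic reader flushes on provider.Shutdown, so the deferred
+	// Shutdown below is what pushes the recorded points to the fake server.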
+	requireNoError(t, err)
+	provider := metric.NewMeterProvider(
+		metric.WithReader(metric.NewPeriodicReader(exporter)),
+		metric.WithResource(
+			resource.NewWithAttributes(
+				semconv.SchemaURL,
+				attribute.String("test_id", "abc123"),
+			),
+		),
+	)
+
+	defer func() {
+		err = provider.Shutdown(ctx)
+		assertNoError(t, err)
+	}()
+
+	// Start meter
+	meter := provider.Meter("cloudmonitoring/test")
+
+	// Register histogram and record values
+	histogram, err := meter.Float64Histogram("counter-a")
+	requireNoError(t, err)
+	clabels := []attribute.KeyValue{attribute.Key("key").String("value")}
+	histogram.Record(ctx, 100, otelmetric.WithAttributes(clabels...))
+	histogram.Record(ctx, 50, otelmetric.WithAttributes(clabels...))
+	histogram.Record(ctx, 200, otelmetric.WithAttributes(clabels...))
+}
+
+func TestRecordToMpb(t *testing.T) {
+	metricName := "testing"
+
+	me := &monitoringExporter{}
+
+	monitoredResLabelValueProject := "project01"
+	monitoredResLabelValueInstance := "instance01"
+	monitoredResLabelValueZone := "zone01"
+	monitoredResLabelValueTable := "table01"
+	monitoredResLabelValueCluster := "cluster01"
+
+	inputAttributes := attribute.NewSet(
+		attribute.Key("a").String("A"),
+		attribute.Key("b").Int64(100),
+		attribute.Key(monitoredResLabelKeyProject).String(monitoredResLabelValueProject),
+		attribute.Key(monitoredResLabelKeyInstance).String(monitoredResLabelValueInstance),
+		attribute.Key(monitoredResLabelKeyZone).String(monitoredResLabelValueZone),
+		attribute.Key(monitoredResLabelKeyTable).String(monitoredResLabelValueTable),
+		attribute.Key(monitoredResLabelKeyCluster).String(monitoredResLabelValueCluster),
+	)
+	inputMetrics := metricdata.Metrics{
+		Name: metricName,
+	}
+
+	wantMetric := &googlemetricpb.Metric{
+		Type: fmt.Sprintf("%v%s", meterName, metricName),
+		Labels: map[string]string{
+			"a": "A",
+			"b": "100",
+		},
+	}
+
+	wantMonitoredResource := &monitoredrespb.MonitoredResource{
+		Type: "bigtable_client_raw",
+		Labels: map[string]string{
+			monitoredResLabelKeyProject:  monitoredResLabelValueProject,
+			monitoredResLabelKeyInstance: monitoredResLabelValueInstance,
+			monitoredResLabelKeyZone:     monitoredResLabelValueZone,
+			monitoredResLabelKeyTable:    monitoredResLabelValueTable,
+			monitoredResLabelKeyCluster:  monitoredResLabelValueCluster,
+		},
+	}
+
+	gotMetric, gotMonitoredResource := me.recordToMpb(inputMetrics, inputAttributes)
+	if !reflect.DeepEqual(wantMetric, gotMetric) {
+		t.Errorf("Metric: got: %v, want: %v", gotMetric, wantMetric)
+	}
+	if !reflect.DeepEqual(wantMonitoredResource, gotMonitoredResource) {
+		t.Errorf("Monitored resource: got: %v, want: %v", gotMonitoredResource, wantMonitoredResource)
+	}
+}
+
+// A zero-length interval must be staggered so that start precedes end.
+func TestTimeIntervalStaggering(t *testing.T) {
+	var tm time.Time
+
+	interval, err := toNonemptyTimeIntervalpb(tm, tm)
+	if err != nil {
+		t.Fatalf("conversion to PB failed: %v", err)
+	}
+
+	if err := interval.StartTime.CheckValid(); err != nil {
+		t.Fatalf("unable to convert start time from PB: %v", err)
+	}
+	start := interval.StartTime.AsTime()
+
+	if err := interval.EndTime.CheckValid(); err != nil {
+		t.Fatalf("unable to convert end time from PB: %v", err)
+	}
+	end := interval.EndTime.AsTime()
+
+	if end.Before(start.Add(time.Millisecond)) {
+		t.Fatalf("expected end=%v to be at least %v after start=%v, but it wasn't", end, time.Millisecond, start)
+	}
+}
+
+// An already non-empty interval must pass through unchanged.
+func TestTimeIntervalPassthru(t *testing.T) {
+	var tm time.Time
+
+	interval, err := toNonemptyTimeIntervalpb(tm, tm.Add(time.Second))
+	if err != nil {
t.Fatalf("conversion to PB failed: %v", err) + } + + if err := interval.StartTime.CheckValid(); err != nil { + t.Fatalf("unable to convert start time from PB: %v", err) + } + start := interval.StartTime.AsTime() + + if err := interval.EndTime.CheckValid(); err != nil { + t.Fatalf("unable to convert end time to PB: %v", err) + } + end := interval.EndTime.AsTime() + + assertEqual(t, start, tm) + assertEqual(t, end, tm.Add(time.Second)) +} + +func TestConcurrentCallsAfterShutdown(t *testing.T) { + testServer, err := NewMetricTestServer() + //nolint:errcheck + go testServer.Serve() + defer testServer.Shutdown() + assertNoError(t, err) + + ctx := context.Background() + clientOpts := []option.ClientOption{ + option.WithEndpoint(testServer.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } + exporter, err := newMonitoringExporter(ctx, "PROJECT_ID_NOT_REAL", clientOpts...) + assertNoError(t, err) + + err = exporter.Shutdown(ctx) + assertNoError(t, err) + + var wg sync.WaitGroup + wg.Add(3) + + go func() { + err := exporter.Shutdown(ctx) + assertErrorIs(t, err, errShutdown) + wg.Done() + }() + go func() { + err := exporter.ForceFlush(ctx) + assertNoError(t, err) + wg.Done() + }() + go func() { + err := exporter.Export(ctx, &metricdata.ResourceMetrics{}) + assertErrorIs(t, err, errShutdown) + wg.Done() + }() + + wg.Wait() +} + +func TestConcurrentExport(t *testing.T) { + testServer, err := NewMetricTestServer() + //nolint:errcheck + go testServer.Serve() + defer testServer.Shutdown() + assertNoError(t, err) + + ctx := context.Background() + clientOpts := []option.ClientOption{ + option.WithEndpoint(testServer.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } + exporter, err := newMonitoringExporter(ctx, "PROJECT_ID_NOT_REAL", clientOpts...) + assertNoError(t, err) + + defer func() { + err := exporter.Shutdown(ctx) + assertNoError(t, err) + }() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + err := exporter.Export(ctx, &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + {Name: "testing", Data: metricdata.Histogram[float64]{}}, + {Name: "test/of/path", Data: metricdata.Histogram[float64]{}}, + }, + }, + }, + }) + assertNoError(t, err) + wg.Done() + }() + go func() { + err := exporter.Export(ctx, &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + {Name: "testing", Data: metricdata.Histogram[float64]{}}, + {Name: "test/of/path", Data: metricdata.Histogram[float64]{}}, + }, + }, + }, + }) + assertNoError(t, err) + wg.Done() + }() + + wg.Wait() +} + +func TestBatchingExport(t *testing.T) { + ctx := context.Background() + setup := func(t *testing.T) (metric.Exporter, *MetricsTestServer) { + testServer, err := NewMetricTestServer() + //nolint:errcheck + go testServer.Serve() + t.Cleanup(testServer.Shutdown) + + assertNoError(t, err) + + clientOpts := []option.ClientOption{ + option.WithEndpoint(testServer.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + } + exporter, err := newMonitoringExporter(ctx, "PROJECT_ID_NOT_REAL", clientOpts...) 
+		requireNoError(t, err)
+
+		t.Cleanup(func() {
+			ctx := context.Background()
+			err := exporter.Shutdown(ctx)
+			assertNoError(t, err)
+		})
+
+		return exporter, testServer
+	}
+
+	createMetrics := func(n int) []metricdata.Metrics {
+		inputMetrics := make([]metricdata.Metrics, n)
+		for i := 0; i < n; i++ {
+			inputMetrics[i] = metricdata.Metrics{Name: "testing", Data: metricdata.Histogram[float64]{
+				DataPoints: []metricdata.HistogramDataPoint[float64]{
+					{},
+				},
+			}}
+		}
+
+		return inputMetrics
+	}
+
+	for _, tc := range []struct {
+		desc                  string
+		numMetrics            int
+		expectedCreateTSCalls int
+	}{
+		{desc: "0 metrics"},
+		{
+			desc:                  "150 metrics",
+			numMetrics:            150,
+			expectedCreateTSCalls: 1,
+		},
+		{
+			desc:                  "200 metrics",
+			numMetrics:            200,
+			expectedCreateTSCalls: 1,
+		},
+		{
+			desc:                  "201 metrics",
+			numMetrics:            201,
+			expectedCreateTSCalls: 2,
+		},
+		{
+			desc:                  "500 metrics",
+			numMetrics:            500,
+			expectedCreateTSCalls: 3,
+		},
+		{
+			desc:                  "1199 metrics",
+			numMetrics:            1199,
+			expectedCreateTSCalls: 6,
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			exporter, testServer := setup(t)
+			input := createMetrics(tc.numMetrics)
+
+			err := exporter.Export(ctx, &metricdata.ResourceMetrics{
+				ScopeMetrics: []metricdata.ScopeMetrics{
+					{
+						Metrics: input,
+					},
+				},
+			})
+			assertNoError(t, err)
+
+			gotCalls := testServer.CreateServiceTimeSeriesRequests()
+			assertEqual(t, len(gotCalls), tc.expectedCreateTSCalls)
+		})
+	}
+}
diff --git a/bigtable/metrics_test.go b/bigtable/metrics_test.go
new file mode 100644
index 000000000000..3af2eea65ff6
--- /dev/null
+++ b/bigtable/metrics_test.go
@@ -0,0 +1,293 @@
+/*
+Copyright 2024 Google LLC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package bigtable + +import ( + "fmt" + "strings" + "testing" + + "cloud.google.com/go/internal/testutil" + "github.com/google/go-cmp/cmp/cmpopts" + "go.opentelemetry.io/otel/attribute" + btpb "google.golang.org/genproto/googleapis/bigtable/v2" + "google.golang.org/grpc/codes" + "google.golang.org/protobuf/proto" + + "google.golang.org/grpc/metadata" +) + +var ( + clusterID1 = "cluster-id-1" + clusterID2 = "cluster-id-2" + zoneID1 = "zone-id-1" + + testHeaders, _ = proto.Marshal(&btpb.ResponseParams{ + ClusterId: &clusterID1, + ZoneId: &zoneID1, + }) + testTrailers, _ = proto.Marshal(&btpb.ResponseParams{ + ClusterId: &clusterID2, + ZoneId: &zoneID1, + }) + + testHeaderMD = &metadata.MD{ + locationMDKey: []string{string(testHeaders)}, + serverTimingMDKey: []string{"gfet4t7; dur=1234"}, + } + testTrailerMD = &metadata.MD{ + locationMDKey: []string{string(testTrailers)}, + serverTimingMDKey: []string{"gfet4t7; dur=5678"}, + } +) + +func equalErrs(gotErr error, wantErr error) bool { + if gotErr == nil && wantErr == nil { + return true + } + if gotErr == nil || wantErr == nil { + return false + } + return strings.Contains(gotErr.Error(), wantErr.Error()) +} + +func TestToOtelMetricAttrs(t *testing.T) { + mt := builtinMetricsTracer{ + tableName: "my-table", + appProfileID: "my-app-profile", + method: "ReadRows", + isStreaming: true, + status: codes.OK.String(), + headerMD: testHeaderMD, + trailerMD: &metadata.MD{}, + attemptCount: 1, + } + tests := []struct { + desc string + mt builtinMetricsTracer + metricName string + wantAttrs []attribute.KeyValue + wantError error + }{ + { + desc: "Known metric", + mt: mt, + metricName: metricNameOperationLatencies, + wantAttrs: []attribute.KeyValue{ + attribute.String(monitoredResLabelKeyTable, "my-table"), + attribute.String(metricLabelKeyAppProfile, "my-app-profile"), + attribute.String(metricLabelKeyMethod, "ReadRows"), + attribute.Bool(metricLabelKeyStreamingOperation, true), + attribute.String(metricLabelKeyOperationStatus, codes.OK.String()), + attribute.String(monitoredResLabelKeyCluster, clusterID1), + attribute.String(monitoredResLabelKeyZone, zoneID1), + }, + wantError: nil, + }, + { + desc: "Unknown metric", + mt: mt, + metricName: "unknown_metric", + wantAttrs: nil, + wantError: fmt.Errorf("Unable to create attributes list for unknown metric: unknown_metric"), + }, + } + + lessKeyValue := func(a, b attribute.KeyValue) bool { return a.Key < b.Key } + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotAttrs, gotErr := test.mt.toOtelMetricAttrs(test.metricName) + if !equalErrs(gotErr, test.wantError) { + t.Errorf("error got: %v, want: %v", gotErr, test.wantError) + } + if diff := testutil.Diff(gotAttrs, test.wantAttrs, + cmpopts.IgnoreUnexported(attribute.KeyValue{}, attribute.Value{}), + cmpopts.SortSlices(lessKeyValue)); diff != "" { + t.Errorf("got=-, want=+ \n%v", diff) + } + }) + } +} + +func TestGetServerLatency(t *testing.T) { + invalidFormat := "invalid format" + invalidFormatMD := &metadata.MD{ + serverTimingMDKey: []string{invalidFormat}, + } + invalidFormatErr := fmt.Errorf("strconv.Atoi: parsing %q: invalid syntax", invalidFormat) + + tests := []struct { + desc string + headerMD *metadata.MD + trailerMD *metadata.MD + wantLatency int + wantError error + }{ + { + desc: "No server latency in header or trailer", + headerMD: &metadata.MD{}, + trailerMD: &metadata.MD{}, + wantLatency: 0, + wantError: fmt.Errorf("strconv.Atoi: parsing \"\": invalid syntax"), + }, + { + desc: "Server latency in header", + 
headerMD: &metadata.MD{ + serverTimingMDKey: []string{"gfet4t7; dur=1234"}, + }, + trailerMD: &metadata.MD{}, + wantLatency: 1234000000, + wantError: nil, + }, + { + desc: "Server latency in trailer", + headerMD: &metadata.MD{}, + trailerMD: &metadata.MD{ + serverTimingMDKey: []string{"gfet4t7; dur=5678"}, + }, + wantLatency: 5678000000, + wantError: nil, + }, + { + desc: "Server latency in both header and trailer", + headerMD: &metadata.MD{ + serverTimingMDKey: []string{"gfet4t7; dur=1234"}, + }, + trailerMD: &metadata.MD{ + serverTimingMDKey: []string{"gfet4t7; dur=5678"}, + }, + wantLatency: 1234000000, + wantError: nil, + }, + { + desc: "Invalid server latency format in header", + headerMD: invalidFormatMD, + trailerMD: &metadata.MD{}, + wantLatency: 0, + wantError: invalidFormatErr, + }, + { + desc: "Invalid server latency format in trailer", + headerMD: &metadata.MD{}, + trailerMD: invalidFormatMD, + wantLatency: 0, + wantError: invalidFormatErr, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + mt := builtinMetricsTracer{ + headerMD: test.headerMD, + trailerMD: test.trailerMD, + } + + gotLatency, gotErr := mt.getServerLatency() + if !equalErrs(gotErr, test.wantError) { + t.Errorf("error got: %v, want: %v", gotErr, test.wantError) + } + if gotLatency != test.wantLatency { + t.Errorf("latency got: %v, want: %v", gotLatency, test.wantLatency) + } + }) + } +} + +func TestGetLocation(t *testing.T) { + invalidFormatErr := "cannot parse invalid wire-format data" + tests := []struct { + desc string + headerMD *metadata.MD + trailerMD *metadata.MD + wantCluster string + wantZone string + wantError error + }{ + { + desc: "No location metadata in header or trailer", + headerMD: &metadata.MD{}, + trailerMD: &metadata.MD{}, + wantCluster: "", + wantZone: "", + wantError: fmt.Errorf("Failed to get location metadata"), + }, + { + desc: "Location metadata in header", + headerMD: testHeaderMD, + trailerMD: &metadata.MD{}, + wantCluster: clusterID1, + wantZone: zoneID1, + wantError: nil, + }, + { + desc: "Location metadata in trailer", + headerMD: &metadata.MD{}, + trailerMD: testTrailerMD, + wantCluster: clusterID2, + wantZone: zoneID1, + wantError: nil, + }, + { + desc: "Location metadata in both header and trailer", + headerMD: testHeaderMD, + trailerMD: testTrailerMD, + wantCluster: clusterID1, + wantZone: zoneID1, + wantError: nil, + }, + { + desc: "Invalid location metadata format in header", + headerMD: &metadata.MD{ + locationMDKey: []string{"invalid format"}, + }, + trailerMD: &metadata.MD{}, + wantCluster: "", + wantZone: "", + wantError: fmt.Errorf(invalidFormatErr), + }, + { + desc: "Invalid location metadata format in trailer", + headerMD: &metadata.MD{}, + trailerMD: &metadata.MD{ + locationMDKey: []string{"invalid format"}, + }, + wantCluster: "", + wantZone: "", + wantError: fmt.Errorf(invalidFormatErr), + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + mt := builtinMetricsTracer{ + headerMD: test.headerMD, + trailerMD: test.trailerMD, + } + + gotCluster, gotZone, gotErr := mt.getLocation() + if gotCluster != test.wantCluster { + t.Errorf("cluster got: %v, want: %v", gotCluster, test.wantCluster) + } + if gotZone != test.wantZone { + t.Errorf("zone got: %v, want: %v", gotZone, test.wantZone) + } + if !equalErrs(gotErr, test.wantError) { + t.Errorf("error got: %v, want: %v", gotErr, test.wantError) + } + }) + } +} diff --git a/bigtable/retry_test.go b/bigtable/retry_test.go index 9d59175cc28e..35d2654c5684 100644 --- 
a/bigtable/retry_test.go
+++ b/bigtable/retry_test.go
@@ -33,6 +33,8 @@ import (
 	"google.golang.org/protobuf/types/known/wrapperspb"
 )
 
+// TODO: Add a test that simulates failures and asserts that retry_count is recorded.
+
 func setupFakeServer(opt ...grpc.ServerOption) (tbl *Table, cleanup func(), err error) {
 	srv, err := bttest.NewServer("localhost:0", opt...)
 	if err != nil {
@@ -346,8 +348,8 @@ func TestRetryApplyBulk_IndividualErrorsAndDeadlineExceeded(t *testing.T) {
 	ctx, cancel := context.WithTimeout(ctx, -10*time.Millisecond)
 	defer cancel()
 	errors, err := tbl.ApplyBulk(ctx, []string{"row1", "row2", "row3"}, []*Mutation{m1, m2, m3})
-	wantErr := context.DeadlineExceeded
-	if wantErr != err {
+	wantErr := status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error())
+	if !equalErrs(err, wantErr) {
 		t.Fatalf("deadline exceeded error: got: %v, want: %v", err, wantErr)
 	}
 	if errors != nil {