Skip to content

Commit

Permalink
feat(bigtable): Client side metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
bhshkh committed May 31, 2024
1 parent 5ccd88c commit cbc1197
Show file tree
Hide file tree
Showing 10 changed files with 1,939 additions and 55 deletions.
178 changes: 135 additions & 43 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
btopt "cloud.google.com/go/bigtable/internal/option"
"cloud.google.com/go/internal/trace"
gax "github.com/googleapis/gax-go/v2"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
gtransport "google.golang.org/api/transport/grpc"
Expand Down Expand Up @@ -56,6 +57,14 @@ type Client struct {
client btpb.BigtableClient
project, instance string
appProfile string
metricsConfig *metricsConfigInternal
}

// metricsConfig is used to represent user provided config.
// This is not configurable right now and only a default instance is used
type metricsConfig struct {
builtInEnabled bool
meterProviders []*sdkmetric.MeterProvider
}

// ClientConfig has configurations for the client.
Expand Down Expand Up @@ -95,12 +104,19 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
return nil, fmt.Errorf("dialing: %w", err)
}

// Create a OpenTelemetry metrics configuration
metricsConfig, err := newMetricsConfigInternal(ctx, project, instance, nil, opts...)
if err != nil {
return nil, err
}

return &Client{
connPool: connPool,
client: btpb.NewBigtableClient(connPool),
project: project,
instance: instance,
appProfile: config.AppProfile,
connPool: connPool,
client: btpb.NewBigtableClient(connPool),
project: project,
instance: instance,
appProfile: config.AppProfile,
metricsConfig: metricsConfig,
}, nil
}

Expand Down Expand Up @@ -166,19 +182,21 @@ func init() {
}

// Convert error to grpc status error
func convertToGrpcStatusErr(err error) error {
if err != nil {
if errStatus, ok := status.FromError(err); ok {
return status.Error(errStatus.Code(), errStatus.Message())
}
func convertToGrpcStatusErr(err error) (codes.Code, error) {
if err == nil {
return codes.OK, nil
}

ctxStatus := status.FromContextError(err)
if ctxStatus.Code() != codes.Unknown {
return status.Error(ctxStatus.Code(), ctxStatus.Message())
}
if errStatus, ok := status.FromError(err); ok {
return errStatus.Code(), status.Error(errStatus.Code(), errStatus.Message())
}

return err
ctxStatus := status.FromContextError(err)
if ctxStatus.Code() != codes.Unknown {
return ctxStatus.Code(), status.Error(ctxStatus.Code(), ctxStatus.Message())
}

return codes.Unknown, err
}

func (c *Client) fullTableName(table string) string {
Expand Down Expand Up @@ -285,6 +303,10 @@ func (ti *tableImpl) ApplyReadModifyWrite(ctx context.Context, row string, m *Re
return ti.Table.ApplyReadModifyWrite(ctx, row, m)
}

func (ti *tableImpl) getOperationRecorder(ctx context.Context, method string, isStreaming bool) (*builtinMetricsTracer, func()) {
return ti.Table.getOperationRecorder(ctx, method, isStreaming)
}

// TODO(dsymonds): Read method that returns a sequence of ReadItems.

// ReadRows reads rows from a table. f is called for each row.
Expand All @@ -295,13 +317,22 @@ func (ti *tableImpl) ApplyReadModifyWrite(ctx context.Context, row string, m *Re
// By default, the yielded rows will contain all values in all cells.
// Use RowFilter to limit the cells returned.
func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) (err error) {
method := "cloud.google.com/go/bigtable.ReadRows"
ctx = mergeOutgoingMetadata(ctx, t.md)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.ReadRows")
ctx = trace.StartSpan(ctx, method)
defer func() { trace.EndSpan(ctx, err) }()

metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, true)
defer opRecorder()

err = t.readRows(ctx, arg, f, metricsTracer, opts...)
return metricsTracer.recordAndConvertErr(err)
}

func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) {
var prevRowKey string
attrMap := make(map[string]interface{})
err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
err = t.c.metricsConfig.gaxInvokeWithRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error {
req := &btpb.ReadRowsRequest{
AppProfileId: t.c.appProfile,
}
Expand Down Expand Up @@ -340,12 +371,18 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
cr = newChunkReader()
}

*mt.headerMD, err = stream.Header()
if err != nil {
return err
}
for {
res, err := stream.Recv()
if err == io.EOF {
*mt.trailerMD = stream.Trailer()
break
}
if err != nil {
*mt.trailerMD = stream.Trailer()
// Reset arg for next Invoke call.
if arg == nil {
// Should be lowest possible key value, an empty byte array
Expand Down Expand Up @@ -381,6 +418,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
cancel()
for {
if _, err := stream.Recv(); err != nil {
*mt.trailerMD = stream.Trailer()
// The stream has ended. We don't return an error
// because the caller has intentionally interrupted the scan.
return nil
Expand All @@ -407,7 +445,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
return err
}, retryOptions...)

return convertToGrpcStatusErr(err)
return err
}

// ReadRow is a convenience implementation of a single-row reader.
Expand Down Expand Up @@ -919,10 +957,19 @@ const maxMutations = 100000
// Apply mutates a row atomically. A mutation must contain at least one
// operation and at most 100000 operations.
func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) (err error) {
method := "cloud.google.com/go/bigtable/Apply"
ctx = mergeOutgoingMetadata(ctx, t.md)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply")
ctx = trace.StartSpan(ctx, method)
defer func() { trace.EndSpan(ctx, err) }()

metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, false)
defer opRecorder()

err = t.apply(ctx, metricsTracer, row, m, opts...)
return metricsTracer.recordAndConvertErr(err)
}

func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string, m *Mutation, opts ...ApplyOption) (err error) {
after := func(res proto.Message) {
for _, o := range opts {
o.after(res)
Expand All @@ -945,15 +992,15 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl
callOptions = retryOptions
}
var res *btpb.MutateRowResponse
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
err := t.c.metricsConfig.gaxInvokeWithRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error {
var err error
res, err = t.c.client.MutateRow(ctx, req)
res, err = t.c.client.MutateRow(ctx, req, grpc.Header(mt.headerMD), grpc.Trailer(mt.trailerMD))
return err
}, callOptions...)
if err == nil {
after(res)
}
return convertToGrpcStatusErr(err)
return err
}

req := &btpb.CheckAndMutateRowRequest{
Expand Down Expand Up @@ -982,15 +1029,15 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl
callOptions = retryOptions
}
var cmRes *btpb.CheckAndMutateRowResponse
err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
err = t.c.metricsConfig.gaxInvokeWithRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error {
var err error
cmRes, err = t.c.client.CheckAndMutateRow(ctx, req)
cmRes, err = t.c.client.CheckAndMutateRow(ctx, req, grpc.Header(mt.headerMD), grpc.Trailer(mt.trailerMD))
return err
}, callOptions...)
if err == nil {
after(cmRes)
}
return convertToGrpcStatusErr(err)
return err
}

// An ApplyOption is an optional argument to Apply.
Expand Down Expand Up @@ -1118,10 +1165,18 @@ type entryErr struct {
//
// Conditional mutations cannot be applied in bulk and providing one will result in an error.
func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) {
method := "cloud.google.com/go/bigtable/Apply"
ctx = mergeOutgoingMetadata(ctx, t.md)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/ApplyBulk")
ctx = trace.StartSpan(ctx, method)
defer func() { trace.EndSpan(ctx, err) }()

metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, true)
defer opRecorder()
errs, err = t.applyBulk(ctx, metricsTracer, rowKeys, muts, opts...)
return errs, metricsTracer.recordAndConvertErr(err)
}

func (t *Table) applyBulk(ctx context.Context, metricsTracer *builtinMetricsTracer, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) {
if len(rowKeys) != len(muts) {
return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts))
}
Expand All @@ -1137,10 +1192,10 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio

for _, group := range groupEntries(origEntries, maxMutations) {
attrMap := make(map[string]interface{})
err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
err = t.c.metricsConfig.gaxInvokeWithRecorder(ctx, metricsTracer, func(ctx context.Context, _ gax.CallSettings) error {
attrMap["rowCount"] = len(group)
trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk")
err := t.doApplyBulk(ctx, group, opts...)
err := t.doApplyBulk(ctx, group, metricsTracer, opts...)
if err != nil {
// We want to retry the entire request with the current group
return err
Expand Down Expand Up @@ -1187,7 +1242,7 @@ func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr {
}

// doApplyBulk does the work of a single ApplyBulk invocation
func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...ApplyOption) error {
func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, mt *builtinMetricsTracer, opts ...ApplyOption) error {
after := func(res proto.Message) {
for _, o := range opts {
o.after(res)
Expand All @@ -1207,16 +1262,20 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...
} else {
req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
}

stream, err := t.c.client.MutateRows(ctx, req)
if err != nil {
return err
}
*mt.headerMD, err = stream.Header()
for {
res, err := stream.Recv()
if err == io.EOF {
*mt.trailerMD = stream.Trailer()
break
}
if err != nil {
*mt.trailerMD = stream.Trailer()
return err
}

Expand Down Expand Up @@ -1288,6 +1347,16 @@ func (ts Timestamp) TruncateToMilliseconds() Timestamp {
// It returns the newly written cells.
func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
ctx = mergeOutgoingMetadata(ctx, t.md)

method := "cloud.google.com/go/bigtable/ApplyReadModifyWrite"
metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, false)
defer opRecorder()

updatedRow, err := t.applyReadModifyWrite(ctx, metricsTracer, row, m)
return updatedRow, metricsTracer.recordAndConvertErr(err)
}

func (t *Table) applyReadModifyWrite(ctx context.Context, mt *builtinMetricsTracer, row string, m *ReadModifyWrite) (Row, error) {
req := &btpb.ReadModifyWriteRowRequest{
AppProfileId: t.c.appProfile,
RowKey: []byte(row),
Expand All @@ -1298,18 +1367,23 @@ func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadMod
} else {
req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
}
res, err := t.c.client.ReadModifyWriteRow(ctx, req)
if err != nil {
return nil, err
}
if res.Row == nil {
return nil, errors.New("unable to apply ReadModifyWrite: res.Row=nil")
}
r := make(Row)
for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family
decodeFamilyProto(r, row, fam)
}
return r, nil

var r Row
err := t.c.metricsConfig.gaxInvokeWithRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error {
res, err := t.c.client.ReadModifyWriteRow(ctx, req, grpc.Header(mt.headerMD), grpc.Trailer(mt.trailerMD))
if err != nil {
return err
}
if res.Row == nil {
return errors.New("unable to apply ReadModifyWrite: res.Row=nil")
}
r = make(Row)
for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family
decodeFamilyProto(r, row, fam)
}
return nil
})
return r, err
}

// ReadModifyWrite represents a set of operations on a single row of a table.
Expand Down Expand Up @@ -1352,9 +1426,19 @@ func (m *ReadModifyWrite) Increment(family, column string, delta int64) {
// SampleRowKeys returns a sample of row keys in the table. The returned row keys will delimit contiguous sections of
// the table of approximately equal size, which can be used to break up the data for distributed tasks like mapreduces.
func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
method := "cloud.google.com/go/bigtable/SampleRowKeys"
ctx = mergeOutgoingMetadata(ctx, t.md)

metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, true)
defer opRecorder()

rowKeys, err := t.sampleRowKeys(ctx, metricsTracer)
return rowKeys, metricsTracer.recordAndConvertErr(err)
}

func (t *Table) sampleRowKeys(ctx context.Context, mt *builtinMetricsTracer) ([]string, error) {
var sampledRowKeys []string
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
err := t.c.metricsConfig.gaxInvokeWithRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error {
sampledRowKeys = nil
req := &btpb.SampleRowKeysRequest{
AppProfileId: t.c.appProfile,
Expand All @@ -1371,12 +1455,15 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
if err != nil {
return err
}
*mt.headerMD, err = stream.Header()
for {
res, err := stream.Recv()
if err == io.EOF {
*mt.trailerMD = stream.Trailer()
break
}
if err != nil {
*mt.trailerMD = stream.Trailer()
return err
}

Expand All @@ -1389,5 +1476,10 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
}
return nil
}, retryOptions...)
return sampledRowKeys, convertToGrpcStatusErr(err)

return sampledRowKeys, err
}

func (t *Table) getOperationRecorder(ctx context.Context, method string, isStreaming bool) (*builtinMetricsTracer, func()) {
return t.c.metricsConfig.getOperationRecorder(ctx, t.table, t.c.appProfile, method, isStreaming)
}
5 changes: 3 additions & 2 deletions bigtable/bigtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,9 @@ func TestApplyErrors(t *testing.T) {
ctx := context.Background()
table := &Table{
c: &Client{
project: "P",
instance: "I",
project: "P",
instance: "I",
metricsConfig: &metricsConfigInternal{},
},
table: "t",
}
Expand Down
Loading

0 comments on commit cbc1197

Please sign in to comment.