From ab10720027288b2d5d0d7dea10c8a559000bfffa Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 20 Jan 2025 17:28:02 +0100 Subject: [PATCH] squash all commits --- integration/libs/telemetry/telemetry_test.go | 91 ++++++++++++ libs/telemetry/api.go | 19 +++ libs/telemetry/context.go | 62 +++++++++ libs/telemetry/context_test.go | 77 ++++++++++ libs/telemetry/frontend_log.go | 22 +++ libs/telemetry/logger.go | 139 +++++++++++++++++++ libs/telemetry/logger_test.go | 113 +++++++++++++++ libs/telemetry/mock_logger.go | 22 +++ libs/telemetry/protos/README.md | 3 + libs/telemetry/protos/bundle_init.go | 39 ++++++ libs/telemetry/protos/test_event.go | 16 +++ 11 files changed, 603 insertions(+) create mode 100644 integration/libs/telemetry/telemetry_test.go create mode 100644 libs/telemetry/api.go create mode 100644 libs/telemetry/context.go create mode 100644 libs/telemetry/context_test.go create mode 100644 libs/telemetry/frontend_log.go create mode 100644 libs/telemetry/logger.go create mode 100644 libs/telemetry/logger_test.go create mode 100644 libs/telemetry/mock_logger.go create mode 100644 libs/telemetry/protos/README.md create mode 100644 libs/telemetry/protos/bundle_init.go create mode 100644 libs/telemetry/protos/test_event.go diff --git a/integration/libs/telemetry/telemetry_test.go b/integration/libs/telemetry/telemetry_test.go new file mode 100644 index 0000000000..cd9040422f --- /dev/null +++ b/integration/libs/telemetry/telemetry_test.go @@ -0,0 +1,91 @@ +package telemetry_test + +import ( + "context" + "net/http" + "reflect" + "testing" + "time" + + "github.com/databricks/cli/integration/internal/acc" + "github.com/databricks/cli/libs/telemetry" + "github.com/databricks/cli/libs/telemetry/protos" + "github.com/databricks/databricks-sdk-go/client" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Wrapper to capture the response from the API client since that's not directly +// accessible from the logger. +type apiClientWrapper struct { + response *telemetry.ResponseBody + apiClient *client.DatabricksClient +} + +func (wrapper *apiClientWrapper) Do(ctx context.Context, method, path string, + headers map[string]string, request, response any, + visitors ...func(*http.Request) error, +) error { + err := wrapper.apiClient.Do(ctx, method, path, headers, request, response, visitors...) + wrapper.response = response.(*telemetry.ResponseBody) + return err +} + +func TestTelemetryLogger(t *testing.T) { + events := []telemetry.DatabricksCliLog{ + { + CliTestEvent: &protos.CliTestEvent{ + Name: protos.DummyCliEnumValue1, + }, + }, + { + BundleInitEvent: &protos.BundleInitEvent{ + Uuid: uuid.New().String(), + TemplateName: "abc", + TemplateEnumArgs: []protos.BundleInitTemplateEnumArg{ + { + Key: "a", + Value: "b", + }, + { + Key: "c", + Value: "d", + }, + }, + }, + }, + } + + assert.Equal(t, len(events), reflect.TypeOf(telemetry.DatabricksCliLog{}).NumField(), + "Number of events should match the number of fields in DatabricksCliLog. Please add a new event to this test.") + + ctx, w := acc.WorkspaceTest(t) + ctx = telemetry.WithDefaultLogger(ctx) + + // Extend the maximum wait time for the telemetry flush just for this test. + oldV := telemetry.MaxAdditionalWaitTime + telemetry.MaxAdditionalWaitTime = 1 * time.Hour + t.Cleanup(func() { + telemetry.MaxAdditionalWaitTime = oldV + }) + + for _, event := range events { + telemetry.Log(ctx, event) + } + + apiClient, err := client.New(w.W.Config) + require.NoError(t, err) + + // Flush the events. + wrapper := &apiClientWrapper{ + apiClient: apiClient, + } + telemetry.Flush(ctx, wrapper) + + // Assert that the events were logged. + assert.Equal(t, telemetry.ResponseBody{ + NumProtoSuccess: int64(len(events)), + Errors: []telemetry.LogError{}, + }, *wrapper.response) +} diff --git a/libs/telemetry/api.go b/libs/telemetry/api.go new file mode 100644 index 0000000000..e3f80fadb8 --- /dev/null +++ b/libs/telemetry/api.go @@ -0,0 +1,19 @@ +package telemetry + +// RequestBody is the request body type bindings for the /telemetry-ext API endpoint. +type RequestBody struct { + UploadTime int64 `json:"uploadTime"` + Items []string `json:"items"` + ProtoLogs []string `json:"protoLogs"` +} + +// ResponseBody is the response body type bindings for the /telemetry-ext API endpoint. +type ResponseBody struct { + Errors []LogError `json:"errors"` + NumProtoSuccess int64 `json:"numProtoSuccess"` +} + +type LogError struct { + Message string `json:"message"` + ErrorType string `json:"ErrorType"` +} diff --git a/libs/telemetry/context.go b/libs/telemetry/context.go new file mode 100644 index 0000000000..c0aea80bc3 --- /dev/null +++ b/libs/telemetry/context.go @@ -0,0 +1,62 @@ +package telemetry + +import ( + "context" + "fmt" +) + +// Private type to store the telemetry logger in the context +type telemetryLogger int + +// Key to store the telemetry logger in the context +var telemetryLoggerKey telemetryLogger + +func WithDefaultLogger(ctx context.Context) context.Context { + v := ctx.Value(telemetryLoggerKey) + + // If no logger is set in the context, set the default logger. + if v == nil { + nctx := context.WithValue(ctx, telemetryLoggerKey, &defaultLogger{}) + return nctx + } + + switch v.(type) { + case *defaultLogger: + panic(fmt.Errorf("default telemetry logger already set in the context: %T", v)) + case *mockLogger: + // Do nothing. Unit and integration tests set the mock logger in the context + // to avoid making actual API calls. Thus WithDefaultLogger should silently + // ignore the mock logger. + default: + panic(fmt.Errorf("unexpected telemetry logger type: %T", v)) + } + + return ctx +} + +// WithMockLogger sets a mock telemetry logger in the context. It overrides the +// default logger if it is already set in the context. +func WithMockLogger(ctx context.Context) context.Context { + v := ctx.Value(telemetryLoggerKey) + if v != nil { + panic(fmt.Errorf("telemetry logger already set in the context: %T", v)) + } + + return context.WithValue(ctx, telemetryLoggerKey, &mockLogger{}) +} + +func fromContext(ctx context.Context) Logger { + v := ctx.Value(telemetryLoggerKey) + if v == nil { + panic(fmt.Errorf("telemetry logger not found in the context")) + } + + switch vv := v.(type) { + case *defaultLogger: + return vv + case *mockLogger: + return vv + default: + panic(fmt.Errorf("unexpected telemetry logger type: %T", v)) + } +} diff --git a/libs/telemetry/context_test.go b/libs/telemetry/context_test.go new file mode 100644 index 0000000000..ddcdb83deb --- /dev/null +++ b/libs/telemetry/context_test.go @@ -0,0 +1,77 @@ +package telemetry + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWithDefaultLogger(t *testing.T) { + ctx := context.Background() + + // No default logger set + ctx1 := WithDefaultLogger(ctx) + assert.Equal(t, &defaultLogger{}, ctx1.Value(telemetryLoggerKey)) + + // Default logger already set + assert.PanicsWithError(t, "default telemetry logger already set in the context: *telemetry.defaultLogger", func() { + WithDefaultLogger(ctx1) + }) + + // Mock logger already set + ctx2 := WithMockLogger(ctx) + assert.NotPanics(t, func() { + WithDefaultLogger(ctx2) + }) + + // Unexpected logger type + type foobar struct{} + ctx3 := context.WithValue(ctx, telemetryLoggerKey, &foobar{}) + assert.PanicsWithError(t, "unexpected telemetry logger type: *telemetry.foobar", func() { + WithDefaultLogger(ctx3) + }) +} + +func TestWithMockLogger(t *testing.T) { + ctx := context.Background() + + // No logger set + ctx1 := WithMockLogger(ctx) + assert.Equal(t, &mockLogger{}, ctx1.Value(telemetryLoggerKey)) + + // Logger already set + assert.PanicsWithError(t, "telemetry logger already set in the context: *telemetry.mockLogger", func() { + WithMockLogger(ctx1) + }) + + // Default logger already set + ctx2 := WithDefaultLogger(ctx) + assert.PanicsWithError(t, "telemetry logger already set in the context: *telemetry.defaultLogger", func() { + WithMockLogger(ctx2) + }) +} + +func TestFromContext(t *testing.T) { + ctx := context.Background() + + // No logger set + assert.PanicsWithError(t, "telemetry logger not found in the context", func() { + fromContext(ctx) + }) + + // Default logger set + ctx1 := WithDefaultLogger(ctx) + assert.Equal(t, &defaultLogger{}, fromContext(ctx1)) + + // Mock logger set + ctx2 := WithMockLogger(ctx) + assert.Equal(t, &mockLogger{}, fromContext(ctx2)) + + // Unexpected logger type + type foobar struct{} + ctx3 := context.WithValue(ctx, telemetryLoggerKey, &foobar{}) + assert.PanicsWithError(t, "unexpected telemetry logger type: *telemetry.foobar", func() { + fromContext(ctx3) + }) +} diff --git a/libs/telemetry/frontend_log.go b/libs/telemetry/frontend_log.go new file mode 100644 index 0000000000..168d61c988 --- /dev/null +++ b/libs/telemetry/frontend_log.go @@ -0,0 +1,22 @@ +package telemetry + +import "github.com/databricks/cli/libs/telemetry/protos" + +// This corresponds to the FrontendLog lumberjack proto in universe. +// FrontendLog is the top-level struct for any client-side logs at Databricks +// regardless of whether they are generated from the CLI or the web UI. +type FrontendLog struct { + // A unique identifier for the log event generated from the CLI. + FrontendLogEventID string `json:"frontend_log_event_id,omitempty"` + + Entry FrontendLogEntry `json:"entry,omitempty"` +} + +type FrontendLogEntry struct { + DatabricksCliLog DatabricksCliLog `json:"databricks_cli_log,omitempty"` +} + +type DatabricksCliLog struct { + CliTestEvent *protos.CliTestEvent `json:"cli_test_event,omitempty"` + BundleInitEvent *protos.BundleInitEvent `json:"bundle_init_event,omitempty"` +} diff --git a/libs/telemetry/logger.go b/libs/telemetry/logger.go new file mode 100644 index 0000000000..77fffac8a9 --- /dev/null +++ b/libs/telemetry/logger.go @@ -0,0 +1,139 @@ +package telemetry + +import ( + "context" + "encoding/json" + "net/http" + "time" + + "github.com/databricks/cli/libs/log" + "github.com/google/uuid" +) + +// Interface abstraction created to mock out the Databricks client for testing. +type DatabricksApiClient interface { + Do(ctx context.Context, method, path string, + headers map[string]string, request, response any, + visitors ...func(*http.Request) error) error +} + +type Logger interface { + // Record a telemetry event, to be flushed later. + Log(event DatabricksCliLog) + + // Flush all the telemetry events that have been logged so far. We expect + // this to be called once per CLI command for the default logger. + Flush(ctx context.Context, apiClient DatabricksApiClient) + + // This function is meant to be only to be used in tests to introspect + // the telemetry logs that have been logged so far. + Introspect() []DatabricksCliLog +} + +type defaultLogger struct { + logs []FrontendLog +} + +func (l *defaultLogger) Log(event DatabricksCliLog) { + if l.logs == nil { + l.logs = make([]FrontendLog, 0) + } + l.logs = append(l.logs, FrontendLog{ + // The telemetry endpoint deduplicates logs based on the FrontendLogEventID. + // This it's important to generate a unique ID for each log event. + FrontendLogEventID: uuid.New().String(), + Entry: FrontendLogEntry{ + DatabricksCliLog: event, + }, + }) +} + +// Maximum additional time to wait for the telemetry event to flush. We expect the flush +// method to be called when the CLI command is about to exit, so this caps the maximum +// additional time the user will experience because of us logging CLI telemetry. +var MaxAdditionalWaitTime = 3 * time.Second + +// We make the API call to the /telemetry-ext endpoint to log the CLI telemetry events +// right about as the CLI command is about to exit. The API endpoint can handle +// payloads with ~1000 events easily. Thus we log all the events at once instead of +// batching the logs across multiple API calls. +func (l *defaultLogger) Flush(ctx context.Context, apiClient DatabricksApiClient) { + // Set a maximum time to wait for the telemetry event to flush. + ctx, cancel := context.WithTimeout(ctx, MaxAdditionalWaitTime) + defer cancel() + + if len(l.logs) == 0 { + log.Debugf(ctx, "No telemetry events to flush") + return + } + + var protoLogs []string + for _, event := range l.logs { + s, err := json.Marshal(event) + if err != nil { + log.Debugf(ctx, "Error marshalling the telemetry event %v: %v", event, err) + continue + } + + protoLogs = append(protoLogs, string(s)) + } + + resp := &ResponseBody{} + for { + select { + case <-ctx.Done(): + log.Debugf(ctx, "Timed out before flushing telemetry events") + return + default: + // Proceed + } + + // Log the CLI telemetry events. + err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, RequestBody{ + UploadTime: time.Now().Unix(), + ProtoLogs: protoLogs, + + // A bug in the telemetry API requires us to send an empty items array. + // Otherwise we get an opaque 500 internal server error. + Items: []string{}, + }, resp) + if err != nil { + // The SDK automatically performs retries for 429s and 503s. Thus if we + // see an error here, do not retry logging the telemetry. + log.Debugf(ctx, "Error making the API request to /telemetry-ext: %v", err) + return + } + // If not all the logs were successfully sent, we'll retry and log everything + // again. + // + // Note: This will result in server side duplications but that's fine since + // we can always deduplicate in the data pipeline itself. + if len(l.logs) > int(resp.NumProtoSuccess) { + log.Debugf(ctx, "Not all logs were successfully sent. Retrying...") + continue + } + + // All logs were successfully sent. We can exit the function. + log.Debugf(ctx, "Successfully flushed telemetry events") + return + } +} + +func (l *defaultLogger) Introspect() []DatabricksCliLog { + panic("not implemented") +} + +func Log(ctx context.Context, event DatabricksCliLog) { + l := fromContext(ctx) + l.Log(event) +} + +func Flush(ctx context.Context, apiClient DatabricksApiClient) { + l := fromContext(ctx) + l.Flush(ctx, apiClient) +} + +func Introspect(ctx context.Context) []DatabricksCliLog { + l := fromContext(ctx) + return l.Introspect() +} diff --git a/libs/telemetry/logger_test.go b/libs/telemetry/logger_test.go new file mode 100644 index 0000000000..0d5eb01ffc --- /dev/null +++ b/libs/telemetry/logger_test.go @@ -0,0 +1,113 @@ +package telemetry + +import ( + "context" + "math/rand" + "net/http" + "testing" + + "github.com/databricks/cli/libs/telemetry/protos" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +type mockDatabricksClient struct { + numCalls int + t *testing.T +} + +func (m *mockDatabricksClient) Do(ctx context.Context, method, path string, headers map[string]string, request, response any, visitors ...func(*http.Request) error) error { + // Block until the fire channel is fired. + m.numCalls++ + + assertRequestPayload := func(reqb RequestBody) { + expectedProtoLogs := []string{ + "{\"frontend_log_event_id\":\"0194fdc2-fa2f-4cc0-81d3-ff12045b73c8\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE1\"}}}}", + "{\"frontend_log_event_id\":\"6e4ff95f-f662-45ee-a82a-bdf44a2d0b75\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE2\"}}}}", + "{\"frontend_log_event_id\":\"fb180daf-48a7-4ee0-b10d-394651850fd4\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE2\"}}}}", + "{\"frontend_log_event_id\":\"a178892e-e285-4ce1-9114-55780875d64e\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE3\"}}}}", + } + + // Assert payload matches the expected payload. + assert.Equal(m.t, expectedProtoLogs, reqb.ProtoLogs) + } + + switch m.numCalls { + case 1: + // The call is successful but not all events are successfully logged. + assertRequestPayload(request.(RequestBody)) + *(response.(*ResponseBody)) = ResponseBody{ + NumProtoSuccess: 3, + } + case 2: + // The call is successful and all events are successfully logged. + assertRequestPayload(request.(RequestBody)) + *(response.(*ResponseBody)) = ResponseBody{ + NumProtoSuccess: 4, + } + default: + panic("unexpected number of calls") + } + + return nil +} + +func TestTelemetryLoggerFlushesEvents(t *testing.T) { + mockClient := &mockDatabricksClient{ + t: t, + } + + // Set the random number generator to a fixed seed to ensure that the UUIDs are deterministic. + uuid.SetRand(rand.New(rand.NewSource(0))) + t.Cleanup(func() { + uuid.SetRand(nil) + }) + + ctx := WithDefaultLogger(context.Background()) + + for _, v := range []protos.DummyCliEnum{protos.DummyCliEnumValue1, protos.DummyCliEnumValue2, protos.DummyCliEnumValue2, protos.DummyCliEnumValue3} { + Log(ctx, DatabricksCliLog{ + CliTestEvent: &protos.CliTestEvent{Name: v}, + }) + } + + // Flush the events. + Flush(ctx, mockClient) + + // Assert that the .Do method is called twice, because all logs were not + // successfully logged in the first call. + assert.Equal(t, 2, mockClient.numCalls) +} + +func TestTelemetryLoggerFlushExitsOnTimeout(t *testing.T) { + // Set the maximum additional wait time to 0 to ensure that the Flush method times out immediately. + oldV := MaxAdditionalWaitTime + MaxAdditionalWaitTime = 0 + t.Cleanup(func() { + MaxAdditionalWaitTime = oldV + }) + + mockClient := &mockDatabricksClient{ + t: t, + } + + // Set the random number generator to a fixed seed to ensure that the UUIDs are deterministic. + uuid.SetRand(rand.New(rand.NewSource(0))) + t.Cleanup(func() { + uuid.SetRand(nil) + }) + + ctx := WithDefaultLogger(context.Background()) + + for _, v := range []protos.DummyCliEnum{protos.DummyCliEnumValue1, protos.DummyCliEnumValue2, protos.DummyCliEnumValue2, protos.DummyCliEnumValue3} { + Log(ctx, DatabricksCliLog{ + CliTestEvent: &protos.CliTestEvent{Name: v}, + }) + } + + // Flush the events. + Flush(ctx, mockClient) + + // Assert that the .Do method is never called since the timeout is set to 0. + assert.Equal(t, 0, mockClient.numCalls) +} diff --git a/libs/telemetry/mock_logger.go b/libs/telemetry/mock_logger.go new file mode 100644 index 0000000000..be33ee6582 --- /dev/null +++ b/libs/telemetry/mock_logger.go @@ -0,0 +1,22 @@ +package telemetry + +import "context" + +type mockLogger struct { + events []DatabricksCliLog +} + +func (l *mockLogger) Log(event DatabricksCliLog) { + if l.events == nil { + l.events = make([]DatabricksCliLog, 0) + } + l.events = append(l.events, event) +} + +func (l *mockLogger) Flush(ctx context.Context, apiClient DatabricksApiClient) { + // Do nothing +} + +func (l *mockLogger) Introspect() []DatabricksCliLog { + return l.events +} diff --git a/libs/telemetry/protos/README.md b/libs/telemetry/protos/README.md new file mode 100644 index 0000000000..a188a08683 --- /dev/null +++ b/libs/telemetry/protos/README.md @@ -0,0 +1,3 @@ +The types in this package are equivalent to the lumberjack protos defined in Universe. +You can find all lumberjack protos for the Databricks CLI in the `proto/logs/frontend/databricks_cli` +directory. diff --git a/libs/telemetry/protos/bundle_init.go b/libs/telemetry/protos/bundle_init.go new file mode 100644 index 0000000000..b020083143 --- /dev/null +++ b/libs/telemetry/protos/bundle_init.go @@ -0,0 +1,39 @@ +package protos + +// Corresponds to the `DatabricksCliBundleInitEvent` proto message in `databricks_cli_log.proto` +// as of 20 Dec 2024. +type BundleInitEvent struct { + // UUID associated with the DAB itself. This is serialized into the DAB + // when a user runs `databricks bundle init` and all subsequent deployments of + // that DAB can then be associated with this init event. + Uuid string `json:"bundle_uuid,omitempty"` + + // Name of the template initialized when the user ran `databricks bundle init` + // This is only populated when the template is a first party template like + // mlops-stacks or default-python. + TemplateName string `json:"template_name,omitempty"` + + // Arguments used by the user to initialize the template. Only enum + // values will be set here by the Databricks CLI. + // + // We use a generic map representation here because a bundle template's args are + // managed in the template itself and maintaining a copy typed schema for it here + // will be untenable in the long term. + TemplateEnumArgs []BundleInitTemplateEnumArg `json:"template_enum_args,omitempty"` +} + +type BundleInitTemplateEnumArg struct { + // Valid key values for the template. These correspond to the keys specified in + // the "properties" section of the `databricks_template_schema.json` file. + // + // Note: `databricks_template_schema.json` contains a JSON schema type specification + // for the arguments that the template accepts. + Key string `json:"key"` + + // Value that the user set for the field. This is only populated for properties + // that have the "enum" field specified in the JSON schema type specification. + // + // The Databricks CLI ensures that the value here is one of the "enum" values from + // the template specification. + Value string `json:"value"` +} diff --git a/libs/telemetry/protos/test_event.go b/libs/telemetry/protos/test_event.go new file mode 100644 index 0000000000..c4f650cda8 --- /dev/null +++ b/libs/telemetry/protos/test_event.go @@ -0,0 +1,16 @@ +package protos + +// dummy event for testing the telemetry pipeline. Corresponds to `DatabricksCliTestEvent` +// proto in `databricks_cli_log.proto` as of 20 Dec 2024. +type CliTestEvent struct { + Name DummyCliEnum `json:"name,omitempty"` +} + +type DummyCliEnum string + +const ( + DummyCliEnumUnspecified DummyCliEnum = "DUMMY_CLI_ENUM_UNSPECIFIED" + DummyCliEnumValue1 DummyCliEnum = "VALUE1" + DummyCliEnumValue2 DummyCliEnum = "VALUE2" + DummyCliEnumValue3 DummyCliEnum = "VALUE3" +)