Add logger for CLI telemetry #1943

Closed
wants to merge 12 commits
13 changes: 13 additions & 0 deletions integration/libs/telemetry/main_test.go
@@ -0,0 +1,13 @@
package telemetry_test

import (
"testing"

"github.com/databricks/cli/integration/internal"
)

// TestMain is the entrypoint executed by the test runner.
// See [internal.Main] for prerequisites for running integration tests.
func TestMain(m *testing.M) {
internal.Main(m)
}
74 changes: 74 additions & 0 deletions integration/libs/telemetry/telemetry_test.go
@@ -0,0 +1,74 @@
package telemetry_test

import (
"context"
"net/http"
"testing"
"time"

"github.com/databricks/cli/integration/internal/acc"
"github.com/databricks/cli/libs/telemetry"
"github.com/databricks/databricks-sdk-go/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// Wrapper to capture the response from the API client since that's not directly
// accessible from the logger.
type apiClientWrapper struct {
response *telemetry.ResponseBody
apiClient *client.DatabricksClient
}

func (wrapper *apiClientWrapper) Do(ctx context.Context, method, path string,
headers map[string]string, request, response any,
visitors ...func(*http.Request) error,
) error {
err := wrapper.apiClient.Do(ctx, method, path, headers, request, response, visitors...)
wrapper.response = response.(*telemetry.ResponseBody)
return err
}

func TestTelemetryLogger(t *testing.T) {
ctx, w := acc.WorkspaceTest(t)
ctx = telemetry.NewContext(ctx)

// Extend the maximum wait time for the telemetry flush just for this test.
telemetry.MaxAdditionalWaitTime = 1 * time.Hour
t.Cleanup(func() {
telemetry.MaxAdditionalWaitTime = 2 * time.Second
})

// Log some events.
err := telemetry.Log(ctx, telemetry.FrontendLogEntry{
DatabricksCliLog: telemetry.DatabricksCliLog{
CliTestEvent: telemetry.CliTestEvent{
Name: telemetry.DummyCliEnumValue1,
},
},
})
Contributor: Can we modify Log to accept Name directly? It seems like that is the only field there? Then this and other cases can be written as:

telemetry.Log(ctx, telemetry.DummyCliEnumValue1)

Contributor Author: There'll be other CLI events, like: #2037

require.NoError(t, err)
err = telemetry.Log(ctx, telemetry.FrontendLogEntry{
DatabricksCliLog: telemetry.DatabricksCliLog{
CliTestEvent: telemetry.CliTestEvent{
Name: telemetry.DummyCliEnumValue2,
},
},
})
require.NoError(t, err)

apiClient, err := client.New(w.W.Config)
require.NoError(t, err)

// Flush the events.
wrapper := &apiClientWrapper{
apiClient: apiClient,
}
telemetry.Flush(ctx, wrapper)

// Assert that the events were logged.
assert.Equal(t, telemetry.ResponseBody{
NumProtoSuccess: 2,
Errors: []telemetry.LogError{},
}, *wrapper.response)
}
19 changes: 19 additions & 0 deletions libs/telemetry/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package telemetry

// RequestBody is the request body type bindings for the /telemetry-ext API endpoint.
type RequestBody struct {
UploadTime int64 `json:"uploadTime"`
Items []string `json:"items"`

Review comment: Since you are not emitting usage logs, you don't need the Items field.

Contributor Author: There's a bug in the /telemetry-ext API where requests are not logged successfully unless an empty Items list is included in the request payload. That's why Items is included here, to forcefully serialize an empty array in the request body.

ProtoLogs []string `json:"protoLogs"`
}
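The comment thread above hinges on a Go encoding detail: a nil slice marshals to JSON `null`, while an initialized empty slice marshals to `[]`. A minimal standalone sketch of the difference (the `body` type and `marshalItems` helper are hypothetical stand-ins, not part of the PR):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// body is a hypothetical stand-in for RequestBody, keeping only the Items field.
type body struct {
	Items []string `json:"items"`
}

// marshalItems renders the request body so we can compare nil vs. empty slices.
func marshalItems(items []string) string {
	b, err := json.Marshal(body{Items: items})
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(marshalItems(nil))        // {"items":null}
	fmt.Println(marshalItems([]string{})) // {"items":[]}
}
```

This is why the PR initializes `Items: []string{}` explicitly before each request rather than leaving the field zero-valued.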

// ResponseBody is the response body type bindings for the /telemetry-ext API endpoint.
type ResponseBody struct {
Errors []LogError `json:"errors"`
NumProtoSuccess int64 `json:"numProtoSuccess"`
}

type LogError struct {
Message string `json:"message"`
ErrorType string `json:"ErrorType"`
}
28 changes: 28 additions & 0 deletions libs/telemetry/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package telemetry

import (
"context"
)

// Private type to store the telemetry logger in the context
type telemetryLogger int

// Key to store the telemetry logger in the context
var telemetryLoggerKey telemetryLogger

func NewContext(ctx context.Context) context.Context {
_, ok := ctx.Value(telemetryLoggerKey).(*logger)
if ok {
panic("telemetry logger already exists in the context")
}

return context.WithValue(ctx, telemetryLoggerKey, &logger{protoLogs: []string{}})
}

func fromContext(ctx context.Context) *logger {
l, ok := ctx.Value(telemetryLoggerKey).(*logger)
if !ok {
panic("telemetry logger not found in the context")
}
return l
}
33 changes: 33 additions & 0 deletions libs/telemetry/frontend_log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package telemetry

// This corresponds to the FrontendLog lumberjack proto in universe.
// FrontendLog is the top-level struct for any client-side logs at Databricks
// regardless of whether they are generated from the CLI or the web UI.
type FrontendLog struct {
// A unique identifier for the log event generated from the CLI.
FrontendLogEventID string `json:"frontend_log_event_id,omitempty"`

Entry FrontendLogEntry `json:"entry,omitempty"`
}

type FrontendLogEntry struct {
DatabricksCliLog DatabricksCliLog `json:"databricks_cli_log,omitempty"`
}

type DatabricksCliLog struct {
CliTestEvent CliTestEvent `json:"cli_test_event,omitempty"`
}

// CliTestEvent is a dummy event for testing the telemetry pipeline.
type CliTestEvent struct {
Name DummyCliEnum `json:"name,omitempty"`
}

type DummyCliEnum string

const (
DummyCliEnumUnspecified DummyCliEnum = "DUMMY_CLI_ENUM_UNSPECIFIED"
DummyCliEnumValue1 DummyCliEnum = "VALUE1"
DummyCliEnumValue2 DummyCliEnum = "VALUE2"
DummyCliEnumValue3 DummyCliEnum = "VALUE3"
)
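One detail worth noting about the tags above: encoding/json's `omitempty` never drops a non-pointer struct field, so `entry` is serialized even when zero-valued. A quick standalone check (the `inner`/`outer` types are hypothetical, mirroring the tag layout above):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// inner and outer are hypothetical types mirroring the nested-struct tags above.
type inner struct {
	Name string `json:"name,omitempty"`
}

type outer struct {
	Entry inner `json:"entry,omitempty"`
}

// marshalOuter serializes a zero-valued outer struct.
func marshalOuter() string {
	b, err := json.Marshal(outer{})
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// omitempty drops the zero string inside, but not the struct field itself.
	fmt.Println(marshalOuter()) // {"entry":{}}
}
```

If omitting the nested object were required, the field would need to be a pointer (`*FrontendLogEntry`); as written, an empty `"entry":{}` object is always emitted.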
112 changes: 112 additions & 0 deletions libs/telemetry/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package telemetry

import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/client"
"github.com/google/uuid"
)

// Interface abstraction created to mock out the Databricks client for testing.
type DatabricksApiClient interface {
Do(ctx context.Context, method, path string,
headers map[string]string, request, response any,
visitors ...func(*http.Request) error) error
}

func Log(ctx context.Context, event FrontendLogEntry) error {
l := fromContext(ctx)

frontendLog := FrontendLog{
// The telemetry endpoint deduplicates logs based on the FrontendLogEventID.
// Thus it's important to generate a unique ID for each log event.
FrontendLogEventID: uuid.New().String(),
Entry: event,
}

protoLog, err := json.Marshal(frontendLog)
if err != nil {
return fmt.Errorf("error marshalling the telemetry event: %v", err)
}

l.protoLogs = append(l.protoLogs, string(protoLog))
return nil
}

type logger struct {
protoLogs []string
}

// Maximum additional time to wait for the telemetry event to flush. We expect the flush
// method to be called when the CLI command is about to exit, so this caps the maximum
// additional time the user will experience because of us logging CLI telemetry.
var MaxAdditionalWaitTime = 2 * time.Second

// We make the API call to the /telemetry-ext endpoint to log the CLI telemetry events
// right about as the CLI command is about to exit. The API endpoint can handle
// payloads with ~1000 events easily. Thus we log all the events at once instead of
// batching the logs across multiple API calls.
Review comment: This seems fine, but something to keep in mind: if there are failures, then we expect clients to retry with backoffs. If you start the upload process at the tail end of a command, it might not complete successfully.

func Flush(ctx context.Context, apiClient DatabricksApiClient) {
// Set a maximum time to wait for the telemetry event to flush.
ctx, cancel := context.WithTimeout(ctx, MaxAdditionalWaitTime)
defer cancel()
l := fromContext(ctx)

// We pass the API client as an arg to mock it in unit tests.
if apiClient == nil {
var err error

// Create an API client to make the telemetry API call.
apiClient, err = client.New(root.WorkspaceClient(ctx).Config)
if err != nil {
log.Debugf(ctx, "error creating API client for telemetry: %v", err)
return
}
}

resp := &ResponseBody{}
for {
select {
case <-ctx.Done():
log.Debugf(ctx, "Timed out before flushing telemetry events")
return
default:
// Proceed
}

// Log the CLI telemetry events.
err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, RequestBody{
UploadTime: time.Now().Unix(),
ProtoLogs: l.protoLogs,

// A bug in the telemetry API requires us to send an empty items array.
// Otherwise we get an opaque 500 internal server error.
Items: []string{},
}, resp)
if err != nil {
// The SDK automatically performs retries for 429s and 503s. Thus if we
// see an error here, do not retry logging the telemetry.
log.Debugf(ctx, "Error making the API request to /telemetry-ext: %v", err)
return
}
// If not all the logs were successfully sent, we'll retry and log everything
// again.
//
// Note: This will result in server side duplications but that's fine since
// we can always deduplicate in the data pipeline itself.
if len(l.protoLogs) > int(resp.NumProtoSuccess) {
log.Debugf(ctx, "Not all logs were successfully sent. Retrying...")
continue
}

// All logs were successfully sent. We can exit the function.
log.Debugf(ctx, "Successfully flushed telemetry events")
return
}
}