Skip to content

Commit

Permalink
[WIP] In process telemetry logger
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyas-goenka committed Jan 22, 2025
1 parent ab10720 commit 3964d8d
Show file tree
Hide file tree
Showing 33 changed files with 1,020 additions and 535 deletions.
2 changes: 1 addition & 1 deletion acceptance/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestAccept(t *testing.T) {
cloudEnv := os.Getenv("CLOUD_ENV")

if cloudEnv == "" {
server := StartServer(t)
server := testutil.StartServer(t)
AddHandlers(server)
// Redirect API access to local server:
t.Setenv("DATABRICKS_HOST", server.URL)
Expand Down
61 changes: 2 additions & 59 deletions acceptance/server_test.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,16 @@
package acceptance_test

import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"github.com/databricks/cli/internal/testutil"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/iam"
"github.com/databricks/databricks-sdk-go/service/workspace"
)

type TestServer struct {
*httptest.Server
Mux *http.ServeMux
}

type HandlerFunc func(r *http.Request) (any, error)

func NewTestServer() *TestServer {
mux := http.NewServeMux()
server := httptest.NewServer(mux)

return &TestServer{
Server: server,
Mux: mux,
}
}

func (s *TestServer) Handle(pattern string, handler HandlerFunc) {
s.Mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
resp, err := handler(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")

var respBytes []byte

respString, ok := resp.(string)
if ok {
respBytes = []byte(respString)
} else {
respBytes, err = json.MarshalIndent(resp, "", " ")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

if _, err := w.Write(respBytes); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
})
}

func StartServer(t *testing.T) *TestServer {
server := NewTestServer()
t.Cleanup(func() {
server.Close()
})
return server
}

func AddHandlers(server *TestServer) {
func AddHandlers(server *testutil.Server) {
server.Handle("/api/2.0/policies/clusters/list", func(r *http.Request) (any, error) {
return compute.ListPoliciesResponse{
Policies: []compute.Policy{
Expand Down
4 changes: 4 additions & 0 deletions bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ func (b *Bundle) GetSyncIncludePatterns(ctx context.Context) ([]string, error) {
return append(b.Config.Sync.Include, filepath.ToSlash(filepath.Join(internalDirRel, "*.*"))), nil
}

// TODO: Add end to end tests that the Environment variables returned by the
// AuthEnv function are correct + the config set in the context is fully resolved
// (instead of just being the input).

// AuthEnv returns a map with environment variables and their values
// derived from the workspace client configuration that was resolved
// in the context of this bundle.
Expand Down
6 changes: 6 additions & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/databricks/cli/cmd/labs"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/cmd/sync"
sendtestevent "github.com/databricks/cli/cmd/telemetry/send_test_event"
"github.com/databricks/cli/cmd/telemetry/worker"
"github.com/databricks/cli/cmd/version"
"github.com/databricks/cli/cmd/workspace"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -75,5 +77,9 @@ func New(ctx context.Context) *cobra.Command {
cli.AddCommand(sync.New())
cli.AddCommand(version.New())

// TODO: Move these under the telemetry subcommand?
cli.AddCommand(worker.New())
cli.AddCommand(sendtestevent.New())

return cli
}
7 changes: 6 additions & 1 deletion cmd/root/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
var (
workspaceClient int
accountClient int
configUsed int

// TODO: Does the config used have the resolved configuration? Like has the
// profile been loaded?
configUsed int
)

type ErrNoWorkspaceProfiles struct {
Expand Down Expand Up @@ -334,6 +337,8 @@ func AccountClient(ctx context.Context) *databricks.AccountClient {
func ConfigUsed(ctx context.Context) *config.Config {
cfg, ok := ctx.Value(&configUsed).(*config.Config)
if !ok {
// todo: remove this, just for testing.
return &config.Config{}
panic("cannot get *config.Config. Please report it as a bug")
}
return cfg
Expand Down
63 changes: 63 additions & 0 deletions cmd/root/is_web_terminal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package root

import (
"context"
"fmt"
"os"
"strconv"
"strings"

"github.com/databricks/cli/libs/dbr"
)

// TODO: Split this into a separate PR and add a test.
func isWebTerminal(ctx context.Context) bool {
if !dbr.RunsOnRuntime(ctx) {
return false
}

cur := os.Getpid()

// Max number of ancestors to check for trying to detect if the process is
// running in a web terminal (i.e. launched by ttyd).
maxHeight := 10

for range maxHeight {
// If the pid is a 0 or 1, we are at the root of the process tree.
if cur == 0 || cur == 1 {
return false
}

// Read the name of the current process
b, err := os.ReadFile(fmt.Sprintf("/proc/%d/comm", cur))
if err != nil {
return false
}

// If the name for any of the parent processes is ttyd, then the
// CLI has been run from the web terminal.
if strings.TrimSpace(string(b)) == "ttyd" {
return true
}

// The 4th field in /proc/<pid>/stat is the parent pid.
b, err = os.ReadFile(fmt.Sprintf("/proc/%d/stat", cur))
if err != nil {
return false
}

stat := strings.Split(string(b), " ")
if len(stat) < 4 {
return false
}

v, err := strconv.Atoi(stat[3])
if err != nil {
return false
}

cur = v
}

return false
}
140 changes: 130 additions & 10 deletions cmd/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,24 @@ package root

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"runtime"
"strings"
"time"

"github.com/databricks/cli/internal/build"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/dbr"
"github.com/databricks/cli/libs/env"
"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/telemetry"
"github.com/databricks/cli/libs/telemetry/protos"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -73,9 +81,6 @@ func New(ctx context.Context) *cobra.Command {
// get the context back
ctx = cmd.Context()

// Detect if the CLI is running on DBR and store this on the context.
ctx = dbr.DetectRuntime(ctx)

// Configure our user agent with the command that's about to be executed.
ctx = withCommandInUserAgent(ctx, cmd)
ctx = withCommandExecIdInUserAgent(ctx)
Expand All @@ -94,32 +99,147 @@ func flagErrorFunc(c *cobra.Command, err error) error {
return fmt.Errorf("%w\n\n%s", err, c.UsageString())
}

// TODO CONTINUE: This setup should mostly work. There are a couple of open questions:
// 4. I can print the output from the telemetry-worker command and a waiting mode
// to the root.Execution method here to see whether the expected output matches.

// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.

// TODO: The test runner also relies on this function. Create a separate function to
// avoid logging telemetry in our testcli runner.
func Execute(ctx context.Context, cmd *cobra.Command) error {
// TODO: deferred panic recovery
ctx = telemetry.WithNewLogger(ctx)
ctx = dbr.DetectRuntime(ctx)
start := time.Now()

// Run the command
cmd, err := cmd.ExecuteContextC(ctx)
if err != nil && !errors.Is(err, ErrAlreadyPrinted) {
cmd, cmdErr := cmd.ExecuteContextC(ctx)
if cmdErr != nil && !errors.Is(cmdErr, ErrAlreadyPrinted) {
// If cmdio logger initialization succeeds, then this function logs with the
// initialized cmdio logger, otherwise with the default cmdio logger
cmdio.LogError(cmd.Context(), err)
cmdio.LogError(cmd.Context(), cmdErr)
}

// Log exit status and error
// We only log if logger initialization succeeded and is stored in command
// context
if logger, ok := log.FromContext(cmd.Context()); ok {
if err == nil {
if cmdErr == nil {
logger.Info("completed execution",
slog.String("exit_code", "0"))
} else {
logger.Error("failed execution",
slog.String("exit_code", "1"),
slog.String("error", err.Error()))
slog.String("error", cmdErr.Error()))
}
}

return err
end := time.Now()

exitCode := 0
if cmdErr != nil {
exitCode = 1
}

if env.Get(ctx, telemetry.SkipEnvVar) != "true" {
logTelemetry(ctx, commandString(cmd), start, end, exitCode)
}

return cmdErr
}

// TODO: Do not log for integration tests using the CLI.
// TODO: Skip telemetry if the credentials are invalid.
func logTelemetry(ctx context.Context, cmdStr string, start, end time.Time, exitCode int) {
telemetry.SetExecutionContext(ctx, protos.ExecutionContext{
CmdExecId: cmdExecId,
Version: build.GetInfo().Version,
Command: cmdStr,
OperatingSystem: runtime.GOOS,
DbrVersion: env.Get(ctx, dbr.EnvVarName),
FromWebTerminal: isWebTerminal(ctx),
ExecutionTimeMs: end.Sub(start).Milliseconds(),
ExitCode: int64(exitCode),
})

// TODO: Better check?
// Do not log telemetry for the telemetry-worker command to avoid fork bombs.
if cmdStr == "telemetry-worker" {
return
}

execPath, err := os.Executable()
if err != nil {
log.Debugf(ctx, "failed to get executable path: %s", err)
}
telemetryCmd := exec.Command(execPath, "telemetry-worker")

// TODO: Add test that ensures that the context key for cli commands stores a
// resolved auth configuration.
// TODO: Add test that the worker inherits the environment variables from the
// parent process.
in := telemetry.WorkerInput{
AuthConfig: ConfigUsed(ctx),
Logs: telemetry.GetLogs(ctx),
}

if len(in.Logs) == 0 {
return
}

b, err := json.Marshal(in)
if err != nil {
log.Debugf(ctx, "failed to marshal telemetry logs: %s", err)
return
}

stdin, err := telemetryCmd.StdinPipe()
if err != nil {
log.Debugf(ctx, "failed to create stdin pipe for telemetry worker: %s", err)
}

stdout, err := telemetryCmd.StdoutPipe()
if err != nil {
log.Debugf(ctx, "failed to create stdout pipe for telemetry worker: %s", err)
}

err = telemetryCmd.Start()
if err != nil {
log.Debugf(ctx, "failed to start telemetry worker: %s", err)
return
}

// Set DATABRICKS_CLI_SKIP_TELEMETRY to true to ensure that the telemetry worker
// command accidentally does not call itself causing a fork bomb. This can happen
// if a change starts logging telemetry in the telemetry worker command's code
// path.
telemetryCmd.Env = os.Environ()
telemetryCmd.Env = append(telemetryCmd.Env, telemetry.SkipEnvVar+"=true")

_, err = stdin.Write(b)
if err != nil {
log.Debugf(ctx, "failed to write to telemetry worker: %s", err)
}

err = stdin.Close()
if err != nil {
log.Debugf(ctx, "failed to close stdin for telemetry worker: %s", err)
}

// This is only meant for testing purposes, to do assertions on the output
// of the telemetry worker command.
if env.Get(ctx, telemetry.BlockOnUploadEnvVar) == "true" {
err = telemetryCmd.Wait()
if err != nil {
log.Debugf(ctx, "failed to wait for telemetry worker: %s", err)
}

cmdio.LogString(ctx, "telemetry-worker output:")
b, err := io.ReadAll(stdout)
if err != nil {
log.Debugf(ctx, "failed to read telemetry worker output: %s", err)
}
cmdio.LogString(ctx, string(b))
}
}
Loading

0 comments on commit 3964d8d

Please sign in to comment.