Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RSDK-8291] Use Global Connection to App in Config Watcher #4773

Merged
merged 25 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion components/camera/transformpipeline/detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"go.viam.com/rdk/components/camera"
"go.viam.com/rdk/config"
"go.viam.com/rdk/grpc"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/rimage"
Expand Down Expand Up @@ -45,7 +46,8 @@ func writeTempConfig(cfg *config.Config) (string, error) {
// make a fake robot with a vision service.
func buildRobotWithFakeCamera(logger logging.Logger) (robot.Robot, error) {
// add a fake camera to the config
cfg, err := config.Read(context.Background(), artifact.MustPath("components/camera/transformpipeline/vision.json"), logger)
cfg, err := config.Read(context.Background(), artifact.MustPath("components/camera/transformpipeline/vision.json"), logger,
&grpc.AppConn{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] You might've used this pattern elsewhere, but I often see nil passed instead of an "empty" struct when we want to communicate to another function that we have no special object for that function to use and it should create its own. Hypothetically, I'd have a small preference for passing nil here and not &grpc.AppConn.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! I like this I for some reason glossed over this as an option when trying to figure how to make it apparent that the code path followed by the tests with an empty connection struct didn't actually need/use the conn so I like this

if err != nil {
return nil, err
}
Expand Down
11 changes: 6 additions & 5 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.viam.com/rdk/components/encoder/incremental"
fakemotor "go.viam.com/rdk/components/motor/fake"
"go.viam.com/rdk/config"
"go.viam.com/rdk/grpc"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/referenceframe"
"go.viam.com/rdk/resource"
Expand All @@ -40,7 +41,7 @@ import (

func TestConfigRobot(t *testing.T) {
logger := logging.NewTestLogger(t)
cfg, err := config.Read(context.Background(), "data/robot.json", logger)
cfg, err := config.Read(context.Background(), "data/robot.json", logger, &grpc.AppConn{})
test.That(t, err, test.ShouldBeNil)

test.That(t, cfg.Components, test.ShouldHaveLength, 3)
Expand Down Expand Up @@ -76,7 +77,7 @@ func TestConfig3(t *testing.T) {
logger := logging.NewTestLogger(t)

test.That(t, os.Setenv("TEST_THING_FOO", "5"), test.ShouldBeNil)
cfg, err := config.Read(context.Background(), "data/config3.json", logger)
cfg, err := config.Read(context.Background(), "data/config3.json", logger, &grpc.AppConn{})
test.That(t, err, test.ShouldBeNil)

test.That(t, len(cfg.Components), test.ShouldEqual, 4)
Expand Down Expand Up @@ -152,7 +153,7 @@ func TestConfig3(t *testing.T) {

func TestConfigWithLogDeclarations(t *testing.T) {
logger := logging.NewTestLogger(t)
cfg, err := config.Read(context.Background(), "data/config_with_log.json", logger)
cfg, err := config.Read(context.Background(), "data/config_with_log.json", logger, &grpc.AppConn{})
test.That(t, err, test.ShouldBeNil)

test.That(t, len(cfg.Components), test.ShouldEqual, 4)
Expand Down Expand Up @@ -1207,15 +1208,15 @@ func TestPackageConfig(t *testing.T) {

func TestConfigRobotWebProfile(t *testing.T) {
logger := logging.NewTestLogger(t)
cfg, err := config.Read(context.Background(), "data/config_with_web_profile.json", logger)
cfg, err := config.Read(context.Background(), "data/config_with_web_profile.json", logger, &grpc.AppConn{})
test.That(t, err, test.ShouldBeNil)

test.That(t, cfg.EnableWebProfile, test.ShouldBeTrue)
}

func TestConfigRobotRevision(t *testing.T) {
logger := logging.NewTestLogger(t)
cfg, err := config.Read(context.Background(), "data/config_with_revision.json", logger)
cfg, err := config.Read(context.Background(), "data/config_with_revision.json", logger, &grpc.AppConn{})
test.That(t, err, test.ShouldBeNil)

test.That(t, cfg.Revision, test.ShouldEqual, "rev1")
Expand Down
9 changes: 5 additions & 4 deletions config/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.viam.com/rdk/components/board"
fakeboard "go.viam.com/rdk/components/board/fake"
"go.viam.com/rdk/config"
"go.viam.com/rdk/grpc"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/utils"
Expand Down Expand Up @@ -400,9 +401,9 @@ func TestDiffConfigs(t *testing.T) {
t.Run(fmt.Sprintf("revealSensitiveConfigDiffs=%t", revealSensitiveConfigDiffs), func(t *testing.T) {
logger.Infof("Test name: %v LeftFile: `%v` RightFile: `%v`", tc.Name, tc.LeftFile, tc.RightFile)
logger := logging.NewTestLogger(t)
left, err := config.Read(context.Background(), tc.LeftFile, logger)
left, err := config.Read(context.Background(), tc.LeftFile, logger, &grpc.AppConn{})
test.That(t, err, test.ShouldBeNil)
right, err := config.Read(context.Background(), tc.RightFile, logger)
right, err := config.Read(context.Background(), tc.RightFile, logger, &grpc.AppConn{})
test.That(t, err, test.ShouldBeNil)

diff, err := config.DiffConfigs(*left, *right, revealSensitiveConfigDiffs)
Expand Down Expand Up @@ -445,9 +446,9 @@ func TestDiffConfigHeterogenousTypes(t *testing.T) {
} {
t.Run(tc.Name, func(t *testing.T) {
logger := logging.NewTestLogger(t)
left, err := config.Read(context.Background(), tc.LeftFile, logger)
left, err := config.Read(context.Background(), tc.LeftFile, logger, &grpc.AppConn{})
test.That(t, err, test.ShouldBeNil)
right, err := config.Read(context.Background(), tc.RightFile, logger)
right, err := config.Read(context.Background(), tc.RightFile, logger, &grpc.AppConn{})
test.That(t, err, test.ShouldBeNil)

_, err = config.DiffConfigs(*left, *right, true)
Expand Down
67 changes: 23 additions & 44 deletions config/reader.go
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To pass an AppConn in the tests in this [config] package, I needed to import the grpc package in many of the files. Unfortunately, the app_conn.go file (part of the grpc package) already imports the config package, so this would've lead to a cyclical import. To bypass this, I removed the dependency on config in the AppConn implementation by a) passing individually the fields of the config.Cloud struct that are used by the AppConn constructor instead of the whole struct and b) moving the function GetTimeoutCtx (used in the AppConn constructor) out of the config package and to the contextutils package.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"path/filepath"
"runtime"
"sort"
"time"

"github.com/a8m/envsubst"
"github.com/pkg/errors"
Expand All @@ -24,6 +23,7 @@ import (
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
rutils "go.viam.com/rdk/utils"
"go.viam.com/rdk/utils/contextutils"
)

// RDK versioning variables which are replaced by LD flags.
Expand All @@ -34,9 +34,6 @@ var (
)

const (
initialReadTimeout = 1 * time.Second
readTimeout = 5 * time.Second
readTimeoutBehindProxy = time.Minute
// PackagesDirName is where packages go underneath viamDotDir.
PackagesDirName = "packages"
// LocalPackagesSuffix is used by the local package manager.
Expand Down Expand Up @@ -182,28 +179,6 @@ func isLocationSecretsEqual(prevCloud, cloud *Cloud) bool {
return true
}

// GetTimeoutCtx returns a context [and its cancel function] with a timeout value determined by whether we are behind a proxy and whether a
// cached config exists.
func GetTimeoutCtx(ctx context.Context, shouldReadFromCache bool, id string) (context.Context, func()) {
timeout := readTimeout
// When environment indicates we are behind a proxy, bump timeout. Network
// operations tend to take longer when behind a proxy.
if proxyAddr := os.Getenv(rpc.SocksProxyEnvVar); proxyAddr != "" {
timeout = readTimeoutBehindProxy
}

// use shouldReadFromCache to determine whether this is part of initial read or not, but only shorten timeout
// if cached config exists
cachedConfigExists := false
if _, err := os.Stat(getCloudCacheFilePath(id)); err == nil {
cachedConfigExists = true
}
if shouldReadFromCache && cachedConfigExists {
timeout = initialReadTimeout
}
return context.WithTimeout(ctx, timeout)
}

// readFromCloud fetches a robot config from the cloud based
// on the given config.
func readFromCloud(
Expand All @@ -213,10 +188,11 @@ func readFromCloud(
shouldReadFromCache bool,
checkForNewCert bool,
logger logging.Logger,
conn rpc.ClientConn,
) (*Config, error) {
logger.Debug("reading configuration from the cloud")
cloudCfg := originalCfg.Cloud
unprocessedConfig, cached, err := getFromCloudOrCache(ctx, cloudCfg, shouldReadFromCache, logger)
unprocessedConfig, cached, err := getFromCloudOrCache(ctx, cloudCfg, shouldReadFromCache, logger, conn)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -260,7 +236,7 @@ func readFromCloud(
if !cfg.Cloud.SignalingInsecure && (checkForNewCert || tls.certificate == "" || tls.privateKey == "") {
logger.Debug("reading tlsCertificate from the cloud")

ctxWithTimeout, cancel := GetTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID)
ctxWithTimeout, cancel := contextutils.GetTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID)
certData, err := readCertificateDataFromCloudGRPC(ctxWithTimeout, cloudCfg, logger)
if err != nil {
cancel()
Expand Down Expand Up @@ -350,13 +326,14 @@ func Read(
ctx context.Context,
filePath string,
logger logging.Logger,
conn rpc.ClientConn,
) (*Config, error) {
buf, err := envsubst.ReadFile(filePath)
if err != nil {
return nil, err
}

return FromReader(ctx, filePath, bytes.NewReader(buf), logger)
return FromReader(ctx, filePath, bytes.NewReader(buf), logger, conn)
}

// ReadLocalConfig reads a config from the given file but does not fetch any config from the remote servers.
Expand All @@ -370,7 +347,8 @@ func ReadLocalConfig(
return nil, err
}

return fromReader(ctx, filePath, bytes.NewReader(buf), logger, false)
var nilConn rpc.ClientConn
return fromReader(ctx, filePath, bytes.NewReader(buf), logger, nilConn)
}

// FromReader reads a config from the given reader and specifies
Expand All @@ -380,8 +358,9 @@ func FromReader(
originalPath string,
r io.Reader,
logger logging.Logger,
conn rpc.ClientConn,
) (*Config, error) {
return fromReader(ctx, originalPath, r, logger, true)
return fromReader(ctx, originalPath, r, logger, conn)
}

// fromReader reads a config from the given reader and specifies
Expand All @@ -391,7 +370,7 @@ func fromReader(
originalPath string,
r io.Reader,
logger logging.Logger,
shouldReadFromCloud bool,
conn rpc.ClientConn,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you take a look and see if we can replace shouldReadFromCloud with conn entirely? so the existence of a connection itself is the indicator whether a we should be reading from cloud or not

totally ok if not a straight swap, just curious

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep I'll investigate

) (*Config, error) {
// First read and process config from disk
unprocessedConfig := Config{
Expand All @@ -406,8 +385,8 @@ func fromReader(
return nil, errors.Wrapf(err, "failed to process Config")
}

if shouldReadFromCloud && cfgFromDisk.Cloud != nil {
cfg, err := readFromCloud(ctx, cfgFromDisk, nil, true, true, logger)
if conn != nil && cfgFromDisk.Cloud != nil {
cfg, err := readFromCloud(ctx, cfgFromDisk, nil, true, true, logger, conn)
return cfg, err
}

Expand Down Expand Up @@ -648,13 +627,19 @@ func processConfig(unprocessedConfig *Config, fromCloud bool, logger logging.Log

// getFromCloudOrCache returns the config from the gRPC endpoint. If failures during cloud lookup fallback to the
// local cache if the error indicates it should.
func getFromCloudOrCache(ctx context.Context, cloudCfg *Cloud, shouldReadFromCache bool, logger logging.Logger) (*Config, bool, error) {
func getFromCloudOrCache(
ctx context.Context,
cloudCfg *Cloud,
shouldReadFromCache bool,
logger logging.Logger,
conn rpc.ClientConn,
) (*Config, bool, error) {
var cached bool

ctxWithTimeout, cancel := GetTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID)
ctxWithTimeout, cancel := contextutils.GetTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID)
defer cancel()

cfg, errorShouldCheckCache, err := getFromCloudGRPC(ctxWithTimeout, cloudCfg, logger)
cfg, errorShouldCheckCache, err := getFromCloudGRPC(ctxWithTimeout, cloudCfg, logger, conn)
if err != nil {
if shouldReadFromCache && errorShouldCheckCache {
cachedConfig, cacheErr := readFromCache(cloudCfg.ID)
Expand Down Expand Up @@ -687,15 +672,9 @@ func getFromCloudOrCache(ctx context.Context, cloudCfg *Cloud, shouldReadFromCac
}

// getFromCloudGRPC actually does the fetching of the robot config from the gRPC endpoint.
func getFromCloudGRPC(ctx context.Context, cloudCfg *Cloud, logger logging.Logger) (*Config, bool, error) {
func getFromCloudGRPC(ctx context.Context, cloudCfg *Cloud, logger logging.Logger, conn rpc.ClientConn) (*Config, bool, error) {
shouldCheckCacheOnFailure := true

conn, err := CreateNewGRPCClient(ctx, cloudCfg, logger)
if err != nil {
return nil, shouldCheckCacheOnFailure, errors.WithMessage(err, "error creating cloud grpc client")
}
defer utils.UncheckedErrorFunc(conn.Close)

agentInfo, err := getAgentInfo(logger)
if err != nil {
return nil, shouldCheckCacheOnFailure, errors.WithMessage(err, "error getting agent info")
Expand Down
13 changes: 7 additions & 6 deletions config/reader_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,22 @@ import (

"go.viam.com/rdk/components/arm"
"go.viam.com/rdk/config"
"go.viam.com/rdk/grpc"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
)

func TestFromReaderValidate(t *testing.T) {
logger := logging.NewTestLogger(t)
_, err := config.FromReader(context.Background(), "somepath", strings.NewReader(""), logger)
_, err := config.FromReader(context.Background(), "somepath", strings.NewReader(""), logger, &grpc.AppConn{})
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "json: EOF")

_, err = config.FromReader(context.Background(), "somepath", strings.NewReader(`{"cloud": 1}`), logger)
_, err = config.FromReader(context.Background(), "somepath", strings.NewReader(`{"cloud": 1}`), logger, &grpc.AppConn{})
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "unmarshal")

conf, err := config.FromReader(context.Background(), "somepath", strings.NewReader(`{}`), logger)
conf, err := config.FromReader(context.Background(), "somepath", strings.NewReader(`{}`), logger, &grpc.AppConn{})
test.That(t, err, test.ShouldBeNil)
test.That(t, conf, test.ShouldResemble, &config.Config{
ConfigFilePath: "somepath",
Expand All @@ -39,12 +40,12 @@ func TestFromReaderValidate(t *testing.T) {
},
})

_, err = config.FromReader(context.Background(), "somepath", strings.NewReader(`{"cloud": {}}`), logger)
_, err = config.FromReader(context.Background(), "somepath", strings.NewReader(`{"cloud": {}}`), logger, &grpc.AppConn{})
test.That(t, err, test.ShouldNotBeNil)
test.That(t, resource.GetFieldFromFieldRequiredError(err), test.ShouldEqual, "id")

_, err = config.FromReader(context.Background(),
"somepath", strings.NewReader(`{"disable_partial_start":true,"components": [{}]}`), logger)
"somepath", strings.NewReader(`{"disable_partial_start":true,"components": [{}]}`), logger, &grpc.AppConn{})
test.That(t, err, test.ShouldNotBeNil)
var fre resource.FieldRequiredError
test.That(t, errors.As(err, &fre), test.ShouldBeTrue)
Expand All @@ -54,7 +55,7 @@ func TestFromReaderValidate(t *testing.T) {
conf, err = config.FromReader(context.Background(),
"somepath",
strings.NewReader(`{"components": [{"name": "foo", "type": "arm", "model": "foo"}]}`),
logger)
logger, &grpc.AppConn{})
test.That(t, err, test.ShouldBeNil)
expected := &config.Config{
ConfigFilePath: "somepath",
Expand Down
Loading
Loading