Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-9818: Annotate gRPC requests from modules to the viam-server with module names. #4749

Merged
merged 5 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions grpc/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// DefaultMethodTimeout is the default context timeout for all inbound gRPC
Expand Down Expand Up @@ -43,3 +44,62 @@ func EnsureTimeoutUnaryClientInterceptor(

return invoker(ctx, method, req, reply, cc, opts...)
}

// The following code is for appending/extracting grpc metadata regarding module names/origins via
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shoving this all inside of grpc/interceptors.go seemed like a good spot, but open to alternatives. At first I tried inside the robot web server or modules directory (can't remember), but hit go import cycles.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file seems like the appropriate place IMO.

// contexts.
type modNameKeyType int

const modNameKeyID = modNameKeyType(iota)

// GetModuleName returns the module name (if any) the request came from. The module name will match
// a string from the robot config.
func GetModuleName(ctx context.Context) string {
valI := ctx.Value(modNameKeyID)
if val, ok := valI.(string); ok {
return val
}

return ""
}

const modNameMetadataKey = "modName"

// ModInterceptors takes a user input `ModName` and exposes an interceptor method that will attach
// it to outgoing gRPC requests.
type ModInterceptors struct {
ModName string
}

// UnaryClientInterceptor adds a module name to any outgoing unary gRPC request.
func (mc *ModInterceptors) UnaryClientInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
ctx = metadata.AppendToOutgoingContext(ctx, modNameMetadataKey, mc.ModName)
return invoker(ctx, method, req, reply, cc, opts...)
}

// ModNameUnaryServerInterceptor checks the incoming RPC metadata for a module name and attaches any
// information to a context that can be retrieved with `GetModuleName`.
func ModNameUnaryServerInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
meta, ok := metadata.FromIncomingContext(ctx)
if !ok {
return handler(ctx, req)
}

values := meta.Get(modNameMetadataKey)
if len(values) == 1 {
ctx = context.WithValue(ctx, modNameKeyID, values[0])
}

return handler(ctx, req)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you not need Stream versions of the interceptors? Or do we only care about unary for now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only care about unary right now. I'm inclined to believe a stream would never need this? Or at least not in the in the same mold as we have here. I need to identify to module/connection such that I can use webrtc video streaming as opposed to a grpc stream.

But adding it couldn't hurt. Certainly would be surprising if one day someone did try using grabbing a module name from a stream request and failed to get it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take this back -- streams don't have contexts. We play games to make that a thing. I'm inclined to punt for now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm inclined to believe a stream would never need this?

Well we certainly don't have any gRPC streaming API between a module and rdk right now. Whether we never will: I'm not sure. I think punting is totally fine for now. I do remember looking at that ServerStreamContextWrapper thing and scratching my head... I guess we'd have to do that in the future if we ever need the same feature for streaming gRPC APIs module <-> rdk 🤷🏻 .

1 change: 1 addition & 0 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,7 @@ func getFullEnvironment(
environment := map[string]string{
"VIAM_HOME": viamHomeDir,
"VIAM_MODULE_DATA": dataDir,
"VIAM_MODULE_NAME": cfg.Name,
}
if cfg.Type == config.ModuleTypeRegistry {
environment["VIAM_MODULE_ID"] = cfg.ModuleID
Expand Down
22 changes: 21 additions & 1 deletion module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ type peerResourceState struct {

// Module represents an external resource module that services components/services.
type Module struct {
// The name of the module as per the robot config. This value is communicated via the
// `VIAM_MODULE_NAME` env var.
name string

shutdownCtx context.Context
shutdownFn context.CancelFunc
parent *client.RobotClient
Expand Down Expand Up @@ -219,7 +223,12 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod
}

cancelCtx, cancel := context.WithCancel(context.Background())

// If the env variable does not exist, the empty string is returned.
modName, _ := os.LookupEnv("VIAM_MODULE_NAME")

m := &Module{
name: modName,
shutdownCtx: cancelCtx,
shutdownFn: cancel,
logger: logger,
Expand Down Expand Up @@ -369,7 +378,18 @@ func (m *Module) connectParent(ctx context.Context) error {
clientLogger := logging.NewLogger("networking.module-connection")
clientLogger.SetLevel(m.logger.GetLevel())
// TODO(PRODUCT-343): add session support to modules
rc, err := client.New(ctx, fullAddr, clientLogger, client.WithDisableSessions())

connectOptions := []client.RobotClientOption{
client.WithDisableSessions(),
}

// Modules compiled against newer SDKs may be running against older `viam-server`s that do not
// provide the module name as an env variable.
if m.name != "" {
connectOptions = append(connectOptions, client.WithModName(m.name))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can go back to the chaining in the old code -- I thought it looked nicer.

I only did this to make checking for the empty string to be explicit. But in reality, the other end calling GetModuleName copes with "old modules" by returning the empty string.

Maybe it'd be better for GetModuleName to return an error -- I don't know!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion either way; I think what you have here is totally fine. GetModuleName returning an error in the event of no module name on the metadata/context seems odd to me, though, given we expect to be interacting with older modules and inability to find the module name is not really an "error state."

}

rc, err := client.New(ctx, fullAddr, m.logger, connectOptions...)
if err != nil {
return err
}
Expand Down
23 changes: 21 additions & 2 deletions module/testmodule/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"go.viam.com/rdk/components/generic"
"go.viam.com/rdk/components/motor"
"go.viam.com/rdk/components/sensor"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/module"
"go.viam.com/rdk/resource"
Expand Down Expand Up @@ -106,9 +107,23 @@ func mainWithArgs(ctx context.Context, args []string, logger logging.Logger) err
func newHelper(
ctx context.Context, deps resource.Dependencies, conf resource.Config, logger logging.Logger,
) (resource.Resource, error) {
var dependsOnSensor sensor.Sensor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[opt] Doesn't really matter, but generally I've seen FromDependencies used instead of for loop searches like the one you're doing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a change, but I feel there's still some awkwardness.

I think the trouble arises because this "helper" model is "schema-less". Most tests aren't creating this with dependencies (obviously as per needing this change). And if they did want dependencies, it's not clear they'd want a sensor one? I don't think there's really a way to make this dependency parsing usage for "helper" be forward compatible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool; yeah it's still awkward, sorry about that. What you have seems fine. I think you're right that if we wanted this to "look good" (and I don't really care too much since this is just testing code) we'd need a type for helper configs and probably an associated Validate method.

var err error
if len(conf.DependsOn) > 0 {
dependsOnSensor, err = sensor.FromDependencies(deps, conf.DependsOn[0])
if err != nil {
return nil, err
}
}

if len(deps) > 0 && dependsOnSensor == nil {
return nil, fmt.Errorf("sensor not found in deps: %v", deps)
}

return &helper{
Named: conf.ResourceName().AsNamed(),
logger: logger,
Named: conf.ResourceName().AsNamed(),
logger: logger,
dependsOnSensor: dependsOnSensor,
}, nil
}

Expand All @@ -117,6 +132,7 @@ type helper struct {
resource.TriviallyCloseable
logger logging.Logger
numReconfigurations int
dependsOnSensor sensor.Sensor
}

// DoCommand looks up the "real" command from the map it's passed.
Expand Down Expand Up @@ -191,6 +207,9 @@ func (h *helper) DoCommand(ctx context.Context, req map[string]interface{}) (map
return map[string]any{}, nil
case "get_num_reconfigurations":
return map[string]any{"num_reconfigurations": h.numReconfigurations}, nil
case "do_readings_on_dep":
_, err := h.dependsOnSensor.Readings(ctx, nil)
return nil, err
default:
return nil, fmt.Errorf("unknown command string %s", cmd)
}
Expand Down
5 changes: 5 additions & 0 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,11 @@ func New(ctx context.Context, address string, clientLogger logging.ZapCompatible
rpc.WithStreamClientInterceptor(streamClientInterceptor()),
)

if rOpts.modName != "" {
inter := &grpc.ModInterceptors{ModName: rOpts.modName}
rc.dialOptions = append(rc.dialOptions, rpc.WithUnaryClientInterceptor(inter.UnaryClientInterceptor))
}

if err := rc.Connect(ctx); err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions robot/client/client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type robotClientOpts struct {

// controls whether or not sessions are disabled.
disableSessions bool

modName string
}

// RobotClientOption configures how we set up the connection.
Expand All @@ -56,6 +58,14 @@ func newFuncRobotClientOption(f func(*robotClientOpts)) *funcRobotClientOption {
}
}

// WithModName attaches a unary interceptor that attaches the module name for each outgoing gRPC
// request. Should only be used in Viam module library code.
func WithModName(modName string) RobotClientOption {
return newFuncRobotClientOption(func(o *robotClientOpts) {
o.modName = modName
})
}

// WithRefreshEvery returns a RobotClientOption for how often to refresh the status/parts of the
// robot.
func WithRefreshEvery(refreshEvery time.Duration) RobotClientOption {
Expand Down
78 changes: 78 additions & 0 deletions robot/impl/local_robot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4534,3 +4534,81 @@ func TestRemovingOfflineRemotes(t *testing.T) {
cancelReconfig()
wg.Wait()
}

// TestModuleNamePassing asserts that module names are passed from viam-server -> module
// properly. Such that incoming requests from module -> viam-server identify themselves. And can be
// observed on contexts via `[r]grpc.GetModuleName(ctx)`.
func TestModuleNamePassing(t *testing.T) {
logger := logging.NewTestLogger(t)

ctx := context.Background()

// We will inject a `ReadingsFunc` handler. The request should come from the `testmodule` and
// the interceptors should pass along a module name. Which will get captured in the
// `moduleNameCh` that the end of the test will assert on.
//
// The channel must be buffered to such that the `ReadingsFunc` returns without waiting on a
// reader of the channel.
moduleNameCh := make(chan string, 1)
callbackSensor := &inject.Sensor{
ReadingsFunc: func(ctx context.Context, extra map[string]any) (map[string]any, error) {
moduleNameCh <- rgrpc.GetModuleName(ctx)
return map[string]any{
"reading": 42,
}, nil
},
CloseFunc: func(ctx context.Context) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surprised you need a no-op CloseFunc; I assume you get a panic without this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied it from elsewhere. But that was my reasoning -- was that it existed to avoid a panic.

return nil
},
}

// The resource registry is a global. We must use unique model names to avoid unexpected
// collisions.
callbackModelName := resource.DefaultModelFamily.WithModel(utils.RandomAlphaString(8))
resource.RegisterComponent(
sensor.API,
callbackModelName,
resource.Registration[sensor.Sensor, resource.NoNativeConfig]{Constructor: func(
ctx context.Context,
deps resource.Dependencies,
conf resource.Config,
logger logging.Logger,
) (sensor.Sensor, error) {
// Be lazy -- just return an a singleton object.
return callbackSensor, nil
}})

const moduleName = "fancy_module_name"
localRobot := setupLocalRobot(t, ctx, &config.Config{
Modules: []config.Module{
{
Name: moduleName,
ExePath: rtestutils.BuildTempModule(t, "module/testmodule"),
Type: config.ModuleTypeLocal,
},
},
Components: []resource.Config{
// We will invoke a special `DoCommand` on `modularComp`. It will expect its `DependsOn:
// "foo"` to be a sensor. And call the `Readings` API on that sensor.
{
Name: "modularComp",
API: generic.API,
Model: resource.NewModel("rdk", "test", "helper"),
DependsOn: []string{"foo"},
},
// `foo` will be a sensor that we've instrumented with the injected `ReadingsFunc`.
{
Name: "foo",
API: sensor.API,
Model: callbackModelName,
},
},
}, logger)

res, err := localRobot.ResourceByName(generic.Named("modularComp"))
test.That(t, err, test.ShouldBeNil)

_, err = res.DoCommand(ctx, map[string]interface{}{"command": "do_readings_on_dep"})
test.That(t, err, test.ShouldBeNil)
test.That(t, <-moduleNameCh, test.ShouldEqual, moduleName)
}
4 changes: 4 additions & 0 deletions robot/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ func (svc *webService) StartModule(ctx context.Context) error {

unaryInterceptors = append(unaryInterceptors, grpc.EnsureTimeoutUnaryServerInterceptor)

// Attach the module name (as defined by the robot config) to the handler context. Can be
// accessed via `grpc.GetModuleName`.
unaryInterceptors = append(unaryInterceptors, grpc.ModNameUnaryServerInterceptor)

opManager := svc.r.OperationManager()
unaryInterceptors = append(unaryInterceptors,
opManager.UnaryServerInterceptor, logging.UnaryServerInterceptor)
Expand Down
Loading