diff --git a/robot/impl/local_robot.go b/robot/impl/local_robot.go index d91517a5741..b0f6c6167db 100644 --- a/robot/impl/local_robot.go +++ b/robot/impl/local_robot.go @@ -458,6 +458,9 @@ func newWithResources( // we assume these never appear in our configs and as such will not be removed from the // resource graph r.webSvc = web.New(r, logger, rOpts.webOptions...) + if r.ftdc != nil { + r.ftdc.Add("web", r.webSvc.RequestCounter()) + } r.frameSvc, err = framesystem.New(ctx, resource.Dependencies{}, logger) if err != nil { return nil, err diff --git a/robot/web/web.go b/robot/web/web.go index 72a5411d73f..7c10e3008e6 100644 --- a/robot/web/web.go +++ b/robot/web/web.go @@ -79,6 +79,8 @@ type Service interface { ModuleAddress() string Stats() any + + RequestCounter() *RequestCounter } type webService struct { @@ -101,6 +103,8 @@ type webService struct { isRunning bool webWorkers sync.WaitGroup modWorkers sync.WaitGroup + + requestCounter RequestCounter } var internalWebServiceName = resource.NewName( @@ -220,6 +224,7 @@ func (svc *webService) StartModule(ctx context.Context) error { // Attach the module name (as defined by the robot config) to the handler context. Can be // accessed via `grpc.GetModuleName`. unaryInterceptors = append(unaryInterceptors, grpc.ModNameUnaryServerInterceptor) + unaryInterceptors = append(unaryInterceptors, svc.requestCounter.UnaryInterceptor) opManager := svc.r.OperationManager() unaryInterceptors = append(unaryInterceptors, @@ -498,6 +503,68 @@ func (svc *webService) runWeb(ctx context.Context, options weboptions.Options) ( return err } +// Namer is used to get a resource name from incoming requests for countingfor request. Requests for +// resources are expected to be a gRPC object that includes a `GetName` method. +type Namer interface { + GetName() string +} + +// RequestCounter maps string keys to atomic ints that get bumped on every incoming gRPC request for +// components. +type RequestCounter struct { + counts sync.Map +} + +// UnaryInterceptor returns an incoming server interceptor that will pull method information and +// optionally resource information to bump the request counters. +func (rc *RequestCounter) UnaryInterceptor( + ctx context.Context, req any, info *googlegrpc.UnaryServerInfo, handler googlegrpc.UnaryHandler, +) (resp any, err error) { + // Handle `info.FullMethod` values such as: + // - `/viam.component.motor.v1.MotorService/IsMoving` + // - `/viam.robot.v1.RobotService/SendSessionHeartbeat` + // + // Only count component APIs, for now. + var apiMethod string + switch { + case strings.HasPrefix(info.FullMethod, "/viam.component."): + apiMethod = info.FullMethod[strings.LastIndexByte(info.FullMethod, byte('/'))+1:] + default: + } + + // Storing in FTDC: `web.motor-name.IsMoving: `. + if apiMethod != "" { + if namer, ok := req.(Namer); ok { + key := fmt.Sprintf("%v.%v", namer.GetName(), apiMethod) + if apiCounts, ok := rc.counts.Load(key); ok { + apiCounts.(*atomic.Int64).Add(1) + } else { + newCounter := new(atomic.Int64) + newCounter.Add(1) + rc.counts.Store(key, newCounter) + } + } + } + + return handler(ctx, req) +} + +// Stats satisfies the ftdc.Statser interface and will return a copy of the counters. +func (rc *RequestCounter) Stats() any { + ret := make(map[string]int64) + rc.counts.Range(func(key, value any) bool { + ret[key.(string)] = value.(*atomic.Int64).Load() + return true + }) + + return ret +} + +// RequestCounter returns the request counter object. +func (svc *webService) RequestCounter() *RequestCounter { + return &svc.requestCounter +} + // Initialize RPC Server options. func (svc *webService) initRPCOptions(listenerTCPAddr *net.TCPAddr, options weboptions.Options) ([]rpc.ServerOption, error) { hosts := options.GetHosts(listenerTCPAddr) @@ -530,8 +597,8 @@ func (svc *webService) initRPCOptions(listenerTCPAddr *net.TCPAddr, options webo } var unaryInterceptors []googlegrpc.UnaryServerInterceptor - unaryInterceptors = append(unaryInterceptors, grpc.EnsureTimeoutUnaryServerInterceptor) + unaryInterceptors = append(unaryInterceptors, svc.requestCounter.UnaryInterceptor) if options.Debug { rpcOpts = append(rpcOpts, rpc.WithDebug())