Skip to content

Commit

Permalink
Apply instrumentation and load metering by default
Browse files Browse the repository at this point in the history
  • Loading branch information
dim committed Mar 13, 2017
1 parent e86ebd7 commit 4150e05
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 12 deletions.
37 changes: 37 additions & 0 deletions load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package grpctools

import (
"golang.org/x/net/context"

"google.golang.org/grpc"
)

type LoadReportMeter interface {
Increment(int64)
}

func UnaryLoadReporter(lrm LoadReportMeter) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
lrm.Increment(1)
return handler(ctx, req)
}
}

func StreamLoadReporter(lrm LoadReportMeter) grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, loadReportingStream{
ServerStream: stream,
lrm: lrm,
})
}
}

type loadReportingStream struct {
grpc.ServerStream
lrm LoadReportMeter
}

func (s loadReportingStream) RecvMsg(m interface{}) error {
s.lrm.Increment(1)
return s.ServerStream.RecvMsg(m)
}
4 changes: 2 additions & 2 deletions middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

// UnaryServerInterceptorChain combines multiple grpc.UnaryServerInterceptor
// functions into one chain.
func UnaryServerInterceptorChain(fns ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
func unaryServerInterceptorChain(fns ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
wrap := func(fn grpc.UnaryServerInterceptor, info *grpc.UnaryServerInfo, next grpc.UnaryHandler) grpc.UnaryHandler {
return func(ctx context.Context, req interface{}) (interface{}, error) {
return fn(ctx, req, info, next)
Expand All @@ -25,7 +25,7 @@ func UnaryServerInterceptorChain(fns ...grpc.UnaryServerInterceptor) grpc.UnaryS

// StreamServerInterceptorChain combines multiple grpc.StreamServerInterceptor
// functions into one chain.
func StreamServerInterceptorChain(fns ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor {
func streamServerInterceptorChain(fns ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor {
wrap := func(fn grpc.StreamServerInterceptor, info *grpc.StreamServerInfo, next grpc.StreamHandler) grpc.StreamHandler {
return func(srv interface{}, stream grpc.ServerStream) error {
return fn(srv, stream, info, next)
Expand Down
74 changes: 65 additions & 9 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

type LoadReportMeter interface {
Increment(int64)
}

// Server embeds a standard grpc Server with a healthcheck
type Server struct {
*grpc.Server
Expand All @@ -26,17 +22,21 @@ type Server struct {
}

// NewServer returns a new Server instance.
func NewServer(name string, addr string, opts ...grpc.ServerOption) *Server {
lrs := load.NewRateReporter(time.Minute)
func NewServer(name string, addr string, opts *Options) *Server {
if opts == nil {
opts = new(Options)
}

lrm := load.NewRateReporter(time.Minute)
srv := &Server{
Server: grpc.NewServer(opts...),
LoadReportMeter: lrs,
Server: grpc.NewServer(opts.grpcServerOpts(lrm)...),
LoadReportMeter: lrm,
name: name,
addr: addr,
health: health.NewServer(),
}
healthpb.RegisterHealthServer(srv.Server, srv.health)
balancepb.RegisterLoadReportServer(srv.Server, lrs)
balancepb.RegisterLoadReportServer(srv.Server, lrm)
return srv
}

Expand All @@ -53,3 +53,59 @@ func (s *Server) ListenAndServe() error {
s.health.SetServingStatus(s.name, healthpb.HealthCheckResponse_NOT_SERVING)
return err
}

// --------------------------------------------------------------------

// Options represent server options
type Options struct {
// MaxConcurrentStreams will apply a limit on the number of concurrent streams to each ServerTransport.
MaxConcurrentStreams uint32
// MaxMsgSize sets the max message size in bytes for inbound mesages.
// If this is not set, gRPC uses the default 4MB.
MaxMsgSize int

SkipCompression bool
SkipLoadReporting bool
SkipInstrumentation bool

UnaryInterceptors []grpc.UnaryServerInterceptor
StreamInterceptors []grpc.StreamServerInterceptor
}

func (o *Options) grpcServerOpts(lrm LoadReportMeter) []grpc.ServerOption {
opts := make([]grpc.ServerOption, 0)
uchain := append([]grpc.UnaryServerInterceptor{}, o.UnaryInterceptors...)
schain := append([]grpc.StreamServerInterceptor{}, o.StreamInterceptors...)

if o.MaxConcurrentStreams > 0 {
opts = append(opts, grpc.MaxConcurrentStreams(o.MaxConcurrentStreams))
}

if o.MaxMsgSize > 0 {
opts = append(opts, grpc.MaxMsgSize(o.MaxMsgSize))
}

if !o.SkipCompression {
opts = append(opts, grpc.RPCDecompressor(grpc.NewGZIPDecompressor()))
}

if !o.SkipInstrumentation {
uchain = append(uchain, DefaultInstrumenter.UnaryServerInterceptor)
schain = append(schain, DefaultInstrumenter.StreamServerInterceptor)
}

if !o.SkipLoadReporting {
uchain = append(uchain, UnaryLoadReporter(lrm))
schain = append(schain, StreamLoadReporter(lrm))
}

if len(uchain) != 0 {
opts = append(opts, grpc.UnaryInterceptor(unaryServerInterceptorChain(uchain...)))
}

if len(schain) != 0 {
opts = append(opts, grpc.StreamInterceptor(streamServerInterceptorChain(schain...)))
}

return opts
}
2 changes: 1 addition & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var _ = Describe("Server", func() {
var subject *Server

BeforeEach(func() {
subject = NewServer("test", "127.0.0.1:8080")
subject = NewServer("test", "127.0.0.1:8080", nil)
})

AfterEach(func() {
Expand Down

0 comments on commit 4150e05

Please sign in to comment.