From 36752fb6e4e7c3aef11c1015440cfde4b27ccdc3 Mon Sep 17 00:00:00 2001
From: tonywang <tony.wanghongchen@gmail.com>
Date: Fri, 17 Apr 2020 02:57:24 +0200
Subject: [PATCH] use stats.handler instead of interceptor and add message size
 monitor

---
 client.go                    |  21 +++++
 client_metrics.go            |  85 ++++++++++++++++--
 client_reporter.go           |  38 ++++++++
 client_stats_handler.go      |  58 +++++++++++++
 client_stats_handler_test.go | 164 +++++++++++++++++++++++++++++++++++
 client_test.go               |  30 +++++++
 go.mod                       |   2 +
 go.sum                       |   1 +
 server.go                    |  21 +++++
 server_metrics.go            |  99 +++++++++++++++++++--
 server_reporter.go           |  34 ++++++++
 server_stats_handler.go      |  57 ++++++++++++
 server_stats_handler_test.go | 161 ++++++++++++++++++++++++++++++++++
 util.go                      |  34 ++++++++
 14 files changed, 792 insertions(+), 13 deletions(-)
 create mode 100644 client_stats_handler.go
 create mode 100644 client_stats_handler_test.go
 create mode 100644 server_stats_handler.go
 create mode 100644 server_stats_handler_test.go

diff --git a/client.go b/client.go
index 5b861b7..8df63d6 100644
--- a/client.go
+++ b/client.go
@@ -20,6 +20,9 @@ var (
 
 	// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
 	StreamClientInterceptor = DefaultClientMetrics.StreamClientInterceptor()
+
+	// ClientStatsHandler is a gRPC client-side stats.Handler that provides Prometheus monitoring for RPCs.
+	ClientStatsHandler = DefaultClientMetrics.NewClientStatsHandler()
 )
 
 func init() {
@@ -55,3 +58,21 @@ func EnableClientStreamSendTimeHistogram(opts ...HistogramOption) {
 	DefaultClientMetrics.EnableClientStreamSendTimeHistogram(opts...)
 	prom.Register(DefaultClientMetrics.clientStreamSendHistogram)
 }
+
+// EnableClientMsgSizeReceivedBytesHistogram turns on recording of
+// single message send time of streaming RPCs.
+// This function acts on the DefaultClientMetrics variable and the
+// default Prometheus metrics registry.
+func EnableClientMsgSizeReceivedBytesHistogram(opts ...HistogramOption) {
+	DefaultClientMetrics.EnableMsgSizeReceivedBytesHistogram(opts...)
+	prom.Register(DefaultClientMetrics.clientMsgSizeReceivedHistogram)
+}
+
+// EnableClientMsgSizeSentBytesHistogram turns on recording of
+// single message send time of streaming RPCs.
+// This function acts on the DefaultClientMetrics variable and the
+// default Prometheus metrics registry.
+func EnableClientMsgSizeSentBytesHistogram(opts ...HistogramOption) {
+	DefaultClientMetrics.EnableMsgSizeSentBytesHistogram(opts...)
+	prom.Register(DefaultClientMetrics.clientMsgSizeSentHistogram)
+}
diff --git a/client_metrics.go b/client_metrics.go
index a344084..75aa2c8 100644
--- a/client_metrics.go
+++ b/client_metrics.go
@@ -7,16 +7,18 @@ import (
 	prom "github.com/prometheus/client_golang/prometheus"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/stats"
 	"google.golang.org/grpc/status"
 )
 
 // ClientMetrics represents a collection of metrics to be registered on a
 // Prometheus metrics registry for a gRPC client.
 type ClientMetrics struct {
-	clientStartedCounter    *prom.CounterVec
-	clientHandledCounter    *prom.CounterVec
-	clientStreamMsgReceived *prom.CounterVec
-	clientStreamMsgSent     *prom.CounterVec
+	clientStartedCounter     *prom.CounterVec
+	clientStartedCounterOpts prom.CounterOpts
+	clientHandledCounter     *prom.CounterVec
+	clientStreamMsgReceived  *prom.CounterVec
+	clientStreamMsgSent      *prom.CounterVec
 
 	clientHandledHistogramEnabled bool
 	clientHandledHistogramOpts    prom.HistogramOpts
@@ -29,6 +31,14 @@ type ClientMetrics struct {
 	clientStreamSendHistogramEnabled bool
 	clientStreamSendHistogramOpts    prom.HistogramOpts
 	clientStreamSendHistogram        *prom.HistogramVec
+
+	clientMsgSizeReceivedHistogramEnabled bool
+	clientMsgSizeReceivedHistogramOpts    prom.HistogramOpts
+	clientMsgSizeReceivedHistogram        *prom.HistogramVec
+
+	clientMsgSizeSentHistogramEnabled bool
+	clientMsgSizeSentHistogramOpts    prom.HistogramOpts
+	clientMsgSizeSentHistogram        *prom.HistogramVec
 }
 
 // NewClientMetrics returns a ClientMetrics object. Use a new instance of
@@ -82,7 +92,21 @@ func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
 			Help:    "Histogram of response latency (seconds) of the gRPC single message send.",
 			Buckets: prom.DefBuckets,
 		},
-		clientStreamSendHistogram: nil,
+		clientStreamSendHistogram:             nil,
+		clientMsgSizeReceivedHistogramEnabled: false,
+		clientMsgSizeReceivedHistogramOpts: prom.HistogramOpts{
+			Name:    "grpc_client_msg_size_received_bytes",
+			Help:    "Histogram of message sizes received by the client.",
+			Buckets: defMsgBytesBuckets,
+		},
+		clientMsgSizeReceivedHistogram:    nil,
+		clientMsgSizeSentHistogramEnabled: false,
+		clientMsgSizeSentHistogramOpts: prom.HistogramOpts{
+			Name:    "grpc_client_msg_size_sent_bytes",
+			Help:    "Histogram of message sizes sent by the client.",
+			Buckets: defMsgBytesBuckets,
+		},
+		clientMsgSizeSentHistogram: nil,
 	}
 }
 
@@ -103,6 +127,12 @@ func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
 	if m.clientStreamSendHistogramEnabled {
 		m.clientStreamSendHistogram.Describe(ch)
 	}
+	if m.clientMsgSizeReceivedHistogramEnabled {
+		m.clientMsgSizeReceivedHistogram.Describe(ch)
+	}
+	if m.clientMsgSizeSentHistogramEnabled {
+		m.clientMsgSizeSentHistogram.Describe(ch)
+	}
 }
 
 // Collect is called by the Prometheus registry when collecting
@@ -122,6 +152,12 @@ func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
 	if m.clientStreamSendHistogramEnabled {
 		m.clientStreamSendHistogram.Collect(ch)
 	}
+	if m.clientMsgSizeReceivedHistogramEnabled {
+		m.clientMsgSizeReceivedHistogram.Collect(ch)
+	}
+	if m.clientMsgSizeSentHistogramEnabled {
+		m.clientMsgSizeSentHistogram.Collect(ch)
+	}
 }
 
 // EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
@@ -173,6 +209,38 @@ func (m *ClientMetrics) EnableClientStreamSendTimeHistogram(opts ...HistogramOpt
 	m.clientStreamSendHistogramEnabled = true
 }
 
+// EnableMsgSizeReceivedBytesHistogram turns on recording of received message size of RPCs.
+// Histogram metrics can be very expensive for Prometheus to retain and query. It takes
+// options to configure histogram options such as the defined buckets.
+func (m *ClientMetrics) EnableMsgSizeReceivedBytesHistogram(opts ...HistogramOption) {
+	for _, o := range opts {
+		o(&m.clientMsgSizeReceivedHistogramOpts)
+	}
+	if !m.clientMsgSizeReceivedHistogramEnabled {
+		m.clientMsgSizeReceivedHistogram = prom.NewHistogramVec(
+			m.clientMsgSizeReceivedHistogramOpts,
+			[]string{"grpc_service", "grpc_method", "grpc_stats"},
+		)
+	}
+	m.clientMsgSizeReceivedHistogramEnabled = true
+}
+
+// EnableMsgSizeSentBytesHistogram turns on recording of sent message size of RPCs.
+// Histogram metrics can be very expensive for Prometheus to retain and query. It
+// takes options to configure histogram options such as the defined buckets.
+func (m *ClientMetrics) EnableMsgSizeSentBytesHistogram(opts ...HistogramOption) {
+	for _, o := range opts {
+		o(&m.clientMsgSizeSentHistogramOpts)
+	}
+	if !m.clientMsgSizeSentHistogramEnabled {
+		m.clientMsgSizeSentHistogram = prom.NewHistogramVec(
+			m.clientMsgSizeSentHistogramOpts,
+			[]string{"grpc_service", "grpc_method", "grpc_stats"},
+		)
+	}
+	m.clientMsgSizeSentHistogramEnabled = true
+}
+
 // UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
 func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
 	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
@@ -202,6 +270,13 @@ func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc
 	}
 }
 
+// NewClientStatsHandler is a gRPC client-side stats.Handler that providers Prometheus monitoring for RPCs.
+func (m *ClientMetrics) NewClientStatsHandler() stats.Handler {
+	return &clientStatsHandler{
+		clientMetrics: m,
+	}
+}
+
 func clientStreamType(desc *grpc.StreamDesc) grpcType {
 	if desc.ClientStreams && !desc.ServerStreams {
 		return ClientStream
diff --git a/client_reporter.go b/client_reporter.go
index 286d657..3cb20fd 100644
--- a/client_reporter.go
+++ b/client_reporter.go
@@ -4,6 +4,7 @@
 package grpc_prometheus
 
 import (
+	"fmt"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -31,6 +32,16 @@ func newClientReporter(m *ClientMetrics, rpcType grpcType, fullMethod string) *c
 	return r
 }
 
+func newClientReporterForStatsHanlder(startTime time.Time, m *ClientMetrics, fullMethod string) *clientReporter {
+	r := &clientReporter{
+		metrics:   m,
+		rpcType:   Unary,
+		startTime: startTime,
+	}
+	r.serviceName, r.methodName = splitMethodName(fullMethod)
+	return r
+}
+
 // timer is a helper interface to time functions.
 type timer interface {
 	ObserveDuration() time.Duration
@@ -54,10 +65,25 @@ func (r *clientReporter) ReceiveMessageTimer() timer {
 	return emptyTimer
 }
 
+func (r *clientReporter) StartedConn() {
+	r.metrics.clientStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
+}
+
 func (r *clientReporter) ReceivedMessage() {
 	r.metrics.clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
 }
 
+// ReceivedMessageSize counts the size of received messages on client-side
+func (r *clientReporter) ReceivedMessageSize(rpcStats grpcStats, size float64) {
+	if rpcStats == Payload {
+		r.ReceivedMessage()
+	}
+
+	if r.metrics.clientMsgSizeReceivedHistogramEnabled {
+		r.metrics.clientMsgSizeReceivedHistogram.WithLabelValues(r.serviceName, r.methodName, string(rpcStats)).Observe(size)
+	}
+}
+
 func (r *clientReporter) SendMessageTimer() timer {
 	if r.metrics.clientStreamSendHistogramEnabled {
 		hist := r.metrics.clientStreamSendHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName)
@@ -71,9 +97,21 @@ func (r *clientReporter) SentMessage() {
 	r.metrics.clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
 }
 
+// SentMessageSize counts the size of sent messages on client-side
+func (r *clientReporter) SentMessageSize(rpcStats grpcStats, size float64) {
+	if rpcStats == Payload {
+		r.SentMessage()
+	}
+
+	if r.metrics.clientMsgSizeSentHistogramEnabled {
+		r.metrics.clientMsgSizeSentHistogram.WithLabelValues(r.serviceName, r.methodName, string(rpcStats)).Observe(size)
+	}
+}
+
 func (r *clientReporter) Handled(code codes.Code) {
 	r.metrics.clientHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
 	if r.metrics.clientHandledHistogramEnabled {
+		fmt.Printf("client handled count + 1: %v,%f\n", code, time.Since(r.startTime).Seconds())
 		r.metrics.clientHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
 	}
 }
diff --git a/client_stats_handler.go b/client_stats_handler.go
new file mode 100644
index 0000000..dbb9db2
--- /dev/null
+++ b/client_stats_handler.go
@@ -0,0 +1,58 @@
+package grpc_prometheus
+
+import (
+	"context"
+
+	"google.golang.org/grpc/stats"
+	"google.golang.org/grpc/status"
+)
+
+type clientStatsHandler struct {
+	clientMetrics *ClientMetrics
+}
+
+// TagRPC implements the stats.Hanlder interface.
+func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
+	rpcInfo := newRPCInfo(info.FullMethodName)
+	return context.WithValue(ctx, &rpcInfoKey, rpcInfo)
+}
+
+// HandleRPC implements the stats.Hanlder interface.
+func (h *clientStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
+	v, ok := ctx.Value(&rpcInfoKey).(*rpcInfo)
+	if !ok {
+		return
+	}
+	monitor := newClientReporterForStatsHanlder(v.startTime, h.clientMetrics, v.fullMethodName)
+	switch s := s.(type) {
+	case *stats.Begin:
+		v.startTime = s.BeginTime
+		monitor.StartedConn()
+	case *stats.End:
+		monitor.Handled(status.Code(s.Error))
+	case *stats.InHeader:
+		monitor.ReceivedMessageSize(Header, float64(s.WireLength))
+	case *stats.InPayload:
+		// TODO: remove the +5 offset on wire length here, which is a temporary stand-in for the missing grpc framing offset
+		//  See: https://github.com/grpc/grpc-go/issues/1647
+		monitor.ReceivedMessageSize(Payload, float64(s.WireLength+5))
+	case *stats.InTrailer:
+		monitor.ReceivedMessageSize(Tailer, float64(s.WireLength))
+	case *stats.OutHeader:
+		// TODO: Add the sent header message size stats, if the wire length of the send header is provided
+	case *stats.OutPayload:
+		// TODO(tonywang): response latency (seconds) of the gRPC single message send
+		monitor.SentMessageSize(Payload, float64(s.WireLength))
+	case *stats.OutTrailer:
+		monitor.SentMessageSize(Tailer, float64(s.WireLength))
+	}
+}
+
+// TagConn implements the stats.Hanlder interface.
+func (h *clientStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
+	return ctx
+}
+
+// HandleConn implements the stats.Hanlder interface.
+func (h *clientStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {
+}
diff --git a/client_stats_handler_test.go b/client_stats_handler_test.go
new file mode 100644
index 0000000..a7ccc1d
--- /dev/null
+++ b/client_stats_handler_test.go
@@ -0,0 +1,164 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_prometheus
+
+import (
+	"context"
+	"io"
+	"net"
+	"testing"
+	"time"
+
+	pb_testproto "github.com/grpc-ecosystem/go-grpc-prometheus/examples/testproto"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+var (
+	// client metrics must satisfy the Collector interface
+	_ prometheus.Collector = NewClientMetrics()
+)
+
+func TestClientStatsHandlerSuite(t *testing.T) {
+	suite.Run(t, &ClientStatsHandlerTestSuite{})
+}
+
+type ClientStatsHandlerTestSuite struct {
+	suite.Suite
+
+	serverListener net.Listener
+	server         *grpc.Server
+	clientConn     *grpc.ClientConn
+	testClient     pb_testproto.TestServiceClient
+	ctx            context.Context
+	cancel         context.CancelFunc
+}
+
+func (s *ClientStatsHandlerTestSuite) SetupSuite() {
+	var err error
+
+	EnableClientHandlingTimeHistogram()
+
+	EnableClientMsgSizeSentBytesHistogram()
+
+	EnableClientMsgSizeReceivedBytesHistogram()
+
+	s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
+
+	// This is the point where we hook up the interceptor
+	s.server = grpc.NewServer()
+	pb_testproto.RegisterTestServiceServer(s.server, &testService{t: s.T()})
+
+	go func() {
+		s.server.Serve(s.serverListener)
+	}()
+
+	s.clientConn, err = grpc.Dial(
+		s.serverListener.Addr().String(),
+		grpc.WithInsecure(),
+		grpc.WithBlock(),
+		grpc.WithStatsHandler(ClientStatsHandler),
+		grpc.WithTimeout(2*time.Second))
+	require.NoError(s.T(), err, "must not error on client Dial")
+	s.testClient = pb_testproto.NewTestServiceClient(s.clientConn)
+}
+
+func (s *ClientStatsHandlerTestSuite) SetupTest() {
+	// Make all RPC calls last at most 2 sec, meaning all async issues or deadlock will not kill tests.
+	s.ctx, s.cancel = context.WithTimeout(context.TODO(), 2*time.Second)
+
+	// Make sure every test starts with same fresh, intialized metric state.
+	DefaultClientMetrics.clientStartedCounter.Reset()
+	DefaultClientMetrics.clientHandledCounter.Reset()
+	DefaultClientMetrics.clientHandledHistogram.Reset()
+	DefaultClientMetrics.clientStreamMsgReceived.Reset()
+	DefaultClientMetrics.clientStreamMsgSent.Reset()
+	DefaultClientMetrics.clientMsgSizeReceivedHistogram.Reset()
+	DefaultClientMetrics.clientMsgSizeSentHistogram.Reset()
+
+}
+
+func (s *ClientStatsHandlerTestSuite) TearDownSuite() {
+	if s.serverListener != nil {
+		s.server.Stop()
+		s.T().Logf("stopped grpc.Server at: %v", s.serverListener.Addr().String())
+		s.serverListener.Close()
+
+	}
+	if s.clientConn != nil {
+		s.clientConn.Close()
+	}
+}
+
+func (s *ClientStatsHandlerTestSuite) TearDownTest() {
+	s.cancel()
+}
+
+func (s *ClientStatsHandlerTestSuite) TestUnaryIncrementsMetrics() {
+	_, err := s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{}) // should return with code=OK
+	require.NoError(s.T(), err)
+	requireValue(s.T(), 1, DefaultClientMetrics.clientStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty"))
+	requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty", "OK"))
+	requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty"))
+
+	requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Payload.String()))
+	requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Header.String()))
+	requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Payload.String()))
+	requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Tailer.String()))
+
+	_, err = s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
+	require.Error(s.T(), err)
+	requireValue(s.T(), 1, DefaultClientMetrics.clientStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError"))
+	requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError", "FailedPrecondition"))
+	requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError"))
+
+	requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingError", Payload.String()))
+	requireValueHistCount(s.T(), 1, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingError", Tailer.String()))
+}
+
+func (s *ClientStatsHandlerTestSuite) TestStreamingIncrementsMetrics() {
+	ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) // should return with code=OK
+	// Do a read, just for kicks.
+	count := 0
+	for {
+		_, err := ss.Recv()
+		if err == io.EOF {
+			break
+		}
+		require.NoError(s.T(), err, "reading pingList shouldn't fail")
+		count++
+	}
+	require.EqualValues(s.T(), countListResponses, count, "Number of received msg on the wire must match")
+
+	requireValue(s.T(), 1, DefaultClientMetrics.clientStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList"))
+	requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList", "OK"))
+	requireValue(s.T(), countListResponses, DefaultClientMetrics.clientStreamMsgReceived.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList"))
+	requireValue(s.T(), 1, DefaultClientMetrics.clientStreamMsgSent.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList"))
+	requireValueWithRetryHistCount(s.ctx, s.T(), 1, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList"))
+
+	requireValueWithRetryHistCount(s.ctx, s.T(), 1, DefaultClientMetrics.clientMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Payload.String()))
+	requireValueWithRetryHistCount(s.ctx, s.T(), 1, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Header.String()))
+	requireValueWithRetryHistCount(s.ctx, s.T(), countListResponses, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Payload.String()))
+	requireValueWithRetryHistCount(s.ctx, s.T(), 1, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Tailer.String()))
+
+	ss, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
+	require.NoError(s.T(), err, "PingList must not fail immediately")
+
+	// Do a read, just to progate errors.
+	_, err = ss.Recv()
+	st, _ := status.FromError(err)
+	require.Equal(s.T(), codes.FailedPrecondition, st.Code(), "Recv must return FailedPrecondition, otherwise the test is wrong")
+
+	requireValue(s.T(), 2, DefaultClientMetrics.clientStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList"))
+	requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList", "FailedPrecondition"))
+	requireValueWithRetryHistCount(s.ctx, s.T(), 2, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList"))
+
+	requireValueWithRetryHistCount(s.ctx, s.T(), 2, DefaultClientMetrics.clientMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Payload.String()))
+	requireValueWithRetryHistCount(s.ctx, s.T(), 2, DefaultClientMetrics.clientMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Tailer.String()))
+}
diff --git a/client_test.go b/client_test.go
index dc6e0b2..ab222e3 100644
--- a/client_test.go
+++ b/client_test.go
@@ -150,3 +150,33 @@ func (s *ClientInterceptorTestSuite) TestStreamingIncrementsMetrics() {
 	requireValue(s.T(), 1, DefaultClientMetrics.clientHandledCounter.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList", "FailedPrecondition"))
 	requireValueHistCount(s.T(), 2, DefaultClientMetrics.clientHandledHistogram.WithLabelValues("server_stream", "mwitkow.testproto.TestService", "PingList"))
 }
+
+func (s *ClientInterceptorTestSuite) SetupSuiteWithStatsHanlder() {
+	var err error
+
+	EnableClientHandlingTimeHistogram()
+
+	EnableClientMsgSizeSentBytesHistogram()
+
+	EnableClientMsgSizeReceivedBytesHistogram()
+
+	s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
+
+	// This is the point where we hook up the interceptor
+	s.server = grpc.NewServer()
+	pb_testproto.RegisterTestServiceServer(s.server, &testService{t: s.T()})
+
+	go func() {
+		s.server.Serve(s.serverListener)
+	}()
+
+	s.clientConn, err = grpc.Dial(
+		s.serverListener.Addr().String(),
+		grpc.WithInsecure(),
+		grpc.WithBlock(),
+		grpc.WithStatsHandler(ClientStatsHandler),
+		grpc.WithTimeout(2*time.Second))
+	require.NoError(s.T(), err, "must not error on client Dial")
+	s.testClient = pb_testproto.NewTestServiceClient(s.clientConn)
+}
diff --git a/go.mod b/go.mod
index c49d110..b0540df 100644
--- a/go.mod
+++ b/go.mod
@@ -8,3 +8,5 @@ require (
 	golang.org/x/net v0.0.0-20190213061140-3a22650c66bd
 	google.golang.org/grpc v1.18.0
 )
+
+go 1.13
diff --git a/go.sum b/go.sum
index 485e90a..11d9de6 100644
--- a/go.sum
+++ b/go.sum
@@ -16,6 +16,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
 github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
+github.com/prometheus/client_golang v1.5.1 h1:bdHYieyGlH+6OLEk2YQha8THib30KP0/yD0YH9m6xcA=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
diff --git a/server.go b/server.go
index 322f990..4c7ef9f 100644
--- a/server.go
+++ b/server.go
@@ -21,6 +21,9 @@ var (
 
 	// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
 	StreamServerInterceptor = DefaultServerMetrics.StreamServerInterceptor()
+
+	// ServerStatsHandler is a gRPC server-side stats.Handler that provides Prometheus monitoring
+	ServerStatsHandler = DefaultServerMetrics.NewServerStatsHandler()
 )
 
 func init() {
@@ -46,3 +49,21 @@ func EnableHandlingTimeHistogram(opts ...HistogramOption) {
 	DefaultServerMetrics.EnableHandlingTimeHistogram(opts...)
 	prom.Register(DefaultServerMetrics.serverHandledHistogram)
 }
+
+// EnableServerMsgSizeReceivedBytesHistogram turns on recording of handling time
+// of RPCs. Histogram metrics can be very expensive for Prometheus
+// to retain and query. This function acts on the DefaultServerMetrics
+// variable and the default Prometheus metrics registry.
+func EnableServerMsgSizeReceivedBytesHistogram(opts ...HistogramOption) {
+	DefaultServerMetrics.EnableMsgSizeReceivedBytesHistogram(opts...)
+	prom.Register(DefaultServerMetrics.serverMsgSizeReceivedHistogram)
+}
+
+// EnableServerMsgSizeSentBytesHistogram turns on recording of handling time
+// of RPCs. Histogram metrics can be very expensive for Prometheus
+// to retain and query. This function acts on the DefaultServerMetrics
+// variable and the default Prometheus metrics registry.
+func EnableServerMsgSizeSentBytesHistogram(opts ...HistogramOption) {
+	DefaultServerMetrics.EnableMsgSizeSentBytesHistogram(opts...)
+	prom.Register(DefaultServerMetrics.serverMsgSizeSentHistogram)
+}
diff --git a/server_metrics.go b/server_metrics.go
index d28a46e..cdbdeb1 100644
--- a/server_metrics.go
+++ b/server_metrics.go
@@ -2,22 +2,30 @@ package grpc_prometheus
 
 import (
 	"context"
+
 	"github.com/grpc-ecosystem/go-grpc-prometheus/packages/grpcstatus"
 	prom "github.com/prometheus/client_golang/prometheus"
 
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/stats"
 )
 
 // ServerMetrics represents a collection of metrics to be registered on a
 // Prometheus metrics registry for a gRPC server.
 type ServerMetrics struct {
-	serverStartedCounter          *prom.CounterVec
-	serverHandledCounter          *prom.CounterVec
-	serverStreamMsgReceived       *prom.CounterVec
-	serverStreamMsgSent           *prom.CounterVec
-	serverHandledHistogramEnabled bool
-	serverHandledHistogramOpts    prom.HistogramOpts
-	serverHandledHistogram        *prom.HistogramVec
+	serverStartedCounter                  *prom.CounterVec
+	serverHandledCounter                  *prom.CounterVec
+	serverStreamMsgReceived               *prom.CounterVec
+	serverStreamMsgSent                   *prom.CounterVec
+	serverHandledHistogramEnabled         bool
+	serverHandledHistogramOpts            prom.HistogramOpts
+	serverHandledHistogram                *prom.HistogramVec
+	serverMsgSizeReceivedHistogramEnabled bool
+	serverMsgSizeReceivedHistogramOpts    prom.HistogramOpts
+	serverMsgSizeReceivedHistogram        *prom.HistogramVec
+	serverMsgSizeSentHistogramEnabled     bool
+	serverMsgSizeSentHistogramOpts        prom.HistogramOpts
+	serverMsgSizeSentHistogram            *prom.HistogramVec
 }
 
 // NewServerMetrics returns a ServerMetrics object. Use a new instance of
@@ -53,8 +61,54 @@ func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
 			Help:    "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
 			Buckets: prom.DefBuckets,
 		},
-		serverHandledHistogram: nil,
+		serverHandledHistogram:                nil,
+		serverMsgSizeReceivedHistogramEnabled: false,
+		serverMsgSizeReceivedHistogramOpts: prom.HistogramOpts{
+			Name:    "grpc_server_msg_size_received_bytes",
+			Help:    "Histogram of message sizes received by the server.",
+			Buckets: defMsgBytesBuckets,
+		},
+		serverMsgSizeReceivedHistogram:    nil,
+		serverMsgSizeSentHistogramEnabled: false,
+		serverMsgSizeSentHistogramOpts: prom.HistogramOpts{
+			Name:    "grpc_server_msg_size_sent_bytes",
+			Help:    "Histogram of message sizes sent by the server.",
+			Buckets: defMsgBytesBuckets,
+		},
+		serverMsgSizeSentHistogram: nil,
+	}
+}
+
+// EnableMsgSizeReceivedBytesHistogram turns on recording of received message size of RPCs.
+// Histogram metrics can be very expensive for Prometheus to retain and query. It takes
+// options to configure histogram options such as the defined buckets.
+func (m *ServerMetrics) EnableMsgSizeReceivedBytesHistogram(opts ...HistogramOption) {
+	for _, o := range opts {
+		o(&m.serverMsgSizeReceivedHistogramOpts)
+	}
+	if !m.serverMsgSizeReceivedHistogramEnabled {
+		m.serverMsgSizeReceivedHistogram = prom.NewHistogramVec(
+			m.serverMsgSizeReceivedHistogramOpts,
+			[]string{"grpc_service", "grpc_method", "grpc_stats"},
+		)
+	}
+	m.serverMsgSizeReceivedHistogramEnabled = true
+}
+
+// EnableMsgSizeSentBytesHistogram turns on recording of sent message size of RPCs.
+// Histogram metrics can be very expensive for Prometheus to retain and query. It takes
+// options to configure histogram options such as the defined buckets.
+func (m *ServerMetrics) EnableMsgSizeSentBytesHistogram(opts ...HistogramOption) {
+	for _, o := range opts {
+		o(&m.serverMsgSizeSentHistogramOpts)
 	}
+	if !m.serverMsgSizeSentHistogramEnabled {
+		m.serverMsgSizeSentHistogram = prom.NewHistogramVec(
+			m.serverMsgSizeSentHistogramOpts,
+			[]string{"grpc_service", "grpc_method", "grpc_stats"},
+		)
+	}
+	m.serverMsgSizeSentHistogramEnabled = true
 }
 
 // EnableHandlingTimeHistogram enables histograms being registered when
@@ -85,6 +139,12 @@ func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
 	if m.serverHandledHistogramEnabled {
 		m.serverHandledHistogram.Describe(ch)
 	}
+	if m.serverMsgSizeReceivedHistogramEnabled {
+		m.serverMsgSizeReceivedHistogram.Describe(ch)
+	}
+	if m.serverMsgSizeSentHistogramEnabled {
+		m.serverMsgSizeSentHistogram.Describe(ch)
+	}
 }
 
 // Collect is called by the Prometheus registry when collecting
@@ -98,6 +158,12 @@ func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
 	if m.serverHandledHistogramEnabled {
 		m.serverHandledHistogram.Collect(ch)
 	}
+	if m.serverMsgSizeReceivedHistogramEnabled {
+		m.serverMsgSizeReceivedHistogram.Collect(ch)
+	}
+	if m.serverMsgSizeSentHistogramEnabled {
+		m.serverMsgSizeSentHistogram.Collect(ch)
+	}
 }
 
 // UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
@@ -126,6 +192,13 @@ func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.
 	}
 }
 
+// NewServerStatsHandler is a gRPC server-side stats.Handler that providers Prometheus monitoring for RPCs.
+func (m *ServerMetrics) NewServerStatsHandler() stats.Handler {
+	return &serverStatsHandler{
+		serverMetrics: m,
+	}
+}
+
 // InitializeMetrics initializes all metrics, with their appropriate null
 // value, for all gRPC methods registered on a gRPC server. This is useful, to
 // ensure that all metrics exist when collecting and querying.
@@ -180,6 +253,16 @@ func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.M
 	if metrics.serverHandledHistogramEnabled {
 		metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
 	}
+	if metrics.serverMsgSizeReceivedHistogramEnabled {
+		for _, stats := range allStatss {
+			metrics.serverMsgSizeReceivedHistogram.GetMetricWithLabelValues(serviceName, methodName, stats.String())
+		}
+	}
+	if metrics.serverMsgSizeSentHistogramEnabled {
+		for _, stats := range allStatss {
+			metrics.serverMsgSizeSentHistogram.GetMetricWithLabelValues(serviceName, methodName, stats.String())
+		}
+	}
 	for _, code := range allCodes {
 		metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
 	}
diff --git a/server_reporter.go b/server_reporter.go
index aa9db54..c5a9318 100644
--- a/server_reporter.go
+++ b/server_reporter.go
@@ -30,14 +30,48 @@ func newServerReporter(m *ServerMetrics, rpcType grpcType, fullMethod string) *s
 	return r
 }
 
+func newServerReporterForStatsHanlder(startTime time.Time, m *ServerMetrics, fullMethod string) *serverReporter {
+	r := &serverReporter{
+		metrics:   m,
+		rpcType:   Unary,
+		startTime: startTime,
+	}
+	r.serviceName, r.methodName = splitMethodName(fullMethod)
+	return r
+}
+
+func (r *serverReporter) StartedConn() {
+	r.metrics.serverStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
+}
+
 func (r *serverReporter) ReceivedMessage() {
 	r.metrics.serverStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
 }
 
+// ReceivedMessageSize counts the size of received messages on server-side
+func (r *serverReporter) ReceivedMessageSize(rpcStats grpcStats, size float64) {
+	if rpcStats == Payload {
+		r.ReceivedMessage()
+	}
+	if r.metrics.serverMsgSizeReceivedHistogramEnabled {
+		r.metrics.serverMsgSizeReceivedHistogram.WithLabelValues(r.serviceName, r.methodName, rpcStats.String()).Observe(size)
+	}
+}
+
 func (r *serverReporter) SentMessage() {
 	r.metrics.serverStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
 }
 
+// SentMessageSize counts the size of sent messages on server-side
+func (r *serverReporter) SentMessageSize(rpcStats grpcStats, size float64) {
+	if rpcStats == Payload {
+		r.SentMessage()
+	}
+	if r.metrics.serverMsgSizeSentHistogramEnabled {
+		r.metrics.serverMsgSizeSentHistogram.WithLabelValues(r.serviceName, r.methodName, rpcStats.String()).Observe(size)
+	}
+}
+
 func (r *serverReporter) Handled(code codes.Code) {
 	r.metrics.serverHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
 	if r.metrics.serverHandledHistogramEnabled {
diff --git a/server_stats_handler.go b/server_stats_handler.go
new file mode 100644
index 0000000..30bef7e
--- /dev/null
+++ b/server_stats_handler.go
@@ -0,0 +1,57 @@
+package grpc_prometheus
+
+import (
+	"context"
+
+	"google.golang.org/grpc/stats"
+	"google.golang.org/grpc/status"
+)
+
+type serverStatsHandler struct {
+	serverMetrics *ServerMetrics
+}
+
+// TagRPC implements the stats.Hanlder interface.
+func (h *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
+	rpcInfo := newRPCInfo(info.FullMethodName)
+	return context.WithValue(ctx, &rpcInfoKey, rpcInfo)
+}
+
+// HandleRPC implements the stats.Hanlder interface.
+func (h *serverStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
+	v, ok := ctx.Value(&rpcInfoKey).(*rpcInfo)
+	if !ok {
+		return
+	}
+	monitor := newServerReporterForStatsHanlder(v.startTime, h.serverMetrics, v.fullMethodName)
+	switch s := s.(type) {
+	case *stats.Begin:
+		v.startTime = s.BeginTime
+		monitor.StartedConn()
+	case *stats.End:
+		monitor.Handled(status.Code(s.Error))
+	case *stats.InHeader:
+		monitor.ReceivedMessageSize(Header, float64(s.WireLength))
+	case *stats.InPayload:
+		// TODO: remove the +5 offset on wire length here, which is a temporary stand-in for the missing grpc framing offset
+		//  See: https://github.com/grpc/grpc-go/issues/1647
+		monitor.ReceivedMessageSize(Payload, float64(s.WireLength+5))
+	case *stats.InTrailer:
+		monitor.ReceivedMessageSize(Tailer, float64(s.WireLength))
+	case *stats.OutHeader:
+		// TODO: Add the sent header message size stats, if the wire length of the send header is provided
+	case *stats.OutPayload:
+		monitor.SentMessageSize(Payload, float64(s.WireLength))
+	case *stats.OutTrailer:
+		monitor.SentMessageSize(Tailer, float64(s.WireLength))
+	}
+}
+
+// TagConn implements the stats.Hanlder interface.
+func (h *serverStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
+	return ctx
+}
+
+// HandleConn implements the stats.Hanlder interface.
+func (h *serverStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {
+}
diff --git a/server_stats_handler_test.go b/server_stats_handler_test.go
new file mode 100644
index 0000000..37035db
--- /dev/null
+++ b/server_stats_handler_test.go
@@ -0,0 +1,161 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_prometheus
+
+import (
+	"context"
+	"io"
+	"net"
+	"testing"
+	"time"
+
+	pb_testproto "github.com/grpc-ecosystem/go-grpc-prometheus/examples/testproto"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+)
+
+func TestServerStatsHandlerSuite(t *testing.T) {
+	suite.Run(t, &ServerStatsHandlerTestSuite{})
+}
+
+type ServerStatsHandlerTestSuite struct {
+	suite.Suite
+
+	serverListener net.Listener
+	server         *grpc.Server
+	clientConn     *grpc.ClientConn
+	testClient     pb_testproto.TestServiceClient
+	ctx            context.Context
+	cancel         context.CancelFunc
+}
+
+func (s *ServerStatsHandlerTestSuite) SetupSuite() {
+	var err error
+
+	EnableHandlingTimeHistogram()
+
+	EnableServerMsgSizeReceivedBytesHistogram()
+
+	EnableServerMsgSizeSentBytesHistogram()
+
+	s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
+
+	// This is the point where we hook up the interceptor
+	s.server = grpc.NewServer(
+		grpc.StatsHandler(ServerStatsHandler),
+	)
+	pb_testproto.RegisterTestServiceServer(s.server, &testService{t: s.T()})
+
+	go func() {
+		s.server.Serve(s.serverListener)
+	}()
+
+	s.clientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second))
+	require.NoError(s.T(), err, "must not error on client Dial")
+	s.testClient = pb_testproto.NewTestServiceClient(s.clientConn)
+}
+
+func (s *ServerStatsHandlerTestSuite) SetupTest() {
+	// Make all RPC calls last at most 2 sec, meaning all async issues or deadlock will not kill tests.
+	s.ctx, s.cancel = context.WithTimeout(context.TODO(), 2*time.Second)
+
+	// Make sure every test starts with same fresh, intialized metric state.
+	DefaultServerMetrics.serverStartedCounter.Reset()
+	DefaultServerMetrics.serverHandledCounter.Reset()
+	DefaultServerMetrics.serverHandledHistogram.Reset()
+	DefaultServerMetrics.serverStreamMsgReceived.Reset()
+	DefaultServerMetrics.serverStreamMsgSent.Reset()
+	DefaultServerMetrics.serverMsgSizeReceivedHistogram.Reset()
+	DefaultServerMetrics.serverMsgSizeSentHistogram.Reset()
+	Register(s.server)
+}
+
+func (s *ServerStatsHandlerTestSuite) TearDownSuite() {
+	if s.serverListener != nil {
+		s.server.Stop()
+		s.T().Logf("stopped grpc.Server at: %v", s.serverListener.Addr().String())
+		s.serverListener.Close()
+
+	}
+	if s.clientConn != nil {
+		s.clientConn.Close()
+	}
+}
+
+func (s *ServerStatsHandlerTestSuite) TearDownTest() {
+	s.cancel()
+}
+
+func (s *ServerStatsHandlerTestSuite) TestUnaryIncrementsMetrics() {
+	_, err := s.testClient.PingEmpty(s.ctx, &pb_testproto.Empty{}) // should return with code=OK
+	require.NoError(s.T(), err)
+	requireValue(s.T(), 1, DefaultServerMetrics.serverStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty"))
+	requireValue(s.T(), 1, DefaultServerMetrics.serverHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty", "OK"))
+	requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingEmpty"))
+
+	requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Header.String()))
+	requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Payload.String()))
+	requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Payload.String()))
+	requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Tailer.String()))
+
+	_, err = s.testClient.PingError(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
+	require.Error(s.T(), err)
+	requireValue(s.T(), 1, DefaultServerMetrics.serverStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError"))
+	requireValue(s.T(), 1, DefaultServerMetrics.serverHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError", "FailedPrecondition"))
+	requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingError"))
+
+	requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Header.String()))
+	requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Payload.String()))
+	requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingEmpty", Tailer.String()))
+
+}
+
+func (s *ServerStatsHandlerTestSuite) TestStreamingIncrementsMetrics() {
+	ss, _ := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{}) // should return with code=OK
+	// Do a read, just for kicks.
+	count := 0
+	for {
+		_, err := ss.Recv()
+		if err == io.EOF {
+			break
+		}
+		require.NoError(s.T(), err, "reading pingList shouldn't fail")
+		count++
+	}
+	require.EqualValues(s.T(), countListResponses, count, "Number of received msg on the wire must match")
+
+	requireValueWithRetry(s.ctx, s.T(), 1,
+		DefaultServerMetrics.serverStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList"))
+	requireValueWithRetry(s.ctx, s.T(), 1,
+		DefaultServerMetrics.serverHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList", "OK"))
+	requireValueWithRetry(s.ctx, s.T(), countListResponses,
+		DefaultServerMetrics.serverStreamMsgSent.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList"))
+	requireValueWithRetry(s.ctx, s.T(), 1,
+		DefaultServerMetrics.serverStreamMsgReceived.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList"))
+	requireValueHistCount(s.T(), 1,
+		DefaultServerMetrics.serverHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList"))
+
+	requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Header.String()))
+	requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Payload.String()))
+	requireValueHistCount(s.T(), countListResponses, DefaultServerMetrics.serverMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Payload.String()))
+	requireValueHistCount(s.T(), 1, DefaultServerMetrics.serverMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Tailer.String()))
+
+	_, err := s.testClient.PingList(s.ctx, &pb_testproto.PingRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
+	require.NoError(s.T(), err, "PingList must not fail immediately")
+
+	requireValueWithRetry(s.ctx, s.T(), 2,
+		DefaultServerMetrics.serverStartedCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList"))
+	requireValueWithRetry(s.ctx, s.T(), 1,
+		DefaultServerMetrics.serverHandledCounter.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList", "FailedPrecondition"))
+	requireValueHistCount(s.T(), 2,
+		DefaultServerMetrics.serverHandledHistogram.WithLabelValues("unary", "mwitkow.testproto.TestService", "PingList"))
+
+	requireValueHistCount(s.T(), 2, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Header.String()))
+	requireValueHistCount(s.T(), 2, DefaultServerMetrics.serverMsgSizeReceivedHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Payload.String()))
+	requireValueHistCount(s.T(), 2, DefaultServerMetrics.serverMsgSizeSentHistogram.WithLabelValues("mwitkow.testproto.TestService", "PingList", Tailer.String()))
+
+}
diff --git a/util.go b/util.go
index 7987de3..41c86cf 100644
--- a/util.go
+++ b/util.go
@@ -5,6 +5,7 @@ package grpc_prometheus
 
 import (
 	"strings"
+	"time"
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -26,6 +27,12 @@ var (
 		codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.Unimplemented, codes.Internal,
 		codes.Unavailable, codes.DataLoss,
 	}
+
+	allStatss = []grpcStats{Header, Payload, Tailer}
+
+	rpcInfoKey = "rpc-info"
+
+	defMsgBytesBuckets = []float64{0, 32, 64, 128, 256, 512, 1024, 2048, 8192, 32768, 131072, 524288}
 )
 
 func splitMethodName(fullMethodName string) (string, string) {
@@ -48,3 +55,30 @@ func typeFromMethodInfo(mInfo *grpc.MethodInfo) grpcType {
 	}
 	return BidiStream
 }
+
+type grpcStats string
+
+const (
+	// Header indicates that the stats is the header
+	Header grpcStats = "header"
+
+	// Payload indicates that the stats is the Payload
+	Payload grpcStats = "payload"
+
+	// Tailer indicates that the stats is the Payload
+	Tailer grpcStats = "tailer"
+)
+
+// String function returns the grpcStats with string format.
+func (s grpcStats) String() string {
+	return string(s)
+}
+
+type rpcInfo struct {
+	fullMethodName string
+	startTime      time.Time
+}
+
+func newRPCInfo(fullMethodName string) *rpcInfo {
+	return &rpcInfo{fullMethodName: fullMethodName}
+}