diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 8ec422430..5039e5420 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -21,6 +21,7 @@ const ( frameTypeResponse int32 = 0 frameTypeError int32 = 1 frameTypeMessage int32 = 2 + frameTypeStats int32 = 3 ) var separatorBytes = []byte(" ") @@ -196,6 +197,8 @@ func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { return p.CLS(client, params) case bytes.Equal(params[0], []byte("AUTH")): return p.AUTH(client, params) + case bytes.Equal(params[0], []byte("STATS")): + return p.STATS(client, params) } return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) } @@ -701,6 +704,52 @@ func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) { return nil, nil } +func (p *protocolV2) STATS(client *clientV2, params [][]byte) ([]byte, error) { + var ( + state = atomic.LoadInt32(&client.State) + channel = client.Channel + ) + if state != stateSubscribed && state != stateClosing { + return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot STATS in current state") + } + + channel.inFlightMutex.Lock() + inflight := len(channel.inFlightMessages) + channel.inFlightMutex.Unlock() + channel.deferredMutex.Lock() + deferred := len(channel.deferredMessages) + channel.deferredMutex.Unlock() + resp, err := json.Marshal(struct { + ChannelName string `json:"channel_name"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + InFlightCount int `json:"in_flight_count"` + DeferredCount int `json:"deferred_count"` + MessageCount uint64 `json:"message_count"` + RequeueCount uint64 `json:"requeue_count"` + TimeoutCount uint64 `json:"timeout_count"` + Paused bool `json:"paused"` + }{ + ChannelName: channel.name, + Depth: channel.Depth(), + BackendDepth: channel.backend.Depth(), + InFlightCount: inflight, + DeferredCount: deferred, + MessageCount: atomic.LoadUint64(&channel.messageCount), + RequeueCount: atomic.LoadUint64(&channel.requeueCount), + TimeoutCount: atomic.LoadUint64(&channel.timeoutCount), + Paused: channel.IsPaused(), + }) + if err != nil { + return nil, protocol.NewFatalClientErr(err, "E_STATS_ERROR", "STATS error "+err.Error()) + } + + if err = p.Send(client, frameTypeStats, resp); err != nil { + return nil, protocol.NewFatalClientErr(err, "E_STATS_FAILED", "STATS failed "+err.Error()) + } + return nil, nil +} + func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error) { state := atomic.LoadInt32(&client.State) if state != stateSubscribed && state != stateClosing {