Skip to content

Commit

Permalink
Merge pull request #522 from ripienaar/consumer_pause
Browse files Browse the repository at this point in the history
Adds the ability to pause and resume consumers
  • Loading branch information
ripienaar authored Feb 19, 2024
2 parents 8a115db + 44bca0a commit c1d05e6
Show file tree
Hide file tree
Showing 22 changed files with 498 additions and 22 deletions.
19 changes: 19 additions & 0 deletions api/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
JSApiRequestNext = "$JS.API.CONSUMER.MSG.NEXT.*.*"
JSApiConsumerLeaderStepDownT = "$JS.API.CONSUMER.LEADER.STEPDOWN.%s.%s"
JSApiconsumerPauseT = "$JS.API.CONSUMER.PAUSE.%s.%s"
JSMetricConsumerAckPre = JSMetricPrefix + ".CONSUMER.ACK"
JSAdvisoryConsumerMaxDeliveryExceedPre = JSAdvisoryPrefix + ".CONSUMER.MAX_DELIVERIES"
)
Expand Down Expand Up @@ -150,6 +151,19 @@ type JSApiConsumerLeaderStepDownResponse struct {
Success bool `json:"success,omitempty"`
}

// io.nats.jetstream.api.v1.consumer_pause_request
type JSApiConsumerPauseRequest struct {
PauseUntil time.Time `json:"pause_until,omitempty"`
}

// io.nats.jetstream.api.v1.consumer_pause_response
type JSApiConsumerPauseResponse struct {
JSApiResponse
Paused bool `json:"paused"`
PauseUntil time.Time `json:"pause_until"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
}

type AckPolicy int

const (
Expand Down Expand Up @@ -352,6 +366,9 @@ type ConsumerConfig struct {
// Metadata is additional metadata for the Consumer.
Metadata map[string]string `json:"metadata,omitempty"`

// PauseUntil is for suspending the consumer until the deadline.
PauseUntil time.Time `json:"pause_until,omitempty"`

// Don't add to general clients.
Direct bool `json:"direct,omitempty"`
}
Expand All @@ -377,6 +394,8 @@ type ConsumerInfo struct {
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
Paused bool `json:"paused,omitempty"`
PauseRemaining time.Duration `json:"pause_remaining,omitempty"`
TimeStamp time.Time `json:"ts"`
}

Expand Down
3 changes: 3 additions & 0 deletions api/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func main() {
&schema{P: "jetstream/advisory/v1/terminated.json", St: "jsadvisory.JSConsumerDeliveryTerminatedAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/stream_action.json", St: "jsadvisory.JSStreamActionAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/consumer_action.json", St: "jsadvisory.JSConsumerActionAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/consumer_pause.json", St: "jsadvisory.JSConsumerPauseAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/snapshot_create.json", St: "jsadvisory.JSSnapshotCreateAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/snapshot_complete.json", St: "jsadvisory.JSSnapshotCompleteAdvisoryV1"},
&schema{P: "jetstream/advisory/v1/restore_create.json", St: "jsadvisory.JSRestoreCreateAdvisoryV1"},
Expand All @@ -223,6 +224,8 @@ func main() {
&schema{P: "jetstream/api/v1/consumer_names_response.json", St: "JSApiConsumerNamesResponse"},
&schema{P: "jetstream/api/v1/consumer_getnext_request.json", St: "JSApiConsumerGetNextRequest"},
&schema{P: "jetstream/api/v1/consumer_leader_stepdown_response.json", St: "JSApiConsumerLeaderStepDownResponse"},
&schema{P: "jetstream/api/v1/consumer_pause_request.json", St: "JSApiConsumerPauseRequest"},
&schema{P: "jetstream/api/v1/consumer_pause_response.json", St: "JSApiConsumerPauseResponse"},
&schema{P: "jetstream/api/v1/stream_create_request.json", St: "JSApiStreamCreateRequest"},
&schema{P: "jetstream/api/v1/stream_create_response.json", St: "JSApiStreamCreateResponse"},
&schema{P: "jetstream/api/v1/stream_delete_response.json", St: "JSApiStreamDeleteResponse"},
Expand Down
42 changes: 42 additions & 0 deletions api/jetstream/advisory/consumer_pause.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package advisory

import (
"time"

"github.com/nats-io/jsm.go/api/event"
)

// JSConsumerPauseAdvisoryV1 indicates that a consumer was paused or unpaused
type JSConsumerPauseAdvisoryV1 struct {
event.NATSEvent

Stream string `json:"stream"`
Consumer string `json:"consumer"`
Paused bool `json:"paused"`
PauseUntil time.Time `json:"pause_until,omitempty"`
Domain string `json:"domain,omitempty"`
}

func init() {
err := event.RegisterTextCompactTemplate("io.nats.jetstream.advisory.v1.consumer_pause", `{{ .Time | ShortTime }} [Consumer Pause] Consumer: {{ .Stream }} > {{ .Consumer }} Paused: {{ .Paused }}{{ if .Paused }} until {{ .PauseUntil }}{{ end }}`)
if err != nil {
panic(err)
}

err = event.RegisterTextExtendedTemplate("io.nats.jetstream.advisory.v1.consumer_pause", `
[{{ .Time | ShortTime }}] [{{ .ID }}] Consumer Pause
Stream: {{ .Stream }}
Consumer: {{ .Consumer }}
Paused: {{ .Paused }}
{{- if .Paused }}
Until: {{ .PauseUntil }}
{{- end }}
{{- if .Domain }}
Domain: {{ .Domain }}
{{- end }}
`)
if err != nil {
panic(err)
}
}
61 changes: 60 additions & 1 deletion api/schemas_generated.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// auto generated 2024-02-08 10:37:12.960275 +0100 CET m=+0.008558457
// auto generated 2024-02-13 15:58:31.227879 +0100 CET m=+0.013734209

package api

Expand All @@ -22,6 +22,7 @@ var schemaTypes = map[string]func() any{
"io.nats.jetstream.advisory.v1.terminated": func() any { return &jsadvisory.JSConsumerDeliveryTerminatedAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.stream_action": func() any { return &jsadvisory.JSStreamActionAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.consumer_action": func() any { return &jsadvisory.JSConsumerActionAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.consumer_pause": func() any { return &jsadvisory.JSConsumerPauseAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.snapshot_create": func() any { return &jsadvisory.JSSnapshotCreateAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.snapshot_complete": func() any { return &jsadvisory.JSSnapshotCompleteAdvisoryV1{} },
"io.nats.jetstream.advisory.v1.restore_create": func() any { return &jsadvisory.JSRestoreCreateAdvisoryV1{} },
Expand All @@ -48,6 +49,8 @@ var schemaTypes = map[string]func() any{
"io.nats.jetstream.api.v1.consumer_names_response": func() any { return &JSApiConsumerNamesResponse{} },
"io.nats.jetstream.api.v1.consumer_getnext_request": func() any { return &JSApiConsumerGetNextRequest{} },
"io.nats.jetstream.api.v1.consumer_leader_stepdown_response": func() any { return &JSApiConsumerLeaderStepDownResponse{} },
"io.nats.jetstream.api.v1.consumer_pause_request": func() any { return &JSApiConsumerPauseRequest{} },
"io.nats.jetstream.api.v1.consumer_pause_response": func() any { return &JSApiConsumerPauseResponse{} },
"io.nats.jetstream.api.v1.stream_create_request": func() any { return &JSApiStreamCreateRequest{} },
"io.nats.jetstream.api.v1.stream_create_response": func() any { return &JSApiStreamCreateResponse{} },
"io.nats.jetstream.api.v1.stream_delete_response": func() any { return &JSApiStreamDeleteResponse{} },
Expand Down Expand Up @@ -480,6 +483,62 @@ func (t JSApiConsumerLeaderStepDownResponse) Schema() ([]byte, error) {
return scfs.Load(f)
}

// Validate performs a JSON Schema validation of the configuration
func (t JSApiConsumerPauseRequest) Validate(v ...StructValidator) (valid bool, errors []string) {
if len(v) == 0 || v[0] == nil {
return true, nil
}

return v[0].ValidateStruct(t, t.SchemaType())
}

// SchemaType is the NATS schema type io.nats.jetstream.api.v1.consumer_pause_request
func (t JSApiConsumerPauseRequest) SchemaType() string {
return "io.nats.jetstream.api.v1.consumer_pause_request"
}

// SchemaID is the url to the JSON Schema for JetStream Consumer Configuration
func (t JSApiConsumerPauseRequest) SchemaID() string {
return "https://raw.githubusercontent.com/nats-io/jsm.go/master/schemas/jetstream/api/v1/consumer_pause_request.json"
}

// Schema is a JSON Schema document for the JetStream Consumer Configuration
func (t JSApiConsumerPauseRequest) Schema() ([]byte, error) {
f, err := SchemaFileForType(t.SchemaType())
if err != nil {
return nil, err
}
return scfs.Load(f)
}

// Validate performs a JSON Schema validation of the configuration
func (t JSApiConsumerPauseResponse) Validate(v ...StructValidator) (valid bool, errors []string) {
if len(v) == 0 || v[0] == nil {
return true, nil
}

return v[0].ValidateStruct(t, t.SchemaType())
}

// SchemaType is the NATS schema type io.nats.jetstream.api.v1.consumer_pause_response
func (t JSApiConsumerPauseResponse) SchemaType() string {
return "io.nats.jetstream.api.v1.consumer_pause_response"
}

// SchemaID is the url to the JSON Schema for JetStream Consumer Configuration
func (t JSApiConsumerPauseResponse) SchemaID() string {
return "https://raw.githubusercontent.com/nats-io/jsm.go/master/schemas/jetstream/api/v1/consumer_pause_response.json"
}

// Schema is a JSON Schema document for the JetStream Consumer Configuration
func (t JSApiConsumerPauseResponse) Schema() ([]byte, error) {
f, err := SchemaFileForType(t.SchemaType())
if err != nil {
return nil, err
}
return scfs.Load(f)
}

// Validate performs a JSON Schema validation of the configuration
func (t JSApiStreamCreateRequest) Validate(v ...StructValidator) (valid bool, errors []string) {
if len(v) == 0 || v[0] == nil {
Expand Down
45 changes: 45 additions & 0 deletions consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,13 @@ func ConsumerMetadata(meta map[string]string) ConsumerOption {
}
}

func PauseUntil(deadline time.Time) ConsumerOption {
return func(o *api.ConsumerConfig) error {
o.PauseUntil = deadline
return nil
}
}

// UpdateConfiguration updates the consumer configuration
// At present the description, ack wait, max deliver, sample frequency, max ack pending, max waiting and header only settings can be changed
func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error {
Expand Down Expand Up @@ -888,6 +895,43 @@ func (c *Consumer) LeaderStepDown() error {
return nil
}

// Pause requests a consumer be paused until the deadline, if it fails to pause an error is returned.
//
// A common reason for failures is when a time is supplied that is in the past from the perspective of the server
func (c *Consumer) Pause(deadline time.Time) (*api.JSApiConsumerPauseResponse, error) {
var resp *api.JSApiConsumerPauseResponse
req := api.JSApiConsumerPauseRequest{
PauseUntil: deadline,
}

err := c.mgr.jsonRequest(fmt.Sprintf(api.JSApiconsumerPauseT, c.StreamName(), c.Name()), &req, &resp)
if err != nil {
return nil, err
}

if !resp.Paused {
return nil, fmt.Errorf("pause request failed, perhaps due to a time in the past")
}

return resp, nil
}

// Resume requests the server resumes a paused consumer
func (c *Consumer) Resume() error {
var resp *api.JSApiConsumerPauseResponse

err := c.mgr.jsonRequest(fmt.Sprintf(api.JSApiconsumerPauseT, c.StreamName(), c.Name()), nil, &resp)
if err != nil {
return err
}

if resp.Paused {
return fmt.Errorf("pause request failed for an unknown reason")
}

return nil
}

func (c *Consumer) Name() string { return c.name }
func (c *Consumer) IsSampled() bool { return c.SampleFrequency() != "" }
func (c *Consumer) IsPullMode() bool { return c.cfg.DeliverSubject == "" }
Expand Down Expand Up @@ -922,6 +966,7 @@ func (c *Consumer) InactiveThreshold() time.Duration { return c.cfg.InactiveThre
func (c *Consumer) Replicas() int { return c.cfg.Replicas }
func (c *Consumer) Metadata() map[string]string { return c.cfg.Metadata }
func (c *Consumer) MemoryStorage() bool { return c.cfg.MemoryStorage }
func (c *Consumer) PauseUntil() time.Time { return c.cfg.PauseUntil }
func (c *Consumer) StartTime() time.Time {
if c.cfg.OptStartTime == nil {
return time.Time{}
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/expr-lang/expr v1.15.8
github.com/google/go-cmp v0.6.0
github.com/klauspost/compress v1.17.5
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240207201315-f703123c4b88
github.com/nats-io/nats.go v1.32.0
github.com/klauspost/compress v1.17.6
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba
github.com/nats-io/nats.go v1.33.1
github.com/nats-io/nuid v1.0.1
golang.org/x/net v0.20.0
golang.org/x/text v0.14.0
Expand All @@ -18,10 +18,10 @@ require (
require (
github.com/kr/pretty v0.1.0 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
github.com/nats-io/jwt/v2 v2.5.4 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,34 @@ github.com/expr-lang/expr v1.15.8 h1:FL8+d3rSSP4tmK9o+vKfSMqqpGL8n15pEPiHcnBpxoI
github.com/expr-lang/expr v1.15.8/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/klauspost/compress v1.17.5 h1:d4vBd+7CHydUqpFBgUEKkSdtSugf9YFmSkvUYPquI5E=
github.com/klauspost/compress v1.17.5/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240207201315-f703123c4b88 h1:mQUXBh1zwlTogpLmb3F8wJC/OrJlgQ2j76LD1BVHp64=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240207201315-f703123c4b88/go.mod h1:/TE61Dos8NlwZnjzyE3ZlOnM6dgl7tf937dnf4VclrA=
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/jwt/v2 v2.5.4 h1:Bz+drKl2GbE30fxTOtb0NYl1BQ5RwZ+Zcqkg3mR5bbI=
github.com/nats-io/jwt/v2 v2.5.4/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba h1:idIfFiRzXv2wHFpxHH4nSoPtg7UVr4vQPB1NraU0D4c=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240217230419-4b3317b980ba/go.mod h1:Co2t9J1pk4WXyMZiNFkLjFiD7hKE/jjsXtDWCyLfcgw=
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
Expand Down
6 changes: 5 additions & 1 deletion natscontext/testdata/nats/context/user_pass_token_creds.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@
"jetstream_event_prefix": "",
"inbox_prefix": "",
"user_jwt": "",
"color_scheme": ""
"color_scheme": "",
"tls_first": false,
"windows_cert_store": "",
"windows_cert_match_by": "",
"windows_cert_match": ""
}
50 changes: 50 additions & 0 deletions schema_source/jetstream/advisory/v1/consumer_pause.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://nats.io/schemas/jetstream/advisory/v1/consumer_pause.json",
"description": "An Advisory sent when consumer is paused or resumed",
"title": "io.nats.jetstream.advisory.v1.consumer_pause",
"type": "object",
"required": [
"type",
"id",
"timestamp",
"stream",
"consumer",
"paused"
],
"additionalProperties": false,
"properties": {
"type": {
"type": "string",
"const": "io.nats.jetstream.advisory.v1.consumer_pause"
},
"id": {
"type": "string",
"description": "Unique correlation ID for this event"
},
"timestamp": {
"type": "string",
"description": "The time this event was created in RFC3339 format"
},
"stream": {
"type": "string",
"description": "The name of the Stream the Consumer belongs to"
},
"consumer": {
"type": "string",
"description": "The name of the Consumer that elected a new leader"
},
"paused": {
"type": "boolean",
"description": "Indicates the consumer is paused"
},
"pause_until": {
"description": "When paused the time the consumer will be unpaused, RFC3339 format",
"type": "string"
},
"domain": {
"description": "The domain hosting the Stream and Consumer if configured",
"type": "string"
}
}
}
Loading

0 comments on commit c1d05e6

Please sign in to comment.