Skip to content

Commit

Permalink
Initial support for Per Message TTL
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Jan 9, 2025
1 parent e4ccd32 commit 2ca079e
Show file tree
Hide file tree
Showing 17 changed files with 189 additions and 0 deletions.
2 changes: 2 additions & 0 deletions api/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
JSMetaCurrentServerLevel = "_nats.level"
JSMetaCurrentServerVersion = "_nats.ver"
JsMetaRequiredServerLevel = "_nats.req.level"
JSMessageTTL = "Nats-TTL"
JSMessageNoExpire = "Nats-No-Expire"
)

// Responses to requests sent to a server from a client.
Expand Down
5 changes: 5 additions & 0 deletions api/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,11 @@ type StreamConfig struct {
FirstSeq uint64 `json:"first_seq,omitempty" yaml:"first_seq"`
// Metadata is additional metadata for the Consumer.
Metadata map[string]string `json:"metadata,omitempty" yaml:"metadata"`
// AllowMsgTTL allows header initiated per-message TTLs. If disabled,
// then the `NATS-TTL` header will be ignored.
AllowMsgTTL bool `json:"allow_msg_ttl,omitempty" yaml:"allow_msg_ttl"`
// LimitsTTL activates writing of messages when limits are applied with a specific TTL.
LimitsTTL time.Duration `json:"limits_ttl,omitempty" yaml:"limits_ttl"`
// The following defaults will apply to consumers when created against
// this stream, unless overridden manually. They also represent the maximum values that
// these properties may have
Expand Down
9 changes: 9 additions & 0 deletions schema_source/jetstream/api/v1/definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,15 @@
"consumer_limits": {
"description": "Limits of certain values that consumers can set, defaults for those who don't set these settings",
"$ref": "#/definitions/stream_consumer_limits"
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$ref": "#/definitions/golang_duration_nanos"
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_create_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_info_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_list_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_restore_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_snapshot_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_template_configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_template_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_template_create_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_template_info_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
},
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_update_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions schemas/jetstream/api/v1/stream_update_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,18 @@
"minimum": -9223372036854775807
}
}
},
"allow_msg_ttl": {
"description": "Enables per-message TTL using headers",
"type": "boolean",
"default": false
},
"limits_ttl": {
"description": "Writes messages into the stream when certain limits are applied, using this ttl",
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
"maximum": 9223372036854775807,
"minimum": -9223372036854775807
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,21 @@ func FirstSequence(seq uint64) StreamOption {
}
}

func AllowMsgTTL() StreamOption {
return func(o *api.StreamConfig) error {
o.AllowMsgTTL = true
return nil
}
}

func LimitsAppliedTTL(ttl time.Duration) StreamOption {
return func(o *api.StreamConfig) error {
o.AllowMsgTTL = true
o.LimitsTTL = ttl
return nil
}
}

func SubjectTransform(subjectTransform *api.SubjectTransformConfig) StreamOption {
return func(o *api.StreamConfig) error {
o.SubjectTransform = subjectTransform
Expand Down Expand Up @@ -1116,4 +1131,6 @@ func (s *Stream) IsRepublishing() bool { return s.Republish(
func (s *Stream) Metadata() map[string]string { return s.cfg.Metadata }
func (s *Stream) Compression() api.Compression { return s.cfg.Compression }
func (s *Stream) FirstSequence() uint64 { return s.cfg.FirstSeq }
func (s *Stream) AllowMsgTTL() bool { return s.cfg.AllowMsgTTL }
func (s *Stream) LimitsTTL() time.Duration { return s.cfg.LimitsTTL }
func (s *Stream) ConsumerLimits() api.StreamConsumerLimits { return s.cfg.ConsumerLimits }

0 comments on commit 2ca079e

Please sign in to comment.