diff --git a/api/jetstream.go b/api/jetstream.go index eed215d..3da4617 100644 --- a/api/jetstream.go +++ b/api/jetstream.go @@ -32,6 +32,8 @@ const ( JSMetaCurrentServerLevel = "_nats.level" JSMetaCurrentServerVersion = "_nats.ver" JsMetaRequiredServerLevel = "_nats.req.level" + JSMessageTTL = "Nats-TTL" + JSMessageNoExpire = "Nats-No-Expire" ) // Responses to requests sent to a server from a client. diff --git a/api/streams.go b/api/streams.go index 824ee9c..cf8d5ab 100644 --- a/api/streams.go +++ b/api/streams.go @@ -580,6 +580,11 @@ type StreamConfig struct { FirstSeq uint64 `json:"first_seq,omitempty" yaml:"first_seq"` // Metadata is additional metadata for the Consumer. Metadata map[string]string `json:"metadata,omitempty" yaml:"metadata"` + // AllowMsgTTL allows header initiated per-message TTLs. If disabled, + // then the `NATS-TTL` header will be ignored. + AllowMsgTTL bool `json:"allow_msg_ttl,omitempty" yaml:"allow_msg_ttl"` + // LimitsTTL activates writing of messages when limits are applied with a specific TTL. + LimitsTTL time.Duration `json:"limits_ttl,omitempty" yaml:"limits_ttl"` // The following defaults will apply to consumers when created against // this stream, unless overridden manually. They also represent the maximum values that // these properties may have diff --git a/schema_source/jetstream/api/v1/definitions.json b/schema_source/jetstream/api/v1/definitions.json index 2c2c004..5396cf1 100644 --- a/schema_source/jetstream/api/v1/definitions.json +++ b/schema_source/jetstream/api/v1/definitions.json @@ -1204,6 +1204,15 @@ "consumer_limits": { "description": "Limits of certain values that consumers can set, defaults for those who don't set these settings", "$ref": "#/definitions/stream_consumer_limits" + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$ref": "#/definitions/golang_duration_nanos" } } }, diff --git a/schemas/jetstream/api/v1/stream_configuration.json b/schemas/jetstream/api/v1/stream_configuration.json index 19b6c02..fd9381d 100644 --- a/schemas/jetstream/api/v1/stream_configuration.json +++ b/schemas/jetstream/api/v1/stream_configuration.json @@ -428,6 +428,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } } diff --git a/schemas/jetstream/api/v1/stream_create_request.json b/schemas/jetstream/api/v1/stream_create_request.json index 9237b0e..f32f72f 100644 --- a/schemas/jetstream/api/v1/stream_create_request.json +++ b/schemas/jetstream/api/v1/stream_create_request.json @@ -431,6 +431,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } }, diff --git a/schemas/jetstream/api/v1/stream_create_response.json b/schemas/jetstream/api/v1/stream_create_response.json index b5c74bb..e023be1 100644 --- a/schemas/jetstream/api/v1/stream_create_response.json +++ b/schemas/jetstream/api/v1/stream_create_response.json @@ -446,6 +446,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } } diff --git a/schemas/jetstream/api/v1/stream_info_response.json b/schemas/jetstream/api/v1/stream_info_response.json index 66f8ef5..4f161a8 100644 --- a/schemas/jetstream/api/v1/stream_info_response.json +++ b/schemas/jetstream/api/v1/stream_info_response.json @@ -446,6 +446,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } } diff --git a/schemas/jetstream/api/v1/stream_list_response.json b/schemas/jetstream/api/v1/stream_list_response.json index cda413d..3904252 100644 --- a/schemas/jetstream/api/v1/stream_list_response.json +++ b/schemas/jetstream/api/v1/stream_list_response.json @@ -511,6 +511,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } } diff --git a/schemas/jetstream/api/v1/stream_restore_request.json b/schemas/jetstream/api/v1/stream_restore_request.json index c9afebb..7d0344d 100644 --- a/schemas/jetstream/api/v1/stream_restore_request.json +++ b/schemas/jetstream/api/v1/stream_restore_request.json @@ -435,6 +435,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } }, diff --git a/schemas/jetstream/api/v1/stream_snapshot_response.json b/schemas/jetstream/api/v1/stream_snapshot_response.json index 7123634..b7ace0a 100644 --- a/schemas/jetstream/api/v1/stream_snapshot_response.json +++ b/schemas/jetstream/api/v1/stream_snapshot_response.json @@ -470,6 +470,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } }, diff --git a/schemas/jetstream/api/v1/stream_template_configuration.json b/schemas/jetstream/api/v1/stream_template_configuration.json index d1a1488..1e06992 100644 --- a/schemas/jetstream/api/v1/stream_template_configuration.json +++ b/schemas/jetstream/api/v1/stream_template_configuration.json @@ -451,6 +451,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } } diff --git a/schemas/jetstream/api/v1/stream_template_create_request.json b/schemas/jetstream/api/v1/stream_template_create_request.json index cadc40b..695d43a 100644 --- a/schemas/jetstream/api/v1/stream_template_create_request.json +++ b/schemas/jetstream/api/v1/stream_template_create_request.json @@ -445,6 +445,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } }, diff --git a/schemas/jetstream/api/v1/stream_template_create_response.json b/schemas/jetstream/api/v1/stream_template_create_response.json index 85a5bce..97489c7 100644 --- a/schemas/jetstream/api/v1/stream_template_create_response.json +++ b/schemas/jetstream/api/v1/stream_template_create_response.json @@ -455,6 +455,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } }, diff --git a/schemas/jetstream/api/v1/stream_template_info_response.json b/schemas/jetstream/api/v1/stream_template_info_response.json index f670b82..9df1e93 100644 --- a/schemas/jetstream/api/v1/stream_template_info_response.json +++ b/schemas/jetstream/api/v1/stream_template_info_response.json @@ -455,6 +455,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } }, diff --git a/schemas/jetstream/api/v1/stream_update_request.json b/schemas/jetstream/api/v1/stream_update_request.json index c3d3af2..29fef6f 100644 --- a/schemas/jetstream/api/v1/stream_update_request.json +++ b/schemas/jetstream/api/v1/stream_update_request.json @@ -431,6 +431,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } } diff --git a/schemas/jetstream/api/v1/stream_update_response.json b/schemas/jetstream/api/v1/stream_update_response.json index b68e0b9..e6aaa14 100644 --- a/schemas/jetstream/api/v1/stream_update_response.json +++ b/schemas/jetstream/api/v1/stream_update_response.json @@ -446,6 +446,18 @@ "minimum": -9223372036854775807 } } + }, + "allow_msg_ttl": { + "description": "Enables per-message TTL using headers", + "type": "boolean", + "default": false + }, + "limits_ttl": { + "description": "Writes messages into the stream when certain limits are applied, using this ttl", + "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", + "type": "integer", + "maximum": 9223372036854775807, + "minimum": -9223372036854775807 } } } diff --git a/streams.go b/streams.go index 2313584..9928ec6 100644 --- a/streams.go +++ b/streams.go @@ -538,6 +538,21 @@ func FirstSequence(seq uint64) StreamOption { } } +func AllowMsgTTL() StreamOption { + return func(o *api.StreamConfig) error { + o.AllowMsgTTL = true + return nil + } +} + +func LimitsAppliedTTL(ttl time.Duration) StreamOption { + return func(o *api.StreamConfig) error { + o.AllowMsgTTL = true + o.LimitsTTL = ttl + return nil + } +} + func SubjectTransform(subjectTransform *api.SubjectTransformConfig) StreamOption { return func(o *api.StreamConfig) error { o.SubjectTransform = subjectTransform @@ -1116,4 +1131,6 @@ func (s *Stream) IsRepublishing() bool { return s.Republish( func (s *Stream) Metadata() map[string]string { return s.cfg.Metadata } func (s *Stream) Compression() api.Compression { return s.cfg.Compression } func (s *Stream) FirstSequence() uint64 { return s.cfg.FirstSeq } +func (s *Stream) AllowMsgTTL() bool { return s.cfg.AllowMsgTTL } +func (s *Stream) LimitsTTL() time.Duration { return s.cfg.LimitsTTL } func (s *Stream) ConsumerLimits() api.StreamConsumerLimits { return s.cfg.ConsumerLimits }