Skip to content

Commit

Permalink
Merge pull request #623 from ripienaar/msg_ttl_refine
Browse files Browse the repository at this point in the history
Further MSG TTL refinements
  • Loading branch information
ripienaar authored Jan 31, 2025
2 parents 62a6bf0 + 2acea85 commit 4df792d
Show file tree
Hide file tree
Showing 23 changed files with 64 additions and 104 deletions.
11 changes: 6 additions & 5 deletions api/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ type JSApiStreamListRequest struct {

// io.nats.jetstream.api.v1.stream_msg_delete_request
type JSApiMsgDeleteRequest struct {
Seq uint64 `json:"seq"`
NoErase bool `json:"no_erase,omitempty"`
Seq uint64 `json:"seq"`
NoErase bool `json:"no_erase,omitempty"`
NoMarker bool `json:"no_marker,omitempty"`
}

// io.nats.jetstream.api.v1.stream_msg_delete_response
Expand Down Expand Up @@ -171,6 +172,8 @@ type JSApiStreamPurgeRequest struct {
Subject string `json:"filter,omitempty"`
// Number of messages to keep.
Keep uint64 `json:"keep,omitempty"`
// Avoids purge markers
NoMarker bool `json:"no_marker,omitempty"`
}

// io.nats.jetstream.api.v1.stream_msg_get_response
Expand Down Expand Up @@ -583,9 +586,7 @@ type StreamConfig struct {
// AllowMsgTTL allows header initiated per-message TTLs. If disabled,
// then the `NATS-TTL` header will be ignored.
AllowMsgTTL bool `json:"allow_msg_ttl,omitempty" yaml:"allow_msg_ttl"`
// Enables placing markers in the stream for certain message delete operations
SubjectDeleteMarkers bool `json:"subject_delete_markers,omitempty" yaml:"subject_delete_markers"`
// When placing a marker, how long should it be valid, defaults to 15m
// SubjectDeleteMarkerTTL enables and sets a duration for adding server markers for delete, purge and max age limits
SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty" yaml:"subject_delete_marker_ttl"`
// The following defaults will apply to consumers when created against
// this stream, unless overridden manually. They also represent the maximum values that
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/jedib0t/go-pretty/v6 v6.6.5
github.com/klauspost/compress v1.17.11
github.com/nats-io/jwt/v2 v2.7.3
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250126014539-f2eb5650d200
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250130210036-406084551742
github.com/nats-io/nats.go v1.38.0
github.com/nats-io/nkeys v0.4.9
github.com/nats-io/nuid v1.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250122133427-2352ad9fd254 h1:3
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250122133427-2352ad9fd254/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250126014539-f2eb5650d200 h1:JTZQwrehqUHpEO+DRjm0734B4a0porO1Cb5ACT0nSJY=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250126014539-f2eb5650d200/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250130210036-406084551742 h1:UbqnZ2WOky1HufvUU+jRfrskdBkJlJrOsuyAzvrmMQA=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250130210036-406084551742/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg=
github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA=
github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
Expand Down
7 changes: 1 addition & 6 deletions schema_source/jetstream/api/v1/definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1230,13 +1230,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"$ref": "#/definitions/golang_duration_nanos",
"minimum": 0
}
Expand Down
4 changes: 4 additions & 0 deletions schema_source/jetstream/api/v1/stream_msg_delete_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
"no_erase": {
"type": "boolean",
"description": "Default will securely remove a message and rewrite the data with random data, set this to true to only remove the message"
},
"no_marker": {
"type": "boolean",
"description": "Prevents placing a delete marker in streams with markers enabled"
}
}
}
4 changes: 4 additions & 0 deletions schema_source/jetstream/api/v1/stream_purge_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
"keep": {
"description": "Ensures this many messages are present after the purge. Can be combined with the subject filter but not the sequence",
"$ref": "definitions.json#/definitions/golang_uint64"
},
"no_marker": {
"type": "boolean",
"description": "Prevents placing a purge marker in streams with markers enabled"
}
}
}
7 changes: 1 addition & 6 deletions schemas/jetstream/api/v1/stream_configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -434,13 +434,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
7 changes: 1 addition & 6 deletions schemas/jetstream/api/v1/stream_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -437,13 +437,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
7 changes: 1 addition & 6 deletions schemas/jetstream/api/v1/stream_create_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
7 changes: 1 addition & 6 deletions schemas/jetstream/api/v1/stream_info_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
7 changes: 1 addition & 6 deletions schemas/jetstream/api/v1/stream_list_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -517,13 +517,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
4 changes: 4 additions & 0 deletions schemas/jetstream/api/v1/stream_msg_delete_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
"no_erase": {
"type": "boolean",
"description": "Default will securely remove a message and rewrite the data with random data, set this to true to only remove the message"
},
"no_marker": {
"type": "boolean",
"description": "Prevents placing a delete marker in streams with markers enabled"
}
}
}
4 changes: 4 additions & 0 deletions schemas/jetstream/api/v1/stream_purge_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
"type": "integer",
"minimum": 0,
"maximum": 18446744073709551615
},
"no_marker": {
"type": "boolean",
"description": "Prevents placing a purge marker in streams with markers enabled"
}
}
}
7 changes: 1 addition & 6 deletions schemas/jetstream/api/v1/stream_restore_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -441,13 +441,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
7 changes: 1 addition & 6 deletions schemas/jetstream/api/v1/stream_snapshot_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -476,13 +476,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
7 changes: 1 addition & 6 deletions schemas/jetstream/api/v1/stream_template_configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -457,13 +457,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
7 changes: 1 addition & 6 deletions schemas/jetstream/api/v1/stream_template_create_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
7 changes: 1 addition & 6 deletions schemas/jetstream/api/v1/stream_template_info_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
7 changes: 1 addition & 6 deletions schemas/jetstream/api/v1/stream_update_request.json
Original file line number Diff line number Diff line change
Expand Up @@ -437,13 +437,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
7 changes: 1 addition & 6 deletions schemas/jetstream/api/v1/stream_update_response.json
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,8 @@
"type": "boolean",
"default": false
},
"subject_delete_markers": {
"description": "Enables placing markers in the stream for certain message delete operations",
"type": "boolean",
"default": false
},
"subject_delete_marker_ttl": {
"description": "When placing a marker, how long should it be valid, defaults to 15m",
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
"minimum": 0,
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
"type": "integer",
Expand Down
14 changes: 0 additions & 14 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,21 +545,8 @@ func AllowMsgTTL() StreamOption {
}
}

func AllowSubjectDeleteMarkers() StreamOption {
return func(o *api.StreamConfig) error {
o.AllowMsgTTL = true
o.SubjectDeleteMarkers = true
return nil
}
}

func SubjectDeleteMarkerTTL(d time.Duration) StreamOption {
return func(o *api.StreamConfig) error {
err := AllowSubjectDeleteMarkers()(o)
if err != nil {
return err
}

o.SubjectDeleteMarkerTTL = d

return nil
Expand Down Expand Up @@ -1145,6 +1132,5 @@ func (s *Stream) Metadata() map[string]string { return s.cfg.Metada
func (s *Stream) Compression() api.Compression { return s.cfg.Compression }
func (s *Stream) FirstSequence() uint64 { return s.cfg.FirstSeq }
func (s *Stream) AllowMsgTTL() bool { return s.cfg.AllowMsgTTL }
func (s *Stream) SubjectDeleteMarkers() bool { return s.cfg.SubjectDeleteMarkers }
func (s *Stream) SubjectDeleteMarkerTTL() time.Duration { return s.cfg.SubjectDeleteMarkerTTL }
func (s *Stream) ConsumerLimits() api.StreamConsumerLimits { return s.cfg.ConsumerLimits }
25 changes: 25 additions & 0 deletions test/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,30 @@ func TestFirstSequence(t *testing.T) {
}
}

func TestStreamSubjectDeleteMarkerTTL(t *testing.T) {
srv, nc, mgr := startJSServer(t)
defer srv.Shutdown()
defer nc.Flush()

s, err := mgr.NewStream("m1", jsm.Subjects("test"))
checkErr(t, err, "create failed")

if s.SubjectDeleteMarkerTTL() != 0 {
t.Fatalf("Expected DeleteMarkerTTL to be 0 got %v", s.SubjectDeleteMarkerTTL())
}

err = s.Delete()
checkErr(t, err, "delete failed")

s, err = mgr.NewStream("m1", jsm.Subjects("test"), jsm.AllowMsgTTL(), jsm.SubjectDeleteMarkerTTL(time.Minute))
checkErr(t, err, "create failed")

if s.SubjectDeleteMarkerTTL() != time.Minute {
t.Fatalf("Expected DeleteMarkerTTL to be 1 minute got %v", s.SubjectDeleteMarkerTTL())
}

}

func TestStreamSealed(t *testing.T) {
srv, nc, mgr := startJSServer(t)
defer srv.Shutdown()
Expand Down Expand Up @@ -1118,6 +1142,7 @@ func TestStream_Compression(t *testing.T) {
t.Fatalf("s2 compression was not reported correctly")
}
}

func TestStream_DetectGaps(t *testing.T) {
srv, nc, mgr := startJSServer(t)
defer srv.Shutdown()
Expand Down

0 comments on commit 4df792d

Please sign in to comment.