diff --git a/api/streams.go b/api/streams.go index 798086f..e34cfff 100644 --- a/api/streams.go +++ b/api/streams.go @@ -117,8 +117,9 @@ type JSApiStreamListRequest struct { // io.nats.jetstream.api.v1.stream_msg_delete_request type JSApiMsgDeleteRequest struct { - Seq uint64 `json:"seq"` - NoErase bool `json:"no_erase,omitempty"` + Seq uint64 `json:"seq"` + NoErase bool `json:"no_erase,omitempty"` + NoMarker bool `json:"no_marker,omitempty"` } // io.nats.jetstream.api.v1.stream_msg_delete_response @@ -171,6 +172,8 @@ type JSApiStreamPurgeRequest struct { Subject string `json:"filter,omitempty"` // Number of messages to keep. Keep uint64 `json:"keep,omitempty"` + // Avoids purge markers + NoMarker bool `json:"no_marker,omitempty"` } // io.nats.jetstream.api.v1.stream_msg_get_response @@ -583,9 +586,7 @@ type StreamConfig struct { // AllowMsgTTL allows header initiated per-message TTLs. If disabled, // then the `NATS-TTL` header will be ignored. AllowMsgTTL bool `json:"allow_msg_ttl,omitempty" yaml:"allow_msg_ttl"` - // Enables placing markers in the stream for certain message delete operations - SubjectDeleteMarkers bool `json:"subject_delete_markers,omitempty" yaml:"subject_delete_markers"` - // When placing a marker, how long should it be valid, defaults to 15m + // SubjectDeleteMarkerTTL enables and sets a duration for adding server markers for delete, purge and max age limits SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty" yaml:"subject_delete_marker_ttl"` // The following defaults will apply to consumers when created against // this stream, unless overridden manually. They also represent the maximum values that diff --git a/go.mod b/go.mod index aa99d8e..a2e30a5 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/jedib0t/go-pretty/v6 v6.6.5 github.com/klauspost/compress v1.17.11 github.com/nats-io/jwt/v2 v2.7.3 - github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250126014539-f2eb5650d200 + github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250130210036-406084551742 github.com/nats-io/nats.go v1.38.0 github.com/nats-io/nkeys v0.4.9 github.com/nats-io/nuid v1.0.1 diff --git a/go.sum b/go.sum index 510e8b6..d78803f 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250122133427-2352ad9fd254 h1:3 github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250122133427-2352ad9fd254/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg= github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250126014539-f2eb5650d200 h1:JTZQwrehqUHpEO+DRjm0734B4a0porO1Cb5ACT0nSJY= github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250126014539-f2eb5650d200/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250130210036-406084551742 h1:UbqnZ2WOky1HufvUU+jRfrskdBkJlJrOsuyAzvrmMQA= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250130210036-406084551742/go.mod h1:NLseHFkD5ZPPkHVYn4JEG8LguxveaOXJPiIfswZugHg= github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= diff --git a/schema_source/jetstream/api/v1/definitions.json b/schema_source/jetstream/api/v1/definitions.json index 58c85fd..004bd49 100644 --- a/schema_source/jetstream/api/v1/definitions.json +++ b/schema_source/jetstream/api/v1/definitions.json @@ -1230,13 +1230,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "$ref": "#/definitions/golang_duration_nanos", "minimum": 0 } diff --git a/schema_source/jetstream/api/v1/stream_msg_delete_request.json b/schema_source/jetstream/api/v1/stream_msg_delete_request.json index 703018c..0cfc53c 100644 --- a/schema_source/jetstream/api/v1/stream_msg_delete_request.json +++ b/schema_source/jetstream/api/v1/stream_msg_delete_request.json @@ -13,6 +13,10 @@ "no_erase": { "type": "boolean", "description": "Default will securely remove a message and rewrite the data with random data, set this to true to only remove the message" + }, + "no_marker": { + "type": "boolean", + "description": "Prevents placing a delete marker in streams with markers enabled" } } } diff --git a/schema_source/jetstream/api/v1/stream_purge_request.json b/schema_source/jetstream/api/v1/stream_purge_request.json index 79d2f08..e6159e2 100644 --- a/schema_source/jetstream/api/v1/stream_purge_request.json +++ b/schema_source/jetstream/api/v1/stream_purge_request.json @@ -16,6 +16,10 @@ "keep": { "description": "Ensures this many messages are present after the purge. Can be combined with the subject filter but not the sequence", "$ref": "definitions.json#/definitions/golang_uint64" + }, + "no_marker": { + "type": "boolean", + "description": "Prevents placing a purge marker in streams with markers enabled" } } } diff --git a/schemas/jetstream/api/v1/stream_configuration.json b/schemas/jetstream/api/v1/stream_configuration.json index b135358..9a63629 100644 --- a/schemas/jetstream/api/v1/stream_configuration.json +++ b/schemas/jetstream/api/v1/stream_configuration.json @@ -434,13 +434,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/schemas/jetstream/api/v1/stream_create_request.json b/schemas/jetstream/api/v1/stream_create_request.json index 6506273..3b76874 100644 --- a/schemas/jetstream/api/v1/stream_create_request.json +++ b/schemas/jetstream/api/v1/stream_create_request.json @@ -437,13 +437,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/schemas/jetstream/api/v1/stream_create_response.json b/schemas/jetstream/api/v1/stream_create_response.json index 0b6e63d..4fd6e11 100644 --- a/schemas/jetstream/api/v1/stream_create_response.json +++ b/schemas/jetstream/api/v1/stream_create_response.json @@ -452,13 +452,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/schemas/jetstream/api/v1/stream_info_response.json b/schemas/jetstream/api/v1/stream_info_response.json index 52d11af..2f1dadd 100644 --- a/schemas/jetstream/api/v1/stream_info_response.json +++ b/schemas/jetstream/api/v1/stream_info_response.json @@ -452,13 +452,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/schemas/jetstream/api/v1/stream_list_response.json b/schemas/jetstream/api/v1/stream_list_response.json index 3af2af9..f40c29a 100644 --- a/schemas/jetstream/api/v1/stream_list_response.json +++ b/schemas/jetstream/api/v1/stream_list_response.json @@ -517,13 +517,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/schemas/jetstream/api/v1/stream_msg_delete_request.json b/schemas/jetstream/api/v1/stream_msg_delete_request.json index 0f7f505..ee43097 100644 --- a/schemas/jetstream/api/v1/stream_msg_delete_request.json +++ b/schemas/jetstream/api/v1/stream_msg_delete_request.json @@ -18,6 +18,10 @@ "no_erase": { "type": "boolean", "description": "Default will securely remove a message and rewrite the data with random data, set this to true to only remove the message" + }, + "no_marker": { + "type": "boolean", + "description": "Prevents placing a delete marker in streams with markers enabled" } } } diff --git a/schemas/jetstream/api/v1/stream_purge_request.json b/schemas/jetstream/api/v1/stream_purge_request.json index 67339d7..af01989 100644 --- a/schemas/jetstream/api/v1/stream_purge_request.json +++ b/schemas/jetstream/api/v1/stream_purge_request.json @@ -22,6 +22,10 @@ "type": "integer", "minimum": 0, "maximum": 18446744073709551615 + }, + "no_marker": { + "type": "boolean", + "description": "Prevents placing a purge marker in streams with markers enabled" } } } diff --git a/schemas/jetstream/api/v1/stream_restore_request.json b/schemas/jetstream/api/v1/stream_restore_request.json index 327f0a8..b208f33 100644 --- a/schemas/jetstream/api/v1/stream_restore_request.json +++ b/schemas/jetstream/api/v1/stream_restore_request.json @@ -441,13 +441,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/schemas/jetstream/api/v1/stream_snapshot_response.json b/schemas/jetstream/api/v1/stream_snapshot_response.json index 8e08c5d..4fbc65a 100644 --- a/schemas/jetstream/api/v1/stream_snapshot_response.json +++ b/schemas/jetstream/api/v1/stream_snapshot_response.json @@ -476,13 +476,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/schemas/jetstream/api/v1/stream_template_configuration.json b/schemas/jetstream/api/v1/stream_template_configuration.json index a51835c..23d28e5 100644 --- a/schemas/jetstream/api/v1/stream_template_configuration.json +++ b/schemas/jetstream/api/v1/stream_template_configuration.json @@ -457,13 +457,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/schemas/jetstream/api/v1/stream_template_create_request.json b/schemas/jetstream/api/v1/stream_template_create_request.json index d1d04e0..2d8147a 100644 --- a/schemas/jetstream/api/v1/stream_template_create_request.json +++ b/schemas/jetstream/api/v1/stream_template_create_request.json @@ -451,13 +451,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/schemas/jetstream/api/v1/stream_template_create_response.json b/schemas/jetstream/api/v1/stream_template_create_response.json index 7117300..a59972b 100644 --- a/schemas/jetstream/api/v1/stream_template_create_response.json +++ b/schemas/jetstream/api/v1/stream_template_create_response.json @@ -461,13 +461,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/schemas/jetstream/api/v1/stream_template_info_response.json b/schemas/jetstream/api/v1/stream_template_info_response.json index d40348d..8ef4976 100644 --- a/schemas/jetstream/api/v1/stream_template_info_response.json +++ b/schemas/jetstream/api/v1/stream_template_info_response.json @@ -461,13 +461,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/schemas/jetstream/api/v1/stream_update_request.json b/schemas/jetstream/api/v1/stream_update_request.json index b906c01..c7de45f 100644 --- a/schemas/jetstream/api/v1/stream_update_request.json +++ b/schemas/jetstream/api/v1/stream_update_request.json @@ -437,13 +437,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/schemas/jetstream/api/v1/stream_update_response.json b/schemas/jetstream/api/v1/stream_update_response.json index 9954455..b51bce3 100644 --- a/schemas/jetstream/api/v1/stream_update_response.json +++ b/schemas/jetstream/api/v1/stream_update_response.json @@ -452,13 +452,8 @@ "type": "boolean", "default": false }, - "subject_delete_markers": { - "description": "Enables placing markers in the stream for certain message delete operations", - "type": "boolean", - "default": false - }, "subject_delete_marker_ttl": { - "description": "When placing a marker, how long should it be valid, defaults to 15m", + "description": "Enables and sets a duration for adding server markers for delete, purge and max age limits", "minimum": 0, "$comment": "nanoseconds depicting a duration in time, signed 64 bit integer", "type": "integer", diff --git a/streams.go b/streams.go index c5a646e..f6872a3 100644 --- a/streams.go +++ b/streams.go @@ -545,21 +545,8 @@ func AllowMsgTTL() StreamOption { } } -func AllowSubjectDeleteMarkers() StreamOption { - return func(o *api.StreamConfig) error { - o.AllowMsgTTL = true - o.SubjectDeleteMarkers = true - return nil - } -} - func SubjectDeleteMarkerTTL(d time.Duration) StreamOption { return func(o *api.StreamConfig) error { - err := AllowSubjectDeleteMarkers()(o) - if err != nil { - return err - } - o.SubjectDeleteMarkerTTL = d return nil @@ -1145,6 +1132,5 @@ func (s *Stream) Metadata() map[string]string { return s.cfg.Metada func (s *Stream) Compression() api.Compression { return s.cfg.Compression } func (s *Stream) FirstSequence() uint64 { return s.cfg.FirstSeq } func (s *Stream) AllowMsgTTL() bool { return s.cfg.AllowMsgTTL } -func (s *Stream) SubjectDeleteMarkers() bool { return s.cfg.SubjectDeleteMarkers } func (s *Stream) SubjectDeleteMarkerTTL() time.Duration { return s.cfg.SubjectDeleteMarkerTTL } func (s *Stream) ConsumerLimits() api.StreamConsumerLimits { return s.cfg.ConsumerLimits } diff --git a/test/streams_test.go b/test/streams_test.go index 046c971..86ec11d 100644 --- a/test/streams_test.go +++ b/test/streams_test.go @@ -1011,6 +1011,30 @@ func TestFirstSequence(t *testing.T) { } } +func TestStreamSubjectDeleteMarkerTTL(t *testing.T) { + srv, nc, mgr := startJSServer(t) + defer srv.Shutdown() + defer nc.Flush() + + s, err := mgr.NewStream("m1", jsm.Subjects("test")) + checkErr(t, err, "create failed") + + if s.SubjectDeleteMarkerTTL() != 0 { + t.Fatalf("Expected DeleteMarkerTTL to be 0 got %v", s.SubjectDeleteMarkerTTL()) + } + + err = s.Delete() + checkErr(t, err, "delete failed") + + s, err = mgr.NewStream("m1", jsm.Subjects("test"), jsm.AllowMsgTTL(), jsm.SubjectDeleteMarkerTTL(time.Minute)) + checkErr(t, err, "create failed") + + if s.SubjectDeleteMarkerTTL() != time.Minute { + t.Fatalf("Expected DeleteMarkerTTL to be 1 minute got %v", s.SubjectDeleteMarkerTTL()) + } + +} + func TestStreamSealed(t *testing.T) { srv, nc, mgr := startJSServer(t) defer srv.Shutdown() @@ -1118,6 +1142,7 @@ func TestStream_Compression(t *testing.T) { t.Fatalf("s2 compression was not reported correctly") } } + func TestStream_DetectGaps(t *testing.T) { srv, nc, mgr := startJSServer(t) defer srv.Shutdown()