From 5bffb214984dc1c3b0ff35e949e0402c9eb78893 Mon Sep 17 00:00:00 2001 From: scottf Date: Mon, 8 Apr 2024 19:38:46 -0400 Subject: [PATCH] Add support for Consumer Create Action --- src/NATS.Client/JetStream/ApiConstants.cs | 1 + src/NATS.Client/JetStream/ApiEnums.cs | 13 ++ .../JetStream/ConsumerCreateRequest.cs | 18 ++- .../JetStream/IJetStreamManagement.cs | 16 +++ src/NATS.Client/JetStream/JetStream.cs | 2 +- src/NATS.Client/JetStream/JetStreamBase.cs | 4 +- .../JetStream/JetStreamManagement.cs | 16 ++- .../JetStream/OrderedMessageManager.cs | 2 +- .../TestJetStreamManagement.cs | 127 ++++++++++++++++++ .../JetStream/TestConsumerConfiguration.cs | 2 +- 10 files changed, 188 insertions(+), 13 deletions(-) diff --git a/src/NATS.Client/JetStream/ApiConstants.cs b/src/NATS.Client/JetStream/ApiConstants.cs index 41eff8e7d..7a7a1758c 100644 --- a/src/NATS.Client/JetStream/ApiConstants.cs +++ b/src/NATS.Client/JetStream/ApiConstants.cs @@ -18,6 +18,7 @@ public static class ApiConstants public const string AckFloor = "ack_floor"; public const string AckPolicy = "ack_policy"; public const string AckWait = "ack_wait"; + public const string Action = "action"; public const string Active = "active"; public const string AllowRollupHdrs = "allow_rollup_hdrs"; public const string AllowDirect = "allow_direct"; diff --git a/src/NATS.Client/JetStream/ApiEnums.cs b/src/NATS.Client/JetStream/ApiEnums.cs index 343d831e0..c65d169f5 100644 --- a/src/NATS.Client/JetStream/ApiEnums.cs +++ b/src/NATS.Client/JetStream/ApiEnums.cs @@ -27,6 +27,8 @@ public enum StorageType { File, Memory } public enum CompressionOption { None, S2 } + public enum ConsumerCreateRequestAction { Create, Update, CreateOrUpdate } + public static class ApiEnums { public static string GetString(this AckPolicy ackPolicy) @@ -105,6 +107,17 @@ public static string GetString(this CompressionOption compressionOption) return null; } + public static string GetString(this ConsumerCreateRequestAction action) + { + switch (action) + { + case ConsumerCreateRequestAction.Create: return "create"; + case ConsumerCreateRequestAction.Update: return "update"; + case ConsumerCreateRequestAction.CreateOrUpdate: return null; + } + return null; + } + public static AckPolicy? GetAckPolicy(string value) { if (value != null) diff --git a/src/NATS.Client/JetStream/ConsumerCreateRequest.cs b/src/NATS.Client/JetStream/ConsumerCreateRequest.cs index 7565d3595..d0d716e44 100644 --- a/src/NATS.Client/JetStream/ConsumerCreateRequest.cs +++ b/src/NATS.Client/JetStream/ConsumerCreateRequest.cs @@ -1,4 +1,4 @@ -// Copyright 2021 The NATS Authors +// Copyright 2021-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at: @@ -11,28 +11,32 @@ // See the License for the specific language governing permissions and // limitations under the License. +using NATS.Client.Internals; using NATS.Client.Internals.SimpleJSON; namespace NATS.Client.JetStream { + public sealed class ConsumerCreateRequest : JsonSerializable { public string StreamName { get; } public ConsumerConfiguration Config { get; } + public ConsumerCreateRequestAction Action { get; } - internal ConsumerCreateRequest(string streamName, ConsumerConfiguration config) + internal ConsumerCreateRequest(string streamName, ConsumerConfiguration config, ConsumerCreateRequestAction action) { StreamName = streamName; Config = config; + Action = action; } public override JSONNode ToJsonNode() { - return new JSONObject - { - [ApiConstants.StreamName] = StreamName, - [ApiConstants.Config] = Config.ToJsonNode() - }; + JSONObject o = new JSONObject(); + JsonUtils.AddField(o, ApiConstants.StreamName, StreamName); + JsonUtils.AddField(o, ApiConstants.Config, Config); + JsonUtils.AddField(o, ApiConstants.Action, Action.GetString()); + return o; } } } diff --git a/src/NATS.Client/JetStream/IJetStreamManagement.cs b/src/NATS.Client/JetStream/IJetStreamManagement.cs index 754a61521..da0049904 100644 --- a/src/NATS.Client/JetStream/IJetStreamManagement.cs +++ b/src/NATS.Client/JetStream/IJetStreamManagement.cs @@ -90,6 +90,22 @@ public interface IJetStreamManagement /// ConsumerInfo AddOrUpdateConsumer(string streamName, ConsumerConfiguration config); + /// + /// Creates a consumer. Must not already exist. + /// + /// The name of the stream the consumer is attached to. + /// The consumer configuration to use. + /// + ConsumerInfo CreateConsumer(string streamName, ConsumerConfiguration config); + + /// + /// Updates an existing consumer. Must already exist. + /// + /// The name of the stream the consumer is attached to. + /// The consumer configuration to use. + /// + ConsumerInfo UpdateConsumer(string streamName, ConsumerConfiguration config); + /// /// Deletes a consumer. /// diff --git a/src/NATS.Client/JetStream/JetStream.cs b/src/NATS.Client/JetStream/JetStream.cs index 5c5c68272..9eacba51b 100644 --- a/src/NATS.Client/JetStream/JetStream.cs +++ b/src/NATS.Client/JetStream/JetStream.cs @@ -495,7 +495,7 @@ internal Subscription CreateSubscription(string userSubscribeSubject, { try { - ConsumerInfo ci = CreateConsumerInternal(settledStream, settledCC); + ConsumerInfo ci = CreateConsumerInternal(settledStream, settledCC, ConsumerCreateRequestAction.CreateOrUpdate); if (sub is JetStreamAbstractSyncSubscription syncSub) { syncSub.SetConsumerName(ci.Name); diff --git a/src/NATS.Client/JetStream/JetStreamBase.cs b/src/NATS.Client/JetStream/JetStreamBase.cs index 50a6d9f35..4e1dc5f29 100644 --- a/src/NATS.Client/JetStream/JetStreamBase.cs +++ b/src/NATS.Client/JetStream/JetStreamBase.cs @@ -100,7 +100,7 @@ internal ConsumerInfo GetConsumerInfoInternal(string streamName, string consumer return new ConsumerInfo(m, true); } - internal ConsumerInfo CreateConsumerInternal(string streamName, ConsumerConfiguration config) + internal ConsumerInfo CreateConsumerInternal(string streamName, ConsumerConfiguration config, ConsumerCreateRequestAction ccrAction) { // ConsumerConfiguration validates that name and durable are the same if both are supplied. string consumerName = Validator.EmptyAsNull(config.Name); @@ -145,7 +145,7 @@ internal ConsumerInfo CreateConsumerInternal(string streamName, ConsumerConfigur subj = string.Format(JetStreamConstants.JsapiDurableCreate, streamName, durable); } - var ccr = new ConsumerCreateRequest(streamName, config); + var ccr = new ConsumerCreateRequest(streamName, config, ccrAction); var m = RequestResponseRequired(subj, ccr.Serialize(), Timeout); return new ConsumerInfo(m, true); } diff --git a/src/NATS.Client/JetStream/JetStreamManagement.cs b/src/NATS.Client/JetStream/JetStreamManagement.cs index 8fe742061..f49ceba22 100644 --- a/src/NATS.Client/JetStream/JetStreamManagement.cs +++ b/src/NATS.Client/JetStream/JetStreamManagement.cs @@ -89,7 +89,21 @@ public ConsumerInfo AddOrUpdateConsumer(string streamName, ConsumerConfiguration { Validator.ValidateStreamName(streamName, true); Validator.ValidateNotNull(config, nameof(config)); - return CreateConsumerInternal(streamName, config); + return CreateConsumerInternal(streamName, config, ConsumerCreateRequestAction.CreateOrUpdate); + } + + public ConsumerInfo CreateConsumer(string streamName, ConsumerConfiguration config) + { + Validator.ValidateStreamName(streamName, true); + Validator.ValidateNotNull(config, nameof(config)); + return CreateConsumerInternal(streamName, config, ConsumerCreateRequestAction.Create); + } + + public ConsumerInfo UpdateConsumer(string streamName, ConsumerConfiguration config) + { + Validator.ValidateStreamName(streamName, true); + Validator.ValidateNotNull(config, nameof(config)); + return CreateConsumerInternal(streamName, config, ConsumerCreateRequestAction.Update); } public bool DeleteConsumer(string streamName, string consumer) diff --git a/src/NATS.Client/JetStream/OrderedMessageManager.cs b/src/NATS.Client/JetStream/OrderedMessageManager.cs index 863aec735..20dcec481 100644 --- a/src/NATS.Client/JetStream/OrderedMessageManager.cs +++ b/src/NATS.Client/JetStream/OrderedMessageManager.cs @@ -92,7 +92,7 @@ private void HandleErrorCondition() // 3. make a new consumer using the same deliver subject but // with a new starting point ConsumerConfiguration userCc = Js.ConsumerConfigurationForOrdered(OriginalCc, LastStreamSeq, newDeliverSubject, actualConsumerName, null); - ConsumerInfo ci = Js.CreateConsumerInternal(Stream, userCc); + ConsumerInfo ci = Js.CreateConsumerInternal(Stream, userCc, ConsumerCreateRequestAction.Create); if (Sub is JetStreamAbstractSyncSubscription syncSub) { syncSub.SetConsumerName(ci.Name); diff --git a/src/Tests/IntegrationTests/TestJetStreamManagement.cs b/src/Tests/IntegrationTests/TestJetStreamManagement.cs index 4ce871112..9fe14de03 100644 --- a/src/Tests/IntegrationTests/TestJetStreamManagement.cs +++ b/src/Tests/IntegrationTests/TestJetStreamManagement.cs @@ -1195,5 +1195,132 @@ public void TestPauseConsumer() Assert.Throws(() => jsm.ResumeConsumer(stream, Name())); }); } + + [Fact] + public void TestCreateConsumerUpdateConsumer() + { + Context.RunInJsServer(AtLeast2_9_0, c => + { + string streamPrefix = Variant(); + IJetStreamManagement jsmNew = c.CreateJetStreamManagementContext(); + IJetStreamManagement jsmPre290 = c.CreateJetStreamManagementContext( + JetStreamOptions.Builder().WithOptOut290ConsumerCreate(true).Build()); + + // -------------------------------------------------------- + // New without filter + // -------------------------------------------------------- + string stream1 = streamPrefix + "-new"; + string name = Name(); + string subject = Name(); + CreateMemoryStream(jsmNew, stream1, subject + ".*"); + + ConsumerConfiguration cc11 = ConsumerConfiguration.Builder().WithName(name).Build(); + + // update no good when not exist + NATSJetStreamException e = Assert.Throws(() => jsmNew.UpdateConsumer(stream1, cc11)); + Assert.Equal(10149, e.ApiErrorCode); + + // initial create ok + ConsumerInfo ci = jsmNew.CreateConsumer(stream1, cc11); + Assert.Equal(name, ci.Name); + Assert.Null(ci.ConsumerConfiguration.FilterSubject); + + // any other create no good + e = Assert.Throws(() => jsmNew.CreateConsumer(stream1, cc11)); + Assert.Equal(10148, e.ApiErrorCode); + + // update ok when exists + ConsumerConfiguration cc12 = ConsumerConfiguration.Builder().WithName(name).WithDescription(Variant()).Build(); + ci = jsmNew.UpdateConsumer(stream1, cc12); + Assert.Equal(name, ci.Name); + Assert.Null(ci.ConsumerConfiguration.FilterSubject); + + // -------------------------------------------------------- + // New with filter subject + // -------------------------------------------------------- + String stream2 = streamPrefix + "-new-fs"; + name = Name(); + subject = Name(); + String fs1 = subject + ".A"; + String fs2 = subject + ".B"; + CreateMemoryStream(jsmNew, stream2, subject + ".*"); + + ConsumerConfiguration cc21 = ConsumerConfiguration.Builder().WithName(name).WithFilterSubject(fs1).Build(); + + // update no good when not exist + e = Assert.Throws(() => jsmNew.UpdateConsumer(stream2, cc21)); + Assert.Equal(10149, e.ApiErrorCode); + + // initial create ok + ci = jsmNew.CreateConsumer(stream2, cc21); + Assert.Equal(name, ci.Name); + Assert.Equal(fs1, ci.ConsumerConfiguration.FilterSubject); + + // any other create no good + e = Assert.Throws(() => jsmNew.CreateConsumer(stream2, cc21)); + Assert.Equal(10148, e.ApiErrorCode); + + // update ok when exists + ConsumerConfiguration cc22 = ConsumerConfiguration.Builder().WithName(name).WithFilterSubjects(fs2).Build(); + ci = jsmNew.UpdateConsumer(stream2, cc22); + Assert.Equal(name, ci.Name); + Assert.Equal(fs2, ci.ConsumerConfiguration.FilterSubject); + + // -------------------------------------------------------- + // Pre 290 durable pathway + // -------------------------------------------------------- + String stream3 = streamPrefix + "-old-durable"; + name = Name(); + subject = Name(); + fs1 = subject + ".A"; + fs2 = subject + ".B"; + String fs3 = subject + ".C"; + CreateMemoryStream(jsmPre290, stream3, subject + ".*"); + + ConsumerConfiguration cc31 = ConsumerConfiguration.Builder().WithDurable(name).WithFilterSubject(fs1).Build(); + + // update no good when not exist + e = Assert.Throws(() => jsmPre290.UpdateConsumer(stream3, cc31)); + Assert.Equal(10149, e.ApiErrorCode); + + // initial create ok + ci = jsmPre290.CreateConsumer(stream3, cc31); + Assert.Equal(name, ci.Name); + Assert.Equal(fs1, ci.ConsumerConfiguration.FilterSubject); + + // opt out of 209, create on existing ok + // This is not exactly the same behavior as with the new consumer create api, but it's what the server does + jsmPre290.CreateConsumer(stream3, cc31); + + ConsumerConfiguration cc32 = ConsumerConfiguration.Builder().WithDurable(name).WithFilterSubject(fs2).Build(); + e = Assert.Throws(() => jsmPre290.CreateConsumer(stream3, cc32)); + Assert.Equal(10148, e.ApiErrorCode); + + // update ok when exists + ConsumerConfiguration cc33 = ConsumerConfiguration.Builder().WithDurable(name).WithFilterSubjects(fs3).Build(); + ci = jsmPre290.UpdateConsumer(stream3, cc33); + Assert.Equal(name, ci.Name); + Assert.Equal(fs3, ci.ConsumerConfiguration.FilterSubject); + + // -------------------------------------------------------- + // Pre 290 ephemeral pathway + // -------------------------------------------------------- + subject = Name(); + + String stream4 = streamPrefix + "-old-ephemeral"; + fs1 = subject + ".A"; + CreateMemoryStream(jsmPre290, stream4, subject + ".*"); + + ConsumerConfiguration cc4 = ConsumerConfiguration.Builder().WithFilterSubject(fs1).Build(); + + // update no good when not exist + e = Assert.Throws(() => jsmPre290.UpdateConsumer(stream4, cc4)); + Assert.Equal(10149, e.ApiErrorCode); + + // initial create ok + ci = jsmPre290.CreateConsumer(stream4, cc4); + Assert.Equal(fs1, ci.ConsumerConfiguration.FilterSubject); + }); + } } } diff --git a/src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs b/src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs index 511ccd5a9..aa3d39823 100644 --- a/src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs +++ b/src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs @@ -60,7 +60,7 @@ public void BuilderWorks() AssertAsBuilt(c, dt); - ConsumerCreateRequest ccr = new ConsumerCreateRequest("stream", c); + ConsumerCreateRequest ccr = new ConsumerCreateRequest("stream", c, ConsumerCreateRequestAction.CreateOrUpdate); Assert.Equal("stream", ccr.StreamName); Assert.NotNull(ccr.Config);