diff --git a/src/NATS.Client/JetStream/ApiConstants.cs b/src/NATS.Client/JetStream/ApiConstants.cs
index 41eff8e7d..7a7a1758c 100644
--- a/src/NATS.Client/JetStream/ApiConstants.cs
+++ b/src/NATS.Client/JetStream/ApiConstants.cs
@@ -18,6 +18,7 @@ public static class ApiConstants
public const string AckFloor = "ack_floor";
public const string AckPolicy = "ack_policy";
public const string AckWait = "ack_wait";
+ public const string Action = "action";
public const string Active = "active";
public const string AllowRollupHdrs = "allow_rollup_hdrs";
public const string AllowDirect = "allow_direct";
diff --git a/src/NATS.Client/JetStream/ApiEnums.cs b/src/NATS.Client/JetStream/ApiEnums.cs
index 343d831e0..c65d169f5 100644
--- a/src/NATS.Client/JetStream/ApiEnums.cs
+++ b/src/NATS.Client/JetStream/ApiEnums.cs
@@ -27,6 +27,8 @@ public enum StorageType { File, Memory }
public enum CompressionOption { None, S2 }
+ public enum ConsumerCreateRequestAction { Create, Update, CreateOrUpdate }
+
public static class ApiEnums
{
public static string GetString(this AckPolicy ackPolicy)
@@ -105,6 +107,17 @@ public static string GetString(this CompressionOption compressionOption)
return null;
}
+ public static string GetString(this ConsumerCreateRequestAction action)
+ {
+ switch (action)
+ {
+ case ConsumerCreateRequestAction.Create: return "create";
+ case ConsumerCreateRequestAction.Update: return "update";
+ case ConsumerCreateRequestAction.CreateOrUpdate: return null;
+ }
+ return null;
+ }
+
public static AckPolicy? GetAckPolicy(string value)
{
if (value != null)
diff --git a/src/NATS.Client/JetStream/ConsumerCreateRequest.cs b/src/NATS.Client/JetStream/ConsumerCreateRequest.cs
index 7565d3595..d0d716e44 100644
--- a/src/NATS.Client/JetStream/ConsumerCreateRequest.cs
+++ b/src/NATS.Client/JetStream/ConsumerCreateRequest.cs
@@ -1,4 +1,4 @@
-// Copyright 2021 The NATS Authors
+// Copyright 2021-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
@@ -11,28 +11,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+using NATS.Client.Internals;
using NATS.Client.Internals.SimpleJSON;
namespace NATS.Client.JetStream
{
+
public sealed class ConsumerCreateRequest : JsonSerializable
{
public string StreamName { get; }
public ConsumerConfiguration Config { get; }
+ public ConsumerCreateRequestAction Action { get; }
- internal ConsumerCreateRequest(string streamName, ConsumerConfiguration config)
+ internal ConsumerCreateRequest(string streamName, ConsumerConfiguration config, ConsumerCreateRequestAction action)
{
StreamName = streamName;
Config = config;
+ Action = action;
}
public override JSONNode ToJsonNode()
{
- return new JSONObject
- {
- [ApiConstants.StreamName] = StreamName,
- [ApiConstants.Config] = Config.ToJsonNode()
- };
+ JSONObject o = new JSONObject();
+ JsonUtils.AddField(o, ApiConstants.StreamName, StreamName);
+ JsonUtils.AddField(o, ApiConstants.Config, Config);
+ JsonUtils.AddField(o, ApiConstants.Action, Action.GetString());
+ return o;
}
}
}
diff --git a/src/NATS.Client/JetStream/IJetStreamManagement.cs b/src/NATS.Client/JetStream/IJetStreamManagement.cs
index 754a61521..da0049904 100644
--- a/src/NATS.Client/JetStream/IJetStreamManagement.cs
+++ b/src/NATS.Client/JetStream/IJetStreamManagement.cs
@@ -90,6 +90,22 @@ public interface IJetStreamManagement
///
ConsumerInfo AddOrUpdateConsumer(string streamName, ConsumerConfiguration config);
+ ///
+ /// Creates a consumer. Must not already exist.
+ ///
+ /// The name of the stream the consumer is attached to.
+ /// The consumer configuration to use.
+ ///
+ ConsumerInfo CreateConsumer(string streamName, ConsumerConfiguration config);
+
+ ///
+ /// Updates an existing consumer. Must already exist.
+ ///
+ /// The name of the stream the consumer is attached to.
+ /// The consumer configuration to use.
+ ///
+ ConsumerInfo UpdateConsumer(string streamName, ConsumerConfiguration config);
+
///
/// Deletes a consumer.
///
diff --git a/src/NATS.Client/JetStream/JetStream.cs b/src/NATS.Client/JetStream/JetStream.cs
index 5c5c68272..9eacba51b 100644
--- a/src/NATS.Client/JetStream/JetStream.cs
+++ b/src/NATS.Client/JetStream/JetStream.cs
@@ -495,7 +495,7 @@ internal Subscription CreateSubscription(string userSubscribeSubject,
{
try
{
- ConsumerInfo ci = CreateConsumerInternal(settledStream, settledCC);
+ ConsumerInfo ci = CreateConsumerInternal(settledStream, settledCC, ConsumerCreateRequestAction.CreateOrUpdate);
if (sub is JetStreamAbstractSyncSubscription syncSub)
{
syncSub.SetConsumerName(ci.Name);
diff --git a/src/NATS.Client/JetStream/JetStreamBase.cs b/src/NATS.Client/JetStream/JetStreamBase.cs
index 50a6d9f35..4e1dc5f29 100644
--- a/src/NATS.Client/JetStream/JetStreamBase.cs
+++ b/src/NATS.Client/JetStream/JetStreamBase.cs
@@ -100,7 +100,7 @@ internal ConsumerInfo GetConsumerInfoInternal(string streamName, string consumer
return new ConsumerInfo(m, true);
}
- internal ConsumerInfo CreateConsumerInternal(string streamName, ConsumerConfiguration config)
+ internal ConsumerInfo CreateConsumerInternal(string streamName, ConsumerConfiguration config, ConsumerCreateRequestAction ccrAction)
{
// ConsumerConfiguration validates that name and durable are the same if both are supplied.
string consumerName = Validator.EmptyAsNull(config.Name);
@@ -145,7 +145,7 @@ internal ConsumerInfo CreateConsumerInternal(string streamName, ConsumerConfigur
subj = string.Format(JetStreamConstants.JsapiDurableCreate, streamName, durable);
}
- var ccr = new ConsumerCreateRequest(streamName, config);
+ var ccr = new ConsumerCreateRequest(streamName, config, ccrAction);
var m = RequestResponseRequired(subj, ccr.Serialize(), Timeout);
return new ConsumerInfo(m, true);
}
diff --git a/src/NATS.Client/JetStream/JetStreamManagement.cs b/src/NATS.Client/JetStream/JetStreamManagement.cs
index 8fe742061..f49ceba22 100644
--- a/src/NATS.Client/JetStream/JetStreamManagement.cs
+++ b/src/NATS.Client/JetStream/JetStreamManagement.cs
@@ -89,7 +89,21 @@ public ConsumerInfo AddOrUpdateConsumer(string streamName, ConsumerConfiguration
{
Validator.ValidateStreamName(streamName, true);
Validator.ValidateNotNull(config, nameof(config));
- return CreateConsumerInternal(streamName, config);
+ return CreateConsumerInternal(streamName, config, ConsumerCreateRequestAction.CreateOrUpdate);
+ }
+
+ public ConsumerInfo CreateConsumer(string streamName, ConsumerConfiguration config)
+ {
+ Validator.ValidateStreamName(streamName, true);
+ Validator.ValidateNotNull(config, nameof(config));
+ return CreateConsumerInternal(streamName, config, ConsumerCreateRequestAction.Create);
+ }
+
+ public ConsumerInfo UpdateConsumer(string streamName, ConsumerConfiguration config)
+ {
+ Validator.ValidateStreamName(streamName, true);
+ Validator.ValidateNotNull(config, nameof(config));
+ return CreateConsumerInternal(streamName, config, ConsumerCreateRequestAction.Update);
}
public bool DeleteConsumer(string streamName, string consumer)
diff --git a/src/NATS.Client/JetStream/OrderedMessageManager.cs b/src/NATS.Client/JetStream/OrderedMessageManager.cs
index 863aec735..20dcec481 100644
--- a/src/NATS.Client/JetStream/OrderedMessageManager.cs
+++ b/src/NATS.Client/JetStream/OrderedMessageManager.cs
@@ -92,7 +92,7 @@ private void HandleErrorCondition()
// 3. make a new consumer using the same deliver subject but
// with a new starting point
ConsumerConfiguration userCc = Js.ConsumerConfigurationForOrdered(OriginalCc, LastStreamSeq, newDeliverSubject, actualConsumerName, null);
- ConsumerInfo ci = Js.CreateConsumerInternal(Stream, userCc);
+ ConsumerInfo ci = Js.CreateConsumerInternal(Stream, userCc, ConsumerCreateRequestAction.Create);
if (Sub is JetStreamAbstractSyncSubscription syncSub)
{
syncSub.SetConsumerName(ci.Name);
diff --git a/src/Tests/IntegrationTests/TestJetStreamManagement.cs b/src/Tests/IntegrationTests/TestJetStreamManagement.cs
index 4ce871112..9fe14de03 100644
--- a/src/Tests/IntegrationTests/TestJetStreamManagement.cs
+++ b/src/Tests/IntegrationTests/TestJetStreamManagement.cs
@@ -1195,5 +1195,132 @@ public void TestPauseConsumer()
Assert.Throws(() => jsm.ResumeConsumer(stream, Name()));
});
}
+
+ [Fact]
+ public void TestCreateConsumerUpdateConsumer()
+ {
+ Context.RunInJsServer(AtLeast2_9_0, c =>
+ {
+ string streamPrefix = Variant();
+ IJetStreamManagement jsmNew = c.CreateJetStreamManagementContext();
+ IJetStreamManagement jsmPre290 = c.CreateJetStreamManagementContext(
+ JetStreamOptions.Builder().WithOptOut290ConsumerCreate(true).Build());
+
+ // --------------------------------------------------------
+ // New without filter
+ // --------------------------------------------------------
+ string stream1 = streamPrefix + "-new";
+ string name = Name();
+ string subject = Name();
+ CreateMemoryStream(jsmNew, stream1, subject + ".*");
+
+ ConsumerConfiguration cc11 = ConsumerConfiguration.Builder().WithName(name).Build();
+
+ // update no good when not exist
+ NATSJetStreamException e = Assert.Throws(() => jsmNew.UpdateConsumer(stream1, cc11));
+ Assert.Equal(10149, e.ApiErrorCode);
+
+ // initial create ok
+ ConsumerInfo ci = jsmNew.CreateConsumer(stream1, cc11);
+ Assert.Equal(name, ci.Name);
+ Assert.Null(ci.ConsumerConfiguration.FilterSubject);
+
+ // any other create no good
+ e = Assert.Throws(() => jsmNew.CreateConsumer(stream1, cc11));
+ Assert.Equal(10148, e.ApiErrorCode);
+
+ // update ok when exists
+ ConsumerConfiguration cc12 = ConsumerConfiguration.Builder().WithName(name).WithDescription(Variant()).Build();
+ ci = jsmNew.UpdateConsumer(stream1, cc12);
+ Assert.Equal(name, ci.Name);
+ Assert.Null(ci.ConsumerConfiguration.FilterSubject);
+
+ // --------------------------------------------------------
+ // New with filter subject
+ // --------------------------------------------------------
+ String stream2 = streamPrefix + "-new-fs";
+ name = Name();
+ subject = Name();
+ String fs1 = subject + ".A";
+ String fs2 = subject + ".B";
+ CreateMemoryStream(jsmNew, stream2, subject + ".*");
+
+ ConsumerConfiguration cc21 = ConsumerConfiguration.Builder().WithName(name).WithFilterSubject(fs1).Build();
+
+ // update no good when not exist
+ e = Assert.Throws(() => jsmNew.UpdateConsumer(stream2, cc21));
+ Assert.Equal(10149, e.ApiErrorCode);
+
+ // initial create ok
+ ci = jsmNew.CreateConsumer(stream2, cc21);
+ Assert.Equal(name, ci.Name);
+ Assert.Equal(fs1, ci.ConsumerConfiguration.FilterSubject);
+
+ // any other create no good
+ e = Assert.Throws(() => jsmNew.CreateConsumer(stream2, cc21));
+ Assert.Equal(10148, e.ApiErrorCode);
+
+ // update ok when exists
+ ConsumerConfiguration cc22 = ConsumerConfiguration.Builder().WithName(name).WithFilterSubjects(fs2).Build();
+ ci = jsmNew.UpdateConsumer(stream2, cc22);
+ Assert.Equal(name, ci.Name);
+ Assert.Equal(fs2, ci.ConsumerConfiguration.FilterSubject);
+
+ // --------------------------------------------------------
+ // Pre 290 durable pathway
+ // --------------------------------------------------------
+ String stream3 = streamPrefix + "-old-durable";
+ name = Name();
+ subject = Name();
+ fs1 = subject + ".A";
+ fs2 = subject + ".B";
+ String fs3 = subject + ".C";
+ CreateMemoryStream(jsmPre290, stream3, subject + ".*");
+
+ ConsumerConfiguration cc31 = ConsumerConfiguration.Builder().WithDurable(name).WithFilterSubject(fs1).Build();
+
+ // update no good when not exist
+ e = Assert.Throws(() => jsmPre290.UpdateConsumer(stream3, cc31));
+ Assert.Equal(10149, e.ApiErrorCode);
+
+ // initial create ok
+ ci = jsmPre290.CreateConsumer(stream3, cc31);
+ Assert.Equal(name, ci.Name);
+ Assert.Equal(fs1, ci.ConsumerConfiguration.FilterSubject);
+
+ // opt out of 209, create on existing ok
+ // This is not exactly the same behavior as with the new consumer create api, but it's what the server does
+ jsmPre290.CreateConsumer(stream3, cc31);
+
+ ConsumerConfiguration cc32 = ConsumerConfiguration.Builder().WithDurable(name).WithFilterSubject(fs2).Build();
+ e = Assert.Throws(() => jsmPre290.CreateConsumer(stream3, cc32));
+ Assert.Equal(10148, e.ApiErrorCode);
+
+ // update ok when exists
+ ConsumerConfiguration cc33 = ConsumerConfiguration.Builder().WithDurable(name).WithFilterSubjects(fs3).Build();
+ ci = jsmPre290.UpdateConsumer(stream3, cc33);
+ Assert.Equal(name, ci.Name);
+ Assert.Equal(fs3, ci.ConsumerConfiguration.FilterSubject);
+
+ // --------------------------------------------------------
+ // Pre 290 ephemeral pathway
+ // --------------------------------------------------------
+ subject = Name();
+
+ String stream4 = streamPrefix + "-old-ephemeral";
+ fs1 = subject + ".A";
+ CreateMemoryStream(jsmPre290, stream4, subject + ".*");
+
+ ConsumerConfiguration cc4 = ConsumerConfiguration.Builder().WithFilterSubject(fs1).Build();
+
+ // update no good when not exist
+ e = Assert.Throws(() => jsmPre290.UpdateConsumer(stream4, cc4));
+ Assert.Equal(10149, e.ApiErrorCode);
+
+ // initial create ok
+ ci = jsmPre290.CreateConsumer(stream4, cc4);
+ Assert.Equal(fs1, ci.ConsumerConfiguration.FilterSubject);
+ });
+ }
}
}
diff --git a/src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs b/src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs
index 511ccd5a9..aa3d39823 100644
--- a/src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs
+++ b/src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs
@@ -60,7 +60,7 @@ public void BuilderWorks()
AssertAsBuilt(c, dt);
- ConsumerCreateRequest ccr = new ConsumerCreateRequest("stream", c);
+ ConsumerCreateRequest ccr = new ConsumerCreateRequest("stream", c, ConsumerCreateRequestAction.CreateOrUpdate);
Assert.Equal("stream", ccr.StreamName);
Assert.NotNull(ccr.Config);