Skip to content

Commit

Permalink
Add support for Consumer Create Action (#886)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Apr 9, 2024
1 parent 92c2bc2 commit bad1132
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/NATS.Client/JetStream/ApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public static class ApiConstants
public const string AckFloor = "ack_floor";
public const string AckPolicy = "ack_policy";
public const string AckWait = "ack_wait";
public const string Action = "action";
public const string Active = "active";
public const string AllowRollupHdrs = "allow_rollup_hdrs";
public const string AllowDirect = "allow_direct";
Expand Down
13 changes: 13 additions & 0 deletions src/NATS.Client/JetStream/ApiEnums.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public enum StorageType { File, Memory }

public enum CompressionOption { None, S2 }

public enum ConsumerCreateRequestAction { Create, Update, CreateOrUpdate }

public static class ApiEnums
{
public static string GetString(this AckPolicy ackPolicy)
Expand Down Expand Up @@ -105,6 +107,17 @@ public static string GetString(this CompressionOption compressionOption)
return null;
}

public static string GetString(this ConsumerCreateRequestAction action)
{
switch (action)
{
case ConsumerCreateRequestAction.Create: return "create";
case ConsumerCreateRequestAction.Update: return "update";
case ConsumerCreateRequestAction.CreateOrUpdate: return null;
}
return null;
}

public static AckPolicy? GetAckPolicy(string value)
{
if (value != null)
Expand Down
18 changes: 11 additions & 7 deletions src/NATS.Client/JetStream/ConsumerCreateRequest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 The NATS Authors
// Copyright 2021-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
Expand All @@ -11,28 +11,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using NATS.Client.Internals;
using NATS.Client.Internals.SimpleJSON;

namespace NATS.Client.JetStream
{

public sealed class ConsumerCreateRequest : JsonSerializable
{
public string StreamName { get; }
public ConsumerConfiguration Config { get; }
public ConsumerCreateRequestAction Action { get; }

internal ConsumerCreateRequest(string streamName, ConsumerConfiguration config)
internal ConsumerCreateRequest(string streamName, ConsumerConfiguration config, ConsumerCreateRequestAction action)
{
StreamName = streamName;
Config = config;
Action = action;
}

public override JSONNode ToJsonNode()
{
return new JSONObject
{
[ApiConstants.StreamName] = StreamName,
[ApiConstants.Config] = Config.ToJsonNode()
};
JSONObject o = new JSONObject();
JsonUtils.AddField(o, ApiConstants.StreamName, StreamName);
JsonUtils.AddField(o, ApiConstants.Config, Config);
JsonUtils.AddField(o, ApiConstants.Action, Action.GetString());
return o;
}
}
}
16 changes: 16 additions & 0 deletions src/NATS.Client/JetStream/IJetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ public interface IJetStreamManagement
/// <returns></returns>
ConsumerInfo AddOrUpdateConsumer(string streamName, ConsumerConfiguration config);

/// <summary>
/// Creates a consumer. Must not already exist.
/// </summary>
/// <param name="streamName">The name of the stream the consumer is attached to.</param>
/// <param name="config">The consumer configuration to use.</param>
/// <returns></returns>
ConsumerInfo CreateConsumer(string streamName, ConsumerConfiguration config);

/// <summary>
/// Updates an existing consumer. Must already exist.
/// </summary>
/// <param name="streamName">The name of the stream the consumer is attached to.</param>
/// <param name="config">The consumer configuration to use.</param>
/// <returns></returns>
ConsumerInfo UpdateConsumer(string streamName, ConsumerConfiguration config);

/// <summary>
/// Deletes a consumer.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/JetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ internal Subscription CreateSubscription(string userSubscribeSubject,
{
try
{
ConsumerInfo ci = CreateConsumerInternal(settledStream, settledCC);
ConsumerInfo ci = CreateConsumerInternal(settledStream, settledCC, ConsumerCreateRequestAction.CreateOrUpdate);
if (sub is JetStreamAbstractSyncSubscription syncSub)
{
syncSub.SetConsumerName(ci.Name);
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client/JetStream/JetStreamBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ internal ConsumerInfo GetConsumerInfoInternal(string streamName, string consumer
return new ConsumerInfo(m, true);
}

internal ConsumerInfo CreateConsumerInternal(string streamName, ConsumerConfiguration config)
internal ConsumerInfo CreateConsumerInternal(string streamName, ConsumerConfiguration config, ConsumerCreateRequestAction ccrAction)
{
// ConsumerConfiguration validates that name and durable are the same if both are supplied.
string consumerName = Validator.EmptyAsNull(config.Name);
Expand Down Expand Up @@ -145,7 +145,7 @@ internal ConsumerInfo CreateConsumerInternal(string streamName, ConsumerConfigur
subj = string.Format(JetStreamConstants.JsapiDurableCreate, streamName, durable);
}

var ccr = new ConsumerCreateRequest(streamName, config);
var ccr = new ConsumerCreateRequest(streamName, config, ccrAction);
var m = RequestResponseRequired(subj, ccr.Serialize(), Timeout);
return new ConsumerInfo(m, true);
}
Expand Down
16 changes: 15 additions & 1 deletion src/NATS.Client/JetStream/JetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,21 @@ public ConsumerInfo AddOrUpdateConsumer(string streamName, ConsumerConfiguration
{
Validator.ValidateStreamName(streamName, true);
Validator.ValidateNotNull(config, nameof(config));
return CreateConsumerInternal(streamName, config);
return CreateConsumerInternal(streamName, config, ConsumerCreateRequestAction.CreateOrUpdate);
}

public ConsumerInfo CreateConsumer(string streamName, ConsumerConfiguration config)
{
Validator.ValidateStreamName(streamName, true);
Validator.ValidateNotNull(config, nameof(config));
return CreateConsumerInternal(streamName, config, ConsumerCreateRequestAction.Create);
}

public ConsumerInfo UpdateConsumer(string streamName, ConsumerConfiguration config)
{
Validator.ValidateStreamName(streamName, true);
Validator.ValidateNotNull(config, nameof(config));
return CreateConsumerInternal(streamName, config, ConsumerCreateRequestAction.Update);
}

public bool DeleteConsumer(string streamName, string consumer)
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/OrderedMessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private void HandleErrorCondition()
// 3. make a new consumer using the same deliver subject but
// with a new starting point
ConsumerConfiguration userCc = Js.ConsumerConfigurationForOrdered(OriginalCc, LastStreamSeq, newDeliverSubject, actualConsumerName, null);
ConsumerInfo ci = Js.CreateConsumerInternal(Stream, userCc);
ConsumerInfo ci = Js.CreateConsumerInternal(Stream, userCc, ConsumerCreateRequestAction.Create);
if (Sub is JetStreamAbstractSyncSubscription syncSub)
{
syncSub.SetConsumerName(ci.Name);
Expand Down
127 changes: 127 additions & 0 deletions src/Tests/IntegrationTests/TestJetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1195,5 +1195,132 @@ public void TestPauseConsumer()
Assert.Throws<NATSJetStreamException>(() => jsm.ResumeConsumer(stream, Name()));
});
}

[Fact]
public void TestCreateConsumerUpdateConsumer()
{
Context.RunInJsServer(AtLeast2_9_0, c =>
{
string streamPrefix = Variant();
IJetStreamManagement jsmNew = c.CreateJetStreamManagementContext();
IJetStreamManagement jsmPre290 = c.CreateJetStreamManagementContext(
JetStreamOptions.Builder().WithOptOut290ConsumerCreate(true).Build());

// --------------------------------------------------------
// New without filter
// --------------------------------------------------------
string stream1 = streamPrefix + "-new";
string name = Name();
string subject = Name();
CreateMemoryStream(jsmNew, stream1, subject + ".*");

ConsumerConfiguration cc11 = ConsumerConfiguration.Builder().WithName(name).Build();

// update no good when not exist
NATSJetStreamException e = Assert.Throws<NATSJetStreamException>(() => jsmNew.UpdateConsumer(stream1, cc11));
Assert.Equal(10149, e.ApiErrorCode);

// initial create ok
ConsumerInfo ci = jsmNew.CreateConsumer(stream1, cc11);
Assert.Equal(name, ci.Name);
Assert.Null(ci.ConsumerConfiguration.FilterSubject);

// any other create no good
e = Assert.Throws<NATSJetStreamException>(() => jsmNew.CreateConsumer(stream1, cc11));
Assert.Equal(10148, e.ApiErrorCode);

// update ok when exists
ConsumerConfiguration cc12 = ConsumerConfiguration.Builder().WithName(name).WithDescription(Variant()).Build();
ci = jsmNew.UpdateConsumer(stream1, cc12);
Assert.Equal(name, ci.Name);
Assert.Null(ci.ConsumerConfiguration.FilterSubject);

// --------------------------------------------------------
// New with filter subject
// --------------------------------------------------------
String stream2 = streamPrefix + "-new-fs";
name = Name();
subject = Name();
String fs1 = subject + ".A";
String fs2 = subject + ".B";
CreateMemoryStream(jsmNew, stream2, subject + ".*");

ConsumerConfiguration cc21 = ConsumerConfiguration.Builder().WithName(name).WithFilterSubject(fs1).Build();

// update no good when not exist
e = Assert.Throws<NATSJetStreamException>(() => jsmNew.UpdateConsumer(stream2, cc21));
Assert.Equal(10149, e.ApiErrorCode);

// initial create ok
ci = jsmNew.CreateConsumer(stream2, cc21);
Assert.Equal(name, ci.Name);
Assert.Equal(fs1, ci.ConsumerConfiguration.FilterSubject);

// any other create no good
e = Assert.Throws<NATSJetStreamException>(() => jsmNew.CreateConsumer(stream2, cc21));
Assert.Equal(10148, e.ApiErrorCode);

// update ok when exists
ConsumerConfiguration cc22 = ConsumerConfiguration.Builder().WithName(name).WithFilterSubjects(fs2).Build();
ci = jsmNew.UpdateConsumer(stream2, cc22);
Assert.Equal(name, ci.Name);
Assert.Equal(fs2, ci.ConsumerConfiguration.FilterSubject);

// --------------------------------------------------------
// Pre 290 durable pathway
// --------------------------------------------------------
String stream3 = streamPrefix + "-old-durable";
name = Name();
subject = Name();
fs1 = subject + ".A";
fs2 = subject + ".B";
String fs3 = subject + ".C";
CreateMemoryStream(jsmPre290, stream3, subject + ".*");

ConsumerConfiguration cc31 = ConsumerConfiguration.Builder().WithDurable(name).WithFilterSubject(fs1).Build();

// update no good when not exist
e = Assert.Throws<NATSJetStreamException>(() => jsmPre290.UpdateConsumer(stream3, cc31));
Assert.Equal(10149, e.ApiErrorCode);

// initial create ok
ci = jsmPre290.CreateConsumer(stream3, cc31);
Assert.Equal(name, ci.Name);
Assert.Equal(fs1, ci.ConsumerConfiguration.FilterSubject);

// opt out of 209, create on existing ok
// This is not exactly the same behavior as with the new consumer create api, but it's what the server does
jsmPre290.CreateConsumer(stream3, cc31);

ConsumerConfiguration cc32 = ConsumerConfiguration.Builder().WithDurable(name).WithFilterSubject(fs2).Build();
e = Assert.Throws<NATSJetStreamException>(() => jsmPre290.CreateConsumer(stream3, cc32));
Assert.Equal(10148, e.ApiErrorCode);

// update ok when exists
ConsumerConfiguration cc33 = ConsumerConfiguration.Builder().WithDurable(name).WithFilterSubjects(fs3).Build();
ci = jsmPre290.UpdateConsumer(stream3, cc33);
Assert.Equal(name, ci.Name);
Assert.Equal(fs3, ci.ConsumerConfiguration.FilterSubject);

// --------------------------------------------------------
// Pre 290 ephemeral pathway
// --------------------------------------------------------
subject = Name();

String stream4 = streamPrefix + "-old-ephemeral";
fs1 = subject + ".A";
CreateMemoryStream(jsmPre290, stream4, subject + ".*");

ConsumerConfiguration cc4 = ConsumerConfiguration.Builder().WithFilterSubject(fs1).Build();

// update no good when not exist
e = Assert.Throws<NATSJetStreamException>(() => jsmPre290.UpdateConsumer(stream4, cc4));
Assert.Equal(10149, e.ApiErrorCode);

// initial create ok
ci = jsmPre290.CreateConsumer(stream4, cc4);
Assert.Equal(fs1, ci.ConsumerConfiguration.FilterSubject);
});
}
}
}
2 changes: 1 addition & 1 deletion src/Tests/UnitTests/JetStream/TestConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void BuilderWorks()

AssertAsBuilt(c, dt);

ConsumerCreateRequest ccr = new ConsumerCreateRequest("stream", c);
ConsumerCreateRequest ccr = new ConsumerCreateRequest("stream", c, ConsumerCreateRequestAction.CreateOrUpdate);
Assert.Equal("stream", ccr.StreamName);
Assert.NotNull(ccr.Config);

Expand Down

0 comments on commit bad1132

Please sign in to comment.