diff --git a/.travis.yml b/.travis.yml
index d30a5cef1..6ab9ae1ae 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -63,6 +63,7 @@ jobs:
packages:
- g++-9
env:
+ - NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=gcc-9"
- BUILD_OPT="-DNATS_BUILD_WITH_TLS=OFF -DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release" DO_COVERAGE="no"
@@ -89,6 +90,7 @@ jobs:
packages:
- g++-9
env:
+ - NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=gcc-9"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"
@@ -102,6 +104,7 @@ jobs:
packages:
- g++-9
env:
+ - NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=gcc-9"
- NATS_DEFAULT_TO_LIB_MSG_DELIVERY=yes BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"
@@ -115,6 +118,7 @@ jobs:
packages:
- g++-9
env:
+ - NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=gcc-9"
- NATS_DEFAULT_LIB_WRITE_DEADLINE=2000 BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"
@@ -153,6 +157,7 @@ jobs:
packages:
- clang-8
env:
+ - NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=clang-8"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"
@@ -165,6 +170,7 @@ jobs:
packages:
- clang-8
env:
+ - NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=clang-8"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=thread" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"
diff --git a/CMakeLists.txt b/CMakeLists.txt
index eb54d4d86..9678f6803 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -240,7 +240,7 @@ set(NATS_VERSION_MINOR 9)
set(NATS_VERSION_PATCH 0)
set(NATS_VERSION_SUFFIX "-beta")
-set(NATS_VERSION_REQUIRED_NUMBER 0x030800) # Consider updating before next (v3.9.0) release!!!
+set(NATS_VERSION_REQUIRED_NUMBER 0x030900)
if(NATS_UPDATE_VERSION OR NATS_UPDATE_DOC)
configure_file(
diff --git a/src/js.h b/src/js.h
index d98fb68f4..bc5ec2abe 100644
--- a/src/js.h
+++ b/src/js.h
@@ -175,6 +175,9 @@ extern const int64_t jsDefaultRequestWait;
// jsApiConsumerNamesT is the endpoint to get the list of consumer names for a stream.
#define jsApiConsumerNamesT "%.*s.CONSUMER.NAMES.%s"
+// jsApiConsumerPauseT is the endpoint to pause a consumer.
+#define jsApiConsumerPauseT "%.*s.CONSUMER.PAUSE.%s.%s"
+
// Creates a subject based on the option's prefix, the subject format and its values.
#define js_apiSubj(s, o, f, ...) (nats_asprintf((s), (f), (o)->Prefix, __VA_ARGS__) < 0 ? NATS_NO_MEMORY : NATS_OK)
diff --git a/src/jsm.c b/src/jsm.c
index fed19665e..7d8587ce0 100644
--- a/src/jsm.c
+++ b/src/jsm.c
@@ -33,7 +33,7 @@ typedef struct apiPaged
} apiPaged;
static natsStatus
-_marshalTimeUTC(natsBuffer *buf, const char *fieldName, int64_t timeUTC)
+_marshalTimeUTC(natsBuffer *buf, bool sep, const char *fieldName, int64_t timeUTC)
{
natsStatus s = NATS_OK;
char dbuf[36] = {'\0'};
@@ -42,7 +42,10 @@ _marshalTimeUTC(natsBuffer *buf, const char *fieldName, int64_t timeUTC)
if (s != NATS_OK)
return nats_setError(NATS_ERR, "unable to encode data for field '%s' value %" PRId64, fieldName, timeUTC);
- s = natsBuf_Append(buf, ",\"", -1);
+ if (sep)
+ s = natsBuf_AppendByte(buf, ',');
+
+ IFOK(s, natsBuf_AppendByte(buf, '"'));
IFOK(s, natsBuf_Append(buf, fieldName, -1));
IFOK(s, natsBuf_Append(buf, "\":\"", -1));
IFOK(s, natsBuf_Append(buf, dbuf, -1));
@@ -353,7 +356,7 @@ _marshalStreamSource(jsStreamSource *source, const char *fieldName, natsBuffer *
if ((s == NATS_OK) && (source->OptStartSeq > 0))
s = nats_marshalLong(buf, true, "opt_start_seq", source->OptStartSeq);
if ((s == NATS_OK) && (source->OptStartTime > 0))
- IFOK(s, _marshalTimeUTC(buf, "opt_start_time", source->OptStartTime));
+ IFOK(s, _marshalTimeUTC(buf, true, "opt_start_time", source->OptStartTime));
if (source->FilterSubject != NULL)
{
IFOK(s, natsBuf_Append(buf, ",\"filter_subject\":\"", -1));
@@ -1439,7 +1442,6 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo
&& (cfg->ConsumerLimits.MaxAckPending != (*new_si)->Config->ConsumerLimits.MaxAckPending)
)
{
- // <>/<> wrong error
return nats_setError(NATS_INVALID_ARG, "%s", jsErrStreamConfigRequired);
}
@@ -2918,7 +2920,7 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
if ((s == NATS_OK) && (cfg->OptStartSeq > 0))
s = nats_marshalLong(buf, true, "opt_start_seq", cfg->OptStartSeq);
if ((s == NATS_OK) && (cfg->OptStartTime > 0))
- s = _marshalTimeUTC(buf, "opt_start_time", cfg->OptStartTime);
+ s = _marshalTimeUTC(buf, true, "opt_start_time", cfg->OptStartTime);
IFOK(s, _marshalAckPolicy(buf, cfg->AckPolicy));
if ((s == NATS_OK) && (cfg->AckWait > 0))
s = nats_marshalLong(buf, true, "ack_wait", cfg->AckWait);
@@ -2947,6 +2949,8 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
IFOK(s, natsBuf_AppendByte(buf, ']'));
}
IFOK(s, nats_marshalMetadata(buf, true, "metadata", cfg->Metadata));
+ if ((s == NATS_OK) && (cfg->PauseUntil > 0))
+ s = _marshalTimeUTC(buf, true, "pause_until", cfg->PauseUntil);
IFOK(s, _marshalReplayPolicy(buf, cfg->ReplayPolicy))
if ((s == NATS_OK) && (cfg->RateLimit > 0))
s = nats_marshalULong(buf, true, "rate_limit_bps", cfg->RateLimit);
@@ -3193,6 +3197,8 @@ js_unmarshalConsumerInfo(nats_JSON *json, jsConsumerInfo **new_ci)
IFOK(s, nats_JSONGetULong(json, "num_pending", &(ci->NumPending)));
IFOK(s, _unmarshalClusterInfo(json, "cluster", &(ci->Cluster)));
IFOK(s, nats_JSONGetBool(json, "push_bound", &(ci->PushBound)));
+ IFOK(s, nats_JSONGetBool(json, "paused", &(ci->Paused)));
+ IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(ci->PauseRemaining)));
if (s == NATS_OK)
*new_ci = ci;
@@ -3446,6 +3452,150 @@ js_DeleteConsumer(jsCtx *js, const char *stream, const char *consumer,
return NATS_UPDATE_ERR_STACK(s);
}
+natsStatus
+js_unmarshalConsumerPauseResp(nats_JSON *json, jsConsumerPauseResponse **new_cpr)
+{
+ natsStatus s = NATS_OK;
+ jsConsumerPauseResponse *cpr = NULL;
+
+ cpr = (jsConsumerPauseResponse *)NATS_CALLOC(1, sizeof(jsConsumerPauseResponse));
+ if (cpr == NULL)
+ return nats_setDefaultError(NATS_NO_MEMORY);
+
+ s = nats_JSONGetBool(json, "paused", &(cpr->Paused));
+ IFOK(s, nats_JSONGetTime(json, "pause_until", &(cpr->PauseUntil)));
+ IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(cpr->PauseRemaining)));
+
+ if (s == NATS_OK)
+ *new_cpr = cpr;
+ else
+ jsConsumerPauseResponse_Destroy(cpr);
+
+ return NATS_UPDATE_ERR_STACK(s);
+}
+
+static natsStatus
+_unmarshalConsumerPauseResp(jsConsumerPauseResponse **new_cpr, natsMsg *resp, jsErrCode *errCode)
+{
+ nats_JSON *json = NULL;
+ jsApiResponse ar;
+ natsStatus s;
+
+ s = js_unmarshalResponse(&ar, &json, resp);
+ if (s != NATS_OK)
+ return NATS_UPDATE_ERR_STACK(s);
+
+ if (js_apiResponseIsErr(&ar))
+ {
+ if (errCode != NULL)
+ *errCode = (int)ar.Error.ErrCode;
+
+ if (ar.Error.ErrCode == JSConsumerNotFoundErr)
+ s = NATS_NOT_FOUND;
+ else
+ s = nats_setError(NATS_ERR, "%s", ar.Error.Description);
+ }
+ else if (new_cpr != NULL)
+ {
+ // At this point we need to unmarshal the consumer info itself.
+ s = js_unmarshalConsumerPauseResp(json, new_cpr);
+ }
+
+ js_freeApiRespContent(&ar);
+ nats_JSONDestroy(json);
+
+ return NATS_UPDATE_ERR_STACK(s);
+}
+
+static natsStatus
+_marshalConsumerPauseReq(natsBuffer **new_buf, uint64_t pauseUntil)
+{
+ natsStatus s = NATS_OK;
+ natsBuffer *buf = NULL;
+
+ s = natsBuf_Create(&buf, 256);
+ IFOK(s, natsBuf_AppendByte(buf, '{'));
+ if ((s == NATS_OK) && (pauseUntil > 0)) {
+ s = _marshalTimeUTC(buf, false, "pause_until", pauseUntil);
+ }
+ IFOK(s, natsBuf_AppendByte(buf, '}'));
+
+ if (s == NATS_OK)
+ *new_buf = buf;
+ else
+ natsBuf_Destroy(buf);
+
+ return NATS_UPDATE_ERR_STACK(s);
+}
+
+void
+jsConsumerPauseResponse_Destroy(jsConsumerPauseResponse *cpr)
+{
+ if (cpr == NULL)
+ return;
+
+ NATS_FREE(cpr);
+}
+
+
+natsStatus
+js_PauseConsumer(jsConsumerPauseResponse **new_cpr, jsCtx *js,
+ const char *stream, const char *consumer,
+ uint64_t pauseUntil, jsOptions *opts, jsErrCode *errCode)
+{
+ natsStatus s = NATS_OK;
+ char *subj = NULL;
+ bool freePfx = false;
+ natsConnection *nc = NULL;
+ natsBuffer *buf = NULL;
+ natsMsg *resp = NULL;
+ jsOptions o;
+
+ if (errCode != NULL)
+ *errCode = 0;
+
+ if ((js == NULL) || (new_cpr == NULL))
+ return nats_setDefaultError(NATS_INVALID_ARG);
+
+ s = _checkStreamName(stream);
+ IFOK(s, js_checkConsName(consumer, false))
+ if (s != NATS_OK)
+ return NATS_UPDATE_ERR_STACK(s);
+
+ s = js_setOpts(&nc, &freePfx, js, opts, &o);
+ if (s == NATS_OK)
+ {
+ if (nats_asprintf(&subj, jsApiConsumerPauseT,
+ js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
+ stream, consumer) < 0)
+ {
+ s = nats_setDefaultError(NATS_NO_MEMORY);
+ }
+ if (freePfx)
+ NATS_FREE((char *)o.Prefix);
+ }
+
+ IFOK(s, _marshalConsumerPauseReq(&buf, pauseUntil));
+
+ // Send the request
+ IFOK_JSR(s, natsConnection_Request(&resp, nc, subj, natsBuf_Data(buf), natsBuf_Len(buf), o.Wait));
+
+ // If we got a response, check for error or return the consumer info result.
+ IFOK(s, _unmarshalConsumerPauseResp(new_cpr, resp, errCode));
+
+ NATS_FREE(subj);
+ natsMsg_Destroy(resp);
+ natsBuf_Destroy(buf);
+
+ if (s == NATS_NOT_FOUND)
+ {
+ nats_clearLastError();
+ return s;
+ }
+
+ return NATS_UPDATE_ERR_STACK(s);
+}
+
natsStatus
jsConsumerConfig_Init(jsConsumerConfig *cc)
{
diff --git a/src/nats.h b/src/nats.h
index 9e0e3e791..12f90946b 100644
--- a/src/nats.h
+++ b/src/nats.h
@@ -852,12 +852,13 @@ typedef struct jsConsumerConfig
// Configuration options introduced in 2.10
- // Multiple filter subjects
- const char **FilterSubjects;
- int FilterSubjectsLen;
+ const char **FilterSubjects; ///< Multiple filter subjects
+ int FilterSubjectsLen;
+ natsMetadata Metadata; ///< User-provided metadata for the consumer, encoded as an array of {"key", "value",...}
- // User-provided metadata for the consumer, encoded as an array of {"key", "value",...}
- natsMetadata Metadata;
+ // Configuration options introduced in 2.11
+
+ int64_t PauseUntil; ///< Suspends the consumer until this deadline, represented as number of nanoseconds since epoch.
} jsConsumerConfig;
/**
@@ -1003,7 +1004,8 @@ typedef struct jsConsumerInfo
uint64_t NumPending;
jsClusterInfo *Cluster;
bool PushBound;
-
+ bool Paused;
+ int64_t PauseRemaining; ///< Remaining time in nanoseconds.
} jsConsumerInfo;
/**
@@ -1034,6 +1036,18 @@ typedef struct jsConsumerNamesList
} jsConsumerNamesList;
+/**
+ * Request to pause the consumer, used to call js_PauseConsumer.
+ *
+ * @see js_PauseConsumer
+ */
+typedef struct jsConsumerPauseResponse
+{
+ bool Paused;
+ int64_t PauseUntil; ///< UTC time expressed as number of nanoseconds since epoch.
+ int64_t PauseRemaining; ///< Remaining time in nanoseconds.
+} jsConsumerPauseResponse;
+
/**
* Reports on API calls to JetStream for this account.
*/
@@ -5945,6 +5959,34 @@ NATS_EXTERN natsStatus
js_DeleteConsumer(jsCtx *js, const char *stream, const char *consumer,
jsOptions *opts, jsErrCode *errCode);
+/** \brief Pauses a consumer.
+ *
+ * Pauses the consumer named consumer on stream named stream.
+ *
+ * @param new_cpr if not NULL, will receive the response of the operation.
+ * @param js the pointer to the #jsCtx context.
+ * @param stream the name of the stream.
+ * @param consumer the name of the consumer.
+ * @param pauseUntil the time in nanoseconds since the Unix epoch to pause the consumer until.
+ * @param opts the pointer to the #jsOptions object, possibly `NULL`.
+ * @param errCode the location where to store the JetStream specific error code, or `NULL`
+ * if not needed.
+ */
+
+NATS_EXTERN natsStatus
+js_PauseConsumer(jsConsumerPauseResponse **new_cpr, jsCtx *js,
+ const char *stream, const char *consumer,
+ uint64_t pauseUntil, jsOptions *opts, jsErrCode *errCode);
+
+/** \brief Destroys the PauseConsumer response object.
+ *
+ * Releases memory allocated for this object.
+ *
+ * @param cpr the pointer to the #jsConsumerPauseResponse object.
+ */
+NATS_EXTERN void
+jsConsumerPauseResponse_Destroy(jsConsumerPauseResponse *cpr);
+
/** \brief Destroys the consumer information object.
*
* Releases memory allocated for this consumer information object.
diff --git a/src/version.h b/src/version.h
index 1819ebe0e..e06ea3530 100644
--- a/src/version.h
+++ b/src/version.h
@@ -31,7 +31,7 @@ extern "C" {
(NATS_VERSION_MINOR << 8) | \
NATS_VERSION_PATCH)
-#define NATS_VERSION_REQUIRED_NUMBER 0x030800
+#define NATS_VERSION_REQUIRED_NUMBER 0x030900
#ifdef __cplusplus
}
diff --git a/test/test.c b/test/test.c
index 321127b82..2bad703d2 100644
--- a/test/test.c
+++ b/test/test.c
@@ -24094,7 +24094,7 @@ test_JetStreamMgtConsumers(void)
jsConsumerConfig_Init(&cfg);
cfg.DeliverPolicy = dlvPolicies[i];
s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
- testCond((s = NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
+ testCond((s == NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
nats_clearLastError();
test("Verify config: ");
@@ -24109,7 +24109,7 @@ test_JetStreamMgtConsumers(void)
jsConsumerConfig_Init(&cfg);
cfg.DeliverPolicy = (jsDeliverPolicy) 100;
s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
- testCond((s = NATS_INVALID_ARG) && (jerr == 0) && (ci == NULL));
+ testCond((s == NATS_INVALID_ARG) && (jerr == 0) && (ci == NULL));
nats_clearLastError();
for (i=0; i<3; i++)
@@ -24118,7 +24118,7 @@ test_JetStreamMgtConsumers(void)
jsConsumerConfig_Init(&cfg);
cfg.AckPolicy = ackPolicies[i];
s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
- testCond((s = NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
+ testCond((s == NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
nats_clearLastError();
test("Verify config: ");
@@ -24133,7 +24133,7 @@ test_JetStreamMgtConsumers(void)
jsConsumerConfig_Init(&cfg);
cfg.AckPolicy = (jsAckPolicy) 100;
s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
- testCond((s = NATS_INVALID_ARG) && (jerr == 0) && (ci == NULL));
+ testCond((s == NATS_INVALID_ARG) && (jerr == 0) && (ci == NULL));
nats_clearLastError();
for (i=0; i<2; i++)
@@ -24142,7 +24142,7 @@ test_JetStreamMgtConsumers(void)
jsConsumerConfig_Init(&cfg);
cfg.ReplayPolicy = replayPolicies[i];
s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
- testCond((s = NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
+ testCond((s == NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
nats_clearLastError();
test("Verify config: ");
@@ -24157,7 +24157,7 @@ test_JetStreamMgtConsumers(void)
jsConsumerConfig_Init(&cfg);
cfg.ReplayPolicy = (jsReplayPolicy) 100;
s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
- testCond((s = NATS_INVALID_ARG) && (jerr == 0) && (ci == NULL));
+ testCond((s == NATS_INVALID_ARG) && (jerr == 0) && (ci == NULL));
nats_clearLastError();
test("Add consumer (non durable): ");
@@ -24187,7 +24187,7 @@ test_JetStreamMgtConsumers(void)
// expect this to fail. We are just checking that the config
// is properly serialized.
s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
- testCond((s = NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
+ testCond((s == NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
nats_clearLastError();
test("Verify config: ");
@@ -24217,7 +24217,7 @@ test_JetStreamMgtConsumers(void)
cfg.FilterSubjects = (const char *[]){"bar1", "bar2"};
cfg.FilterSubjectsLen = 2;
s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
- testCond((s = NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
+ testCond((s == NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
nats_clearLastError();
test("Verify config: ");
@@ -24229,7 +24229,7 @@ test_JetStreamMgtConsumers(void)
"\"opt_start_seq\":100,"
"\"opt_start_time\":\"2021-06-23T18:22:00.12345Z\",\"ack_policy\":\"explicit\","
"\"ack_wait\":200,\"max_deliver\":300,\"filter_subjects\":[\"bar1\",\"bar2\"],"
- "\"metadata\":{\"key1\":\"val1\",\"key2\":\"val2\"},"\
+ "\"metadata\":{\"key1\":\"val1\",\"key2\":\"val2\"},"
"\"replay_policy\":\"instant\",\"rate_limit_bps\":400,"
"\"sample_freq\":\"60%%\",\"max_waiting\":500,\"max_ack_pending\":600,"
"\"flow_control\":true,\"idle_heartbeat\":700,"
@@ -24251,7 +24251,7 @@ test_JetStreamMgtConsumers(void)
test("Add consumer (durable): ");
cfg.Durable = "dur";
s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
- testCond((s = NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
+ testCond((s == NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
nats_clearLastError();
test("Verify config: ");
@@ -24286,7 +24286,7 @@ test_JetStreamMgtConsumers(void)
cfg.Durable = NULL;
cfg.Name = "my_name";
s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
- testCond((s = NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
+ testCond((s == NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
nats_clearLastError();
test("Verify config: ");
@@ -24325,14 +24325,50 @@ test_JetStreamMgtConsumers(void)
cfg.Name = "my_name";
cfg.DeliverSubject = "mn.foo";
cfg.FilterSubject = "bar.>";
+#define TIME_20350101 (2051251200L * 1000000000L)
+ if (serverVersionAtLeast(2, 11, 0))
+ {
+ cfg.PauseUntil = TIME_20350101;
+ }
s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0) && (ci != NULL)
&& (strcmp(ci->Stream, "MY_STREAM") == 0)
&& (strcmp(ci->Name, "my_name") == 0)
- && (strcmp(ci->Config->Name, "my_name") == 0));
+ && (strcmp(ci->Config->Name, "my_name") == 0)
+ && ((cfg.PauseUntil == 0) || (ci->Paused && ci->PauseRemaining > 0)));
jsConsumerInfo_Destroy(ci);
ci = NULL;
+ if (serverVersionAtLeast(2, 11, 0))
+ {
+ test("Pause consumer: ");
+ jsConsumerPauseResponse *cpr = NULL;
+ s = js_PauseConsumer(&cpr, js, "MY_STREAM", "my_name", TIME_20350101, NULL, &jerr);
+ testCond((s == NATS_OK) && (jerr == 0) && (cpr != NULL)
+ && cpr->Paused
+ && (cpr->PauseUntil == TIME_20350101)
+ && (cpr->PauseRemaining > 0));
+ jsConsumerPauseResponse_Destroy(cpr);
+ cpr = NULL;
+
+ test("Verify consumer paused with GetInfo: ");
+ s = js_GetConsumerInfo(&ci, js, "MY_STREAM", "my_name", NULL, &jerr);
+ testCond((s == NATS_OK) && (jerr == 0) && (ci != NULL)
+ && ci->Paused
+ && (ci->PauseRemaining > 0));
+ jsConsumerInfo_Destroy(ci);
+ ci = NULL;
+
+ test("Unpause consumer: ");
+ s = js_PauseConsumer(&cpr, js, "MY_STREAM", "my_name", 0, NULL, &jerr);
+ testCond((s == NATS_OK) && (jerr == 0) && (cpr != NULL)
+ && !cpr->Paused
+ && (cpr->PauseUntil == 0)
+ && (cpr->PauseRemaining == 0));
+ jsConsumerPauseResponse_Destroy(cpr);
+ cpr = NULL;
+ }
+
test("Add consumer (durable): ");
jsConsumerConfig_Init(&cfg);
cfg.Durable = "dur";