Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] js_PauseConsumer support (also changed jsConsumerConfig, jsCo… #726

Merged
merged 7 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ jobs:
packages:
- g++-9
env:
- NATS_TEST_SERVER_VERSION=main
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally, the nats.c.deps contained the executable of the latest release and this is what we would be using. When in dev cycle, I would occasionally update the servers in nats.c.deps to a version that I needed for testing (say in this case here the server version that would contain the pause consumer feature).

Since it looks like we don't really need the nats.c.deps (at least for the NATS Server, we still need for the streaming server and the pbuf library), and since you have a matrix run for the latest server release already, maybe you could change the buildOnTravis.sh script to use main if NATS_TEST_SERVER_VERSION env variable is not set? So you would not have to add it here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was thinking about doing that, and maybe testing with a few older server releases too. Separate PR? Here, I just wanted to make sure that my new tests are running with checks in CI.

- MATRIX_EVAL="CC=gcc-9"
- BUILD_OPT="-DNATS_BUILD_WITH_TLS=OFF -DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release" DO_COVERAGE="no"

Expand All @@ -89,6 +90,7 @@ jobs:
packages:
- g++-9
env:
- NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=gcc-9"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

Expand All @@ -102,6 +104,7 @@ jobs:
packages:
- g++-9
env:
- NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=gcc-9"
- NATS_DEFAULT_TO_LIB_MSG_DELIVERY=yes BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

Expand All @@ -115,6 +118,7 @@ jobs:
packages:
- g++-9
env:
- NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=gcc-9"
- NATS_DEFAULT_LIB_WRITE_DEADLINE=2000 BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

Expand Down Expand Up @@ -153,6 +157,7 @@ jobs:
packages:
- clang-8
env:
- NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=clang-8"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

Expand All @@ -165,6 +170,7 @@ jobs:
packages:
- clang-8
env:
- NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=clang-8"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=thread" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ set(NATS_VERSION_MINOR 9)
set(NATS_VERSION_PATCH 0)
set(NATS_VERSION_SUFFIX "-beta")

set(NATS_VERSION_REQUIRED_NUMBER 0x030800) # Consider updating before next (v3.9.0) release!!!
set(NATS_VERSION_REQUIRED_NUMBER 0x030900)

if(NATS_UPDATE_VERSION OR NATS_UPDATE_DOC)
configure_file(
Expand Down
3 changes: 3 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ extern const int64_t jsDefaultRequestWait;
// jsApiConsumerNamesT is the endpoint to get the list of consumer names for a stream.
#define jsApiConsumerNamesT "%.*s.CONSUMER.NAMES.%s"

// jsApiConsumerPauseT is the endpoint to pause a consumer.
#define jsApiConsumerPauseT "%.*s.CONSUMER.PAUSE.%s.%s"

// Creates a subject based on the option's prefix, the subject format and its values.
#define js_apiSubj(s, o, f, ...) (nats_asprintf((s), (f), (o)->Prefix, __VA_ARGS__) < 0 ? NATS_NO_MEMORY : NATS_OK)

Expand Down
160 changes: 155 additions & 5 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ typedef struct apiPaged
} apiPaged;

static natsStatus
_marshalTimeUTC(natsBuffer *buf, const char *fieldName, int64_t timeUTC)
_marshalTimeUTC(natsBuffer *buf, bool sep, const char *fieldName, int64_t timeUTC)
{
natsStatus s = NATS_OK;
char dbuf[36] = {'\0'};
Expand All @@ -42,7 +42,10 @@ _marshalTimeUTC(natsBuffer *buf, const char *fieldName, int64_t timeUTC)
if (s != NATS_OK)
return nats_setError(NATS_ERR, "unable to encode data for field '%s' value %" PRId64, fieldName, timeUTC);

s = natsBuf_Append(buf, ",\"", -1);
if (sep)
s = natsBuf_AppendByte(buf, ',');

IFOK(s, natsBuf_AppendByte(buf, '"'));
IFOK(s, natsBuf_Append(buf, fieldName, -1));
IFOK(s, natsBuf_Append(buf, "\":\"", -1));
IFOK(s, natsBuf_Append(buf, dbuf, -1));
Expand Down Expand Up @@ -353,7 +356,7 @@ _marshalStreamSource(jsStreamSource *source, const char *fieldName, natsBuffer *
if ((s == NATS_OK) && (source->OptStartSeq > 0))
s = nats_marshalLong(buf, true, "opt_start_seq", source->OptStartSeq);
if ((s == NATS_OK) && (source->OptStartTime > 0))
IFOK(s, _marshalTimeUTC(buf, "opt_start_time", source->OptStartTime));
IFOK(s, _marshalTimeUTC(buf, true, "opt_start_time", source->OptStartTime));
if (source->FilterSubject != NULL)
{
IFOK(s, natsBuf_Append(buf, ",\"filter_subject\":\"", -1));
Expand Down Expand Up @@ -1439,7 +1442,6 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo
&& (cfg->ConsumerLimits.MaxAckPending != (*new_si)->Config->ConsumerLimits.MaxAckPending)
)
{
// <>/<> wrong error
return nats_setError(NATS_INVALID_ARG, "%s", jsErrStreamConfigRequired);
}

Expand Down Expand Up @@ -2918,7 +2920,7 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
if ((s == NATS_OK) && (cfg->OptStartSeq > 0))
s = nats_marshalLong(buf, true, "opt_start_seq", cfg->OptStartSeq);
if ((s == NATS_OK) && (cfg->OptStartTime > 0))
s = _marshalTimeUTC(buf, "opt_start_time", cfg->OptStartTime);
s = _marshalTimeUTC(buf, true, "opt_start_time", cfg->OptStartTime);
IFOK(s, _marshalAckPolicy(buf, cfg->AckPolicy));
if ((s == NATS_OK) && (cfg->AckWait > 0))
s = nats_marshalLong(buf, true, "ack_wait", cfg->AckWait);
Expand Down Expand Up @@ -2947,6 +2949,8 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
IFOK(s, natsBuf_AppendByte(buf, ']'));
}
IFOK(s, nats_marshalMetadata(buf, true, "metadata", cfg->Metadata));
if ((s == NATS_OK) && (cfg->PauseUntil > 0))
s = _marshalTimeUTC(buf, true, "pause_until", cfg->PauseUntil);
IFOK(s, _marshalReplayPolicy(buf, cfg->ReplayPolicy))
if ((s == NATS_OK) && (cfg->RateLimit > 0))
s = nats_marshalULong(buf, true, "rate_limit_bps", cfg->RateLimit);
Expand Down Expand Up @@ -3193,6 +3197,8 @@ js_unmarshalConsumerInfo(nats_JSON *json, jsConsumerInfo **new_ci)
IFOK(s, nats_JSONGetULong(json, "num_pending", &(ci->NumPending)));
IFOK(s, _unmarshalClusterInfo(json, "cluster", &(ci->Cluster)));
IFOK(s, nats_JSONGetBool(json, "push_bound", &(ci->PushBound)));
IFOK(s, nats_JSONGetBool(json, "paused", &(ci->Paused)));
IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(ci->PauseRemaining)));

if (s == NATS_OK)
*new_ci = ci;
Expand Down Expand Up @@ -3446,6 +3452,150 @@ js_DeleteConsumer(jsCtx *js, const char *stream, const char *consumer,
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_unmarshalConsumerPauseResp(nats_JSON *json, jsConsumerPauseResponse **new_cpr)
{
natsStatus s = NATS_OK;
jsConsumerPauseResponse *cpr = NULL;

cpr = (jsConsumerPauseResponse *)NATS_CALLOC(1, sizeof(jsConsumerPauseResponse));
if (cpr == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

s = nats_JSONGetBool(json, "paused", &(cpr->Paused));
IFOK(s, nats_JSONGetTime(json, "pause_until", &(cpr->PauseUntil)));
IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(cpr->PauseRemaining)));

if (s == NATS_OK)
*new_cpr = cpr;
else
jsConsumerPauseResponse_Destroy(cpr);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalConsumerPauseResp(jsConsumerPauseResponse **new_cpr, natsMsg *resp, jsErrCode *errCode)
{
nats_JSON *json = NULL;
jsApiResponse ar;
natsStatus s;

s = js_unmarshalResponse(&ar, &json, resp);
if (s != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);

if (js_apiResponseIsErr(&ar))
{
if (errCode != NULL)
*errCode = (int)ar.Error.ErrCode;

if (ar.Error.ErrCode == JSConsumerNotFoundErr)
s = NATS_NOT_FOUND;
else
s = nats_setError(NATS_ERR, "%s", ar.Error.Description);
}
else if (new_cpr != NULL)
{
// At this point we need to unmarshal the consumer info itself.
s = js_unmarshalConsumerPauseResp(json, new_cpr);
}

js_freeApiRespContent(&ar);
nats_JSONDestroy(json);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_marshalConsumerPauseReq(natsBuffer **new_buf, uint64_t pauseUntil)
{
natsStatus s = NATS_OK;
natsBuffer *buf = NULL;

s = natsBuf_Create(&buf, 256);
IFOK(s, natsBuf_AppendByte(buf, '{'));
if ((s == NATS_OK) && (pauseUntil > 0)) {
s = _marshalTimeUTC(buf, false, "pause_until", pauseUntil);
}
IFOK(s, natsBuf_AppendByte(buf, '}'));

if (s == NATS_OK)
*new_buf = buf;
else
natsBuf_Destroy(buf);

return NATS_UPDATE_ERR_STACK(s);
}

void
jsConsumerPauseResponse_Destroy(jsConsumerPauseResponse *cpr)
{
if (cpr == NULL)
return;

NATS_FREE(cpr);
}


natsStatus
js_PauseConsumer(jsConsumerPauseResponse **new_cpr, jsCtx *js,
levb marked this conversation as resolved.
Show resolved Hide resolved
const char *stream, const char *consumer,
uint64_t pauseUntil, jsOptions *opts, jsErrCode *errCode)
{
natsStatus s = NATS_OK;
char *subj = NULL;
bool freePfx = false;
natsConnection *nc = NULL;
natsBuffer *buf = NULL;
natsMsg *resp = NULL;
jsOptions o;

if (errCode != NULL)
*errCode = 0;

if ((js == NULL) || (new_cpr == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

s = _checkStreamName(stream);
IFOK(s, js_checkConsName(consumer, false))
if (s != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);

s = js_setOpts(&nc, &freePfx, js, opts, &o);
if (s == NATS_OK)
{
if (nats_asprintf(&subj, jsApiConsumerPauseT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, consumer) < 0)
{
s = nats_setDefaultError(NATS_NO_MEMORY);
}
if (freePfx)
NATS_FREE((char *)o.Prefix);
}

IFOK(s, _marshalConsumerPauseReq(&buf, pauseUntil));

// Send the request
IFOK_JSR(s, natsConnection_Request(&resp, nc, subj, natsBuf_Data(buf), natsBuf_Len(buf), o.Wait));

// If we got a response, check for error or return the consumer info result.
IFOK(s, _unmarshalConsumerPauseResp(new_cpr, resp, errCode));

NATS_FREE(subj);
levb marked this conversation as resolved.
Show resolved Hide resolved
natsMsg_Destroy(resp);
natsBuf_Destroy(buf);

if (s == NATS_NOT_FOUND)
{
nats_clearLastError();
return s;
}

return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
jsConsumerConfig_Init(jsConsumerConfig *cc)
{
Expand Down
54 changes: 48 additions & 6 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -852,12 +852,13 @@ typedef struct jsConsumerConfig

// Configuration options introduced in 2.10

// Multiple filter subjects
const char **FilterSubjects;
int FilterSubjectsLen;
const char **FilterSubjects; ///< Multiple filter subjects
int FilterSubjectsLen;
natsMetadata Metadata; ///< User-provided metadata for the consumer, encoded as an array of {"key", "value",...}

// User-provided metadata for the consumer, encoded as an array of {"key", "value",...}
natsMetadata Metadata;
// Configuration options introduced in 2.11

int64_t PauseUntil; ///< Suspends the consumer until this deadline, represented as number of nanoseconds since epoch.
} jsConsumerConfig;

/**
Expand Down Expand Up @@ -1003,7 +1004,8 @@ typedef struct jsConsumerInfo
uint64_t NumPending;
jsClusterInfo *Cluster;
bool PushBound;

bool Paused;
int64_t PauseRemaining; ///< Remaining time in nanoseconds.
} jsConsumerInfo;

/**
Expand Down Expand Up @@ -1034,6 +1036,18 @@ typedef struct jsConsumerNamesList

} jsConsumerNamesList;

/**
* Request to pause the consumer, used to call js_PauseConsumer.
*
* @see js_PauseConsumer
*/
typedef struct jsConsumerPauseResponse
{
bool Paused;
int64_t PauseUntil; ///< UTC time expressed as number of nanoseconds since epoch.
int64_t PauseRemaining; ///< Remaining time in nanoseconds.
} jsConsumerPauseResponse;

/**
* Reports on API calls to JetStream for this account.
*/
Expand Down Expand Up @@ -5945,6 +5959,34 @@ NATS_EXTERN natsStatus
js_DeleteConsumer(jsCtx *js, const char *stream, const char *consumer,
jsOptions *opts, jsErrCode *errCode);

/** \brief Pauses a consumer.
*
* Pauses the consumer named <c>consumer</c> on stream named <c>stream</c>.
*
* @param new_cpr if not NULL, will receive the response of the operation.
* @param js the pointer to the #jsCtx context.
* @param stream the name of the stream.
* @param consumer the name of the consumer.
* @param pauseUntil the time in nanoseconds since the Unix epoch to pause the consumer until.
* @param opts the pointer to the #jsOptions object, possibly `NULL`.
* @param errCode the location where to store the JetStream specific error code, or `NULL`
* if not needed.
*/

NATS_EXTERN natsStatus
js_PauseConsumer(jsConsumerPauseResponse **new_cpr, jsCtx *js,
const char *stream, const char *consumer,
uint64_t pauseUntil, jsOptions *opts, jsErrCode *errCode);

/** \brief Destroys the PauseConsumer response object.
*
* Releases memory allocated for this object.
*
* @param cpr the pointer to the #jsConsumerPauseResponse object.
*/
NATS_EXTERN void
jsConsumerPauseResponse_Destroy(jsConsumerPauseResponse *cpr);

/** \brief Destroys the consumer information object.
*
* Releases memory allocated for this consumer information object.
Expand Down
2 changes: 1 addition & 1 deletion src/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ extern "C" {
(NATS_VERSION_MINOR << 8) | \
NATS_VERSION_PATCH)

#define NATS_VERSION_REQUIRED_NUMBER 0x030800
#define NATS_VERSION_REQUIRED_NUMBER 0x030900

#ifdef __cplusplus
}
Expand Down
Loading