Skip to content

Commit

Permalink
[ADDED] js_PauseConsumer support (also changed jsConsumerConfig, jsCo… (
Browse files Browse the repository at this point in the history
#726)

* [ADDED] js_PauseConsumer support (also changed jsConsumerConfig, jsConsumerInfo)

* Updated most travis jobs to test against main

* PR feedback: nit extra line

* PR feedback: added NATS_EXTERN

* PR feedback: added jsConsumerPauseResponse_Destroy

* PR feedback: create a paused consumer for real, FREE cleanup

* PR feedback: destroy buffer
  • Loading branch information
levb authored Mar 13, 2024
1 parent ec2fbfc commit 4b9f0ad
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 25 deletions.
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ jobs:
packages:
- g++-9
env:
- NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=gcc-9"
- BUILD_OPT="-DNATS_BUILD_WITH_TLS=OFF -DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release" DO_COVERAGE="no"

Expand All @@ -89,6 +90,7 @@ jobs:
packages:
- g++-9
env:
- NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=gcc-9"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

Expand All @@ -102,6 +104,7 @@ jobs:
packages:
- g++-9
env:
- NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=gcc-9"
- NATS_DEFAULT_TO_LIB_MSG_DELIVERY=yes BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

Expand All @@ -115,6 +118,7 @@ jobs:
packages:
- g++-9
env:
- NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=gcc-9"
- NATS_DEFAULT_LIB_WRITE_DEADLINE=2000 BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

Expand Down Expand Up @@ -153,6 +157,7 @@ jobs:
packages:
- clang-8
env:
- NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=clang-8"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

Expand All @@ -165,6 +170,7 @@ jobs:
packages:
- clang-8
env:
- NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=clang-8"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=thread" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ set(NATS_VERSION_MINOR 9)
set(NATS_VERSION_PATCH 0)
set(NATS_VERSION_SUFFIX "-beta")

set(NATS_VERSION_REQUIRED_NUMBER 0x030800) # Consider updating before next (v3.9.0) release!!!
set(NATS_VERSION_REQUIRED_NUMBER 0x030900)

if(NATS_UPDATE_VERSION OR NATS_UPDATE_DOC)
configure_file(
Expand Down
3 changes: 3 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ extern const int64_t jsDefaultRequestWait;
// jsApiConsumerNamesT is the endpoint to get the list of consumer names for a stream.
#define jsApiConsumerNamesT "%.*s.CONSUMER.NAMES.%s"

// jsApiConsumerPauseT is the endpoint to pause a consumer.
#define jsApiConsumerPauseT "%.*s.CONSUMER.PAUSE.%s.%s"

// Creates a subject based on the option's prefix, the subject format and its values.
#define js_apiSubj(s, o, f, ...) (nats_asprintf((s), (f), (o)->Prefix, __VA_ARGS__) < 0 ? NATS_NO_MEMORY : NATS_OK)

Expand Down
160 changes: 155 additions & 5 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ typedef struct apiPaged
} apiPaged;

static natsStatus
_marshalTimeUTC(natsBuffer *buf, const char *fieldName, int64_t timeUTC)
_marshalTimeUTC(natsBuffer *buf, bool sep, const char *fieldName, int64_t timeUTC)
{
natsStatus s = NATS_OK;
char dbuf[36] = {'\0'};
Expand All @@ -42,7 +42,10 @@ _marshalTimeUTC(natsBuffer *buf, const char *fieldName, int64_t timeUTC)
if (s != NATS_OK)
return nats_setError(NATS_ERR, "unable to encode data for field '%s' value %" PRId64, fieldName, timeUTC);

s = natsBuf_Append(buf, ",\"", -1);
if (sep)
s = natsBuf_AppendByte(buf, ',');

IFOK(s, natsBuf_AppendByte(buf, '"'));
IFOK(s, natsBuf_Append(buf, fieldName, -1));
IFOK(s, natsBuf_Append(buf, "\":\"", -1));
IFOK(s, natsBuf_Append(buf, dbuf, -1));
Expand Down Expand Up @@ -353,7 +356,7 @@ _marshalStreamSource(jsStreamSource *source, const char *fieldName, natsBuffer *
if ((s == NATS_OK) && (source->OptStartSeq > 0))
s = nats_marshalLong(buf, true, "opt_start_seq", source->OptStartSeq);
if ((s == NATS_OK) && (source->OptStartTime > 0))
IFOK(s, _marshalTimeUTC(buf, "opt_start_time", source->OptStartTime));
IFOK(s, _marshalTimeUTC(buf, true, "opt_start_time", source->OptStartTime));
if (source->FilterSubject != NULL)
{
IFOK(s, natsBuf_Append(buf, ",\"filter_subject\":\"", -1));
Expand Down Expand Up @@ -1439,7 +1442,6 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo
&& (cfg->ConsumerLimits.MaxAckPending != (*new_si)->Config->ConsumerLimits.MaxAckPending)
)
{
// <>/<> wrong error
return nats_setError(NATS_INVALID_ARG, "%s", jsErrStreamConfigRequired);
}

Expand Down Expand Up @@ -2918,7 +2920,7 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
if ((s == NATS_OK) && (cfg->OptStartSeq > 0))
s = nats_marshalLong(buf, true, "opt_start_seq", cfg->OptStartSeq);
if ((s == NATS_OK) && (cfg->OptStartTime > 0))
s = _marshalTimeUTC(buf, "opt_start_time", cfg->OptStartTime);
s = _marshalTimeUTC(buf, true, "opt_start_time", cfg->OptStartTime);
IFOK(s, _marshalAckPolicy(buf, cfg->AckPolicy));
if ((s == NATS_OK) && (cfg->AckWait > 0))
s = nats_marshalLong(buf, true, "ack_wait", cfg->AckWait);
Expand Down Expand Up @@ -2947,6 +2949,8 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
IFOK(s, natsBuf_AppendByte(buf, ']'));
}
IFOK(s, nats_marshalMetadata(buf, true, "metadata", cfg->Metadata));
if ((s == NATS_OK) && (cfg->PauseUntil > 0))
s = _marshalTimeUTC(buf, true, "pause_until", cfg->PauseUntil);
IFOK(s, _marshalReplayPolicy(buf, cfg->ReplayPolicy))
if ((s == NATS_OK) && (cfg->RateLimit > 0))
s = nats_marshalULong(buf, true, "rate_limit_bps", cfg->RateLimit);
Expand Down Expand Up @@ -3193,6 +3197,8 @@ js_unmarshalConsumerInfo(nats_JSON *json, jsConsumerInfo **new_ci)
IFOK(s, nats_JSONGetULong(json, "num_pending", &(ci->NumPending)));
IFOK(s, _unmarshalClusterInfo(json, "cluster", &(ci->Cluster)));
IFOK(s, nats_JSONGetBool(json, "push_bound", &(ci->PushBound)));
IFOK(s, nats_JSONGetBool(json, "paused", &(ci->Paused)));
IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(ci->PauseRemaining)));

if (s == NATS_OK)
*new_ci = ci;
Expand Down Expand Up @@ -3446,6 +3452,150 @@ js_DeleteConsumer(jsCtx *js, const char *stream, const char *consumer,
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_unmarshalConsumerPauseResp(nats_JSON *json, jsConsumerPauseResponse **new_cpr)
{
natsStatus s = NATS_OK;
jsConsumerPauseResponse *cpr = NULL;

cpr = (jsConsumerPauseResponse *)NATS_CALLOC(1, sizeof(jsConsumerPauseResponse));
if (cpr == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

s = nats_JSONGetBool(json, "paused", &(cpr->Paused));
IFOK(s, nats_JSONGetTime(json, "pause_until", &(cpr->PauseUntil)));
IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(cpr->PauseRemaining)));

if (s == NATS_OK)
*new_cpr = cpr;
else
jsConsumerPauseResponse_Destroy(cpr);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalConsumerPauseResp(jsConsumerPauseResponse **new_cpr, natsMsg *resp, jsErrCode *errCode)
{
nats_JSON *json = NULL;
jsApiResponse ar;
natsStatus s;

s = js_unmarshalResponse(&ar, &json, resp);
if (s != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);

if (js_apiResponseIsErr(&ar))
{
if (errCode != NULL)
*errCode = (int)ar.Error.ErrCode;

if (ar.Error.ErrCode == JSConsumerNotFoundErr)
s = NATS_NOT_FOUND;
else
s = nats_setError(NATS_ERR, "%s", ar.Error.Description);
}
else if (new_cpr != NULL)
{
// At this point we need to unmarshal the consumer info itself.
s = js_unmarshalConsumerPauseResp(json, new_cpr);
}

js_freeApiRespContent(&ar);
nats_JSONDestroy(json);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_marshalConsumerPauseReq(natsBuffer **new_buf, uint64_t pauseUntil)
{
natsStatus s = NATS_OK;
natsBuffer *buf = NULL;

s = natsBuf_Create(&buf, 256);
IFOK(s, natsBuf_AppendByte(buf, '{'));
if ((s == NATS_OK) && (pauseUntil > 0)) {
s = _marshalTimeUTC(buf, false, "pause_until", pauseUntil);
}
IFOK(s, natsBuf_AppendByte(buf, '}'));

if (s == NATS_OK)
*new_buf = buf;
else
natsBuf_Destroy(buf);

return NATS_UPDATE_ERR_STACK(s);
}

void
jsConsumerPauseResponse_Destroy(jsConsumerPauseResponse *cpr)
{
if (cpr == NULL)
return;

NATS_FREE(cpr);
}


natsStatus
js_PauseConsumer(jsConsumerPauseResponse **new_cpr, jsCtx *js,
const char *stream, const char *consumer,
uint64_t pauseUntil, jsOptions *opts, jsErrCode *errCode)
{
natsStatus s = NATS_OK;
char *subj = NULL;
bool freePfx = false;
natsConnection *nc = NULL;
natsBuffer *buf = NULL;
natsMsg *resp = NULL;
jsOptions o;

if (errCode != NULL)
*errCode = 0;

if ((js == NULL) || (new_cpr == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

s = _checkStreamName(stream);
IFOK(s, js_checkConsName(consumer, false))
if (s != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);

s = js_setOpts(&nc, &freePfx, js, opts, &o);
if (s == NATS_OK)
{
if (nats_asprintf(&subj, jsApiConsumerPauseT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, consumer) < 0)
{
s = nats_setDefaultError(NATS_NO_MEMORY);
}
if (freePfx)
NATS_FREE((char *)o.Prefix);
}

IFOK(s, _marshalConsumerPauseReq(&buf, pauseUntil));

// Send the request
IFOK_JSR(s, natsConnection_Request(&resp, nc, subj, natsBuf_Data(buf), natsBuf_Len(buf), o.Wait));

// If we got a response, check for error or return the consumer info result.
IFOK(s, _unmarshalConsumerPauseResp(new_cpr, resp, errCode));

NATS_FREE(subj);
natsMsg_Destroy(resp);
natsBuf_Destroy(buf);

if (s == NATS_NOT_FOUND)
{
nats_clearLastError();
return s;
}

return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
jsConsumerConfig_Init(jsConsumerConfig *cc)
{
Expand Down
54 changes: 48 additions & 6 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -852,12 +852,13 @@ typedef struct jsConsumerConfig

// Configuration options introduced in 2.10

// Multiple filter subjects
const char **FilterSubjects;
int FilterSubjectsLen;
const char **FilterSubjects; ///< Multiple filter subjects
int FilterSubjectsLen;
natsMetadata Metadata; ///< User-provided metadata for the consumer, encoded as an array of {"key", "value",...}

// User-provided metadata for the consumer, encoded as an array of {"key", "value",...}
natsMetadata Metadata;
// Configuration options introduced in 2.11

int64_t PauseUntil; ///< Suspends the consumer until this deadline, represented as number of nanoseconds since epoch.
} jsConsumerConfig;

/**
Expand Down Expand Up @@ -1003,7 +1004,8 @@ typedef struct jsConsumerInfo
uint64_t NumPending;
jsClusterInfo *Cluster;
bool PushBound;

bool Paused;
int64_t PauseRemaining; ///< Remaining time in nanoseconds.
} jsConsumerInfo;

/**
Expand Down Expand Up @@ -1034,6 +1036,18 @@ typedef struct jsConsumerNamesList

} jsConsumerNamesList;

/**
* Request to pause the consumer, used to call js_PauseConsumer.
*
* @see js_PauseConsumer
*/
typedef struct jsConsumerPauseResponse
{
bool Paused;
int64_t PauseUntil; ///< UTC time expressed as number of nanoseconds since epoch.
int64_t PauseRemaining; ///< Remaining time in nanoseconds.
} jsConsumerPauseResponse;

/**
* Reports on API calls to JetStream for this account.
*/
Expand Down Expand Up @@ -5945,6 +5959,34 @@ NATS_EXTERN natsStatus
js_DeleteConsumer(jsCtx *js, const char *stream, const char *consumer,
jsOptions *opts, jsErrCode *errCode);

/** \brief Pauses a consumer.
*
* Pauses the consumer named <c>consumer</c> on stream named <c>stream</c>.
*
* @param new_cpr if not NULL, will receive the response of the operation.
* @param js the pointer to the #jsCtx context.
* @param stream the name of the stream.
* @param consumer the name of the consumer.
* @param pauseUntil the time in nanoseconds since the Unix epoch to pause the consumer until.
* @param opts the pointer to the #jsOptions object, possibly `NULL`.
* @param errCode the location where to store the JetStream specific error code, or `NULL`
* if not needed.
*/

NATS_EXTERN natsStatus
js_PauseConsumer(jsConsumerPauseResponse **new_cpr, jsCtx *js,
const char *stream, const char *consumer,
uint64_t pauseUntil, jsOptions *opts, jsErrCode *errCode);

/** \brief Destroys the PauseConsumer response object.
*
* Releases memory allocated for this object.
*
* @param cpr the pointer to the #jsConsumerPauseResponse object.
*/
NATS_EXTERN void
jsConsumerPauseResponse_Destroy(jsConsumerPauseResponse *cpr);

/** \brief Destroys the consumer information object.
*
* Releases memory allocated for this consumer information object.
Expand Down
2 changes: 1 addition & 1 deletion src/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ extern "C" {
(NATS_VERSION_MINOR << 8) | \
NATS_VERSION_PATCH)

#define NATS_VERSION_REQUIRED_NUMBER 0x030800
#define NATS_VERSION_REQUIRED_NUMBER 0x030900

#ifdef __cplusplus
}
Expand Down
Loading

0 comments on commit 4b9f0ad

Please sign in to comment.