Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filtering to KV method returning all keys #797

Merged
merged 14 commits into from
Sep 11, 2024
26 changes: 23 additions & 3 deletions src/kv.c
Original file line number Diff line number Diff line change
Expand Up @@ -1176,8 +1176,7 @@ kvStore_WatchAll(kvWatcher **new_watcher, kvStore *kv, kvWatchOptions *opts)
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
kvStore_Keys(kvKeysList *list, kvStore *kv, kvWatchOptions *opts)
static natsStatus _kvStore_Keys(kvKeysList *list, kvStore *kv, kvWatchOptions *opts, const char **filters, int numFilters)
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
{
natsStatus s;
kvWatchOptions o;
Expand All @@ -1200,7 +1199,11 @@ kvStore_Keys(kvKeysList *list, kvStore *kv, kvWatchOptions *opts)
if (o.Timeout > 0)
timeout = o.Timeout;

s = kvStore_WatchAll(&w, kv, &o);
if(numFilters > 0) {
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
s = kvStore_WatchMulti(&w, kv, filters, numFilters, &o);
} else {
s = kvStore_WatchAll(&w, kv, &o);
}
if (s != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);

Expand Down Expand Up @@ -1242,6 +1245,23 @@ kvStore_Keys(kvKeysList *list, kvStore *kv, kvWatchOptions *opts)
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
kvStore_Keys(kvKeysList *list, kvStore *kv, kvWatchOptions *opts)
{
natsStatus s = _kvStore_Keys(list, kv, opts, NULL, 0);
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
kvStore_KeysWithFilters(kvKeysList *list, kvStore *kv, kvWatchOptions *opts, const char **filters, int numFilters)
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
{
if (filters == NULL || numFilters <= 0) {
return nats_setDefaultError(NATS_INVALID_ARG);
}
natsStatus s = _kvStore_Keys(list, kv, opts, filters, numFilters);
return NATS_UPDATE_ERR_STACK(s);
}

void
kvKeysList_Destroy(kvKeysList *list)
{
Expand Down
26 changes: 26 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -7172,6 +7172,32 @@ kvStore_WatchAll(kvWatcher **new_watcher, kvStore *kv, kvWatchOptions *opts);
NATS_EXTERN natsStatus
kvStore_Keys(kvKeysList *list, kvStore *kv, kvWatchOptions *opts);

/** \brief Returns all keys in the bucket which matches the list of subject like filters.
*
* Get a list of the keys in a bucket filtered by a
* subject-like string, for instance "key" or "key.foo.*" or "key.>"
* Any deleted or purged keys will not be returned.
*
* \note Use #kvWatchOptions.Timeout to specify how long to wait (in milliseconds)
* to gather all keys for this bucket. If the deadline is reached, this function
* will return #NATS_TIMEOUT and no keys.
*
* \warning The user should call #kvKeysList_Destroy to release memory allocated
* for the entries list.
*
* @see kvWatchOptions_Init
* @see kvKeysList_Destroy
* @see kvStore_WatchMulti
*
* @param list the pointer to a #kvKeysList that will be initialized and filled with resulting key strings.
* @param kv the pointer to the #kvStore object.
* @param opts the history options, possibly `NULL`.
* @param filters the list of subject filters. Cannot be `NULL`.
* @param numFilters number of filters. Cannot be 0.
*/
NATS_EXTERN natsStatus
kvStore_KeysWithFilters(kvKeysList *list, kvStore *kv, kvWatchOptions *opts, const char **filters, int numFilters);
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved

/** \brief Destroys this list of KeyValue store key strings.
*
* This function iterates through the list of all key strings and free them.
Expand Down
1 change: 1 addition & 0 deletions test/list_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ _test(KeyValueDeleteVsPurge)
_test(KeyValueDiscardOldToNew)
_test(KeyValueHistory)
_test(KeyValueKeys)
_test(KeyValueKeysWithFilters)
_test(KeyValueManager)
_test(KeyValueMirrorCrossDomains)
_test(KeyValueMirrorDirectGet)
Expand Down
138 changes: 138 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -31470,6 +31470,144 @@ void test_KeyValueKeys(void)
JS_TEARDOWN;
}

void test_KeyValueKeysWithFilters(void)
{
natsStatus s;
kvStore *kv = NULL;
kvKeysList l;
kvConfig kvc;
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
const char **defaultSubject = (const char *[]){">"};
l.Count = 0;
l.Keys = NULL;

JS_SETUP(2, 10, 14);

test("Create KV: ");
kvConfig_Init(&kvc);
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
kvc.Bucket = "KVSF";
kvc.History = 2;
s = js_CreateKeyValue(&kv, js, &kvc);
testCond(s == NATS_OK);

test("Populate: ");
s = kvStore_PutString(NULL, kv, "a.b", "a.b");
IFOK(s, kvStore_PutString(NULL, kv, "a.d", "a.d"));
IFOK(s, kvStore_PutString(NULL, kv, "c.d", "c.d"));
IFOK(s, kvStore_PutString(NULL, kv, "e.f", "e.f"));
IFOK(s, kvStore_PutString(NULL, kv, "e.a.f", "e.a.f"));
testCond(s == NATS_OK);

test("Get keys with filters (bad args): List is NULL");
s = kvStore_KeysWithFilters(NULL, kv, NULL, defaultSubject, 1);
testCond(s == NATS_INVALID_ARG);
nats_clearLastError();

saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
test("Get keys with filters (bad args): filters is NULL");
s = kvStore_KeysWithFilters(&l, kv, NULL, NULL, 0);
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
testCond((s == NATS_INVALID_ARG) && (l.Keys == NULL) && (l.Count == 0));
nats_clearLastError();

test("Get keys with filters (bad args): numFilters is 0");
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
s = kvStore_KeysWithFilters(&l, kv, NULL, defaultSubject, 0);
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
testCond((s == NATS_INVALID_ARG) && (l.Keys == NULL) && (l.Count == 0));
nats_clearLastError();

test("Get keys with filters (bad args): numFilters is <0");
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
s = kvStore_KeysWithFilters(&l, kv, NULL, defaultSubject, -10);
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
testCond((s == NATS_INVALID_ARG) && (l.Keys == NULL) && (l.Count == 0));
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
nats_clearLastError();

test("Get keys with filters (bad args): empty string");
const char **filter0 = (const char *[]){"a.*", "", "b.*"};
s = kvStore_KeysWithFilters(&l, kv, NULL, filter0, 3);
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
testCond((s == NATS_INVALID_ARG) && (l.Keys == NULL) && (l.Count == 0));
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved
nats_clearLastError();

test("Get keys with filters (bad args): kv is NULL");
s = kvStore_KeysWithFilters(&l, NULL, NULL, defaultSubject, 1);
testCond((s == NATS_INVALID_ARG) && (l.Keys == NULL) && (l.Count == 0));
nats_clearLastError();
kvKeysList_Destroy(&l);

test("filter: a.*");
const char **filter1 = (const char *[]){"a.*"};
s = kvStore_KeysWithFilters(&l, kv, NULL, filter1, 1);
testCond((s == NATS_OK) && (l.Keys != NULL) && (l.Count == 2));
nats_clearLastError();
kvKeysList_Destroy(&l);

test("filter: *.a.*");
const char **filter2 = (const char *[]){"*.a.*"};
s = kvStore_KeysWithFilters(&l, kv, NULL, filter2, 1);
testCond((s == NATS_OK) && (l.Keys != NULL) && (l.Count == 1));
nats_clearLastError();
kvKeysList_Destroy(&l);

test("filter: *.a");
const char **filter3 = (const char *[]){"*.a"};
s = kvStore_KeysWithFilters(&l, kv, NULL, filter3, 1);
testCond((s == NATS_OK) && (l.Keys == NULL) && (l.Count == 0));
nats_clearLastError();
kvKeysList_Destroy(&l);

test("filter: e.a.f");
const char **filter4 = (const char *[]){"e.a.f"};
s = kvStore_KeysWithFilters(&l, kv, NULL, filter4, 1);
testCond((s == NATS_OK) && (l.Keys != NULL) && (l.Count == 1));
nats_clearLastError();
kvKeysList_Destroy(&l);

test("filter: >");
const char **filter5 = (const char *[]){">"};
s = kvStore_KeysWithFilters(&l, kv, NULL, filter5, 1);
testCond((s == NATS_OK) && (l.Keys != NULL) && (l.Count == 5));
nats_clearLastError();
kvKeysList_Destroy(&l);

test("filter: multiple overlapping filters");
const char **filter6 = (const char *[]){"*.a","a.*","*.a.*"};
s = kvStore_KeysWithFilters(&l, kv, NULL, filter6, 3);
// consumer subject filters cannot overlap
testCond((s == NATS_ERR) && (l.Keys == NULL) && (l.Count == 0));
nats_clearLastError();
kvKeysList_Destroy(&l);

test("filter: multiple non overlapping filters");
const char **filter7 = (const char *[]){"a.*","e.*"};
s = kvStore_KeysWithFilters(&l, kv, NULL, filter7, 2);
testCond((s == NATS_OK) && (l.Keys != NULL) && (l.Count == 3));
nats_clearLastError();
kvKeysList_Destroy(&l);

// Delete the key and check if returned after filtering
test("Delete a.b: ");
s = kvStore_Delete(kv, "a.b");
testCond(s == NATS_OK);
saurabhojha marked this conversation as resolved.
Show resolved Hide resolved

test("a.b should not be returned post deletion")
const char **filter8 = (const char *[]){"a.b"};
s = kvStore_KeysWithFilters(&l, kv, NULL, filter8, 1);
testCond((s == NATS_OK) && (l.Keys == NULL) && (l.Count == 0));
nats_clearLastError();
kvKeysList_Destroy(&l);

// Purge the key and check if returned after filtering
test("Purge a.b:");
s = kvStore_Purge(kv, "a.d", NULL);
testCond(s == NATS_OK);

test("a.d should not be returned post purge")
const char **filter9 = (const char *[]){"a.d"};
s = kvStore_KeysWithFilters(&l, kv, NULL, filter9, 1);
testCond((s == NATS_OK) && (l.Keys == NULL) && (l.Count == 0));
nats_clearLastError();
kvKeysList_Destroy(&l);

kvStore_Destroy(kv);

JS_TEARDOWN;
}

void test_KeyValueDeleteVsPurge(void)
{
natsStatus s;
Expand Down