Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support empty endpoints in redirects #160

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 84 additions & 51 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1806,50 +1806,96 @@ static int valkeyClusterGetReplyFromNode(valkeyClusterContext *cc,
}

/* Parses a MOVED or ASK error reply and returns the destination node. The slot
* is returned by pointer, if provided. */
* is returned by pointer, if provided. When the parsed endpoint/IP is an empty
* string the address from which the reply was sent from is used instead, as
* described in the Valkey Cluster Specification. This address is provided via
* the valkeyContext given in 'c'. */
static valkeyClusterNode *getNodeFromRedirectReply(valkeyClusterContext *cc,
valkeyContext *c,
valkeyReply *reply,
int *slotptr) {
valkeyClusterNode *node = NULL;
sds *part = NULL;
int part_len = 0;
char *p;
sds key = NULL;
sds endpoint = NULL;
char *str = reply->str;

/* Expecting ["ASK" | "MOVED", "<slot>", "<endpoint>:<port>"] */
part = sdssplitlen(reply->str, reply->len, " ", 1, &part_len);
if (part == NULL) {
goto oom;
}
if (part_len != 3) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "failed to parse redirect");
goto done;
char *p, *slot = NULL, *addr = NULL;
int field = 0;
while (*str != '\0') {
// clang-format off
if ((p = strchr(str, ' ')) != NULL)
*p = '\0';
switch (field++) {
// Skip field 0, i.e. ASK/MOVED
case 1: slot = str; break;
case 2: addr = str; break;
}
if (p == NULL) break; /* No more fields. */
str = p + 1; /* Start of next field. */
// clang-format on
}

/* Parse slot if requested. */
if (slotptr != NULL) {
*slotptr = vk_atoi(part[1], sdslen(part[1]));
/* Make sure all expected fields are found. */
if (addr == NULL) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Failed to parse redirect");
return NULL;
}

/* Find the last occurrence of the port separator since
* IPv6 addresses can contain ':' */
if ((p = strrchr(part[2], IP_PORT_SEPARATOR)) == NULL) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"port separator missing in redirect");
goto done;
if ((p = strrchr(addr, IP_PORT_SEPARATOR)) == NULL) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid address in redirect");
return NULL;
}
/* Get the port (skip the found port separator). */
int port = vk_atoi(p + 1, strlen(p + 1));
if (port < 1 || port > UINT16_MAX) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port in redirect");
return NULL;
}
// p includes separator

/* Empty endpoint not supported yet */
if (p - part[2] == 0) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"endpoint missing in redirect");
goto done;
/* Check that we received an ip/host address, i.e. the field
* does not start with the port separator. */
if (p != addr) {
key = sdsnew(addr);
if (key == NULL) {
goto oom;
}
*p = '\0'; /* Cut port separator and port. */

endpoint = sdsnew(addr);
if (endpoint == NULL) {
goto oom;
}
} else {
/* We received an ip/host that is an empty string. According to the docs
* we can treat this as it means the same address we received the reply from. */
endpoint = sdsnew(c->tcp.host);
if (endpoint == NULL) {
goto oom;
}

key = sdsdup(endpoint);
if (key == NULL) {
goto oom;
}
key = sdscatfmt(key, ":%i", port);
if (key == NULL) {
goto oom;
}
}

/* Parse slot if requested. */
if (slotptr != NULL) {
*slotptr = vk_atoi(slot, strlen(slot));
}

dictEntry *de = dictFind(cc->nodes, part[2]);
/* Get the node if already known. */
dictEntry *de = dictFind(cc->nodes, key);
if (de != NULL) {
node = de->val;
goto done;
sdsfree(key);
sdsfree(endpoint);
return de->val;
}

/* Add this node since it was unknown */
Expand All @@ -1858,39 +1904,26 @@ static valkeyClusterNode *getNodeFromRedirectReply(valkeyClusterContext *cc,
goto oom;
}
node->role = VALKEY_ROLE_PRIMARY;
node->addr = part[2];
part[2] = NULL; /* Memory ownership moved */

node->host = sdsnewlen(node->addr, p - node->addr);
if (node->host == NULL) {
goto oom;
}
p++; // remove found separator character
node->port = vk_atoi(p, strlen(p));

sds key = sdsnewlen(node->addr, sdslen(node->addr));
if (key == NULL) {
node->host = endpoint;
node->port = port;
node->addr = sdsdup(key);
if (node->addr == NULL) {
goto oom;
}

if (dictAdd(cc->nodes, key, node) != DICT_OK) {
sdsfree(key);
goto oom;
}

done:
sdsfreesplitres(part, part_len);
return node;

oom:
valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory");
sdsfreesplitres(part, part_len);
sdsfree(key);
sdsfree(endpoint);
if (node != NULL) {
sdsfree(node->addr);
sdsfree(node->host);
vk_free(node);
}

return NULL;
}

Expand Down Expand Up @@ -1976,7 +2009,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc,
int slot = -1;
switch (error_type) {
case CLUSTER_ERR_MOVED:
node = getNodeFromRedirectReply(cc, reply, &slot);
node = getNodeFromRedirectReply(cc, c, reply, &slot);
freeReplyObject(reply);
reply = NULL;

Expand Down Expand Up @@ -2016,7 +2049,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc,

break;
case CLUSTER_ERR_ASK:
node = getNodeFromRedirectReply(cc, reply, NULL);
node = getNodeFromRedirectReply(cc, c, reply, NULL);
if (node == NULL) {
goto error;
}
Expand Down Expand Up @@ -3006,7 +3039,7 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r,
/* Initiate slot mapping update using the node that sent MOVED. */
throttledUpdateSlotMapAsync(acc, ac);

node = getNodeFromRedirectReply(cc, reply, &slot);
node = getNodeFromRedirectReply(cc, &ac->c, reply, &slot);
if (node == NULL) {
valkeyClusterAsyncSetError(acc, cc->err, cc->errstr);
goto done;
Expand All @@ -3019,7 +3052,7 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r,

break;
case CLUSTER_ERR_ASK:
node = getNodeFromRedirectReply(cc, reply, NULL);
node = getNodeFromRedirectReply(cc, &ac->c, reply, NULL);
if (node == NULL) {
valkeyClusterAsyncSetError(acc, cc->err, cc->errstr);
goto done;
Expand Down
4 changes: 2 additions & 2 deletions tests/ct_out_of_memory_handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,15 @@ void test_alloc_failure_handling(void) {
freeReplyObject(reply);

/* Test ASK reply handling with OOM */
for (int i = 0; i < 47; ++i) {
for (int i = 0; i < 30; ++i) {
prepare_allocation_test(cc, i);
reply = valkeyClusterCommand(cc, "GET foo");
assert(reply == NULL);
ASSERT_STR_EQ(cc->errstr, "Out of memory");
}

/* Test ASK reply handling without OOM */
prepare_allocation_test(cc, 47);
prepare_allocation_test(cc, 30);
reply = valkeyClusterCommand(cc, "GET foo");
CHECK_REPLY_STR(cc, reply, "one");
freeReplyObject(reply);
Expand Down
33 changes: 31 additions & 2 deletions tests/scripts/moved-redirect-test.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#!/bin/bash

#
# Test of MOVED redirect handling.
#
# Test 1: Handle a common MOVED redirect.
# Test 2: Handle a MOVED redirect with an empty endpoint.
# "The next request should be sent to the same endpoint as the
# current request but with the provided port."
#
# Usage: $0 /path/to/clusterclient-binary

clientprog=${1:-./clusterclient}
Expand All @@ -13,24 +20,40 @@ syncpid2=$!;

# Start simulated valkey node #1
timeout 5s ./simulated-valkey.pl -p 7403 -d --sigcont $syncpid1 <<'EOF' &
# Setup initial slotmap
EXPECT CONNECT
EXPECT ["CLUSTER", "SLOTS"]
SEND [[0, 16383, ["127.0.0.1", 7403, "nodeid7403"]]]
EXPECT CLOSE

# Test 1: Handle MOVED redirect.
EXPECT CONNECT
EXPECT ["GET", "foo"]
SEND -MOVED 12182 127.0.0.1:7404
EXPECT ["CLUSTER", "SLOTS"]
SEND [[0, 16383, ["127.0.0.1", 7404, "nodeid7404"]]]
EXPECT CLOSE

# Test 2: Handle empty endpoint.
EXPECT CONNECT
EXPECT ["GET", "foo"]
SEND "bar"
EXPECT CLOSE
EOF
server1=$!

# Start simulated valkey node #2
timeout 5s ./simulated-valkey.pl -p 7404 -d --sigcont $syncpid2 <<'EOF' &
# Test 1: Handle MOVED redirect.
EXPECT CONNECT
EXPECT ["GET", "foo"]
SEND "bar"

# Test 2: Handle empty endpoint.
EXPECT ["GET", "foo"]
SEND -MOVED 9718 :7403
EXPECT ["CLUSTER", "SLOTS"]
SEND [[0, 16383, ["127.0.0.1", 7403, "nodeid7403"]]]
EXPECT CLOSE
EOF
server2=$!
Expand All @@ -39,7 +62,11 @@ server2=$!
wait $syncpid1 $syncpid2;

# Run client
echo 'GET foo' | timeout 3s "$clientprog" --events 127.0.0.1:7403 > "$testname.out"
timeout 3s "$clientprog" --events 127.0.0.1:7403 > "$testname.out" <<'EOF'
GET foo
!sleep
GET foo
EOF
clientexit=$?

# Wait for servers to exit
Expand All @@ -65,6 +92,8 @@ expected="Event: slotmap-updated
Event: ready
Event: slotmap-updated
bar
Event: slotmap-updated
bar
Event: free-context"

echo "$expected" | diff -u - "$testname.out" || exit 99
Expand Down
Loading