diff --git a/src/cluster.c b/src/cluster.c index 3a3f223..3c77a72 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1806,50 +1806,96 @@ static int valkeyClusterGetReplyFromNode(valkeyClusterContext *cc, } /* Parses a MOVED or ASK error reply and returns the destination node. The slot - * is returned by pointer, if provided. */ + * is returned by pointer, if provided. When the parsed endpoint/IP is an empty + * string the address from which the reply was sent from is used instead, as + * described in the Valkey Cluster Specification. This address is provided via + * the valkeyContext given in 'c'. */ static valkeyClusterNode *getNodeFromRedirectReply(valkeyClusterContext *cc, + valkeyContext *c, valkeyReply *reply, int *slotptr) { valkeyClusterNode *node = NULL; - sds *part = NULL; - int part_len = 0; - char *p; + sds key = NULL; + sds endpoint = NULL; + char *str = reply->str; /* Expecting ["ASK" | "MOVED", "", ":"] */ - part = sdssplitlen(reply->str, reply->len, " ", 1, &part_len); - if (part == NULL) { - goto oom; - } - if (part_len != 3) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "failed to parse redirect"); - goto done; + char *p, *slot = NULL, *addr = NULL; + int field = 0; + while (*str != '\0') { + // clang-format off + if ((p = strchr(str, ' ')) != NULL) + *p = '\0'; + switch (field++) { + // Skip field 0, i.e. ASK/MOVED + case 1: slot = str; break; + case 2: addr = str; break; + } + if (p == NULL) break; /* No more fields. */ + str = p + 1; /* Start of next field. */ + // clang-format on } - /* Parse slot if requested. */ - if (slotptr != NULL) { - *slotptr = vk_atoi(part[1], sdslen(part[1])); + /* Make sure all expected fields are found. */ + if (addr == NULL) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Failed to parse redirect"); + return NULL; } - /* Find the last occurrence of the port separator since * IPv6 addresses can contain ':' */ - if ((p = strrchr(part[2], IP_PORT_SEPARATOR)) == NULL) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "port separator missing in redirect"); - goto done; + if ((p = strrchr(addr, IP_PORT_SEPARATOR)) == NULL) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid address in redirect"); + return NULL; + } + /* Get the port (skip the found port separator). */ + int port = vk_atoi(p + 1, strlen(p + 1)); + if (port < 1 || port > UINT16_MAX) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port in redirect"); + return NULL; } - // p includes separator - /* Empty endpoint not supported yet */ - if (p - part[2] == 0) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "endpoint missing in redirect"); - goto done; + /* Check that we received an ip/host address, i.e. the field + * does not start with the port separator. */ + if (p != addr) { + key = sdsnew(addr); + if (key == NULL) { + goto oom; + } + *p = '\0'; /* Cut port separator and port. */ + + endpoint = sdsnew(addr); + if (endpoint == NULL) { + goto oom; + } + } else { + /* We received an ip/host that is an empty string. According to the docs + * we can treat this as it means the same address we received the reply from. */ + endpoint = sdsnew(c->tcp.host); + if (endpoint == NULL) { + goto oom; + } + + key = sdsdup(endpoint); + if (key == NULL) { + goto oom; + } + key = sdscatfmt(key, ":%i", port); + if (key == NULL) { + goto oom; + } + } + + /* Parse slot if requested. */ + if (slotptr != NULL) { + *slotptr = vk_atoi(slot, strlen(slot)); } - dictEntry *de = dictFind(cc->nodes, part[2]); + /* Get the node if already known. */ + dictEntry *de = dictFind(cc->nodes, key); if (de != NULL) { - node = de->val; - goto done; + sdsfree(key); + sdsfree(endpoint); + return de->val; } /* Add this node since it was unknown */ @@ -1858,39 +1904,26 @@ static valkeyClusterNode *getNodeFromRedirectReply(valkeyClusterContext *cc, goto oom; } node->role = VALKEY_ROLE_PRIMARY; - node->addr = part[2]; - part[2] = NULL; /* Memory ownership moved */ - - node->host = sdsnewlen(node->addr, p - node->addr); - if (node->host == NULL) { - goto oom; - } - p++; // remove found separator character - node->port = vk_atoi(p, strlen(p)); - - sds key = sdsnewlen(node->addr, sdslen(node->addr)); - if (key == NULL) { + node->host = endpoint; + node->port = port; + node->addr = sdsdup(key); + if (node->addr == NULL) { goto oom; } if (dictAdd(cc->nodes, key, node) != DICT_OK) { - sdsfree(key); goto oom; } - -done: - sdsfreesplitres(part, part_len); return node; oom: valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); - sdsfreesplitres(part, part_len); + sdsfree(key); + sdsfree(endpoint); if (node != NULL) { sdsfree(node->addr); - sdsfree(node->host); vk_free(node); } - return NULL; } @@ -1976,7 +2009,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc, int slot = -1; switch (error_type) { case CLUSTER_ERR_MOVED: - node = getNodeFromRedirectReply(cc, reply, &slot); + node = getNodeFromRedirectReply(cc, c, reply, &slot); freeReplyObject(reply); reply = NULL; @@ -2016,7 +2049,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc, break; case CLUSTER_ERR_ASK: - node = getNodeFromRedirectReply(cc, reply, NULL); + node = getNodeFromRedirectReply(cc, c, reply, NULL); if (node == NULL) { goto error; } @@ -3006,7 +3039,7 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r, /* Initiate slot mapping update using the node that sent MOVED. */ throttledUpdateSlotMapAsync(acc, ac); - node = getNodeFromRedirectReply(cc, reply, &slot); + node = getNodeFromRedirectReply(cc, &ac->c, reply, &slot); if (node == NULL) { valkeyClusterAsyncSetError(acc, cc->err, cc->errstr); goto done; @@ -3019,7 +3052,7 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r, break; case CLUSTER_ERR_ASK: - node = getNodeFromRedirectReply(cc, reply, NULL); + node = getNodeFromRedirectReply(cc, &ac->c, reply, NULL); if (node == NULL) { valkeyClusterAsyncSetError(acc, cc->err, cc->errstr); goto done; diff --git a/tests/ct_out_of_memory_handling.c b/tests/ct_out_of_memory_handling.c index 561697e..9c00a9b 100644 --- a/tests/ct_out_of_memory_handling.c +++ b/tests/ct_out_of_memory_handling.c @@ -318,7 +318,7 @@ void test_alloc_failure_handling(void) { freeReplyObject(reply); /* Test ASK reply handling with OOM */ - for (int i = 0; i < 47; ++i) { + for (int i = 0; i < 30; ++i) { prepare_allocation_test(cc, i); reply = valkeyClusterCommand(cc, "GET foo"); assert(reply == NULL); @@ -326,7 +326,7 @@ void test_alloc_failure_handling(void) { } /* Test ASK reply handling without OOM */ - prepare_allocation_test(cc, 47); + prepare_allocation_test(cc, 30); reply = valkeyClusterCommand(cc, "GET foo"); CHECK_REPLY_STR(cc, reply, "one"); freeReplyObject(reply); diff --git a/tests/scripts/moved-redirect-test.sh b/tests/scripts/moved-redirect-test.sh index 3e8ee7c..27539e6 100755 --- a/tests/scripts/moved-redirect-test.sh +++ b/tests/scripts/moved-redirect-test.sh @@ -1,5 +1,12 @@ #!/bin/bash - +# +# Test of MOVED redirect handling. +# +# Test 1: Handle a common MOVED redirect. +# Test 2: Handle a MOVED redirect with an empty endpoint. +# "The next request should be sent to the same endpoint as the +# current request but with the provided port." +# # Usage: $0 /path/to/clusterclient-binary clientprog=${1:-./clusterclient} @@ -13,24 +20,40 @@ syncpid2=$!; # Start simulated valkey node #1 timeout 5s ./simulated-valkey.pl -p 7403 -d --sigcont $syncpid1 <<'EOF' & +# Setup initial slotmap EXPECT CONNECT EXPECT ["CLUSTER", "SLOTS"] SEND [[0, 16383, ["127.0.0.1", 7403, "nodeid7403"]]] EXPECT CLOSE + +# Test 1: Handle MOVED redirect. EXPECT CONNECT EXPECT ["GET", "foo"] SEND -MOVED 12182 127.0.0.1:7404 EXPECT ["CLUSTER", "SLOTS"] SEND [[0, 16383, ["127.0.0.1", 7404, "nodeid7404"]]] EXPECT CLOSE + +# Test 2: Handle empty endpoint. +EXPECT CONNECT +EXPECT ["GET", "foo"] +SEND "bar" +EXPECT CLOSE EOF server1=$! # Start simulated valkey node #2 timeout 5s ./simulated-valkey.pl -p 7404 -d --sigcont $syncpid2 <<'EOF' & +# Test 1: Handle MOVED redirect. EXPECT CONNECT EXPECT ["GET", "foo"] SEND "bar" + +# Test 2: Handle empty endpoint. +EXPECT ["GET", "foo"] +SEND -MOVED 9718 :7403 +EXPECT ["CLUSTER", "SLOTS"] +SEND [[0, 16383, ["127.0.0.1", 7403, "nodeid7403"]]] EXPECT CLOSE EOF server2=$! @@ -39,7 +62,11 @@ server2=$! wait $syncpid1 $syncpid2; # Run client -echo 'GET foo' | timeout 3s "$clientprog" --events 127.0.0.1:7403 > "$testname.out" +timeout 3s "$clientprog" --events 127.0.0.1:7403 > "$testname.out" <<'EOF' +GET foo +!sleep +GET foo +EOF clientexit=$? # Wait for servers to exit @@ -65,6 +92,8 @@ expected="Event: slotmap-updated Event: ready Event: slotmap-updated bar +Event: slotmap-updated +bar Event: free-context" echo "$expected" | diff -u - "$testname.out" || exit 99