From c4218675bfeea1135fd55a5012e4af77b1d025ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Thu, 23 Jan 2025 12:34:49 +0100 Subject: [PATCH 1/3] Support empty endpoints in redirects MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit "An empty endpoint indicates that the server node has an unknown endpoint, and the client should send the next request to the same endpoint as the current request but with the provided port." https://valkey.io/topics/cluster-spec/ Signed-off-by: Björn Svensson --- src/cluster.c | 135 +++++++++++++++++---------- tests/ct_out_of_memory_handling.c | 4 +- tests/scripts/moved-redirect-test.sh | 31 +++++- 3 files changed, 115 insertions(+), 55 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 6f277420..cc4f6da1 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1821,50 +1821,96 @@ static int valkeyClusterGetReplyFromNode(valkeyClusterContext *cc, } /* Parses a MOVED or ASK error reply and returns the destination node. The slot - * is returned by pointer, if provided. */ + * is returned by pointer, if provided. When the parsed endpoint/IP is an empty + * string the address from which the reply was sent is used instead, as + * described in the Valkey Cluster Specification. This address is provided via + * the valkeyContext given in 'c'. 
*/ static valkeyClusterNode *getNodeFromRedirectReply(valkeyClusterContext *cc, + valkeyContext *c, valkeyReply *reply, int *slotptr) { valkeyClusterNode *node = NULL; - sds *part = NULL; - int part_len = 0; - char *p; + sds key = NULL; + sds endpoint = NULL; + char *str = reply->str; /* Expecting ["ASK" | "MOVED", "<slot>", "<endpoint>:<port>"] */ - part = sdssplitlen(reply->str, reply->len, " ", 1, &part_len); - if (part == NULL) { - goto oom; - } - if (part_len != 3) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "failed to parse redirect"); - goto done; + char *p, *slot = NULL, *addr = NULL; + int field = 0; + while (*str != '\0') { + // clang-format off + if ((p = strchr(str, ' ')) != NULL) + *p = '\0'; + switch (field++) { + // Skip field 0, i.e. ASK/MOVED + case 1: slot = str; break; + case 2: addr = str; break; + } + if (p == NULL) break; /* No more fields. */ + str = p + 1; /* Start of next field. */ + // clang-format on } - /* Parse slot if requested. */ - if (slotptr != NULL) { - *slotptr = vk_atoi(part[1], sdslen(part[1])); + /* Make sure all expected fields are found. */ + if (addr == NULL) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Failed to parse redirect"); + return NULL; } - /* Find the last occurrence of the port separator since * IPv6 addresses can contain ':' */ - if ((p = strrchr(part[2], IP_PORT_SEPARATOR)) == NULL) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "port separator missing in redirect"); - goto done; + if ((p = strrchr(addr, IP_PORT_SEPARATOR)) == NULL) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid address in redirect"); + return NULL; + } + /* Get the port (skip the found port separator). 
*/ + int port = vk_atoi(p + 1, strlen(p + 1)); + if (port < 1 || port > UINT16_MAX) { + valkeyClusterSetError(cc, VALKEY_ERR_OTHER, "Invalid port in redirect"); + return NULL; } - // p includes separator - /* Empty endpoint not supported yet */ - if (p - part[2] == 0) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "endpoint missing in redirect"); - goto done; + /* Check that we received an ip/host address, i.e. the field + * does not start with the port separator. */ + if (p != addr) { + key = sdsnew(addr); + if (key == NULL) { + goto oom; + } + *p = '\0'; /* Cut port separator and port. */ + + endpoint = sdsnew(addr); + if (endpoint == NULL) { + goto oom; + } + } else { + /* We received an ip/host that is an empty string. According to the cluster + * specification, this means the same address that we received the reply from. */ + endpoint = sdsnew(c->tcp.host); + if (endpoint == NULL) { + goto oom; + } + + key = sdsdup(endpoint); + if (key == NULL) { + goto oom; + } + key = sdscatfmt(key, ":%i", port); + if (key == NULL) { + goto oom; + } + } + + /* Parse slot if requested. */ + if (slotptr != NULL) { + *slotptr = vk_atoi(slot, strlen(slot)); } - dictEntry *de = dictFind(cc->nodes, part[2]); + /* Get the node if already known. 
*/ + dictEntry *de = dictFind(cc->nodes, key); if (de != NULL) { - node = de->val; - goto done; + sdsfree(key); + sdsfree(endpoint); + return de->val; } /* Add this node since it was unknown */ @@ -1873,39 +1919,26 @@ static valkeyClusterNode *getNodeFromRedirectReply(valkeyClusterContext *cc, goto oom; } node->role = VALKEY_ROLE_PRIMARY; - node->addr = part[2]; - part[2] = NULL; /* Memory ownership moved */ - - node->host = sdsnewlen(node->addr, p - node->addr); - if (node->host == NULL) { - goto oom; - } - p++; // remove found separator character - node->port = vk_atoi(p, strlen(p)); - - sds key = sdsnewlen(node->addr, sdslen(node->addr)); - if (key == NULL) { + node->host = endpoint; + node->port = port; + node->addr = sdsdup(key); + if (node->addr == NULL) { goto oom; } if (dictAdd(cc->nodes, key, node) != DICT_OK) { - sdsfree(key); goto oom; } - -done: - sdsfreesplitres(part, part_len); return node; oom: valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory"); - sdsfreesplitres(part, part_len); + sdsfree(key); + sdsfree(endpoint); if (node != NULL) { sdsfree(node->addr); - sdsfree(node->host); vk_free(node); } - return NULL; } @@ -1991,7 +2024,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc, int slot = -1; switch (error_type) { case CLUSTER_ERR_MOVED: - node = getNodeFromRedirectReply(cc, reply, &slot); + node = getNodeFromRedirectReply(cc, c, reply, &slot); freeReplyObject(reply); reply = NULL; @@ -2031,7 +2064,7 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc, break; case CLUSTER_ERR_ASK: - node = getNodeFromRedirectReply(cc, reply, NULL); + node = getNodeFromRedirectReply(cc, c, reply, NULL); if (node == NULL) { goto error; } @@ -3079,7 +3112,7 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r, /* Initiate slot mapping update using the node that sent MOVED. 
*/ throttledUpdateSlotMapAsync(acc, ac); - node = getNodeFromRedirectReply(cc, reply, &slot); + node = getNodeFromRedirectReply(cc, &ac->c, reply, &slot); if (node == NULL) { valkeyClusterAsyncSetError(acc, cc->err, cc->errstr); goto done; @@ -3092,7 +3125,7 @@ static void valkeyClusterAsyncCallback(valkeyAsyncContext *ac, void *r, break; case CLUSTER_ERR_ASK: - node = getNodeFromRedirectReply(cc, reply, NULL); + node = getNodeFromRedirectReply(cc, &ac->c, reply, NULL); if (node == NULL) { valkeyClusterAsyncSetError(acc, cc->err, cc->errstr); goto done; diff --git a/tests/ct_out_of_memory_handling.c b/tests/ct_out_of_memory_handling.c index 3d7de08c..d3537e32 100644 --- a/tests/ct_out_of_memory_handling.c +++ b/tests/ct_out_of_memory_handling.c @@ -360,7 +360,7 @@ void test_alloc_failure_handling(void) { freeReplyObject(reply); /* Test ASK reply handling with OOM */ - for (int i = 0; i < 47; ++i) { + for (int i = 0; i < 30; ++i) { prepare_allocation_test(cc, i); reply = valkeyClusterCommand(cc, "GET foo"); assert(reply == NULL); @@ -368,7 +368,7 @@ void test_alloc_failure_handling(void) { } /* Test ASK reply handling without OOM */ - prepare_allocation_test(cc, 47); + prepare_allocation_test(cc, 30); reply = valkeyClusterCommand(cc, "GET foo"); CHECK_REPLY_STR(cc, reply, "one"); freeReplyObject(reply); diff --git a/tests/scripts/moved-redirect-test.sh b/tests/scripts/moved-redirect-test.sh index 3e8ee7ca..d05eaaa6 100755 --- a/tests/scripts/moved-redirect-test.sh +++ b/tests/scripts/moved-redirect-test.sh @@ -1,5 +1,12 @@ #!/bin/bash - +# +# Test of MOVED redirect handling. +# +# Test 1: Handle a common MOVED redirect. +# Test 2: Handle a MOVED redirect with an empty endpoint. +# "The next request should be sent to the same endpoint as the +# current request but with the provided port." 
+# # Usage: $0 /path/to/clusterclient-binary clientprog=${1:-./clusterclient} @@ -13,24 +20,40 @@ syncpid2=$!; # Start simulated valkey node #1 timeout 5s ./simulated-valkey.pl -p 7403 -d --sigcont $syncpid1 <<'EOF' & +# Setup initial slotmap EXPECT CONNECT EXPECT ["CLUSTER", "SLOTS"] SEND [[0, 16383, ["127.0.0.1", 7403, "nodeid7403"]]] EXPECT CLOSE + +# Test 1: Handle MOVED redirect. EXPECT CONNECT EXPECT ["GET", "foo"] SEND -MOVED 12182 127.0.0.1:7404 EXPECT ["CLUSTER", "SLOTS"] SEND [[0, 16383, ["127.0.0.1", 7404, "nodeid7404"]]] EXPECT CLOSE + +# Test 2: Handle empty endpoint. +EXPECT CONNECT +EXPECT ["GET", "foo"] +SEND "bar" +EXPECT CLOSE EOF server1=$! # Start simulated valkey node #2 timeout 5s ./simulated-valkey.pl -p 7404 -d --sigcont $syncpid2 <<'EOF' & +# Test 1: Handle MOVED redirect. EXPECT CONNECT EXPECT ["GET", "foo"] SEND "bar" + +# Test 2: Handle empty endpoint. +EXPECT ["GET", "foo"] +SEND -MOVED 9718 :7403 +EXPECT ["CLUSTER", "SLOTS"] +SEND [[0, 16383, ["127.0.0.1", 7403, "nodeid7403"]]] EXPECT CLOSE EOF server2=$! @@ -39,7 +62,10 @@ server2=$! wait $syncpid1 $syncpid2; # Run client -echo 'GET foo' | timeout 3s "$clientprog" --events 127.0.0.1:7403 > "$testname.out" +timeout 3s "$clientprog" --events 127.0.0.1:7403 > "$testname.out" <<'EOF' +GET foo +GET foo +EOF clientexit=$? 
# Wait for servers to exit @@ -65,6 +91,7 @@ expected="Event: slotmap-updated Event: ready Event: slotmap-updated bar +bar Event: free-context" echo "$expected" | diff -u - "$testname.out" || exit 99 From 39ddb0c6dcc65fa9c38c33bfde79b6f6095f2911 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Thu, 23 Jan 2025 22:04:48 +0100 Subject: [PATCH 2/3] fixup: test correction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- tests/scripts/moved-redirect-test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/scripts/moved-redirect-test.sh b/tests/scripts/moved-redirect-test.sh index d05eaaa6..9445e550 100755 --- a/tests/scripts/moved-redirect-test.sh +++ b/tests/scripts/moved-redirect-test.sh @@ -91,6 +91,7 @@ expected="Event: slotmap-updated Event: ready Event: slotmap-updated bar +Event: slotmap-updated bar Event: free-context" From d526d39309a7021663ba8517d72b7a393b6f4919 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Thu, 23 Jan 2025 22:24:41 +0100 Subject: [PATCH 3/3] fixup: fix timing in test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- tests/scripts/moved-redirect-test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/scripts/moved-redirect-test.sh b/tests/scripts/moved-redirect-test.sh index 9445e550..27539e6d 100755 --- a/tests/scripts/moved-redirect-test.sh +++ b/tests/scripts/moved-redirect-test.sh @@ -64,6 +64,7 @@ wait $syncpid1 $syncpid2; # Run client timeout 3s "$clientprog" --events 127.0.0.1:7403 > "$testname.out" <<'EOF' GET foo +!sleep GET foo EOF clientexit=$?