diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index 2a209140c..c24564d73 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -788,11 +788,6 @@ static void qdr_delivery_anycast_reforward_CT(qdr_core_t *core, qdr_delivery_t * } } - // - // Reset the payload references so the delivery content can be re-transmitted. - // - // TODO - maybe nothing needed here becaue there will be a new outbound delivery - // // Re-route the inbound delivery through the core link_deliver process // diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index e6310ec53..df033e270 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -130,6 +130,11 @@ static int qdr_forward_message_null_CT(qdr_core_t *core, static void qdr_forward_find_closest_remotes_CT(qdr_core_t *core, qdr_address_t *addr) { + // + // Note that this algorithm assumes that the core's router list is sorted by cost, least to most. + // Since the sorting work is done at router insert/delete, this algorithm is more efficient at forwarding + // time. + // qdr_node_t *rnode = DEQ_HEAD(core->routers); int lowest_cost = 0; @@ -731,25 +736,36 @@ int qdr_forward_closest_CT(qdr_core_t *core, } } + if (!!in_delivery) { + in_delivery->chosen_link = 0; + in_delivery->chosen_neighbor = -1; + } + // // Forward to a locally attached subscriber. // qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks); // - // If this link results in edge-echo, skip to the next link in the list. + // If this link results in edge-echo or is invalidated, skip to the next link in the list. // - while (link_ref && qdr_forward_edge_echo_CT(in_delivery, link_ref->link)) + while (link_ref && (qdr_forward_edge_echo_CT(in_delivery, link_ref->link) || qdr_invalidated_link_CT(in_delivery, link_ref->link))) { link_ref = DEQ_NEXT(link_ref); + } if (link_ref) { qdr_link_t *out_link = link_ref->link; + qdr_link_t *original_link = out_link; if (!receive_complete && out_link->conn->connection_info->streaming_links) { out_link = get_outgoing_streaming_link(core, out_link->conn); } if (out_link) { + if (!!in_delivery) { + in_delivery->chosen_link = original_link; + } + qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg); qdr_forward_deliver_CT(core, out_link, out_delivery); @@ -783,37 +799,91 @@ int qdr_forward_closest_CT(qdr_core_t *core, qdr_forward_find_closest_remotes_CT(core, addr); // - // Forward to remote routers with subscribers using the appropriate - // link for the traffic class: control or data + // Get the mask bit associated with the ingress router for the message. + // This will be compared against the "valid_origin" masks for each + // candidate destination router. + // + int origin = 0; // default to this router + qd_iterator_t *ingress_iter = in_delivery ? in_delivery->origin : 0; + + if (ingress_iter) { + qd_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH); + qdr_address_t *origin_addr; + qd_hash_retrieve(core->addr_hash, ingress_iter, (void*) &origin_addr); + if (origin_addr && qd_bitmask_cardinality(origin_addr->rnodes) == 1) + qd_bitmask_first_set(origin_addr->rnodes, &origin); + } + + // + // Find a non-invalidated neighbor to send this delivery to. + // + int start_bit = addr->next_remote; + int chosen_conn_bit = -1; + + // + // Start by trying all of the least-cost routers // if (addr->next_remote >= 0) { - qdr_node_t *rnode = core->routers_by_mask_bit[addr->next_remote]; - if (rnode) { + do { + qdr_node_t *rnode = core->routers_by_mask_bit[addr->next_remote]; + if (!!rnode) { + const int conn_bit = (rnode->next_hop) ? rnode->next_hop->conn_mask_bit : rnode->conn_mask_bit; + if ((!in_delivery || !in_delivery->invalidated_neighbors || qd_bitmask_value(in_delivery->invalidated_neighbors, conn_bit) == 0) + && qd_bitmask_value(rnode->valid_origins, origin)) { + chosen_conn_bit = conn_bit; + } + } + + // + // Advance to the next router in the closest_remotes set + // _qdbm_next(addr->closest_remotes, &addr->next_remote); - if (addr->next_remote == -1) + if (addr->next_remote == -1) { qd_bitmask_first_set(addr->closest_remotes, &addr->next_remote); + } + } while (chosen_conn_bit < 0 && addr->next_remote != start_bit); + } - // get the inter-router connection associated with path to rnode: + // + // If all of the least-cost routers are invalidated, try the ordered list of routers that match this address + // + if (chosen_conn_bit < 0) { + qdr_node_t *rnode = DEQ_HEAD(core->routers); + while (!!rnode) { const int conn_bit = (rnode->next_hop) ? rnode->next_hop->conn_mask_bit : rnode->conn_mask_bit; - if (conn_bit >= 0) { - qdr_link_t *out_link; - if (control) { - out_link = peer_router_control_link(core, conn_bit); - } else if (!receive_complete) { - out_link = get_outgoing_streaming_link(core, core->rnode_conns_by_mask_bit[conn_bit]); - } else { - out_link = peer_router_data_link(core, conn_bit, qdr_forward_effective_priority(msg, addr)); - } + if (qd_bitmask_value(addr->rnodes, rnode->mask_bit) && (!in_delivery || !in_delivery->invalidated_neighbors || qd_bitmask_value(in_delivery->invalidated_neighbors, conn_bit) == 0) + && qd_bitmask_value(rnode->valid_origins, origin)) { + chosen_conn_bit = conn_bit; + break; + } + rnode = DEQ_NEXT(rnode); + } + } - if (out_link) { - qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg); - qdr_forward_deliver_CT(core, out_link, out_delivery); - addr->deliveries_transit++; - if (out_link->link_type == QD_LINK_ROUTER) - core->deliveries_transit++; - return 1; - } + // + // Forward to remote routers with subscribers using the appropriate + // link for the traffic class: control or data + // + if (chosen_conn_bit >= 0) { + qdr_link_t *out_link; + if (control) { + out_link = peer_router_control_link(core, chosen_conn_bit); + } else if (!receive_complete) { + out_link = get_outgoing_streaming_link(core, core->rnode_conns_by_mask_bit[chosen_conn_bit]); + } else { + out_link = peer_router_data_link(core, chosen_conn_bit, qdr_forward_effective_priority(msg, addr)); + } + + if (out_link) { + if (!!in_delivery) { + in_delivery->chosen_neighbor = chosen_conn_bit; } + qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg); + qdr_forward_deliver_CT(core, out_link, out_delivery); + addr->deliveries_transit++; + if (out_link->link_type == QD_LINK_ROUTER) + core->deliveries_transit++; + return 1; } } diff --git a/tests/system_tests_resend_released.py b/tests/system_tests_resend_released.py index 5e2010187..bf9ac31ec 100644 --- a/tests/system_tests_resend_released.py +++ b/tests/system_tests_resend_released.py @@ -47,6 +47,7 @@ def router(name, mode, connection1, connection2=None, extra=None, args=None): config = [ ('router', {'mode': mode, 'id': name}), ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}), + ('address', {'prefix': 'cl', 'distribution': 'closest'}), connection1 ] @@ -100,86 +101,166 @@ def router(name, mode, connection1, connection2=None, extra=None, args=None): cls.ec1 = cls.routers[7].addresses[0] cls.ec2 = cls.routers[8].addresses[0] - def test_01_baseline_released(self): + def test_01_baseline_released_balanced(self): test = ResendReleasedTest(self.inta, [self.inta], [], 'resrel.01', 1) test.run() self.assertIsNone(test.error) - def test_02_baseline_accepted(self): + def test_02_baseline_accepted_balanced(self): test = ResendReleasedTest(self.inta, [], [self.inta], 'resrel.02', 0) test.run() self.assertIsNone(test.error) - def test_03_all_released_same_router(self): + def test_03_all_released_same_router_balanced(self): test = ResendReleasedTest(self.inta, [self.inta, self.inta, self.inta], [], 'resrel.03', 3) test.run() self.assertIsNone(test.error) - def test_04_all_released_remote_edge(self): + def test_04_all_released_remote_edge_balanced(self): test = ResendReleasedTest(self.inta, [self.ea1, self.ea1, self.ea1], [], 'resrel.04', 3) test.run() self.assertIsNone(test.error) - def test_05_all_released_remote_interior(self): + def test_05_all_released_remote_interior_balanced(self): test = ResendReleasedTest(self.inta, [self.intb, self.intb, self.intb], [], 'resrel.05', 3) test.run() self.assertIsNone(test.error) - def test_06_all_released_remote_interiors(self): + def test_06_all_released_remote_interiors_balanced(self): test = ResendReleasedTest(self.inta, [self.intb, self.intb, self.intb, self.intc, self.intc], [], 'resrel.06', 5, 0, 2) test.run() self.assertIsNone(test.error) - def test_07_all_released_interior_to_local_edges(self): + def test_07_all_released_interior_to_local_edges_balanced(self): test = ResendReleasedTest(self.inta, [self.ea1, self.ea1, self.ea2, self.ea2, self.ea2], [], 'resrel.07', 5, 2, 0) test.run() self.assertIsNone(test.error) - def test_08_all_released_interior_to_remote_edges(self): + def test_08_all_released_interior_to_remote_edges_balanced(self): test = ResendReleasedTest(self.inta, [self.ea1, self.ea1, self.eb2, self.ec1, self.ec2], [], 'resrel.08', 5, 1, 2) test.run() self.assertIsNone(test.error) - def test_09_all_released_edge_to_local_interior(self): + def test_09_all_released_edge_to_local_interior_balanced(self): test = ResendReleasedTest(self.ea1, [self.inta, self.inta, self.inta], [], 'resrel.09', 3) test.run() self.assertIsNone(test.error) - def test_10_all_released_edge_to_remote_interior(self): + def test_10_all_released_edge_to_remote_interior_balanced(self): test = ResendReleasedTest(self.ec1, [self.inta, self.inta, self.inta], [], 'resrel.10', 3) test.run() self.assertIsNone(test.error) - def test_11_all_released_edge_to_remote_edge(self): + def test_11_all_released_edge_to_remote_edge_balanced(self): test = ResendReleasedTest(self.ec1, [self.ea1, self.ea1, self.ea1], [], 'resrel.11', 3) test.run() self.assertIsNone(test.error) - def test_12_all_released_many(self): + def test_12_all_released_many_balanced(self): test = ResendReleasedTest(self.inta, [self.inta, self.inta, self.intb, self.intb, self.intc, self.intc, self.ea1, self.ea2], [], 'resrel.12', 8, 4, 2) test.run() self.assertIsNone(test.error) - def test_13_accept_same_interior(self): + def test_13_accept_same_interior_balanced(self): test = ResendReleasedTest(self.inta, [self.inta, self.inta], [self.inta], 'resrel.13') test.run() self.assertIsNone(test.error) - def test_14_accept_remote_interior(self): + def test_14_accept_remote_interior_balanced(self): test = ResendReleasedTest(self.inta, [self.inta, self.inta], [self.intb], 'resrel.14', 2, 2, 1) test.run() self.assertIsNone(test.error) - def test_15_accept_local_edge(self): + def test_15_accept_local_edge_balanced(self): test = ResendReleasedTest(self.inta, [self.inta, self.inta], [self.ea1], 'resrel.15', None, 3, 0) test.run() self.assertIsNone(test.error) - def test_16_accept_remote_edge(self): + def test_16_accept_remote_edge_balanced(self): test = ResendReleasedTest(self.inta, [self.inta, self.inta], [self.ec1], 'resrel.16', 2, 2, 1) test.run() self.assertIsNone(test.error) + def test_17_baseline_released_closest(self): + test = ResendReleasedTest(self.inta, [self.inta], [], 'cl.resrel.17', 1) + test.run() + self.assertIsNone(test.error) + + def test_18_baseline_accepted_closest(self): + test = ResendReleasedTest(self.inta, [], [self.inta], 'cl.resrel.18', 0) + test.run() + self.assertIsNone(test.error) + + def test_19_all_released_same_router_closest(self): + test = ResendReleasedTest(self.inta, [self.inta, self.inta, self.inta], [], 'cl.resrel.19', 3) + test.run() + self.assertIsNone(test.error) + + def test_20_all_released_remote_edge_closest(self): + test = ResendReleasedTest(self.inta, [self.ea1, self.ea1, self.ea1], [], 'cl.resrel.20', 3) + test.run() + self.assertIsNone(test.error) + + def test_21_all_released_remote_interior_closest(self): + test = ResendReleasedTest(self.inta, [self.intb, self.intb, self.intb], [], 'cl.resrel.21', 3) + test.run() + self.assertIsNone(test.error) + + def test_22_all_released_remote_interiors_closest(self): + test = ResendReleasedTest(self.inta, [self.intb, self.intb, self.intb, self.intc, self.intc], [], 'cl.resrel.22', 5, 0, 2) + test.run() + self.assertIsNone(test.error) + + def test_23_all_released_interior_to_local_edges_closest(self): + test = ResendReleasedTest(self.inta, [self.ea1, self.ea1, self.ea2, self.ea2, self.ea2], [], 'cl.resrel.23', 5, 2, 0) + test.run() + self.assertIsNone(test.error) + + def test_24_all_released_interior_to_remote_edges_closest(self): + test = ResendReleasedTest(self.inta, [self.ea1, self.ea1, self.eb2, self.ec1, self.ec2], [], 'cl.resrel.24', 5, 1, 2) + test.run() + self.assertIsNone(test.error) + + def test_25_all_released_edge_to_local_interior_closest(self): + test = ResendReleasedTest(self.ea1, [self.inta, self.inta, self.inta], [], 'cl.resrel.25', 3) + test.run() + self.assertIsNone(test.error) + + def test_26_all_released_edge_to_remote_interior_closest(self): + test = ResendReleasedTest(self.ec1, [self.inta, self.inta, self.inta], [], 'cl.resrel.26', 3) + test.run() + self.assertIsNone(test.error) + + def test_27_all_released_edge_to_remote_edge_closest(self): + test = ResendReleasedTest(self.ec1, [self.ea1, self.ea1, self.ea1], [], 'cl.resrel.27', 3) + test.run() + self.assertIsNone(test.error) + + def test_28_all_released_many_closest(self): + test = ResendReleasedTest(self.inta, [self.inta, self.inta, self.intb, self.intb, self.intc, self.intc, self.ea1, self.ea2], [], 'cl.resrel.28', 8, 4, 2) + test.run() + self.assertIsNone(test.error) + + def test_29_accept_same_interior_closest(self): + test = ResendReleasedTest(self.inta, [self.inta, self.inta], [self.inta], 'cl.resrel.29') + test.run() + self.assertIsNone(test.error) + + def test_30_accept_remote_interior_closest(self): + test = ResendReleasedTest(self.inta, [self.inta, self.inta], [self.intb], 'cl.resrel.30', 2, 2, 1) + test.run() + self.assertIsNone(test.error) + + def test_31_accept_local_edge_closest(self): + test = ResendReleasedTest(self.inta, [self.inta, self.inta], [self.ea1], 'cl.resrel.31', None, 3, 0) + test.run() + self.assertIsNone(test.error) + + def test_32_accept_remote_edge_closest(self): + test = ResendReleasedTest(self.inta, [self.inta, self.inta], [self.ec1], 'cl.resrel.32', 2, 2, 1) + test.run() + self.assertIsNone(test.error) + class ResendReleasedTest(MessagingHandler): def __init__(self, sender_host, release_hosts, accept_hosts, addr, expected_releases=None, wait_local=0, wait_remote=0):