diff --git a/xds/src/main/java/com/linecorp/armeria/xds/endpoint/ClusterEntry.java b/xds/src/main/java/com/linecorp/armeria/xds/endpoint/ClusterEntry.java index 9f69638920f..ac734fc5372 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/endpoint/ClusterEntry.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/endpoint/ClusterEntry.java @@ -64,7 +64,7 @@ public class ClusterEntry implements Consumer>, AsyncCloseable { // only cluster.getLbPolicy() == ROUND_ROBIN is supported for now endpointSelectionStrategy = EndpointSelectionStrategy.weightedRoundRobin(); if (cluster.hasLbSubsetConfig()) { - loadBalancer = new SubsetLoadBalancer(cluster.getLbSubsetConfig()); + loadBalancer = new SubsetLoadBalancer(clusterSnapshot, cluster.getLbSubsetConfig()); } else { loadBalancer = new ZoneAwareLoadBalancer(); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/endpoint/PrioritySet.java b/xds/src/main/java/com/linecorp/armeria/xds/endpoint/PrioritySet.java index bb04d30a243..7b57f9ad681 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/endpoint/PrioritySet.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/endpoint/PrioritySet.java @@ -96,6 +96,7 @@ private static int panicThreshold(Cluster cluster) { HostSet getOrCreateHostSet(int priority, UpdateHostsParam params, Map localityWeightsMap, boolean weightedPriorityHealth, int overProvisioningFactor) { + priorities.add(priority); return hostSets.computeIfAbsent(priority, ignored -> new HostSet(priority, params, localityWeightsMap, weightedPriorityHealth, @@ -105,7 +106,6 @@ HostSet getOrCreateHostSet(int priority, UpdateHostsParam params, Map localityWeightsMap, boolean weightedPriorityHealth, int overProvisioningFactor) { - priorities.add(priority); getOrCreateHostSet(priority, params, localityWeightsMap, weightedPriorityHealth, overProvisioningFactor); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/endpoint/SubsetLoadBalancer.java b/xds/src/main/java/com/linecorp/armeria/xds/endpoint/SubsetLoadBalancer.java index 6791b0a3af5..99ba3b98a17 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/endpoint/SubsetLoadBalancer.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/endpoint/SubsetLoadBalancer.java @@ -16,7 +16,6 @@ package com.linecorp.armeria.xds.endpoint; -import java.io.Serial; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -221,14 +220,14 @@ Endpoint chooseHostForSelectorFallbackPolicy(SubsetSelector subsetSelector, Load LbSubsetEntry initSubsetAnyOnce() { - if (subsetAny != null) { + if (subsetAny == null) { subsetAny = new LbSubsetEntry(); } return subsetAny; } LbSubsetEntry initSubsetDefaultOnce() { - if (subsetDefault != null) { + if (subsetDefault == null) { subsetDefault = new LbSubsetEntry(); } return subsetDefault; @@ -457,7 +456,6 @@ public int compareTo(SubsetSelector o) { static class CollectionComparator implements Comparator>, Serializable { static final CollectionComparator INSTANCE = new CollectionComparator(); - @Serial private static final long serialVersionUID = 5835645231445633543L; @Override diff --git a/xds/src/main/java/com/linecorp/armeria/xds/endpoint/XdsEndpointSelector.java b/xds/src/main/java/com/linecorp/armeria/xds/endpoint/XdsEndpointSelector.java index b3162d1ddab..774f6527ceb 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/endpoint/XdsEndpointSelector.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/endpoint/XdsEndpointSelector.java @@ -33,6 +33,7 @@ final class XdsEndpointSelector extends AbstractEndpointSelector XdsEndpointSelector(ClusterManager clusterManager, EndpointGroup endpointGroup) { super(endpointGroup); this.clusterManager = clusterManager; + initialize(); } @Override diff --git a/xds/src/test/java/com/linecorp/armeria/xds/RouteMetadataSubsetTest.java b/xds/src/test/java/com/linecorp/armeria/xds/RouteMetadataSubsetTest.java index ac8548cfe27..9251fdf869f 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/RouteMetadataSubsetTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/RouteMetadataSubsetTest.java @@ -16,13 +16,14 @@ package com.linecorp.armeria.xds; -import static com.linecorp.armeria.xds.internal.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME; import static com.linecorp.armeria.xds.XdsConverterUtilTest.sampleClusterLoadAssignment; import static com.linecorp.armeria.xds.XdsTestResources.stringValue; +import static com.linecorp.armeria.xds.internal.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import java.net.URI; +import java.util.List; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -31,8 +32,11 @@ import com.google.protobuf.Any; import com.google.protobuf.Struct; +import com.linecorp.armeria.client.ClientRequestContext; import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.grpc.GrpcService; import com.linecorp.armeria.testing.junit5.server.ServerExtension; @@ -133,8 +137,10 @@ void routeMetadataMatch() { bootstrapCluster); try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { final EndpointGroup xdsEndpointGroup = XdsEndpointGroup.of(xdsBootstrap.listenerRoot("listener")); - await().untilAsserted(() -> assertThat(xdsEndpointGroup.endpoints()) - .containsExactly(Endpoint.of("127.0.0.1", 8082))); + await().untilAsserted(() -> assertThat(collectEndpoints(xdsEndpointGroup)) + .containsExactly(Endpoint.of("127.0.0.1", 8082), + Endpoint.of("127.0.0.1", 8082), + Endpoint.of("127.0.0.1", 8082))); } // No metadata. Fallback to all endpoints. @@ -144,9 +150,9 @@ void routeMetadataMatch() { bootstrapCluster); try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { final EndpointGroup xdsEndpointGroup = XdsEndpointGroup.of(xdsBootstrap.listenerRoot("listener")); - await().untilAsserted(() -> assertThat(xdsEndpointGroup.endpoints()) - .containsExactlyInAnyOrder(Endpoint.of("127.0.0.1", 8080), Endpoint.of("127.0.0.1", 8081), - Endpoint.of("127.0.0.1", 8082))); + assertThat(collectEndpoints(xdsEndpointGroup)) + .containsExactly(Endpoint.of("127.0.0.1", 8080), Endpoint.of("127.0.0.1", 8081), + Endpoint.of("127.0.0.1", 8082)); } // No matched metadata. Fallback to all endpoints. @@ -158,12 +164,24 @@ void routeMetadataMatch() { bootstrapCluster); try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { final EndpointGroup xdsEndpointGroup = XdsEndpointGroup.of(xdsBootstrap.listenerRoot("listener")); - await().untilAsserted(() -> assertThat(xdsEndpointGroup.endpoints()) + await().untilAsserted(() -> assertThat(collectEndpoints(xdsEndpointGroup)) .containsExactlyInAnyOrder(Endpoint.of("127.0.0.1", 8080), Endpoint.of("127.0.0.1", 8081), Endpoint.of("127.0.0.1", 8082))); } } + List collectEndpoints(EndpointGroup endpointGroup) { + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + await().untilAsserted(() -> assertThat(endpointGroup.endpoints()).isNotEmpty()); + final ImmutableList.Builder builder = ImmutableList.builder(); + for (int i = 0; i < 3; i++) { + final Endpoint endpoint = endpointGroup.selectNow(ctx); + assertThat(endpoint).isNotNull(); + builder.add(endpoint); + } + return builder.build(); + } + private static Listener listener(Metadata metadata) { final VirtualHost virtualHost = VirtualHost.newBuilder()