diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ClusterEntry.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ClusterEntry.java index 1f59616ccfa..f303ceb2eba 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ClusterEntry.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ClusterEntry.java @@ -30,16 +30,27 @@ import com.linecorp.armeria.xds.ClusterSnapshot; import com.linecorp.armeria.xds.EndpointSnapshot; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; + final class ClusterEntry implements Consumer>, AsyncCloseable { private final EndpointGroup endpointGroup; + private final Cluster cluster; + private final ClusterLoadAssignment clusterLoadAssignment; private final LoadBalancer loadBalancer; private List endpoints = ImmutableList.of(); ClusterEntry(ClusterSnapshot clusterSnapshot, ClusterManager clusterManager) { final EndpointSnapshot endpointSnapshot = clusterSnapshot.endpointSnapshot(); assert endpointSnapshot != null; - loadBalancer = new SubsetLoadBalancer(clusterSnapshot); + cluster = clusterSnapshot.xdsResource().resource(); + clusterLoadAssignment = endpointSnapshot.xdsResource().resource(); + if (cluster.hasLbSubsetConfig()) { + loadBalancer = new SubsetLoadBalancer(clusterSnapshot); + } else { + loadBalancer = new ZoneAwareLoadBalancer(); + } // The order of adding listeners is important endpointGroup = XdsEndpointUtil.convertEndpointGroup(clusterSnapshot); @@ -55,7 +66,12 @@ Endpoint selectNow(ClientRequestContext ctx) { @Override public void accept(List endpoints) { this.endpoints = ImmutableList.copyOf(endpoints); - final PrioritySet prioritySet = new PrioritySet(endpoints); + final PriorityStateManager priorityStateManager = + new PriorityStateManager(cluster, clusterLoadAssignment); + for (Endpoint endpoint: endpoints) { + priorityStateManager.registerEndpoint(endpoint); + } + final PrioritySet prioritySet = priorityStateManager.build(); loadBalancer.prioritySetUpdated(prioritySet); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointGroupUtil.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointGroupUtil.java new file mode 100644 index 00000000000..0d1aed14cdf --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointGroupUtil.java @@ -0,0 +1,87 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.client.endpoint.EndpointUtil.locality; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy; + +import io.envoyproxy.envoy.config.core.v3.Locality; + +final class EndpointGroupUtil { + + static Map> endpointsByLocality(List endpoints) { + final Map> endpointsPerLocality = new HashMap<>(); + for (Endpoint endpoint : endpoints) { + endpointsPerLocality.computeIfAbsent(locality(endpoint), ignored -> new ArrayList<>()) + .add(endpoint); + } + return ImmutableMap.copyOf(endpointsPerLocality); + } + + static EndpointGroup filter(List endpoints, EndpointSelectionStrategy strategy, + Predicate predicate) { + final List filteredEndpoints = + endpoints.stream().filter(predicate).collect(Collectors.toList()); + return EndpointGroup.of(strategy, filteredEndpoints); + } + + static EndpointGroup filter(EndpointGroup origEndpointGroup, Predicate predicate) { + return filter(origEndpointGroup.endpoints(), origEndpointGroup.selectionStrategy(), predicate); + } + + static Map filterByLocality(Map> endpointsMap, + EndpointSelectionStrategy strategy, + Predicate predicate) { + final ImmutableMap.Builder filteredLocality = ImmutableMap.builder(); + for (Entry> entry: endpointsMap.entrySet()) { + final EndpointGroup endpointGroup = filter(entry.getValue(), strategy, predicate); + if (endpointGroup.endpoints().isEmpty()) { + continue; + } + filteredLocality.put(entry.getKey(), endpointGroup); + } + return filteredLocality.build(); + } + + static Map filterByLocality(Map origLocality, + Predicate predicate) { + final ImmutableMap.Builder filteredLocality = ImmutableMap.builder(); + for (Entry entry: origLocality.entrySet()) { + final EndpointGroup endpointGroup = filter(entry.getValue(), predicate); + if (endpointGroup.endpoints().isEmpty()) { + continue; + } + filteredLocality.put(entry.getKey(), endpointGroup); + } + return filteredLocality.build(); + } + + private EndpointGroupUtil() {} +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointUtil.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointUtil.java new file mode 100644 index 00000000000..23c787a04c3 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointUtil.java @@ -0,0 +1,137 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import java.util.concurrent.ThreadLocalRandom; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy; + +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.CommonLbConfig; +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.core.v3.Metadata; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; + +final class EndpointUtil { + + static Metadata metadata(Endpoint endpoint) { + return lbEndpoint(endpoint).getMetadata(); + } + + static Locality locality(Endpoint endpoint) { + final LocalityLbEndpoints localityLbEndpoints = localityLbEndpoints(endpoint); + return localityLbEndpoints.hasLocality() ? localityLbEndpoints.getLocality() + : Locality.getDefaultInstance(); + } + + static CoarseHealth coarseHealth(Endpoint endpoint) { + final LbEndpoint lbEndpoint = lbEndpoint(endpoint); + switch (lbEndpoint.getHealthStatus()) { + // Assume UNKNOWN means health check wasn't performed + case UNKNOWN: + case HEALTHY: + return CoarseHealth.HEALTHY; + case DEGRADED: + return CoarseHealth.DEGRADED; + default: + return CoarseHealth.UNHEALTHY; + } + } + + static int hash(ClientRequestContext ctx) { + if (ctx.hasAttr(XdsAttributesKeys.SELECTION_HASH)) { + return ctx.attr(XdsAttributesKeys.SELECTION_HASH); + } + return ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE); + } + + static int priority(Endpoint endpoint) { + return localityLbEndpoints(endpoint).getPriority(); + } + + static boolean hasLoadBalancingWeight(Endpoint endpoint) { + return lbEndpoint(endpoint).hasLoadBalancingWeight(); + } + + private static LbEndpoint lbEndpoint(Endpoint endpoint) { + final LbEndpoint lbEndpoint = endpoint.attr(XdsAttributesKeys.LB_ENDPOINT_KEY); + assert lbEndpoint != null; + return lbEndpoint; + } + + private static LocalityLbEndpoints localityLbEndpoints(Endpoint endpoint) { + final LocalityLbEndpoints localityLbEndpoints = endpoint.attr( + XdsAttributesKeys.LOCALITY_LB_ENDPOINTS_KEY); + assert localityLbEndpoints != null; + return localityLbEndpoints; + } + + static EndpointSelectionStrategy selectionStrategy(Cluster cluster) { + switch (cluster.getLbPolicy()) { + case ROUND_ROBIN: + return EndpointSelectionStrategy.weightedRoundRobin(); + case RANDOM: + return EndpointSelectionStrategy.roundRobin(); + case RING_HASH: + // implementing this is trivial so it will be done separately + default: + return EndpointSelectionStrategy.weightedRoundRobin(); + } + } + + static int overProvisionFactor(ClusterLoadAssignment clusterLoadAssignment) { + if (!clusterLoadAssignment.hasPolicy()) { + return 140; + } + final Policy policy = clusterLoadAssignment.getPolicy(); + final int overProvisionFactor; + overProvisionFactor = + policy.hasOverprovisioningFactor() ? policy.getOverprovisioningFactor().getValue() : 140; + return overProvisionFactor; + } + + static boolean weightedPriorityHealth(ClusterLoadAssignment clusterLoadAssignment) { + final boolean weightedPriorityHealth; + final Policy policy = clusterLoadAssignment.getPolicy(); + weightedPriorityHealth = policy.getWeightedPriorityHealth(); + return weightedPriorityHealth; + } + + static int panicThreshold(Cluster cluster) { + if (!cluster.hasCommonLbConfig()) { + return 50; + } + final CommonLbConfig commonLbConfig = cluster.getCommonLbConfig(); + if (!commonLbConfig.hasHealthyPanicThreshold()) { + return 50; + } + return Math.min((int) Math.round(commonLbConfig.getHealthyPanicThreshold().getValue()), 100); + } + + enum CoarseHealth { + HEALTHY, + DEGRADED, + UNHEALTHY, + } + + private EndpointUtil() {} +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/HostSet.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/HostSet.java new file mode 100644 index 00000000000..5bf751da49c --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/HostSet.java @@ -0,0 +1,184 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.client.endpoint.WeightedRandomDistributionSelector; + +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; + +final class HostSet { + private final Map localityWeightsMap; + private final boolean weightedPriorityHealth; + private final int overProvisioningFactor; + + private final WeightedRandomDistributionSelector healthyLocalitySelector; + private final WeightedRandomDistributionSelector degradedLocalitySelector; + + private final EndpointGroup hostsEndpointGroup; + private final Map endpointGroupPerLocality; + private final EndpointGroup healthyHostsEndpointGroup; + private final Map healthyEndpointGroupPerLocality; + private final EndpointGroup degradedHostsEndpointGroup; + private final Map degradedEndpointGroupPerLocality; + + HostSet(UpdateHostsParam params, ClusterLoadAssignment clusterLoadAssignment) { + localityWeightsMap = params.localityWeightsMap(); + weightedPriorityHealth = EndpointUtil.weightedPriorityHealth(clusterLoadAssignment); + overProvisioningFactor = EndpointUtil.overProvisionFactor(clusterLoadAssignment); + + healthyLocalitySelector = rebuildLocalityScheduler( + params.healthyHostsPerLocality(), params.hostsPerLocality(), + localityWeightsMap, overProvisioningFactor); + degradedLocalitySelector = rebuildLocalityScheduler( + params.degradedHostsPerLocality(), params.hostsPerLocality(), + localityWeightsMap, overProvisioningFactor); + + hostsEndpointGroup = params.hosts(); + healthyHostsEndpointGroup = params.healthyHosts(); + degradedHostsEndpointGroup = params.degradedHosts(); + endpointGroupPerLocality = params.hostsPerLocality(); + healthyEndpointGroupPerLocality = params.healthyHostsPerLocality(); + degradedEndpointGroupPerLocality = params.degradedHostsPerLocality(); + } + + Map localityWeightsMap() { + return localityWeightsMap; + } + + List hosts() { + return hostsEndpointGroup.endpoints(); + } + + EndpointGroup hostsEndpointGroup() { + return hostsEndpointGroup; + } + + Map endpointGroupPerLocality() { + return endpointGroupPerLocality; + } + + List healthyHosts() { + return healthyHostsEndpointGroup.endpoints(); + } + + EndpointGroup healthyHostsEndpointGroup() { + return healthyHostsEndpointGroup; + } + + Map healthyEndpointGroupPerLocality() { + return healthyEndpointGroupPerLocality; + } + + List degradedHosts() { + return degradedHostsEndpointGroup.endpoints(); + } + + EndpointGroup degradedHostsEndpointGroup() { + return degradedHostsEndpointGroup; + } + + Map degradedEndpointGroupPerLocality() { + return degradedEndpointGroupPerLocality; + } + + boolean weightedPriorityHealth() { + return weightedPriorityHealth; + } + + int overProvisioningFactor() { + return overProvisioningFactor; + } + + private static WeightedRandomDistributionSelector rebuildLocalityScheduler( + Map eligibleHostsPerLocality, + Map allHostsPerLocality, + Map localityWeightsMap, + int overProvisioningFactor) { + final ImmutableList.Builder localityWeightsBuilder = ImmutableList.builder(); + for (Locality locality : allHostsPerLocality.keySet()) { + final double effectiveWeight = + effectiveLocalityWeight(locality, eligibleHostsPerLocality, allHostsPerLocality, + localityWeightsMap, overProvisioningFactor); + if (effectiveWeight > 0) { + localityWeightsBuilder.add(new LocalityEntry(locality, effectiveWeight)); + } + } + return new WeightedRandomDistributionSelector<>(localityWeightsBuilder.build()); + } + + static double effectiveLocalityWeight(Locality locality, + Map eligibleHostsPerLocality, + Map allHostsPerLocality, + Map localityWeightsMap, + int overProvisioningFactor) { + final EndpointGroup localityEligibleHosts = + eligibleHostsPerLocality.getOrDefault(locality, EndpointGroup.of()); + final int hostCount = allHostsPerLocality.getOrDefault(locality, EndpointGroup.of()).endpoints().size(); + if (hostCount <= 0) { + return 0; + } + final double localityAvailabilityRatio = 1.0 * localityEligibleHosts.endpoints().size() / hostCount; + final int weight = localityWeightsMap.getOrDefault(locality, 0); + final double effectiveLocalityAvailabilityRatio = + Math.min(1.0, (overProvisioningFactor / 100.0) * localityAvailabilityRatio); + return weight * effectiveLocalityAvailabilityRatio; + } + + @Nullable + Locality chooseDegradedLocality() { + final LocalityEntry localityEntry = degradedLocalitySelector.select(); + if (localityEntry == null) { + return null; + } + return localityEntry.locality; + } + + @Nullable + Locality chooseHealthyLocality() { + final LocalityEntry localityEntry = healthyLocalitySelector.select(); + if (localityEntry == null) { + return null; + } + return localityEntry.locality; + } + + static class LocalityEntry extends WeightedRandomDistributionSelector.AbstractEntry { + + private final Locality locality; + private final int weight; + + LocalityEntry(Locality locality, double weight) { + this.locality = locality; + this.weight = Ints.saturatedCast(Math.round(weight)); + } + + @Override + public int weight() { + return weight; + } + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/MetadataUtil.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/MetadataUtil.java index 3cbe7ea0260..6cafdebb4ff 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/MetadataUtil.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/MetadataUtil.java @@ -18,50 +18,116 @@ import static com.linecorp.armeria.xds.client.endpoint.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME; -import java.util.Map; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; -import com.google.protobuf.ProtocolStringList; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ListValue; import com.google.protobuf.Struct; import com.google.protobuf.Value; +import com.google.protobuf.Value.KindCase; import com.linecorp.armeria.xds.ClusterSnapshot; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig.LbSubsetSelector; +import io.envoyproxy.envoy.config.core.v3.Metadata; import io.envoyproxy.envoy.config.route.v3.Route; import io.envoyproxy.envoy.config.route.v3.RouteAction; final class MetadataUtil { + static boolean metadataLabelMatch(Struct labelSet, Metadata hostMetadata, + String filterKey, boolean listAsAny) { + if (hostMetadata == Metadata.getDefaultInstance()) { + return labelSet.getFieldsMap().isEmpty(); + } + if (!hostMetadata.containsFilterMetadata(filterKey)) { + return labelSet.getFieldsMap().isEmpty(); + } + final Struct dataStruct = hostMetadata.getFilterMetadataOrThrow(filterKey); + for (Entry kv: labelSet.getFieldsMap().entrySet()) { + if (!dataStruct.getFieldsMap().containsKey(kv.getKey())) { + return false; + } + final Value value = dataStruct.getFieldsOrThrow(kv.getKey()); + if (listAsAny && value.getKindCase() == KindCase.LIST_VALUE) { + boolean anyMatch = false; + for (Value innerValue: value.getListValue().getValuesList()) { + if (Objects.equals(kv.getValue(), innerValue)) { + anyMatch = true; + break; + } + } + if (!anyMatch) { + return false; + } + } else if (!Objects.equals(kv.getValue(), value)) { + return false; + } + } + return true; + } + static Struct filterMetadata(ClusterSnapshot clusterSnapshot) { final Route route = clusterSnapshot.route(); if (route == null) { return Struct.getDefaultInstance(); } final RouteAction action = route.getRoute(); - return action.getMetadataMatch().getFilterMetadataOrDefault(SUBSET_LOAD_BALANCING_FILTER_NAME, - Struct.getDefaultInstance()); - } - - static boolean findMatchedSubsetSelector(LbSubsetConfig lbSubsetConfig, Struct filterMetadata) { - for (LbSubsetSelector subsetSelector : lbSubsetConfig.getSubsetSelectorsList()) { - final ProtocolStringList keysList = subsetSelector.getKeysList(); - if (filterMetadata.getFieldsCount() != keysList.size()) { + final Struct metadata = action.getMetadataMatch().getFilterMetadataOrDefault( + SUBSET_LOAD_BALANCING_FILTER_NAME, + Struct.getDefaultInstance()); + if (!metadata.containsFields(XdsConstants.ENVOY_LB_FALLBACK_LIST)) { + return metadata; + } + final Struct.Builder builder = Struct.newBuilder(); + for (Entry entry: metadata.getFieldsMap().entrySet()) { + if (XdsConstants.ENVOY_LB_FALLBACK_LIST.equals(entry.getKey())) { continue; } - boolean found = true; - final Map filterMetadataMap = filterMetadata.getFieldsMap(); - for (String key : filterMetadataMap.keySet()) { - if (!keysList.contains(key)) { - found = false; - break; - } + builder.putFields(entry.getKey(), entry.getValue()); + } + return builder.build(); + } + + static List fallbackMetadataList(ClusterSnapshot clusterSnapshot) { + final Route route = clusterSnapshot.route(); + if (route == null) { + return Collections.emptyList(); + } + final RouteAction action = route.getRoute(); + final Struct metadata = action.getMetadataMatch().getFilterMetadataOrDefault( + SUBSET_LOAD_BALANCING_FILTER_NAME, + Struct.getDefaultInstance()); + if (!metadata.containsFields(XdsConstants.ENVOY_LB_FALLBACK_LIST)) { + return Collections.emptyList(); + } + final Value fallbackValue = + metadata.getFieldsOrDefault(XdsConstants.ENVOY_LB_FALLBACK_LIST, + Value.getDefaultInstance()); + if (!fallbackValue.hasListValue()) { + return Collections.emptyList(); + } + final ListValue fallbackListValue = fallbackValue.getListValue(); + final ImmutableList.Builder fallbackMetadataList = ImmutableList.builder(); + for (Value value: fallbackListValue.getValuesList()) { + if (value.hasStructValue()) { + fallbackMetadataList.add(value.getStructValue()); } - if (found) { - return true; + } + return fallbackMetadataList.build(); + } + + static Struct withFilterKeys(Struct filterMetadata, Set subsetKeys) { + final Struct.Builder structBuilder = Struct.newBuilder(); + for (Entry entry: filterMetadata.getFieldsMap().entrySet()) { + if (subsetKeys.contains(entry.getKey())) { + structBuilder.putFields(entry.getKey(), entry.getValue()); } } - return false; + return structBuilder.build(); } private MetadataUtil() {} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PrioritySet.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PrioritySet.java index a73f5775b8f..6ec6bf3837c 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PrioritySet.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PrioritySet.java @@ -16,20 +16,96 @@ package com.linecorp.armeria.xds.client.endpoint; -import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; -import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.common.annotation.Nullable; + +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.CommonLbConfig; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; final class PrioritySet { - private final List endpoints; + private final Map hostSets; + private final SortedSet priorities; + private final Cluster cluster; + private final ClusterLoadAssignment clusterLoadAssignment; + private final int panicThreshold; + + PrioritySet(Cluster cluster, ClusterLoadAssignment clusterLoadAssignment, + Map hostSets) { + this.cluster = cluster; + this.clusterLoadAssignment = clusterLoadAssignment; + panicThreshold = EndpointUtil.panicThreshold(cluster); + this.hostSets = hostSets; + priorities = new TreeSet<>(hostSets.keySet()); + } + + boolean failTrafficOnPanic() { + final CommonLbConfig commonLbConfig = commonLbConfig(); + if (commonLbConfig == null) { + return false; + } + if (!commonLbConfig.hasZoneAwareLbConfig()) { + return false; + } + return commonLbConfig.getZoneAwareLbConfig().getFailTrafficOnPanic(); + } + + @Nullable + private CommonLbConfig commonLbConfig() { + if (!cluster.hasCommonLbConfig()) { + return null; + } + return cluster.getCommonLbConfig(); + } + + boolean localityWeightedBalancing() { + final CommonLbConfig commonLbConfig = commonLbConfig(); + if (commonLbConfig == null) { + return false; + } + return commonLbConfig.hasLocalityWeightedLbConfig(); + } - PrioritySet(List endpoints) { - this.endpoints = ImmutableList.copyOf(endpoints); + int panicThreshold() { + return panicThreshold; } - List endpoints() { - return endpoints; + SortedSet priorities() { + return priorities; + } + + Map hostSets() { + return hostSets; + } + + static final class PrioritySetBuilder { + + private final ImmutableMap.Builder hostSetsBuilder = ImmutableMap.builder(); + private final Cluster cluster; + private final ClusterLoadAssignment clusterLoadAssignment; + + PrioritySetBuilder(PrioritySet prioritySet) { + cluster = prioritySet.cluster; + clusterLoadAssignment = prioritySet.clusterLoadAssignment; + } + + PrioritySetBuilder(Cluster cluster, ClusterLoadAssignment clusterLoadAssignment) { + this.cluster = cluster; + this.clusterLoadAssignment = clusterLoadAssignment; + } + + void createHostSet(int priority, UpdateHostsParam params) { + final HostSet hostSet = new HostSet(params, clusterLoadAssignment); + hostSetsBuilder.put(priority, hostSet); + } + + PrioritySet build() { + return new PrioritySet(cluster, clusterLoadAssignment, hostSetsBuilder.build()); + } } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PriorityState.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PriorityState.java new file mode 100644 index 00000000000..bd3336d5a52 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PriorityState.java @@ -0,0 +1,72 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.client.endpoint.EndpointGroupUtil.endpointsByLocality; +import static com.linecorp.armeria.xds.client.endpoint.EndpointUtil.locality; +import static com.linecorp.armeria.xds.client.endpoint.EndpointUtil.selectionStrategy; + +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.client.Endpoint; + +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.core.v3.Locality; + +final class PriorityState { + private final UpdateHostsParam param; + + PriorityState(List hosts, Map localityWeightsMap, + Cluster cluster) { + final Map> endpointsPerLocality = endpointsByLocality(hosts); + param = new UpdateHostsParam(hosts, endpointsPerLocality, localityWeightsMap, + selectionStrategy(cluster)); + } + + UpdateHostsParam param() { + return param; + } + + static final class PriorityStateBuilder { + + private final ImmutableList.Builder hostsBuilder = ImmutableList.builder(); + private final ImmutableMap.Builder localityWeightsBuilder = + ImmutableMap.builder(); + private final Cluster cluster; + + PriorityStateBuilder(Cluster cluster) { + this.cluster = cluster; + } + + void addEndpoint(Endpoint endpoint) { + hostsBuilder.add(endpoint); + if (locality(endpoint) != Locality.getDefaultInstance() && + EndpointUtil.hasLoadBalancingWeight(endpoint)) { + localityWeightsBuilder.put(locality(endpoint), endpoint.weight()); + } + } + + PriorityState build() { + return new PriorityState(hostsBuilder.build(), localityWeightsBuilder.buildKeepingLast(), + cluster); + } + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PriorityStateManager.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PriorityStateManager.java new file mode 100644 index 00000000000..8411a55cbe1 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PriorityStateManager.java @@ -0,0 +1,58 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.client.endpoint.EndpointUtil.priority; + +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.TreeMap; + +import com.linecorp.armeria.client.Endpoint; + +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; + +final class PriorityStateManager { + + private final SortedMap priorityStateMap = new TreeMap<>(); + private final Cluster cluster; + private final ClusterLoadAssignment clusterLoadAssignment; + + PriorityStateManager(Cluster cluster, ClusterLoadAssignment clusterLoadAssignment) { + this.cluster = cluster; + this.clusterLoadAssignment = clusterLoadAssignment; + } + + void registerEndpoint(Endpoint endpoint) { + final PriorityState.PriorityStateBuilder priorityStateBuilder = + priorityStateMap.computeIfAbsent(priority(endpoint), + ignored -> new PriorityState.PriorityStateBuilder(cluster)); + priorityStateBuilder.addEndpoint(endpoint); + } + + PrioritySet build() { + final PrioritySet.PrioritySetBuilder prioritySetBuilder = + new PrioritySet.PrioritySetBuilder(cluster, clusterLoadAssignment); + for (Entry entry: priorityStateMap.entrySet()) { + final Integer priority = entry.getKey(); + final PriorityState priorityState = entry.getValue().build(); + prioritySetBuilder.createHostSet(priority, priorityState.param()); + } + return prioritySetBuilder.build(); + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetInfo.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetInfo.java new file mode 100644 index 00000000000..efa15f1f573 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetInfo.java @@ -0,0 +1,78 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import com.google.common.collect.ImmutableSet; +import com.google.protobuf.ProtocolStringList; + +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig.LbSubsetSelector; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig.LbSubsetSelector.LbSubsetSelectorFallbackPolicy; + +class SubsetInfo { + + private final Set subsetSelectors; + + SubsetInfo(LbSubsetConfig config) { + final Set subsetSelectors = new HashSet<>(); + for (LbSubsetSelector selector : config.getSubsetSelectorsList()) { + final ProtocolStringList keys = selector.getKeysList(); + if (keys.isEmpty()) { + continue; + } + subsetSelectors.add(new SubsetSelector(selector.getKeysList(), selector.getFallbackPolicy(), + selector.getFallbackKeysSubsetList())); + } + this.subsetSelectors = ImmutableSet.copyOf(subsetSelectors); + } + + Set subsetSelectors() { + return subsetSelectors; + } + + static class SubsetSelector { + + private final SortedSet keys; + private final LbSubsetSelectorFallbackPolicy fallbackPolicy; + private final Set fallbackKeysSubset; + + SubsetSelector(List keys, LbSubsetSelectorFallbackPolicy fallbackPolicy, + List fallbackKeysSubsetList) { + this.keys = new TreeSet<>(keys); + this.fallbackPolicy = fallbackPolicy; + fallbackKeysSubset = ImmutableSet.copyOf(fallbackKeysSubsetList); + } + + Set fallbackKeysSubset() { + return fallbackKeysSubset; + } + + LbSubsetSelectorFallbackPolicy fallbackPolicy() { + return fallbackPolicy; + } + + SortedSet keys() { + return keys; + } + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetLoadBalancer.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetLoadBalancer.java index 6cb0ddb2061..3f28bb4f621 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetLoadBalancer.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetLoadBalancer.java @@ -16,86 +16,304 @@ package com.linecorp.armeria.xds.client.endpoint; -import static com.linecorp.armeria.xds.client.endpoint.MetadataUtil.filterMetadata; -import static com.linecorp.armeria.xds.client.endpoint.MetadataUtil.findMatchedSubsetSelector; -import static com.linecorp.armeria.xds.client.endpoint.XdsEndpointUtil.convertEndpoints; +import static com.linecorp.armeria.xds.client.endpoint.MetadataUtil.withFilterKeys; +import static com.linecorp.armeria.xds.client.endpoint.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.function.Predicate; +import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.google.common.collect.ImmutableMap; import com.google.protobuf.Struct; +import com.google.protobuf.Value; +import com.google.protobuf.Value.KindCase; import com.linecorp.armeria.client.ClientRequestContext; import com.linecorp.armeria.client.Endpoint; -import com.linecorp.armeria.client.endpoint.EndpointGroup; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.xds.ClusterSnapshot; +import com.linecorp.armeria.xds.client.endpoint.PrioritySet.PrioritySetBuilder; +import com.linecorp.armeria.xds.client.endpoint.SubsetInfo.SubsetSelector; -import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig.LbSubsetFallbackPolicy; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig.LbSubsetMetadataFallbackPolicy; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig.LbSubsetSelector.LbSubsetSelectorFallbackPolicy; +import io.envoyproxy.envoy.config.core.v3.Metadata; final class SubsetLoadBalancer implements LoadBalancer { - private static final Logger logger = LoggerFactory.getLogger(SubsetLoadBalancer.class); - private final ClusterSnapshot clusterSnapshot; + private final LbSubsetConfig lbSubsetConfig; + private final SubsetInfo subsetInfo; @Nullable - private volatile EndpointGroup endpointGroup; + private LbState lbState; + private final Struct filterMetadata; SubsetLoadBalancer(ClusterSnapshot clusterSnapshot) { this.clusterSnapshot = clusterSnapshot; + lbSubsetConfig = clusterSnapshot.xdsResource().resource().getLbSubsetConfig(); + subsetInfo = new SubsetInfo(lbSubsetConfig); + filterMetadata = MetadataUtil.filterMetadata(clusterSnapshot); } @Override @Nullable public Endpoint selectNow(ClientRequestContext ctx) { - final EndpointGroup endpointGroup = this.endpointGroup; - if (endpointGroup == null) { + if (lbState == null) { return null; } - return endpointGroup.selectNow(ctx); + return lbState.chooseHost(ctx, filterMetadata); } @Override public void prioritySetUpdated(PrioritySet prioritySet) { - endpointGroup = createEndpointGroup(prioritySet); + lbState = new LbState(prioritySet, subsetInfo, lbSubsetConfig, clusterSnapshot); } - private EndpointGroup createEndpointGroup(PrioritySet prioritySet) { - final Struct filterMetadata = filterMetadata(clusterSnapshot); - if (filterMetadata.getFieldsCount() == 0) { - // No metadata. Use the whole endpoints. - return createEndpointGroup(prioritySet.endpoints()); + static class LbState { + + private final PrioritySet origPrioritySet; + private final SubsetInfo subsetInfo; + private final LbSubsetMetadataFallbackPolicy metadataFallbackPolicy; + @Nullable + private ZoneAwareLoadBalancer subsetAny; + @Nullable + private ZoneAwareLoadBalancer subsetDefault; + @Nullable + private ZoneAwareLoadBalancer fallbackSubset; + @Nullable + private ZoneAwareLoadBalancer panicModeSubset; + private final Map, SubsetSelector> selectorMap; + private final Map subsets; + private final boolean listAsAny; + private final boolean scaleLocalityWeight; + + private final List fallbackMetadataList; + + LbState(PrioritySet origPrioritySet, SubsetInfo subsetInfo, LbSubsetConfig lbSubsetConfig, + ClusterSnapshot clusterSnapshot) { + this.origPrioritySet = origPrioritySet; + this.subsetInfo = subsetInfo; + listAsAny = lbSubsetConfig.getListAsAny(); + scaleLocalityWeight = lbSubsetConfig.getScaleLocalityWeight(); + metadataFallbackPolicy = lbSubsetConfig.getMetadataFallbackPolicy(); + fallbackMetadataList = MetadataUtil.fallbackMetadataList(clusterSnapshot); + + final Struct defaultSubsetMetadata = lbSubsetConfig.getDefaultSubset(); + if (lbSubsetConfig.getFallbackPolicy() != LbSubsetFallbackPolicy.NO_FALLBACK) { + if (lbSubsetConfig.getFallbackPolicy() == LbSubsetFallbackPolicy.ANY_ENDPOINT) { + fallbackSubset = initSubsetAnyOnce(); + } else { + fallbackSubset = initSubsetDefaultOnce(defaultSubsetMetadata); + } + } + if (lbSubsetConfig.getPanicModeAny()) { + panicModeSubset = initSubsetAnyOnce(); + } + selectorMap = initSubsetSelectorMap(subsetInfo, defaultSubsetMetadata); + subsets = refreshSubsets(); } - final Cluster cluster = clusterSnapshot.xdsResource().resource(); - final LbSubsetConfig lbSubsetConfig = cluster.getLbSubsetConfig(); - if (lbSubsetConfig == LbSubsetConfig.getDefaultInstance()) { - // No lbSubsetConfig. Use the whole endpoints. - return createEndpointGroup(prioritySet.endpoints()); + @Nullable + Endpoint chooseHost(ClientRequestContext ctx, Struct filterMetadata) { + if (metadataFallbackPolicy != LbSubsetMetadataFallbackPolicy.FALLBACK_LIST) { + return chooseHostIteration(ctx, filterMetadata); + } + if (fallbackMetadataList.isEmpty()) { + return chooseHostIteration(ctx, filterMetadata); + } + for (Struct struct: fallbackMetadataList) { + final Endpoint endpoint = chooseHostIteration(ctx, struct); + if (endpoint != null) { + return endpoint; + } + } + return null; } - final LbSubsetFallbackPolicy fallbackPolicy = lbSubsetConfig.getFallbackPolicy(); - if (fallbackPolicy != LbSubsetFallbackPolicy.ANY_ENDPOINT) { - logger.warn("Currently, only {} is supported.", LbSubsetFallbackPolicy.ANY_ENDPOINT); + + @Nullable + Endpoint chooseHostIteration(ClientRequestContext ctx, Struct filterMetadata) { + if (subsets.containsKey(filterMetadata)) { + return subsets.get(filterMetadata).selectNow(ctx); + } + final Set keys = filterMetadata.getFieldsMap().keySet(); + if (selectorMap.containsKey(keys)) { + final SubsetSelector subsetSelector = selectorMap.get(keys); + if (subsetSelector.fallbackPolicy() != LbSubsetSelectorFallbackPolicy.NOT_DEFINED) { + return chooseHostForSelectorFallbackPolicy(subsetSelector, ctx, filterMetadata); + } + } + + if (fallbackSubset != null) { + return fallbackSubset.selectNow(ctx); + } + + if (panicModeSubset != null) { + return panicModeSubset.selectNow(ctx); + } + return null; } - if (!findMatchedSubsetSelector(lbSubsetConfig, filterMetadata)) { - // No matched subset selector. Use the whole endpoints. - return createEndpointGroup(prioritySet.endpoints()); + @Nullable + Endpoint chooseHostForSelectorFallbackPolicy(SubsetSelector subsetSelector, + ClientRequestContext ctx, Struct filterMetadata) { + if (subsetSelector.fallbackPolicy() == LbSubsetSelectorFallbackPolicy.ANY_ENDPOINT && + subsetAny != null) { + return subsetAny.selectNow(ctx); + } + if (subsetSelector.fallbackPolicy() == LbSubsetSelectorFallbackPolicy.DEFAULT_SUBSET && + subsetDefault != null) { + return subsetDefault.selectNow(ctx); + } + if (subsetSelector.fallbackPolicy() == LbSubsetSelectorFallbackPolicy.KEYS_SUBSET) { + final Set fallbackKeysSubset = subsetSelector.fallbackKeysSubset(); + final Struct newFilterMetadata = withFilterKeys(filterMetadata, fallbackKeysSubset); + return chooseHostIteration(ctx, newFilterMetadata); + } + return null; } - final List endpoints = convertEndpoints(prioritySet.endpoints(), - filterMetadata); - if (endpoints.isEmpty()) { - // No matched metadata. Use the whole endpoints. - return createEndpointGroup(prioritySet.endpoints()); + + ZoneAwareLoadBalancer initSubsetAnyOnce() { + if (subsetAny == null) { + subsetAny = createSubsetEntry(ignored -> true); + } + return subsetAny; + } + + ZoneAwareLoadBalancer initSubsetDefaultOnce(Struct subsetMetadata) { + if (subsetDefault == null) { + subsetDefault = createSubsetEntry(host -> hostMatches(subsetMetadata, host)); + } + return subsetDefault; + } + + Map, SubsetSelector> initSubsetSelectorMap(SubsetInfo subsetInfo, + Struct defaultSubsetMetadata) { + final ImmutableMap.Builder, SubsetSelector> selectorMap = ImmutableMap.builder(); + for (SubsetSelector subsetSelector: subsetInfo.subsetSelectors()) { + selectorMap.put(subsetSelector.keys(), subsetSelector); + if (subsetSelector.fallbackPolicy() == LbSubsetSelectorFallbackPolicy.ANY_ENDPOINT) { + initSubsetAnyOnce(); + } else if (subsetSelector.fallbackPolicy() == LbSubsetSelectorFallbackPolicy.DEFAULT_SUBSET) { + initSubsetDefaultOnce(defaultSubsetMetadata); + } + } + return selectorMap.buildKeepingLast(); + } + + Map refreshSubsets() { + final Map prioritySets = new HashMap<>(); + for (Entry entry: origPrioritySet.hostSets().entrySet()) { + processSubsets(entry.getKey(), entry.getValue(), prioritySets); + } + + final ImmutableMap.Builder subsets = ImmutableMap.builder(); + for (Entry entry: prioritySets.entrySet()) { + final PrioritySet.PrioritySetBuilder + prioritySetBuilder = new PrioritySet.PrioritySetBuilder(origPrioritySet); + for (Integer priority: origPrioritySet.priorities()) { + final UpdateHostsParam param = entry.getValue().finalize(priority); + prioritySetBuilder.createHostSet(priority, param); + } + subsets.put(entry.getKey(), new ZoneAwareLoadBalancer(prioritySetBuilder.build())); + } + return subsets.build(); + } + + void processSubsets(int priority, HostSet hostSet, Map prioritySets) { + for (Endpoint endpoint: hostSet.hosts()) { + for (SubsetSelector selector: subsetInfo.subsetSelectors()) { + final List allKvs = + extractSubsetMetadata(selector.keys(), endpoint); + for (Struct kvs: allKvs) { + prioritySets.computeIfAbsent(kvs, ignored -> new SubsetPrioritySetBuilder( + origPrioritySet, scaleLocalityWeight)) + .pushHost(priority, endpoint); + } + } + } } - return createEndpointGroup(endpoints); - } - private static EndpointGroup createEndpointGroup(List endpoints) { - return EndpointGroup.of(endpoints); + ZoneAwareLoadBalancer createSubsetEntry(Predicate hostPredicate) { + final PrioritySetBuilder prioritySetBuilder = new PrioritySetBuilder(origPrioritySet); + final SubsetPrioritySetBuilder subsetPrioritySetBuilder = + new SubsetPrioritySetBuilder(origPrioritySet, scaleLocalityWeight); + + for (Entry entry: origPrioritySet.hostSets().entrySet()) { + for (Endpoint endpoint: entry.getValue().hosts()) { + if (!hostPredicate.test(endpoint)) { + continue; + } + subsetPrioritySetBuilder.pushHost(entry.getKey(), endpoint); + } + final UpdateHostsParam param = subsetPrioritySetBuilder.finalize(entry.getKey()); + prioritySetBuilder.createHostSet(entry.getKey(), param); + } + + return new ZoneAwareLoadBalancer(prioritySetBuilder.build()); + } + + boolean hostMatches(Struct metadata, Endpoint endpoint) { + return MetadataUtil.metadataLabelMatch( + metadata, EndpointUtil.metadata(endpoint), SUBSET_LOAD_BALANCING_FILTER_NAME, listAsAny); + } + + List extractSubsetMetadata(Set subsetKeys, Endpoint endpoint) { + final Metadata metadata = EndpointUtil.metadata(endpoint); + if (metadata == Metadata.getDefaultInstance()) { + return Collections.emptyList(); + } + if (!metadata.containsFilterMetadata(SUBSET_LOAD_BALANCING_FILTER_NAME)) { + return Collections.emptyList(); + } + final Struct filter = metadata.getFilterMetadataOrThrow(SUBSET_LOAD_BALANCING_FILTER_NAME); + final Map fields = filter.getFieldsMap(); + List> allKvs = new ArrayList<>(); + for (String subsetKey: subsetKeys) { + if (!fields.containsKey(subsetKey)) { + return Collections.emptyList(); + } + final Value value = fields.get(subsetKey); + if (listAsAny && value.getKindCase() == KindCase.LIST_VALUE) { + if (allKvs.isEmpty()) { + for (Value innerValue: value.getListValue().getValuesList()) { + final HashMap map = new HashMap<>(); + map.put(subsetKey, innerValue); + allKvs.add(map); + } + } else { + final List> newKvs = new ArrayList<>(); + for (Map kvMap: allKvs) { + for (Value innerValue: value.getListValue().getValuesList()) { + final Map newKv = new HashMap<>(kvMap); + newKv.put(subsetKey, innerValue); + newKvs.add(newKv); + } + } + allKvs = newKvs; + } + } else { + if (allKvs.isEmpty()) { + final HashMap map = new HashMap<>(); + map.put(subsetKey, value); + allKvs.add(map); + } else { + for (Map valueMap: allKvs) { + valueMap.put(subsetKey, value); + } + } + } + } + return allKvs.stream().map(m -> Struct.newBuilder().putAllFields(m).build()) + .collect(Collectors.toList()); + } } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetPrioritySetBuilder.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetPrioritySetBuilder.java new file mode 100644 index 00000000000..041585566a1 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetPrioritySetBuilder.java @@ -0,0 +1,90 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.client.endpoint.EndpointGroupUtil.filterByLocality; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; + +import io.envoyproxy.envoy.config.core.v3.Locality; + +class SubsetPrioritySetBuilder { + + private final Map> rawHostsSet = new HashMap<>(); + private final PrioritySet origPrioritySet; + private final boolean scaleLocalityWeight; + + SubsetPrioritySetBuilder(PrioritySet origPrioritySet, boolean scaleLocalityWeight) { + this.origPrioritySet = origPrioritySet; + this.scaleLocalityWeight = scaleLocalityWeight; + } + + void pushHost(int priority, Endpoint host) { + rawHostsSet.computeIfAbsent(priority, ignored -> new HashSet<>()) + .add(host); + } + + UpdateHostsParam finalize(int priority) { + final HostSet origHostSet = origPrioritySet.hostSets().get(priority); + final Set newHostSet = rawHostsSet.getOrDefault(priority, Collections.emptySet()); + assert origHostSet != null; + final EndpointGroup hosts = + EndpointGroupUtil.filter(origHostSet.hostsEndpointGroup(), newHostSet::contains); + final EndpointGroup healthyHosts = + EndpointGroupUtil.filter(origHostSet.healthyHostsEndpointGroup(), newHostSet::contains); + final EndpointGroup degradedHosts = + EndpointGroupUtil.filter(origHostSet.degradedHostsEndpointGroup(), newHostSet::contains); + final Map hostsPerLocality = + filterByLocality(origHostSet.endpointGroupPerLocality(), newHostSet::contains); + final Map healthyHostsPerLocality = + filterByLocality(origHostSet.healthyEndpointGroupPerLocality(), newHostSet::contains); + final Map degradedHostsPerLocality = + filterByLocality(origHostSet.degradedEndpointGroupPerLocality(), newHostSet::contains); + + final Map localityWeightsMap = + determineLocalityWeights(hostsPerLocality, origHostSet); + return new UpdateHostsParam(hosts, healthyHosts, degradedHosts, + hostsPerLocality, healthyHostsPerLocality, + degradedHostsPerLocality, localityWeightsMap); + } + + Map determineLocalityWeights(Map hostsPerLocality, + HostSet origHostSet) { + final Map localityWeightsMap = origHostSet.localityWeightsMap(); + if (!scaleLocalityWeight) { + return localityWeightsMap; + } + final Map origHostsPerLocality = origHostSet.endpointGroupPerLocality(); + final ImmutableMap.Builder scaledLocalityWeightsMap = ImmutableMap.builder(); + for (Entry entry : localityWeightsMap.entrySet()) { + final float scale = 1.0f * hostsPerLocality.get(entry.getKey()).endpoints().size() / + origHostsPerLocality.get(entry.getKey()).endpoints().size(); + scaledLocalityWeightsMap.put(entry.getKey(), Math.round(scale * entry.getValue())); + } + return scaledLocalityWeightsMap.build(); + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/UpdateHostsParam.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/UpdateHostsParam.java new file mode 100644 index 00000000000..8c6f16fb6fd --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/UpdateHostsParam.java @@ -0,0 +1,104 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.client.endpoint.EndpointGroupUtil.filterByLocality; +import static com.linecorp.armeria.xds.client.endpoint.EndpointUtil.coarseHealth; + +import java.util.List; +import java.util.Map; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy; +import com.linecorp.armeria.xds.client.endpoint.EndpointUtil.CoarseHealth; + +import io.envoyproxy.envoy.config.core.v3.Locality; + +/** + * Hosts per partition. + */ +class UpdateHostsParam { + private final EndpointGroup hosts; + private final EndpointGroup healthyHosts; + private final EndpointGroup degradedHosts; + private final Map hostsPerLocality; + private final Map healthyHostsPerLocality; + private final Map degradedHostsPerLocality; + private final Map localityWeightsMap; + + UpdateHostsParam(List endpoints, + Map> endpointsPerLocality, + Map localityWeightsMap, + EndpointSelectionStrategy strategy) { + hosts = EndpointGroupUtil.filter(endpoints, strategy, ignored -> true); + hostsPerLocality = filterByLocality(endpointsPerLocality, strategy, ignored -> true); + healthyHosts = EndpointGroupUtil.filter(endpoints, strategy, + endpoint -> coarseHealth(endpoint) == CoarseHealth.HEALTHY); + healthyHostsPerLocality = filterByLocality(endpointsPerLocality, strategy, + endpoint -> coarseHealth(endpoint) == CoarseHealth.HEALTHY); + degradedHosts = EndpointGroupUtil.filter(endpoints, strategy, + endpoint -> coarseHealth(endpoint) == CoarseHealth.DEGRADED); + degradedHostsPerLocality = filterByLocality( + endpointsPerLocality, strategy, + endpoint -> coarseHealth(endpoint) == CoarseHealth.DEGRADED); + this.localityWeightsMap = localityWeightsMap; + } + + UpdateHostsParam(EndpointGroup hosts, EndpointGroup healthyHosts, + EndpointGroup degradedHosts, + Map hostsPerLocality, + Map healthyHostsPerLocality, + Map degradedHostsPerLocality, + Map localityWeightsMap) { + this.hosts = hosts; + this.healthyHosts = healthyHosts; + this.degradedHosts = degradedHosts; + this.hostsPerLocality = hostsPerLocality; + this.healthyHostsPerLocality = healthyHostsPerLocality; + this.degradedHostsPerLocality = degradedHostsPerLocality; + this.localityWeightsMap = localityWeightsMap; + } + + EndpointGroup hosts() { + return hosts; + } + + Map hostsPerLocality() { + return hostsPerLocality; + } + + EndpointGroup healthyHosts() { + return healthyHosts; + } + + Map healthyHostsPerLocality() { + return healthyHostsPerLocality; + } + + EndpointGroup degradedHosts() { + return degradedHosts; + } + + Map degradedHostsPerLocality() { + return degradedHostsPerLocality; + } + + Map localityWeightsMap() { + return localityWeightsMap; + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributesKeys.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributesKeys.java index 9b2a531c861..66c97bf9161 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributesKeys.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributesKeys.java @@ -22,10 +22,13 @@ final class XdsAttributesKeys { - static final AttributeKey LB_ENDPOINT_KEY = + public static final AttributeKey LB_ENDPOINT_KEY = AttributeKey.valueOf(XdsAttributesKeys.class, "LB_ENDPOINT_KEY"); - static final AttributeKey LOCALITY_LB_ENDPOINTS_KEY = + public static final AttributeKey LOCALITY_LB_ENDPOINTS_KEY = AttributeKey.valueOf(XdsAttributesKeys.class, "LOCALITY_LB_ENDPOINTS_KEY"); + public static final AttributeKey SELECTION_HASH = + AttributeKey.valueOf(XdsAttributesKeys.class, "SELECTION_HASH"); + private XdsAttributesKeys() {} } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsConstants.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsConstants.java index c79ffa4b5d5..d220eb6b5df 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsConstants.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsConstants.java @@ -19,7 +19,8 @@ final class XdsConstants { // https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/subsets - static final String SUBSET_LOAD_BALANCING_FILTER_NAME = "envoy.lb"; + public static final String SUBSET_LOAD_BALANCING_FILTER_NAME = "envoy.lb"; + public static final String ENVOY_LB_FALLBACK_LIST = "fallback_list"; private XdsConstants() {} } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsEndpointUtil.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsEndpointUtil.java index 3b908bb1b42..e9f15673de7 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsEndpointUtil.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsEndpointUtil.java @@ -16,20 +16,13 @@ package com.linecorp.armeria.xds.client.endpoint; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.linecorp.armeria.xds.client.endpoint.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.function.Predicate; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; -import com.google.protobuf.Struct; -import com.google.protobuf.Value; import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.endpoint.EndpointGroup; @@ -53,33 +46,6 @@ final class XdsEndpointUtil { - static List convertEndpoints(List endpoints, Struct filterMetadata) { - checkArgument(filterMetadata.getFieldsCount() > 0, - "filterMetadata.getFieldsCount(): %s (expected: > 0)", filterMetadata.getFieldsCount()); - final Predicate lbEndpointPredicate = endpoint -> { - final LbEndpoint lbEndpoint = endpoint.attr(XdsAttributesKeys.LB_ENDPOINT_KEY); - assert lbEndpoint != null; - final Struct endpointMetadata = lbEndpoint.getMetadata().getFilterMetadataOrDefault( - SUBSET_LOAD_BALANCING_FILTER_NAME, Struct.getDefaultInstance()); - if (endpointMetadata.getFieldsCount() == 0) { - return false; - } - return containsFilterMetadata(filterMetadata, endpointMetadata); - }; - return endpoints.stream().filter(lbEndpointPredicate).collect(toImmutableList()); - } - - private static boolean containsFilterMetadata(Struct filterMetadata, Struct endpointMetadata) { - final Map endpointMetadataMap = endpointMetadata.getFieldsMap(); - for (Entry entry : filterMetadata.getFieldsMap().entrySet()) { - final Value value = endpointMetadataMap.get(entry.getKey()); - if (value == null || !value.equals(entry.getValue())) { - return false; - } - } - return true; - } - static EndpointGroup convertEndpointGroup(ClusterSnapshot clusterSnapshot) { final EndpointSnapshot endpointSnapshot = clusterSnapshot.endpointSnapshot(); if (endpointSnapshot == null) { diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ZoneAwareLbStateFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ZoneAwareLbStateFactory.java new file mode 100644 index 00000000000..34e295174cd --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ZoneAwareLbStateFactory.java @@ -0,0 +1,351 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedSet; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Streams; +import com.google.common.math.IntMath; +import com.google.common.math.LongMath; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.client.endpoint.ZoneAwareLoadBalancer.DistributeLoadState; +import com.linecorp.armeria.xds.client.endpoint.ZoneAwareLoadBalancer.HostAvailability; +import com.linecorp.armeria.xds.client.endpoint.ZoneAwareLoadBalancer.PriorityAndAvailability; + +final class ZoneAwareLbStateFactory { + + static ZoneAwareLbState newInstance(PrioritySet prioritySet) { + final ImmutableMap.Builder perPriorityHealthBuilder = + ImmutableMap.builder(); + final ImmutableMap.Builder perPriorityDegradedBuilder = + ImmutableMap.builder(); + for (Integer priority: prioritySet.priorities()) { + final HealthAndDegraded healthAndDegraded = + recalculatePerPriorityState(priority, prioritySet); + perPriorityHealthBuilder.put(priority, healthAndDegraded.healthWeight); + perPriorityDegradedBuilder.put(priority, healthAndDegraded.degradedWeight); + } + PerPriorityLoad perPriorityLoad = buildLoads(prioritySet, + perPriorityHealthBuilder.build(), + perPriorityDegradedBuilder.build()); + final PerPriorityPanic perPriorityPanic = + recalculatePerPriorityPanic(prioritySet, + perPriorityLoad.normalizedTotalAvailability()); + if (perPriorityPanic.totalPanic()) { + perPriorityLoad = recalculateLoadInTotalPanic(prioritySet); + } + return new ZoneAwareLbState(prioritySet, perPriorityLoad, perPriorityPanic); + } + + private static HealthAndDegraded recalculatePerPriorityState( + int priority, PrioritySet prioritySet) { + final HostSet hostSet = prioritySet.hostSets().get(priority); + final int hostCount = hostSet.hosts().size(); + + if (hostCount > 0) { + long healthyWeight = 0; + long degradedWeight = 0; + long totalWeight = 0; + if (hostSet.weightedPriorityHealth()) { + for (Endpoint host : hostSet.healthyHosts()) { + healthyWeight += host.weight(); + } + for (Endpoint host : hostSet.degradedHosts()) { + degradedWeight += host.weight(); + } + for (Endpoint host : hostSet.hosts()) { + totalWeight += host.weight(); + } + } else { + healthyWeight = hostSet.healthyHosts().size(); + degradedWeight = hostSet.degradedHosts().size(); + totalWeight = hostCount; + } + final int health = (int) Math.min(100L, LongMath.saturatedMultiply( + hostSet.overProvisioningFactor(), healthyWeight) / totalWeight); + final int degraded = (int) Math.min(100L, LongMath.saturatedMultiply( + hostSet.overProvisioningFactor(), degradedWeight) / totalWeight); + return new HealthAndDegraded(health, degraded); + } + return new HealthAndDegraded(0, 0); + } + + private static PerPriorityLoad buildLoads(PrioritySet prioritySet, + Map perPriorityHealth, + Map perPriorityDegraded) { + final int normalizedTotalAvailability = + normalizedTotalAvailability(perPriorityHealth, perPriorityDegraded); + if (normalizedTotalAvailability == 0) { + return PerPriorityLoad.INVALID; + } + + final Map healthyPriorityLoad = new HashMap<>(); + final Map degradedPriorityLoad = new HashMap<>(); + final DistributeLoadState firstHealthyAndRemaining = + distributeLoad(prioritySet.priorities(), healthyPriorityLoad, perPriorityHealth, + 100, normalizedTotalAvailability); + final DistributeLoadState firstDegradedAndRemaining = + distributeLoad(prioritySet.priorities(), degradedPriorityLoad, perPriorityDegraded, + firstHealthyAndRemaining.totalLoad, normalizedTotalAvailability); + final int remainingLoad = firstDegradedAndRemaining.totalLoad; + if (remainingLoad > 0) { + final int firstHealthy = firstHealthyAndRemaining.firstAvailablePriority; + final int firstDegraded = firstDegradedAndRemaining.firstAvailablePriority; + if (firstHealthy != -1) { + healthyPriorityLoad.computeIfPresent(firstHealthy, (k, v) -> v + remainingLoad); + } else { + assert firstDegraded != -1; + degradedPriorityLoad.computeIfPresent(firstDegraded, (k, v) -> v + remainingLoad); + } + } + + assert priorityLoadSum(healthyPriorityLoad, degradedPriorityLoad) == 100; + return new PerPriorityLoad(healthyPriorityLoad, degradedPriorityLoad, + normalizedTotalAvailability); + } + + private static int normalizedTotalAvailability(Map perPriorityHealth, + Map perPriorityDegraded) { + final int totalAvailability = Streams.concat(perPriorityHealth.values().stream(), + perPriorityDegraded.values().stream()) + .reduce(0, IntMath::saturatedAdd).intValue(); + return Math.min(totalAvailability, 100); + } + + private static int priorityLoadSum(Map healthyPriorityLoad, + Map degradedPriorityLoad) { + return Streams.concat(healthyPriorityLoad.values().stream(), + degradedPriorityLoad.values().stream()) + .reduce(0, IntMath::saturatedAdd).intValue(); + } + + private static DistributeLoadState distributeLoad(SortedSet priorities, + Map perPriorityLoad, + Map perPriorityAvailability, + int totalLoad, int normalizedTotalAvailability) { + int firstAvailablePriority = -1; + for (Integer priority: priorities) { + final long availability = perPriorityAvailability.getOrDefault(priority, 0); + if (firstAvailablePriority < 0 && availability > 0) { + firstAvailablePriority = priority; + } + final int load = (int) Math.min(totalLoad, availability * 100 / normalizedTotalAvailability); + perPriorityLoad.put(priority, load); + totalLoad -= load; + } + return new DistributeLoadState(totalLoad, firstAvailablePriority); + } + + private static PerPriorityPanic recalculatePerPriorityPanic(PrioritySet prioritySet, + int normalizedTotalAvailability) { + final int panicThreshold = prioritySet.panicThreshold(); + if (normalizedTotalAvailability == 0 && panicThreshold == 0) { + // there are no hosts available and panic mode is disabled. + // we should always return a null Endpoint for this case. + return PerPriorityPanic.INVALID; + } + boolean totalPanic = true; + final ImmutableMap.Builder perPriorityPanicBuilder = ImmutableMap.builder(); + for (Integer priority : prioritySet.priorities()) { + final HostSet hostSet = prioritySet.hostSets().get(priority); + final boolean isPanic = + normalizedTotalAvailability == 100 ? false : isHostSetInPanic(hostSet, panicThreshold); + perPriorityPanicBuilder.put(priority, isPanic); + totalPanic &= isPanic; + } + return new PerPriorityPanic(perPriorityPanicBuilder.build(), totalPanic); + } + + private static PerPriorityLoad recalculateLoadInTotalPanic(PrioritySet prioritySet) { + final int totalHostsCount = prioritySet.hostSets().values().stream() + .map(hostSet -> hostSet.hosts().size()) + .reduce(0, IntMath::saturatedAdd) + .intValue(); + if (totalHostsCount == 0) { + return PerPriorityLoad.INVALID; + } + int totalLoad = 100; + int firstNoEmpty = -1; + final Map healthyPriorityLoad = new HashMap<>(); + final Map degradedPriorityLoad = new HashMap<>(); + for (Integer priority: prioritySet.priorities()) { + final HostSet hostSet = prioritySet.hostSets().get(priority); + final int hostsSize = hostSet.hosts().size(); + if (firstNoEmpty == -1 && hostsSize > 0) { + firstNoEmpty = priority; + } + final int load = 100 * hostsSize / totalHostsCount; + healthyPriorityLoad.put(priority, load); + degradedPriorityLoad.put(priority, 0); + totalLoad -= load; + } + final int remainingLoad = totalLoad; + healthyPriorityLoad.computeIfPresent(firstNoEmpty, (k, v) -> v + remainingLoad); + final int priorityLoadSum = priorityLoadSum(healthyPriorityLoad, degradedPriorityLoad); + assert priorityLoadSum == 100 + : "The priority loads not summing up to 100 (" + priorityLoadSum + ')'; + return new PerPriorityLoad(healthyPriorityLoad, degradedPriorityLoad, 100); + } + + private static boolean isHostSetInPanic(HostSet hostSet, int panicThreshold) { + final int hostCount = hostSet.hosts().size(); + final double healthyPercent = + hostCount == 0 ? 0 : 100.0 * hostSet.healthyHosts().size() / hostCount; + final double degradedPercent = + hostCount == 0 ? 0 : 100.0 * hostSet.degradedHosts().size() / hostCount; + return healthyPercent + degradedPercent < panicThreshold; + } + + static class PerPriorityLoad { + final Map healthyPriorityLoad; + final Map degradedPriorityLoad; + private final int normalizedTotalAvailability; + private final boolean forceEmptyEndpoint; + + private static final PerPriorityLoad INVALID = new PerPriorityLoad(); + + private PerPriorityLoad() { + healthyPriorityLoad = Collections.emptyMap(); + degradedPriorityLoad = Collections.emptyMap(); + normalizedTotalAvailability = 0; + forceEmptyEndpoint = true; + } + + PerPriorityLoad(Map healthyPriorityLoad, + Map degradedPriorityLoad, + int normalizedTotalAvailability) { + this.healthyPriorityLoad = ImmutableMap.copyOf(healthyPriorityLoad); + this.degradedPriorityLoad = ImmutableMap.copyOf(degradedPriorityLoad); + this.normalizedTotalAvailability = normalizedTotalAvailability; + forceEmptyEndpoint = false; + } + + int normalizedTotalAvailability() { + return normalizedTotalAvailability; + } + + int getHealthy(int priority) { + return healthyPriorityLoad.getOrDefault(priority, 0); + } + + int getDegraded(int priority) { + return degradedPriorityLoad.getOrDefault(priority, 0); + } + + boolean forceEmptyEndpoint() { + return forceEmptyEndpoint; + } + } + + static class PerPriorityPanic { + final Map perPriorityPanic; + private final boolean totalPanic; + private final boolean forceEmptyEndpoint; + + static final PerPriorityPanic INVALID = new PerPriorityPanic(); + + private PerPriorityPanic() { + perPriorityPanic = Collections.emptyMap(); + forceEmptyEndpoint = true; + totalPanic = false; + } + + PerPriorityPanic(Map perPriorityPanic, boolean totalPanic) { + this.perPriorityPanic = ImmutableMap.copyOf(perPriorityPanic); + this.totalPanic = totalPanic; + forceEmptyEndpoint = false; + } + + boolean get(int priority) { + return perPriorityPanic.getOrDefault(priority, true); + } + + boolean totalPanic() { + return totalPanic; + } + + boolean forceEmptyEndpoint() { + return forceEmptyEndpoint; + } + } + + static class ZoneAwareLbState { + private final PrioritySet prioritySet; + private final PerPriorityLoad perPriorityLoad; + private final PerPriorityPanic perPriorityPanic; + + ZoneAwareLbState(PrioritySet prioritySet, + PerPriorityLoad perPriorityLoad, PerPriorityPanic perPriorityPanic) { + this.prioritySet = prioritySet; + this.perPriorityLoad = perPriorityLoad; + this.perPriorityPanic = perPriorityPanic; + } + + PerPriorityPanic perPriorityPanic() { + return perPriorityPanic; + } + + PrioritySet prioritySet() { + return prioritySet; + } + + PerPriorityLoad perPriorityLoad() { + return perPriorityLoad; + } + + @Nullable + PriorityAndAvailability choosePriority(int hash) { + if (perPriorityLoad.forceEmptyEndpoint() || perPriorityPanic.forceEmptyEndpoint()) { + return null; + } + hash = hash % 100 + 1; + int aggregatePercentageLoad = 0; + final PerPriorityLoad perPriorityLoad = perPriorityLoad(); + for (Integer priority: prioritySet.priorities()) { + aggregatePercentageLoad += perPriorityLoad.getHealthy(priority); + if (hash <= aggregatePercentageLoad) { + return new PriorityAndAvailability(priority, HostAvailability.HEALTHY); + } + } + for (Integer priority: prioritySet.priorities()) { + aggregatePercentageLoad += perPriorityLoad.getDegraded(priority); + if (hash <= aggregatePercentageLoad) { + return new PriorityAndAvailability(priority, HostAvailability.DEGRADED); + } + } + throw new Error("shouldn't reach here"); + } + } + + private static class HealthAndDegraded { + private final int healthWeight; + private final int degradedWeight; + + HealthAndDegraded(int healthWeight, int degradedWeight) { + this.healthWeight = healthWeight; + this.degradedWeight = degradedWeight; + } + } + + private ZoneAwareLbStateFactory() {} +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ZoneAwareLoadBalancer.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ZoneAwareLoadBalancer.java new file mode 100644 index 00000000000..34645f31318 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ZoneAwareLoadBalancer.java @@ -0,0 +1,212 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.Map; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.client.endpoint.ZoneAwareLbStateFactory.ZoneAwareLbState; + +import io.envoyproxy.envoy.config.core.v3.Locality; + +final class ZoneAwareLoadBalancer implements LoadBalancer { + + ZoneAwareLoadBalancer() { + } + + ZoneAwareLoadBalancer(PrioritySet prioritySet) { + prioritySetUpdated(prioritySet); + } + + @Nullable + private ZoneAwareLbState lbState; + + @Override + @Nullable + public Endpoint selectNow(ClientRequestContext ctx) { + final ZoneAwareLbState lbState = this.lbState; + if (lbState == null) { + return null; + } + final PrioritySet prioritySet = lbState.prioritySet(); + if (prioritySet.priorities().isEmpty()) { + return null; + } + final int hash = EndpointUtil.hash(ctx); + final HostsSource hostsSource = hostSourceToUse(lbState, hash); + if (hostsSource == null) { + return null; + } + if (!prioritySet.hostSets().containsKey(hostsSource.priority)) { + return null; + } + final HostSet hostSet = prioritySet.hostSets().get(hostsSource.priority); + switch (hostsSource.sourceType) { + case ALL_HOSTS: + return hostSet.hostsEndpointGroup().selectNow(ctx); + case HEALTHY_HOSTS: + return hostSet.healthyHostsEndpointGroup().selectNow(ctx); + case DEGRADED_HOSTS: + return hostSet.degradedHostsEndpointGroup().selectNow(ctx); + case LOCALITY_HEALTHY_HOSTS: + final Map healthyLocalities = + hostSet.healthyEndpointGroupPerLocality(); + if (healthyLocalities.containsKey(hostsSource.locality)) { + return healthyLocalities.get(hostsSource.locality).selectNow(ctx); + } + break; + case LOCALITY_DEGRADED_HOSTS: + final Map degradedLocalities = + hostSet.degradedEndpointGroupPerLocality(); + if (degradedLocalities.containsKey(hostsSource.locality)) { + return degradedLocalities.get(hostsSource.locality).selectNow(ctx); + } + break; + default: + throw new Error(); + } + return null; + } + + @Nullable + HostsSource hostSourceToUse(ZoneAwareLbState lbState, int hash) { + final PriorityAndAvailability priorityAndAvailability = lbState.choosePriority(hash); + if (priorityAndAvailability == null) { + return null; + } + final PrioritySet prioritySet = lbState.prioritySet(); + final int priority = priorityAndAvailability.priority; + final HostSet hostSet = prioritySet.hostSets().get(priority); + final HostAvailability hostAvailability = priorityAndAvailability.hostAvailability; + if (lbState.perPriorityPanic().get(priority)) { + if (prioritySet.failTrafficOnPanic()) { + return null; + } else { + return new HostsSource(priority, SourceType.ALL_HOSTS); + } + } + + if (prioritySet.localityWeightedBalancing()) { + final Locality locality; + if (hostAvailability == HostAvailability.DEGRADED) { + locality = hostSet.chooseDegradedLocality(); + } else { + locality = hostSet.chooseHealthyLocality(); + } + if (locality != null) { + return new HostsSource(priority, localitySourceType(hostAvailability), locality); + } + } + + // don't do zone aware routing for now + return new HostsSource(priority, sourceType(hostAvailability), null); + } + + private static SourceType localitySourceType(HostAvailability hostAvailability) { + final SourceType sourceType; + switch (hostAvailability) { + case HEALTHY: + sourceType = SourceType.LOCALITY_HEALTHY_HOSTS; + break; + case DEGRADED: + sourceType = SourceType.LOCALITY_DEGRADED_HOSTS; + break; + default: + throw new Error(); + } + return sourceType; + } + + private static SourceType sourceType(HostAvailability hostAvailability) { + final SourceType sourceType; + switch (hostAvailability) { + case HEALTHY: + sourceType = SourceType.HEALTHY_HOSTS; + break; + case DEGRADED: + sourceType = SourceType.DEGRADED_HOSTS; + break; + default: + throw new Error(); + } + return sourceType; + } + + @Override + public void prioritySetUpdated(PrioritySet prioritySet) { + lbState = ZoneAwareLbStateFactory.newInstance(prioritySet); + } + + static class PriorityAndAvailability { + final int priority; + final HostAvailability hostAvailability; + + PriorityAndAvailability(int priority, HostAvailability hostAvailability) { + this.priority = priority; + this.hostAvailability = hostAvailability; + } + } + + static class HostsSource { + final int priority; + final SourceType sourceType; + @Nullable + final Locality locality; + + HostsSource(int priority, SourceType sourceType) { + this(priority, sourceType, null); + } + + HostsSource(int priority, SourceType sourceType, @Nullable Locality locality) { + if (sourceType == SourceType.LOCALITY_HEALTHY_HOSTS || + sourceType == SourceType.LOCALITY_DEGRADED_HOSTS) { + checkArgument(locality != null, "Locality must be non-null for %s", sourceType); + } + this.priority = priority; + this.sourceType = sourceType; + this.locality = locality; + } + } + + enum SourceType { + ALL_HOSTS, + HEALTHY_HOSTS, + DEGRADED_HOSTS, + LOCALITY_HEALTHY_HOSTS, + LOCALITY_DEGRADED_HOSTS, + } + + enum HostAvailability { + HEALTHY, + DEGRADED, + } + + static class DistributeLoadState { + final int totalLoad; + final int firstAvailablePriority; + + DistributeLoadState(int totalLoad, int firstAvailablePriority) { + this.totalLoad = totalLoad; + this.firstAvailablePriority = firstAvailablePriority; + } + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/package-info.java b/xds/src/main/java/com/linecorp/armeria/xds/package-info.java index 2e2e8690423..5eb0af83f45 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/package-info.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/package-info.java @@ -19,6 +19,8 @@ * v3 gRPC-xDS protocol. */ @NonNullByDefault +@UnstableApi package com.linecorp.armeria.xds; import com.linecorp.armeria.common.annotation.NonNullByDefault; +import com.linecorp.armeria.common.annotation.UnstableApi; diff --git a/xds/src/test/java/com/linecorp/armeria/xds/TestResourceWatcher.java b/xds/src/test/java/com/linecorp/armeria/xds/TestResourceWatcher.java index 3b2f5f4c123..4a7f19bf938 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/TestResourceWatcher.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/TestResourceWatcher.java @@ -28,7 +28,7 @@ import io.grpc.Status; -class TestResourceWatcher implements SnapshotWatcher> { +public final class TestResourceWatcher implements SnapshotWatcher> { private static final Logger logger = LoggerFactory.getLogger(TestResourceWatcher.class); private final LinkedBlockingDeque> events = new LinkedBlockingDeque<>(); @@ -51,12 +51,12 @@ public void snapshotUpdated(Snapshot newSnapshot) { events.add(ImmutableList.of("snapshotUpdated", newSnapshot)); } - List blockingMissing() { + public List blockingMissing() { //noinspection unchecked return blockingFirst("onMissing", List.class); } - T blockingChanged(Class clazz) { + public T blockingChanged(Class clazz) { return blockingFirst("snapshotUpdated", clazz); } diff --git a/xds/src/test/java/com/linecorp/armeria/xds/XdsTestResources.java b/xds/src/test/java/com/linecorp/armeria/xds/XdsTestResources.java index 9c439b79363..f455857fa3f 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/XdsTestResources.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/XdsTestResources.java @@ -66,6 +66,7 @@ import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; +import io.envoyproxy.envoy.type.v3.Percent; public final class XdsTestResources { @@ -74,7 +75,8 @@ public final class XdsTestResources { private XdsTestResources() {} public static LbEndpoint endpoint(String address, int port) { - return endpoint(address, port, Metadata.getDefaultInstance()); + return endpoint(address, port, Metadata.getDefaultInstance(), 1, + HealthStatus.HEALTHY); } public static LbEndpoint endpoint(String address, int port, int weight) { @@ -82,6 +84,15 @@ public static LbEndpoint endpoint(String address, int port, int weight) { HealthStatus.HEALTHY); } + public static LbEndpoint endpoint(String address, int port, HealthStatus healthStatus) { + return endpoint(address, port, Metadata.getDefaultInstance(), 1, healthStatus); + } + + public static LbEndpoint endpoint(String address, int port, HealthStatus healthStatus, + int weight) { + return endpoint(address, port, Metadata.getDefaultInstance(), weight, healthStatus); + } + public static LbEndpoint endpoint(String address, int port, Metadata metadata) { return endpoint(address, port, metadata, 1, HealthStatus.HEALTHY); } @@ -104,6 +115,38 @@ public static LbEndpoint endpoint(String address, int port, Metadata metadata, i .build()).build(); } + public static Locality locality(String region) { + return Locality.newBuilder() + .setRegion(region) + .build(); + } + + public static Percent percent(int percent) { + return Percent.newBuilder().setValue(percent).build(); + } + + public static LocalityLbEndpoints localityLbEndpoints(Locality locality, + Collection endpoints, + Integer priority) { + return LocalityLbEndpoints.newBuilder() + .addAllLbEndpoints(endpoints) + .setLocality(locality) + .setPriority(priority) + .build(); + } + + public static LocalityLbEndpoints localityLbEndpoints(Locality locality, + Collection endpoints) { + return LocalityLbEndpoints.newBuilder() + .addAllLbEndpoints(endpoints) + .setLocality(locality) + .build(); + } + + public static LocalityLbEndpoints localityLbEndpoints(Locality locality, LbEndpoint... endpoints) { + return localityLbEndpoints(locality, Arrays.asList(endpoints)); + } + public static ClusterLoadAssignment loadAssignment(String clusterName, URI uri) { return loadAssignment(clusterName, uri.getHost(), uri.getPort()); } @@ -373,16 +416,4 @@ public static Bootstrap staticBootstrap(Listener listener, Cluster cluster) { .addClusters(cluster) .build()).build(); } - - public static LocalityLbEndpoints localityLbEndpoints(Locality locality, - Collection endpoints) { - return LocalityLbEndpoints.newBuilder() - .addAllLbEndpoints(endpoints) - .setLocality(locality) - .build(); - } - - public static LocalityLbEndpoints localityLbEndpoints(Locality locality, LbEndpoint... endpoints) { - return localityLbEndpoints(locality, Arrays.asList(endpoints)); - } } diff --git a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/XdsConverterUtilTest.java b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/EndpointTestUtil.java similarity index 50% rename from xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/XdsConverterUtilTest.java rename to xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/EndpointTestUtil.java index 4de80d0ee9f..5aa91dcf677 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/XdsConverterUtilTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/EndpointTestUtil.java @@ -19,50 +19,72 @@ import static com.linecorp.armeria.xds.XdsTestResources.endpoint; import static com.linecorp.armeria.xds.XdsTestResources.stringValue; import static com.linecorp.armeria.xds.client.endpoint.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME; -import static com.linecorp.armeria.xds.client.endpoint.XdsEndpointUtil.convertEndpoints; -import static org.assertj.core.api.Assertions.assertThat; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.stream.Collectors; -import org.junit.jupiter.api.Test; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import com.google.protobuf.ListValue; import com.google.protobuf.Struct; import com.google.protobuf.Value; -import com.linecorp.armeria.client.Endpoint; - +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig.LbSubsetSelector; import io.envoyproxy.envoy.config.core.v3.Metadata; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.envoyproxy.envoy.config.listener.v3.ApiListener; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.config.route.v3.Route; +import io.envoyproxy.envoy.config.route.v3.RouteAction; +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.envoyproxy.envoy.config.route.v3.RouteMatch; +import io.envoyproxy.envoy.config.route.v3.VirtualHost; +import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager.CodecType; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; -class XdsConverterUtilTest { - - @Test - void convertEndpointsWithFilterMetadata() { - final Metadata metadata1 = metadata(ImmutableMap.of("foo", "foo1")); - final LbEndpoint lbEndpoint1 = endpoint("127.0.0.1", 8080, metadata1); - final Endpoint endpoint1 = Endpoint.of("127.0.0.1", 8080) - .withAttr(XdsAttributesKeys.LB_ENDPOINT_KEY, lbEndpoint1); - final Metadata metadata2 = metadata(ImmutableMap.of("foo", "foo1", "bar", "bar2")); - final LbEndpoint lbEndpoint2 = endpoint("127.0.0.1", 8081, metadata2); - final Endpoint endpoint2 = Endpoint.of("127.0.0.1", 8081) - .withAttr(XdsAttributesKeys.LB_ENDPOINT_KEY, lbEndpoint2); - final Metadata metadata3 = metadata(ImmutableMap.of("foo", "foo1", "bar", "bar1", "baz", "baz1")); - final LbEndpoint lbEndpoint3 = endpoint("127.0.0.1", 8082, metadata3); - final Endpoint endpoint3 = Endpoint.of("127.0.0.1", 8082) - .withAttr(XdsAttributesKeys.LB_ENDPOINT_KEY, lbEndpoint3); - final List endpoints = - convertEndpoints(ImmutableList.of(endpoint1, endpoint2, endpoint3), Struct.newBuilder() - .putFields("foo", stringValue("foo1")) - .putFields("bar", stringValue("bar1")) - .build()); - assertThat(endpoints).containsExactly(Endpoint.of("127.0.0.1", 8082)); +final class EndpointTestUtil { + + static Listener staticResourceListener() { + return staticResourceListener(Metadata.getDefaultInstance()); + } + + static Listener staticResourceListener(Metadata metadata) { + final RouteAction.Builder routeActionBuilder = RouteAction.newBuilder().setCluster("cluster"); + if (metadata != Metadata.getDefaultInstance()) { + routeActionBuilder.setMetadataMatch(metadata); + } + final VirtualHost virtualHost = + VirtualHost.newBuilder() + .setName("route") + .addDomains("*") + .addRoutes(Route.newBuilder() + .setMatch(RouteMatch.newBuilder().setPrefix("/")) + .setRoute(routeActionBuilder)) + .build(); + final HttpConnectionManager manager = + HttpConnectionManager + .newBuilder() + .setCodecType(CodecType.AUTO) + .setStatPrefix("ingress_http") + .setRouteConfig(RouteConfiguration.newBuilder() + .setName("route") + .addVirtualHosts(virtualHost) + .build()) + .addHttpFilters(HttpFilter.newBuilder() + .setName("envoy.filters.http.router") + .setTypedConfig(Any.pack(Router.getDefaultInstance()))) + .build(); + return Listener.newBuilder() + .setName("listener") + .setApiListener(ApiListener.newBuilder().setApiListener(Any.pack(manager))) + .build(); } static Metadata metadata(Struct struct) { @@ -74,6 +96,18 @@ static Metadata metadata(Map map) { return metadata(struct(map)); } + static Value fallbackListValue(Map... maps) { + final List values = + Arrays.stream(maps).map(map -> Value.newBuilder() + .setStructValue(struct(map)) + .build()).collect(Collectors.toList()); + return Value.newBuilder() + .setListValue(ListValue.newBuilder() + .addAllValues(values) + .build()) + .build(); + } + static Struct struct(Map map) { final Map structMap = map.entrySet().stream() @@ -83,6 +117,18 @@ static Struct struct(Map map) { return Struct.newBuilder().putAllFields(structMap).build(); } + static LbSubsetSelector lbSubsetSelector(Iterable keys) { + return LbSubsetSelector.newBuilder() + .addAllKeys(keys) + .build(); + } + + static LbSubsetConfig lbSubsetConfig(LbSubsetSelector... lbSubsetSelectors) { + return LbSubsetConfig.newBuilder() + .addAllSubsetSelectors(Arrays.asList(lbSubsetSelectors)) + .build(); + } + static ClusterLoadAssignment sampleClusterLoadAssignment() { final Metadata metadata1 = Metadata.newBuilder() @@ -118,8 +164,10 @@ static ClusterLoadAssignment sampleClusterLoadAssignment() { .addLbEndpoints(endpoint3) .build(); return ClusterLoadAssignment.newBuilder() - .setClusterName("cluster") - .addEndpoints(lbEndpoints) - .build(); + .setClusterName("cluster") + .addEndpoints(lbEndpoints) + .build(); } + + private EndpointTestUtil() {} } diff --git a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/HealthCheckedTest.java b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/HealthCheckedTest.java index dbb272cefff..3aa1f49f50a 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/HealthCheckedTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/HealthCheckedTest.java @@ -20,7 +20,7 @@ import static com.linecorp.armeria.xds.XdsTestResources.endpoint; import static com.linecorp.armeria.xds.XdsTestResources.localityLbEndpoints; import static com.linecorp.armeria.xds.XdsTestResources.staticBootstrap; -import static com.linecorp.armeria.xds.XdsTestResources.staticResourceListener; +import static com.linecorp.armeria.xds.client.endpoint.EndpointTestUtil.staticResourceListener; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; diff --git a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/LocalityTest.java b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/LocalityTest.java new file mode 100644 index 00000000000..fef1c1d1c50 --- /dev/null +++ b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/LocalityTest.java @@ -0,0 +1,153 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.XdsTestResources.createStaticCluster; +import static com.linecorp.armeria.xds.XdsTestResources.endpoint; +import static com.linecorp.armeria.xds.XdsTestResources.locality; +import static com.linecorp.armeria.xds.XdsTestResources.localityLbEndpoints; +import static com.linecorp.armeria.xds.XdsTestResources.staticBootstrap; +import static com.linecorp.armeria.xds.client.endpoint.EndpointTestUtil.staticResourceListener; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +import com.google.common.collect.ImmutableList; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.xds.ListenerRoot; +import com.linecorp.armeria.xds.XdsBootstrap; + +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.CommonLbConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.CommonLbConfig.Builder; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.CommonLbConfig.LocalityWeightedLbConfig; +import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.listener.v3.Listener; + +class LocalityTest { + + private static final Builder LOCALITY_LB_CONFIG = + CommonLbConfig.newBuilder() + .setLocalityWeightedLbConfig(LocalityWeightedLbConfig.getDefaultInstance()); + + @Test + void basicCase() { + final Listener listener = staticResourceListener(); + + final List lbEndpointsA = + ImmutableList.of(endpoint("127.0.0.1", 8080, 1)); + final List lbEndpointsB = + ImmutableList.of(endpoint("127.0.0.1", 8081, 1)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints(locality("regionA"), lbEndpointsA)) + .addEndpoints(localityLbEndpoints(locality("regionB"), lbEndpointsB)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setCommonLbConfig(LOCALITY_LB_CONFIG).build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + // one will be chosen from regionA, one from regionB + // locality is chosen randomly, so the order is random + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + final Endpoint endpoint1 = endpointGroup.selectNow(ctx); + final Endpoint endpoint2 = endpointGroup.selectNow(ctx); + assertThat(ImmutableList.of(endpoint1, endpoint2)) + .containsExactlyInAnyOrder(Endpoint.of("127.0.0.1", 8080), + Endpoint.of("127.0.0.1", 8081)); + } + } + + @Test + void emptyLocality() { + final Listener listener = staticResourceListener(); + + final List lbEndpointsA = ImmutableList.of(); + final List lbEndpointsB = + ImmutableList.of(endpoint("127.0.0.1", 8081), + endpoint("127.0.0.1", 8081)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints(locality("regionA"), lbEndpointsA)) + .addEndpoints(localityLbEndpoints(locality("regionB"), lbEndpointsB)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setCommonLbConfig(LOCALITY_LB_CONFIG).build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + // regionA won't be selected at all since it is empty + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + } + } + + @Test + void multiPriorityAndLocality() { + final Listener listener = staticResourceListener(); + + final List lbEndpointsA = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.HEALTHY)); + final List lbEndpointsB = + ImmutableList.of(endpoint("127.0.0.1", 8081, HealthStatus.UNHEALTHY)); + final List lbEndpointsC = + ImmutableList.of(endpoint("127.0.0.1", 8082, HealthStatus.HEALTHY)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints(locality("regionA"), lbEndpointsA, 0)) + .addEndpoints(localityLbEndpoints(locality("regionB"), lbEndpointsB, 0)) + .addEndpoints(localityLbEndpoints(locality("regionC"), lbEndpointsC, 1)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setCommonLbConfig(LOCALITY_LB_CONFIG).build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + ctx.setAttr(XdsAttributesKeys.SELECTION_HASH, 0); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + ctx.setAttr(XdsAttributesKeys.SELECTION_HASH, 99); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + } + } +} diff --git a/xds/src/test/java/com/linecorp/armeria/xds/MostlyStaticWithDynamicEdsTest.java b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/MostlyStaticWithDynamicEdsTest.java similarity index 91% rename from xds/src/test/java/com/linecorp/armeria/xds/MostlyStaticWithDynamicEdsTest.java rename to xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/MostlyStaticWithDynamicEdsTest.java index 58adfa9ea49..503b78fe4f2 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/MostlyStaticWithDynamicEdsTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/MostlyStaticWithDynamicEdsTest.java @@ -14,7 +14,7 @@ * under the License. */ -package com.linecorp.armeria.xds; +package com.linecorp.armeria.xds.client.endpoint; import static com.linecorp.armeria.xds.XdsTestResources.BOOTSTRAP_CLUSTER_NAME; import static com.linecorp.armeria.xds.XdsTestResources.bootstrapCluster; @@ -31,6 +31,14 @@ import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.grpc.GrpcService; import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import com.linecorp.armeria.xds.ClusterSnapshot; +import com.linecorp.armeria.xds.EndpointSnapshot; +import com.linecorp.armeria.xds.ListenerRoot; +import com.linecorp.armeria.xds.ListenerSnapshot; +import com.linecorp.armeria.xds.RouteSnapshot; +import com.linecorp.armeria.xds.TestResourceWatcher; +import com.linecorp.armeria.xds.XdsBootstrap; +import com.linecorp.armeria.xds.XdsTestResources; import io.envoyproxy.controlplane.cache.v3.SimpleCache; import io.envoyproxy.controlplane.cache.v3.Snapshot; @@ -83,7 +91,7 @@ void basicCase() throws Exception { final Listener listener = staticResourceListener(); final Bootstrap bootstrap = XdsTestResources.bootstrap(configSource, listener, bootstrapCluster, staticCluster); - try (XdsBootstrapImpl xdsBootstrap = new XdsBootstrapImpl(bootstrap)) { + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); final TestResourceWatcher watcher = new TestResourceWatcher(); listenerRoot.addSnapshotWatcher(watcher); diff --git a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/PriorityTest.java b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/PriorityTest.java new file mode 100644 index 00000000000..01c6880e2cc --- /dev/null +++ b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/PriorityTest.java @@ -0,0 +1,384 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.XdsTestResources.createStaticCluster; +import static com.linecorp.armeria.xds.XdsTestResources.endpoint; +import static com.linecorp.armeria.xds.XdsTestResources.localityLbEndpoints; +import static com.linecorp.armeria.xds.XdsTestResources.percent; +import static com.linecorp.armeria.xds.XdsTestResources.staticBootstrap; +import static com.linecorp.armeria.xds.client.endpoint.EndpointTestUtil.staticResourceListener; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.UInt32Value; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.ListenerRoot; +import com.linecorp.armeria.xds.XdsBootstrap; + +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.CommonLbConfig; +import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.type.v3.Percent; + +class PriorityTest { + + @Test + void basicCase() { + final Listener listener = staticResourceListener(); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080), + endpoint("127.0.0.1", 8081), + endpoint("127.0.0.1", 8082)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + } + } + + @Test + void differentWeights() { + final Listener listener = staticResourceListener(); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080, 1), + endpoint("127.0.0.1", 8081, 1), + endpoint("127.0.0.1", 8082, 2)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + } + } + + @Test + void differentPriorities() { + final Listener listener = staticResourceListener(); + + final List lbEndpoints0 = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.HEALTHY), + endpoint("127.0.0.1", 8081, HealthStatus.DEGRADED)); + final List lbEndpoints1 = + ImmutableList.of(endpoint("127.0.0.1", 8082, HealthStatus.HEALTHY), + endpoint("127.0.0.1", 8083, HealthStatus.DEGRADED)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0, 0)) + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints1, 1)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + + // default overprovisioning factor (140) * 0.5 = 70 will be routed + // to healthy endpoints for priority 0 + ctx.setAttr(XdsAttributesKeys.SELECTION_HASH, 68); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + + // 100 - 70 (priority 0) = 30 will be routed to healthy endpoints for priority 1 + ctx.setAttr(XdsAttributesKeys.SELECTION_HASH, 70); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + } + } + + @Test + void degradedEndpoints() { + final Listener listener = staticResourceListener(); + + final List lbEndpoints0 = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.HEALTHY, 1), + endpoint("127.0.0.1", 8081, HealthStatus.UNHEALTHY, 9)); + final List lbEndpoints1 = + ImmutableList.of(endpoint("127.0.0.1", 8082, HealthStatus.HEALTHY, 1), + endpoint("127.0.0.1", 8083, HealthStatus.DEGRADED, 9)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0, 0)) + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints1, 1)) + // set overprovisioning factor to 100 for simpler calculation + .setPolicy(Policy.newBuilder() + .setOverprovisioningFactor(UInt32Value.of(100)) + .setWeightedPriorityHealth(true)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder() + .setCommonLbConfig(CommonLbConfig.newBuilder() + .setHealthyPanicThreshold(Percent.newBuilder() + .setValue(0))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + + // 0 ~ 9 for priority 0 HEALTHY + ctx.setAttr(XdsAttributesKeys.SELECTION_HASH, 0); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + + // 10 ~ 19 for priority 1 HEALTHY + ctx.setAttr(XdsAttributesKeys.SELECTION_HASH, 10); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + + // 20 ~ 99 for priority 1 DEGRADED + ctx.setAttr(XdsAttributesKeys.SELECTION_HASH, 20); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8083)); + } + } + + @Test + void noHosts() { + final Listener listener = staticResourceListener(); + final List lbEndpoints0 = ImmutableList.of(); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0, 0)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder() + .setCommonLbConfig(CommonLbConfig.newBuilder() + .setHealthyPanicThreshold(Percent.newBuilder() + .setValue(50))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot, true); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + await().pollDelay(3, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(endpointGroup.selectNow(ctx)).isNull()); + } + } + + @Test + void partialPanic() { + final Listener listener = staticResourceListener(); + + // there are no healthy endpoints in priority0 + final List lbEndpoints0 = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.UNHEALTHY), + endpoint("127.0.0.1", 8081, HealthStatus.UNHEALTHY), + endpoint("127.0.0.1", 8082, HealthStatus.UNHEALTHY)); + final List lbEndpoints1 = + ImmutableList.of(endpoint("127.0.0.1", 8083, HealthStatus.HEALTHY)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0, 0)) + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints1, 1)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setCommonLbConfig(CommonLbConfig.newBuilder() + .setHealthyPanicThreshold(Percent.newBuilder() + .setValue(50))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + ctx.setAttr(XdsAttributesKeys.SELECTION_HASH, 0); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8083)); + ctx.setAttr(XdsAttributesKeys.SELECTION_HASH, 99); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8083)); + } + } + + @Test + void totalPanic() { + final Listener listener = staticResourceListener(); + + // 0.33 (healthy) * 140 (overprovisioning factor) < 50 (healthyPanicThreshold) + final List lbEndpoints0 = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.HEALTHY), + endpoint("127.0.0.1", 8081, HealthStatus.UNHEALTHY), + endpoint("127.0.0.1", 8082, HealthStatus.UNHEALTHY)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0, 0)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setCommonLbConfig(CommonLbConfig.newBuilder() + .setHealthyPanicThreshold(Percent.newBuilder() + .setValue(50))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + // When in panic mode, all endpoints are selected regardless of health status + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + } + } + + @Test + void onlyUnhealthyPanicDisabled() { + final Listener listener = staticResourceListener(); + + final List lbEndpoints0 = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.UNHEALTHY), + endpoint("127.0.0.1", 8081, HealthStatus.UNHEALTHY), + endpoint("127.0.0.1", 8082, HealthStatus.UNHEALTHY)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder() + .setCommonLbConfig(CommonLbConfig.newBuilder().setHealthyPanicThreshold(percent(0))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + // When in panic mode, all endpoints are selected regardless of health status + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isNull(); + assertThat(endpointGroup.selectNow(ctx)).isNull(); + assertThat(endpointGroup.selectNow(ctx)).isNull(); + } + } + + private static Stream healthyLoadZeroArgs() { + return Stream.of( + // panic mode routes traffic to all endpoints + Arguments.of(51, Endpoint.of("127.0.0.1", 8080), Endpoint.of("127.0.0.1", 8081)), + // non-panic mode doesn't route traffic + Arguments.of(49, null, null) + ); + } + + @ParameterizedTest + @MethodSource("healthyLoadZeroArgs") + void healthyLoadZero(int healthyPanicThreshold, @Nullable Endpoint endpoint1, + @Nullable Endpoint endpoint2) { + final Listener listener = staticResourceListener(); + final List lbEndpoints0 = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.HEALTHY, 1), + endpoint("127.0.0.1", 8081, HealthStatus.UNHEALTHY, 10000)); + + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0)) + .setPolicy(Policy.newBuilder() + .setWeightedPriorityHealth(true)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder() + .setCommonLbConfig(CommonLbConfig.newBuilder() + .setHealthyPanicThreshold(percent(healthyPanicThreshold))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + // When in panic mode, all endpoints are selected regardless of health status + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint1); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint2); + } + } +} diff --git a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/RouteMetadataSubsetTest.java b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/RouteMetadataSubsetTest.java index a7c4192497e..6adc8494dab 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/RouteMetadataSubsetTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/RouteMetadataSubsetTest.java @@ -20,8 +20,8 @@ import static com.linecorp.armeria.xds.XdsTestResources.bootstrapCluster; import static com.linecorp.armeria.xds.XdsTestResources.staticResourceListener; import static com.linecorp.armeria.xds.XdsTestResources.stringValue; +import static com.linecorp.armeria.xds.client.endpoint.EndpointTestUtil.sampleClusterLoadAssignment; import static com.linecorp.armeria.xds.client.endpoint.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME; -import static com.linecorp.armeria.xds.client.endpoint.XdsConverterUtilTest.sampleClusterLoadAssignment; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; diff --git a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/SubsetTest.java b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/SubsetTest.java new file mode 100644 index 00000000000..4b15b36f4b1 --- /dev/null +++ b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/SubsetTest.java @@ -0,0 +1,518 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.XdsTestResources.createStaticCluster; +import static com.linecorp.armeria.xds.XdsTestResources.endpoint; +import static com.linecorp.armeria.xds.XdsTestResources.localityLbEndpoints; +import static com.linecorp.armeria.xds.XdsTestResources.stringValue; +import static com.linecorp.armeria.xds.client.endpoint.EndpointTestUtil.fallbackListValue; +import static com.linecorp.armeria.xds.client.endpoint.EndpointTestUtil.lbSubsetConfig; +import static com.linecorp.armeria.xds.client.endpoint.EndpointTestUtil.lbSubsetSelector; +import static com.linecorp.armeria.xds.client.endpoint.EndpointTestUtil.metadata; +import static com.linecorp.armeria.xds.client.endpoint.EndpointTestUtil.staticResourceListener; +import static com.linecorp.armeria.xds.client.endpoint.EndpointTestUtil.struct; +import static com.linecorp.armeria.xds.client.endpoint.XdsConstants.ENVOY_LB_FALLBACK_LIST; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.ListValue; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.ListenerRoot; +import com.linecorp.armeria.xds.XdsBootstrap; +import com.linecorp.armeria.xds.XdsTestResources; + +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig.LbSubsetFallbackPolicy; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig.LbSubsetMetadataFallbackPolicy; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbSubsetConfig.LbSubsetSelector.LbSubsetSelectorFallbackPolicy; +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.listener.v3.Listener; + +class SubsetTest { + + private static final Value LIST_VALUE = + Value.newBuilder() + .setListValue(ListValue.newBuilder() + .addValues(stringValue("val1")) + .addValues(stringValue("val2")) + .addValues(stringValue("val3")) + ) + .build(); + + @Test + void basicCase() { + final Listener listener = + staticResourceListener(metadata(ImmutableMap.of("key1", "val1", + "key2", "val2"))); + + // struct with different orders still pass the equality test + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080, + metadata(ImmutableMap.of("key2", "val2", + "key1", "val1"))), + endpoint("127.0.0.1", 8081)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + final LbSubsetConfig lbSubsetConfig = + lbSubsetConfig(lbSubsetSelector(ImmutableList.of("key2", "key1"))); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setLbSubsetConfig(lbSubsetConfig).build(); + + final Bootstrap bootstrap = XdsTestResources.staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + } + } + + @Test + void fallbackKeysSubset() { + final Listener listener = + staticResourceListener(metadata(ImmutableMap.of("key1", "val1", + "key2", "val2"))); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080, metadata(ImmutableMap.of("key1", "val1"))), + endpoint("127.0.0.1", 8081, metadata(ImmutableMap.of("key2", "val2")))); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + final LbSubsetConfig lbSubsetConfig = + lbSubsetConfig(lbSubsetSelector(ImmutableList.of("key1", "key2")) + .toBuilder() + .addFallbackKeysSubset("key2") + .setFallbackPolicy(LbSubsetSelectorFallbackPolicy.KEYS_SUBSET) + .build(), + lbSubsetSelector(ImmutableList.of("key2")) + .toBuilder() + .setFallbackPolicy(LbSubsetSelectorFallbackPolicy.NO_FALLBACK) + .build()); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setLbSubsetConfig(lbSubsetConfig).build(); + + final Bootstrap bootstrap = XdsTestResources.staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + } + } + + private static Stream fallbackKeysDefaultArgs() { + return Stream.of(Arguments.of(null, + Endpoint.of("127.0.0.1", 8080), + Endpoint.of("127.0.0.1", 8081)), + Arguments.of(Struct.getDefaultInstance(), + Endpoint.of("127.0.0.1", 8080), + Endpoint.of("127.0.0.1", 8081)), + Arguments.of(struct(ImmutableMap.of("key0", "val0")), + Endpoint.of("127.0.0.1", 8080), + Endpoint.of("127.0.0.1", 8081)), + // the default subset matches the first endpoint only + Arguments.of(struct(ImmutableMap.of("key1", "val1")), + Endpoint.of("127.0.0.1", 8080), + Endpoint.of("127.0.0.1", 8080)), + // the default subset doesn't match any endpoints + Arguments.of(struct(ImmutableMap.of("key3", "val3")), null, null)); + } + + @ParameterizedTest + @MethodSource("fallbackKeysDefaultArgs") + void fallbackDefault(@Nullable Struct defaultSubset, @Nullable Endpoint endpoint1, + @Nullable Endpoint endpoint2) { + final Listener listener = + staticResourceListener(metadata(ImmutableMap.of("key0", "val0", + "key1", "val1", "key2", "val2"))); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080, metadata(ImmutableMap.of("key0", "val0", + "key1", "val1"))), + endpoint("127.0.0.1", 8081, metadata(ImmutableMap.of("key0", "val0", + "key2", "val2")))); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + LbSubsetConfig lbSubsetConfig = + lbSubsetConfig(lbSubsetSelector(ImmutableList.of("key0", "key1", "key2")) + .toBuilder() + .setFallbackPolicy(LbSubsetSelectorFallbackPolicy.DEFAULT_SUBSET) + .build()); + if (defaultSubset != null) { + lbSubsetConfig = lbSubsetConfig.toBuilder().setDefaultSubset(defaultSubset).build(); + } + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setLbSubsetConfig(lbSubsetConfig).build(); + + final Bootstrap bootstrap = XdsTestResources.staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint1); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint2); + } + } + + @Test + void fallbackAny() { + final Listener listener = + staticResourceListener(metadata(ImmutableMap.of("key1", "val1"))); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080, metadata(ImmutableMap.of("key2", "val2"))), + endpoint("127.0.0.1", 8081, metadata(ImmutableMap.of("key3", "val3")))); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + final LbSubsetConfig lbSubsetConfig = + lbSubsetConfig(lbSubsetSelector(ImmutableList.of("key1")) + .toBuilder() + .setFallbackPolicy(LbSubsetSelectorFallbackPolicy.ANY_ENDPOINT) + .build()); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setLbSubsetConfig(lbSubsetConfig).build(); + + final Bootstrap bootstrap = XdsTestResources.staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + } + } + + private static Stream metadataFallbackParams() { + return Stream.of( + Arguments.of(fallbackListValue(ImmutableMap.of("key2", "val2")), + Endpoint.of("127.0.0.1", 8081), + Endpoint.of("127.0.0.1", 8081)), + Arguments.of(fallbackListValue(ImmutableMap.of("key3", "val3"), + ImmutableMap.of("key2", "val2")), + Endpoint.of("127.0.0.1", 8081), + Endpoint.of("127.0.0.1", 8081)), + Arguments.of(Value.getDefaultInstance(), + Endpoint.of("127.0.0.1", 8080), + Endpoint.of("127.0.0.1", 8080)), + Arguments.of(fallbackListValue(ImmutableMap.of("key4", "val4")), null, null) + ); + } + + @ParameterizedTest + @MethodSource("metadataFallbackParams") + void metadataFallback(Value metadataFallback, Endpoint endpoint1, Endpoint endpoint2) { + final Listener listener = + staticResourceListener(metadata(Struct.newBuilder() + .putFields("key1", stringValue("val1")) + .putFields(ENVOY_LB_FALLBACK_LIST, metadataFallback) + .build())); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080, metadata(ImmutableMap.of("key1", "val1"))), + endpoint("127.0.0.1", 8081, metadata(ImmutableMap.of("key2", "val2")))); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + final LbSubsetConfig lbSubsetConfig = + lbSubsetConfig(lbSubsetSelector(ImmutableList.of("key1")), + lbSubsetSelector(ImmutableList.of("key2"))) + .toBuilder() + .setMetadataFallbackPolicy(LbSubsetMetadataFallbackPolicy.FALLBACK_LIST) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setLbSubsetConfig(lbSubsetConfig).build(); + + final Bootstrap bootstrap = XdsTestResources.staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint1); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint2); + } + } + + @Test + void subsetAny() { + final Listener listener = + staticResourceListener(metadata(ImmutableMap.of("key1", "val1"))); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080), + endpoint("127.0.0.1", 8081)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + final LbSubsetConfig lbSubsetConfig = + lbSubsetConfig(lbSubsetSelector(ImmutableList.of("key1"))) + .toBuilder() + .setFallbackPolicy(LbSubsetFallbackPolicy.ANY_ENDPOINT) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setLbSubsetConfig(lbSubsetConfig).build(); + + final Bootstrap bootstrap = XdsTestResources.staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + } + } + + @ParameterizedTest + @MethodSource("fallbackKeysDefaultArgs") + void fallbackSubsetTest(@Nullable Struct defaultSubset, @Nullable Endpoint endpoint1, + @Nullable Endpoint endpoint2) { + final Listener listener = + staticResourceListener(metadata(ImmutableMap.of("key0", "val0", + "key1", "val1", "key2", "val2"))); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080, metadata(ImmutableMap.of("key0", "val0", + "key1", "val1"))), + endpoint("127.0.0.1", 8081, metadata(ImmutableMap.of("key0", "val0", + "key2", "val2")))); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + LbSubsetConfig lbSubsetConfig = + lbSubsetConfig(lbSubsetSelector(ImmutableList.of("key0", "key1", "key2"))) + .toBuilder() + .setFallbackPolicy(LbSubsetFallbackPolicy.DEFAULT_SUBSET) + .build(); + if (defaultSubset != null) { + lbSubsetConfig = lbSubsetConfig.toBuilder().setDefaultSubset(defaultSubset).build(); + } + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setLbSubsetConfig(lbSubsetConfig).build(); + + final Bootstrap bootstrap = XdsTestResources.staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint1); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint2); + } + } + + @Test + void panic() { + final Listener listener = + staticResourceListener(metadata(ImmutableMap.of("key1", "val1"))); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080), + endpoint("127.0.0.1", 8081)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + final LbSubsetConfig lbSubsetConfig = + lbSubsetConfig(lbSubsetSelector(ImmutableList.of("key1"))) + .toBuilder() + .setPanicModeAny(true) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setLbSubsetConfig(lbSubsetConfig).build(); + + final Bootstrap bootstrap = XdsTestResources.staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + } + } + + private static Stream listAsAnySubsetParams() { + return Stream.of( + Arguments.of(true, struct(ImmutableMap.of("key1", "val1", + "key4", "val4")), + Endpoint.of("127.0.0.1", 8080), + Endpoint.of("127.0.0.1", 8080)), + Arguments.of(false, struct(ImmutableMap.of("key1", "val1", + "key4", "val4")), + null, null), + Arguments.of(true, Struct.newBuilder() + .putFields("key1", LIST_VALUE) + .putFields("key4", stringValue("val4")) + .build(), + null, null), + Arguments.of(false, Struct.newBuilder() + .putFields("key1", LIST_VALUE) + .putFields("key4", stringValue("val4")) + .build(), + Endpoint.of("127.0.0.1", 8080), + Endpoint.of("127.0.0.1", 8080)) + ); + } + + @ParameterizedTest + @MethodSource("listAsAnySubsetParams") + void listAsAnySubset(boolean listAsAny, Struct struct, @Nullable Endpoint endpoint1, + @Nullable Endpoint endpoint2) { + + final Listener listener = staticResourceListener(metadata(struct)); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080, + metadata(Struct.newBuilder() + .putFields("key1", LIST_VALUE) + .putFields("key4", stringValue("val4")) + .build())), + endpoint("127.0.0.1", 8081, + metadata(Struct.newBuilder() + .putFields("key1", LIST_VALUE) + .putFields("key4", stringValue("val3")) + .build()))); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + final LbSubsetConfig lbSubsetConfig = + lbSubsetConfig(lbSubsetSelector(ImmutableList.of("key1", "key4"))) + .toBuilder() + .setListAsAny(listAsAny) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setLbSubsetConfig(lbSubsetConfig).build(); + + final Bootstrap bootstrap = XdsTestResources.staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint1); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint2); + } + } + + @ParameterizedTest + @MethodSource("listAsAnySubsetParams") + void defaultSubsetListAsAny(boolean listAsAny, Struct struct, @Nullable Endpoint endpoint1, + @Nullable Endpoint endpoint2) { + + final Listener listener = + staticResourceListener(metadata(struct(ImmutableMap.of("key1", "val1", + "key4", "val4")))); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080, + metadata(Struct.newBuilder() + .putFields("key1", LIST_VALUE) + .putFields("key4", stringValue("val4")) + .build())), + endpoint("127.0.0.1", 8081, + metadata(Struct.newBuilder() + .putFields("key1", LIST_VALUE) + .putFields("key4", stringValue("val3")) + .build()))); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + final LbSubsetConfig lbSubsetConfig = + lbSubsetConfig(lbSubsetSelector(ImmutableList.of("key1", "key5"))) + .toBuilder() + .setListAsAny(listAsAny) + .setDefaultSubset(struct) + .setFallbackPolicy(LbSubsetFallbackPolicy.DEFAULT_SUBSET) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setLbSubsetConfig(lbSubsetConfig).build(); + + final Bootstrap bootstrap = XdsTestResources.staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint1); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint2); + } + } +}