Skip to content

Commit

Permalink
working version
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhee17 committed Mar 4, 2024
1 parent d6785f7 commit 62afe03
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class ClusterEntry implements Consumer<List<Endpoint>>, AsyncCloseable {
// only cluster.getLbPolicy() == ROUND_ROBIN is supported for now
endpointSelectionStrategy = EndpointSelectionStrategy.weightedRoundRobin();
if (cluster.hasLbSubsetConfig()) {
loadBalancer = new SubsetLoadBalancer(cluster.getLbSubsetConfig());
loadBalancer = new SubsetLoadBalancer(clusterSnapshot, cluster.getLbSubsetConfig());
} else {
loadBalancer = new ZoneAwareLoadBalancer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private static int panicThreshold(Cluster cluster) {

HostSet getOrCreateHostSet(int priority, UpdateHostsParam params, Map<Locality, Integer> localityWeightsMap,
boolean weightedPriorityHealth, int overProvisioningFactor) {
priorities.add(priority);
return hostSets.computeIfAbsent(priority,
ignored -> new HostSet(priority, params,
localityWeightsMap, weightedPriorityHealth,
Expand All @@ -105,7 +106,6 @@ HostSet getOrCreateHostSet(int priority, UpdateHostsParam params, Map<Locality,
public void updateHosts(int priority, UpdateHostsParam params,
Map<Locality, Integer> localityWeightsMap,
boolean weightedPriorityHealth, int overProvisioningFactor) {
priorities.add(priority);
getOrCreateHostSet(priority, params, localityWeightsMap, weightedPriorityHealth,
overProvisioningFactor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.linecorp.armeria.xds.endpoint;

import java.io.Serial;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -221,14 +220,14 @@ Endpoint chooseHostForSelectorFallbackPolicy(SubsetSelector subsetSelector, Load


LbSubsetEntry initSubsetAnyOnce() {
if (subsetAny != null) {
if (subsetAny == null) {
subsetAny = new LbSubsetEntry();
}
return subsetAny;
}

LbSubsetEntry initSubsetDefaultOnce() {
if (subsetDefault != null) {
if (subsetDefault == null) {
subsetDefault = new LbSubsetEntry();
}
return subsetDefault;
Expand Down Expand Up @@ -457,7 +456,6 @@ public int compareTo(SubsetSelector o) {
static class CollectionComparator implements Comparator<Collection<String>>, Serializable {

static final CollectionComparator INSTANCE = new CollectionComparator();
@Serial
private static final long serialVersionUID = 5835645231445633543L;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ final class XdsEndpointSelector extends AbstractEndpointSelector
XdsEndpointSelector(ClusterManager clusterManager, EndpointGroup endpointGroup) {
super(endpointGroup);
this.clusterManager = clusterManager;
initialize();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package com.linecorp.armeria.xds;

import static com.linecorp.armeria.xds.internal.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME;
import static com.linecorp.armeria.xds.XdsConverterUtilTest.sampleClusterLoadAssignment;
import static com.linecorp.armeria.xds.XdsTestResources.stringValue;
import static com.linecorp.armeria.xds.internal.XdsConstants.SUBSET_LOAD_BALANCING_FILTER_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.net.URI;
import java.util.List;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -31,8 +32,11 @@
import com.google.protobuf.Any;
import com.google.protobuf.Struct;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
Expand Down Expand Up @@ -133,8 +137,10 @@ void routeMetadataMatch() {
bootstrapCluster);
try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) {
final EndpointGroup xdsEndpointGroup = XdsEndpointGroup.of(xdsBootstrap.listenerRoot("listener"));
await().untilAsserted(() -> assertThat(xdsEndpointGroup.endpoints())
.containsExactly(Endpoint.of("127.0.0.1", 8082)));
await().untilAsserted(() -> assertThat(collectEndpoints(xdsEndpointGroup))
.containsExactly(Endpoint.of("127.0.0.1", 8082),
Endpoint.of("127.0.0.1", 8082),
Endpoint.of("127.0.0.1", 8082)));
}

// No metadata. Fallback to all endpoints.
Expand All @@ -144,9 +150,9 @@ void routeMetadataMatch() {
bootstrapCluster);
try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) {
final EndpointGroup xdsEndpointGroup = XdsEndpointGroup.of(xdsBootstrap.listenerRoot("listener"));
await().untilAsserted(() -> assertThat(xdsEndpointGroup.endpoints())
.containsExactlyInAnyOrder(Endpoint.of("127.0.0.1", 8080), Endpoint.of("127.0.0.1", 8081),
Endpoint.of("127.0.0.1", 8082)));
assertThat(collectEndpoints(xdsEndpointGroup))
.containsExactly(Endpoint.of("127.0.0.1", 8080), Endpoint.of("127.0.0.1", 8081),
Endpoint.of("127.0.0.1", 8082));
}

// No matched metadata. Fallback to all endpoints.
Expand All @@ -158,12 +164,24 @@ void routeMetadataMatch() {
bootstrapCluster);
try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) {
final EndpointGroup xdsEndpointGroup = XdsEndpointGroup.of(xdsBootstrap.listenerRoot("listener"));
await().untilAsserted(() -> assertThat(xdsEndpointGroup.endpoints())
await().untilAsserted(() -> assertThat(collectEndpoints(xdsEndpointGroup))
.containsExactlyInAnyOrder(Endpoint.of("127.0.0.1", 8080), Endpoint.of("127.0.0.1", 8081),
Endpoint.of("127.0.0.1", 8082)));
}
}

List<Endpoint> collectEndpoints(EndpointGroup endpointGroup) {
final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/"));
await().untilAsserted(() -> assertThat(endpointGroup.endpoints()).isNotEmpty());
final ImmutableList.Builder<Endpoint> builder = ImmutableList.builder();
for (int i = 0; i < 3; i++) {
final Endpoint endpoint = endpointGroup.selectNow(ctx);
assertThat(endpoint).isNotNull();
builder.add(endpoint);
}
return builder.build();
}

private static Listener listener(Metadata metadata) {
final VirtualHost virtualHost =
VirtualHost.newBuilder()
Expand Down

0 comments on commit 62afe03

Please sign in to comment.