Skip to content

Commit

Permalink
Add LoadBalancer for generalizing EndpointSelector (#5779)
Browse files Browse the repository at this point in the history
Motivation:

A load-balancing strategy such as round robin can be used in `EndpointSelector` and elsewhere. For example, in the event loop scheduler, requests can be distributed using round robin to determine which event loop to use.

This PR is preliminary work to resolve #5289 and #5537

Modifications:

- `LoadBalancer<T, C>` is the root interface all load balancers should implement.
  - `T` is the type of a candidate selected by strategies.
  - `C` is the type of context that is used when selecting a candidate.
- `UpdatableLoadBalancer<T, C>` is a stateful load balancer to which new endpoints are updated. `RampingUpLoadBalancer` is the only implementation for `UpdatableLoadBalancer`. Other load balances will be re-created when new endpoints are added because they can always be reconstructed for the same results.
- `Weighted` is a new API that represents the weight of an object.
  - If an object is `Weighted`, a weight function is not necessary when creating weighted-based load balancers.
  - `Endpoint` now implements `Weighted`.
- `EndpointSelectionStategy` uses `DefaultEndpointSelector` to create a `LoadBalancer<Endpoint, ClientRequestContext>` internally and delegates the selection logic to it.
  - Each `EndpointSelectionStategy` implements `LoadBalancerFactory` to update the existing `LoadBalancer` or create a new `LoadBalancer` when endpoints are updated.
- The following implementations are migrated from `**Strategy`. Except for `RampingUpLoadBalancer` which has some minor changes, most of the logic was ported as is.
  - `RampingUpLoadBalancer`
    - `Weight` prefix is dropped for simplicity. There may be no problem conveying the behavior.
    - Refactored to use a lock to guarantee thread-safety and sequential access.
      - A `RampingUpLoadBalancer` is now created from a list of candidates. If an executor is used to build the initial state, null is returned right after it is created. 
- `AbstractRampingUpLoadBalancerBuilder` is added to share common code for `RampingUpLoadBalancerBuilder` and `WeightRampingUpStrategyBuilder`
- Fixed xDS implementations to use the new API when implementing load balancing strategies.
- Deprecation) `EndpointWeightTransition` in favor of `WeightTransition`

Result:

- You can now create `LoadBalancer` using various load balancing strategies to select an element from a list of candidates.
```java
List<Endpoint> candidates = ...;
LoadBalancer.ofRoundRobin(candidates);
LoadBalancer.ofWeightedRoundRobin(candidates);
LoadBalancer.ofSticky(candidates, contextHasher);
LoadBalancer.ofWeightedRandom(candidates);
LoadBalancer.ofRampingUp(candidates);
```
  • Loading branch information
ikhoon authored Feb 24, 2025
1 parent 98e3054 commit 629f3b0
Show file tree
Hide file tree
Showing 40 changed files with 2,705 additions and 1,100 deletions.
4 changes: 3 additions & 1 deletion core/src/main/java/com/linecorp/armeria/client/Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.common.loadbalancer.Weighted;
import com.linecorp.armeria.common.util.DomainSocketAddress;
import com.linecorp.armeria.common.util.UnmodifiableFuture;
import com.linecorp.armeria.internal.common.ArmeriaHttpUtil;
Expand All @@ -72,7 +73,7 @@
* represented as {@code "<host>"} or {@code "<host>:<port>"} in the authority part of a URI. It can have
* an IP address if the host name has been resolved and thus there's no need to query a DNS server.</p>
*/
public final class Endpoint implements Comparable<Endpoint>, EndpointGroup {
public final class Endpoint implements Comparable<Endpoint>, EndpointGroup, Weighted {

private static final Comparator<Endpoint> COMPARATOR =
Comparator.comparing(Endpoint::host)
Expand Down Expand Up @@ -652,6 +653,7 @@ public Endpoint withWeight(int weight) {
/**
* Returns the weight of this endpoint.
*/
@Override
public int weight() {
return weight;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client.endpoint;

import java.util.List;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.loadbalancer.LoadBalancer;
import com.linecorp.armeria.common.util.ListenableAsyncCloseable;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;

final class DefaultEndpointSelector<T extends LoadBalancer<Endpoint, ClientRequestContext>>
extends AbstractEndpointSelector {

private final LoadBalancerFactory<T> loadBalancerFactory;
@Nullable
private volatile T loadBalancer;
private boolean closed;
private final ReentrantShortLock lock = new ReentrantShortLock();

DefaultEndpointSelector(EndpointGroup endpointGroup,
LoadBalancerFactory<T> loadBalancerFactory) {
super(endpointGroup);
this.loadBalancerFactory = loadBalancerFactory;
if (endpointGroup instanceof ListenableAsyncCloseable) {
((ListenableAsyncCloseable) endpointGroup).whenClosed().thenAccept(unused -> {
lock.lock();
try {
closed = true;
final T loadBalancer = this.loadBalancer;
if (loadBalancer != null) {
loadBalancer.close();
}
} finally {
lock.unlock();
}
});
}
initialize();
}

@Override
protected void updateNewEndpoints(List<Endpoint> endpoints) {
lock.lock();
try {
if (closed) {
return;
}
loadBalancer = loadBalancerFactory.newLoadBalancer(loadBalancer, endpoints);
} finally {
lock.unlock();
}
}

@Nullable
@Override
public Endpoint selectNow(ClientRequestContext ctx) {
final T loadBalancer = this.loadBalancer;
if (loadBalancer == null) {
return null;
}
return loadBalancer.pick(ctx);
}

@FunctionalInterface
interface LoadBalancerFactory<T> {
T newLoadBalancer(@Nullable T oldLoadBalancer, List<Endpoint> candidates);

@SuppressWarnings("unchecked")
default T unsafeCast(LoadBalancer<Endpoint, ?> loadBalancer) {
return (T) loadBalancer;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.common.loadbalancer.WeightTransition;

/**
* {@link Endpoint} selection strategy that creates a {@link EndpointSelector}.
Expand Down Expand Up @@ -53,7 +54,7 @@ static EndpointSelectionStrategy roundRobin() {

/**
* Returns a weight ramping up {@link EndpointSelectionStrategy} which ramps the weight of newly added
* {@link Endpoint}s using {@link EndpointWeightTransition#linear()}. The {@link Endpoint} is selected
* {@link Endpoint}s using {@link WeightTransition#linear()}. The {@link Endpoint} is selected
* using weighted random distribution.
* The weights of {@link Endpoint}s are ramped up by 10 percent every 2 seconds up to 100 percent
* by default. If you want to customize the parameters, use {@link #builderForRampingUp()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,31 @@
package com.linecorp.armeria.client.endpoint;

import static com.google.common.base.Preconditions.checkArgument;
import static com.linecorp.armeria.client.endpoint.WeightRampingUpStrategyBuilder.DEFAULT_LINEAR_TRANSITION;

import com.google.common.primitives.Ints;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.common.loadbalancer.WeightTransition;

/**
* Computes the weight of the given {@link Endpoint} using the given {@code currentStep}
* and {@code totalSteps}.
*
* @deprecated Use {@link WeightTransition} instead.
*/
@Deprecated
@FunctionalInterface
public interface EndpointWeightTransition {

/**
* Returns the {@link EndpointWeightTransition} which returns the gradually increased weight as the current
* step increases.
*
* @deprecated Use {@link WeightTransition#linear()} instead.
*/
@Deprecated
static EndpointWeightTransition linear() {
return DEFAULT_LINEAR_TRANSITION;
return (endpoint, currentStep, totalSteps) -> {
return WeightTransition.linear().compute(endpoint, endpoint.weight(), currentStep, totalSteps);
};
}

/**
Expand All @@ -44,24 +50,18 @@ static EndpointWeightTransition linear() {
* Refer to the following
* <a href="https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/slow_start">link</a>
* for more information.
*
* @deprecated Use {@link WeightTransition#aggression(double, double)} instead.
*/
@Deprecated
static EndpointWeightTransition aggression(double aggression, double minWeightPercent) {
checkArgument(aggression > 0,
"aggression: %s (expected: > 0.0)", aggression);
checkArgument(minWeightPercent >= 0 && minWeightPercent <= 1.0,
"minWeightPercent: %s (expected: >= 0.0, <= 1.0)", minWeightPercent);
final int aggressionPercentage = Ints.saturatedCast(Math.round(aggression * 100));
final double invertedAggression = 100.0 / aggressionPercentage;
return (endpoint, currentStep, totalSteps) -> {
final int weight = endpoint.weight();
final int minWeight = Ints.saturatedCast(Math.round(weight * minWeightPercent));
final int computedWeight;
if (aggressionPercentage == 100) {
computedWeight = linear().compute(endpoint, currentStep, totalSteps);
} else {
computedWeight = (int) (weight * Math.pow(1.0 * currentStep / totalSteps, invertedAggression));
}
return Math.max(computedWeight, minWeight);
return WeightTransition.aggression(aggression, minWeightPercent)
.compute(endpoint, endpoint.weight(), currentStep, totalSteps);
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,54 +17,27 @@
package com.linecorp.armeria.client.endpoint;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.DefaultEndpointSelector.LoadBalancerFactory;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.loadbalancer.LoadBalancer;

final class RoundRobinStrategy implements EndpointSelectionStrategy {

static final RoundRobinStrategy INSTANCE = new RoundRobinStrategy();
enum RoundRobinStrategy
implements EndpointSelectionStrategy,
LoadBalancerFactory<LoadBalancer<Endpoint, ClientRequestContext>> {

private RoundRobinStrategy() {}
INSTANCE;

@Override
public EndpointSelector newSelector(EndpointGroup endpointGroup) {
return new RoundRobinSelector(endpointGroup);
return new DefaultEndpointSelector<>(endpointGroup, this);
}

/**
* A round robin select strategy.
*
* <p>For example, with node a, b and c, then select result is abc abc ...
*/
static class RoundRobinSelector extends AbstractEndpointSelector {
private final AtomicInteger sequence = new AtomicInteger();

RoundRobinSelector(EndpointGroup endpointGroup) {
super(endpointGroup);
initialize();
}

@Nullable
@Override
public Endpoint selectNow(ClientRequestContext ctx) {
final List<Endpoint> endpoints = group().endpoints();
if (endpoints.isEmpty()) {
return null;
}
final int currentSequence = sequence.getAndIncrement();
return endpoints.get(Math.abs(currentSequence % endpoints.size()));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("endpoints", group().endpoints())
.toString();
}
@Override
public LoadBalancer<Endpoint, ClientRequestContext> newLoadBalancer(
@Nullable LoadBalancer<Endpoint, ClientRequestContext> oldLoadBalancer, List<Endpoint> candidates) {
return unsafeCast(LoadBalancer.ofRoundRobin(candidates));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@
import java.util.List;
import java.util.function.ToLongFunction;

import com.google.common.base.MoreObjects;
import com.google.common.hash.Hashing;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.DefaultEndpointSelector.LoadBalancerFactory;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.loadbalancer.LoadBalancer;

/**
* An {@link EndpointSelector} strategy which implements sticky load-balancing using
Expand All @@ -46,7 +45,9 @@
* final StickyEndpointSelectionStrategy strategy = new StickyEndpointSelectionStrategy(hasher);
* }</pre>
*/
final class StickyEndpointSelectionStrategy implements EndpointSelectionStrategy {
final class StickyEndpointSelectionStrategy
implements EndpointSelectionStrategy,
LoadBalancerFactory<LoadBalancer<Endpoint, ClientRequestContext>> {

private final ToLongFunction<? super ClientRequestContext> requestContextHasher;

Expand All @@ -61,45 +62,16 @@ final class StickyEndpointSelectionStrategy implements EndpointSelectionStrategy
}

/**
* Creates a new {@link StickyEndpointSelector}.
*
* @param endpointGroup an {@link EndpointGroup}
* @return a new {@link StickyEndpointSelector}
* Creates a new sticky {@link EndpointSelector}.
*/
@Override
public EndpointSelector newSelector(EndpointGroup endpointGroup) {
return new StickyEndpointSelector(endpointGroup, requestContextHasher);
return new DefaultEndpointSelector<>(endpointGroup, this);
}

private static final class StickyEndpointSelector extends AbstractEndpointSelector {

private final ToLongFunction<? super ClientRequestContext> requestContextHasher;

StickyEndpointSelector(EndpointGroup endpointGroup,
ToLongFunction<? super ClientRequestContext> requestContextHasher) {
super(endpointGroup);
this.requestContextHasher = requireNonNull(requestContextHasher, "requestContextHasher");
initialize();
}

@Nullable
@Override
public Endpoint selectNow(ClientRequestContext ctx) {
final List<Endpoint> endpoints = group().endpoints();
if (endpoints.isEmpty()) {
return null;
}

final long key = requestContextHasher.applyAsLong(ctx);
final int nearest = Hashing.consistentHash(key, endpoints.size());
return endpoints.get(nearest);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("endpoints", group().endpoints())
.toString();
}
@Override
public LoadBalancer<Endpoint, ClientRequestContext> newLoadBalancer(
@Nullable LoadBalancer<Endpoint, ClientRequestContext> oldLoadBalancer, List<Endpoint> candidates) {
return LoadBalancer.ofSticky(candidates, requestContextHasher);
}
}
Loading

0 comments on commit 629f3b0

Please sign in to comment.