Skip to content

Commit

Permalink
Merge branch 'main' into feature/set-eureka-instanceinfo-to-endpoint-…
Browse files Browse the repository at this point in the history
…attr
  • Loading branch information
Ivan-Montes authored Feb 22, 2025
2 parents 9a88c9d + 98e3054 commit 1575885
Show file tree
Hide file tree
Showing 42 changed files with 1,451 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
import java.util.List;
import java.util.function.BiConsumer;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.client.ClientFactory;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
Expand All @@ -43,16 +41,6 @@

abstract class BraveClientIntegrationTest extends ITHttpAsyncClient<WebClient> {

/**
* OkHttp's MockWebServer does not support H2C with HTTP/1 upgrade request.
*/
private static final ClientFactory clientFactoryWithoutUpgradeRequest =
ClientFactory.builder().useHttp2Preface(true).build();

@AfterAll static void closeClientFactory() {
clientFactoryWithoutUpgradeRequest.closeAsync();
}

private final List<Protocol> protocols;
private final SessionProtocol sessionProtocol;

Expand Down Expand Up @@ -81,7 +69,6 @@ protected CurrentTraceContext.Builder currentTraceContextBuilder() {
@Override
protected WebClient newClient(int port) {
return WebClient.builder(sessionProtocol.uriText() + "://127.0.0.1:" + port)
.factory(clientFactoryWithoutUpgradeRequest)
.decorator(BraveClient.newDecorator(httpTracing))
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,53 +75,78 @@ ObjectMapper getObjectMapper() {
}

/**
* Registers a service to Consul Agent with service ID.
* Registers a service to Consul Agent.
*
* @param serviceId a service ID that identifying a service
* @param serviceName a service name to register
* @param endpoint an endpoint of service to register
* @param check a check for the service
* @param tags tags for the service
* @return a {@link CompletableFuture} that will be completed with the registered service ID
*
* @return an HttpResponse representing the HTTP response from Consul
*/
public HttpResponse register(String serviceId, String serviceName, Endpoint endpoint,
@Nullable Check check, List<String> tags) {
return agentClient.register(serviceId, serviceName, endpoint.host(), endpoint.port(), check, tags);
}

/**
* De-registers a service to Consul Agent.
* De-registers a service from Consul Agent.
*
* @param serviceId a service ID that identifying a service
*
* @return an HttpResponse representing the HTTP response from Consul
*/
public HttpResponse deregister(String serviceId) {
return agentClient.deregister(serviceId);
}

/**
* Get registered endpoints with service name from Consul agent.
* Retrieves the list of registered endpoints for the specified service name from the Consul agent.
*
* @param serviceName the name of the service whose endpoints are to be retrieved
*
* @return a {@link CompletableFuture} which provides a list of {@link Endpoint}s
*/
public CompletableFuture<List<Endpoint>> endpoints(String serviceName) {
return endpoints(serviceName, null, null);
}

/**
* Get registered endpoints with service name in datacenter from Consul agent.
* Retrieves the list of registered endpoints for the specified service name and datacenter
* from the Consul agent, optionally applying a filter.
*
* @param serviceName the name of the service whose endpoints are to be retrieved
* @param datacenter the datacenter to query; if {@code null}, the default datacenter is used
* @param filter a filter expression to apply; if {@code null}, no filtering is performed
*
* @return a {@link CompletableFuture} which provides a list of {@link Endpoint}s
*/
public CompletableFuture<List<Endpoint>> endpoints(String serviceName, @Nullable String datacenter,
@Nullable String filter) {
return catalogClient.endpoints(serviceName, datacenter, filter);
}

/**
* Returns the registered endpoints with the specified service name from Consul agent.
* Retrieves the list of healthy endpoints for the specified service name from the Consul agent.
*
* @param serviceName the name of the service whose healthy endpoints are to be retrieved
*
* @return a {@link CompletableFuture} which provides a list of healthy {@link Endpoint}s
*/
public CompletableFuture<List<Endpoint>> healthyEndpoints(String serviceName) {
return healthyEndpoints(serviceName, null, null);
}

/**
* Returns the registered endpoints with the specified service name in datacenter from Consul agent.
* Retrieves the list of healthy endpoints for the specified service name and datacenter
* from the Consul agent, optionally applying a filter.
*
* @param serviceName the name of the service whose healthy endpoints are to be retrieved
* @param datacenter the datacenter to query; if {@code null}, the default datacenter is used
* @param filter a filter expression to apply; if {@code null}, no filtering is performed
*
* @return a {@link CompletableFuture} which provides a list of healthy {@link Endpoint}s
*/
public CompletableFuture<List<Endpoint>> healthyEndpoints(String serviceName, @Nullable String datacenter,
@Nullable String filter) {
Expand Down
100 changes: 76 additions & 24 deletions core/src/main/java/com/linecorp/armeria/client/HttpClientDelegate.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.linecorp.armeria.client;

import static com.linecorp.armeria.internal.common.util.IpAddrUtil.isCreatedWithIpAddressOnly;
import static java.util.Objects.requireNonNull;

import java.net.InetAddress;
Expand Down Expand Up @@ -88,24 +89,36 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex
}

final SessionProtocol protocol = ctx.sessionProtocol();
final ProxyConfig proxyConfig;

final Endpoint endpointWithPort = endpoint.withDefaultPort(ctx.sessionProtocol());
final EventLoop eventLoop = ctx.eventLoop().withoutContext();
// TODO(ikhoon) Use ctx.exchangeType() to create an optimized HttpResponse for non-streaming response.
final DecodedHttpResponse res = new DecodedHttpResponse(eventLoop);
updateCancellationTask(ctx, req, res);

try {
proxyConfig = getProxyConfig(protocol, endpoint);
resolveProxyConfig(protocol, endpoint, ctx, (proxyConfig, thrown) -> {
if (thrown != null) {
earlyFailedResponse(thrown, ctx, res);
} else {
assert proxyConfig != null;
execute0(ctx, endpointWithPort, req, res, proxyConfig);
}
});
} catch (Throwable t) {
return earlyFailedResponse(t, ctx);
}
return res;
}

private void execute0(ClientRequestContext ctx, Endpoint endpointWithPort, HttpRequest req,
DecodedHttpResponse res, ProxyConfig proxyConfig) {
final Throwable cancellationCause = ctx.cancellationCause();
if (cancellationCause != null) {
return earlyFailedResponse(cancellationCause, ctx);
earlyFailedResponse(cancellationCause, ctx, res);
return;
}

final Endpoint endpointWithPort = endpoint.withDefaultPort(ctx.sessionProtocol());
final EventLoop eventLoop = ctx.eventLoop().withoutContext();
// TODO(ikhoon) Use ctx.exchangeType() to create an optimized HttpResponse for non-streaming response.
final DecodedHttpResponse res = new DecodedHttpResponse(eventLoop);
updateCancellationTask(ctx, req, res);

final ClientConnectionTimingsBuilder timingsBuilder = ClientConnectionTimings.builder();

if (endpointWithPort.hasIpAddr() ||
Expand All @@ -125,8 +138,6 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex
}
});
}

return res;
}

private static void updateCancellationTask(ClientRequestContext ctx, HttpRequest req,
Expand Down Expand Up @@ -215,24 +226,52 @@ private void acquireConnectionAndExecute0(ClientRequestContext ctx, Endpoint end
}
}

private ProxyConfig getProxyConfig(SessionProtocol protocol, Endpoint endpoint) {
final ProxyConfig proxyConfig = factory.proxyConfigSelector().select(protocol, endpoint);
requireNonNull(proxyConfig, "proxyConfig");
private void resolveProxyConfig(SessionProtocol protocol, Endpoint endpoint, ClientRequestContext ctx,
BiConsumer<@Nullable ProxyConfig, @Nullable Throwable> onComplete) {
final ProxyConfig unresolvedProxyConfig = factory.proxyConfigSelector().select(protocol, endpoint);
requireNonNull(unresolvedProxyConfig, "unresolvedProxyConfig");
final ProxyConfig proxyConfig = maybeSetHAProxySourceAddress(unresolvedProxyConfig);

// special behavior for haproxy when sourceAddress is null
if (proxyConfig.proxyType() == ProxyType.HAPROXY &&
((HAProxyConfig) proxyConfig).sourceAddress() == null) {
final InetSocketAddress proxyAddress = proxyConfig.proxyAddress();
final InetSocketAddress proxyAddress = proxyConfig.proxyAddress();
final boolean needsDnsResolution = proxyAddress != null && !isCreatedWithIpAddressOnly(proxyAddress);
if (needsDnsResolution) {
assert proxyAddress != null;
final Future<InetSocketAddress> resolveFuture = addressResolverGroup
.getResolver(ctx.eventLoop().withoutContext())
.resolve(createUnresolvedAddressForRefreshing(proxyAddress));

// use proxy information in context if available
final ServiceRequestContext serviceCtx = ServiceRequestContext.currentOrNull();
if (serviceCtx != null) {
final ProxiedAddresses proxiedAddresses = serviceCtx.proxiedAddresses();
return ProxyConfig.haproxy(proxyAddress, proxiedAddresses.sourceAddress());
}
resolveFuture.addListener(future -> {
if (future.isSuccess()) {
final InetSocketAddress resolvedAddress = (InetSocketAddress) future.getNow();
final ProxyConfig newProxyConfig = proxyConfig.withProxyAddress(resolvedAddress);
onComplete.accept(newProxyConfig, null);
} else {
final Throwable cause = future.cause();
onComplete.accept(null, cause);
}
});
} else {
onComplete.accept(proxyConfig, null);
}
}

private static ProxyConfig maybeSetHAProxySourceAddress(ProxyConfig proxyConfig) {
if (proxyConfig.proxyType() != ProxyType.HAPROXY) {
return proxyConfig;
}
if (((HAProxyConfig) proxyConfig).sourceAddress() != null) {
return proxyConfig;
}

final ServiceRequestContext sctx = ServiceRequestContext.currentOrNull();
final ProxiedAddresses serviceProxiedAddresses = sctx == null ? null : sctx.proxiedAddresses();
if (serviceProxiedAddresses != null) {
// A special behavior for haproxy when sourceAddress is null.
// Use proxy information in the service context if available.
final InetSocketAddress proxyAddress = proxyConfig.proxyAddress();
assert proxyAddress != null;
return ProxyConfig.haproxy(proxyAddress, serviceProxiedAddresses.sourceAddress());
}
return proxyConfig;
}

Expand All @@ -253,11 +292,24 @@ private static HttpResponse earlyFailedResponse(Throwable t, ClientRequestContex
return HttpResponse.ofFailure(cause);
}

private static HttpResponse earlyFailedResponse(Throwable t,
ClientRequestContext ctx,
DecodedHttpResponse response) {
final UnprocessedRequestException cause = UnprocessedRequestException.of(t);
ctx.cancel(cause);
response.close(cause);
return response;
}

private static void doExecute(PooledChannel pooledChannel, ClientRequestContext ctx,
HttpRequest req, DecodedHttpResponse res) {
final Channel channel = pooledChannel.get();
final HttpSession session = HttpSession.get(channel);
res.init(session.inboundTrafficController());
session.invoke(pooledChannel, ctx, req, res);
}

private static InetSocketAddress createUnresolvedAddressForRefreshing(InetSocketAddress previousAddress) {
return InetSocketAddress.createUnresolved(previousAddress.getHostString(), previousAddress.getPort());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpObject;
Expand Down Expand Up @@ -58,6 +60,8 @@ class HttpResponseWrapper implements StreamWriter<HttpObject> {
private final EventLoop eventLoop;
private final ClientRequestContext ctx;
private final long maxContentLength;
@VisibleForTesting
static final String UNEXPECTED_EXCEPTION_MSG = "Unexpected exception while closing a request";

private boolean responseStarted;
private long contentLengthHeaderValue = -1;
Expand Down Expand Up @@ -232,14 +236,16 @@ void close(@Nullable Throwable cause, boolean cancel) {
requestAutoAbortDelayMillis, TimeUnit.MILLISECONDS);
}

private void closeAction(@Nullable Throwable cause) {
private boolean closeAction(@Nullable Throwable cause) {
final boolean closed;
if (cause != null) {
delegate.close(cause);
closed = delegate.tryClose(cause);
ctx.logBuilder().endResponse(cause);
} else {
delegate.close();
closed = delegate.tryClose();
ctx.logBuilder().endResponse();
}
return closed;
}

private void cancelAction(@Nullable Throwable cause) {
Expand All @@ -262,8 +268,10 @@ private void cancelTimeoutAndLog(@Nullable Throwable cause, boolean cancel) {
cancelAction(cause);
return;
}
if (delegate.isOpen()) {
closeAction(cause);

// don't log if the cause will be exposed via the response/log
if (delegate.isOpen() && closeAction(cause)) {
return;
}

// the context has been cancelled either by timeout or by user invocation
Expand All @@ -275,7 +283,7 @@ private void cancelTimeoutAndLog(@Nullable Throwable cause, boolean cancel) {
return;
}

final StringBuilder logMsg = new StringBuilder("Unexpected exception while closing a request");
final StringBuilder logMsg = new StringBuilder(UNEXPECTED_EXCEPTION_MSG);
final HttpRequest request = ctx.request();
assert request != null;
final String authority = request.authority();
Expand Down
Loading

0 comments on commit 1575885

Please sign in to comment.