Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add port tag to server metrics #6116

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,23 @@

package com.linecorp.armeria.server;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.linecorp.armeria.server.HttpServerHandler.UNKNOWN_ADDR;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.internal.common.util.ChannelUtil;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
Expand All @@ -48,6 +55,10 @@ final class ConnectionLimitingHandler extends ChannelInboundHandlerAdapter {
private final int maxNumConnections;
private final ServerMetrics serverMetrics;

/**
* AtomicInteger is used to read the number of active connections frequently.
*/
private final AtomicInteger activeConnections = new AtomicInteger();
private final AtomicBoolean loggingScheduled = new AtomicBoolean();
private final LongAdder numDroppedConnections = new LongAdder();

Expand All @@ -59,17 +70,19 @@ final class ConnectionLimitingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final Channel child = (Channel) msg;

final int conn = serverMetrics.increaseActiveConnectionsAndGet();
final int conn = activeConnections.incrementAndGet();
if (conn > 0 && conn <= maxNumConnections) {
final InetSocketAddress localAddress = firstNonNull(ChannelUtil.localAddress(child), UNKNOWN_ADDR);
serverMetrics.increaseActiveConnections(localAddress);
childChannels.add(child);
child.closeFuture().addListener(future -> {
childChannels.remove(child);
serverMetrics.decreaseActiveConnections();
activeConnections.decrementAndGet();
serverMetrics.decreaseActiveConnections(localAddress);
});
super.channelRead(ctx, msg);
} else {
serverMetrics.decreaseActiveConnections();
activeConnections.decrementAndGet();

// Set linger option to 0 so that the server doesn't get too many TIME_WAIT states.
child.config().setOption(ChannelOption.SO_LINGER, 0);
Expand Down Expand Up @@ -104,7 +117,7 @@ public int maxNumConnections() {
* Returns the number of open connections.
*/
public int numConnections() {
return serverMetrics.activeConnections();
return activeConnections.get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ final class DefaultServerConfig implements ServerConfig {

@Nullable
private final Mapping<String, SslContext> sslContexts;
private final ServerMetrics serverMetrics = new ServerMetrics();
private final ServerMetrics serverMetrics;

@Nullable
private String strVal;
Expand Down Expand Up @@ -266,6 +266,7 @@ final class DefaultServerConfig implements ServerConfig {
this.absoluteUriTransformer = castAbsoluteUriTransformer;
this.unloggedExceptionsReportIntervalMillis = unloggedExceptionsReportIntervalMillis;
this.shutdownSupports = ImmutableList.copyOf(requireNonNull(shutdownSupports, "shutdownSupports"));
serverMetrics = new ServerMetrics(ports);
}

private static Int2ObjectMap<Mapping<String, VirtualHost>> buildDomainAndPortMapping(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

package com.linecorp.armeria.server;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.linecorp.armeria.internal.common.websocket.WebSocketUtil.isHttp1WebSocketUpgradeRequest;
import static com.linecorp.armeria.server.HttpServerHandler.UNKNOWN_ADDR;
import static com.linecorp.armeria.server.HttpServerPipelineConfigurator.SCHEME_HTTP;
import static com.linecorp.armeria.server.ServiceRouteUtil.newRoutingContext;

import java.net.InetSocketAddress;
import java.net.URISyntaxException;

import org.slf4j.Logger;
Expand All @@ -46,6 +49,7 @@
import com.linecorp.armeria.internal.common.InitiateConnectionShutdown;
import com.linecorp.armeria.internal.common.KeepAliveHandler;
import com.linecorp.armeria.internal.common.NoopKeepAliveHandler;
import com.linecorp.armeria.internal.common.util.ChannelUtil;
import com.linecorp.armeria.server.HttpServerUpgradeHandler.UpgradeEvent;
import com.linecorp.armeria.server.websocket.WebSocketService;

Expand Down Expand Up @@ -84,6 +88,7 @@ final class Http1RequestDecoder extends ChannelDuplexHandler {
private final AsciiString scheme;
private SessionProtocol sessionProtocol;
private final InboundTrafficController inboundTrafficController;
private final InetSocketAddress localAddress;
private ServerHttpObjectEncoder encoder;
private final HttpServer httpServer;

Expand All @@ -96,6 +101,7 @@ final class Http1RequestDecoder extends ChannelDuplexHandler {
Http1RequestDecoder(ServerConfig cfg, Channel channel, AsciiString scheme,
ServerHttp1ObjectEncoder encoder, HttpServer httpServer) {
this.cfg = cfg;
localAddress = firstNonNull(ChannelUtil.localAddress(channel), UNKNOWN_ADDR);
this.scheme = scheme;
sessionProtocol = scheme == SCHEME_HTTP ? SessionProtocol.H1C : SessionProtocol.H1;
inboundTrafficController = InboundTrafficController.ofHttp1(channel);
Expand Down Expand Up @@ -271,7 +277,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (pipeline.get(HttpServerUpgradeHandler.class) != null) {
pipeline.remove(HttpServerUpgradeHandler.class);
}
cfg.serverMetrics().increasePendingHttp1Requests();
cfg.serverMetrics().increasePendingHttp1Requests(localAddress);
ctx.fireChannelRead(webSocketRequest);
return;
}
Expand All @@ -284,7 +290,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (maxRequestLength > 0 && contentLength > maxRequestLength) {
abortLargeRequest(ctx, req, id, endOfStream, keepAliveHandler, true);
}
cfg.serverMetrics().increasePendingHttp1Requests();
cfg.serverMetrics().increasePendingHttp1Requests(localAddress);
ctx.fireChannelRead(req);
} else {
fail(id, null, HttpStatus.BAD_REQUEST, "Invalid decoder state", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

package com.linecorp.armeria.server;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.linecorp.armeria.server.HttpServerHandler.UNKNOWN_ADDR;
import static com.linecorp.armeria.server.HttpServerPipelineConfigurator.SCHEME_HTTP;
import static com.linecorp.armeria.server.ServiceRouteUtil.newRoutingContext;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;

import java.net.InetSocketAddress;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,6 +45,7 @@
import com.linecorp.armeria.internal.common.Http2GoAwayHandler;
import com.linecorp.armeria.internal.common.InboundTrafficController;
import com.linecorp.armeria.internal.common.KeepAliveHandler;
import com.linecorp.armeria.internal.common.util.ChannelUtil;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
Expand All @@ -65,6 +70,7 @@ final class Http2RequestDecoder extends Http2EventAdapter {

private final ServerConfig cfg;
private final Channel channel;
private final InetSocketAddress localAddress;
private final AsciiString scheme;
@Nullable
private ServerHttp2ObjectEncoder encoder;
Expand All @@ -79,6 +85,7 @@ final class Http2RequestDecoder extends Http2EventAdapter {
AsciiString scheme, KeepAliveHandler keepAliveHandler) {
this.cfg = cfg;
this.channel = channel;
localAddress = firstNonNull(ChannelUtil.localAddress(channel), UNKNOWN_ADDR);
this.scheme = scheme;
inboundTrafficController =
InboundTrafficController.ofHttp2(channel, cfg.http2InitialConnectionWindowSize());
Expand Down Expand Up @@ -211,7 +218,7 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
abortLargeRequest(req, endOfStream, true);
}
requests.put(streamId, req);
cfg.serverMetrics().increasePendingHttp2Requests();
cfg.serverMetrics().increasePendingHttp2Requests(localAddress);
ctx.fireChannelRead(req);
} else {
if (!(req instanceof DecodedHttpRequestWriter)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ final class HttpServerHandler extends ChannelInboundHandlerAdapter implements Ht
private static final String ALLOWED_METHODS_STRING =
HttpMethod.knownMethods().stream().map(HttpMethod::name).collect(Collectors.joining(","));

private static final InetSocketAddress UNKNOWN_ADDR;
static final InetSocketAddress UNKNOWN_ADDR;

static {
InetAddress unknownAddr;
Expand Down Expand Up @@ -193,10 +193,8 @@ static void safeClose(Channel ch) {

@Nullable
private final ProxiedAddresses proxiedAddresses;
@Nullable
private InetSocketAddress remoteAddress;
@Nullable
private InetSocketAddress localAddress;
private final InetSocketAddress remoteAddress;
private final InetSocketAddress localAddress;

private final IdentityHashMap<DecodedHttpRequest, HttpResponse> unfinishedRequests;
private boolean isReading;
Expand All @@ -205,14 +203,16 @@ static void safeClose(Channel ch) {
private boolean handledLastRequest;

HttpServerHandler(ServerConfig config,
GracefulShutdownSupport gracefulShutdownSupport,
Channel channel, GracefulShutdownSupport gracefulShutdownSupport,
@Nullable ServerHttpObjectEncoder responseEncoder,
SessionProtocol protocol,
@Nullable ProxiedAddresses proxiedAddresses) {

assert protocol == H1 || protocol == H1C || protocol == H2;

this.config = requireNonNull(config, "config");
remoteAddress = firstNonNull(ChannelUtil.remoteAddress(channel), UNKNOWN_ADDR);
localAddress = firstNonNull(ChannelUtil.localAddress(channel), UNKNOWN_ADDR);
this.gracefulShutdownSupport = requireNonNull(gracefulShutdownSupport, "gracefulShutdownSupport");

this.protocol = requireNonNull(protocol, "protocol");
Expand Down Expand Up @@ -402,18 +402,15 @@ private void handleRequest(ChannelHandlerContext ctx, DecodedHttpRequest req) th

final Channel channel = ctx.channel();
final RequestHeaders headers = req.headers();
final InetSocketAddress remoteAddress = firstNonNull(remoteAddress(channel), UNKNOWN_ADDR);
final InetSocketAddress localAddress = firstNonNull(localAddress(channel), UNKNOWN_ADDR);
final ProxiedAddresses proxiedAddresses = determineProxiedAddresses(remoteAddress, headers);
final ProxiedAddresses proxiedAddresses = determineProxiedAddresses(headers);
final InetAddress clientAddress = config.clientAddressMapper().apply(proxiedAddresses).getAddress();
final EventLoop channelEventLoop = channel.eventLoop();

final RoutingContext routingCtx = req.routingContext();
final RoutingStatus routingStatus = routingCtx.status();
if (!routingStatus.routeMustExist()) {
final ServiceRequestContext reqCtx = newEarlyRespondingRequestContext(
channel, req, proxiedAddresses, clientAddress, remoteAddress, localAddress, routingCtx,
channelEventLoop);
channel, req, proxiedAddresses, clientAddress, routingCtx, channelEventLoop);

// Handle 'OPTIONS * HTTP/1.1'.
if (routingStatus == RoutingStatus.OPTIONS) {
Expand Down Expand Up @@ -522,21 +519,21 @@ private void handleRequest(ChannelHandlerContext ctx, DecodedHttpRequest req) th

private void decreasePendingRequests() {
if (protocol.isExplicitHttp1()) {
config.serverMetrics().decreasePendingHttp1Requests();
config.serverMetrics().decreasePendingHttp1Requests(localAddress);
} else {
assert protocol.isExplicitHttp2();
config.serverMetrics().decreasePendingHttp2Requests();
config.serverMetrics().decreasePendingHttp2Requests(localAddress);
}
}

private void increaseActiveRequests(boolean isHttp1WebSocket) {
if (isHttp1WebSocket) {
config.serverMetrics().increaseActiveHttp1WebSocketRequests();
config.serverMetrics().increaseActiveHttp1WebSocketRequests(localAddress);
} else if (protocol.isExplicitHttp1()) {
config.serverMetrics().increaseActiveHttp1Requests();
config.serverMetrics().increaseActiveHttp1Requests(localAddress);
} else {
assert protocol.isExplicitHttp2();
config.serverMetrics().increaseActiveHttp2Requests();
config.serverMetrics().increaseActiveHttp2Requests(localAddress);
}
}

Expand Down Expand Up @@ -570,8 +567,7 @@ private HttpResponse serveInServiceEventLoop(DecodedHttpRequest req,
.subscribeOn(serviceEventLoop);
}

private ProxiedAddresses determineProxiedAddresses(InetSocketAddress remoteAddress,
RequestHeaders headers) {
private ProxiedAddresses determineProxiedAddresses(RequestHeaders headers) {
if (config.clientAddressTrustedProxyFilter().test(remoteAddress.getAddress())) {
return HttpHeaderUtil.determineProxiedAddresses(
headers, config.clientAddressSources(), proxiedAddresses,
Expand All @@ -581,30 +577,6 @@ private ProxiedAddresses determineProxiedAddresses(InetSocketAddress remoteAddre
}
}

@Nullable
private InetSocketAddress remoteAddress(Channel ch) {
final InetSocketAddress remoteAddress = this.remoteAddress;
if (remoteAddress != null) {
return remoteAddress;
}

final InetSocketAddress newRemoteAddress = ChannelUtil.remoteAddress(ch);
this.remoteAddress = newRemoteAddress;
return newRemoteAddress;
}

@Nullable
private InetSocketAddress localAddress(Channel ch) {
final InetSocketAddress localAddress = this.localAddress;
if (localAddress != null) {
return localAddress;
}

final InetSocketAddress newLocalAddress = ChannelUtil.localAddress(ch);
this.localAddress = newLocalAddress;
return newLocalAddress;
}

private void handleOptions(ChannelHandlerContext ctx, ServiceRequestContext reqCtx) {
respond(ctx, reqCtx,
ResponseHeaders.builder(HttpStatus.OK)
Expand Down Expand Up @@ -733,8 +705,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
private ServiceRequestContext newEarlyRespondingRequestContext(Channel channel, DecodedHttpRequest req,
ProxiedAddresses proxiedAddresses,
InetAddress clientAddress,
InetSocketAddress remoteAddress,
InetSocketAddress localAddress,
RoutingContext routingCtx,
EventLoop eventLoop) {
final ServiceConfig serviceConfig = routingCtx.virtualHost().fallbackServiceConfig();
Expand Down Expand Up @@ -849,11 +819,11 @@ private void handleRequestOrResponseComplete() {
return;
}
if (req.isHttp1WebSocket()) {
config.serverMetrics().decreaseActiveHttp1WebSocketRequests();
config.serverMetrics().decreaseActiveHttp1WebSocketRequests(localAddress);
} else if (protocol.isExplicitHttp1()) {
config.serverMetrics().decreaseActiveHttp1Requests();
config.serverMetrics().decreaseActiveHttp1Requests(localAddress);
} else if (protocol.isExplicitHttp2()) {
config.serverMetrics().decreaseActiveHttp2Requests();
config.serverMetrics().decreaseActiveHttp2Requests(localAddress);
}

// NB: logBuilder.endResponse() is called by HttpResponseSubscriber.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ private void configureHttp(ChannelPipeline p, @Nullable ProxiedAddresses proxied
p.channel(), H1C, keepAliveHandler, config.http1HeaderNaming()
);
p.addLast(TrafficLoggingHandler.SERVER);
final HttpServerHandler httpServerHandler = new HttpServerHandler(config,
final HttpServerHandler httpServerHandler = new HttpServerHandler(config, p.channel(),
gracefulShutdownSupport,
responseEncoder,
H1C, proxiedAddresses);
Expand Down Expand Up @@ -511,7 +511,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) thr
private void addHttp2Handlers(ChannelHandlerContext ctx) {
final ChannelPipeline p = ctx.pipeline();
p.addLast(newHttp2ConnectionHandler(p, SCHEME_HTTPS));
p.addLast(new HttpServerHandler(config,
p.addLast(new HttpServerHandler(config, p.channel(),
gracefulShutdownSupport,
null, H2, proxiedAddresses));
}
Expand Down Expand Up @@ -541,7 +541,7 @@ private void addHttpHandlers(ChannelHandlerContext ctx) {
config.http1MaxInitialLineLength(),
config.http1MaxHeaderSize(),
config.http1MaxChunkSize()));
final HttpServerHandler httpServerHandler = new HttpServerHandler(config,
final HttpServerHandler httpServerHandler = new HttpServerHandler(config, ch,
gracefulShutdownSupport,
encoder, H1, proxiedAddresses);
p.addLast(new Http1RequestDecoder(config, ch, SCHEME_HTTPS, encoder, httpServerHandler));
Expand Down
Loading
Loading