diff --git a/core/src/main/java/com/linecorp/armeria/server/ConnectionLimitingHandler.java b/core/src/main/java/com/linecorp/armeria/server/ConnectionLimitingHandler.java index e750c42972e..c26da96e1e2 100644 --- a/core/src/main/java/com/linecorp/armeria/server/ConnectionLimitingHandler.java +++ b/core/src/main/java/com/linecorp/armeria/server/ConnectionLimitingHandler.java @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import org.slf4j.Logger; @@ -38,89 +39,105 @@ * Limit the number of open connections to the configured value. * {@link ConnectionLimitingHandler} instance would be set to {@link ServerBootstrap#handler(ChannelHandler)}. */ -@Sharable -final class ConnectionLimitingHandler extends ChannelInboundHandlerAdapter { +final class ConnectionLimitingHandler { private static final Logger logger = LoggerFactory.getLogger(ConnectionLimitingHandler.class); private final Set childChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set unmodifiableChildChannels = Collections.unmodifiableSet(childChannels); private final int maxNumConnections; - private final ServerMetrics serverMetrics; + /** + * AtomicInteger is used to read the number of active connections frequently. + */ + private final AtomicInteger activeConnections = new AtomicInteger(); private final AtomicBoolean loggingScheduled = new AtomicBoolean(); private final LongAdder numDroppedConnections = new LongAdder(); - ConnectionLimitingHandler(int maxNumConnections, ServerMetrics serverMetrics) { + ConnectionLimitingHandler(int maxNumConnections) { this.maxNumConnections = validateMaxNumConnections(maxNumConnections); - this.serverMetrics = serverMetrics; - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - final Channel child = (Channel) msg; - - final int conn = serverMetrics.increaseActiveConnectionsAndGet(); - if (conn > 0 && conn <= maxNumConnections) { - childChannels.add(child); - child.closeFuture().addListener(future -> { - childChannels.remove(child); - serverMetrics.decreaseActiveConnections(); - }); - super.channelRead(ctx, msg); - } else { - serverMetrics.decreaseActiveConnections(); - - // Set linger option to 0 so that the server doesn't get too many TIME_WAIT states. - child.config().setOption(ChannelOption.SO_LINGER, 0); - child.unsafe().closeForcibly(); - - numDroppedConnections.increment(); - - if (loggingScheduled.compareAndSet(false, true)) { - ctx.executor().schedule(this::writeNumDroppedConnectionsLog, 1, TimeUnit.SECONDS); - } - } - } - - private void writeNumDroppedConnectionsLog() { - loggingScheduled.set(false); - - final long dropped = numDroppedConnections.sumThenReset(); - if (dropped > 0) { - logger.warn("Dropped {} connection(s) to limit the number of open connections to {}", - dropped, maxNumConnections); - } } /** * Returns the maximum allowed number of open connections. */ - public int maxNumConnections() { + int maxNumConnections() { return maxNumConnections; } /** * Returns the number of open connections. */ - public int numConnections() { - return serverMetrics.activeConnections(); + int numConnections() { + return activeConnections.get(); } /** * Returns the immutable set of child {@link Channel}s. */ - public Set children() { + Set children() { return unmodifiableChildChannels; } /** * Validates the maximum allowed number of open connections. It must be a positive number. */ - public static int validateMaxNumConnections(int maxNumConnections) { + static int validateMaxNumConnections(int maxNumConnections) { if (maxNumConnections <= 0) { throw new IllegalArgumentException("maxNumConnections: " + maxNumConnections + " (expected: > 0)"); } return maxNumConnections; } + + ChannelHandler newChildHandler(ServerPortMetric serverPortMetric) { + return new ConnectionLimitingChildHandler(serverPortMetric); + } + + @Sharable + private class ConnectionLimitingChildHandler extends ChannelInboundHandlerAdapter { + + private final ServerPortMetric serverPortMetric; + + ConnectionLimitingChildHandler(ServerPortMetric serverPortMetric) { + this.serverPortMetric = serverPortMetric; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + final Channel child = (Channel) msg; + final int conn = activeConnections.incrementAndGet(); + if (conn > 0 && conn <= maxNumConnections) { + serverPortMetric.increaseActiveConnections(); + childChannels.add(child); + child.closeFuture().addListener(future -> { + childChannels.remove(child); + activeConnections.decrementAndGet(); + serverPortMetric.decreaseActiveConnections(); + }); + super.channelRead(ctx, msg); + } else { + activeConnections.decrementAndGet(); + + // Set linger option to 0 so that the server doesn't get too many TIME_WAIT states. + child.config().setOption(ChannelOption.SO_LINGER, 0); + child.unsafe().closeForcibly(); + + numDroppedConnections.increment(); + + if (loggingScheduled.compareAndSet(false, true)) { + ctx.executor().schedule(this::writeNumDroppedConnectionsLog, 1, TimeUnit.SECONDS); + } + } + } + + private void writeNumDroppedConnectionsLog() { + loggingScheduled.set(false); + + final long dropped = numDroppedConnections.sumThenReset(); + if (dropped > 0) { + logger.warn("Dropped {} connection(s) to limit the number of open connections to {}", + dropped, maxNumConnections); + } + } + } } diff --git a/core/src/main/java/com/linecorp/armeria/server/DefaultServerConfig.java b/core/src/main/java/com/linecorp/armeria/server/DefaultServerConfig.java index 3d80aa66a9e..b3321732737 100644 --- a/core/src/main/java/com/linecorp/armeria/server/DefaultServerConfig.java +++ b/core/src/main/java/com/linecorp/armeria/server/DefaultServerConfig.java @@ -118,7 +118,7 @@ final class DefaultServerConfig implements ServerConfig { @Nullable private final Mapping sslContexts; - private final ServerMetrics serverMetrics = new ServerMetrics(); + private final ServerMetrics serverMetrics; @Nullable private String strVal; @@ -266,6 +266,7 @@ final class DefaultServerConfig implements ServerConfig { this.absoluteUriTransformer = castAbsoluteUriTransformer; this.unloggedExceptionsReportIntervalMillis = unloggedExceptionsReportIntervalMillis; this.shutdownSupports = ImmutableList.copyOf(requireNonNull(shutdownSupports, "shutdownSupports")); + serverMetrics = new ServerMetrics(meterRegistry); } private static Int2ObjectMap> buildDomainAndPortMapping( diff --git a/core/src/main/java/com/linecorp/armeria/server/Http1RequestDecoder.java b/core/src/main/java/com/linecorp/armeria/server/Http1RequestDecoder.java index 9172ca42ae8..4311e980e19 100644 --- a/core/src/main/java/com/linecorp/armeria/server/Http1RequestDecoder.java +++ b/core/src/main/java/com/linecorp/armeria/server/Http1RequestDecoder.java @@ -18,6 +18,7 @@ import static com.linecorp.armeria.internal.common.websocket.WebSocketUtil.isHttp1WebSocketUpgradeRequest; import static com.linecorp.armeria.server.HttpServerPipelineConfigurator.SCHEME_HTTP; +import static com.linecorp.armeria.server.ServerPortMetric.SERVER_PORT_METRIC; import static com.linecorp.armeria.server.ServiceRouteUtil.newRoutingContext; import java.net.URISyntaxException; @@ -81,6 +82,7 @@ final class Http1RequestDecoder extends ChannelDuplexHandler { private static final ResponseHeaders CONTINUE_RESPONSE = ResponseHeaders.of(HttpStatus.CONTINUE); private final ServerConfig cfg; + private final ServerPortMetric serverPortMetric; private final AsciiString scheme; private SessionProtocol sessionProtocol; private final InboundTrafficController inboundTrafficController; @@ -96,6 +98,9 @@ final class Http1RequestDecoder extends ChannelDuplexHandler { Http1RequestDecoder(ServerConfig cfg, Channel channel, AsciiString scheme, ServerHttp1ObjectEncoder encoder, HttpServer httpServer) { this.cfg = cfg; + final ServerPortMetric serverPortMetric = channel.attr(SERVER_PORT_METRIC).get(); + assert serverPortMetric != null; + this.serverPortMetric = serverPortMetric; this.scheme = scheme; sessionProtocol = scheme == SCHEME_HTTP ? SessionProtocol.H1C : SessionProtocol.H1; inboundTrafficController = InboundTrafficController.ofHttp1(channel); @@ -271,7 +276,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (pipeline.get(HttpServerUpgradeHandler.class) != null) { pipeline.remove(HttpServerUpgradeHandler.class); } - cfg.serverMetrics().increasePendingHttp1Requests(); + serverPortMetric.increasePendingHttp1Requests(); ctx.fireChannelRead(webSocketRequest); return; } @@ -284,7 +289,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (maxRequestLength > 0 && contentLength > maxRequestLength) { abortLargeRequest(ctx, req, id, endOfStream, keepAliveHandler, true); } - cfg.serverMetrics().increasePendingHttp1Requests(); + serverPortMetric.increasePendingHttp1Requests(); ctx.fireChannelRead(req); } else { fail(id, null, HttpStatus.BAD_REQUEST, "Invalid decoder state", null); diff --git a/core/src/main/java/com/linecorp/armeria/server/Http2RequestDecoder.java b/core/src/main/java/com/linecorp/armeria/server/Http2RequestDecoder.java index 72d4078fc91..bdfe7405be8 100644 --- a/core/src/main/java/com/linecorp/armeria/server/Http2RequestDecoder.java +++ b/core/src/main/java/com/linecorp/armeria/server/Http2RequestDecoder.java @@ -17,6 +17,7 @@ package com.linecorp.armeria.server; import static com.linecorp.armeria.server.HttpServerPipelineConfigurator.SCHEME_HTTP; +import static com.linecorp.armeria.server.ServerPortMetric.SERVER_PORT_METRIC; import static com.linecorp.armeria.server.ServiceRouteUtil.newRoutingContext; import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; @@ -65,6 +66,7 @@ final class Http2RequestDecoder extends Http2EventAdapter { private final ServerConfig cfg; private final Channel channel; + private final ServerPortMetric serverPortMetric; private final AsciiString scheme; @Nullable private ServerHttp2ObjectEncoder encoder; @@ -79,6 +81,9 @@ final class Http2RequestDecoder extends Http2EventAdapter { AsciiString scheme, KeepAliveHandler keepAliveHandler) { this.cfg = cfg; this.channel = channel; + final ServerPortMetric serverPortMetric = channel.attr(SERVER_PORT_METRIC).get(); + assert serverPortMetric != null; + this.serverPortMetric = serverPortMetric; this.scheme = scheme; inboundTrafficController = InboundTrafficController.ofHttp2(channel, cfg.http2InitialConnectionWindowSize()); @@ -211,7 +216,7 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers abortLargeRequest(req, endOfStream, true); } requests.put(streamId, req); - cfg.serverMetrics().increasePendingHttp2Requests(); + serverPortMetric.increasePendingHttp2Requests(); ctx.fireChannelRead(req); } else { if (!(req instanceof DecodedHttpRequestWriter)) { diff --git a/core/src/main/java/com/linecorp/armeria/server/HttpServerHandler.java b/core/src/main/java/com/linecorp/armeria/server/HttpServerHandler.java index 4eb48090da9..1e53d3784b4 100644 --- a/core/src/main/java/com/linecorp/armeria/server/HttpServerHandler.java +++ b/core/src/main/java/com/linecorp/armeria/server/HttpServerHandler.java @@ -25,6 +25,7 @@ import static com.linecorp.armeria.internal.common.HttpHeadersUtil.CLOSE_STRING; import static com.linecorp.armeria.internal.common.RequestContextUtil.NOOP_CONTEXT_HOOK; import static com.linecorp.armeria.server.AccessLogWriterUtil.maybeWriteAccessLog; +import static com.linecorp.armeria.server.ServerPortMetric.SERVER_PORT_METRIC; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static java.util.Objects.requireNonNull; @@ -182,6 +183,7 @@ static void safeClose(Channel ch) { } private final ServerConfig config; + private final ServerPortMetric serverPortMetric; private final GracefulShutdownSupport gracefulShutdownSupport; private SessionProtocol protocol; @@ -193,10 +195,8 @@ static void safeClose(Channel ch) { @Nullable private final ProxiedAddresses proxiedAddresses; - @Nullable - private InetSocketAddress remoteAddress; - @Nullable - private InetSocketAddress localAddress; + private final InetSocketAddress remoteAddress; + private final InetSocketAddress localAddress; private final IdentityHashMap unfinishedRequests; private boolean isReading; @@ -205,7 +205,7 @@ static void safeClose(Channel ch) { private boolean handledLastRequest; HttpServerHandler(ServerConfig config, - GracefulShutdownSupport gracefulShutdownSupport, + Channel channel, GracefulShutdownSupport gracefulShutdownSupport, @Nullable ServerHttpObjectEncoder responseEncoder, SessionProtocol protocol, @Nullable ProxiedAddresses proxiedAddresses) { @@ -213,6 +213,11 @@ static void safeClose(Channel ch) { assert protocol == H1 || protocol == H1C || protocol == H2; this.config = requireNonNull(config, "config"); + final ServerPortMetric serverPortMetric = channel.attr(SERVER_PORT_METRIC).get(); + assert serverPortMetric != null; + this.serverPortMetric = serverPortMetric; + remoteAddress = firstNonNull(ChannelUtil.remoteAddress(channel), UNKNOWN_ADDR); + localAddress = firstNonNull(ChannelUtil.localAddress(channel), UNKNOWN_ADDR); this.gracefulShutdownSupport = requireNonNull(gracefulShutdownSupport, "gracefulShutdownSupport"); this.protocol = requireNonNull(protocol, "protocol"); @@ -402,9 +407,7 @@ private void handleRequest(ChannelHandlerContext ctx, DecodedHttpRequest req) th final Channel channel = ctx.channel(); final RequestHeaders headers = req.headers(); - final InetSocketAddress remoteAddress = firstNonNull(remoteAddress(channel), UNKNOWN_ADDR); - final InetSocketAddress localAddress = firstNonNull(localAddress(channel), UNKNOWN_ADDR); - final ProxiedAddresses proxiedAddresses = determineProxiedAddresses(remoteAddress, headers); + final ProxiedAddresses proxiedAddresses = determineProxiedAddresses(headers); final InetAddress clientAddress = config.clientAddressMapper().apply(proxiedAddresses).getAddress(); final EventLoop channelEventLoop = channel.eventLoop(); @@ -412,8 +415,7 @@ private void handleRequest(ChannelHandlerContext ctx, DecodedHttpRequest req) th final RoutingStatus routingStatus = routingCtx.status(); if (!routingStatus.routeMustExist()) { final ServiceRequestContext reqCtx = newEarlyRespondingRequestContext( - channel, req, proxiedAddresses, clientAddress, remoteAddress, localAddress, routingCtx, - channelEventLoop); + channel, req, proxiedAddresses, clientAddress, routingCtx, channelEventLoop); // Handle 'OPTIONS * HTTP/1.1'. if (routingStatus == RoutingStatus.OPTIONS) { @@ -522,21 +524,21 @@ private void handleRequest(ChannelHandlerContext ctx, DecodedHttpRequest req) th private void decreasePendingRequests() { if (protocol.isExplicitHttp1()) { - config.serverMetrics().decreasePendingHttp1Requests(); + serverPortMetric.decreasePendingHttp1Requests(); } else { assert protocol.isExplicitHttp2(); - config.serverMetrics().decreasePendingHttp2Requests(); + serverPortMetric.decreasePendingHttp2Requests(); } } private void increaseActiveRequests(boolean isHttp1WebSocket) { if (isHttp1WebSocket) { - config.serverMetrics().increaseActiveHttp1WebSocketRequests(); + serverPortMetric.increaseActiveHttp1WebSocketRequests(); } else if (protocol.isExplicitHttp1()) { - config.serverMetrics().increaseActiveHttp1Requests(); + serverPortMetric.increaseActiveHttp1Requests(); } else { assert protocol.isExplicitHttp2(); - config.serverMetrics().increaseActiveHttp2Requests(); + serverPortMetric.increaseActiveHttp2Requests(); } } @@ -570,8 +572,7 @@ private HttpResponse serveInServiceEventLoop(DecodedHttpRequest req, .subscribeOn(serviceEventLoop); } - private ProxiedAddresses determineProxiedAddresses(InetSocketAddress remoteAddress, - RequestHeaders headers) { + private ProxiedAddresses determineProxiedAddresses(RequestHeaders headers) { if (config.clientAddressTrustedProxyFilter().test(remoteAddress.getAddress())) { return HttpHeaderUtil.determineProxiedAddresses( headers, config.clientAddressSources(), proxiedAddresses, @@ -581,30 +582,6 @@ private ProxiedAddresses determineProxiedAddresses(InetSocketAddress remoteAddre } } - @Nullable - private InetSocketAddress remoteAddress(Channel ch) { - final InetSocketAddress remoteAddress = this.remoteAddress; - if (remoteAddress != null) { - return remoteAddress; - } - - final InetSocketAddress newRemoteAddress = ChannelUtil.remoteAddress(ch); - this.remoteAddress = newRemoteAddress; - return newRemoteAddress; - } - - @Nullable - private InetSocketAddress localAddress(Channel ch) { - final InetSocketAddress localAddress = this.localAddress; - if (localAddress != null) { - return localAddress; - } - - final InetSocketAddress newLocalAddress = ChannelUtil.localAddress(ch); - this.localAddress = newLocalAddress; - return newLocalAddress; - } - private void handleOptions(ChannelHandlerContext ctx, ServiceRequestContext reqCtx) { respond(ctx, reqCtx, ResponseHeaders.builder(HttpStatus.OK) @@ -733,8 +710,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E private ServiceRequestContext newEarlyRespondingRequestContext(Channel channel, DecodedHttpRequest req, ProxiedAddresses proxiedAddresses, InetAddress clientAddress, - InetSocketAddress remoteAddress, - InetSocketAddress localAddress, RoutingContext routingCtx, EventLoop eventLoop) { final ServiceConfig serviceConfig = routingCtx.virtualHost().fallbackServiceConfig(); @@ -849,11 +824,11 @@ private void handleRequestOrResponseComplete() { return; } if (req.isHttp1WebSocket()) { - config.serverMetrics().decreaseActiveHttp1WebSocketRequests(); + serverPortMetric.decreaseActiveHttp1WebSocketRequests(); } else if (protocol.isExplicitHttp1()) { - config.serverMetrics().decreaseActiveHttp1Requests(); + serverPortMetric.decreaseActiveHttp1Requests(); } else if (protocol.isExplicitHttp2()) { - config.serverMetrics().decreaseActiveHttp2Requests(); + serverPortMetric.decreaseActiveHttp2Requests(); } // NB: logBuilder.endResponse() is called by HttpResponseSubscriber. diff --git a/core/src/main/java/com/linecorp/armeria/server/HttpServerPipelineConfigurator.java b/core/src/main/java/com/linecorp/armeria/server/HttpServerPipelineConfigurator.java index 284eff8ca1f..c5f38529ee5 100644 --- a/core/src/main/java/com/linecorp/armeria/server/HttpServerPipelineConfigurator.java +++ b/core/src/main/java/com/linecorp/armeria/server/HttpServerPipelineConfigurator.java @@ -216,7 +216,7 @@ private void configureHttp(ChannelPipeline p, @Nullable ProxiedAddresses proxied p.channel(), H1C, keepAliveHandler, config.http1HeaderNaming() ); p.addLast(TrafficLoggingHandler.SERVER); - final HttpServerHandler httpServerHandler = new HttpServerHandler(config, + final HttpServerHandler httpServerHandler = new HttpServerHandler(config, p.channel(), gracefulShutdownSupport, responseEncoder, H1C, proxiedAddresses); @@ -511,7 +511,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) thr private void addHttp2Handlers(ChannelHandlerContext ctx) { final ChannelPipeline p = ctx.pipeline(); p.addLast(newHttp2ConnectionHandler(p, SCHEME_HTTPS)); - p.addLast(new HttpServerHandler(config, + p.addLast(new HttpServerHandler(config, p.channel(), gracefulShutdownSupport, null, H2, proxiedAddresses)); } @@ -541,7 +541,7 @@ private void addHttpHandlers(ChannelHandlerContext ctx) { config.http1MaxInitialLineLength(), config.http1MaxHeaderSize(), config.http1MaxChunkSize())); - final HttpServerHandler httpServerHandler = new HttpServerHandler(config, + final HttpServerHandler httpServerHandler = new HttpServerHandler(config, ch, gracefulShutdownSupport, encoder, H1, proxiedAddresses); p.addLast(new Http1RequestDecoder(config, ch, SCHEME_HTTPS, encoder, httpServerHandler)); diff --git a/core/src/main/java/com/linecorp/armeria/server/Server.java b/core/src/main/java/com/linecorp/armeria/server/Server.java index a535c587a7f..6b8b0b5572c 100644 --- a/core/src/main/java/com/linecorp/armeria/server/Server.java +++ b/core/src/main/java/com/linecorp/armeria/server/Server.java @@ -18,6 +18,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.linecorp.armeria.server.ServerPortMetric.SERVER_PORT_METRIC; import static com.linecorp.armeria.server.ServerSslContextUtil.validateSslContext; import static java.util.Objects.requireNonNull; @@ -133,8 +134,7 @@ public static ServerBuilder builder() { serverConfig.setServer(this); config = new UpdatableServerConfig(requireNonNull(serverConfig, "serverConfig")); startStop = new ServerStartStopSupport(config.startStopExecutor()); - connectionLimitingHandler = new ConnectionLimitingHandler(config.maxNumConnections(), - config.serverMetrics()); + connectionLimitingHandler = new ConnectionLimitingHandler(config.maxNumConnections()); // Server-wide metrics. RequestTargetCache.registerServerMetrics(config.meterRegistry()); @@ -460,7 +460,14 @@ public void reconfigure(ServerConfigurator serverConfigurator) { requireNonNull(serverConfigurator, "serverConfigurator"); final ServerBuilder sb = builder(); serverConfigurator.reconfigure(sb); - final DefaultServerConfig newConfig = sb.buildServerConfig(config()); + final ImmutableList serverPorts; + lock.lock(); + try { + serverPorts = ImmutableList.copyOf(activePorts.values()); + } finally { + lock.unlock(); + } + final DefaultServerConfig newConfig = sb.buildServerConfig(serverPorts); newConfig.setServer(this); config.updateConfig(newConfig); // Invoke the serviceAdded() method in Service so that it can keep the reference to this Server or @@ -522,12 +529,12 @@ protected CompletionStage doStart(@Nullable Void arg) { try { doStart(primary).addListener(new ServerPortStartListener(primary)) .addListener(new NextServerPortStartListener(this, it, future)); - setupServerMetrics(); + // Chain the future to set up server metrics first before server start future is completed. + return future.thenAccept(unused -> setupPendingResponsesMetrics()); } catch (Throwable cause) { future.completeExceptionally(cause); + return future; } - - return future; } private ChannelFuture doStart(ServerPort port) { @@ -555,10 +562,31 @@ private ChannelFuture doStart(ServerPort port) { final GracefulShutdownSupport gracefulShutdownSupport = this.gracefulShutdownSupport; assert gracefulShutdownSupport != null; + ServerPortMetric serverPortMetric = null; + lock.lock(); + try { + for (ServerPort serverPort : activePorts.values()) { + final InetSocketAddress localAddress = serverPort.localAddress(); + if (!(localAddress instanceof DomainSocketAddress) && + localAddress.getPort() == port.localAddress().getPort()) { + // Because we use the port number for aggregating metrics, use the previous + // serverPortMetric. + serverPortMetric = serverPort.serverPortMetric(); + break; + } + } + } finally { + lock.unlock(); + } + if (serverPortMetric == null) { + serverPortMetric = new ServerPortMetric(); + } b.group(bossGroup, config.workerGroup()); - b.handler(connectionLimitingHandler); + b.handler(connectionLimitingHandler.newChildHandler(serverPortMetric)); b.childHandler(new HttpServerPipelineConfigurator(config, port, gracefulShutdownSupport, hasWebSocketService)); + b.attr(SERVER_PORT_METRIC, serverPortMetric); + b.childAttr(SERVER_PORT_METRIC, serverPortMetric); final SocketAddress localAddress; final Class channelType; @@ -582,14 +610,13 @@ private ChannelFuture doStart(ServerPort port) { return b.bind(localAddress); } - private void setupServerMetrics() { - final MeterRegistry meterRegistry = config.meterRegistry(); + private void setupPendingResponsesMetrics() { final GracefulShutdownSupport gracefulShutdownSupport = this.gracefulShutdownSupport; assert gracefulShutdownSupport != null; - meterRegistry.gauge("armeria.server.pending.responses", gracefulShutdownSupport, - GracefulShutdownSupport::pendingResponses); - config.serverMetrics().bindTo(meterRegistry); + // Move to ServerMetrics. + config.meterRegistry().gauge("armeria.server.pending.responses", gracefulShutdownSupport, + GracefulShutdownSupport::pendingResponses); } @Override @@ -828,6 +855,9 @@ public void operationComplete(ChannelFuture f) { logger.warn("Unexpected local address type: {}", localAddress.getClass().getName()); return; } + final ServerPortMetric serverPortMetric = ch.attr(SERVER_PORT_METRIC).get(); + assert serverPortMetric != null; + actualPort.setServerPortMetric(serverPortMetric); // Update the boss thread so its name contains the actual port. Thread.currentThread().setName(bossThreadName(actualPort)); @@ -840,6 +870,8 @@ public void operationComplete(ChannelFuture f) { lock.unlock(); } + config.serverMetrics().addServerPort(actualPort); + if (logger.isInfoEnabled()) { if (isLocalPort(actualPort)) { port.protocols().forEach(p -> logger.info( diff --git a/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java b/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java index 6366bb584f6..9e77ce63c84 100644 --- a/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java @@ -2287,11 +2287,7 @@ public Server build() { return server; } - DefaultServerConfig buildServerConfig(ServerConfig existingConfig) { - return buildServerConfig(existingConfig.ports()); - } - - private DefaultServerConfig buildServerConfig(List serverPorts) { + DefaultServerConfig buildServerConfig(List serverPorts) { final AnnotatedServiceExtensions extensions = virtualHostTemplate.annotatedServiceExtensions(); assert extensions != null; diff --git a/core/src/main/java/com/linecorp/armeria/server/ServerMetrics.java b/core/src/main/java/com/linecorp/armeria/server/ServerMetrics.java index 9a57bf65888..5d45f9e6114 100644 --- a/core/src/main/java/com/linecorp/armeria/server/ServerMetrics.java +++ b/core/src/main/java/com/linecorp/armeria/server/ServerMetrics.java @@ -16,36 +16,39 @@ package com.linecorp.armeria.server; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.LongAdder; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; import com.linecorp.armeria.common.annotation.UnstableApi; import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.binder.MeterBinder; /** * A class that holds metrics related server. */ @UnstableApi -public final class ServerMetrics implements MeterBinder { +public final class ServerMetrics { - private final LongAdder pendingHttp1Requests = new LongAdder(); - private final LongAdder pendingHttp2Requests = new LongAdder(); - private final LongAdder activeHttp1WebSocketRequests = new LongAdder(); - private final LongAdder activeHttp1Requests = new LongAdder(); - private final LongAdder activeHttp2Requests = new LongAdder(); + static final String ALL_REQUESTS_METER_NAME = "armeria.server.all.requests"; + static final String ALL_CONNECTIONS_METER_NAME = "armeria.server.connections"; - /** - * AtomicInteger is used to read the number of active connections frequently. - */ - private final AtomicInteger activeConnections = new AtomicInteger(); + private final Set serverPortMetrics = new CopyOnWriteArraySet<>(); + private final MeterRegistry meterRegistry; + + ServerMetrics(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + } - ServerMetrics() {} + void addServerPort(ServerPort serverPort) { + final ServerPortMetric serverPortMetric = serverPort.serverPortMetric(); + assert serverPortMetric != null; + if (serverPortMetrics.add(serverPortMetric)) { + serverPortMetric.bindTo(meterRegistry, serverPort); + } + } /** * Returns the number of all pending requests. @@ -58,14 +61,14 @@ public long pendingRequests() { * Returns the number of pending http1 requests. */ public long pendingHttp1Requests() { - return pendingHttp1Requests.longValue(); + return serverPortMetrics.stream().mapToLong(ServerPortMetric::pendingHttp1Requests).sum(); } /** * Returns the number of pending http2 requests. */ public long pendingHttp2Requests() { - return pendingHttp2Requests.longValue(); + return serverPortMetrics.stream().mapToLong(ServerPortMetric::pendingHttp2Requests).sum(); } /** @@ -81,110 +84,35 @@ public long activeRequests() { * Returns the number of active http1 web socket requests. */ public long activeHttp1WebSocketRequests() { - return activeHttp1WebSocketRequests.longValue(); + return serverPortMetrics.stream().mapToLong(ServerPortMetric::activeHttp1WebSocketRequests).sum(); } /** * Returns the number of active http1 requests. */ public long activeHttp1Requests() { - return activeHttp1Requests.longValue(); + return serverPortMetrics.stream().mapToLong(ServerPortMetric::activeHttp1Requests).sum(); } /** * Returns the number of active http2 requests. */ public long activeHttp2Requests() { - return activeHttp2Requests.longValue(); + return serverPortMetrics.stream().mapToLong(ServerPortMetric::activeHttp2Requests).sum(); } /** * Returns the number of open connections. */ public int activeConnections() { - return activeConnections.get(); - } - - void increasePendingHttp1Requests() { - pendingHttp1Requests.increment(); - } - - void decreasePendingHttp1Requests() { - pendingHttp1Requests.decrement(); - } - - void increasePendingHttp2Requests() { - pendingHttp2Requests.increment(); - } - - void decreasePendingHttp2Requests() { - pendingHttp2Requests.decrement(); - } - - void increaseActiveHttp1Requests() { - activeHttp1Requests.increment(); - } - - void decreaseActiveHttp1Requests() { - activeHttp1Requests.decrement(); - } - - void increaseActiveHttp1WebSocketRequests() { - activeHttp1WebSocketRequests.increment(); - } - - void decreaseActiveHttp1WebSocketRequests() { - activeHttp1WebSocketRequests.decrement(); - } - - void increaseActiveHttp2Requests() { - activeHttp2Requests.increment(); - } - - void decreaseActiveHttp2Requests() { - activeHttp2Requests.decrement(); - } - - int increaseActiveConnectionsAndGet() { - return activeConnections.incrementAndGet(); - } - - void decreaseActiveConnections() { - activeConnections.decrementAndGet(); - } - - @Override - public void bindTo(MeterRegistry meterRegistry) { - meterRegistry.gauge("armeria.server.connections", activeConnections); - // pending requests - final String allRequestsMeterName = "armeria.server.all.requests"; - meterRegistry.gauge(allRequestsMeterName, - ImmutableList.of(Tag.of("protocol", "http1"), Tag.of("state", "pending")), - pendingHttp1Requests); - meterRegistry.gauge(allRequestsMeterName, - ImmutableList.of(Tag.of("protocol", "http2"), Tag.of("state", "pending")), - pendingHttp2Requests); - // Active requests - meterRegistry.gauge(allRequestsMeterName, - ImmutableList.of(Tag.of("protocol", "http1"), Tag.of("state", "active")), - activeHttp1Requests); - meterRegistry.gauge(allRequestsMeterName, - ImmutableList.of(Tag.of("protocol", "http2"), Tag.of("state", "active")), - activeHttp2Requests); - meterRegistry.gauge(allRequestsMeterName, - ImmutableList.of(Tag.of("protocol", "http1.websocket"), Tag.of("state", "active")), - activeHttp1WebSocketRequests); + return Ints.saturatedCast(serverPortMetrics.stream().mapToLong(ServerPortMetric::activeConnections) + .sum()); } @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("pendingHttp1Requests", pendingHttp1Requests) - .add("activeHttp1WebSocketRequests", activeHttp1WebSocketRequests) - .add("activeHttp1Requests", activeHttp1Requests) - .add("pendingHttp2Requests", pendingHttp2Requests) - .add("activeHttp2Requests", activeHttp2Requests) - .add("activeConnections", activeConnections) + .add("serverPortMetrics", serverPortMetrics) .toString(); } } diff --git a/core/src/main/java/com/linecorp/armeria/server/ServerPort.java b/core/src/main/java/com/linecorp/armeria/server/ServerPort.java index f6da639aa7a..ff1580bf5be 100644 --- a/core/src/main/java/com/linecorp/armeria/server/ServerPort.java +++ b/core/src/main/java/com/linecorp/armeria/server/ServerPort.java @@ -66,6 +66,8 @@ static long nextPortGroup() { private final Set protocols; private final long portGroup; private int hashCode; + @Nullable + private ServerPortMetric serverPortMetric; @Nullable private String strVal; @@ -226,6 +228,16 @@ long portGroup() { return portGroup; } + @Nullable + ServerPortMetric serverPortMetric() { + return serverPortMetric; + } + + void setServerPortMetric(ServerPortMetric serverPortMetric) { + this.serverPortMetric = serverPortMetric; + } + + // Do not take account into serverPortMetric for equality and hashCode. @Override public int hashCode() { int hashCode = this.hashCode; diff --git a/core/src/main/java/com/linecorp/armeria/server/ServerPortMetric.java b/core/src/main/java/com/linecorp/armeria/server/ServerPortMetric.java new file mode 100644 index 00000000000..5d34f3115b7 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/server/ServerPortMetric.java @@ -0,0 +1,165 @@ +/* + * Copyright 2025 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.server; + +import static com.linecorp.armeria.server.ServerMetrics.ALL_CONNECTIONS_METER_NAME; +import static com.linecorp.armeria.server.ServerMetrics.ALL_REQUESTS_METER_NAME; + +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.LongAdder; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; + +import com.linecorp.armeria.common.util.DomainSocketAddress; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.netty.util.AttributeKey; + +final class ServerPortMetric { + + static final AttributeKey SERVER_PORT_METRIC = + AttributeKey.valueOf(ServerPortMetric.class, "SERVER_PORT_METRIC"); + + private final LongAdder pendingHttp1Requests = new LongAdder(); + private final LongAdder pendingHttp2Requests = new LongAdder(); + private final LongAdder activeHttp1WebSocketRequests = new LongAdder(); + private final LongAdder activeHttp1Requests = new LongAdder(); + private final LongAdder activeHttp2Requests = new LongAdder(); + private final LongAdder activeConnections = new LongAdder(); + + void increasePendingHttp1Requests() { + pendingHttp1Requests.increment(); + } + + void decreasePendingHttp1Requests() { + pendingHttp1Requests.decrement(); + } + + void increasePendingHttp2Requests() { + pendingHttp2Requests.increment(); + } + + void decreasePendingHttp2Requests() { + pendingHttp2Requests.decrement(); + } + + void increaseActiveHttp1WebSocketRequests() { + activeHttp1WebSocketRequests.increment(); + } + + void decreaseActiveHttp1WebSocketRequests() { + activeHttp1WebSocketRequests.decrement(); + } + + void increaseActiveHttp1Requests() { + activeHttp1Requests.increment(); + } + + void decreaseActiveHttp1Requests() { + activeHttp1Requests.decrement(); + } + + void increaseActiveHttp2Requests() { + activeHttp2Requests.increment(); + } + + void decreaseActiveHttp2Requests() { + activeHttp2Requests.decrement(); + } + + void increaseActiveConnections() { + activeConnections.increment(); + } + + void decreaseActiveConnections() { + activeConnections.decrement(); + } + + long pendingHttp1Requests() { + return pendingHttp1Requests.sum(); + } + + long pendingHttp2Requests() { + return pendingHttp2Requests.sum(); + } + + long activeHttp1WebSocketRequests() { + return activeHttp1WebSocketRequests.sum(); + } + + long activeHttp1Requests() { + return activeHttp1Requests.sum(); + } + + long activeHttp2Requests() { + return activeHttp2Requests.sum(); + } + + long activeConnections() { + return activeConnections.sum(); + } + + void bindTo(MeterRegistry meterRegistry, ServerPort actualPort) { + final InetSocketAddress address = actualPort.localAddress(); + final Tag portTag; + if (address instanceof DomainSocketAddress) { + final String path = ((DomainSocketAddress) address).path(); + // The path is used as the 'port' tag. + portTag = Tag.of("port", path); + } else { + portTag = Tag.of("port", String.valueOf(address.getPort())); + } + + meterRegistry.gauge(ALL_REQUESTS_METER_NAME, + ImmutableList.of(portTag, Tag.of("protocol", "http1"), + Tag.of("state", "pending")), + this, ServerPortMetric::pendingHttp1Requests); + meterRegistry.gauge(ALL_REQUESTS_METER_NAME, + ImmutableList.of(portTag, Tag.of("protocol", "http2"), + Tag.of("state", "pending")), + this, ServerPortMetric::pendingHttp2Requests); + meterRegistry.gauge(ALL_REQUESTS_METER_NAME, + ImmutableList.of(portTag, Tag.of("protocol", "http1"), + Tag.of("state", "active")), + this, ServerPortMetric::activeHttp1Requests); + meterRegistry.gauge(ALL_REQUESTS_METER_NAME, + ImmutableList.of(portTag, Tag.of("protocol", "http2"), + Tag.of("state", "active")), + this, ServerPortMetric::activeHttp2Requests); + meterRegistry.gauge(ALL_REQUESTS_METER_NAME, + ImmutableList.of(portTag, Tag.of("protocol", "http1"), + Tag.of("state", "active_websocket")), + this, ServerPortMetric::activeHttp1WebSocketRequests); + meterRegistry.gauge(ALL_CONNECTIONS_METER_NAME, ImmutableList.of(portTag), + this, ServerPortMetric::activeConnections); + } + + // Use reference equality for comparison. + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("pendingHttp1Requests", pendingHttp1Requests) + .add("pendingHttp2Requests", pendingHttp2Requests) + .add("activeHttp1WebSocketRequests", activeHttp1WebSocketRequests) + .add("activeHttp1Requests", activeHttp1Requests) + .add("activeHttp2Requests", activeHttp2Requests) + .add("activeConnections", activeConnections) + .toString(); + } +} diff --git a/core/src/test/java/com/linecorp/armeria/server/ConnectionLimitingHandlerTest.java b/core/src/test/java/com/linecorp/armeria/server/ConnectionLimitingHandlerTest.java index c814a955770..9a5e1b9c7e5 100644 --- a/core/src/test/java/com/linecorp/armeria/server/ConnectionLimitingHandlerTest.java +++ b/core/src/test/java/com/linecorp/armeria/server/ConnectionLimitingHandlerTest.java @@ -20,22 +20,24 @@ import org.junit.jupiter.api.Test; +import io.netty.channel.ChannelHandler; import io.netty.channel.embedded.EmbeddedChannel; class ConnectionLimitingHandlerTest { @Test void testExceedMaxNumConnections() { - final ServerMetrics serverMetrics = new ServerMetrics(); - final ConnectionLimitingHandler handler = - new ConnectionLimitingHandler(1, serverMetrics); + final ServerPortMetric serverPortMetric = new ServerPortMetric(); + final ConnectionLimitingHandler handler = new ConnectionLimitingHandler(1); - final EmbeddedChannel ch1 = new EmbeddedChannel(handler); + final ChannelHandler channelHandler = handler.newChildHandler(serverPortMetric); + + final EmbeddedChannel ch1 = new EmbeddedChannel(channelHandler); ch1.writeInbound(ch1); assertThat(handler.numConnections()).isEqualTo(1); assertThat(ch1.isActive()).isTrue(); - final EmbeddedChannel ch2 = new EmbeddedChannel(handler); + final EmbeddedChannel ch2 = new EmbeddedChannel(channelHandler); ch2.writeInbound(ch2); assertThat(handler.numConnections()).isEqualTo(1); assertThat(ch2.isActive()).isFalse(); @@ -46,15 +48,13 @@ void testExceedMaxNumConnections() { @Test void testMaxNumConnectionsRange() { - final ServerMetrics serverMetrics = new ServerMetrics(); - final ConnectionLimitingHandler handler = new ConnectionLimitingHandler(Integer.MAX_VALUE, - serverMetrics); + final ConnectionLimitingHandler handler = new ConnectionLimitingHandler(Integer.MAX_VALUE); assertThat(handler.maxNumConnections()).isEqualTo(Integer.MAX_VALUE); - assertThatThrownBy(() -> new ConnectionLimitingHandler(0, serverMetrics)) + assertThatThrownBy(() -> new ConnectionLimitingHandler(0)) .isInstanceOf(IllegalArgumentException.class); - assertThatThrownBy(() -> new ConnectionLimitingHandler(-1, serverMetrics)) + assertThatThrownBy(() -> new ConnectionLimitingHandler(-1)) .isInstanceOf(IllegalArgumentException.class); } } diff --git a/core/src/test/java/com/linecorp/armeria/server/ServerMetricsTest.java b/core/src/test/java/com/linecorp/armeria/server/ServerMetricsTest.java index 43106ff8cf3..e3e36a625fe 100644 --- a/core/src/test/java/com/linecorp/armeria/server/ServerMetricsTest.java +++ b/core/src/test/java/com/linecorp/armeria/server/ServerMetricsTest.java @@ -23,7 +23,6 @@ import java.util.concurrent.CompletableFuture; import org.assertj.core.api.Condition; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -137,46 +136,6 @@ public ExchangeType exchangeType(RoutingContext routingContext) { } }; - @Test - void pendingRequests() { - final ServerMetrics serverMetrics = new ServerMetrics(); - - serverMetrics.increasePendingHttp1Requests(); - assertThat(serverMetrics.pendingRequests()).isEqualTo(1); - - serverMetrics.increasePendingHttp2Requests(); - assertThat(serverMetrics.pendingRequests()).isEqualTo(2); - - serverMetrics.decreasePendingHttp1Requests(); - assertThat(serverMetrics.pendingRequests()).isEqualTo(1); - - serverMetrics.decreasePendingHttp2Requests(); - assertThat(serverMetrics.pendingRequests()).isZero(); - } - - @Test - void activeRequests() { - final ServerMetrics serverMetrics = new ServerMetrics(); - - serverMetrics.increaseActiveHttp1Requests(); - assertThat(serverMetrics.activeRequests()).isEqualTo(1); - - serverMetrics.increaseActiveHttp1WebSocketRequests(); - assertThat(serverMetrics.activeRequests()).isEqualTo(2); - - serverMetrics.increaseActiveHttp2Requests(); - assertThat(serverMetrics.activeRequests()).isEqualTo(3); - - serverMetrics.decreaseActiveHttp1WebSocketRequests(); - assertThat(serverMetrics.activeRequests()).isEqualTo(2); - - serverMetrics.decreaseActiveHttp1Requests(); - assertThat(serverMetrics.activeRequests()).isEqualTo(1); - - serverMetrics.decreaseActiveHttp2Requests(); - assertThat(serverMetrics.activeRequests()).isZero(); - } - @CsvSource({ "H1C, 1, 0", "H2C, 0, 1" }) @ParameterizedTest void checkWhenOk(SessionProtocol sessionProtocol, long expectedPendingHttp1Request, @@ -296,10 +255,10 @@ public boolean matches(String key) { final String protocolName = protocol == SessionProtocol.H1C ? "http1" : "http2"; // armeria.server.active.requests.all#value is measured by ServerMetrics - assertThat(meters).containsKey("armeria.server.all.requests#value{protocol=" + protocolName + - ",state=active}"); - assertThat(meters).containsKey("armeria.server.all.requests#value{protocol=" + protocolName + - ",state=pending}"); + assertThat(meters).containsKey("armeria.server.all.requests#value{port=" + server.httpPort() + + ",protocol=" + protocolName + ",state=active}"); + assertThat(meters).containsKey("armeria.server.all.requests#value{port=" + server.httpPort() + + ",protocol=" + protocolName + ",state=pending}"); }); } }