From 55628bf4b684f41c925deca39e181ff277e0e08d Mon Sep 17 00:00:00 2001 From: emeroad Date: Fri, 16 Aug 2024 14:36:59 +0900 Subject: [PATCH] [#noissue] isActiveAgent --- .../grpc/config/GrpcAgentConfiguration.java | 12 +-- .../receiver/grpc/service/AgentService.java | 56 ++++++----- .../receiver/grpc/AgentServerTestMain.java | 4 +- .../pinpoint/grpc/server/AgentKey.java | 0 .../lifecycle/DefaultPingEventHandler.java | 93 +++++++++---------- .../DefaultPingEventHandlerFactory.java | 43 +++++++++ .../server/lifecycle/PingEventHandler.java | 5 +- .../lifecycle/PingEventHandlerFactory.java | 0 .../web/service/AgentEventServiceImpl.java | 4 + .../web/service/AgentInfoServiceImpl.java | 4 +- 10 files changed, 139 insertions(+), 82 deletions(-) create mode 100644 grpc/src/main/java/com/navercorp/pinpoint/grpc/server/AgentKey.java create mode 100644 grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandlerFactory.java create mode 100644 grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/PingEventHandlerFactory.java diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcAgentConfiguration.java b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcAgentConfiguration.java index c75a28f43b08..886ff4a9c9c5 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcAgentConfiguration.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/grpc/config/GrpcAgentConfiguration.java @@ -39,9 +39,9 @@ import com.navercorp.pinpoint.collector.service.async.AgentLifeCycleAsyncTaskService; import com.navercorp.pinpoint.common.server.util.AcceptedTimeService; import com.navercorp.pinpoint.common.server.util.IgnoreAddressFilter; -import com.navercorp.pinpoint.grpc.server.lifecycle.DefaultPingEventHandler; +import com.navercorp.pinpoint.grpc.server.lifecycle.DefaultPingEventHandlerFactory; import com.navercorp.pinpoint.grpc.server.lifecycle.DefaultPingSessionRegistry; -import com.navercorp.pinpoint.grpc.server.lifecycle.PingEventHandler; +import com.navercorp.pinpoint.grpc.server.lifecycle.PingEventHandlerFactory; import com.navercorp.pinpoint.grpc.server.lifecycle.PingSessionRegistry; import com.navercorp.pinpoint.rpc.server.ChannelPropertiesFactory; import com.navercorp.pinpoint.rpc.server.handler.ServerStateChangeEventHandler; @@ -74,7 +74,7 @@ public GrpcAgentConfiguration() { @Bean public AgentService agentService(@Qualifier("grpcDispatchHandlerFactoryBean") DispatchHandler dispatchHandler, - PingEventHandler pingEventHandler, + PingEventHandlerFactory pingEventHandler, @Qualifier("grpcAgentWorkerExecutor") Executor executor, ServerRequestFactory serverRequestFactory) { @@ -192,9 +192,9 @@ public KeepAliveService keepAliveService(AgentEventAsyncTaskService agentEventAs } @Bean - public PingEventHandler pingEventHandler(PingSessionRegistry pingSessionRegistry, - AgentLifecycleListener agentLifecycleListener) { - return new DefaultPingEventHandler(pingSessionRegistry, agentLifecycleListener); + public PingEventHandlerFactory pingEventHandler(PingSessionRegistry pingSessionRegistry, + AgentLifecycleListener agentLifecycleListener) { + return new DefaultPingEventHandlerFactory(pingSessionRegistry, agentLifecycleListener); } @Bean diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/AgentService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/AgentService.java index 9741fba91a07..0bb3b8b7f416 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/AgentService.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/AgentService.java @@ -20,8 +20,11 @@ import com.navercorp.pinpoint.collector.receiver.DispatchHandler; import com.navercorp.pinpoint.common.profiler.logging.ThrottledLogger; import com.navercorp.pinpoint.grpc.MessageFormatUtils; +import com.navercorp.pinpoint.grpc.server.AgentKey; import com.navercorp.pinpoint.grpc.server.ServerContext; +import com.navercorp.pinpoint.grpc.server.TransportMetadata; import com.navercorp.pinpoint.grpc.server.lifecycle.PingEventHandler; +import com.navercorp.pinpoint.grpc.server.lifecycle.PingEventHandlerFactory; import com.navercorp.pinpoint.grpc.trace.AgentGrpc; import com.navercorp.pinpoint.grpc.trace.PAgentInfo; import com.navercorp.pinpoint.grpc.trace.PPing; @@ -44,24 +47,25 @@ import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** * @author jaehong.kim */ public class AgentService extends AgentGrpc.AgentImplBase { - private static final AtomicLong idAllocator = new AtomicLong(); private final Logger logger = LogManager.getLogger(this.getClass()); private final boolean isDebug = logger.isDebugEnabled(); + + private final AtomicLong idAllocator = new AtomicLong(); + private final SimpleRequestHandlerAdaptor simpleRequestHandlerAdaptor; - private final PingEventHandler pingEventHandler; + private final PingEventHandlerFactory pingEventHandlerProvider; private final Executor executor; public AgentService(DispatchHandler dispatchHandler, - PingEventHandler pingEventHandler, Executor executor, ServerRequestFactory serverRequestFactory) { + PingEventHandlerFactory pingEventHandlerProvider, Executor executor, ServerRequestFactory serverRequestFactory) { this.simpleRequestHandlerAdaptor = new SimpleRequestHandlerAdaptor<>(this.getClass().getName(), dispatchHandler, serverRequestFactory); - this.pingEventHandler = Objects.requireNonNull(pingEventHandler, "pingEventHandler"); + this.pingEventHandlerProvider = Objects.requireNonNull(pingEventHandlerProvider, "pingEventHandlerProvider"); Objects.requireNonNull(executor, "executor"); this.executor = Context.currentContextExecutor(executor); } @@ -72,14 +76,26 @@ public void requestAgentInfo(PAgentInfo agentInfo, StreamObserver respo logger.debug("Request PAgentInfo={}", MessageFormatUtils.debugLog(agentInfo)); } + + final com.navercorp.pinpoint.grpc.Header header = ServerContext.getAgentInfo(); + final boolean legacyAgent = !header.isServiceTypeSupported(); + if (legacyAgent) { + final TransportMetadata transportMetadata = ServerContext.getTransportMetadata(); + AgentKey key = new AgentKey(header.getAgentId(), header.getAgentStartTime()); + transportMetadata.registerServiceType(key, (short)agentInfo.getServiceType()); + } + + PingEventHandler pingEventHandler = pingEventHandlerProvider.createPingEventHandler(); try { executor.execute(new Runnable() { @Override public void run() { final Message message = newMessage(agentInfo, DefaultTBaseLocator.AGENT_INFO); simpleRequestHandlerAdaptor.request(message, responseObserver); - // Update service type of PingSession - AgentService.this.pingEventHandler.update((short) agentInfo.getServiceType()); + if (legacyAgent) { + // Update service type of PingSession + pingEventHandler.update(); + } } }); } catch (RejectedExecutionException ree) { @@ -92,30 +108,26 @@ public void run() { @Override public StreamObserver pingSession(final StreamObserver response) { final ServerCallStreamObserver responseObserver = (ServerCallStreamObserver) response; + + final PingEventHandler pingEventHandler = pingEventHandlerProvider.createPingEventHandler(); + return new StreamObserver<>() { - private final AtomicBoolean first = new AtomicBoolean(false); private final ThrottledLogger thLogger = ThrottledLogger.getLogger(AgentService.this.logger, 100); private final long id = nextSessionId(); + @Override public void onNext(PPing ping) { - if (first.compareAndSet(false, true)) { - // Only first - if (isDebug) { - thLogger.debug("PingSession:{} start:{}", id, MessageFormatUtils.debugLog(ping)); - } - AgentService.this.pingEventHandler.connect(); - } else { - AgentService.this.pingEventHandler.ping(); - } if (isDebug) { - thLogger.debug("PingSession:{} onNext:{}", id, MessageFormatUtils.debugLog(ping)); + thLogger.debug("PingSession:{} onNext:Ping", id); } + pingEventHandler.ping(); + if (responseObserver.isReady()) { PPing replay = newPing(); responseObserver.onNext(replay); } else { - thLogger.warn("ping message is ignored: stream is not ready: {}", ServerContext.getAgentInfo()); + thLogger.warn("Ping message is ignored: stream is not ready: {}", ServerContext.getAgentInfo()); } } @@ -136,15 +148,15 @@ public void onError(Throwable t) { @Override public void onCompleted() { - if (isDebug) { - thLogger.debug("PingSession:{} onCompleted()", id); + if (logger.isDebugEnabled()) { + logger.debug("PingSession:{} onCompleted()", id); } responseObserver.onCompleted(); disconnect(); } private void disconnect() { - AgentService.this.pingEventHandler.close(); + pingEventHandler.close(); } }; diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentServerTestMain.java b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentServerTestMain.java index 127721e0b3a3..91a8060611d7 100644 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentServerTestMain.java +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentServerTestMain.java @@ -26,6 +26,7 @@ import com.navercorp.pinpoint.common.server.util.AddressFilter; import com.navercorp.pinpoint.grpc.server.ServerOption; import com.navercorp.pinpoint.grpc.server.lifecycle.PingEventHandler; +import com.navercorp.pinpoint.grpc.server.lifecycle.PingEventHandlerFactory; import com.navercorp.pinpoint.grpc.trace.PApiMetaData; import com.navercorp.pinpoint.grpc.trace.PResult; import com.navercorp.pinpoint.io.request.ServerRequest; @@ -60,7 +61,8 @@ public void run() throws Exception { grpcReceiver.setBindAddress(builder.build()); PingEventHandler pingEventHandler = mock(PingEventHandler.class); - BindableService agentService = new AgentService(new MockDispatchHandler(), pingEventHandler, Executors.newFixedThreadPool(8), serverRequestFactory); + PingEventHandlerFactory pingEventHandlerFactory = () -> pingEventHandler; + BindableService agentService = new AgentService(new MockDispatchHandler(), pingEventHandlerFactory, Executors.newFixedThreadPool(8), serverRequestFactory); MetadataService metadataService = new MetadataService(new MockDispatchHandler(), Executors.newFixedThreadPool(8), serverRequestFactory); List serviceList = List.of(agentService.bindService(), metadataService.bindService()); diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/AgentKey.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/AgentKey.java new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java index 72028a0776c0..e8b5363cada6 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandler.java @@ -17,12 +17,11 @@ package com.navercorp.pinpoint.grpc.server.lifecycle; import com.navercorp.pinpoint.grpc.Header; -import com.navercorp.pinpoint.grpc.server.ServerContext; -import com.navercorp.pinpoint.grpc.server.TransportMetadata; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Objects; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /** * @author Woonduk Kang(emeroad) @@ -31,27 +30,46 @@ public class DefaultPingEventHandler implements PingEventHandler { private static final long PING_MIN_TIME_MILLIS = 60 * 1000; // 1min + private static final AtomicIntegerFieldUpdater UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultPingEventHandler.class, "connect"); + private static final int INIT = 0; + private static final int CONNECTED = 1; + private final Logger logger = LogManager.getLogger(this.getClass()); + private final PingSessionRegistry pingSessionRegistry; private final LifecycleListener lifecycleListener; - public DefaultPingEventHandler(PingSessionRegistry pingSessionRegistry, LifecycleListener lifecycleListener) { + private volatile int connect = INIT; + + private final PingSession pingSession; + + + public DefaultPingEventHandler(PingSessionRegistry pingSessionRegistry, LifecycleListener lifecycleListener, + long transportId, Header header) { this.pingSessionRegistry = Objects.requireNonNull(pingSessionRegistry, "pingSessionRegistry"); this.lifecycleListener = Objects.requireNonNull(lifecycleListener, "lifecycleListener"); + + this.pingSession = PingSession.of(transportId, header); + pingSession.setLastPingTimeMillis(getCurrentPingTimeMillis()); } - @Override - public void connect() { - final TransportMetadata transportMetadata = ServerContext.getTransportMetadata(); - if (transportMetadata == null) { - logger.info("Skip connect event handle of ping, not found TransportMetadata. header={}", ServerContext.getAgentInfo()); - return; + public PingSession getPingSession() { + return pingSession; + } + + public void ping() { + if (UPDATER.compareAndSet(this, INIT, CONNECTED)) { + connect0(); + } else { + ping0(); } + } + + public void connect0() { + logger.debug("connect {}", pingSession); + pingSession.setLastPingTimeMillis(getCurrentPingTimeMillis()); + - final Long transportId = transportMetadata.getTransportId(); - final Header header = ServerContext.getAgentInfo(); - final PingSession pingSession = PingSession.of(transportId, header); - pingSession.setLastPingTimeMillis(System.currentTimeMillis()); final PingSession oldSession = pingSessionRegistry.add(pingSession.getId(), pingSession); if (oldSession != null) { logger.warn("Duplicated ping session old={}, new={}", oldSession, pingSession); @@ -59,21 +77,15 @@ public void connect() { lifecycleListener.connect(pingSession); } - @Override - public void ping() { - final TransportMetadata transportMetadata = ServerContext.getTransportMetadata(); - if (transportMetadata == null) { - logger.info("Skip ping event handle of ping, not found TransportMetadata. header={}", ServerContext.getAgentInfo()); - return; - } + long getCurrentPingTimeMillis() { + return System.currentTimeMillis(); + } + + + public void ping0() { - final PingSession pingSession = pingSessionRegistry.get(transportMetadata.getTransportId()); - if (pingSession == null) { - logger.info("Skip ping event handle of ping, not found ping session. transportMetadata={}", transportMetadata); - return; - } // Avoid too frequent updates. - final long currentTimeMillis = System.currentTimeMillis(); + final long currentTimeMillis = getCurrentPingTimeMillis(); if (PING_MIN_TIME_MILLIS < (currentTimeMillis - pingSession.getLastPingTimeMillis())) { return; } @@ -84,36 +96,17 @@ public void ping() { @Override public void close() { - final TransportMetadata transportMetadata = ServerContext.getTransportMetadata(); - if (transportMetadata == null) { - logger.info("Skip close event handle of ping, not found TransportMetadata. header={}", ServerContext.getAgentInfo()); - return; - } - final PingSession removedSession = pingSessionRegistry.remove(transportMetadata.getTransportId()); - if (removedSession == null) { - return; - } + pingSessionRegistry.remove(pingSession.getId()); + if (logger.isDebugEnabled()) { - logger.debug("Remove ping session. pingSession={}", removedSession); + logger.debug("Remove ping session. pingSession={}", pingSession); } - lifecycleListener.close(removedSession); + lifecycleListener.close(pingSession); } @Override - public void update(final short serviceType) { - final TransportMetadata transportMetadata = ServerContext.getTransportMetadata(); - if (transportMetadata == null) { - logger.info("Skip update event handle of ping, not found TransportMetadata. header={}", ServerContext.getAgentInfo()); - return; - } - - final PingSession pingSession = pingSessionRegistry.get(transportMetadata.getTransportId()); - if (pingSession == null) { - logger.info("Skip update event handle of ping, not found ping session. transportMetadata={}", transportMetadata); - return; - } - pingSession.setServiceType(serviceType); + public void update() { if (logger.isDebugEnabled()) { logger.debug("Update ping session. PingSession={}", pingSession); } diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandlerFactory.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandlerFactory.java new file mode 100644 index 000000000000..f86ff9115c3d --- /dev/null +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/DefaultPingEventHandlerFactory.java @@ -0,0 +1,43 @@ +/* + * Copyright 2019 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.grpc.server.lifecycle; + +import com.navercorp.pinpoint.grpc.server.ServerContext; +import com.navercorp.pinpoint.grpc.server.TransportMetadata; + +import java.util.Objects; + +/** + * @author Woonduk Kang(emeroad) + * @author jaehong.kim + */ +public class DefaultPingEventHandlerFactory implements PingEventHandlerFactory { + private final PingSessionRegistry pingSessionRegistry; + private final LifecycleListener lifecycleListener; + + public DefaultPingEventHandlerFactory(PingSessionRegistry pingSessionRegistry, LifecycleListener lifecycleListener) { + this.pingSessionRegistry = Objects.requireNonNull(pingSessionRegistry, "pingSessionRegistry"); + this.lifecycleListener = Objects.requireNonNull(lifecycleListener, "lifecycleListener"); + } + + @Override + public PingEventHandler createPingEventHandler() { + final TransportMetadata transportMetadata = ServerContext.getTransportMetadata(); + final com.navercorp.pinpoint.grpc.Header header = ServerContext.getAgentInfo(); + return new DefaultPingEventHandler(pingSessionRegistry, lifecycleListener, transportMetadata.getTransportId(), header); + } +} diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/PingEventHandler.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/PingEventHandler.java index db525cc0af73..8814a2d28d47 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/PingEventHandler.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/PingEventHandler.java @@ -21,11 +21,12 @@ * @author jaehong.kim */ public interface PingEventHandler { - void connect(); + + PingSession getPingSession(); void ping(); void close(); - void update(short serviceType); + void update(); } diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/PingEventHandlerFactory.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/lifecycle/PingEventHandlerFactory.java new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/web/src/main/java/com/navercorp/pinpoint/web/service/AgentEventServiceImpl.java b/web/src/main/java/com/navercorp/pinpoint/web/service/AgentEventServiceImpl.java index 8bad21191af3..49887b50402f 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/service/AgentEventServiceImpl.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/service/AgentEventServiceImpl.java @@ -73,6 +73,10 @@ public List getAgentEvents(String agentId, Range range, Set agentEventBos = this.agentEventDao.getAgentEvents(agentId, range, excludeEventTypeCodes); List agentEvents = createAgentEvents(agentEventBos); agentEvents.sort(AgentEvent.EVENT_TIMESTAMP_ASC_COMPARATOR); + + if (!agentEvents.isEmpty() ) { + logger.info("--------------agentEvents: {} {}", agentEvents.size(), agentId); + } return agentEvents; } diff --git a/web/src/main/java/com/navercorp/pinpoint/web/service/AgentInfoServiceImpl.java b/web/src/main/java/com/navercorp/pinpoint/web/service/AgentInfoServiceImpl.java index 3365efddb52c..61e98e77ea6d 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/service/AgentInfoServiceImpl.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/service/AgentInfoServiceImpl.java @@ -172,7 +172,7 @@ public AgentsMapByHost getAgentsListByApplicationName(AgentStatusFilter agentSta final int totalAgentCount = agentsMapByHost.size(); if (logger.isInfoEnabled()) { - logger.info("getAgentsMapByHostname size:{}", totalAgentCount); + logger.info("getAgentsMapByHostname filter:{}-->{}", agentInfoAndStatuses.size(), totalAgentCount); } if (logger.isDebugEnabled()) { logger.debug("getAgentsMapByHostname size:{} data:{}", totalAgentCount, agentsMapByHost); @@ -187,6 +187,7 @@ private boolean isActiveAgentPredicate(AgentAndStatus agentAndStatus, logger.trace("isActiveAgentPredicate {}", agentAndStatus); AgentInfo agentInfo = agentAndStatus.getAgentInfo(); if (agentInfoPredicate.test(agentInfo)) { + logger.info("-----------isActiveAgentPredicate {}", agentAndStatus); logger.trace("agentInfoPredicate=true"); } if (agentStatusFilter.test(agentAndStatus.getStatus())) { @@ -479,6 +480,7 @@ public InspectorTimeline getAgentStatusTimeline(String agentId, Range range, int AgentStatus initialStatus = getAgentStatus(agentId, range.getFrom()); List agentEvents = agentEventService.getAgentEvents(agentId, range); + logger.info("---------agentEvents:{}", agentEvents); List warningStatusTimelineSegmentList = agentWarningStatService.select(agentId, range);