From 711c7118f5bad7c57c90cfef80f377384eda1dbc Mon Sep 17 00:00:00 2001 From: Seoyoung Park Date: Fri, 20 Dec 2024 11:37:46 +0900 Subject: [PATCH] [#10882] Add server map module based on redis-timeseries --- collector/pom.xml | 6 + .../collector/PinpointCollectorModule.java | 3 + .../applicationmap/ApplicationMapModule.java | 32 +++ .../applicationmap/dao/InboundDao.java | 33 +++ .../applicationmap/dao/OutboundDao.java | 33 +++ .../collector/applicationmap/dao/SelfDao.java | 28 +++ .../applicationmap/redis/RedisInboundDao.java | 136 ++++++++++++ .../redis/RedisOutboundDao.java | 127 +++++++++++ .../applicationmap/redis/RedisSelfDao.java | 148 +++++++++++++ .../redis/schema/ApplicationMapTable.java | 35 +++ .../redis/schema/LabelToKey.java | 29 +++ .../redis/schema/TimeSeriesKey.java | 72 ++++++ .../redis/schema/TimeSeriesLabel.java | 39 ++++ .../redis/schema/TimeSeriesValue.java | 40 ++++ .../redis/statistics/RedisBulkFactory.java | 61 +++++ .../redis/statistics/RedisBulkWriter.java | 61 +++++ .../service/ApplicationMapService.java | 121 ++++++++++ .../service/RedisTraceService.java | 208 ++++++++++++++++++ .../dao/hbase/HbaseMapResponseTimeDao.java | 8 +- .../hbase/HbaseMapStatisticsCalleeDao.java | 4 +- .../hbase/HbaseMapStatisticsCallerDao.java | 6 +- .../dao/hbase/statistics/BulkFactory.java | 54 ++--- .../dao/hbase/statistics/BulkWriter.java | 8 +- .../hbase/statistics/DefaultBulkWriter.java | 2 +- .../dao/hbase/statistics/SyncWriter.java | 2 +- .../handler/grpc/GrpcSpanChunkHandler.java | 7 +- .../handler/grpc/GrpcSpanHandler.java | 7 +- 27 files changed, 1267 insertions(+), 43 deletions(-) create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/ApplicationMapModule.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/InboundDao.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/OutboundDao.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/SelfDao.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/RedisInboundDao.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/RedisOutboundDao.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/RedisSelfDao.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/ApplicationMapTable.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/LabelToKey.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/TimeSeriesKey.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/TimeSeriesLabel.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/TimeSeriesValue.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/statistics/RedisBulkFactory.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/statistics/RedisBulkWriter.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/service/ApplicationMapService.java create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/service/RedisTraceService.java diff --git a/collector/pom.xml b/collector/pom.xml index f2256d67885a..864bb285bae8 100644 --- a/collector/pom.xml +++ b/collector/pom.xml @@ -222,6 +222,12 @@ jakarta.annotation-api + + + com.navercorp.pinpoint + pinpoint-redis-timeseries + 3.1.0-SNAPSHOT + diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/PinpointCollectorModule.java b/collector/src/main/java/com/navercorp/pinpoint/collector/PinpointCollectorModule.java index 7410ee2878d7..386f968e2e76 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/PinpointCollectorModule.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/PinpointCollectorModule.java @@ -1,6 +1,7 @@ package com.navercorp.pinpoint.collector; +import com.navercorp.pinpoint.collector.applicationmap.ApplicationMapModule; import com.navercorp.pinpoint.collector.config.ClusterModule; import com.navercorp.pinpoint.collector.config.CollectorCommonConfiguration; import com.navercorp.pinpoint.collector.config.CollectorConfiguration; @@ -35,6 +36,8 @@ GrpcSslModule.class, RealtimeCollectorModule.class, + + ApplicationMapModule.class, }) @ComponentScan(basePackages = { "com.navercorp.pinpoint.collector.handler", diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/ApplicationMapModule.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/ApplicationMapModule.java new file mode 100644 index 000000000000..48ede98986af --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/ApplicationMapModule.java @@ -0,0 +1,32 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap; + +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + +/** + * @author intr3p1d + */ +@Configuration(proxyBeanMethods = false) +@ComponentScan(basePackages = { + "com.navercorp.pinpoint.collector.applicationmap", + "com.navercorp.pinpoint.collector.applicationmap.dao", + "com.navercorp.pinpoint.collector.applicationmap.redis", + "com.navercorp.pinpoint.collector.applicationmap.service", +}) +public class ApplicationMapModule { +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/InboundDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/InboundDao.java new file mode 100644 index 000000000000..6ca7c8525525 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/InboundDao.java @@ -0,0 +1,33 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.dao; + +import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao; +import com.navercorp.pinpoint.common.trace.ServiceType; + +/** + * @author intr3p1d + */ +public interface InboundDao extends CachedStatisticsDao { + // src -> dest + // inbound (rowKey dest <- columnName src) + // outbound (rowKey src -> columnName dest) + void update( + String srcServiceName, String srcApplicationName, ServiceType srcApplicationType, + String destServiceName, String destApplicationName, ServiceType destApplicationType, + String srcHost, int elapsed, boolean isError + ); +} \ No newline at end of file diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/OutboundDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/OutboundDao.java new file mode 100644 index 000000000000..afa50f07edd7 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/OutboundDao.java @@ -0,0 +1,33 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.dao; + +import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao; +import com.navercorp.pinpoint.common.trace.ServiceType; + +/** + * @author intr3p1d + */ +public interface OutboundDao extends CachedStatisticsDao { + // src -> dest + // inbound (rowKey dest <- columnName src) + // outbound (rowKey src -> columnName dest) + void update( + String srcServiceName, String srcApplicationName, ServiceType srcApplicationType, + String destServiceName, String destApplicationName, ServiceType destApplicationType, + String srcHost, int elapsed, boolean isError + ); +} \ No newline at end of file diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/SelfDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/SelfDao.java new file mode 100644 index 000000000000..1da0f8c92fe9 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/SelfDao.java @@ -0,0 +1,28 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.dao; + + +import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao; +import com.navercorp.pinpoint.common.trace.ServiceType; + +/** + * @author intr3p1d + */ +public interface SelfDao extends CachedStatisticsDao { + void received(String serviceName, String applicationName, ServiceType applicationType, int elapsed, boolean isError); + void updatePing(String serviceName, String applicationName, ServiceType applicationType, int elapsed, boolean isError); +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/RedisInboundDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/RedisInboundDao.java new file mode 100644 index 000000000000..ed24a425ba78 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/RedisInboundDao.java @@ -0,0 +1,136 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.redis; + +import com.navercorp.pinpoint.collector.applicationmap.dao.InboundDao; +import com.navercorp.pinpoint.collector.applicationmap.redis.schema.ApplicationMapTable; +import com.navercorp.pinpoint.collector.applicationmap.redis.schema.TimeSeriesKey; +import com.navercorp.pinpoint.collector.applicationmap.redis.schema.TimeSeriesValue; +import com.navercorp.pinpoint.collector.dao.hbase.IgnoreStatFilter; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration; +import com.navercorp.pinpoint.collector.applicationmap.redis.statistics.RedisBulkWriter; +import com.navercorp.pinpoint.common.server.util.AcceptedTimeService; +import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; +import com.navercorp.pinpoint.common.server.util.TimeSlot; +import com.navercorp.pinpoint.common.trace.HistogramSchema; +import com.navercorp.pinpoint.common.trace.ServiceType; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Repository; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +@Repository +public class RedisInboundDao implements InboundDao { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final AcceptedTimeService acceptedTimeService; + private final IgnoreStatFilter ignoreStatFilter; + private final RedisBulkWriter bulkWriter; + private final MapLinkConfiguration mapLinkConfiguration; + + public RedisInboundDao( + MapLinkConfiguration mapLinkConfiguration, + AcceptedTimeService acceptedTimeService, + IgnoreStatFilter ignoreStatFilter, + @Qualifier("inboundBulkWriter") RedisBulkWriter bulkWriter + ) { + this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration"); + this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); + this.ignoreStatFilter = Objects.requireNonNull(ignoreStatFilter, "ignoreStatFilter"); + this.bulkWriter = Objects.requireNonNull(bulkWriter, "inboundBulkWriter"); + } + + + @Override + public void update( + String srcServiceName, String srcApplicationName, ServiceType srcApplicationType, + String destServiceName, String destApplicationName, ServiceType destApplicationType, + String srcHost, int elapsed, boolean isError + ) { + Objects.requireNonNull(srcServiceName, "srcServiceName"); + Objects.requireNonNull(destServiceName, "destServiceName"); + Objects.requireNonNull(srcApplicationName, "srcApplicationName"); + Objects.requireNonNull(destServiceName, "destApplicationName"); + + if (logger.isDebugEnabled()) { + logger.debug("[Inbound] {} {}({}) <- {} {}({})[{}]", + destServiceName, destApplicationName, destApplicationType, + srcServiceName, srcApplicationName, srcApplicationType, srcHost + ); + } + + if (ignoreStatFilter.filter(srcApplicationType, srcHost)) { + logger.debug("[Ignore-Inbound] {} {}({}) <- {} {}({})[{}]", + destServiceName, destApplicationName, destApplicationType, + srcServiceName, srcApplicationName, srcApplicationType, srcHost + ); + return; + } + + final short srcSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(srcApplicationType, elapsed, isError); + HistogramSchema histogramSchema = srcApplicationType.getHistogramSchema(); + final long acceptedTime = acceptedTimeService.getAcceptedTime(); + + // for inbound, main is destination + // and sub is source + final TimeSeriesKey applicationTypeKey = new TimeSeriesKey( + ApplicationMapTable.Inbound, "tenantId", + destServiceName, destApplicationName, + srcServiceName, srcApplicationName, srcSlotNumber + ); + TimeSeriesValue addOne = new TimeSeriesValue(acceptedTime); + this.bulkWriter.increment(applicationTypeKey, addOne); + + if (mapLinkConfiguration.isEnableAvg()) { + final TimeSeriesKey sumStatKey = new TimeSeriesKey( + ApplicationMapTable.Inbound, "tenantId", + destServiceName, destApplicationName, + srcServiceName, srcApplicationName, + histogramSchema.getSumStatSlot().getSlotTime() + ); + final TimeSeriesValue sumValue = new TimeSeriesValue(acceptedTime); + this.bulkWriter.increment(sumStatKey, sumValue, elapsed); + } + if (mapLinkConfiguration.isEnableMax()) { + final TimeSeriesKey maxStatKey = new TimeSeriesKey( + ApplicationMapTable.Inbound, "tenantId", + destServiceName, destApplicationName, + srcServiceName, srcApplicationName, + histogramSchema.getMaxStatSlot().getSlotTime() + ); + final TimeSeriesValue maxValue = new TimeSeriesValue(acceptedTime); + this.bulkWriter.updateMax(maxStatKey, maxValue, elapsed); + } + + } + + @Override + public void flushLink() { + this.bulkWriter.flushLink(); + } + + @Override + public void flushAvgMax() { + this.bulkWriter.flushAvgMax(); + } + +} \ No newline at end of file diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/RedisOutboundDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/RedisOutboundDao.java new file mode 100644 index 000000000000..7fba3d41bac4 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/RedisOutboundDao.java @@ -0,0 +1,127 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.redis; + +import com.navercorp.pinpoint.collector.applicationmap.dao.OutboundDao; +import com.navercorp.pinpoint.collector.applicationmap.redis.schema.ApplicationMapTable; +import com.navercorp.pinpoint.collector.applicationmap.redis.schema.TimeSeriesKey; +import com.navercorp.pinpoint.collector.applicationmap.redis.schema.TimeSeriesValue; +import com.navercorp.pinpoint.collector.applicationmap.redis.statistics.RedisBulkWriter; +import com.navercorp.pinpoint.collector.dao.hbase.IgnoreStatFilter; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration; +import com.navercorp.pinpoint.common.server.util.AcceptedTimeService; +import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; +import com.navercorp.pinpoint.common.trace.HistogramSchema; +import com.navercorp.pinpoint.common.trace.ServiceType; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Repository; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +@Repository +public class RedisOutboundDao implements OutboundDao { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final AcceptedTimeService acceptedTimeService; + private final RedisBulkWriter bulkWriter; + private final MapLinkConfiguration mapLinkConfiguration; + + public RedisOutboundDao( + MapLinkConfiguration mapLinkConfiguration, + AcceptedTimeService acceptedTimeService, + IgnoreStatFilter ignoreStatFilter, + @Qualifier("outboundBulkWriter") RedisBulkWriter bulkWriter + ) { + this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration"); + this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); + this.bulkWriter = Objects.requireNonNull(bulkWriter, "outboundBulkWriter"); + } + + + @Override + public void update( + String srcServiceName, String srcApplicationName, ServiceType srcApplicationType, + String destServiceName, String destApplicationName, ServiceType destApplicationType, + String srcHost, int elapsed, boolean isError + ) { + // outbound (rowKey src -> columnName dest) + Objects.requireNonNull(destServiceName, "destServiceName"); + Objects.requireNonNull(srcServiceName, "srcServiceName"); + Objects.requireNonNull(destApplicationName, "destApplicationName"); + Objects.requireNonNull(srcServiceName, "srcApplicationName"); + + if (logger.isDebugEnabled()) { + logger.debug("[Outbound] {} {}({})[{}] -> {} {}({})", + srcServiceName, srcApplicationName, srcApplicationType, srcHost, + destServiceName, destApplicationName, destApplicationType + ); + } + + final short destSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(destApplicationType, elapsed, isError); + HistogramSchema histogramSchema = destApplicationType.getHistogramSchema(); + final long acceptedTime = acceptedTimeService.getAcceptedTime(); + + // for outbound, main is source + // and sub is destination + final TimeSeriesKey applicationTypeKey = new TimeSeriesKey( + ApplicationMapTable.Outbound, "tenantId", + srcServiceName, srcApplicationName, + destServiceName, destApplicationName, destSlotNumber + ); + + final TimeSeriesValue addOne = new TimeSeriesValue(acceptedTime); + this.bulkWriter.increment(applicationTypeKey, addOne); + + if (mapLinkConfiguration.isEnableAvg()) { + final TimeSeriesKey sumStatKey = new TimeSeriesKey( + ApplicationMapTable.Outbound, "tenantId", + srcServiceName, srcApplicationName, + destServiceName, destApplicationName, + histogramSchema.getSumStatSlot().getSlotTime() + ); + final TimeSeriesValue sumValue = new TimeSeriesValue(acceptedTime); + this.bulkWriter.increment(sumStatKey, sumValue, elapsed); + } + if (mapLinkConfiguration.isEnableMax()) { + final TimeSeriesKey maxStatKey = new TimeSeriesKey( + ApplicationMapTable.Outbound, "tenantId", + srcServiceName, srcApplicationName, + destServiceName, destApplicationName, + histogramSchema.getMaxStatSlot().getSlotTime() + ); + final TimeSeriesValue maxValue = new TimeSeriesValue(acceptedTime); + this.bulkWriter.updateMax(maxStatKey, maxValue, elapsed); + } + } + + + @Override + public void flushLink() { + this.bulkWriter.flushLink(); + } + + @Override + public void flushAvgMax() { + this.bulkWriter.flushAvgMax(); + } + +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/RedisSelfDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/RedisSelfDao.java new file mode 100644 index 000000000000..06709ab66077 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/RedisSelfDao.java @@ -0,0 +1,148 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.redis; + +import com.navercorp.pinpoint.collector.applicationmap.dao.SelfDao; +import com.navercorp.pinpoint.collector.applicationmap.redis.schema.ApplicationMapTable; +import com.navercorp.pinpoint.collector.applicationmap.redis.schema.TimeSeriesKey; +import com.navercorp.pinpoint.collector.applicationmap.redis.schema.TimeSeriesValue; +import com.navercorp.pinpoint.collector.applicationmap.redis.statistics.RedisBulkWriter; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey; +import com.navercorp.pinpoint.common.server.util.AcceptedTimeService; +import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; +import com.navercorp.pinpoint.common.server.util.TimeSlot; +import com.navercorp.pinpoint.common.trace.HistogramSchema; +import com.navercorp.pinpoint.common.trace.ServiceType; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Repository; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +@Repository +public class RedisSelfDao implements SelfDao { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final AcceptedTimeService acceptedTimeService; + + private final TimeSlot timeSlot; + private final RedisBulkWriter bulkWriter; + private final MapLinkConfiguration mapLinkConfiguration; + + public RedisSelfDao(MapLinkConfiguration mapLinkConfiguration, + AcceptedTimeService acceptedTimeService, TimeSlot timeSlot, + @Qualifier("applicationMapSelfBulkWriter") RedisBulkWriter bulkWriter) { + this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration"); + this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); + this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot"); + this.bulkWriter = Objects.requireNonNull(bulkWriter, "bulkWriter"); + } + + + @Override + public void received( + String serviceName, String applicationName, ServiceType applicationType, + int elapsed, boolean isError + ) { + Objects.requireNonNull(serviceName, "serviceName"); + Objects.requireNonNull(applicationName, "applicationName"); + + if (logger.isDebugEnabled()) { + logger.debug("[Received] {} {} ({})", serviceName, applicationName, applicationType); + } + + final short slotNumber = ApplicationMapStatisticsUtils.getSlotNumber(applicationType, elapsed, isError); + HistogramSchema histogramSchema = applicationType.getHistogramSchema(); + final long acceptedTime = acceptedTimeService.getAcceptedTime(); + + // for self, main is me + // and sub is also me + final TimeSeriesKey applicationTypeKey = new TimeSeriesKey( + ApplicationMapTable.Self, "tenantId", + serviceName, applicationName, + serviceName, applicationName, + slotNumber + ); + TimeSeriesValue addOne = new TimeSeriesValue(acceptedTime); + this.bulkWriter.increment(applicationTypeKey, addOne); + + if (mapLinkConfiguration.isEnableAvg()) { + final TimeSeriesKey sumStatKey = new TimeSeriesKey( + ApplicationMapTable.Self, "tenantId", + serviceName, applicationName, + serviceName, applicationName, + histogramSchema.getSumStatSlot().getSlotTime() + ); + final TimeSeriesValue sumValue = new TimeSeriesValue(acceptedTime); + this.bulkWriter.increment(sumStatKey, sumValue, elapsed); + } + if (mapLinkConfiguration.isEnableMax()) { + final TimeSeriesKey maxStatKey = new TimeSeriesKey( + ApplicationMapTable.Self, "tenantId", + serviceName, applicationName, + serviceName, applicationName, + histogramSchema.getMaxStatSlot().getSlotTime() + ); + final TimeSeriesValue maxValue = new TimeSeriesValue(acceptedTime); + this.bulkWriter.updateMax(maxStatKey, maxValue, elapsed); + } + } + + @Override + public void updatePing( + String serviceName, String applicationName, ServiceType applicationType, + int elapsed, boolean isError + ) { + Objects.requireNonNull(serviceName, "serviceName"); + Objects.requireNonNull(applicationName, "applicationName"); + + if (logger.isDebugEnabled()) { + logger.debug("[Received] {} {} ({})", serviceName, applicationName, applicationType); + } + + // make row key. rowkey is me + final short slotNumber = ApplicationMapStatisticsUtils.getPingSlotNumber(applicationType, elapsed, isError); + final long acceptedTime = acceptedTimeService.getAcceptedTime(); + + final TimeSeriesKey selfPingKey = new TimeSeriesKey( + ApplicationMapTable.Self, "tenantId", + serviceName, applicationName, + serviceName, applicationName, + slotNumber + ); + TimeSeriesValue addOne = new TimeSeriesValue(acceptedTime); + this.bulkWriter.increment(selfPingKey, addOne); + } + + + @Override + public void flushLink() { + this.bulkWriter.flushLink(); + } + + @Override + public void flushAvgMax() { + this.bulkWriter.flushAvgMax(); + } +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/ApplicationMapTable.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/ApplicationMapTable.java new file mode 100644 index 000000000000..cfabb37a7fb9 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/ApplicationMapTable.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.redis.schema; + +/** + * @author intr3p1d + */ +public enum ApplicationMapTable { + Inbound("ApplicationMapInbound"), + Outbound("ApplicationMapOutbound"), + Self("ApplicationMapSelf"); + + private final String table; + + ApplicationMapTable(String table) { + this.table = table; + } + + public String getTable() { + return table; + } +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/LabelToKey.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/LabelToKey.java new file mode 100644 index 000000000000..70b27788532d --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/LabelToKey.java @@ -0,0 +1,29 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.redis.schema; + +/** + * @author intr3p1d + */ +public record LabelToKey(TimeSeriesLabel label, String value) { + public String getLabel() { + return label.getLabel(); + } + + public String getValue() { + return value; + } +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/TimeSeriesKey.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/TimeSeriesKey.java new file mode 100644 index 000000000000..48bc715036ce --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/TimeSeriesKey.java @@ -0,0 +1,72 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.redis.schema; + +import com.navercorp.pinpoint.redis.timeseries.protocol.Labels; + +/** + * @author intr3p1d + */ +public class TimeSeriesKey { + // label can be filtering by redis + + // ts:tableKind:tenantId:mainServiceId:mainApplicationName: + // subServiceId:subApplicationName:subServiceTypeSlot + + // record callCount as timeseries value + // rowTimeSlot will be calculated after the data is inserted (redis -> hbase) + private final LabelToKey tableKind; + private final LabelToKey tenantId; + private final LabelToKey mainServiceId; + private final LabelToKey mainApplicationName; + private final LabelToKey subServiceId; + private final LabelToKey subApplicationName; + private final LabelToKey subServiceTypeSlot; + + public TimeSeriesKey( + ApplicationMapTable tableKind, String tenantId, + String mainServiceId, String mainApplicationName, + String subServiceId, String subApplicationName, + short subServiceTypeSlot + ) { + this.tableKind = new LabelToKey(TimeSeriesLabel.TABLE_KIND, tableKind.getTable()); + this.tenantId = new LabelToKey(TimeSeriesLabel.TENANT_ID, tenantId); + this.mainServiceId = new LabelToKey(TimeSeriesLabel.MAIN_SERVICE_ID, mainServiceId); + this.mainApplicationName = new LabelToKey(TimeSeriesLabel.MAIN_APPLICATION_NAME, mainApplicationName); + this.subServiceId = new LabelToKey(TimeSeriesLabel.SUB_SERVICE_ID, subServiceId); + this.subApplicationName = new LabelToKey(TimeSeriesLabel.SUB_APPLICATION_NAME, subApplicationName); + this.subServiceTypeSlot = new LabelToKey(TimeSeriesLabel.SUB_SERVICE_TYPE_SLOT, String.valueOf(subServiceTypeSlot)); + } + + public String getKey() { + return "ts:" + tableKind.getValue() + ":" + tenantId.getValue() + + ":" + mainServiceId.getValue() + ":" + mainApplicationName.getValue() + + ":" + subServiceId.getValue() + ":" + subApplicationName.getValue() + + ":" + subServiceTypeSlot.getValue(); + } + + public Labels toLabels() { + Labels labels = new Labels(); + labels.addLabel(tableKind.getLabel(), tableKind.getValue()); + labels.addLabel(tenantId.getLabel(), tenantId.getValue()); + labels.addLabel(mainServiceId.getLabel(), mainServiceId.getValue()); + labels.addLabel(mainApplicationName.getLabel(), mainApplicationName.getValue()); + labels.addLabel(subServiceId.getLabel(), subServiceId.getValue()); + labels.addLabel(subApplicationName.getLabel(), subApplicationName.getValue()); + labels.addLabel(subServiceTypeSlot.getLabel(), subServiceTypeSlot.getValue()); + return labels; + } +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/TimeSeriesLabel.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/TimeSeriesLabel.java new file mode 100644 index 000000000000..c9fbb472d15c --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/TimeSeriesLabel.java @@ -0,0 +1,39 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.redis.schema; + +/** + * @author intr3p1d + */ +public enum TimeSeriesLabel { + TABLE_KIND("tableKind"), + TENANT_ID("tenantId"), + MAIN_SERVICE_ID("mainServiceId"), + MAIN_APPLICATION_NAME("mainApplicationName"), + SUB_SERVICE_ID("subServiceId"), + SUB_APPLICATION_NAME("subApplicationName"), + SUB_SERVICE_TYPE_SLOT("subServiceTypeSlot"); + + private final String label; + + TimeSeriesLabel(String label) { + this.label = label; + } + + public String getLabel() { + return label; + } +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/TimeSeriesValue.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/TimeSeriesValue.java new file mode 100644 index 000000000000..9197a2818c58 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/schema/TimeSeriesValue.java @@ -0,0 +1,40 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.redis.schema; + +/** + * @author intr3p1d + */ +public class TimeSeriesValue { + private long callCount; + private final long timestamp; + + public TimeSeriesValue(long timestamp) { + this.timestamp = timestamp; + } + + public long getCallCount() { + return callCount; + } + + public long getTimestamp() { + return timestamp; + } + + public void setCallCount(long callCount) { + this.callCount = callCount; + } +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/statistics/RedisBulkFactory.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/statistics/RedisBulkFactory.java new file mode 100644 index 000000000000..4e22cd34f8b6 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/statistics/RedisBulkFactory.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.redis.statistics; + +import com.navercorp.pinpoint.redis.timeseries.RedisTimeseriesAsyncCommands; +import com.navercorp.pinpoint.redis.timeseries.RedisTimeseriesAsyncCommandsImpl; +import com.navercorp.pinpoint.redis.timeseries.connection.AsyncConnection; +import com.navercorp.pinpoint.redis.timeseries.connection.SimpleAsyncConnection; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author intr3p1d + */ +@Configuration +public class RedisBulkFactory { + + public RedisBulkFactory() { + } + + + private RedisBulkWriter newRedisBulkWriter() { + RedisURI redisURI = RedisURI.create("redis://localhost:6379"); + RedisClient client = RedisClient.create(redisURI); + AsyncConnection connection = new SimpleAsyncConnection<>(client.connect()); + RedisTimeseriesAsyncCommands commands = new RedisTimeseriesAsyncCommandsImpl(connection); + return new RedisBulkWriter(commands); + } + + + @Bean + public RedisBulkWriter inboundBulkWriter() { + return newRedisBulkWriter(); + } + + @Bean + public RedisBulkWriter outboundBulkWriter() { + return newRedisBulkWriter(); + } + + @Bean + public RedisBulkWriter applicationMapSelfBulkWriter() { + return newRedisBulkWriter(); + } + +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/statistics/RedisBulkWriter.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/statistics/RedisBulkWriter.java new file mode 100644 index 000000000000..fa0ef64c80bf --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/redis/statistics/RedisBulkWriter.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.redis.statistics; + +import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter; +import com.navercorp.pinpoint.collector.applicationmap.redis.schema.TimeSeriesKey; +import com.navercorp.pinpoint.collector.applicationmap.redis.schema.TimeSeriesValue; +import com.navercorp.pinpoint.redis.timeseries.RedisTimeseriesAsyncCommands; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +public class RedisBulkWriter implements BulkWriter { + + RedisTimeseriesAsyncCommands commands; + + public RedisBulkWriter(RedisTimeseriesAsyncCommands commands) { + this.commands = Objects.requireNonNull(commands, "commands"); + } + + @Override + public void increment(TimeSeriesKey timeSeriesKey, TimeSeriesValue timeSeriesValue) { + commands.tsAdd(timeSeriesKey.getKey(), timeSeriesValue.getTimestamp(), 1); + } + + @Override + public void increment(TimeSeriesKey timeSeriesKey, TimeSeriesValue timeSeriesValue, long addition) { + commands.tsAdd(timeSeriesKey.getKey(), timeSeriesValue.getTimestamp(), addition); + } + + @Override + public void updateMax(TimeSeriesKey timeSeriesKey, TimeSeriesValue timeSeriesValue, long value) { + // calculate max later + commands.tsAdd(timeSeriesKey.getKey(), timeSeriesValue.getTimestamp(), value); + } + + @Override + public void flushLink() { + // do nothing + } + + @Override + public void flushAvgMax() { + // do nothing + } +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/service/ApplicationMapService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/service/ApplicationMapService.java new file mode 100644 index 000000000000..8f96e5713b13 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/service/ApplicationMapService.java @@ -0,0 +1,121 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.service; + +import com.navercorp.pinpoint.collector.applicationmap.dao.InboundDao; +import com.navercorp.pinpoint.collector.applicationmap.dao.OutboundDao; +import com.navercorp.pinpoint.collector.applicationmap.dao.SelfDao; +import com.navercorp.pinpoint.common.trace.ServiceType; +import jakarta.validation.constraints.NotBlank; +import org.springframework.stereotype.Service; +import org.springframework.validation.annotation.Validated; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +@Service +@Validated +public class ApplicationMapService { + private final InboundDao inboundDao; + private final OutboundDao outboundDao; + private final SelfDao selfDao; + + public ApplicationMapService( + InboundDao inboundDao, + OutboundDao outboundDao, + SelfDao selfDao + ) { + this.inboundDao = Objects.requireNonNull(inboundDao, "inboundDao"); + this.outboundDao = Objects.requireNonNull(outboundDao, "outboundDao"); + this.selfDao = Objects.requireNonNull(selfDao, "selfDao"); + } + + + public void updateBidirectional( + @NotBlank String srcServiceGroup, + @NotBlank String srcApplicationName, ServiceType srcServiceType, + @NotBlank String srcHost, + @NotBlank String destServiceGroup, + @NotBlank String destApplicationName, ServiceType destServiceType, + @NotBlank String destHost, + int elapsed, boolean isError + ) { + // src -> dest + // inbound (rowKey dest <- columnName src) + // outbound (rowKey src -> columnName dest) + + updateOutbound( + srcServiceGroup, srcApplicationName, srcServiceType, + destServiceGroup, destApplicationName, destServiceType, + srcHost, elapsed, isError + ); + + updateInbound( + srcServiceGroup, srcApplicationName, srcServiceType, + destServiceGroup, destApplicationName, destServiceType, + srcHost, elapsed, isError + ); + + } + + + public void updateInbound( + @NotBlank String srcServiceGroup, @NotBlank String srcApplicationName, ServiceType srcServiceType, + @NotBlank String destServiceGroup, @NotBlank String destApplicationName, ServiceType destServiceType, + @NotBlank String srcHost, int elapsed, boolean isError + ) { + // inbound (rowKey dest <- columnName src) + inboundDao.update( + srcServiceGroup, srcApplicationName, srcServiceType, + destServiceGroup, destApplicationName, destServiceType, + srcHost, elapsed, isError + ); + } + + public void updateOutbound( + @NotBlank String srcServiceGroup, @NotBlank String srcApplicationName, ServiceType srcServiceType, + @NotBlank String destServiceGroup, @NotBlank String destApplicationName, ServiceType destServiceType, + @NotBlank String srcHost, int elapsed, boolean isError + ) { + // outbound (rowKey src -> columnName dest) + outboundDao.update( + srcServiceGroup, srcApplicationName, srcServiceType, + destServiceGroup, destApplicationName, destServiceType, + srcHost, elapsed, isError + ); + } + + public void updateSelfResponseTime( + @NotBlank String serviceGroup, @NotBlank String applicationName, ServiceType applicationServiceType, + int elapsed, boolean isError + ) { + selfDao.received( + serviceGroup, applicationName, applicationServiceType, elapsed, isError + ); + } + + public void updateAgentState( + @NotBlank String serviceGroup, @NotBlank String applicationName, ServiceType applicationServiceType + ) { + selfDao.updatePing( + serviceGroup, applicationName, applicationServiceType, 0, false + ); + } + + +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/service/RedisTraceService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/service/RedisTraceService.java new file mode 100644 index 000000000000..c9a0ea10bb9c --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/service/RedisTraceService.java @@ -0,0 +1,208 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.applicationmap.service; + +import com.navercorp.pinpoint.collector.service.TraceService; +import com.navercorp.pinpoint.common.profiler.logging.ThrottledLogger; +import com.navercorp.pinpoint.common.server.bo.SpanBo; +import com.navercorp.pinpoint.common.server.bo.SpanChunkBo; +import com.navercorp.pinpoint.common.server.bo.SpanEventBo; +import com.navercorp.pinpoint.common.trace.ServiceType; +import com.navercorp.pinpoint.common.trace.ServiceTypeCategory; +import com.navercorp.pinpoint.loader.service.ServiceTypeRegistryService; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.stereotype.Service; +import org.springframework.validation.annotation.Validated; + +import java.util.List; +import java.util.Objects; + +/** + * @author intr3p1d + */ +@Service +@Validated +public class RedisTraceService implements TraceService { + private final Logger logger = LogManager.getLogger(getClass()); + + private final ThrottledLogger throttledLogger = ThrottledLogger.getLogger(logger, 10000); + private final ApplicationMapService applicationMapService; + private final ServiceTypeRegistryService registry; + + + public RedisTraceService( + ApplicationMapService applicationMapService, + ServiceTypeRegistryService registry + ) { + this.applicationMapService = Objects.requireNonNull(applicationMapService, "applicationMapService"); + this.registry = Objects.requireNonNull(registry, "registry"); + } + + @Override + public void insertSpanChunk(SpanChunkBo spanChunkBo) { + final ServiceType applicationServiceType = getApplicationServiceType(spanChunkBo); + final List spanEventList = spanChunkBo.getSpanEventBoList(); + if (spanEventList != null) { + // TODO need to batch update later. + insertSpanEventList(spanEventList, applicationServiceType, spanChunkBo.getApplicationId(), spanChunkBo.getAgentId(), spanChunkBo.getEndPoint()); + } + } + + @Override + public void insertSpan(SpanBo spanBo) { + insertSpanStat(spanBo); + } + + private ServiceType getApplicationServiceType(SpanChunkBo spanChunk) { + final short applicationServiceTypeCode = spanChunk.getApplicationServiceType(); + return registry.findServiceType(applicationServiceTypeCode); + } + + private ServiceType getApplicationServiceType(SpanBo span) { + // Check if applicationServiceType is set. If not, use span's service type. + final short applicationServiceTypeCode = span.getApplicationServiceType(); + return registry.findServiceType(applicationServiceTypeCode); + } + + private String normalize(String spanEventApplicationName, ServiceType spanEventType) { + if (spanEventType.getCategory() == ServiceTypeCategory.DATABASE) { + // empty database id + if (spanEventApplicationName == null) { + return "UNKNOWN_DATABASE"; + } + } + return spanEventApplicationName; + } + + + private void insertSpanStat(SpanBo span) { + final ServiceType applicationServiceType = getApplicationServiceType(span); + final ServiceType spanServiceType = registry.findServiceType(span.getServiceType()); + + final boolean isError = span.getErrCode() != 0; + int bugCheck = 0; + if (span.getParentSpanId() == -1) { + if (spanServiceType.isQueue()) { + // create virtual queue node + + applicationMapService.updateBidirectional( + "default", + span.getApplicationId(), applicationServiceType, + span.getAgentId(), + "default", + span.getAcceptorHost(), spanServiceType, + span.getEndPoint(), + span.getElapsed(), isError + ); + + } else { + // create virtual user + // update the span information of the current node (self) + applicationMapService.updateBidirectional( + "default", + span.getApplicationId(), ServiceType.USER, + span.getAgentId(), + "default", + span.getApplicationId(), applicationServiceType, + span.getAgentId(), + span.getElapsed(), isError + ); + } + bugCheck++; + } + + // save statistics info only when parentApplicationContext exists + // when drawing server map based on statistics info, you must know the application name of the previous node. + if (span.getParentApplicationId() != null) { + String parentApplicationName = span.getParentApplicationId(); + logger.debug("Received parent application name. {}", parentApplicationName); + + ServiceType parentApplicationType = registry.findServiceType(span.getParentApplicationServiceType()); + + // create virtual queue node if current' span's service type is a queue AND : + // 1. parent node's application service type is not a queue (it may have come from a queue that is traced) + // 2. current node's application service type is not a queue (current node may be a queue that is traced) + if (spanServiceType.isQueue()) { + if (!applicationServiceType.isQueue() && !parentApplicationType.isQueue()) { + // emulate virtual queue node's accept Span and record it's acceptor host + applicationMapService.updateOutbound( + "default", span.getAcceptorHost(), spanServiceType, + "default", span.getApplicationId(), applicationServiceType, + span.getRemoteAddr(), span.getElapsed(), isError + ); + + parentApplicationName = span.getAcceptorHost(); + parentApplicationType = spanServiceType; + } + } + + applicationMapService.updateInbound( + "default", parentApplicationName, parentApplicationType, + "default", span.getApplicationId(), applicationServiceType, + span.getAgentId(), span.getElapsed(), isError + ); + bugCheck++; + } + + // record the response time of the current node (self). + // blow code may be conflict of idea above callee key. + // it is odd to record reversely, because of already recording the caller data at previous node. + // the data may be different due to timeout or network error. + applicationMapService.updateSelfResponseTime( + "default", span.getApplicationId(), applicationServiceType, + span.getElapsed(), isError + ); + + if (bugCheck != 1) { + logger.info("ambiguous span found(bug). span:{}", span); + } + } + + + private void insertSpanEventList(List spanEventList, ServiceType applicationServiceType, String applicationId, String agentId, String endPoint) { + + for (SpanEventBo spanEvent : spanEventList) { + final ServiceType spanEventType = registry.findServiceType(spanEvent.getServiceType()); + + if (!spanEventType.isRecordStatistics()) { + continue; + } + + final String spanEventApplicationName = normalize(spanEvent.getDestinationId(), spanEventType); + // if terminal update statistics + final int elapsed = spanEvent.getEndElapsed(); + final boolean hasException = spanEvent.hasException(); + + if (applicationId == null || spanEventApplicationName == null) { + throttledLogger.info("Failed to insert statistics. Cause:SpanEvent has invalid format." + + "(application:{}/{}[{}], spanEventApplication:{}[{}])", + applicationId, agentId, applicationServiceType, spanEventApplicationName, spanEventType); + continue; + } + + /* + * save information to draw a server map based on statistics + */ + applicationMapService.updateBidirectional( + "default", applicationId, applicationServiceType, endPoint, + "default", spanEventApplicationName, spanEventType, agentId, + elapsed, hasException + ); + } + } + +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapResponseTimeDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapResponseTimeDao.java index cf4ee91d0842..a48f8643e8cb 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapResponseTimeDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapResponseTimeDao.java @@ -17,9 +17,9 @@ package com.navercorp.pinpoint.collector.dao.hbase; import com.navercorp.pinpoint.collector.dao.MapResponseTimeDao; -import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter; import com.navercorp.pinpoint.collector.dao.hbase.statistics.CallRowKey; import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter; import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration; import com.navercorp.pinpoint.collector.dao.hbase.statistics.ResponseColumnName; import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey; @@ -51,16 +51,16 @@ public class HbaseMapResponseTimeDao implements MapResponseTimeDao { private final AcceptedTimeService acceptedTimeService; private final TimeSlot timeSlot; - private final BulkWriter bulkWriter; + private final BulkWriter bulkWriter; private final MapLinkConfiguration mapLinkConfiguration; public HbaseMapResponseTimeDao(MapLinkConfiguration mapLinkConfiguration, AcceptedTimeService acceptedTimeService, TimeSlot timeSlot, - @Qualifier("selfBulkWriter") BulkWriter bulkWriter) { + @Qualifier("selfBulkWriter") BulkWriter bulkWriter) { this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration"); this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot"); - this.bulkWriter = Objects.requireNonNull(bulkWriter, "bulkWrtier"); + this.bulkWriter = Objects.requireNonNull(bulkWriter, "bulkWriter"); } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCalleeDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCalleeDao.java index 2a1478c81710..f2214c7257fb 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCalleeDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCalleeDao.java @@ -53,13 +53,13 @@ public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao { private final TimeSlot timeSlot; private final IgnoreStatFilter ignoreStatFilter; - private final BulkWriter bulkWriter; + private final BulkWriter bulkWriter; private final MapLinkConfiguration mapLinkConfiguration; public HbaseMapStatisticsCalleeDao(MapLinkConfiguration mapLinkConfiguration, IgnoreStatFilter ignoreStatFilter, AcceptedTimeService acceptedTimeService, TimeSlot timeSlot, - @Qualifier("calleeBulkWriter") BulkWriter bulkWriter) { + @Qualifier("calleeBulkWriter") BulkWriter bulkWriter) { this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration"); this.ignoreStatFilter = Objects.requireNonNull(ignoreStatFilter, "ignoreStatFilter"); this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCallerDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCallerDao.java index 2edf93f85fe5..9df88e5ff450 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCallerDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsCallerDao.java @@ -51,18 +51,18 @@ public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao { private final AcceptedTimeService acceptedTimeService; private final TimeSlot timeSlot; - private final BulkWriter bulkWriter; + private final BulkWriter bulkWriter; private final MapLinkConfiguration mapLinkConfiguration; public HbaseMapStatisticsCallerDao(MapLinkConfiguration mapLinkConfiguration, AcceptedTimeService acceptedTimeService, TimeSlot timeSlot, - @Qualifier("callerBulkWriter") BulkWriter bulkWriter) { + @Qualifier("callerBulkWriter") BulkWriter bulkWriter) { this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration"); this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot"); - this.bulkWriter = Objects.requireNonNull(bulkWriter, "bulkWrtier"); + this.bulkWriter = Objects.requireNonNull(bulkWriter, "bulkWriter"); } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/BulkFactory.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/BulkFactory.java index a453c006d10b..97c681833dfd 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/BulkFactory.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/BulkFactory.java @@ -49,14 +49,16 @@ private BulkUpdater getBulkUpdater(String reporterName) { return bulkIncrementerFactory.wrap(bulkUpdater, bulkConfiguration.getCalleeLimitSize(), reporter); } - private BulkWriter newBulkWriter(String loggerName, - HbaseOperations hbaseTemplate, - HbaseAsyncTemplate asyncTemplate, - HbaseColumnFamily descriptor, - TableNameProvider tableNameProvider, - RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix, - BulkIncrementer bulkIncrementer, - BulkUpdater bulkUpdater) { + private BulkWriter newBulkWriter( + String loggerName, + HbaseOperations hbaseTemplate, + HbaseAsyncTemplate asyncTemplate, + HbaseColumnFamily descriptor, + TableNameProvider tableNameProvider, + RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix, + BulkIncrementer bulkIncrementer, + BulkUpdater bulkUpdater + ) { if (bulkConfiguration.enableBulk()) { return new DefaultBulkWriter(loggerName, asyncTemplate, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater, descriptor, tableNameProvider); @@ -82,12 +84,12 @@ public BulkUpdater callerBulkUpdater() { @Bean - public BulkWriter callerBulkWriter(HbaseOperations hbaseTemplate, - HbaseAsyncTemplate asyncTemplate, - TableNameProvider tableNameProvider, - @Qualifier("statisticsCallerRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix, - @Qualifier("callerBulkIncrementer") BulkIncrementer bulkIncrementer, - @Qualifier("callerBulkUpdater") BulkUpdater bulkUpdater) { + public BulkWriter callerBulkWriter(HbaseOperations hbaseTemplate, + HbaseAsyncTemplate asyncTemplate, + TableNameProvider tableNameProvider, + @Qualifier("statisticsCallerRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix, + @Qualifier("callerBulkIncrementer") BulkIncrementer bulkIncrementer, + @Qualifier("callerBulkUpdater") BulkUpdater bulkUpdater) { String loggerName = newBulkWriterName(HbaseMapStatisticsCallerDao.class.getName()); return newBulkWriter(loggerName, hbaseTemplate, asyncTemplate, HbaseColumnFamily.MAP_STATISTICS_CALLEE_VER2_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater); } @@ -110,12 +112,12 @@ public BulkUpdater calleeBulkUpdater() { } @Bean - public BulkWriter calleeBulkWriter(HbaseOperations hbaseTemplate, - HbaseAsyncTemplate asyncTemplate, - TableNameProvider tableNameProvider, - @Qualifier("statisticsCalleeRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix, - @Qualifier("calleeBulkIncrementer") BulkIncrementer bulkIncrementer, - @Qualifier("calleeBulkUpdater") BulkUpdater bulkUpdater) { + public BulkWriter calleeBulkWriter(HbaseOperations hbaseTemplate, + HbaseAsyncTemplate asyncTemplate, + TableNameProvider tableNameProvider, + @Qualifier("statisticsCalleeRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix, + @Qualifier("calleeBulkIncrementer") BulkIncrementer bulkIncrementer, + @Qualifier("calleeBulkUpdater") BulkUpdater bulkUpdater) { String loggerName = newBulkWriterName(HbaseMapStatisticsCalleeDao.class.getName()); return newBulkWriter(loggerName, hbaseTemplate, asyncTemplate, HbaseColumnFamily.MAP_STATISTICS_CALLER_VER2_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater); } @@ -136,12 +138,12 @@ public BulkUpdater selfBulkUpdater() { } @Bean - public BulkWriter selfBulkWriter(HbaseOperations hbaseTemplate, - HbaseAsyncTemplate asyncTemplate, - TableNameProvider tableNameProvider, - @Qualifier("statisticsSelfRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix, - @Qualifier("selfBulkIncrementer") BulkIncrementer bulkIncrementer, - @Qualifier("selfBulkUpdater") BulkUpdater bulkUpdater) { + public BulkWriter selfBulkWriter(HbaseOperations hbaseTemplate, + HbaseAsyncTemplate asyncTemplate, + TableNameProvider tableNameProvider, + @Qualifier("statisticsSelfRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix, + @Qualifier("selfBulkIncrementer") BulkIncrementer bulkIncrementer, + @Qualifier("selfBulkUpdater") BulkUpdater bulkUpdater) { String loggerName = newBulkWriterName(HbaseMapResponseTimeDao.class.getName()); return newBulkWriter(loggerName, hbaseTemplate, asyncTemplate, HbaseColumnFamily.MAP_STATISTICS_SELF_VER2_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater); } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/BulkWriter.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/BulkWriter.java index 7f3c04597486..d4ae4de47378 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/BulkWriter.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/BulkWriter.java @@ -3,12 +3,12 @@ /** * @author emeroad */ -public interface BulkWriter { - void increment(RowKey rowKey, ColumnName columnName); +public interface BulkWriter { + void increment(K rowKey, V columnName); - void increment(RowKey rowKey, ColumnName columnName, long addition); + void increment(K rowKey, V columnName, long addition); - void updateMax(RowKey rowKey, ColumnName columnName, long value); + void updateMax(K rowKey, V columnName, long value); void flushLink(); diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/DefaultBulkWriter.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/DefaultBulkWriter.java index 42730d68990d..59bd34bed089 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/DefaultBulkWriter.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/DefaultBulkWriter.java @@ -20,7 +20,7 @@ /** * @author emeroad */ -public class DefaultBulkWriter implements BulkWriter { +public class DefaultBulkWriter implements BulkWriter { private final Logger logger; diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/SyncWriter.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/SyncWriter.java index f55e26016d75..0b6881125953 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/SyncWriter.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/SyncWriter.java @@ -14,7 +14,7 @@ /** * @author emeroad */ -public class SyncWriter implements BulkWriter { +public class SyncWriter implements BulkWriter { private final HbaseOperations hbaseTemplate; diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/handler/grpc/GrpcSpanChunkHandler.java b/collector/src/main/java/com/navercorp/pinpoint/collector/handler/grpc/GrpcSpanChunkHandler.java index 9cd85317f51d..64a7e04b7372 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/handler/grpc/GrpcSpanChunkHandler.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/handler/grpc/GrpcSpanChunkHandler.java @@ -49,7 +49,12 @@ public class GrpcSpanChunkHandler implements SimpleHandler { private final Sampler sampler; - public GrpcSpanChunkHandler(TraceService[] traceServices, GrpcSpanFactory spanFactory, AcceptedTimeService acceptedTimeService, SpanSamplerFactory spanSamplerFactory) { + public GrpcSpanChunkHandler( + TraceService[] traceServices, + GrpcSpanFactory spanFactory, + AcceptedTimeService acceptedTimeService, + SpanSamplerFactory spanSamplerFactory + ) { this.traceServices = Objects.requireNonNull(traceServices, "traceServices"); this.spanFactory = Objects.requireNonNull(spanFactory, "spanFactory"); this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/handler/grpc/GrpcSpanHandler.java b/collector/src/main/java/com/navercorp/pinpoint/collector/handler/grpc/GrpcSpanHandler.java index 2811dd9dd369..5c79d8b53d5c 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/handler/grpc/GrpcSpanHandler.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/handler/grpc/GrpcSpanHandler.java @@ -65,7 +65,12 @@ public class GrpcSpanHandler implements SimpleHandler { private final Sampler sampler; - public GrpcSpanHandler(TraceService[] traceServices, GrpcSpanFactory spanFactory, AcceptedTimeService acceptedTimeService, SpanSamplerFactory spanSamplerFactory) { + public GrpcSpanHandler( + TraceService[] traceServices, + GrpcSpanFactory spanFactory, + AcceptedTimeService acceptedTimeService, + SpanSamplerFactory spanSamplerFactory + ) { this.traceServices = Objects.requireNonNull(traceServices, "traceServices"); this.spanFactory = Objects.requireNonNull(spanFactory, "spanFactory"); this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService");