diff --git a/agent-module/agent-testweb/pom.xml b/agent-module/agent-testweb/pom.xml
index 5c172c7b0ddbc..e0453bf664e53 100644
--- a/agent-module/agent-testweb/pom.xml
+++ b/agent-module/agent-testweb/pom.xml
@@ -90,6 +90,8 @@
resilience4j-plugin-testweb
closed-module-testweb
closed-module-testlib
+ spring-boot3-testweb
+ spring-boot3-webflux-plugin-testweb
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/CollectorHbaseModule.java b/collector/src/main/java/com/navercorp/pinpoint/collector/CollectorHbaseModule.java
index 16018edc01c24..efe67ae878318 100644
--- a/collector/src/main/java/com/navercorp/pinpoint/collector/CollectorHbaseModule.java
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/CollectorHbaseModule.java
@@ -35,7 +35,8 @@
SchedulerConfiguration.class,
})
@ComponentScan({
- "com.navercorp.pinpoint.collector.dao.hbase"
+ "com.navercorp.pinpoint.collector.dao.hbase",
+ "com.navercorp.pinpoint.collector.applicationmap.dao.hbase"
})
@PropertySource(name = "CollectorHbaseModule", value = {
"classpath:hbase-root.properties",
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/PinpointCollectorModule.java b/collector/src/main/java/com/navercorp/pinpoint/collector/PinpointCollectorModule.java
index 0e916d0277447..299c59e68e5fa 100644
--- a/collector/src/main/java/com/navercorp/pinpoint/collector/PinpointCollectorModule.java
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/PinpointCollectorModule.java
@@ -49,6 +49,7 @@
"com.navercorp.pinpoint.collector.mapper",
"com.navercorp.pinpoint.collector.util",
"com.navercorp.pinpoint.collector.service",
+ "com.navercorp.pinpoint.collector.applicationmap.service",
"com.navercorp.pinpoint.collector.controller",
})
public class PinpointCollectorModule {
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/InboundDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/InboundDao.java
new file mode 100644
index 0000000000000..60eedead2bfb5
--- /dev/null
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/InboundDao.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.collector.applicationmap.dao;
+
+import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao;
+import com.navercorp.pinpoint.common.trace.ServiceType;
+
+/**
+ * @author intr3p1d
+ */
+public interface InboundDao extends CachedStatisticsDao {
+ // src -> dest
+ // inbound (rowKey dest <- columnName src)
+ // outbound (rowKey src -> columnName dest)
+ void update(
+ String srcServiceName, String srcApplicationName, ServiceType srcApplicationType,
+ String destServiceName, String destApplicationName, ServiceType destApplicationType,
+ String srcHost, int elapsed, boolean isError
+ );
+}
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/OutboundDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/OutboundDao.java
new file mode 100644
index 0000000000000..eb2a6fa05b6c5
--- /dev/null
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/OutboundDao.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.collector.applicationmap.dao;
+
+import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao;
+import com.navercorp.pinpoint.common.trace.ServiceType;
+
+/**
+ * @author intr3p1d
+ */
+public interface OutboundDao extends CachedStatisticsDao {
+ // src -> dest
+ // inbound (rowKey dest <- columnName src)
+ // outbound (rowKey src -> columnName dest)
+ void update(
+ String srcServiceName, String srcApplicationName, ServiceType srcApplicationType,
+ String destServiceName, String destApplicationName, ServiceType destApplicationType,
+ String srcHost, int elapsed, boolean isError
+ );
+}
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/SelfDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/SelfDao.java
new file mode 100644
index 0000000000000..fdd6e0492b25a
--- /dev/null
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/SelfDao.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.collector.applicationmap.dao;
+
+import com.navercorp.pinpoint.collector.dao.CachedStatisticsDao;
+import com.navercorp.pinpoint.common.trace.ServiceType;
+
+/**
+ * @author intr3p1d
+ */
+public interface SelfDao extends CachedStatisticsDao {
+ void received(String serviceName, String applicationName, ServiceType applicationType, int elapsed, boolean isError);
+ void updatePing(String serviceName, String applicationName, ServiceType applicationType, int elapsed, boolean isError);
+}
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/HbaseInboundDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/HbaseInboundDao.java
new file mode 100644
index 0000000000000..018eb191c5015
--- /dev/null
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/HbaseInboundDao.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.collector.applicationmap.dao.hbase;
+
+import com.navercorp.pinpoint.collector.applicationmap.dao.InboundDao;
+import com.navercorp.pinpoint.collector.dao.hbase.IgnoreStatFilter;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
+import com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics.ApplicationMapColumnName;
+import com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics.ApplicationMapRowKey;
+import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
+import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils;
+import com.navercorp.pinpoint.common.server.util.TimeSlot;
+import com.navercorp.pinpoint.common.trace.HistogramSchema;
+import com.navercorp.pinpoint.common.trace.ServiceType;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Repository;
+
+import java.util.Objects;
+
+/**
+ * @author intr3p1d
+ */
+@Repository
+public class HbaseInboundDao implements InboundDao {
+
+ private final Logger logger = LogManager.getLogger(this.getClass());
+
+ private final AcceptedTimeService acceptedTimeService;
+
+ private final TimeSlot timeSlot;
+ private final IgnoreStatFilter ignoreStatFilter;
+ private final BulkWriter bulkWriter;
+ private final MapLinkConfiguration mapLinkConfiguration;
+
+ public HbaseInboundDao(
+ MapLinkConfiguration mapLinkConfiguration,
+ IgnoreStatFilter ignoreStatFilter,
+ AcceptedTimeService acceptedTimeService,
+ TimeSlot timeSlot,
+ @Qualifier("inboundBulkWriter") BulkWriter bulkWriter
+ ) {
+ this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration");
+ this.ignoreStatFilter = Objects.requireNonNull(ignoreStatFilter, "ignoreStatFilter");
+ this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService");
+ this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot");
+
+ this.bulkWriter = Objects.requireNonNull(bulkWriter, "inboundBulkWriter");
+ }
+
+
+ @Override
+ public void update(
+ String srcServiceName, String srcApplicationName, ServiceType srcApplicationType,
+ String destServiceName, String destApplicationName, ServiceType destApplicationType,
+ String srcHost, int elapsed, boolean isError
+ ) {
+ Objects.requireNonNull(srcServiceName, "srcServiceName");
+ Objects.requireNonNull(destServiceName, "destServiceName");
+ Objects.requireNonNull(srcApplicationName, "srcApplicationName");
+ Objects.requireNonNull(destServiceName, "destApplicationName");
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("[Inbound] {} {}({}) <- {} {}({})[{}]",
+ destServiceName, destApplicationName, destApplicationType,
+ srcServiceName, srcApplicationName, srcApplicationType, srcHost
+ );
+ }
+
+
+ // TODO dest, src parameter normalization
+ if (ignoreStatFilter.filter(srcApplicationType, srcHost)) {
+ logger.debug("[Ignore-Inbound] {} {}({}) <- {} {}({})[{}]",
+ destServiceName, destApplicationName, destApplicationType,
+ srcServiceName, srcApplicationName, srcApplicationType, srcHost
+ );
+ return;
+ }
+
+ final long acceptedTime = acceptedTimeService.getAcceptedTime();
+ final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime);
+
+ // rowKey is dest in inbound
+ final RowKey destRowKey = new ApplicationMapRowKey(destServiceName, destApplicationType.getCode(), destApplicationName, rowTimeSlot);
+
+ // columnName is src in outbound
+ final short srcSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(srcApplicationType, elapsed, isError);
+ HistogramSchema histogramSchema = srcApplicationType.getHistogramSchema();
+
+ final ColumnName srcColumnName = new ApplicationMapColumnName(srcServiceName, srcApplicationType.getCode(), srcApplicationName, srcSlotNumber);
+ this.bulkWriter.increment(destRowKey, srcColumnName);
+
+ if (mapLinkConfiguration.isEnableAvg()) {
+ final ColumnName sumColumnName = new ApplicationMapColumnName(srcServiceName, srcApplicationType.getCode(), srcApplicationName, histogramSchema.getSumStatSlot().getSlotTime());
+ this.bulkWriter.increment(destRowKey, sumColumnName, elapsed);
+ }
+ if (mapLinkConfiguration.isEnableMax()) {
+ final ColumnName maxColumnName = new ApplicationMapColumnName(srcServiceName, srcApplicationType.getCode(), srcApplicationName, histogramSchema.getMaxStatSlot().getSlotTime());
+ this.bulkWriter.updateMax(destRowKey, maxColumnName, elapsed);
+ }
+
+ }
+
+ @Override
+ public void flushLink() {
+ this.bulkWriter.flushLink();
+ }
+
+ @Override
+ public void flushAvgMax() {
+ this.bulkWriter.flushAvgMax();
+ }
+
+}
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/HbaseOutboundDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/HbaseOutboundDao.java
new file mode 100644
index 0000000000000..da18a50079d95
--- /dev/null
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/HbaseOutboundDao.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.collector.applicationmap.dao.hbase;
+
+import com.navercorp.pinpoint.collector.applicationmap.dao.OutboundDao;
+import com.navercorp.pinpoint.collector.dao.hbase.IgnoreStatFilter;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
+import com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics.ApplicationMapColumnName;
+import com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics.ApplicationMapRowKey;
+import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
+import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils;
+import com.navercorp.pinpoint.common.server.util.TimeSlot;
+import com.navercorp.pinpoint.common.trace.HistogramSchema;
+import com.navercorp.pinpoint.common.trace.ServiceType;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Repository;
+
+import java.util.Objects;
+
+/**
+ * @author intr3p1d
+ */
+@Repository
+public class HbaseOutboundDao implements OutboundDao {
+
+ private final Logger logger = LogManager.getLogger(this.getClass());
+
+ private final AcceptedTimeService acceptedTimeService;
+
+ private final TimeSlot timeSlot;
+
+ private final BulkWriter bulkWriter;
+ private final MapLinkConfiguration mapLinkConfiguration;
+
+ public HbaseOutboundDao(
+ MapLinkConfiguration mapLinkConfiguration,
+ IgnoreStatFilter ignoreStatFilter,
+ AcceptedTimeService acceptedTimeService, TimeSlot timeSlot,
+ @Qualifier("outboundBulkWriter") BulkWriter bulkWriter
+ ) {
+ this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration");
+ this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService");
+ this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot");
+
+ this.bulkWriter = Objects.requireNonNull(bulkWriter, "outboundBulkWriter");
+ }
+
+
+ @Override
+ public void update(
+ String srcServiceName, String srcApplicationName, ServiceType srcApplicationType,
+ String destServiceName, String destApplicationName, ServiceType destApplicationType,
+ String srcHost, int elapsed, boolean isError
+ ) {
+ // outbound (rowKey src -> columnName dest)
+ Objects.requireNonNull(destServiceName, "destServiceName");
+ Objects.requireNonNull(srcServiceName, "srcServiceName");
+ Objects.requireNonNull(destApplicationName, "destApplicationName");
+ Objects.requireNonNull(srcServiceName, "srcApplicationName");
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("[Outbound] {} {}({})[{}] -> {} {}({})",
+ srcServiceName, srcApplicationName, srcApplicationType, srcHost,
+ destServiceName, destApplicationName, destApplicationType
+ );
+ }
+
+ final long acceptedTime = acceptedTimeService.getAcceptedTime();
+ final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime);
+
+ // rowKey is src in outbound
+ final RowKey srcRowKey = new ApplicationMapRowKey(srcServiceName, srcApplicationType.getCode(), srcApplicationName, rowTimeSlot);
+
+ // columnName is dest in outbound
+ final short destSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(destApplicationType, elapsed, isError);
+ HistogramSchema histogramSchema = destApplicationType.getHistogramSchema();
+
+ final ColumnName destColumnName = new ApplicationMapColumnName(destServiceName, destApplicationType.getCode(), destApplicationName, destSlotNumber);
+ this.bulkWriter.increment(srcRowKey, destColumnName);
+
+ if (mapLinkConfiguration.isEnableAvg()) {
+ final ColumnName sumColumnName = new ApplicationMapColumnName(destServiceName, destApplicationType.getCode(), destApplicationName, histogramSchema.getSumStatSlot().getSlotTime());
+ this.bulkWriter.increment(srcRowKey, sumColumnName, elapsed);
+ }
+ if (mapLinkConfiguration.isEnableMax()) {
+ final ColumnName maxColumnName = new ApplicationMapColumnName(destServiceName, destApplicationType.getCode(), destApplicationName, histogramSchema.getMaxStatSlot().getSlotTime());
+ this.bulkWriter.updateMax(srcRowKey, maxColumnName, elapsed);
+ }
+ }
+
+
+ @Override
+ public void flushLink() {
+ this.bulkWriter.flushLink();
+ }
+
+ @Override
+ public void flushAvgMax() {
+ this.bulkWriter.flushAvgMax();
+ }
+
+}
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/HbaseSelfDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/HbaseSelfDao.java
new file mode 100644
index 0000000000000..5e5ca235797dc
--- /dev/null
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/HbaseSelfDao.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.collector.applicationmap.dao.hbase;
+
+import com.navercorp.pinpoint.collector.applicationmap.dao.SelfDao;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
+import com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics.ApplicationMapRowKey;
+import com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics.ApplicationMapSelfColumnName;
+import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
+import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils;
+import com.navercorp.pinpoint.common.server.util.TimeSlot;
+import com.navercorp.pinpoint.common.trace.HistogramSchema;
+import com.navercorp.pinpoint.common.trace.ServiceType;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Repository;
+
+import java.util.Objects;
+
+/**
+ * @author intr3p1d
+ */
+@Repository
+public class HbaseSelfDao implements SelfDao {
+
+ private final Logger logger = LogManager.getLogger(this.getClass());
+
+ private final AcceptedTimeService acceptedTimeService;
+
+ private final TimeSlot timeSlot;
+ private final BulkWriter bulkWriter;
+ private final MapLinkConfiguration mapLinkConfiguration;
+
+ public HbaseSelfDao(MapLinkConfiguration mapLinkConfiguration,
+ AcceptedTimeService acceptedTimeService, TimeSlot timeSlot,
+ @Qualifier("applicationMapSelfBulkWriter") BulkWriter bulkWriter) {
+ this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration");
+ this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService");
+ this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot");
+ this.bulkWriter = Objects.requireNonNull(bulkWriter, "bulkWriter");
+ }
+
+
+ @Override
+ public void received(
+ String serviceName, String applicationName, ServiceType applicationType,
+ int elapsed, boolean isError
+ ) {
+ Objects.requireNonNull(serviceName, "serviceName");
+ Objects.requireNonNull(applicationName, "applicationName");
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("[Received] {} {} ({})", serviceName, applicationName, applicationType);
+ }
+
+ // make row key. rowkey is me
+ final long acceptedTime = acceptedTimeService.getAcceptedTime();
+ final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime);
+ final RowKey selfRowKey = new ApplicationMapRowKey(serviceName, applicationType.getCode(), applicationName, rowTimeSlot);
+
+ final short slotNumber = ApplicationMapStatisticsUtils.getSlotNumber(applicationType, elapsed, isError);
+ final ColumnName selfColumnName = new ApplicationMapSelfColumnName(applicationName, applicationType.getCode(), slotNumber);
+ this.bulkWriter.increment(selfRowKey, selfColumnName);
+
+ HistogramSchema histogramSchema = applicationType.getHistogramSchema();
+ if (mapLinkConfiguration.isEnableAvg()) {
+ final ColumnName sumColumnName = new ApplicationMapSelfColumnName(applicationName, applicationType.getCode(), histogramSchema.getSumStatSlot().getSlotTime());
+ this.bulkWriter.increment(selfRowKey, sumColumnName, elapsed);
+ }
+
+ final ColumnName maxColumnName = new ApplicationMapSelfColumnName(applicationName, applicationType.getCode(), histogramSchema.getMaxStatSlot().getSlotTime());
+ if (mapLinkConfiguration.isEnableMax()) {
+ this.bulkWriter.updateMax(selfRowKey, maxColumnName, elapsed);
+ }
+ }
+
+ @Override
+ public void updatePing(
+ String serviceName, String applicationName, ServiceType applicationType,
+ int elapsed, boolean isError
+ ) {
+ Objects.requireNonNull(serviceName, "serviceName");
+ Objects.requireNonNull(applicationName, "applicationName");
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("[Received] {} {} ({})", serviceName, applicationName, applicationType);
+ }
+
+ // make row key. rowkey is me
+ final long acceptedTime = acceptedTimeService.getAcceptedTime();
+ final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime);
+ final RowKey selfRowKey = new ApplicationMapRowKey(serviceName, applicationType.getCode(), applicationName, rowTimeSlot);
+
+ final short slotNumber = ApplicationMapStatisticsUtils.getPingSlotNumber(applicationType, elapsed, isError);
+ final ColumnName selfColumnName = new ApplicationMapSelfColumnName(applicationName, applicationType.getCode(), slotNumber);
+ this.bulkWriter.increment(selfRowKey, selfColumnName);
+ }
+
+
+ @Override
+ public void flushLink() {
+ this.bulkWriter.flushLink();
+ }
+
+ @Override
+ public void flushAvgMax() {
+ this.bulkWriter.flushAvgMax();
+ }
+
+}
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/statistics/ApplicationMapBulkFactory.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/statistics/ApplicationMapBulkFactory.java
new file mode 100644
index 0000000000000..c3d5ea3ce66b4
--- /dev/null
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/statistics/ApplicationMapBulkFactory.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics;
+
+import com.navercorp.pinpoint.collector.dao.hbase.BulkOperationReporter;
+import com.navercorp.pinpoint.collector.dao.hbase.HbaseMapStatisticsCallerDao;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkConfiguration;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkIncrementer;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkIncrementerFactory;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkOperationReporterFactory;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkUpdater;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.DefaultBulkIncrementer;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.DefaultBulkUpdater;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.DefaultBulkWriter;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKeyMerge;
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.SyncWriter;
+import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
+import com.navercorp.pinpoint.common.hbase.HbaseOperations;
+import com.navercorp.pinpoint.common.hbase.TableNameProvider;
+import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Objects;
+
+/**
+ * @author intr3p1d
+ */
+@Configuration
+public class ApplicationMapBulkFactory {
+ private final BulkConfiguration bulkConfiguration;
+ private final BulkIncrementerFactory bulkIncrementerFactory;
+ private final BulkOperationReporterFactory bulkOperationReporterFactory;
+
+ public ApplicationMapBulkFactory(BulkConfiguration bulkConfiguration,
+ BulkIncrementerFactory bulkIncrementerFactory,
+ BulkOperationReporterFactory bulkOperationReporterFactory) {
+ this.bulkConfiguration = Objects.requireNonNull(bulkConfiguration, "bulkConfiguration");
+ this.bulkIncrementerFactory = Objects.requireNonNull(bulkIncrementerFactory, "bulkIncrementerFactory");
+ this.bulkOperationReporterFactory = Objects.requireNonNull(bulkOperationReporterFactory, "bulkOperationReporterFactory");
+ }
+
+ private BulkIncrementer newBulkIncrementer(String reporterName, HbaseColumnFamily hbaseColumnFamily, int limitSize) {
+ BulkOperationReporter reporter = bulkOperationReporterFactory.getBulkOperationReporter(reporterName);
+ RowKeyMerge merge = new RowKeyMerge(hbaseColumnFamily);
+ BulkIncrementer bulkIncrementer = new DefaultBulkIncrementer(merge);
+
+ return bulkIncrementerFactory.wrap(bulkIncrementer, limitSize, reporter);
+ }
+
+
+ private BulkUpdater getBulkUpdater(String reporterName) {
+ BulkOperationReporter reporter = bulkOperationReporterFactory.getBulkOperationReporter(reporterName);
+ BulkUpdater bulkUpdater = new DefaultBulkUpdater();
+ return bulkIncrementerFactory.wrap(bulkUpdater, bulkConfiguration.getCalleeLimitSize(), reporter);
+ }
+
+ private BulkWriter newBulkWriter(String loggerName,
+ HbaseOperations hbaseTemplate,
+ HbaseColumnFamily descriptor,
+ TableNameProvider tableNameProvider,
+ RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix,
+ BulkIncrementer bulkIncrementer,
+ BulkUpdater bulkUpdater) {
+ if (bulkConfiguration.enableBulk()) {
+ return new DefaultBulkWriter(loggerName, hbaseTemplate, rowKeyDistributorByHashPrefix,
+ bulkIncrementer, bulkUpdater, descriptor, tableNameProvider);
+ } else {
+ return new SyncWriter(loggerName, hbaseTemplate, rowKeyDistributorByHashPrefix, descriptor, tableNameProvider);
+ }
+ }
+
+ private static String newBulkWriterName(String className) {
+ return className + "-writer";
+ }
+
+ @Bean
+ public BulkIncrementer inboundBulkIncrementer() {
+ String reporterName = "inboundBulkIncrementerReporter";
+ HbaseColumnFamily hbaseColumnFamily = HbaseColumnFamily.MAP_STATISTICS_INBOUND_SERVICE_GROUP_COUNTER;
+ int limitSize = bulkConfiguration.getCallerLimitSize();
+
+ return newBulkIncrementer(reporterName, hbaseColumnFamily, limitSize);
+ }
+
+ @Bean
+ public BulkUpdater inboundBulkUpdater() {
+ String reporterName = "inboundBulkUpdaterReporter";
+ return getBulkUpdater(reporterName);
+ }
+
+ @Bean
+ public BulkWriter inboundBulkWriter(HbaseOperations hbaseTemplate,
+ TableNameProvider tableNameProvider,
+ @Qualifier("applicationMapInboundRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix,
+ @Qualifier("inboundBulkIncrementer") BulkIncrementer bulkIncrementer,
+ @Qualifier("inboundBulkUpdater") BulkUpdater bulkUpdater) {
+ String loggerName = newBulkWriterName(HbaseMapStatisticsCallerDao.class.getName());
+ return newBulkWriter(loggerName, hbaseTemplate, HbaseColumnFamily.MAP_STATISTICS_INBOUND_SERVICE_GROUP_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater);
+ }
+
+
+ @Bean
+ public BulkIncrementer outboundBulkIncrementer() {
+ String reporterName = "outboundBulkIncrementerReporter";
+ HbaseColumnFamily hbaseColumnFamily = HbaseColumnFamily.MAP_STATISTICS_OUTBOUND_SERVICE_GROUP_COUNTER;
+ int limitSize = bulkConfiguration.getCallerLimitSize();
+
+ return newBulkIncrementer(reporterName, hbaseColumnFamily, limitSize);
+ }
+
+ @Bean
+ public BulkUpdater outboundBulkUpdater() {
+ String reporterName = "outboundBulkUpdaterReporter";
+ return getBulkUpdater(reporterName);
+ }
+
+
+ @Bean
+ public BulkWriter outboundBulkWriter(HbaseOperations hbaseTemplate,
+ TableNameProvider tableNameProvider,
+ @Qualifier("applicationMapOutboundRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix,
+ @Qualifier("outboundBulkIncrementer") BulkIncrementer bulkIncrementer,
+ @Qualifier("outboundBulkUpdater") BulkUpdater bulkUpdater) {
+ String loggerName = newBulkWriterName(HbaseMapStatisticsCallerDao.class.getName());
+ return newBulkWriter(loggerName, hbaseTemplate, HbaseColumnFamily.MAP_STATISTICS_OUTBOUND_SERVICE_GROUP_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater);
+ }
+
+
+ @Bean
+ public BulkIncrementer applicationMapSelfBulkIncrementer() {
+ String reporterName = "applicationMapSelfBulkIncrementerReporter";
+ HbaseColumnFamily hbaseColumnFamily = HbaseColumnFamily.MAP_STATISTICS_SELF_SERVICE_GROUP_COUNTER;
+ int limitSize = bulkConfiguration.getCallerLimitSize();
+
+ return newBulkIncrementer(reporterName, hbaseColumnFamily, limitSize);
+ }
+
+ @Bean
+ public BulkUpdater applicationMapSelfBulkUpdater() {
+ String reporterName = "ServiceGroupSelfBulkUpdaterReporter";
+ return getBulkUpdater(reporterName);
+ }
+
+ @Bean
+ public BulkWriter applicationMapSelfBulkWriter(HbaseOperations hbaseTemplate,
+ TableNameProvider tableNameProvider,
+ @Qualifier("applicationMapSelfRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix,
+ @Qualifier("applicationMapSelfBulkIncrementer") BulkIncrementer bulkIncrementer,
+ @Qualifier("applicationMapSelfBulkUpdater") BulkUpdater bulkUpdater) {
+ String loggerName = newBulkWriterName(HbaseMapStatisticsCallerDao.class.getName());
+ return newBulkWriter(loggerName, hbaseTemplate, HbaseColumnFamily.MAP_STATISTICS_SELF_SERVICE_GROUP_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater);
+ }
+
+}
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/statistics/ApplicationMapColumnName.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/statistics/ApplicationMapColumnName.java
new file mode 100644
index 0000000000000..4c6317520434d
--- /dev/null
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/statistics/ApplicationMapColumnName.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics;
+
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
+import com.navercorp.pinpoint.common.server.applicationmap.util.ApplicationMapUtils;
+
+import java.util.Objects;
+
+/**
+ * @author intr3p1d
+ */
+public class ApplicationMapColumnName implements ColumnName {
+
+ private final String serviceName;
+ private final short applicationTypeCode;
+ private final String applicationName;
+ private final short columnSlotNumber;
+
+ // WARNING - cached hash value should not be included for equals/hashCode
+ private int hash;
+ private long callCount;
+
+ public ApplicationMapColumnName(
+ String serviceName,
+ short applicationTypeCode, String applicationName,
+ short columnSlotNumber
+ ) {
+ this.serviceName = Objects.requireNonNull(serviceName, "serviceName");
+ this.applicationTypeCode = applicationTypeCode;
+ this.applicationName = Objects.requireNonNull(applicationName, "applicationName");
+ this.columnSlotNumber = columnSlotNumber;
+ }
+
+ @Override
+ public byte[] getColumnName() {
+ return ApplicationMapUtils.makeColumnName(
+ serviceName, applicationName, applicationTypeCode, columnSlotNumber
+ );
+ }
+
+ @Override
+ public long getCallCount() {
+ return callCount;
+ }
+
+ @Override
+ public void setCallCount(long callCount) {
+ this.callCount = callCount;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ApplicationMapColumnName that = (ApplicationMapColumnName) o;
+
+ if (applicationTypeCode != that.applicationTypeCode) return false;
+ if (columnSlotNumber != that.columnSlotNumber) return false;
+ if (hash != that.hash) return false;
+ if (callCount != that.callCount) return false;
+ if (!serviceName.equals(that.serviceName)) return false;
+ return applicationName.equals(that.applicationName);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = serviceName.hashCode();
+ result = 31 * result + (int) applicationTypeCode;
+ result = 31 * result + applicationName.hashCode();
+ result = 31 * result + (int) columnSlotNumber;
+ result = 31 * result + hash;
+ result = 31 * result + (int) (callCount ^ (callCount >>> 32));
+ return result;
+ }
+}
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/statistics/ApplicationMapRowKey.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/statistics/ApplicationMapRowKey.java
new file mode 100644
index 0000000000000..c7ede23d7f412
--- /dev/null
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/statistics/ApplicationMapRowKey.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics;
+
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey;
+import com.navercorp.pinpoint.common.server.applicationmap.util.ApplicationMapUtils;
+
+import java.util.Objects;
+
+/**
+ * @author intr3p1d
+ */
+public class ApplicationMapRowKey implements RowKey {
+ private final String serviceName;
+ private final short applicationType;
+ private final String applicationName;
+ private final long rowTimeSlot;
+
+ // WARNING - cached hash value should not be included for equals/hashCode
+ private int hash;
+
+ public ApplicationMapRowKey(
+ String serviceName,
+ short applicationType, String applicationName,
+ long rowTimeSlot
+ ) {
+ this.serviceName = Objects.requireNonNull(serviceName, "serviceName");
+ this.applicationType = applicationType;
+ this.applicationName = Objects.requireNonNull(applicationName, "applicationName");
+ this.rowTimeSlot = rowTimeSlot;
+ }
+
+ @Override
+ public byte[] getRowKey() {
+ return ApplicationMapUtils.makeRowKey(serviceName, applicationName, applicationType, rowTimeSlot);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ApplicationMapRowKey that = (ApplicationMapRowKey) o;
+
+ if (applicationType != that.applicationType) return false;
+ if (rowTimeSlot != that.rowTimeSlot) return false;
+ if (hash != that.hash) return false;
+ if (!serviceName.equals(that.serviceName)) return false;
+ return applicationName.equals(that.applicationName);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = serviceName.hashCode();
+ result = 31 * result + (int) applicationType;
+ result = 31 * result + applicationName.hashCode();
+ result = 31 * result + (int) (rowTimeSlot ^ (rowTimeSlot >>> 32));
+ result = 31 * result + hash;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "ServiceGroupRowKey{" +
+ "callServiceGroup='" + serviceName + '\'' +
+ ", thisServiceType=" + applicationType +
+ ", thisApplicationName='" + applicationName + '\'' +
+ ", rowTimeSlot=" + rowTimeSlot +
+ ", hash=" + hash +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/statistics/ApplicationMapSelfColumnName.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/statistics/ApplicationMapSelfColumnName.java
new file mode 100644
index 0000000000000..e33c89f0b4862
--- /dev/null
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/dao/hbase/statistics/ApplicationMapSelfColumnName.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.collector.applicationmap.dao.hbase.statistics;
+
+import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName;
+import com.navercorp.pinpoint.common.server.applicationmap.util.ApplicationMapUtils;
+
+import java.util.Objects;
+
+/**
+ * @author intr3p1d
+ */
+public class ApplicationMapSelfColumnName implements ColumnName {
+
+ private final String applicationName;
+ private final short applicationTypeCode;
+ private final short columnSlotNumber;
+
+ // WARNING - cached hash value should not be included for equals/hashCode
+ private int hash;
+
+ private long callCount;
+
+ public ApplicationMapSelfColumnName(String applicationName, short applicationTypeCode, short columnSlotNumber) {
+ this.applicationName = Objects.requireNonNull(applicationName, "applicationName");
+ this.applicationTypeCode = applicationTypeCode;
+ this.columnSlotNumber = columnSlotNumber;
+ }
+
+ public long getCallCount() {
+ return callCount;
+ }
+
+ public void setCallCount(long callCount) {
+ this.callCount = callCount;
+ }
+
+ public byte[] getColumnName() {
+ return ApplicationMapUtils.makeSelfColumnName(
+ applicationName, applicationTypeCode, columnSlotNumber
+ );
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ApplicationMapSelfColumnName that = (ApplicationMapSelfColumnName) o;
+
+ if (applicationTypeCode != that.applicationTypeCode) return false;
+ if (columnSlotNumber != that.columnSlotNumber) return false;
+ if (hash != that.hash) return false;
+ if (callCount != that.callCount) return false;
+ return applicationName.equals(that.applicationName);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = applicationName.hashCode();
+ result = 31 * result + (int) applicationTypeCode;
+ result = 31 * result + (int) columnSlotNumber;
+ result = 31 * result + hash;
+ result = 31 * result + (int) (callCount ^ (callCount >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "ServiceResponseColumnName{" +
+ "applicationName='" + applicationName + '\'' +
+ ", columnSlotNumber=" + columnSlotNumber +
+ ", hash=" + hash +
+ ", callCount=" + callCount +
+ '}';
+ }
+}
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/service/ApplicationMapService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/service/ApplicationMapService.java
new file mode 100644
index 0000000000000..d88a2d4118432
--- /dev/null
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/applicationmap/service/ApplicationMapService.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.collector.applicationmap.service;
+
+import com.navercorp.pinpoint.collector.applicationmap.dao.InboundDao;
+import com.navercorp.pinpoint.collector.applicationmap.dao.OutboundDao;
+import com.navercorp.pinpoint.collector.applicationmap.dao.SelfDao;
+import com.navercorp.pinpoint.common.trace.ServiceType;
+import jakarta.validation.constraints.NotBlank;
+import org.springframework.stereotype.Service;
+import org.springframework.validation.annotation.Validated;
+
+import java.util.Objects;
+
+/**
+ * @author intr3p1d
+ */
+@Service
+@Validated
+public class ApplicationMapService {
+
+ private final InboundDao inboundDao;
+ private final OutboundDao outboundDao;
+ private final SelfDao selfDao;
+
+ public ApplicationMapService(
+ InboundDao inboundDao,
+ OutboundDao outboundDao,
+ SelfDao selfDao
+ ) {
+ this.inboundDao = Objects.requireNonNull(inboundDao, "inboundDao");
+ this.outboundDao = Objects.requireNonNull(outboundDao, "outboundDao");
+ this.selfDao = Objects.requireNonNull(selfDao, "selfDao");
+ }
+
+ public void updateBidirectional(
+ @NotBlank String srcServiceGroup,
+ @NotBlank String srcApplicationName, ServiceType srcServiceType,
+ @NotBlank String srcHost,
+ @NotBlank String destServiceGroup,
+ @NotBlank String destApplicationName, ServiceType destServiceType,
+ @NotBlank String destHost,
+ int elapsed, boolean isError
+ ) {
+ // src -> dest
+ // inbound (rowKey dest <- columnName src)
+ // outbound (rowKey src -> columnName dest)
+
+ updateOutbound(
+ srcServiceGroup, srcApplicationName, srcServiceType,
+ destServiceGroup, destApplicationName, destServiceType,
+ srcHost, elapsed, isError
+ );
+
+ updateInbound(
+ srcServiceGroup, srcApplicationName, srcServiceType,
+ destServiceGroup, destApplicationName, destServiceType,
+ srcHost, elapsed, isError
+ );
+
+ }
+
+
+ public void updateInbound(
+ @NotBlank String srcServiceGroup, @NotBlank String srcApplicationName, ServiceType srcServiceType,
+ @NotBlank String destServiceGroup, @NotBlank String destApplicationName, ServiceType destServiceType,
+ @NotBlank String srcHost, int elapsed, boolean isError
+ ) {
+ // inbound (rowKey dest <- columnName src)
+ inboundDao.update(
+ srcServiceGroup, srcApplicationName, srcServiceType,
+ destServiceGroup, destApplicationName, destServiceType,
+ srcHost, elapsed, isError
+ );
+ }
+
+ public void updateOutbound(
+ @NotBlank String srcServiceGroup, @NotBlank String srcApplicationName, ServiceType srcServiceType,
+ @NotBlank String destServiceGroup, @NotBlank String destApplicationName, ServiceType destServiceType,
+ @NotBlank String srcHost, int elapsed, boolean isError
+ ) {
+ // outbound (rowKey src -> columnName dest)
+ outboundDao.update(
+ srcServiceGroup, srcApplicationName, srcServiceType,
+ destServiceGroup, destApplicationName, destServiceType,
+ srcHost, elapsed, isError
+ );
+ }
+
+ public void updateSelfResponseTime(
+ @NotBlank String serviceGroup, @NotBlank String applicationName, ServiceType applicationServiceType,
+ int elapsed, boolean isError
+ ) {
+ selfDao.received(
+ serviceGroup, applicationName, applicationServiceType, elapsed, isError
+ );
+ }
+
+ public void updateAgentState(
+ @NotBlank String serviceGroup, @NotBlank String applicationName, ServiceType applicationServiceType
+ ) {
+ selfDao.updatePing(
+ serviceGroup, applicationName, applicationServiceType, 0, false
+ );
+ }
+}
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/DefaultBulkIncrementer.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/DefaultBulkIncrementer.java
index c62acba7c4188..0f8bcfa4ae9bd 100644
--- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/DefaultBulkIncrementer.java
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/DefaultBulkIncrementer.java
@@ -25,13 +25,13 @@
import java.util.Map;
import java.util.Objects;
-class DefaultBulkIncrementer implements BulkIncrementer {
+public class DefaultBulkIncrementer implements BulkIncrementer {
private final RowKeyMerge rowKeyMerge;
private final AtomicLongMap counter = AtomicLongMap.create();
- DefaultBulkIncrementer(RowKeyMerge rowKeyMerge) {
+ public DefaultBulkIncrementer(RowKeyMerge rowKeyMerge) {
this.rowKeyMerge = Objects.requireNonNull(rowKeyMerge, "rowKeyMerge");
}
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/service/HbaseTraceService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/service/HbaseTraceService.java
index a1a3511b9cbe8..26479f9833227 100644
--- a/collector/src/main/java/com/navercorp/pinpoint/collector/service/HbaseTraceService.java
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/service/HbaseTraceService.java
@@ -16,6 +16,7 @@
package com.navercorp.pinpoint.collector.service;
+import com.navercorp.pinpoint.collector.applicationmap.service.ApplicationMapService;
import com.navercorp.pinpoint.collector.dao.ApplicationTraceIndexDao;
import com.navercorp.pinpoint.collector.dao.HostApplicationMapDao;
import com.navercorp.pinpoint.collector.dao.TraceDao;
@@ -57,6 +58,8 @@ public class HbaseTraceService implements TraceService {
private final StatisticsService statisticsService;
+ private final ApplicationMapService applicationMapService;
+
private final ServiceTypeRegistryService registry;
private final SpanStorePublisher publisher;
@@ -66,6 +69,7 @@ public HbaseTraceService(TraceDao traceDao,
ApplicationTraceIndexDao applicationTraceIndexDao,
HostApplicationMapDao hostApplicationMapDao,
StatisticsService statisticsService,
+ ApplicationMapService applicationMapService,
ServiceTypeRegistryService registry,
SpanStorePublisher spanStorePublisher,
@Qualifier("grpcSpanServerExecutor") Executor grpcSpanServerExecutor) {
@@ -73,6 +77,7 @@ public HbaseTraceService(TraceDao traceDao,
this.applicationTraceIndexDao = Objects.requireNonNull(applicationTraceIndexDao, "applicationTraceIndexDao");
this.hostApplicationMapDao = Objects.requireNonNull(hostApplicationMapDao, "hostApplicationMapDao");
this.statisticsService = Objects.requireNonNull(statisticsService, "statisticsService");
+ this.applicationMapService = Objects.requireNonNull(applicationMapService, "serviceGroupMapService");
this.registry = Objects.requireNonNull(registry, "registry");
this.publisher = Objects.requireNonNull(spanStorePublisher, "spanStorePublisher");
this.grpcSpanServerExecutor = Objects.requireNonNull(grpcSpanServerExecutor, "grpcSpanServerExecutor");
@@ -166,14 +171,33 @@ private void insertSpanStat(SpanBo span) {
if (spanServiceType.isQueue()) {
// create virtual queue node
statisticsService.updateCaller(span.getAcceptorHost(), spanServiceType, span.getRemoteAddr(), span.getApplicationId(), applicationServiceType, span.getEndPoint(), span.getElapsed(), isError);
-
statisticsService.updateCallee(span.getApplicationId(), applicationServiceType, span.getAcceptorHost(), spanServiceType, span.getAgentId(), span.getElapsed(), isError);
+
+ applicationMapService.updateBidirectional(
+ "default",
+ span.getApplicationId(), applicationServiceType,
+ span.getAgentId(),
+ "default",
+ span.getAcceptorHost(), spanServiceType,
+ span.getEndPoint(),
+ span.getElapsed(), isError
+ );
+
} else {
// create virtual user
statisticsService.updateCaller(span.getApplicationId(), ServiceType.USER, span.getAgentId(), span.getApplicationId(), applicationServiceType, span.getAgentId(), span.getElapsed(), isError);
-
// update the span information of the current node (self)
statisticsService.updateCallee(span.getApplicationId(), applicationServiceType, span.getApplicationId(), ServiceType.USER, span.getAgentId(), span.getElapsed(), isError);
+
+ applicationMapService.updateBidirectional(
+ "default",
+ span.getApplicationId(), ServiceType.USER,
+ span.getAgentId(),
+ "default",
+ span.getApplicationId(), applicationServiceType,
+ span.getAgentId(),
+ span.getElapsed(), isError
+ );
}
bugCheck++;
}
@@ -196,12 +220,23 @@ private void insertSpanStat(SpanBo span) {
// emulate virtual queue node's send SpanEvent
statisticsService.updateCaller(span.getAcceptorHost(), spanServiceType, span.getRemoteAddr(), span.getApplicationId(), applicationServiceType, span.getEndPoint(), span.getElapsed(), isError);
+ applicationMapService.updateOutbound(
+ "default", span.getAcceptorHost(), spanServiceType,
+ "default", span.getApplicationId(), applicationServiceType,
+ span.getRemoteAddr(), span.getElapsed(), isError
+ );
+
parentApplicationName = span.getAcceptorHost();
parentApplicationType = spanServiceType;
}
}
statisticsService.updateCallee(span.getApplicationId(), applicationServiceType, parentApplicationName, parentApplicationType, span.getAgentId(), span.getElapsed(), isError);
+ applicationMapService.updateInbound(
+ "default", parentApplicationName, parentApplicationType,
+ "default", span.getApplicationId(), applicationServiceType,
+ span.getAgentId(), span.getElapsed(), isError
+ );
bugCheck++;
}
@@ -211,6 +246,11 @@ private void insertSpanStat(SpanBo span) {
// the data may be different due to timeout or network error.
statisticsService.updateResponseTime(span.getApplicationId(), applicationServiceType, span.getAgentId(), span.getElapsed(), isError);
+ applicationMapService.updateSelfResponseTime(
+ "default", span.getApplicationId(), applicationServiceType,
+ span.getElapsed(), isError
+ );
+
if (bugCheck != 1) {
logger.info("ambiguous span found(bug). span:{}", span);
@@ -268,6 +308,12 @@ private void insertSpanEventList(List spanEventList, ServiceType ap
// save the information of callee (the span that spanevent called)
statisticsService.updateCallee(spanEventApplicationName, spanEventType, applicationId, applicationServiceType, endPoint, elapsed, hasException);
+
+ applicationMapService.updateBidirectional(
+ "default", applicationId, applicationServiceType, endPoint,
+ "default", spanEventApplicationName, spanEventType, agentId,
+ elapsed, hasException
+ );
}
}
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/service/StatisticsService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/service/StatisticsService.java
index 9e256bee8f06c..d8d8adc10e22e 100644
--- a/collector/src/main/java/com/navercorp/pinpoint/collector/service/StatisticsService.java
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/service/StatisticsService.java
@@ -58,12 +58,12 @@ public StatisticsService(MapStatisticsCalleeDao mapStatisticsCalleeDao, MapStati
* @param isError isError
*/
public void updateCaller(
- @NotBlank String callerApplicationName,
- ServiceType callerServiceType,
- @NotBlank String callerAgentId,
- @NotBlank String calleeApplicationName,
- ServiceType calleeServiceType,
- String calleeHost,
+ @NotBlank String callerApplicationName, // src
+ ServiceType callerServiceType, //src
+ @NotBlank String callerAgentId, //src
+ @NotBlank String calleeApplicationName, //dest
+ ServiceType calleeServiceType, //dest
+ String calleeHost, //dest
int elapsed,
boolean isError
) {
@@ -85,11 +85,11 @@ public void updateCaller(
* @param isError isError
*/
public void updateCallee(
- @NotBlank String calleeApplicationName,
- ServiceType calleeServiceType,
- @NotBlank String callerApplicationName,
- ServiceType callerServiceType,
- String callerHost,
+ @NotBlank String calleeApplicationName, // dest
+ ServiceType calleeServiceType, // dest
+ @NotBlank String callerApplicationName, //src
+ ServiceType callerServiceType, //src
+ String callerHost, //src
int elapsed,
boolean isError
) {
diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/service/async/AgentLifeCycleAsyncTaskService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/service/async/AgentLifeCycleAsyncTaskService.java
index 7a0b9269fa350..6bee733cfd116 100644
--- a/collector/src/main/java/com/navercorp/pinpoint/collector/service/async/AgentLifeCycleAsyncTaskService.java
+++ b/collector/src/main/java/com/navercorp/pinpoint/collector/service/async/AgentLifeCycleAsyncTaskService.java
@@ -18,6 +18,7 @@
import com.navercorp.pinpoint.collector.config.CollectorProperties;
import com.navercorp.pinpoint.collector.service.AgentLifeCycleService;
+import com.navercorp.pinpoint.collector.applicationmap.service.ApplicationMapService;
import com.navercorp.pinpoint.collector.service.StatisticsService;
import com.navercorp.pinpoint.common.server.bo.AgentLifeCycleBo;
import com.navercorp.pinpoint.common.server.util.AgentLifeCycleState;
@@ -42,15 +43,18 @@ public class AgentLifeCycleAsyncTaskService {
private final AgentLifeCycleService agentLifeCycleService;
private final StatisticsService statisticsService;
+ private final ApplicationMapService applicationMapService;
private final ServiceTypeRegistryService registry;
private final CollectorProperties collectorProperties;
public AgentLifeCycleAsyncTaskService(AgentLifeCycleService agentLifeCycleService,
StatisticsService statisticsService,
+ ApplicationMapService applicationMapService,
ServiceTypeRegistryService registry,
CollectorProperties collectorProperties) {
this.agentLifeCycleService = agentLifeCycleService;
this.statisticsService = statisticsService;
+ this.applicationMapService = applicationMapService;
this.registry = registry;
this.collectorProperties = collectorProperties;
}
@@ -78,6 +82,7 @@ public void handleLifeCycleEvent(AgentProperty agentProperty, long eventTimestam
final ServiceType serviceType = registry.findServiceType(agentProperty.getServiceType());
if (isUpdateAgentState(serviceType)) {
statisticsService.updateAgentState(applicationName, serviceType, agentId);
+ applicationMapService.updateAgentState("default", applicationName, serviceType);
}
}
diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseColumnFamily.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseColumnFamily.java
index 1198996d531d0..e7d5432851f5c 100644
--- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseColumnFamily.java
+++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseColumnFamily.java
@@ -135,6 +135,30 @@ private SelfStatMap(HbaseTable hBaseTable, byte[] columnFamilyName) {
}
}
+
+ public static final OutboundServiceMap MAP_STATISTICS_OUTBOUND_SERVICE_GROUP_COUNTER = new OutboundServiceMap(HbaseTable.MAP_STATISTICS_OUTBOUND_SERVICE_GROUP, Bytes.toBytes("C"));
+ public static class OutboundServiceMap extends HbaseColumnFamily {
+ private OutboundServiceMap(HbaseTable hBaseTable, byte[] columnFamilyName) {
+ super(hBaseTable, columnFamilyName);
+ }
+ }
+
+ public static final InboundServiceMap MAP_STATISTICS_INBOUND_SERVICE_GROUP_COUNTER = new InboundServiceMap(HbaseTable.MAP_STATISTICS_INBOUND_SERVICE_GROUP, Bytes.toBytes("C"));
+ public static class InboundServiceMap extends HbaseColumnFamily {
+ private InboundServiceMap(HbaseTable hBaseTable, byte[] columnFamilyName) {
+ super(hBaseTable, columnFamilyName);
+ }
+ }
+
+ public static final SelfServiceMap MAP_STATISTICS_SELF_SERVICE_GROUP_COUNTER = new SelfServiceMap(HbaseTable.MAP_STATISTICS_SELF_SERVICE_GROUP, Bytes.toBytes("C"));
+ public static class SelfServiceMap extends HbaseColumnFamily {
+ private SelfServiceMap(HbaseTable hBaseTable, byte[] columnFamilyName) {
+ super(hBaseTable, columnFamilyName);
+ }
+ }
+
+
+
public static final SqlMetadataV2 SQL_METADATA_VER2_SQL = new SqlMetadataV2(HbaseTable.SQL_METADATA_VER2, Bytes.toBytes("Sql"));
public static class SqlMetadataV2 extends HbaseColumnFamily {
diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTable.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTable.java
index 07990ce8494f7..c26f93482844f 100644
--- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTable.java
+++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTable.java
@@ -35,6 +35,9 @@ public enum HbaseTable {
MAP_STATISTICS_CALLEE_VER2("ApplicationMapStatisticsCallee_Ver2"),
MAP_STATISTICS_CALLER_VER2("ApplicationMapStatisticsCaller_Ver2"),
MAP_STATISTICS_SELF_VER2("ApplicationMapStatisticsSelf_Ver2"),
+ MAP_STATISTICS_OUTBOUND_SERVICE_GROUP("ApplicationMapOutbound"),
+ MAP_STATISTICS_INBOUND_SERVICE_GROUP("ApplicationMapInbound"),
+ MAP_STATISTICS_SELF_SERVICE_GROUP("ApplicationMapSelf"),
SQL_METADATA_VER2("SqlMetaData_Ver2"),
SQL_UID_METADATA("SqlUidMetaData"),
STRING_METADATA("StringMetaData"),
diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/DistributorConfiguration.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/DistributorConfiguration.java
index 9af9c9e880994..2fa4838f6d845 100644
--- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/DistributorConfiguration.java
+++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/DistributorConfiguration.java
@@ -80,6 +80,24 @@ public RowKeyDistributorByHashPrefix statisticsSelfRowKeyDistributor() {
return new RowKeyDistributorByHashPrefix(hasher);
}
+ @Bean
+ public RowKeyDistributorByHashPrefix applicationMapInboundRowKeyDistributor() {
+ RowKeyDistributorByHashPrefix.Hasher hasher = newRangeOneByteSimpleHash(0, 36, 32);
+ return new RowKeyDistributorByHashPrefix(hasher);
+ }
+
+ @Bean
+ public RowKeyDistributorByHashPrefix applicationMapOutboundRowKeyDistributor() {
+ RowKeyDistributorByHashPrefix.Hasher hasher = newRangeOneByteSimpleHash(0, 36, 32);
+ return new RowKeyDistributorByHashPrefix(hasher);
+ }
+
+ @Bean
+ public RowKeyDistributorByHashPrefix applicationMapSelfRowKeyDistributor() {
+ RowKeyDistributorByHashPrefix.Hasher hasher = newRangeOneByteSimpleHash(0, 32, 8);
+ return new RowKeyDistributorByHashPrefix(hasher);
+ }
+
private RowKeyDistributorByHashPrefix.Hasher newRangeOneByteSimpleHash(int start, int end, int maxBuckets) {
return new RangeOneByteSimpleHash(start, end, maxBuckets);
}
diff --git a/commons-server/src/main/java/com/navercorp/pinpoint/common/server/applicationmap/util/ApplicationMapUtils.java b/commons-server/src/main/java/com/navercorp/pinpoint/common/server/applicationmap/util/ApplicationMapUtils.java
new file mode 100644
index 0000000000000..c7beb937cd13a
--- /dev/null
+++ b/commons-server/src/main/java/com/navercorp/pinpoint/common/server/applicationmap/util/ApplicationMapUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.common.server.applicationmap.util;
+
+import com.navercorp.pinpoint.common.buffer.AutomaticBuffer;
+import com.navercorp.pinpoint.common.buffer.Buffer;
+import com.navercorp.pinpoint.common.trace.ServiceType;
+import com.navercorp.pinpoint.common.util.BytesUtils;
+import com.navercorp.pinpoint.common.util.TimeUtils;
+
+import java.util.Objects;
+
+/**
+ * @author intr3p1d
+ */
+public class ApplicationMapUtils {
+
+ private ApplicationMapUtils() {
+ }
+
+ public static byte[] makeRowKey(
+ String serviceName,
+ String applicationName, short applicationType,
+ long timestamp
+ ) {
+ Objects.requireNonNull(serviceName, "serviceName");
+ Objects.requireNonNull(applicationName, "applicationName");
+
+ final byte[] serviceNameBytes = BytesUtils.toBytes(serviceName);
+ final byte[] applicationNameBytes = BytesUtils.toBytes(applicationName);
+
+ final Buffer buffer = new AutomaticBuffer(64);
+ buffer.putShort((short) serviceNameBytes.length);
+ buffer.putBytes(serviceNameBytes);
+ buffer.putShort((short) applicationNameBytes.length);
+ buffer.putBytes(applicationNameBytes);
+ buffer.putShort(applicationType);
+ long reverseTimeMillis = TimeUtils.reverseTimeMillis(timestamp);
+ buffer.putLong(reverseTimeMillis);
+ return buffer.getBuffer();
+ }
+
+ public static byte[] makeColumnName(
+ String serviceName,
+ String applicationName, short applicationType,
+ short columnSlotNumber
+ ) {
+ Objects.requireNonNull(serviceName, "serviceName");
+ Objects.requireNonNull(applicationName, "applicationName");
+
+ final byte[] serviceNameBytes = BytesUtils.toBytes(serviceName);
+ final byte[] applicationNameBytes = BytesUtils.toBytes(applicationName);
+
+ final Buffer buffer = new AutomaticBuffer(64);
+ buffer.putShort(columnSlotNumber);
+ buffer.putShort(applicationType);
+ buffer.putShort((short) applicationNameBytes.length);
+ buffer.putBytes(applicationNameBytes);
+ buffer.putShort((short) serviceNameBytes.length);
+ buffer.putBytes(serviceNameBytes);
+ return buffer.getBuffer();
+ }
+
+ public static byte[] makeSelfColumnName(
+ String applicationName, short applicationType,
+ short columnSlotNumber
+ ) {
+ final Buffer buffer = new AutomaticBuffer(
+ applicationName.length() + BytesUtils.SHORT_BYTE_LENGTH * 2
+ );
+ buffer.putShort(columnSlotNumber);
+ buffer.putShort(applicationType);
+ buffer.put2PrefixedString(applicationName);
+ return buffer.getBuffer();
+ }
+
+ public static short getServiceTypeFromColumnName(byte[] bytes) {
+ return BytesUtils.bytesToShort(bytes, 2);
+ }
+
+ public static String getApplicationNameFromColumnName(byte[] bytes) {
+ final short length = BytesUtils.bytesToShort(bytes, 4);
+ return BytesUtils.toStringAndRightTrim(bytes, 6, length);
+ }
+
+ public static String getApplicationNameFromColumnNameForUser(byte[] bytes, ServiceType destServiceType) {
+ String destApplicationName = getApplicationNameFromColumnName(bytes);
+ String destServiceTypeName = destServiceType.getName();
+ return destApplicationName + "_" + destServiceTypeName;
+ }
+
+ public static short getHistogramSlotFromColumnName(byte[] bytes) {
+ return BytesUtils.bytesToShort(bytes, 0);
+ }
+}
diff --git a/commons-server/src/main/java/com/navercorp/pinpoint/common/server/util/ApplicationMapStatisticsUtils.java b/commons-server/src/main/java/com/navercorp/pinpoint/common/server/util/ApplicationMapStatisticsUtils.java
index ff1da6a159741..d8f26ec47f3c0 100644
--- a/commons-server/src/main/java/com/navercorp/pinpoint/common/server/util/ApplicationMapStatisticsUtils.java
+++ b/commons-server/src/main/java/com/navercorp/pinpoint/common/server/util/ApplicationMapStatisticsUtils.java
@@ -76,7 +76,6 @@ public static byte[] makeColumnName(String agentId, short columnSlotNumber) {
return buffer.getBuffer();
}
-
private static short findResponseHistogramSlotNo(ServiceType serviceType, int elapsed, boolean isError, boolean isPing) {
Objects.requireNonNull(serviceType, "serviceType");
@@ -136,11 +135,11 @@ public static String getHost(byte[] bytes) {
public static byte[] makeRowKey(String applicationName, short applicationType, long timestamp) {
Objects.requireNonNull(applicationName, "applicationName");
- final byte[] applicationNameBytes= BytesUtils.toBytes(applicationName);
+ final byte[] applicationNameBytes = BytesUtils.toBytes(applicationName);
final Buffer buffer = new AutomaticBuffer(2 + applicationNameBytes.length + 2 + 8);
// buffer.put2PrefixedString(applicationName);
- buffer.putShort((short)applicationNameBytes.length);
+ buffer.putShort((short) applicationNameBytes.length);
buffer.putBytes(applicationNameBytes);
buffer.putShort(applicationType);
long reverseTimeMillis = TimeUtils.reverseTimeMillis(timestamp);
@@ -148,6 +147,8 @@ public static byte[] makeRowKey(String applicationName, short applicationType, l
return buffer.getBuffer();
}
+
+
public static String getApplicationNameFromRowKey(byte[] bytes, int offset) {
Objects.requireNonNull(bytes, "bytes");
diff --git a/hbase/scripts/hbase-create-snappy.hbase b/hbase/scripts/hbase-create-snappy.hbase
index 7d31f1b6fa4ca..7fcd20123e28f 100644
--- a/hbase/scripts/hbase-create-snappy.hbase
+++ b/hbase/scripts/hbase-create-snappy.hbase
@@ -21,6 +21,10 @@ create 'ApplicationMapStatisticsCaller_Ver2', { NAME => 'C', TTL => 5184000, VER
create 'ApplicationMapStatisticsCallee_Ver2', { NAME => 'C', TTL => 5184000, VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'PREFIX' }, {SPLITS=>["\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x12\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]}
create 'ApplicationMapStatisticsSelf_Ver2', { NAME => 'C', TTL => 5184000, VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'PREFIX' }, {SPLITS=>["\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]}
+create 'ApplicationMapOutbound', { NAME => 'C', TTL => 5184000, VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'PREFIX' }
+create 'ApplicationMapInbound', { NAME => 'C', TTL => 5184000, VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'PREFIX' }
+create 'ApplicationMapSelf', { NAME => 'C', TTL => 5184000, VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'PREFIX' }
+
create 'HostApplicationMap_Ver2', { NAME => 'M', TTL => 5184000, VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'PREFIX' }, {SPLITS=>["\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]}
list
diff --git a/hbase/scripts/hbase-create.hbase b/hbase/scripts/hbase-create.hbase
index d8abbdb0a0d5c..c640eb43454d6 100644
--- a/hbase/scripts/hbase-create.hbase
+++ b/hbase/scripts/hbase-create.hbase
@@ -20,6 +20,10 @@ create 'ApplicationMapStatisticsCaller_Ver2', { NAME => 'C', TTL => 5184000, VER
create 'ApplicationMapStatisticsCallee_Ver2', { NAME => 'C', TTL => 5184000, VERSIONS => 1, DATA_BLOCK_ENCODING => 'PREFIX' }, {SPLITS=>["\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x12\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]}
create 'ApplicationMapStatisticsSelf_Ver2', { NAME => 'C', TTL => 5184000, VERSIONS => 1, DATA_BLOCK_ENCODING => 'PREFIX' }, {SPLITS=>["\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]}
+create 'ApplicationMapOutbound', { NAME => 'C', TTL => 5184000, VERSIONS => 1, DATA_BLOCK_ENCODING => 'PREFIX' }
+create 'ApplicationMapInbound', { NAME => 'C', TTL => 5184000, VERSIONS => 1, DATA_BLOCK_ENCODING => 'PREFIX' }
+create 'ApplicationMapSelf', { NAME => 'C', TTL => 5184000, VERSIONS => 1, DATA_BLOCK_ENCODING => 'PREFIX' }
+
create 'HostApplicationMap_Ver2', { NAME => 'M', TTL => 5184000, VERSIONS => 1, DATA_BLOCK_ENCODING => 'PREFIX' }, {SPLITS=>["\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]}
list
diff --git a/hbase/scripts/hbase-drop.hbase b/hbase/scripts/hbase-drop.hbase
index a2a3bcb9c9281..fe6dcf41a7ab5 100644
--- a/hbase/scripts/hbase-drop.hbase
+++ b/hbase/scripts/hbase-drop.hbase
@@ -18,6 +18,10 @@ disable 'ApplicationMapStatisticsCaller_Ver2'
disable 'ApplicationMapStatisticsCallee_Ver2'
disable 'ApplicationMapStatisticsSelf_Ver2'
+disable 'ApplicationMapOutbound'
+disable 'ApplicationMapInbound'
+disable 'ApplicationMapSelf'
+
disable 'HostApplicationMap_Ver2'
@@ -41,6 +45,10 @@ drop 'ApplicationMapStatisticsCaller_Ver2'
drop 'ApplicationMapStatisticsCallee_Ver2'
drop 'ApplicationMapStatisticsSelf_Ver2'
+drop 'ApplicationMapOutbound'
+drop 'ApplicationMapInbound'
+drop 'ApplicationMapSelf'
+
drop 'HostApplicationMap_Ver2'
exit
diff --git a/hbase/scripts/hbase-flush-table.hbase b/hbase/scripts/hbase-flush-table.hbase
index a58170bd28825..929e832d4467f 100644
--- a/hbase/scripts/hbase-flush-table.hbase
+++ b/hbase/scripts/hbase-flush-table.hbase
@@ -16,6 +16,10 @@ flush 'ApplicationMapStatisticsCaller_Ver2'
flush 'ApplicationMapStatisticsCallee_Ver2'
flush 'ApplicationMapStatisticsSelf_Ver2'
+flush 'ApplicationMapOutbound'
+flush 'ApplicationMapInbound'
+flush 'ApplicationMapSelf'
+
flush 'HostApplicationMap_Ver2'
exit
diff --git a/hbase/scripts/hbase-major-compact-htable.hbase b/hbase/scripts/hbase-major-compact-htable.hbase
index 0ad6259646762..2bee28f663479 100644
--- a/hbase/scripts/hbase-major-compact-htable.hbase
+++ b/hbase/scripts/hbase-major-compact-htable.hbase
@@ -19,6 +19,10 @@ major_compact 'ApplicationMapStatisticsCaller_Ver2'
major_compact 'ApplicationMapStatisticsCallee_Ver2'
major_compact 'ApplicationMapStatisticsSelf_Ver2'
+major_compact 'ApplicationMapOutbound'
+major_compact 'ApplicationMapInbound'
+major_compact 'ApplicationMapSelf'
+
major_compact 'HostApplicationMap_Ver2'
exit
diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/InboundDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/InboundDao.java
new file mode 100644
index 0000000000000..28d27892cf1cb
--- /dev/null
+++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/InboundDao.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.web.applicationmap.dao;
+
+import com.navercorp.pinpoint.common.server.util.time.Range;
+import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap;
+import com.navercorp.pinpoint.web.vo.Application;
+
+/**
+ * @author intr3p1d
+ */
+public interface InboundDao {
+ LinkDataMap selectInbound(Application destApplication, Range range, boolean timeAggregated);
+}
diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/OutboundDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/OutboundDao.java
new file mode 100644
index 0000000000000..51ac570e61667
--- /dev/null
+++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/OutboundDao.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.web.applicationmap.dao;
+
+import com.navercorp.pinpoint.common.server.util.time.Range;
+import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap;
+import com.navercorp.pinpoint.web.vo.Application;
+
+/**
+ * @author intr3p1d
+ */
+public interface OutboundDao {
+ LinkDataMap selectOutboud(Application callerApplication, Range range, boolean timeAggregated);
+}
diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/SelfDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/SelfDao.java
new file mode 100644
index 0000000000000..b5ccca19afcec
--- /dev/null
+++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/SelfDao.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.web.applicationmap.dao;
+
+/**
+ * @author intr3p1d
+ */
+public interface SelfDao {
+}
diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseInboundDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseInboundDao.java
new file mode 100644
index 0000000000000..1625d7ab1b22b
--- /dev/null
+++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseInboundDao.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.web.applicationmap.dao.hbase;
+
+import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
+import com.navercorp.pinpoint.common.hbase.HbaseOperations;
+import com.navercorp.pinpoint.common.hbase.ResultsExtractor;
+import com.navercorp.pinpoint.common.hbase.RowMapper;
+import com.navercorp.pinpoint.common.hbase.TableNameProvider;
+import com.navercorp.pinpoint.common.server.applicationmap.util.ApplicationMapUtils;
+import com.navercorp.pinpoint.common.server.util.time.Range;
+import com.navercorp.pinpoint.web.applicationmap.dao.InboundDao;
+import com.navercorp.pinpoint.web.applicationmap.dao.mapper.MapStatisticsTimeWindowReducer;
+import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection;
+import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap;
+import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMapUtils;
+import com.navercorp.pinpoint.web.mapper.RowMapReduceResultExtractor;
+import com.navercorp.pinpoint.common.server.util.timewindow.TimeWindow;
+import com.navercorp.pinpoint.common.server.util.timewindow.TimeWindowDownSampler;
+import com.navercorp.pinpoint.web.vo.Application;
+import com.navercorp.pinpoint.web.vo.RangeFactory;
+import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Repository;
+
+import java.util.Objects;
+
+/**
+ * @author intr3p1d
+ */
+@Repository
+public class HbaseInboundDao implements InboundDao {
+
+ private static final int MAP_STATISTICS_INBOUND_SERVICE_GROUP_NUM_PARTITIONS = 32;
+ private static final int SCAN_CACHE_SIZE = 40;
+
+ private final Logger logger = LogManager.getLogger(this.getClass());
+
+ private static final HbaseColumnFamily.InboundServiceMap DESCRIPTOR = HbaseColumnFamily.MAP_STATISTICS_INBOUND_SERVICE_GROUP_COUNTER;
+
+ private final HbaseOperations hbaseTemplate;
+ private final TableNameProvider tableNameProvider;
+
+ private final RowMapper serviceGroupInboundMapper;
+ private final RowMapper serviceGroupInboundTimeAggregatedMapper;
+
+ private final RangeFactory rangeFactory;
+
+ private final RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;
+
+
+ public HbaseInboundDao(
+ @Qualifier("mapHbaseTemplate") HbaseOperations hbaseTemplate,
+ TableNameProvider tableNameProvider,
+ @Qualifier("applicationMapInboundMapper") RowMapper serviceGroupInboundMapper,
+ @Qualifier("applicationMapInboundTimeAggregatedMapper") RowMapper serviceGroupInboundTimeAggregatedMapper,
+ RangeFactory rangeFactory,
+ @Qualifier("applicationMapInboundRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix) {
+ this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate");
+ this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
+ this.serviceGroupInboundMapper = Objects.requireNonNull(serviceGroupInboundMapper, "serviceGroupInboundMapper");
+ this.serviceGroupInboundTimeAggregatedMapper = Objects.requireNonNull(serviceGroupInboundTimeAggregatedMapper, "serviceGroupInboundTimeAggregatedMapper");
+ this.rangeFactory = Objects.requireNonNull(rangeFactory, "rangeFactory");
+ this.rowKeyDistributorByHashPrefix = Objects.requireNonNull(rowKeyDistributorByHashPrefix, "rowKeyDistributorByHashPrefix");
+ }
+
+ @Override
+ public LinkDataMap selectInbound(Application destApplication, Range range, boolean timeAggregated) {
+ Objects.requireNonNull(destApplication, "destApplication");
+ Objects.requireNonNull(range, "range");
+
+ final TimeWindow timeWindow = new TimeWindow(range, TimeWindowDownSampler.SAMPLER);
+ // find distributed key - ver2.
+ final Scan scan = createScan(destApplication, range, DESCRIPTOR.getName());
+
+
+ ResultsExtractor resultsExtractor;
+ if (timeAggregated) {
+ resultsExtractor = new RowMapReduceResultExtractor<>(serviceGroupInboundTimeAggregatedMapper, new MapStatisticsTimeWindowReducer(timeWindow));
+ } else {
+ resultsExtractor = new RowMapReduceResultExtractor<>(serviceGroupInboundMapper, new MapStatisticsTimeWindowReducer(timeWindow));
+ }
+
+ TableName serviceGroupInboundTableName = tableNameProvider.getTableName(DESCRIPTOR.getTable());
+ LinkDataMap linkDataMap = hbaseTemplate.findParallel(serviceGroupInboundTableName, scan, rowKeyDistributorByHashPrefix, resultsExtractor, MAP_STATISTICS_INBOUND_SERVICE_GROUP_NUM_PARTITIONS);
+ logger.debug("{} data. {}, {}", LinkDirection.IN_LINK, linkDataMap, range);
+ if (LinkDataMapUtils.hasLength(linkDataMap)) {
+ return linkDataMap;
+ }
+ return new LinkDataMap();
+ }
+
+ private Scan createScan(Application application, Range range, byte[] family) {
+ range = rangeFactory.createStatisticsRange(range);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("scan time:{} ", range.prettyToString());
+ }
+
+ // start key is replaced by end key because timestamp has been reversed
+ byte[] startKey = ApplicationMapUtils.makeRowKey("default", application.getName(), application.getServiceTypeCode(), range.getTo());
+ byte[] endKey = ApplicationMapUtils.makeRowKey("default", application.getName(), application.getServiceTypeCode(), range.getFrom());
+
+ Scan scan = new Scan();
+ scan.setCaching(SCAN_CACHE_SIZE);
+ scan.withStartRow(startKey);
+ scan.withStopRow(endKey);
+ scan.addFamily(family);
+ scan.setId("ServiceGroupMapScan");
+
+ return scan;
+ }
+}
diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseOutboundDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseOutboundDao.java
new file mode 100644
index 0000000000000..7c815fe8c01ee
--- /dev/null
+++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseOutboundDao.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.web.applicationmap.dao.hbase;
+
+import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily;
+import com.navercorp.pinpoint.common.hbase.HbaseOperations;
+import com.navercorp.pinpoint.common.hbase.ResultsExtractor;
+import com.navercorp.pinpoint.common.hbase.RowMapper;
+import com.navercorp.pinpoint.common.hbase.TableNameProvider;
+import com.navercorp.pinpoint.common.server.applicationmap.util.ApplicationMapUtils;
+import com.navercorp.pinpoint.common.server.util.time.Range;
+import com.navercorp.pinpoint.web.applicationmap.dao.OutboundDao;
+import com.navercorp.pinpoint.web.applicationmap.dao.mapper.MapStatisticsTimeWindowReducer;
+import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection;
+import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap;
+import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMapUtils;
+import com.navercorp.pinpoint.web.mapper.RowMapReduceResultExtractor;
+import com.navercorp.pinpoint.common.server.util.timewindow.TimeWindow;
+import com.navercorp.pinpoint.common.server.util.timewindow.TimeWindowDownSampler;
+import com.navercorp.pinpoint.web.vo.Application;
+import com.navercorp.pinpoint.web.vo.RangeFactory;
+import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Repository;
+
+import java.util.Objects;
+
+/**
+ * @author intr3p1d
+ */
+@Repository
+public class HbaseOutboundDao implements OutboundDao {
+ private static final int MAP_STATISTICS_OUTBOUND_SERVICE_GROUP_NUM_PARTITIONS = 32;
+ private static final int SCAN_CACHE_SIZE = 40;
+
+ private final Logger logger = LogManager.getLogger(this.getClass());
+
+ private static final HbaseColumnFamily.OutboundServiceMap DESCRIPTOR = HbaseColumnFamily.MAP_STATISTICS_OUTBOUND_SERVICE_GROUP_COUNTER;
+
+ private final HbaseOperations hbaseTemplate;
+ private final TableNameProvider tableNameProvider;
+
+ private final RowMapper applicationMapOutboundMapper;
+ private final RowMapper applicationMapOutboundTimeAggregatedMapper;
+
+ private final RangeFactory rangeFactory;
+
+ private final RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;
+
+
+ public HbaseOutboundDao(
+ @Qualifier("mapHbaseTemplate") HbaseOperations hbaseTemplate,
+ TableNameProvider tableNameProvider,
+ @Qualifier("applicationMapOutboundMapper") RowMapper applicationMapOutboundMapper,
+ @Qualifier("applicationMapOutboundTimeAggregatedMapper") RowMapper applicationMapOutboundTimeAggregatedMapper,
+ RangeFactory rangeFactory,
+ @Qualifier("applicationMapOutboundRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix
+ ) {
+ this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate");
+ this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider");
+ this.applicationMapOutboundMapper = Objects.requireNonNull(applicationMapOutboundMapper, "applicationMapOutboundMapper");
+ this.applicationMapOutboundTimeAggregatedMapper = Objects.requireNonNull(applicationMapOutboundTimeAggregatedMapper, "mapStatisticsTimeAggregatedCallerMapper");
+ this.rangeFactory = Objects.requireNonNull(rangeFactory, "rangeFactory");
+ this.rowKeyDistributorByHashPrefix = Objects.requireNonNull(rowKeyDistributorByHashPrefix, "rowKeyDistributorByHashPrefix");
+ }
+
+
+ @Override
+ public LinkDataMap selectOutboud(Application callerApplication, Range range, boolean timeAggregated) { Objects.requireNonNull(callerApplication, "callerApplication");
+ Objects.requireNonNull(callerApplication, "callerApplication");
+ Objects.requireNonNull(range, "range");
+
+ final TimeWindow timeWindow = new TimeWindow(range, TimeWindowDownSampler.SAMPLER);
+ // find distributed key.
+ final Scan scan = createScan(callerApplication, range, DESCRIPTOR.getName());
+
+ ResultsExtractor resultsExtractor;
+ if (timeAggregated) {
+ resultsExtractor = new RowMapReduceResultExtractor<>(applicationMapOutboundTimeAggregatedMapper, new MapStatisticsTimeWindowReducer(timeWindow));
+ } else {
+ resultsExtractor = new RowMapReduceResultExtractor<>(applicationMapOutboundMapper, new MapStatisticsTimeWindowReducer(timeWindow));
+ }
+
+ TableName applicationMapOutboundTableName = tableNameProvider.getTableName(DESCRIPTOR.getTable());
+ LinkDataMap linkDataMap = this.hbaseTemplate.findParallel(applicationMapOutboundTableName, scan, rowKeyDistributorByHashPrefix, resultsExtractor, MAP_STATISTICS_OUTBOUND_SERVICE_GROUP_NUM_PARTITIONS);
+ logger.debug("tableInfo({}). {} data. {}, {} : ", applicationMapOutboundTableName.getNameAsString(), LinkDirection.OUT_LINK, linkDataMap, range );
+
+ if (LinkDataMapUtils.hasLength(linkDataMap)) {
+ return linkDataMap;
+ }
+
+ return new LinkDataMap();
+ }
+
+ private Scan createScan(Application application, Range range, byte[]... familyArgs) {
+
+ range = rangeFactory.createStatisticsRange(range);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("scan Time:{}", range.prettyToString());
+ }
+
+ // start key is replaced by end key because timestamp has been reversed
+ byte[] startKey = ApplicationMapUtils.makeRowKey("default", application.getName(), application.getServiceTypeCode(), range.getTo());
+ byte[] endKey = ApplicationMapUtils.makeRowKey("default", application.getName(), application.getServiceTypeCode(), range.getFrom());
+
+ Scan scan = new Scan();
+ scan.setCaching(SCAN_CACHE_SIZE);
+ scan.withStartRow(startKey);
+ scan.withStopRow(endKey);
+ for (byte[] family : familyArgs) {
+ scan.addFamily(family);
+ }
+ scan.setId("ServiceGroupMapScan");
+
+ return scan;
+
+
+ }
+}
diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ApplicationMapInboundMapper.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ApplicationMapInboundMapper.java
new file mode 100644
index 0000000000000..02e5ab4cf568e
--- /dev/null
+++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ApplicationMapInboundMapper.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.web.applicationmap.dao.mapper;
+
+import com.navercorp.pinpoint.common.buffer.Buffer;
+import com.navercorp.pinpoint.common.buffer.FixedBuffer;
+import com.navercorp.pinpoint.common.hbase.RowMapper;
+import com.navercorp.pinpoint.common.hbase.util.CellUtils;
+import com.navercorp.pinpoint.common.server.applicationmap.util.ApplicationMapUtils;
+import com.navercorp.pinpoint.common.trace.ServiceType;
+import com.navercorp.pinpoint.common.util.TimeUtils;
+import com.navercorp.pinpoint.loader.service.ServiceTypeRegistryService;
+import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection;
+import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap;
+import com.navercorp.pinpoint.web.component.ApplicationFactory;
+import com.navercorp.pinpoint.web.vo.Application;
+import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author intr3p1d
+ */
+@Component
+public class ApplicationMapInboundMapper implements RowMapper {
+
+ private final Logger logger = LogManager.getLogger(this.getClass());
+
+ private final LinkFilter filter;
+
+ @Autowired
+ private ServiceTypeRegistryService registry;
+
+ @Autowired
+ private ApplicationFactory applicationFactory;
+
+ @Autowired
+ @Qualifier("applicationMapInboundRowKeyDistributor")
+ private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;
+
+ public ApplicationMapInboundMapper() {
+ this(LinkFilter::skip);
+ }
+
+ public ApplicationMapInboundMapper(LinkFilter filter) {
+ this.filter = filter;
+ }
+
+ @Override
+ public LinkDataMap mapRow(Result result, int rowNum) throws Exception {
+ if (result.isEmpty()) {
+ return new LinkDataMap();
+ }
+ logger.debug("mapRow: {}", rowNum);
+
+ final byte[] rowKey = getOriginalKey(result.getRow());
+
+ final Buffer row = new FixedBuffer(rowKey);
+ final Application destApplication = readDestApplication(row);
+ final long timestamp = TimeUtils.recoveryTimeMillis(row.readLong());
+
+ final LinkDataMap linkDataMap = new LinkDataMap();
+ for (Cell cell : result.rawCells()) {
+
+ final byte[] qualifier = CellUtil.cloneQualifier(cell);
+ final Application srcApplication = readSourceApplication(qualifier, destApplication.getServiceType());
+ if (filter.filter(srcApplication)) {
+ continue;
+ }
+
+ long requestCount = CellUtils.valueToLong(cell);
+ short histogramSlot = ApplicationMapUtils.getHistogramSlotFromColumnName(qualifier);
+
+ String srcHost = srcApplication.getName();
+ String destHost = destApplication.getName();
+
+ boolean isError = histogramSlot == (short) -1;
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(" Fetched {}. {} srcHost:{} -> {} (slot:{}/{}), ", LinkDirection.IN_LINK, srcApplication, srcHost, destApplication, histogramSlot, requestCount);
+ }
+
+ final short slotTime = (isError) ? (short) -1 : histogramSlot;
+ linkDataMap.addLinkData(srcApplication, srcApplication.getName(), destApplication, srcHost, timestamp, slotTime, requestCount);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(" Fetched {}. statistics:{}", LinkDirection.IN_LINK, linkDataMap);
+ }
+ }
+
+ return linkDataMap;
+ }
+
+ private Application readSourceApplication(byte[] qualifier, ServiceType destServiceType) {
+ short srcServiceType = ApplicationMapUtils.getServiceTypeFromColumnName(qualifier);
+ // Caller may be a user node, and user nodes may call nodes with the same application name but different service type.
+ // To distinguish between these user nodes, append callee's service type to the application name.
+ String srcApplicationName;
+ if (registry.findServiceType(srcServiceType).isUser()) {
+ srcApplicationName = ApplicationMapUtils.getApplicationNameFromColumnNameForUser(qualifier, destServiceType);
+ } else {
+ srcApplicationName = ApplicationMapUtils.getApplicationNameFromColumnName(qualifier);
+ }
+ return applicationFactory.createApplication(srcApplicationName, srcServiceType);
+ }
+
+ private Application readDestApplication(Buffer row) {
+ String serviceName = row.read2PrefixedString();
+ String applicationName = row.read2PrefixedString();
+ short serviceType = row.readShort();
+ return applicationFactory.createApplication(applicationName, serviceType);
+ }
+
+ private byte[] getOriginalKey(byte[] rowKey) {
+ return rowKeyDistributorByHashPrefix.getOriginalKey(rowKey);
+ }
+}
diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ApplicationMapInboundTimeAggregatedMapper.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ApplicationMapInboundTimeAggregatedMapper.java
new file mode 100644
index 0000000000000..4521216dfa19e
--- /dev/null
+++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ApplicationMapInboundTimeAggregatedMapper.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.web.applicationmap.dao.mapper;
+
+import com.navercorp.pinpoint.common.buffer.Buffer;
+import com.navercorp.pinpoint.common.buffer.FixedBuffer;
+import com.navercorp.pinpoint.common.hbase.RowMapper;
+import com.navercorp.pinpoint.common.hbase.util.CellUtils;
+import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils;
+import com.navercorp.pinpoint.common.trace.ServiceType;
+import com.navercorp.pinpoint.loader.service.ServiceTypeRegistryService;
+import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection;
+import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap;
+import com.navercorp.pinpoint.web.component.ApplicationFactory;
+import com.navercorp.pinpoint.web.vo.Application;
+import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author intr3p1d
+ */
+@Component
+public class ApplicationMapInboundTimeAggregatedMapper implements RowMapper {
+ private final Logger logger = LogManager.getLogger(this.getClass());
+
+ private final LinkFilter filter;
+
+ @Autowired
+ private ServiceTypeRegistryService registry;
+
+ @Autowired
+ private ApplicationFactory applicationFactory;
+
+ @Autowired
+ @Qualifier("applicationMapInboundRowKeyDistributor")
+ private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;
+
+ public ApplicationMapInboundTimeAggregatedMapper() {
+ this(LinkFilter::skip);
+ }
+
+ public ApplicationMapInboundTimeAggregatedMapper(LinkFilter filter) {
+ this.filter = filter;
+ }
+
+ @Override
+ public LinkDataMap mapRow(Result result, int rowNum) throws Exception {
+ if (result.isEmpty()) {
+ return new LinkDataMap();
+ }
+ logger.debug("mapRow: {}", rowNum);
+
+ final byte[] rowKey = getOriginalKey(result.getRow());
+
+ final Buffer row = new FixedBuffer(rowKey);
+ final Application destApplication = readDestApplication(row);
+ final long timestamp = 0; // time aggregated
+
+ final LinkDataMap linkDataMap = new LinkDataMap();
+ for (Cell cell : result.rawCells()) {
+
+ final byte[] qualifier = CellUtil.cloneQualifier(cell);
+ final Application srcApplication = readSourceApplication(qualifier, destApplication.getServiceType());
+ if (filter.filter(srcApplication)) {
+ continue;
+ }
+
+ long requestCount = CellUtils.valueToLong(cell);
+ short histogramSlot = ApplicationMapStatisticsUtils.getHistogramSlotFromColumnName(qualifier);
+
+ String srcHost = srcApplication.getName();
+ String destHost = destApplication.getName();
+
+ boolean isError = histogramSlot == (short) -1;
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(" Fetched {}. {} srcHost:{} -> {} (slot:{}/{}), ", LinkDirection.IN_LINK, srcApplication, srcHost, destApplication, histogramSlot, requestCount);
+ }
+
+ final short slotTime = (isError) ? (short) -1 : histogramSlot;
+ linkDataMap.addLinkData(srcApplication, srcApplication.getName(), destApplication, srcHost, timestamp, slotTime, requestCount);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(" Fetched {}. statistics:{}", LinkDirection.IN_LINK, linkDataMap);
+ }
+ }
+
+ return linkDataMap;
+ }
+
+ private Application readSourceApplication(byte[] qualifier, ServiceType destServiceType) {
+ short srcServiceType = ApplicationMapStatisticsUtils.getDestServiceTypeFromColumnName(qualifier);
+ // Caller may be a user node, and user nodes may call nodes with the same application name but different service type.
+ // To distinguish between these user nodes, append callee's service type to the application name.
+ String srcApplicationName;
+ if (registry.findServiceType(srcServiceType).isUser()) {
+ srcApplicationName = ApplicationMapStatisticsUtils.getDestApplicationNameFromColumnNameForUser(qualifier, destServiceType);
+ } else {
+ srcApplicationName = ApplicationMapStatisticsUtils.getDestApplicationNameFromColumnName(qualifier);
+ }
+ return applicationFactory.createApplication(srcApplicationName, srcServiceType);
+ }
+
+ private Application readDestApplication(Buffer row) {
+ String serviceName = row.read2PrefixedString();
+ String applicationName = row.read2PrefixedString();
+ short serviceType = row.readShort();
+ return applicationFactory.createApplication(applicationName, serviceType);
+ }
+
+ private byte[] getOriginalKey(byte[] rowKey) {
+ return rowKeyDistributorByHashPrefix.getOriginalKey(rowKey);
+ }
+
+}
diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ApplicationMapOutboundMapper.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ApplicationMapOutboundMapper.java
new file mode 100644
index 0000000000000..f0a6f1f7118ac
--- /dev/null
+++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ApplicationMapOutboundMapper.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.web.applicationmap.dao.mapper;
+
+import com.navercorp.pinpoint.common.buffer.Buffer;
+import com.navercorp.pinpoint.common.buffer.FixedBuffer;
+import com.navercorp.pinpoint.common.buffer.OffsetFixedBuffer;
+import com.navercorp.pinpoint.common.hbase.RowMapper;
+import com.navercorp.pinpoint.common.hbase.util.CellUtils;
+import com.navercorp.pinpoint.common.util.TimeUtils;
+import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection;
+import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap;
+import com.navercorp.pinpoint.web.component.ApplicationFactory;
+import com.navercorp.pinpoint.web.vo.Application;
+import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author intr3p1d
+ */
+@Component
+public class ApplicationMapOutboundMapper implements RowMapper {
+
+ private final Logger logger = LogManager.getLogger(this.getClass());
+
+ private final LinkFilter filter;
+
+ @Autowired
+ private ApplicationFactory applicationFactory;
+
+ @Autowired
+ @Qualifier("applicationMapOutboundRowKeyDistributor")
+ private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;
+
+ public ApplicationMapOutboundMapper() {
+ this(LinkFilter::skip);
+ }
+
+ public ApplicationMapOutboundMapper(LinkFilter filter) {
+ this.filter = filter;
+ }
+
+ @Override
+ public LinkDataMap mapRow(Result result, int rowNum) throws Exception {
+ if (result.isEmpty()) {
+ return new LinkDataMap();
+ }
+
+ logger.debug("mapRow: {}", rowNum);
+ final byte[] rowKey = getOriginalKey(result.getRow());
+
+ final Buffer row = new FixedBuffer(rowKey);
+ final Application srcApplication = readSourceApplication(row);
+ final long timestamp = TimeUtils.recoveryTimeMillis(row.readLong());
+
+ // key is dest ApplicationName
+ final LinkDataMap linkDataMap = new LinkDataMap();
+ for (Cell cell : result.rawCells()) {
+ final Buffer buffer = new OffsetFixedBuffer(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+ short histogramSlot = buffer.readShort();
+
+ final Application destApplication = readDestApplication(buffer);
+ if (filter.filter(destApplication)) {
+ continue;
+ }
+
+ String srcAgentId = srcApplication.getName();
+ String destHost = destApplication.getName();
+
+ boolean isError = histogramSlot == (short) -1;
+
+ long requestCount = CellUtils.valueToLong(cell);
+ if (logger.isDebugEnabled()) {
+ logger.debug(" Fetched {}.(New) {} {} -> {} (slot:{}/{}) destApplicationHost:{}", LinkDirection.OUT_LINK, srcApplication, srcAgentId, destApplication, histogramSlot, requestCount, destHost);
+ }
+
+ final short slotTime = (isError) ? (short) -1 : histogramSlot;
+
+ linkDataMap.addLinkData(srcApplication, srcAgentId, destApplication, destHost, timestamp, slotTime, requestCount);
+ }
+
+ return linkDataMap;
+ }
+
+ private Application readSourceApplication(Buffer row) {
+ String serviceName = row.read2PrefixedString();
+ String applicationName = row.read2PrefixedString();
+ short serviceType = row.readShort();
+ return applicationFactory.createApplication(applicationName, serviceType);
+ }
+
+ private Application readDestApplication(Buffer buffer) {
+ short serviceType = buffer.readShort();
+ String applicationName = buffer.read2PrefixedString();
+ String serviceName = buffer.read2PrefixedString();
+ return applicationFactory.createApplication(applicationName, serviceType);
+ }
+
+ private byte[] getOriginalKey(byte[] rowKey) {
+ return rowKeyDistributorByHashPrefix.getOriginalKey(rowKey);
+ }
+}
diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ApplicationMapOutboundTimeAggregatedMapper.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ApplicationMapOutboundTimeAggregatedMapper.java
new file mode 100644
index 0000000000000..460a65d439726
--- /dev/null
+++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ApplicationMapOutboundTimeAggregatedMapper.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2024 NAVER Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.navercorp.pinpoint.web.applicationmap.dao.mapper;
+
+import com.navercorp.pinpoint.common.buffer.Buffer;
+import com.navercorp.pinpoint.common.buffer.FixedBuffer;
+import com.navercorp.pinpoint.common.buffer.OffsetFixedBuffer;
+import com.navercorp.pinpoint.common.hbase.RowMapper;
+import com.navercorp.pinpoint.common.hbase.util.CellUtils;
+import com.navercorp.pinpoint.common.util.TimeUtils;
+import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection;
+import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap;
+import com.navercorp.pinpoint.web.component.ApplicationFactory;
+import com.navercorp.pinpoint.web.vo.Application;
+import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author intr3p1d
+ */
+@Component
+public class ApplicationMapOutboundTimeAggregatedMapper implements RowMapper {
+
+ private final Logger logger = LogManager.getLogger(this.getClass());
+
+ private final LinkFilter filter;
+
+ @Autowired
+ private ApplicationFactory applicationFactory;
+
+ @Autowired
+ @Qualifier("applicationMapOutboundRowKeyDistributor")
+ private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;
+
+ public ApplicationMapOutboundTimeAggregatedMapper() {
+ this(LinkFilter::skip);
+ }
+
+ public ApplicationMapOutboundTimeAggregatedMapper(LinkFilter filter) {
+ this.filter = filter;
+ }
+
+ @Override
+ public LinkDataMap mapRow(Result result, int rowNum) throws Exception {
+ if (result.isEmpty()) {
+ return new LinkDataMap();
+ }
+
+ logger.debug("mapRow: {}", rowNum);
+ final byte[] rowKey = getOriginalKey(result.getRow());
+
+ final Buffer row = new FixedBuffer(rowKey);
+ final Application srcApplication = readSourceApplication(row);
+ final long timestamp = 0; // aggregate timestamp
+
+ // key is dest ApplicationName
+ final LinkDataMap linkDataMap = new LinkDataMap();
+ for (Cell cell : result.rawCells()) {
+ final Buffer buffer = new OffsetFixedBuffer(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+ short histogramSlot = buffer.readShort();
+
+ final Application destApplication = readDestApplication(buffer);
+ if (filter.filter(destApplication)) {
+ continue;
+ }
+
+ String srcAgentId = srcApplication.getName();
+ String destHost = destApplication.getName();
+
+ boolean isError = histogramSlot == (short) -1;
+
+ long requestCount = CellUtils.valueToLong(cell);
+ if (logger.isDebugEnabled()) {
+ logger.debug(" Fetched {}.(New) {} {} -> {} (slot:{}/{}) destApplicationHost:{}", LinkDirection.OUT_LINK, srcApplication, srcAgentId, destApplication, histogramSlot, requestCount, destHost);
+ }
+
+ final short slotTime = (isError) ? (short) -1 : histogramSlot;
+
+ linkDataMap.addLinkData(srcApplication, srcAgentId, destApplication, destHost, timestamp, slotTime, requestCount);
+ }
+
+ return linkDataMap;
+ }
+
+ private Application readSourceApplication(Buffer row) {
+ String serviceName = row.read2PrefixedString();
+ String applicationName = row.read2PrefixedString();
+ short serviceType = row.readShort();
+ return applicationFactory.createApplication(applicationName, serviceType);
+ }
+
+ private Application readDestApplication(Buffer buffer) {
+ short serviceType = buffer.readShort();
+ String applicationName = buffer.read2PrefixedString();
+ String serviceName = buffer.read2PrefixedString();
+ return applicationFactory.createApplication(applicationName, serviceType);
+ }
+
+ private byte[] getOriginalKey(byte[] rowKey) {
+ return rowKeyDistributorByHashPrefix.getOriginalKey(rowKey);
+ }
+
+
+}
diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/service/LinkDataMapServiceImpl.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/service/LinkDataMapServiceImpl.java
index cd8c09ba1070e..4165db98b181a 100644
--- a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/service/LinkDataMapServiceImpl.java
+++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/service/LinkDataMapServiceImpl.java
@@ -20,6 +20,8 @@
import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.web.applicationmap.dao.MapStatisticsCalleeDao;
import com.navercorp.pinpoint.web.applicationmap.dao.MapStatisticsCallerDao;
+import com.navercorp.pinpoint.web.applicationmap.dao.InboundDao;
+import com.navercorp.pinpoint.web.applicationmap.dao.OutboundDao;
import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap;
import com.navercorp.pinpoint.web.vo.Application;
import org.springframework.stereotype.Service;
@@ -36,18 +38,34 @@ public class LinkDataMapServiceImpl implements LinkDataMapService {
private final MapStatisticsCalleeDao mapStatisticsCalleeDao;
- public LinkDataMapServiceImpl(MapStatisticsCallerDao mapStatisticsCallerDao, MapStatisticsCalleeDao mapStatisticsCalleeDao) {
+ private final OutboundDao outboundDao;
+
+ private final InboundDao inboundDao;
+
+ public LinkDataMapServiceImpl(
+ MapStatisticsCallerDao mapStatisticsCallerDao, MapStatisticsCalleeDao mapStatisticsCalleeDao,
+ OutboundDao outboundDao, InboundDao inboundDao
+ ) {
this.mapStatisticsCallerDao = Objects.requireNonNull(mapStatisticsCallerDao, "mapStatisticsCallerDao");
this.mapStatisticsCalleeDao = Objects.requireNonNull(mapStatisticsCalleeDao, "mapStatisticsCalleeDao");
+ this.outboundDao = Objects.requireNonNull(outboundDao, "serviceGroupOutboundDao");
+ this.inboundDao = Objects.requireNonNull(inboundDao, "serviceGroupInboundDao");
}
@Override
public LinkDataMap selectCallerLinkDataMap(Application application, Range range, boolean timeAggregated) {
- return mapStatisticsCallerDao.selectCaller(application, range, timeAggregated);
+// return mapStatisticsCallerDao.selectCaller(application, range, timeAggregated);
+
+ LinkDataMap linkDataMap = mapStatisticsCallerDao.selectCaller(application, range, timeAggregated);
+ LinkDataMap linkDataMap1 = outboundDao.selectOutboud(application, range, timeAggregated);
+ return linkDataMap1;
}
@Override
public LinkDataMap selectCalleeLinkDataMap(Application application, Range range, boolean timeAggregated) {
- return mapStatisticsCalleeDao.selectCallee(application, range, timeAggregated);
+// return mapStatisticsCalleeDao.selectCallee(application, range, timeAggregated);
+ LinkDataMap linkDataMap = mapStatisticsCalleeDao.selectCallee(application, range, timeAggregated);
+ LinkDataMap linkDataMap1 = inboundDao.selectInbound(application, range, timeAggregated);
+ return linkDataMap1;
}
}