From 033f7bb553388dc578f371ba86afc98efa06cc16 Mon Sep 17 00:00:00 2001 From: donghun-cho Date: Wed, 5 Mar 2025 16:54:58 +0900 Subject: [PATCH] [#12064] Seperate hbase connection for Uid --- .../resources/profiles/local/hbase.properties | 5 + .../profiles/release/hbase.properties | 5 + .../collector/CollectorHbaseModule.java | 2 + .../HbasePinpointIdTemplateConfiguration.java | 113 ++++++++++++++++++ .../dao/hbase/HbaseApplicationNameDao.java | 4 +- .../dao/hbase/HbaseApplicationUidDao.java | 5 +- .../resources/profiles/local/hbase.properties | 5 + .../profiles/release/hbase.properties | 5 + 8 files changed, 142 insertions(+), 2 deletions(-) create mode 100644 collector/src/main/java/com/navercorp/pinpoint/collector/config/HbasePinpointIdTemplateConfiguration.java diff --git a/collector-starter/src/main/resources/profiles/local/hbase.properties b/collector-starter/src/main/resources/profiles/local/hbase.properties index b84c86625886..fbff97d0b354 100644 --- a/collector-starter/src/main/resources/profiles/local/hbase.properties +++ b/collector-starter/src/main/resources/profiles/local/hbase.properties @@ -15,6 +15,11 @@ hbase.client.executor.maxPoolSize=64 hbase.client.executor.queueCapacity=5120 hbase.client.executor.prestartAllCoreThreads=false +# experimental configuration +hbase.client.pinpoint.id.executor.corePoolSize=2 +hbase.client.pinpoint.id.executor.maxPoolSize=8 +hbase.client.pinpoint.id.executor.queueCapacity=1280 + # warmup hbase connection cache hbase.client.warmup.enable=false diff --git a/collector-starter/src/main/resources/profiles/release/hbase.properties b/collector-starter/src/main/resources/profiles/release/hbase.properties index 01fab19093ba..9d67fdaec625 100644 --- a/collector-starter/src/main/resources/profiles/release/hbase.properties +++ b/collector-starter/src/main/resources/profiles/release/hbase.properties @@ -14,6 +14,11 @@ hbase.client.executor.corePoolSize=64 hbase.client.executor.maxPoolSize=64 hbase.client.executor.queueCapacity=5120 +# experimental configuration +hbase.client.pinpoint.id.executor.corePoolSize=16 +hbase.client.pinpoint.id.executor.maxPoolSize=16 +hbase.client.pinpoint.id.executor.queueCapacity=1280 + # warmup hbase connection cache hbase.client.warmup.enable=true diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/CollectorHbaseModule.java b/collector/src/main/java/com/navercorp/pinpoint/collector/CollectorHbaseModule.java index 74ae7fe4bdf4..d473755ba034 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/CollectorHbaseModule.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/CollectorHbaseModule.java @@ -2,6 +2,7 @@ import com.navercorp.pinpoint.collector.config.BatchHbaseClientConfiguration; import com.navercorp.pinpoint.collector.config.HbaseAsyncConfiguration; +import com.navercorp.pinpoint.collector.config.HbasePinpointIdTemplateConfiguration; import com.navercorp.pinpoint.collector.config.SchedulerConfiguration; import com.navercorp.pinpoint.collector.dao.hbase.encode.ApplicationIndexRowKeyEncoderV1; import com.navercorp.pinpoint.collector.dao.hbase.encode.ApplicationIndexRowKeyEncoderV2; @@ -37,6 +38,7 @@ HbasePutWriterConfiguration.class, BatchHbaseClientConfiguration.class, + HbasePinpointIdTemplateConfiguration.class, HbaseAsyncConfiguration.class, SchedulerConfiguration.class, diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/config/HbasePinpointIdTemplateConfiguration.java b/collector/src/main/java/com/navercorp/pinpoint/collector/config/HbasePinpointIdTemplateConfiguration.java new file mode 100644 index 000000000000..e5110b006c4c --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/config/HbasePinpointIdTemplateConfiguration.java @@ -0,0 +1,113 @@ +package com.navercorp.pinpoint.collector.config; + +import com.navercorp.pinpoint.common.hbase.ConnectionFactoryBean; +import com.navercorp.pinpoint.common.hbase.HbaseTableFactory; +import com.navercorp.pinpoint.common.hbase.HbaseTemplate; +import com.navercorp.pinpoint.common.hbase.TableFactory; +import com.navercorp.pinpoint.common.hbase.async.AsyncConnectionFactoryBean; +import com.navercorp.pinpoint.common.hbase.async.AsyncTableCustomizer; +import com.navercorp.pinpoint.common.hbase.async.AsyncTableFactory; +import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTableFactory; +import com.navercorp.pinpoint.common.hbase.async.HbaseAsyncTemplate; +import com.navercorp.pinpoint.common.hbase.config.HbaseTemplateConfiguration; +import com.navercorp.pinpoint.common.hbase.config.ParallelScan; +import com.navercorp.pinpoint.common.hbase.scan.ResultScannerFactory; +import com.navercorp.pinpoint.common.hbase.util.ScanMetricReporter; +import com.navercorp.pinpoint.common.profiler.concurrent.ExecutorFactory; +import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory; +import com.navercorp.pinpoint.common.server.executor.ExecutorCustomizer; +import com.navercorp.pinpoint.common.server.executor.ExecutorProperties; +import com.navercorp.pinpoint.common.util.CpuUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.security.User; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.FactoryBean; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean; + +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; + +@org.springframework.context.annotation.Configuration +@ConditionalOnProperty(value = "pinpoint.collector.application.uid.enable", havingValue = "true") +public class HbasePinpointIdTemplateConfiguration { + private final Logger logger = LogManager.getLogger(HbasePinpointIdTemplateConfiguration.class); + + private final HbaseTemplateConfiguration config = new HbaseTemplateConfiguration(); + + public HbasePinpointIdTemplateConfiguration() { + logger.info("Install {}", HbasePinpointIdTemplateConfiguration.class.getSimpleName()); + } + + @Bean + @ConfigurationProperties(prefix = "hbase.client.pinpoint.id.executor") + public ExecutorProperties collectorPinpointIdExecutorProperties() { + return new ExecutorProperties(); + } + + @Bean + public FactoryBean hbasePinpointIdThreadPool(@Qualifier("hbaseExecutorCustomizer") ExecutorCustomizer executorCustomizer, + @Qualifier("collectorPinpointIdExecutorProperties") ExecutorProperties executorProperties) { + ThreadPoolExecutorFactoryBean factory = new ThreadPoolExecutorFactoryBean(); + executorCustomizer.customize(factory, executorProperties); + factory.setThreadNamePrefix("PinpointId-" + factory.getThreadNamePrefix()); + return factory; + } + + @Bean + public FactoryBean hbasePinpointIdConnection(Configuration configuration, + User user, + @Qualifier("hbasePinpointIdThreadPool") ExecutorService executorService) { + return new ConnectionFactoryBean(configuration, user, executorService); + } + + @Bean + public TableFactory hbasePinpointIdTableFactory(@Qualifier("hbasePinpointIdConnection") Connection connection) { + return new HbaseTableFactory(connection); + } + + @Bean + public FactoryBean hbaseAsyncPinpointIdConnection(Configuration configuration, User user) { + return new AsyncConnectionFactoryBean(configuration, user); + } + + @Bean + public AsyncTableFactory hbaseAsyncPinpointIdTableFactory(@Qualifier("hbaseAsyncPinpointIdConnection") AsyncConnection connection, + AsyncTableCustomizer customizer) { + return new HbaseAsyncTableFactory(connection, customizer); + } + + @Bean + public HbaseAsyncTemplate asyncPinpointIdTemplate(@Qualifier("hbaseAsyncPinpointIdTableFactory") AsyncTableFactory asyncTableFactory, + ScanMetricReporter scanMetricReporter, + ResultScannerFactory resultScannerFactory) { + ExecutorService executor = newAsyncTemplateExecutor(); + return new HbaseAsyncTemplate(asyncTableFactory, resultScannerFactory, scanMetricReporter, executor); + } + + private ExecutorService newAsyncTemplateExecutor() { + ThreadFactory threadFactory = new PinpointThreadFactory("Pinpoint-asyncPinpointIdTemplate", true); + return ExecutorFactory.newFixedThreadPool(CpuUtils.workerCount(), 1024 * 1024, threadFactory); + } + + + @Bean + public HbaseTemplate hbasePinpointIdTemplate(@Qualifier("hbaseConfiguration") org.apache.hadoop.conf.Configuration configurable, + @Qualifier("hbasePinpointIdTableFactory") TableFactory tableFactory, + @Qualifier("asyncPinpointIdTemplate") HbaseAsyncTemplate asyncTemplate, + Optional parallelScan, + @Value("${hbase.client.nativeAsync:false}") boolean nativeAsync, + ResultScannerFactory resultScannerFactory, + ScanMetricReporter scanMetricReport) { + return config.hbaseTemplate(configurable, tableFactory, asyncTemplate, parallelScan, nativeAsync, resultScannerFactory, scanMetricReport); + } + +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseApplicationNameDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseApplicationNameDao.java index 608ce7f0d6d5..e7b6e9a699a1 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseApplicationNameDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseApplicationNameDao.java @@ -12,6 +12,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Repository; import java.util.Objects; @@ -24,7 +25,8 @@ public class HbaseApplicationNameDao implements ApplicationNameDao { private final HbaseOperations hbaseOperations; private final TableNameProvider tableNameProvider; - public HbaseApplicationNameDao(HbaseOperations hbaseOperations, TableNameProvider tableNameProvider) { + public HbaseApplicationNameDao(@Qualifier("hbasePinpointIdTemplate") HbaseOperations hbaseOperations, + TableNameProvider tableNameProvider) { this.hbaseOperations = Objects.requireNonNull(hbaseOperations, "hbaseOperations"); this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseApplicationUidDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseApplicationUidDao.java index b41d2afa1e1b..9ace987b5d99 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseApplicationUidDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseApplicationUidDao.java @@ -16,12 +16,14 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Repository; import java.util.Objects; @Repository +@ConditionalOnProperty(value = "pinpoint.collector.application.uid.enable", havingValue = "true") public class HbaseApplicationUidDao implements ApplicationUidDao { private static final HbaseColumnFamily.ApplicationUid APPLICATION_ID = HbaseColumnFamily.APPLICATION_UID; @@ -31,7 +33,8 @@ public class HbaseApplicationUidDao implements ApplicationUidDao { private final RowMapper applicationIdMapper; - public HbaseApplicationUidDao(HbaseOperations hbaseOperations, TableNameProvider tableNameProvider, + public HbaseApplicationUidDao(@Qualifier("hbasePinpointIdTemplate") HbaseOperations hbaseOperations, + TableNameProvider tableNameProvider, @Qualifier("applicationUidMapper") RowMapper applicationIdMapper) { this.hbaseOperations = Objects.requireNonNull(hbaseOperations, "hbaseOperations"); this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); diff --git a/collector/src/main/resources/profiles/local/hbase.properties b/collector/src/main/resources/profiles/local/hbase.properties index 7e0abee95499..0a4fbfe43697 100644 --- a/collector/src/main/resources/profiles/local/hbase.properties +++ b/collector/src/main/resources/profiles/local/hbase.properties @@ -14,6 +14,11 @@ hbase.client.executor.corePoolSize=8 hbase.client.executor.maxPoolSize=32 hbase.client.executor.queueCapacity=5120 +# experimental configuration +hbase.client.pinpoint.id.executor.corePoolSize=2 +hbase.client.pinpoint.id.executor.maxPoolSize=8 +hbase.client.pinpoint.id.executor.queueCapacity=1280 + # warmup hbase connection cache hbase.client.warmup.enable=false diff --git a/collector/src/main/resources/profiles/release/hbase.properties b/collector/src/main/resources/profiles/release/hbase.properties index 01fab19093ba..9d67fdaec625 100644 --- a/collector/src/main/resources/profiles/release/hbase.properties +++ b/collector/src/main/resources/profiles/release/hbase.properties @@ -14,6 +14,11 @@ hbase.client.executor.corePoolSize=64 hbase.client.executor.maxPoolSize=64 hbase.client.executor.queueCapacity=5120 +# experimental configuration +hbase.client.pinpoint.id.executor.corePoolSize=16 +hbase.client.pinpoint.id.executor.maxPoolSize=16 +hbase.client.pinpoint.id.executor.queueCapacity=1280 + # warmup hbase connection cache hbase.client.warmup.enable=true