diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index ecc44a08e47c2a..23195c736c349d 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -41,6 +41,7 @@ #include "common/status.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" +#include "util/cpu_info.h" namespace doris { namespace config { @@ -235,7 +236,14 @@ DEFINE_Bool(doris_enable_scanner_thread_pool_per_disk, "true"); DEFINE_mInt64(doris_blocking_priority_queue_wait_timeout_ms, "500"); // number of scanner thread pool size for olap table // and the min thread num of remote scanner thread pool -DEFINE_Int32(doris_scanner_thread_pool_thread_num, "48"); +DEFINE_Int32(doris_scanner_thread_pool_thread_num, "-1"); +DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> bool { + if (config == -1) { + CpuInfo::init(); + doris_scanner_thread_pool_thread_num = std::max(48, CpuInfo::num_cores() * 4); + } + return true; +}); DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1"); // number of olap scanner thread pool queue size DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400"); @@ -883,6 +891,8 @@ DEFINE_mInt32(parquet_rowgroup_max_buffer_mb, "128"); // Max buffer size for parquet chunk column DEFINE_mInt32(parquet_column_max_buffer_mb, "8"); DEFINE_mDouble(max_amplified_read_ratio, "0.8"); +DEFINE_mInt32(merged_oss_min_io_size, "1048576"); +DEFINE_mInt32(merged_hdfs_min_io_size, "8192"); // OrcReader DEFINE_mInt32(orc_natural_read_size_mb, "8"); diff --git a/be/src/common/config.h b/be/src/common/config.h index a9508c6e8af3ff..1e0e28d0a8a68f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -283,7 +283,7 @@ DECLARE_Bool(doris_enable_scanner_thread_pool_per_disk); DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms); // number of scanner thread pool size for olap table // and the min thread num of remote scanner thread pool -DECLARE_Int32(doris_scanner_thread_pool_thread_num); 
+DECLARE_mInt32(doris_scanner_thread_pool_thread_num); // max number of remote scanner thread pool size // if equal to -1, value is std::max(512, CpuInfo::num_cores() * 10) DECLARE_Int32(doris_max_remote_scanner_thread_pool_thread_num); @@ -940,6 +940,10 @@ DECLARE_mInt32(parquet_rowgroup_max_buffer_mb); DECLARE_mInt32(parquet_column_max_buffer_mb); // Merge small IO, the max amplified read ratio DECLARE_mDouble(max_amplified_read_ratio); +// Equivalent min size of each IO that can reach the maximum storage speed limit +// Sizes are in bytes; defaults: 1MB for oss, 8KB for hdfs +DECLARE_mInt32(merged_oss_min_io_size); +DECLARE_mInt32(merged_hdfs_min_io_size); // OrcReader DECLARE_mInt32(orc_natural_read_size_mb); diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index e78c1c79251146..2f8eb465cdfc54 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -131,8 +131,6 @@ class MergeRangeFileReader : public io::FileReader { static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024; // 8MB static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB - static constexpr size_t HDFS_MIN_IO_SIZE = 4 * 1024; // 4KB - static constexpr size_t OSS_MIN_IO_SIZE = 512 * 1024; // 512KB static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128 MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader, @@ -146,8 +144,9 @@ class MergeRangeFileReader : public io::FileReader { _is_oss = typeid_cast(_reader.get()) != nullptr; _max_amplified_ratio = config::max_amplified_read_ratio; // Equivalent min size of each IO that can reach the maximum storage speed limit: - // 512KB for oss, 4KB for hdfs - _equivalent_io_size = _is_oss ? OSS_MIN_IO_SIZE : HDFS_MIN_IO_SIZE; + // read from config; defaults are 1MB for oss, 8KB for hdfs + _equivalent_io_size = + _is_oss ? 
config::merged_oss_min_io_size : config::merged_hdfs_min_io_size; for (const PrefetchRange& range : _random_access_ranges) { _statistics.apply_bytes += range.end_offset - range.start_offset; } diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index e8d7f8a7139a6d..7c1cd55d64910d 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -122,6 +122,8 @@ Status ScannerScheduler::init(ExecEnv* env) { _remote_thread_pool_max_size = config::doris_max_remote_scanner_thread_pool_thread_num != -1 ? config::doris_max_remote_scanner_thread_pool_thread_num : std::max(512, CpuInfo::num_cores() * 10); + _remote_thread_pool_max_size = + std::max(_remote_thread_pool_max_size, config::doris_scanner_thread_pool_thread_num); _remote_scan_thread_pool = std::make_unique( _remote_thread_pool_max_size, config::doris_remote_scanner_thread_pool_queue_size, "RemoteScanThreadPool"); diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java index daf8d4a21fa87b..17ca650675de3c 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java @@ -85,8 +85,8 @@ public class HudiJniScanner extends JniScanner { private static final ScheduledExecutorService cleanResolverService = Executors.newScheduledThreadPool(1); static { - int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4); - if (numThreads > 32) { + int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2, 4); + if (numThreads > 48) { numThreads = Runtime.getRuntime().availableProcessors(); } avroReadPool = Executors.newFixedThreadPool(numThreads,