From b9472b73346a06a59a0c4871e3e940bb8ac0f0eb Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Mon, 25 Nov 2019 12:49:34 -0600 Subject: [PATCH] Adding local work stealing scheduler that is based on message passing internally - Using uniform_int_distribution with proper bounds - Removing queue index from thread_queues as it was unused - flyby: remove commented out options from .clang-format - Renaming workstealing --> workrequesting - Adding adaptive work stealing (steal half/steal one) - this makes this scheduler consistently (albeit only slightly) faster than the (default) local-priority scheduler - Adding LIFO and FIFO variations of local work-stealing scheduler - flyby: fixing HPX_WITH_SWAP_CONTEXT_EMULATION - flyby: minor changes to fibonacci_local example - Adding high- and low- priority queues - flyby: cache_line_data now does not generate warnings errors if padding is not needed --- .circleci/config.yml | 3 +- .clang-format | 4 +- .jenkins/cscs-perftests/launch_perftests.sh | 6 +- .jenkins/lsu/env-clang-13.sh | 3 +- .jenkins/lsu/env-gcc-11.sh | 2 + CMakeLists.txt | 7 + cmake/HPX_AddTest.cmake | 5 + examples/quickstart/fibonacci_local.cpp | 25 +- .../src/parse_command_line_local.cpp | 1 + .../detail/contiguous_index_queue.hpp | 4 +- .../include/hpx/concurrency/spinlock.hpp | 23 +- .../resource_partitioner/partitioner_fwd.hpp | 2 + .../src/detail_partitioner.cpp | 18 + .../src/runtime_configuration.cpp | 2 +- libs/core/schedulers/CMakeLists.txt | 4 + .../include/hpx/modules/schedulers.hpp | 5 +- .../local_priority_queue_scheduler.hpp | 24 +- .../hpx/schedulers/local_queue_scheduler.hpp | 27 +- .../local_workrequesting_scheduler.hpp | 1665 +++++++++++++++++ .../schedulers/lockfree_queue_backends.hpp | 2 +- .../shared_priority_queue_scheduler.hpp | 15 +- .../static_priority_queue_scheduler.hpp | 2 +- .../hpx/schedulers/static_queue_scheduler.hpp | 6 +- .../include/hpx/schedulers/thread_queue.hpp | 60 +- .../hpx/schedulers/thread_queue_mc.hpp | 8 +- .../hpx/synchronization/channel_mpmc.hpp | 25 +- .../hpx/synchronization/channel_mpsc.hpp | 22 +- .../hpx/synchronization/channel_spsc.hpp | 16 +- .../detail/scoped_background_timer.hpp | 7 +- .../thread_pools/scheduled_thread_pool.hpp | 9 +- .../scheduled_thread_pool_impl.hpp | 9 +- .../hpx/thread_pools/scheduling_loop.hpp | 81 +- .../src/scheduled_thread_pool.cpp | 12 + .../hpx/threading_base/scheduler_base.hpp | 32 +- .../hpx/threading_base/thread_pool_base.hpp | 14 +- .../threading_base/src/scheduler_base.cpp | 49 +- .../include/hpx/modules/threadmanager.hpp | 19 +- libs/core/threadmanager/src/threadmanager.cpp | 93 +- .../src/parse_command_line.cpp | 1 + 39 files changed, 2057 insertions(+), 255 deletions(-) create mode 100644 libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp diff --git a/.circleci/config.yml b/.circleci/config.yml index 5376ba606823..e11f760c9cda 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -233,7 +233,8 @@ jobs: -DHPX_WITH_CHECK_MODULE_DEPENDENCIES=On \ -DCMAKE_EXPORT_COMPILE_COMMANDS=On \ -DHPX_WITH_DOCUMENTATION=On \ - -DHPX_WITH_DOCUMENTATION_OUTPUT_FORMATS="${DOCUMENTATION_OUTPUT_FORMATS}" + -DHPX_WITH_DOCUMENTATION_OUTPUT_FORMATS="${DOCUMENTATION_OUTPUT_FORMATS}" \ + -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo - persist_to_workspace: root: /hpx paths: diff --git a/.clang-format b/.clang-format index fad37f8c3712..2cacb02becd4 100644 --- a/.clang-format +++ b/.clang-format @@ -1,5 +1,5 @@ # Copyright (c) 2016 Thomas Heller -# Copyright (c) 
2016-2018 Hartmut Kaiser +# Copyright (c) 2016-2019 Hartmut Kaiser # # SPDX-License-Identifier: BSL-1.0 # Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -105,7 +105,7 @@ PenaltyExcessCharacter: 1000000 PenaltyReturnTypeOnItsOwnLine: 200 PointerAlignment: Left ReflowComments: false -SortIncludes: true +SortIncludes: true SpaceAfterCStyleCast: true SpaceAfterTemplateKeyword: true SpaceBeforeAssignmentOperators: true diff --git a/.jenkins/cscs-perftests/launch_perftests.sh b/.jenkins/cscs-perftests/launch_perftests.sh index e0deade7f7b2..738e78aa8bb2 100644 --- a/.jenkins/cscs-perftests/launch_perftests.sh +++ b/.jenkins/cscs-perftests/launch_perftests.sh @@ -15,13 +15,13 @@ hpx_targets=( hpx_test_options=( "--hpx:ini=hpx.thread_queue.init_threads_count=100 \ --hpx:threads=4 --vector_size=10000 --work_delay=1 \ - --chunk_size=0 --test_count=5000" + --chunk_size=0 --test_count=5000 --hpx:queuing=local-workrequesting-fifo" "--hpx:ini=hpx.thread_queue.init_threads_count=100 \ --hpx:queuing=local-priority --hpx:threads=4 --test-all \ - --repetitions=100 --futures=500000" + --repetitions=100 --futures=500000 --hpx:queuing=local-workrequesting-fifo" "--hpx:ini=hpx.thread_queue.init_threads_count=100 \ --vector_size=1048576 --hpx:threads=4 --iterations=5000 \ - --warmup_iterations=500") + --warmup_iterations=500 --hpx:queuing=local-workrequesting-fifo") # Build binaries for performance tests ${perftests_dir}/driver.py -v -l $logfile build -b release -o build \ diff --git a/.jenkins/lsu/env-clang-13.sh b/.jenkins/lsu/env-clang-13.sh index 890c8c3c1a8c..dcd762493404 100644 --- a/.jenkins/lsu/env-clang-13.sh +++ b/.jenkins/lsu/env-clang-13.sh @@ -23,8 +23,9 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON" configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON" -configure_extra_options+=" -DLCI_SERVER=ibv" configure_extra_options+=" -DHPX_WITH_LOGGING=OFF" +configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo" +configure_extra_options+=" -DLCI_SERVER=ibv" # Make sure HWLOC does not report 'cores'. This is purely an option to enable # testing the topology code under conditions close to those on FreeBSD. 
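(Usage note: once this patch is applied, the new scheduler can be selected for any HPX application through the queuing policy option added below; a hypothetical invocation of the fibonacci_local example, where the binary location and the n-value are assumptions, would be:

    ./bin/fibonacci_local --hpx:threads=4 --n-value=35 --hpx:queuing=local-workrequesting-fifo

The LIFO variant is selected analogously with --hpx:queuing=local-workrequesting-lifo.)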
diff --git a/.jenkins/lsu/env-gcc-11.sh b/.jenkins/lsu/env-gcc-11.sh index 36c7f1c3a7a3..aeb82b4ac0ff 100644 --- a/.jenkins/lsu/env-gcc-11.sh +++ b/.jenkins/lsu/env-gcc-11.sh @@ -14,6 +14,7 @@ module load openmpi export HPXRUN_RUNWRAPPER=srun export CXX_STD="20" +configure_extra_options+=" -DCMAKE_BUILD_TYPE=${build_type}" configure_extra_options+=" -DHPX_WITH_CXX_STANDARD=${CXX_STD}" configure_extra_options+=" -DHPX_WITH_MALLOC=system" configure_extra_options+=" -DHPX_WITH_FETCH_ASIO=ON" @@ -22,4 +23,5 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON" configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON" +configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo" configure_extra_options+=" -DLCI_SERVER=ibv" diff --git a/CMakeLists.txt b/CMakeLists.txt index 1b61bdd1c4ac..4c68b2888c35 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1388,6 +1388,13 @@ hpx_option( ADVANCED ) +hpx_option( + HPX_WITH_TESTS_COMMAND_LINE STRING + "Add given command line options to all tests run" "" + CATEGORY "Debugging" + ADVANCED +) + hpx_option( HPX_WITH_TESTS_MAX_THREADS_PER_LOCALITY STRING diff --git a/cmake/HPX_AddTest.cmake b/cmake/HPX_AddTest.cmake index 38cb626c5653..507e9735c03d 100644 --- a/cmake/HPX_AddTest.cmake +++ b/cmake/HPX_AddTest.cmake @@ -66,6 +66,11 @@ function(add_hpx_test category name) ) endif() + # add additional command line arguments to all test executions + if(NOT x${HPX_WITH_TESTS_COMMAND_LINE} STREQUAL x"") + set(args ${args} "${HPX_WITH_TESTS_COMMAND_LINE}") + endif() + if(${HPX_WITH_PARALLEL_TESTS_BIND_NONE} AND NOT run_serial AND NOT "${name}_RUNWRAPPER" diff --git a/examples/quickstart/fibonacci_local.cpp b/examples/quickstart/fibonacci_local.cpp index f1beb42307f9..54fd27f9fff1 100644 --- a/examples/quickstart/fibonacci_local.cpp +++ b/examples/quickstart/fibonacci_local.cpp @@ -18,6 +18,9 @@ #include #include +#include +#include +#include /////////////////////////////////////////////////////////////////////////////// //[fibonacci @@ -26,15 +29,10 @@ std::uint64_t fibonacci(std::uint64_t n) if (n < 2) return n; - // Invoking the Fibonacci algorithm twice is inefficient. - // However, we intentionally demonstrate it this way to create some - // heavy workload. - hpx::future n1 = hpx::async(fibonacci, n - 1); - hpx::future n2 = hpx::async(fibonacci, n - 2); + std::uint64_t n2 = fibonacci(n - 2); - return n1.get() + - n2.get(); // wait for the Futures to return their values + return n1.get() + n2; // wait for the Future to return their values } //fibonacci] @@ -42,6 +40,9 @@ std::uint64_t fibonacci(std::uint64_t n) //[hpx_main int hpx_main(hpx::program_options::variables_map& vm) { + hpx::threads::add_scheduler_mode( + hpx::threads::policies::scheduler_mode::fast_idle_mode); + // extract command line argument, i.e. 
fib(N) std::uint64_t n = vm["n-value"].as(); @@ -67,9 +68,13 @@ int main(int argc, char* argv[]) hpx::program_options::options_description desc_commandline( "Usage: " HPX_APPLICATION_STRING " [options]"); - desc_commandline.add_options()("n-value", - hpx::program_options::value()->default_value(10), - "n value for the Fibonacci function"); + // clang-format off + desc_commandline.add_options() + ("n-value", + hpx::program_options::value()->default_value(10), + "n value for the Fibonacci function") + ; + // clang-format on // Initialize and run HPX hpx::local::init_params init_args; diff --git a/libs/core/command_line_handling_local/src/parse_command_line_local.cpp b/libs/core/command_line_handling_local/src/parse_command_line_local.cpp index 51c3b537dd0c..f6d6c30ca282 100644 --- a/libs/core/command_line_handling_local/src/parse_command_line_local.cpp +++ b/libs/core/command_line_handling_local/src/parse_command_line_local.cpp @@ -363,6 +363,7 @@ namespace hpx { namespace local { namespace detail { ("hpx:queuing", value(), "the queue scheduling policy to use, options are " "'local', 'local-priority-fifo','local-priority-lifo', " + "'local-workrequesting-fifo', 'local-workrequesting-lifo' " "'abp-priority-fifo', 'abp-priority-lifo', 'static', and " "'static-priority' (default: 'local-priority'; " "all option values can be abbreviated)") diff --git a/libs/core/concurrency/include/hpx/concurrency/detail/contiguous_index_queue.hpp b/libs/core/concurrency/include/hpx/concurrency/detail/contiguous_index_queue.hpp index f404f6e2a6d7..efb1b537f5da 100644 --- a/libs/core/concurrency/include/hpx/concurrency/detail/contiguous_index_queue.hpp +++ b/libs/core/concurrency/include/hpx/concurrency/detail/contiguous_index_queue.hpp @@ -139,7 +139,7 @@ namespace hpx { namespace concurrency { namespace detail { { if (expected_range.empty()) { - return hpx::nullopt; + return hpx::optional(hpx::nullopt); } index = expected_range.first; @@ -166,7 +166,7 @@ namespace hpx { namespace concurrency { namespace detail { { if (expected_range.empty()) { - return hpx::nullopt; + return hpx::optional(hpx::nullopt); } desired_range = expected_range.decrement_last(); diff --git a/libs/core/concurrency/include/hpx/concurrency/spinlock.hpp b/libs/core/concurrency/include/hpx/concurrency/spinlock.hpp index d353d417c345..d99dae9761e0 100644 --- a/libs/core/concurrency/include/hpx/concurrency/spinlock.hpp +++ b/libs/core/concurrency/include/hpx/concurrency/spinlock.hpp @@ -13,9 +13,11 @@ #include #include +#include + namespace hpx { namespace util { - /// Lockable spinlock class + // Lockable spinlock class struct spinlock { public: @@ -25,9 +27,14 @@ namespace hpx { namespace util { hpx::util::detail::spinlock m; public: - spinlock(char const* /*desc*/ = nullptr) + spinlock() noexcept { - HPX_ITT_SYNC_CREATE(this, "util::spinlock", ""); + HPX_ITT_SYNC_CREATE(this, "util::spinlock", nullptr); + } + + explicit spinlock(char const* desc) noexcept + { + HPX_ITT_SYNC_CREATE(this, "util::spinlock", desc); } ~spinlock() @@ -35,7 +42,8 @@ namespace hpx { namespace util { HPX_ITT_SYNC_DESTROY(this); } - void lock() noexcept + void lock() noexcept( + noexcept(util::register_lock(std::declval()))) { HPX_ITT_SYNC_PREPARE(this); m.lock(); @@ -43,7 +51,8 @@ namespace hpx { namespace util { util::register_lock(this); } - bool try_lock() noexcept + bool try_lock() noexcept( + noexcept(util::register_lock(std::declval()))) { HPX_ITT_SYNC_PREPARE(this); if (m.try_lock()) @@ -56,7 +65,8 @@ namespace hpx { namespace util { return false; } - void 
unlock() noexcept + void unlock() noexcept( + noexcept(util::unregister_lock(std::declval()))) { HPX_ITT_SYNC_RELEASING(this); m.unlock(); @@ -64,5 +74,4 @@ namespace hpx { namespace util { util::unregister_lock(this); } }; - }} // namespace hpx::util diff --git a/libs/core/resource_partitioner/include/hpx/resource_partitioner/partitioner_fwd.hpp b/libs/core/resource_partitioner/include/hpx/resource_partitioner/partitioner_fwd.hpp index 741606bf9561..62f3d0bc5bdf 100644 --- a/libs/core/resource_partitioner/include/hpx/resource_partitioner/partitioner_fwd.hpp +++ b/libs/core/resource_partitioner/include/hpx/resource_partitioner/partitioner_fwd.hpp @@ -71,5 +71,7 @@ namespace hpx { namespace resource { abp_priority_fifo = 5, abp_priority_lifo = 6, shared_priority = 7, + local_workrequesting_fifo = 8, + local_workrequesting_lifo = 9, }; }} // namespace hpx::resource diff --git a/libs/core/resource_partitioner/src/detail_partitioner.cpp b/libs/core/resource_partitioner/src/detail_partitioner.cpp index eb53047fe97b..e91315e35323 100644 --- a/libs/core/resource_partitioner/src/detail_partitioner.cpp +++ b/libs/core/resource_partitioner/src/detail_partitioner.cpp @@ -134,6 +134,12 @@ namespace hpx { namespace resource { namespace detail { case resource::local_priority_lifo: sched = "local_priority_lifo"; break; + case resource::local_workrequesting_fifo: + sched = "local_workrequesting_fifo"; + break; + case resource::local_workrequesting_lifo: + sched = "local_workrequesting_lifo"; + break; case resource::static_: sched = "static"; break; @@ -451,6 +457,18 @@ namespace hpx { namespace resource { namespace detail { { default_scheduler = scheduling_policy::local_priority_lifo; } + else if (0 == + std::string("local-workrequesting-fifo") + .find(default_scheduler_str)) + { + default_scheduler = scheduling_policy::local_workrequesting_fifo; + } + else if (0 == + std::string("local-workrequesting-lifo") + .find(default_scheduler_str)) + { + default_scheduler = scheduling_policy::local_workrequesting_lifo; + } else if (0 == std::string("static").find(default_scheduler_str)) { default_scheduler = scheduling_policy::static_; diff --git a/libs/core/runtime_configuration/src/runtime_configuration.cpp b/libs/core/runtime_configuration/src/runtime_configuration.cpp index 87536f222ae5..af700f178f65 100644 --- a/libs/core/runtime_configuration/src/runtime_configuration.cpp +++ b/libs/core/runtime_configuration/src/runtime_configuration.cpp @@ -156,7 +156,7 @@ namespace hpx { namespace util { "localities = 1", "first_pu = 0", "runtime_mode = console", - "scheduler = local-priority-fifo", + "scheduler = local-workrequesting-fifo", "affinity = core", "pu_step = 1", "pu_offset = 0", diff --git a/libs/core/schedulers/CMakeLists.txt b/libs/core/schedulers/CMakeLists.txt index 771950be877a..1488f1d493a3 100644 --- a/libs/core/schedulers/CMakeLists.txt +++ b/libs/core/schedulers/CMakeLists.txt @@ -10,6 +10,7 @@ set(schedulers_headers hpx/schedulers/deadlock_detection.hpp hpx/schedulers/local_priority_queue_scheduler.hpp hpx/schedulers/local_queue_scheduler.hpp + hpx/schedulers/local_workrequesting_scheduler.hpp hpx/schedulers/lockfree_queue_backends.hpp hpx/schedulers/maintain_queue_wait_times.hpp hpx/schedulers/queue_helpers.hpp @@ -51,12 +52,15 @@ add_hpx_module( COMPAT_HEADERS ${schedulers_compat_headers} SOURCES ${schedulers_sources} MODULE_DEPENDENCIES + hpx_affinity + hpx_concurrency hpx_config hpx_assertion hpx_errors hpx_format hpx_functional hpx_logging + hpx_synchronization hpx_threading_base CMAKE_SUBDIRS 
examples tests ) diff --git a/libs/core/schedulers/include/hpx/modules/schedulers.hpp b/libs/core/schedulers/include/hpx/modules/schedulers.hpp index 2c2fae503b5c..c3186fabdff3 100644 --- a/libs/core/schedulers/include/hpx/modules/schedulers.hpp +++ b/libs/core/schedulers/include/hpx/modules/schedulers.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2013 Hartmut Kaiser +// Copyright (c) 2007-2022 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -6,10 +6,9 @@ #pragma once -#include - #include #include +#include #include #include #include diff --git a/libs/core/schedulers/include/hpx/schedulers/local_priority_queue_scheduler.hpp b/libs/core/schedulers/include/hpx/schedulers/local_priority_queue_scheduler.hpp index 88baf0c78ec3..2f3a12bc319d 100644 --- a/libs/core/schedulers/include/hpx/schedulers/local_priority_queue_scheduler.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/local_priority_queue_scheduler.hpp @@ -120,7 +120,7 @@ namespace hpx { namespace threads { namespace policies { , affinity_data_(init.affinity_data_) , num_queues_(init.num_queues_) , num_high_priority_queues_(init.num_high_priority_queues_) - , low_priority_queue_(init.num_queues_ - 1, thread_queue_init_) + , low_priority_queue_(thread_queue_init_) , queues_(num_queues_) , high_priority_queues_(num_queues_) , victim_threads_(num_queues_) @@ -131,7 +131,7 @@ namespace hpx { namespace threads { namespace policies { for (std::size_t i = 0; i != num_queues_; ++i) { queues_[i].data_ = - new thread_queue_type(i, thread_queue_init_); + new thread_queue_type(thread_queue_init_); } HPX_ASSERT(num_high_priority_queues_ != 0); @@ -139,7 +139,7 @@ namespace hpx { namespace threads { namespace policies { for (std::size_t i = 0; i != num_high_priority_queues_; ++i) { high_priority_queues_[i].data_ = - new thread_queue_type(i, thread_queue_init_); + new thread_queue_type(thread_queue_init_); } for (std::size_t i = num_high_priority_queues_; i != num_queues_; ++i) @@ -524,8 +524,7 @@ namespace hpx { namespace threads { namespace policies { num_thread %= num_queues_; } - std::unique_lock l; - num_thread = select_active_pu(l, num_thread); + num_thread = select_active_pu(num_thread); data.schedulehint.mode = thread_schedule_hint_mode::thread; data.schedulehint.hint = static_cast(num_thread); @@ -707,8 +706,7 @@ namespace hpx { namespace threads { namespace policies { num_thread %= num_queues_; } - std::unique_lock l; - num_thread = select_active_pu(l, num_thread, allow_fallback); + num_thread = select_active_pu(num_thread, allow_fallback); auto* thrdptr = get_thread_id_data(thrd); (void) thrdptr; @@ -783,15 +781,15 @@ namespace hpx { namespace threads { namespace policies { num_thread %= num_queues_; } - std::unique_lock l; - num_thread = select_active_pu(l, num_thread, allow_fallback); + num_thread = select_active_pu(num_thread, allow_fallback); if (priority == thread_priority::high_recursive || priority == thread_priority::high || priority == thread_priority::boost) { std::size_t num = num_thread % num_high_priority_queues_; - high_priority_queues_[num].data_->schedule_thread(thrd, true); + high_priority_queues_[num].data_->schedule_thread( + HPX_MOVE(thrd), true); } else if (priority == thread_priority::low) { @@ -1116,7 +1114,7 @@ namespace hpx { namespace threads { namespace policies { /// has to be terminated (i.e. no more work has to be done). 
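+ /// The trailing 'thread_id_ref_type*' parameter allows work-requesting
+ /// schedulers to return a stolen thread for immediate execution; it is
+ /// ignored by this scheduler.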
bool wait_or_add_new(std::size_t num_thread, bool running, std::int64_t& idle_loop_count, bool enable_stealing, - std::size_t& added) override + std::size_t& added, thread_id_ref_type* = nullptr) override { bool result = true; @@ -1243,12 +1241,12 @@ namespace hpx { namespace threads { namespace policies { if (nullptr == queues_[num_thread].data_) { queues_[num_thread].data_ = - new thread_queue_type(num_thread, thread_queue_init_); + new thread_queue_type(thread_queue_init_); if (num_thread < num_high_priority_queues_) { high_priority_queues_[num_thread].data_ = - new thread_queue_type(num_thread, thread_queue_init_); + new thread_queue_type(thread_queue_init_); } } diff --git a/libs/core/schedulers/include/hpx/schedulers/local_queue_scheduler.hpp b/libs/core/schedulers/include/hpx/schedulers/local_queue_scheduler.hpp index b1e65e978704..57917658e3cb 100644 --- a/libs/core/schedulers/include/hpx/schedulers/local_queue_scheduler.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/local_queue_scheduler.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2017 Hartmut Kaiser +// Copyright (c) 2007-2022 Hartmut Kaiser // Copyright (c) 2011 Bryce Lelbach // // SPDX-License-Identifier: BSL-1.0 @@ -100,10 +100,9 @@ namespace hpx { namespace threads { namespace policies { , queues_(init.num_queues_) , curr_queue_(0) , affinity_data_(init.affinity_data_) - , #if !defined( \ HPX_NATIVE_MIC) // we know that the MIC has one NUMA domain only) - steals_in_numa_domain_() + , steals_in_numa_domain_() , steals_outside_numa_domain_() #endif , numa_domain_masks_( @@ -120,11 +119,11 @@ namespace hpx { namespace threads { namespace policies { { HPX_ASSERT(init.num_queues_ != 0); for (std::size_t i = 0; i < init.num_queues_; ++i) - queues_[i] = new thread_queue_type(i, thread_queue_init_); + queues_[i] = new thread_queue_type(thread_queue_init_); } } - virtual ~local_queue_scheduler() + ~local_queue_scheduler() { for (std::size_t i = 0; i != queues_.size(); ++i) delete queues_[i]; @@ -308,8 +307,7 @@ namespace hpx { namespace threads { namespace policies { num_thread %= queue_size; } - std::unique_lock l; - num_thread = select_active_pu(l, num_thread); + num_thread = select_active_pu(num_thread); HPX_ASSERT(num_thread < queue_size); queues_[num_thread]->create_thread(data, id, ec); @@ -328,7 +326,7 @@ namespace hpx { namespace threads { namespace policies { /// Return the next thread to be executed, return false if none is /// available - virtual bool get_next_thread(std::size_t num_thread, bool running, + bool get_next_thread(std::size_t num_thread, bool running, threads::thread_id_ref_type& thrd, bool /*enable_stealing*/) override { @@ -482,8 +480,7 @@ namespace hpx { namespace threads { namespace policies { num_thread %= queue_size; } - std::unique_lock l; - num_thread = select_active_pu(l, num_thread, allow_fallback); + num_thread = select_active_pu(num_thread, allow_fallback); HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() == this); @@ -525,8 +522,7 @@ namespace hpx { namespace threads { namespace policies { num_thread %= queue_size; } - std::unique_lock l; - num_thread = select_active_pu(l, num_thread, allow_fallback); + num_thread = select_active_pu(num_thread, allow_fallback); HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() == this); @@ -703,9 +699,9 @@ namespace hpx { namespace threads { namespace policies { /// manager to allow for maintenance tasks to be executed in the /// scheduler. Returns true if the OS thread calling this function /// has to be terminated (i.e. 
no more work has to be done). - virtual bool wait_or_add_new(std::size_t num_thread, bool running, + bool wait_or_add_new(std::size_t num_thread, bool running, std::int64_t& idle_loop_count, bool /* enable_stealing */, - std::size_t& added) override + std::size_t& added, thread_id_ref_type* = nullptr) override { std::size_t queues_size = queues_.size(); HPX_ASSERT(num_thread < queues_.size()); @@ -875,8 +871,7 @@ namespace hpx { namespace threads { namespace policies { if (nullptr == queues_[num_thread]) { - queues_[num_thread] = - new thread_queue_type(num_thread, thread_queue_init_); + queues_[num_thread] = new thread_queue_type(thread_queue_init_); } queues_[num_thread]->on_start_thread(num_thread); diff --git a/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp new file mode 100644 index 000000000000..2050e8e931f9 --- /dev/null +++ b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp @@ -0,0 +1,1665 @@ +// Copyright (c) 2007-2022 Hartmut Kaiser +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +/////////////////////////////////////////////////////////////////////////////// +namespace hpx { namespace threads { namespace policies { + + /////////////////////////////////////////////////////////////////////////// +#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT) + using default_local_workrequesting_scheduler_terminated_queue = + lockfree_lifo; +#else + using default_local_workrequesting_scheduler_terminated_queue = + lockfree_fifo; +#endif + + /////////////////////////////////////////////////////////////////////////// + // The local_workrequesting_scheduler maintains exactly one queue of work + // items (threads) per OS thread, where this OS thread pulls its next work + // from. + template + class HPX_CORE_EXPORT local_workrequesting_scheduler : public scheduler_base + { + public: + using has_periodic_maintenance = std::false_type; + + using thread_queue_type = thread_queue; + + public: + struct init_parameter + { + init_parameter(std::size_t num_queues, + detail::affinity_data const& affinity_data, + std::size_t num_high_priority_queues = std::size_t(-1), + thread_queue_init_parameters const& thread_queue_init = {}, + char const* description = "local_workrequesting_scheduler") + : num_queues_(num_queues) + , num_high_priority_queues_( + num_high_priority_queues == std::size_t(-1) ? 
+ num_queues : + num_high_priority_queues) + , thread_queue_init_(thread_queue_init) + , affinity_data_(affinity_data) + , description_(description) + { + } + + init_parameter(std::size_t num_queues, + detail::affinity_data const& affinity_data, + char const* description) + : num_queues_(num_queues) + , num_high_priority_queues_(num_queues) + , thread_queue_init_() + , affinity_data_(affinity_data) + , description_(description) + { + } + + std::size_t num_queues_; + std::size_t num_high_priority_queues_; + thread_queue_init_parameters thread_queue_init_; + detail::affinity_data const& affinity_data_; + char const* description_; + }; + using init_parameter_type = init_parameter; + + private: + //////////////////////////////////////////////////////////////////////// + struct task_data + { + // core number this task data originated from + std::uint16_t num_thread_; + std::vector tasks_; + }; + + //////////////////////////////////////////////////////////////////////// + struct steal_request + { + enum class state : std::uint16_t + { + working = 0, + idle = 2, + failed = 4 + }; + + steal_request() + : channel_(nullptr) + , victims_() + , num_thread_(static_cast(-1)) + , attempt_(0) + , state_(state::failed) + , stealhalf_(false) + { + } + + steal_request(std::size_t num_thread, + lcos::local::channel_spsc* channel, + mask_cref_type victims, bool idle, bool stealhalf) + : channel_(channel) + , victims_(victims) + , num_thread_(static_cast(num_thread)) + , attempt_(0) + , state_(idle ? state::idle : state::working) + , stealhalf_(stealhalf) + { + } + + lcos::local::channel_spsc* channel_; + mask_type victims_; + std::uint16_t num_thread_; + std::uint16_t attempt_; + state state_; + bool stealhalf_; // true ? attempt steal-half : attempt steal-one + }; + + //////////////////////////////////////////////////////////////////////// + struct scheduler_data + { + scheduler_data() noexcept + : requested_(0) + , num_thread_(static_cast(-1)) +#if defined(HPX_HAVE_WORKREQUESTING_LAST_VICTIM) + , last_victim_(static_cast(-1)) +#endif + , victims_() + , queue_(nullptr) + , high_priority_queue_(nullptr) + , requests_() + , tasks_() + , stealhalf_(false) + , num_recent_steals_(0) + , num_recent_tasks_executed_(0) + , steal_requests_sent_(0) + , steal_requests_received_(0) + , steal_requests_discarded_(0) + { + } + + ~scheduler_data() = default; + + // interval at which we re-decide on whether we should steal just + // one task or half of what's available + constexpr static std::uint32_t const num_steal_adaptive_interval_ = + 25; + + void init(std::size_t num_thread, std::size_t size, + thread_queue_init_parameters const& queue_init, + bool need_high_priority_queue) + { + if (queue_ == nullptr) + { + num_thread_ = static_cast(num_thread); + + // initialize queues + queue_.reset(new thread_queue_type(queue_init)); + if (need_high_priority_queue) + { + high_priority_queue_.reset( + new thread_queue_type(queue_init)); + } + + // initialize channels needed for work stealing + requests_.reset( + new lcos::local::base_channel_mpsc( + size)); + tasks_.reset(new lcos::local::channel_spsc(1)); + } + } + + // the number of outstanding steal requests + std::uint16_t requested_; + + // core number this scheduler data instance refers to + std::uint16_t num_thread_; + +#if defined(HPX_HAVE_WORKREQUESTING_LAST_VICTIM) + // core number the last stolen tasks originated from + std::uint16_t last_victim_; +#endif + // initial affinity mask for this core + mask_type victims_; + + // queues for threads scheduled on this core + 
std::unique_ptr queue_; + std::unique_ptr high_priority_queue_; + + // channel for posting steal requests to this core (use + // hpx::util::spinlock) + std::unique_ptr> + requests_; + + // one channel per steal request per core + std::unique_ptr> tasks_; + + // adaptive stealing + bool stealhalf_; + std::uint32_t num_recent_steals_; + std::uint32_t num_recent_tasks_executed_; + + std::uint32_t steal_requests_sent_; + std::uint32_t steal_requests_received_; + std::uint32_t steal_requests_discarded_; + }; + + public: + static unsigned int random_seed() noexcept + { + static std::random_device rd; + return rd(); + } + + explicit local_workrequesting_scheduler(init_parameter_type const& init, + bool deferred_initialization = true) + : scheduler_base(init.num_queues_, init.description_, + init.thread_queue_init_, + policies::scheduler_mode::fast_idle_mode) + , data_(init.num_queues_) + , low_priority_queue_(thread_queue_init_) + , curr_queue_(0) + , gen_(random_seed()) + , affinity_data_(init.affinity_data_) + , num_queues_(init.num_queues_) + , num_high_priority_queues_(init.num_high_priority_queues_) + { + HPX_ASSERT(init.num_queues_ != 0); + HPX_ASSERT(num_high_priority_queues_ != 0); + HPX_ASSERT(num_high_priority_queues_ <= num_queues_); + + if (!deferred_initialization) + { + for (std::size_t i = 0; i != init.num_queues_; ++i) + { + data_[i].data_.init(i, init.num_queues_, + this->thread_queue_init_, + i < num_high_priority_queues_); + } + } + } + + ~local_workrequesting_scheduler() override = default; + + static std::string get_scheduler_name() + { + return "local_workrequesting_scheduler"; + } + +#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES + std::uint64_t get_creation_time(bool reset) override + { + std::uint64_t time = 0; + + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + time += d.high_priority_queue_->get_creation_time(reset); + } + time += d.queue_->get_creation_time(reset); + } + + return time + low_priority_queue_.get_creation_time(reset); + } + + std::uint64_t get_cleanup_time(bool reset) override + { + std::uint64_t time = 0; + + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + time += d.high_priority_queue_->get_cleanup_time(reset); + } + time += d.queue_->get_cleanup_time(reset); + } + + return time + low_priority_queue_.get_cleanup_time(reset); + } +#endif + +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + std::int64_t get_num_pending_misses( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += d.high_priority_queue_->get_num_pending_misses( + reset); + } + count += d.queue_->get_num_pending_misses(reset); + } + + return count + + low_priority_queue_.get_num_pending_misses(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += d.high_priority_queue_->get_num_pending_misses(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_pending_misses(reset); + } + return count + d.queue_->get_num_pending_misses(reset); + } + + std::int64_t get_num_pending_accesses( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + 
{ + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_pending_accesses( + reset); + } + count += d.queue_->get_num_pending_accesses(reset); + } + + return count + + low_priority_queue_.get_num_pending_accesses(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_pending_accesses(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_pending_accesses(reset); + } + return count + d.queue_->get_num_pending_accesses(reset); + } + + std::int64_t get_num_stolen_from_pending( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_from_pending( + reset); + } + count += d.queue_->get_num_stolen_from_pending(reset); + } + + return count + + low_priority_queue_.get_num_stolen_from_pending(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_from_pending(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_stolen_from_pending(reset); + } + return count + d.queue_->get_num_stolen_from_pending(reset); + } + + std::int64_t get_num_stolen_to_pending( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_to_pending( + reset); + } + count += d.queue_->get_num_stolen_to_pending(reset); + } + + return count + + low_priority_queue_.get_num_stolen_to_pending(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_to_pending(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_stolen_to_pending(reset); + } + return count + d.queue_->get_num_stolen_to_pending(reset); + } + + std::int64_t get_num_stolen_from_staged( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_from_staged( + reset); + } + count += d.queue_->get_num_stolen_from_staged(reset); + } + + return count + + low_priority_queue_.get_num_stolen_from_staged(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_from_staged(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_stolen_from_staged(reset); + } + return count + d.queue_->get_num_stolen_from_staged(reset); + } + + std::int64_t get_num_stolen_to_staged( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + 
d.high_priority_queue_->get_num_stolen_to_staged( + reset); + } + count += d.queue_->get_num_stolen_to_staged(reset); + } + + return count + + low_priority_queue_.get_num_stolen_to_staged(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_to_staged(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_stolen_to_staged(reset); + } + return count + d.queue_->get_num_stolen_to_staged(reset); + } +#endif + + /////////////////////////////////////////////////////////////////////// + void abort_all_suspended_threads() override + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + data_[i].data_.queue_->abort_all_suspended_threads(); + } + } + + /////////////////////////////////////////////////////////////////////// + bool cleanup_terminated(bool delete_all) override + { + bool empty = true; + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + empty = d.high_priority_queue_->cleanup_terminated( + delete_all) && + empty; + } + empty = d.queue_->cleanup_terminated(delete_all) && empty; + } + return low_priority_queue_.cleanup_terminated(delete_all) && empty; + } + + bool cleanup_terminated( + std::size_t num_thread, bool delete_all) override + { + auto& d = data_[num_thread].data_; + bool empty = d.queue_->cleanup_terminated(delete_all); + if (!delete_all) + return empty; + + if (num_thread < num_high_priority_queues_) + { + empty = + d.high_priority_queue_->cleanup_terminated(delete_all) && + empty; + } + + if (num_thread == num_queues_ - 1) + { + return low_priority_queue_.cleanup_terminated(delete_all) && + empty; + } + return empty; + } + + /////////////////////////////////////////////////////////////////////// + // create a new thread and schedule it if the initial state is equal to + // pending + void create_thread(thread_init_data& data, thread_id_ref_type* id, + error_code& ec) override + { + // by default we always schedule new threads on our own queue + std::size_t num_thread = + data.schedulehint.mode == thread_schedule_hint_mode::thread ? + data.schedulehint.hint : + std::size_t(-1); + + if (std::size_t(-1) == num_thread) + { + num_thread = curr_queue_++ % num_queues_; + } + else if (num_thread >= num_queues_) + { + num_thread %= num_queues_; + } + + num_thread = select_active_pu(num_thread); + + data.schedulehint.mode = thread_schedule_hint_mode::thread; + data.schedulehint.hint = static_cast(num_thread); + + // now create the thread + switch (data.priority) + { + case thread_priority::high_recursive: + case thread_priority::high: + case thread_priority::boost: + { + if (data.priority == thread_priority::boost) + { + data.priority = thread_priority::normal; + } + + std::size_t num = num_thread; + if (num >= num_high_priority_queues_) + { + num %= num_high_priority_queues_; + } + + // we never stage high priority threads, so there is no need to + // call wait_or_add_new for those. 
+ data_[num].data_.high_priority_queue_->create_thread( + data, id, ec); + break; + } + + case thread_priority::low: + low_priority_queue_.create_thread(data, id, ec); + break; + + case thread_priority::default_: + case thread_priority::normal: + HPX_ASSERT(num_thread < num_queues_); + data_[num_thread].data_.queue_->create_thread(data, id, ec); + break; + + default: + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(bad_parameter, + "local_workrequesting_scheduler::create_thread", + "unknown thread priority value (thread_priority::unknown)"); + } + } + } + + // Retrieve the next viable steal request from our channel + bool try_receiving_steal_request( + scheduler_data& d, steal_request& req) noexcept + { + bool ret = d.requests_->get(&req); + while (ret && req.state_ == steal_request::state::failed) + { + // forget the received steal request + --data_[req.num_thread_].data_.requested_; + HPX_ASSERT(data_[req.num_thread_].data_.requested_ == 0); + + // try to retrieve next steal request + ret = d.requests_->get(&req); + } + + // No special treatment for other states + HPX_ASSERT( + (ret && req.state_ != steal_request::state::failed) || !ret); + + return ret; + } + + // Pass steal request on to another worker. + // Returns true if we have handled our own steal request. + bool decline_or_forward_steal_request( + scheduler_data& d, steal_request& req) noexcept + { + HPX_ASSERT(req.attempt_ < num_queues_); + + if (req.num_thread_ == d.num_thread_) + { + // Steal request was either returned by another worker or + // picked up by us. + + if (req.state_ == steal_request::state::idle || + d.queue_->get_pending_queue_length( + std::memory_order_relaxed) > 0) + { + // we have work now, drop this steal request + ++d.steal_requests_discarded_; + --d.requested_; + HPX_ASSERT(d.requested_ == 0); + } + else + { + // Continue circulating the steal request if it makes sense + req.attempt_ = 0; + req.state_ = steal_request::state::idle; + req.victims_ = d.victims_; + + std::size_t victim = next_victim(d, req); + data_[victim].data_.requests_->set(HPX_MOVE(req)); + + ++d.steal_requests_sent_; + } + + return true; + } + + // send this steal request on to the next (random) core + ++req.attempt_; + set(req.victims_, d.num_thread_); // don't ask a core twice + + std::size_t victim = next_victim(d, req); + data_[victim].data_.requests_->set(HPX_MOVE(req)); + + ++d.steal_requests_sent_; + return false; + } + + // decline_or_forward_all_steal_requests is only called when a worker + // has nothing else to do but to relay steal requests, which means the + // worker is idle. + void decline_or_forward_all_steal_requests(scheduler_data& d) noexcept + { + steal_request req; + while (try_receiving_steal_request(d, req)) + { + ++d.steal_requests_received_; + decline_or_forward_steal_request(d, req); + } + } + + // Handle a steal request by sending tasks in return or passing it on to + // another worker. Returns true if the request was satisfied. + bool handle_steal_request( + scheduler_data& d, steal_request& req) noexcept + { + ++d.steal_requests_received_; + + if (req.num_thread_ == d.num_thread_) + { + // got back our own steal request. 
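+ // (the request has circulated through the other workers without
+ // being satisfied and was eventually returned to us)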
+ HPX_ASSERT(req.state_ != steal_request::state::failed); + + // Defer the decision to decline_steal_request + decline_or_forward_steal_request(d, req); + return false; + } + + // Send tasks from our queue to the requesting core, depending on + // what's requested, either one task or half of the available tasks + std::size_t max_num_to_steal = 1; + if (req.stealhalf_) + { + max_num_to_steal = d.queue_->get_pending_queue_length( + std::memory_order_relaxed) / + 2; + } + + if (max_num_to_steal != 0) + { + task_data thrds; + thrds.tasks_.reserve(max_num_to_steal); + + thread_id_ref_type thrd; + while (max_num_to_steal-- != 0 && + d.queue_->get_next_thread(thrd, false, true)) + { + d.queue_->increment_num_stolen_from_pending(); + thrds.tasks_.push_back(HPX_MOVE(thrd)); + } + + // we are ready to send at least one task + if (!thrds.tasks_.empty()) + { + // send these tasks to the core that has sent the steal + // request + thrds.num_thread_ = d.num_thread_; + req.channel_->set(HPX_MOVE(thrds)); + + // wake the thread up so that it can pick up the stolen + // tasks + do_some_work(req.num_thread_); + + return true; + } + } + + // There's nothing we can do with this steal request except pass + // it on to a different worker + decline_or_forward_steal_request(d, req); + return false; + } + + // Return the next thread to be executed, return false if none is + // available + bool get_next_thread(std::size_t num_thread, bool running, + thread_id_ref_type& thrd, bool enable_stealing) override + { + HPX_ASSERT(num_thread < num_queues_); + + auto& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + bool result = d.high_priority_queue_->get_next_thread(thrd); + + d.high_priority_queue_->increment_num_pending_accesses(); + if (result) + { + ++d.num_recent_tasks_executed_; + return true; + } + d.high_priority_queue_->increment_num_pending_misses(); + } + + bool result = d.queue_->get_next_thread(thrd); + + d.queue_->increment_num_pending_accesses(); + if (enable_stealing && result) + { + // We found a task to run, however before running it we handle + // steal requests (assuming that that there is more work left + // that could be used to satisfy steal requests). + + steal_request req; + while (try_receiving_steal_request(d, req)) + { + if (!handle_steal_request(d, req)) + break; + } + + ++d.num_recent_tasks_executed_; + return true; + } + d.queue_->increment_num_pending_misses(); + + // Give up, we should have work to convert. 
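+ // (returning false here lets the scheduling loop fall back to
+ // wait_or_add_new, which converts staged tasks into pending work
+ // and issues steal requests if the queues remain empty)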
+ if (d.queue_->get_staged_queue_length(std::memory_order_relaxed) != + 0 || + !running) + { + return false; + } + + if (low_priority_queue_.get_next_thread(thrd)) + { + ++d.num_recent_tasks_executed_; + return true; + } + + return false; + } + + // Schedule the passed thread + void schedule_thread(thread_id_ref_type thrd, + threads::thread_schedule_hint schedulehint, + bool allow_fallback = false, + thread_priority priority = thread_priority::normal) override + { + std::size_t num_thread = std::size_t(-1); + if (schedulehint.mode == thread_schedule_hint_mode::thread) + { + num_thread = schedulehint.hint; + } + else + { + allow_fallback = false; + } + + if (std::size_t(-1) == num_thread) + { + num_thread = curr_queue_++ % num_queues_; + } + else if (num_thread >= num_queues_) + { + num_thread %= num_queues_; + } + + num_thread = select_active_pu(num_thread, allow_fallback); + + HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() == this); + HPX_ASSERT(num_thread < num_queues_); + + switch (priority) + { + case thread_priority::high_recursive: + case thread_priority::high: + case thread_priority::boost: + { + std::size_t num = num_thread; + if (num >= num_high_priority_queues_) + { + num %= num_high_priority_queues_; + } + + data_[num].data_.high_priority_queue_->schedule_thread( + HPX_MOVE(thrd), true); + break; + } + + case thread_priority::low: + low_priority_queue_.schedule_thread(HPX_MOVE(thrd)); + break; + + case thread_priority::default_: + case thread_priority::normal: + data_[num_thread].data_.queue_->schedule_thread(HPX_MOVE(thrd)); + break; + + default: + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(bad_parameter, + "local_workrequesting_scheduler::schedule_thread", + "unknown thread priority value (thread_priority::unknown)"); + } + } + } + + void schedule_thread_last(thread_id_ref_type thrd, + threads::thread_schedule_hint schedulehint, + bool allow_fallback = false, + thread_priority priority = thread_priority::normal) override + { + std::size_t num_thread = std::size_t(-1); + if (schedulehint.mode == thread_schedule_hint_mode::thread) + { + num_thread = schedulehint.hint; + } + else + { + allow_fallback = false; + } + + if (std::size_t(-1) == num_thread) + { + num_thread = curr_queue_++ % num_queues_; + } + else if (num_thread >= num_queues_) + { + num_thread %= num_queues_; + } + + num_thread = select_active_pu(num_thread, allow_fallback); + + HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() == this); + HPX_ASSERT(num_thread < num_queues_); + + switch (priority) + { + case thread_priority::high_recursive: + case thread_priority::high: + case thread_priority::boost: + { + std::size_t num = num_thread; + if (num >= num_high_priority_queues_) + { + num %= num_high_priority_queues_; + } + + data_[num].data_.high_priority_queue_->schedule_thread( + HPX_MOVE(thrd), true); + break; + } + + case thread_priority::low: + low_priority_queue_.schedule_thread(HPX_MOVE(thrd), true); + break; + + default: + case thread_priority::default_: + case thread_priority::normal: + data_[num_thread].data_.queue_->schedule_thread( + HPX_MOVE(thrd), true); + break; + } + } + + /// Destroy the passed thread as it has been terminated + void destroy_thread(threads::thread_data* thrd) override + { + HPX_ASSERT(thrd->get_scheduler_base() == this); + thrd->get_queue().destroy_thread(thrd); + } + + /////////////////////////////////////////////////////////////////////// + // This returns the current length of the queues (work items and new items) + std::int64_t get_queue_length( + 
std::size_t num_thread = std::size_t(-1)) const override + { + // Return queue length of one specific queue. + std::int64_t count = 0; + if (std::size_t(-1) != num_thread) + { + HPX_ASSERT(num_thread < num_queues_); + auto const& d = data_[num_thread].data_; + + if (num_thread < num_high_priority_queues_) + { + count += d.high_priority_queue_->get_queue_length(); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_queue_length(); + } + return count + d.queue_->get_queue_length(); + } + + // Cumulative queue lengths of all queues. + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += d.high_priority_queue_->get_queue_length(); + } + count += d.queue_->get_queue_length(); + } + return count + low_priority_queue_.get_queue_length(); + } + + /////////////////////////////////////////////////////////////////////// + // Queries the current thread count of the queues. + std::int64_t get_thread_count( + thread_schedule_state state = thread_schedule_state::unknown, + thread_priority priority = thread_priority::default_, + std::size_t num_thread = std::size_t(-1), + bool /* reset */ = false) const override + { + // Return thread count of one specific queue. + std::int64_t count = 0; + if (std::size_t(-1) != num_thread) + { + HPX_ASSERT(num_thread < num_queues_); + + auto const& d = data_[num_thread].data_; + switch (priority) + { + case thread_priority::default_: + { + if (num_thread < num_high_priority_queues_) + { + count = d.high_priority_queue_->get_thread_count(state); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_thread_count(state); + } + return count + d.queue_->get_thread_count(state); + } + + case thread_priority::low: + { + if (num_queues_ - 1 == num_thread) + return low_priority_queue_.get_thread_count(state); + break; + } + + case thread_priority::normal: + return d.queue_->get_thread_count(state); + + case thread_priority::boost: + case thread_priority::high: + case thread_priority::high_recursive: + { + if (num_thread < num_high_priority_queues_) + { + return d.high_priority_queue_->get_thread_count(state); + } + break; + } + + default: + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(bad_parameter, + "local_workrequesting_scheduler::get_thread_count", + "unknown thread priority value " + "(thread_priority::unknown)"); + return 0; + } + } + return 0; + } + + // Return the cumulative count for all queues. 
+ switch (priority) + { + case thread_priority::default_: + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_thread_count(state); + } + count += d.queue_->get_thread_count(state); + } + count += low_priority_queue_.get_thread_count(state); + break; + } + + case thread_priority::low: + return low_priority_queue_.get_thread_count(state); + + case thread_priority::normal: + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + count += data_[i].data_.queue_->get_thread_count(state); + } + break; + } + + case thread_priority::boost: + case thread_priority::high: + case thread_priority::high_recursive: + { + for (std::size_t i = 0; i != num_high_priority_queues_; ++i) + { + count += + data_[i].data_.high_priority_queue_->get_thread_count( + state); + } + break; + } + + default: + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(bad_parameter, + "local_workrequesting_scheduler::get_thread_count", + "unknown thread priority value " + "(thread_priority::unknown)"); + return 0; + } + } + return count; + } + + // Queries whether a given core is idle + bool is_core_idle(std::size_t num_thread) const override + { + if (num_thread < num_queues_ && + data_[num_thread].data_.queue_->get_queue_length() != 0) + { + return false; + } + if (num_thread < num_high_priority_queues_ && + data_[num_thread] + .data_.high_priority_queue_->get_queue_length() != 0) + { + return false; + } + return true; + } + + /////////////////////////////////////////////////////////////////////// + // Enumerate matching threads from all queues + bool enumerate_threads(hpx::function const& f, + thread_schedule_state state = + thread_schedule_state::unknown) const override + { + bool result = true; + for (std::size_t i = 0; i != num_high_priority_queues_; ++i) + { + result = result && + data_[i].data_.high_priority_queue_->enumerate_threads( + f, state); + } + + result = result && low_priority_queue_.enumerate_threads(f, state); + + for (std::size_t i = 0; i != num_queues_; ++i) + { + result = result && + data_[i].data_.queue_->enumerate_threads(f, state); + } + return result; + } + +#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME + /////////////////////////////////////////////////////////////////////// + // Queries the current average thread wait time of the queues. + std::int64_t get_average_thread_wait_time( + std::size_t num_thread = std::size_t(-1)) const override + { + // Return average thread wait time of one specific queue. 
+ std::uint64_t wait_time = 0; + std::uint64_t count = 0; + if (std::size_t(-1) != num_thread) + { + HPX_ASSERT(num_thread < num_queues_); + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + wait_time = + d.high_priority_queue_->get_average_thread_wait_time(); + ++count; + } + + if (num_thread == num_queues_ - 1) + { + wait_time += + low_priority_queue_.get_average_thread_wait_time(); + ++count; + } + + wait_time += d.queue_->get_average_thread_wait_time(); + return wait_time / (count + 1); + } + + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + wait_time += + d.high_priority_queue_->get_average_thread_wait_time(); + } + wait_time += d.queue_->get_average_thread_wait_time(); + ++count; + } + + return (wait_time + + low_priority_queue_.get_average_thread_wait_time()) / + (count + 1); + } + + /////////////////////////////////////////////////////////////////////// + // Queries the current average task wait time of the queues. + std::int64_t get_average_task_wait_time( + std::size_t num_thread = std::size_t(-1)) const override + { + // Return average task wait time of one specific queue. + // Return average thread wait time of one specific queue. + std::uint64_t wait_time = 0; + std::uint64_t count = 0; + if (std::size_t(-1) != num_thread) + { + HPX_ASSERT(num_thread < num_queues_); + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + wait_time = + d.high_priority_queue_->get_average_task_wait_time(); + ++count; + } + + if (num_thread == num_queues_ - 1) + { + wait_time += + low_priority_queue_.get_average_task_wait_time(); + ++count; + } + + wait_time += d.queue_->get_average_task_wait_time(); + return wait_time / (count + 1); + } + + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + wait_time += + d.high_priority_queue_->get_average_task_wait_time(); + } + wait_time += d.queue_->get_average_task_wait_time(); + ++count; + } + + return (wait_time + + low_priority_queue_.get_average_task_wait_time()) / + (count + 1); + } +#endif + + // return a random victim for the current stealing operation + std::size_t random_victim(steal_request const& req) noexcept + { + std::size_t result = 0; + + { + // generate 3 random numbers max before resorting to more + // expensive algorithm + std::uniform_int_distribution uniform( + 0, std::int16_t(num_queues_ - 1)); + int attempts = 0; + do + { + result = uniform(gen_); + if (result != req.num_thread_ && + !test(req.victims_, result)) + { + HPX_ASSERT(result < num_queues_); + return result; + } + } while (++attempts < 3); + } + + // to avoid infinite trials we randomly select one of the possible + // victims + std::uniform_int_distribution uniform( + 0, std::int16_t(num_queues_ - count(req.victims_) - 1)); + + // generate one more random number + std::size_t selected_victim = uniform(gen_); + for (std::size_t i = 0; i != num_queues_; ++i) + { + if (!test(req.victims_, i)) + { + if (selected_victim == 0) + { + result = i; + break; + } + --selected_victim; + } + } + + HPX_ASSERT(result < num_queues_ && result != req.num_thread_ && + !test(req.victims_, result)); + + return result; + } + + // return the number of the next victim core + std::size_t next_victim( + scheduler_data& d, steal_request const& req) noexcept + { + std::size_t victim = std::size_t(-1); + + // return thief if max 
steal attempts has been reached + if (req.attempt_ == num_queues_ - 1) + { + // Return steal request to thief + victim = req.num_thread_; + } + else + { + HPX_ASSERT( + (req.attempt_ == 0 && req.num_thread_ == d.num_thread_) || + (req.attempt_ > 0 && req.num_thread_ != d.num_thread_)); + +#if defined(HPX_HAVE_WORKREQUESTING_LAST_VICTIM) + if (d.last_victim_ != std::uint16_t(-1)) + { + victim = d.last_victim_; + } + else +#else + HPX_UNUSED(d); +#endif + { + victim = random_victim(req); + } + } + + // couldn't find victim, return steal request to thief + if (victim == std::size_t(-1)) + { + victim = req.num_thread_; + HPX_ASSERT(victim != d.num_thread_); + } + + HPX_ASSERT(victim < num_queues_); + HPX_ASSERT(req.attempt_ < num_queues_); + + return victim; + } + + // Every worker can have at most MAXSTEAL pending steal requests. A + // steal request with idle == false indicates that the requesting worker + // is still busy working on some tasks. A steal request with idle == true + // indicates that the requesting worker is in fact idle and has nothing + // to work on. + void send_steal_request(scheduler_data& d, bool idle = true) noexcept + { + if (d.requested_ == 0) + { + // Estimate work-stealing efficiency during the last interval; + // switch strategies if the value is below a threshold + if (d.num_recent_steals_ >= d.num_steal_adaptive_interval_) + { + double ratio = + static_cast(d.num_recent_tasks_executed_) / + d.num_steal_adaptive_interval_; + + if (ratio >= 2.) + { + d.stealhalf_ = true; + } + else + { + if (d.stealhalf_) + { + d.stealhalf_ = false; + } + else if (ratio <= 1.) + { + d.stealhalf_ = true; + } + } + + d.num_recent_steals_ = 0; + d.num_recent_tasks_executed_ = 0; + } + + steal_request req(d.num_thread_, d.tasks_.get(), d.victims_, + idle, d.stealhalf_); + std::size_t victim = next_victim(d, req); + + ++d.requested_; + data_[victim].data_.requests_->set(HPX_MOVE(req)); + + ++d.steal_requests_sent_; + } + } + + // Try receiving tasks that are sent by another core as a response to + // one of our steal requests. + bool try_receiving_tasks(scheduler_data& d, std::size_t& added, + thread_id_ref_type* next_thrd) + { + task_data thrds; + if (d.tasks_->get(&thrds)) + { + --d.requested_; + HPX_ASSERT(d.requested_ == 0); + + // if at least one thrd was received + if (!thrds.tasks_.empty()) + { + // schedule all but the last thread + std::size_t received_threads = thrds.tasks_.size() - 1; + for (std::size_t i = 0; i != received_threads; ++i) + { + // schedule the received task to be picked up by the + // scheduler + HPX_ASSERT(thrds.tasks_[i]); + d.queue_->schedule_thread( + HPX_MOVE(thrds.tasks_[i]), true); + d.queue_->increment_num_stolen_to_pending(); + ++added; + } + +#if defined(HPX_HAVE_WORKREQUESTING_LAST_VICTIM) + // store the originating core for the next stealing + // operation + d.last_victim_ = thrds.num_thread_; + HPX_ASSERT(d.last_victim_ != d.num_thread_); +#endif + + if (next_thrd != nullptr) + { + // directly return the last thread as it should be run + // immediately + ++d.num_recent_tasks_executed_; + *next_thrd = HPX_MOVE(thrds.tasks_.back()); + } + else + { + d.queue_->schedule_thread( + HPX_MOVE(thrds.tasks_.back()), true); + d.queue_->increment_num_stolen_to_pending(); + ++added; + } + + ++d.num_recent_steals_; + return true; + } + } + return false; + } + + // This is a function which gets called periodically by the thread + // manager to allow for maintenance tasks to be executed in the + // scheduler. 
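random_victim() above keeps the common case cheap: a few unconstrained random draws, and only when those keep hitting the requester or an already-tried core does it fall back to an exact draw over the remaining candidates. A minimal sketch of that pattern, with std::vector<bool> standing in for the victim mask and all names being illustrative rather than HPX APIs (the requester's own slot is assumed to be marked in 'tried', as the scheduler does in on_start_thread):

    #include <cstddef>
    #include <random>
    #include <vector>

    // Pick a victim core that is neither the requesting core nor already
    // marked in 'tried': up to three cheap random draws, then an exact draw
    // over the remaining candidates to avoid unbounded retries.
    std::size_t pick_victim(std::mt19937& gen, std::size_t num_cores,
        std::size_t requester, std::vector<bool> const& tried)
    {
        std::uniform_int_distribution<std::size_t> any(0, num_cores - 1);
        for (int attempt = 0; attempt != 3; ++attempt)
        {
            std::size_t v = any(gen);
            if (v != requester && !tried[v])
                return v;
        }

        // count the remaining candidates and draw an index among them
        std::size_t remaining = 0;
        for (std::size_t i = 0; i != num_cores; ++i)
            if (!tried[i]) ++remaining;
        if (remaining == 0)
            return requester;    // nothing left: hand the request back

        std::uniform_int_distribution<std::size_t> pick(0, remaining - 1);
        std::size_t nth = pick(gen);
        for (std::size_t i = 0; i != num_cores; ++i)
        {
            if (!tried[i] && nth-- == 0)
                return i;
        }
        return requester;
    }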
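The adaptation in send_steal_request() switches between stealing one task and stealing half of the victim's queue, based on how many tasks were executed during the last adaptive interval. A small self-contained sketch of that heuristic, assuming an interval of 16 steals and illustrative member names (the real scheduler keeps these counters in its per-worker scheduler_data):

    #include <cstdint>

    // Decide whether the next steal request should ask for half of the
    // victim's tasks or just one task.
    struct steal_policy
    {
        bool steal_half = false;
        std::int64_t recent_steals = 0;
        std::int64_t recent_tasks_executed = 0;
        std::int64_t interval = 16;    // assumed adaptive interval length

        void note_task_executed() { ++recent_tasks_executed; }
        void note_steal() { ++recent_steals; }

        // Mirrors the adaptation in send_steal_request(): plenty of work per
        // steal keeps steal-half enabled; middling throughput falls back to
        // single-task steals; very low throughput tries steal-half again.
        void maybe_adapt()
        {
            if (recent_steals < interval)
                return;

            double const ratio =
                static_cast<double>(recent_tasks_executed) / interval;
            if (ratio >= 2.0)
                steal_half = true;
            else if (steal_half)
                steal_half = false;
            else if (ratio <= 1.0)
                steal_half = true;

            recent_steals = 0;
            recent_tasks_executed = 0;
        }
    };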
Returns true if the OS thread calling this function + // has to be terminated (i.e. no more work has to be done). + bool wait_or_add_new(std::size_t num_thread, bool running, + std::int64_t& idle_loop_count, bool enable_stealing, + std::size_t& added, + thread_id_ref_type* next_thrd = nullptr) override + { + HPX_ASSERT(num_thread < num_queues_); + + added = 0; + + auto& d = data_[num_thread].data_; + + // We don't need to call wait_or_add_new for high priority threads + // as these threads are never created 'staged'. + + bool result = + d.queue_->wait_or_add_new(running, added, enable_stealing); + + // check if work was available + if (0 != added) + return result; + + if (num_thread == num_queues_ - 1) + { + result = low_priority_queue_.wait_or_add_new(running, added) && + result; + } + + // check if we have been disabled + if (!running) + return true; + + // return if no stealing is requested (or not possible) + if (num_queues_ == 1 || !enable_stealing) + return result; + + // attempt to steal more work + send_steal_request(d); + HPX_ASSERT(d.requested_ != 0); + + // now try to handle steal requests again if we have not received a + // task from some other core yet + if (!try_receiving_tasks(d, added, next_thrd)) + { + // decline or forward all pending steal requests + decline_or_forward_all_steal_requests(d); + } + +#ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION + // no new work is available, are we deadlocked? + if (HPX_UNLIKELY(get_minimal_deadlock_detection_enabled() && + LHPX_ENABLED(error))) + { + bool suspended_only = true; + + for (std::size_t i = 0; suspended_only && i != num_queues_; ++i) + { + suspended_only = + data_[i].data_.queue_->dump_suspended_threads( + i, idle_loop_count, running); + } + + if (HPX_UNLIKELY(suspended_only)) + { + if (running) + { + LTM_(error) //-V128 + << "queue(" << num_thread << "): " + << "no new work available, are we " + "deadlocked?"; + } + else + { + LHPX_CONSOLE_( + hpx::util::logging::level::error) //-V128 + << " [TM] " //-V128 + << "queue(" << num_thread << "): " + << "no new work available, are we " + "deadlocked?\n"; + } + } + } +#else + HPX_UNUSED(idle_loop_count); +#endif + + return result; + } + + /////////////////////////////////////////////////////////////////////// + void on_start_thread(std::size_t num_thread) override + { + hpx::threads::detail::set_local_thread_num_tss(num_thread); + hpx::threads::detail::set_thread_pool_num_tss( + parent_pool_->get_pool_id().index()); + + auto& d = data_[num_thread].data_; + d.init(num_thread, num_queues_, this->thread_queue_init_, + num_thread < num_high_priority_queues_); + + d.queue_->on_start_thread(num_thread); + if (num_thread < num_high_priority_queues_) + { + d.high_priority_queue_->on_start_thread(num_thread); + } + + if (num_thread == num_queues_ - 1) + { + low_priority_queue_.on_start_thread(num_thread); + } + + // create an empty mask that is properly sized + resize(d.victims_, num_queues_); + reset(d.victims_); + set(d.victims_, num_thread); + } + + void on_stop_thread(std::size_t num_thread) override + { + auto& d = data_[num_thread].data_; + + d.queue_->on_stop_thread(num_thread); + if (num_thread < num_high_priority_queues_) + { + d.high_priority_queue_->on_stop_thread(num_thread); + } + + if (num_thread == num_queues_ - 1) + { + low_priority_queue_.on_stop_thread(num_thread); + } + } + + void on_error( + std::size_t num_thread, std::exception_ptr const& e) override + { + auto& d = data_[num_thread].data_; + + d.queue_->on_error(num_thread, e); + if (num_thread < 
num_high_priority_queues_) + { + d.high_priority_queue_->on_error(num_thread, e); + } + + if (num_thread == num_queues_ - 1) + { + low_priority_queue_.on_error(num_thread, e); + } + } + + void reset_thread_distribution() override + { + curr_queue_.store(0, std::memory_order_release); + } + + void set_scheduler_mode(scheduler_mode mode) noexcept override + { + // we should not disable stealing for this scheduler, this would + // possibly lead to deadlocks + scheduler_base::set_scheduler_mode(mode | + policies::scheduler_mode::enable_stealing | + policies::scheduler_mode::enable_stealing_numa); + } + + protected: + std::vector> data_; + thread_queue_type low_priority_queue_; + + std::atomic curr_queue_; + + std::mt19937 gen_; + + detail::affinity_data const& affinity_data_; + std::size_t const num_queues_; + std::size_t const num_high_priority_queues_; + }; +}}} // namespace hpx::threads::policies + +#include diff --git a/libs/core/schedulers/include/hpx/schedulers/lockfree_queue_backends.hpp b/libs/core/schedulers/include/hpx/schedulers/lockfree_queue_backends.hpp index e6d8d62ea8c5..bd59621fb025 100644 --- a/libs/core/schedulers/include/hpx/schedulers/lockfree_queue_backends.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/lockfree_queue_backends.hpp @@ -1,6 +1,6 @@ //////////////////////////////////////////////////////////////////////////////// // Copyright (c) 2012 Bryce Adelstein-Lelbach -// Copyright (c) 2019 Hartmut Kaiser +// Copyright (c) 2019-2020 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying diff --git a/libs/core/schedulers/include/hpx/schedulers/shared_priority_queue_scheduler.hpp b/libs/core/schedulers/include/hpx/schedulers/shared_priority_queue_scheduler.hpp index d120857d3dac..6bfc2db8f08b 100644 --- a/libs/core/schedulers/include/hpx/schedulers/shared_priority_queue_scheduler.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/shared_priority_queue_scheduler.hpp @@ -185,7 +185,7 @@ namespace hpx { namespace threads { namespace policies { } // get/set scheduler mode - void set_scheduler_mode(scheduler_mode mode) override + void set_scheduler_mode(scheduler_mode mode) noexcept override { // clang-format off scheduler_base::set_scheduler_mode(mode); @@ -307,8 +307,6 @@ namespace hpx { namespace threads { namespace policies { auto msg = spq_deb.declare_variable(nullptr); - std::unique_lock l; - using threads::thread_schedule_hint_mode; switch (data.schedulehint.mode) { @@ -361,7 +359,7 @@ namespace hpx { namespace threads { namespace policies { ->worker_next( static_cast(num_workers_)); } - thread_num = select_active_pu(l, thread_num); + thread_num = select_active_pu(thread_num); // cppcheck-suppress redundantAssignment domain_num = d_lookup_[thread_num]; // cppcheck-suppress redundantAssignment @@ -373,7 +371,7 @@ namespace hpx { namespace threads { namespace policies { spq_deb.set(msg, "HINT_THREAD"); // @TODO. 
We should check that the thread num is valid // Create thread on requested worker thread - thread_num = select_active_pu(l, data.schedulehint.hint); + thread_num = select_active_pu(data.schedulehint.hint); domain_num = d_lookup_[thread_num]; q_index = q_lookup_[thread_num]; break; @@ -656,7 +654,8 @@ namespace hpx { namespace threads { namespace policies { /// Return the next thread to be executed, return false if none available virtual bool wait_or_add_new(std::size_t /* thread_num */, bool /* running */, std::int64_t& /* idle_loop_count */, - bool /*enable_stealing*/, std::size_t& added) override + bool /*enable_stealing*/, std::size_t& added, + thread_id_ref_type* = nullptr) override { std::size_t this_thread = local_thread_number(); HPX_ASSERT(this_thread < num_workers_); @@ -768,7 +767,7 @@ namespace hpx { namespace threads { namespace policies { "assign_work_round_robin", "thread_num", thread_num, debug::threadinfo(&thrd)); } - thread_num = select_active_pu(l, thread_num, allow_fallback); + thread_num = select_active_pu(thread_num, allow_fallback); break; } case thread_schedule_hint_mode::thread: @@ -779,7 +778,7 @@ namespace hpx { namespace threads { namespace policies { spq_deb.debug(debug::str<>("schedule_thread"), "received HINT_THREAD", debug::dec<3>(schedulehint.hint)); thread_num = - select_active_pu(l, schedulehint.hint, allow_fallback); + select_active_pu(schedulehint.hint, allow_fallback); domain_num = d_lookup_[thread_num]; q_index = q_lookup_[thread_num]; break; diff --git a/libs/core/schedulers/include/hpx/schedulers/static_priority_queue_scheduler.hpp b/libs/core/schedulers/include/hpx/schedulers/static_priority_queue_scheduler.hpp index 0211014e6d4f..c15bb47aa954 100644 --- a/libs/core/schedulers/include/hpx/schedulers/static_priority_queue_scheduler.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/static_priority_queue_scheduler.hpp @@ -66,7 +66,7 @@ namespace hpx { namespace threads { namespace policies { policies::scheduler_mode::enable_stealing_numa); } - void set_scheduler_mode(scheduler_mode mode) override + void set_scheduler_mode(scheduler_mode mode) noexcept override { // this scheduler does not support stealing or numa stealing mode = policies::scheduler_mode( diff --git a/libs/core/schedulers/include/hpx/schedulers/static_queue_scheduler.hpp b/libs/core/schedulers/include/hpx/schedulers/static_queue_scheduler.hpp index 2b1b8f95f21a..62ba2b1cb31a 100644 --- a/libs/core/schedulers/include/hpx/schedulers/static_queue_scheduler.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/static_queue_scheduler.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2017 Hartmut Kaiser +// Copyright (c) 2007-202 Hartmut Kaiser // Copyright (c) 2011 Bryce Lelbach // // SPDX-License-Identifier: BSL-1.0 @@ -68,7 +68,7 @@ namespace hpx { namespace threads { namespace policies { return "static_queue_scheduler"; } - void set_scheduler_mode(scheduler_mode mode) override + void set_scheduler_mode(scheduler_mode mode) noexcept override { // this scheduler does not support stealing or numa stealing mode = scheduler_mode(mode & ~scheduler_mode::enable_stealing); @@ -105,7 +105,7 @@ namespace hpx { namespace threads { namespace policies { /// has to be terminated (i.e. no more work has to be done). 
bool wait_or_add_new(std::size_t num_thread, bool running, std::int64_t& idle_loop_count, bool /*enable_stealing*/, - std::size_t& added) override + std::size_t& added, thread_id_ref_type* = nullptr) override { HPX_ASSERT(num_thread < this->queues_.size()); diff --git a/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp b/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp index af0de91ccfca..deb2b0dc9934 100644 --- a/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2019 Hartmut Kaiser +// Copyright (c) 2007-2022 Hartmut Kaiser // Copyright (c) 2011 Bryce Lelbach // // SPDX-License-Identifier: BSL-1.0 @@ -447,11 +447,10 @@ namespace hpx { namespace threads { namespace policies { return cleanup_terminated_locked(false); } - thread_queue(std::size_t queue_num = std::size_t(-1), - thread_queue_init_parameters parameters = {}) + thread_queue(thread_queue_init_parameters parameters = {}) : parameters_(parameters) , thread_map_count_(0) - , work_items_(128, queue_num) + , work_items_(128) #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME , work_items_wait_(0) , work_items_wait_count_(0) @@ -509,12 +508,12 @@ namespace hpx { namespace threads { namespace policies { } #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES - std::uint64_t get_creation_time(bool reset) + std::uint64_t get_creation_time(bool reset) noexcept { return util::get_and_reset_value(add_new_time_, reset); } - std::uint64_t get_cleanup_time(bool reset) + std::uint64_t get_cleanup_time(bool reset) noexcept { return util::get_and_reset_value(cleanup_terminated_time_, reset); } @@ -523,7 +522,7 @@ namespace hpx { namespace threads { namespace policies { /////////////////////////////////////////////////////////////////////// // This returns the current length of the queues (work items and new items) std::int64_t get_queue_length( - std::memory_order order = std::memory_order_acquire) const + std::memory_order order = std::memory_order_acquire) const noexcept { return work_items_count_.data_.load(order) + new_tasks_count_.data_.load(order); @@ -531,20 +530,20 @@ namespace hpx { namespace threads { namespace policies { // This returns the current length of the pending queue std::int64_t get_pending_queue_length( - std::memory_order order = std::memory_order_acquire) const + std::memory_order order = std::memory_order_acquire) const noexcept { return work_items_count_.data_.load(order); } // This returns the current length of the staged queue std::int64_t get_staged_queue_length( - std::memory_order order = std::memory_order_acquire) const + std::memory_order order = std::memory_order_acquire) const noexcept { return new_tasks_count_.data_.load(order); } #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME - std::uint64_t get_average_task_wait_time() const + std::uint64_t get_average_task_wait_time() const noexcept { std::uint64_t count = new_tasks_wait_count_; if (count == 0) @@ -552,7 +551,7 @@ namespace hpx { namespace threads { namespace policies { return new_tasks_wait_ / count; } - std::uint64_t get_average_thread_wait_time() const + std::uint64_t get_average_thread_wait_time() const noexcept { std::uint64_t count = work_items_wait_count_; if (count == 0) @@ -562,85 +561,88 @@ namespace hpx { namespace threads { namespace policies { #endif #ifdef HPX_HAVE_THREAD_STEALING_COUNTS - std::int64_t get_num_pending_misses(bool reset) + std::int64_t get_num_pending_misses(bool reset) noexcept { return 
util::get_and_reset_value(pending_misses_, reset); } - void increment_num_pending_misses(std::size_t num = 1) + void increment_num_pending_misses(std::size_t num = 1) noexcept { pending_misses_.fetch_add(num, std::memory_order_relaxed); } - std::int64_t get_num_pending_accesses(bool reset) + std::int64_t get_num_pending_accesses(bool reset) noexcept { return util::get_and_reset_value(pending_accesses_, reset); } - void increment_num_pending_accesses(std::size_t num = 1) + void increment_num_pending_accesses(std::size_t num = 1) noexcept { pending_accesses_.fetch_add(num, std::memory_order_relaxed); } - std::int64_t get_num_stolen_from_pending(bool reset) + std::int64_t get_num_stolen_from_pending(bool reset) noexcept { return util::get_and_reset_value(stolen_from_pending_, reset); } - void increment_num_stolen_from_pending(std::size_t num = 1) + void increment_num_stolen_from_pending(std::size_t num = 1) noexcept { stolen_from_pending_.fetch_add(num, std::memory_order_relaxed); } - std::int64_t get_num_stolen_from_staged(bool reset) + std::int64_t get_num_stolen_from_staged(bool reset) noexcept { return util::get_and_reset_value(stolen_from_staged_, reset); } - void increment_num_stolen_from_staged(std::size_t num = 1) + void increment_num_stolen_from_staged(std::size_t num = 1) noexcept { stolen_from_staged_.fetch_add(num, std::memory_order_relaxed); } - std::int64_t get_num_stolen_to_pending(bool reset) + std::int64_t get_num_stolen_to_pending(bool reset) noexcept { return util::get_and_reset_value(stolen_to_pending_, reset); } - void increment_num_stolen_to_pending(std::size_t num = 1) + void increment_num_stolen_to_pending(std::size_t num = 1) noexcept { stolen_to_pending_.fetch_add(num, std::memory_order_relaxed); } - std::int64_t get_num_stolen_to_staged(bool reset) + std::int64_t get_num_stolen_to_staged(bool reset) noexcept { return util::get_and_reset_value(stolen_to_staged_, reset); } - void increment_num_stolen_to_staged(std::size_t num = 1) + void increment_num_stolen_to_staged(std::size_t num = 1) noexcept { stolen_to_staged_.fetch_add(num, std::memory_order_relaxed); } #else - constexpr void increment_num_pending_misses(std::size_t /* num */ = 1) + constexpr void increment_num_pending_misses( + std::size_t /* num */ = 1) noexcept { } - constexpr void increment_num_pending_accesses(std::size_t /* num */ = 1) + constexpr void increment_num_pending_accesses( + std::size_t /* num */ = 1) noexcept { } constexpr void increment_num_stolen_from_pending( - std::size_t /* num */ = 1) + std::size_t /* num */ = 1) noexcept { } constexpr void increment_num_stolen_from_staged( - std::size_t /* num */ = 1) + std::size_t /* num */ = 1) noexcept { } constexpr void increment_num_stolen_to_pending( - std::size_t /* num */ = 1) + std::size_t /* num */ = 1) noexcept { } - constexpr void increment_num_stolen_to_staged(std::size_t /* num */ = 1) + constexpr void increment_num_stolen_to_staged( + std::size_t /* num */ = 1) noexcept { } #endif diff --git a/libs/core/schedulers/include/hpx/schedulers/thread_queue_mc.hpp b/libs/core/schedulers/include/hpx/schedulers/thread_queue_mc.hpp index 554f33db2b3e..e64d61b6fcbf 100644 --- a/libs/core/schedulers/include/hpx/schedulers/thread_queue_mc.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/thread_queue_mc.hpp @@ -137,7 +137,7 @@ namespace hpx { namespace threads { namespace policies { } public: - explicit thread_queue_mc(const thread_queue_init_parameters& parameters, + explicit thread_queue_mc(thread_queue_init_parameters const& parameters, 
std::size_t queue_num = std::size_t(-1)) : parameters_(parameters) , queue_index_(static_cast(queue_num)) @@ -163,7 +163,7 @@ namespace hpx { namespace threads { namespace policies { // ---------------------------------------------------------------- // This returns the current length of the queues (work items and new items) - std::int64_t get_queue_length() const + std::int64_t get_queue_length() const noexcept { return work_items_count_.data_.load(std::memory_order_relaxed) + new_tasks_count_.data_.load(std::memory_order_relaxed); @@ -171,7 +171,7 @@ namespace hpx { namespace threads { namespace policies { // ---------------------------------------------------------------- // This returns the current length of the pending queue - std::int64_t get_queue_length_pending() const + std::int64_t get_queue_length_pending() const noexcept { return work_items_count_.data_.load(std::memory_order_relaxed); } @@ -179,7 +179,7 @@ namespace hpx { namespace threads { namespace policies { // ---------------------------------------------------------------- // This returns the current length of the staged queue std::int64_t get_queue_length_staged( - std::memory_order order = std::memory_order_relaxed) const + std::memory_order order = std::memory_order_relaxed) const noexcept { return new_tasks_count_.data_.load(order); } diff --git a/libs/core/synchronization/include/hpx/synchronization/channel_mpmc.hpp b/libs/core/synchronization/include/hpx/synchronization/channel_mpmc.hpp index dbb454094c15..975b9132aff3 100644 --- a/libs/core/synchronization/include/hpx/synchronization/channel_mpmc.hpp +++ b/libs/core/synchronization/include/hpx/synchronization/channel_mpmc.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Hartmut Kaiser +// Copyright (c) 2019-2022 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -56,12 +56,6 @@ namespace hpx { namespace lcos { namespace local { { HPX_ASSERT(size != 0); - // invoke constructors for allocated buffer - for (std::size_t i = 0; i != size_; ++i) - { - new (&buffer_[i]) T(); - } - head_.data_ = 0; tail_.data_ = 0; } @@ -90,16 +84,9 @@ namespace hpx { namespace lcos { namespace local { ~bounded_channel() { - std::unique_lock l(mtx_.data_); - - // invoke destructors for allocated buffer - for (std::size_t i = 0; i != size_; ++i) - { - (&buffer_[i])->~T(); - } - if (!closed_) { + std::unique_lock l(mtx_.data_); close(l); } } @@ -134,7 +121,11 @@ namespace hpx { namespace lcos { namespace local { return true; } - bool set(T&& t) noexcept + // clang-format off + bool set(T&& t) noexcept( + noexcept(std::declval&>().lock()) && + noexcept(std::declval&>().unlock())) + // clang-format on { std::unique_lock l(mtx_.data_); if (closed_) @@ -165,7 +156,7 @@ namespace hpx { namespace lcos { namespace local { return close(l); } - std::size_t capacity() const + constexpr std::size_t capacity() const noexcept { return size_ - 1; } diff --git a/libs/core/synchronization/include/hpx/synchronization/channel_mpsc.hpp b/libs/core/synchronization/include/hpx/synchronization/channel_mpsc.hpp index 59609972a9e8..c4027a1590e1 100644 --- a/libs/core/synchronization/include/hpx/synchronization/channel_mpsc.hpp +++ b/libs/core/synchronization/include/hpx/synchronization/channel_mpsc.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Hartmut Kaiser +// Copyright (c) 2019-2022 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. 
(See accompanying @@ -59,12 +59,6 @@ namespace hpx { namespace lcos { namespace local { { HPX_ASSERT(size != 0); - // invoke constructors for allocated buffer - for (std::size_t i = 0; i != size_; ++i) - { - new (&buffer_[i]) T(); - } - head_.data_.store(0, std::memory_order_relaxed); tail_.data_.tail_.store(0, std::memory_order_relaxed); } @@ -104,12 +98,6 @@ namespace hpx { namespace lcos { namespace local { ~base_channel_mpsc() { - // invoke destructors for allocated buffer - for (std::size_t i = 0; i != size_; ++i) - { - (&buffer_[i])->~T(); - } - if (!closed_.load(std::memory_order_relaxed)) { close(); @@ -145,7 +133,11 @@ namespace hpx { namespace lcos { namespace local { return true; } - bool set(T&& t) noexcept + // clang-format off + bool set(T&& t) noexcept( + noexcept(std::declval&>().lock()) && + noexcept(std::declval&>().unlock())) + // clang-format on { if (closed_.load(std::memory_order_relaxed)) { @@ -184,7 +176,7 @@ namespace hpx { namespace lcos { namespace local { return 0; } - std::size_t capacity() const + constexpr std::size_t capacity() const noexcept { return size_ - 1; } diff --git a/libs/core/synchronization/include/hpx/synchronization/channel_spsc.hpp b/libs/core/synchronization/include/hpx/synchronization/channel_spsc.hpp index c896e30cce30..54387a9b4868 100644 --- a/libs/core/synchronization/include/hpx/synchronization/channel_spsc.hpp +++ b/libs/core/synchronization/include/hpx/synchronization/channel_spsc.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Hartmut Kaiser +// Copyright (c) 2019-2022 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -54,12 +54,6 @@ namespace hpx { namespace lcos { namespace local { { HPX_ASSERT(size != 0); - // invoke constructors for allocated buffer - for (std::size_t i = 0; i != size_; ++i) - { - new (&buffer_[i]) T(); - } - head_.data_.store(0, std::memory_order_relaxed); tail_.data_.store(0, std::memory_order_relaxed); } @@ -97,12 +91,6 @@ namespace hpx { namespace lcos { namespace local { ~channel_spsc() { - // invoke destructors for allocated buffer - for (std::size_t i = 0; i != size_; ++i) - { - (&buffer_[i])->~T(); - } - if (!closed_.load(std::memory_order_relaxed)) { close(); @@ -174,7 +162,7 @@ namespace hpx { namespace lcos { namespace local { return 0; } - std::size_t capacity() const + constexpr std::size_t capacity() const noexcept { return size_ - 1; } diff --git a/libs/core/thread_pools/include/hpx/thread_pools/detail/scoped_background_timer.hpp b/libs/core/thread_pools/include/hpx/thread_pools/detail/scoped_background_timer.hpp index 306c6b151e3f..1ea85fe31073 100644 --- a/libs/core/thread_pools/include/hpx/thread_pools/detail/scoped_background_timer.hpp +++ b/libs/core/thread_pools/include/hpx/thread_pools/detail/scoped_background_timer.hpp @@ -18,12 +18,13 @@ namespace hpx { namespace threads { //////////////////////////////////////////////////////////////////////////// struct background_work_duration_counter { - background_work_duration_counter(std::int64_t& background_exec_time) + background_work_duration_counter( + std::int64_t& background_exec_time) noexcept : background_exec_time_(background_exec_time) { } - void collect_background_exec_time(std::int64_t timestamp) + void collect_background_exec_time(std::int64_t timestamp) noexcept { if (background_exec_time_ != -1) { @@ -38,7 +39,7 @@ namespace hpx { namespace threads { struct background_exec_time_wrapper { background_exec_time_wrapper( - background_work_duration_counter& 
background_work_duration) + background_work_duration_counter& background_work_duration) noexcept : timestamp_(background_work_duration.background_exec_time_ != -1 ? util::hardware::timestamp() : -1) diff --git a/libs/core/thread_pools/include/hpx/thread_pools/scheduled_thread_pool.hpp b/libs/core/thread_pools/include/hpx/thread_pools/scheduled_thread_pool.hpp index 7a75f06a35fb..db027f37ed65 100644 --- a/libs/core/thread_pools/include/hpx/thread_pools/scheduled_thread_pool.hpp +++ b/libs/core/thread_pools/include/hpx/thread_pools/scheduled_thread_pool.hpp @@ -271,12 +271,13 @@ namespace hpx { namespace threads { namespace detail { #endif // HPX_HAVE_BACKGROUND_THREAD_COUNTERS #if defined(HPX_HAVE_THREAD_IDLE_RATES) - std::int64_t avg_idle_rate_all(bool reset) override; - std::int64_t avg_idle_rate(std::size_t, bool) override; + std::int64_t avg_idle_rate_all(bool reset) noexcept override; + std::int64_t avg_idle_rate(std::size_t, bool) noexcept override; #if defined(HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES) - std::int64_t avg_creation_idle_rate(std::size_t, bool) override; - std::int64_t avg_cleanup_idle_rate(std::size_t, bool) override; + std::int64_t avg_creation_idle_rate( + std::size_t, bool) noexcept override; + std::int64_t avg_cleanup_idle_rate(std::size_t, bool) noexcept override; #endif #endif diff --git a/libs/core/thread_pools/include/hpx/thread_pools/scheduled_thread_pool_impl.hpp b/libs/core/thread_pools/include/hpx/thread_pools/scheduled_thread_pool_impl.hpp index 7d8e6db2b1eb..ffd755190381 100644 --- a/libs/core/thread_pools/include/hpx/thread_pools/scheduled_thread_pool_impl.hpp +++ b/libs/core/thread_pools/include/hpx/thread_pools/scheduled_thread_pool_impl.hpp @@ -1604,7 +1604,7 @@ namespace hpx { namespace threads { namespace detail { #if defined(HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES) template std::int64_t scheduled_thread_pool::avg_creation_idle_rate( - std::size_t, bool reset) + std::size_t, bool reset) noexcept { double const creation_total = static_cast(sched_->Scheduler::get_creation_time(reset)); @@ -1651,7 +1651,7 @@ namespace hpx { namespace threads { namespace detail { template std::int64_t scheduled_thread_pool::avg_cleanup_idle_rate( - std::size_t, bool reset) + std::size_t, bool reset) noexcept { double const cleanup_total = static_cast(sched_->Scheduler::get_cleanup_time(reset)); @@ -1698,7 +1698,8 @@ namespace hpx { namespace threads { namespace detail { #endif // HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES template - std::int64_t scheduled_thread_pool::avg_idle_rate_all(bool reset) + std::int64_t scheduled_thread_pool::avg_idle_rate_all( + bool reset) noexcept { std::int64_t exec_total = accumulate_projected(counter_data_.begin(), counter_data_.end(), @@ -1741,7 +1742,7 @@ namespace hpx { namespace threads { namespace detail { template std::int64_t scheduled_thread_pool::avg_idle_rate( - std::size_t num, bool reset) + std::size_t num, bool reset) noexcept { if (num == std::size_t(-1)) return avg_idle_rate_all(reset); diff --git a/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp b/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp index f757f5c853d6..f11ec3cf263a 100644 --- a/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp +++ b/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2021 Hartmut Kaiser +// Copyright (c) 2007-2022 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. 
(See accompanying @@ -16,6 +16,7 @@ #include #include #include +#include #if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) && \ defined(HPX_HAVE_THREAD_IDLE_RATES) @@ -69,7 +70,8 @@ namespace hpx { namespace threads { namespace detail { class switch_status { public: - switch_status(thread_id_ref_type const& t, thread_state prev_state) + switch_status( + thread_id_ref_type const& t, thread_state prev_state) noexcept : thread_(t) , prev_state_(prev_state) , next_thread_id_(nullptr) @@ -86,14 +88,14 @@ namespace hpx { namespace threads { namespace detail { } } - bool is_valid() const + constexpr bool is_valid() const noexcept { return need_restore_state_; } // allow to change the state the thread will be switched to after // execution - thread_state operator=(thread_result_type&& new_state) + thread_state operator=(thread_result_type&& new_state) noexcept { prev_state_ = thread_state( new_state.first, prev_state_.state_ex(), prev_state_.tag() + 1); @@ -104,7 +106,7 @@ namespace hpx { namespace threads { namespace detail { // Get the state this thread was in before execution (usually pending), // this helps making sure no other worker-thread is started to execute this // HPX-thread in the meantime. - thread_schedule_state get_previous() const + thread_schedule_state get_previous() const noexcept { return prev_state_.state(); } @@ -113,7 +115,7 @@ namespace hpx { namespace threads { namespace detail { // original state has not been changed since we started executing this // thread. The function returns true if the state has been set, false // otherwise. - bool store_state(thread_state& newstate) + bool store_state(thread_state& newstate) noexcept { disable_restore(); @@ -127,17 +129,17 @@ namespace hpx { namespace threads { namespace detail { } // disable default handling in destructor - void disable_restore() + void disable_restore() noexcept { need_restore_state_ = false; } - thread_id_ref_type const& get_next_thread() const + thread_id_ref_type const& get_next_thread() const noexcept { return next_thread_id_; } - thread_id_ref_type move_next_thread() + thread_id_ref_type move_next_thread() noexcept { return HPX_MOVE(next_thread_id_); } @@ -154,7 +156,7 @@ namespace hpx { namespace threads { namespace detail { { public: switch_status_background( - thread_id_ref_type const& t, thread_state prev_state) + thread_id_ref_type const& t, thread_state prev_state) noexcept : thread_(t) , prev_state_(prev_state) , next_thread_id_(nullptr) @@ -172,14 +174,14 @@ namespace hpx { namespace threads { namespace detail { } } - bool is_valid() const + constexpr bool is_valid() const noexcept { return need_restore_state_; } // allow to change the state the thread will be switched to after // execution - thread_state operator=(thread_result_type&& new_state) + thread_state operator=(thread_result_type&& new_state) noexcept { prev_state_ = thread_state( new_state.first, prev_state_.state_ex(), prev_state_.tag() + 1); @@ -190,7 +192,7 @@ namespace hpx { namespace threads { namespace detail { // Get the state this thread was in before execution (usually pending), // this helps making sure no other worker-thread is started to execute this // HPX-thread in the meantime. - thread_schedule_state get_previous() const + thread_schedule_state get_previous() const noexcept { return prev_state_.state(); } @@ -199,7 +201,7 @@ namespace hpx { namespace threads { namespace detail { // original state has not been changed since we started executing this // thread. 
The function returns true if the state has been set, false // otherwise. - bool store_state(thread_state& newstate) + bool store_state(thread_state& newstate) noexcept { disable_restore(); if (get_thread_id_data(thread_)->restore_state(prev_state_, @@ -213,17 +215,17 @@ namespace hpx { namespace threads { namespace detail { } // disable default handling in destructor - void disable_restore() + void disable_restore() noexcept { need_restore_state_ = false; } - thread_id_ref_type const& get_next_thread() const + thread_id_ref_type const& get_next_thread() const noexcept { return next_thread_id_; } - thread_id_ref_type move_next_thread() + thread_id_ref_type move_next_thread() noexcept { return HPX_MOVE(next_thread_id_); } @@ -239,18 +241,20 @@ namespace hpx { namespace threads { namespace detail { #ifdef HPX_HAVE_THREAD_IDLE_RATES struct idle_collect_rate { - idle_collect_rate(std::int64_t& tfunc_time, std::int64_t& exec_time) + idle_collect_rate( + std::int64_t& tfunc_time, std::int64_t& exec_time) noexcept : start_timestamp_(util::hardware::timestamp()) , tfunc_time_(tfunc_time) , exec_time_(exec_time) { } - void collect_exec_time(std::int64_t timestamp) + void collect_exec_time(std::int64_t timestamp) noexcept { exec_time_ += util::hardware::timestamp() - timestamp; } - void take_snapshot() + + void take_snapshot() noexcept { if (tfunc_time_ == std::int64_t(-1)) { @@ -272,7 +276,7 @@ namespace hpx { namespace threads { namespace detail { struct exec_time_wrapper { - exec_time_wrapper(idle_collect_rate& idle_rate) + explicit exec_time_wrapper(idle_collect_rate& idle_rate) noexcept : timestamp_(util::hardware::timestamp()) , idle_rate_(idle_rate) { @@ -288,7 +292,8 @@ namespace hpx { namespace threads { namespace detail { struct tfunc_time_wrapper { - tfunc_time_wrapper(idle_collect_rate& idle_rate) + explicit constexpr tfunc_time_wrapper( + idle_collect_rate& idle_rate) noexcept : idle_rate_(idle_rate) { } @@ -302,24 +307,27 @@ namespace hpx { namespace threads { namespace detail { #else struct idle_collect_rate { - idle_collect_rate(std::int64_t&, std::int64_t&) {} + explicit constexpr idle_collect_rate( + std::int64_t&, std::int64_t&) noexcept + { + } }; struct exec_time_wrapper { - exec_time_wrapper(idle_collect_rate&) {} + explicit constexpr exec_time_wrapper(idle_collect_rate&) noexcept {} }; struct tfunc_time_wrapper { - tfunc_time_wrapper(idle_collect_rate&) {} + explicit constexpr tfunc_time_wrapper(idle_collect_rate&) noexcept {} }; #endif /////////////////////////////////////////////////////////////////////////// struct is_active_wrapper { - is_active_wrapper(bool& is_active) + explicit is_active_wrapper(bool& is_active) noexcept : is_active_(is_active) { is_active = true; @@ -343,7 +351,7 @@ namespace hpx { namespace threads { namespace detail { std::int64_t& busy_loop_count, bool& is_active, std::int64_t& background_work_duration, std::int64_t& background_send_duration, - std::int64_t& background_receive_duration) + std::int64_t& background_receive_duration) noexcept : executed_threads_(executed_threads) , executed_thread_phases_(executed_thread_phases) , tfunc_time_(tfunc_time) @@ -374,7 +382,7 @@ namespace hpx { namespace threads { namespace detail { scheduling_counters(std::int64_t& executed_threads, std::int64_t& executed_thread_phases, std::int64_t& tfunc_time, std::int64_t& exec_time, std::int64_t& idle_loop_count, - std::int64_t& busy_loop_count, bool& is_active) + std::int64_t& busy_loop_count, bool& is_active) noexcept : executed_threads_(executed_threads) , 
executed_thread_phases_(executed_thread_phases) , tfunc_time_(tfunc_time) @@ -597,6 +605,7 @@ namespace hpx { namespace threads { namespace detail { idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_); tfunc_time_wrapper tfunc_time_collector(idle_rate); + HPX_UNUSED(tfunc_time_collector); // spin for some time after queues have become empty bool may_exit = false; @@ -651,6 +660,8 @@ namespace hpx { namespace threads { namespace detail { num_thread, running, thrd, enable_stealing))) { tfunc_time_wrapper tfunc_time_collector(idle_rate); + HPX_UNUSED(tfunc_time_collector); + HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() == &scheduler); @@ -683,6 +694,7 @@ namespace hpx { namespace threads { namespace detail { thread_schedule_state::active); tfunc_time_wrapper tfunc_time_collector(idle_rate); + HPX_UNUSED(tfunc_time_collector); // thread returns new required state // store the returned state in the thread @@ -704,7 +716,7 @@ namespace hpx { namespace threads { namespace detail { // and add to aggregate execution time. exec_time_wrapper exec_time_collector( idle_rate); - + HPX_UNUSED(exec_time_collector); #if defined(HPX_HAVE_APEX) // get the APEX data pointer, in case we are resuming the // thread and have to restore any leaf timers from @@ -883,9 +895,10 @@ namespace hpx { namespace threads { namespace detail { { ++idle_loop_count; + next_thrd = nullptr; if (scheduler.SchedulingPolicy::wait_or_add_new(num_thread, - running, idle_loop_count, enable_stealing_staged, - added)) + running, idle_loop_count, enable_stealing_staged, added, + &next_thrd)) { // Clean up terminated threads before trying to exit bool can_exit = !running && @@ -961,6 +974,12 @@ namespace hpx { namespace threads { namespace detail { added = std::size_t(-1); } + // if stealing yielded a new task, run it first + if (next_thrd != nullptr) + { + continue; + } + #if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) && \ defined(HPX_HAVE_THREAD_IDLE_RATES) // do background work in parcel layer and in agas diff --git a/libs/core/thread_pools/src/scheduled_thread_pool.cpp b/libs/core/thread_pools/src/scheduled_thread_pool.cpp index bfece254a863..ef80f628ab69 100644 --- a/libs/core/thread_pools/src/scheduled_thread_pool.cpp +++ b/libs/core/thread_pools/src/scheduled_thread_pool.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -61,3 +62,14 @@ template class HPX_CORE_EXPORT hpx::threads::policies::shared_priority_queue_scheduler<>; template class HPX_CORE_EXPORT hpx::threads::detail::scheduled_thread_pool< hpx::threads::policies::shared_priority_queue_scheduler<>>; + +template class HPX_CORE_EXPORT + hpx::threads::policies::local_workrequesting_scheduler<>; +template class HPX_CORE_EXPORT hpx::threads::detail::scheduled_thread_pool< + hpx::threads::policies::local_workrequesting_scheduler<>>; +template class HPX_CORE_EXPORT + hpx::threads::policies::local_workrequesting_scheduler; +template class HPX_CORE_EXPORT hpx::threads::detail::scheduled_thread_pool< + hpx::threads::policies::local_workrequesting_scheduler>; diff --git a/libs/core/threading_base/include/hpx/threading_base/scheduler_base.hpp b/libs/core/threading_base/include/hpx/threading_base/scheduler_base.hpp index 06bf98a314a1..1dc101e802cf 100644 --- a/libs/core/threading_base/include/hpx/threading_base/scheduler_base.hpp +++ b/libs/core/threading_base/include/hpx/threading_base/scheduler_base.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2019 Hartmut Kaiser +// Copyright (c) 2007-2022 Hartmut Kaiser // // 
SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -61,7 +61,7 @@ namespace hpx { namespace threads { namespace policies { typedef std::mutex pu_mutex_type; scheduler_base(std::size_t num_threads, char const* description = "", - thread_queue_init_parameters thread_queue_init = {}, + thread_queue_init_parameters const& thread_queue_init = {}, scheduler_mode mode = scheduler_mode::nothing_special); virtual ~scheduler_base() = default; @@ -103,7 +103,7 @@ namespace hpx { namespace threads { namespace policies { virtual void suspend(std::size_t num_thread); virtual void resume(std::size_t num_thread); - std::size_t select_active_pu(std::unique_lock& l, + std::size_t select_active_pu( std::size_t num_thread, bool allow_fallback = false); // allow to access/manipulate states @@ -119,13 +119,13 @@ namespace hpx { namespace threads { namespace policies { /////////////////////////////////////////////////////////////////////// // get/set scheduler mode - scheduler_mode get_scheduler_mode() const + scheduler_mode get_scheduler_mode() const noexcept { return mode_.data_.load(std::memory_order_relaxed); } // get/set scheduler mode - bool has_scheduler_mode(scheduler_mode mode) const + bool has_scheduler_mode(scheduler_mode mode) const noexcept { return (mode_.data_.load(std::memory_order_relaxed) & mode) != 0; } @@ -135,20 +135,20 @@ namespace hpx { namespace threads { namespace policies { // by schedulers that do not support certain operations/modes. // All other mode set functions should call this one to ensure // that flags are always consistent - virtual void set_scheduler_mode(scheduler_mode mode); + virtual void set_scheduler_mode(scheduler_mode mode) noexcept; // add a flag to the scheduler mode flags - void add_scheduler_mode(scheduler_mode mode); + void add_scheduler_mode(scheduler_mode mode) noexcept; // remove flag from scheduler mode - void remove_scheduler_mode(scheduler_mode mode); + void remove_scheduler_mode(scheduler_mode mode) noexcept; // add flag to scheduler mode void add_remove_scheduler_mode( - scheduler_mode to_add_mode, scheduler_mode to_remove_mode); + scheduler_mode to_add_mode, scheduler_mode to_remove_mode) noexcept; // conditionally add or remove depending on set true/false - void update_scheduler_mode(scheduler_mode mode, bool set); + void update_scheduler_mode(scheduler_mode mode, bool set) noexcept; pu_mutex_type& get_pu_mutex(std::size_t num_thread) { @@ -203,9 +203,9 @@ namespace hpx { namespace threads { namespace policies { virtual bool is_core_idle(std::size_t num_thread) const = 0; // count active background threads - std::int64_t get_background_thread_count(); - void increment_background_thread_count(); - void decrement_background_thread_count(); + std::int64_t get_background_thread_count() noexcept; + void increment_background_thread_count() noexcept; + void decrement_background_thread_count() noexcept; // Enumerate all matching threads virtual bool enumerate_threads( @@ -239,7 +239,7 @@ namespace hpx { namespace threads { namespace policies { virtual bool wait_or_add_new(std::size_t num_thread, bool running, std::int64_t& idle_loop_count, bool enable_stealing, - std::size_t& added) = 0; + std::size_t& added, thread_id_ref_type* next_thrd = nullptr) = 0; virtual void on_start_thread(std::size_t num_thread) = 0; virtual void on_stop_thread(std::size_t num_thread) = 0; @@ -293,12 +293,12 @@ namespace hpx { namespace threads { namespace policies { using polling_function_ptr = detail::polling_status 
(*)(); using polling_work_count_function_ptr = std::size_t (*)(); - static detail::polling_status null_polling_function() + static constexpr detail::polling_status null_polling_function() noexcept { return detail::polling_status::idle; } - static std::size_t null_polling_work_count_function() + static constexpr std::size_t null_polling_work_count_function() noexcept { return 0; } diff --git a/libs/core/threading_base/include/hpx/threading_base/thread_pool_base.hpp b/libs/core/threading_base/include/hpx/threading_base/thread_pool_base.hpp index d1cf2c8f58c8..4369768f6a61 100644 --- a/libs/core/threading_base/include/hpx/threading_base/thread_pool_base.hpp +++ b/libs/core/threading_base/include/hpx/threading_base/thread_pool_base.hpp @@ -46,11 +46,11 @@ namespace hpx { namespace threads { { } - std::size_t index() const + std::size_t index() const noexcept { return index_; }; - std::string const& name() const + std::string const& name() const noexcept { return name_; } @@ -207,7 +207,7 @@ namespace hpx { namespace threads { { return id_.name(); } - std::size_t get_thread_offset() const + std::size_t get_thread_offset() const noexcept { return thread_offset_; } @@ -309,23 +309,23 @@ namespace hpx { namespace threads { #endif // HPX_HAVE_BACKGROUND_THREAD_COUNTERS #if defined(HPX_HAVE_THREAD_IDLE_RATES) - virtual std::int64_t avg_idle_rate_all(bool /*reset*/) + virtual std::int64_t avg_idle_rate_all(bool /*reset*/) noexcept { return 0; } - virtual std::int64_t avg_idle_rate(std::size_t, bool) + virtual std::int64_t avg_idle_rate(std::size_t, bool) noexcept { return 0; } #if defined(HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES) virtual std::int64_t avg_creation_idle_rate( - std::size_t /*thread_num*/, bool /*reset*/) + std::size_t /*thread_num*/, bool /*reset*/) noexcept { return 0; } virtual std::int64_t avg_cleanup_idle_rate( - std::size_t /*thread_num*/, bool /*reset*/) + std::size_t /*thread_num*/, bool /*reset*/) noexcept { return 0; } diff --git a/libs/core/threading_base/src/scheduler_base.cpp b/libs/core/threading_base/src/scheduler_base.cpp index 1fd2c6ad54f2..3992509faade 100644 --- a/libs/core/threading_base/src/scheduler_base.cpp +++ b/libs/core/threading_base/src/scheduler_base.cpp @@ -36,7 +36,8 @@ /////////////////////////////////////////////////////////////////////////////// namespace hpx { namespace threads { namespace policies { scheduler_base::scheduler_base(std::size_t num_threads, - char const* description, thread_queue_init_parameters thread_queue_init, + char const* description, + thread_queue_init_parameters const& thread_queue_init, scheduler_mode mode) : suspend_mtxs_(num_threads) , suspend_conds_(num_threads) @@ -151,8 +152,7 @@ namespace hpx { namespace threads { namespace policies { } std::size_t scheduler_base::select_active_pu( - std::unique_lock& l, std::size_t num_thread, - bool allow_fallback) + std::size_t num_thread, bool allow_fallback) { if (mode_.data_.load(std::memory_order_relaxed) & threads::policies::scheduler_mode::enable_elasticity) @@ -166,7 +166,7 @@ namespace hpx { namespace threads { namespace policies { // threads are available for scheduling. 
auto max_allowed_state = hpx::state::suspended; - hpx::util::yield_while([this, states_size, &l, &num_thread, + hpx::util::yield_while([this, states_size, &num_thread, &max_allowed_state]() { std::size_t num_allowed_threads = 0; @@ -175,18 +175,19 @@ namespace hpx { namespace threads { namespace policies { std::size_t num_thread_local = (num_thread + offset) % states_size; - l = std::unique_lock( - pu_mtxs_[num_thread_local], std::try_to_lock); - - if (l.owns_lock()) { - if (states_[num_thread_local] <= max_allowed_state) + std::unique_lock l( + pu_mtxs_[num_thread_local], std::try_to_lock); + + if (l.owns_lock()) { - num_thread = num_thread_local; - return false; + if (states_[num_thread_local] <= + max_allowed_state) + { + num_thread = num_thread_local; + return false; + } } - - l.unlock(); } if (states_[num_thread_local] <= max_allowed_state) @@ -228,7 +229,7 @@ namespace hpx { namespace threads { namespace policies { std::size_t num_thread_local = (num_thread + offset) % states_size; - l = std::unique_lock( + std::unique_lock l( pu_mtxs_[num_thread_local], std::try_to_lock); if (l.owns_lock() && @@ -317,35 +318,35 @@ namespace hpx { namespace threads { namespace policies { } // get/set scheduler mode - void scheduler_base::set_scheduler_mode(scheduler_mode mode) + void scheduler_base::set_scheduler_mode(scheduler_mode mode) noexcept { // distribute the same value across all cores mode_.data_.store(mode, std::memory_order_release); do_some_work(std::size_t(-1)); } - void scheduler_base::add_scheduler_mode(scheduler_mode mode) + void scheduler_base::add_scheduler_mode(scheduler_mode mode) noexcept { // distribute the same value across all cores - mode = scheduler_mode(get_scheduler_mode() | mode); - set_scheduler_mode(mode); + set_scheduler_mode(get_scheduler_mode() | mode); } - void scheduler_base::remove_scheduler_mode(scheduler_mode mode) + void scheduler_base::remove_scheduler_mode(scheduler_mode mode) noexcept { mode = scheduler_mode(get_scheduler_mode() & ~mode); set_scheduler_mode(mode); } void scheduler_base::add_remove_scheduler_mode( - scheduler_mode to_add_mode, scheduler_mode to_remove_mode) + scheduler_mode to_add_mode, scheduler_mode to_remove_mode) noexcept { scheduler_mode mode = scheduler_mode( (get_scheduler_mode() | to_add_mode) & ~to_remove_mode); set_scheduler_mode(mode); } - void scheduler_base::update_scheduler_mode(scheduler_mode mode, bool set) + void scheduler_base::update_scheduler_mode( + scheduler_mode mode, bool set) noexcept { if (set) { @@ -358,17 +359,17 @@ namespace hpx { namespace threads { namespace policies { } /////////////////////////////////////////////////////////////////////////// - std::int64_t scheduler_base::get_background_thread_count() + std::int64_t scheduler_base::get_background_thread_count() noexcept { return background_thread_count_; } - void scheduler_base::increment_background_thread_count() + void scheduler_base::increment_background_thread_count() noexcept { ++background_thread_count_; } - void scheduler_base::decrement_background_thread_count() + void scheduler_base::decrement_background_thread_count() noexcept { --background_thread_count_; } diff --git a/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp b/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp index d62edc9b2ea6..2b2f0834a589 100644 --- a/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp +++ b/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp @@ -257,7 +257,7 @@ namespace hpx { namespace threads { return 
get_pool(pool_name).get_numa_domain_bitmap(); } - void set_scheduler_mode(threads::policies::scheduler_mode mode) + void set_scheduler_mode(threads::policies::scheduler_mode mode) noexcept { for (auto& pool_iter : pools_) { @@ -265,7 +265,7 @@ namespace hpx { namespace threads { } } - void add_scheduler_mode(threads::policies::scheduler_mode mode) + void add_scheduler_mode(threads::policies::scheduler_mode mode) noexcept { for (auto& pool_iter : pools_) { @@ -275,7 +275,7 @@ namespace hpx { namespace threads { void add_remove_scheduler_mode( threads::policies::scheduler_mode to_add_mode, - threads::policies::scheduler_mode to_remove_mode) + threads::policies::scheduler_mode to_remove_mode) noexcept { for (auto& pool_iter : pools_) { @@ -284,7 +284,8 @@ namespace hpx { namespace threads { } } - void remove_scheduler_mode(threads::policies::scheduler_mode mode) + void remove_scheduler_mode( + threads::policies::scheduler_mode mode) noexcept { for (auto& pool_iter : pools_) { @@ -363,16 +364,16 @@ namespace hpx { namespace threads { } #ifdef HPX_HAVE_THREAD_IDLE_RATES - std::int64_t avg_idle_rate(bool reset); + std::int64_t avg_idle_rate(bool reset) noexcept; #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES - std::int64_t avg_creation_idle_rate(bool reset); - std::int64_t avg_cleanup_idle_rate(bool reset); + std::int64_t avg_creation_idle_rate(bool reset) noexcept; + std::int64_t avg_cleanup_idle_rate(bool reset) noexcept; #endif #endif #ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS - std::int64_t get_executed_threads(bool reset); - std::int64_t get_executed_thread_phases(bool reset); + std::int64_t get_executed_threads(bool reset) noexcept; + std::int64_t get_executed_thread_phases(bool reset) noexcept; #ifdef HPX_HAVE_THREAD_IDLE_RATES std::int64_t get_thread_duration(bool reset); std::int64_t get_thread_phase_duration(bool reset); diff --git a/libs/core/threadmanager/src/threadmanager.cpp b/libs/core/threadmanager/src/threadmanager.cpp index f0733598457f..7d2b24097d6a 100644 --- a/libs/core/threadmanager/src/threadmanager.cpp +++ b/libs/core/threadmanager/src/threadmanager.cpp @@ -266,6 +266,7 @@ namespace hpx { namespace threads { // set the default scheduler flags sched->set_scheduler_mode(thread_pool_init.mode_); + // conditionally set/unset this flag sched->update_scheduler_mode( policies::scheduler_mode::enable_stealing_numa, @@ -326,6 +327,85 @@ namespace hpx { namespace threads { break; } + case resource::local_workrequesting_fifo: + { + // set parameters for scheduler and pool instantiation and + // perform compatibility checks + std::size_t num_high_priority_queues = + hpx::util::get_entry_as(rtcfg_, + "hpx.thread_queue.high_priority_queues", + thread_pool_init.num_threads_); + detail::check_num_high_priority_queues( + thread_pool_init.num_threads_, num_high_priority_queues); + + // instantiate the scheduler + using local_sched_type = + hpx::threads::policies::local_workrequesting_scheduler<>; + + local_sched_type::init_parameter_type init( + thread_pool_init.num_threads_, + thread_pool_init.affinity_data_, num_high_priority_queues, + thread_queue_init, "core-local_workrequesting_scheduler"); + + std::unique_ptr sched( + new local_sched_type(init)); + + // set the default scheduler flags + sched->set_scheduler_mode(thread_pool_init.mode_); + + // conditionally set/unset this flag + sched->update_scheduler_mode( + policies::scheduler_mode::enable_stealing_numa, + !numa_sensitive); + + // instantiate the pool + std::unique_ptr pool( + new hpx::threads::detail::scheduled_thread_pool< + 
local_sched_type>(HPX_MOVE(sched), thread_pool_init)); + pools_.push_back(HPX_MOVE(pool)); + break; + } + + case resource::local_workrequesting_lifo: + { + // set parameters for scheduler and pool instantiation and + // perform compatibility checks + std::size_t num_high_priority_queues = + hpx::util::get_entry_as(rtcfg_, + "hpx.thread_queue.high_priority_queues", + thread_pool_init.num_threads_); + detail::check_num_high_priority_queues( + thread_pool_init.num_threads_, num_high_priority_queues); + + // instantiate the scheduler + using local_sched_type = + hpx::threads::policies::local_workrequesting_scheduler< + std::mutex, hpx::threads::policies::lockfree_lifo>; + + local_sched_type::init_parameter_type init( + thread_pool_init.num_threads_, + thread_pool_init.affinity_data_, num_high_priority_queues, + thread_queue_init, "core-local_workrequesting_scheduler"); + + std::unique_ptr sched( + new local_sched_type(init)); + + // set the default scheduler flags + sched->set_scheduler_mode(thread_pool_init.mode_); + + // conditionally set/unset this flag + sched->update_scheduler_mode( + policies::scheduler_mode::enable_stealing_numa, + !numa_sensitive); + + // instantiate the pool + std::unique_ptr pool( + new hpx::threads::detail::scheduled_thread_pool< + local_sched_type>(HPX_MOVE(sched), thread_pool_init)); + pools_.push_back(HPX_MOVE(pool)); + break; + } + case resource::static_: { // instantiate the scheduler @@ -784,7 +864,8 @@ namespace hpx { namespace threads { } #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME - std::int64_t threadmanager::get_average_thread_wait_time(bool reset) + std::int64_t threadmanager::get_average_thread_wait_time( + bool reset) noexcept { std::int64_t result = 0; for (auto const& pool_iter : pools_) @@ -867,7 +948,7 @@ namespace hpx { namespace threads { #endif // HPX_HAVE_BACKGROUND_THREAD_COUNTERS #ifdef HPX_HAVE_THREAD_IDLE_RATES - std::int64_t threadmanager::avg_idle_rate(bool reset) + std::int64_t threadmanager::avg_idle_rate(bool reset) noexcept { std::int64_t result = 0; for (auto const& pool_iter : pools_) @@ -876,7 +957,7 @@ namespace hpx { namespace threads { } #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES - std::int64_t threadmanager::avg_creation_idle_rate(bool reset) + std::int64_t threadmanager::avg_creation_idle_rate(bool reset) noexcept { std::int64_t result = 0; for (auto const& pool_iter : pools_) @@ -884,7 +965,7 @@ namespace hpx { namespace threads { return result; } - std::int64_t threadmanager::avg_cleanup_idle_rate(bool reset) + std::int64_t threadmanager::avg_cleanup_idle_rate(bool reset) noexcept { std::int64_t result = 0; for (auto const& pool_iter : pools_) @@ -895,7 +976,7 @@ namespace hpx { namespace threads { #endif #ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS - std::int64_t threadmanager::get_executed_threads(bool reset) + std::int64_t threadmanager::get_executed_threads(bool reset) noexcept { std::int64_t result = 0; for (auto const& pool_iter : pools_) @@ -903,7 +984,7 @@ namespace hpx { namespace threads { return result; } - std::int64_t threadmanager::get_executed_thread_phases(bool reset) + std::int64_t threadmanager::get_executed_thread_phases(bool reset) noexcept { std::int64_t result = 0; for (auto const& pool_iter : pools_) diff --git a/libs/full/command_line_handling/src/parse_command_line.cpp b/libs/full/command_line_handling/src/parse_command_line.cpp index 5518ba5a7f0a..aeee24c290e3 100644 --- a/libs/full/command_line_handling/src/parse_command_line.cpp +++ b/libs/full/command_line_handling/src/parse_command_line.cpp @@ 
-537,6 +537,7 @@ namespace hpx { namespace util { ("hpx:queuing", value(), "the queue scheduling policy to use, options are " "'local', 'local-priority-fifo','local-priority-lifo', " + "'local-workrequesting-fifo', 'local-workrequesting-lifo', " "'abp-priority-fifo', 'abp-priority-lifo', 'static', and " "'static-priority' (default: 'local-priority'; " "all option values can be abbreviated)")
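With the help text extended, the new schedulers can be selected at start-up via --hpx:queuing. A minimal usage sketch, assuming an application built against this branch; the application and function names are placeholders, and the exact hpx::init overload may differ between HPX versions:

    #include <hpx/init.hpp>

    int hpx_main(int, char**)
    {
        // At this point HPX threads run on the scheduler selected via
        // --hpx:queuing on the command line.
        return hpx::finalize();
    }

    int main(int argc, char* argv[])
    {
        // Example invocation (application name is a placeholder):
        //   ./my_app --hpx:queuing=local-workrequesting-fifo
        return hpx::init(hpx_main, argc, argv);
    }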