Skip to content

Commit

Permalink
Fixing assert in schedule_pool test
Browse files Browse the repository at this point in the history
- flyby: additional unrelated cleanup
  • Loading branch information
hkaiser committed Jul 22, 2023
1 parent 681a991 commit 4afb6bc
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 25 deletions.
6 changes: 5 additions & 1 deletion libs/core/futures/src/detail/execute_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ namespace hpx::threads::detail {
set_thread_state(thrd.noref(), thread_schedule_state::pending,
thread_restart_state::signaled);
auto* scheduler = thrdptr->get_scheduler_base();
scheduler->schedule_thread_last(thrd, thread_schedule_hint());

auto const hint = thread_schedule_hint(static_cast<std::int16_t>(
thrdptr->get_last_worker_thread_num()));
scheduler->schedule_thread_last(HPX_MOVE(thrd), hint);
scheduler->do_some_work(hint.hint);
}

HPX_ASSERT(state_val != thread_schedule_state::terminated);
Expand Down
21 changes: 13 additions & 8 deletions libs/core/resource_partitioner/tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2017 Hartmut Kaiser
# Copyright (c) 2017-2023 Hartmut Kaiser
#
# SPDX-License-Identifier: BSL-1.0
# Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -33,13 +33,18 @@ set(used_pus_PARAMETERS THREADS_PER_LOCALITY 4 RUN_SERIAL)

set(scheduler_priority_check_PARAMETERS THREADS_PER_LOCALITY -1)
set(shutdown_suspended_pus_PARAMETERS THREADS_PER_LOCALITY 4)
set(suspend_disabled_PARAMETERS THREADS_PER_LOCALITY 4)
set(suspend_pool_PARAMETERS THREADS_PER_LOCALITY 4)
set(suspend_pool_external_PARAMETERS THREADS_PER_LOCALITY 4)
set(suspend_runtime_PARAMETERS THREADS_PER_LOCALITY 4)
set(suspend_thread_PARAMETERS THREADS_PER_LOCALITY 4)
set(suspend_thread_external_PARAMETERS THREADS_PER_LOCALITY 4)
set(suspend_thread_timed_PARAMETERS THREADS_PER_LOCALITY 4)

if(CMAKE_SYSTEM_NAME MATCHES "Linux")
set(additional_parameters "--hpx:ini=hpx.stacks.use_guard_pages=0")
endif()

set(suspend_disabled_PARAMETERS THREADS_PER_LOCALITY 4 ${additional_parameters})
set(suspend_pool_PARAMETERS THREADS_PER_LOCALITY 4 ${additional_parameters})
set(suspend_pool_external_PARAMETERS THREADS_PER_LOCALITY 4 ${additional_parameters})
set(suspend_runtime_PARAMETERS THREADS_PER_LOCALITY 4 ${additional_parameters})
set(suspend_thread_PARAMETERS THREADS_PER_LOCALITY 4 ${additional_parameters})
set(suspend_thread_external_PARAMETERS THREADS_PER_LOCALITY 4 ${additional_parameters})
set(suspend_thread_timed_PARAMETERS THREADS_PER_LOCALITY 4 ${additional_parameters})

foreach(test ${tests})
set(sources ${test}.cpp)
Expand Down
18 changes: 9 additions & 9 deletions libs/core/resource_partitioner/tests/unit/suspend_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
#include <hpx/semaphore.hpp>
#include <hpx/thread.hpp>

#include <atomic>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

std::size_t const max_threads = (std::min)(
std::size_t(4), std::size_t(hpx::threads::hardware_concurrency()));
std::size_t const max_threads = (std::min)(static_cast<std::size_t>(4),
static_cast<std::size_t>(hpx::threads::hardware_concurrency()));

int hpx_main()
{
Expand Down Expand Up @@ -55,7 +53,7 @@ int hpx_main()

{
// Suspend and resume pool with future
hpx::chrono::high_resolution_timer t;
hpx::chrono::high_resolution_timer const t;

while (t.elapsed() < 1)
{
Expand All @@ -78,7 +76,7 @@ int hpx_main()
{
// Suspend and resume pool with callback
hpx::counting_semaphore_var<> sem;
hpx::chrono::high_resolution_timer t;
hpx::chrono::high_resolution_timer const t;

while (t.elapsed() < 1)
{
Expand Down Expand Up @@ -106,7 +104,7 @@ int hpx_main()

{
// Suspend pool with some threads already suspended
hpx::chrono::high_resolution_timer t;
hpx::chrono::high_resolution_timer const t;

while (t.elapsed() < 1)
{
Expand Down Expand Up @@ -144,7 +142,9 @@ void test_scheduler(
init_args.cfg = {"hpx.os_threads=" + std::to_string(max_threads)};
init_args.rp_callback = [scheduler](auto& rp,
hpx::program_options::variables_map const&) {
rp.create_thread_pool("worker", scheduler);
rp.create_thread_pool("worker", scheduler,
hpx::threads::policies::scheduler_mode::default_ |
hpx::threads::policies::scheduler_mode::enable_elasticity);

std::size_t const worker_pool_threads = max_threads - 1;
HPX_ASSERT(worker_pool_threads >= 1);
Expand Down Expand Up @@ -173,7 +173,7 @@ int main(int argc, char* argv[])
{
HPX_ASSERT(max_threads >= 2);

std::vector<hpx::resource::scheduling_policy> schedulers = {
std::vector<hpx::resource::scheduling_policy> const schedulers = {
hpx::resource::scheduling_policy::local,
hpx::resource::scheduling_policy::local_priority_fifo,
#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT)
Expand Down
18 changes: 13 additions & 5 deletions libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,12 @@ namespace hpx::threads::policies {
thread_id_type tid(todelete);
--terminated_items_count_;

// this thread has to be in this map
HPX_ASSERT(thread_map_.find(tid) != thread_map_.end());
// this thread has to be managed by this queue, it may have
// ended up on the terminate threads list more than once,
// however
HPX_ASSERT(
&get_thread_id_data(tid)->get_queue<thread_queue>() ==
this);

if (thread_map_.erase(tid) != 0)
{
Expand All @@ -426,9 +430,12 @@ namespace hpx::threads::policies {
thread_id_type tid(todelete);
--terminated_items_count_;

// this thread has to be in this map, except if it has changed
// its priority, then it could be elsewhere
HPX_ASSERT(thread_map_.find(tid) != thread_map_.end());
// this thread has to be managed by this queue, it may have
// ended up on the terminate threads list more than once,
// however
HPX_ASSERT(
&get_thread_id_data(tid)->get_queue<thread_queue>() ==
this);

if (thread_map_.erase(tid) != 0)
{
Expand Down Expand Up @@ -888,6 +895,7 @@ namespace hpx::threads::policies {
threads::thread_id_ref_type thrd, bool other_end = false)
{
++work_items_count_.data_;

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
work_items_.push(new thread_description{HPX_MOVE(thrd),
hpx::chrono::high_resolution_clock::now()},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ namespace hpx::threads {

HPX_FORCEINLINE coroutine_type::result_type thread_data::invoke_directly()
{
HPX_ASSERT(runs_as_child());
HPX_ASSERT(runs_as_child(std::memory_order_relaxed));

if (is_stackless())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace hpx::threads {
set_state_ex(thread_restart_state::signaled));

if (result.first == thread_schedule_state::terminated &&
runs_as_child())
runs_as_child(std::memory_order_relaxed))
{
result.first = thread_schedule_state::deleted;
}
Expand Down

0 comments on commit 4afb6bc

Please sign in to comment.