Skip to content

Commit

Permalink
Making sure the changed number of cores is propagated to the executor
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Jul 26, 2023
1 parent 09eabc7 commit f2a6d74
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/async_base/scheduling_properties.hpp>
#include <hpx/datastructures/tuple.hpp>
#include <hpx/execution/algorithms/detail/is_negative.hpp>
#include <hpx/execution/algorithms/detail/predicates.hpp>
Expand All @@ -16,6 +17,7 @@
#include <hpx/futures/future.hpp>
#include <hpx/iterator_support/iterator_range.hpp>
#include <hpx/parallel/util/detail/chunk_size_iterator.hpp>
#include <hpx/properties/property.hpp>

#include <algorithm>
#include <cstddef>
Expand Down Expand Up @@ -132,7 +134,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename IterOrR,
typename Stride = std::size_t>
hpx::util::iterator_range<chunk_size_iterator<IterOrR>>
get_bulk_iteration_shape(ExPolicy&& policy, IterOrR& it_or_r,
get_bulk_iteration_shape(ExPolicy& policy, IterOrR& it_or_r,
std::size_t& count, Stride s = Stride(1))
{
if (count == 0)
Expand Down Expand Up @@ -166,6 +168,10 @@ namespace hpx::parallel::util::detail {
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

auto shape_begin = chunk_size_iterator(it_or_r, chunk_size, count);
auto shape_end = chunk_size_iterator(last, chunk_size, count, count);

Expand All @@ -175,7 +181,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename Future, typename F1, typename IterOrR,
typename Stride = std::size_t>
hpx::util::iterator_range<chunk_size_iterator<IterOrR>>
get_bulk_iteration_shape(ExPolicy&& policy, std::vector<Future>& workitems,
get_bulk_iteration_shape(ExPolicy& policy, std::vector<Future>& workitems,
F1&& f1, IterOrR& it_or_r, std::size_t& count, Stride s = Stride(1))
{
if (count == 0)
Expand Down Expand Up @@ -241,6 +247,10 @@ namespace hpx::parallel::util::detail {
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

auto shape_begin = chunk_size_iterator(it_or_r, chunk_size, count);
auto shape_end = chunk_size_iterator(last, chunk_size, count, count);

Expand All @@ -250,7 +260,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename IterOrR,
typename Stride = std::size_t>
std::vector<hpx::tuple<IterOrR, std::size_t>>
get_bulk_iteration_shape_variable(ExPolicy&& policy, IterOrR& it_or_r,
get_bulk_iteration_shape_variable(ExPolicy& policy, IterOrR& it_or_r,
std::size_t& count, Stride s = Stride(1))
{
using tuple_type = hpx::tuple<IterOrR, std::size_t>;
Expand Down Expand Up @@ -308,27 +318,31 @@ namespace hpx::parallel::util::detail {
}
// clang-format on

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

return shape;
}

template <typename ExPolicy, typename Future, typename F1, typename FwdIter,
typename Stride = std::size_t>
decltype(auto) get_bulk_iteration_shape(std::false_type, ExPolicy&& policy,
decltype(auto) get_bulk_iteration_shape(std::false_type, ExPolicy& policy,
std::vector<Future>& workitems, F1&& f1, FwdIter& begin,
std::size_t& count, Stride s = Stride(1))
{
return get_bulk_iteration_shape(HPX_FORWARD(ExPolicy, policy),
workitems, HPX_FORWARD(F1, f1), begin, count, s);
return get_bulk_iteration_shape(
policy, workitems, HPX_FORWARD(F1, f1), begin, count, s);
}

template <typename ExPolicy, typename Future, typename F1, typename FwdIter,
typename Stride = std::size_t>
decltype(auto) get_bulk_iteration_shape(std::true_type, ExPolicy&& policy,
decltype(auto) get_bulk_iteration_shape(std::true_type, ExPolicy& policy,
std::vector<Future>& workitems, F1&& f1, FwdIter& begin,
std::size_t& count, Stride s = Stride(1))
{
return get_bulk_iteration_shape_variable(HPX_FORWARD(ExPolicy, policy),
workitems, HPX_FORWARD(F1, f1), begin, count, s);
return get_bulk_iteration_shape_variable(
policy, workitems, HPX_FORWARD(F1, f1), begin, count, s);
}

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -360,7 +374,7 @@ namespace hpx::parallel::util::detail {
typename Stride = std::size_t>
hpx::util::iterator_range<
parallel::util::detail::chunk_size_idx_iterator<FwdIter>>
get_bulk_iteration_shape_idx(ExPolicy&& policy, FwdIter begin,
get_bulk_iteration_shape_idx(ExPolicy& policy, FwdIter begin,
std::size_t count, Stride s = Stride(1))
{
using iterator =
Expand Down Expand Up @@ -397,6 +411,13 @@ namespace hpx::parallel::util::detail {
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

using iterator =
parallel::util::detail::chunk_size_idx_iterator<FwdIter>;

iterator shape_begin(begin, chunk_size, count, 0, 0);
iterator shape_end(last, chunk_size, count, count, 0);

Expand All @@ -407,7 +428,7 @@ namespace hpx::parallel::util::detail {
typename Stride = std::size_t>
hpx::util::iterator_range<
parallel::util::detail::chunk_size_idx_iterator<FwdIter>>
get_bulk_iteration_shape_idx(ExPolicy&& policy,
get_bulk_iteration_shape_idx(ExPolicy& policy,
std::vector<Future>& workitems, F1&& f1, FwdIter begin,
std::size_t count, Stride s = Stride(1))
{
Expand Down Expand Up @@ -475,6 +496,13 @@ namespace hpx::parallel::util::detail {
// clang-format on
}

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

using iterator =
parallel::util::detail::chunk_size_idx_iterator<FwdIter>;

iterator shape_begin(begin, chunk_size, count, 0, base_idx);
iterator shape_end(last, chunk_size, count, count, base_idx);

Expand All @@ -484,7 +512,7 @@ namespace hpx::parallel::util::detail {
template <typename ExPolicy, typename FwdIter,
typename Stride = std::size_t>
std::vector<hpx::tuple<FwdIter, std::size_t, std::size_t>>
get_bulk_iteration_shape_idx_variable(ExPolicy&& policy, FwdIter first,
get_bulk_iteration_shape_idx_variable(ExPolicy& policy, FwdIter first,
std::size_t count, Stride s = Stride(1))
{
using tuple_type = hpx::tuple<FwdIter, std::size_t, std::size_t>;
Expand Down Expand Up @@ -543,6 +571,10 @@ namespace hpx::parallel::util::detail {
}
// clang-format on

// update executor with new values
policy = hpx::experimental::prefer(
execution::with_processing_units_count, policy, cores);

return shape;
}
} // namespace hpx::parallel::util::detail
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace hpx::parallel::util::detail {

template <typename Result, typename ExPolicy, typename FwdIter, typename F>
auto foreach_partition(
ExPolicy&& policy, FwdIter first, std::size_t count, F&& f)
ExPolicy policy, FwdIter first, std::size_t count, F&& f)
{
// estimate a chunk size based on number of cores used
using parameters_type =
Expand All @@ -53,16 +53,16 @@ namespace hpx::parallel::util::detail {
"has_variable_chunk_size and invokes_testing_function");

auto&& shape = detail::get_bulk_iteration_shape_idx_variable(
HPX_FORWARD(ExPolicy, policy), first, count);
policy, first, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
HPX_MOVE(shape));
}
else if constexpr (!invokes_testing_function)
{
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), first, count);
auto&& shape =
detail::get_bulk_iteration_shape_idx(policy, first, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -72,7 +72,7 @@ namespace hpx::parallel::util::detail {
{
std::vector<hpx::future<Result>> inititems;
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), inititems, f, first, count);
policy, inititems, f, first, count);

auto&& workitems = execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand Down
19 changes: 9 additions & 10 deletions libs/core/algorithms/include/hpx/parallel/util/partitioner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
namespace hpx::parallel::util::detail {

template <typename Result, typename ExPolicy, typename IterOrR, typename F>
auto partition(ExPolicy&& policy, IterOrR it_or_r, std::size_t count, F&& f)
auto partition(ExPolicy policy, IterOrR it_or_r, std::size_t count, F&& f)
{
// estimate a chunk size based on number of cores used
using parameters_type =
Expand All @@ -57,16 +57,16 @@ namespace hpx::parallel::util::detail {
"has_variable_chunk_size and invokes_testing_function");

auto&& shape = detail::get_bulk_iteration_shape_variable(
HPX_FORWARD(ExPolicy, policy), it_or_r, count);
policy, it_or_r, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
HPX_MOVE(shape));
}
else if constexpr (!invokes_testing_function)
{
auto&& shape = detail::get_bulk_iteration_shape(
HPX_FORWARD(ExPolicy, policy), it_or_r, count);
auto&& shape =
detail::get_bulk_iteration_shape(policy, it_or_r, count);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -76,7 +76,7 @@ namespace hpx::parallel::util::detail {
{
std::vector<hpx::future<Result>> inititems;
auto&& shape = detail::get_bulk_iteration_shape(
HPX_FORWARD(ExPolicy, policy), inititems, f, it_or_r, count);
policy, inititems, f, it_or_r, count);

auto&& workitems = execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -88,7 +88,7 @@ namespace hpx::parallel::util::detail {

template <typename Result, typename ExPolicy, typename FwdIter,
typename Stride, typename F>
auto partition_with_index(ExPolicy&& policy, FwdIter first,
auto partition_with_index(ExPolicy policy, FwdIter first,
std::size_t count, Stride stride, F&& f)
{
// estimate a chunk size based on number of cores used
Expand All @@ -106,7 +106,7 @@ namespace hpx::parallel::util::detail {
"has_variable_chunk_size and invokes_testing_function");

auto&& shape = detail::get_bulk_iteration_shape_idx_variable(
HPX_FORWARD(ExPolicy, policy), first, count, stride);
policy, first, count, stride);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -115,7 +115,7 @@ namespace hpx::parallel::util::detail {
else if constexpr (!invokes_testing_function)
{
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), first, count, stride);
policy, first, count, stride);

return execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand All @@ -125,8 +125,7 @@ namespace hpx::parallel::util::detail {
{
std::vector<hpx::future<Result>> inititems;
auto&& shape = detail::get_bulk_iteration_shape_idx(
HPX_FORWARD(ExPolicy, policy), inititems, f, first, count,
stride);
policy, inititems, f, first, count, stride);

auto&& workitems = execution::bulk_async_execute(policy.executor(),
partitioner_iteration<Result, F>{HPX_FORWARD(F, f)},
Expand Down
1 change: 1 addition & 0 deletions libs/core/algorithms/tests/regressions/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ set(tests
for_loop_5735
for_loop_with_auto_chunk_size
minimal_findend
num_cores
reduce_3641
scan_different_inits
scan_non_commutative
Expand Down
39 changes: 39 additions & 0 deletions libs/core/algorithms/tests/regressions/num_cores.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2023 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/algorithm.hpp>
#include <hpx/chrono.hpp>
#include <hpx/execution.hpp>
#include <hpx/init.hpp>
#include <hpx/modules/testing.hpp>

int hpx_main()
{
hpx::execution::experimental::num_cores nc(2);
auto policy = hpx::execution::par.with(nc);

HPX_TEST_EQ(
hpx::parallel::execution::processing_units_count(policy.parameters(),
policy.executor(), hpx::chrono::null_duration, 0),
2);

auto policy2 =
hpx::parallel::execution::with_processing_units_count(policy, 2);
HPX_TEST_EQ(hpx::parallel::execution::processing_units_count(
hpx::execution::par.parameters(), policy2.executor(),
hpx::chrono::null_duration, 0),
2);

return hpx::local::finalize();
}

// Bootstraps the HPX runtime, runs hpx_main, and reports test results.
int main(int argc, char* argv[])
{
    int const init_result = hpx::local::init(hpx_main, argc, argv);
    HPX_TEST_EQ_MSG(
        init_result, 0, "HPX main exited with non-zero status");

    return hpx::util::report_errors();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ namespace hpx::execution::experimental {

/// \cond NOINTERNAL
// discover the number of cores to use for parallelization
// Customization of the processing_units_count CPO for policies that
// carry a num_cores parameters object: the executor, the duration hint,
// and the size hint are all ignored — the explicitly requested core
// count stored in the parameters object always wins.
template <typename Executor>
friend std::size_t tag_invoke(
hpx::parallel::execution::processing_units_count_t,
num_cores params, Executor&&, hpx::chrono::steady_duration const&,
std::size_t) noexcept
{
return params.num_cores_;
}

template <typename Executor>
constexpr std::size_t processing_units_count(Executor&&,
hpx::chrono::steady_duration const&, std::size_t) const noexcept
Expand Down
32 changes: 32 additions & 0 deletions libs/core/executors/examples/disable_thread_stealing_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,38 @@ namespace executor_example {
}
};

// support all properties exposed by the wrapped executor
//
// Forwards a scheduling-property request (e.g. with_processing_units_count)
// to the wrapped BaseExecutor and re-wraps the resulting executor in a
// disable_thread_stealing_executor, so the property survives while thread
// stealing stays disabled. The decltype trailing return type removes this
// overload from the candidate set (SFINAE) when the base executor does not
// support applying Tag with the given Property.
// clang-format off
template <typename Tag, typename BaseExecutor,typename Property,
HPX_CONCEPT_REQUIRES_(
hpx::execution::experimental::is_scheduling_property_v<Tag>
)>
// clang-format on
auto tag_invoke(Tag tag,
disable_thread_stealing_executor<BaseExecutor> const& exec,
Property&& prop)
-> decltype(disable_thread_stealing_executor<BaseExecutor>(
std::declval<Tag>()(
std::declval<BaseExecutor>(), std::declval<Property>())))
{
return disable_thread_stealing_executor<BaseExecutor>(
tag(static_cast<BaseExecutor const&>(exec),
HPX_FORWARD(Property, prop)));
}

// Query form of a scheduling property: invokes Tag on the wrapped
// BaseExecutor and returns its result unchanged (no re-wrapping, since a
// query does not produce a new executor). The decltype trailing return
// type removes this overload (SFINAE) when the base executor does not
// support the query.
// clang-format off
template <typename Tag, typename BaseExecutor,
HPX_CONCEPT_REQUIRES_(
hpx::execution::experimental::is_scheduling_property_v<Tag>
)>
// clang-format on
auto tag_invoke(
Tag tag, disable_thread_stealing_executor<BaseExecutor> const& exec)
-> decltype(std::declval<Tag>()(std::declval<BaseExecutor>()))
{
return tag(static_cast<BaseExecutor const&>(exec));
}

template <typename BaseExecutor>
auto make_disable_thread_stealing_executor(BaseExecutor&& exec)
{
Expand Down
6 changes: 6 additions & 0 deletions libs/core/executors/examples/executor_with_thread_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ namespace executor_example {
std::forward<Future>(predecessor), std::forward<Ts>(ts)...);
}

// Read-only access to the wrapped base executor; lets generic property
// machinery reach through this wrapper to the underlying executor.
[[nodiscard]] constexpr std::decay_t<BaseExecutor> const& get_executor()
const noexcept
{
return exec_;
}

private:
using thread_hook = hpx::function<void()>;

Expand Down

0 comments on commit f2a6d74

Please sign in to comment.