Skip to content

Commit

Permalink
[oneDPL][ranges] support size limit for output for merge algorithm (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDvorskiy authored Jan 31, 2025
1 parent 528c0f0 commit 4cbf18c
Show file tree
Hide file tree
Showing 11 changed files with 403 additions and 127 deletions.
115 changes: 115 additions & 0 deletions include/oneapi/dpl/pstl/algorithm_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <type_traits>
#include <functional>
#include <algorithm>
#include <cassert>
#include <cmath>

#include "algorithm_fwd.h"

Expand All @@ -31,6 +33,7 @@
#include "parallel_backend.h"
#include "parallel_impl.h"
#include "iterator_impl.h"
#include "../functional" //for oneapi::dpl::identity

#if _ONEDPL_HETERO_BACKEND
# include "hetero/algorithm_impl_hetero.h" // for __pattern_fill_n, __pattern_generate_n
Expand Down Expand Up @@ -2947,6 +2950,40 @@ __pattern_remove_if(__parallel_tag<_IsVector> __tag, _ExecutionPolicy&& __exec,
//------------------------------------------------------------------------
// merge
//------------------------------------------------------------------------
// Serial version of ___merge_path_out_lim merges the 1st sequence and the 2nd sequence in "reverse order":
// the identical elements from the 2nd sequence are merged first.
template <typename _Iterator1, typename _Iterator2, typename _Iterator3, typename _Comp>
std::pair<_Iterator1, _Iterator2>
__serial_merge_out_lim(_Iterator1 __x, _Iterator1 __x_e, _Iterator2 __y, _Iterator2 __y_e, _Iterator3 __out,
_Iterator3 __out_e, _Comp __comp)
{
for (_Iterator3 __k = __out; __k != __out_e; ++__k)
{
if (__x == __x_e)
{
assert(__y != __y_e);
*__k = *__y;
++__y;
}
else if (__y == __y_e)
{
assert(__x != __x_e);
*__k = *__x;
++__x;
}
else if (std::invoke(__comp, *__x, *__y))
{
*__k = *__x;
++__x;
}
else
{
*__k = *__y;
++__y;
}
}
return {__x, __y};
}

template <class _ForwardIterator1, class _ForwardIterator2, class _OutputIterator, class _Compare>
_OutputIterator
Expand Down Expand Up @@ -2980,6 +3017,84 @@ __pattern_merge(_Tag, _ExecutionPolicy&&, _ForwardIterator1 __first1, _ForwardIt
typename _Tag::__is_vector{});
}

template <typename _Tag, typename _ExecutionPolicy, typename _It1, typename _Index1, typename _It2, typename _Index2,
typename _OutIt, typename _Index3, typename _Comp>
std::pair<_It1, _It2>
___merge_path_out_lim(_Tag, _ExecutionPolicy&& __exec, _It1 __it_1, _Index1 __n_1, _It2 __it_2, _Index2 __n_2,
_OutIt __it_out, _Index3 __n_out, _Comp __comp)
{
return __serial_merge_out_lim(__it_1, __it_1 + __n_1, __it_2, __it_2 + __n_2, __it_out, __it_out + __n_out, __comp);
}

inline constexpr std::size_t __merge_path_cut_off = 2000;

// Parallel version of ___merge_path_out_lim merges the 1st sequence and the 2nd sequence in "reverse order":
// the identical elements from the 2nd sequence are merged first.
template <typename _IsVector, typename _ExecutionPolicy, typename _It1, typename _Index1, typename _It2,
typename _Index2, typename _OutIt, typename _Index3, typename _Comp>
std::pair<_It1, _It2>
___merge_path_out_lim(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _It1 __it_1, _Index1 __n_1, _It2 __it_2,
_Index2 __n_2, _OutIt __it_out, _Index3 __n_out, _Comp __comp)
{
using __backend_tag = typename __parallel_tag<_IsVector>::__backend_tag;

_It1 __it_res_1;
_It2 __it_res_2;

__internal::__except_handler([&]() {
__par_backend::__parallel_for(
__backend_tag{}, std::forward<_ExecutionPolicy>(__exec), _Index3(0), __n_out,
[=, &__it_res_1, &__it_res_2](_Index3 __i, _Index3 __j) {
//a start merging point on the merge path; for each thread
_Index1 __r = 0; //row index
_Index2 __c = 0; //column index

if (__i > 0)
{
//calc merge path intersection:
const _Index3 __d_size =
std::abs(std::max<_Index2>(0, __i - __n_2) - (std::min<_Index1>(__i, __n_1) - 1)) + 1;

auto __get_row = [__i, __n_1](auto __d) { return std::min<_Index1>(__i, __n_1) - __d - 1; };
auto __get_column = [__i, __n_1](auto __d) {
return std::max<_Index1>(0, __i - __n_1 - 1) + __d + (__i / (__n_1 + 1) > 0 ? 1 : 0);
};

oneapi::dpl::counting_iterator<_Index3> __it_d(0);

auto __res_d = *std::lower_bound(__it_d, __it_d + __d_size, 1, [&](auto __d, auto __val) {
auto __r = __get_row(__d);
auto __c = __get_column(__d);

oneapi::dpl::__internal::__compare<_Comp, oneapi::dpl::identity> __cmp{__comp,
oneapi::dpl::identity{}};
const auto __res = __cmp(__it_1[__r], __it_2[__c]) ? 1 : 0;

return __res < __val;
});

//intersection point
__r = __get_row(__res_d);
__c = __get_column(__res_d);
++__r; //to get a merge matrix ceil, lying on the current diagonal
}

//serial merge n elements, starting from input x and y, to [i, j) output range
const std::pair __res = __serial_merge_out_lim(__it_1 + __r, __it_1 + __n_1, __it_2 + __c,
__it_2 + __n_2, __it_out + __i, __it_out + __j, __comp);

if (__j == __n_out)
{
__it_res_1 = __res.first;
__it_res_2 = __res.second;
}
},
__merge_path_cut_off); //grainsize
});

return {__it_res_1, __it_res_2};
}

template <class _IsVector, class _ExecutionPolicy, class _RandomAccessIterator1, class _RandomAccessIterator2,
class _RandomAccessIterator3, class _Compare>
_RandomAccessIterator3
Expand Down
39 changes: 22 additions & 17 deletions include/oneapi/dpl/pstl/algorithm_ranges_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,31 +448,36 @@ auto
__pattern_merge(_Tag __tag, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r, _Comp __comp,
_Proj1 __proj1, _Proj2 __proj2)
{
static_assert(__is_parallel_tag_v<_Tag> || typename _Tag::__is_vector{});
assert(std::ranges::size(__r1) + std::ranges::size(__r2) <= std::ranges::size(__out_r)); // for debug purposes only
using __return_type =
std::ranges::merge_result<std::ranges::borrowed_iterator_t<_R1>, std::ranges::borrowed_iterator_t<_R2>,
std::ranges::borrowed_iterator_t<_OutRange>>;

auto __comp_2 = [__comp, __proj1, __proj2](auto&& __val1, auto&& __val2) { return std::invoke(__comp,
std::invoke(__proj1, std::forward<decltype(__val1)>(__val1)), std::invoke(__proj2,
std::forward<decltype(__val2)>(__val2)));};

auto __res = oneapi::dpl::__internal::__pattern_merge(__tag, std::forward<_ExecutionPolicy>(__exec),
std::ranges::begin(__r1), std::ranges::begin(__r1) + std::ranges::size(__r1), std::ranges::begin(__r2),
std::ranges::begin(__r2) + std::ranges::size(__r2), std::ranges::begin(__out_r), __comp_2);
using _Index1 = std::ranges::range_difference_t<_R1>;
using _Index2 = std::ranges::range_difference_t<_R2>;
using _Index3 = std::ranges::range_difference_t<_OutRange>;

using __return_type = std::ranges::merge_result<std::ranges::borrowed_iterator_t<_R1>, std::ranges::borrowed_iterator_t<_R2>,
std::ranges::borrowed_iterator_t<_OutRange>>;
const _Index1 __n_1 = std::ranges::size(__r1);
const _Index2 __n_2 = std::ranges::size(__r2);
const _Index3 __n_out = std::min<_Index3>(__n_1 + __n_2, std::ranges::size(__out_r));

return __return_type{std::ranges::begin(__r1) + std::ranges::size(__r1), std::ranges::begin(__r2) + std::ranges::size(__r2), __res};
}
auto __it_1 = std::ranges::begin(__r1);
auto __it_2 = std::ranges::begin(__r2);
auto __it_out = std::ranges::begin(__out_r);

template<typename _ExecutionPolicy, typename _R1, typename _R2, typename _OutRange, typename _Comp,
typename _Proj1, typename _Proj2>
auto
__pattern_merge(__serial_tag</*IsVector*/std::false_type>, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r, _Comp __comp,
_Proj1 __proj1, _Proj2 __proj2)
{
return std::ranges::merge(std::forward<_R1>(__r1), std::forward<_R2>(__r2), std::ranges::begin(__out_r), __comp, __proj1,
__proj2);
if (__n_out == 0)
return __return_type{__it_1, __it_2, __it_out};

// Parallel and serial versions of ___merge_path_out_lim merge the 1st sequence and the 2nd sequence in "reverse order":
// the identical elements from the 2nd sequence are merged first.
// So, the call to ___merge_path_out_lim swaps the order of sequences.
std::pair __res = ___merge_path_out_lim(__tag, std::forward<_ExecutionPolicy>(__exec), __it_2, __n_2, __it_1, __n_1,
__it_out, __n_out, __comp_2);

return __return_type{__res.second, __res.first, __it_out + __n_out};
}

} // namespace __ranges
Expand Down
7 changes: 5 additions & 2 deletions include/oneapi/dpl/pstl/glue_algorithm_ranges_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1173,9 +1173,12 @@ merge(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Range3&& _
{
const auto __dispatch_tag = oneapi::dpl::__ranges::__select_backend(__exec, __rng1, __rng2, __rng3);

return oneapi::dpl::__internal::__ranges::__pattern_merge(
auto __view_res = views::all_write(::std::forward<_Range3>(__rng3));
oneapi::dpl::__internal::__ranges::__pattern_merge(
__dispatch_tag, ::std::forward<_ExecutionPolicy>(__exec), views::all_read(::std::forward<_Range1>(__rng1)),
views::all_read(::std::forward<_Range2>(__rng2)), views::all_write(::std::forward<_Range3>(__rng3)), __comp);
views::all_read(::std::forward<_Range2>(__rng2)), __view_res, __comp);

return __view_res.size();
}

template <typename _ExecutionPolicy, typename _Range1, typename _Range2, typename _Range3>
Expand Down
65 changes: 39 additions & 26 deletions include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,19 @@ namespace __ranges
//------------------------------------------------------------------------

template <typename _BackendTag, typename _ExecutionPolicy, typename _Function, typename... _Ranges>
void
auto /* see _Size inside the function */
__pattern_walk_n(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Function __f, _Ranges&&... __rngs)
{
auto __n = oneapi::dpl::__ranges::__get_first_range_size(__rngs...);
using _Size = std::make_unsigned_t<std::common_type_t<oneapi::dpl::__internal::__difference_t<_Ranges>...>>;
const _Size __n = std::min({_Size(__rngs.size())...});
if (__n > 0)
{
oneapi::dpl::__par_backend_hetero::__parallel_for(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
unseq_backend::walk_n<_ExecutionPolicy, _Function>{__f}, __n,
::std::forward<_Ranges>(__rngs)...)
.__deferrable_wait();
}
return __n;
}

#if _ONEDPL_CPP20_RANGES_PRESENT
Expand Down Expand Up @@ -678,46 +680,51 @@ struct __copy1_wrapper;
template <typename _Name>
struct __copy2_wrapper;

struct __out_size_limit : public std::true_type
{
};

template <typename _BackendTag, typename _ExecutionPolicy, typename _Range1, typename _Range2, typename _Range3,
typename _Compare>
oneapi::dpl::__internal::__difference_t<_Range3>
std::pair<oneapi::dpl::__internal::__difference_t<_Range1>, oneapi::dpl::__internal::__difference_t<_Range2>>
__pattern_merge(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2,
_Range3&& __rng3, _Compare __comp)
{
auto __n1 = __rng1.size();
auto __n2 = __rng2.size();
auto __n = __n1 + __n2;
if (__n == 0)
return 0;
if (__rng3.empty())
return {0, 0};

const auto __n1 = __rng1.size();
const auto __n2 = __rng2.size();

//To consider the direct copying pattern call in case just one of sequences is empty.
if (__n1 == 0)
{
oneapi::dpl::__internal::__ranges::__pattern_walk_n(
auto __res = oneapi::dpl::__internal::__ranges::__pattern_walk_n(
__tag,
oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__copy1_wrapper>(
::std::forward<_ExecutionPolicy>(__exec)),
oneapi::dpl::__internal::__brick_copy<__hetero_tag<_BackendTag>, _ExecutionPolicy>{},
::std::forward<_Range2>(__rng2), ::std::forward<_Range3>(__rng3));
return {0, __res};
}
else if (__n2 == 0)

if (__n2 == 0)
{
oneapi::dpl::__internal::__ranges::__pattern_walk_n(
auto __res = oneapi::dpl::__internal::__ranges::__pattern_walk_n(
__tag,
oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__copy2_wrapper>(
::std::forward<_ExecutionPolicy>(__exec)),
oneapi::dpl::__internal::__brick_copy<__hetero_tag<_BackendTag>, _ExecutionPolicy>{},
::std::forward<_Range1>(__rng1), ::std::forward<_Range3>(__rng3));
}
else
{
__par_backend_hetero::__parallel_merge(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec),
::std::forward<_Range1>(__rng1), ::std::forward<_Range2>(__rng2),
::std::forward<_Range3>(__rng3), __comp)
.__deferrable_wait();
return {__res, 0};
}

return __n;
auto __res = __par_backend_hetero::__parallel_merge(
_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec), ::std::forward<_Range1>(__rng1),
::std::forward<_Range2>(__rng2), ::std::forward<_Range3>(__rng3), __comp, __out_size_limit{});

auto __val = __res.get();
return {__val.first, __val.second};
}

#if _ONEDPL_CPP20_RANGES_PRESENT
Expand All @@ -727,21 +734,27 @@ auto
__pattern_merge(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r,
_Comp __comp, _Proj1 __proj1, _Proj2 __proj2)
{
assert(std::ranges::size(__r1) + std::ranges::size(__r2) <= std::ranges::size(__out_r)); // for debug purposes only

auto __comp_2 = [__comp, __proj1, __proj2](auto&& __val1, auto&& __val2) { return std::invoke(__comp,
std::invoke(__proj1, std::forward<decltype(__val1)>(__val1)),
std::invoke(__proj2, std::forward<decltype(__val2)>(__val2)));};

auto __res = oneapi::dpl::__internal::__ranges::__pattern_merge(__tag, std::forward<_ExecutionPolicy>(__exec),
oneapi::dpl::__ranges::views::all_read(__r1), oneapi::dpl::__ranges::views::all_read(__r2),
oneapi::dpl::__ranges::views::all_write(__out_r), __comp_2);
using _Index1 = std::ranges::range_difference_t<_R1>;
using _Index2 = std::ranges::range_difference_t<_R2>;
using _Index3 = std::ranges::range_difference_t<_OutRange>;

const _Index1 __n_1 = std::ranges::size(__r1);
const _Index2 __n_2 = std::ranges::size(__r2);
const _Index3 __n_out = std::min<_Index3>(__n_1 + __n_2, std::ranges::size(__out_r));

const std::pair __res = oneapi::dpl::__internal::__ranges::__pattern_merge(
__tag, std::forward<_ExecutionPolicy>(__exec), oneapi::dpl::__ranges::views::all_read(__r1),
oneapi::dpl::__ranges::views::all_read(__r2), oneapi::dpl::__ranges::views::all_write(__out_r), __comp_2);

using __return_t = std::ranges::merge_result<std::ranges::borrowed_iterator_t<_R1>, std::ranges::borrowed_iterator_t<_R2>,
std::ranges::borrowed_iterator_t<_OutRange>>;

return __return_t{std::ranges::begin(__r1) + std::ranges::size(__r1), std::ranges::begin(__r2) +
std::ranges::size(__r2), std::ranges::begin(__out_r) + __res};
return __return_t{std::ranges::begin(__r1) + __res.first, std::ranges::begin(__r2) + __res.second,
std::ranges::begin(__out_r) + __n_out};
}
#endif //_ONEDPL_CPP20_RANGES_PRESENT

Expand Down
Loading

0 comments on commit 4cbf18c

Please sign in to comment.