diff --git a/include/oneapi/dpl/pstl/algorithm_impl.h b/include/oneapi/dpl/pstl/algorithm_impl.h index ae9094f721a..5f0aef1bb32 100644 --- a/include/oneapi/dpl/pstl/algorithm_impl.h +++ b/include/oneapi/dpl/pstl/algorithm_impl.h @@ -20,6 +20,8 @@ #include #include #include +#include +#include #include "algorithm_fwd.h" @@ -31,6 +33,7 @@ #include "parallel_backend.h" #include "parallel_impl.h" #include "iterator_impl.h" +#include "../functional" //for oneapi::dpl::identity #if _ONEDPL_HETERO_BACKEND # include "hetero/algorithm_impl_hetero.h" // for __pattern_fill_n, __pattern_generate_n @@ -2947,6 +2950,40 @@ __pattern_remove_if(__parallel_tag<_IsVector> __tag, _ExecutionPolicy&& __exec, //------------------------------------------------------------------------ // merge //------------------------------------------------------------------------ +// Serial version of ___merge_path_out_lim merges the 1st sequence and the 2nd sequence in "reverse order": +// the identical elements from the 2nd sequence are merged first. +template +std::pair<_Iterator1, _Iterator2> +__serial_merge_out_lim(_Iterator1 __x, _Iterator1 __x_e, _Iterator2 __y, _Iterator2 __y_e, _Iterator3 __out, + _Iterator3 __out_e, _Comp __comp) +{ + for (_Iterator3 __k = __out; __k != __out_e; ++__k) + { + if (__x == __x_e) + { + assert(__y != __y_e); + *__k = *__y; + ++__y; + } + else if (__y == __y_e) + { + assert(__x != __x_e); + *__k = *__x; + ++__x; + } + else if (std::invoke(__comp, *__x, *__y)) + { + *__k = *__x; + ++__x; + } + else + { + *__k = *__y; + ++__y; + } + } + return {__x, __y}; +} template _OutputIterator @@ -2980,6 +3017,84 @@ __pattern_merge(_Tag, _ExecutionPolicy&&, _ForwardIterator1 __first1, _ForwardIt typename _Tag::__is_vector{}); } +template +std::pair<_It1, _It2> +___merge_path_out_lim(_Tag, _ExecutionPolicy&& __exec, _It1 __it_1, _Index1 __n_1, _It2 __it_2, _Index2 __n_2, + _OutIt __it_out, _Index3 __n_out, _Comp __comp) +{ + return __serial_merge_out_lim(__it_1, __it_1 + __n_1, __it_2, __it_2 + __n_2, __it_out, __it_out + __n_out, __comp); +} + +inline constexpr std::size_t __merge_path_cut_off = 2000; + +// Parallel version of ___merge_path_out_lim merges the 1st sequence and the 2nd sequence in "reverse order": +// the identical elements from the 2nd sequence are merged first. +template +std::pair<_It1, _It2> +___merge_path_out_lim(__parallel_tag<_IsVector>, _ExecutionPolicy&& __exec, _It1 __it_1, _Index1 __n_1, _It2 __it_2, + _Index2 __n_2, _OutIt __it_out, _Index3 __n_out, _Comp __comp) +{ + using __backend_tag = typename __parallel_tag<_IsVector>::__backend_tag; + + _It1 __it_res_1; + _It2 __it_res_2; + + __internal::__except_handler([&]() { + __par_backend::__parallel_for( + __backend_tag{}, std::forward<_ExecutionPolicy>(__exec), _Index3(0), __n_out, + [=, &__it_res_1, &__it_res_2](_Index3 __i, _Index3 __j) { + //a start merging point on the merge path; for each thread + _Index1 __r = 0; //row index + _Index2 __c = 0; //column index + + if (__i > 0) + { + //calc merge path intersection: + const _Index3 __d_size = + std::abs(std::max<_Index2>(0, __i - __n_2) - (std::min<_Index1>(__i, __n_1) - 1)) + 1; + + auto __get_row = [__i, __n_1](auto __d) { return std::min<_Index1>(__i, __n_1) - __d - 1; }; + auto __get_column = [__i, __n_1](auto __d) { + return std::max<_Index1>(0, __i - __n_1 - 1) + __d + (__i / (__n_1 + 1) > 0 ? 
1 : 0); + }; + + oneapi::dpl::counting_iterator<_Index3> __it_d(0); + + auto __res_d = *std::lower_bound(__it_d, __it_d + __d_size, 1, [&](auto __d, auto __val) { + auto __r = __get_row(__d); + auto __c = __get_column(__d); + + oneapi::dpl::__internal::__compare<_Comp, oneapi::dpl::identity> __cmp{__comp, + oneapi::dpl::identity{}}; + const auto __res = __cmp(__it_1[__r], __it_2[__c]) ? 1 : 0; + + return __res < __val; + }); + + //intersection point + __r = __get_row(__res_d); + __c = __get_column(__res_d); + ++__r; //to get a merge matrix ceil, lying on the current diagonal + } + + //serial merge n elements, starting from input x and y, to [i, j) output range + const std::pair __res = __serial_merge_out_lim(__it_1 + __r, __it_1 + __n_1, __it_2 + __c, + __it_2 + __n_2, __it_out + __i, __it_out + __j, __comp); + + if (__j == __n_out) + { + __it_res_1 = __res.first; + __it_res_2 = __res.second; + } + }, + __merge_path_cut_off); //grainsize + }); + + return {__it_res_1, __it_res_2}; +} + template _RandomAccessIterator3 diff --git a/include/oneapi/dpl/pstl/algorithm_ranges_impl.h b/include/oneapi/dpl/pstl/algorithm_ranges_impl.h index 55d29a56be8..8621c104679 100644 --- a/include/oneapi/dpl/pstl/algorithm_ranges_impl.h +++ b/include/oneapi/dpl/pstl/algorithm_ranges_impl.h @@ -448,31 +448,36 @@ auto __pattern_merge(_Tag __tag, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r, _Comp __comp, _Proj1 __proj1, _Proj2 __proj2) { - static_assert(__is_parallel_tag_v<_Tag> || typename _Tag::__is_vector{}); - assert(std::ranges::size(__r1) + std::ranges::size(__r2) <= std::ranges::size(__out_r)); // for debug purposes only + using __return_type = + std::ranges::merge_result, std::ranges::borrowed_iterator_t<_R2>, + std::ranges::borrowed_iterator_t<_OutRange>>; auto __comp_2 = [__comp, __proj1, __proj2](auto&& __val1, auto&& __val2) { return std::invoke(__comp, std::invoke(__proj1, std::forward(__val1)), std::invoke(__proj2, std::forward(__val2)));}; - auto __res = oneapi::dpl::__internal::__pattern_merge(__tag, std::forward<_ExecutionPolicy>(__exec), - std::ranges::begin(__r1), std::ranges::begin(__r1) + std::ranges::size(__r1), std::ranges::begin(__r2), - std::ranges::begin(__r2) + std::ranges::size(__r2), std::ranges::begin(__out_r), __comp_2); + using _Index1 = std::ranges::range_difference_t<_R1>; + using _Index2 = std::ranges::range_difference_t<_R2>; + using _Index3 = std::ranges::range_difference_t<_OutRange>; - using __return_type = std::ranges::merge_result, std::ranges::borrowed_iterator_t<_R2>, - std::ranges::borrowed_iterator_t<_OutRange>>; + const _Index1 __n_1 = std::ranges::size(__r1); + const _Index2 __n_2 = std::ranges::size(__r2); + const _Index3 __n_out = std::min<_Index3>(__n_1 + __n_2, std::ranges::size(__out_r)); - return __return_type{std::ranges::begin(__r1) + std::ranges::size(__r1), std::ranges::begin(__r2) + std::ranges::size(__r2), __res}; -} + auto __it_1 = std::ranges::begin(__r1); + auto __it_2 = std::ranges::begin(__r2); + auto __it_out = std::ranges::begin(__out_r); -template -auto -__pattern_merge(__serial_tag, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r, _Comp __comp, - _Proj1 __proj1, _Proj2 __proj2) -{ - return std::ranges::merge(std::forward<_R1>(__r1), std::forward<_R2>(__r2), std::ranges::begin(__out_r), __comp, __proj1, - __proj2); + if (__n_out == 0) + return __return_type{__it_1, __it_2, __it_out}; + + // Parallel and serial versions of ___merge_path_out_lim merge the 1st sequence and the 2nd sequence in "reverse 
order": + // the identical elements from the 2nd sequence are merged first. + // So, the call to ___merge_path_out_lim swaps the order of sequences. + std::pair __res = ___merge_path_out_lim(__tag, std::forward<_ExecutionPolicy>(__exec), __it_2, __n_2, __it_1, __n_1, + __it_out, __n_out, __comp_2); + + return __return_type{__res.second, __res.first, __it_out + __n_out}; } } // namespace __ranges diff --git a/include/oneapi/dpl/pstl/glue_algorithm_ranges_impl.h b/include/oneapi/dpl/pstl/glue_algorithm_ranges_impl.h index 77a4875ccde..048c3348cd6 100644 --- a/include/oneapi/dpl/pstl/glue_algorithm_ranges_impl.h +++ b/include/oneapi/dpl/pstl/glue_algorithm_ranges_impl.h @@ -1173,9 +1173,12 @@ merge(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Range3&& _ { const auto __dispatch_tag = oneapi::dpl::__ranges::__select_backend(__exec, __rng1, __rng2, __rng3); - return oneapi::dpl::__internal::__ranges::__pattern_merge( + auto __view_res = views::all_write(::std::forward<_Range3>(__rng3)); + oneapi::dpl::__internal::__ranges::__pattern_merge( __dispatch_tag, ::std::forward<_ExecutionPolicy>(__exec), views::all_read(::std::forward<_Range1>(__rng1)), - views::all_read(::std::forward<_Range2>(__rng2)), views::all_write(::std::forward<_Range3>(__rng3)), __comp); + views::all_read(::std::forward<_Range2>(__rng2)), __view_res, __comp); + + return __view_res.size(); } template diff --git a/include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h b/include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h index da7820b91a2..65d15ebf26a 100644 --- a/include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h +++ b/include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h @@ -51,10 +51,11 @@ namespace __ranges //------------------------------------------------------------------------ template -void +auto /* see _Size inside the function */ __pattern_walk_n(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Function __f, _Ranges&&... __rngs) { - auto __n = oneapi::dpl::__ranges::__get_first_range_size(__rngs...); + using _Size = std::make_unsigned_t...>>; + const _Size __n = std::min({_Size(__rngs.size())...}); if (__n > 0) { oneapi::dpl::__par_backend_hetero::__parallel_for(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec), @@ -62,6 +63,7 @@ __pattern_walk_n(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Function ::std::forward<_Ranges>(__rngs)...) .__deferrable_wait(); } + return __n; } #if _ONEDPL_CPP20_RANGES_PRESENT @@ -678,46 +680,51 @@ struct __copy1_wrapper; template struct __copy2_wrapper; +struct __out_size_limit : public std::true_type +{ +}; + template -oneapi::dpl::__internal::__difference_t<_Range3> +std::pair, oneapi::dpl::__internal::__difference_t<_Range2>> __pattern_merge(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Range3&& __rng3, _Compare __comp) { - auto __n1 = __rng1.size(); - auto __n2 = __rng2.size(); - auto __n = __n1 + __n2; - if (__n == 0) - return 0; + if (__rng3.empty()) + return {0, 0}; + + const auto __n1 = __rng1.size(); + const auto __n2 = __rng2.size(); //To consider the direct copying pattern call in case just one of sequences is empty. 
if (__n1 == 0) { - oneapi::dpl::__internal::__ranges::__pattern_walk_n( + auto __res = oneapi::dpl::__internal::__ranges::__pattern_walk_n( __tag, oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__copy1_wrapper>( ::std::forward<_ExecutionPolicy>(__exec)), oneapi::dpl::__internal::__brick_copy<__hetero_tag<_BackendTag>, _ExecutionPolicy>{}, ::std::forward<_Range2>(__rng2), ::std::forward<_Range3>(__rng3)); + return {0, __res}; } - else if (__n2 == 0) + + if (__n2 == 0) { - oneapi::dpl::__internal::__ranges::__pattern_walk_n( + auto __res = oneapi::dpl::__internal::__ranges::__pattern_walk_n( __tag, oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__copy2_wrapper>( ::std::forward<_ExecutionPolicy>(__exec)), oneapi::dpl::__internal::__brick_copy<__hetero_tag<_BackendTag>, _ExecutionPolicy>{}, ::std::forward<_Range1>(__rng1), ::std::forward<_Range3>(__rng3)); - } - else - { - __par_backend_hetero::__parallel_merge(_BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec), - ::std::forward<_Range1>(__rng1), ::std::forward<_Range2>(__rng2), - ::std::forward<_Range3>(__rng3), __comp) - .__deferrable_wait(); + return {__res, 0}; } - return __n; + auto __res = __par_backend_hetero::__parallel_merge( + _BackendTag{}, ::std::forward<_ExecutionPolicy>(__exec), ::std::forward<_Range1>(__rng1), + ::std::forward<_Range2>(__rng2), ::std::forward<_Range3>(__rng3), __comp, __out_size_limit{}); + + auto __val = __res.get(); + return {__val.first, __val.second}; } #if _ONEDPL_CPP20_RANGES_PRESENT @@ -727,21 +734,27 @@ auto __pattern_merge(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _R1&& __r1, _R2&& __r2, _OutRange&& __out_r, _Comp __comp, _Proj1 __proj1, _Proj2 __proj2) { - assert(std::ranges::size(__r1) + std::ranges::size(__r2) <= std::ranges::size(__out_r)); // for debug purposes only - auto __comp_2 = [__comp, __proj1, __proj2](auto&& __val1, auto&& __val2) { return std::invoke(__comp, std::invoke(__proj1, std::forward(__val1)), std::invoke(__proj2, std::forward(__val2)));}; - auto __res = oneapi::dpl::__internal::__ranges::__pattern_merge(__tag, std::forward<_ExecutionPolicy>(__exec), - oneapi::dpl::__ranges::views::all_read(__r1), oneapi::dpl::__ranges::views::all_read(__r2), - oneapi::dpl::__ranges::views::all_write(__out_r), __comp_2); + using _Index1 = std::ranges::range_difference_t<_R1>; + using _Index2 = std::ranges::range_difference_t<_R2>; + using _Index3 = std::ranges::range_difference_t<_OutRange>; + + const _Index1 __n_1 = std::ranges::size(__r1); + const _Index2 __n_2 = std::ranges::size(__r2); + const _Index3 __n_out = std::min<_Index3>(__n_1 + __n_2, std::ranges::size(__out_r)); + + const std::pair __res = oneapi::dpl::__internal::__ranges::__pattern_merge( + __tag, std::forward<_ExecutionPolicy>(__exec), oneapi::dpl::__ranges::views::all_read(__r1), + oneapi::dpl::__ranges::views::all_read(__r2), oneapi::dpl::__ranges::views::all_write(__out_r), __comp_2); using __return_t = std::ranges::merge_result, std::ranges::borrowed_iterator_t<_R2>, std::ranges::borrowed_iterator_t<_OutRange>>; - return __return_t{std::ranges::begin(__r1) + std::ranges::size(__r1), std::ranges::begin(__r2) + - std::ranges::size(__r2), std::ranges::begin(__out_r) + __res}; + return __return_t{std::ranges::begin(__r1) + __res.first, std::ranges::begin(__r2) + __res.second, + std::ranges::begin(__out_r) + __n_out}; } #endif //_ONEDPL_CPP20_RANGES_PRESENT diff --git a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_merge.h 
b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_merge.h index b074a42fbbe..280e3e20b13 100644 --- a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_merge.h +++ b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_merge.h @@ -149,9 +149,10 @@ constexpr static bool __can_use_ternary_op_v = __can_use_ternary_op<_Rng1DataTyp // Do serial merge of the data from rng1 (starting from start1) and rng2 (starting from start2) and writing // to rng3 (starting from start3) in 'chunk' steps, but do not exceed the total size of the sequences (n1 and n2) template -void +std::pair<_Index, _Index> __serial_merge(const _Rng1& __rng1, const _Rng2& __rng2, _Rng3& __rng3, const _Index __start1, const _Index __start2, - const _Index __start3, const _Index __chunk, const _Index __n1, const _Index __n2, _Compare __comp) + const _Index __start3, const _Index __chunk, const _Index __n1, const _Index __n2, _Compare __comp, + const _Index __n3 = 0) { const _Index __rng1_size = std::min<_Index>(__n1 > __start1 ? __n1 - __start1 : _Index{0}, __chunk); const _Index __rng2_size = std::min<_Index>(__n2 > __start2 ? __n2 - __start2 : _Index{0}, __chunk); @@ -159,7 +160,7 @@ __serial_merge(const _Rng1& __rng1, const _Rng2& __rng2, _Rng3& __rng3, const _I const _Index __rng1_idx_end = __start1 + __rng1_size; const _Index __rng2_idx_end = __start2 + __rng2_size; - const _Index __rng3_idx_end = __start3 + __rng3_size; + const _Index __rng3_idx_end = __n3 > 0 ? std::min<_Index>(__n3, __start3 + __rng3_size) : __start3 + __rng3_size; _Index __rng1_idx = __start1; _Index __rng2_idx = __start2; @@ -193,14 +194,15 @@ __serial_merge(const _Rng1& __rng1, const _Rng2& __rng2, _Rng3& __rng3, const _I __rng3[__rng3_idx] = __rng1[__rng1_idx++]; } } + return {__rng1_idx, __rng2_idx}; } // Please see the comment for __parallel_for_submitter for optional kernel name explanation -template +template struct __parallel_merge_submitter; -template -struct __parallel_merge_submitter<_IdType, __internal::__optional_kernel_name<_Name...>> +template +struct __parallel_merge_submitter<_OutSizeLimit, _IdType, __internal::__optional_kernel_name<_Name...>> { template auto @@ -208,7 +210,7 @@ struct __parallel_merge_submitter<_IdType, __internal::__optional_kernel_name<_N { const _IdType __n1 = __rng1.size(); const _IdType __n2 = __rng2.size(); - const _IdType __n = __n1 + __n2; + const _IdType __n = std::min<_IdType>(__n1 + __n2, __rng3.size()); assert(__n1 > 0 || __n2 > 0); @@ -219,28 +221,65 @@ struct __parallel_merge_submitter<_IdType, __internal::__optional_kernel_name<_N const _IdType __steps = oneapi::dpl::__internal::__dpl_ceiling_div(__n, __chunk); - auto __event = __exec.queue().submit( - [&__rng1, &__rng2, &__rng3, __comp, __chunk, __steps, __n1, __n2](sycl::handler& __cgh) { - oneapi::dpl::__ranges::__require_access(__cgh, __rng1, __rng2, __rng3); - __cgh.parallel_for<_Name...>(sycl::range(__steps), [=](sycl::item __item_id) { - const _IdType __i_elem = __item_id.get_linear_id() * __chunk; - const auto __start = - __find_start_point(__rng1, _IdType{0}, __n1, __rng2, _IdType{0}, __n2, __i_elem, __comp); - __serial_merge(__rng1, __rng2, __rng3, __start.first, __start.second, __i_elem, __chunk, __n1, __n2, - __comp); - }); + using __val_t = _split_point_t<_IdType>; + using __result_and_scratch_storage_t = __result_and_scratch_storage<_ExecutionPolicy, __val_t>; + __result_and_scratch_storage_t* __p_res_storage = nullptr; + + if constexpr (_OutSizeLimit{}) + __p_res_storage = new 
__result_and_scratch_storage_t(__exec, 1, 0); + else + assert(__rng3.size() >= __n1 + __n2); + + auto __event = __exec.queue().submit([&__rng1, &__rng2, &__rng3, __p_res_storage, __comp, __chunk, __steps, __n, + __n1, __n2](sycl::handler& __cgh) { + oneapi::dpl::__ranges::__require_access(__cgh, __rng1, __rng2, __rng3); + auto __result_acc = __get_acc(__p_res_storage, __cgh); + + __cgh.parallel_for<_Name...>(sycl::range(__steps), [=](sycl::item __item_id) { + auto __id = __item_id.get_linear_id(); + const _IdType __i_elem = __id * __chunk; + + const auto __n_merge = std::min<_IdType>(__chunk, __n - __i_elem); + const auto __start = + __find_start_point(__rng1, _IdType{0}, __n1, __rng2, _IdType{0}, __n2, __i_elem, __comp); + + [[maybe_unused]] const std::pair __ends = + __serial_merge(__rng1, __rng2, __rng3, __start.first, __start.second, __i_elem, __n_merge, __n1, + __n2, __comp, __n); + + if constexpr (_OutSizeLimit{}) + if (__id == __steps - 1) //the last WI does additional work + { + auto __res_ptr = __result_and_scratch_storage_t::__get_usm_or_buffer_accessor_ptr(__result_acc); + *__res_ptr = __ends; + } }); + }); + // Save the raw pointer into a shared_ptr to return it in __future and extend the lifetime of the storage. // We should return the same thing in the second param of __future for compatibility // with the returning value in __parallel_merge_submitter_large::operator() - return __future(__event, std::shared_ptr<__result_and_scratch_storage_base>{}); + return __future(__event, std::shared_ptr<__result_and_scratch_storage_base>{__p_res_storage}); + } + + private: + template + static constexpr auto + __get_acc(_Storage* __p_res_storage, sycl::handler& __cgh) + { + if constexpr (_OutSizeLimit{}) + return __p_res_storage->template __get_result_acc(__cgh, __dpl_sycl::__no_init{}); + else + return int{0}; } }; -template +template struct __parallel_merge_submitter_large; -template -struct __parallel_merge_submitter_large<_IdType, _CustomName, +template +struct __parallel_merge_submitter_large<_OutSizeLimit, _IdType, _CustomName, __internal::__optional_kernel_name<_DiagonalsKernelName...>, __internal::__optional_kernel_name<_MergeKernelName...>> { @@ -256,10 +295,9 @@ struct __parallel_merge_submitter_large<_IdType, _CustomName, // Calculate nd-range parameters template nd_range_params - eval_nd_range_params(_ExecutionPolicy&& __exec, const _Range1& __rng1, const _Range2& __rng2) const + eval_nd_range_params(_ExecutionPolicy&& __exec, const _Range1& __rng1, const _Range2& __rng2, + const std::size_t __n) const { - const std::size_t __n = __rng1.size() + __rng2.size(); - // Empirical number of values to process per work-item const std::uint8_t __chunk = __exec.queue().get_device().is_cpu() ? 
128 : 4; @@ -275,13 +313,12 @@ struct __parallel_merge_submitter_large<_IdType, _CustomName, // Calculation of split points on each base diagonal template sycl::event - eval_split_points_for_groups(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Compare __comp, - const nd_range_params& __nd_range_params, + eval_split_points_for_groups(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _IdType __n, + _Compare __comp, const nd_range_params& __nd_range_params, _Storage& __base_diagonals_sp_global_storage) const { const _IdType __n1 = __rng1.size(); const _IdType __n2 = __rng2.size(); - const _IdType __n = __n1 + __n2; const _IdType __base_diag_chunk = __nd_range_params.steps_between_two_base_diags * __nd_range_params.chunk; @@ -319,13 +356,16 @@ struct __parallel_merge_submitter_large<_IdType, _CustomName, { const _IdType __n1 = __rng1.size(); const _IdType __n2 = __rng2.size(); + const _IdType __n = std::min<_IdType>(__n1 + __n2, __rng3.size()); - return __exec.queue().submit([&__event, &__rng1, &__rng2, &__rng3, __comp, __nd_range_params, + return __exec.queue().submit([&__event, &__rng1, &__rng2, &__rng3, __n, __comp, __nd_range_params, __base_diagonals_sp_global_storage, __n1, __n2](sycl::handler& __cgh) { oneapi::dpl::__ranges::__require_access(__cgh, __rng1, __rng2, __rng3); auto __base_diagonals_sp_global_acc = __base_diagonals_sp_global_storage.template __get_scratch_acc(__cgh); + auto __result_acc = __get_acc(__base_diagonals_sp_global_storage, __cgh); + __cgh.depends_on(__event); __cgh.parallel_for<_MergeKernelName...>( @@ -351,35 +391,58 @@ struct __parallel_merge_submitter_large<_IdType, _CustomName, __start = __base_diagonals_sp_global_ptr[__diagonal_idx]; } - __serial_merge(__rng1, __rng2, __rng3, __start.first, __start.second, __i_elem, - __nd_range_params.chunk, __n1, __n2, __comp); + [[maybe_unused]] const std::pair __ends = + __serial_merge(__rng1, __rng2, __rng3, __start.first, __start.second, __i_elem, + __nd_range_params.chunk, __n1, __n2, __comp, __n); + + if constexpr (_OutSizeLimit{}) + if (__global_idx == __nd_range_params.steps - 1) + { + auto __res_ptr = _Storage::__get_usm_or_buffer_accessor_ptr(__result_acc); + *__res_ptr = __ends; + } }); }); } + template + static constexpr auto + __get_acc(const _Storage& __base_diagonals_sp_global_storage, sycl::handler& __cgh) + { + if constexpr (_OutSizeLimit{}) + return __base_diagonals_sp_global_storage.template __get_result_acc( + __cgh, __dpl_sycl::__no_init{}); + else + return int{0}; + } + public: template auto operator()(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Range3&& __rng3, _Compare __comp) const { - assert(__rng1.size() > 0 || __rng2.size() > 0); + const _IdType __n1 = __rng1.size(); + const _IdType __n2 = __rng2.size(); + assert(__n1 > 0 || __n2 > 0); + + const _IdType __n = std::min<_IdType>(__n1 + __n2, __rng3.size()); _PRINT_INFO_IN_DEBUG_MODE(__exec); // Calculate nd-range parameters - const nd_range_params __nd_range_params = eval_nd_range_params(__exec, __rng1, __rng2); + const nd_range_params __nd_range_params = eval_nd_range_params(__exec, __rng1, __rng2, __n); // Create storage to save split-points on each base diagonal + 1 (for the right base diagonal in the last work-group) - auto __p_base_diagonals_sp_global_storage = - new __result_and_scratch_storage<_ExecutionPolicy, _split_point_t<_IdType>>( - __exec, 0, __nd_range_params.base_diag_count + 1); + using __val_t = _split_point_t<_IdType>; + auto __p_base_diagonals_sp_global_storage = new 
__result_and_scratch_storage<_ExecutionPolicy, __val_t>( + __exec, _OutSizeLimit{} ? 1 : 0, __nd_range_params.base_diag_count + 1); // Save the raw pointer into a shared_ptr to return it in __future and extend the lifetime of the storage. std::shared_ptr<__result_and_scratch_storage_base> __p_result_and_scratch_storage_base( static_cast<__result_and_scratch_storage_base*>(__p_base_diagonals_sp_global_storage)); // Find split-points on the base diagonals - sycl::event __event = eval_split_points_for_groups(__exec, __rng1, __rng2, __comp, __nd_range_params, + sycl::event __event = eval_split_points_for_groups(__exec, __rng1, __rng2, __n, __comp, __nd_range_params, *__p_base_diagonals_sp_global_storage); // Merge data using split points on each diagonal @@ -413,16 +476,17 @@ __get_starting_size_limit_for_large_submitter() return 16 * 1'048'576; // 16 MB } -template +template auto __parallel_merge(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPolicy&& __exec, _Range1&& __rng1, - _Range2&& __rng2, _Range3&& __rng3, _Compare __comp) + _Range2&& __rng2, _Range3&& __rng3, _Compare __comp, _OutSizeLimit = {}) { using _CustomName = oneapi::dpl::__internal::__policy_kernel_name<_ExecutionPolicy>; using __value_type = oneapi::dpl::__internal::__value_t<_Range3>; - const std::size_t __n = __rng1.size() + __rng2.size(); + const std::size_t __n = std::min(__rng1.size() + __rng2.size(), __rng3.size()); if (__n < __get_starting_size_limit_for_large_submitter<__value_type>()) { using _WiIndex = std::uint32_t; @@ -430,7 +494,7 @@ __parallel_merge(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPolicy std::numeric_limits<_WiIndex>::max()); using _MergeKernelName = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider< __merge_kernel_name<_CustomName, _WiIndex>>; - return __parallel_merge_submitter<_WiIndex, _MergeKernelName>()( + return __parallel_merge_submitter<_OutSizeLimit, _WiIndex, _MergeKernelName>()( std::forward<_ExecutionPolicy>(__exec), std::forward<_Range1>(__rng1), std::forward<_Range2>(__rng2), std::forward<_Range3>(__rng3), __comp); } @@ -443,7 +507,8 @@ __parallel_merge(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPolicy __diagonals_kernel_name<_CustomName, _WiIndex>>; using _MergeKernelName = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider< __merge_kernel_name_large<_CustomName, _WiIndex>>; - return __parallel_merge_submitter_large<_WiIndex, _CustomName, _DiagonalsKernelName, _MergeKernelName>()( + return __parallel_merge_submitter_large<_OutSizeLimit, _WiIndex, _CustomName, _DiagonalsKernelName, + _MergeKernelName>()( std::forward<_ExecutionPolicy>(__exec), std::forward<_Range1>(__rng1), std::forward<_Range2>(__rng2), std::forward<_Range3>(__rng3), __comp); } @@ -454,7 +519,8 @@ __parallel_merge(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPolicy __diagonals_kernel_name<_CustomName, _WiIndex>>; using _MergeKernelName = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider< __merge_kernel_name_large<_CustomName, _WiIndex>>; - return __parallel_merge_submitter_large<_WiIndex, _CustomName, _DiagonalsKernelName, _MergeKernelName>()( + return __parallel_merge_submitter_large<_OutSizeLimit, _WiIndex, _CustomName, _DiagonalsKernelName, + _MergeKernelName>()( std::forward<_ExecutionPolicy>(__exec), std::forward<_Range1>(__rng1), std::forward<_Range2>(__rng2), std::forward<_Range3>(__rng3), __comp); } diff --git a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_utils.h 
b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_utils.h index fe1db3ff542..215075b7a79 100644 --- a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_utils.h +++ b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_utils.h @@ -525,6 +525,8 @@ struct __usm_or_buffer_accessor struct __result_and_scratch_storage_base { virtual ~__result_and_scratch_storage_base() = default; + virtual std::size_t + __get_data(sycl::event, std::size_t* __p_buf) const = 0; }; template @@ -657,10 +659,13 @@ struct __result_and_scratch_storage : __result_and_scratch_storage_base #endif } - bool - is_USM() const + _T + __wait_and_get_value(sycl::event __event) const { - return __supports_USM_device; + if (is_USM()) + __event.wait_and_throw(); + + return __get_value(); } // Note: this member function assumes the result is *ready*, since the __future has already @@ -681,18 +686,41 @@ struct __result_and_scratch_storage : __result_and_scratch_storage_base } else { - return __sycl_buf->get_host_access(sycl::read_only)[__scratch_n]; + return __sycl_buf->get_host_access(sycl::read_only)[__scratch_n + idx]; } } - template - _T - __wait_and_get_value(_Event&& __event, size_t idx = 0) const + private: + bool + is_USM() const + { + return __supports_USM_device; + } + + template + std::size_t + __fill_data(std::pair<_Type, _Type>&& __p, std::size_t* __p_buf) const + { + __p_buf[0] = __p.first; + __p_buf[1] = __p.second; + return 2; + } + + template + std::size_t + __fill_data(_Args&&...) const + { + assert(!"Unsupported return type"); + return 0; + } + + virtual std::size_t + __get_data(sycl::event __event, std::size_t* __p_buf) const override { if (is_USM()) __event.wait_and_throw(); - return __get_value(idx); + return __fill_data(__get_value(), __p_buf); } }; @@ -718,7 +746,7 @@ class __future : private std::tuple<_Args...> _Event __my_event; template - constexpr auto + constexpr _T __wait_and_get_value(const sycl::buffer<_T>& __buf) { //according to a contract, returned value is one-element sycl::buffer @@ -732,8 +760,18 @@ class __future : private std::tuple<_Args...> return __storage.__wait_and_get_value(__my_event); } + constexpr std::pair + __wait_and_get_value(const std::shared_ptr<__result_and_scratch_storage_base>& __p_storage) + { + std::size_t __buf[2] = {0, 0}; + auto __n = __p_storage->__get_data(__my_event, __buf); + assert(__n == 2); + + return {__buf[0], __buf[1]}; + } + template - constexpr auto + constexpr _T __wait_and_get_value(const _T& __val) { wait(); diff --git a/include/oneapi/dpl/pstl/omp/parallel_for.h b/include/oneapi/dpl/pstl/omp/parallel_for.h index 1a0ea24d798..7fd0ec78636 100644 --- a/include/oneapi/dpl/pstl/omp/parallel_for.h +++ b/include/oneapi/dpl/pstl/omp/parallel_for.h @@ -29,10 +29,10 @@ namespace __omp_backend template void -__parallel_for_body(_Index __first, _Index __last, _Fp __f) +__parallel_for_body(_Index __first, _Index __last, _Fp __f, std::size_t __grainsize) { // initial partition of the iteration space into chunks - auto __policy = oneapi::dpl::__omp_backend::__chunk_partitioner(__first, __last); + auto __policy = oneapi::dpl::__omp_backend::__chunk_partitioner(__first, __last, __grainsize); // To avoid over-subscription we use taskloop for the nested parallelism _PSTL_PRAGMA(omp taskloop untied mergeable) @@ -49,20 +49,24 @@ __parallel_for_body(_Index __first, _Index __last, _Fp __f) template void -__parallel_for(oneapi::dpl::__internal::__omp_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, _Fp __f) 
+__parallel_for(oneapi::dpl::__internal::__omp_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, _Fp __f, + std::size_t __grainsize = __default_chunk_size) { if (omp_in_parallel()) { // we don't create a nested parallel region in an existing parallel // region: just create tasks - oneapi::dpl::__omp_backend::__parallel_for_body(__first, __last, __f); + oneapi::dpl::__omp_backend::__parallel_for_body(__first, __last, __f, __grainsize); } else { // in any case (nested or non-nested) one parallel region is created and // only one thread creates a set of tasks _PSTL_PRAGMA(omp parallel) - _PSTL_PRAGMA(omp single nowait) { oneapi::dpl::__omp_backend::__parallel_for_body(__first, __last, __f); } + _PSTL_PRAGMA(omp single nowait) + { + oneapi::dpl::__omp_backend::__parallel_for_body(__first, __last, __f, __grainsize); + } } } diff --git a/include/oneapi/dpl/pstl/parallel_backend_serial.h b/include/oneapi/dpl/pstl/parallel_backend_serial.h index 73714ebaf34..b898b582af2 100644 --- a/include/oneapi/dpl/pstl/parallel_backend_serial.h +++ b/include/oneapi/dpl/pstl/parallel_backend_serial.h @@ -87,7 +87,7 @@ __cancel_execution(oneapi::dpl::__internal::__serial_backend_tag) template void __parallel_for(oneapi::dpl::__internal::__serial_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, - _Fp __f) + _Fp __f, std::size_t /*__grainsize*/ = 1) { __f(__first, __last); } diff --git a/include/oneapi/dpl/pstl/parallel_backend_tbb.h b/include/oneapi/dpl/pstl/parallel_backend_tbb.h index 84f0708e756..04b63b16f94 100644 --- a/include/oneapi/dpl/pstl/parallel_backend_tbb.h +++ b/include/oneapi/dpl/pstl/parallel_backend_tbb.h @@ -93,10 +93,13 @@ class __parallel_for_body // wrapper over tbb::parallel_for template void -__parallel_for(oneapi::dpl::__internal::__tbb_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, _Fp __f) +__parallel_for(oneapi::dpl::__internal::__tbb_backend_tag, _ExecutionPolicy&&, _Index __first, _Index __last, _Fp __f, + std::size_t __grainsize = 1 /*matches the default grainsize value of tbb::blocked_range according to + the specification*/) { tbb::this_task_arena::isolate([=]() { - tbb::parallel_for(tbb::blocked_range<_Index>(__first, __last), __parallel_for_body<_Index, _Fp>(__f)); + tbb::parallel_for(tbb::blocked_range<_Index>(__first, __last, __grainsize), + __parallel_for_body<_Index, _Fp>(__f)); }); } @@ -412,7 +415,6 @@ __parallel_transform_scan(oneapi::dpl::__internal::__tbb_backend_tag, _Execution // // These are used by parallel implementations but do not depend on them. 
//------------------------------------------------------------------------ -#define _ONEDPL_MERGE_CUT_OFF 2000 template class __func_task; @@ -712,6 +714,8 @@ class __root_task : public __task }; #endif // TBB_INTERFACE_VERSION <= 12000 +inline constexpr std::size_t __merge_cut_off = 2000; + template class __merge_func @@ -731,8 +735,6 @@ class __merge_func _LeafMerge _M_leaf_merge; _SizeType _M_nsort; //number of elements to be sorted for partial_sort algorithm - static const _SizeType __merge_cut_off = _ONEDPL_MERGE_CUT_OFF; - bool _root; //means a task is merging root task bool _x_orig; //"true" means X(or left ) subrange is in the original container; false - in the buffer bool _y_orig; //"true" means Y(or right) subrange is in the original container; false - in the buffer @@ -1223,7 +1225,6 @@ operator()(__task* __self) typedef typename ::std::iterator_traits<_RandomAccessIterator2>::difference_type _DifferenceType2; typedef typename ::std::common_type_t<_DifferenceType1, _DifferenceType2> _SizeType; const _SizeType __n = (_M_xe - _M_xs) + (_M_ye - _M_ys); - const _SizeType __merge_cut_off = _ONEDPL_MERGE_CUT_OFF; if (__n <= __merge_cut_off) { _M_leaf_merge(_M_xs, _M_xe, _M_ys, _M_ye, _M_zs, _M_comp); @@ -1264,7 +1265,6 @@ __parallel_merge(oneapi::dpl::__internal::__tbb_backend_tag, _ExecutionPolicy&&, typedef typename ::std::iterator_traits<_RandomAccessIterator2>::difference_type _DifferenceType2; typedef typename ::std::common_type_t<_DifferenceType1, _DifferenceType2> _SizeType; const _SizeType __n = (__xe - __xs) + (__ye - __ys); - const _SizeType __merge_cut_off = _ONEDPL_MERGE_CUT_OFF; if (__n <= __merge_cut_off) { // Fall back on serial merge diff --git a/test/parallel_api/ranges/std_ranges_merge.pass.cpp b/test/parallel_api/ranges/std_ranges_merge.pass.cpp index 7752c82f275..2c1307bff67 100644 --- a/test/parallel_api/ranges/std_ranges_merge.pass.cpp +++ b/test/parallel_api/ranges/std_ranges_merge.pass.cpp @@ -25,25 +25,58 @@ main() //A checker below modifies a return type; a range based version with policy has another return type. auto merge_checker = [](std::ranges::random_access_range auto&& r_1, std::ranges::random_access_range auto&& r_2, - std::ranges::random_access_range auto&& r_out, auto&&... 
args) + std::ranges::random_access_range auto&& r_out, auto comp, auto proj1, + auto proj2) { - auto res = std::ranges::merge(std::forward(r_1), std::forward(r_2), - std::ranges::begin(r_out), std::forward(args)...); - using ret_type = std::ranges::merge_result, std::ranges::borrowed_iterator_t, std::ranges::borrowed_iterator_t>; - return ret_type{res.in1, res.in2, res.out}; + + auto it_out = std::ranges::begin(r_out); + auto it_1 = std::ranges::begin(r_1); + auto it_2 = std::ranges::begin(r_2); + auto it_1_e = std::ranges::end(r_1); + auto it_2_e = std::ranges::end(r_2); + auto it_out_e = std::ranges::end(r_out); + + while(it_1 != it_1_e && it_2 != it_2_e) + { + if (std::invoke(comp, std::invoke(proj2, *it_2), std::invoke(proj1, *it_1))) + { + *it_out = *it_2; + ++it_out, ++it_2; + } + else + { + *it_out = *it_1; + ++it_out, ++it_1; + } + if(it_out == it_out_e) + return ret_type{it_1, it_2, it_out}; + } + + if(it_1 == it_1_e) + { + for(; it_2 != it_2_e && it_out != it_out_e; ++it_2, ++it_out) + *it_out = *it_2; + } + else + { + for(; it_1 != it_1_e && it_out != it_out_e; ++it_1, ++it_out) + *it_out = *it_1; + } + + return ret_type{it_1, it_2, it_out}; }; - test_range_algo<0, int, data_in_in_out>{big_sz}(dpl_ranges::merge, merge_checker, std::ranges::less{}); + test_range_algo<0, int, data_in_in_out_lim>{big_sz}(dpl_ranges::merge, merge_checker, std::ranges::less{}, std::identity{}, std::identity{}); - test_range_algo<1, int, data_in_in_out>{}(dpl_ranges::merge, merge_checker, std::ranges::less{}, proj, proj); - test_range_algo<2, P2, data_in_in_out>{}(dpl_ranges::merge, merge_checker, std::ranges::less{}, &P2::x, &P2::x); - test_range_algo<3, P2, data_in_in_out>{}(dpl_ranges::merge, merge_checker, std::ranges::less{}, &P2::proj, &P2::proj); + test_range_algo<1, int, data_in_in_out_lim>{}(dpl_ranges::merge, merge_checker, std::ranges::less{}, proj, proj); + test_range_algo<2, P2, data_in_in_out_lim>{}(dpl_ranges::merge, merge_checker, std::ranges::less{}, &P2::x, &P2::x); + test_range_algo<3, P2, data_in_in_out_lim>{}(dpl_ranges::merge, merge_checker, std::ranges::less{}, &P2::proj, &P2::proj); - test_range_algo<4, int, data_in_in_out>{}(dpl_ranges::merge, merge_checker, std::ranges::greater{}, proj, proj); - test_range_algo<5, P2, data_in_in_out>{}(dpl_ranges::merge, merge_checker, std::ranges::greater{}, &P2::x, &P2::x); - test_range_algo<6, P2, data_in_in_out>{}(dpl_ranges::merge, merge_checker, std::ranges::greater{}, &P2::proj, &P2::proj); + test_range_algo<4, int, data_in_in_out_lim>{}(dpl_ranges::merge, merge_checker, std::ranges::greater{}, proj, proj); + test_range_algo<5, P2, data_in_in_out_lim>{}(dpl_ranges::merge, merge_checker, std::ranges::greater{}, &P2::x, &P2::x); + test_range_algo<6, P2, data_in_in_out_lim>{}(dpl_ranges::merge, merge_checker, std::ranges::greater{}, &P2::proj, &P2::proj); #endif //_ENABLE_STD_RANGES_TESTING return TestUtils::done(_ENABLE_STD_RANGES_TESTING); diff --git a/test/parallel_api/ranges/std_ranges_test.h b/test/parallel_api/ranges/std_ranges_test.h index 6057ccebfcd..51e0665a27f 100644 --- a/test/parallel_api/ranges/std_ranges_test.h +++ b/test/parallel_api/ranges/std_ranges_test.h @@ -277,14 +277,12 @@ struct test Container cont_in1(exec, n_in1, [](auto i) { return i;}); Container cont_in2(exec, n_in2, [](auto i) { return i/3;}); - const int max_n_out = max_n*2; - Container cont_out(exec, max_n_out, [](auto i) { return 0;}); - Container cont_exp(exec, max_n_out, [](auto i) { return 0;}); + Container cont_out(exec, n_out, [](auto i) { return 
0;}); + Container cont_exp(exec, n_out, [](auto i) { return 0;}); assert(n_in1 <= max_n); assert(n_in2 <= max_n); - assert(n_out <= max_n_out); - + auto src_view1 = tr_in(std::views::all(cont_in1())); auto src_view2 = tr_in(std::views::all(cont_in2())); auto expected_view = tr_out(std::views::all(cont_exp())); @@ -322,6 +320,7 @@ struct test { const int r_size = max_n; process_data_in_in_out(r_size, r_size, r_size, exec, algo, checker, args...); + process_data_in_in_out(r_size, r_size, r_size*2, exec, algo, checker, args...); process_data_in_in_out(r_size/2, r_size, r_size, exec, algo, checker, args...); process_data_in_in_out(r_size, r_size/2, r_size, exec, algo, checker, args...); process_data_in_in_out(r_size, r_size, r_size/2, std::forward(exec), algo, checker, args...);
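
Background note on the partitioning scheme used above: the device kernels (`__find_start_point` plus `__serial_merge`) and the new host-side `___merge_path_out_lim` both rely on the merge-path idea. Output position `i` corresponds to an anti-diagonal of the conceptual merge matrix, and a binary search along that diagonal tells how many elements of each input precede position `i` in the merged result, so each chunk of the output can be merged independently by one work item or task. The following standalone sketch illustrates that search only; the names are invented for the example and it is not the library implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

// Illustrative only: find a split point (r, c) such that the first i elements
// of the merged output can consist of the first r elements of a and the first
// c elements of b (r + c == i). A binary search over the i-th anti-diagonal of
// the conceptual merge matrix does this in O(log(min(i, |a|, |b|))) comparisons.
template <typename T, typename Comp = std::less<T>>
std::pair<std::size_t, std::size_t>
find_split_point(const std::vector<T>& a, const std::vector<T>& b, std::size_t i, Comp comp = {})
{
    std::size_t r_lo = i > b.size() ? i - b.size() : 0; // fewest elements taken from a
    std::size_t r_hi = std::min(i, a.size());           // most elements taken from a
    while (r_lo < r_hi)
    {
        const std::size_t r = r_lo + (r_hi - r_lo) / 2; // candidate: r from a, c from b
        const std::size_t c = i - r;
        // The split is valid (though not necessarily tie-stable) once a[r] is
        // no longer smaller than the last element already taken from b.
        if (c > 0 && r < a.size() && comp(a[r], b[c - 1]))
            r_lo = r + 1; // a[r] must precede b[c - 1]: take more from a
        else
            r_hi = r;
    }
    return {r_hi, i - r_hi};
}

int main()
{
    const std::vector<int> a{1, 3, 5, 7};
    const std::vector<int> b{2, 4, 6, 8};
    // Each work item could start its own serial merge at the split point of its
    // first output element, much as the SYCL kernels in this patch do per chunk.
    for (std::size_t i = 0; i <= a.size() + b.size(); ++i)
    {
        auto [r, c] = find_split_point(a, b, i);
        assert(r + c == i);
        std::cout << "output position " << i << " -> " << r << " from a, " << c << " from b\n";
    }
}
```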
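
The `_OutSizeLimit` variants clamp the number of merged elements to the size of the output range and report how far each input sequence was consumed, which is what the `std::ranges::merge`-style return value needs when the output is shorter than `n1 + n2`. A minimal host-side model of that contract, mirroring the reference checker added to std_ranges_merge.pass.cpp (names invented for the example, not the library code):

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <iostream>
#include <utility>
#include <vector>

// Illustrative model of a merge whose output is limited to out.size():
// merge until the output is full and report how many elements of each
// input were consumed. It models the contract of __serial_merge_out_lim
// and the _OutSizeLimit kernels, but is not the library implementation.
template <typename T, typename Comp = std::less<T>>
std::pair<std::size_t, std::size_t>
bounded_merge(const std::vector<T>& a, const std::vector<T>& b, std::vector<T>& out, Comp comp = {})
{
    std::size_t i = 0, j = 0, k = 0;
    while (k < out.size() && (i < a.size() || j < b.size()))
    {
        // Take from b when a is exhausted or b's element compares smaller;
        // on ties this takes from a first, like std::ranges::merge.
        if (i == a.size() || (j < b.size() && comp(b[j], a[i])))
            out[k++] = b[j++];
        else
            out[k++] = a[i++];
    }
    return {i, j}; // how much of a and b was merged before the output filled up
}

int main()
{
    const std::vector<int> a{1, 3, 5, 7};
    const std::vector<int> b{2, 4, 6, 8};

    std::vector<int> out(5); // smaller than a.size() + b.size()
    auto [used_a, used_b] = bounded_merge(a, b, out);

    // Only the first five merged values fit: 1 2 3 4 5,
    // consuming three elements of a and two of b.
    assert(used_a == 3 && used_b == 2);
    for (int v : out)
        std::cout << v << ' ';
    std::cout << '\n';
}
```

Note that the host pattern in algorithm_ranges_impl.h obtains this tie behavior by passing the ranges to `___merge_path_out_lim` in swapped order, since that helper takes equal elements from its second sequence first, as its comment states.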
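
On the grainsize now threaded through `__parallel_for`: for the TBB backend it becomes the `tbb::blocked_range` grainsize (whose default is 1, as noted in the patch), i.e. roughly the largest subrange the scheduler will not split further, and `__merge_path_cut_off` plays the same role for the host merge path. A hedged usage sketch of that effect, illustrative only and not the backend code:

```cpp
#include <cstddef>
#include <vector>

#include <tbb/blocked_range.h>
#include <tbb/parallel_for.h>

int main()
{
    constexpr std::size_t n = 100'000;
    constexpr std::size_t grainsize = 2000; // same order of magnitude as __merge_path_cut_off
    std::vector<int> data(n);

    // Subranges handed to the body are not split below roughly grainsize elements,
    // so each task amortizes its scheduling cost over a serial-merge-sized chunk.
    tbb::parallel_for(tbb::blocked_range<std::size_t>(0, n, grainsize),
                      [&](const tbb::blocked_range<std::size_t>& r) {
                          for (std::size_t i = r.begin(); i != r.end(); ++i)
                              data[i] = static_cast<int>(i);
                      });
}
```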