diff --git a/include/oneapi/dpl/internal/reduce_by_segment_impl.h b/include/oneapi/dpl/internal/reduce_by_segment_impl.h index d0979554786..aa718e3743f 100644 --- a/include/oneapi/dpl/internal/reduce_by_segment_impl.h +++ b/include/oneapi/dpl/internal/reduce_by_segment_impl.h @@ -57,9 +57,8 @@ #include "../pstl/utils_ranges.h" #include "../pstl/hetero/dpcpp/utils_ranges_sycl.h" #include "../pstl/ranges_defs.h" -#include "../pstl/glue_algorithm_ranges_impl.h" +#include "../pstl/hetero/algorithm_impl_hetero.h" #include "../pstl/hetero/dpcpp/sycl_traits.h" //SYCL traits specialization for some oneDPL types. -#include "scan_by_segment_impl.h" #endif namespace oneapi @@ -169,414 +168,9 @@ reduce_by_segment_impl(_Tag, Policy&& policy, InputIterator1 first1, InputIterat #if _ONEDPL_BACKEND_SYCL -template -class __seg_reduce_count_kernel; -template -class __seg_reduce_offset_kernel; -template -class __seg_reduce_wg_kernel; -template -class __seg_reduce_prefix_kernel; - -namespace -{ -template -using _SegReduceCountPhase = __seg_reduce_count_kernel<_Name...>; -template -using _SegReduceOffsetPhase = __seg_reduce_offset_kernel<_Name...>; -template -using _SegReduceWgPhase = __seg_reduce_wg_kernel<_Name...>; -template -using _SegReducePrefixPhase = __seg_reduce_prefix_kernel<_Name...>; -} // namespace - -template -oneapi::dpl::__internal::__difference_t<_Range3> -__sycl_reduce_by_segment(__internal::__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Range1&& __keys, - _Range2&& __values, _Range3&& __out_keys, _Range4&& __out_values, - _BinaryPredicate __binary_pred, _BinaryOperator __binary_op, - ::std::false_type /* has_known_identity */) -{ - return oneapi::dpl::experimental::ranges::reduce_by_segment( - ::std::forward<_ExecutionPolicy>(__exec), ::std::forward<_Range1>(__keys), ::std::forward<_Range2>(__values), - ::std::forward<_Range3>(__out_keys), ::std::forward<_Range4>(__out_values), __binary_pred, __binary_op); -} - -template -oneapi::dpl::__internal::__difference_t<_Range3> -__sycl_reduce_by_segment(__internal::__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _Range1&& __keys, - _Range2&& __values, _Range3&& __out_keys, _Range4&& __out_values, - _BinaryPredicate __binary_pred, _BinaryOperator __binary_op, - ::std::true_type /* has_known_identity */) -{ - using _CustomName = oneapi::dpl::__internal::__policy_kernel_name<_ExecutionPolicy>; - - using _SegReduceCountKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_generator< - _SegReduceCountPhase, _CustomName, _ExecutionPolicy, _Range1, _Range2, _Range3, _Range4, _BinaryPredicate, - _BinaryOperator>; - using _SegReduceOffsetKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_generator< - _SegReduceOffsetPhase, _CustomName, _ExecutionPolicy, _Range1, _Range2, _Range3, _Range4, _BinaryPredicate, - _BinaryOperator>; - using _SegReduceWgKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_generator< - _SegReduceWgPhase, _CustomName, _ExecutionPolicy, _Range1, _Range2, _Range3, _Range4, _BinaryPredicate, - _BinaryOperator>; - using _SegReducePrefixKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_generator< - _SegReducePrefixPhase, _CustomName, _ExecutionPolicy, _Range1, _Range2, _Range3, _Range4, _BinaryPredicate, - _BinaryOperator>; - - using __diff_type = oneapi::dpl::__internal::__difference_t<_Range3>; - using __key_type = oneapi::dpl::__internal::__value_t<_Range1>; - using __val_type = oneapi::dpl::__internal::__value_t<_Range2>; - - const ::std::size_t __n = 
__keys.size(); - - constexpr ::std::uint16_t __vals_per_item = - 16; // Each work item serially processes 16 items. Best observed performance on gpu - - // Limit the work-group size to prevent large sizes on CPUs. Empirically found value. - // This value exceeds the current practical limit for GPUs, but may need to be re-evaluated in the future. - std::size_t __wgroup_size = oneapi::dpl::__internal::__max_work_group_size(__exec, (std::size_t)2048); - - // adjust __wgroup_size according to local memory limit. Double the requirement on __val_type due to sycl group algorithm's use - // of SLM. - __wgroup_size = oneapi::dpl::__internal::__slm_adjusted_work_group_size( - __exec, sizeof(__key_type) + 2 * sizeof(__val_type), __wgroup_size); - -#if _ONEDPL_COMPILE_KERNEL - auto __seg_reduce_count_kernel = - __par_backend_hetero::__internal::__kernel_compiler<_SegReduceCountKernel>::__compile(__exec); - auto __seg_reduce_offset_kernel = - __par_backend_hetero::__internal::__kernel_compiler<_SegReduceOffsetKernel>::__compile(__exec); - auto __seg_reduce_wg_kernel = - __par_backend_hetero::__internal::__kernel_compiler<_SegReduceWgKernel>::__compile(__exec); - auto __seg_reduce_prefix_kernel = - __par_backend_hetero::__internal::__kernel_compiler<_SegReducePrefixKernel>::__compile(__exec); - __wgroup_size = - ::std::min({__wgroup_size, - oneapi::dpl::__internal::__kernel_work_group_size(__exec, __seg_reduce_count_kernel), - oneapi::dpl::__internal::__kernel_work_group_size(__exec, __seg_reduce_offset_kernel), - oneapi::dpl::__internal::__kernel_work_group_size(__exec, __seg_reduce_wg_kernel), - oneapi::dpl::__internal::__kernel_work_group_size(__exec, __seg_reduce_prefix_kernel)}); -#endif - - ::std::size_t __n_groups = oneapi::dpl::__internal::__dpl_ceiling_div(__n, __wgroup_size * __vals_per_item); - - // intermediate reductions within a workgroup - auto __partials = - oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __val_type>(__exec, __n_groups).get_buffer(); - - auto __end_idx = oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __diff_type>(__exec, 1).get_buffer(); - - // the number of segment ends found in each work group - auto __seg_ends = - oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __diff_type>(__exec, __n_groups).get_buffer(); - - // buffer that stores an exclusive scan of the results - auto __seg_ends_scanned = - oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __diff_type>(__exec, __n_groups).get_buffer(); - - // 1. Count the segment ends in each workgroup - auto __seg_end_identification = __exec.queue().submit([&](sycl::handler& __cgh) { - oneapi::dpl::__ranges::__require_access(__cgh, __keys); - auto __seg_ends_acc = __seg_ends.template get_access(__cgh); -#if _ONEDPL_COMPILE_KERNEL && _ONEDPL_KERNEL_BUNDLE_PRESENT - __cgh.use_kernel_bundle(__seg_reduce_count_kernel.get_kernel_bundle()); -#endif - __cgh.parallel_for<_SegReduceCountKernel>( - sycl::nd_range<1>{__n_groups * __wgroup_size, __wgroup_size}, [=]( -#if _ONEDPL_COMPILE_KERNEL && !_ONEDPL_KERNEL_BUNDLE_PRESENT - __seg_reduce_count_kernel, -#endif - sycl::nd_item<1> __item) { - auto __group = __item.get_group(); - ::std::size_t __group_id = __item.get_group(0); - ::std::size_t __local_id = __item.get_local_id(0); - ::std::size_t __global_id = __item.get_global_id(0); - - ::std::size_t __start = __global_id * __vals_per_item; - ::std::size_t __end = __dpl_sycl::__minimum{}(__start + __vals_per_item, __n); - ::std::size_t __item_segments = 0; - - // 1a. 
Work item scan to identify segment ends - for (::std::size_t __i = __start; __i < __end; ++__i) - if (__n - 1 == __i || !__binary_pred(__keys[__i], __keys[__i + 1])) - ++__item_segments; - - // 1b. Work group reduction - ::std::size_t __num_segs = __dpl_sycl::__reduce_over_group( - __group, __item_segments, __dpl_sycl::__plus()); - - // 1c. First work item writes segment count to global memory - if (__local_id == 0) - __seg_ends_acc[__group_id] = __num_segs; - }); - }); - - // 1.5 Small single-group kernel - auto __single_group_scan = __exec.queue().submit([&](sycl::handler& __cgh) { - __cgh.depends_on(__seg_end_identification); - auto __seg_ends_acc = __seg_ends.template get_access(__cgh); - auto __seg_ends_scan_acc = __seg_ends_scanned.template get_access(__cgh); -#if _ONEDPL_COMPILE_KERNEL && _ONEDPL_KERNEL_BUNDLE_PRESENT - __cgh.use_kernel_bundle(__seg_reduce_offset_kernel.get_kernel_bundle()); -#endif - __cgh.parallel_for<_SegReduceOffsetKernel>( -#if _ONEDPL_COMPILE_KERNEL && !_ONEDPL_KERNEL_BUNDLE_PRESENT - __seg_reduce_offset_kernel, -#endif - sycl::nd_range<1>{__wgroup_size, __wgroup_size}, [=](sycl::nd_item<1> __item) { - auto __beg = __dpl_sycl::__get_accessor_ptr(__seg_ends_acc); - auto __out_beg = __dpl_sycl::__get_accessor_ptr(__seg_ends_scan_acc); - __dpl_sycl::__joint_exclusive_scan(__item.get_group(), __beg, __beg + __n_groups, __out_beg, - __diff_type(0), sycl::plus<__diff_type>()); - }); - }); - - // 2. Work group reduction - auto __wg_reduce = __exec.queue().submit([&](sycl::handler& __cgh) { - __cgh.depends_on(__single_group_scan); - oneapi::dpl::__ranges::__require_access(__cgh, __keys, __out_keys, __out_values, __values); - - auto __partials_acc = __partials.template get_access(__cgh); - auto __seg_ends_scan_acc = __seg_ends_scanned.template get_access(__cgh); - __dpl_sycl::__local_accessor<__val_type> __loc_acc(2 * __wgroup_size, __cgh); -#if _ONEDPL_COMPILE_KERNEL && _ONEDPL_KERNEL_BUNDLE_PRESENT - __cgh.use_kernel_bundle(__seg_reduce_wg_kernel.get_kernel_bundle()); -#endif - __cgh.parallel_for<_SegReduceWgKernel>( -#if _ONEDPL_COMPILE_KERNEL && !_ONEDPL_KERNEL_BUNDLE_PRESENT - __seg_reduce_wg_kernel, -#endif - sycl::nd_range<1>{__n_groups * __wgroup_size, __wgroup_size}, [=](sycl::nd_item<1> __item) { - ::std::array<__val_type, __vals_per_item> __loc_partials; - - auto __group = __item.get_group(); - ::std::size_t __group_id = __item.get_group(0); - ::std::size_t __local_id = __item.get_local_id(0); - ::std::size_t __global_id = __item.get_global_id(0); - - // 2a. Lookup the number of prior segs - auto __wg_num_prior_segs = __seg_ends_scan_acc[__group_id]; - - // 2b. Perform a serial scan within the work item over assigned elements. Store partial - // reductions in work group local memory. - ::std::size_t __start = __global_id * __vals_per_item; - ::std::size_t __end = __dpl_sycl::__minimum{}(__start + __vals_per_item, __n); - - ::std::size_t __max_end = 0; - ::std::size_t __item_segments = 0; - auto __identity = unseq_backend::__known_identity<_BinaryOperator, __val_type>; - - __val_type __accumulator = __identity; - for (::std::size_t __i = __start; __i < __end; ++__i) - { - __accumulator = __binary_op(__accumulator, __values[__i]); - if (__n - 1 == __i || !__binary_pred(__keys[__i], __keys[__i + 1])) - { - __loc_partials[__i - __start] = __accumulator; - ++__item_segments; - __max_end = __local_id; - __accumulator = __identity; - } - } - - // 2c. 
Count the number of prior work segments cooperatively over group - ::std::size_t __prior_segs_in_wg = __dpl_sycl::__exclusive_scan_over_group( - __group, __item_segments, __dpl_sycl::__plus()); - ::std::size_t __start_idx = __wg_num_prior_segs + __prior_segs_in_wg; - - // 2d. Find the greatest segment end less than the current index (inclusive) - ::std::size_t __closest_seg_id = __dpl_sycl::__inclusive_scan_over_group( - __group, __max_end, __dpl_sycl::__maximum()); - - // __wg_segmented_scan is a derivative work and responsible for the third header copyright - __val_type __carry_in = oneapi::dpl::internal::__wg_segmented_scan( - __item, __loc_acc, __local_id, __local_id - __closest_seg_id, __accumulator, __identity, - __binary_op, __wgroup_size); - - // 2e. Update local partial reductions in first segment and write to global memory. - bool __apply_aggs = true; - ::std::size_t __item_offset = 0; - - // first item in group does not have any work-group aggregates to apply - if (__local_id == 0) - { - __apply_aggs = false; - if (__global_id == 0 && __n > 0) - { - // first segment identifier is always the first key - __out_keys[0] = __keys[0]; - } - } - - // apply the aggregates and copy the locally stored values to destination buffer - for (::std::size_t __i = __start; __i < __end; ++__i) - { - if (__i == __n - 1 || !__binary_pred(__keys[__i], __keys[__i + 1])) - { - ::std::size_t __idx = __start_idx + __item_offset; - if (__apply_aggs) - { - __out_values[__idx] = __binary_op(__carry_in, __loc_partials[__i - __start]); - __apply_aggs = false; - } - else - { - __out_values[__idx] = __loc_partials[__i - __start]; - } - if (__i != __n - 1) - { - __out_keys[__idx + 1] = __keys[__i + 1]; - } - ++__item_offset; - } - } - - // 2f. Output the work group aggregate and total number of segments for use in phase 3. - if (__local_id == __wgroup_size - 1) // last work item writes the group's carry out - { - // If no segment ends in the item, the aggregates from previous work groups must be applied. - if (__max_end == 0) - { - // needs to be inclusive with last element - __partials_acc[__group_id] = __binary_op(__carry_in, __accumulator); - } - else - { - __partials_acc[__group_id] = __accumulator; - } - } - }); - }); - - // 3. 
Apply inter work-group aggregates - __exec.queue() - .submit([&](sycl::handler& __cgh) { - oneapi::dpl::__ranges::__require_access(__cgh, __keys, __out_keys, __out_values); - - auto __partials_acc = __partials.template get_access(__cgh); - auto __seg_ends_scan_acc = __seg_ends_scanned.template get_access(__cgh); - auto __seg_ends_acc = __seg_ends.template get_access(__cgh); - auto __end_idx_acc = __end_idx.template get_access(__cgh); - - __dpl_sycl::__local_accessor<__val_type> __loc_partials_acc(__wgroup_size, __cgh); - __dpl_sycl::__local_accessor<__diff_type> __loc_seg_ends_acc(__wgroup_size, __cgh); - - __cgh.depends_on(__wg_reduce); -#if _ONEDPL_COMPILE_KERNEL && _ONEDPL_KERNEL_BUNDLE_PRESENT - __cgh.use_kernel_bundle(__seg_reduce_prefix_kernel.get_kernel_bundle()); -#endif - __cgh.parallel_for<_SegReducePrefixKernel>( -#if _ONEDPL_COMPILE_KERNEL && !_ONEDPL_KERNEL_BUNDLE_PRESENT - __seg_reduce_prefix_kernel, -#endif - sycl::nd_range<1>{__n_groups * __wgroup_size, __wgroup_size}, [=](sycl::nd_item<1> __item) { - auto __group = __item.get_group(); - ::std::int64_t __group_id = __item.get_group(0); - ::std::size_t __global_id = __item.get_global_id(0); - ::std::size_t __local_id = __item.get_local_id(0); - - ::std::size_t __start = __global_id * __vals_per_item; - ::std::size_t __end = __dpl_sycl::__minimum{}(__start + __vals_per_item, __n); - ::std::size_t __item_segments = 0; - - ::std::int64_t __wg_agg_idx = __group_id - 1; - __val_type __agg_collector = unseq_backend::__known_identity<_BinaryOperator, __val_type>; - - bool __ag_exists = false; - // 3a. Check to see if an aggregate exists and compute that value in the first - // work item. - if (__group_id != 0) - { - __ag_exists = __start < __n; - // local reductions followed by a sweep - constexpr ::std::int32_t __vals_to_explore = 16; - bool __last_it = false; - __loc_seg_ends_acc[__local_id] = false; - __loc_partials_acc[__local_id] = unseq_backend::__known_identity<_BinaryOperator, __val_type>; - for (::std::int32_t __i = __wg_agg_idx - __vals_to_explore * __local_id; !__last_it; - __i -= __wgroup_size * __vals_to_explore) - { - __val_type __local_collector = unseq_backend::__known_identity<_BinaryOperator, __val_type>; - // exploration phase - for (::std::int32_t __j = __i; - __j > __dpl_sycl::__maximum<::std::int32_t>{}(-1L, __i - __vals_to_explore); --__j) - { - __local_collector = __binary_op(__partials_acc[__j], __local_collector); - if (__seg_ends_acc[__j] || __j == 0) - { - __loc_seg_ends_acc[__local_id] = true; - break; - } - } - __loc_partials_acc[__local_id] = __local_collector; - __dpl_sycl::__group_barrier(__item); - // serial aggregate collection and synchronization - if (__local_id == 0) - { - for (::std::size_t __j = 0; __j < __wgroup_size; ++__j) - { - __agg_collector = __binary_op(__loc_partials_acc[__j], __agg_collector); - if (__loc_seg_ends_acc[__j]) - { - __last_it = true; - break; - } - } - } - __agg_collector = __dpl_sycl::__group_broadcast(__item.get_group(), __agg_collector); - __last_it = __dpl_sycl::__group_broadcast(__item.get_group(), __last_it); - } - - // Check to see if aggregates exist. - // The last group must always stay to write the final index - __ag_exists = __dpl_sycl::__any_of_group(__group, __ag_exists); - if (!__ag_exists && __group_id != __n_groups - 1) - return; - } - // 3b. 
count the segment ends - for (::std::size_t __i = __start; __i < __end; ++__i) - if (__i == __n - 1 || !__binary_pred(__keys[__i], __keys[__i + 1])) - ++__item_segments; - - ::std::size_t __prior_segs_in_wg = __dpl_sycl::__exclusive_scan_over_group( - __group, __item_segments, __dpl_sycl::__plus()); - - // 3c. Determine prior index - ::std::size_t __wg_num_prior_segs = __seg_ends_scan_acc[__group_id]; - - // 3d. Second pass over the keys, reidentifying end segments and applying work group - // aggregates if appropriate. Both the key and reduction value are written to the final output at the - // computed index - ::std::size_t __item_offset = 0; - for (::std::size_t __i = __start; __i < __end; ++__i) - { - if (__i == __n - 1 || !__binary_pred(__keys[__i], __keys[__i + 1])) - { - ::std::size_t __idx = __wg_num_prior_segs + __prior_segs_in_wg + __item_offset; - - // apply the aggregate if it is the first segment end in the workgroup only - if (__prior_segs_in_wg == 0 && __item_offset == 0 && __ag_exists) - __out_values[__idx] = __binary_op(__agg_collector, __out_values[__idx]); - - ++__item_offset; - // the last item must write the last index's position to return - if (__i == __n - 1) - __end_idx_acc[0] = __idx; - } - } - }); - }) - .wait(); - - return __end_idx.get_host_access()[0] + 1; -} - template -::std::pair +std::pair reduce_by_segment_impl(__internal::__hetero_tag<_BackendTag> __tag, Policy&& policy, InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, OutputIterator1 result1, OutputIterator2 result2, BinaryPred binary_pred, BinaryOperator binary_op) @@ -590,34 +184,20 @@ reduce_by_segment_impl(__internal::__hetero_tag<_BackendTag> __tag, Policy&& pol // keys_result = { 1, 2, 3, 4, 1, 3, 1, 3, 0 } -- result1 // values_result = { 1, 2, 3, 4, 2, 6, 2, 6, 0 } -- result2 - using _CountType = ::std::uint64_t; + using _CountType = std::uint64_t; namespace __bknd = __par_backend_hetero; - const auto n = ::std::distance(first1, last1); + const auto n = std::distance(first1, last1); if (n == 0) - return ::std::make_pair(result1, result2); - - auto keep_keys = __ranges::__get_sycl_range<__bknd::access_mode::read, InputIterator1>(); - auto key_buf = keep_keys(first1, last1); - auto keep_values = __ranges::__get_sycl_range<__bknd::access_mode::read, InputIterator2>(); - auto value_buf = keep_values(first2, first2 + n); - auto keep_key_outputs = __ranges::__get_sycl_range<__bknd::access_mode::write, OutputIterator1>(); - auto key_output_buf = keep_key_outputs(result1, result1 + n); - auto keep_value_outputs = __ranges::__get_sycl_range<__bknd::access_mode::write, OutputIterator2>(); - auto value_output_buf = keep_value_outputs(result2, result2 + n); - - using has_known_identity = - typename unseq_backend::__has_known_identity::value_type>::type; + return std::make_pair(result1, result2); // number of unique keys - _CountType __n = __sycl_reduce_by_segment( - __tag, ::std::forward(policy), key_buf.all_view(), value_buf.all_view(), key_output_buf.all_view(), - value_output_buf.all_view(), binary_pred, binary_op, has_known_identity{}); + _CountType __n = oneapi::dpl::__internal::__pattern_reduce_by_segment( + __tag, std::forward(policy), first1, last1, first2, result1, result2, binary_pred, binary_op); - return ::std::make_pair(result1 + __n, result2 + __n); + return std::make_pair(result1 + __n, result2 + __n); } #endif } // namespace internal diff --git a/include/oneapi/dpl/pstl/hetero/algorithm_impl_hetero.h b/include/oneapi/dpl/pstl/hetero/algorithm_impl_hetero.h index 
1a51076c612..65bf99c8777 100644 --- a/include/oneapi/dpl/pstl/hetero/algorithm_impl_hetero.h +++ b/include/oneapi/dpl/pstl/hetero/algorithm_impl_hetero.h @@ -2003,6 +2003,48 @@ __pattern_shift_right(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec return __last - __res; } +template +struct __copy_keys_values_wrapper; + +template +typename std::iterator_traits<_Iterator3>::difference_type +__pattern_reduce_by_segment(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __exec, _Iterator1 __keys_first, + _Iterator1 __keys_last, _Iterator2 __values_first, _Iterator3 __out_keys_first, + _Iterator4 __out_values_first, _BinaryPredicate __binary_pred, _BinaryOperator __binary_op) +{ + std::size_t __n = std::distance(__keys_first, __keys_last); + + if (__n == 0) + return 0; + + if (__n == 1) + { + __brick_copy<__hetero_tag<_BackendTag>, _ExecutionPolicy> __copy_op{}; + + oneapi::dpl::__internal::__pattern_walk2_n( + __tag, oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__copy_keys_values_wrapper>(__exec), + oneapi::dpl::make_zip_iterator(__keys_first, __values_first), 1, + oneapi::dpl::make_zip_iterator(__out_keys_first, __out_values_first), __copy_op); + + return 1; + } + + auto __keep_keys = oneapi::dpl::__ranges::__get_sycl_range<__par_backend_hetero::access_mode::read, _Iterator1>(); + auto __keys = __keep_keys(__keys_first, __keys_last); + auto __keep_values = oneapi::dpl::__ranges::__get_sycl_range<__par_backend_hetero::access_mode::read, _Iterator2>(); + auto __values = __keep_values(__values_first, __values_first + __n); + auto __keep_key_outputs = + oneapi::dpl::__ranges::__get_sycl_range<__par_backend_hetero::access_mode::read_write, _Iterator3>(); + auto __out_keys = __keep_key_outputs(__out_keys_first, __out_keys_first + __n); + auto __keep_value_outputs = + oneapi::dpl::__ranges::__get_sycl_range<__par_backend_hetero::access_mode::read_write, _Iterator4>(); + auto __out_values = __keep_value_outputs(__out_values_first, __out_values_first + __n); + return oneapi::dpl::__par_backend_hetero::__parallel_reduce_by_segment( + _BackendTag{}, std::forward<_ExecutionPolicy>(__exec), __keys.all_view(), __values.all_view(), + __out_keys.all_view(), __out_values.all_view(), __binary_pred, __binary_op); +} + } // namespace __internal } // namespace dpl } // namespace oneapi diff --git a/include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h b/include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h index b9cd154a044..da7820b91a2 100644 --- a/include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h +++ b/include/oneapi/dpl/pstl/hetero/algorithm_ranges_impl_hetero.h @@ -889,22 +889,7 @@ __pattern_minmax_element(__hetero_tag<_BackendTag>, _ExecutionPolicy&& __exec, _ //------------------------------------------------------------------------ template -struct __copy_keys_wrapper; - -template -struct __copy_values_wrapper; - -template -struct __reduce1_wrapper; - -template -struct __reduce2_wrapper; - -template -struct __assign_key1_wrapper; - -template -struct __assign_key2_wrapper; +struct __copy_keys_values_range_wrapper; template @@ -932,117 +917,18 @@ __pattern_reduce_by_segment(__hetero_tag<_BackendTag> __tag, _ExecutionPolicy&& __brick_copy<__hetero_tag<_BackendTag>, _ExecutionPolicy> __copy_range{}; oneapi::dpl::__internal::__ranges::__pattern_walk_n( - __tag, oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__copy_keys_wrapper>(__exec), __copy_range, - std::forward<_Range1>(__keys), std::forward<_Range3>(__out_keys)); - - 
oneapi::dpl::__internal::__ranges::__pattern_walk_n( - __tag, - oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__copy_values_wrapper>( - std::forward<_ExecutionPolicy>(__exec)), - __copy_range, std::forward<_Range2>(__values), std::forward<_Range4>(__out_values)); + __tag, oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__copy_keys_values_range_wrapper>(__exec), + __copy_range, + oneapi::dpl::__ranges::zip_view(std::forward<_Range1>(__keys), std::forward<_Range2>(__values)), + oneapi::dpl::__ranges::zip_view(std::forward<_Range3>(__out_keys), std::forward<_Range4>(__out_values))); return 1; } - using __diff_type = oneapi::dpl::__internal::__difference_t<_Range1>; - using __key_type = oneapi::dpl::__internal::__value_t<_Range1>; - using __val_type = oneapi::dpl::__internal::__value_t<_Range2>; - - // Round 1: reduce with extra indices added to avoid long segments - // TODO: At threshold points check if the key is equal to the key at the previous threshold point, indicating a long sequence. - // Skip a round of copy_if and reduces if there are none. - auto __idx = oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __diff_type>(__exec, __n).get_buffer(); - auto __tmp_out_keys = - oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __key_type>(__exec, __n).get_buffer(); - auto __tmp_out_values = - oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __val_type>(__exec, __n).get_buffer(); - - // Replicating first element of keys view to be able to compare (i-1)-th and (i)-th key with aligned sequences, - // dropping the last key for the i-1 sequence. - auto __k1 = - oneapi::dpl::__ranges::take_view_simple(oneapi::dpl::__ranges::replicate_start_view_simple(__keys, 1), __n); - - // view1 elements are a tuple of the element index and pairs of adjacent keys - // view2 elements are a tuple of the elements where key-index pairs will be written by copy_if - auto __view1 = experimental::ranges::zip_view(experimental::ranges::views::iota(0, __n), __k1, __keys); - auto __view2 = experimental::ranges::zip_view(experimental::ranges::views::all_write(__tmp_out_keys), - experimental::ranges::views::all_write(__idx)); - - // use work group size adjusted to shared local memory as the maximum segment size. - std::size_t __wgroup_size = - oneapi::dpl::__internal::__slm_adjusted_work_group_size(__exec, sizeof(__key_type) + sizeof(__val_type)); - - // element is copied if it is the 0th element (marks beginning of first segment), is in an index - // evenly divisible by wg size (ensures segments are not long), or has a key not equal to the - // adjacent element (marks end of real segments) - // TODO: replace wgroup size with segment size based on platform specifics. 
- auto __intermediate_result_end = __ranges::__pattern_copy_if( - __tag, oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__assign_key1_wrapper>(__exec), __view1, __view2, - [__binary_pred, __wgroup_size](const auto& __a) { - // The size of key range for the (i-1) view is one less, so for the 0th index we do not check the keys - // for (i-1), but we still need to get its key value as it is the start of a segment - const auto index = std::get<0>(__a); - if (index == 0) - return true; - return index % __wgroup_size == 0 // segment size - || !__binary_pred(std::get<1>(__a), std::get<2>(__a)); // key comparison - }, - unseq_backend::__brick_assign_key_position{}); - - //reduce by segment - oneapi::dpl::__par_backend_hetero::__parallel_for( - _BackendTag{}, oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__reduce1_wrapper>(__exec), - unseq_backend::__brick_reduce_idx<_BinaryOperator, decltype(__n)>(__binary_op, __n), __intermediate_result_end, - oneapi::dpl::__ranges::take_view_simple(experimental::ranges::views::all_read(__idx), - __intermediate_result_end), - std::forward<_Range2>(__values), experimental::ranges::views::all_write(__tmp_out_values)) - .wait(); - - // Round 2: final reduction to get result for each segment of equal adjacent keys - // create views over adjacent keys - oneapi::dpl::__ranges::all_view<__key_type, __par_backend_hetero::access_mode::read_write> __new_keys( - __tmp_out_keys); - - // Replicating first element of key views to be able to compare (i-1)-th and (i)-th key, - // dropping the last key for the i-1 sequence. Only taking the appropriate number of keys to start with here. - auto __clipped_new_keys = oneapi::dpl::__ranges::take_view_simple(__new_keys, __intermediate_result_end); - - auto __k3 = oneapi::dpl::__ranges::take_view_simple( - oneapi::dpl::__ranges::replicate_start_view_simple(__clipped_new_keys, 1), __intermediate_result_end); - - // view3 elements are a tuple of the element index and pairs of adjacent keys - // view4 elements are a tuple of the elements where key-index pairs will be written by copy_if - auto __view3 = experimental::ranges::zip_view(experimental::ranges::views::iota(0, __intermediate_result_end), __k3, - __clipped_new_keys); - auto __view4 = experimental::ranges::zip_view(experimental::ranges::views::all_write(__out_keys), - experimental::ranges::views::all_write(__idx)); - - // element is copied if it is the 0th element (marks beginning of first segment), or has a key not equal to - // the adjacent element (end of a segment). Artificial segments based on wg size are not created. 
- auto __result_end = __ranges::__pattern_copy_if( - __tag, oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__assign_key2_wrapper>(__exec), __view3, __view4, - [__binary_pred](const auto& __a) { - // The size of key range for the (i-1) view is one less, so for the 0th index we do not check the keys - // for (i-1), but we still need to get its key value as it is the start of a segment - if (std::get<0>(__a) == 0) - return true; - return !__binary_pred(std::get<1>(__a), std::get<2>(__a)); // keys comparison - }, - unseq_backend::__brick_assign_key_position{}); - - //reduce by segment - oneapi::dpl::__par_backend_hetero::__parallel_for( - _BackendTag{}, - oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__reduce2_wrapper>( - std::forward<_ExecutionPolicy>(__exec)), - unseq_backend::__brick_reduce_idx<_BinaryOperator, decltype(__intermediate_result_end)>( - __binary_op, __intermediate_result_end), - __result_end, - oneapi::dpl::__ranges::take_view_simple(experimental::ranges::views::all_read(__idx), __result_end), - experimental::ranges::views::all_read(__tmp_out_values), std::forward<_Range4>(__out_values)) - .__deferrable_wait(); - - return __result_end; + return oneapi::dpl::__par_backend_hetero::__parallel_reduce_by_segment( + _BackendTag{}, std::forward<_ExecutionPolicy>(__exec), std::forward<_Range1>(__keys), + std::forward<_Range2>(__values), std::forward<_Range3>(__out_keys), std::forward<_Range4>(__out_values), + __binary_pred, __binary_op); } } // namespace __ranges diff --git a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl.h b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl.h index 68dd00188dd..96d63e33aee 100644 --- a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl.h +++ b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl.h @@ -32,12 +32,14 @@ #include "../../iterator_impl.h" #include "../../execution_impl.h" #include "../../utils_ranges.h" +#include "../../ranges_defs.h" #include "sycl_defs.h" #include "parallel_backend_sycl_utils.h" #include "parallel_backend_sycl_reduce.h" #include "parallel_backend_sycl_merge.h" #include "parallel_backend_sycl_merge_sort.h" +#include "parallel_backend_sycl_reduce_by_segment.h" #include "parallel_backend_sycl_reduce_then_scan.h" #include "execution_sycl_defs.h" #include "sycl_iterator.h" @@ -788,6 +790,182 @@ struct __gen_transform_input _UnaryOp __unary_op; }; +template +struct __gen_red_by_seg_reduce_input +{ + // Returns the following tuple: + // (new_seg_mask, value) + // size_t new_seg_mask : 1 for a start of a new segment, 0 otherwise + // ValueType value : Current element's value for reduction + template + auto + operator()(const _InRng& __in_rng, std::size_t __id) const + { + const auto __in_keys = std::get<0>(__in_rng.tuple()); + const auto __in_vals = std::get<1>(__in_rng.tuple()); + using _ValueType = oneapi::dpl::__internal::__value_t; + // The first segment start (index 0) is not marked with a 1. This is because we need the first + // segment's key and value output index to be 0. We begin marking new segments only after the + // first. 
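+        // As a concrete example, assuming std::equal_to<int> for __binary_pred and int values,
+        // __in_keys = {0, 0, 1, 1, 2} with __in_vals = {1, 2, 3, 4, 5} generates the tuples
+        //   __id :   0       1       2       3       4
+        //   out  : (0, 1)  (0, 2)  (1, 3)  (0, 4)  (1, 5)
+        // i.e. only elements whose key differs from their left neighbor carry a set segment flag.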
+ const std::size_t __new_seg_mask = __id > 0 && !__binary_pred(__in_keys[__id - 1], __in_keys[__id]); + return oneapi::dpl::__internal::make_tuple(__new_seg_mask, _ValueType{__in_vals[__id]}); + } + _BinaryPred __binary_pred; +}; + +template +struct __gen_red_by_seg_scan_input +{ + // Returns the following tuple: + // ((new_seg_mask, value), output_value, next_key, current_key) + // size_t new_seg_mask : 1 for a start of a new segment, 0 otherwise + // ValueType value : Current element's value for reduction + // bool output_value : Whether this work-item should write an output (end of segment) + // KeyType next_key : The key of the next segment to write if output_value is true + // KeyType current_key : The current element's key. This is only ever used by work-item 0 to write the first key + template + auto + operator()(const _InRng& __in_rng, std::size_t __id) const + { + const auto __in_keys = std::get<0>(__in_rng.tuple()); + const auto __in_vals = std::get<1>(__in_rng.tuple()); + using _KeyType = oneapi::dpl::__internal::__value_t; + using _ValueType = oneapi::dpl::__internal::__value_t; + const _KeyType& __current_key = __in_keys[__id]; + const _ValueType& __current_val = __in_vals[__id]; + // Ordering the most common condition first has yielded the best results. + if (__id > 0 && __id < __n - 1) + { + const _KeyType& __prev_key = __in_keys[__id - 1]; + const _KeyType& __next_key = __in_keys[__id + 1]; + const std::size_t __new_seg_mask = !__binary_pred(__prev_key, __current_key); + return oneapi::dpl::__internal::make_tuple( + oneapi::dpl::__internal::make_tuple(__new_seg_mask, __current_val), + !__binary_pred(__current_key, __next_key), __next_key, __current_key); + } + else if (__id == __n - 1) + { + const _KeyType& __prev_key = __in_keys[__id - 1]; + const std::size_t __new_seg_mask = !__binary_pred(__prev_key, __current_key); + return oneapi::dpl::__internal::make_tuple( + oneapi::dpl::__internal::make_tuple(__new_seg_mask, __current_val), true, __current_key, + __current_key); // Passing __current_key as the next key for the last element is a placeholder + } + else // __id == 0 + { + const _KeyType& __next_key = __in_keys[__id + 1]; + return oneapi::dpl::__internal::make_tuple( + oneapi::dpl::__internal::make_tuple(std::size_t{0}, __current_val), + !__binary_pred(__current_key, __next_key), __next_key, __current_key); + } + } + _BinaryPred __binary_pred; + // For correctness of the function call operator, __n must be greater than 1. + std::size_t __n; +}; + +template +struct __red_by_seg_op +{ + // Consider the following segment / value pairs that would be processed in reduce-then-scan by a sub-group of size 8: + // ---------------------------------------------------------- + // Keys: 0 0 1 1 2 2 2 2 + // Values: 1 1 1 1 1 1 1 1 + // ---------------------------------------------------------- + // The reduce and scan input generation phase flags new segments (excluding index 0) for use in the sub-group scan + // operation. The above key, value pairs correspond to the following flag, value pairs: + // ---------------------------------------------------------- + // Flags: 0 0 1 0 1 0 0 0 + // Values: 1 1 1 1 1 1 1 1 + // ---------------------------------------------------------- + // The sub-group scan operation looks back by powers-of-2 applying encountered prefixes. The __red_by_seg_op + // operation performs a standard inclusive scan over the flags to compute output indices while performing a masked + // scan over values to avoid applying a previous segment's partial reduction. 
Previous value elements are reduced + // so long as the current index's flag is 0, indicating that input within its segment is still being processed + // ---------------------------------------------------------- + // Start: + // ---------------------------------------------------------- + // Flags: 0 0 1 0 1 0 0 0 + // Values: 1 1 1 1 1 1 1 1 + // ---------------------------------------------------------- + // After step 1 (apply the i-1th value if the ith flag is 0): + // ---------------------------------------------------------- + // Flags: 0 0 1 1 1 1 0 0 + // Values: 1 2 1 2 1 2 2 2 + // ---------------------------------------------------------- + // After step 2 (apply the i-2th value if the ith flag is 0): + // ---------------------------------------------------------- + // Flags: 0 0 1 1 2 2 1 1 + // Values: 1 2 1 2 1 2 3 4 + // ---------------------------------------------------------- + // After step 3 (apply the i-4th value if the ith flag is 0): + // ---------------------------------------------------------- + // Flags: 0 0 1 1 2 2 2 2 + // Values: 1 2 1 2 1 2 3 4 + // ^ ^ ^ + // ---------------------------------------------------------- + // Note that the scan of segment flags results in the desired output index of the reduce_by_segment operation in + // each segment and the item corresponding to the final key in a segment contains its output reduction value. This + // operation is first applied within a sub-group and then across sub-groups, work-groups, and blocks to + // reduce-by-segment across the full input. The result of these operations combined with cached key data in + // __gen_red_by_seg_scan_input enables the write phase to output keys and reduction values. + // => + // Segments : 0 1 2 + // Values : 2 2 4 + template + auto + operator()(const _Tup1& __lhs_tup, const _Tup2& __rhs_tup) const + { + using std::get; + // The left-hand side has processed elements from the same segment, so update the reduction value. + if (get<0>(__rhs_tup) == 0) + { + return oneapi::dpl::__internal::make_tuple(get<0>(__lhs_tup), + __binary_op(get<1>(__lhs_tup), get<1>(__rhs_tup))); + } + // We are looking at elements from a previous segment so just update the output index. + return oneapi::dpl::__internal::make_tuple(get<0>(__lhs_tup) + get<0>(__rhs_tup), get<1>(__rhs_tup)); + } + _BinaryOp __binary_op; +}; + +template +struct __write_red_by_seg +{ + template + void + operator()(_OutRng& __out_rng, std::size_t __id, const _Tup& __tup) const + { + using std::get; + auto __out_keys = get<0>(__out_rng.tuple()); + auto __out_values = get<1>(__out_rng.tuple()); + using _KeyType = oneapi::dpl::__internal::__value_t; + using _ValType = oneapi::dpl::__internal::__value_t; + + const _KeyType& __next_key = get<2>(__tup); + const _KeyType& __current_key = get<3>(__tup); + const _ValType& __current_value = get<1>(get<0>(__tup)); + const bool __is_seg_end = get<1>(__tup); + const std::size_t __out_idx = get<0>(get<0>(__tup)); + + // With the exception of the first key which is output by index 0, the first key in each segment is written + // by the work item that outputs the previous segment's reduction value. This is because the reduce_by_segment + // API requires that the first key in a segment is output and is important for when keys in a segment might not + // be the same (but satisfy the predicate). The last segment does not output a key as there are no future + // segments process. 
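+        // As a worked example, assuming std::equal_to<int>, std::plus<int>, __keys = {0, 0, 1, 1, 2}
+        // and __values = {1, 2, 3, 4, 5}: the scan leaves segment ends at work items 1, 3 and 4 with
+        // output indices 0, 1, 2 and scanned values 3, 7, 5. Work item 0 writes __out_keys[0] = 0,
+        // work item 1 writes __out_values[0] = 3 and __out_keys[1] = 1, work item 3 writes
+        // __out_values[1] = 7 and __out_keys[2] = 2, and work item 4 (the last element) writes only
+        // __out_values[2] = 5, giving __out_keys = {0, 1, 2} and __out_values = {3, 7, 5}.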
+ if (__id == 0) + __out_keys[0] = __current_key; + if (__is_seg_end) + { + __out_values[__out_idx] = __current_value; + if (__id != __n - 1) + __out_keys[__out_idx + 1] = __next_key; + } + } + _BinaryPred __binary_pred; + std::size_t __n; +}; + struct __simple_write_to_id { template @@ -1188,6 +1366,38 @@ __parallel_unique_copy(oneapi::dpl::__internal::__device_backend_tag __backend_t } } +template +auto +__parallel_reduce_by_segment_reduce_then_scan(oneapi::dpl::__internal::__device_backend_tag __backend_tag, + _ExecutionPolicy&& __exec, _Range1&& __keys, _Range2&& __values, + _Range3&& __out_keys, _Range4&& __out_values, + _BinaryPredicate __binary_pred, _BinaryOperator __binary_op) +{ + // Flags new segments and passes input value through a 2-tuple + using _GenReduceInput = __gen_red_by_seg_reduce_input<_BinaryPredicate>; + // Operation that computes output indices and output reduction values per segment + using _ReduceOp = __red_by_seg_op<_BinaryOperator>; + // Returns 4-component tuple which contains flags, keys, value, and a flag to write output + using _GenScanInput = __gen_red_by_seg_scan_input<_BinaryPredicate>; + // Returns the first component from scan input which is scanned over + using _ScanInputTransform = __get_zeroth_element; + // Writes current segment's output reduction and the next segment's output key + using _WriteOp = __write_red_by_seg<_BinaryPredicate>; + using _ValueType = oneapi::dpl::__internal::__value_t<_Range2>; + std::size_t __n = __keys.size(); + // __gen_red_by_seg_scan_input requires that __n > 1 + assert(__n > 1); + return __parallel_transform_reduce_then_scan( + __backend_tag, std::forward<_ExecutionPolicy>(__exec), + oneapi::dpl::__ranges::make_zip_view(std::forward<_Range1>(__keys), std::forward<_Range2>(__values)), + oneapi::dpl::__ranges::make_zip_view(std::forward<_Range3>(__out_keys), std::forward<_Range4>(__out_values)), + _GenReduceInput{__binary_pred}, _ReduceOp{__binary_op}, _GenScanInput{__binary_pred, __n}, + _ScanInputTransform{}, _WriteOp{__binary_pred, __n}, + oneapi::dpl::unseq_backend::__no_init_value>{}, + /*Inclusive*/ std::true_type{}, /*_IsUniquePattern=*/std::false_type{}); +} + template auto __parallel_partition_copy(oneapi::dpl::__internal::__device_backend_tag __backend_tag, _ExecutionPolicy&& __exec, @@ -2126,6 +2336,189 @@ __parallel_partial_sort(oneapi::dpl::__internal::__device_backend_tag __backend_ return __parallel_partial_sort_impl(__backend_tag, ::std::forward<_ExecutionPolicy>(__exec), __buf.all_view(), __partial_merge_kernel{__mid_idx}, __comp); } + +//------------------------------------------------------------------------ +// reduce_by_segment - sync pattern +// +// TODO: The non-identity fallback path of reduce-by-segment must currently be implemented synchronously due to the +// inability to create event dependency chains across separate parallel pattern calls. If we ever add support for +// cross parallel pattern dependencies, then we can implement this as an async pattern. 
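+//
+// For reference, a minimal host-side sketch of the public API that this pattern ultimately serves
+// (illustrative only; it assumes the oneapi::dpl::reduce_by_segment extension is available through
+// <oneapi/dpl/algorithm> and that the default device policy is used):
+//
+//   #include <oneapi/dpl/execution>
+//   #include <oneapi/dpl/algorithm>
+//   #include <vector>
+//
+//   int main()
+//   {
+//       std::vector<int> keys{0, 0, 1, 1, 2}, vals{1, 2, 3, 4, 5};
+//       std::vector<int> out_keys(keys.size()), out_vals(vals.size());
+//       auto res = oneapi::dpl::reduce_by_segment(oneapi::dpl::execution::dpcpp_default,
+//                                                 keys.begin(), keys.end(), vals.begin(),
+//                                                 out_keys.begin(), out_vals.begin());
+//       // out_keys = {0, 1, 2}, out_vals = {3, 7, 5}; res is the pair of past-the-end output iterators
+//       return 0;
+//   }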
+//------------------------------------------------------------------------ +template +struct __reduce1_wrapper; + +template +struct __reduce2_wrapper; + +template +struct __assign_key1_wrapper; + +template +struct __assign_key2_wrapper; + +template +oneapi::dpl::__internal::__difference_t<_Range3> +__parallel_reduce_by_segment_fallback(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPolicy&& __exec, + _Range1&& __keys, _Range2&& __values, _Range3&& __out_keys, + _Range4&& __out_values, _BinaryPredicate __binary_pred, + _BinaryOperator __binary_op, + /*known_identity=*/std::false_type) +{ + using __diff_type = oneapi::dpl::__internal::__difference_t<_Range1>; + using __key_type = oneapi::dpl::__internal::__value_t<_Range1>; + using __val_type = oneapi::dpl::__internal::__value_t<_Range2>; + + const auto __n = __keys.size(); + // Round 1: reduce with extra indices added to avoid long segments + // TODO: At threshold points check if the key is equal to the key at the previous threshold point, indicating a long sequence. + // Skip a round of copy_if and reduces if there are none. + auto __idx = oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __diff_type>(__exec, __n).get_buffer(); + auto __tmp_out_keys = + oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __key_type>(__exec, __n).get_buffer(); + auto __tmp_out_values = + oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __val_type>(__exec, __n).get_buffer(); + + // Replicating first element of keys view to be able to compare (i-1)-th and (i)-th key with aligned sequences, + // dropping the last key for the i-1 sequence. + auto __k1 = + oneapi::dpl::__ranges::take_view_simple(oneapi::dpl::__ranges::replicate_start_view_simple(__keys, 1), __n); + + // view1 elements are a tuple of the element index and pairs of adjacent keys + // view2 elements are a tuple of the elements where key-index pairs will be written by copy_if + auto __view1 = oneapi::dpl::__ranges::zip_view(experimental::ranges::views::iota(0, __n), __k1, __keys); + auto __view2 = oneapi::dpl::__ranges::zip_view(oneapi::dpl::__ranges::views::all_write(__tmp_out_keys), + oneapi::dpl::__ranges::views::all_write(__idx)); + + // use work group size adjusted to shared local memory as the maximum segment size. + std::size_t __wgroup_size = + oneapi::dpl::__internal::__slm_adjusted_work_group_size(__exec, sizeof(__key_type) + sizeof(__val_type)); + + // element is copied if it is the 0th element (marks beginning of first segment), is in an index + // evenly divisible by wg size (ensures segments are not long), or has a key not equal to the + // adjacent element (marks end of real segments) + // TODO: replace wgroup size with segment size based on platform specifics. 
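+    // For instance, with __wgroup_size = 4, std::equal_to<int> and __keys = {9, 9, 9, 9, 9, 9, 7, 7},
+    // indices 0 (first element), 4 (multiple of the work-group size) and 6 (key change) are copied, so
+    // the long run of 9s is split into two bounded pieces that round 2 merges back together.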
+ auto __intermediate_result_end = + oneapi::dpl::__par_backend_hetero::__parallel_copy_if( + oneapi::dpl::__internal::__device_backend_tag{}, + oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__assign_key1_wrapper>(__exec), __view1, __view2, + __n, + [__binary_pred, __wgroup_size](const auto& __a) { + // The size of key range for the (i-1) view is one less, so for the 0th index we do not check the keys + // for (i-1), but we still need to get its key value as it is the start of a segment + const auto index = std::get<0>(__a); + if (index == 0) + return true; + return index % __wgroup_size == 0 // segment size + || !__binary_pred(std::get<1>(__a), std::get<2>(__a)); // key comparison + }, + unseq_backend::__brick_assign_key_position{}) + .get(); + + //reduce by segment + oneapi::dpl::__par_backend_hetero::__parallel_for( + oneapi::dpl::__internal::__device_backend_tag{}, + oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__reduce1_wrapper>(__exec), + unseq_backend::__brick_reduce_idx<_BinaryOperator, decltype(__n)>(__binary_op, __n), __intermediate_result_end, + oneapi::dpl::__ranges::take_view_simple(oneapi::dpl::__ranges::views::all_read(__idx), + __intermediate_result_end), + std::forward<_Range2>(__values), oneapi::dpl::__ranges::views::all_write(__tmp_out_values)) + .wait(); + + // Round 2: final reduction to get result for each segment of equal adjacent keys + // create views over adjacent keys + oneapi::dpl::__ranges::all_view<__key_type, __par_backend_hetero::access_mode::read_write> __new_keys( + __tmp_out_keys); + + // Replicating first element of key views to be able to compare (i-1)-th and (i)-th key, + // dropping the last key for the i-1 sequence. Only taking the appropriate number of keys to start with here. + auto __clipped_new_keys = oneapi::dpl::__ranges::take_view_simple(__new_keys, __intermediate_result_end); + + auto __k3 = oneapi::dpl::__ranges::take_view_simple( + oneapi::dpl::__ranges::replicate_start_view_simple(__clipped_new_keys, 1), __intermediate_result_end); + + // view3 elements are a tuple of the element index and pairs of adjacent keys + // view4 elements are a tuple of the elements where key-index pairs will be written by copy_if + auto __view3 = oneapi::dpl::__ranges::zip_view(experimental::ranges::views::iota(0, __intermediate_result_end), + __k3, __clipped_new_keys); + auto __view4 = oneapi::dpl::__ranges::zip_view(oneapi::dpl::__ranges::views::all_write(__out_keys), + oneapi::dpl::__ranges::views::all_write(__idx)); + + // element is copied if it is the 0th element (marks beginning of first segment), or has a key not equal to + // the adjacent element (end of a segment). Artificial segments based on wg size are not created. 
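+    // Continuing the __keys = {9, 9, 9, 9, 9, 9, 7, 7} example, round 1 leaves __clipped_new_keys =
+    // {9, 9, 7} with one partial sum per piece in __tmp_out_values; this pass keeps only indices 0 and
+    // 2 (the real key change), so the two partial sums of the 9-run are combined into a single
+    // reduction in the final output.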
+ auto __result_end = oneapi::dpl::__par_backend_hetero::__parallel_copy_if( + oneapi::dpl::__internal::__device_backend_tag{}, + oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__assign_key2_wrapper>(__exec), + __view3, __view4, __view3.size(), + [__binary_pred](const auto& __a) { + // The size of key range for the (i-1) view is one less, so for the 0th index we do not check the keys + // for (i-1), but we still need to get its key value as it is the start of a segment + if (std::get<0>(__a) == 0) + return true; + return !__binary_pred(std::get<1>(__a), std::get<2>(__a)); // keys comparison + }, + unseq_backend::__brick_assign_key_position{}) + .get(); + + //reduce by segment + oneapi::dpl::__par_backend_hetero::__parallel_for( + oneapi::dpl::__internal::__device_backend_tag{}, + oneapi::dpl::__par_backend_hetero::make_wrapped_policy<__reduce2_wrapper>( + std::forward<_ExecutionPolicy>(__exec)), + unseq_backend::__brick_reduce_idx<_BinaryOperator, decltype(__intermediate_result_end)>( + __binary_op, __intermediate_result_end), + __result_end, + oneapi::dpl::__ranges::take_view_simple(oneapi::dpl::__ranges::views::all_read(__idx), __result_end), + oneapi::dpl::__ranges::views::all_read(__tmp_out_values), std::forward<_Range4>(__out_values)) + .__deferrable_wait(); + return __result_end; +} + +template +oneapi::dpl::__internal::__difference_t<_Range3> +__parallel_reduce_by_segment(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPolicy&& __exec, _Range1&& __keys, + _Range2&& __values, _Range3&& __out_keys, _Range4&& __out_values, + _BinaryPredicate __binary_pred, _BinaryOperator __binary_op) +{ + // The algorithm reduces values in __values where the + // associated keys for the values are equal to the adjacent key. + // + // Example: __keys = { 1, 2, 3, 4, 1, 1, 3, 3, 1, 1, 3, 3, 0 } + // __values = { 1, 2, 3, 4, 1, 1, 3, 3, 1, 1, 3, 3, 0 } + // + // __out_keys = { 1, 2, 3, 4, 1, 3, 1, 3, 0 } + // __out_values = { 1, 2, 3, 4, 2, 6, 2, 6, 0 } + + const auto __n = __keys.size(); + + using __diff_type = oneapi::dpl::__internal::__difference_t<_Range1>; + using __key_type = oneapi::dpl::__internal::__value_t<_Range1>; + using __val_type = oneapi::dpl::__internal::__value_t<_Range2>; + // Prior to icpx 2025.0, the reduce-then-scan path performs poorly and should be avoided. +#if !defined(__INTEL_LLVM_COMPILER) || __INTEL_LLVM_COMPILER >= 20250000 + if constexpr (std::is_trivially_copyable_v<__val_type>) + { + if (oneapi::dpl::__par_backend_hetero::__is_gpu_with_sg_32(__exec)) + { + auto __res = oneapi::dpl::__par_backend_hetero::__parallel_reduce_by_segment_reduce_then_scan( + oneapi::dpl::__internal::__device_backend_tag{}, std::forward<_ExecutionPolicy>(__exec), + std::forward<_Range1>(__keys), std::forward<_Range2>(__values), std::forward<_Range3>(__out_keys), + std::forward<_Range4>(__out_values), __binary_pred, __binary_op); + __res.wait(); + // Because our init type ends up being tuple, return the first component which is the write index. Add 1 to return the + // past-the-end iterator pair of segmented reduction. 
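+            // For the example at the top of this function (9 output segments), the final scanned
+            // element carries write index 8, so 8 + 1 = 9 is returned.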
+ return std::get<0>(__res.get()) + 1; + } + } +#endif + return __parallel_reduce_by_segment_fallback( + oneapi::dpl::__internal::__device_backend_tag{}, std::forward<_ExecutionPolicy>(__exec), + std::forward<_Range1>(__keys), std::forward<_Range2>(__values), std::forward<_Range3>(__out_keys), + std::forward<_Range4>(__out_values), __binary_pred, __binary_op, + oneapi::dpl::unseq_backend::__has_known_identity<_BinaryOperator, __val_type>{}); +} + } // namespace __par_backend_hetero } // namespace dpl } // namespace oneapi diff --git a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_reduce_by_segment.h b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_reduce_by_segment.h new file mode 100644 index 00000000000..62ae736782d --- /dev/null +++ b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_reduce_by_segment.h @@ -0,0 +1,464 @@ +// -*- C++ -*- +//===-- parallel_backend_sycl_reduce_by_segment.h ---------------------------------===// +/* Copyright (c) Intel Corporation + * + *  Copyright 2008-2013 NVIDIA Corporation + * + *  Licensed under the Apache License, Version 2.0 (the "License"); + *  you may not use this file except in compliance with the License. + *  You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + *  Unless required by applicable law or agreed to in writing, software + *  distributed under the License is distributed on an "AS IS" BASIS, + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + *  See the License for the specific language governing permissions and + *  limitations under the License. + * + *  Copyright (c) 2013, NVIDIA CORPORATION.  All rights reserved. + *  + *  Redistribution and use in source and binary forms, with or without + *  modification, are permitted provided that the following conditions are met: + *     * Redistributions of source code must retain the above copyright + *       notice, this list of conditions and the following disclaimer. + *     * Redistributions in binary form must reproduce the above copyright + *       notice, this list of conditions and the following disclaimer in the + *       documentation and/or other materials provided with the distribution. + *     * Neither the name of the NVIDIA CORPORATION nor the + *       names of its contributors may be used to endorse or promote products + *       derived from this software without specific prior written permission. + *  + *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"  + *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE  + *  ARE DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY + *  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + *  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + *  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +#ifndef _ONEDPL_PARALLEL_BACKEND_SYCL_REDUCE_BY_SEGMENT_H +#define _ONEDPL_PARALLEL_BACKEND_SYCL_REDUCE_BY_SEGMENT_H + +#include +#include +#include +#include +#include + +#include "sycl_defs.h" +#include "parallel_backend_sycl_utils.h" +#include "utils_ranges_sycl.h" +#include "sycl_traits.h" + +#include "../../utils.h" +#include "../../../internal/scan_by_segment_impl.h" + +namespace oneapi +{ +namespace dpl +{ +namespace __par_backend_hetero +{ + +template +class __seg_reduce_count_kernel; +template +class __seg_reduce_offset_kernel; +template +class __seg_reduce_wg_kernel; +template +class __seg_reduce_prefix_kernel; + +namespace +{ +template +using _SegReduceCountPhase = __seg_reduce_count_kernel<_Name...>; +template +using _SegReduceOffsetPhase = __seg_reduce_offset_kernel<_Name...>; +template +using _SegReduceWgPhase = __seg_reduce_wg_kernel<_Name...>; +template +using _SegReducePrefixPhase = __seg_reduce_prefix_kernel<_Name...>; +} // namespace + +template +oneapi::dpl::__internal::__difference_t<_Range3> +__parallel_reduce_by_segment_fallback(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPolicy&& __exec, + _Range1&& __keys, _Range2&& __values, _Range3&& __out_keys, + _Range4&& __out_values, _BinaryPredicate __binary_pred, + _BinaryOperator __binary_op, + /*known_identity=*/std::true_type) +{ + using _CustomName = oneapi::dpl::__internal::__policy_kernel_name<_ExecutionPolicy>; + + using _SegReduceCountKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_generator< + _SegReduceCountPhase, _CustomName, _ExecutionPolicy, _Range1, _Range2, _Range3, _Range4, _BinaryPredicate, + _BinaryOperator>; + using _SegReduceOffsetKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_generator< + _SegReduceOffsetPhase, _CustomName, _ExecutionPolicy, _Range1, _Range2, _Range3, _Range4, _BinaryPredicate, + _BinaryOperator>; + using _SegReduceWgKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_generator< + _SegReduceWgPhase, _CustomName, _ExecutionPolicy, _Range1, _Range2, _Range3, _Range4, _BinaryPredicate, + _BinaryOperator>; + using _SegReducePrefixKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_generator< + _SegReducePrefixPhase, _CustomName, _ExecutionPolicy, _Range1, _Range2, _Range3, _Range4, _BinaryPredicate, + _BinaryOperator>; + + using __diff_type = oneapi::dpl::__internal::__difference_t<_Range3>; + using __key_type = oneapi::dpl::__internal::__value_t<_Range1>; + using __val_type = oneapi::dpl::__internal::__value_t<_Range2>; + + const std::size_t __n = __keys.size(); + + constexpr std::uint16_t __vals_per_item = + 16; // Each work item serially processes 16 items. Best observed performance on gpu + + // Limit the work-group size to prevent large sizes on CPUs. Empirically found value. + // This value exceeds the current practical limit for GPUs, but may need to be re-evaluated in the future. + std::size_t __wgroup_size = oneapi::dpl::__internal::__max_work_group_size(__exec, (std::size_t)2048); + + // adjust __wgroup_size according to local memory limit. Double the requirement on __val_type due to sycl group algorithm's use + // of SLM. 
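+    // For example, with int keys and int values this budgets sizeof(int) + 2 * sizeof(int) = 12 bytes
+    // of local memory per work item; assuming a 64 KiB local-memory limit, several thousand items fit,
+    // so the 2048 cap above typically remains the binding limit.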
+ __wgroup_size = oneapi::dpl::__internal::__slm_adjusted_work_group_size( + __exec, sizeof(__key_type) + 2 * sizeof(__val_type), __wgroup_size); + +#if _ONEDPL_COMPILE_KERNEL + auto __seg_reduce_count_kernel = + __par_backend_hetero::__internal::__kernel_compiler<_SegReduceCountKernel>::__compile(__exec); + auto __seg_reduce_offset_kernel = + __par_backend_hetero::__internal::__kernel_compiler<_SegReduceOffsetKernel>::__compile(__exec); + auto __seg_reduce_wg_kernel = + __par_backend_hetero::__internal::__kernel_compiler<_SegReduceWgKernel>::__compile(__exec); + auto __seg_reduce_prefix_kernel = + __par_backend_hetero::__internal::__kernel_compiler<_SegReducePrefixKernel>::__compile(__exec); + __wgroup_size = + std::min({__wgroup_size, oneapi::dpl::__internal::__kernel_work_group_size(__exec, __seg_reduce_count_kernel), + oneapi::dpl::__internal::__kernel_work_group_size(__exec, __seg_reduce_offset_kernel), + oneapi::dpl::__internal::__kernel_work_group_size(__exec, __seg_reduce_wg_kernel), + oneapi::dpl::__internal::__kernel_work_group_size(__exec, __seg_reduce_prefix_kernel)}); +#endif + + std::size_t __n_groups = oneapi::dpl::__internal::__dpl_ceiling_div(__n, __wgroup_size * __vals_per_item); + + // intermediate reductions within a workgroup + auto __partials = + oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __val_type>(__exec, __n_groups).get_buffer(); + + auto __end_idx = oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __diff_type>(__exec, 1).get_buffer(); + + // the number of segment ends found in each work group + auto __seg_ends = + oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __diff_type>(__exec, __n_groups).get_buffer(); + + // buffer that stores an exclusive scan of the results + auto __seg_ends_scanned = + oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, __diff_type>(__exec, __n_groups).get_buffer(); + + // 1. Count the segment ends in each workgroup + auto __seg_end_identification = __exec.queue().submit([&](sycl::handler& __cgh) { + oneapi::dpl::__ranges::__require_access(__cgh, __keys); + auto __seg_ends_acc = __seg_ends.template get_access(__cgh); +#if _ONEDPL_COMPILE_KERNEL && _ONEDPL_KERNEL_BUNDLE_PRESENT + __cgh.use_kernel_bundle(__seg_reduce_count_kernel.get_kernel_bundle()); +#endif + __cgh.parallel_for<_SegReduceCountKernel>( + sycl::nd_range<1>{__n_groups * __wgroup_size, __wgroup_size}, [=]( +#if _ONEDPL_COMPILE_KERNEL && !_ONEDPL_KERNEL_BUNDLE_PRESENT + __seg_reduce_count_kernel, +#endif + sycl::nd_item<1> __item) { + auto __group = __item.get_group(); + std::size_t __group_id = __item.get_group(0); + std::uint32_t __local_id = __item.get_local_id(0); + std::size_t __global_id = __item.get_global_id(0); + + std::size_t __start = __global_id * __vals_per_item; + std::size_t __end = __dpl_sycl::__minimum{}(__start + __vals_per_item, __n); + std::size_t __item_segments = 0; + + // 1a. Work item scan to identify segment ends + for (std::size_t __i = __start; __i < __end; ++__i) + if (__n - 1 == __i || !__binary_pred(__keys[__i], __keys[__i + 1])) + ++__item_segments; + + // 1b. Work group reduction + std::size_t __num_segs = __dpl_sycl::__reduce_over_group( + __group, __item_segments, __dpl_sycl::__plus()); + + // 1c. 
First work item writes segment count to global memory + if (__local_id == 0) + __seg_ends_acc[__group_id] = __num_segs; + }); + }); + + // 1.5 Small single-group kernel + auto __single_group_scan = __exec.queue().submit([&](sycl::handler& __cgh) { + __cgh.depends_on(__seg_end_identification); + auto __seg_ends_acc = __seg_ends.template get_access(__cgh); + auto __seg_ends_scan_acc = __seg_ends_scanned.template get_access(__cgh); +#if _ONEDPL_COMPILE_KERNEL && _ONEDPL_KERNEL_BUNDLE_PRESENT + __cgh.use_kernel_bundle(__seg_reduce_offset_kernel.get_kernel_bundle()); +#endif + __cgh.parallel_for<_SegReduceOffsetKernel>( +#if _ONEDPL_COMPILE_KERNEL && !_ONEDPL_KERNEL_BUNDLE_PRESENT + __seg_reduce_offset_kernel, +#endif + sycl::nd_range<1>{__wgroup_size, __wgroup_size}, [=](sycl::nd_item<1> __item) { + auto __beg = __dpl_sycl::__get_accessor_ptr(__seg_ends_acc); + auto __out_beg = __dpl_sycl::__get_accessor_ptr(__seg_ends_scan_acc); + __dpl_sycl::__joint_exclusive_scan(__item.get_group(), __beg, __beg + __n_groups, __out_beg, + __diff_type(0), sycl::plus<__diff_type>()); + }); + }); + + // 2. Work group reduction + auto __wg_reduce = __exec.queue().submit([&](sycl::handler& __cgh) { + __cgh.depends_on(__single_group_scan); + oneapi::dpl::__ranges::__require_access(__cgh, __keys, __out_keys, __out_values, __values); + + auto __partials_acc = __partials.template get_access(__cgh); + auto __seg_ends_scan_acc = __seg_ends_scanned.template get_access(__cgh); + __dpl_sycl::__local_accessor<__val_type> __loc_acc(2 * __wgroup_size, __cgh); +#if _ONEDPL_COMPILE_KERNEL && _ONEDPL_KERNEL_BUNDLE_PRESENT + __cgh.use_kernel_bundle(__seg_reduce_wg_kernel.get_kernel_bundle()); +#endif + __cgh.parallel_for<_SegReduceWgKernel>( +#if _ONEDPL_COMPILE_KERNEL && !_ONEDPL_KERNEL_BUNDLE_PRESENT + __seg_reduce_wg_kernel, +#endif + sycl::nd_range<1>{__n_groups * __wgroup_size, __wgroup_size}, [=](sycl::nd_item<1> __item) { + std::array<__val_type, __vals_per_item> __loc_partials; + + auto __group = __item.get_group(); + std::size_t __group_id = __item.get_group(0); + std::size_t __local_id = __item.get_local_id(0); + std::size_t __global_id = __item.get_global_id(0); + + // 2a. Lookup the number of prior segs + auto __wg_num_prior_segs = __seg_ends_scan_acc[__group_id]; + + // 2b. Perform a serial scan within the work item over assigned elements. Store partial + // reductions in work group local memory. + std::size_t __start = __global_id * __vals_per_item; + std::size_t __end = __dpl_sycl::__minimum{}(__start + __vals_per_item, __n); + + std::size_t __max_end = 0; + std::size_t __item_segments = 0; + auto __identity = unseq_backend::__known_identity<_BinaryOperator, __val_type>; + + __val_type __accumulator = __identity; + for (std::size_t __i = __start; __i < __end; ++__i) + { + __accumulator = __binary_op(__accumulator, __values[__i]); + if (__n - 1 == __i || !__binary_pred(__keys[__i], __keys[__i + 1])) + { + __loc_partials[__i - __start] = __accumulator; + ++__item_segments; + __max_end = __local_id; + __accumulator = __identity; + } + } + + // 2c. Count the number of prior work segments cooperatively over group + std::size_t __prior_segs_in_wg = __dpl_sycl::__exclusive_scan_over_group( + __group, __item_segments, __dpl_sycl::__plus()); + std::size_t __start_idx = __wg_num_prior_segs + __prior_segs_in_wg; + + // 2d. 
Find the greatest segment end less than the current index (inclusive) + std::size_t __closest_seg_id = __dpl_sycl::__inclusive_scan_over_group( + __group, __max_end, __dpl_sycl::__maximum()); + + // __wg_segmented_scan is a derivative work and responsible for the third header copyright + __val_type __carry_in = oneapi::dpl::internal::__wg_segmented_scan( + __item, __loc_acc, __local_id, __local_id - __closest_seg_id, __accumulator, __identity, + __binary_op, __wgroup_size); + + // 2e. Update local partial reductions in first segment and write to global memory. + bool __apply_aggs = true; + std::size_t __item_offset = 0; + + // first item in group does not have any work-group aggregates to apply + if (__local_id == 0) + { + __apply_aggs = false; + if (__global_id == 0) + { + // first segment identifier is always the first key + __out_keys[0] = __keys[0]; + } + } + + // apply the aggregates and copy the locally stored values to destination buffer + for (std::size_t __i = __start; __i < __end; ++__i) + { + if (__i == __n - 1 || !__binary_pred(__keys[__i], __keys[__i + 1])) + { + std::size_t __idx = __start_idx + __item_offset; + if (__apply_aggs) + { + __out_values[__idx] = __binary_op(__carry_in, __loc_partials[__i - __start]); + __apply_aggs = false; + } + else + { + __out_values[__idx] = __loc_partials[__i - __start]; + } + if (__i != __n - 1) + { + __out_keys[__idx + 1] = __keys[__i + 1]; + } + ++__item_offset; + } + } + + // 2f. Output the work group aggregate and total number of segments for use in phase 3. + if (__local_id == __wgroup_size - 1) // last work item writes the group's carry out + { + // If no segment ends in the item, the aggregates from previous work groups must be applied. + if (__max_end == 0) + { + // needs to be inclusive with last element + __partials_acc[__group_id] = __binary_op(__carry_in, __accumulator); + } + else + { + __partials_acc[__group_id] = __accumulator; + } + } + }); + }); + + // 3. Apply inter work-group aggregates + __exec.queue() + .submit([&](sycl::handler& __cgh) { + oneapi::dpl::__ranges::__require_access(__cgh, __keys, __out_keys, __out_values); + + auto __partials_acc = __partials.template get_access(__cgh); + auto __seg_ends_scan_acc = __seg_ends_scanned.template get_access(__cgh); + auto __seg_ends_acc = __seg_ends.template get_access(__cgh); + auto __end_idx_acc = __end_idx.template get_access(__cgh); + + __dpl_sycl::__local_accessor<__val_type> __loc_partials_acc(__wgroup_size, __cgh); + __dpl_sycl::__local_accessor<__diff_type> __loc_seg_ends_acc(__wgroup_size, __cgh); + + __cgh.depends_on(__wg_reduce); +#if _ONEDPL_COMPILE_KERNEL && _ONEDPL_KERNEL_BUNDLE_PRESENT + __cgh.use_kernel_bundle(__seg_reduce_prefix_kernel.get_kernel_bundle()); +#endif + __cgh.parallel_for<_SegReducePrefixKernel>( +#if _ONEDPL_COMPILE_KERNEL && !_ONEDPL_KERNEL_BUNDLE_PRESENT + __seg_reduce_prefix_kernel, +#endif + sycl::nd_range<1>{__n_groups * __wgroup_size, __wgroup_size}, [=](sycl::nd_item<1> __item) { + auto __group = __item.get_group(); + std::int64_t __group_id = __item.get_group(0); + std::size_t __global_id = __item.get_global_id(0); + std::size_t __local_id = __item.get_local_id(0); + + std::size_t __start = __global_id * __vals_per_item; + std::size_t __end = __dpl_sycl::__minimum{}(__start + __vals_per_item, __n); + std::size_t __item_segments = 0; + + std::int64_t __wg_agg_idx = __group_id - 1; + __val_type __agg_collector = unseq_backend::__known_identity<_BinaryOperator, __val_type>; + + bool __ag_exists = false; + // 3a. 
Check to see if an aggregate exists and compute that value in the first + // work item. + if (__group_id != 0) + { + __ag_exists = __start < __n; + // local reductions followed by a sweep + constexpr std::int32_t __vals_to_explore = 16; + bool __last_it = false; + __loc_seg_ends_acc[__local_id] = false; + __loc_partials_acc[__local_id] = unseq_backend::__known_identity<_BinaryOperator, __val_type>; + for (std::int32_t __i = __wg_agg_idx - __vals_to_explore * __local_id; !__last_it; + __i -= __wgroup_size * __vals_to_explore) + { + __val_type __local_collector = unseq_backend::__known_identity<_BinaryOperator, __val_type>; + // exploration phase + for (std::int32_t __j = __i; + __j > __dpl_sycl::__maximum{}(-1L, __i - __vals_to_explore); --__j) + { + __local_collector = __binary_op(__partials_acc[__j], __local_collector); + if (__seg_ends_acc[__j] || __j == 0) + { + __loc_seg_ends_acc[__local_id] = true; + break; + } + } + __loc_partials_acc[__local_id] = __local_collector; + __dpl_sycl::__group_barrier(__item); + // serial aggregate collection and synchronization + if (__local_id == 0) + { + for (std::size_t __j = 0; __j < __wgroup_size; ++__j) + { + __agg_collector = __binary_op(__loc_partials_acc[__j], __agg_collector); + if (__loc_seg_ends_acc[__j]) + { + __last_it = true; + break; + } + } + } + __agg_collector = __dpl_sycl::__group_broadcast(__item.get_group(), __agg_collector); + __last_it = __dpl_sycl::__group_broadcast(__item.get_group(), __last_it); + } + + // Check to see if aggregates exist. + // The last group must always stay to write the final index + __ag_exists = __dpl_sycl::__any_of_group(__group, __ag_exists); + if (!__ag_exists && __group_id != __n_groups - 1) + return; + } + // 3b. count the segment ends + for (std::size_t __i = __start; __i < __end; ++__i) + if (__i == __n - 1 || !__binary_pred(__keys[__i], __keys[__i + 1])) + ++__item_segments; + + std::size_t __prior_segs_in_wg = __dpl_sycl::__exclusive_scan_over_group( + __group, __item_segments, __dpl_sycl::__plus()); + + // 3c. Determine prior index + std::size_t __wg_num_prior_segs = __seg_ends_scan_acc[__group_id]; + + // 3d. Second pass over the keys, reidentifying end segments and applying work group + // aggregates if appropriate. 
Both the key and reduction value are written to the final output at the + // computed index + std::size_t __item_offset = 0; + for (std::size_t __i = __start; __i < __end; ++__i) + { + if (__i == __n - 1 || !__binary_pred(__keys[__i], __keys[__i + 1])) + { + std::size_t __idx = __wg_num_prior_segs + __prior_segs_in_wg + __item_offset; + + // apply the aggregate if it is the first segment end in the workgroup only + if (__prior_segs_in_wg == 0 && __item_offset == 0 && __ag_exists) + __out_values[__idx] = __binary_op(__agg_collector, __out_values[__idx]); + + ++__item_offset; + // the last item must write the last index's position to return + if (__i == __n - 1) + __end_idx_acc[0] = __idx; + } + } + }); + }) + .wait(); + + return __end_idx.get_host_access()[0] + 1; +} + +} // namespace __par_backend_hetero +} // namespace dpl +} // namespace oneapi + +#endif diff --git a/include/oneapi/dpl/pstl/hetero/dpcpp/sycl_traits.h b/include/oneapi/dpl/pstl/hetero/dpcpp/sycl_traits.h index 2d0e88fd34b..7d3fd829cc5 100644 --- a/include/oneapi/dpl/pstl/hetero/dpcpp/sycl_traits.h +++ b/include/oneapi/dpl/pstl/hetero/dpcpp/sycl_traits.h @@ -236,6 +236,12 @@ namespace oneapi::dpl::__par_backend_hetero template struct __gen_transform_input; +template +struct __gen_red_by_seg_reduce_input; + +template +struct __gen_red_by_seg_scan_input; + template struct __gen_mask; @@ -254,12 +260,18 @@ struct __write_to_id_if; template struct __write_to_id_if_else; +template +struct __write_red_by_seg; + template struct __early_exit_find_or; template struct __leaf_sorter; +template +struct __red_by_seg_op; + } // namespace oneapi::dpl::__par_backend_hetero template @@ -276,6 +288,20 @@ struct sycl::is_device_copyable<_ONEDPL_SPECIALIZE_FOR(oneapi::dpl::__par_backen { }; +template +struct sycl::is_device_copyable<_ONEDPL_SPECIALIZE_FOR(oneapi::dpl::__par_backend_hetero::__gen_red_by_seg_reduce_input, + _BinaryPred)> + : oneapi::dpl::__internal::__are_all_device_copyable<_BinaryPred> +{ +}; + +template +struct sycl::is_device_copyable<_ONEDPL_SPECIALIZE_FOR(oneapi::dpl::__par_backend_hetero::__gen_red_by_seg_scan_input, + _BinaryPred)> + : oneapi::dpl::__internal::__are_all_device_copyable<_BinaryPred> +{ +}; + template struct sycl::is_device_copyable<_ONEDPL_SPECIALIZE_FOR(oneapi::dpl::__par_backend_hetero::__gen_unique_mask, _BinaryPredicate)> @@ -309,6 +335,13 @@ struct sycl::is_device_copyable<_ONEDPL_SPECIALIZE_FOR(oneapi::dpl::__par_backen { }; +template +struct sycl::is_device_copyable<_ONEDPL_SPECIALIZE_FOR(oneapi::dpl::__par_backend_hetero::__write_red_by_seg, + _BinaryPred)> + : oneapi::dpl::__internal::__are_all_device_copyable<_BinaryPred> +{ +}; + template struct sycl::is_device_copyable<_ONEDPL_SPECIALIZE_FOR(oneapi::dpl::__par_backend_hetero::__early_exit_find_or, _ExecutionPolicy, _Pred)> @@ -323,6 +356,12 @@ struct sycl::is_device_copyable<_ONEDPL_SPECIALIZE_FOR(oneapi::dpl::__par_backen { }; +template +struct sycl::is_device_copyable<_ONEDPL_SPECIALIZE_FOR(oneapi::dpl::__par_backend_hetero::__red_by_seg_op, _BinaryOp)> + : oneapi::dpl::__internal::__are_all_device_copyable<_BinaryOp> +{ +}; + namespace oneapi::dpl::unseq_backend { diff --git a/test/general/implementation_details/device_copyable.pass.cpp b/test/general/implementation_details/device_copyable.pass.cpp index 0922c66c84a..322d93d4824 100644 --- a/test/general/implementation_details/device_copyable.pass.cpp +++ b/test/general/implementation_details/device_copyable.pass.cpp @@ -157,6 +157,16 @@ test_device_copyable() sycl::is_device_copyable_v>, 
"__gen_transform_input is not device copyable with device copyable types"); + //__gen_red_by_seg_reduce_input + static_assert(sycl::is_device_copyable_v< + oneapi::dpl::__par_backend_hetero::__gen_red_by_seg_reduce_input>, + "__gen_red_by_seg_reduce_input is not device copyable with device copyable types"); + + //__gen_red_by_seg_scan_input + static_assert(sycl::is_device_copyable_v< + oneapi::dpl::__par_backend_hetero::__gen_red_by_seg_scan_input>, + "__gen_red_by_seg_scan_input is not device copyable with device copyable types"); + //__gen_mask static_assert(sycl::is_device_copyable_v>, "__gen_mask is not device copyable with device copyable types"); @@ -186,6 +196,11 @@ test_device_copyable() sycl::is_device_copyable_v>, "__write_to_id_if_else is not device copyable with device copyable types"); + //__write_red_by_seg + static_assert( + sycl::is_device_copyable_v>, + "__write_red_by_seg is not device copyable with device copyable types"); + // __early_exit_find_or static_assert( sycl::is_device_copyable_v< @@ -201,6 +216,11 @@ test_device_copyable() noop_device_copyable>>, "__leaf_sorter is not device copyable with device copyable types"); + //__red_by_seg_op + static_assert( + sycl::is_device_copyable_v>, + "__red_by_seg_op is not device copyable with device copyable types"); + //__not_pred static_assert(sycl::is_device_copyable_v>, "__not_pred is not device copyable with device copyable types"); @@ -400,6 +420,16 @@ test_non_device_copyable() !sycl::is_device_copyable_v>, "__gen_transform_input is device copyable with non device copyable types"); + //__gen_red_by_seg_reduce_input + static_assert(!sycl::is_device_copyable_v< + oneapi::dpl::__par_backend_hetero::__gen_red_by_seg_reduce_input>, + "__gen_red_by_seg_reduce_input is device copyable with non device copyable types"); + + //__gen_red_by_seg_reduce_input + static_assert(!sycl::is_device_copyable_v< + oneapi::dpl::__par_backend_hetero::__gen_red_by_seg_scan_input>, + "__gen_red_by_seg_scan_input is device copyable with non device copyable types"); + //__gen_mask static_assert(!sycl::is_device_copyable_v>, "__gen_mask is device copyable with non device copyable types"); @@ -429,6 +459,11 @@ test_non_device_copyable() oneapi::dpl::__par_backend_hetero::__write_to_id_if_else>, "__write_to_id_if_else is device copyable with non device copyable types"); + //__write_red_by_seg + static_assert(!sycl::is_device_copyable_v< + oneapi::dpl::__par_backend_hetero::__write_red_by_seg>, + "__write_red_by_seg is device copyable with non device copyable types"); + // __early_exit_find_or static_assert( !sycl::is_device_copyable_v>, "__leaf_sorter is device copyable with non device copyable types"); + //__red_by_seg_op + static_assert( + !sycl::is_device_copyable_v>, + "__red_by_seg_op is device copyable with non device copyable types"); + //__not_pred static_assert(!sycl::is_device_copyable_v>, "__not_pred is device copyable with non device copyable types"); diff --git a/test/parallel_api/numeric/numeric.ops/reduce_by_segment.pass.cpp b/test/parallel_api/numeric/numeric.ops/reduce_by_segment.pass.cpp index 4de95e26e9b..80aa9f53d3c 100644 --- a/test/parallel_api/numeric/numeric.ops/reduce_by_segment.pass.cpp +++ b/test/parallel_api/numeric/numeric.ops/reduce_by_segment.pass.cpp @@ -301,7 +301,7 @@ test_flag_pred() } #endif -template +template void run_test_on_device() { @@ -313,10 +313,8 @@ run_test_on_device() { if (TestUtils::has_type_support(TestUtils::get_test_queue().get_device())) { - // Run tests for USM shared memory - test4buffers>(); - 
// Run tests for USM device memory - test4buffers>(); + constexpr sycl::usm::alloc allocation_type = use_device_alloc ? sycl::usm::alloc::device : sycl::usm::alloc::shared; + test4buffers>(); } } #endif // TEST_DPCPP_BACKEND_PRESENT @@ -335,12 +333,12 @@ run_test_on_host() #endif // !_PSTL_ICC_TEST_SIMD_UDS_BROKEN && !_PSTL_ICPX_TEST_RED_BY_SEG_OPTIMIZER_CRASH } -template +template void run_test() { run_test_on_host(); - run_test_on_device(); + run_test_on_device(); } int @@ -350,7 +348,7 @@ main() // kernels. This is being filed to the compiler team. In the meantime, we can rearrange this test // to resolve the issue on our side. #if _PSTL_RED_BY_SEG_WINDOWS_COMPILE_ORDER_BROKEN - run_test, UserBinaryPredicate>, MaxFunctor>>(); + run_test, UserBinaryPredicate>, MaxFunctor>>(); #endif #if TEST_DPCPP_BACKEND_PRESENT @@ -360,17 +358,17 @@ main() #endif // TEST_DPCPP_BACKEND_PRESENT #if !_PSTL_RED_BY_SEG_WINDOWS_COMPILE_ORDER_BROKEN - run_test, UserBinaryPredicate>, MaxFunctor>>(); + run_test, UserBinaryPredicate>, MaxFunctor>>(); #endif - run_test, ::std::plus>(); - run_test, ::std::plus>(); - run_test, ::std::plus>(); + run_test, ::std::plus>(); + run_test, ::std::plus>(); + run_test, ::std::plus>(); // TODO investigate possible overflow: see issue #1416 - run_test_on_device, ::std::multiplies>(); - run_test_on_device, ::std::multiplies>(); - run_test_on_device, ::std::multiplies>(); + run_test_on_device, ::std::multiplies>(); + run_test_on_device, ::std::multiplies>(); + run_test_on_device, ::std::multiplies>(); return TestUtils::done(); }
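
Note on the new kernel pipeline (editorial, illustrative): the four kernels added in parallel_backend_sycl_reduce_by_segment.h follow a count-ends / scan-counts / per-group-reduce / apply-carries structure. The host-side sketch below mirrors that structure serially; it assumes integer keys and values, equality as the predicate, addition with known identity 0, and a hypothetical block_size standing in for the work-group size. It is a reference for the phase structure only, not the library's implementation.

#include <algorithm>
#include <cstddef>
#include <vector>

// Host-side mirror of the four phases, assuming plus over int with known identity 0.
// "Blocks" stand in for SYCL work groups; block_size is hypothetical.
std::size_t
reduce_by_segment_sketch(const std::vector<int>& keys, const std::vector<int>& vals,
                         std::vector<int>& out_keys, std::vector<int>& out_vals,
                         std::size_t block_size = 8)
{
    const std::size_t n = keys.size();
    if (n == 0)
        return 0;
    const std::size_t n_blocks = (n + block_size - 1) / block_size;

    // Phase 1: count the segment ends in each block (a key change or the last element).
    std::vector<std::size_t> seg_ends(n_blocks, 0);
    for (std::size_t b = 0; b < n_blocks; ++b)
        for (std::size_t i = b * block_size; i < std::min(n, (b + 1) * block_size); ++i)
            if (i == n - 1 || keys[i] != keys[i + 1])
                ++seg_ends[b];

    // Phase 1.5: exclusive scan of the counts gives each block's first output index.
    std::vector<std::size_t> offsets(n_blocks, 0);
    for (std::size_t b = 1; b < n_blocks; ++b)
        offsets[b] = offsets[b - 1] + seg_ends[b - 1];

    const std::size_t n_segments = offsets.back() + seg_ends.back();
    out_keys.assign(n_segments, 0);
    out_vals.assign(n_segments, 0);
    out_keys[0] = keys[0];

    // Phase 2: each block reduces its own segments; the open tail of a block (the values
    // after its last segment end, or the whole block if it closes no segment) becomes its
    // carry-out, stored in partials.
    std::vector<int> partials(n_blocks, 0);
    for (std::size_t b = 0; b < n_blocks; ++b)
    {
        int acc = 0; // known identity
        std::size_t idx = offsets[b];
        for (std::size_t i = b * block_size; i < std::min(n, (b + 1) * block_size); ++i)
        {
            acc += vals[i];
            if (i == n - 1 || keys[i] != keys[i + 1])
            {
                out_vals[idx] = acc; // block-local value; inter-block carry applied in phase 3
                if (i != n - 1)
                    out_keys[idx + 1] = keys[i + 1];
                ++idx;
                acc = 0;
            }
        }
        partials[b] = acc;
    }

    // Phase 3: fold the carries of preceding blocks into the first segment end of each block,
    // sweeping backwards and stopping at the nearest block that closed a segment.
    for (std::size_t b = 1; b < n_blocks; ++b)
    {
        if (seg_ends[b] == 0)
            continue; // no segment closes here; the carry keeps flowing via partials[b]
        int carry = 0;
        for (std::size_t p = b; p-- > 0;)
        {
            carry = partials[p] + carry;
            if (seg_ends[p] != 0)
                break;
        }
        out_vals[offsets[b]] += carry;
    }
    return n_segments; // number of segments written, as the kernel path returns
}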
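
The /*known_identity=*/std::true_type tag that selects this path corresponds to reduction operators whose identity is known at compile time. A minimal sketch of such a dispatch tag, using the standard sycl::has_known_identity trait (oneDPL's internal unseq_backend::__known_identity machinery is analogous but not identical):

#include <sycl/sycl.hpp>
#include <type_traits>

// Compile-time tag: true_type when the operator has a known identity for the value type.
template <typename BinaryOp, typename ValueT>
using known_identity_tag = std::bool_constant<sycl::has_known_identity_v<BinaryOp, ValueT>>;

static_assert(known_identity_tag<sycl::plus<int>, int>::value,
              "sycl::plus over int has a known identity (0)");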
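
The sycl_traits.h hunks declare the new bridge functors device copyable whenever their stored predicate or operator is. The same pattern can be reproduced for a hypothetical wrapper type as follows (pred_wrapper is illustrative only, not part of oneDPL):

#include <sycl/sycl.hpp>

// Hypothetical wrapper standing in for __gen_red_by_seg_reduce_input, __gen_red_by_seg_scan_input,
// __write_red_by_seg and __red_by_seg_op: it stores a user callable, so it is device copyable
// exactly when that callable is.
template <typename Pred>
struct pred_wrapper
{
    Pred pred;
};

// Forward the device-copyability question to the wrapped type, as the specializations above do
// via __are_all_device_copyable.
template <typename Pred>
struct sycl::is_device_copyable<pred_wrapper<Pred>> : sycl::is_device_copyable<Pred>
{
};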