Skip to content

Commit

Permalink
[RFC] dash::copy: Implement global-to-global
Browse files Browse the repository at this point in the history
Active team selection is now done by tag struct argument.
  • Loading branch information
bertwesarg committed Jul 16, 2019
1 parent ecb9c0d commit 07f640a
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 8 deletions.
145 changes: 137 additions & 8 deletions dash/include/dash/algorithm/Copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -624,25 +624,154 @@ copy_async(
}
#endif

struct ActiveDestination{};
struct ActiveSource{};

/**
* Specialization of \c dash::copy as global-to-global blocking copy
* operation.
*
* \ingroup DashAlgorithms
*/
template <
class GlobInputIt,
class GlobOutputIt,
typename ValueType = typename GlobInputIt::value_type>
GlobOutputIt copy(
GlobInputIt in_first,
GlobInputIt in_last,
GlobOutputIt out_first,
ActiveDestination /*unused*/)
{
DASH_LOG_TRACE("dash::copy()", "blocking, global to global");

typedef typename GlobInputIt::size_type size_type;

size_type num_elem_total = dash::distance(in_first, in_last);
if (num_elem_total <= 0) {
DASH_LOG_TRACE("dash::copy", "input range empty");
return out_first;
}

auto g_out_first = out_first;
auto g_out_last = g_out_first + num_elem_total;

internal::ContiguousRangeSet<GlobOutputIt> range_set{g_out_first, g_out_last};

const auto & out_team = out_first.team();
out_team.barrier();

std::vector<dart_handle_t> handles;
internal::local_copy_chunks<ValueType> local_chunks;

size_type num_elem_processed = 0;

for (auto range : range_set) {

auto cur_out_first = range.first;
auto num_copy_elem = range.second;

DASH_ASSERT_GT(num_copy_elem, 0,
"Number of elements to copy is 0");

// handle local data only
if (cur_out_first.is_local()) {
auto dest_ptr = cur_out_first.local();
auto src_ptr = in_first + num_elem_processed;
internal::copy_impl(src_ptr,
src_ptr + num_copy_elem,
dest_ptr,
&handles,
local_chunks);
}
num_elem_processed += num_copy_elem;
}

internal::do_local_copies(local_chunks);

if (!handles.empty()) {
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,",
"num_handles: ", handles.size());
dart_waitall_local(handles.data(), handles.size());
}
out_team.barrier();

DASH_ASSERT_EQ(num_elem_processed, num_elem_total,
"Failed to find all contiguous subranges in range");

return g_out_last;
}

/**
* Specialization of \c dash::copy as global-to-global blocking copy
* operation.
*
* \ingroup DashAlgorithms
*/
template <typename ValueType, class GlobInputIt, class GlobOutputIt>
template <
class GlobInputIt,
class GlobOutputIt,
typename ValueType = typename GlobInputIt::value_type>
GlobOutputIt copy(
GlobInputIt /*in_first*/,
GlobInputIt /*in_last*/,
GlobOutputIt /*out_first*/)
GlobInputIt in_first,
GlobInputIt in_last,
GlobOutputIt out_first,
ActiveSource /*unused*/)
{
DASH_LOG_TRACE("dash::copy()", "blocking, global to global");

// TODO:
// - Implement adapter for local-to-global dash::copy here
// - Return if global input range has no local sub-range
typedef typename GlobInputIt::size_type size_type;

size_type num_elem_total = dash::distance(in_first, in_last);
if (num_elem_total <= 0) {
DASH_LOG_TRACE("dash::copy", "input range empty");
return out_first;
}

internal::ContiguousRangeSet<GlobOutputIt> range_set{in_first, in_last};

const auto & in_team = in_first.team();
in_team.barrier();

std::vector<dart_handle_t> handles;
internal::local_copy_chunks<ValueType> local_chunks;

size_type num_elem_processed = 0;

for (auto range : range_set) {

auto cur_in_first = range.first;
auto num_copy_elem = range.second;

DASH_ASSERT_GT(num_copy_elem, 0,
"Number of elements to copy is 0");

// handle local data only
if (cur_in_first.is_local()) {
auto src_ptr = cur_in_first.local();
auto dest_ptr = out_first + num_elem_processed;
internal::copy_impl(src_ptr,
src_ptr + num_copy_elem,
dest_ptr,
&handles,
local_chunks);
}
num_elem_processed += num_copy_elem;
}

internal::do_local_copies(local_chunks);

if (!handles.empty()) {
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,",
"num_handles: ", handles.size());
dart_waitall(handles.data(), handles.size());
}
in_team.barrier();

DASH_ASSERT_EQ(num_elem_processed, num_elem_total,
"Failed to find all contiguous subranges in range");

return GlobOutputIt();
return out_first + num_elem_total;
}

#endif // DOXYGEN
Expand Down
68 changes: 68 additions & 0 deletions dash/test/algorithm/CopyTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1019,3 +1019,71 @@ TEST_F(CopyTest, MatrixToSmallerTeam)

}
}

TEST_F(CopyTest, MatrixTransfersGlobalToGlobal)
{
if (_dash_size < 2) {
SKIP_TEST_MSG("At least 2 units required for this test.");
}

using TeamSpecT = dash::TeamSpec<2>;
using MatrixT = dash::NArray<double, 2>;
using PatternT = typename MatrixT::pattern_type;
using SizeSpecT = dash::SizeSpec<2>;
using DistSpecT = dash::DistributionSpec<2>;

auto& team_all = dash::Team::All();
TeamSpecT team_all_spec(team_all.size(), 1);
team_all_spec.balance_extents();

auto size_spec = SizeSpecT(4*team_all_spec.extent(1),
4*team_all_spec.extent(1));
auto dist_spec = DistSpecT(dash::BLOCKED, dash::BLOCKED);

MatrixT grid_more(size_spec, dist_spec, team_all, team_all_spec);
dash::fill(grid_more.begin(), grid_more.end(), (double)team_all.myid());
team_all.barrier();

// create a smaller team
dash::Team& team_fewer= team_all.split(2);
team_all.barrier();
if (!team_fewer.is_null() && 0 == team_fewer.position()) {
TeamSpecT team_fewer_spec(team_fewer.size(), 1);
team_fewer_spec.balance_extents();

MatrixT grid_fewer(size_spec, dist_spec, team_fewer, team_fewer_spec);
dash::fill(grid_fewer.begin(), grid_fewer.end(), -1.0);

auto lextents= grid_fewer.pattern().local_extents();

dash::copy(grid_more.begin(), grid_more.end(),
grid_fewer.begin(), dash::ActiveDestination());

if (team_fewer.myid() == 0) {
auto gextents = grid_fewer.extents();
for (uint32_t y = 0; y < gextents[0]; ++y) {
for (uint32_t x = 0; x < gextents[1]; ++x) {
ASSERT_EQ_U(grid_more(y, x), grid_fewer(y, x));
}
}
}

team_fewer.barrier();

dash::fill(grid_fewer.begin(), grid_fewer.end(), (double)team_fewer.myid());

dash::copy(grid_fewer.begin(), grid_fewer.end(),
grid_more.begin(), dash::ActiveSource());

if (team_fewer.myid() == 0) {
auto gextents = grid_fewer.extents();
for (uint32_t y = 0; y < gextents[0]; ++y) {
for (uint32_t x = 0; x < gextents[1]; ++x) {
ASSERT_EQ_U(grid_more(y, x), grid_fewer(y, x));
}
}
}

team_fewer.barrier();
}
}

0 comments on commit 07f640a

Please sign in to comment.