Skip to content

Commit

Permalink
[RFC] dash::copy: Implement global-to-global
Browse files Browse the repository at this point in the history
Active team selection is now done by tag struct argument.
  • Loading branch information
bertwesarg committed Mar 3, 2020
1 parent 91c36c9 commit 76a910b
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 11 deletions.
157 changes: 149 additions & 8 deletions dash/include/dash/algorithm/Copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -646,25 +646,166 @@ copy_async(
}
#endif

/**
 * Tag type selecting the global-to-global \c dash::copy overload in which
 * the units owning parts of the destination range perform the transfer.
 */
struct ActiveDestination {
};

/**
 * Tag type selecting the global-to-global \c dash::copy overload in which
 * the units owning parts of the source range perform the transfer.
 */
struct ActiveSource {
};

/**
 * Blocking global-to-global variant of \c dash::copy with an active
 * destination side.
 *
 * The output range is split into contiguous sub-ranges. For every sub-range
 * that is local to the calling unit, the matching elements of the input
 * range are copied into the unit's local memory; sub-ranges owned by other
 * units are skipped. The team of \c out_first is synchronized by a barrier
 * before the transfers are issued and once more after they have been
 * completed with \c dart_waitall_local / \c dart_flush_local_all.
 *
 * An empty input range returns immediately, before any barrier is entered.
 *
 * \tparam UseHandles  If \c true, transfers are tracked in a handle list
 *                     that is waited on; otherwise a local flush on the
 *                     input range is issued.
 *
 * \param in_first   Begin of the global input range
 * \param in_last    End of the global input range
 * \param out_first  Begin of the global output range
 *
 * \return  Iterator past the last element of the output range, or
 *          \c out_first if the input range is empty.
 *
 * \ingroup DashAlgorithms
 */
template <
    class GlobInputIt,
    class GlobOutputIt,
    bool UseHandles = false>
GlobOutputIt copy(
    GlobInputIt       in_first,
    GlobInputIt       in_last,
    GlobOutputIt      out_first,
    ActiveDestination /*unused*/)
{
  DASH_LOG_TRACE("dash::copy()", "blocking, global to global, active destination");

  using size_type         = typename GlobInputIt::size_type;
  using input_value_type  = typename GlobInputIt::value_type;
  using output_value_type = typename GlobOutputIt::value_type;

  size_type num_elem_total = dash::distance(in_first, in_last);
  if (num_elem_total <= 0) {
    DASH_LOG_TRACE("dash::copy", "input range empty");
    return out_first;
  }

  // End of the output range; doubles as the return value.
  auto out_last = out_first + num_elem_total;

  internal::ContiguousRangeSet<GlobOutputIt> range_set{out_first, out_last};

  // Synchronize the destination team before any data is transferred.
  const auto & team = out_first.team();
  team.barrier();

  // Handles are only collected when requested at compile time.
  std::vector<dart_handle_t>  handles;
  std::vector<dart_handle_t>* handle_sink = nullptr;
  if (UseHandles) {
    handle_sink = &handles;
  }

  dash::internal::local_copy_chunks<input_value_type, output_value_type> local_chunks;

  size_type num_elem_processed = 0;

  for (auto range : range_set) {
    auto chunk_out_first = range.first;
    auto num_copy_elem   = range.second;

    DASH_ASSERT_GT(num_copy_elem, 0,
                   "Number of elements to copy is 0");

    // Only sub-ranges of the output that live on this unit are filled.
    if (chunk_out_first.is_local()) {
      auto local_dest     = chunk_out_first.local();
      auto chunk_in_first = in_first + num_elem_processed;
      internal::copy_impl(chunk_in_first,
                          chunk_in_first + num_copy_elem,
                          local_dest,
                          handle_sink,
                          local_chunks);
    }
    // The input offset advances for skipped sub-ranges as well.
    num_elem_processed += num_copy_elem;
  }

  dash::internal::do_local_copies(local_chunks);

  if (handles.empty()) {
    if (!UseHandles) {
      dart_flush_local_all(in_first.dart_gptr());
    }
  } else {
    DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,",
                   "num_handles: ", handles.size());
    dart_waitall_local(handles.data(), handles.size());
  }
  team.barrier();

  DASH_ASSERT_EQ(num_elem_processed, num_elem_total,
                 "Failed to find all contiguous subranges in range");

  return out_last;
}

/**
* Specialization of \c dash::copy as global-to-global blocking copy
* operation.
*
* \ingroup DashAlgorithms
*/
template <typename ValueType, class GlobInputIt, class GlobOutputIt>
template <
class GlobInputIt,
class GlobOutputIt,
bool UseHandles = false>
GlobOutputIt copy(
GlobInputIt /*in_first*/,
GlobInputIt /*in_last*/,
GlobOutputIt /*out_first*/)
GlobInputIt in_first,
GlobInputIt in_last,
GlobOutputIt out_first,
ActiveSource /*unused*/)
{
DASH_LOG_TRACE("dash::copy()", "blocking, global to global");

// TODO:
// - Implement adapter for local-to-global dash::copy here
// - Return if global input range has no local sub-range
using size_type = typename GlobInputIt::size_type;
using input_value_type = typename GlobInputIt::value_type;
using output_value_type = typename GlobOutputIt::value_type;

size_type num_elem_total = dash::distance(in_first, in_last);
if (num_elem_total <= 0) {
DASH_LOG_TRACE("dash::copy", "input range empty");
return out_first;
}

internal::ContiguousRangeSet<GlobOutputIt> range_set{in_first, in_last};

const auto & in_team = in_first.team();
in_team.barrier();

std::vector<dart_handle_t> handles;
std::vector<dart_handle_t>* handles_arg = UseHandles ? &handles : nullptr;

dash::internal::local_copy_chunks<input_value_type, output_value_type> local_chunks;

size_type num_elem_processed = 0;

for (auto range : range_set) {

auto cur_in_first = range.first;
auto num_copy_elem = range.second;

DASH_ASSERT_GT(num_copy_elem, 0,
"Number of elements to copy is 0");

// handle local data only
if (cur_in_first.is_local()) {
auto src_ptr = cur_in_first.local();
auto dest_ptr = out_first + num_elem_processed;
internal::copy_impl(src_ptr,
src_ptr + num_copy_elem,
dest_ptr,
handles_arg,
local_chunks);
}
num_elem_processed += num_copy_elem;
}

internal::do_local_copies(local_chunks);

if (!handles.empty()) {
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,",
"num_handles: ", handles.size());
dart_waitall(handles.data(), handles.size());
} else if (!UseHandles) {
dart_flush_all(out_first.dart_gptr());
}
in_team.barrier();

DASH_ASSERT_EQ(num_elem_processed, num_elem_total,
"Failed to find all contiguous subranges in range");

return GlobOutputIt();
return out_first + num_elem_total;
}

#endif // DOXYGEN
Expand Down
68 changes: 68 additions & 0 deletions dash/test/algorithm/CopyTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1040,3 +1040,71 @@ TEST_F(CopyTest, InputOutputTypeTest)
ASSERT_TRUE_U((dash::internal::is_dash_copyable<const point_t, point_t>::value));

}

/**
 * Copies a 2-dimensional matrix distributed over all units into a matrix
 * distributed over a sub-team (active destination), and back again (active
 * source), comparing both matrices element by element after each transfer.
 */
TEST_F(CopyTest, MatrixTransfersGlobalToGlobal)
{
  if (_dash_size < 2) {
    SKIP_TEST_MSG("At least 2 units required for this test.");
  }

  using TeamSpecT = dash::TeamSpec<2>;
  using MatrixT   = dash::NArray<double, 2>;
  using SizeSpecT = dash::SizeSpec<2>;
  using DistSpecT = dash::DistributionSpec<2>;

  auto& team_all = dash::Team::All();
  TeamSpecT team_all_spec(team_all.size(), 1);
  team_all_spec.balance_extents();

  auto size_spec = SizeSpecT(4*team_all_spec.extent(1),
                             4*team_all_spec.extent(1));
  auto dist_spec = DistSpecT(dash::BLOCKED, dash::BLOCKED);

  // Matrix on the full team; every unit fills its part with its own id.
  MatrixT grid_more(size_spec, dist_spec, team_all, team_all_spec);
  dash::fill(grid_more.begin(), grid_more.end(),
             static_cast<double>(team_all.myid()));
  team_all.barrier();

  // create a smaller team
  dash::Team& team_fewer = team_all.split(2);
  team_all.barrier();

  // Only the units of the sub-team at position 0 take part in the transfers.
  if (!team_fewer.is_null() && 0 == team_fewer.position()) {
    TeamSpecT team_fewer_spec(team_fewer.size(), 1);
    team_fewer_spec.balance_extents();

    // Same extents and distribution, but spread over the smaller team.
    MatrixT grid_fewer(size_spec, dist_spec, team_fewer, team_fewer_spec);
    dash::fill(grid_fewer.begin(), grid_fewer.end(), -1.0);

    // full team -> sub-team, destination units do the copying
    dash::copy(grid_more.begin(), grid_more.end(),
               grid_fewer.begin(), dash::ActiveDestination());

    if (team_fewer.myid() == 0) {
      auto gextents = grid_fewer.extents();
      for (uint32_t y = 0; y < gextents[0]; ++y) {
        for (uint32_t x = 0; x < gextents[1]; ++x) {
          ASSERT_EQ_U(grid_more(y, x), grid_fewer(y, x));
        }
      }
    }

    team_fewer.barrier();

    // Overwrite the sub-team matrix so that the way back is observable.
    dash::fill(grid_fewer.begin(), grid_fewer.end(),
               static_cast<double>(team_fewer.myid()));

    // sub-team -> full team, source units do the copying
    dash::copy(grid_fewer.begin(), grid_fewer.end(),
               grid_more.begin(), dash::ActiveSource());

    if (team_fewer.myid() == 0) {
      auto gextents = grid_fewer.extents();
      for (uint32_t y = 0; y < gextents[0]; ++y) {
        for (uint32_t x = 0; x < gextents[1]; ++x) {
          ASSERT_EQ_U(grid_more(y, x), grid_fewer(y, x));
        }
      }
    }

    team_fewer.barrier();
  }
}
7 changes: 4 additions & 3 deletions dash/test/container/MatrixTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -745,9 +745,10 @@ TEST_F(MatrixTest, BlockCopy)
LOG_MESSAGE("Team barrier passed");

// Copy block 1 of matrix_a to block 0 of matrix_b:
dash::copy<element_t>(matrix_a.block(1).begin(),
matrix_a.block(1).end(),
matrix_b.block(0).begin());
dash::copy(matrix_a.block(1).begin(),
matrix_a.block(1).end(),
matrix_b.block(0).begin(),
dash::ActiveSource());

LOG_MESSAGE("Wait for team barrier ...");
dash::barrier();
Expand Down

0 comments on commit 76a910b

Please sign in to comment.