Skip to content

Honor output range distribution in dash::transform #398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions dash/include/dash/algorithm/Copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,6 @@ copy_async(
* \ingroup DashAlgorithms
*/
template <
typename ValueType,
class GlobInputIt,
class GlobOutputIt >
GlobOutputIt copy(
Expand All @@ -1196,11 +1195,46 @@ GlobOutputIt copy(
{
DASH_LOG_TRACE("dash::copy()", "blocking, global to global");

DASH_LOG_TRACE_VAR("dash::copy()", in_first);
DASH_LOG_TRACE_VAR("dash::copy()", in_last);
DASH_LOG_TRACE_VAR("dash::copy()", out_first);

auto num_elements = dash::distance(in_first, in_last);
auto li_range_in = local_index_range(in_first, in_last);
auto num_local_elem = li_range_in.end - li_range_in.begin;
DASH_LOG_TRACE_VAR("dash::copy()", num_elements);
DASH_LOG_TRACE_VAR("dash::copy()", num_local_elem);


// copy our local portion into the global output range
if (num_local_elem > 0) {
auto pattern = in_first.pattern();
// the distance from the first local element to the in_first iterator
auto in_offset = pattern.global(li_range_in.begin)
- in_first.global().gpos();

// the first local element
auto local_in_first = in_first + in_offset;
// the last local element
auto local_in_last = in_first + (num_local_elem + in_offset - 1);
auto local_out_first = out_first + in_offset;

DASH_LOG_TRACE("Copying from range \n [",
pattern.global(li_range_in.begin), ", ",
pattern.global(li_range_in.end - 1), "] \n [", local_in_first,
"] to \n ", local_out_first, " (global offset ", in_offset, ") ");

dash::copy(
local_in_first.local(),
// pointer one past the last element
local_in_last.local() + 1,
local_out_first);
}
// TODO:
// - Implement adapter for local-to-global dash::copy here
// - Return if global input range has no local sub-range

return GlobOutputIt();
return (out_first + num_elements);
}

#endif // DOXYGEN
Expand Down
88 changes: 69 additions & 19 deletions dash/include/dash/algorithm/Transform.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <dash/GlobRef.h>
#include <dash/GlobAsyncRef.h>

#include <dash/algorithm/Copy.h>
#include <dash/algorithm/LocalRange.h>
#include <dash/algorithm/Operation.h>
#include <dash/algorithm/Accumulate.h>
Expand Down Expand Up @@ -48,8 +49,33 @@ inline dart_ret_t transform_blocking_impl(
return result;
}

/**
 * Issues a DART accumulate of \c nvalues elements read from \c values onto
 * the global memory referenced by \c dest, then calls \c dart_flush_local
 * on \c dest before returning (local completion).
 * Allows re-use of the \c values pointer after the call returns.
 *
 * \param  dest     global pointer to the first target element
 * \param  values   pointer to the first local input element
 * \param  nvalues  number of elements to accumulate
 * \param  op       DART operation applied to combine the values
 *
 * \return the status returned by \c dart_accumulate; the status of the
 *         subsequent \c dart_flush_local call is not inspected.
 */
template< typename ValueType >
dart_ret_t transform_local_blocking_impl(
  dart_gptr_t        dest,
  ValueType        * values,
  size_t             nvalues,
  dart_operation_t   op)
{
  using dart_type = dash::dart_datatype<ValueType>;

  static_assert(dart_type::value != DART_TYPE_UNDEFINED,
                "Cannot accumulate unknown type!");

  void * const send_buffer = reinterpret_cast<void *>(values);

  const dart_ret_t accumulate_status =
    dart_accumulate(dest, send_buffer, nvalues, dart_type::value, op);

  // Complete the operation locally so the caller may overwrite `values`.
  dart_flush_local(dest);

  return accumulate_status;
}

/**
* Wrapper of the non-blocking DART accumulate operation.
* The pointer \c values should not be re-used before the operation completed.
*/
template< typename ValueType >
dart_ret_t transform_impl(
Expand All @@ -67,7 +93,6 @@ dart_ret_t transform_impl(
nvalues,
dash::dart_datatype<ValueType>::value,
op);
dart_flush_local(dest);
return result;
}

Expand Down Expand Up @@ -271,40 +296,65 @@ GlobOutputIt transform(
BinaryOperation binary_op)
{
DASH_LOG_DEBUG("dash::transform(af, al, bf, outf, binop)");
auto &pattern = out_first.pattern();
// Output range different from rhs input range is not supported yet
auto in_first = in_a_first;
auto in_last = in_a_last;
std::vector<ValueType> in_range;
ValueType* in_first = &(*in_a_first);
ValueType* in_last = &(*in_a_last);
// Number of elements in local range:
size_t num_local_elements = std::distance(in_first, in_last);
auto out_last = out_first + num_local_elements;
if (out_last.gpos() > pattern.size()) {
DASH_THROW(dash::exception::OutOfRange,
"Too many input elements in dash::transform");
}
if (in_b_first == out_first) {
// Output range is rhs input range: C += A
// Input is (in_a_first, in_a_last).
} else {
// Output range different from rhs input range: C = A+B
// Input is (in_a_first, in_a_last) + (in_b_first, in_b_last):
std::transform(
in_a_first, in_a_last,
dash::copy(
in_b_first,
std::back_inserter(in_range),
binary_op);
in_first = in_range.data();
in_last = in_first + in_range.size();
in_b_first + std::distance(in_a_first, in_a_last),
out_first);
}

dash::util::Trace trace("transform");

// Resolve local range from global range:
// Number of elements in local range:
size_t num_local_elements = std::distance(in_first, in_last);
// Global iterator to dart_gptr_t:
dart_gptr_t dest_gptr = out_first.dart_gptr();
// Send accumulate message:
trace.enter_state("transform_blocking");
dash::internal::transform_blocking_impl(
auto &team = pattern.team();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused?

size_t towrite = num_local_elements;
auto out_it = out_first;
auto in_it = in_first;
while (towrite > 0) {
auto lpos = out_it.lpos();
size_t lsize = pattern.local_size(lpos.unit);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't local_size cover the total number of elements in that unit? What if the unit has multiple local blocks?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is what I already mentioned multiple times. This approach only works for a single contiguous index range. Hence it would be better to use the new range-based views interface by @fuchsto. There should be something like dash::chunks(dash::local(...)). Unfortunately I do not have detailed knowledge of this API either.

Copy link
Member

@fuchsto fuchsto May 31, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fmoessbauer @bertwesarg
Yes, almost, it's
dash::blocks(dash::local(view_or_range_expr))
or
dash::local(dash::blocks(view_or_range_expr)).
Containers are valid range expressions, iterators can be converted to ranges using
dash::make_range(it_begin, it_end).

The term chunks was intended for any ad-hoc, user-specified decomposition like dash::chunks(dash::stride(10, array)), whereas blocks depend on the pattern. But actually there is no reason to differentiate between those.

size_t num_values = std::min(lsize - lpos.index, towrite);
dart_gptr_t dest_gptr = out_it.dart_gptr();
// use non-blocking transform and wait for all at the end
dash::internal::transform_impl(
dest_gptr,
in_first,
num_local_elements,
in_it,
num_values,
binary_op.dart_operation());
trace.exit_state("transform_blocking");
out_it += num_values;
in_it += num_values;
towrite -= num_values;
}

// out_first.team().barrier();
dart_flush_all(out_first.dart_gptr());


// trace.enter_state("transform_blocking");
// dash::internal::transform_blocking_impl(
// dest_gptr,
// in_first,
// num_local_elements,
// binary_op.dart_operation());
// trace.exit_state("transform_blocking");
// The position past the last element transformed in global element space
// cannot be resolved from the size of the local range if the local range
// spans over more than one block. Otherwise, the difference of two global
Expand All @@ -320,7 +370,7 @@ GlobOutputIt transform(
// For ranges over block borders, we would have to resolve the global
// position past the last element transformed from the iterator's pattern
// (see dash::PatternIterator).
return out_first + num_local_elements;
return out_it;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions dash/include/dash/iterator/GlobViewIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -1091,10 +1091,10 @@ std::ostream & operator<<(
ElementType, Pattern, GlobStaticMem, Pointer, Reference> & it)
{
std::ostringstream ss;
dash::GlobPtr<ElementType, GlobStaticMem> ptr(it);
// dash::GlobPtr<ElementType, GlobStaticMem> ptr(it);
ss << "dash::GlobViewIter<" << typeid(ElementType).name() << ">("
<< "idx:" << it._idx << ", "
<< "gptr:" << ptr << ")";
<< "gptr:" << it.global().dart_gptr() << ")";
return operator<<(os, ss.str());
}

Expand Down
51 changes: 51 additions & 0 deletions dash/test/algorithm/CopyTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#include <dash/Matrix.h>

#include <dash/algorithm/Copy.h>
#include <dash/algorithm/Fill.h>
#include <dash/algorithm/Generate.h>
#include <dash/algorithm/ForEach.h>
#include <dash/pattern/ShiftTilePattern1D.h>
#include <dash/pattern/TilePattern1D.h>
#include <dash/pattern/BlockPattern1D.h>
Expand Down Expand Up @@ -803,6 +806,54 @@ TEST_F(CopyTest, AsyncGlobalToLocalBlock)
}
}


/**
 * Exercises the blocking global-to-global dash::copy on two equally sized
 * arrays: first for the complete source range, then for a source range that
 * starts one element after source.begin().
 */
TEST_F(CopyTest, GlobalToGlobal)
{
  using value_t = int;
  constexpr int elem_per_unit = 100;
  dash::Array<value_t> source(dash::size() * elem_per_unit);
  dash::Array<value_t> target(dash::size() * elem_per_unit);

  // Known initial state for the target, index-dependent values in the source
  dash::fill(target.begin(), target.end(), 0);
  dash::generate_with_index(source.begin(), source.end(),
    [](size_t idx) {
      return dash::myid() * 1000 + idx;
    }
  );

  source.barrier();

  // copy the full range
  dash::copy(source.begin(), source.end(), target.begin());
  source.barrier();

  dash::for_each_with_index(target.begin(), target.end(),
    [](value_t val, size_t idx) {
      ASSERT_EQ_U(val, dash::myid() * 1000 + idx);
    }
  );

  // copy the range with an offset (effectively moving the input
  // range to the left by 1)
  dash::copy(source.begin() + 1, source.end(), target.begin());
  source.barrier();

  // The very last target element is excluded from the check:
  // it has not been overwritten by the shifted copy.
  dash::for_each_with_index(target.begin(), target.end() - 1,
    [](value_t val, size_t idx) {
      // the array has shifted so the last element is different
      if ((idx % elem_per_unit) == (elem_per_unit - 1)) {
        // the last element comes from the next unit
        // this element has not been copied on the last unit
        ASSERT_EQ_U(val, (dash::myid() + 1) * 1000 + idx + 1);
      } else {
        ASSERT_EQ_U(val, dash::myid() * 1000 + idx + 1);
      }
    }
  );

}

#if 0
// TODO
TEST_F(CopyTest, AsyncAllToLocalVector)
Expand Down
40 changes: 40 additions & 0 deletions dash/test/algorithm/TransformTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#include <dash/algorithm/Transform.h>
#include <dash/algorithm/Generate.h>
#include <dash/algorithm/Fill.h>
#include <dash/algorithm/ForEach.h>

#include <dash/Array.h>
#include <dash/Matrix.h>
Expand Down Expand Up @@ -221,3 +223,41 @@ TEST_F(TransformTest, MatrixGlobalPlusGlobalBlocking)
EXPECT_EQ_U(first_l_block_a_begin,
first_l_block_a_offsets);
}


TEST_F(TransformTest, LocalIteratorInput)
{
using value_t = int;
std::vector<value_t> local_v(100);
size_t idx = 0;
std::fill(local_v.begin(), local_v.end(), (value_t)dash::myid());
for (auto& elem : local_v) {
elem = dash::myid() * 1000 + idx;
idx++;
}
dash::Array<value_t> global_v(local_v.size() + 1);
dash::fill(global_v.begin(), global_v.end(), 0.0);
global_v.barrier();
// start from the second element
auto it = dash::transform<value_t>(
local_v.begin(),
local_v.end(),
global_v.begin() + 1,
global_v.begin() + 1,
dash::max<value_t>()
);

global_v.barrier();

ASSERT_EQ_U(it, global_v.end());

// size_t idx = 0;

dash::for_each_with_index(global_v.begin() + 1, global_v.end(),
[](value_t val, size_t idx){
ASSERT_EQ_U(val, (dash::size() - 1) * 1000 + (idx - 1));
++idx;
Copy link
Member

@bertwesarg bertwesarg May 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove ++idx.

});

global_v.barrier();
}
39 changes: 25 additions & 14 deletions dash/test/container/MatrixTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -721,28 +721,39 @@ TEST_F(MatrixTest, BlockCopy)
dash::Team::All(),
team_spec);
// Fill matrix
auto block_a = matrix_a.block(1);
auto block_b = matrix_b.block(0);
if (myid == 0) {
LOG_MESSAGE("Assigning matrix values");
for(size_t col = 0; col < matrix_a.extent(0); ++col) {
for(size_t row = 0; row < matrix_a.extent(1); ++row) {
auto value = (row * matrix_a.extent(0)) + col;
matrix_a[col][row] = value;
matrix_b[col][row] = value;
for(size_t row = 0; row < matrix_a.extent(0); ++row) {
for(size_t col = 0; col < matrix_a.extent(1); ++col) {
auto value = (row * 1000) + col;
matrix_a[row][col] = value;
matrix_b[row][col] = value;
}
}
}
LOG_MESSAGE("Wait for team barrier ...");
dash::barrier();
LOG_MESSAGE("Team barrier passed");

matrix_b.barrier();

LOG_MESSAGE("Copying block");

// Copy block 1 of matrix_a to block 0 of matrix_b:
dash::copy<element_t>(matrix_a.block(1).begin(),
matrix_a.block(1).end(),
matrix_b.block(0).begin());
dash::copy(block_a.begin(),
block_a.end(),
block_b.begin());
matrix_b.barrier();

LOG_MESSAGE("Wait for team barrier ...");
dash::barrier();
LOG_MESSAGE("Team barrier passed");
LOG_MESSAGE("Checking copy result");
if (myid == 0) {
LOG_MESSAGE("Checking copied matrix block values");
for(size_t col = 0; col < block_a.extent(0); ++col) {
for(size_t row = 0; row < block_a.extent(1); ++row) {
ASSERT_EQ_U(static_cast<element_t>(block_b[col][row]),
static_cast<element_t>(block_a[col][row]));
}
}
}
}

TEST_F(MatrixTest, StorageOrder)
Expand Down