Python: fix load_chunk to temporary #913


Merged
merged 2 commits, Feb 22, 2021
204 changes: 127 additions & 77 deletions src/binding/python/RecordComponent.cpp
@@ -180,6 +180,34 @@ parseTupleSlices(uint8_t const ndim, Extent const & full_extent, py::tuple const
return std::make_tuple(offset, extent, flatten);
}

/** Check that an array is a contiguous buffer
*
* store and load require contiguous buffers:
*
* - not strided with padding
* - not a view into another buffer that results in striding
*/
inline void
check_buffer_is_contiguous( py::array & a ) {

auto* view = new Py_buffer();
int flags = PyBUF_STRIDES | PyBUF_FORMAT;
if( PyObject_GetBuffer( a.ptr(), view, flags ) != 0 )
{
delete view;
throw py::error_already_set();
}
bool isContiguous = ( PyBuffer_IsContiguous( view, 'A' ) != 0 );
PyBuffer_Release( view );
delete view;

if( !isContiguous )
throw py::index_error(
"strides in chunk are inefficient, not implemented!");
// @todo in order to implement stride handling, one needs to
// loop over the input data strides in store/load calls
}
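
From the Python side, this guard rejects strided numpy views before any data movement. A minimal sketch of what passes and what does not, using only numpy (variable names hypothetical):

    import numpy as np

    a = np.arange(6, dtype=np.dtype("int")).reshape(2, 3)  # contiguous
    v = a[:, ::2]  # strided view: neither C- nor F-contiguous
    assert not v.flags["C_CONTIGUOUS"] and not v.flags["F_CONTIGUOUS"]

    # passing v to store_chunk/load_chunk raises
    #   IndexError: strides in chunk are inefficient, not implemented!
    c = np.ascontiguousarray(v)  # a contiguous copy passes the check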

/** Store Chunk
*
* Called with offset and extent that are already in the record component's
@@ -260,27 +288,7 @@ store_chunk(RecordComponent & r, py::array & a, Offset const & offset, Extent co
);
}

/* required are contiguous buffers
*
* - not strided with paddings
* - not a view in another buffer that results in striding
*/
auto* view = new Py_buffer();
int flags = PyBUF_STRIDES | PyBUF_FORMAT;
if( PyObject_GetBuffer( a.ptr(), view, flags ) != 0 )
{
delete view;
throw py::error_already_set();
}
bool isContiguous = ( PyBuffer_IsContiguous( view, 'A' ) != 0 );
PyBuffer_Release( view );
delete view;

if( !isContiguous )
throw py::index_error(
"strides in chunk are inefficient, not implemented!");
// @todo in order to implement stride handling, one needs to
// loop over the input data strides during write below
check_buffer_is_contiguous( a );

// here, we increase a reference on the user-passed data so that
// temporary and lost-scope variables stay alive until we flush
@@ -352,64 +360,83 @@ store_chunk(RecordComponent & r, py::array & a, py::tuple const & slices)
* Size checks of the requested chunk (that the spanned data is in valid bounds)
* will be performed in the C++ API, in RecordComponent::loadChunk.
*/
inline py::array
load_chunk(RecordComponent & r, Offset const & offset, Extent const & extent, std::vector<bool> const & flatten)
inline void
load_chunk(RecordComponent & r, py::array & a, Offset const & offset, Extent const & extent)
{
// some size-one dimensions might be flattened in our output due to selections by index
size_t const numFlattenDims = std::count(flatten.begin(), flatten.end(), true);
std::vector< ptrdiff_t > shape(extent.size() - numFlattenDims);
auto maskIt = flatten.begin();
std::copy_if(
std::begin(extent),
std::end(extent),
std::begin(shape),
[&maskIt](std::uint64_t){
return !*(maskIt++);
}
);
// check array is large enough
size_t s_load = 1u;
size_t s_array = 1u;
std::string str_extent_shape;
std::string str_array_shape;
for( auto & si : extent ) {
s_load *= si;
str_extent_shape.append(" ").append(std::to_string(si));
}
for( py::ssize_t d = 0; d < a.ndim(); ++d ) {
s_array *= a.shape()[d];
str_array_shape.append(" ").append(std::to_string(a.shape()[d]));
}

auto const dtype = dtype_to_numpy( r.getDatatype() );
/* we allow flattening of the result dimension
if( size_t(a.ndim()) > extent.size() )
throw py::index_error(
std::string("dimension of array (") +
std::to_string(a.ndim()) +
std::string("D) does not fit dimension of selection "
"in record component (") +
std::to_string(extent.size()) +
std::string("D)")
);
*/
if( s_array < s_load ) {
throw py::index_error(
std::string("size of array (") +
std::to_string(s_array) +
std::string("; shape:") +
str_array_shape +
std::string(") is smaller than size of selection "
"in record component (") +
std::to_string(s_load) +
std::string("; shape:") +
str_extent_shape +
std::string(")")
);
}

auto a = py::array( dtype, shape );
check_buffer_is_contiguous( a );

if( r.getDatatype() == Datatype::CHAR )
r.loadChunk<char>(shareRaw((char*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::UCHAR )
r.loadChunk<unsigned char>(shareRaw((unsigned char*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::SHORT )
r.loadChunk<short>(shareRaw((short*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::INT )
r.loadChunk<int>(shareRaw((int*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::LONG )
r.loadChunk<long>(shareRaw((long*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::LONGLONG )
r.loadChunk<long long>(shareRaw((long long*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::USHORT )
r.loadChunk<unsigned short>(shareRaw((unsigned short*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::UINT )
r.loadChunk<unsigned int>(shareRaw((unsigned int*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::ULONG )
r.loadChunk<unsigned long>(shareRaw((unsigned long*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::ULONGLONG )
r.loadChunk<unsigned long long>(shareRaw((unsigned long long*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::LONG_DOUBLE )
r.loadChunk<long double>(shareRaw((long double*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::DOUBLE )
r.loadChunk<double>(shareRaw((double*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::FLOAT )
r.loadChunk<float>(shareRaw((float*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::CLONG_DOUBLE )
r.loadChunk<std::complex<long double>>(shareRaw((std::complex<long double>*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::CDOUBLE )
r.loadChunk<std::complex<double>>(shareRaw((std::complex<double>*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::CFLOAT )
r.loadChunk<std::complex<float>>(shareRaw((std::complex<float>*) a.mutable_data()), offset, extent);
else if( r.getDatatype() == Datatype::BOOL )
r.loadChunk<bool>(shareRaw((bool*) a.mutable_data()), offset, extent);
else
throw std::runtime_error(std::string("Datatype not known in 'loadChunk'!"));
// here, we increase a reference on the user-passed data so that
// temporary and lost-scope variables stay alive until we flush
// note: as in C++, this does not yet prevent the user from creating
// a race condition by manipulating the data they passed
auto load_data = [ &r, &a, &offset, &extent ]( auto cxxtype ) {
using CXXType = decltype(cxxtype);
a.inc_ref();
void* data = a.mutable_data();
std::shared_ptr< CXXType > shared( ( CXXType * )data,
[ a ]( CXXType * ) { a.dec_ref(); } );
r.loadChunk( std::move( shared ), offset, extent );
};

return a;
if( r.getDatatype() == Datatype::CHAR ) load_data( char() );
else if( r.getDatatype() == Datatype::UCHAR ) load_data( (unsigned char)0 );
else if( r.getDatatype() == Datatype::SHORT ) load_data( short() );
else if( r.getDatatype() == Datatype::INT ) load_data( int() );
else if( r.getDatatype() == Datatype::LONG ) load_data( long() );
else if( r.getDatatype() == Datatype::LONGLONG ) load_data( (long long)0 );
else if( r.getDatatype() == Datatype::USHORT ) load_data( (unsigned short)0 );
else if( r.getDatatype() == Datatype::UINT ) load_data( (unsigned int)0 );
else if( r.getDatatype() == Datatype::ULONG ) load_data( (unsigned long)0 );
else if( r.getDatatype() == Datatype::ULONGLONG ) load_data( (unsigned long long)0 );
else if( r.getDatatype() == Datatype::LONG_DOUBLE ) load_data( (long double)0 );
else if( r.getDatatype() == Datatype::DOUBLE ) load_data( double() );
else if( r.getDatatype() == Datatype::FLOAT ) load_data( float() );
else if( r.getDatatype() == Datatype::CLONG_DOUBLE ) load_data( std::complex<long double>() );
else if( r.getDatatype() == Datatype::CDOUBLE ) load_data( std::complex<double>() );
else if( r.getDatatype() == Datatype::CFLOAT ) load_data( std::complex<float>() );
else if( r.getDatatype() == Datatype::BOOL ) load_data( bool() );
else
throw std::runtime_error(std::string("Datatype not known in 'load_chunk'!"));
}
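
Seen from Python, the keep-alive in load_data means a load into a temporary now survives until the flush. A sketch under assumed names (a Series opened for reading, r_E_x a record component, mirroring the test below):

    # the returned temporary is deliberately not kept
    r_E_x.load_chunk()
    # without the reference held by the deleter above, the target buffer
    # could already be freed here; with it, it stays alive until the read
    series.flush()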

/** Load Chunk
@@ -427,7 +454,25 @@ load_chunk(RecordComponent & r, py::tuple const & slices)
std::vector<bool> flatten;
std::tie(offset, extent, flatten) = parseTupleSlices(ndim, full_extent, slices);

return load_chunk(r, offset, extent, flatten);
// some size-one dimensions might be flattened in our output due to selections by index
size_t const numFlattenDims = std::count(flatten.begin(), flatten.end(), true);
std::vector< ptrdiff_t > shape(extent.size() - numFlattenDims);
auto maskIt = flatten.begin();
std::copy_if(
std::begin(extent),
std::end(extent),
std::begin(shape),
[&maskIt](std::uint64_t){
return !*(maskIt++);
}
);

auto const dtype = dtype_to_numpy( r.getDatatype() );
auto a = py::array( dtype, shape );

load_chunk(r, a, offset, extent);

return a;
}
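
The flatten mask means that selections by index drop the corresponding dimension from the returned array, while slices keep it. A sketch of the resulting shapes, assuming a (200, 3) record component r_E_x as in the test below:

    row = r_E_x[0, :]    # index selection: dimension flattened, shape (3,)
    box = r_E_x[0:2, :]  # slice selection: dimension kept, shape (2, 3)
    full = r_E_x[()]     # full read, shape (200, 3)
    series.flush()       # data is valid only after the flush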

void init_RecordComponent(py::module &m) {
@@ -628,8 +673,13 @@ void init_RecordComponent(py::module &m) {
else
extent = extent_in;

std::vector<bool> flatten(ndim, false);
return load_chunk(r, offset, extent, flatten);
std::vector< ptrdiff_t > shape(extent.size());
std::copy(std::begin(extent), std::end(extent), std::begin(shape));
auto const dtype = dtype_to_numpy( r.getDatatype() );
auto a = py::array( dtype, shape );
load_chunk(r, a, offset, extent);

return a;
},
py::arg_v("offset", Offset(1, 0u), "np.zeros(Record_Component.shape)"),
py::arg_v("extent", Extent(1, -1u), "Record_Component.shape")
25 changes: 21 additions & 4 deletions test/python/unittest/API/APITest.py
@@ -1706,9 +1706,22 @@ def writeFromTemporaryStore(self, E_x):
if found_numpy:
E_x.store_chunk(np.array([[4, 5, 6]], dtype=np.dtype("int")),
[1, 0])

data = np.array([[1, 2, 3]], dtype=np.dtype("int"))
E_x.store_chunk(data)

data2 = np.array([[7, 8, 9]], dtype=np.dtype("int"))
E_x.store_chunk(np.repeat(data2, 198, axis=0),
[2, 0])

def loadToTemporaryStore(self, r_E_x):
# not keeping the return value must not result in a use-after-free:
r_E_x.load_chunk()
@ax3l (Member, Author) commented on Jan 29, 2021:

@franzpoeschel it looks like these tests do not trigger the issue we were concerned about.

Maybe the GC zeroes out the data it frees or we are otherwise lucky... But then we should see this throw: https://github.com/openPMD/openPMD-api/blob/0.13.1/include/openPMD/RecordComponent.hpp#L334-L335

Do you have another Python snippet we could add in the test here that demonstrates the load_chunk issue?

@franzpoeschel (Contributor) commented:

I'd say this is likely a combination of (1) the rather small dataset that we test this on, (2) the fact that py::array initializes its data upon construction and (3) that we don't allocate anything in between. So, maybe we could trigger this by making things go a bit wilder.

@franzpoeschel (Contributor) commented:

https://github.com/franzpoeschel/openPMD-api/tree/topic-pyStoreChunkInMem-increaseBuffersize
This branch triggers the issue on my machine and your implementation fixes it.

@ax3l (Member, Author) commented:

Great hint, thank you - increasing the tested memory triggers the issue reliably :)
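
For reference, a rough sketch of the trigger pattern discussed in this thread (names assumed, dataset size taken from the updated test; the extra allocation is a hypothetical way to encourage reuse of the freed buffer):

    import gc
    import numpy as np

    r_E_x.load_chunk()        # return value deliberately not kept
    gc.collect()              # free the temporary target array
    junk = np.ones((200, 3))  # encourage reuse of the freed memory
    series.flush()            # pre-fix: fills an already-freed buffer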

# we keep a reference to the data until we are done flush()ing
d = r_E_x[()]
del d
return

def writeFromTemporary(self, ext):
name = "../samples/write_from_temporary_python." + ext
write = io.Series(
@@ -1718,7 +1731,7 @@ def writeFromTemporary(self, ext):

DS = io.Dataset
E_x = write.iterations[0].meshes["E"]["x"]
E_x.reset_dataset(DS(np.dtype("int"), [2, 3]))
E_x.reset_dataset(DS(np.dtype("int"), [200, 3]))
self.writeFromTemporaryStore(E_x)
gc.collect()  # trigger removal of temporary data to check it was copied

@@ -1731,16 +1744,20 @@

r_E_x = read.iterations[0].meshes["E"]["x"]
if read.backend == 'ADIOS2':
self.assertEqual(len(r_E_x.available_chunks()), 2)
self.assertEqual(len(r_E_x.available_chunks()), 3)
else:
self.assertEqual(len(r_E_x.available_chunks()), 1)
r_d = r_E_x[()]
self.loadToTemporaryStore(r_E_x)
gc.collect()  # trigger removal of temporary data to check it was copied

read.flush()

if found_numpy:
np.testing.assert_allclose(
r_d,
np.array([[1, 2, 3], [4, 5, 6]], dtype=np.dtype("int"))
r_d[:3, :],
np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]],
dtype=np.dtype("int"))
)

def testWriteFromTemporary(self):