Skip to content

Commit

Permalink
Explicit streams in device_buffer (#775)
Browse files Browse the repository at this point in the history
Removes default stream arguments from `rmm::device_buffer`. Now copy construction requires a stream argument (default copy ctor deleted), and copy assignment is disallowed (operator deleted). Move construction and assignment are still supported, and move assignment still uses the most recently used stream for deallocating any previous data.

Also improves device_buffer tests (implements TODOs in code).

I don't think this should be merged until RAPIDS-dependent libraries are ready for it. I have a libcudf PR in progress for this.

Fixes #418

- [x] cuDF PR: rapidsai/cudf#8280
- [x] cuGraph PR: rapidsai/cugraph#1609
- [x] cuSpatial PR: rapidsai/cuspatial#403
- [x] cuML does not yet use device_buffer

Authors:
  - Mark Harris (https://github.com/harrism)

Approvers:
  - Rong Ou (https://github.com/rongou)
  - Keith Kraus (https://github.com/kkraus14)
  - Conor Hoekstra (https://github.com/codereport)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #775
  • Loading branch information
harrism authored Jun 2, 2021
1 parent 783ed6d commit 7cbcd97
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 204 deletions.
103 changes: 35 additions & 68 deletions include/rmm/device_buffer.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,8 +50,12 @@ namespace rmm {
* cuda_stream_view stream = cuda_stream_view{};
* device_buffer custom_buff(100, stream, &mr);
*
* // deep copies `buff` into a new device buffer using the default stream
* device_buffer buff_copy(buff);
* // deep copies `buff` into a new device buffer using the specified stream
* device_buffer buff_copy(buff, stream);
*
* // moves the memory in `from_buff` to `to_buff`. Deallocates previously allocated
* // to_buff memory on `to_buff.stream()`.
* device_buffer to_buff(std::move(from_buff));
*
* // deep copies `buff` into a new device buffer using the specified stream
* device_buffer buff_copy(buff, stream);
Expand All @@ -62,16 +66,19 @@ namespace rmm {
* // Default construction. Buffer is empty
* device_buffer buff_default{};
*
* // If the requested size is larger than the current size, resizes allocation
* // to the new size and deep copies any previous contents. Otherwise, simply
* // updates the value of `size()` to the newly requested size without any
* // allocations or copies. Uses the optionally specified stream or the default
* // stream if none specified.
* // If the requested size is larger than the current size, resizes allocation to the new size and
* // deep copies any previous contents. Otherwise, simply updates the value of `size()` to the
* // newly requested size without any allocations or copies. Uses the specified stream.
* buff_default.resize(100, stream);
*```
*/
class device_buffer {
public:
// The copy constructor and copy assignment operator without a stream are deleted because they
// provide no way to specify an explicit stream
device_buffer(device_buffer const& other) = delete;
device_buffer& operator=(device_buffer const& other) = delete;

/**
* @brief Default constructor creates an empty `device_buffer`
*/
Expand All @@ -95,11 +102,11 @@ class device_buffer {
* @param mr Memory resource to use for the device memory allocation.
*/
explicit device_buffer(std::size_t size,
cuda_stream_view stream = cuda_stream_view{},
cuda_stream_view stream,
mr::device_memory_resource* mr = mr::get_current_device_resource())
: _stream{stream}, _mr{mr}
{
allocate(size);
allocate_async(size);
}

/**
Expand All @@ -123,12 +130,12 @@ class device_buffer {
*/
device_buffer(void const* source_data,
std::size_t size,
cuda_stream_view stream = cuda_stream_view{},
cuda_stream_view stream,
mr::device_memory_resource* mr = mr::get_current_device_resource())
: _stream{stream}, _mr{mr}
{
allocate(size);
copy(source_data, size);
allocate_async(size);
copy_async(source_data, size);
}

/**
Expand All @@ -153,7 +160,7 @@ class device_buffer {
* @param mr The resource to use for allocating the new `device_buffer`
*/
device_buffer(device_buffer const& other,
cuda_stream_view stream = cuda_stream_view{},
cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
: device_buffer{other.data(), other.size(), stream, mr}
{
Expand Down Expand Up @@ -185,47 +192,6 @@ class device_buffer {
other.set_stream(cuda_stream_view{});
}

/**
* @brief Copies the contents of `other` into this `device_buffer`.
*
* All operations on the data in this `device_buffer` on all streams must be
* complete before using this operator, otherwise behavior is undefined.
*
* If the existing capacity is large enough, and the memory resources are
* compatible, then this `device_buffer`'s existing memory will be reused and
 * `other`'s contents will simply be copied on `other.stream()`. I.e., if
 * `capacity() > other.size()` and
* `memory_resource()->is_equal(*other.memory_resource())`.
*
* Otherwise, the existing memory will be deallocated using
* `memory_resource()` on `stream()` and new memory will be allocated using
* `other.memory_resource()` on `other.stream()`.
*
* @throws rmm::bad_alloc if allocation fails
* @throws rmm::cuda_error if the copy from `other` fails
*
* @param other The `device_buffer` to copy.
*/
device_buffer& operator=(device_buffer const& other)
{
if (&other != this) {
// If the current capacity is large enough and the resources are
// compatible, just reuse the existing memory
if ((capacity() > other.size()) and _mr->is_equal(*other._mr)) {
resize(other.size(), other.stream());
copy(other.data(), other.size());
} else {
// Otherwise, need to deallocate and allocate new memory
deallocate();
set_stream(other.stream());
_mr = other._mr;
allocate(other.size());
copy(other.data(), other.size());
}
}
return *this;
}

/**
* @brief Move assignment operator moves the contents from `other`.
*
Expand All @@ -241,7 +207,7 @@ class device_buffer {
device_buffer& operator=(device_buffer&& other) noexcept
{
if (&other != this) {
deallocate();
deallocate_async();

_data = other._data;
_size = other._size;
Expand All @@ -266,7 +232,7 @@ class device_buffer {
*/
~device_buffer() noexcept
{
deallocate();
deallocate_async();
_mr = nullptr;
_stream = cuda_stream_view{};
}
Expand Down Expand Up @@ -296,7 +262,7 @@ class device_buffer {
* @param new_size The requested new size, in bytes
* @param stream The stream to use for allocation and copy
*/
void resize(std::size_t new_size, cuda_stream_view stream = cuda_stream_view{})
void resize(std::size_t new_size, cuda_stream_view stream)
{
set_stream(stream);
// If the requested size is smaller than the current capacity, just update
Expand All @@ -307,7 +273,7 @@ class device_buffer {
void* const new_data = _mr->allocate(new_size, this->stream());
RMM_CUDA_TRY(
cudaMemcpyAsync(new_data, data(), size(), cudaMemcpyDefault, this->stream().value()));
deallocate();
deallocate_async();
_data = new_data;
_size = new_size;
_capacity = new_size;
Expand All @@ -327,7 +293,7 @@ class device_buffer {
*
* @param stream The stream on which the allocation and copy are performed
*/
void shrink_to_fit(cuda_stream_view stream = cuda_stream_view{})
void shrink_to_fit(cuda_stream_view stream)
{
set_stream(stream);
if (size() != capacity()) {
Expand Down Expand Up @@ -404,19 +370,19 @@ class device_buffer {
///< allocate/deallocate device memory

/**
* @brief Allocates the specified amount of memory and updates the
* size/capacity accordingly.
* @brief Allocates the specified amount of memory and updates the size/capacity accordingly.
*
* Allocates on `stream()` using the memory resource passed to the constructor.
*
* If `bytes == 0`, sets `_data = nullptr`.
*
* @param bytes The amount of memory to allocate
* @param stream The stream on which to allocate
*/
void allocate(std::size_t bytes)
void allocate_async(std::size_t bytes)
{
_size = bytes;
_capacity = bytes;
_data = (bytes > 0) ? _mr->allocate(bytes, stream()) : nullptr;
_data = (bytes > 0) ? memory_resource()->allocate(bytes, stream()) : nullptr;
}

/**
Expand All @@ -426,10 +392,11 @@ class device_buffer {
* If the buffer doesn't hold any memory, i.e., `capacity() == 0`, doesn't
* call the resource deallocation.
*
* Deallocates on `stream()` using the memory resource passed to the constructor.
*/
void deallocate() noexcept
void deallocate_async() noexcept
{
if (capacity() > 0) { _mr->deallocate(data(), capacity(), stream()); }
if (capacity() > 0) { memory_resource()->deallocate(data(), capacity(), stream()); }
_size = 0;
_capacity = 0;
_data = nullptr;
Expand All @@ -447,7 +414,7 @@ class device_buffer {
* @param source The pointer to copy from
* @param bytes The number of bytes to copy
*/
void copy(void const* source, std::size_t bytes)
void copy_async(void const* source, std::size_t bytes)
{
if (bytes > 0) {
RMM_EXPECTS(nullptr != source, "Invalid copy from nullptr.");
Expand Down
4 changes: 2 additions & 2 deletions include/rmm/device_scalar.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -314,7 +314,7 @@ class device_scalar {
device_scalar &operator=(device_scalar &&) = delete;

private:
rmm::device_buffer buffer{sizeof(T)};
rmm::device_buffer buffer{sizeof(T), cuda_stream_default};

inline void _memcpy(void *dst, const void *src, cuda_stream_view stream) const
{
Expand Down
9 changes: 3 additions & 6 deletions python/rmm/_lib/device_buffer.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@ from rmm._lib.memory_resource cimport DeviceMemoryResource
cdef extern from "rmm/device_buffer.hpp" namespace "rmm" nogil:
cdef cppclass device_buffer:
device_buffer()
device_buffer(size_t size) except +
device_buffer(size_t size, cuda_stream_view stream) except +
device_buffer(const void* source_data, size_t size) except +
device_buffer(const void* source_data,
size_t size, cuda_stream_view stream) except +
device_buffer(const device_buffer& other) except +
void resize(size_t new_size) except +
void shrink_to_fit() except +
void resize(size_t new_size, cuda_stream_view stream) except +
void shrink_to_fit(cuda_stream_view stream) except +
void* data()
size_t size()
size_t capacity()
Expand Down Expand Up @@ -60,7 +57,7 @@ cdef class DeviceBuffer:
cpdef bytes tobytes(self, Stream stream=*)

cdef size_t c_size(self) except *
cpdef void resize(self, size_t new_size) except *
cpdef void resize(self, size_t new_size, Stream stream=*) except *
cpdef size_t capacity(self) except *
cdef void* c_data(self) except *

Expand Down
6 changes: 4 additions & 2 deletions python/rmm/_lib/device_buffer.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,10 @@ cdef class DeviceBuffer:
cdef size_t c_size(self) except *:
return self.c_obj.get()[0].size()

cpdef void resize(self, size_t new_size) except *:
self.c_obj.get()[0].resize(new_size)
cpdef void resize(self,
size_t new_size,
Stream stream=DEFAULT_STREAM) except *:
self.c_obj.get()[0].resize(new_size, stream.view())

cpdef size_t capacity(self) except *:
return self.c_obj.get()[0].capacity()
Expand Down
5 changes: 4 additions & 1 deletion python/rmm/_lib/tests/test_device_buffer.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ import numpy as np
from libcpp.memory cimport make_unique
from libcpp.utility cimport move

from rmm._lib.cuda_stream_view cimport cuda_stream_default
from rmm._lib.device_buffer cimport DeviceBuffer, device_buffer


def test_release():
expect = DeviceBuffer.to_device(b'abc')
cdef DeviceBuffer buf = DeviceBuffer.to_device(b'abc')

got = DeviceBuffer.c_from_unique_ptr(
make_unique[device_buffer](buf.c_release())
make_unique[device_buffer](buf.c_release(),
cuda_stream_default.value())
)
np.testing.assert_equal(expect.copy_to_host(), got.copy_to_host())

Expand Down
4 changes: 2 additions & 2 deletions tests/cuda_stream_tests.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,7 +38,7 @@ TEST_F(CudaStreamTest, Equality)
EXPECT_NE(view_a, rmm::cuda_stream());
EXPECT_NE(stream_a, rmm::cuda_stream());

rmm::device_buffer buff(0);
rmm::device_buffer buff{};
EXPECT_EQ(buff.stream(), view_default);
}

Expand Down
Loading

0 comments on commit 7cbcd97

Please sign in to comment.