Skip to content

Commit

Permalink
Pass stream to cuco::static_map (#1984)
Browse files Browse the repository at this point in the history
cuCollections added stream support for static_map in NVIDIA/cuCollections#113

This allows us to avoid unnecessary stream/device synchronizations and also enables multi-stream execution without unnecessary serialization.

This PR updates libcugraph code to pass `handle.get_stream()` to `cuco::static_map`.

Authors:
  - Seunghwa Kang (https://github.com/seunghwak)

Approvers:
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Kumar Aatish (https://github.com/kaatish)

URL: #1984
  • Loading branch information
seunghwak authored Jan 19, 2022
1 parent efbff09 commit 20a6d54
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 136 deletions.
4 changes: 2 additions & 2 deletions cpp/cmake/thirdparty/get_cuco.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#=============================================================================
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,7 @@ function(find_and_configure_cuco VERSION)
GLOBAL_TARGETS cuco::cuco
CPM_ARGS
GIT_REPOSITORY https://github.com/NVIDIA/cuCollections.git
GIT_TAG f0eecb203590f1f4ac4a9f1700229f4434ac64dc
GIT_TAG 0ca860b824f5dc22cf8a41f09912e62e11f07d82
OPTIONS "BUILD_TESTS OFF"
"BUILD_BENCHMARKS OFF"
"BUILD_EXAMPLES OFF"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,14 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
// 1. build a cuco::static_map object for the k, v pairs.

auto poly_alloc = rmm::mr::polymorphic_allocator<char>(rmm::mr::get_current_device_resource());
auto stream_adapter = rmm::mr::make_stream_allocator_adaptor(poly_alloc, cudaStream_t{nullptr});
auto stream_adapter = rmm::mr::make_stream_allocator_adaptor(poly_alloc, handle.get_stream());
auto kv_map_ptr = std::make_unique<
cuco::static_map<vertex_t, value_t, cuda::thread_scope_device, decltype(stream_adapter)>>(
size_t{0},
invalid_vertex_id<vertex_t>::value,
invalid_vertex_id<vertex_t>::value,
stream_adapter);
stream_adapter,
handle.get_stream());
if (GraphViewType::is_multi_gpu) {
auto& comm = handle.get_comms();
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
Expand Down Expand Up @@ -274,8 +275,6 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
handle.get_stream());
}

handle.sync_stream(); // cuco::static_map currently does not take stream

kv_map_ptr.reset();

kv_map_ptr = std::make_unique<
Expand All @@ -286,14 +285,17 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
static_cast<size_t>(thrust::distance(map_unique_key_first, map_unique_key_last)) + 1),
invalid_vertex_id<vertex_t>::value,
invalid_vertex_id<vertex_t>::value,
stream_adapter);
stream_adapter,
handle.get_stream());

auto pair_first = thrust::make_zip_iterator(
thrust::make_tuple(map_keys.begin(), get_dataframe_buffer_begin(map_value_buffer)));
kv_map_ptr->insert(pair_first, pair_first + map_keys.size());
kv_map_ptr->insert(pair_first,
pair_first + map_keys.size(),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
handle.get_stream());
} else {
handle.sync_stream(); // cuco::static_map currently does not take stream

kv_map_ptr.reset();

kv_map_ptr = std::make_unique<
Expand All @@ -306,12 +308,16 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
static_cast<size_t>(thrust::distance(map_unique_key_first, map_unique_key_last)) + 1),
invalid_vertex_id<vertex_t>::value,
invalid_vertex_id<vertex_t>::value,
stream_adapter);
stream_adapter,
handle.get_stream());

auto pair_first =
thrust::make_zip_iterator(thrust::make_tuple(map_unique_key_first, map_value_first));
kv_map_ptr->insert(pair_first,
pair_first + thrust::distance(map_unique_key_first, map_unique_key_last));
pair_first + thrust::distance(map_unique_key_first, map_unique_key_last),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
handle.get_stream());
}

// 2. aggregate each vertex out-going edges based on keys and transform-reduce.
Expand Down
77 changes: 53 additions & 24 deletions cpp/include/cugraph/utilities/collect_comm.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -75,10 +75,15 @@ collect_values_for_keys(raft::comms::comms_t const& comm,
static_cast<size_t>(thrust::distance(map_key_first, map_key_last)) + 1),
invalid_vertex_id<vertex_t>::value,
invalid_vertex_id<vertex_t>::value,
stream_adapter);
stream_adapter,
stream_view);
{
auto pair_first = thrust::make_zip_iterator(thrust::make_tuple(map_key_first, map_value_first));
kv_map_ptr->insert(pair_first, pair_first + thrust::distance(map_key_first, map_key_last));
kv_map_ptr->insert(pair_first,
pair_first + thrust::distance(map_key_first, map_key_last),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
stream_view);
}

// 2. collect values for the unique keys in [collect_key_first, collect_key_last)
Expand Down Expand Up @@ -107,10 +112,12 @@ collect_values_for_keys(raft::comms::comms_t const& comm,

rmm::device_uvector<value_t> values_for_rx_unique_keys(rx_unique_keys.size(), stream_view);

stream_view.synchronize(); // cuco::static_map currently does not take stream

kv_map_ptr->find(
rx_unique_keys.begin(), rx_unique_keys.end(), values_for_rx_unique_keys.begin());
kv_map_ptr->find(rx_unique_keys.begin(),
rx_unique_keys.end(),
values_for_rx_unique_keys.begin(),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
stream_view);

rmm::device_uvector<value_t> rx_values_for_unique_keys(0, stream_view);
std::tie(rx_values_for_unique_keys, std::ignore) =
Expand All @@ -122,8 +129,6 @@ collect_values_for_keys(raft::comms::comms_t const& comm,
// 3. re-build a cuco::static_map object for the k, v pairs in unique_keys,
// values_for_unique_keys.

stream_view.synchronize(); // cuco::static_map currently does not take stream

kv_map_ptr.reset();

kv_map_ptr = std::make_unique<
Expand All @@ -133,18 +138,28 @@ collect_values_for_keys(raft::comms::comms_t const& comm,
unique_keys.size() + 1),
invalid_vertex_id<vertex_t>::value,
invalid_vertex_id<vertex_t>::value,
stream_adapter);
stream_adapter,
stream_view);
{
auto pair_first = thrust::make_zip_iterator(
thrust::make_tuple(unique_keys.begin(), values_for_unique_keys.begin()));
kv_map_ptr->insert(pair_first, pair_first + unique_keys.size());
kv_map_ptr->insert(pair_first,
pair_first + unique_keys.size(),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
stream_view);
}

// 4. find values for [collect_key_first, collect_key_last)

auto value_buffer = allocate_dataframe_buffer<value_t>(
thrust::distance(collect_key_first, collect_key_last), stream_view);
kv_map_ptr->find(collect_key_first, collect_key_last, get_dataframe_buffer_begin(value_buffer));
kv_map_ptr->find(collect_key_first,
collect_key_last,
get_dataframe_buffer_begin(value_buffer),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
stream_view);

return value_buffer;
}
Expand Down Expand Up @@ -189,10 +204,15 @@ collect_values_for_unique_keys(raft::comms::comms_t const& comm,
static_cast<size_t>(thrust::distance(map_key_first, map_key_last)) + 1),
invalid_vertex_id<vertex_t>::value,
invalid_vertex_id<vertex_t>::value,
stream_adapter);
stream_adapter,
stream_view);
{
auto pair_first = thrust::make_zip_iterator(thrust::make_tuple(map_key_first, map_value_first));
kv_map_ptr->insert(pair_first, pair_first + thrust::distance(map_key_first, map_key_last));
kv_map_ptr->insert(pair_first,
pair_first + thrust::distance(map_key_first, map_key_last),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
stream_view);
}

// 2. collect values for the unique keys in [collect_unique_key_first, collect_unique_key_last)
Expand All @@ -217,10 +237,12 @@ collect_values_for_unique_keys(raft::comms::comms_t const& comm,

rmm::device_uvector<value_t> values_for_rx_unique_keys(rx_unique_keys.size(), stream_view);

stream_view.synchronize(); // cuco::static_map currently does not take stream

kv_map_ptr->find(
rx_unique_keys.begin(), rx_unique_keys.end(), values_for_rx_unique_keys.begin());
kv_map_ptr->find(rx_unique_keys.begin(),
rx_unique_keys.end(),
values_for_rx_unique_keys.begin(),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
stream_view);

rmm::device_uvector<value_t> rx_values_for_unique_keys(0, stream_view);
std::tie(rx_values_for_unique_keys, std::ignore) =
Expand All @@ -232,8 +254,6 @@ collect_values_for_unique_keys(raft::comms::comms_t const& comm,
// 3. re-build a cuco::static_map object for the k, v pairs in unique_keys,
// values_for_unique_keys.

stream_view.synchronize(); // cuco::static_map currently does not take stream

kv_map_ptr.reset();

kv_map_ptr = std::make_unique<
Expand All @@ -243,19 +263,28 @@ collect_values_for_unique_keys(raft::comms::comms_t const& comm,
unique_keys.size() + 1),
invalid_vertex_id<vertex_t>::value,
invalid_vertex_id<vertex_t>::value,
stream_adapter);
stream_adapter,
stream_view);
{
auto pair_first = thrust::make_zip_iterator(
thrust::make_tuple(unique_keys.begin(), values_for_unique_keys.begin()));
kv_map_ptr->insert(pair_first, pair_first + unique_keys.size());
kv_map_ptr->insert(pair_first,
pair_first + unique_keys.size(),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
stream_view);
}

// 4. find values for [collect_unique_key_first, collect_unique_key_last)

auto value_buffer = allocate_dataframe_buffer<value_t>(
thrust::distance(collect_unique_key_first, collect_unique_key_last), stream_view);
kv_map_ptr->find(
collect_unique_key_first, collect_unique_key_last, get_dataframe_buffer_begin(value_buffer));
kv_map_ptr->find(collect_unique_key_first,
collect_unique_key_last,
get_dataframe_buffer_begin(value_buffer),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
stream_view);

return value_buffer;
}
Expand Down
82 changes: 50 additions & 32 deletions cpp/src/structure/relabel_impl.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -105,24 +105,26 @@ void relabel(raft::handle_t const& handle,

// update intermediate relabel map

handle.sync_stream(); // cuco::static_map currently does not take stream

auto poly_alloc =
rmm::mr::polymorphic_allocator<char>(rmm::mr::get_current_device_resource());
auto stream_adapter =
rmm::mr::make_stream_allocator_adaptor(poly_alloc, cudaStream_t{nullptr});
auto stream_adapter = rmm::mr::make_stream_allocator_adaptor(poly_alloc, handle.get_stream());
cuco::static_map<vertex_t, vertex_t, cuda::thread_scope_device, decltype(stream_adapter)>
relabel_map{// cuco::static_map requires at least one empty slot
std::max(static_cast<size_t>(
static_cast<double>(rx_label_pair_old_labels.size()) / load_factor),
rx_label_pair_old_labels.size() + 1),
invalid_vertex_id<vertex_t>::value,
invalid_vertex_id<vertex_t>::value,
stream_adapter};
stream_adapter,
handle.get_stream()};

auto pair_first = thrust::make_zip_iterator(
thrust::make_tuple(rx_label_pair_old_labels.begin(), rx_label_pair_new_labels.begin()));
relabel_map.insert(pair_first, pair_first + rx_label_pair_old_labels.size());
relabel_map.insert(pair_first,
pair_first + rx_label_pair_old_labels.size(),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
handle.get_stream());

rx_label_pair_old_labels.resize(0, handle.get_stream());
rx_label_pair_new_labels.resize(0, handle.get_stream());
Expand All @@ -141,8 +143,6 @@ void relabel(raft::handle_t const& handle,
[key_func] __device__(auto val) { return key_func(val); },
handle.get_stream());

handle.sync_stream(); // cuco::static_map currently does not take stream

if (skip_missing_labels) {
thrust::transform(handle.get_thrust_policy(),
rx_unique_old_labels.begin(),
Expand All @@ -155,55 +155,68 @@ void relabel(raft::handle_t const& handle,
: old_label;
});
} else {
relabel_map.find(
rx_unique_old_labels.begin(),
rx_unique_old_labels.end(),
rx_unique_old_labels.begin()); // now rx_unique_old_lables hold new labels for the
// corresponding old labels
relabel_map.find(rx_unique_old_labels.begin(),
rx_unique_old_labels.end(),
rx_unique_old_labels.begin(),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
handle.get_stream()); // now rx_unique_old_lables holds new labels for
// the corresponding old labels
}

std::tie(new_labels_for_unique_old_labels, std::ignore) = shuffle_values(
handle.get_comms(), rx_unique_old_labels.begin(), rx_value_counts, handle.get_stream());
}
}

handle.sync_stream(); // cuco::static_map currently does not take stream

{
auto poly_alloc =
rmm::mr::polymorphic_allocator<char>(rmm::mr::get_current_device_resource());
auto stream_adapter =
rmm::mr::make_stream_allocator_adaptor(poly_alloc, cudaStream_t{nullptr});
auto stream_adapter = rmm::mr::make_stream_allocator_adaptor(poly_alloc, handle.get_stream());
cuco::static_map<vertex_t, vertex_t, cuda::thread_scope_device, decltype(stream_adapter)>
relabel_map{
// cuco::static_map requires at least one empty slot
std::max(static_cast<size_t>(static_cast<double>(unique_old_labels.size()) / load_factor),
unique_old_labels.size() + 1),
invalid_vertex_id<vertex_t>::value,
invalid_vertex_id<vertex_t>::value,
stream_adapter};
stream_adapter,
handle.get_stream()};

auto pair_first = thrust::make_zip_iterator(
thrust::make_tuple(unique_old_labels.begin(), new_labels_for_unique_old_labels.begin()));
relabel_map.insert(pair_first, pair_first + unique_old_labels.size());
relabel_map.find(labels, labels + num_labels, labels);
relabel_map.insert(pair_first,
pair_first + unique_old_labels.size(),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
handle.get_stream());
relabel_map.find(labels,
labels + num_labels,
labels,
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
handle.get_stream());
}
} else {
handle.sync_stream(); // cuco::static_map currently does not take stream

auto poly_alloc = rmm::mr::polymorphic_allocator<char>(rmm::mr::get_current_device_resource());
auto stream_adapter = rmm::mr::make_stream_allocator_adaptor(poly_alloc, cudaStream_t{nullptr});
auto stream_adapter = rmm::mr::make_stream_allocator_adaptor(poly_alloc, handle.get_stream());
cuco::static_map<vertex_t, vertex_t, cuda::thread_scope_device, decltype(stream_adapter)>
relabel_map{// cuco::static_map requires at least one empty slot
std::max(static_cast<size_t>(static_cast<double>(num_label_pairs) / load_factor),
static_cast<size_t>(num_label_pairs) + 1),
invalid_vertex_id<vertex_t>::value,
invalid_vertex_id<vertex_t>::value,
stream_adapter};
relabel_map(
// cuco::static_map requires at least one empty slot
std::max(static_cast<size_t>(static_cast<double>(num_label_pairs) / load_factor),
static_cast<size_t>(num_label_pairs) + 1),
invalid_vertex_id<vertex_t>::value,
invalid_vertex_id<vertex_t>::value,
stream_adapter,
handle.get_stream());

auto pair_first = thrust::make_zip_iterator(
thrust::make_tuple(std::get<0>(old_new_label_pairs), std::get<1>(old_new_label_pairs)));
relabel_map.insert(pair_first, pair_first + num_label_pairs);
relabel_map.insert(pair_first,
pair_first + num_label_pairs,
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
handle.get_stream());
if (skip_missing_labels) {
thrust::transform(handle.get_thrust_policy(),
labels,
Expand All @@ -216,7 +229,12 @@ void relabel(raft::handle_t const& handle,
: old_label;
});
} else {
relabel_map.find(labels, labels + num_labels, labels);
relabel_map.find(labels,
labels + num_labels,
labels,
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
handle.get_stream());
}
}

Expand Down
Loading

0 comments on commit 20a6d54

Please sign in to comment.