[BUG] Critical: Force cudf.concat when passing in a cudf Series to MG Uniform Neighbor Sample #3416

Merged · 12 commits · Apr 5, 2023
48 changes: 33 additions & 15 deletions python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
@@ -272,10 +272,10 @@ def uniform_neighbor_sample(
cuGraph graph, which contains connectivity information as dask cudf
edge list dataframe

start_list : list or cudf.Series (int32)
start_list : int, list, cudf.Series, or dask_cudf.Series (int32 or int64)
a list of starting vertices for sampling

fanout_vals : list (int32)
fanout_vals : list
List of branching out (fan-out) degrees per starting vertex for each
hop level.

@@ -286,15 +286,15 @@
Flag to specify whether to return edge properties (weight, edge id,
edge type, batch id, hop id) with the sampled edges.

batch_id_list: list (int32), optional (default=None)
batch_id_list: cudf.Series or dask_cudf.Series (int32), optional (default=None)
List of batch ids that will be returned with the sampled edges if
with_edge_properties is set to True.

label_list: list (int32), optional (default=None)
label_list: cudf.Series or dask_cudf.Series (int32), optional (default=None)
List of unique batch id labels. Used along with
label_to_output_comm_rank to assign batch ids to GPUs.

label_to_out_comm_rank (int32), optional (default=None)
label_to_out_comm_rank: cudf.Series or dask_cudf.Series (int32), optional (default=None)
List of output GPUs (by rank) corresponding to batch
id labels in the label list. Used to assign each batch
id to a GPU.
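For context, here is a minimal sketch of the call shape these widened parameter types enable, mirroring the new test added further down. G is assumed to be an existing MG (dask_cudf-backed) cugraph Graph that contains the vertex ids shown; the data values are illustrative only.

import cudf
import cugraph.dask

# assumed: G is an MG cugraph.Graph built via from_dask_cudf_edgelist
start_list = cudf.Series([0, 1, 2], dtype="int32")     # plain cudf.Series now accepted
batch_id_list = cudf.Series([0, 0, 1], dtype="int32")  # one batch id per start vertex

sampling_results = cugraph.dask.uniform_neighbor_sample(
    G,
    start_list=start_list,
    batch_id_list=batch_id_list,
    fanout_vals=[5, 5],
    with_replacement=False,
    with_edge_properties=True,
)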
@@ -396,23 +396,41 @@ def uniform_neighbor_sample(
else:
indices_t = numpy.int32

if input_graph.renumbered:
start_list = input_graph.lookup_internal_vertex_id(start_list)

start_list = start_list.rename(start_col_name).to_frame()
start_list = start_list.rename(start_col_name)
if batch_id_list is not None:
ddf = start_list.join(batch_id_list.rename(batch_col_name))
batch_id_list = batch_id_list.rename(batch_col_name)
if hasattr(start_list, "compute"):
# mg input
start_list = start_list.to_frame()
batch_id_list = batch_id_list.to_frame()
ddf = start_list.merge(
batch_id_list,
how="left",
left_index=True,
right_index=True,
)
else:
# sg input
ddf = cudf.concat(
[
start_list,
batch_id_list,
],
axis=1,
)
else:
ddf = start_list
ddf = start_list.to_frame()
Comment on lines +405 to +423
@VibhuJawa (Member) · Apr 4, 2023

Do we really care about the index here? I think not. Does the following work?

start_list = start_list.reset_index(drop=True)
batch_id_list = batch_id_list.reset_index(drop=True)

if isinstance(start_list, dask_cudf.Series):
    ddf = dd.concat([start_list, batch_id_list], ignore_unknown_divisions=True, axis=1)
else:
    ddf = cudf.concat([start_list, batch_id_list], axis=1, ignore_index=True)

Member Author:

If we reset the index, can we still join the batch ids and the start list correctly?

Member Author:

Also, I ran into an issue with dask_cudf.concat where the name of the Series was dropped in one of my first attempts at a solution; dask_cudf.merge doesn't have that problem.

@VibhuJawa (Member) · Apr 4, 2023

I think we should be able to. From the logic you shared, we are merging on the index (left_index=True, right_index=True) in dask, which does the same thing but is less efficient.

Edit: I also added ignore_index=True to make it more explicit in cuDF.

Member Author:

OK, let me try this.

Member Author:

@VibhuJawa I just confirmed this is not an issue with dask-cudf; it's an issue with our get_distributed_data function. I will open an issue against cugraph instead.

Member Author:

I'm not sure why calling merge instead of concat before get_distributed_data works, but for some reason the bug completely disappears with merge.

Contributor:

I can take a look too.

Member:

Thanks for creating an issue.

Member Author:

I should link it here, sorry: #3420
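
To make the sg/mg branches in the diff above concrete, here is a standalone sketch of the two code paths. The column names and data are illustrative stand-ins (the real code uses start_col_name and batch_col_name); it only shows why the mg path goes through to_frame plus an index-aligned merge, which preserves column names, while the sg path can simply concat the named Series column-wise.

import cudf
import dask_cudf

start_list = cudf.Series([4, 5, 6], name="starts")
batch_id_list = cudf.Series([0, 1, 1], name="batch", dtype="int32")

# sg input: column-wise concat of the two named Series into one DataFrame
ddf_sg = cudf.concat([start_list, batch_id_list], axis=1)

# mg input: convert each Series to a single-column frame and merge on the index
start_ddf = dask_cudf.from_cudf(start_list.to_frame(), npartitions=2)
batch_ddf = dask_cudf.from_cudf(batch_id_list.to_frame(), npartitions=2)
ddf_mg = start_ddf.merge(batch_ddf, how="left", left_index=True, right_index=True)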


if isinstance(ddf, cudf.DataFrame):
splits = cp.array_split(cp.arange(len(ddf)), len(Comms.get_workers()))
ddf = {w: [ddf.iloc[splits[i]]] for i, w in enumerate(Comms.get_workers())}
if input_graph.renumbered:
ddf = input_graph.lookup_internal_vertex_id(ddf, column_name=start_col_name)

else:
if hasattr(ddf, "compute"):
ddf = get_distributed_data(ddf)
wait(ddf)
ddf = ddf.worker_to_parts
else:
splits = cp.array_split(cp.arange(len(ddf)), len(Comms.get_workers()))
ddf = {w: [ddf.iloc[splits[i]]] for i, w in enumerate(Comms.get_workers())}

client = get_client()
session_id = Comms.get_session_id()
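For reference, the sg branch above fans the combined DataFrame out by splitting its row indices evenly across the workers returned by Comms.get_workers(). A small illustration of just that splitting step, with a stand-in worker list and row count:

import cupy as cp

workers = ["worker-0", "worker-1", "worker-2"]  # stand-in for Comms.get_workers()
num_rows = 8                                    # stand-in for len(ddf)
splits = cp.array_split(cp.arange(num_rows), len(workers))
row_map = {w: splits[i] for i, w in enumerate(workers)}
# row_map["worker-0"] -> [0 1 2], "worker-1" -> [3 4 5], "worker-2" -> [6 7]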
@@ -15,6 +15,7 @@
import os

import pytest
import cupy
import cudf
import dask_cudf
from pylibcugraph.testing.utils import gen_fixture_params_product
@@ -422,7 +423,7 @@ def test_uniform_neighbor_sample_edge_properties(dask_client, return_offsets):


@pytest.mark.mg
def test_uniform_neighbor_sample_edge_properties_self_loops():
def test_uniform_neighbor_sample_edge_properties_self_loops(dask_client):
df = dask_cudf.from_cudf(
cudf.DataFrame(
{
@@ -484,7 +485,9 @@ def test_uniform_neighbor_sample_edge_properties_self_loops():
@pytest.mark.skipif(
int(os.getenv("DASK_NUM_WORKERS", 2)) < 2, reason="too few workers to test"
)
def test_uniform_neighbor_edge_properties_sample_small_start_list(with_replacement):
def test_uniform_neighbor_edge_properties_sample_small_start_list(
dask_client, with_replacement
):
df = dask_cudf.from_cudf(
cudf.DataFrame(
{
@@ -518,7 +521,7 @@ def test_uniform_neighbor_edge_properties_sample_small_start_list(with_replaceme


@pytest.mark.mg
def test_uniform_neighbor_sample_without_dask_inputs():
def test_uniform_neighbor_sample_without_dask_inputs(dask_client):
df = dask_cudf.from_cudf(
cudf.DataFrame(
{
@@ -573,6 +576,65 @@ def test_uniform_neighbor_sample_without_dask_inputs():
assert sorted(sampling_results.hop_id.values_host.tolist()) == [0, 0, 0, 1, 1, 1]


@pytest.mark.mg
@pytest.mark.parametrize("dataset", datasets)
@pytest.mark.parametrize("input_df", [cudf.DataFrame, dask_cudf.DataFrame])
@pytest.mark.parametrize("max_batches", [2, 8, 16, 32])
def test_uniform_neighbor_sample_batched(dask_client, dataset, input_df, max_batches):
num_workers = len(dask_client.scheduler_info()["workers"])

df = dataset.get_edgelist()
df["eid"] = cupy.arange(len(df), dtype=df["src"].dtype)
df["etp"] = cupy.zeros_like(df["eid"].to_cupy())
ddf = dask_cudf.from_cudf(df, npartitions=num_workers)

G = cugraph.Graph(directed=True)
G.from_dask_cudf_edgelist(
ddf,
source="src",
destination="dst",
edge_attr=["wgt", "eid", "etp"],
legacy_renum_only=True,
)

input_vertices = dask_cudf.concat([df.src, df.dst]).unique().compute()
assert isinstance(input_vertices, cudf.Series)

input_vertices.index = cupy.random.permutation(len(input_vertices))

input_batch = cudf.Series(
cupy.random.randint(0, max_batches, len(input_vertices)), dtype="int32"
)
input_batch.index = cupy.random.permutation(len(input_vertices))

if input_df == dask_cudf.DataFrame:
input_batch = dask_cudf.from_cudf(input_batch, npartitions=num_workers)
input_vertices = dask_cudf.from_cudf(input_vertices, npartitions=num_workers)

sampling_results = cugraph.dask.uniform_neighbor_sample(
G,
start_list=input_vertices,
batch_id_list=input_batch,
fanout_vals=[5, 5],
with_replacement=False,
with_edge_properties=True,
)

for batch_id in range(max_batches):
output_starts_per_batch = (
sampling_results[
(sampling_results.batch_id == batch_id) & (sampling_results.hop_id == 0)
]
.sources.nunique()
.compute()
)

input_starts_per_batch = len(input_batch[input_batch == batch_id])

# Should be <= to account for starts without outgoing edges
assert output_starts_per_batch <= input_starts_per_batch


# =============================================================================
# Benchmarks
# =============================================================================
@@ -581,7 +643,7 @@ def test_uniform_neighbor_sample_without_dask_inputs():
@pytest.mark.mg
@pytest.mark.slow
@pytest.mark.parametrize("n_samples", [1_000, 5_000, 10_000])
def bench_uniform_neigbour_sample_email_eu_core(gpubenchmark, dask_client, n_samples):
def bench_uniform_neighbor_sample_email_eu_core(gpubenchmark, dask_client, n_samples):
input_data_path = email_Eu_core.get_path()
chunksize = dcg.get_chunksize(input_data_path)
