diff --git a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
index e33c219a6a7..45be1139808 100644
--- a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
+++ b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
@@ -272,10 +272,10 @@ def uniform_neighbor_sample(
         cuGraph graph, which contains connectivity information as dask cudf
         edge list dataframe
 
-    start_list : list or cudf.Series (int32)
+    start_list : int, list, cudf.Series, or dask_cudf.Series (int32 or int64)
         a list of starting vertices for sampling
 
-    fanout_vals : list (int32)
+    fanout_vals : list
         List of branching out (fan-out) degrees per starting vertex for each
         hop level.
 
@@ -286,15 +286,16 @@ def uniform_neighbor_sample(
         Flag to specify whether to return edge properties (weight, edge id,
         edge type, batch id, hop id) with the sampled edges.
 
-    batch_id_list: list (int32), optional (default=None)
+    batch_id_list: cudf.Series or dask_cudf.Series (int32), optional (default=None)
         List of batch ids that will be returned with the sampled edges if
         with_edge_properties is set to True.
 
-    label_list: list (int32), optional (default=None)
+    label_list: cudf.Series or dask_cudf.Series (int32), optional (default=None)
         List of unique batch id labels.  Used along with
         label_to_output_comm_rank to assign batch ids to GPUs.
 
-    label_to_out_comm_rank (int32), optional (default=None)
+    label_to_out_comm_rank: cudf.Series or dask_cudf.Series (int32),
+        optional (default=None)
         List of output GPUs (by rank) corresponding to batch id labels in the
         label list.  Used to assign each batch id to a GPU.
 
@@ -396,23 +397,41 @@ def uniform_neighbor_sample(
     else:
         indices_t = numpy.int32
 
-    if input_graph.renumbered:
-        start_list = input_graph.lookup_internal_vertex_id(start_list)
-
-    start_list = start_list.rename(start_col_name).to_frame()
+    start_list = start_list.rename(start_col_name)
     if batch_id_list is not None:
-        ddf = start_list.join(batch_id_list.rename(batch_col_name))
+        batch_id_list = batch_id_list.rename(batch_col_name)
+        if hasattr(start_list, "compute"):
+            # mg input
+            start_list = start_list.to_frame()
+            batch_id_list = batch_id_list.to_frame()
+            ddf = start_list.merge(
+                batch_id_list,
+                how="left",
+                left_index=True,
+                right_index=True,
+            )
+        else:
+            # sg input
+            ddf = cudf.concat(
+                [
+                    start_list,
+                    batch_id_list,
+                ],
+                axis=1,
+            )
     else:
-        ddf = start_list
+        ddf = start_list.to_frame()
 
-    if isinstance(ddf, cudf.DataFrame):
-        splits = cp.array_split(cp.arange(len(ddf)), len(Comms.get_workers()))
-        ddf = {w: [ddf.iloc[splits[i]]] for i, w in enumerate(Comms.get_workers())}
+    if input_graph.renumbered:
+        ddf = input_graph.lookup_internal_vertex_id(ddf, column_name=start_col_name)
 
-    else:
+    if hasattr(ddf, "compute"):
         ddf = get_distributed_data(ddf)
         wait(ddf)
         ddf = ddf.worker_to_parts
+    else:
+        splits = cp.array_split(cp.arange(len(ddf)), len(Comms.get_workers()))
+        ddf = {w: [ddf.iloc[splits[i]]] for i, w in enumerate(Comms.get_workers())}
 
     client = get_client()
     session_id = Comms.get_session_id()
diff --git a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py
index 3cb33d68ee7..81403ebf3ff 100644
--- a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py
+++ b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py
@@ -15,6 +15,7 @@
 import os
 
 import pytest
+import cupy
 import cudf
 import dask_cudf
 from pylibcugraph.testing.utils import gen_fixture_params_product
@@ -422,7 +423,7 @@ def test_uniform_neighbor_sample_edge_properties(dask_client, return_offsets):
 
 
 @pytest.mark.mg
-def test_uniform_neighbor_sample_edge_properties_self_loops():
+def test_uniform_neighbor_sample_edge_properties_self_loops(dask_client):
     df = dask_cudf.from_cudf(
         cudf.DataFrame(
             {
@@ -484,7 +485,9 @@ def test_uniform_neighbor_sample_edge_properties_self_loops():
 @pytest.mark.skipif(
     int(os.getenv("DASK_NUM_WORKERS", 2)) < 2, reason="too few workers to test"
 )
-def test_uniform_neighbor_edge_properties_sample_small_start_list(with_replacement):
+def test_uniform_neighbor_edge_properties_sample_small_start_list(
+    dask_client, with_replacement
+):
     df = dask_cudf.from_cudf(
         cudf.DataFrame(
             {
@@ -518,7 +521,7 @@ def test_uniform_neighbor_edge_properties_sample_small_start_list(with_replaceme
 
 
 @pytest.mark.mg
-def test_uniform_neighbor_sample_without_dask_inputs():
+def test_uniform_neighbor_sample_without_dask_inputs(dask_client):
     df = dask_cudf.from_cudf(
         cudf.DataFrame(
             {
@@ -573,6 +576,65 @@ def test_uniform_neighbor_sample_without_dask_inputs():
     assert sorted(sampling_results.hop_id.values_host.tolist()) == [0, 0, 0, 1, 1, 1]
 
 
+@pytest.mark.mg
+@pytest.mark.parametrize("dataset", datasets)
+@pytest.mark.parametrize("input_df", [cudf.DataFrame, dask_cudf.DataFrame])
+@pytest.mark.parametrize("max_batches", [2, 8, 16, 32])
+def test_uniform_neighbor_sample_batched(dask_client, dataset, input_df, max_batches):
+    num_workers = len(dask_client.scheduler_info()["workers"])
+
+    df = dataset.get_edgelist()
+    df["eid"] = cupy.arange(len(df), dtype=df["src"].dtype)
+    df["etp"] = cupy.zeros_like(df["eid"].to_cupy())
+    ddf = dask_cudf.from_cudf(df, npartitions=num_workers)
+
+    G = cugraph.Graph(directed=True)
+    G.from_dask_cudf_edgelist(
+        ddf,
+        source="src",
+        destination="dst",
+        edge_attr=["wgt", "eid", "etp"],
+        legacy_renum_only=True,
+    )
+
+    input_vertices = dask_cudf.concat([df.src, df.dst]).unique().compute()
+    assert isinstance(input_vertices, cudf.Series)
+
+    input_vertices.index = cupy.random.permutation(len(input_vertices))
+
+    input_batch = cudf.Series(
+        cupy.random.randint(0, max_batches, len(input_vertices)), dtype="int32"
+    )
+    input_batch.index = cupy.random.permutation(len(input_vertices))
+
+    if input_df == dask_cudf.DataFrame:
+        input_batch = dask_cudf.from_cudf(input_batch, npartitions=num_workers)
+        input_vertices = dask_cudf.from_cudf(input_vertices, npartitions=num_workers)
+
+    sampling_results = cugraph.dask.uniform_neighbor_sample(
+        G,
+        start_list=input_vertices,
+        batch_id_list=input_batch,
+        fanout_vals=[5, 5],
+        with_replacement=False,
+        with_edge_properties=True,
+    )
+
+    for batch_id in range(max_batches):
+        output_starts_per_batch = (
+            sampling_results[
+                (sampling_results.batch_id == batch_id) & (sampling_results.hop_id == 0)
+            ]
+            .sources.nunique()
+            .compute()
+        )
+
+        input_starts_per_batch = len(input_batch[input_batch == batch_id])
+
+        # Should be <= to account for starts without outgoing edges
+        assert output_starts_per_batch <= input_starts_per_batch
+
+
 # =============================================================================
 # Benchmarks
 # =============================================================================
@@ -581,7 +643,7 @@ def test_uniform_neighbor_sample_without_dask_inputs():
 @pytest.mark.mg
 @pytest.mark.slow
 @pytest.mark.parametrize("n_samples", [1_000, 5_000, 10_000])
-def bench_uniform_neigbour_sample_email_eu_core(gpubenchmark, dask_client, n_samples):
+def bench_uniform_neighbor_sample_email_eu_core(gpubenchmark, dask_client, n_samples):
     input_data_path = email_Eu_core.get_path()
     chunksize = dcg.get_chunksize(input_data_path)
 
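Usage note for reviewers: the sketch below walks the "sg input" branch this patch adds, i.e. passing plain cudf.Series start vertices and batch ids to cugraph.dask.uniform_neighbor_sample. It is illustrative only: it assumes a dask cluster with cugraph Comms already initialized (as the test fixtures provide via dask_client), and the toy edge list, dtypes, and fanout values are invented for the example, not taken from the patch.

import cudf
import dask_cudf
import cugraph
import cugraph.dask

# Toy directed edge list; a real MG workload would read a partitioned
# dask_cudf edge list on a cluster with cugraph comms initialized.
edges = cudf.DataFrame({"src": [0, 1, 2, 3], "dst": [1, 2, 3, 0]})
ddf = dask_cudf.from_cudf(edges, npartitions=2)

G = cugraph.Graph(directed=True)
G.from_dask_cudf_edgelist(ddf, source="src", destination="dst")

# With this patch, start_list/batch_id_list may be plain cudf.Series
# ("sg input" branch, split across workers with cp.array_split) or
# dask_cudf.Series ("mg input" branch, merged on index and distributed).
starts = cudf.Series([0, 1, 2], dtype="int32")
batches = cudf.Series([0, 0, 1], dtype="int32")

results = cugraph.dask.uniform_neighbor_sample(
    G,
    start_list=starts,
    batch_id_list=batches,
    fanout_vals=[2, 2],          # two hops, fanout of 2 at each hop
    with_replacement=False,
    with_edge_properties=True,   # adds batch_id and hop_id columns
)
# results is a dask_cudf.DataFrame; per the updated docstring it carries
# sources/destinations plus weight, edge id, edge type, batch id, hop id.
print(results.compute())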