[FIX] Fix the hang in cuGraph Python Uniform Neighbor Sample, Add Logging to Bulk Sampler #3669

Merged

Commits (45 total; the diff below shows changes from 43 commits):
3982222 fix hang (alexbarghi-nv, Jun 23, 2023)
1f5f956 remove unwanted files (alexbarghi-nv, Jun 23, 2023)
59c6ffc pull in changes from other branch (alexbarghi-nv, Jun 26, 2023)
fe7ef04 style fixes (alexbarghi-nv, Jun 26, 2023)
009d123 mess with filtering (alexbarghi-nv, Jun 26, 2023)
f82cbed remove unwanted files (alexbarghi-nv, Jun 26, 2023)
40a520a b (alexbarghi-nv, Jun 26, 2023)
c01ebdc remove stats (alexbarghi-nv, Jun 26, 2023)
e27a339 revert filtering changes (alexbarghi-nv, Jun 26, 2023)
ea5ecb8 remove unwanted files (alexbarghi-nv, Jun 26, 2023)
2bfaabe persist the filter (alexbarghi-nv, Jun 26, 2023)
9f9d893 cleaned up unwanted files (alexbarghi-nv, Jun 26, 2023)
046c267 repartitioning (alexbarghi-nv, Jun 26, 2023)
c9a79e4 remove unwanted files (alexbarghi-nv, Jun 26, 2023)
055d4b0 remove shuffle (alexbarghi-nv, Jun 26, 2023)
b0a4c08 Reduce overhead by 2x (VibhuJawa, Jun 27, 2023)
9289f18 Added bulk sampling script (VibhuJawa, Jun 27, 2023)
e0f1ac9 m (alexbarghi-nv, Jun 27, 2023)
11bd9a0 cleanup (alexbarghi-nv, Jun 27, 2023)
8620824 remove debug file (alexbarghi-nv, Jun 27, 2023)
1e86d71 revert config (alexbarghi-nv, Jun 27, 2023)
2df5e8f update sampling tests (alexbarghi-nv, Jun 28, 2023)
85b4ff9 update sg sampling (alexbarghi-nv, Jun 28, 2023)
82802a4 minor (alexbarghi-nv, Jun 28, 2023)
54859d0 style (alexbarghi-nv, Jun 28, 2023)
d4b1a2b Merge branch 'branch-23.08' into python-sampling-fix-hang (alexbarghi-nv, Jun 28, 2023)
be51b3a Update python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py (alexbarghi-nv, Jun 28, 2023)
964837f switch to isinstance check (alexbarghi-nv, Jun 28, 2023)
115b656 Merge branch 'python-sampling-fix-hang' of https://github.com/alexbar… (alexbarghi-nv, Jun 28, 2023)
26c5900 switch to isinstance (alexbarghi-nv, Jun 28, 2023)
f40bab3 remove log (alexbarghi-nv, Jun 28, 2023)
a13ec72 more isinstance checks (alexbarghi-nv, Jun 28, 2023)
14e0aea correction to multi-column behavior (alexbarghi-nv, Jun 28, 2023)
eeada64 style (alexbarghi-nv, Jun 28, 2023)
bec749f update notebook with run numbers (alexbarghi-nv, Jun 28, 2023)
7af87f9 add back the wait calls (alexbarghi-nv, Jun 29, 2023)
11163be merge multiple waits into one (alexbarghi-nv, Jun 29, 2023)
0d561a4 do work (alexbarghi-nv, Jun 30, 2023)
b90f9da ensure sg code path works (alexbarghi-nv, Jun 30, 2023)
fc2eabf ensure empty partitions return a empty df (alexbarghi-nv, Jun 30, 2023)
a4a30cf style, min/max change but still requires client copy (alexbarghi-nv, Jul 5, 2023)
5062de9 update benchmarking script (alexbarghi-nv, Jul 5, 2023)
315e2a3 remove paths from script (alexbarghi-nv, Jul 5, 2023)
b41cfda add arguments in shell script (alexbarghi-nv, Jul 6, 2023)
de2acbd Merge branch 'branch-23.08' into python-sampling-fix-hang (alexbarghi-nv, Jul 6, 2023)

1,860 changes: 1,860 additions & 0 deletions benchmarks/cugraph/standalone/bulk_sampling/benchmarking_script.ipynb
Review comment (Contributor; alexbarghi-nv marked this conversation as resolved):
A notebook is nice, but just so I can plan ahead: do we want to run this notebook as part of the nightlies, and if so, can we use the notebook conversion script to run it from the command line? (This is what we do in CI, as seen here, which calls this.)

Reply (Member, author):
I think we do want to run this nightly.
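
As a concrete sketch of what a nightly job could run: the notebook can be executed headlessly with nbconvert. The flags below are standard nbconvert options, but the timeout and output location are illustrative assumptions, not the CI script referenced above.

# Hypothetical nightly invocation: execute the benchmarking notebook and
# write the executed copy to a scratch directory. Values are placeholders.
jupyter nbconvert --to notebook --execute \
    --ExecutePreprocessor.timeout=7200 \
    --output-dir /tmp \
    --output benchmarking_script.out.ipynb \
    benchmarks/cugraph/standalone/bulk_sampling/benchmarking_script.ipynb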

Large diffs are not rendered by default.

49 changes: 49 additions & 0 deletions benchmarks/cugraph/standalone/bulk_sampling/bulk_sampling.sh
@@ -0,0 +1,49 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Defer CUDA context creation until the GPU is first used (no init on import).
export RAPIDS_NO_INITIALIZE="1"
# Enable cuDF spilling from device to host memory under memory pressure.
export CUDF_SPILL="1"
# Disable cuFile / GPUDirect Storage for file I/O.
export LIBCUDF_CUFILE_POLICY=OFF

dataset_name="ogbn_papers100M"
dataset_root="/datasets/abarghi/"
output_root="/tmp/samples"
batch_sizes="512"
# Each comma-separated entry is one fanout configuration; underscores separate
# per-hop fanout values (e.g. 10_10_10 samples 10 neighbors at each of 3 hops).
fanouts="25_25,10_10_10,5_10_20"
reverse_edges=True

rm -rf $output_root
mkdir -p $output_root

# Change to 2 in Selene
gpu_per_replica=4
#--add_edge_ids \

# Expand to 1, 4, 8 in Selene
# Repeat each configuration four times (the loop variable itself is unused).
for i in 1 2 3 4
do
for replication in 2;
do
dataset_name_with_replication="${dataset_name}[${replication}]"
dask_worker_devices=$(seq -s, 0 $((gpu_per_replica*replication-1)))
echo "Sampling dataset = $dataset_name_with_replication on devices = $dask_worker_devices"
python3 cugraph_bulk_sampling.py --datasets $dataset_name_with_replication \
--dataset_root $dataset_root \
--batch_sizes $batch_sizes \
--output_root $output_root \
--dask_worker_devices $dask_worker_devices \
--fanouts $fanouts \
--reverse_edges
done
done
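
For reference, with the defaults above (replication factor 2, four GPUs per replica), one iteration of the inner loop expands to roughly the following call; every value comes from the variables defined at the top of the script:

# Illustrative expansion of a single loop iteration (not part of the script):
# "ogbn_papers100M[2]" is sampled on Dask workers pinned to GPUs 0-7.
python3 cugraph_bulk_sampling.py --datasets "ogbn_papers100M[2]" \
    --dataset_root /datasets/abarghi/ \
    --batch_sizes 512 \
    --output_root /tmp/samples \
    --dask_worker_devices 0,1,2,3,4,5,6,7 \
    --fanouts 25_25,10_10_10,5_10_20 \
    --reverse_edges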
@@ -33,8 +33,6 @@

import cugraph

-from datetime import datetime

import json
import re
import os
@@ -50,6 +48,7 @@
import dask_cudf
import dask.dataframe as ddf
from dask.distributed import default_client
+from cugraph.dask import get_n_workers

from typing import Optional, Union, Dict

@@ -173,6 +172,7 @@ def sample_graph(G, label_df, output_path,seed=42, batch_size=500, seeds_per_cal
random_state=seed,
seeds_per_call=seeds_per_call,
batches_per_partition=batches_per_partition,
+log_level = logging.INFO
)

n_workers = len(default_client().scheduler_info()['workers'])
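
The log_level argument above is the logging hook this PR adds to the bulk sampler. A minimal sketch of enabling it, mirroring the constructor call in the hunk; the import path and the numeric values are assumptions for illustration, not code from this PR:

import logging
from cugraph.gnn import BulkSampler  # assumed import path for the sampler

def make_sampler(G, output_path="/tmp/samples"):
    # Mirrors the constructor call shown above; numeric values are illustrative.
    return BulkSampler(
        batch_size=500,
        output_path=output_path,
        graph=G,
        random_state=42,
        seeds_per_call=400_000,
        batches_per_partition=100,
        log_level=logging.INFO,  # new in this PR: emit sampler progress logs
    )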
@@ -182,10 +182,10 @@ def sample_graph(G, label_df, output_path,seed=42, batch_size=500, seeds_per_cal
'batch': cudf.Series(dtype='int32')
})


batch_df = label_df.map_partitions(_make_batch_ids, batch_size, n_workers, meta=meta)
#batch_df = batch_df.sort_values(by='node')

-# should always persist the batch dataframe or performace may be suboptimal
+# should always persist the batch dataframe or performance may be suboptimal
batch_df = batch_df.persist()

del label_df
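
_make_batch_ids itself is not shown in this diff; a simplified stand-in that conveys the idea follows (hypothetical code: the real helper also offsets the ids per partition so batches are globally unique). Persisting the result right after matters because downstream joins would otherwise recompute the map_partitions step each time it is used.

import cudf
import cupy

def _make_batch_ids_sketch(df: cudf.DataFrame, batch_size: int) -> cudf.DataFrame:
    # Assign consecutive batch ids within this partition only; a real
    # implementation also adds a per-partition offset for global uniqueness.
    df = df.reset_index(drop=True)
    df["batch"] = (cupy.arange(len(df)) // batch_size).astype("int32")
    return df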
@@ -278,6 +278,8 @@ def load_disk_dataset(dataset, dataset_dir='.', reverse_edges=True, replication_
path = Path(dataset_dir) / dataset
parquet_path = path / 'parquet'

+n_workers = get_n_workers()

with open(os.path.join(path, 'meta.json')) as meta_file:
meta = json.load(meta_file)

@@ -289,7 +291,9 @@ def load_disk_dataset(dataset, dataset_dir='.', reverse_edges=True, replication_
print(f'Loading edge index for edge type {edge_type}')

can_edge_type = tuple(edge_type.split('__'))
-edge_index_dict[can_edge_type] = dask_cudf.read_parquet(os.path.join(os.path.join(parquet_path, edge_type), 'edge_index.parquet'))
+edge_index_dict[can_edge_type] = dask_cudf.read_parquet(
+    Path(parquet_path) / edge_type / 'edge_index.parquet'
+).repartition(n_workers*2)

edge_index_dict[can_edge_type]['src'] += node_offsets_replicated[can_edge_type[0]]
edge_index_dict[can_edge_type]['dst'] += node_offsets_replicated[can_edge_type[-1]]
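
The repartition(n_workers*2) call is the substance of this change: spreading each edge-index DataFrame over a small multiple of the worker count keeps all GPUs busy without creating many tiny partitions. A standalone sketch of the pattern, with a placeholder path:

import dask_cudf
from cugraph.dask import get_n_workers

n_workers = get_n_workers()
edges = dask_cudf.read_parquet("some_dataset/parquet/edge_type/edge_index.parquet")
# Two partitions per Dask worker: balanced load without the scheduler
# overhead of an excessive partition count.
edges = edges.repartition(npartitions=n_workers * 2).persist()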
@@ -344,7 +348,7 @@ def load_disk_dataset(dataset, dataset_dir='.', reverse_edges=True, replication_
print(f'Loading node labels for node type {node_type} (offset={offset})')
node_label_path = os.path.join(os.path.join(parquet_path, node_type), 'node_label.parquet')
if os.path.exists(node_label_path):
-node_labels[node_type] = dask_cudf.read_parquet(node_label_path).drop('label',axis=1).persist()
+node_labels[node_type] = dask_cudf.read_parquet(node_label_path).repartition(n_workers).drop('label',axis=1).persist()
node_labels[node_type]['node'] += offset
node_labels[node_type] = node_labels[node_type].persist()

1 change: 1 addition & 0 deletions mg_utils/run-dask-process.sh
@@ -102,6 +102,7 @@ function buildTcpArgs {
"

WORKER_ARGS="--rmm-pool-size=$WORKER_RMM_POOL_SIZE
+--rmm-async
--local-directory=/tmp/$LOGNAME
--scheduler-file=$SCHEDULER_FILE
--memory-limit=$DASK_HOST_MEMORY_LIMIT
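
The new --rmm-async flag switches the Dask-CUDA workers to RMM's cudaMallocAsync-based allocator, which can be less prone to fragmentation than a fixed pool over long sampling runs. A minimal sketch of the same choice when building a cluster from Python (standard dask_cuda API; cluster sizing left at defaults):

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Same allocator selection as the --rmm-async worker flag above.
cluster = LocalCUDACluster(rmm_async=True)
client = Client(cluster)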