Can't start LocalCUDACluster w/ ucx #1148

Closed

randerzander opened this issue Mar 29, 2023 · 22 comments

Comments

@randerzander (Contributor) commented Mar 29, 2023

With a conda environment using the latest nightlies:

mamba create -y --name pynds -c rapidsai-nightly -c conda-forge -c nvidia python=3.10 cudatoolkit=11.8 cudf=23.04 dask-cudf dask-cuda 'ucx-proc=*=gpu' ucx-py 'rust>=1.59.0' 'setuptools-rust>=1.5.2' dask/label/dev::dask-sql requests pydrive2 gspread oauth2client plotly python-graphviz graphviz bpython jupyterlab

conda list | grep rapids
cubinlinker               0.2.0           py310hf09951c_1    rapidsai-nightly
cudf                      23.04.00a       cuda_11_py310_230327_g173fde9d9a_240    rapidsai-nightly
dask-cuda                 23.06.00a       py310_230329_g9839618_4    rapidsai-nightly
dask-cudf                 23.04.00a       cuda_11_py310_230327_g173fde9d9a_240    rapidsai-nightly
libcudf                   23.04.00a       cuda11_230327_g173fde9d9a_240    rapidsai-nightly
librmm                    23.04.00a       cuda11_230321_ge8fbd06e_34    rapidsai-nightly
rmm                       23.04.00a       cuda11_py310_230321_ge8fbd06e_34    rapidsai-nightly
ucx-proc                  1.0.0                       gpu    rapidsai-nightly
ucx-py                    0.31.00a        py310_230320_gb5f4a10_13    rapidsai-nightly

conda list | grep dask
dask                      2023.3.2           pyhd8ed1ab_0    conda-forge
dask-core                 2023.3.2           pyhd8ed1ab_0    conda-forge
dask-cuda                 23.06.00a       py310_230329_g9839618_4    rapidsai-nightly
dask-cudf                 23.04.00a       cuda_11_py310_230327_g173fde9d9a_240    rapidsai-nightly
dask-sql                  2023.2.1a230328 py310_g883cc3c_43    dask/label/dev
distributed               2023.3.2           pyhd8ed1ab_0    conda-forge

conda list | grep ucx
ucx                       1.14.0               h538f049_0    conda-forge
ucx-proc                  1.0.0                       gpu    rapidsai-nightly
ucx-py                    0.31.00a        py310_230320_gb5f4a10_13    rapidsai-nightly

Repro:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)
    print(client)

Trace:

<Client: 'ucx://127.0.0.1:56043' processes=8 threads=8, memory=503.75 GiB>
2023-03-29 09:04:59,340 - distributed.core - ERROR -
Traceback (most recent call last):
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/distributed/comm/ucx.py", line 349, in read
    await self.ep.recv(msg)
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/ucp/core.py", line 725, in recv
    ret = await comm.tag_recv(self._ep, buffer, nbytes, tag, name=log)
ucp._libs.exceptions.UCXCanceled: <[Recv #006] ep: 0x7f7882f8e840, tag: 0x45a67c76a1982a39, nbytes: 16, type: <class 'numpy.ndarray'>>:

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/distributed/utils.py", line 752, in wrapper
    return await func(*args, **kwargs)
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/distributed/comm/ucx.py", line 366, in read
    self.abort()
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/distributed/comm/ucx.py", line 434, in abort
    self._ep.abort()
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/ucp/core.py", line 564, in abort
    logger.debug("Endpoint.abort(): %s" % hex(self.uid))
  File "/raid/rgelhausen/conda/envs/pynds/lib/python3.10/site-packages/ucp/core.py", line 550, in uid
    return self._ep.handle
  File "ucp/_libs/ucx_endpoint.pyx", line 359, in ucp._libs.ucx_api.UCXEndpoint.handle.__get__
AssertionError

The trace above repeats.

@randerzander (Contributor, Author)

I'm seeing this on a box with 4x T4s as well as a box with 8x A100s, so it doesn't seem to be machine- or GPU-specific.

@pentschev (Member)

Could you also check which UCX version gets installed, and whether, besides the Python backtrace, there are any segmentation faults, assertion errors, or other C backtraces in any of the processes?

@randerzander (Contributor, Author)

Added the UCX version to the issue description.

@pentschev (Member)

Could you then check whether there are any other traces, as asked above? After that, could you try installing ucx=1.13.1 and rerunning?

@pentschev (Member)

Actually, before downgrading to ucx=1.13.1, could you try rerunning with UCX_RNDV_SCHEME=get_zcopy?
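For reference, a minimal sketch of setting that variable from inside the repro script itself; exporting it in the shell before launching Python works just as well. This only illustrates where the setting has to happen, it is not a confirmed fix.

import os

# UCX reads UCX_* options from the environment, so this must be set before
# UCX is initialized, i.e. before the cluster and its workers start.
os.environ["UCX_RNDV_SCHEME"] = "get_zcopy"

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)
    print(client)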

@randerzander (Contributor, Author)

Here's the full trace with export UCX_RNDV_SCHEME=get_zcopy (w/ ucx 1.14.0) before running the repro script:

https://gist.github.com/randerzander/16e59b627fc8abf1efa349f819eba735

@quasiben (Member)

I'm seeing the same thing with UCX 1.14, both with and without UCX_RNDV_SCHEME=get_zcopy.

@quasiben (Member)

This is an issue at shutdown, but it shouldn't prevent any usage. We think the cause is a change in connection handling in distributed; we're bisecting to find out.

@pentschev (Member) commented Mar 29, 2023

dask/distributed#7593 is the offending PR.

EDIT: I had pasted the wrong link before.

@randerzander (Contributor, Author)

This is an issue at shutdown but shouldn't prevent any usage.

Unfortunately it does impact usage. Simple things like len(ddf) work, but more complex jobs (Dask-SQL queries) do not. I'll try to boil it down if that's useful, but it seems like the problem has already been identified?
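A hypothetical sketch of the distinction (made-up data and operations, not the actual failing workload): a cheap row count succeeds, while a shuffle-heavy join of the kind Dask-SQL queries generate is where the failures show up.

import cudf
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)

    df = cudf.DataFrame({"key": [1, 2, 3, 4] * 250, "val": list(range(1000))})
    ddf = dask_cudf.from_cudf(df, npartitions=8)

    print(len(ddf))                    # simple: works
    joined = ddf.merge(ddf, on="key")  # shuffle-heavy, many worker-to-worker transfers
    print(len(joined))                 # the kind of job that fails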

@quasiben (Member)

For the non-trivial failures, can you locally revert dask/distributed#7644 and see if that passes for you?

@wence- (Contributor) commented Mar 30, 2023

@randerzander Can you try adding:

import weakref
weakref.finalize(lambda: None, lambda: None)

to the top of your script? It must occur before distributed has been imported.
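Applied to the repro from the issue description, that would look like the following sketch; the essential part is that weakref.finalize runs before the first import of distributed (and dask_cuda imports distributed, so it has to come before that import too):

import weakref

# Registering any finalizer here installs weakref's internal atexit hook
# before distributed registers its own shutdown hooks, which works around
# the shutdown-ordering problem discussed below.
weakref.finalize(lambda: None, lambda: None)

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)
    print(client)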

@randerzander (Contributor, Author)

Yes, that resolved both the cluster shutdown issue and the workload failures!

Thanks for the workaround!

@wence- (Contributor) commented Apr 3, 2023

@randerzander can you also check if the following works around the problem for you?

def workaround_7726():
    from distributed._concurrent_futures_thread import _python_exit
    from distributed.client import _close_global_client
    from distributed.deploy.spec import close_clusters
    import distributed
    import atexit
    import weakref

    def shutdown():
        # This mimics a function in distributed.__init__ which we can't
        # get a handle on (because it is del'd), and must be registered
        # after the other functions
        distributed._python_shutting_down = True

    # These functions must be unregistered and then re-registered
    for fn in [_python_exit, _close_global_client, close_clusters]:
        atexit.unregister(fn)
    # So that this finalizer is in an atexit hook after them
    # Note that atexit handlers are called last-in, first-out.
    # See https://docs.python.org/3/library/atexit.html
    weakref.finalize(lambda: None, lambda: None)
    # And re-register them.
    for fn in [_python_exit, close_clusters, _close_global_client, shutdown]:
        atexit.register(fn)

Run this function before you boot a cluster.
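Usage might look like the following sketch, assuming workaround_7726 as defined above is pasted into (or imported by) the same script:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

if __name__ == "__main__":
    # Re-order the atexit hooks before any cluster or client exists.
    workaround_7726()

    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)
    print(client)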

@quasiben (Member) commented Apr 3, 2023

If this solution works, I would be inclined to use it rather than put out a patch release of distributed.

@pentschev (Member)

The fact that this has to run before spinning up the cluster suggests it must be implemented by the user. Am I right, @wence-? Did you have any success implementing it as part of Dask-CUDA directly (without requiring any user interaction)?

@wence- (Contributor) commented Apr 3, 2023

I haven't yet, but we could run it, for example, in dask_cuda.__init__.

@wence- (Contributor) commented Apr 3, 2023

This works for the minimal reproducer (not sure about cluster restarts):

from dask_cuda import LocalCUDACluster
from distributed import Client


if __name__ == "__main__":
    cluster = LocalCUDACluster(protocol="ucx")
    client = Client(cluster)
    del cluster
    print("shutting down...")
    print(client)

with the following patch applied to dask_cuda:

diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py
index 656f614..4781048 100644
--- a/dask_cuda/local_cuda_cluster.py
+++ b/dask_cuda/local_cuda_cluster.py
@@ -23,6 +23,35 @@ from .utils import (
 )
 
 
+def workaround_distributed_7726():
+    import atexit
+    import weakref
+
+    import distributed
+    from distributed._concurrent_futures_thread import _python_exit
+    from distributed.client import _close_global_client
+    from distributed.deploy.spec import close_clusters
+
+    def shutdown():
+        # This mimics a function in distributed.__init__ which we can't
+        # get a handle on (because it is del'd), and must be registered
+        # after the other functions
+        distributed._python_shutting_down = True
+
+    # These functions must be unregistered and then re-registered
+    for fn in [_python_exit, _close_global_client, close_clusters]:
+        atexit.unregister(fn)
+    # So that this finalizer is in an atexit hook after them
+    weakref.finalize(lambda: None, lambda: None)
+    # And re-register them.
+    for fn in [_python_exit, close_clusters, _close_global_client, shutdown]:
+        atexit.register(fn)
+
+
+workaround_distributed_7726()
+del workaround_distributed_7726
+
+
 class LoggedWorker(Worker):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)

@pentschev (Member) commented Apr 3, 2023

I can confirm that. The more fragile part is that the client must import dask_cuda; that seems to be where the problem lies. The consequence is that the dask cuda worker CLI alone will not suffice: the client code must import dask_cuda even if it doesn't otherwise use it.

EDIT: This is what I tested:

repro.py
from dask.distributed import Client, LocalCluster
import dask_cuda

if __name__ == "__main__":
    client = Client("ucx://10.33.227.163:8786")
    print(client)
shell
$ dask scheduler --protocol ucx
...

$ dask cuda worker ucx://10.33.227.163:8786
...

$ python repro.py

If import dask_cuda is commented out in repro.py, the problem still occurs.

@wence- (Contributor) commented Apr 3, 2023

Yes, I think the client connections are the problematic ones :(

@pentschev (Member)

The client is problematic in the cases we've seen. I suspect @randerzander's case may be happening elsewhere too, but I'm not sure either.

@quasiben (Member) commented Apr 6, 2023

This should now be resolved by the distributed 2023.3.2.1 release. Thank you @randerzander for raising this!

quasiben closed this as completed on Apr 6, 2023.