UCXUnreachable when running a benchmark with UCX_TLS=sm #1006

Open
rkooo567 opened this issue Nov 2, 2023 · 10 comments

@rkooo567

rkooo567 commented Nov 2, 2023

I tried running a benchmark using

UCX_TLS=sm python send_recv.py --server-dev 1 --client-dev 1 --object_type numpy --reuse-alloc --n-bytes 1024

It seems like the client cannot reach the server for some reason:

Server Running at 172.31.8.189:60173
Client connecting to server at 172.31.8.189:60173
Process SpawnProcess-2:
Traceback (most recent call last):
  File "/home/ubuntu/.conda/envs/ucx/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/ubuntu/.conda/envs/ucx/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/work/ucx/ucx-py/ucp/benchmarks/send_recv.py", line 95, in client
    loop.run_until_complete(client.run())
  File "/home/ubuntu/.conda/envs/ucx/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/ubuntu/.conda/envs/ucx/lib/python3.9/site-packages/ucp/benchmarks/backends/ucp_async.py", line 117, in run
    ep = await ucp.create_endpoint(self.server_address, self.port)
  File "/home/ubuntu/.conda/envs/ucx/lib/python3.9/site-packages/ucp/core.py", line 1004, in create_endpoint
    return await _get_ctx().create_endpoint(
  File "/home/ubuntu/.conda/envs/ucx/lib/python3.9/site-packages/ucp/core.py", line 316, in create_endpoint
    peer_info = await exchange_peer_info(
  File "/home/ubuntu/.conda/envs/ucx/lib/python3.9/site-packages/ucp/core.py", line 54, in exchange_peer_info
    await comm.stream_recv(endpoint, peer_info_arr, peer_info_arr.nbytes)
ucp._libs.exceptions.UCXUnreachable: <stream_recv>: 
Traceback (most recent call last):
  File "/home/ubuntu/work/ucx/ucx-py/ucp/benchmarks/send_recv.py", line 395, in <module>
    main()
  File "/home/ubuntu/work/ucx/ucx-py/ucp/benchmarks/send_recv.py", line 387, in main
    assert not p2.exitcode
AssertionError

When I enable debug logs, I also see

[1698940358.556109] [devbox:356803:0]          ucp_ep.c:3315 UCX  DEBUG ep 0x7f6202366000: calling user error callback 0x7f6203889e40 with arg 0x7f6203557820 and status Destination is unreachable
[1698940358.556225] [devbox:356803] UCXPY  DEBUG Error callback for endpoint 0x7f6202366000 called with status -6: Destination is unreachable

Have you seen any similar issue before?

@pentschev
Member

With UCX_TLS=sm you're limiting communication to shared memory exclusively. With that benchmark a connection must be established over TCP or some other network protocol, so please try again with UCX_TLS=tcp,sm or, better yet, do not specify UCX_TLS at all and let UCX decide the optimal transports to use.
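
For example, your reported command would become (only the transports change):

UCX_TLS=tcp,sm python send_recv.py --server-dev 1 --client-dev 1 --object_type numpy --reuse-alloc --n-bytes 1024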

@rkooo567
Author

rkooo567 commented Nov 2, 2023

Hi @pentschev, thanks for the quick response!

Actually, I am trying to limit communication to shared memory intentionally (to compare it with our hand-written shared-memory-based data transfer for low-latency communication). I am running the benchmark on a single node, so it should theoretically work, right? Also, if I add tcp,sm, how can I ensure that sm is used for the communication (I am not interested in TCP-based transport performance right now)?

@pentschev
Member

Indeed this is possible with UCX, but the benchmark isn't prepared for that case, unfortunately. IIUC, tools like ucx_perftest will first connect to the remote peer via simple sockets and only then leverage UCX capabilities for transfers, which we don't do anywhere in UCX-Py at the moment.

Would you mind sharing a bit more on what you're trying to do, in particular what you mean by "hand written"? Depending on what you want to do, UCX-Py may not be the most performant option: the benchmark runs by default with --backend ucp-async, which uses Python's asyncio and has a huge performance impact for small messages, whereas --backend ucp-core will be considerably better. Large messages (1MB, or perhaps 8MB and larger, depending on the protocol) will see minimal performance impact compared to UCX's C implementation.
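
For example, switching only the backend on your reported invocation:

python send_recv.py --server-dev 1 --client-dev 1 --object_type numpy --reuse-alloc --n-bytes 1024 --backend ucp-core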

@rkooo567
Author

rkooo567 commented Nov 3, 2023

Indeed this is possible with UCX, but the benchmark isn't prepared for that case,

What's the best way to use the shm? I also made a simple script using create_endpoint and listener, but it basically had the same error. Is it safe to assume ucx-py doesn't support shm and I should instead use the core or cpp APIs (https://github.com/rapidsai/ucxx)?

Also if I use tcp,sm, can I assume shared memory is actually used? I have the impression this will only use tcp?

Would you mind sharing a bit more on what you're trying to do, in particular to what you mean by "hand written"?

We currently use grpc for data transfer, and for some low-overhead use cases, we'd like to skip this and simply use shared memory (basically read and write to a buffer in shared memory) for sending/receiving the data. The details are a bit hard to explain, but it's something like this (a rough sketch follows at the end of this comment):

  1. There are 2 workers.
  2. And 1 server that acts as an "object store".
  3. A worker sends an IPC request (over a socket) to the object store to create a buffer in shm (mmap on /dev/shm) and gets back an fd.
  4. Worker 1 writes to the buffer and worker 2 reads it (send). After worker 2 finishes its work, it writes to the buffer and worker 1 reads.

In terms of data size, it varies. But we are planning to microbenchmark with 1KB~100MB

I don't actually expect "faster" performance. I think close enough performance should be good enough.
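
A rough Python sketch of the pattern in steps 3-4 (hypothetical names; using multiprocessing.shared_memory in place of our actual socket/fd plumbing, just to illustrate):

import numpy as np
from multiprocessing import shared_memory

# "Object store" side: create a named shared-memory buffer (backed by /dev/shm
# on Linux) and hand its name to the workers instead of an fd.
shm = shared_memory.SharedMemory(create=True, size=1024)

# Worker 1: attach by name and write the payload.
w1 = shared_memory.SharedMemory(name=shm.name)
np.ndarray(1024, dtype="u1", buffer=w1.buf)[:] = 42

# Worker 2: attach by name and read it back (in reality in another process,
# with some out-of-band signal that the data is ready).
w2 = shared_memory.SharedMemory(name=shm.name)
data = np.ndarray(1024, dtype="u1", buffer=w2.buf).copy()

w1.close(); w2.close()
shm.close(); shm.unlink()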

@pentschev
Member

pentschev commented Nov 3, 2023

What's the best way to use the shm? I also made a simple script using create_endpoint and listener, but it basically had the same error. Is it safe to assume ucx-py doesn't support shm and I should instead use the core or cpp APIs (https://github.com/rapidsai/ucxx)?

Using a listener wouldn't work: a listener must use an IP address to establish connections. For the UCX-Py benchmark we would thus be unable to use a UCX listener, and would instead need to establish endpoints using the remote worker's address, similar to how this test handles endpoints/data transfer. We would first need to exchange worker addresses either via IPC (e.g., a Python queue, as done in the test), or via sockets or some other similar communication method if workers are on separate machines (although in that case no shared memory could be used, obviously).

Please note that you would also need to pass endpoint_error_handling=False to the ucp.create_endpoint_from_worker_address() call on both endpoints, as not all transports support error handling, which is the case for shared memory. Alternatively, you can set UCX_MM_ERROR_HANDLING=y and then you don't need to disable error handling explicitly.
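
A minimal sketch of that flow (not the benchmark code, and only an outline: it assumes the worker address object can be passed through a multiprocessing.Queue, as the linked test does, and leaves the actual transfers out):

import asyncio
import multiprocessing as mp
import ucp

def peer(my_queue, peer_queue):
    async def run():
        ucp.init()
        my_queue.put(ucp.get_worker_address())  # publish our worker address
        peer_address = peer_queue.get()         # fetch the remote address
        # No listener involved, so UCX_TLS=sm can work; error handling must be
        # disabled (or UCX_MM_ERROR_HANDLING=y set) for shared memory.
        ep = await ucp.create_endpoint_from_worker_address(
            peer_address, endpoint_error_handling=False
        )
        # ... tag send/recv on `ep`, following the linked test ...

    asyncio.run(run())

if __name__ == "__main__":
    q1, q2 = mp.Queue(), mp.Queue()
    p1 = mp.Process(target=peer, args=(q1, q2))
    p2 = mp.Process(target=peer, args=(q2, q1))
    p1.start(); p2.start()
    p1.join(); p2.join()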

I could probably attempt doing that next week, but cannot promise that.

EDIT: apologies, I hit the wrong button and closed/posted the response too soon; I'll follow up with the rest in the next comment.

@pentschev pentschev reopened this Nov 3, 2023
@pentschev
Member

pentschev commented Nov 3, 2023

Apologies for the late reply to the second part; I got sidetracked by other errands.

Also if I use tcp,sm, can I assume shared memory is actually used? I have the impression this will only use tcp?

This will depend on your system. For instance, if workers are running on separate NUMA nodes, then SM will not be used.

One way I know to verify this is to have a UCX debug-mode build, run with UCX_LOG_LEVEL=trace, and then look for entries that contain rkey and tcp on the same line. For example:

# There will be lots of matching entries
$ UCX_LOG_LEVEL=trace UCX_TLS=tcp python -m ucp.benchmarks.send_recv --server-dev 1 --client-dev 1 --object_type numpy --reuse-alloc --n-bytes 1000000000 --server-cpu-affinity 0 --client-cpu-affinity 1 --backend ucp-async --no-error-handling | grep rkey | grep tcp
[1699040247.066415] [dgx13:2478392:0]        ucp_rkey.c:165  UCX  TRACE   rkey[0]= for md[0]=tcp
[1699040247.568450] [dgx13:2478392:0]        ucp_rkey.c:165  UCX  TRACE   rkey[0]= for md[0]=tcp
[1699040248.056164] [dgx13:2478392:0]        ucp_rkey.c:165  UCX  TRACE   rkey[0]= for md[0]=tcp
...

# If we now add sm, there will be no matches
$ UCX_LOG_LEVEL=trace UCX_TLS=tcp,sm python -m ucp.benchmarks.send_recv --server-dev 1 --client-dev 1 --object_type numpy --reuse-alloc --n-bytes 1000000000 --server-cpu-affinity 0 --client-cpu-affinity 1 --backend ucp-async --no-error-handling | grep rkey | grep tcp

We currently use grpc for data transfer, and for some low-overhead use cases, we'd like to skip this and simply use shared memory (basically read and write to a buffer in shared memory) for sending/receiving the data. The details are a bit hard to explain, but it's something like this

I'm assuming you're doing that for Ray, is that right? The details resemble ray-project/ray#30094 a lot.

In terms of data size, it varies. But we are planning to microbenchmark with 1KB~100MB

I don't actually expect "faster" performance. I think close enough performance should be good enough.

If that is for Ray and I remember correctly, the communication backend is implemented in C(++), isn't it? If that's the case I would absolutely point you to https://github.com/rapidsai/ucxx instead, as there you can use its C++ backend explicitly and completely forget about any Python code. Additionally, all of the UCX-Py API is now implemented on top of UCXX and is part of that repo as well, so UCX-Py is expected to be deprecated/archived in the next 3-6 months in favor of UCXX.

If you need Python, and more importantly if you need Python async, then performance for small messages may take a significant hit, but the impact for large messages should be much smaller (I believe in the 1-2% range when compared to pure C).

EDIT: Forgot to mention initially that we need to disable endpoint error handling for SM. I now opened #1007 to add that as an argument and updated instructions above accordingly.

@rkooo567
Author

rkooo567 commented Nov 7, 2023

Yes, it is for Ray! And ray-project/ray#30094 is great. Thanks for the great context there.

so UCX-Py is expected to be deprecated/archived in the next 3-6 months in favor of UCXX.

That is great to know. And yeah, Ray is written in C++, and I would eventually expect to use the C (or C++, if it exists) APIs. It is great that there's already a C++ API! I wanted to use ucx-py for quick prototyping & perf benchmarking, but it makes sense that there's high overhead due to Python.

EDIT: Forgot to mention initially that we need to disable endpoint error handling for SM. I now opened #1007 to add that as an argument and updated instructions above accordingly.

Is it specific to the Python API? Or is it something I should keep in mind while using UCXX?

I will play with UCXX and get back to you with more questions! As I mentioned, the first goal is to do benchmarking (sm, tcp, RDMA, and maybe GPU) and learn the APIs better.

I have a couple of last questions.

  • Would there be any channel we can use for better communication?
  • Is there any plan to support EFA in a foreseeable timeline? We actually implemented RDMA using Libfabric before due to this limitation (for that particular issue). Now we have another use case that UCX fits better, but if EFA is not supported, we still need to figure out how to support EFA (IIUC AWS has no other alternative, and lots of Ray users use AWS).

@pentschev
Member

Is it specific to the Python API? Or is it something I should keep in mind while using UCXX?

You should keep that in mind. This is a limitation in UCX: some transports (like shared memory) do not support endpoint error handling, which we enable by default here for InfiniBand use cases, where having no error handling may cause deadlocks at shutdown. I will port #1007 to UCXX as well.

Would there be any channel we can use for better communication?

The only other public channel we have is the RAPIDS Slack. For very technical questions GitHub is preferred, though, as it keeps a public record people can find.

Is there any plan to support EFA in a foreseeable timeline? We actually implemented RDMA using Libfabric before due to this limitation (for that particular issue). Now we have another use case that UCX fits better, but if EFA is not supported, we still need to figure out how to support EFA (IIUC AWS has no other alternative, and lots of Ray users use AWS).

Unfortunately I'm not aware of any plans to support EFA. However, UCX is an open standard, and if there's interest from AWS or other members of the community to implement EFA support in UCX, that is certainly welcome. If that ever happens, UCXX would support it out of the box. Also, there are no plans for UCXX to support other transports on its own; the intent is to always go through UCX and let it be the communication engine.

@rkooo567
Author

rkooo567 commented Nov 8, 2023

Hi @pentschev! I could successfully start playing with ucxx, and the initial performance seems pretty good! I am going to play with it more and start benchmarking more seriously, but I'd also like to ask a couple more questions:

  1. Are TagSend and TagRecv both blocking APIs? Also, is there any doc regarding the progress modes (blocking, polling, etc.)?
  2. Is there any example code of GPU<>GPU transfer? One thing I'd like to try after the benchmark is to see if I can transfer a torch tensor to another host via NVLink (and, in the longer term, via InfiniBand). Do you happen to know if there's any good example documentation?
  3. It seems like there are 2 APIs, active message vs Send/Recv, and I feel like active messages are a better fit for Ray (since the receiver can be passive). Is the performance of the two APIs on par, or is one better than the other?

I am also very new to this HPC style communication, so please bear with me if I am asking a bad question!

@wence-
Contributor

wence- commented Nov 8, 2023

Hi @pentschev! I could successfully start playing with ucxx, and the initial performance seems pretty good! I am going to play with it more and start benchmarking more seriously, but I'd also like to ask a couple more questions:

  1. Are TagSend and TagRecv both blocking APIs? Also, is there any doc regarding the progress modes (blocking, polling, etc.)?

TagSend and TagRecv are both non-blocking (they return a Request object). You can check whether a request has completed using isCompleted(). So a typical pattern is (sketch):

auto requests = std::vector<std::shared_ptr<Request>>{};
requests.push_back(/* result of tagSend()/tagRecv() */);
while (!std::all_of(requests.cbegin(), requests.cend(), [](auto r) { return r->isCompleted(); })) {
    worker->progress();                  // polling mode
    // worker->progressWorkerEvent(-1);  // blocking mode
    // worker->waitProgress();           // wait mode
}

The docstrings of the various progress functions on the worker object are probably your best bet right now.

  1. Is there any example code of GPU<>GPU transfer? One thing I'd like to try after the benchmark is to see if I can transfer a torch tensor to another host via NVLink (and, in the longer term, via InfiniBand). Do you happen to know if there's any good example documentation?

If you pass a device pointer to tagsend/tagrecv and your UCX install is built with the appropriate device transports enabled, then this should "just work".
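
For a rough Python-side illustration (this is the ucx-py flavoured API rather than the UCXX C++ calls above, and assumes CuPy plus a CUDA-enabled UCX build; treat it as a sketch):

import cupy as cp

async def send_gpu_buffer(ep, nbytes=10_000_000):
    # `ep` is an existing endpoint; CuPy arrays are accepted directly because
    # they expose __cuda_array_interface__, so UCX can pick CUDA IPC/NVLink or
    # RDMA transports under the hood. A torch CUDA tensor should work the same
    # way, since it exposes the same interface.
    send_buf = cp.ones(nbytes, dtype="u1")
    await ep.send(send_buf)   # same call as for a host (NumPy) buffer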

  1. It seems like there are 2 APIs, active message vs Send/Recv, and I feel like active messages are a better fit for Ray (since the receiver can be passive). Is the performance of the two APIs on par, or is one better than the other?

I suspect it depends a bit on your use case. Active messages will always allocate new buffers for the incoming message on the receiver (so UCX arranges to send a header along with the message that indicates how many bytes, and so forth). If you already know the size of the buffer on the receive side (and/or you want to reuse buffers) then the tag API is better.

The core implementation in UCX uses the same transports under the hood, so the raw transfer of bits should be about the same performance. Active messages involve a bit more bookkeeping, I think (not a lot), which you might have needed anyway in a distributed algorithm.
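
To make the contrast concrete, a hedged Python-side sketch (assuming ucx-py's am_send()/am_recv() helpers exist in your version; the UCXX C++ names differ, so check the docstrings):

import numpy as np

async def tag_receiver(ep, nbytes):
    # Tag API: the receiver pre-allocates (and can reuse) the buffer, so it
    # must already know the message size.
    recv_buf = np.empty(nbytes, dtype="u1")
    await ep.recv(recv_buf)
    return recv_buf

async def am_receiver(ep):
    # Active messages: the size travels with the message and the receiver gets
    # a newly allocated buffer back, so no pre-allocation/reuse is possible.
    return await ep.am_recv()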
