
Disable compression for fast comms #7768

Merged — 10 commits merged into dask:main on May 3, 2023
Conversation

@crusaderky (Collaborator) commented Apr 11, 2023

  • Closes "Compression slows down network comms" #7655
  • Disable compression by default for all network comms
  • Disable compression (hardcoded) for network comms on localhost, regardless of settings
  • Separate compression settings for spilling
  • Reduce the compression threshold from 0.9 to 0.7: a buffer is kept compressed only if compression shrinks it to at most 70% of its original size, even when compression is enabled. A minimal sketch of this thresholding logic follows the list.
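For illustration, a minimal sketch of the 0.7 thresholding logic described above. The helper name maybe_compress_sketch and its exact behaviour (no sampling, lz4 frame codec) are simplifications of mine, not the actual distributed implementation:

import lz4.frame

MIN_SIZE = 10_000   # don't bother compressing tiny buffers
MIN_RATIO = 0.7     # keep compressed data only if it is <= 70% of the original

def maybe_compress_sketch(data: bytes) -> tuple[str | None, bytes]:
    """Return ("lz4", compressed) if compression pays off, else (None, data)."""
    if len(data) < MIN_SIZE:
        return None, data
    compressed = lz4.frame.compress(data)
    if len(compressed) <= MIN_RATIO * len(data):
        return "lz4", compressed
    return None, data  # not compressible enough; send the original bytes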

@milesgranger (Contributor) left a comment


Just one suggestion, but otherwise this looks great. It would be handy to remove the optional compression codecs in favour of cramjam later 😜

Review comment on distributed/comm/addressing.py (outdated, resolved)
@github-actions bot commented Apr 11, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       26 files +1 · 26 suites +1 · 15h 47m 47s ⏱️ +1h 55m 3s
  3 576 tests +20 · 3 467 ✔️ +25 · 105 💤 −2 · 3 ❌ −4 · 1 🔥 +1
45 277 runs +3 571 · 43 110 ✔️ +3 365 · 2 162 💤 +209 · 4 ❌ −4 · 1 🔥 +1

For more details on these failures and errors, see this check.

Results for commit 0e6f7f0. ± Comparison against base commit 57ae3e7.

This pull request removes 28 and adds 48 tests. Note that renamed tests count towards both.
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp://[::1]:---nanny]
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp://[::1]:---no-nanny]
distributed.comm.tests.test_ws ‑ test_large_transfer_with_no_compression
distributed.protocol.tests.test_protocol ‑ test_compression_config[None-None]
distributed.protocol.tests.test_protocol ‑ test_compression_config[auto-None]
distributed.protocol.tests.test_protocol ‑ test_compression_config[auto-lz4]
distributed.protocol.tests.test_protocol ‑ test_compression_config[foo-ValueError]
distributed.protocol.tests.test_protocol ‑ test_compression_config[zlib-zlib]
distributed.protocol.tests.test_protocol ‑ test_compression_thread_safety[None-None]
distributed.protocol.tests.test_protocol ‑ test_compression_thread_safety[lz4-lz4]
…
distributed.protocol.tests.test_compression ‑ test_compress_remote_comms
distributed.protocol.tests.test_compression ‑ test_disable_compression_on_localhost[127.0.0.1]
distributed.protocol.tests.test_compression ‑ test_disable_compression_on_localhost[None]
distributed.protocol.tests.test_compression ‑ test_disable_compression_on_localhost[localhost]
distributed.protocol.tests.test_protocol ‑ test_auto_compression
distributed.protocol.tests.test_protocol ‑ test_compression_thread_safety[None-bytes]
distributed.protocol.tests.test_protocol ‑ test_compression_thread_safety[None-memoryview]
distributed.protocol.tests.test_protocol ‑ test_compression_thread_safety[lz4-bytes]
distributed.protocol.tests.test_protocol ‑ test_compression_thread_safety[lz4-memoryview]
distributed.protocol.tests.test_protocol ‑ test_compression_thread_safety[snappy-bytes]
…

♻️ This comment has been updated with latest results.

@crusaderky force-pushed the compression branch 4 times, most recently from f4fe2ea to 6c30e8d on April 12, 2023 at 12:45
@crusaderky (Collaborator, Author)

The current state of this PR is that it is done, with one major flaw: Client.gather(..., direct=False) (which is the default, and the only available option on Coiled) uses the remote-worker compression settings instead of the remote-client ones. With the new default config, this means that if the client retrieves data over a somewhat narrowband connection, there won't be any compression, and a user will likely observe a slowdown compared to main.

At the moment I cannot find a decent-looking fix for it.

@crusaderky (Collaborator, Author)

Closely related: #7774

@fjetter (Member) commented Apr 24, 2023

The current state of this PR is that it is done, with one major flaw: Client.gather(..., direct=False) (which is the default, and the only available option on Coiled) uses the remote-worker compression settings instead of the remote-client ones.

Can you elaborate?

I haven't tested anything for this, but when gathering through the scheduler, the data passes over two connections:

  1. Worker->Scheduler (uncompressed)
  2. Scheduler->Client (compressed)

You say (2) is not compressed in this case. Why is that?

@fjetter (Member) commented Apr 24, 2023

Even if this were true, the data transferred between workers is typically orders of magnitude larger than the data sent to the client. My guess is that this should typically still be a net positive effect on wall time.

@crusaderky (Collaborator, Author)

The current state of this PR is that it is done, with one major flaw: Client.gather(..., direct=False) (which is the default, and the only available option on Coiled) uses the remote-worker compression settings instead of the remote-client ones.

Can you elaborate?

I haven't tested anything for this, but when gathering through the scheduler, the data passes over two connections:

  1. Worker->Scheduler (uncompressed)
  2. Scheduler->Client (compressed)

You say (2) is not compressed in this case. Why is that?

Because the data does not get deserialized and then re-serialized as it goes through the scheduler. It is serialized and compressed once, using the settings established for the worker<->scheduler comms, and does not change afterwards.
For the same reason, if you have lz4 installed on both worker and scheduler, but not on the client, then gather(direct=False) will fail: #7774.

@fjetter (Member) commented Apr 25, 2023

Because the data does not get deserialized and then re-serialized as it goes through the scheduler. It is serialized and compressed once, using the settings established for the worker<->scheduler comms, and does not change afterwards.

Correct, it is not being deserialized (thankfully), but serialization and compression are decoupled to a certain degree.

Currently it is hard-coded that we will not reconsider compression on a Serialized object, but that does not necessarily need to be true.

IIUC this is controlled in this section here:

if isinstance(obj, Serialized):
    sub_header, sub_frames = obj.header, obj.frames
else:
    sub_header, sub_frames = serialize_and_split(
        obj,
        serializers=serializers,
        on_error=on_error,
        context=context,
        size=frame_split_size,
    )
    _inplace_compress_frames(sub_header, sub_frames)

And we may just need to call _inplace_compress_frames on Serialized objects as well, and modify _inplace_compress_frames to be a no-op for frames that are already compressed; see the sketch below.
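A minimal sketch of that idea, reusing maybe_compress_sketch from the sketch above. The per-frame "compression" list in the header and the helper name are assumptions for illustration, not the actual distributed internals:

def _inplace_compress_frames_sketch(header: dict, frames: list) -> None:
    # Track per-frame compression markers; default to "not compressed yet".
    compression = list(header.get("compression", [None] * len(frames)))
    for i, frame in enumerate(frames):
        if compression[i] is not None:
            continue  # no-op: this frame was already compressed on a previous hop
        compression[i], frames[i] = maybe_compress_sketch(bytes(frame))
    header["compression"] = tuple(compression)

With such a guard, the isinstance(obj, Serialized) branch above could call the helper unconditionally instead of skipping it.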

Review thread on the changed default:

    min_size: int = 10_000,
    sample_size: int = 10_000,
    nsamples: int = 5,
    min_ratio: float = 0.7,

@crusaderky (Collaborator, Author): Down from 0.9

Review thread on a newly added test module:

@crusaderky (Collaborator, Author): I want to move all compression-specific tests from test_protocol.py to here. I'll do it in a follow-up PR in order to keep this code review simple.

@crusaderky (Collaborator, Author)

After an async conversation, I removed all the complex and conceptually flawed machinery for distinguishing client vs. worker vs. scheduler, and retained only a separate switch for network vs. spill; a hedged config sketch follows. See the updated PR description for details of the final functionality.

This PR is now ready for review.
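For illustration, here is how the two switches might be set via dask.config. distributed.comm.compression is the long-standing comm key; the spill key name is my guess at what this PR introduces and should be checked against the shipped config schema:

import dask

# Network comms: uncompressed by default after this PR; opt back in explicitly.
dask.config.set({"distributed.comm.compression": "lz4"})

# Spilling keeps its own, independent switch (key name assumed).
dask.config.set({"distributed.worker.memory.spill-compression": "auto"})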

A/B tests

All tests are vs. the mainline git tip (lz4 used for both network and spill).

This PR (uncompressed network, lz4 on spill) delivers as expected, with a boost in network performance and no degradation in spill performance.
(test_single_future and test_join_big_small[0.1] are known offenders for noise and should be ignored.)
[A/B plot: this PR with default settings vs. main]

I also ran A/B tests for other configurations, for completeness' sake:

  • This same PR with lz4 enabled for both network and spill shows no difference vs. baseline (plot omitted).
  • snappy on both network and spill shows performance identical to lz4 (plot omitted).
  • zstandard on both network and spill shows severe degradation across the board. Plot and discussion at "Drop support for zstandard" #7810.
  • No compression whatsoever, neither on network nor on spill, shows a degradation in spill performance and a boost in network performance, as expected:
    [A/B plot: fully uncompressed vs. main]

@crusaderky changed the title from "[WIP] Disable compression for fast comms" to "Disable compression for fast comms" on Apr 27, 2023
@crusaderky marked this pull request as ready for review on April 27, 2023 at 14:52
@crusaderky (Collaborator, Author) commented Apr 27, 2023

I also A/B tested a very early prototype of integration with cramjam-lz4, which shows a very mild speedup vs. regular lz4. This is not so surprising, as I did not implement the extra machinery for decompress_into, so it is still afflicted by an unnecessary deep copy.

Prototype here: https://github.com/crusaderky/distributed/tree/cramjam

[A/B plot: cramjam-lz4 prototype vs. main]

@milesgranger (Contributor) commented Apr 27, 2023

Awesome, and interesting about cramjam, thanks for taking the time. :)

I did not implement the extra machinery for decompress_into so it's still afflicted by an unnecessary deep-copy.

Hopefully this shouldn't be too awful. Suppose one could keep around a cramjam.Buffer and de/compress_into it; something like:

import cramjam

self.buf = cramjam.Buffer()  # reusable output buffer, allocated once

...

self.buf.seek(0)
nbytes = cramjam.lz4.compress_into(data, self.buf)  # compress into the reused buffer
self.buf.seek(0)
return self.buf.read(nbytes)  # read back only the bytes actually written

So it'd grow to the max allocation, but be kept around to avoid any additional allocations.
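For the decompression side quoted above, a similar reuse pattern might look like the following sketch. It assumes cramjam.lz4.decompress_into accepts a cramjam.Buffer destination and returns the number of bytes written, and keeps one buffer per thread to stay thread-safe:

import threading

import cramjam

_local = threading.local()  # one reusable buffer per thread

def decompress_lz4(data: bytes) -> bytes:
    buf = getattr(_local, "buf", None)
    if buf is None:
        buf = _local.buf = cramjam.Buffer()
    buf.seek(0)
    nbytes = cramjam.lz4.decompress_into(data, buf)  # reuse the buffer as output
    buf.seek(0)
    return bytes(buf.read(nbytes))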

@crusaderky (Collaborator, Author)

@milesgranger I was actually talking about decompression only; I didn't really plan to use compress_into. Wouldn't we need one buffer per thread calling cramjam? I wouldn't be terribly happy with an extra ~90 MiB [1] of RAM usage per worker.

[1] 64 MiB (distributed.comm.shard) * 0.7 (the min_ratio parameter of maybe_compress) * 2 threads (main thread + offload thread) ≈ 90 MiB.
This would worsen if we chose to increase the number of offload threads in the future.

If you just call compress(), does cramjam need to enlarge the buffer multiple times? If so, is there a way to tell it to create an oversized buffer straight away and then shrink it once at the end instead?

@milesgranger (Contributor)

All compress variants take an output_len, which will do a single allocation for the output buffer; otherwise it grows the buffer using the normal Vec growth strategy. A sketch follows.
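For illustration, a sketch of pre-sizing the output in a single allocation. It assumes output_len is accepted as a keyword argument and acts as the pre-allocated output size; verify against the installed cramjam version:

import cramjam

data = b"example payload " * 10_000
# Allocate the output buffer once, sized generously, instead of growing it.
out = cramjam.lz4.compress(data, output_len=len(data))
result = bytes(out)  # the returned object supports the buffer protocol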

@milesgranger (Contributor)

Alternatively, you can use cramjam.Buffer.set_len and truncate before/after (roughly as sketched below), but I'm not sure that'll save much, if anything, over setting output_len.
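A rough, untested sketch of that alternative, assuming set_len and truncate behave like their file-like counterparts on cramjam.Buffer:

import cramjam

buf = cramjam.Buffer()
buf.set_len(1 << 20)  # pre-size the buffer to 1 MiB up front
buf.seek(0)
nbytes = cramjam.lz4.compress_into(b"some data " * 5_000, buf)
buf.truncate(nbytes)  # shrink back to the bytes actually written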

@fjetter (Member) commented May 3, 2023

What's missing here?

@crusaderky (Collaborator, Author)

What's missing here?

Just the review.

@milesgranger (Contributor) left a comment


Looked through it as best I could, and it seems good to me, though I could very well have missed subtle issues.

@crusaderky merged commit 2ab8cbd into dask:main on May 3, 2023
@crusaderky deleted the compression branch on May 3, 2023 at 13:13
@jrbourbeau (Member) commented May 3, 2023

I noticed that the dask/dask documentation build started failing recently (https://readthedocs.org/projects/dask/builds/20373322/). My current best guess, based on the traceback, is that it's somehow related to the changes here. @crusaderky any thoughts on what might be going on?

EDIT: Looks like some dask config parsing / validation is failing

@crusaderky (Collaborator, Author)

@jrbourbeau I'll have a look at it

@jrbourbeau (Member)

👍 thanks @crusaderky! Unfortunately I've not been able to reproduce locally yet...

@fjetter (Member) commented May 4, 2023

EDIT: Looks like some dask config parsing / validation is failing

I encountered similar problems locally after we switched to pyproject.toml

@jrbourbeau (Member)

Noting that the docs build has magically fixed itself ✨

milesgranger pushed a commit to milesgranger/distributed that referenced this pull request May 15, 2023
rapids-bot bot pushed a commit to rapidsai/dask-cuda that referenced this pull request Jun 5, 2023
Spill-to-disk compression was introduced in dask/distributed#7768, and Dask-CUDA should also allow modifying the default compression via the Dask config. This change is required to support `distributed>=2023.5.0`.

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #1190
Linked issue: Compression slows down network comms (#7655)
4 participants