Compression slows down network comms #7655

Closed
crusaderky opened this issue Mar 15, 2023 · 16 comments · Fixed by #7768

crusaderky (Collaborator) commented Mar 15, 2023

State of the art

Before dask either sends data over the network or writes it to disk, it tries compressing a small bit of it (10 kB) with lz4 and, if it compresses to 90% of its original size or better on the sample, it compresses the whole thing (implementation: maybe_compress).
For spill/unspill, compression/decompression blocks the event loop.
For network comms, compression/decompression runs on a thread pool with a single worker (offload) while the GIL is released (this has been fixed very recently for outbound comms; before it blocked the event loop: #7593).
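
For illustration, the heuristic can be summarised roughly as follows (a simplified sketch, not the actual implementation; the real maybe_compress samples several spots of the payload rather than just its head):

import lz4.block

def maybe_compress_sketch(payload: bytes, sample_size: int = 10_000):
    # Compress a small sample first; if it doesn't shrink to <= 90%, give up early
    sample = payload[:sample_size]
    if len(lz4.block.compress(sample)) > 0.9 * len(sample):
        return None, payload
    # The sample compressed well: compress the whole payload and re-check the ratio
    compressed = lz4.block.compress(payload)
    if len(compressed) > 0.9 * len(payload):
        return None, payload
    return "lz4", compressed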

Performance

Until now we did not have hard data on how beneficial compression is - just an academic expectation that "disk slow, network slow, cpu fast".
In coiled-runtime, I've just added tests [ coiled/benchmarks#714 ] that demonstrate different performance when data is compressible vs. when it is uncompressible.
coiled-runtime benchmarks run on coiled, which in turn means that all workers run on AWS EC2.

Test results are very bad.

I'm seeing that:

  • network-intensive tests that don't spill (test_array.py::*) are up to 70% slower when the workers spend time compressing and decompressing data compared to when maybe_compress gives up after it fails to compress 10 kB
  • spill-intensive tests that perform no network comms (test_spill.py::test_spilling) get a substantial speedup from compression
  • tests that are both spill-intensive and network-intensive (test_spill.py::test_dot_product_spill) don't show benefits from compression, probably because the two effects cancel each other out.

Whoops. Disk slow, cpu fast, network faster.

[chart: runtime of each benchmark under the uncompressible, compressible, and dummy configurations]

Tests were performed downstream of #7593.

In the above chart, each test ran in 3 different configurations, which differ in how the original data is generated:

uncompressible

a = da.random.random(...)

compressible

This data compresses to 42% of its original size, at a speed of 570 MiB/s (measured on lz4 4.0).

def compressible(x):
    y = x.reshape(-1)
    y[::2] = 0
    return y.reshape(x.shape)

a = da.random.random(...).map_blocks(compressible)

dummy

This configuration was used to rule out that the slowdown between compressible and uncompressible was caused by the extra layer in the graph or by the introduction of cloudpickle in the serialization.

def dummy(x):
    return x.reshape(x.shape)

a = da.random.random(...).map_blocks(dummy)

Possible solutions

First of all, we need to verify that the difference is actually caused by pure waiting time for decompression and not something else (e.g. GIL).

Reinstate blosc + fix #7433

blosc was removed in #5269 over maintainability concerns.
However, it is 5x faster than lz4 and can work around #7433 (with some additional work on top of just reverting #5269).
#7433 is a known major cause of slowness, in addition to the raw throughput of the C algorithm.
This is my preferred choice.

Increase number of offload threads

At the moment all compression/decompression is pipelined onto a single offload thread.
Increasing this number could be beneficial - if user functions alone are insufficient to saturate the CPU.
Thread safety of the various C libraries used for compression/decompression would need to be thoroughly verified.
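
For context, the offload executor lives in distributed.utils and is hard-coded to a single thread; the change would conceptually amount to something like the sketch below (the value 4 is purely illustrative, not an existing config option):

from concurrent.futures import ThreadPoolExecutor

# Today (roughly): _offload_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="Dask-Offload")
# The proposal would raise max_workers, e.g.:
_offload_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="Dask-Offload")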

Completely disable compression by default.

I don't think this is a good idea because (1) it would harm spilling and (2) it would severely harm client<->worker comms in all configurations where the client-worker bandwidth is much more limited than the worker-worker one - such as in Coiled.
From customer feedback, we know that many dask users can't read/write on cloud storage and are forced to upload/download everything from/to their laptop with scatter / gather.
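
For reference, compression can already be switched off per deployment via the existing configuration key that maybe_compress reads (a usage sketch; at the time of writing the same key governs both network comms and spilling):

import dask

# Disable compression for this process; workers, scheduler and client read the same key
dask.config.set({"distributed.comm.compression": False})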

Disable compression in network, keep it in spilling

Small code change needed. Same concerns as above.

Disable compression in worker-worker and scheduler-worker comms, keep it in client-worker comms and spilling

A larger and more cumbersome code change would be needed. Very ugly IMHO.

Leave everything as it is in core dask; disable compression in coiled

This would make sense if we thought non-Coiled users had much lower inter-worker bandwidth than on Coiled, on average.
I don't think this is a good idea, as I don't have any evidence supporting this statement.

CC @fjetter @mrocklin @gjoseph92 @hendrikmakait

crusaderky (Collaborator, Author) commented Mar 15, 2023

Minimal benchmark for lz4

On my beastly AMD Ryzen 3950X:

On AWS EC2 m6i.large:

  • 400 MiB/s compression
  • 760 MiB/s decompression (<= 64 MiB buffers)
  • 490 MiB/s decompression (> 64 MiB buffers)
import time
import numpy
from distributed.protocol import deserialize_bytes, serialize_bytelist


x = numpy.random.random(128 * 2**20 // 8)
y = x.copy().reshape(-1)
y[::2] = 0
y = y.reshape(x.shape)

x_frames = serialize_bytelist(x)
y_frames = serialize_bytelist(y)
x_bytes = bytearray(b"".join(x_frames))
y_bytes = bytearray(b"".join(y_frames))

print("buffer (MiB)", x.nbytes / 2**20)
assert len(x_frames) == len(y_frames)
print("n. frames", len(x_frames))
print("serialized uncompressible (MiB)", len(x_bytes) / 2**20)
print("serialized compressible (MiB)", len(y_bytes) / 2**20)

def bench(func, data, size):
    N = 20
    t0 = time.perf_counter()
    for _ in range(N):
        func(data)
    t1 = time.perf_counter()
    elapsed = (t1 - t0) / N
    return size / elapsed / 2**20

print("serialize uncompressible (MiB/s)", bench(serialize_bytelist, x, x.nbytes))
print("deserialize uncompressible (MiB/s)", bench(deserialize_bytes, x_bytes, x.nbytes))
print("serialize compressible (MiB/s)", bench(serialize_bytelist, y, x.nbytes))
print("deserialize compressible (MiB/s)", bench(deserialize_bytes, y_bytes, x.nbytes))

Output:

buffer (MiB) 128.0
n. frames 4
serialized uncompressible (MiB) 128.000226020813
serialized compressible (MiB) 72.65573978424072
serialize uncompressible (MiB/s) 1457635.4768711757
deserialize uncompressible (MiB/s) 4311637.046941704
serialize compressible (MiB/s) 678.4610149287204
deserialize compressible (MiB/s) 908.7458341590923

crusaderky (Collaborator, Author) commented:

On my beastly AMD Ryzen 3950X:

With blosc:

  • 4.5 GiB/s compression
  • 5.2 GiB/s decompression

milesgranger (Contributor) commented:

A selfless plug for cramjam's lz4 block, which is slightly faster especially w/ larger sizes and supports single allocation de/compression if the output size is already known. Albeit, I guess that'd only be most common for decompression here. fastparquet has been using it for a while now.
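
For reference, the single-allocation decompression path mentioned above looks roughly like this (a minimal sketch using the same cramjam calls that appear in the benchmarks further down):

import cramjam

data = b"0123456789" * 100_000
compressed = bytes(cramjam.lz4.compress_block(data))

# When the decompressed size is known up front, decompress into a preallocated buffer
out = bytearray(len(data))
cramjam.lz4.decompress_block_into(compressed, output=out)
assert out == data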

fjetter (Member) commented Mar 16, 2023

Context for those who are not intimately familiar with how we do compression. This is done in

@context_meter.meter("compress")
def maybe_compress(
    payload,
    min_size=10_000,
    sample_size=10_000,
    nsamples=5,
    compression=no_default,
):
    """
    Maybe compress payload

    1. We don't compress small messages
    2. We sample the payload in a few spots, compress that, and if it doesn't
       do any good we return the original
    3. We then compress the full original, if it doesn't compress well then we
       return the original
    4. We return the compressed result
    """
    if compression is no_default:
        compression = dask.config.get("distributed.comm.compression")
    if not compression:
        return None, payload
    if not (min_size <= nbytes(payload) <= 2**31):
        # Either too small to bother
        # or too large (compression libraries often fail)
        return None, payload

    # Normalize function arguments
    if compression == "auto":
        compression = default_compression
    compress = compressions[compression]["compress"]

    # Take a view of payload for efficient usage
    mv = ensure_memoryview(payload)

    # Try compressing a sample to see if it compresses well
    sample = byte_sample(mv, sample_size, nsamples)
    if len(compress(sample)) <= 0.9 * sample.nbytes:
        # Try compressing the real thing and check how compressed it is
        compressed = compress(mv)
        if len(compressed) <= 0.9 * mv.nbytes:
            return compression, compressed

    # Skip compression as the sample or the data didn't compress well
    return None, payload

and works through a couple of steps

  1. Sample the payload (5 samples of contiguous 10 kB).
  2. The five samples are concatenated and compressed.
  3. If the concatenated sample compresses by at least 10%, we proceed to compress the entire payload.
  4. After compressing the entire payload, we again check whether its size is reduced by at least 10%. If the size is indeed reduced, the compressed bytes + header are returned and submitted. Otherwise we throw away the compressed bytes and submit the raw bytes to avoid decompression cost on the other side.

So the question "how fast does our compression algorithm need to be?" depends on this cutoff.

The breakeven point can be calculated roughly by

# no compression time >= time for compressed bytes + compression + decompression
size / network >= new_size / network + size / compression + new_size / compression
new_size = compression_ratio * size

...

compression_ratio <= (compression - network) / (compression + network)

which, given a bandwidth of 1 Gibit/s (125 MiB/s), gives a cutoff of about

Compression rate (symmetric) [MiB/s] | Breakeven ratio
600 (slow case for lz4)              | 0.655
1100 (fast case for lz4)             | 0.80
5000 (blosc)                         | 0.95
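
These breakeven ratios follow directly from the formula above; a small sketch for checking them (rates in MiB/s, assuming symmetric compression/decompression throughput):

def breakeven_ratio(compression, network=125.0):
    # Largest compressed/original size ratio at which compression still pays off
    return (compression - network) / (compression + network)

for rate in (600, 1100, 5000):
    print(rate, round(breakeven_ratio(rate), 3))  # 0.655, 0.796, 0.951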

so... indeed, this looks like lz4 is a loss on performance, even on paper, unless we significantly reduce this threshold. This does not even account for us potentially throwing away the compressed bytes because the sampling looked good but the real data didn't (not sure how likely this is; I guess the other way around is more likely).

crusaderky (Collaborator, Author) commented Mar 16, 2023

because the sampling looked good but the real data didn't (not sure how likely this is, I guess the other way round is more likely)

A realistic use case for both is that you have a chunk that starts with an area full of zeros or another constant and later becomes random-ish (sample is compressible, total isn't), or the other way around (sample is uncompressible, total is).
All in all I don't hate the current sampling heuristic - I think it's good enough in most cases.

fjetter (Member) commented Mar 16, 2023

  • I think it's good enough in most cases.

I'm just curious how often this happens since this represents the absolute worst case. I'm not suggesting to drop or change this sampling.

mrocklin (Member) commented:

A realistic use case for both is that you have a chunk that start with an area full of zeros or another constant and later becomes random-ish

FWIW we currently sample five random sections in order to avoid situations like this.

mrocklin (Member) commented:

In general though, if we learned that "compression is bad" I'd be very happy to have all of that functionality go away. Simple is good.

mrocklin (Member) commented:

Kudos, also, for finding this. I'm excited by the potential optimization.

crusaderky (Collaborator, Author) commented:

A selfless plug for cramjam's lz4 block, which is slightly faster especially w/ larger sizes and supports single allocation de/compression if the output size is already known. Albeit, I guess that'd only be most common for decompression here. fastparquet has been using it for a while now.

That's impressive. FWIW, I could not make blosc.decompress_ptr work due to its awful API - I suspect it will require some C-level hack. blosc2 fixes the issue but is not available on conda (and compression is much slower).

import numpy
import lz4.block
import snappy
import cramjam
import blosc
import blosc2

x = numpy.random.random(64 * 2**20 // 8)
x[::2] = 0
b = x.tobytes()

print("=== lz4 ===")
c = lz4.block.compress(b)
print(len(c) / len(b))
assert lz4.block.decompress(c) == b
%timeit lz4.block.compress(b)
%timeit lz4.block.decompress(c)

print("=== snappy ===")
c = snappy.compress(b)
print(len(c) / len(b))
assert snappy.decompress(c) == b
%timeit snappy.compress(b)
%timeit snappy.decompress(c)

print("=== cramjam.lz4 ===")
c = cramjam.lz4.compress_block(b)
print(len(c) / len(b))
assert bytes(cramjam.lz4.decompress_block(c)) == b
d = bytearray(len(b))
cramjam.lz4.decompress_block_into(c, output=d)
assert d == b
%timeit cramjam.lz4.compress_block(b)
%timeit cramjam.lz4.decompress_block(c)
%timeit cramjam.lz4.decompress_block_into(c, output=d)

print("=== cramjam.snappy ===")
c = cramjam.snappy.compress(b)
print(len(c) / len(b))
assert bytes(cramjam.snappy.decompress(c)) == b
d = bytearray(len(b))
cramjam.snappy.decompress_into(c, output=d)
# assert d == b  # Fails
%timeit cramjam.snappy.compress(b)
%timeit cramjam.snappy.decompress(c)
# %timeit cramjam.snappy.decompress_into(c, output=d)

print("=== blosc ===")
c = blosc.compress(b, typesize=8)
print(len(c) / len(b))
assert blosc.decompress(c) == b
%timeit blosc.compress(b, typesize=8)
%timeit blosc.decompress(c)
#%timeit blosc.decompress_ptr(c, id(d) + ???)

print("=== blosc2 ===")
c = blosc2.compress(b, typesize=8)
print(len(c) / len(b))
assert blosc2.decompress(c) == b
d = bytearray(len(b))
blosc2.decompress(c, dst=d)
assert d == b
%timeit blosc2.compress(b, typesize=8)
%timeit blosc2.decompress(c)
%timeit blosc2.decompress(c, dst=d)

=== lz4 ===
0.567610889673233
87.7 ms ± 575 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
48.6 ms ± 108 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
=== snappy ===
0.5744811147451401
82 ms ± 708 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
54.6 ms ± 158 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
=== cramjam.lz4 ===
0.567610889673233
83.5 ms ± 301 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
28.3 ms ± 198 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
9.74 ms ± 102 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
=== cramjam.snappy ===
0.5746490508317947
90.7 ms ± 1.33 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
2.47 µs ± 22.6 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
=== blosc ===
0.8442769050598145
15.2 ms ± 146 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
13.3 ms ± 172 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
=== blosc2 ===
0.8443719297647476
45 ms ± 342 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
15.8 ms ± 559 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
9.29 ms ± 80.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

cramjam.snappy.decompress_into doesn't seem to work, and the performance of cramjam.snappy.decompress is suspicious. But cramjam.lz4 looks like the best of the lot to me.

fjetter (Member) commented Mar 16, 2023

It's also worth noting that microbenchmarks and back-of-the-envelope calculations will not necessarily decide this for us. CPUs on workers are typically busy with other things (at least assuming some computation and network are overlapping), so a realistic compression rate is possibly much slower than what we're measuring here.

milesgranger (Contributor) commented Mar 16, 2023

cramjam.snappy.decompress_into doesn't seem to work...

That's because cramjam's standard output is a file-like buffer: once it has been read to the end by another de/compression call, subsequent calls behave just like an io buffer and read zero bytes - and zero bytes to de/compress. Here is a slightly modified version of the cramjam benchmarks, simply taking the buffer view and materialising it via bytes.

Also note that python-snappy's .compress uses the raw format, which cramjam denotes with .compress_raw/.decompress_raw; cramjam.snappy.compress is the snappy framed/streaming format.

import numpy
import lz4.block
import snappy
import cramjam
import blosc
import blosc2

x = numpy.random.random(64 * 2**20 // 8)
x[::2] = 0
b = x.tobytes()

print("=== lz4 ===")
c = lz4.block.compress(b)
print(len(c) / len(b))
assert lz4.block.decompress(c) == b
%timeit lz4.block.compress(b)
%timeit lz4.block.decompress(c)

print("=== cramjam.lz4 ===")
c = bytes(cramjam.lz4.compress_block(b))
print(len(c) / len(b))
assert bytes(cramjam.lz4.decompress_block(c)) == b
d = bytearray(len(b))
cramjam.lz4.decompress_block_into(c, output=d)
assert d == b
%timeit cramjam.lz4.compress_block(b)
%timeit cramjam.lz4.decompress_block(c)
%timeit cramjam.lz4.decompress_block_into(c, output=d)

print("=== snappy ===")
c = snappy.compress(b)
print(len(c) / len(b))
assert snappy.decompress(c) == b
%timeit snappy.compress(b)
%timeit snappy.decompress(c)

print("=== cramjam.snappy raw ===")
c = bytes(cramjam.snappy.compress_raw(b))
print(len(c) / len(b))
assert bytes(cramjam.snappy.decompress_raw(c)) == b
d = bytearray(len(b))
cramjam.snappy.decompress_raw_into(c, output=d)
assert d == b
%timeit cramjam.snappy.compress_raw(b)
%timeit cramjam.snappy.decompress_raw(c)
%timeit cramjam.snappy.decompress_raw_into(c, output=d)

print("=== cramjam.snappy ===")
c = bytes(cramjam.snappy.compress(b))
print(len(c) / len(b))
assert bytes(cramjam.snappy.decompress(c)) == b
d = bytearray(len(b))
cramjam.snappy.decompress_into(c, output=d)
assert d == b
%timeit cramjam.snappy.compress(b)
%timeit cramjam.snappy.decompress(c)
%timeit cramjam.snappy.decompress_into(c, output=d)

print("=== blosc ===")
c = blosc.compress(b, typesize=8)
print(len(c) / len(b))
assert blosc.decompress(c) == b
%timeit blosc.compress(b, typesize=8)
%timeit blosc.decompress(c)
#%timeit blosc.decompress_ptr(c, id(d) + ???)

print("=== blosc2 ===")
c = blosc2.compress(b, typesize=8)
print(len(c) / len(b))
assert blosc2.decompress(c) == b
d = bytearray(len(b))
blosc2.decompress(c, dst=d)
assert d == b
%timeit blosc2.compress(b, typesize=8)
%timeit blosc2.decompress(c)
%timeit blosc2.decompress(c, dst=d)

=== lz4 ===
0.567649319767952
89.4 ms ± 2.67 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
55.4 ms ± 641 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
=== cramjam.lz4 ===
0.567649319767952
86.3 ms ± 573 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
36.5 ms ± 243 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
12 ms ± 22 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
=== snappy ===
0.5744812339544296
73.9 ms ± 277 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
60.9 ms ± 2.45 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
=== cramjam.snappy raw ===
0.5744812339544296
87.6 ms ± 1.63 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
47.2 ms ± 528 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
23.2 ms ± 255 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
=== cramjam.snappy ===
0.5746491700410843
81.7 ms ± 791 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
61 ms ± 178 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
34 ms ± 138 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
=== blosc ===
0.8442404121160507
15.7 ms ± 222 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
10.6 ms ± 207 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
=== blosc2 ===
0.8434544950723648
39.8 ms ± 1.55 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
11.9 ms ± 229 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
6.67 ms ± 257 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Also, I don't think it'd be absurd to add C-Blosc2 bindings to cramjam either.

fjetter (Member) commented Mar 21, 2023

I'm not entirely convinced about using cramjam, or even keeping compression at all (for network). What worries me the most is the CPU load of user tasks (and of the server itself, considering this is offloaded). When I perform the above measurements with load on my CPU, I see severe degradation of performance: when all my CPUs are under load (while true; i += 1), the compression rate drops by about 2.5x while the decompression rate drops by about 10x. These drops very likely depend heavily on hardware and architecture, but given the benchmarks so far, the only case where keeping network compression is beneficial is a slow network, OR if we lower the threshold compression ratio significantly, from 90% to something like 50%, just to ensure we're not losing on this deal (not even talking about additional memory requirements here). If we need to lower it by that much, I wonder how valuable this still is, or whether we should just rip it out (and everything belonging to it, e.g. comm handshakes).

mrocklin (Member) commented:

My gut reaction is to change the default compression to None in the next release for network comms. Mostly I like this because it's simple and removes a moving part from the system, which is good.

I hear @crusaderky's concerns about Client-Scheduler communications, but I think that a 2x difference there won't be that big of a deal (it's either very good or very bad today, so 2x doesn't necessarily meaningfully change things).

crusaderky (Collaborator, Author) commented:

When all my CPUs are under load (while true; i += 1) compression rate drops by about 2.5x while decompression rate drops by about 10x.

That's because cramjam today doesn't release the GIL - a known issue that @milesgranger is looking into, and which is obviously a showstopper for adoption in dask.

crusaderky self-assigned this Apr 8, 2023
crusaderky (Collaborator, Author) commented:

From an async conversation with @fjetter:

While cramjam and blosc2 would be faster than lz4 (at least in decompression), the problem remains that you need to manually install them - as optional dependencies, nothing will pull them in. So a substantial number of users will continue having lz4 installed for whatever reason, but not our preferred libraries.
To fix this we could stop automatically defaulting to lz4 when cramjam is not installed, but that would slow down users of slow networks immediately upon upgrade.

So we converged in favour of having a switch for scheduler/worker<->worker comms and a separate one for client<->scheduler/worker.

I briefly fiddled with the idea of having compression settings that depend on the IP of your peer:

compression:
    localhost: False  # 127.0.0.x
    lan: False  # 10.x.x.x and 192.168.x.x
    internet: auto  # everything else

The biggest benefit would be that it would automatically speed up client comms on LocalClusters without any need for configuration. However, it would completely fail for clients connecting through a VPN into a corporate network, as their perceived IP address would be 10.x.
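
A sketch of how such peer classification could look (a hypothetical helper to illustrate the idea, not proposed code):

import ipaddress

def compression_scope(peer_ip: str) -> str:
    addr = ipaddress.ip_address(peer_ip)
    if addr.is_loopback:   # 127.0.0.x
        return "localhost"
    if addr.is_private:    # 10.x.x.x, 192.168.x.x, ...
        return "lan"
    return "internet"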

So I think I will go for:

compression:
    remote-client: auto  # client->scheduler and client->worker on LAN or internet
    remote-worker: False  # worker->scheduler and worker->worker on LAN or internet
    localhost: False  # any actor connecting to another actor on 127.0.0.x

which covers the LocalCluster use case nicely.
