Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable compression for fast comms #7768

Merged
merged 10 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 52 additions & 8 deletions distributed/comm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
from dask.utils import parse_timedelta

from distributed.comm import registry
from distributed.comm.addressing import parse_address
from distributed.comm.addressing import get_address_host, parse_address, resolve_address
from distributed.metrics import time
from distributed.protocol.compression import get_default_compression
from distributed.protocol.compression import get_compression_settings
from distributed.protocol.pickle import HIGHEST_PROTOCOL
from distributed.utils import wait_for

Expand Down Expand Up @@ -113,12 +113,23 @@ def closed(self):
@property
@abstractmethod
def local_address(self) -> str:
"""The local address. For logging and debugging purposes only."""
"""The local address"""

@property
@abstractmethod
def peer_address(self) -> str:
"""The peer's address. For logging and debugging purposes only."""
"""The peer's address"""

@property
def same_host(self) -> bool:
    """Tell whether the local and the peer address resolve to the same host.

    Returns
    -------
    bool
        True if the host part of ``local_address`` equals the host part of
        ``peer_address`` once both have been resolved; False otherwise.
    """

    def host_of(address):
        # Resolve first, then keep only the host component of the address
        return get_address_host(resolve_address(address))

    # Note: comparing the two resolved hosts is deliberately not the same as
    # testing whether the peer is "127.0.0.1". When you start a Server, by default
    # it starts listening on the LAN interface, so its advertised address will be
    # 10.x or 192.168.x.
    return host_of(self.local_address) == host_of(self.peer_address)

@property
def extra_info(self):
Expand All @@ -129,16 +140,49 @@ def extra_info(self):
"""
return {}

@staticmethod
def handshake_info():
def handshake_info(self) -> dict[str, Any]:
"""Share environment information with the peer that may differ, e.g. compression
settings.

Notes
-----
By the time this method runs, the "auto" compression setting has been updated to
an actual compression algorithm. This matters if both peers had compression set
to 'auto' but only one has lz4 installed. See
distributed.protocol.compression._update_and_check_compression_settings()

See also
--------
handshake_configuration
"""
if self.same_host:
compression = None
else:
compression = get_compression_settings("distributed.comm.compression")

return {
"compression": get_default_compression(),
"compression": compression,
"python": tuple(sys.version_info)[:3],
"pickle-protocol": HIGHEST_PROTOCOL,
}

@staticmethod
def handshake_configuration(local, remote):
def handshake_configuration(
local: dict[str, Any], remote: dict[str, Any]
) -> dict[str, Any]:
"""Find a configuration that is suitable for both local and remote

Parameters
----------
local
Output of handshake_info() in this process
remote
Output of handshake_info() on the remote host

See also
--------
handshake_info
"""
try:
out = {
"pickle-protocol": min(
Expand Down
4 changes: 4 additions & 0 deletions distributed/comm/inproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ def local_address(self) -> str:
def peer_address(self) -> str:
return self._peer_addr

@property
def same_host(self) -> bool:
    """Unconditionally report that the peer is on the same host.

    This override performs no address resolution; it always returns True.
    """
    return True

async def read(self, deserializers="ignored"):
if self._closed:
raise CommClosedError()
Expand Down
15 changes: 3 additions & 12 deletions distributed/comm/tests/test_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from distributed.comm.core import FatalCommClosedError
from distributed.comm.registry import backends, get_backend
from distributed.comm.tests.test_comms import check_tls_extra
from distributed.compatibility import randbytes
from distributed.security import Security
from distributed.utils_test import (
gen_cluster,
Expand Down Expand Up @@ -128,18 +129,8 @@ async def test_collections(c, s, a, b):

@gen_cluster(client=True, scheduler_kwargs={"protocol": "ws://"})
async def test_large_transfer(c, s, a, b):
np = pytest.importorskip("numpy")
await c.scatter(np.random.random(1_000_000))


@gen_test()
async def test_large_transfer_with_no_compression():
np = pytest.importorskip("numpy")
with dask.config.set({"distributed.comm.compression": None}):
async with Scheduler(protocol="ws://") as s:
async with Worker(s.address, protocol="ws://"):
async with Client(s.address, asynchronous=True) as c:
await c.scatter(np.random.random(1_500_000))
x = await c.scatter(randbytes(12_000_000))
await c.gather(x)


@pytest.mark.parametrize(
Expand Down
21 changes: 21 additions & 0 deletions distributed/comm/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import logging
import socket
import struct
import warnings
import weakref
Expand Down Expand Up @@ -173,6 +174,16 @@ def peer_address(self) -> str:
assert isinstance(ip, str)
return ip + ":0"

@property
def same_host(self) -> bool:
    """Same-host check that tolerates addresses the base check cannot handle.

    Overrides Comm.same_host, adding support for HTTP-only subdomains, which
    won't have a port and may not be known to the DNS service. When the base
    implementation raises ValueError or socket.gaierror, the peer is reported
    as not being on the same host.
    """
    verdict = False
    try:
        verdict = super().same_host
    except (ValueError, socket.gaierror):
        # Unparseable or unresolvable address: fall back to "different host"
        pass
    return verdict

def closed(self):
return (
self.handler.closed
Expand Down Expand Up @@ -285,6 +296,16 @@ def local_address(self) -> str:
def peer_address(self) -> str:
return f"{self.prefix}{self.sock.parsed.netloc}"

@property
def same_host(self) -> bool:
    """Same-host check that tolerates addresses the base check cannot handle.

    Overrides Comm.same_host, adding support for HTTP-only subdomains, which
    won't have a port and may not be known to the DNS service. When the base
    implementation raises ValueError or socket.gaierror, the peer is reported
    as not being on the same host.
    """
    try:
        on_same_host = super().same_host
    except (ValueError, socket.gaierror):
        # Unparseable or unresolvable address: treat the peer as remote
        return False
    else:
        return on_same_host

def _read_extra(self):
pass

Expand Down
21 changes: 16 additions & 5 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,15 @@ properties:
description: >-
Interval between checks for the spill, pause, and terminate thresholds

spill-compression:
enum: [null, false, auto, zlib, lz4, snappy, zstd]
description:
The compression algorithm to use. 'auto' defaults to lz4 if installed,
otherwise to snappy if installed, otherwise to false. zlib and zstd
are only used if explicitly requested here. Uncompressible data is
always uncompressed, regardless of this setting.
See also distributed.comm.compression.

http:
type: object
description: Settings for Dask's embedded HTTP Server
Expand Down Expand Up @@ -772,11 +781,13 @@ properties:
description: The maximum delay between retries

compression:
type: string
description: |
The compression algorithm to use

This could be one of lz4, snappy, zstd
enum: [null, false, auto, zlib, lz4, snappy, zstd]
description:
The compression algorithm to use. 'auto' defaults to lz4 if installed,
otherwise to snappy if installed, otherwise to false. zlib and zstd are
only used if explicitly requested here. Uncompressible data and transfers
on localhost are always uncompressed, regardless of this setting.
See also distributed.worker.memory.spill-compression.

offload:
type:
Expand Down
4 changes: 3 additions & 1 deletion distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ distributed:
# Set to false for no maximum.
max-spill: false

spill-compression: auto # See also: distributed.comm.compression

# Interval between checks for the spill, pause, and terminate thresholds.
# The target threshold is checked every time new data is inserted.
monitor-interval: 100ms
Expand Down Expand Up @@ -217,7 +219,7 @@ distributed:
delay:
min: 1s # the first non-zero delay between re-tries
max: 20s # the maximum delay between re-tries
compression: auto
compression: false # See also: distributed.worker.memory.spill-compression
shard: 64MiB
offload: 10MiB # Size after which we choose to offload serialization to another thread
default-scheme: tcp
Expand Down
1 change: 0 additions & 1 deletion distributed/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from contextlib import suppress
from functools import partial

from distributed.protocol.compression import compressions, default_compression
from distributed.protocol.core import decompress, dumps, loads, maybe_compress, msgpack
from distributed.protocol.cuda import cuda_deserialize, cuda_serialize
from distributed.protocol.serialize import (
Expand Down
Loading