Add meaningful error for out of disk exception during write (#8886)
hendrikmakait authored Oct 8, 2024
1 parent 36020d6 commit 004fafb
Showing 5 changed files with 71 additions and 7 deletions.
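
In user terms, this change turns an opaque `[Errno 28] No space left on device` raised somewhere inside a P2P shuffle into a `P2POutOfDiskError` that tells the user what to do about it. A hypothetical reproduction, modeled on the test added below — it assumes a cluster whose worker spill disks fill up mid-shuffle, so it is illustrative rather than deterministic:

```python
import dask
import dask.datasets
from distributed import Client

client = Client(n_workers=2)  # workers spill P2P shards to local disk
df = dask.datasets.timeseries(start="2000-01-01", end="2000-01-10")

with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    # If a worker's disk fills while storing transferred shards, this
    # now raises P2POutOfDiskError with guidance (add workers or disk)
    # instead of a bare ENOSPC OSError.
    df.shuffle("x").compute()
```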
10 changes: 9 additions & 1 deletion distributed/shuffle/_core.py
@@ -36,7 +36,11 @@
from distributed.protocol.serialize import ToPickle
from distributed.shuffle._comms import CommShardsBuffer
from distributed.shuffle._disk import DiskShardsBuffer
from distributed.shuffle._exceptions import P2PConsistencyError, ShuffleClosedError
from distributed.shuffle._exceptions import (
P2PConsistencyError,
P2POutOfDiskError,
ShuffleClosedError,
)
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._memory import MemoryShardsBuffer
from distributed.utils import run_in_executor_with_context, sync
@@ -508,6 +512,8 @@ def handle_transfer_errors(id: ShuffleId) -> Iterator[None]:
raise Reschedule()
except P2PConsistencyError:
raise
except P2POutOfDiskError:
raise
except Exception as e:
raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e

@@ -522,6 +528,8 @@ def handle_unpack_errors(id: ShuffleId) -> Iterator[None]:
raise Reschedule()
except P2PConsistencyError:
raise
except P2POutOfDiskError:
raise
except Exception as e:
raise RuntimeError(f"P2P shuffling {id} failed during unpack phase") from e

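Both handlers follow the same pass-through pattern: exceptions that already carry a meaningful, user-facing message (P2PConsistencyError, and now P2POutOfDiskError) propagate unchanged, while anything unexpected is wrapped in a phase-specific RuntimeError. A minimal sketch of the pattern — the stand-in exception classes and the `handle_phase_errors` name are illustrative, not part of distributed, and the real handlers also re-raise Reschedule:

```python
from __future__ import annotations

import contextlib
from collections.abc import Iterator


class P2PConsistencyError(RuntimeError):
    """Stand-in for distributed.shuffle._exceptions.P2PConsistencyError."""


class P2POutOfDiskError(OSError):
    """Stand-in for distributed.shuffle._exceptions.P2POutOfDiskError."""


@contextlib.contextmanager
def handle_phase_errors(id: str, phase: str) -> Iterator[None]:
    try:
        yield
    except (P2PConsistencyError, P2POutOfDiskError):
        # These already explain what went wrong; wrapping them would
        # bury the actionable message under a generic RuntimeError.
        raise
    except Exception as e:
        raise RuntimeError(f"P2P shuffling {id} failed during {phase} phase") from e
```

The explicit `except P2POutOfDiskError: raise` clauses in the diff are what keep the new error from falling into the catch-all below them.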
18 changes: 14 additions & 4 deletions distributed/shuffle/_disk.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import contextlib
import errno
import pathlib
import shutil
import threading
@@ -12,7 +13,7 @@

from distributed.metrics import context_meter, thread_time
from distributed.shuffle._buffer import ShardsBuffer
from distributed.shuffle._exceptions import DataUnavailable
from distributed.shuffle._exceptions import DataUnavailable, P2POutOfDiskError
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._pickle import pickle_bytelist
from distributed.utils import Deadline, empty_context, log_errors, nbytes
@@ -177,12 +178,21 @@ async def _process(self, id: str, shards: list[Any]) -> None:
if self._closed:
raise RuntimeError("Already closed")

with open(self.directory / str(id), mode="ab") as f:
f.writelines(frames)

try:
self._write_frames(frames, id)
except OSError as e:
if e.errno == errno.ENOSPC:
raise P2POutOfDiskError from e
raise
context_meter.digest_metric("disk-write", 1, "count")
context_meter.digest_metric("disk-write", sum(map(nbytes, frames)), "bytes")

def _write_frames(
self, frames: Iterable[bytes | bytearray | memoryview], id: str
) -> None:
with open(self.directory / str(id), mode="ab") as f:
f.writelines(frames)

def read(self, id: str) -> Any:
"""Read a complete file back into memory"""
self.raise_on_exception()
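The write path now funnels through `_write_frames` so a test can override it, and `_process` translates ENOSPC specifically while letting every other OSError keep its type. A self-contained sketch of that translation — `append_frames` and the stand-in exception class are illustrative names, not distributed's API:

```python
import errno


class P2POutOfDiskError(OSError):
    """Stand-in for distributed.shuffle._exceptions.P2POutOfDiskError."""


def append_frames(path: str, frames: list[bytes]) -> None:
    try:
        with open(path, mode="ab") as f:
            f.writelines(frames)
    except OSError as e:
        if e.errno == errno.ENOSPC:
            # "No space left on device": raise the domain error, keeping
            # the original OSError as __cause__ for debugging.
            raise P2POutOfDiskError from e
        # Any other I/O failure (permissions, bad path, ...) propagates
        # unchanged so it is not misreported as a disk-space problem.
        raise
```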
9 changes: 9 additions & 0 deletions distributed/shuffle/_exceptions.py
@@ -15,3 +15,12 @@ class ShuffleClosedError(P2PConsistencyError):

class DataUnavailable(Exception):
"""Raised when data is not available in the buffer"""


class P2POutOfDiskError(OSError):
def __str__(self) -> str:
return (
"P2P ran out of available disk space while temporarily storing transferred data. "
"Please make sure that P2P has enough disk space available by increasing the number of "
"workers or the size of the attached disk."
)
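
Subclassing OSError keeps the new error catchable by existing `except OSError` handlers, while the `__str__` override guarantees the guidance message appears wherever the exception is logged or displayed, regardless of constructor arguments. A small demonstration, assuming a distributed release that includes this commit:

```python
import errno

from distributed.shuffle._exceptions import P2POutOfDiskError

try:
    # Mimic what DiskShardsBuffer does when the disk fills up.
    raise P2POutOfDiskError from OSError(errno.ENOSPC, "No space left on device")
except P2POutOfDiskError as e:
    assert isinstance(e, OSError)  # still a plain OSError to old handlers
    print(e)            # the fixed guidance message from __str__
    print(e.__cause__)  # the original ENOSPC error, preserved via 'from'
```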
8 changes: 7 additions & 1 deletion distributed/shuffle/_shuffle.py
@@ -48,7 +48,11 @@
handle_transfer_errors,
handle_unpack_errors,
)
from distributed.shuffle._exceptions import DataUnavailable, P2PConsistencyError
from distributed.shuffle._exceptions import (
DataUnavailable,
P2PConsistencyError,
P2POutOfDiskError,
)
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin
from distributed.sizeof import sizeof
@@ -107,6 +111,8 @@ def shuffle_barrier(id: ShuffleId, run_ids: list[int]) -> int:
raise e
except P2PConsistencyError:
raise
except P2POutOfDiskError:
raise
except Exception as e:
raise RuntimeError(f"shuffle_barrier failed during shuffle {id}") from e

33 changes: 32 additions & 1 deletion distributed/shuffle/tests/test_shuffle.py
@@ -2,6 +2,7 @@

import asyncio
import contextlib
import errno
import itertools
import logging
import os
@@ -21,6 +22,7 @@
from dask.utils import key_split

from distributed.shuffle._core import ShuffleId, ShuffleRun, barrier_key
from distributed.shuffle._disk import DiskShardsBuffer
from distributed.worker import Status

np = pytest.importorskip("numpy")
@@ -47,7 +49,7 @@
read_from_disk,
serialize_table,
)
from distributed.shuffle._exceptions import P2PConsistencyError
from distributed.shuffle._exceptions import P2PConsistencyError, P2POutOfDiskError
from distributed.shuffle._limiter import ResourceLimiter
from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin
from distributed.shuffle._shuffle import (
@@ -2039,6 +2041,35 @@ async def _receive(self, data: list[tuple[int, bytes]]) -> None:
await asyncio.gather(*[s.close() for s in [sA, sB]])


@gen_cluster(client=True)
async def test_meaningful_out_of_disk_error(c, s, a, b):
class OutOfDiskShardsBuffer(DiskShardsBuffer):
def _write_frames(self, frames, id):
code = errno.ENOSPC
raise OSError(code, os.strerror(code))

df = dask.datasets.timeseries(
start="2000-01-01",
end="2000-01-10",
dtypes={"x": float, "y": float},
freq="10 s",
)
with dask.config.set(
{"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": True}
):
shuffled = df.shuffle("x", npartitions=10)
with pytest.raises(P2POutOfDiskError, match="out of available disk space"):
with mock.patch(
"distributed.shuffle._core.DiskShardsBuffer",
OutOfDiskShardsBuffer,
):
await c.compute(shuffled)
await assert_worker_cleanup(a)
await assert_worker_cleanup(b)
await c.close()
await assert_scheduler_cleanup(s)
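
Two details of the test are worth calling out. First, `OutOfDiskShardsBuffer` builds a realistic fault by pairing the errno code with the platform's message for it, which can be reproduced in isolation:

```python
import errno
import os

# The same fault the test's OutOfDiskShardsBuffer injects: an OSError
# carrying the ENOSPC code plus the platform's message for that code.
code = errno.ENOSPC
err = OSError(code, os.strerror(code))

assert err.errno == errno.ENOSPC
print(err)  # e.g. "[Errno 28] No space left on device"
```

Second, the `mock.patch` target is `distributed.shuffle._core.DiskShardsBuffer` — the module that instantiates the buffer — rather than `distributed.shuffle._disk` where the class is defined, because a patch must replace the name at its lookup site to take effect.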


class BlockedShuffleReceiveShuffleWorkerPlugin(ShuffleWorkerPlugin):
def setup(self, worker: Worker) -> None:
super().setup(worker)
