Skip to content

Commit

Permalink
Fix additional race condition that can cause P2P restart to deadlock (#…
Browse files: browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Aug 11, 2023
1 parent 4f30abc commit b3dde5c
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 8 deletions.
2 changes: 1 addition & 1 deletion distributed/scheduler.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -4802,7 +4802,7 @@ def stimulus_task_finished(self, key, worker, stimulus_id, run_id, **kwargs):

ws: WorkerState = self.workers[worker]
ts: TaskState = self.tasks.get(key)
if ts is None or ts.state in ("released", "queued"):
if ts is None or ts.state in ("released", "queued", "no-worker"):
logger.debug(
"Received already computed task, worker: %s, state: %s"
", key: %s, who_has: %s",
Expand Down
10 changes: 3 additions & 7 deletions distributed/shuffle/tests/test_shuffle.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -448,19 +448,15 @@ async def test_restarting_does_not_deadlock(c, s):
"shuffle-transfer", b.worker_address, 1, s
)
a.status = Status.paused
while len(s.running) > 1:
await asyncio.sleep(0.01)
await async_poll_for(lambda: len(s.running) == 1, timeout=5)
b.close_gracefully()
await b.process.process.kill()

while s.running:
await asyncio.sleep(0.01)
await async_poll_for(lambda: not s.running, timeout=5)

a.status = Status.running

while not s.running:
await asyncio.sleep(0.01)
pass
await async_poll_for(lambda: s.running, timeout=5)
await fut


Expand Down

0 comments on commit b3dde5c

Please sign in to comment.