Ensure that serialized data is measured correctly #7593

Merged

Conversation

@fjetter (Member) commented Feb 28, 2023

Closes #7589

TODO: Tests are still missing / not done yet.

def test_sizeof_serialize(Wrap, Wrapped):
    size = 100_000
    ser_obj = Wrap(b"0" * size)
    assert size <= sizeof(ser_obj) < size * 1.05

fjetter (Member Author):

Allowing 5% overhead is a bit generous, but this test doesn't really care whether the wrapper adds more metadata or whether Python object sizes change, etc.
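
For context, a minimal sketch of the kind of sizeof registration the test above exercises (this is not the actual PR diff; the dispatch targets and the small wrapper-overhead term are assumptions):

from dask.sizeof import sizeof

from distributed.protocol.serialize import Serialize, Serialized


@sizeof.register(Serialize)
def _sizeof_serialize(obj):
    # Report roughly the size of the wrapped payload so the comm layer can
    # decide whether serialization should be offloaded to the "Dask-Offload"
    # thread; a small constant covers the wrapper object itself.
    return sizeof(obj.data) + sizeof(object())


@sizeof.register(Serialized)
def _sizeof_serialized(obj):
    # For already-serialized data, count the header plus the raw frame bytes.
    return sizeof(obj.header) + sum(map(len, obj.frames)) + sizeof(object())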

github-actions bot commented Feb 28, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

26 files ±0 · 26 suites ±0 · 12h 50m 14s ⏱️ +33m 33s
3 502 tests +5 · 3 394 ✔️ +1 · 103 💤 ±0 · 5 ❌ +4
44 267 runs +66 · 42 190 ✔️ +58 · 2 072 💤 +4 · 5 ❌ +4

For more details on these failures, see this check.

Results for commit 52eee9e. ± Comparison against base commit 700f14a.

♻️ This comment has been updated with latest results.

@fjetter
Copy link
Member Author

fjetter commented Feb 28, 2023

Apparently, there is something wrong with the offload ThreadPoolExecutor (TPE). I get spurious errors like:

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 820, in _handle_comm
    result = await result
  File "/home/runner/work/distributed/distributed/distributed/worker.py", line 1795, in get_data
    compressed = await comm.write(msg, serializers=serializers)
  File "/home/runner/work/distributed/distributed/distributed/comm/tcp.py", line 271, in write
    frames = await to_frames(
  File "/home/runner/work/distributed/distributed/distributed/comm/utils.py", line 70, in to_frames
    return await offload(_to_frames)
  File "/home/runner/work/distributed/distributed/distributed/utils.py", line 1417, in offload
    return await loop.run_in_executor(
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/asyncio/base_events.py", line 783, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/concurrent/futures/thread.py", line 179, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

@@ -1390,7 +1390,6 @@ def is_valid_xml(text):


 _offload_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="Dask-Offload")
-weakref.finalize(_offload_executor, _offload_executor.shutdown)

@fjetter (Member Author), Feb 28, 2023:

This is just an attempt to fix the error I'm seeing.

Two reasons why I believe this line should be removed regardless of whether it fixes anything:

  1. All Python versions 3.8+ already ensure that worker threads terminate on interpreter shutdown. They explicitly handle collected executors, interpreter shutdown, and instance shutdown identically.
  2. Judging by the weakref.finalize docs, I'm not even sure this callback is ever triggered:

"A finalizer will never invoke its callback during the later part of the interpreter shutdown when module globals are liable to have been replaced by None."

Since _offload_executor is a module global, it cannot be garbage collected (and therefore finalized) unless it has been replaced by None during interpreter shutdown, and at that point the finalizer no longer invokes its callback.

fjetter (Member Author):

This turned out not to be responsible after all, but I still suggest removing this line.

Contributor:

Re: "Judging by the finalize docs I'm not even sure if this callback is ever triggered"

It's interesting that later in the docs it states:

"Note: It is important to ensure that func, args and kwargs do not own any references to obj, either directly or indirectly, since otherwise obj will never be garbage collected. In particular, func should not be a bound method of obj."

Notably the last part, which I suppose is another reason this line is unneeded.

fjetter (Member Author):

Yes, that note is interesting as well; i.e., this finalize call is useless for several reasons.
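
For context, a minimal, self-contained sketch of the bound-method problem described in that note (illustrative only, not dask code; the executor below is a stand-in for _offload_executor): because the callback is a bound method of the executor, the finalizer itself keeps the executor alive, so it can never be collected and the callback never fires through garbage collection.

import gc
import weakref
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the module-global offload executor discussed above.
executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="Demo-Offload")
probe = weakref.ref(executor)

# The pattern under discussion: the callback is a bound method of the object
# being tracked, so the finalizer stores a strong reference back to it.
fin = weakref.finalize(executor, executor.shutdown)

del executor
gc.collect()

# The executor is still reachable: finalizer -> executor.shutdown -> executor.
assert probe() is not None
assert fin.alive

On top of that, _offload_executor is a module global, so even without this reference chain it would only become collectable during interpreter shutdown, when (per the docs quoted earlier) the callback would not be invoked anyway.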

fjetter (Member Author):

I opened #7639

Our code base is riddled with this pattern

@crusaderky (Collaborator), Mar 10, 2023:

This one line makes me very nervous. Everything you wrote makes perfect sense, but just in case you're wrong, could you move it to its own PR so that it doesn't end up in the release? It has the potential to leave workers stuck on shutdown, and it could also behave differently on different Python versions and OSs, so I believe some thorough testing is in order.

fjetter commented Feb 28, 2023

There are non-trivial failures, e.g. test_acquire_replicas_large_data raises state machine AssertionErrors ...

@fjetter fjetter force-pushed the ensure_serialized_data_meassured_correctly branch from 043b138 to 4fe360d (March 9, 2023 11:38)
@fjetter fjetter force-pushed the ensure_serialized_data_meassured_correctly branch from 7eb755e to 690ec48 (March 9, 2023 11:47)
Comment on lines -2839 to -2840
finally:
    threadpool.shutdown()

fjetter (Member Author):

Somehow, this actually shut down the real offload threadpool, not just the mock, i.e. nothing in our test suite was using the offload threadpool after this test ran 🤯

fjetter (Member Author):

I do not entirely understand why this was shutting down the actual threadpool, but I don't care: I removed the mock and it works now.

Comment on lines 2809 to 2816
async def custom_worker_offload(func, *args):
    res = func(*args)
    if not istask(args) and istask(res):
        in_deserialize.set()
        await wait_in_deserialize.wait()
    return res

while CountingThreadPool.counter == 0:
    await asyncio.sleep(0)
monkeypatch.setattr("distributed.worker.offload", custom_worker_offload)

fjetter (Member Author):

The test logic is now slightly different but, I believe, more robust. We don't truly care about the offloading itself but rather that there is an await during deserialization. Therefore, I only patch the offload method. To ensure that we're truly in task-spec deserialization, I put in the istask guard above.
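
For readers outside this diff, a rough sketch of the scaffolding the snippet above assumes (the event names match the snippet, but the pytest-asyncio marker and the surrounding test body are illustrative guesses, not the actual dask test):

import asyncio

import pytest
from dask.core import istask


@pytest.mark.asyncio  # assumes pytest-asyncio; the real suite uses dask's own test utilities
async def test_await_during_deserialization(monkeypatch):
    in_deserialize = asyncio.Event()
    wait_in_deserialize = asyncio.Event()

    async def custom_worker_offload(func, *args):
        res = func(*args)
        # Only signal when deserialization turned non-task input into a task
        # spec, i.e. we are inside task-spec deserialization rather than some
        # other offloaded call.
        if not istask(args) and istask(res):
            in_deserialize.set()
            await wait_in_deserialize.wait()
        return res

    monkeypatch.setattr("distributed.worker.offload", custom_worker_offload)

    # ... start a cluster and submit work here, then:
    await in_deserialize.wait()   # the worker is now parked mid-deserialization
    # ... assert on the intermediate worker state ...
    wait_in_deserialize.set()     # let deserialization finish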

fjetter commented Mar 9, 2023

I think test failures are unrelated. @crusaderky care for another look?

fjetter commented Mar 10, 2023

FWIW, benchmark results are available at https://github.com/coiled/coiled-runtime/actions/runs/4384325501 but they don't look particularly interesting. I don't think our tests are very sensitive to this yet. See also coiled/benchmarks#696.

@crusaderky (Collaborator) left a comment:

.


fjetter commented Mar 13, 2023

@crusaderky next time, feel free to just push these minor changes

@crusaderky (Collaborator):

I've started an A/B test with incompressible as well as compressible data, based on coiled/benchmarks#696:
https://github.com/coiled/coiled-runtime/actions/runs/4408363294

@jrbourbeau (Member) left a comment:

Thanks for the fix @fjetter and review @crusaderky

Test failures seem unrelated, but the distributed/shuffle/tests/test_shuffle.py::test_new_worker failure in this build looks interesting. cc @hendrikmakait for visibility

@jrbourbeau jrbourbeau merged commit 6cab0e2 into dask:main Mar 13, 2023

@crusaderky (Collaborator):

I ran the A/B tests and I'm observing a very modest (5%) but consistent speedup in test_filter_then_average. The test uses data that is compressible at 37%. The other tests do not show any kind of change, including those running on data that is compressible at 99%. The reason is that the data compressible at 37% takes 140ms per chunk to compress, whereas identically sized data full of ones takes 14ms per chunk. I'll need to amend the PR that introduces compressible data and rerun the A/B test.
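
As an aside, a rough way to sanity-check that kind of per-chunk compression timing (illustrative only: lz4 as the compressor, the chunk size, and random data as a stand-in for the benchmark's partially compressible data are all assumptions):

import time

import lz4.frame  # lz4 is what distributed picks for compression when it is installed
import numpy as np


def time_compress(data, repeat=5):
    # Average wall-clock time to lz4-compress `data` once.
    start = time.perf_counter()
    for _ in range(repeat):
        lz4.frame.compress(data)
    return (time.perf_counter() - start) / repeat


n = 16_000_000  # ~128 MiB of float64 per chunk; a guess, not the benchmark's chunk size
random_chunk = np.random.default_rng(0).random(n).tobytes()  # barely compressible
ones_chunk = np.ones(n).tobytes()                            # highly compressible

print(f"random data:   {time_compress(random_chunk) * 1000:.0f} ms per chunk")
print(f"all-ones data: {time_compress(ones_chunk) * 1000:.0f} ms per chunk")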

fjetter commented Mar 14, 2023

Quoting @crusaderky: "I ran the A/B tests and I'm observing a very modest (5%), but consistent speedup"

Well, it's an easy win regardless. Even if it doesn't boost performance significantly, a healthier event loop is already a win for stability.

crusaderky commented Mar 15, 2023

I have produced a meaningful A/B test, running on data that is 42% compressible and takes a substantial amount of time to compress.

  • test_anom_mean is 15-20% slower
  • test_double_diff is 5% faster
  • test_filter_then_average is 5% faster
  • no impact on test_dot_product
  • no impact on test_vorticity
  • inconclusive data on test_map_overlap_sample (the test is too noisy with either compressible or incompressible data)

It's important to note that, before this PR, test_anom_mean and test_double_diff were respectively 50% and 30% slower on compressible data than they were on incompressible data, and test_vorticity was 67% slower on compressible data. I'll open a separate issue to discuss these findings.

[Four images attached]

Successfully merging this pull request may close these issues.

get_data() never offloads serialization