diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 7a6207c0..efe2cbad 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -153,6 +153,56 @@ def test_rmm_async(loop): # noqa: F811 assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000 +def test_rmm_async_with_maximum_pool_size(loop): # noqa: F811 + rmm = pytest.importorskip("rmm") + + driver_version = rmm._cuda.gpu.driverGetVersion() + runtime_version = rmm._cuda.gpu.runtimeGetVersion() + if driver_version < 11020 or runtime_version < 11020: + pytest.skip("cudaMallocAsync not supported") + + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): + with popen( + [ + "dask", + "cuda", + "worker", + "127.0.0.1:9369", + "--host", + "127.0.0.1", + "--rmm-async", + "--rmm-pool-size", + "2 GB", + "--rmm-release-threshold", + "3 GB", + "--rmm-maximum-pool-size", + "4 GB", + "--no-dashboard", + ] + ): + with Client("127.0.0.1:9369", loop=loop) as client: + assert wait_workers(client, n_gpus=get_n_gpus()) + + memory_resource_types = client.run( + lambda: ( + rmm.mr.get_current_device_resource_type(), + type(rmm.mr.get_current_device_resource().get_upstream()), + ) + ) + for v in memory_resource_types.values(): + memory_resource_type, upstream_memory_resource_type = v + assert memory_resource_type is rmm.mr.LimitingResourceAdaptor + assert ( + upstream_memory_resource_type is rmm.mr.CudaAsyncMemoryResource + ) + + ret = get_cluster_configuration(client) + wait(ret) + assert ret["[plugin] RMMSetup"]["initial_pool_size"] == 2000000000 + assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000 + assert ret["[plugin] RMMSetup"]["maximum_pool_size"] == 4000000000 + + def test_rmm_logging(loop): # noqa: F811 rmm = pytest.importorskip("rmm") with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index e087fb70..530e51e2 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -261,6 +261,40 @@ async def test_rmm_async(): assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000 +@gen_test(timeout=20) +async def test_rmm_async_with_maximum_pool_size(): + rmm = pytest.importorskip("rmm") + + driver_version = rmm._cuda.gpu.driverGetVersion() + runtime_version = rmm._cuda.gpu.runtimeGetVersion() + if driver_version < 11020 or runtime_version < 11020: + pytest.skip("cudaMallocAsync not supported") + + async with LocalCUDACluster( + rmm_async=True, + rmm_pool_size="2GB", + rmm_release_threshold="3GB", + rmm_maximum_pool_size="4GB", + asynchronous=True, + ) as cluster: + async with Client(cluster, asynchronous=True) as client: + memory_resource_types = await client.run( + lambda: ( + rmm.mr.get_current_device_resource_type(), + type(rmm.mr.get_current_device_resource().get_upstream()), + ) + ) + for v in memory_resource_types.values(): + memory_resource_type, upstream_memory_resource_type = v + assert memory_resource_type is rmm.mr.LimitingResourceAdaptor + assert upstream_memory_resource_type is rmm.mr.CudaAsyncMemoryResource + + ret = await get_cluster_configuration(client) + assert ret["[plugin] RMMSetup"]["initial_pool_size"] == 2000000000 + assert ret["[plugin] RMMSetup"]["release_threshold"] == 3000000000 + assert ret["[plugin] RMMSetup"]["maximum_pool_size"] == 4000000000 + + @gen_test(timeout=20) async def test_rmm_logging(): rmm = pytest.importorskip("rmm") diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index f40a8f75..a155dc59 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -56,14 +56,11 @@ def __init__( "`rmm_maximum_pool_size` was specified without specifying " "`rmm_pool_size`.`rmm_pool_size` must be specified to use RMM pool." ) - if async_alloc is True and managed_memory is True: - raise ValueError( - "`rmm_managed_memory` is incompatible with the `rmm_async`." - ) - if async_alloc is True and maximum_pool_size is not None: - raise ValueError( - "`rmm_maximum_pool_size` is incompatible with the `rmm_async`." - ) + if async_alloc is True: + if managed_memory is True: + raise ValueError( + "`rmm_managed_memory` is incompatible with the `rmm_async`." + ) if async_alloc is False and release_threshold is not None: raise ValueError("`rmm_release_threshold` requires `rmm_async`.") @@ -90,12 +87,20 @@ def setup(self, worker=None): self.release_threshold, alignment_size=256 ) - rmm.mr.set_current_device_resource( - rmm.mr.CudaAsyncMemoryResource( - initial_pool_size=self.initial_pool_size, - release_threshold=self.release_threshold, - ) + mr = rmm.mr.CudaAsyncMemoryResource( + initial_pool_size=self.initial_pool_size, + release_threshold=self.release_threshold, ) + + if self.maximum_pool_size is not None: + self.maximum_pool_size = parse_device_memory_limit( + self.maximum_pool_size, alignment_size=256 + ) + mr = rmm.mr.LimitingResourceAdaptor( + mr, allocation_limit=self.maximum_pool_size + ) + + rmm.mr.set_current_device_resource(mr) if self.logging: rmm.enable_logging( log_file_name=get_rmm_log_file_name(