dask-jobqueue: SLURMCluster doesn't seem to adapt the number of workers
What happened:
SLURMCluster started but created only one worker, even though it is managed by a Client with adapt that is supposed to scale between 1 and 10 jobs.
What you expected to happen: I expected Dask to ask SLURM to create one worker, potentially wait to see how quickly it finishes, and then create more workers (the graph has four independent parts).
Minimal Complete Verifiable Example:
import time

import numpy as np
from dask.distributed import Client
from dask_jobqueue import SLURMCluster


def get_shape():
    # simulate some workload for adapt
    for i in range(10 ** 5):
        x = np.random.rand(1000, 100)
    # make sure we have enough time to spawn workers
    time.sleep(10)
    return x.shape


if __name__ == "__main__":
    # create simple task graph
    n = 4
    graph = {f"shape_{i}": (get_shape,) for i in range(n)}
    targets = [f"shape_{i}" for i in range(n)]

    # use SLURM
    workers = 10  # we shouldn't need more than n=4 workers
    slurm_kwargs = {
        "queue": "cpu_low,cpu_high,gpu_low",
        "resource_spec": "",
        "diagnostics_port": None,
        "ip": "10.1.188.1",
        "cores": 1,
        "memory": "2G",
        "processes": 1,
    }
    with SLURMCluster(**slurm_kwargs) as cluster:
        cluster.adapt(minimum_jobs=1, maximum_jobs=workers)  # used minimum=1, maximum=workers before
        with Client(cluster, direct_to_workers=True) as client:
            print(client.get(graph, targets))
Anything else we need to know?: There are enough free resources available on the SLURM cluster, and no job-spawning limits are enforced.
Environment:
- Dask version: 2.25.0 and dask_jobqueue 0.7.1
- Python version: 3.8.5
- Operating System: CentOS 3.10.0-693.5.2.el7.x86_64
- Install method (conda, pip, source): conda
I created a PR for dask/distributed that fixes this issue (and also ensures that no more workers than necessary are created). The PR also contains a simpler test case to reproduce the issue: https://github.com/dask/distributed/pull/4155/files#diff-413d6f55a708fcd529d5cb0b45208a3a49636e637bbfcada6de0949d37e6de50R483
I think that if delayed manages to do it, you can probably do the same using the tuple (func_name, index) as the parent task, but I’d advise against it anyway (see https://docs.dask.org/en/latest/graphs.html: “Working directly with dask graphs is rare, unless you intend to develop new modules with Dask. Even then, dask.delayed is often a better choice.”).
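For comparison, here is a minimal sketch of the same graph expressed with dask.delayed instead of a raw graph dict (it reuses get_shape and n from the example above; pure=False is passed explicitly so each call keeps its own key and stays an independent task):

import dask
from dask import delayed

# four independent calls; pure=False gives each call its own task key
shapes = [delayed(get_shape, pure=False)() for _ in range(n)]

# compute through the client attached to the SLURMCluster
print(dask.compute(*shapes))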
The root cause is that Dask needs a reference task duration to decide whether to scale up or not. In your case, every task was identified as a new one during the first execution.
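If that is indeed the cause, one possible (untested) workaround is to raise the scheduler’s assumed duration for tasks it has never seen, via the distributed.scheduler.unknown-task-duration configuration key (default 500ms), so that adaptive scaling treats unknown tasks as long-running; a minimal sketch:

import dask

# assume unseen tasks take ~10 s (roughly matching the sleep in the MCVE)
# instead of the 500 ms default; set this before creating cluster/client
dask.config.set({"distributed.scheduler.unknown-task-duration": "10s"})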
If you’re referring to https://docs.dask.org/en/latest/delayed-best-practices.html#avoid-repeatedly-putting-large-inputs-into-delayed-calls, I think that when using delayed, you should just stick with delayed. That way the whole computation graph is known by Dask and it can recompute anything. Using scatter is better suited to the Futures API, but you may lose the recompute possibility that delayed offers. Either way, you should absolutely use one of them.
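As a rough illustration of the two options being contrasted (process and big_array are placeholders, and client is the Client from the example above):

import numpy as np
from dask import delayed

big_array = np.random.rand(10_000, 1_000)   # placeholder for a large input

# delayed: wrap the large input once so it becomes a single node in the
# graph that Dask can move or recompute together with the rest
big_d = delayed(big_array)
lazy_results = [delayed(process)(big_d, i) for i in range(4)]

# scatter: push the data to the workers and pass the resulting Future
# around; this fits the Futures API but gives up delayed's recompute
big_f = client.scatter(big_array, broadcast=True)
futures = [client.submit(process, big_f, i) for i in range(4)]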
@guillaumeeb I think the if-condition referenced above (in dask/distributed) is wrong. Shouldn’t it be
More importantly, the adaptive scheduler needs to distinguish between creating more workers within a job (in that case it is necessary to check whether there is enough space for those workers) and creating more jobs (in that case we might either have no memory limit or query SLURM&Co for the available resources).
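For context, dask-jobqueue already exposes both granularities on the cluster object; the sketch below only illustrates the API distinction (the worker-count and job-count forms both appear in the MCVE and its comment) and does not fix the scheduler-side issue:

# scale/adapt in units of Dask workers ...
cluster.scale(4)                      # ask for 4 workers
cluster.adapt(minimum=1, maximum=10)  # adapt between 1 and 10 workers

# ... or in units of SLURM jobs, each of which may contain several workers
cluster.scale(jobs=4)                           # ask for 4 jobs
cluster.adapt(minimum_jobs=1, maximum_jobs=10)  # adapt between 1 and 10 jobs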