dask-jobqueue: SLURMCluster doesn't seem to adapt the number of workers

What happened: SLURMCluster started but created only one worker, even though it is managed by a Client and adapt is configured to scale between 1 and 10 jobs.

What you expected to happen: I expected Dask to ask SLURM to create one worker, see how quickly it finishes, and then create more workers (the graph has four independent parts).

Minimal Complete Verifiable Example:

import time

import numpy as np
from dask.distributed import Client
from dask_jobqueue import SLURMCluster


def get_shape():
    # simulate some workload for adapt
    for i in range(10 ** 5):
        x = np.random.rand(1000, 100)
    # make sure we have enough time to spawn workers
    time.sleep(10)
    return x.shape


if __name__ == "__main__":
    # create simple task graph
    n = 4
    graph = {f"shape_{i}": (get_shape,) for i in range(n)}
    targets = [f"shape_{i}" for i in range(n)]

    # use SLURM
    workers = 10  # we shouldn't need more than n=4 workers

    slurm_kwargs = {
        "queue": "cpu_low,cpu_high,gpu_low",
        "resource_spec": "",
        "diagnostics_port": None,
        "ip": "10.1.188.1",
        "cores": 1,
        "memory": "2G",
        "processes": 1,
    }

    with SLURMCluster(**slurm_kwargs) as cluster:
        cluster.adapt(minimum_jobs=1, maximum_jobs=workers)  #  used minimum=1, maximum=workers before
        with Client(cluster, direct_to_workers=True) as client:
            print(client.get(graph, targets))

Anything else we need to know?: There are enough free resources available on the SLURM cluster, and no job-spawning limits are enforced.

Environment:

  • Dask version: 2.25.0 and dask_jobqueue 0.7.1
  • Python version: 3.8.5
  • Operating System: CentOS 3.10.0-693.5.2.el7.x86_64
  • Install method (conda, pip, source): conda

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 20 (20 by maintainers)

Most upvoted comments

I created a PR for dask/distributed that fixes this issue (and also ensures that no more workers than necessary are created). The PR also contains a simpler test case to reproduce the issue: https://github.com/dask/distributed/pull/4155/files#diff-413d6f55a708fcd529d5cb0b45208a3a49636e637bbfcada6de0949d37e6de50R483

At least I don't think that is possible with the dict graph, since it associates parent tasks by a string.

I think that if delayed manages to do it, you can probably do the same by using the tuple (func_name, index) as the parent task, but I'd advise against it anyway (see https://docs.dask.org/en/latest/graphs.html: "Working directly with dask graphs is rare, unless you intend to develop new modules with Dask. Even then, dask.delayed is often a better choice.").
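For reference, here is a sketch of how the four independent tasks from the MCVE could be expressed with dask.delayed instead of a raw dict graph; it reuses get_shape from above and assumes it runs inside the with Client(cluster) block:

import dask
from dask import delayed

# build four independent tasks; each delayed(...)() call gets its own key,
# mirroring the dict graph keys shape_0 .. shape_3 from the MCVE above
tasks = [delayed(get_shape)() for _ in range(4)]

# run inside the `with Client(cluster) as client:` block so the
# distributed scheduler (and adaptive scaling) is used
print(dask.compute(*tasks))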

I think the root cause is that the elapsed time of the tasks is not always updated correctly.

The root cause is that Dask needs a reference task duration to decide whether to scale up or not. In your case, every task was identified as a new one during the first execution, so the scheduler had no duration estimate to base the scaling decision on.
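One way to work around that is to give the scaling heuristic a duration hint up front. The snippet below is a sketch under two assumptions: that extra keyword arguments to adapt() are forwarded to distributed's Adaptive (which accepts target_duration), and that this version of distributed honours the distributed.scheduler.unknown-task-duration config key; adjust to your setup.

import dask
from dask_jobqueue import SLURMCluster

# assumption: this key is the default duration the scheduler assigns to
# tasks it has never seen; it is very small by default, which makes fresh,
# long-running tasks look cheap to the adaptive scaler
dask.config.set({"distributed.scheduler.unknown-task-duration": "10s"})

with SLURMCluster(cores=1, memory="2G", processes=1) as cluster:
    # assumption: extra kwargs here are passed through to distributed.Adaptive
    cluster.adapt(minimum_jobs=1, maximum_jobs=10, target_duration="10s")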

Is scatter or delayed recommended to distribute data to workers?

If you're referring to https://docs.dask.org/en/latest/delayed-best-practices.html#avoid-repeatedly-putting-large-inputs-into-delayed-calls, I think that when using delayed, you should just stick with delayed. That way the whole computation graph is known by Dask, and it can recompute anything. Using scatter is better suited to the Futures API, but you may lose the recompute possibility that delayed gives you. Either way, you should absolutely use one of them.
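A minimal sketch contrasting the two approaches (the client and data below are illustrative, not from the original report):

import numpy as np
import dask
from dask import delayed
from dask.distributed import Client

client = Client()                  # could equally be the SLURMCluster client
data = np.random.rand(1000, 100)   # some shared input

# Option 1: stay within delayed -- wrap the input once so the whole graph,
# including the input, is known to Dask and can be recomputed if needed.
data_d = delayed(data)
shapes = [delayed(np.add)(data_d, i).shape for i in range(4)]
print(dask.compute(*shapes))

# Option 2: Futures API -- scatter the input to the workers once and
# pass the resulting Future into submitted tasks.
[data_f] = client.scatter([data], broadcast=True)
futures = [client.submit(np.add, data_f, i) for i in range(4)]
print([r.shape for r in client.gather(futures)])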

@guillaumeeb I think the if-condition referenced above (in dask/distributed) is wrong. Shouldn't it be:

if total <= 0.6 * limit:
    memory = 2 * len(self.workers)
else:
    memory = 0

More importantly, the adaptive scheduler needs to distinguish between creating more workers within a job (in which case it must check whether there is enough memory for those workers) and creating more jobs (in which case we might either drop the memory limit or query SLURM & co. for the available resources).
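To make the proposed distinction concrete, here is an illustrative sketch only (not the actual dask/distributed code), with hypothetical names for the inputs:

def workers_to_request(total, limit, n_workers, scaling_within_job):
    """Illustrative sketch of the proposed memory-based scaling check.

    total -- memory requested by queued/processing tasks (hypothetical)
    limit -- memory available to the currently running workers
    """
    if scaling_within_job:
        # New workers share the existing job's allocation, so only scale
        # up if there is headroom for them.
        return 2 * n_workers if total <= 0.6 * limit else 0
    # New jobs bring their own allocation from SLURM & co., so either skip
    # the memory cap entirely or query the resource manager instead.
    return 2 * n_workers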