dask-jobqueue: Handling workers with expiring allocation requests

I am trying to figure out how to handle the case of dask workers getting bumped from a cluster due to their requested allocation time expiring. From the intro YouTube video at https://www.youtube.com/watch?v=FXsgmwpRExM, it sounds like dask-jobqueue should detect when a worker expires and automatically start a replacement, which is what I want. However, my testing on DOE's Edison computer at NERSC does not show that behavior. If it matters, Edison uses SLURM.

I have tried setting up my cluster two ways, and both behave the same. I start a script that uses dask.delayed to run a bunch of embarrassingly parallel tasks: the scheduler spawns one worker, that worker does the first task or two, the worker's allocation expires, the scheduler seems to hang, and nothing else happens.
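For context, the task portion of my script looks roughly like this (a minimal sketch: process_file, the sleep, and the filenames are stand-ins for my real code):

    import time
    import dask

    @dask.delayed
    def process_file(fn):
        time.sleep(60)  # stand-in for one embarrassingly parallel task
        return fn

    # one delayed task per input file; the tasks are independent of each other
    filenames = [f'file{i}.dat' for i in range(100)]
    results = dask.compute(*[process_file(fn) for fn in filenames])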

The first approach I used to set up the cluster was with “scale”:

    from dask_jobqueue import SLURMCluster
    from dask.distributed import Client

    cluster = SLURMCluster(cores=1, processes=1)  # need all the memory for one task
    cluster.scale(1)  # testing with as simple as I can get, cycling 1 worker
    client = Client(cluster, timeout='45s')

@josephhardinee suggested a 2nd approach using “adapt” instead:

    cluster = SLURMCluster(cores=1, processes=1)  # need all the memory for one task
    cluster.adapt(minimum=1, maximum=1)  # trying adapt instead of scale
    client = Client(cluster, timeout='45s')
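(My understanding of the difference: scale() asks once for a fixed number of workers, while adapt() lets the cluster keep starting and stopping jobs as needed, so it should also replace workers that die.)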

The dask-worker.err log concludes with:

slurmstepd: error: *** JOB 10234215 ON nid01242 CANCELLED AT 2018-08-10T13:25:30 DUE TO TIME LIMIT ***
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at 'tcp://10.128.4.227:35634'
distributed.dask_worker - INFO - Exiting on signal 15
distributed.dask_worker - INFO - End worker
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-1, started daemon)>

Am I expecting more from dask-jobqueue than I should? Or is this a bug in my implementation, or in dask.distributed or dask-jobqueue?

Thanks, Bill

About this issue

  • State: closed
  • Created 6 years ago
  • Reactions: 2
  • Comments: 83 (60 by maintainers)

Most upvoted comments

Thanks to @willsALMANJ's issue a few days ago, I tried the --lifetime option, and I can confirm that it works perfectly with the latest Dask, Distributed and Jobqueue versions.

The initial script I used (with just a reduced walltime):

    import time
    import numpy as np
    from dask_jobqueue import PBSCluster as Cluster
    from dask import delayed
    from dask.distributed import Client, as_completed

    # config in $HOME/.config/dask/jobqueue.yaml
    cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb')
    cluster.adapt(minimum=0, maximum=4)

    client = Client(cluster)

    # each task takes 1s, and we have 4 cpus * 1 min * 60 s/min = 240 cpu-seconds,
    # so ask for a few more tasks than fit in one round of jobs
    filenames = [f'img{num}.jpg' for num in range(480)]

    def features(num_fn):
        num, image_fn = num_fn
        time.sleep(1)  # takes about 1s to compute features on an image
        features = np.random.random(246)
        return num, features

    num_files = len(filenames)
    num_features = len(features((0, filenames[0]))[1])

    X = np.zeros((num_files, num_features), dtype=np.float32)

    for future in as_completed(client.map(features, list(enumerate(filenames)))):
        i, v = future.result()
        X[i, :] = v

It fails with a KilledWorker exception when the first 4 workers are killed due to walltime.

Just modify the cluster initialization:

    cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb',
                      extra=["--lifetime", "50s"])

And it works! I think it solves the problem here.
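This works because --lifetime tells each worker to shut itself down gracefully before the queueing system kills it, so its tasks get rescheduled and the adaptive cluster starts a replacement job. (A hedged note for later readers: in more recent dask-jobqueue releases the extra keyword has been renamed, if I recall correctly, so the same setting would look something like this.)

    # Assumes a newer dask-jobqueue release where `extra` was renamed
    # to `worker_extra_args`; the --lifetime value itself is unchanged.
    cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb',
                      worker_extra_args=["--lifetime", "50s"])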

It’s unclear to me whether --lifetime-stagger is something we should worry about at this point

I think this would be valuable when scaling up with 100s of workers; at that point you don't want them all to stop at the same time.

I’ll try to produce some documentation to explain all that and close this issue. The outline should look something like:

How to handle Job Queueing system walltime killing workers

Reaching walltime can be troublesome:

  • when you don’t have a lot of room on your HPC platform and have only a few workers at a time: these workers will be killed (and others started) before your workload ends.
  • when you really don’t know how long your workload will take: all your workers could be killed before reaching the end. In this case, you’ll want to use adaptive clusters.

If you don’t set the proper parameters, you’ll run into KilledWorker exceptions in those two cases. Use the --lifetime worker option: it enables effectively infinite workloads when using adaptive clusters.

Use --lifetime-stagger when dealing with many workers.

Examples

    cluster = Cluster(walltime='01:00:00', cores=4, memory='16gb',
                      extra=["--lifetime", "55m", "--lifetime-stagger", "4m"])
    cluster.adapt(minimum=0, maximum=200)
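Here the 55m lifetime plus at most 4m of stagger stays just under the one-hour walltime, so each worker closes cleanly before the queueing system kills it, and the stagger spreads the shutdowns out so 200 workers don't all restart at the same moment.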

...

As mentioned in #126, I fear that adaptive mode is broken in release 0.3.0 of dask-jobqueue. It has since been fixed by #63.

I would recommend trying the master branch to see if that fixes this wrong behaviour.