distributed: Scheduler memory leak / large worker footprint on simple workload

What happened:

Running an embarrassingly parallel map_overlap workload appears to leak memory in the scheduler. Once the computation completes, releasing the tasks and restarting the client does not reclaim the memory. The example below, with ~200k tasks, shows scheduler memory jumping from ~100MB to ~1.3GB while the graph runs; after client.restart() it still sits at ~1.1GB.
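One way to confirm those numbers is to ask the scheduler for its own resident memory (a minimal sketch; it assumes a client created as in the example below and uses psutil, which distributed already depends on):

import os
import psutil

def scheduler_rss_mb():
    # Resident set size of the scheduler process, in MiB
    return psutil.Process(os.getpid()).memory_info().rss / 2**20

# Run before the computation, after it finishes, and again after client.restart()
print(client.run_on_scheduler(scheduler_rss_mb))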

In addition, worker memory climbs into the “yellow” zone, where I believe spilling to disk begins. Given the embarrassingly parallel nature of this workload, workers ought to be able to discard pieces as soon as they are done with them.
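If the “yellow” corresponds to the spill threshold (my assumption), these are the worker-memory knobs that control when spilling, pausing, and termination kick in. A sketch with the default fractions shown; they would need to be set before the workers are created:

import dask

# Fractions of each worker's memory_limit (defaults shown)
dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling least-recently-used data to disk
    "distributed.worker.memory.spill": 0.70,      # spill based on process memory
    "distributed.worker.memory.pause": 0.80,      # stop accepting new tasks
    "distributed.worker.memory.terminate": 0.95,  # nanny restarts the worker
})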

From a performance perspective, the scheduler becomes unresponsive during client.compute (it takes ~20 seconds before the computation starts), presumably because it is loading a large graph. I have seen this cause already-running computations to start erroring, with lost keys and KilledWorker errors.
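A quick way to see how much graph is being shipped to the scheduler at that point (a sketch; store is the delayed object from the example below):

# Number of tasks the scheduler has to ingest when client.compute(store) is called
print(len(store.__dask_graph__()), "tasks")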

And finally, anecdotally, it sometimes happens that one worker runs hot, getting 10x the tasks of the other workers, and eventually forward progress halts. I now watch for this and kill that worker, which redistributes the work and finishes the job. (I’m using dask-gateway on K8s.)
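Rather than killing the pod by hand, the same effect can be had from the client (a sketch; the worker address is hypothetical and would come from client.scheduler_info()):

# Retire the worker holding a disproportionate share of tasks;
# its keys are moved/rescheduled onto the remaining workers.
hot_worker = "tcp://10.0.0.5:34567"  # hypothetical address, e.g. from client.scheduler_info()["workers"]
client.retire_workers(workers=[hot_worker])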

What you expected to happen:

  • The scheduler should not use up additional memory once a computation is done.
  • Workers should shard a parallel job so that each shard can be discarded when done, keeping the worker memory profile low.
  • Loading a graph should not disrupt computations that are already running.

Minimal Complete Verifiable Example:

import dask.array as da
import distributed

# 4 single-threaded workers with a 10GB memory limit each
client = distributed.Client(n_workers=4, threads_per_worker=1, memory_limit='10GB')

# 12,800 chunks of shape (1, 2, 512, 512); the overlap and zarr store steps
# expand this to roughly 200k tasks
arr = da.zeros((50, 2, 8192, 8192), chunks=(1, -1, 512, 512))
result = arr.map_overlap(lambda x: x, depth=(0, 0, 200, 200))

# Build the graph that writes to zarr without computing it, then submit it
store = result.to_zarr('/media/ssd/test/memory_scheduler.zarr', compute=False, overwrite=True)
future = client.compute(store)
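To observe the retained memory, wait for the graph to finish, restart, and re-check scheduler memory (a sketch, following the report above):

from distributed import wait

wait(future)       # block until the store graph has finished
client.restart()   # in the report above, scheduler memory stays around 1.1GB after this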

Environment:

  • Dask version: 2.18.1
  • Distributed version: 2.18.0
  • Python version: 3.7
  • Operating System: Ubuntu 18.04
  • Install method (conda, pip, source): conda

Most upvoted comments

I may be suffering from a completely unrelated leak

FYI, if you’re running a recent version, there’s a known worker memory leak, especially if your graph contains large-ish objects: https://github.com/dask/distributed/issues/5960

Just found the following snippet in the mallopt man page under the M_MMAP_THRESHOLD entry:

Note: Nowadays, glibc uses a dynamic mmap threshold by default. The initial value of the threshold is 128*1024, but when blocks larger than the current threshold and less than or equal to DEFAULT_MMAP_THRESHOLD_MAX are freed, the threshold is adjusted upwards to the size of the freed block. When dynamic mmap thresholding is in effect, the threshold for trimming the heap is also dynamically adjusted to be twice the dynamic mmap threshold. Dynamic adjustment of the mmap threshold is disabled if any of the M_TRIM_THRESHOLD, M_TOP_PAD, M_MMAP_THRESHOLD, or M_MMAP_MAX parameters is set.

Perhaps it helps in understanding this?
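If that dynamic threshold is what keeps the heap from being trimmed, one experiment is to pin the threshold explicitly, which (per the same man page) disables the dynamic behaviour. A sketch only; the value and whether it helps here are untested assumptions:

import os

# Must be set in the worker's environment before the worker process starts
# (e.g. via the pod spec on dask-gateway/K8s); shown here for a local setup.
os.environ["MALLOC_MMAP_THRESHOLD_"] = str(128 * 1024)  # pin glibc's mmap threshold at 128 KiB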

In case this is of help to folks here: we’ve been adding to our documentation about unmanaged memory, which some might find useful. In particular, when running on UNIX systems there are various ways to reduce unmanaged memory.

https://distributed.dask.org/en/latest/worker.html#memory-not-released-back-to-the-os
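Along the lines of that documentation page, manually asking glibc to release freed pages on every worker looks like this (adapted from the linked docs; it assumes glibc, i.e. libc.so.6, is available on the workers):

import ctypes

def trim_memory() -> int:
    # Ask glibc to return freed heap pages to the OS
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

# Run on every worker; unmanaged memory in the dashboard should drop
client.run(trim_memory)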