dask: OOMs on seemingly simple shuffle job: mem usage greatly exceeds --memory-limit
Summary
- I’m struggling to figure out how to avoid OOMs in a seemingly simple shuffle on a ~6gb parquet.snappy dataset using 16 workers, each with 8gb mem, ~4gb memory limit, 1 proc, and 1 thread. I’m not persisting anything, and I’m ok with shuffle tasks spilling to disk as necessary.
- The OOMs cause the job to either fail after a while or complete after a really long while, nondeterministically.
- I decreased task size by increasing task count (128 -> 512), but I still observed OOMs with similar frequency.
- Plotting mem usage over time shows a tight distribution around `--memory-limit` for the first ~1/2 of the job and then large variance for the second ~1/2 of the job, during which time OOMs start happening (plots below).
- I created more headroom for this large variance by decreasing `--memory-limit` (4gb/8gb -> 2gb/8gb) and I did observe many fewer OOMs, but still 1 OOM; moreover, 2gb/8gb impedes our ability to persist data later in this pipeline for an iterative ML algo, so this isn’t a feasible solution.
- Maybe there’s something fishy happening on the dask side here, in particular in the high variance of mem usage above `--memory-limit`? Or maybe I’m just making a dumb user error somewhere that’s easy to fix?
- Lmk if I can clarify or distill anything better!
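For scale, a rough back-of-envelope on the numbers above (on-disk, compressed sizes; in-memory representations will of course be larger):

```python
# Rough sizing from the summary numbers (on-disk parquet.snappy sizes)
total_bytes  = 6.5e9   # ~6.5gb input
workers      = 16
memory_limit = 4e9     # --memory-limit per worker
parts_out    = 128     # or 512

per_worker_share = total_bytes / workers    # ~0.4gb of input per worker
per_output_part  = total_bytes / parts_out  # ~50mb per output partition
```

Even allowing for in-memory expansion, each worker’s share of the data is well under the 4gb `--memory-limit`, which is why mem usage greatly exceeding the limit is surprising.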
Setup
- 16 workers (on k8s on ec2), each running in its own docker container with 8gb mem and 1 cpu
- Workers running with ~4gb mem limit, 1 proc, and 1 thread:
```
DASK_COMPRESSION=zlib dask-worker --nprocs 1 --nthreads 1 --memory-limit=4e9 --no-nanny <scheduler-url>
```
- Code looks like:
```python
import dask.dataframe as dd

# Read from parquet (s3)
# - 238 parts in
# - ~6.5gb total
# - Part file sizes vary 10-50mb (see plot below)
ddf_no_index = dd.read_parquet(in_path)

# Pick task/part count for output
num_parts_out = ...  # 128 or 512

# Reindex to a column of uniformly distributed uuid5 values with fixed, uniform divisions
# - npartitions=num_parts_out, via divisions=uniform_divisions[num_parts_out]
# - (a sketch of how uniform_divisions could be built is below, after the data skew plots)
ddf_indexed = ddf_no_index.set_index(
    uniformly_distributed_uuid5_column,
    drop=False,
    divisions=uniform_divisions[num_parts_out],
)

# Write to parquet (s3)
# - 128 or 512 parts out
# - ~6.6gb total (based on a successful 128-part output)
# - When 128 parts, output part files vary 54-58mb (see plot below)
# - When 512 parts, output part files should vary ~10-15mb, but I didn't let the job finish
(ddf_indexed
    .astype(...)
    .drop(ddf_indexed.index.name, axis=1)
    .to_parquet(
        out_path,
        compression='snappy',
        object_encoding=...,
        write_index=True,
    )
)
```
- Data skew looks like:
| input parquet.snappy part file sizes (238 parts) | output parquet.snappy part file sizes (128 parts) |
|---|---|
| ![]() | ![]() |
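For reference, `uniform_divisions` above is a precomputed mapping from partition count to division boundaries; a minimal sketch of how it could be built, assuming the index column holds canonical lowercase uuid strings (so string order matches integer order), is:

```python
# Hypothetical sketch, not the exact code from the job: evenly spaced uuid-string
# boundaries over the 128-bit uuid space, giving npartitions + 1 divisions.
import uuid

def make_uniform_divisions(npartitions):
    step = 2**128 // npartitions
    bounds = [min(i * step, 2**128 - 1) for i in range(npartitions + 1)]
    return tuple(str(uuid.UUID(int=b)) for b in bounds)

uniform_divisions = {n: make_uniform_divisions(n) for n in (128, 512)}
```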
Trials
- Rows 1–2: my starting point was `num_parts_out=128` with `--memory-limit=4e9`, which fails a lot of the time but actually succeeded twice with many OOMs and long runtimes
- Row 3: I increased task count to `num_parts_out=512`, but saw a similar frequency of OOMs and killed the job
- Row 4: I decreased mem limit to `--memory-limit=2e9` but still saw 1 OOM (and thus some amount of repeated work)
- Col “sys metrics”: check out the change in variance in mem usage partway through the job, after which OOMs start happening
- Col “task aftermath”: you can see the lost workers, all due to OOMs
- Col “task counts”: shows the number of shuffle tasks, for reference (~6–8k)
| params | outcome | task counts | task aftermath | sys metrics |
|---|---|---|---|---|
| 238 parts in, 128 parts out, 4g mem limit | 27 OOMs, 111m, success | ![]() | | |
| 238 parts in, 128 parts out, 4g mem limit | 10 OOMs, 47m, success | ![]() | ![]() | ![]() |
| 238 parts in, 512 parts out, 4g mem limit | >4 OOMs, gave up early | ![]() | ![]() | ![]() |
| 238 parts in, 128 parts out, 2g mem limit | 1 OOM, 56m, success | ![]() | ![]() | ![]() |
Versions
```
$ python --version
Python 3.6.0
$ cat requirements.txt | egrep 'dask|distributed|fastparquet'
git+https://github.com/dask/dask.git@a883f44
git+https://github.com/dask/fastparquet.git@d07d662
distributed==1.16.2
```
Another unsuccessful approach at solving the “4g mem limit, 8g mem, 16 workers” case:
- `psutil` reports host metrics, not container metrics
- The `/workers` page shows “memory: 64 GiB”, which is our k8s host ram, instead of the pod’s container mem (one of 4/8/16 GiB in the various trials above)
- So even with `--memory-limit` set, other parts of dask are seeing `psutil.virtual_memory().total` = 64 GiB and allocating more than they would have otherwise, which is causing OOMs, e.g. `max_buffer_size` in `distributed.comm.tcp`
- I hacked `psutil` so that `virtual_memory()` returns metrics from `/sys/fs/cgroup/memory` instead of `/proc`, and verified that I saw “memory: 8 GiB” on the `/workers` page
- But OOMs still happened even with the `psutil` hack

@odovad Can you please provide a minimal reproducer for your example? Thank you!
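For reference, a minimal sketch of the kind of `psutil` override described above, assuming cgroup v1 paths (not the exact patch used):

```python
# Sketch: make psutil.virtual_memory() reflect the container's cgroup limits
# (cgroup v1 paths assumed) instead of the host's /proc/meminfo.
import psutil

_orig_virtual_memory = psutil.virtual_memory

def cgroup_virtual_memory():
    vm = _orig_virtual_memory()
    with open('/sys/fs/cgroup/memory/memory.limit_in_bytes') as f:
        limit = int(f.read())
    with open('/sys/fs/cgroup/memory/memory.usage_in_bytes') as f:
        used = int(f.read())
    # svmem is a namedtuple, so return a modified copy
    return vm._replace(total=limit, available=max(limit - used, 0))

psutil.virtual_memory = cgroup_virtual_memory
```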
Cool. I really appreciate the work here. I’m looking forward to taking a look at this. As a disclaimer I’m decently slammed this week but hope to have some time next week.
(note that dask-kubernetes always passes `--memory-limit` to each worker in a container, set a bit smaller than the container’s configured size, and this is probably generally good practice)
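A sketch of that practice, assuming cgroup v1 inside the worker container and an arbitrary ~10% headroom (worker flags taken from the setup above):

```python
# Sketch: derive --memory-limit from the container's cgroup limit, leaving headroom.
# Assumes cgroup v1; the 0.9 factor is an arbitrary illustrative choice.
import subprocess

with open('/sys/fs/cgroup/memory/memory.limit_in_bytes') as f:
    container_limit = int(f.read())

memory_limit = int(container_limit * 0.9)

subprocess.run([
    'dask-worker', '<scheduler-url>',   # placeholder scheduler address, as in the setup above
    '--nprocs', '1',
    '--nthreads', '1',
    '--memory-limit', str(memory_limit),
    '--no-nanny',
], check=True)
```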