dask: OOMs on seemingly simple shuffle job: mem usage greatly exceeds --memory-limit

Summary

  • I’m struggling to figure out how to avoid OOMs in a seemingly simple shuffle on a ~6gb parquet.snappy dataset using 16 workers, each with 8gb mem, ~4gb memory limit, 1 proc, and 1 thread. I’m not persisting anything, and I’m ok with shuffle tasks spilling to disk as necessary.
  • The OOMs cause the job to either fail after a while or complete after a really long while, nondeterministically.
  • I decreased task size by increasing task count (128 -> 512), but I still observed OOMs with similar frequency.
  • Plotting mem usage over time shows a tight distribution around --memory-limit for the first ~1/2 of the job and then large variance for the second ~1/2 of the job, during which time OOMs start happening (plots below).
  • I created more headroom for this large variance by decreasing --memory-limit (4gb/8gb -> 2gb/8gb), and I did observe far fewer OOMs, but still 1 OOM. Moreover, 2gb/8gb impedes our ability to persist data later in this pipeline for an iterative ML algo, so this isn’t a feasible solution.
  • Maybe there’s something fishy happening on the dask side here, in particular the high variance of mem usage above --memory-limit? Or maybe I’m just making a dumb user error somewhere that’s easy to fix?
  • Lmk if I can clarify or distill anything better!

Setup

  • 16 workers (on k8s on ec2), each running in its own docker container with 8gb mem and 1 cpu
  • Workers running with ~4gb mem limit, 1 proc, and 1 thread:
    • DASK_COMPRESSION=zlib dask-worker --nprocs 1 --nthreads 1 --memory-limit=4e9 --no-nanny <scheduler-url>
  • Code looks like:
import dask.dataframe as dd

# Read from parquet (s3)
#   - 238 parts in
#   - ~6.5gb total
#   - Part file sizes vary 10-50mb (see plot below)
ddf_no_index = dd.read_parquet(in_path)

# Pick task/part count for output
num_parts_out = ... # 128 or 512

# Reindex to a column of uniformly distributed uuid5 values with fixed, uniform divisions
#   - npartitions=num_parts_out, via divisions=uniform_divisions[num_parts_out]
ddf_indexed = ddf_no_index.set_index(
    uniformly_distributed_uuid5_column,
    drop=False,
    divisions=uniform_divisions[num_parts_out],
)

# Write to parquet (s3)
#   - 128 or 512 parts out
#   - ~6.6gb total (based on a successful 128-part output)
#   - When 128 parts, output part files vary 54-58mb (see plot below)
#   - When 512 parts, output part files should vary ~10-15mb, but I didn't let the job finish
(ddf_indexed
    .astype(...)
    .drop(ddf_indexed.index.name, axis=1)
    .to_parquet(
        out_path,
        compression='snappy',
        object_encoding=...,
        write_index=True,
    )
)
  • Data skew looks like:
[plot: input parquet.snappy part file sizes, 238 parts]
[plot: output parquet.snappy part file sizes, 128 parts]
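
One piece not shown above is how uniform_divisions gets built. As a minimal sketch (an assumption, not the actual pipeline code): if the uuid5 index values are stored as canonical hyphenated uuid strings, then evenly spaced boundaries over the 128-bit uuid space give set_index the num_parts_out + 1 sorted divisions it expects, and roughly equal-sized partitions when the index is uniformly distributed:

import uuid

# Hypothetical helper (not from the original pipeline): evenly spaced
# boundaries over the 128-bit uuid space, rendered in canonical hyphenated
# form, which sorts the same way the underlying integers do.
def uniform_uuid_divisions(num_parts_out):
    step = 2**128 // num_parts_out
    ints = [i * step for i in range(num_parts_out)] + [2**128 - 1]
    return [str(uuid.UUID(int=i)) for i in ints]

# set_index(..., divisions=...) expects num_parts_out + 1 sorted boundaries
uniform_divisions = {n: uniform_uuid_divisions(n) for n in (128, 512)}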

Trials

  • Rows 1–2: my starting point was num_parts_out=128 with --memory-limit=4e9, which fails a lot of the time but actually succeeded twice with many OOMs and long runtimes
  • Row 3: I increased task count to num_parts_out=512, but saw a similar frequency of OOMs and killed the job
  • Row 4: I decreased mem limit to --memory-limit=2e9 but still saw 1 OOM (and thus some amount of repeated work)
  • Col “sys metrics”: check out the change in variance in mem usage partway through the job, after which OOMs start happening
  • Col “task aftermath”: you can see the lost workers, all due to OOMs
  • Col “task counts”: shows the number of shuffle tasks, for reference (~6–8k)
params                                    | outcome                | task counts / task aftermath / sys metrics (screenshots)
238 parts in, 128 parts out, 4g mem limit | 27 OOMs, 111m, success | [datadog 4g 128 2]
238 parts in, 128 parts out, 4g mem limit | 10 OOMs, 47m, success  | [dask 4g 128, tasks 4g 128, datadog 4g 128]
238 parts in, 512 parts out, 4g mem limit | >4 OOMs, gave up early | [dask 4g 512, (blank), datadog 4g 512]
238 parts in, 128 parts out, 2g mem limit | 1 OOM, 56m, success    | [dask 2g 128, tasks 2g 128, datadog 2g 128]
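
For reference, the mem-over-time plots in the “sys metrics” column come from Datadog; a similar signal could be sampled straight from the client with something like the sketch below (the scheduler address is a placeholder, and this is not part of the original setup):

import time
import psutil
from dask.distributed import Client

client = Client('tcp://scheduler:8786')  # placeholder scheduler address

def worker_rss():
    # Runs on each worker; returns that worker process's RSS in bytes
    return psutil.Process().memory_info().rss

samples = []
for _ in range(60):  # e.g. ~5 minutes of samples at 5s intervals
    samples.append((time.time(), client.run(worker_rss)))
    time.sleep(5)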

Versions

$ python --version
Python 3.6.0

$ cat requirements.txt | egrep 'dask|distributed|fastparquet'
git+https://github.com/dask/dask.git@a883f44
git+https://github.com/dask/fastparquet.git@d07d662
distributed==1.16.2

Most upvoted comments

Another unsuccessful approach at solving the “4g mem limit, 8g mem, 16 workers” case:

@odovad Can you please provide a minimal reproducer for your example? Thank you!

Cool. I really appreciate the work here. I’m looking forward to taking a look at this. As a disclaimer I’m decently slammed this week but hope to have some time next week.

On Mon, Sep 11, 2017 at 2:50 PM, Dan Brown notifications@github.com wrote:

@mrocklin https://github.com/mrocklin Check it out—I now have an OOM repro for array, bag, and dataframe:

  • All three OOM when total data volume roughly approaches or exceeds total worker ram, like my various ddf repros above
  • I dockerized everything so it’s hopefully easy to repro on your end
  • I packaged it up in a repo: https://github.com/jdanbrown/dask-oom

Eager to hear your thoughts! Here’s the summary (copied from the repo’s README https://github.com/jdanbrown/dask-oom):

Experimental setup

  • Latest stable dask+distributed versions (requirements.txt https://github.com/jdanbrown/dask-oom/blob/master/requirements.txt):

    • dask==0.15.2
    • distributed==1.18.3
  • Use local docker containers to make a reproducible and portable distributed environment

  • Worker setup (docker-compose.yml https://github.com/jdanbrown/dask-oom/blob/master/docker-compose.yml):

    • 4 workers @ 1g mem + no swap (4g total worker ram)
    • Default --memory-limit (each worker reports 0.584g)
    • Limited to 1 concurrent task per worker, to minimize mem contention and oom risk
  • For each of ddf, dask array, and dask bag:

    • Run a simple shuffle operation and test whether the operation succeeds or OOMs (a rough sketch follows this list)
    • Test against increasing data volumes, from much smaller than total worker ram to larger than total worker ram
    • Try to keep partition sizes below ~10-15m, to control for oom risk from large partitions

Results

oom_ddf.py (https://github.com/jdanbrown/dask-oom/blob/master/oom_ddf.py)

params                              | ddf_bytes | part_bytes | runtime | success/OOM?
cols=10 part_rows=157500 nparts=64  | .75g      | 12m        | 00:08   | success
cols=10 part_rows=157500 nparts=128 | 1.5g      | 12m        | ~00:20  | usually OOM, sometimes success
cols=10 part_rows=157500 nparts=256 | 3g        | 12m        | ~00:20  | OOM
cols=10 part_rows=157500 nparts=512 | 6g        | 12m        | ~00:30  | OOM

oom_array.py (https://github.com/jdanbrown/dask-oom/blob/master/oom_array.py)

params     | da_bytes | chunk_bytes | chunk_n | chunk_shape  | runtime | success/OOM?
sqrt_n=64  | 128m     | 2m          | 64      | (4096, 64)   | 00:01   | success
sqrt_n=96  | 648m     | 6.8m        | 96      | (9216, 96)   | 00:03   | success
sqrt_n=112 | 1.2g     | 11m         | 112     | (12544, 112) | 00:05   | success
sqrt_n=120 | 1.5g     | 13m         | 120     | (14400, 120) | ~00:15  | usually OOM, rare success
sqrt_n=128 | 2g       | 16m         | 128     | (16384, 128) | ~00:10  | OOM

oom_bag.py (https://github.com/jdanbrown/dask-oom/blob/master/oom_bag.py)

  • Much slower than ddf and array, since bag operations are bottlenecked by more python execution

params     | bag_bytes | part_bytes | runtime  | success/OOM?
nparts=2   | 25m       | 12m        | 00:00:08 | success
nparts=4   | 49m       | 12m        | 00:00:14 | success
nparts=8   | 98m       | 12m        | 00:00:24 | success
nparts=32  | 394m      | 12m        | 00:01:53 | success
nparts=64  | 787m      | 12m        | 00:07:46 | success
nparts=128 | 1.5g      | 12m        | 00:17:40 | success
nparts=256 | 3.1g      | 12m        | 00:37:09 | success
nparts=512 | 6.2g      | 12m        | 01:16:30 | OOM (first OOM ~57:00, then ~4 more OOMs before client saw KilledWorker)

(note that dask-kubernetes always passes each worker a --memory-limit that is a bit smaller than the container’s configured memory, and this is probably generally good practice)
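
As a rough sketch of that practice for anyone launching workers in containers by hand (this is not dask-kubernetes code; the cgroup path and the 10% headroom factor are assumptions):

import subprocess

def container_mem_limit_bytes(path='/sys/fs/cgroup/memory/memory.limit_in_bytes'):
    # cgroup v1 path (assumption); under cgroup v2 the file is memory.max
    with open(path) as f:
        return int(f.read().strip())

# Leave ~10% headroom below the container limit (the fraction is arbitrary)
mem_limit = int(container_mem_limit_bytes() * 0.9)

subprocess.run([
    'dask-worker', '--nprocs', '1', '--nthreads', '1', '--no-nanny',
    '--memory-limit', str(mem_limit),
    'tcp://scheduler:8786',  # placeholder scheduler address
])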