distributed: Computation deadlocks due to worker rapidly running out of memory instead of spilling
The script below pretty reliably triggers deadlocks. I’m sure it can be reduced further, but I haven’t had time to do so yet.
import coiled.v2
from distributed import Client
from dask.datasets import timeseries

# Reproducer: a 20-worker cluster running a cheap but slow workload in a loop.
cluster = coiled.v2.Cluster(n_workers=20)
client = Client(cluster)

ddf = timeseries(
    "2020",
    "2025",
    partition_freq='2w',
)
ddf2 = timeseries(
    "2020",
    "2023",
    partition_freq='2w',
)


def slowident(df):
    # Identity map with an artificial 1-5 second delay per partition.
    import random
    import time

    time.sleep(random.randint(1, 5))
    return df


while True:
    client.restart()

    demo1 = ddf.map_partitions(slowident)
    (demo1.x + demo1.y).mean().compute()

    # The merge shuffles both dataframes; this is the memory-heavy part of the workload.
    demo2 = ddf.merge(ddf2)
    demo2 = demo2.map_partitions(slowident)
    (demo2.x + demo2.y).mean().compute()
We could confirm that version 2022.1.1 is not affected by this, but it appears that all follow-up versions might be (we haven’t tested all of them, but can definitely confirm 2022.4.0 is affected).
About this issue
- State: closed
- Created 2 years ago
- Comments: 64 (53 by maintainers)
Great find @gjoseph92.

Just to summarize what I understand: we saw this issue appear in the 2022.4.? release but didn’t see it in 2022.1.1. We bisected and found https://github.com/dask/distributed/pull/6189. The working theory is that https://github.com/dask/distributed/pull/6189 indeed allows us to spill data faster, which helps us to avoid the swap scenario.

Update here that others might find interesting: the lock-up likely is caused by swapping, but not in the way you’d think. It points to https://github.com/dask/distributed/issues/6177 being a very worthwhile thing to do.
https://github.com/dask/distributed/issues/4345#issuecomment-755321992 got me curious: yeah, why isn’t the kernel’s OOMKiller getting involved here and just killing the Python process that’s using too much memory?
I spun up a t3.medium node with Coiled (what we were using here) and SSHed onto it. First interesting thing: free reports 0B of swap available, so the heaps of our Python processes aren’t actually going to get swapped out. Let’s see what happens when we use up some memory (based on https://unix.stackexchange.com/a/254976).
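The shell commands from that answer aren’t reproduced in this thread, but the idea is simple; here is a rough Python sketch of the same thing (hypothetical, not what was actually run on the node), which just holds a chosen amount of memory resident:

# memory_hog.py: hold a fixed amount of memory resident to create pressure.
# Hypothetical Python paraphrase of the shell approach in the linked answer.
import sys
import time


def hold_memory(gib: float) -> None:
    chunk = 64 * 1024 * 1024          # allocate in 64 MiB chunks
    target = int(gib * 1024 ** 3)
    blocks = []
    held = 0
    while held < target:
        blocks.append(b"x" * chunk)   # writing the bytes keeps the pages resident
        held += chunk
        print(f"holding {held / 1024 ** 3:.2f} GiB", flush=True)
    time.sleep(3600)                  # keep the allocation alive while watching free


if __name__ == "__main__":
    hold_memory(float(sys.argv[1]))   # e.g. python memory_hog.py 2.5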
free reports we have 3.2Gi available between free RAM and purgeable cache pages. It turns out the OOMKiller is very conservative and doesn’t kill things even when system performance is extremely degraded, so long as there are any pages it can reclaim. And even without a swap partition, the kernel can and will still swap pages to disk if they were on disk to begin with, such as executable code (see https://askubuntu.com/a/432827 for an explanation). Really, this just means the kernel is willing to evict almost everything from the disk cache, so executing new instructions in the Python interpreter or any other executable means constantly reloading pages from disk. It’s all still running… just really… really… slowly.
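For anyone repeating the experiment, here is a small observation helper (my addition, not part of the original experiment or of distributed) that polls the same counters free reads from /proc/meminfo:

# watch_meminfo.py: poll /proc/meminfo, which is where free gets its numbers.
import time

FIELDS = ("MemTotal", "MemAvailable", "SwapTotal", "SwapFree")


def read_meminfo():
    info = {}
    with open("/proc/meminfo") as f:
        for line in f:
            key, rest = line.split(":", 1)
            info[key] = int(rest.split()[0])   # values in the file are in kB
    return info


while True:
    snap = read_meminfo()
    print("  ".join(f"{name}={snap[name] / 1024 ** 2:.2f}GiB" for name in FIELDS), flush=True)
    time.sleep(2)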
Basically, getting into the “almost all physical memory is used, but not quite all of it” terrain leads to very bad behavior. People complain/rant about this a bit: https://unix.stackexchange.com/questions/373312/oom-killer-doesnt-work-properly-leads-to-a-frozen-os, https://serverfault.com/questions/390623/how-do-i-prevent-linux-from-freezing-when-out-of-memory, etc. There doesn’t seem to be a good answer for how to avoid this in Linux generically.
But we don’t have to worry about the generic case. For dask, we can reasonably expect that the worker process tree is the only thing allocating significant amounts of memory. If we just set hard memory limits at the OS level (#6177) and don’t let dask go above 95% of physically available memory (our current default), we should be able to terminate before we get into this thrashing scenario.
And if we don’t set hard limits at the OS level (we currently don’t), we have no reason to believe that our application-level memory monitor could prevent or recover from this: once we get into this state, no userland process can run effectively, so preventing it in the first place is the only option.
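To illustrate what an OS-level hard limit could look like at the process level (this is a sketch of the idea behind #6177, not its actual implementation, which may well use cgroups or something else entirely):

# Sketch: cap the worker process's address space so runaway allocations fail
# with MemoryError instead of dragging the whole machine into cache thrashing.
# Illustrative only; not the approach actually taken in dask/distributed#6177.
import resource


def set_hard_memory_limit(limit_bytes: int) -> None:
    # RLIMIT_AS caps the total virtual address space of this process; once the
    # cap is hit, further allocations fail rather than forcing the kernel to
    # reclaim every last reusable page.
    resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, limit_bytes))


# For example, 95% of physical memory, mirroring the memory monitor's
# terminate threshold mentioned above (psutil is already a dask dependency):
#   import psutil
#   set_hard_memory_limit(int(psutil.virtual_memory().total * 0.95))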
Done
Something like ClusterBeta(n_workers=2, worker_vm_types=["i3.large"]) or i3.xlarge should work. Those each have a large NVMe drive that will be used for dask temp storage. i3.large is 2 vCPU with 15.25GiB memory; i3.xlarge is 4 vCPU with 30.5GiB.

Hooray! Rejoice!
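For concreteness, the suggestion above would look roughly like this, assuming ClusterBeta is the same coiled.v2.Cluster API used in the reproducer and worker_vm_types is spelled as quoted:

# Sketch: run the reproducer on instances with local NVMe for dask temp storage.
import coiled.v2
from distributed import Client

cluster = coiled.v2.Cluster(
    n_workers=2,
    worker_vm_types=["i3.large"],   # or ["i3.xlarge"] for 4 vCPU / 30.5 GiB
)
client = Client(cluster)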