distributed: Worker memory not being freed when tasks complete

I’m still investigating, but in the meantime I wanted to get this issue started.

I’m noticing that after executing a task graph with large inputs and a small output, my worker memory stays high. In the example below we:

  1. generate data (large byte strings)
  2. filter the data (slice)
  3. reduce across many tasks (sum)

So the final result returned to the client is small: a single Python int. The only large objects should be the initially generated byte strings.

The console output below shows:

  1. per-worker memory usage before the computation (~30 MB)
  2. per-worker memory usage right after the computation (~230 MB)
  3. per-worker memory usage 5 seconds after, in case things take some time to settle down (~230 MB)

Memory usage [before]
{'tcp://192.168.7.20:50533': '30.92 MB', 'tcp://192.168.7.20:50534': '30.95 MB'}
running
Memory usage [after]
{'tcp://192.168.7.20:50533': '231.97 MB',
 'tcp://192.168.7.20:50534': '232.63 MB'}
Memory usage [after]
{'tcp://192.168.7.20:50533': '232.05 MB',
 'tcp://192.168.7.20:50534': '232.63 MB'}

In an effort to test whether the scheduler or worker is holding a reference to the data, I submit a bunch of tiny inc tasks to one of the workers. I notice that the memory on that worker does settle down:

Memory usage [final]
{'tcp://192.168.7.20:52114': '232.77 MB',
 'tcp://192.168.7.20:52115': '49.73 MB'}

That’s at least consistent with the worker or scheduler holding a reference to the data, but there could be many other causes. I’m still debugging.

The number of inc tasks, 2731, seems to be significant. With 2730 inc tasks, I don’t see any memory reduction on that worker.
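One way to dig further is to ask the workers and the scheduler directly which keys they still track. Below is a sketch meant to run inside the with Client(...) block of the script that follows; it assumes a distributed version where the scheduler exposes Scheduler.tasks.

def held_keys(dask_worker):
    # Keys still sitting in this worker's data store
    return list(dask_worker.data)


def scheduler_tasks(dask_scheduler):
    # Keys the scheduler still tracks (assumes Scheduler.tasks exists)
    return list(dask_scheduler.tasks)


pprint.pprint(client.run(held_keys))
pprint.pprint(client.run_on_scheduler(scheduler_tasks))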

import time
from dask.utils import parse_bytes, format_bytes
import pprint
import string
import toolz
from distributed import Client, wait

N = parse_bytes("100 Mb")
I = 20


def inc(x):
    # Tiny task used later to churn the workers
    return x + 1


def f(x, n=N):
    # Generate a large byte string (~N bytes)
    time.sleep(0.05)
    return string.ascii_letters[x % 52].encode() * n


def g(x):
    # Keep only a small slice of the large input
    time.sleep(0.02)
    return x[:5]


def h(*args):
    # Reduce the small slices to a single int
    return sum(x[0] for x in args)


def get_mem(dask_worker):
    # RSS of the worker process, via psutil
    return dask_worker.monitor.proc.memory_info().rss


def main():
    dsk = {}
    for i in range(I):
        dsk[f'a-{i}'] = (f, i, N)
        dsk[f'b-{i}'] = (g, f'a-{i}')
    dsk['c-0'] = (h,) + tuple(f'b-{i}' for i in range(I))

    with Client(n_workers=2, threads_per_worker=1, memory_limit='500Mb', processes=True) as client:
        print("Memory usage [before]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))

        print("running")
        client.get(dsk, keys=["c-0"])
        time.sleep(2)  # let things settle

        print("Memory usage [after]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))

        time.sleep(5)  # settle some more?
        print("Memory usage [after]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))


        print("clear things?")
        futures = client.map(inc, range(2731), pure=False)
        wait(futures)
        del futures

        print("Memory usage [final]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))


if __name__ == '__main__':
    main()

About this issue

  • State: open
  • Created 5 years ago
  • Reactions: 10
  • Comments: 26 (14 by maintainers)

Most upvoted comments

This problem is indeed a big one; it is preventing me from using Dask in production, where I have a very long-running task and 200 GB of memory get used up in no time. Is there anything at all you can suggest to investigate the issue or try to mitigate it?

I already tried the suggested PR without success. My structure is built from nested multiprocessing across different layers, like this:

level A                            A1
level B                  B1                 B2
level C             C1        C2       C3        C4
level D           D1  D2    D3  D4   D5  D6    D7  D8

A1 runs (B1, B2) in parallel.

B1 runs (C1, C2) in parallel; B2 runs (C3, C4) in parallel.

C1 runs (D1, D2) in parallel; C2 runs (D3, D4) in parallel; C3 runs (D5, D6) in parallel; C4 runs (D7, D8) in parallel.

Everything works fine except that once the tasks in the innermost layer D are completed, the memory never gets released; it accumulates until the kernel dies.

With nested processes like this I cannot even restart the client in the inner layers, because that would affect the whole computation. So for me there is really no workaround here.

Any help would be much appreciated.

I’m also experiencing some kind of memory leak, though it might not be related. I’m using Dask distributed only as a job scheduler, without passing any substantial data: the input is just a filename and there is no return value, and the job itself calls only plain pandas and numpy. This way I process 4000 files (almost equally sized) on a 40-core machine in roughly 45 minutes.

With Dask distributed, the memory usage continuously increases until the work is done. At that point it is consuming 40 GB and the memory is not freed. I see "distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory?" in the logs, which seems right, because Dask is not involved in any data manipulation.

The strange thing is that I don’t experience the memory leak when using multiprocessing.Pool to dispatch those jobs (and I also get better CPU usage). That run also consumes about 10 GB which is not freed entirely, but the usage stays almost constant the whole time.

So it seems that Dask is not directly involved, yet it makes the difference somehow.

I’m running Debian buster, Python 3.7, and the latest libraries (dask==2020.12.0, numpy==1.19.5, pandas==1.2.0). (Python 3.8 seems to make no difference.)

htop screenshots are attached (leak-screenshots.zip).

See the code below:

import time

import fsspec
import pandas as pd
from distributed import Client, LocalCluster, wait


def compute_profile_from_file(filepath):
    df = pd.read_csv(filepath, compression="gzip", sep=";", names=("eid", "t", "src", "spd"))
    # I observe the memory leak even if I just read the data.
    # As I add more processing steps, more memory is leaked.
    ...
    df.to_parquet(...)


def main_dask():
    fs = fsspec.filesystem("file")
    filepaths = fs.glob("/data/*.csv.gz")

    client = Client(LocalCluster(n_workers=40, threads_per_worker=1))
    results = set()

    for filepath in filepaths:
        if len(results) == 40:
            _, results = wait(results, return_when='FIRST_COMPLETED')

        job = client.submit(compute_profile_from_file, filepath)
        results.add(job)

    client.gather(results)
    del results
    
    time.sleep(24*3600)


def main_mp():
    fs = fsspec.filesystem("file")
    filepaths = fs.glob("/data/*.csv.gz")

    import multiprocessing as mp
    mp.set_start_method('spawn')

    pool = mp.Pool(40)
    pool.map(compute_profile_from_file, filepaths)

    time.sleep(24*3600)


if __name__ == "__main__":
    main_dask()
    # main_mp()  # swap in to compare with the multiprocessing.Pool version

Any update here? We are also trying to use Dask in production, but this is causing some major issues for us.

Same problem here. Is there a way to overcome this while we wait for the fix?

At the moment I am doing client.restart() every time.
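For anyone following along, that workaround looks roughly like the sketch below. The names batches and process_file are hypothetical placeholders, and restarting drops all in-memory worker state, so it only works when each batch is written out (e.g. to disk) before the restart.

# Crude workaround sketch: restart the workers between batches so that
# whatever memory they are holding on to is reclaimed with the processes.
for batch in batches:                           # `batches` is hypothetical
    futures = client.map(process_file, batch)   # `process_file` is hypothetical
    client.gather(futures)
    client.restart()                            # drops all worker state and memory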

@dhirschfeld I have tried manually garbage collecting after reading the other issue Matt linked above and didn’t see an improvement. Appreciate the suggestion though. This is using pandas read_csv for all the IO but I’m fairly confident I see the same behavior w/ other methods.
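For reference, this is roughly what the manual garbage collection looked like, driven from the client (a sketch, assuming a connected client):

import gc

def collect_garbage():
    # Force a full collection in the worker process; returns the number of
    # unreachable objects found.
    return gc.collect()

# Runs on every worker and returns {worker_address: collected_count}
client.run(collect_garbage)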

Another thing that might be worth doing is to also add every value placed into Worker.data into a weakref.WeakValueDictionary:

def __init__(self, ...):
    self.weak_data = weakref.WeakValueDictionary()

def put_key_in_memory(self, key, value):
    self.data[key] = value
    self.weak_data[key] = value

Then, after things have flushed through, you could check which items in the weak_data dictionary are still alive and what is referencing them. This could even become a test fairly easily if we were to implement a custom mapping.
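As a rough illustration, assuming the hypothetical weak_data attribute from the snippet above, the check could look like this:

import gc

def find_leaked_keys(dask_worker):
    # Keys whose values are still alive even though they are no longer in
    # Worker.data; weak_data is the hypothetical WeakValueDictionary above.
    leaked = [key for key in dask_worker.weak_data if key not in dask_worker.data]
    # gc.get_referrers(dask_worker.weak_data[key]) would then show what is
    # keeping each of those values alive.
    return leaked

client.run(find_leaked_keys)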