distributed: memory leak when using distributed.Client with delayed

I used dask.delayed to wire together some classes, and when using dask.threaded.get everything works properly. When the same code is run using distributed.Client, the memory used by the process keeps growing.

Dummy code to reproduce the issue is below.

import gc
import os

import psutil
from dask import delayed

# generate random strings: https://stackoverflow.com/a/16310739
class Data():
    def __init__(self):
        self.tbl = bytes.maketrans(bytearray(range(256)),
                              bytearray([ord(b'a') + b % 26 for b in range(256)]))

    @staticmethod
    def split_len(seq, length):
        return [seq[i:i + length] for i in range(0, len(seq), length)]

    def get_data(self):
        l = self.split_len(os.urandom(1000000).translate(self.tbl), 1000)
        return l


class Calc():
    def __init__(self, l):
        self.l = l

    def nth_nth_item(self, n):
        return self.l[n][n]


class Combiner():
    def __init__(self):
        self.delayed_data = delayed(Data())

    def get_calc(self):
        d_l = self.delayed_data.get_data(pure=True)
        return delayed(Calc, pure=True)(d_l)

    def mem_usage_mb(self):
        process = psutil.Process(os.getpid())
        return "%.2f" % (process.memory_info().rss * 1e-6)

    def results(self):
        return {
            '0': self.get_calc().nth_nth_item(0),
            '1': self.get_calc().nth_nth_item(1),
            '2': self.get_calc().nth_nth_item(2),
            'mem_usage_mb': self.mem_usage_mb()
        }

    def delayed_results(self):
        return delayed(self.results())


def main_threaded_get():
    from dask.threaded import get as threaded_get
    from dask import compute

    for i in range(300):
        delayed_obj = Combiner().delayed_results()
        res = compute(delayed_obj, scheduler=threaded_get)[0]
        #print(res)
        print("#%d, mem: %s mb" % (i, res['mem_usage_mb']))
        gc.collect()


def main_distributed_client():
    from distributed import Client
    client = Client(processes=True, n_workers=1, threads_per_worker=1)

    for i in range(1000):
        delayed_obj = Combiner().delayed_results()
        future = client.compute(delayed_obj)
        res = future.result()
        print("#%d, mem: %s mb" % (i, res['mem_usage_mb']))

        collect_res = client.run(lambda: gc.collect()) # doesn't help
        # print(collect_res)

if __name__ == "__main__":
    main_threaded_get()
    main_distributed_client()

Results:

main_threaded_get():
#100, mem: 33.64 mb
#200, mem: 33.64 mb
#299, mem: 33.64 mb

main_distributed_client():
#100, mem: 94.02 mb
#200, mem: 96.02 mb
#300, mem: 97.95 mb
#400, mem: 100.11 mb
#500, mem: 102.29 mb
#600, mem: 104.48 mb
#700, mem: 106.72 mb
#800, mem: 108.20 mb
#900, mem: 110.02 mb
#999, mem: 112.22 mb
And also "distributed.utils_perf - WARNING - full garbage collections took 60% CPU time recently (threshold: 10%)" messages starting with i=30
Python 3.6.5
>>> dask.__version__
'0.18.0'
>>> distributed.__version__
'1.22.0'

About this issue

  • State: open
  • Created 6 years ago
  • Reactions: 2
  • Comments: 26 (7 by maintainers)

Most upvoted comments

@Axel-CH I’ve also noticed a mismatch between the memory usage reported by dask distributed and the OS. What helped me resolve problems with frozen and killed workers was to change the configuration described here to the following:

     # Fractions of worker memory at which we take action to avoid memory blowup
     # Set any of the lower three values to False to turn off the behavior entirely
     memory:
       target: 0.95  # target fraction to stay below
       spill: False  # fraction at which we spill to disk
       pause: False  # fraction at which we pause worker threads
       terminate: False  # fraction at which we terminate the worker
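If you prefer to set these values programmatically rather than editing the YAML file, a minimal sketch (assuming the config is applied before the cluster is created; the fractions simply mirror the snippet above and are not a recommendation) could be:

import dask
from distributed import Client

# Mirror the YAML snippet above: raise the target fraction and disable
# spill/pause/terminate. This must run before the workers are started.
dask.config.set({
    "distributed.worker.memory.target": 0.95,
    "distributed.worker.memory.spill": False,
    "distributed.worker.memory.pause": False,
    "distributed.worker.memory.terminate": False,
})

client = Client(processes=True, n_workers=1, threads_per_worker=1)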
Code for: mleak.py
import gc
import os
import tracemalloc

import psutil
from dask import delayed
from dask.distributed import get_worker

# generate random strings: https://stackoverflow.com/a/16310739
class Data():
    def __init__(self):
        self.tbl = bytes.maketrans(bytearray(range(256)),
                              bytearray([ord(b'a') + b % 26 for b in range(256)]))

    @staticmethod
    def split_len(seq, length):
        return [seq[i:i + length] for i in range(0, len(seq), length)]

    def get_data(self):
        l = self.split_len(os.urandom(1000000).translate(self.tbl), 1000)
        return l


class Calc():
    def __init__(self, l):
        self.l = l

    def nth_nth_item(self, n):
        return self.l[n][n]


class Combiner():
    def __init__(self):
        self.delayed_data = delayed(Data())

    def get_calc(self):
        d_l = self.delayed_data.get_data(pure=True)
        return delayed(Calc, pure=True)(d_l)

    def mem_usage_mb(self):
        process = psutil.Process(os.getpid())
        return "%.2f" % (process.memory_info().rss * 1e-6)

    def results(self):
        return {
            '0': self.get_calc().nth_nth_item(0),
            '1': self.get_calc().nth_nth_item(1),
            '2': self.get_calc().nth_nth_item(2),
            'mem_usage_mb': self.mem_usage_mb()
        }

    def delayed_results(self):
        return delayed(self.results())

##
##


def snapshot():
    worker = get_worker()
    worker.snapshot1 = tracemalloc.take_snapshot()


def top_stats(do_print_trace=False):
    worker = get_worker()
    snapshot2 = tracemalloc.take_snapshot()

    stats = snapshot2.compare_to(worker.snapshot1, 'traceback')

    for stat in stats[:5]:
        print(stat)
        if do_print_trace:
            for line in stat.traceback.format():
                print(line)

##
##


def main_threaded_get():
    from dask.threaded import get as threaded_get
    from dask import compute

    for i in range(300):
        delayed_obj = Combiner().delayed_results()
        res = compute(delayed_obj, scheduler=threaded_get)[0]
        print("#%d, mem: %s mb" % (i, res['mem_usage_mb']))
        gc.collect()


def main_distributed_client():
    from distributed import Client
    client = Client(processes=True, n_workers=1, threads_per_worker=1)

    # up to 7 stacktrace lines
    client.run(lambda: tracemalloc.start(7))

    for i in range(1000):
        client.run(lambda: snapshot())

        delayed_obj = Combiner().delayed_results()
        future = client.compute(delayed_obj)
        res = future.result()
        print("#%d, mem: %s mb" % (i, res['mem_usage_mb']))

        client.run(lambda: top_stats(do_print_trace=False))
        # client.run(lambda: top_stats(do_print_trace=True))  # print call stack as well

        client.run(lambda: gc.collect()) # doesn't help

        print("---")


if __name__ == "__main__":
    main_distributed_client()

The modified script uses tracemalloc to compare memory usage before and after computing the delayed function.

If I’m interpreting the tracemalloc results correctly, it looks like memory usage grows when pickle.loads is called.

Run: python -X tracemalloc mleak.py

Top memory increases per invocation:

[..]/python3.6/site-packages/distributed/protocol/pickle.py:59: size=188 KiB (+5044 B), count=2019 (+50), average=95 B
[..]/python3.6/site-packages/distributed/worker.py:2130: size=11.3 KiB (+3176 B), count=16 (+5), average=721 B
[..]/python3.6/site-packages/tornado/gen.py:1046: size=3560 B (+2848 B), count=5 (+4), average=712 B
[..]/python3.6/site-packages/distributed/protocol/core.py:188: size=97.5 KiB (+2482 B), count=1042 (+25), average=96 B
[..]/python3.6/asyncio/events.py:145: size=2880 B (+2304 B), count=5 (+4), average=576 B

Call stack for distributed/protocol/pickle.py (different invocation)

[..]/python3.6/site-packages/distributed/protocol/pickle.py:59: size=41.0 KiB (+2468 B), count=360 (+21), average=117 B
  File "[..]/python3.6/site-packages/distributed/protocol/pickle.py", line 59
    return pickle.loads(x)
  File "[..]/python3.6/site-packages/distributed/worker.py", line 720
    function = pickle.loads(function)
  File "[..]/python3.6/site-packages/distributed/worker.py", line 1275
    self.tasks[key] = _deserialize(function, args, kwargs, task)
  File "[..]/python3.6/site-packages/distributed/core.py", line 375
    handler(**merge(extra, msg))
  File "[..]/python3.6/site-packages/tornado/gen.py", line 1113
    yielded = self.gen.send(value)
  File "[..]/python3.6/site-packages/tornado/gen.py", line 1199
    self.run()
  File "[..]/python3.6/site-packages/tornado/stack_context.py", line 276
    return fn(*args, **kwargs)

Is there any solution or workaround for this problem? It’s really urgent for me now.

Sorry, I found that it’s not a memory leak problem in my case. It actually seems (personal opinion) to be a problem of poorly controlled graph size (i.e. the number of tasks). If I limit the number of tasks submitted at once, memory stays at an almost constant level. Thanks.
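One way to put that into practice is to cap the number of in-flight futures. The helper below is only an illustration (bounded_submit, square, and max_in_flight are made-up names, not from this thread): it refills the window each time a task completes.

import itertools
from distributed import Client, as_completed

def bounded_submit(client, func, args, max_in_flight=100):
    # Keep at most max_in_flight tasks on the cluster at any time,
    # submitting a replacement each time a task finishes.
    args = iter(args)
    futures = [client.submit(func, a) for a in itertools.islice(args, max_in_flight)]
    ac = as_completed(futures)
    for future in ac:
        yield future.result()
        try:
            ac.add(client.submit(func, next(args)))
        except StopIteration:
            pass

def square(x):
    return x * x

if __name__ == "__main__":
    client = Client(processes=True, n_workers=1, threads_per_worker=1)
    for _ in bounded_submit(client, square, range(10_000)):
        pass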

I wanted to add some unexpected results I observed and was able to resolve thanks to the dialogue above…

Using Python 3.8.5, Prefect==0.14.16, Dask[complete]==2021.2.0

I was observing errors while trying to run my Prefect workflows on an AWS Stack involving some ECS Fargate containers, which kept saying something along the lines of this:

 Unexpected error: TypeError('string indices must be integers') 

This was strange, and I definitely had a few other lingering problems that really made my hunt difficult until I saw some logs that stated the following:

distributed.core - INFO - Event loop was unresponsive in Worker for 3.69s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

distributed.utils_perf - INFO - full garbage collection released 45.16 MB from 37 reference cycles (threshold: 10.00 MB)

Above, @mrocklin suggested “less workers and more memory per worker”, which proved to be my silver bullet in this instance. Hope someone else sees that issue and can react accordingly!
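For anyone wanting to try that, a minimal sketch of a local cluster with fewer, larger workers might look like the following (the worker count, thread count, and memory limit are illustrative values only, not numbers from this thread):

from distributed import Client, LocalCluster

# Fewer workers, each with more threads and a larger memory budget
# (illustrative numbers; tune for your own machine or cluster).
cluster = LocalCluster(n_workers=2, threads_per_worker=4, memory_limit="8GB")
client = Client(cluster)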

@songqiqqq’s suggestion seems to be a viable workaround – limiting the number of tasks scheduled at any given time.

Changing

MSG_COUNT = 1_000_000

with Client(processes=False) as client:
    pbar = tqdm(total=MSG_COUNT)
    tasks = [client.submit(functools.partial(requests.post, URL, json=MESSAGE)) for _ in range(MSG_COUNT)]
    for _ in as_completed(tasks):
        pbar.update(1)

to

import functools
import itertools as it

import requests
from dask.distributed import Client, as_completed
from tqdm import tqdm

MSG_COUNT = 1_000_000
# URL and MESSAGE are assumed to be defined elsewhere in the script


def group(iterable, chunk_size=50_000):
    iterable = iter(iterable)
    chunk = list(it.islice(iterable, chunk_size))
    while chunk:
        yield chunk
        chunk = list(it.islice(iterable, chunk_size))


with Client(processes=False) as client:
    pbar = tqdm(total=MSG_COUNT)
    for chunk in group(range(MSG_COUNT)):
        tasks = [client.submit(functools.partial(requests.post, URL, json=MESSAGE)) for _ in chunk]
        for _ in as_completed(tasks):
            pbar.update(1)

solved the issue for me.

I continued to get warnings, but tasks were processed.

There is also objgraph, which is useful for generating reference graphs of objects: https://mg.pov.lt/objgraph/
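As a rough illustration (not from this thread), objgraph can be run inside the worker process via client.run to watch which object types grow between iterations; objgraph.show_growth is a real objgraph function, while the surrounding setup is an assumption:

import objgraph
from distributed import Client

client = Client(processes=True, n_workers=1, threads_per_worker=1)

def object_growth():
    # Print object types whose counts increased since the previous call;
    # output appears in the worker's log/stdout.
    objgraph.show_growth(limit=10)

client.run(object_growth)   # establish a baseline on the worker
# ... run some computations here ...
client.run(object_growth)   # report which types grew in the meantime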

Interesting. When I run this I get something similar. Memory use climbs slowly in steps. I also get a number of warnings about garbage collection taking a long time.

distributed.utils_perf - WARNING - full garbage collections took 62% CPU time recently (threshold: 10%)
#855, mem: 125.76 mb
distributed.utils_perf - WARNING - full garbage collections took 62% CPU time recently (threshold: 10%)
#856, mem: 125.76 mb
distributed.utils_perf - WARNING - full garbage collections took 62% CPU time recently (threshold: 10%)
#857, mem: 125.76 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#858, mem: 125.76 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#859, mem: 125.76 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#860, mem: 125.76 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#861, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#862, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#863, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#864, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#865, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#866, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 64% CPU time recently (threshold: 10%)
#867, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 64% CPU time recently (threshold: 10%)
#868, mem: 126.02 mb

I’m curious how people generally debug this sort of issue. I might start with the following:

  1. Use a project such as pympler to look at what objects might be left over (see the sketch below): https://pythonhosted.org/Pympler/muppy.html#the-summary-module
  2. Look at what objects are being GCed. Unfortunately I don’t have much experience finding out what takes up GC time.

If anyone has any experience here and has the time to investigate this further I would appreciate it.
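For the first suggestion, a minimal sketch of using pympler’s muppy and summary modules inside the worker (run via client.run) could look like this; take_summary and print_summary_diff are hypothetical helpers, and only muppy.get_objects, summary.summarize, summary.get_diff, and summary.print_ come from pympler itself:

from pympler import muppy, summary
from dask.distributed import get_worker

def take_summary():
    # Store a summary of all live objects on the worker for later comparison.
    worker = get_worker()
    worker._muppy_summary = summary.summarize(muppy.get_objects())

def print_summary_diff():
    # Print what changed on the worker since take_summary() was called.
    worker = get_worker()
    now = summary.summarize(muppy.get_objects())
    summary.print_(summary.get_diff(worker._muppy_summary, now))

# Usage (assuming `client` is an existing distributed.Client):
# client.run(take_summary)
# ... compute some delayed objects ...
# client.run(print_summary_diff)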