prefect: Task inputs are persisted, preventing memory from being released (same for outputs).

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar request and didn’t find it.
  • I searched the Prefect documentation for this feature.

Prefect Version

2.x

Describe the current behavior

Hi,

I really like the design of Prefect. To me, Prefect is close to perfect. There is just the issue that passing large sets of data to and from tasks quickly eats up all memory, and this memory cannot be released, or at least I could not find any option to do so. Yet being able to pass data into tasks is one of the advertised advantages of Prefect over Airflow:

from prefect import flow, task
import os
import psutil
import sys
import gc


@task(persist_result=False, cache_result_in_memory=False)  # <----- Remove this line, and the memory is released -----
def my_task(df):
    pass


@flow
def my_sub_flow_1():

    print(f"Memory before: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB")
    df = bytearray(1024*1024*1024)  # 1024MiB of memory

    my_task(df)

    print(f'{sys.getrefcount(df)} references to df')
    del df  # release memory
    gc.collect()  # garbage collection not needed, just be certain
    print(f"Memory after: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB")


my_sub_flow_1()

The output is:

Memory before: 163MiB
4 references to df
Memory after: 1187MiB

When the @task decorator is removed, the del releases all memory and everything works fine. Of course, this is a minimal example. In real life, data engineers like me want to run flows with many subflows that pass large pandas DataFrames around without running out of memory.

Describe the proposed behavior

It would be a great enhancement if we could run subflows like the one below with large DataFrames without quickly running out of memory as more subflows or tasks are added.

@flow(empty_cache_after_flowrun=True)  # <----- something like this here
def my_sub_flow_1():
    df = task_load_data1()
    df = task_modify_data(df)
    task_upload_data(df)

    prefect.release_task_cache()  # <----- or something like this here

    df = task_load_data2()
    df = task_modify_data(df)
    task_upload_data(df)

Example Use

See above. Currently, every large object passed from one task to another has to go via disk or some other store so that its memory can be released, which somewhat defeats the great idea of Prefect. A sketch of this workaround is shown below.
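For illustration, here is a minimal sketch of that disk-based workaround (the task names and file paths are made up for the example, not taken from my real flows): each task writes its DataFrame to a Parquet file and returns only the path, so the large object itself never ends up in Prefect's caches.

import pandas as pd
from prefect import flow, task


@task
def task_load_data(source_csv: str) -> str:
    # Hypothetical task: load the data, write it to disk, return only the path.
    df = pd.read_csv(source_csv)
    path = "/tmp/loaded.parquet"
    df.to_parquet(path)
    return path  # only this small string is cached by Prefect


@task
def task_modify_data(in_path: str) -> str:
    df = pd.read_parquet(in_path)
    df["total"] = df.sum(axis=1, numeric_only=True)  # placeholder transformation
    out_path = "/tmp/modified.parquet"
    df.to_parquet(out_path)
    return out_path


@task
def task_upload_data(in_path: str) -> None:
    df = pd.read_parquet(in_path)
    # ... push df to the target store ...


@flow
def my_sub_flow_via_disk(source_csv: str):
    path = task_load_data(source_csv)
    path = task_modify_data(path)
    task_upload_data(path)

This works, but every hand-over between tasks costs a full serialization round trip to disk.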

Additional context

Debugging showed that the class UnpersistedResult(BaseResult) persists task outputs ‘forever’ by design in self._cache. Surprisingly, inputs are also persisted somewhere. I spent quite some time debugging all the options, reading documentation, and googling. In fact, the documentation is misleading to me, as it suggests that things can be non-persistent. persist_result=False and cache_result_in_memory=False do not help either, especially not with the problem of input persistence.

I also tried to modify the source to release self._cache after it has been read, in order to contribute a fix, but sadly I did not find a non-destructive way to solve the issue that would also help others who might be affected. I am actually wondering whether I am the only one having this issue.

P.S.: Another option I found is to not use tasks at all, so that no data is passed around. I also experimented with a wrapper class that encapsulates each object passed around and then deletes the large object from the wrapper (leaving the wrapper itself in memory), but that somewhat defeats the simplicity of Prefect; a sketch follows below.
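For reference, here is a minimal sketch of that wrapper idea (the Box class and its release() method are made up for illustration, not a Prefect API): Prefect's caches end up holding the small wrapper object, and the large payload is dropped explicitly once it has been used.

from prefect import flow, task


class Box:
    # Hypothetical wrapper: Prefect caches the Box, not the payload it carries.
    def __init__(self, payload):
        self.payload = payload

    def release(self):
        # Drop the wrapper's reference to the large object; the (now empty)
        # Box instance is what remains in Prefect's caches.
        self.payload = None


@task
def my_task(box: Box):
    df = box.payload
    # ... work with df ...
    box.release()  # free the large object before the task finishes


@flow
def my_sub_flow_wrapped():
    box = Box(bytearray(1024 * 1024 * 1024))  # 1024 MiB payload
    my_task(box)
    # after the task returns, only the empty Box stays referenced

It works, but every task now has to know about the wrapper, which is exactly the kind of ceremony Prefect is supposed to remove.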

About this issue

  • Original URL
  • State: closed
  • Created 9 months ago
  • Reactions: 7
  • Comments: 15 (7 by maintainers)

Most upvoted comments

Here’s a memray snapshot showing heap memory staying stable with the example flow from Itay using #12019. Previously, the heap grew on each iteration.

[Screenshot: memray heap snapshot, 2024-02-16 4:29 PM]

In #12019, I believe I’ve isolated two different memory leaks involved in this issue. I’ll need to test more and get some confirmation from another machine setup, but this looks promising.

Hi, I am running on the newest Prefect version (2.14.15) and am still experiencing the same memory leak even after upgrading. I turned storage persistence and the memory cache off for the task. Here is a reproducible piece of code that causes the memory leak.

import asyncio
import gc
import os
from typing import Optional

import numpy as np
import pandas as pd
import psutil
from prefect import flow, task


@flow
async def periodic_batch_flow(until: Optional[str] = None):
    for i in range(3):
        print(f"Memory before flow: {psutil.Process(os.getpid()).memory_info()[0] / float(1024 * 1024)}MiB")
        await subflow()
        print(f"Memory after flow: {psutil.Process(os.getpid()).memory_info()[0] / float(1024 * 1024)}MiB")


@task(cache_result_in_memory=False, persist_result=False)
async def some_heavy_task(df):
    # some processing and writing to db
    await asyncio.sleep(10)
    return

@flow
async def subflow():
    print(
        f"Memory before task: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB"
    )
    df = pd.DataFrame(np.random.randint(0, 100, size=(int(2.5e7), 4)), columns=list("ABCD"))
    await some_heavy_task(df)

    del df  # doesn't actually do anything
    gc.collect()  # doesn't do anything
    print(
        f"Memory after task: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB"
    )
    return

And the output after one iteration of the loop:

14:29:43.012 | INFO | prefect.engine - Created flow run 'pastel-perch' for flow 'periodic-batch-flow'
Memory before flow: 287.171875MiB
14:29:43.151 | INFO | Flow run 'pastel-perch' - Created subflow run 'radiant-loon' for flow 'subflow'
Memory before task: 287.9375MiB
14:29:43.843 | INFO | Flow run 'radiant-loon' - Created task run 'some_heavy_task-0' for task 'some_heavy_task'
14:29:43.844 | INFO | Flow run 'radiant-loon' - Executing 'some_heavy_task-0' immediately...
14:29:53.909 | INFO | Task run 'some_heavy_task-0' - Finished in state Completed()
Memory after task: 1051.625MiB
14:29:54.012 | INFO | Flow run 'radiant-loon' - Finished in state Completed('All states completed.')
Memory after flow: 1051.890625MiB

As you can see, the memory doesn’t go down even after the subflow finishes. If I continue the loop for more iterations, the memory just grows and grows. If I run @chrisguidry’s flow and task, I see the same results that he posted after the bugfix was merged; the only difference between his flow and mine is that mine is async.

We’ve been discussing internally how we may need to take a fresh look at how results are handled across the board, with memory efficiency in mind. Thanks for your help diagnosing these; we’ll keep working on this and keep you posted.

Ah yes, @sibbiii, that is likely to still be the case during the flow run. My fix corrected an issue where memory wasn’t freed between flow runs, but it wouldn’t have addressed freeing memory during a flow run. Check out this example for what my change fixed:

import gc
import os
import sys

import psutil
from prefect import flow, task


@task(
    persist_result=False, cache_result_in_memory=False
)  # <----- Remove this line, and the memory is released -----
def my_task(df):
    pass


@flow
def my_sub_flow_1():
    print(
        f"Memory before task: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB"
    )
    df = bytearray(1024 * 1024 * 1024)  # 1024MiB of memory

    my_task(df)

    print(f"{sys.getrefcount(df)} references to df")
    del df  # release memory
    gc.collect()  # garbage collection not needed, just be certain
    print(
        f"Memory after task: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB"
    )


if __name__ == "__main__":
    print(
        f"Memory before flow: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB"
    )

    my_sub_flow_1()

    gc.collect()  # garbage collection not needed, just be certain
    print(
        f"Memory after flow: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB"
    )

Before the fix:

Memory before flow: 80.4453125MiB
...
Memory before task: 136.1328125MiB
09:26:15.670 | INFO    | Flow run 'spiked-ostrich' - Created task run 'my_task-0' for task 'my_task'
09:26:15.671 | INFO    | Flow run 'spiked-ostrich' - Executing 'my_task-0' immediately...
09:26:16.353 | INFO    | Task run 'my_task-0' - Finished in state Completed()
4 references to df
Memory after task: 1162.3828125MiB
09:26:16.566 | INFO    | Flow run 'spiked-ostrich' - Finished in state Completed('All states completed.')
Memory after flow: 1163.3203125MiB

After the fix:

Memory before flow: 84.30078125MiB
...
Memory before task: 99.55078125MiB
09:21:21.617 | INFO    | Flow run 'uncovered-trogon' - Created task run 'my_task-0' for task 'my_task'
09:21:21.618 | INFO    | Flow run 'uncovered-trogon' - Executing 'my_task-0' immediately...
09:21:22.285 | INFO    | Task run 'my_task-0' - Finished in state Completed()
3 references to df
Memory after task: 1165.80078125MiB <---- what you're observing
09:21:22.531 | INFO    | Flow run 'uncovered-trogon' - Finished in state Completed('All states completed.')
Memory after flow: 141.91796875MiB <---- what I was able to fix in the first pass

I’m going to re-open this for further investigation.

Hi @sibbiii, I just merged a fix that will be in the next release, but if you could give it a try on prefect@main, that would be amazing! I was able to address the parameter issue you described in the original writeup, but the result issue is a bit more invasive. I’ll bring this back to the team to discuss how we might improve the performance/memory use around result caching between flow runs.