prefect: Task input persisted leading to memory not being released (same for output).
First check
- I added a descriptive title to this issue.
- I used the GitHub search to find a similar request and didn’t find it.
- I searched the Prefect documentation for this feature.
Prefect Version
2.x
Describe the current behavior
Hi,
I really like the design of Prefect. To me Prefect is close to perfect. There is just the issue that passing large sets of data to and from tasks quickly eats up all memory. This memory cannot be released, or at least I could not find any option to do so. Being able to pass data into tasks is actually one of the advertised advantages of Prefect over Airflow:
from prefect import flow, task
import os
import psutil
import sys
import gc

@task(persist_result=False, cache_result_in_memory=False)  # <----- Remove this line, and the memory is released -----
def my_task(df):
    pass

@flow
def my_sub_flow_1():
    print(f"Memory before: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB")
    df = bytearray(1024*1024*1024)  # 1024 MiB of memory
    my_task(df)
    print(f'{sys.getrefcount(df)} references to df')
    del df  # release memory
    gc.collect()  # garbage collection not needed, just to be certain
    print(f"Memory after: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB")

my_sub_flow_1()
output is:
Memory before: 163MiB
4 references to df
Memory after: 1187MiB
When removing the @task decorator, all memory gets released by the del and everything works fine. Of course, this is a minimal example. In real life, data engineers like me want to run flows with lots of sub_flows, passing large pandas DataFrames around, without running out of memory.
Describe the proposed behavior
It would be a really great enhancement if we could run sub_flows like the one below with large DataFrames without quickly running out of memory once there are more subflows or tasks.
@flow(empty_cache_after_flowrun=True)  # <----- something like this here
def my_sub_flow_1():
    df = task_load_data1()
    df = task_modify_data(df)
    task_upload_data(df, release_)
    prefect.release_task_cache  # <----- or something like this here
    df = task_load_data2()
    df = task_modify_data(df)
    task_upload_data(df)
Example Use
see above. Currently, all big objects passed from one task to another need to go via disk or some other store in order to release memory, which somewhat defeats the great idea of Prefect.
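For illustration, a minimal sketch of this disk-based workaround (hypothetical task names; assumes pandas with a parquet engine such as pyarrow is installed): each task writes the DataFrame to disk and returns only a small path string, so Prefect never holds a reference to the large object.

from pathlib import Path

import pandas as pd
from prefect import flow, task


@task
def load_data(path: str) -> str:
    df = pd.DataFrame({"x": range(1_000_000)})  # stand-in for a real data source
    df.to_parquet(path)  # the large object goes to disk ...
    return path          # ... and only the small path string is passed on


@task
def modify_data(path: str) -> str:
    df = pd.read_parquet(path)
    df["x"] = df["x"] * 2
    df.to_parquet(path)
    return path          # again, only the path is kept by Prefect


@flow
def my_sub_flow_via_disk():
    path = load_data("/tmp/df.parquet")
    path = modify_data(path)
    Path(path).unlink()  # remove the temporary file when done


if __name__ == "__main__":
    my_sub_flow_via_disk()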
Additional context
Debugging showed that the class UnpersistedResult(BaseResult) persists the outputs of a task 'forever' by design in self._cache. Surprisingly, inputs are also persisted somewhere. I spent quite some time debugging all the options, reading documentation and googling. In fact, the documentation is misleading to me, as it suggests that things can be non-persistent. persist_result=False and cache_result_in_memory=False are not of help either, especially not for the problem of input persistence.
I also tried to modify the source to release self._cache after it has been read, intending to contribute a fix, but sadly I did not find a non-destructive way to solve the issue that would also help others who might be affected. Actually, I am wondering whether I am the only one having this issue.
PS: Another option I found is not using tasks at all, so that no data is passed around. I also experimented with using a wrapper class to encapsulate each object passed around and then deleting the large object from the wrapper (leaving the wrapper itself in memory), but that somehow defeats the simplicity of Prefect.
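A rough sketch of that wrapper-class experiment (hypothetical names, not the exact code used): Prefect keeps a reference to the small wrapper, while the large payload can be dropped explicitly once it is no longer needed.

from prefect import flow, task


class Box:
    """Holds a large object so only the small Box stays referenced by Prefect."""

    def __init__(self, payload):
        self.payload = payload

    def release(self):
        self.payload = None  # drop the only reference to the large object


@task(persist_result=False, cache_result_in_memory=False)
def make_data() -> Box:
    return Box(bytearray(1024 * 1024 * 1024))  # ~1 GiB payload


@task(persist_result=False, cache_result_in_memory=False)
def consume_data(box: Box) -> int:
    size = len(box.payload)  # work with the payload ...
    box.release()            # ... then free it; only the empty Box stays cached
    return size


@flow
def wrapper_flow():
    box = make_data()
    consume_data(box)


if __name__ == "__main__":
    wrapper_flow()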
About this issue
- Original URL
- State: closed
- Created 9 months ago
- Reactions: 7
- Comments: 15 (7 by maintainers)
Here’s a memray snapshot showing heap memory stable with the example flow from Itay using #12019. Previously, heap grew on each iteration.
In #12019, I believe I’ve isolated two different memory leaks involved in this issue. I’ll need to test more and get some confirmation from another machine setup, but this looks promising.
Hi, I am running on the newest Prefect version (2.14.15). I am still experiencing the same memory leak even after upgrading. I turned storage persistence and the in-memory cache off for the task. Here is a reproducible piece of code that causes the memory leak.
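A minimal sketch of such a flow, consistent with the log output below (assuming an async parent flow, an async subflow, and a heavy task with persistence and in-memory caching disabled; names and sizes are guesses, not the reporter's exact code):

import asyncio
import os

import psutil
from prefect import flow, task


def rss_mib() -> float:
    return psutil.Process(os.getpid()).memory_info()[0] / float(1024 * 1024)


@task(persist_result=False, cache_result_in_memory=False)
async def some_heavy_task() -> int:
    data = bytearray(800 * 1024 * 1024)  # allocate a large chunk of memory
    await asyncio.sleep(10)              # simulate work
    return len(data)


@flow
async def subflow():
    print(f"Memory before task: {rss_mib()}MiB")
    await some_heavy_task()
    print(f"Memory after task: {rss_mib()}MiB")


@flow
async def periodic_batch_flow():
    for _ in range(3):  # a few iterations are enough to observe the growth
        print(f"Memory before flow: {rss_mib()}MiB")
        await subflow()
        print(f"Memory after flow: {rss_mib()}MiB")


if __name__ == "__main__":
    asyncio.run(periodic_batch_flow())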
And the output after one iteration of the loop:

14:29:43.012 | INFO | prefect.engine - Created flow run 'pastel-perch' for flow 'periodic-batch-flow'
Memory before flow: 287.171875MiB
14:29:43.151 | INFO | Flow run 'pastel-perch' - Created subflow run 'radiant-loon' for flow 'subflow'
Memory before task: 287.9375MiB
14:29:43.843 | INFO | Flow run 'radiant-loon' - Created task run 'some_heavy_task-0' for task 'some_heavy_task'
14:29:43.844 | INFO | Flow run 'radiant-loon' - Executing 'some_heavy_task-0' immediately...
14:29:53.909 | INFO | Task run 'some_heavy_task-0' - Finished in state Completed()
Memory after task: 1051.625MiB
14:29:54.012 | INFO | Flow run 'radiant-loon' - Finished in state Completed('All states completed.')
Memory after flow: 1051.890625MiB
As you can see, the memory doesn't go down even after the subflow finishes. If I continue the loop for more iterations, the memory just grows and grows. If I run @chrisguidry's flow and task, I see the same results that he posted after the bugfix got merged; the only difference I see between his flow and mine is that mine is async.
We’ve been discussing internally how we may need to take a fresh look at how results are handled across the board, with memory efficiency in mind. Thanks for your help diagnosing these, we’ll keep working on this and keep you posted.
Ah yes, @sibbiii, that is likely to still be the case during the flow run. My fix corrected an issue where memory wasn’t freed between flow runs, but it wouldn’t have addressed freeing memory during a flow run. Check out this example for what my change fixed:
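The referenced example is not shown here; a hypothetical sketch of the between-runs scenario the fix targets (invented names) would be a script that calls the same flow repeatedly and watches resident memory after each run:

import os

import psutil
from prefect import flow, task


@task(persist_result=False, cache_result_in_memory=False)
def consume(data: bytearray) -> int:
    return len(data)


@flow
def one_run():
    consume(bytearray(512 * 1024 * 1024))  # ~512 MiB handed to the task


if __name__ == "__main__":
    for i in range(5):
        one_run()  # each iteration is a separate flow run
        rss = psutil.Process(os.getpid()).memory_info()[0] / (1024 * 1024)
        print(f"RSS after run {i}: {rss:.0f}MiB")

Before the fix, references from earlier flow runs kept such allocations alive, so memory grew with each iteration; after the fix it stays flat between runs.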
Before the fix:
After the fix:
I’m going to re-open this for further investigation.
Hi @sibbiii, I just merged a fix that will be in the next release, but if you could give it a try on prefect@main, that would be amazing! I was able to address the parameter issue as you described in the original writeup, but the result issue is more invasive. I'll bring this back to the team to talk about how we might improve the performance/memory use around result caching between flow runs.