datafusion: [Epic] Generate runtime errors if the memory budget is exceeded
Is your feature request related to a problem or challenge? Please describe what you are trying to do. The basic challenge is that DataFusion can use an unbounded amount of memory for running a plan which typically results in DataFusion being killed by some system memory protection limit (e.g. the OOM Killer on Linux). See https://github.com/apache/arrow-datafusion/issues/587 for more details
As a first step towards supporting larger datasets in DataFusion, if a plan will exceed the overall budget, it should generate a runtime error (resource exhausted) rather than exceeding the budget and risking being killed
There should be a way to keep the current behavior as well (do not error due to resource exhausted)
Describe the solution you’d like
- The user can define a limit for memory via `MemoryManagerConfig`
- All operators that consume significant memory (Hash, Join, Sort) will properly account for and request memory from the `MemoryManager` via methods like `try_grow`
- If sufficient memory can not be allocated, the plan should return `ResourcesExhausted`
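The request/fail pattern described above can be sketched as a minimal, self-contained budget tracker. This is illustrative only: the struct and error shape here are hypothetical stand-ins, not DataFusion's actual `MemoryManager` API.

```rust
// Hypothetical sketch of the try_grow / ResourcesExhausted pattern:
// operators reserve memory up front and get an error instead of
// overshooting the budget.
use std::sync::atomic::{AtomicUsize, Ordering};

struct MemoryBudget {
    limit: usize,
    used: AtomicUsize,
}

impl MemoryBudget {
    fn new(limit: usize) -> Self {
        Self { limit, used: AtomicUsize::new(0) }
    }

    /// Try to reserve `bytes` more memory; on failure, roll back the
    /// reservation and return a "resources exhausted"-style error.
    fn try_grow(&self, bytes: usize) -> Result<(), String> {
        let prev = self.used.fetch_add(bytes, Ordering::SeqCst);
        if prev + bytes > self.limit {
            self.used.fetch_sub(bytes, Ordering::SeqCst); // roll back
            Err(format!(
                "Resources exhausted: requested {} bytes, {} of {} already used",
                bytes, prev, self.limit
            ))
        } else {
            Ok(())
        }
    }
}

fn main() {
    let budget = MemoryBudget::new(1024);
    assert!(budget.try_grow(512).is_ok());
    assert!(budget.try_grow(600).is_err()); // 512 + 600 would exceed 1024
    assert!(budget.try_grow(512).is_ok()); // exactly fills the budget
}
```

An operator would call `try_grow` before each sizable allocation and propagate the error up through the plan, which is how the runtime error surfaces to the user instead of an OOM kill.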
Needed:
- Use `MemoryManager` in `SortExec`, and return errors if the memory budget is exceeded: https://github.com/apache/arrow-datafusion/pull/4330
- Use `MemoryManager` in Aggregate operators, and return errors if the memory budget is exceeded: https://github.com/apache/arrow-datafusion/issues/3940
- #5220
- #4404
Describe alternatives you’ve considered
We can always increase the accuracy of the memory allocation accounting (e.g. `RecordBatch`es internal to operators, etc). However, for this initial epic I would like to get the major consumers of memory instrumented and using the `MemoryManager` interface. Hopefully this will also allow
Additional context
cc @yjshen @crepererum — related to issues like https://github.com/influxdata/influxdb_iox/issues/5776 (and some internal issues of our own)
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Reactions: 4
- Comments: 18 (18 by maintainers)
Hi, sorry to join this party late.
I don’t quite get why the async is a problem for the memory manager while implementing aggregation.
We could do memory-limited aggregation as follows:
The spill procedure would be:
Also, one could refer to Apache Spark’s hash aggregate impl if interested.
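The memory-limited aggregation with a spill procedure mentioned above can be sketched roughly as follows. This is a hedged, simplified illustration with hypothetical names (a group-count stand-in, with a group-count "budget" in place of a byte budget and an in-process `Vec` in place of real disk spill files), not DataFusion's or Spark's actual implementation:

```rust
// Sketch: accumulate partial aggregates in memory; when the budget is hit,
// sort the current state by key and "spill" it as a sorted run; merge all
// runs when the input is exhausted.
use std::collections::BTreeMap;

struct SpillingCountAgg {
    budget_groups: usize,            // stand-in for a byte budget
    in_mem: BTreeMap<String, u64>,   // partial aggregates, kept sorted by key
    spills: Vec<Vec<(String, u64)>>, // sorted runs (would be disk files in reality)
}

impl SpillingCountAgg {
    fn new(budget_groups: usize) -> Self {
        Self { budget_groups, in_mem: BTreeMap::new(), spills: Vec::new() }
    }

    fn update(&mut self, key: &str) {
        // Budget exceeded by a new group: flush current state as a sorted run.
        if !self.in_mem.contains_key(key) && self.in_mem.len() >= self.budget_groups {
            self.spill();
        }
        *self.in_mem.entry(key.to_string()).or_insert(0) += 1;
    }

    fn spill(&mut self) {
        let run: Vec<_> = std::mem::take(&mut self.in_mem).into_iter().collect();
        self.spills.push(run);
    }

    /// Merge all spilled runs with the remaining in-memory state.
    /// (A real implementation would do a streaming k-way merge here.)
    fn finish(mut self) -> Vec<(String, u64)> {
        self.spill();
        let mut merged = BTreeMap::new();
        for run in self.spills {
            for (k, v) in run {
                *merged.entry(k).or_insert(0) += v;
            }
        }
        merged.into_iter().collect()
    }
}

fn main() {
    let mut agg = SpillingCountAgg::new(2); // tiny budget to force spills
    for k in ["a", "b", "c", "a", "c", "a"] {
        agg.update(k);
    }
    assert_eq!(
        agg.finish(),
        vec![("a".into(), 3), ("b".into(), 1), ("c".into(), 2)]
    );
}
```

Because each spilled run is sorted by group key, the final merge can stream runs side by side rather than re-materializing everything, which is what keeps the merge phase itself within the budget.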
I believe we have completed all initially planned work for generating runtime errors if the memory budget is exceeded, so closing this issue 🎉
Thanks @alamb for reminding me! I’ll start working on #2723 this week.
Thanks for the explanation!
Sort in DataFusion is currently memory limited, and I think we could apply a similar approach in aggregation to the one used in sort: https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/physical_plan/sorts/sort.rs#L799-L813. We could make `do_agg` async, as is done for `do_sort`.

I think decoupling `try_grow` and spill would be my preference. In general, if memory is exceeded we may not actually want to spill for all operators (e.g. an initial hash table might simply flush its output if it exceeded memory rather than trying to spill)
cc @yjshen in case he has some additional thoughts
There is also `memory_fraction` on `MemoryManagerConfig` https://docs.rs/datafusion/13.0.0/datafusion/execution/memory_manager/enum.MemoryManagerConfig.html to account for some slack in estimates.

Yes, I agree this would be the best strategy: as memory is needed, we request more memory from the memory manager incrementally.
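To illustrate the slack idea, the arithmetic behind a `memory_fraction`-style setting is simply that only a fraction of the total pool is handed to tracked consumers, leaving the remainder to absorb unaccounted allocations. The helper name below is hypothetical:

```rust
// Illustrative only: a fraction of the pool becomes the enforced limit;
// the rest is slack for allocations the accounting misses.
fn effective_limit(pool_bytes: usize, fraction: f64) -> usize {
    (pool_bytes as f64 * fraction) as usize
}

fn main() {
    // e.g. an 8 GiB pool with fraction 0.7 enforces ~5.6 GiB on tracked
    // operators; ~2.4 GiB of slack covers aux structures and estimate error.
    let pool: usize = 8 * 1024 * 1024 * 1024;
    let limit = effective_limit(pool, 0.7);
    assert!(limit < pool);
}
```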
Yes
In general I think upgrading / improving the memory manager would likely be fine.
I think it’s OK to have some slack room on top of the limit, which is somewhat controlled by the batch size. It’s unlikely that we are going to account for every single byte anyways, since there might be some aux data structures here and there that are heap-allocated. So I would treat this as a “best effort limit w/o impacting performance (too much)”.
We could later (if there’s demand for it) add a config option “strict memory enforcement” that impacts performance.