datafusion: [Epic] Generate runtime errors if the memory budget is exceeded

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

The basic challenge is that DataFusion can use an unbounded amount of memory when running a plan, which typically results in DataFusion being killed by some system memory protection limit (e.g. the OOM Killer on Linux). See https://github.com/apache/arrow-datafusion/issues/587 for more details.

As a first step towards supporting larger datasets in DataFusion, if a plan would exceed the overall memory budget, it should generate a runtime error (resources exhausted) rather than exceed the budget and risk being killed.

There should also be a way to keep the current behavior (do not error when resources are exhausted).

Describe the solution you’d like

  1. The user can define a limit for memory via MemoryManagerConfig
  2. All operators that consume significant memory (hash aggregate, join, sort) will properly account for and request memory from the MemoryManager via methods like try_grow
  3. If sufficient memory cannot be allocated, the plan should return a ResourcesExhausted error
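For illustration, setting such a limit and hitting the error at runtime might look roughly like the sketch below. It assumes the DataFusion 13-era API referenced later in this thread (RuntimeConfig, MemoryManagerConfig::try_new_limit, SessionContext::with_config_rt); exact names may differ across versions, and my_table / some_column are placeholder identifiers.

```rust
use std::sync::Arc;
use datafusion::error::Result;
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::execution::memory_manager::MemoryManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

#[tokio::main]
async fn main() -> Result<()> {
    // Cap the memory pool at 1 GiB; memory_fraction = 1.0 means operators may use
    // the whole pool (a smaller fraction leaves slack for untracked allocations).
    let memory_config = MemoryManagerConfig::try_new_limit(1024 * 1024 * 1024, 1.0)?;
    let runtime = RuntimeEnv::new(RuntimeConfig::new().with_memory_manager(memory_config))?;
    let ctx = SessionContext::with_config_rt(SessionConfig::new(), Arc::new(runtime));

    // A plan whose sort/join/aggregate state would exceed the budget should now
    // fail with a ResourcesExhausted error instead of being OOM-killed.
    // (`my_table` is assumed to have been registered elsewhere.)
    let df = ctx.sql("SELECT * FROM my_table ORDER BY some_column").await?;
    match df.collect().await {
        Ok(batches) => println!("collected {} batches", batches.len()),
        Err(e) => eprintln!("query failed: {e}"),
    }
    Ok(())
}
```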


Describe alternatives you’ve considered

We can always increase the accuracy of the memory allocation accounting (e.g. RecordBatches internal to operators, etc.). However, for this initial epic I would like to get the major consumers of memory instrumented and using the MemoryManager interface. Hopefully this will also allow

Additional context

cc @yjshen @crepererum. Related to issues like https://github.com/influxdata/influxdb_iox/issues/5776 (and some internal issues of our own).

About this issue

  • State: closed
  • Created 2 years ago
  • Reactions: 4
  • Comments: 18 (18 by maintainers)

Most upvoted comments

Hi, sorry to join this party late.

I don’t quite get why async is a problem for the memory manager when implementing aggregation.

We could do memory-limited aggregation as follows:

  1. Try to allocate a fixed-size buffer for incoming groups (a record batch, or a row batch if row-based aggregation is possible, with 8096 rows for example).
  2. Aggregate incoming records, updating existing buffer slots or adding new rows while there is space left in the last pre-allocated buffer.
  3. If the last allocated aggregation buffer is full, try_grow the aggregate’s memory by allocating another fixed-size buffer.
  • if more memory quota is granted, go to step 2 and continue aggregating.
  • if we fail to get one more aggregation buffer from the memory manager, we spill (see the spill procedure below).
  4. If the input stream is exhausted, we could either:
  • get a sorted iterator over the in-memory aggregation buffers, do a multi-way merge with the spills, produce the final results, and finally free all memory used (if the aggregate is final), or
  • output the in-memory buffers and free all the memory (if the aggregate is partial).

The spill procedure would be:

  • if the aggregate is partial, produce partial output based on all the aggregation buffers we have, free all the memory, and go back to aggregation step 1 to handle the following incoming tuples.
  • if the aggregate is final, sort all the current buffers by group key, spill them to disk, add the result to the list of spilled files, and go back to aggregation step 1 to handle the following incoming tuples.

Also, one could refer to Apache Spark’s hash aggregate impl if interested.
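To make the control flow concrete, here is a toy, self-contained sketch of the partial-aggregate path described above (the final-aggregate path would instead sort the buffers by group key and spill them to disk). None of the names below are DataFusion APIs; try_grow is a plain closure standing in for the memory manager.

```rust
use std::collections::HashMap;

const ROWS_PER_BUFFER: usize = 8096; // fixed-size buffer of group slots
const BYTES_PER_BUFFER: usize = ROWS_PER_BUFFER * 16; // rough key + accumulator size

/// Toy partial aggregation: sum i64 values per u64 key. Whenever the last
/// pre-allocated buffer is full and the memory manager refuses one more
/// buffer, the current in-memory groups are emitted (the partial-aggregate
/// "spill") and aggregation restarts from a single buffer.
fn partial_sum_agg(
    input: impl Iterator<Item = (u64, i64)>,
    mut try_grow: impl FnMut(usize) -> bool, // stand-in for MemoryManager::try_grow
) -> Vec<HashMap<u64, i64>> {
    let mut outputs = Vec::new();
    let mut groups: HashMap<u64, i64> = HashMap::new();
    // Step 1: one pre-allocated buffer is assumed to always fit.
    assert!(try_grow(BYTES_PER_BUFFER), "at least one buffer must fit");
    let mut capacity = ROWS_PER_BUFFER;

    for (key, value) in input {
        // Step 3: a new group would not fit in the buffers allocated so far.
        if !groups.contains_key(&key) && groups.len() == capacity {
            if try_grow(BYTES_PER_BUFFER) {
                // Quota granted: add another fixed-size buffer and keep going (step 2).
                capacity += ROWS_PER_BUFFER;
            } else {
                // Quota refused: emit the partial output and start over (spill
                // procedure, partial case). A real implementation would also
                // shrink its reservation here.
                outputs.push(std::mem::take(&mut groups));
                capacity = ROWS_PER_BUFFER;
            }
        }
        // Step 2: update an existing slot or add a new group row.
        *groups.entry(key).or_insert(0) += value;
    }

    // Step 4: input exhausted; emit whatever is left in memory.
    outputs.push(groups);
    outputs
}
```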

I believe we have completed all the initially planned work for generating runtime errors if the memory budget is exceeded, so closing this issue 🎉

Thanks @alamb for reminding me! I’ll start working on #2723 this week.

Thanks for the explanation!

Sort in DataFusion is currently memory limited, and I think we could apply a similar approach in aggregation like that in sort: https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/physical_plan/sorts/sort.rs#L799-L813.

We could make do_agg async, as is done for do_sort.

Or we could de-couple try_grow and spill: remove async from try_grow and do a manual spill in case of failure.

I think decoupling try_grow and spill would be my preference. In general, if memory is exceeded we may not actually want to spill for all operators (e.g. an initial hash table might simply flush its output if it exceeded memory rather than trying to spill)

cc @yjshen in case he has some additional thoughts
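As a rough sketch of the decoupled shape discussed above (purely illustrative; Reservation and GrowOutcome are hypothetical, not DataFusion types): try_grow becomes a synchronous yes/no check against the budget, and each operator picks its own fallback, spilling to disk or flushing output.

```rust
/// Hypothetical, simplified reservation against a shared memory budget.
struct Reservation {
    used: usize,
    limit: usize,
}

impl Reservation {
    /// A synchronous try_grow: it only answers whether the budget allows
    /// `bytes` more, and never spills anything itself.
    fn try_grow(&mut self, bytes: usize) -> bool {
        if self.used + bytes > self.limit {
            false
        } else {
            self.used += bytes;
            true
        }
    }
}

/// What to do when the budget is refused is the operator's decision.
enum GrowOutcome {
    Granted,   // budget granted: keep building in-memory state
    MustSpill, // e.g. a final sort/aggregate: write state to disk, shrink, retry
    MustFlush, // e.g. an initial hash aggregate: emit its current output instead
}

fn try_grow_decoupled(r: &mut Reservation, bytes: usize, can_spill: bool) -> GrowOutcome {
    if r.try_grow(bytes) {
        GrowOutcome::Granted
    } else if can_spill {
        GrowOutcome::MustSpill
    } else {
        GrowOutcome::MustFlush
    }
}
```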

I think it’s OK to have some slack room on top of the limit, which is somewhat controlled by the batch size.

There is also a memory_fraction setting on MemoryManagerConfig (https://docs.rs/datafusion/13.0.0/datafusion/execution/memory_manager/enum.MemoryManagerConfig.html) to account for some slack in the estimates.

Should the memory limit be optimistic? What I mean is that, in the case of aggregation, we could first process a record batch, compare memory usage before and after the batch is processed, and request the delta from the memory manager.

Yes, I agree this would be the best strategy: as memory is needed, we request more from the memory manager incrementally.

The end of batch processing would be a “safe point” at which memory usage should be accounted for correctly, or a spill should be triggered. wdyt?

Yes

In general I think upgrading / improving the memory manager would likely be fine.
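A small sketch of that “safe point” idea, reusing the toy Reservation from the sketch above (AggState and its methods are likewise hypothetical): state grows freely while one batch is processed, and the delta is reconciled with the memory manager before the next batch, spilling or flushing if the delta is refused.

```rust
use std::collections::HashMap;

/// Hypothetical aggregation state; estimated_bytes() is whatever cheap size
/// estimate the operator can compute (hash table + buffered batch sizes, etc.).
struct AggState {
    groups: HashMap<u64, i64>,
}

impl AggState {
    fn estimated_bytes(&self) -> usize {
        self.groups.len() * 16 // rough per-group cost; illustrative only
    }
    fn update(&mut self, key: u64, value: i64) {
        *self.groups.entry(key).or_insert(0) += value; // may allocate new group slots
    }
    fn spill_or_flush(&mut self) {
        // Hypothetical: write sorted state to disk (final aggregate) or emit it
        // (partial aggregate), then drop the in-memory groups.
        self.groups.clear();
    }
}

/// Process one batch optimistically, then reconcile memory usage at the end of
/// the batch (the "safe point"). Reuses the toy Reservation defined earlier.
fn process_batch(state: &mut AggState, batch: &[(u64, i64)], r: &mut Reservation) {
    let before = state.estimated_bytes();
    for &(key, value) in batch {
        state.update(key, value); // untracked growth during the batch
    }
    let delta = state.estimated_bytes().saturating_sub(before);

    // Safe point: either the reservation now reflects actual usage, or we give
    // the memory back by spilling/flushing.
    if !r.try_grow(delta) {
        state.spill_or_flush();
        r.used = r.used.saturating_sub(before); // a real API would expose shrink()
    }
}
```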

I think it’s OK to have some slack room on top of the limit, which is somewhat controlled by the batch size. It’s unlikely that we are going to account for every single byte anyways, since there might be some aux data structures here and there that are heap-allocated. So I would treat this as a “best effort limit w/o impacting performance (too much)”.

We could later (if there’s demand for it) add a config option “strict memory enforcement” that impacts performance.