datafusion: Bad performance on wide tables (1000+ columns)
Describe the bug
I’m testing DataFusion for use in a system that has several thousand columns and billions of rows. I’m excited about the flexibility and possibilities this technology provides.
The problems we faced:
- Optimization of the logical plan is slow because it has to copy the whole schema in some rules. We worked around this with prepared queries (we cache the parametrized logical plan)
- Creation of the physical plan consumes up to 35% of CPU, which is more than its execution (we use several hundred aggregation functions, and DataFusion shows pretty good execution time)
Some investigation showed that there are a lot of string comparisons (take a look at the flamegraph):
29 % datafusion_physical_expr::planner::create_physical_expr
28.5 % --> datafusion_common::dfschema::DFSchema::index_of_column
28.5 % -- --> datafusion_common::dfschema::DFSchema::index_of_column_by_name
7.4 % -- -- --> __memcmp_sse4_1
14.6 % -- -- --> datafusion_common::table_reference::TableReference::resolved_eq
6.8 % -- -- -- --> __memcmp_sse4_1
The algorithm currently has O(N^2) complexity: one factor of N from iterating over all the columns in `datafusion_common::dfschema::DFSchema::index_of_column_by_name`, and another from `datafusion_common::table_reference::TableReference::resolved_eq`.
Some ideas to resolve this:
- Use a hashmap or btree in `DFSchema` instead of a list (to reduce the complexity of resolving a column index by its name)
- Implement parametrization of the physical plan and prepared physical plans (to enable caching them the same way as prepared logical plans)
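The first idea can be sketched as follows. This is a simplified, dependency-free stand-in, not DataFusion's actual `DFSchema`: the method name mirrors `index_of_column_by_name`, but the struct and its fields are hypothetical.

```rust
use std::collections::HashMap;

// Simplified stand-in for DFSchema: the field list is paired with a
// name -> index map built once at construction, so each lookup no
// longer scans the whole field list.
struct IndexedSchema {
    fields: Vec<String>, // field names only, for brevity
    name_to_index: HashMap<String, usize>,
}

impl IndexedSchema {
    fn new(fields: Vec<String>) -> Self {
        let name_to_index = fields
            .iter()
            .enumerate()
            .map(|(i, name)| (name.clone(), i))
            .collect();
        Self { fields, name_to_index }
    }

    // O(1) on average, instead of an O(N) linear scan per lookup.
    fn index_of_column_by_name(&self, name: &str) -> Option<usize> {
        self.name_to_index.get(name).copied()
    }
}
```

With thousands of columns, replacing the per-lookup scan with a map turns the overall O(N^2) resolution into roughly O(N), at the cost of building (or caching) the map once per schema.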
Thank you for developing such a great tool!
To Reproduce
It’s hard to extract code from the project, but I will try to build a simple repro.
Expected behavior
Creation of the physical plan should spend much less CPU time than its execution.
Additional context
No response
About this issue
- State: open
- Created 9 months ago
- Comments: 27 (13 by maintainers)
I have reviewed https://github.com/apache/arrow-datafusion/pull/7870 and https://github.com/apache/arrow-datafusion/pull/7878. Thank you for your work @maruschin and @karlovnv
Here are my thoughts:
- How is the time split between `index_of_column_by_name` and schema creation?
- I am worried about building a `HashMap` with each `DFSchema` (as it means creating / copying owned strings) if it is never read – I think it should be built on demand, as suggested by @crepererum at https://github.com/apache/arrow-datafusion/pull/7870/files#r1372786446

@alamb I’ve read the discussions you shared. Thank you. Since I’m not quite proficient at Rust, I might not be able to add a useful comment there. However, I do have some general feelings.
Thank you.
@alamb I may not be able to make PRs myself. Maybe other contributors can make PRs based on my report. I’d also like to hear others’ opinions and test results on my optimizations. Some optimizations may not be considered appropriate – like the one that adds an external dependency. And since I’ve investigated only the ‘wide aggregation’ (and ‘simple wide selection’, although not reported above) code paths, there may be other simple performance problems in other code paths like the ones I’ve found. Anyway, I’m planning to apply all the optimizations I’ve shown to my version of DataFusion (as long as they are correct).
I’ve tried to optimize the logical planning and optimization routines. As a result, for a wide aggregation query, the logical planning + optimization time was reduced from 49 seconds to 0.8 seconds. The details are as follows:
In the following, the optimization steps are cumulative, and elapsed times are reduced accordingly. No step requires deep knowledge of plan building. All time values are in milliseconds. `elapsed time after optimization` includes `elapsed time after creating a logical plan`; so, `elapsed time after optimization` is logical planning time + optimization time. Note that the following optimization steps were not heavily tested.
No code change: original timing
Optimization 1: In `DFField`, precompute the qualifier name in the `new...()` functions, store it in a member variable, and use it in `qualified_name()`.
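A minimal sketch of this idea, using a simplified stand-in for `DFField` (the struct layout here is hypothetical; only the precompute-in-constructor pattern is the point):

```rust
// Simplified DFField-like struct: the qualified name is computed once in
// the constructor and stored, instead of being re-formatted on every
// qualified_name() call.
struct Field {
    qualifier: Option<String>,
    name: String,
    qualified_name: String, // precomputed in new()
}

impl Field {
    fn new(qualifier: Option<&str>, name: &str) -> Self {
        let qualified_name = match qualifier {
            Some(q) => format!("{q}.{name}"),
            None => name.to_string(),
        };
        Self {
            qualifier: qualifier.map(String::from),
            name: name.to_string(),
            qualified_name,
        }
    }

    // Now a cheap borrow; previously each call allocated a new String.
    fn qualified_name(&self) -> &str {
        &self.qualified_name
    }
}
```

This trades a small amount of memory per field for eliminating an allocation on every lookup, which adds up quickly on schemas with thousands of columns.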
Optimization 2: Apply https://github.com/apache/arrow-datafusion/pull/7870 (Use btree to search fields in DFSchema)
Optimization 3: Change `DFField`’s `qualified_name()` to return `Arc<String>` instead of `String`, and change other code accordingly, to avoid string `clone()`ing.

Optimization 4: Precompute `using_columns` in `logical_plan::builder::project()`.
Like this:
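The original snippet is not shown here; the following is a hedged, self-contained sketch of the call pattern only. The types are stand-ins, and `collect_using_columns` is a hypothetical placeholder for whatever plan walk actually produces the set.

```rust
use std::collections::HashSet;

// Stand-in for the real expression type; only the call pattern matters.
type Expr = String;

// Hypothetical: collecting the USING columns of a plan is assumed to be
// expensive, so it should run once per project() call, not once per expr.
fn collect_using_columns() -> HashSet<String> {
    // ... in reality, walk the plan's joins; constant here for the sketch
    ["id".to_string()].into_iter().collect()
}

// Variant of normalization that takes the precomputed set as an argument,
// mirroring the proposed normalize_col_with_using_columns().
fn normalize_col_with_using_columns(expr: &Expr, using: &HashSet<String>) -> Expr {
    if using.contains(expr) {
        format!("using:{expr}")
    } else {
        expr.clone()
    }
}

fn project(exprs: &[Expr]) -> Vec<Expr> {
    let using = collect_using_columns(); // computed once, not per expression
    exprs
        .iter()
        .map(|e| normalize_col_with_using_columns(e, &using))
        .collect()
}
```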
And implement `expr_rewriter::normalize_col_with_using_columns()` and `logical_plan::builder::normalize_with_using_columns()` that receive `using_columns` as an argument.

Optimization 5: In `DFSchema::merge()`, check `duplicated_field` with `bool`-based functions instead of `Error`-based functions. Like this:
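A sketch of the idea on a simplified schema type (the real snippet is not reproduced here; the `merge` body below is an assumption about the shape of the check, not DataFusion’s actual implementation):

```rust
// Simplified sketch: merge() only needs a yes/no answer, so a bool
// predicate replaces an Error-returning lookup. Building a "field not
// found" Error formats every field name, which is costly on wide schemas.
struct Schema {
    fields: Vec<String>,
}

impl Schema {
    // Cheap membership test; no Error is allocated on the miss path.
    fn has_field_with_unqualified_name(&self, name: &str) -> bool {
        self.fields.iter().any(|f| f.as_str() == name)
    }

    fn merge(&mut self, other: &Schema) {
        for field in &other.fields {
            // Previously: a Result-based lookup whose Err listed all valid
            // field names; here a plain bool check decides whether to skip.
            let duplicated_field = self.has_field_with_unqualified_name(field);
            if !duplicated_field {
                self.fields.push(field.clone());
            }
        }
    }
}
```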
And implement `has_field_with_unqualified_name()` and `has_field_with_qualified_name()`, which return `bool` without involving `Error`. Since it is not an error condition, returning `bool` is more appropriate anyway. Additionally, implement `get_index_of_column_by_name()`, which returns `Option<usize>` instead of `Result<Option<usize>>`, for the above functions. `field_not_found()` and `unqualified_field_not_found()` are heavy when used on a wide table, since they return all valid field names in the `Error`; so they must be avoided when not necessary.

Optimization 6: In `expr::utils::columnize_expr()`, use `bool`-based functions instead of `Error`-based functions. Similar to optimization 5. Like this:
And implement `get_field_with_unqualified_name()` in `DFSchema`, which returns `Option<&DFField>` instead of `Result<&DFField>`. Since it is not an error condition, returning `Option` is more appropriate anyway.

Optimization 7: Use `IndexSet` in `expr::utils::find_exprs_in_exprs()`. Like this:
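`IndexSet` gives O(1) membership checks while preserving insertion order. To keep this sketch dependency-free, the same behavior is emulated below with a `HashSet` paired with a `Vec`; the function is a simplified stand-in for `find_exprs_in_exprs()`, not its actual code.

```rust
use std::collections::HashSet;

// find_exprs_in_exprs() collects matching expressions while dropping
// duplicates and preserving first-seen order. Checking Vec::contains is
// O(N) per insert; a set makes it O(1). IndexSet (indexmap crate) does
// both in one structure; here a HashSet + Vec pair stands in for it.
fn find_matching_dedup<F>(exprs: &[String], test: F) -> Vec<String>
where
    F: Fn(&str) -> bool,
{
    let mut seen: HashSet<&str> = HashSet::new();
    let mut result = Vec::new();
    for e in exprs {
        // insert() returns false for duplicates, so each expr is kept once
        if test(e) && seen.insert(e.as_str()) {
            result.push(e.clone());
        }
    }
    result
}
```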
`IndexSet` is in the https://docs.rs/indexmap/latest/indexmap/ crate.

Optimization 8: In `logical_plan::plan::calc_func_dependencies_for_project()`, return early when there are no functional dependencies. Like this:
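A sketch of the early return on simplified types (the struct layouts and the “expensive” body below are placeholders; only the guard pattern reflects the proposal):

```rust
// Skip computing proj_indices entirely when the schema has no
// functional dependencies, which is the common case.
struct FunctionalDependencies {
    deps: Vec<String>, // simplified representation
}

impl FunctionalDependencies {
    fn is_empty(&self) -> bool {
        self.deps.is_empty()
    }
}

struct Schema {
    func_deps: FunctionalDependencies,
}

impl Schema {
    fn has_functional_dependencies(&self) -> bool {
        !self.func_deps.is_empty()
    }
}

fn calc_func_dependencies_for_project(schema: &Schema, exprs: &[String]) -> FunctionalDependencies {
    // Early return: avoid the expensive index computation below when
    // there is nothing to project dependencies from.
    if !schema.has_functional_dependencies() {
        return FunctionalDependencies { deps: vec![] };
    }
    // ... stand-in for the heavy step: map projected exprs back to deps ...
    let deps = exprs
        .iter()
        .filter(|e| schema.func_deps.deps.contains(e))
        .cloned()
        .collect();
    FunctionalDependencies { deps }
}
```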
And implement `DFSchema::has_functional_dependencies()` and `FunctionalDependencies::is_empty()` for it. `calc_func_dependencies_for_project()` does a heavy operation to get `proj_indices` even before it calls `project_functional_dependencies()`, which is useless when there are no functional dependencies (the common case, since functional dependencies are rare in my opinion). In general, the functional dependency-related functions take arguments that require heavy computation even before it is known whether they are needed. So it would be great if these functions received an `FnOnce` instead of precomputed data, to skip the heavy operations when not required.

Now, the resulting 0.8 seconds is acceptable for me.
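The `FnOnce` idea above can be sketched in isolation (the function name matches the one discussed, but the signature here is a hypothetical simplification):

```rust
// Pass a closure that produces the heavy input, and only invoke it when
// functional dependencies actually exist.
fn project_functional_dependencies<F>(has_deps: bool, proj_indices: F) -> Vec<usize>
where
    F: FnOnce() -> Vec<usize>,
{
    if !has_deps {
        return vec![]; // closure never called: no heavy work on this path
    }
    proj_indices() // computed lazily, only on the rare dependency path
}
```

A caller would pass `|| compute_proj_indices(...)` instead of a precomputed `Vec`, so the expensive mapping only runs when the result is actually consumed.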
We do something similar to this in IOx (cache schemas that we know don’t change rather than recomputing them)
It is my opinion that in order to make DFSchema behave well and not be a bottleneck we will need to more fundamentally restructure how it works.
Right now the amount of copying required is substantial as has been pointed out several times on this thread. I think with sufficient diligence we could avoid almost all copies when manipulating DFSchema and then the extra complexity of adding a cache or other techniques would become unnecessary.
I think this is a great idea. I think optimizing for the case of the same, reused qualifier, is a very good idea.
What do people think about the approach described on https://github.com/apache/arrow-datafusion/pull/7944? I (admittedly biasedly) think that approach would eliminate almost all allocations (instead it would be ref count updates). We can extend it to incorporate ideas like pre-caching qualified names and hash sets for column checks, and I think it could be pretty fast
@alamb Added some thoughts (#7895) that came up while working on #7878. I’ll comment on the code in the PR later.
Main things: create a distinction between qualified and unqualified columns, and don’t allow a qualified name as the name in `Column`.
Thanks @maruschin
I wonder if this is a good time to take a step back and see if we could make DF Schema easier to use in general – I think it is already pretty complicated and optimizing certain methods will likely make it more so.
For example, I wonder whether making the index map will slow down planning for less complex queries, or whether we need to take more care to reuse DFSchema
Thus I suggest maybe sketching out the type of change you have in mind in a draft PR that we can discuss prior to spending all the time to get the PR polished up.
Also, it’s worth considering implementing prepared physical plans (with parametrization); this would add the ability to cache them.
Thank you for the report @karlovnv – I agree with your analysis, and we are indeed tracking various ways to make DataFusion’s planning faster in https://github.com/apache/arrow-datafusion/issues/5637
Another performance issue, which I think is related to the ones you have already identified, concerns the representation of schemas and name resolution (often error strings are created and then ignored, for example).
If you (or anyone else) have any time to help with this project, it would be most appreciated.