datafusion: Bad performance on wide tables (1000+ columns)

Describe the bug

I’m testing DataFusion for use in a system that has several thousand columns and billions of rows. I’m excited about the flexibility and possibilities this technology provides.

The problems we faced:

  1. Optimization of the logical plan is slow because some rules have to copy the whole schema. We worked around this with prepared queries (we cache the parametrized logical plan).
  2. Creation of the physical plan consumes up to 35% of CPU, which is more than its execution (we use several hundred aggregation functions, and DF shows pretty good execution time).

Some investigation showed that there are a lot of string comparisons (take a look at the flamegraph):

  29 %      datafusion_physical_expr::planner::create_physical_expr 
  28.5 %    --> datafusion_common::dfschema::DFSchema::index_of_column
  28.5 %    -- --> datafusion_common::dfschema::DFSchema::index_of_column_by_name
   7.4 %    -- -- --> __memcmp_sse4_1
  14.6 %    -- -- --> datafusion_common::table_reference::TableReference::resolved_eq
   6.8 %    -- -- -- --> __memcmp_sse4_1

[Image: flamegraph (photo_2023-09-29_14-29-16)]

The algorithm currently has O(N^2) complexity (N for iterating over all the columns in datafusion_common::dfschema::DFSchema::index_of_column_by_name and N for datafusion_common::table_reference::TableReference::resolved_eq).

https://github.com/apache/arrow-datafusion/blob/22d03c127e7c5e56cf97ae33eb4446d5b7022eaa/datafusion/common/src/dfschema.rs#L211

Some ideas to resolve:

  • Use a hashmap or btree in DFSchema instead of a list (to decrease the complexity of resolving a column index by its name)
  • Implement parametrization of physical plans and prepared physical plans (to enable caching them in the same way as prepared logical plans)
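
The first idea can be illustrated with a minimal, std-only sketch. `IndexedSchema` and its methods are hypothetical names made up for this example and are not DataFusion's `DFSchema` API; the sketch only contrasts a linear scan over column names with a lookup through a name-to-index map.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a schema; not DataFusion's `DFSchema`.
pub struct IndexedSchema {
    names: Vec<String>,
    // Column name -> position in `names`, built once at construction.
    by_name: HashMap<String, usize>,
}

impl IndexedSchema {
    pub fn new(names: Vec<String>) -> Self {
        let mut by_name = HashMap::with_capacity(names.len());
        for (i, n) in names.iter().enumerate() {
            // Keep the first occurrence so results match a front-to-back scan.
            by_name.entry(n.clone()).or_insert(i);
        }
        Self { names, by_name }
    }

    /// O(N) lookup: compares the name against each column in turn.
    pub fn index_of_linear(&self, name: &str) -> Option<usize> {
        self.names.iter().position(|n| n == name)
    }

    /// Expected O(1) lookup through the hash index.
    pub fn index_of_hashed(&self, name: &str) -> Option<usize> {
        self.by_name.get(name).copied()
    }
}
```

Resolving every column of an N-column schema costs O(N^2) string comparisons with the linear variant and expected O(N) with the hashed one, at the price of building and storing the map.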

Thank you for developing such a great tool!

To Reproduce

It’s hard to extract code from the project, but I will try to build a simple repro.

Expected behavior

Creating the physical plan should take much less CPU time than executing it.

Additional context

No response

About this issue

  • Original URL
  • State: open
  • Created 9 months ago
  • Comments: 27 (13 by maintainers)

Most upvoted comments

I have reviewed https://github.com/apache/arrow-datafusion/pull/7870 and https://github.com/apache/arrow-datafusion/pull/7878. Thank you for your work @maruschin and @karlovnv

Here are my thoughts:

  1. I think we need some sort of performance benchmark results to know how much it helps / hurts in other areas (like how much longer it takes to create one). Can someone please create some benchmarks, similar to scalar.rs, for index_of_column_by_name and schema creation?
  2. I think it is likely to be too expensive to build a HashMap with each DFSchema (as it is creating / copying owned strings) if it is never read – I think it should be built on demand, as suggested by @crepererum at https://github.com/apache/arrow-datafusion/pull/7870/files#r1372786446
  3. I have long been bothered by how expensive it is to create a DFSchema. I have some ideas on how to make it faster to construct – while this might not help this use case directly, I think it might help planning in general. I will take a crack at working on this idea.
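
The "build on demand" suggestion in point 2 could look roughly like the sketch below. It is only an illustration under assumptions: `LazySchema`, `index_of` and `is_index_built` are hypothetical names, and `std::sync::OnceLock` is one possible way to defer the work, not necessarily what the linked review comment proposes.

```rust
use std::collections::HashMap;
use std::sync::OnceLock;

/// Hypothetical schema type; field and method names are illustrative only.
pub struct LazySchema {
    names: Vec<String>,
    // Empty until the first lookup, so schemas that are never searched
    // do not pay for building the map.
    index: OnceLock<HashMap<String, usize>>,
}

impl LazySchema {
    pub fn new(names: Vec<String>) -> Self {
        Self { names, index: OnceLock::new() }
    }

    /// Builds the name -> index map on first use, then reuses it.
    pub fn index_of(&self, name: &str) -> Option<usize> {
        let index = self.index.get_or_init(|| {
            let mut m = HashMap::with_capacity(self.names.len());
            for (i, n) in self.names.iter().enumerate() {
                m.entry(n.clone()).or_insert(i);
            }
            m
        });
        index.get(name).copied()
    }

    /// Reports whether the map has been built yet (useful for testing).
    pub fn is_index_built(&self) -> bool {
        self.index.get().is_some()
    }
}
```

This version still copies the name strings when the map is first built; it only moves that cost from schema construction to the first lookup.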

@alamb I’ve read the discussions you shared. Thank you. Since I’m not quite proficient at Rust, I might not be able to add a useful comment there. However, I do have some general feelings:

  • I noticed that the discussions primarily focus on coding style and convenience rather than the performance of the code.
  • The mention of a deep comparison (between Exprs) raises concerns regarding potential performance issues.
  • I’m uncertain about the necessity of unifying the coding style between trait and enum.

Thank you.

@alamb I may not be able to make PRs myself. Maybe other contributors can make PRs based on my report. I’d also like to hear others’ opinions and test results about my optimizations. Some optimizations may not be considered appropriate, like the one that adds an external dependency. And since I’ve investigated only the ‘wide aggregation’ code path (and ‘simple wide selection’, although not reported above), there may be other simple performance problems in other code paths like the ones I’ve found. Anyway, I’m planning to apply all the optimizations I’ve shown to my version of DataFusion (as long as they are correct).

I’ve tried to optimize the logical planning and optimization routines. As a result, for a wide aggregation query, the logical planning + optimization time was reduced from 49 seconds to 0.8 seconds. The details are as follows:

  • The version of DataFusion: 31.0.0 (with small modifications)
  • The query
    • a SELECT query
    • ~3000 aggregation functions in the SELECT clause
    • FROM clause has one table that has 3617 columns
    • 16 GROUP BY columns

In the following, the optimization steps are cumulative, and elapsed times are reduced accordingly. No step requires deep knowledge of plan building. All time values are in milliseconds. The elapsed time after optimization includes the elapsed time after creating a logical plan, so it is the logical planning time plus the optimization time.

Note that the following optimization steps were not heavily tested.

No code change: original timing

  • elapsed time after creating a logical plan: 11468
  • elapsed time after optimization: 48734

Optimization 1: In DFField, precompute qualified_name in the new...() functions, store it in a member variable, and use it in qualified_name()

  • elapsed time after creating a logical plan: 10571
  • elapsed time after optimization: 29375

Optimization 2: Apply https://github.com/apache/arrow-datafusion/pull/7870 (Use btree to search fields in DFSchema)

  • elapsed time after creating a logical plan: 2429
  • elapsed time after optimization: 20307

Optimization 3: Change DFField’s qualified_name() to return Arc<String> instead of String

And change other code accordingly, to avoid cloning strings.

  • elapsed time after creating a logical plan: 2285
  • elapsed time after optimization: 15503
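
Optimizations 1 and 3 together can be sketched with a std-only example. The `Field` type below is hypothetical and much simpler than DataFusion's `DFField`; it only shows the idea of formatting the qualified name once and then handing out reference-counted handles instead of fresh `String` copies.

```rust
use std::sync::Arc;

/// Hypothetical field type; not DataFusion's `DFField`.
pub struct Field {
    qualifier: Option<String>,
    name: String,
    // Computed once in `new`, then shared by reference count.
    qualified_name: Arc<String>,
}

impl Field {
    pub fn new(qualifier: Option<&str>, name: &str) -> Self {
        let qualified_name = match qualifier {
            Some(q) => format!("{q}.{name}"),
            None => name.to_string(),
        };
        Self {
            qualifier: qualifier.map(str::to_string),
            name: name.to_string(),
            qualified_name: Arc::new(qualified_name),
        }
    }

    pub fn qualifier(&self) -> Option<&str> {
        self.qualifier.as_deref()
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns a cheap handle: cloning the `Arc` bumps a counter and
    /// does not allocate or copy the string bytes.
    pub fn qualified_name(&self) -> Arc<String> {
        Arc::clone(&self.qualified_name)
    }
}
```

Every call to `qualified_name()` returns a handle to the same allocation, so repeated calls during planning avoid both the `format!` and the string copy.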

Optimization 4: precompute using_columns in logical_plan::builder::project()

Like this:

    let using_columns = plan.using_columns()?;
    for e in expr {
        let e = e.into();
        match e {
            Expr::Wildcard => {
                projected_expr.extend(expand_wildcard(input_schema, &plan, None)?)
            }
            Expr::QualifiedWildcard { ref qualifier } => projected_expr
                .extend(expand_qualified_wildcard(qualifier, input_schema, None)?),
            _ => projected_expr.push(columnize_expr(
                normalize_col_with_using_columns(e, &plan, &using_columns)?,
                input_schema,
            )),
        }
    }

And implement expr_rewriter::normalize_col_with_using_columns() and logical_plan::builder::normalize_with_using_columns() that receive using_columns as an argument.

  • elapsed time after creating a logical plan: 1491
  • elapsed time after optimization: 14376

Optimization 5: In DFSchema::merge() check duplicated_field with bool-based functions instead of Error-based functions

Like this:

            let duplicated_field = match field.qualifier() {
                Some(q) => self.has_field_with_qualified_name(q, field.name()),
                // for unqualified columns, check as unqualified name
                None => self.has_field_with_unqualified_name(field.name()),
            };

And implement has_field_with_unqualified_name() and has_field_with_qualified_name(), which return bool without involving Error. Since it is not an error condition, receiving bool is more appropriate anyway. Additionally, implement get_index_of_column_by_name(), which returns Option<usize> instead of Result<Option<usize>>, for use by the above functions. field_not_found() and unqualified_field_not_found() are heavy when used on a wide table, since they return all valid field names in the Error. So they must be avoided when not necessary.

  • elapsed time after creating a logical plan: 899
  • elapsed time after optimization: 6538
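
The cost difference between Error-based and bool/Option-based lookups can be shown with a small std-only sketch. The `Schema` type and its method names are hypothetical and only mimic the pattern described above; the error text is invented for the example.

```rust
/// Hypothetical schema type used only for this illustration.
pub struct Schema {
    names: Vec<String>,
}

impl Schema {
    pub fn new(names: Vec<String>) -> Self {
        Self { names }
    }

    /// Error-based lookup: on a miss, the error message lists every valid
    /// field name, which is O(N) string work per failed lookup.
    pub fn index_of(&self, name: &str) -> Result<usize, String> {
        self.names.iter().position(|n| n == name).ok_or_else(|| {
            format!(
                "No field named {name}. Valid fields are {}.",
                self.names.join(", ")
            )
        })
    }

    /// Option-based lookup: a miss is just `None`, and no message is built.
    pub fn get_index_of(&self, name: &str) -> Option<usize> {
        self.names.iter().position(|n| n == name)
    }

    /// Existence check for callers that treat "not found" as a normal outcome.
    pub fn has_field(&self, name: &str) -> bool {
        self.get_index_of(name).is_some()
    }
}
```

With thousands of columns, a duplicate check that calls the Error-based variant and discards the error builds a message containing every column name on each miss, whereas `has_field` does no extra allocation.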

Optimization 6: In expr::utils::columnize_expr() use bool-based functions instead of Error-based functions

Similar to optimization 5. Like this:

        _ => match e.display_name() {
            Ok(name) => match input_schema.get_field_with_unqualified_name(&name) {
                Some(field) => Expr::Column(field.qualified_column()),
                // expression not provided as input, do not convert to a column reference
                None => e,
            },
            Err(_) => e,
        },

And implement get_field_with_unqualified_name() in DFSchema which returns Option<&DFField> instead of Result<&DFField>. Since it is not an error condition, receiving Option is more appropriate anyway.

  • elapsed time after creating a logical plan: 442
  • elapsed time after optimization: 6033

Optimization 7: Use IndexSet in expr::utils::find_exprs_in_exprs()

Like this:

    fn find_exprs_in_exprs<F>(exprs: &[Expr], test_fn: &F) -> Vec<Expr>
    where
        F: Fn(&Expr) -> bool,
    {
        exprs
            .iter()
            .flat_map(|expr| find_exprs_in_expr(expr, test_fn))
            .fold(IndexSet::new(), |mut acc, expr| {
                acc.insert(expr);
                acc
            })
            .into_iter()
            .collect()
    }

IndexSet is in the indexmap crate (https://docs.rs/indexmap/latest/indexmap/).

  • elapsed time after creating a logical plan: 391
  • elapsed time after optimization: 5889
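
For readers who want to see why an insertion-ordered set helps here, the following std-only sketch compares two ways of deduplicating while preserving first-seen order. It assumes the replaced code deduplicated by scanning a Vec (the original version is not shown in this thread), and it uses `HashSet` plus `Vec` as a stand-in for `IndexSet`, since `IndexSet` lives in an external crate. Both function names are hypothetical.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Quadratic version: `contains` rescans the output for every input item.
pub fn dedup_quadratic<T: PartialEq>(items: Vec<T>) -> Vec<T> {
    let mut out = Vec::new();
    for item in items {
        if !out.contains(&item) {
            out.push(item);
        }
    }
    out
}

/// Expected-linear version: a hash set answers "seen before?" in O(1),
/// while the Vec keeps the first-seen order.
pub fn dedup_hashed<T: Eq + Hash + Clone>(items: Vec<T>) -> Vec<T> {
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    for item in items {
        // `insert` returns false when the value was already present.
        if seen.insert(item.clone()) {
            out.push(item);
        }
    }
    out
}
```

`IndexSet` gives the same behavior without the extra clone, because it stores each element once and remembers insertion order itself.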

Optimization 8: In logical_plan::plan::calc_func_dependencies_for_project() return early when there are no functional dependencies

Like this:

    fn calc_func_dependencies_for_project(
        exprs: &[Expr],
        input: &LogicalPlan,
    ) -> Result<FunctionalDependencies> {
        let input_schema = input.schema();
        // Return early: nothing to project when the input has no dependencies.
        if !input_schema.has_functional_dependencies() {
            return Ok(FunctionalDependencies::empty());
        }
        // ... (rest of the function is unchanged)
    }

And implement DFSchema::has_functional_dependencies() and FunctionalDependencies::is_empty() for it. calc_func_dependencies_for_project() does a heavy operation to get proj_indices even before it calls project_functional_dependencies(), which is useless when there are no functional dependencies (which is common, since functional dependencies are rare in my opinion). I think the functional dependency-related functions take arguments that are expensive to compute even before checking whether they are needed. So it would be great if they received an FnOnce instead of precomputed data, to skip the heavy operations when they are not required.

  • elapsed time after creating a logical plan: 219
  • elapsed time after optimization: 845
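
The FnOnce suggestion can be sketched as follows. This is a hypothetical, std-only illustration: `Deps` and `project_deps` are invented names that only imitate the shape of the problem, and they are not DataFusion's `FunctionalDependencies` API.

```rust
/// Hypothetical dependency set; stands in for functional dependencies.
pub struct Deps {
    // Each pair is (source column index, dependent column index).
    pairs: Vec<(usize, usize)>,
}

impl Deps {
    pub fn new(pairs: Vec<(usize, usize)>) -> Self {
        Self { pairs }
    }

    pub fn empty() -> Self {
        Self { pairs: Vec::new() }
    }

    pub fn is_empty(&self) -> bool {
        self.pairs.is_empty()
    }

    pub fn pairs(&self) -> &[(usize, usize)] {
        &self.pairs
    }
}

/// Projects dependencies onto output positions. `proj_indices` is a closure,
/// so the potentially expensive index computation runs only when there is
/// at least one dependency to project.
pub fn project_deps<F>(deps: &Deps, proj_indices: F) -> Deps
where
    F: FnOnce() -> Vec<usize>,
{
    if deps.is_empty() {
        return Deps::empty();
    }
    let indices = proj_indices();
    // Maps an input column index to its position in the projection, if any.
    let pos = |col: usize| indices.iter().position(|&i| i == col);
    let pairs = deps
        .pairs
        .iter()
        .filter_map(|&(a, b)| Some((pos(a)?, pos(b)?)))
        .collect();
    Deps::new(pairs)
}
```

The caller passes `|| expensive_computation()` instead of the computed value, so the common case with no dependencies never pays for it.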

Now, the resulting 0.8 seconds is acceptable for me.

For instance, in our case the schema changes much more rarely than the data, so we can cache it for a long period of time

We do something similar to this in IOx (cache schemas that we know don’t change rather than recomputing them)

It is my opinion that in order to make DFSchema behave well and not be a bottleneck we will need to more fundamentally restructure how it works.

Right now the amount of copying required is substantial as has been pointed out several times on this thread. I think with sufficient diligence we could avoid almost all copies when manipulating DFSchema and then the extra complexity of adding a cache or other techniques would become unnecessary.

I’ve additionally changed the type of the precomputed qualified_name from String to Arc<String> after the above tests. Total planning time was reduced to 75% of the previous iteration. But I think it is still far from optimal.

I think this is a great idea. I think optimizing for the case of the same, reused qualifier, is a very good idea.

What do people think about the approach described on https://github.com/apache/arrow-datafusion/pull/7944? I (admittedly biasedly) think that approach would eliminate almost all allocations (instead it would be ref count updates). We can extend it to incorporate ideas like pre-caching qualified names and hash sets for column checks, and I think it could be pretty fast

@alamb I added some thoughts (#7895) that came up while working on #7878. I’ll comment on the code in the PR later.

Main things: distinguish between qualified and unqualified columns, and don’t allow a qualified name as the name in a column.

Thanks @maruschin

I wonder if this is a good time to take a step back and see if we could make DF Schema easier to use in general – I think it is already pretty complicated and optimizing certain methods will likely make it more so.

For example, I wonder if making the index map will slow down less complex queries, or if we need to take more care to reuse DFSchema

Thus I suggest maybe sketching out the type of change you have in mind in a draft PR that we can discuss prior to spending all the time to get the PR polished up.

various ways to make DataFusion’s planning faster

Also, it would be good to consider implementing prepared physical plans (with parametrization); this would make it possible to cache them

Thank you for the report @karlovnv – I agree with your analysis and we are indeed tracking various ways to make DataFusion’s planning faster in https://github.com/apache/arrow-datafusion/issues/5637

Another of the performance issues I think is related to the ones you have already identified, which is related to the representation of Schemas and name resolution (often error strings are created and then ignored, for example)

If you (or anyone else) have any time to help with this project it would be most appreciated