datafusion: Regression: Ordering by joined column doesn't return results

Describe the bug

After updating to datafusion 33, I’ve noticed incorrect behavior in one of our internal tests that sorts by multiple columns. It used to work in datafusion 31.

To Reproduce

MRE with datafusion-cli:

CREATE TABLE users AS VALUES('Alice',50),('Bob',100);
CREATE TABLE employees AS VALUES('Alice','Finance'),('Bob','Marketing');

SELECT u.* FROM users u JOIN employees e ON u."column1" = e."column1" ORDER BY u."column1", e."column2";
0 rows in set. Query took 0.002 seconds.

However, the same query without ordering by the joined column works:

SELECT u.* FROM users u JOIN employees e ON u."column1" = e."column1" ORDER BY u."column1";
+---------+---------+
| column1 | column2 |
+---------+---------+
| Alice   | 50      |
| Bob     | 100     |
+---------+---------+
2 rows in set. Query took 0.002 seconds.

Expected behavior

It should return the same results as in datafusion 31.

Additional context

No response

About this issue

  • State: closed
  • Created 7 months ago
  • Comments: 17 (17 by maintainers)

Most upvoted comments

My initial solution:

.find_map(|(index, (projected_expr, alias))| {
  projected_expr.as_any().downcast_ref::<Column>().and_then(
      |projected_column| {
          // added: compare the column index as well as the name
          (column.index() == projected_column.index()
              && column.name().eq(projected_column.name()))
          .then(|| {
              state = RewriteState::RewrittenValid;
              Arc::new(Column::new(alias, index)) as _
          })
      },
  )
})

With the added index comparison:
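A minimal, self-contained model of the name-plus-index comparison, using the top-level projection and sort keys from the plans in this thread. Col and find_projected are hypothetical stand-ins, not DataFusion’s Column API; a projection is modeled as a list of (input column, alias) pairs.

```rust
/// Hypothetical stand-in for a physical column reference such as `column2@3`.
#[derive(Debug, Clone, PartialEq)]
struct Col {
    name: String,
    index: usize,
}

impl Col {
    fn new(name: &str, index: usize) -> Self {
        Col { name: name.to_string(), index }
    }
}

/// Look up `column` in `projection` (entries are (input column, alias)).
/// Both name AND index must match; on success the column is rewritten to
/// `alias @ output position`, mirroring the proposed fix.
fn find_projected(column: &Col, projection: &[(Col, String)]) -> Option<Col> {
    projection
        .iter()
        .enumerate()
        .find_map(|(index, (projected_column, alias))| {
            (column.index == projected_column.index && column.name == projected_column.name)
                .then(|| Col::new(alias, index))
        })
}

fn main() {
    // Top-level projection: [column1@0 as column1, column2@1 as column2]
    let projection = vec![
        (Col::new("column1", 0), "column1".to_string()),
        (Col::new("column2", 1), "column2".to_string()),
    ];
    // Sort keys below it: column1@0 (u.column1) and column2@2 (e.column2)
    println!("{:?}", find_projected(&Col::new("column1", 0), &projection));
    println!("{:?}", find_projected(&Col::new("column2", 2), &projection));
}
```

In this model, e.column2 (column2@2) finds no match, so the projection would not be pushed below the sort.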

DataFusion CLI v33.0.0
❯ CREATE TABLE users AS VALUES('Alice',50),('Bob',100);
0 rows in set. Query took 0.022 seconds.

❯ CREATE TABLE employees AS VALUES('Alice','Finance'),('Bob','Marketing');
0 rows in set. Query took 0.008 seconds.

❯ SELECT u.* FROM users u JOIN employees e ON u."column1" = e."column1" ORDER BY u."column1", e."column2";
+---------+---------+
| column1 | column2 |
+---------+---------+
| Alice   | 50      |
| Bob     | 100     |
+---------+---------+

The result is correct.

After some investigation, I found that this error is caused by the ProjectionPushdown rule in the physical optimizer:

| physical_plan after OutputRequirements  | ProjectionExec: expr=[column1@0 as column1, column2@1 as column2]                                                                                                                     |
|                                         |   SortPreservingMergeExec: [column1@0 ASC NULLS LAST,column2@2 ASC NULLS LAST]                                                                                                        |
|                                         |     SortExec: expr=[column1@0 ASC NULLS LAST,column2@2 ASC NULLS LAST]                                                                                                                |
|                                         |       ProjectionExec: expr=[column1@0 as column1, column2@1 as column2, column2@3 as column2]  <-- before: column2@3 (e.column2) is still projected              |
|                                         |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                   |
|                                         |           HashJoinExec: mode=Partitioned, join_type=Inner, on=[(column1@0, column1@0)]                                                                                                |
|                                         |             CoalesceBatchesExec: target_batch_size=8192                                                                                                                               |
|                                         |               RepartitionExec: partitioning=Hash([column1@0], 24), input_partitions=1                                                                                                 |
|                                         |                 MemoryExec: partitions=1, partition_sizes=[1]                                                                                                                         |
|                                         |             CoalesceBatchesExec: target_batch_size=8192                                                                                                                               |
|                                         |               RepartitionExec: partitioning=Hash([column1@0], 24), input_partitions=1                                                                                                 |
|                                         |                 MemoryExec: partitions=1, partition_sizes=[1]                                                                                                                         |
|                                         |                                                                                                                                                                                       |
| physical_plan after PipelineChecker     | SAME TEXT AS ABOVE                                                                                                                                                                    |
| physical_plan after LimitAggregation    | SAME TEXT AS ABOVE                                                                                                                                                                    |
| physical_plan after ProjectionPushdown  | SortPreservingMergeExec: [column1@0 ASC NULLS LAST,column2@1 ASC NULLS LAST]                                                                                                          |
|                                         |   SortExec: expr=[column1@0 ASC NULLS LAST,column2@1 ASC NULLS LAST]                                                                                                                  |
|                                         |     ProjectionExec: expr=[column1@0 as column1, column2@1 as column2]   <-- after: column2@3 (e.column2) has been eliminated                                              |
|                                         |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                     |
|                                         |         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(column1@0, column1@0)]                                                                                                  |
|                                         |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                 |
|                                         |             RepartitionExec: partitioning=Hash([column1@0], 24), input_partitions=1                                                                                                   |
|                                         |               MemoryExec: partitions=1, partition_sizes=[1]                                                                                                                           |
|                                         |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                 |
|                                         |             RepartitionExec: partitioning=Hash([column1@0], 24), input_partitions=1                                                                                                   |
|                                         |               MemoryExec: partitions=1, partition_sizes=[1]                                                                                                                           |

The reason for this rewrite may be that the code below identifies a column only by its name:

https://github.com/apache/arrow-datafusion/blob/06bbe1298fa8aa042b6a6462e55b2890969d884a/datafusion/core/src/physical_optimizer/projection_pushdown.rs#L866-L872

When two columns have the same name (here u.column2 and e.column2, both named column2), the error arises.
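The suspected failure can be reproduced in a simplified model. The Col type and find_by_name_only function below are hypothetical and only approximate the linked code: a sort key is matched against the top-level projection by name alone, so e.column2 (column2@2) lands on u.column2’s slot.

```rust
/// Hypothetical stand-in for a physical column reference such as `column2@2`.
#[derive(Debug, Clone, PartialEq)]
struct Col {
    name: String,
    index: usize,
}

fn col(name: &str, index: usize) -> Col {
    Col { name: name.to_string(), index }
}

/// Name-only matching: the first projected column with the same name wins,
/// regardless of its index.
fn find_by_name_only(column: &Col, projection: &[(Col, String)]) -> Option<Col> {
    projection
        .iter()
        .enumerate()
        .find_map(|(index, (projected, alias))| {
            (column.name == projected.name).then(|| col(alias, index))
        })
}

fn main() {
    // Top-level projection: [column1@0 as column1, column2@1 as column2]
    let projection = vec![
        (col("column1", 0), "column1".to_string()),
        (col("column2", 1), "column2".to_string()),
    ];
    // e.column2 is column2@2 in the sort below the projection; name-only
    // matching "finds" it and rewrites the sort key to column2@1 (u.column2).
    println!("{:?}", find_by_name_only(&col("column2", 2), &projection));
}
```

This matches the column2@1 sort key that appears in the plan after ProjectionPushdown.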

I ran EXPLAIN on the query:

explain SELECT u.* FROM users u JOIN employees e ON u."column1" = e."column1" ORDER BY u."column1", e."column2";

On branch-33, the result is:

+---------------+--------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                 |
+---------------+--------------------------------------------------------------------------------------+
| logical_plan  | Projection: u.column1, u.column2                                                     |
|               |   Sort: u.column1 ASC NULLS LAST, e.column2 ASC NULLS LAST                           |
|               |     Projection: u.column1, u.column2, e.column2                                      |
|               |       Inner Join: u.column1 = e.column1                                              |
|               |         SubqueryAlias: u                                                             |
|               |           TableScan: users projection=[column1, column2]                             |
|               |         SubqueryAlias: e                                                             |
|               |           TableScan: employees projection=[column1, column2]                         |
| physical_plan | SortPreservingMergeExec: [column1@0 ASC NULLS LAST,column2@1 ASC NULLS LAST]         |
|               |   SortExec: expr=[column1@0 ASC NULLS LAST,column2@1 ASC NULLS LAST]                 |
|               |     ProjectionExec: expr=[column1@0 as column1, column2@1 as column2]                |
|               |       CoalesceBatchesExec: target_batch_size=8192                                    |
|               |         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(column1@0, column1@0)] |
|               |           CoalesceBatchesExec: target_batch_size=8192                                |
|               |             RepartitionExec: partitioning=Hash([column1@0], 8), input_partitions=1   |
|               |               MemoryExec: partitions=1, partition_sizes=[1]                          |
|               |           CoalesceBatchesExec: target_batch_size=8192                                |
|               |             RepartitionExec: partitioning=Hash([column1@0], 8), input_partitions=1   |
|               |               MemoryExec: partitions=1, partition_sizes=[1]                          |
|               |                                                                                      |
+---------------+--------------------------------------------------------------------------------------+
2 rows in set. Query took 0.022 seconds.

On branch-31, the result is:

+---------------+-----------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                          |
+---------------+-----------------------------------------------------------------------------------------------+
| logical_plan  | Projection: u.column1, u.column2                                                              |
|               |   Sort: u.column1 ASC NULLS LAST, e.column2 ASC NULLS LAST                                    |
|               |     Projection: u.column1, u.column2, e.column2                                               |
|               |       Inner Join: u.column1 = e.column1                                                       |
|               |         SubqueryAlias: u                                                                      |
|               |           TableScan: users projection=[column1, column2]                                      |
|               |         SubqueryAlias: e                                                                      |
|               |           TableScan: employees projection=[column1, column2]                                  |
| physical_plan | ProjectionExec: expr=[column1@0 as column1, column2@1 as column2]                             |
|               |   SortPreservingMergeExec: [column1@0 ASC NULLS LAST,column2@2 ASC NULLS LAST]                |
|               |     SortExec: expr=[column1@0 ASC NULLS LAST,column2@2 ASC NULLS LAST]                        |
|               |       ProjectionExec: expr=[column1@0 as column1, column2@1 as column2, column2@3 as column2] |
|               |         CoalesceBatchesExec: target_batch_size=8192                                           |
|               |           HashJoinExec: mode=Partitioned, join_type=Inner, on=[(column1@0, column1@0)]        |
|               |             CoalesceBatchesExec: target_batch_size=8192                                       |
|               |               RepartitionExec: partitioning=Hash([column1@0], 8), input_partitions=8          |
|               |                 RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1          |
|               |                   MemoryExec: partitions=1, partition_sizes=[1]                               |
|               |             CoalesceBatchesExec: target_batch_size=8192                                       |
|               |               RepartitionExec: partitioning=Hash([column1@0], 8), input_partitions=8          |
|               |                 RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1          |
|               |                   MemoryExec: partitions=1, partition_sizes=[1]                               |
|               |                                                                                               |
+---------------+-----------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.019 seconds.

The difference is in the ProjectionExec: on branch-33, the projection wrongly excludes e.column2, so the SortExec can’t sort by e.column2. Its second sort key, column2@1, now refers to u.column2 instead.
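To make the index arithmetic concrete, this sketch resolves each branch’s second sort key back to the join output. It assumes the HashJoinExec output is the left (u) fields followed by the right (e) fields, and models each ProjectionExec as the list of join-output indices it reads; sort_key is a hypothetical helper.

```rust
/// Hypothetical helper: resolve the sort key at `sort_index` through a
/// projection (a list of join-output indices) to a qualified field name.
fn sort_key<'a>(join_output: &[&'a str], projection: &[usize], sort_index: usize) -> &'a str {
    join_output[projection[sort_index]]
}

fn main() {
    // HashJoinExec output: left (u) fields followed by right (e) fields.
    let join_output = ["u.column1", "u.column2", "e.column1", "e.column2"];

    // branch-31: ProjectionExec [column1@0, column2@1, column2@3]; sort key column2@2
    println!("branch-31 sorts by {}", sort_key(&join_output, &[0, 1, 3], 2));

    // branch-33: ProjectionExec [column1@0, column2@1]; sort key column2@1
    println!("branch-33 sorts by {}", sort_key(&join_output, &[0, 1], 1));
}
```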

BTW, one of the longer-term discussions I would like to have at https://github.com/apache/arrow-datafusion/discussions/8152 and in other venues (I just haven’t had time to write it down yet) is how to improve the overall “process maturity” of datafusion. As @DDtKey points out, regressions should be prioritized, but at the moment we don’t really have a mechanism to do that (or, for example, to hold a release for such regressions) other than relying on one of us to catch them manually.

@ozankabak, thanks for pointing to the PR. It looks like I missed that it was merged before the 34.0.0 release (and the issue hasn’t been closed yet).

So that was a wrong assumption on my part, sorry. (To be clearer: my test still fails, but due to another issue, #7931, which is not related to this one; I’m going to check it further. It used to work in 31.) I tested the MRE, and this case works with the latest stable version. Though, as has been mentioned, we may have some underlying issues, they are not related to this one.

Note: I’m not talking about bugs in general, but about regressions. Unfortunately, they occur quite often and are more dangerous, because they undermine trust in new versions.

Thank you for bringing this up; I agree we need to prioritize regressions. I personally didn’t recognize this bug as a regression and thought it was pre-existing. I have updated the title to reflect this and created a new tag for regressions.

cc @andygrove @viirya and @ozankabak


Using both the name and the index (the column’s index in the input schema) to identify a column is only valid under the assumption that column’s plan and projected_column’s plan have the same input schema. Otherwise, some projections that could be pushed down would no longer be pushed down. And when the schemas are the same, the index alone is enough to identify a column.
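A sketch of index-only identification under the same-schema assumption described above. rewrite_by_index is hypothetical, and comparing lists of field names is a simplification of a real schema comparison; when the schemas differ, the sketch conservatively refuses to rewrite.

```rust
/// Hypothetical: rewrite the column at `column_index` (an index into
/// `column_schema`) through `projection`, whose entries are
/// (index into `projection_input_schema`, alias).
/// Returns (alias, output position), or None if no rewrite is possible.
fn rewrite_by_index(
    column_index: usize,
    column_schema: &[&str],
    projection_input_schema: &[&str],
    projection: &[(usize, &str)],
) -> Option<(String, usize)> {
    // The index is only a reliable identity when both sides share one schema
    // (simplified here to comparing field-name lists).
    if column_schema != projection_input_schema {
        return None;
    }
    projection
        .iter()
        .enumerate()
        .find_map(|(out_index, (in_index, alias))| {
            (*in_index == column_index).then(|| ((*alias).to_string(), out_index))
        })
}

fn main() {
    // Schema shared by the sort and the top-level projection in this issue.
    let schema = ["column1", "column2", "column2"];
    // Top-level projection: [column1@0 as column1, column2@1 as column2]
    let projection: [(usize, &str); 2] = [(0, "column1"), (1, "column2")];
    println!("{:?}", rewrite_by_index(0, &schema, &schema, &projection)); // Some(("column1", 0))
    println!("{:?}", rewrite_by_index(2, &schema, &schema, &projection)); // None
}
```

In the scenario from this issue both sides share the schema [column1, column2, column2], so column2@2 has no match in the projection and the rewrite is refused, even though its name matches a projected column.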