datafusion: Unable to "GROUP BY" udf result

Describe the bug
When I add a UDF (to_date), I am able to run the following:

  • SELECT to_date("updatedAt") as u from 'table';
  • SELECT DISTINCT to_date("updatedAt") as u from 'table';
  • SELECT to_date("updatedAt") as u from 'table' where to_date("updatedAt") != '2023-03-04';

But I am NOT able to do the following

  • SELECT to_date("updatedAt") as u from 'table' group by u;
  • SELECT to_date("updatedAt") as u from 'table' group by to_date("updatedAt");

To Reproduce
An example updatedAt value looks like "2023-03-05T05:01:15.274000+00:00". Then run:

  • SELECT to_date("updatedAt") as u from 'table' group by u;

The to_date UDF, implemented with the chrono crate:

use std::sync::Arc;

use chrono::DateTime;
use datafusion::arrow::array::{Array, ArrayRef, Date32Builder, StringArray};
use datafusion::error::{DataFusionError, Result as DFResult};

pub fn to_date(args: &[ArrayRef]) -> DFResult<ArrayRef> {
    if args.len() != 1 {
        return Err(DataFusionError::Internal(format!(
            "to_date was called with {} arguments. It requires only 1.",
            args.len()
        )));
    }
    let arg = args[0].as_any().downcast_ref::<StringArray>().unwrap();
    let mut builder = Date32Builder::new();
    // Note: only row 0 is read, so the result holds a single value regardless of the input length.
    let date_string: &str = arg.value(0);
    let date_time = match DateTime::parse_from_rfc3339(date_string) {
        Ok(dt) => dt,
        Err(e) => return Err(DataFusionError::Internal(e.to_string())),
    };
    // Date32 is the number of days since 1970-01-01.
    builder.append_value((date_time.timestamp() / 86400) as i32);
    Ok(Arc::new(builder.finish()))
}
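Date32 stores a date as the number of days since 1970-01-01, which is what timestamp() / 86400 computes for instants after the epoch. One caveat: Rust's integer `/` truncates toward zero, so a timestamp shortly before the epoch lands on day 0 instead of day -1, whereas `i64::div_euclid` floors for a positive divisor. A minimal std-only sketch comparing the two, with hypothetical helper names and Unix seconds as input instead of a parsed string:

```rust
/// Seconds in one (UTC) day.
const SECS_PER_DAY: i64 = 86_400;

/// Hypothetical helper matching the original UDF's arithmetic (truncating division).
fn days_truncating(unix_secs: i64) -> i32 {
    (unix_secs / SECS_PER_DAY) as i32
}

/// Hypothetical helper using Euclidean division, which floors when the divisor is positive.
fn days_floor(unix_secs: i64) -> i32 {
    unix_secs.div_euclid(SECS_PER_DAY) as i32
}

fn main() {
    // 2023-03-05T05:01:15Z (the issue's example value, fractional seconds dropped).
    let example: i64 = 1_677_992_475;
    println!("{} {}", days_truncating(example), days_floor(example)); // 19421 19421

    // 1969-12-31T23:59:59Z: one second before the epoch.
    println!("{} {}", days_truncating(-1), days_floor(-1)); // 0 -1
}
```

For the dates in this issue (all after 1970) both helpers agree, so this rounding detail is not what breaks GROUP BY.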

Expected behavior
I expect it to group the dates, as in: SELECT u FROM (SELECT to_date("updatedAt") as u from 'table') group by u;

About this issue

  • State: closed
  • Created a year ago
  • Comments: 28 (17 by maintainers)

Most upvoted comments

Thanks, @jiangzhx, it's working!

But I'm curious about the batching format: why do the record batches look like this?

@BubbaJoe, it's because of the target partitions setting, whose default value is the number of CPU cores. Setting target partitions to 1 will make the final batches vector contain only one batch.

    use datafusion::prelude::{SessionConfig, SessionContext};

    // Run with one partition instead of the CPU-core-based default.
    let session_cfg = SessionConfig::new().with_target_partitions(1);
    let ctx = SessionContext::with_config(session_cfg);
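As a rough mental model (a toy, not DataFusion's actual code): the final aggregation runs once per partition and each partition emits its own batches, so with N target partitions the distinct group keys end up spread over up to N batches. The sketch below hash-partitions distinct keys with std's `DefaultHasher`; the function name `partition_groups` is hypothetical, and the toy ignores batch-size limits.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeSet;
use std::hash::{Hash, Hasher};

/// Toy model: route each distinct group key to one of `n` partitions by hash,
/// then treat every non-empty partition as one output "batch".
fn partition_groups(keys: &[i32], n: usize) -> Vec<Vec<i32>> {
    assert!(n > 0, "need at least one partition");
    // Deduplicate first, as the aggregation would.
    let distinct: BTreeSet<i32> = keys.iter().copied().collect();
    let mut partitions: Vec<Vec<i32>> = vec![Vec::new(); n];
    for key in distinct {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let idx = (hasher.finish() % n as u64) as usize;
        partitions[idx].push(key);
    }
    // Empty partitions produce no batch.
    partitions.into_iter().filter(|p| !p.is_empty()).collect()
}

fn main() {
    let days = [19421, 19421, 19420, 19419, 19420];
    println!("1 partition: {:?}", partition_groups(&days, 1)); // [[19419, 19420, 19421]]
    println!("8 partitions: {} batches", partition_groups(&days, 8).len());
}
```

With a single partition every key is routed to the same place, which matches the one-batch result described above.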

I have no idea; maybe it's the default partitions? I'll dive into this.

Sorry for the poor description. What I'm actually suggesting is:

select distinct to_date(...) from t 

In my opinion, GROUP BY usually goes together with an aggregate function; without one, it's equivalent to DISTINCT.
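To make the equivalence concrete: grouping rows by a key with no aggregate leaves one output row per key, which is exactly what DISTINCT produces (row order aside). A small std-only illustration; both function names are hypothetical:

```rust
use std::collections::{BTreeMap, BTreeSet};

/// GROUP BY without aggregates: bucket rows by key, then emit one row per bucket.
fn group_by_keys(values: &[&str]) -> Vec<String> {
    let mut groups: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for &v in values {
        groups.entry(v).or_default().push(v);
    }
    // No aggregate function, so only the key itself is projected.
    groups.keys().map(|k| k.to_string()).collect()
}

/// DISTINCT: keep each value once.
fn distinct(values: &[&str]) -> Vec<String> {
    let set: BTreeSet<&str> = values.iter().copied().collect();
    set.into_iter().map(String::from).collect()
}

fn main() {
    let dates = ["2023-03-05", "2023-03-04", "2023-03-05"];
    let grouped = group_by_keys(&dates);
    assert_eq!(grouped, distinct(&dates));
    println!("{:?}", grouped); // ["2023-03-04", "2023-03-05"]
}
```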

to_date only returns one value per input array. Is that expected? cc @BubbaJoe @doki23
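This appears to be the cause of the GROUP BY failure: a scalar UDF is expected to return an array with one value per input row, but this to_date reads only arg.value(0) and appends a single value, so the grouping code sees fewer group values than rows. The std-only sketch below isolates that difference; it assumes the timestamps were already parsed to Unix seconds (a simplification), and the function names are hypothetical. In the real UDF the corresponding change would be to loop over 0..arg.len(), calling builder.append_value(...) for each parsed row (or builder.append_null() when arg.is_null(i)).

```rust
/// Seconds in one (UTC) day.
const SECS_PER_DAY: i64 = 86_400;

/// Mirrors the buggy UDF's shape: converts only the first row, whatever the input length.
fn to_date_first_row_only(unix_secs: &[i64]) -> Vec<i32> {
    unix_secs
        .first()
        .map(|&s| vec![s.div_euclid(SECS_PER_DAY) as i32])
        .unwrap_or_default()
}

/// Fixed shape: exactly one output value per input row.
fn to_date_per_row(unix_secs: &[i64]) -> Vec<i32> {
    unix_secs
        .iter()
        .map(|&s| s.div_euclid(SECS_PER_DAY) as i32)
        .collect()
}

fn main() {
    // Three rows: 2023-03-05, 2023-03-04 and 2023-03-03 (all around 05:00 UTC).
    let batch: [i64; 3] = [1_677_992_475, 1_677_906_000, 1_677_819_600];
    println!("{:?}", to_date_first_row_only(&batch)); // [19421]
    println!("{:?}", to_date_per_row(&batch)); // [19421, 19420, 19419]
}
```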

The current error message is confusing; maybe we should add some checks here.

Maybe we could compare the lengths of group_values and batch_hashes and, if they differ, return a clear error message. https://github.com/apache/arrow-datafusion/blob/3ccf1aebb6959fbc6bbbf74d2821522ddfd7d484/datafusion/core/src/physical_plan/aggregates/row_hash.rs#L330-L335
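A sketch of the suggested check, written as a standalone function rather than a patch to row_hash.rs: it compares the number of rows in a batch with the length of each evaluated GROUP BY column and returns a descriptive error instead of failing later. The name `check_group_lengths`, its signature, and the error wording are all hypothetical.

```rust
/// Hypothetical validation: every evaluated GROUP BY column must have exactly
/// one value per input row, otherwise hashing and grouping cannot line up.
fn check_group_lengths(num_rows: usize, group_columns: &[(&str, usize)]) -> Result<(), String> {
    for (name, len) in group_columns {
        if *len != num_rows {
            return Err(format!(
                "GROUP BY expression `{name}` produced {len} value(s) for a batch of {num_rows} row(s); \
                 scalar functions must return one value per input row"
            ));
        }
    }
    Ok(())
}

fn main() {
    assert!(check_group_lengths(3, &[("u", 3)]).is_ok());
    // The situation in this issue: a 3-row batch, but the UDF returned 1 value.
    let err = check_group_lengths(3, &[("to_date(updatedAt)", 1)]).unwrap_err();
    println!("{err}");
}
```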
