datafusion: Hash partitioning not working properly

Describe the bug

Repartitioning a DataFrame with Partitioning::Hash does not produce the expected partitions.

To Reproduce

  1. Create a new cargo project
  2. Create a data directory at the same level as the src directory
  3. Download the attached archive data-dimension-vehicle-20210609T222533Z-vehicle-electric_gas_steam_3cols.parquet.zip and unpack it into the data directory
  4. Add the following dependencies to Cargo.toml
[dependencies]
tokio = "1.14"
arrow = "6.0"
datafusion = "6.0"
  5. Place the following code in src/main.rs
use arrow::{
    record_batch::RecordBatch, 
    util::display::array_value_to_string};
use datafusion::{
    prelude::*, 
    error::Result, 
    logical_plan::count_distinct};


const SOURCE_DATA_PATH: &str = "./data/data-dimension-vehicle-20210609T222533Z-vehicle-electric_gas_steam_3cols.parquet";

#[tokio::main]
async fn main()  -> Result<()> {
    let mut ctx = ExecutionContext::new(); 

    // Define partitioning
    let partitioning_columns = vec![
        col("nlm_dimension_load_date"), 
        col("src_fuel_type")];

    // Read source parquet file
    let df = ctx.read_parquet(SOURCE_DATA_PATH).await?;

    // Manual partitioning
    let concat_cols = vec![
        col("nlm_dimension_load_date"), 
        lit(" / "), 
        col("src_fuel_type")];

    // Get info for the given partitions
    let df_info = df
        .aggregate(
            partitioning_columns.clone(), 
            vec![
                count_distinct(concat(&concat_cols[..])).alias("c"), 
                count(concat(&concat_cols[..])).alias("rows_in_part")
            ])?;
    
    println!("");
    println!("Data distribution by partition");
    println!("==============================");
    println!("");
    df_info.show().await?;
    println!("");

    // Determine the total number of partitions
    let num_partitions_rec = df_info
        .aggregate(
            vec![col("c")], 
            vec![sum(col("c")).alias("total")])?
        .select(
            vec![col("total")])?
        .collect().await?;
        
    // Should be 72
    let total_partitions = array_value_to_string(
        num_partitions_rec
            .iter()
            .filter(|b| b.num_rows() > 0)
            .collect::<Vec<&RecordBatch>>()[0]
            .column(0), 
        0
    )?.parse::<usize>().unwrap();

    println!("Total number of partitions: {}", total_partitions);
    println!("==============================");
    println!("");

    // Partition the dataframe with hash partitioning with 72 desired partitions
    let partitioned_df = df
        .repartition(Partitioning::Hash(
            partitioning_columns, 
            total_partitions))?; // 72

    // Collected partitions
    let collected_partitions = partitioned_df
        .collect_partitioned()
        .await?;

    println!("Collected partitions");
    println!("====================");
    println!("");
    
    collected_partitions.iter().enumerate().for_each(|(i, p)| {
        // Sum the row counts of all record batches in partition `i`
        let num_rows: usize = p.iter().map(|batch| batch.num_rows()).sum();
        println!("Partition={:0>2}, rows_in_part={}", i, num_rows);
    });

    Ok(())
}
  6. Execute it with cargo run.
  7. The result is below:
Data distribution by partition
==============================

+-------------------------+---------------+---+--------------+
| nlm_dimension_load_date | src_fuel_type | c | rows_in_part |
+-------------------------+---------------+---+--------------+
| 2020-10-18T00:29:41Z    | Steam         | 1 | 14           |
| 2021-06-09T00:32:40Z    | Gas           | 1 | 5            |
| 2020-10-18T00:29:41Z    | Electric      | 1 | 89           |
| 2021-06-09T22:15:47Z    | Gas           | 1 | 35           |
| 2020-10-18T00:46:52Z    | Gas           | 1 | 2            |
| 2021-06-09T00:43:47Z    | Gas           | 1 | 20           |
| 2021-06-09T00:32:40Z    | Electric      | 1 | 304          |
| 2021-06-09T22:15:47Z    | Steam         | 1 | 9            |
| 2021-06-09T00:26:40Z    | Electric      | 1 | 160          |
| 2020-10-18T00:04:58Z    | Electric      | 1 | 36           |
| 2021-06-09T00:13:39Z    | Electric      | 1 | 47           |
| 2020-10-18T00:29:41Z    | Gas           | 1 | 1            |
| 2021-06-09T00:41:20Z    | Steam         | 1 | 4            |
| 2020-10-18T00:35:16Z    | Electric      | 1 | 131          |
| 2020-10-18T00:18:03Z    | Steam         | 1 | 8            |
| 2021-06-09T00:23:35Z    | Gas           | 1 | 4            |
| 2021-06-09T00:20:35Z    | Electric      | 1 | 54           |
| 2021-06-09T00:41:20Z    | Gas           | 1 | 8            |
| 2021-06-09T22:15:47Z    | Electric      | 1 | 943          |
| 2020-10-18T00:23:47Z    | Gas           | 1 | 2            |
| 2021-06-09T00:32:40Z    | Steam         | 1 | 2            |
| 2020-10-18T00:46:52Z    | Electric      | 1 | 246          |
| 2021-06-09T00:43:47Z    | Electric      | 1 | 410          |
| 2020-10-18T00:23:47Z    | Steam         | 1 | 10           |
| 2021-06-09T00:35:51Z    | Gas           | 1 | 10           |
| 2020-10-18T00:35:16Z    | Steam         | 1 | 27           |
| 2020-10-18T00:18:03Z    | Electric      | 1 | 88           |
| 2021-06-09T00:20:35Z    | Steam         | 1 | 2            |
| 2021-06-09T00:26:40Z    | Gas           | 1 | 7            |
| 2021-06-09T00:38:39Z    | Electric      | 1 | 256          |
| 2020-10-18T00:06:36Z    | Electric      | 1 | 136          |
| 2020-10-18T00:40:11Z    | Electric      | 1 | 209          |
| 2021-06-09T00:02:37Z    | Steam         | 1 | 18           |
| 2020-10-18T00:18:03Z    | Gas           | 1 | 1            |
| 2021-06-09T00:29:31Z    | Steam         | 1 | 7            |
| 2021-06-09T00:16:33Z    | Electric      | 1 | 54           |
| 2021-06-09T00:35:51Z    | Electric      | 1 | 479          |
| 2020-10-18T00:52:38Z    | Steam         | 1 | 7            |
| 2020-10-18T00:12:55Z    | Gas           | 1 | 3            |
| 2021-06-09T00:08:27Z    | Electric      | 1 | 472          |
| 2021-06-08T23:55:20Z    | Electric      | 1 | 234          |
| 2021-06-09T00:29:31Z    | Gas           | 1 | 10           |
| 2021-06-09T00:02:37Z    | Gas           | 1 | 13           |
| 2020-10-18T00:12:55Z    | Steam         | 1 | 9            |
| 2020-10-18T00:52:38Z    | Electric      | 1 | 259          |
| 2021-06-09T00:08:27Z    | Gas           | 1 | 15           |
| 2021-06-08T23:55:20Z    | Gas           | 1 | 7            |
| 2021-06-09T00:35:51Z    | Steam         | 1 | 2            |
| 2020-10-18T00:04:58Z    | Steam         | 1 | 5            |
| 2021-06-09T00:41:20Z    | Electric      | 1 | 203          |
| 2021-06-09T00:23:35Z    | Electric      | 1 | 56           |
| 2021-06-09T00:26:40Z    | Steam         | 1 | 5            |
| 2021-06-09T00:13:39Z    | Steam         | 1 | 2            |
| 2020-10-18T00:12:55Z    | Electric      | 1 | 86           |
| 2021-06-09T00:08:27Z    | Steam         | 1 | 22           |
| 2020-10-18T00:06:36Z    | Gas           | 1 | 2            |
| 2021-06-09T00:16:33Z    | Gas           | 1 | 6            |
| 2021-06-09T00:38:39Z    | Gas           | 1 | 19           |
| 2021-06-08T23:55:20Z    | Steam         | 1 | 13           |
| 2020-10-18T00:40:11Z    | Gas           | 1 | 1            |
| 2020-10-18T00:23:47Z    | Electric      | 1 | 114          |
| 2020-10-18T00:52:38Z    | Gas           | 1 | 2            |
| 2020-10-18T00:46:52Z    | Steam         | 1 | 9            |
| 2021-06-09T00:43:47Z    | Steam         | 1 | 4            |
| 2020-10-18T00:40:11Z    | Steam         | 1 | 48           |
| 2021-06-09T00:02:37Z    | Electric      | 1 | 571          |
| 2020-10-18T00:35:16Z    | Gas           | 1 | 2            |
| 2021-06-09T00:29:31Z    | Electric      | 1 | 223          |
| 2020-10-18T00:06:36Z    | Steam         | 1 | 10           |
| 2021-06-09T00:38:39Z    | Steam         | 1 | 7            |
| 2021-06-09T00:20:35Z    | Gas           | 1 | 4            |
| 2021-06-09T00:16:33Z    | Steam         | 1 | 1            |
+-------------------------+---------------+---+--------------+

Total number of partitions: 72
==============================

Collected partitions
====================

Partition=00, rows_in_part=6
Partition=01, rows_in_part=7
Partition=02, rows_in_part=23
Partition=03, rows_in_part=0
Partition=04, rows_in_part=0
Partition=05, rows_in_part=208
Partition=06, rows_in_part=8
Partition=07, rows_in_part=227
Partition=08, rows_in_part=236
Partition=09, rows_in_part=0
Partition=10, rows_in_part=2
Partition=11, rows_in_part=2
Partition=12, rows_in_part=0
Partition=13, rows_in_part=63
Partition=14, rows_in_part=88
Partition=15, rows_in_part=0
Partition=16, rows_in_part=584
Partition=17, rows_in_part=0
Partition=18, rows_in_part=10
Partition=19, rows_in_part=0
Partition=20, rows_in_part=0
Partition=21, rows_in_part=0
Partition=22, rows_in_part=61
Partition=23, rows_in_part=7
Partition=24, rows_in_part=32
Partition=25, rows_in_part=7
Partition=26, rows_in_part=114
Partition=27, rows_in_part=29
Partition=28, rows_in_part=0
Partition=29, rows_in_part=0
Partition=30, rows_in_part=6
Partition=31, rows_in_part=231
Partition=32, rows_in_part=0
Partition=33, rows_in_part=18
Partition=34, rows_in_part=0
Partition=35, rows_in_part=943
Partition=36, rows_in_part=0
Partition=37, rows_in_part=0
Partition=38, rows_in_part=131
Partition=39, rows_in_part=260
Partition=40, rows_in_part=0
Partition=41, rows_in_part=479
Partition=42, rows_in_part=0
Partition=43, rows_in_part=4
Partition=44, rows_in_part=0
Partition=45, rows_in_part=203
Partition=46, rows_in_part=27
Partition=47, rows_in_part=0
Partition=48, rows_in_part=10
Partition=49, rows_in_part=2
Partition=50, rows_in_part=0
Partition=51, rows_in_part=304
Partition=52, rows_in_part=35
Partition=53, rows_in_part=40
Partition=54, rows_in_part=0
Partition=55, rows_in_part=64
Partition=56, rows_in_part=19
Partition=57, rows_in_part=0
Partition=58, rows_in_part=0
Partition=59, rows_in_part=0
Partition=60, rows_in_part=89
Partition=61, rows_in_part=5
Partition=62, rows_in_part=8
Partition=63, rows_in_part=137
Partition=64, rows_in_part=0
Partition=65, rows_in_part=259
Partition=66, rows_in_part=656
Partition=67, rows_in_part=0
Partition=68, rows_in_part=19
Partition=69, rows_in_part=0
Partition=70, rows_in_part=0
Partition=71, rows_in_part=621
  8. The following WRONG things can be observed:
    • some of the collected partitions are empty (they have no rows)
    • the row counts of the collected partitions do not match the per-group rows_in_part counts from the source data
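As the maintainers' comments further down point out, both symptoms are what hash-modulo partitioning produces in general. The sketch below is a minimal illustration under stated assumptions: it uses Rust's std `DefaultHasher` as a stand-in for DataFusion's internal hash function (so concrete partition numbers will not match the output above), and it uses 72 hypothetical keys (24 made-up dates x 3 fuel types) instead of the real data. It maps the keys onto 72 partitions and counts how many stay empty and how many receive more than one key.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Assign a key to one of `n` partitions by hashing it and taking the hash
/// modulo `n`. DefaultHasher is a stand-in (assumption): DataFusion uses its
/// own hash function, so the concrete bucket numbers differ from the issue.
fn bucket_of<K: Hash>(key: &K, n: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % n as u64) as usize
}

/// Count how many of the (distinct) `keys` land in each of the `n` partitions.
fn keys_per_bucket<K: Hash>(keys: &[K], n: usize) -> Vec<usize> {
    let mut counts = vec![0usize; n];
    for key in keys {
        counts[bucket_of(key, n)] += 1;
    }
    counts
}

fn main() {
    // Hypothetical stand-in for the 72 distinct
    // (nlm_dimension_load_date, src_fuel_type) pairs: 24 dates x 3 fuel types.
    let mut keys: Vec<(String, &str)> = Vec::new();
    for day in 0..24 {
        for fuel in ["Electric", "Gas", "Steam"].iter() {
            keys.push((format!("date-{:02}", day), *fuel));
        }
    }
    // One partition requested per distinct key, as in the issue.
    let n = keys.len();

    let counts = keys_per_bucket(&keys, n);
    let empty = counts.iter().filter(|&&c| c == 0).count();
    let shared = counts.iter().filter(|&&c| c > 1).count();
    println!(
        "{} keys into {} partitions: {} empty, {} holding more than one key",
        keys.len(),
        n,
        empty,
        shared
    );
}
```

Because the number of keys equals the number of partitions, every partition that receives two keys forces another partition to stay empty, which is exactly the pair of symptoms listed above.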

Expected behavior

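The partitions collected after repartitioning with Partitioning::Hash should match the desired partitioning: one partition per distinct (nlm_dimension_load_date, src_fuel_type) combination, each holding the rows_in_part count shown in the distribution table above.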
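What this expected behavior describes is value-based partitioning rather than hash-modulo partitioning. As a minimal sketch in plain Rust (no DataFusion API is used; `Row`, `row` and `partition_by_value` are hypothetical names), grouping rows by their exact key yields one partition per distinct pair, so the partition count follows from the data instead of being fixed up front:

```rust
use std::collections::BTreeMap;

/// Hypothetical row type holding only the two partitioning columns from the issue.
#[derive(Debug, Clone)]
struct Row {
    load_date: String,
    fuel_type: String,
}

/// Hypothetical helper to build a Row from string slices.
fn row(load_date: &str, fuel_type: &str) -> Row {
    Row {
        load_date: load_date.to_string(),
        fuel_type: fuel_type.to_string(),
    }
}

/// Value-based ("partitionBy"-style) partitioning: each distinct
/// (load_date, fuel_type) pair gets exactly one partition, so no partition
/// is empty and no partition mixes two keys.
fn partition_by_value(rows: &[Row]) -> BTreeMap<(String, String), Vec<Row>> {
    let mut parts: BTreeMap<(String, String), Vec<Row>> = BTreeMap::new();
    for r in rows {
        parts
            .entry((r.load_date.clone(), r.fuel_type.clone()))
            .or_default()
            .push(r.clone());
    }
    parts
}

fn main() {
    let rows = vec![
        row("2020-10-18T00:29:41Z", "Steam"),
        row("2021-06-09T00:32:40Z", "Gas"),
        row("2020-10-18T00:29:41Z", "Steam"),
    ];
    let parts = partition_by_value(&rows);
    for ((date, fuel), part) in &parts {
        println!("{} / {} -> rows_in_part={}", date, fuel, part.len());
    }
    // prints:
    // 2020-10-18T00:29:41Z / Steam -> rows_in_part=2
    // 2021-06-09T00:32:40Z / Gas -> rows_in_part=1
}
```

A BTreeMap is used only so that partitions come out in a deterministic order; the point is that the key itself, not a hash of it modulo a fixed count, decides the partition.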

Additional context

cargo 1.58.0-nightly (294967c53 2021-11-29) rustc 1.59.0-nightly (efec54529 2021-12-04)

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 15 (6 by maintainers)

Most upvoted comments

Isn’t this more a problem of having to specify the number of partitions ahead of time? If that number is smaller than the number of distinct combinations of the expressions you’re hashing on, you’ll end up with multiple combinations in the same partition.

It sounds like what you’re asking for is the Spark equivalent of partitionBy(...cols), so this issue might be better thought of as a feature request for a new logical partitioning scheme.

That’s right, I agree 💯.

repartition now works like Spark’s repartition: equal values end up in the same group/partition, but a partition may contain multiple distinct values, or none at all.
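The "none at all" case is expected even when the partition count equals the number of distinct keys, as in this issue (72 and 72). Treating the hash as spreading k distinct keys uniformly and independently over n partitions (an idealization, not an analysis of DataFusion's actual hash function), each partition stays empty with probability (1 - 1/n)^k, so the expected number of empty partitions is:

```latex
\mathbb{E}[\text{empty partitions}] = n\left(1 - \frac{1}{n}\right)^{k},
\qquad
n = k = 72:\quad 72\left(\frac{71}{72}\right)^{72} \approx 72 \cdot 0.365 \approx 26.3
```

The output in the report has 28 empty partitions out of 72, close to this estimate. The 72 keys therefore occupy only 44 partitions, so several of those partitions must hold more than one key.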

partitionBy and improved writing/table partition support seem the way to go for this kind of feature.