datafusion: Hash partitioning not working properly
Describe the bug
Using Partitioning::Hash results in wrong partitions.
To Reproduce
- Create a new cargo project
- Create the data directory on the same level as the src directory
- Download the attached archive data-dimension-vehicle-20210609T222533Z-vehicle-electric_gas_steam_3cols.parquet.zip and unpack it in the data directory
- Add the following dependencies in Cargo.toml
[dependencies]
tokio = "1.14"
arrow = "6.0"
datafusion = "6.0"
- Place the following code in main.rs
use arrow::{
    record_batch::RecordBatch,
    util::display::array_value_to_string,
};
use datafusion::{
    error::Result,
    logical_plan::count_distinct,
    prelude::*,
};

const SOURCE_DATA_PATH: &str = "./data/data-dimension-vehicle-20210609T222533Z-vehicle-electric_gas_steam_3cols.parquet";

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();

    // Define partitioning
    let partitioning_columns = vec![
        col("nlm_dimension_load_date"),
        col("src_fuel_type"),
    ];

    // Read source parquet file
    let df = ctx.read_parquet(SOURCE_DATA_PATH).await?;

    // Manual partitioning
    let concat_cols = vec![
        col("nlm_dimension_load_date"),
        lit(" / "),
        col("src_fuel_type"),
    ];

    // Get info for the given partitions
    let df_info = df.aggregate(
        partitioning_columns.clone(),
        vec![
            count_distinct(concat(&concat_cols[..])).alias("c"),
            count(concat(&concat_cols[..])).alias("rows_in_part"),
        ],
    )?;

    println!("");
    println!("Data distribution by partition");
    println!("==============================");
    println!("");
    df_info.show().await?;
    println!("");

    // Determine the total number of partitions
    let num_partitions_rec = df_info
        .aggregate(vec![col("c")], vec![sum(col("c")).alias("total")])?
        .select(vec![col("total")])?
        .collect()
        .await?;

    // Should be 72
    let total_partitions = array_value_to_string(
        num_partitions_rec
            .iter()
            .filter(|b| b.num_rows() > 0)
            .collect::<Vec<&RecordBatch>>()[0]
            .column(0),
        0,
    )?
    .parse::<usize>()
    .unwrap();

    println!("Total number of partitions: {}", total_partitions);
    println!("==============================");
    println!("");

    // Partition the dataframe with hash partitioning with 72 desired partitions
    let partitioned_df = df.repartition(Partitioning::Hash(
        partitioning_columns,
        total_partitions, // 72
    ))?;

    // Collected partitions
    let collected_partitions = partitioned_df.collect_partitioned().await?;

    println!("Collected partitions");
    println!("====================");
    println!("");

    // Sum the rows of all record batches in each collected partition
    collected_partitions.iter().enumerate().for_each(|(i, p)| {
        let mut num_rows: usize = 0;
        for batch in p {
            num_rows += batch.num_rows();
        }
        println!("Partition={:0>2}, rows_in_part={}", i, num_rows);
    });

    Ok(())
}
- Execute it with cargo run.
- The result is below:
Data distribution by partition
==============================
+-------------------------+---------------+---+--------------+
| nlm_dimension_load_date | src_fuel_type | c | rows_in_part |
+-------------------------+---------------+---+--------------+
| 2020-10-18T00:29:41Z | Steam | 1 | 14 |
| 2021-06-09T00:32:40Z | Gas | 1 | 5 |
| 2020-10-18T00:29:41Z | Electric | 1 | 89 |
| 2021-06-09T22:15:47Z | Gas | 1 | 35 |
| 2020-10-18T00:46:52Z | Gas | 1 | 2 |
| 2021-06-09T00:43:47Z | Gas | 1 | 20 |
| 2021-06-09T00:32:40Z | Electric | 1 | 304 |
| 2021-06-09T22:15:47Z | Steam | 1 | 9 |
| 2021-06-09T00:26:40Z | Electric | 1 | 160 |
| 2020-10-18T00:04:58Z | Electric | 1 | 36 |
| 2021-06-09T00:13:39Z | Electric | 1 | 47 |
| 2020-10-18T00:29:41Z | Gas | 1 | 1 |
| 2021-06-09T00:41:20Z | Steam | 1 | 4 |
| 2020-10-18T00:35:16Z | Electric | 1 | 131 |
| 2020-10-18T00:18:03Z | Steam | 1 | 8 |
| 2021-06-09T00:23:35Z | Gas | 1 | 4 |
| 2021-06-09T00:20:35Z | Electric | 1 | 54 |
| 2021-06-09T00:41:20Z | Gas | 1 | 8 |
| 2021-06-09T22:15:47Z | Electric | 1 | 943 |
| 2020-10-18T00:23:47Z | Gas | 1 | 2 |
| 2021-06-09T00:32:40Z | Steam | 1 | 2 |
| 2020-10-18T00:46:52Z | Electric | 1 | 246 |
| 2021-06-09T00:43:47Z | Electric | 1 | 410 |
| 2020-10-18T00:23:47Z | Steam | 1 | 10 |
| 2021-06-09T00:35:51Z | Gas | 1 | 10 |
| 2020-10-18T00:35:16Z | Steam | 1 | 27 |
| 2020-10-18T00:18:03Z | Electric | 1 | 88 |
| 2021-06-09T00:20:35Z | Steam | 1 | 2 |
| 2021-06-09T00:26:40Z | Gas | 1 | 7 |
| 2021-06-09T00:38:39Z | Electric | 1 | 256 |
| 2020-10-18T00:06:36Z | Electric | 1 | 136 |
| 2020-10-18T00:40:11Z | Electric | 1 | 209 |
| 2021-06-09T00:02:37Z | Steam | 1 | 18 |
| 2020-10-18T00:18:03Z | Gas | 1 | 1 |
| 2021-06-09T00:29:31Z | Steam | 1 | 7 |
| 2021-06-09T00:16:33Z | Electric | 1 | 54 |
| 2021-06-09T00:35:51Z | Electric | 1 | 479 |
| 2020-10-18T00:52:38Z | Steam | 1 | 7 |
| 2020-10-18T00:12:55Z | Gas | 1 | 3 |
| 2021-06-09T00:08:27Z | Electric | 1 | 472 |
| 2021-06-08T23:55:20Z | Electric | 1 | 234 |
| 2021-06-09T00:29:31Z | Gas | 1 | 10 |
| 2021-06-09T00:02:37Z | Gas | 1 | 13 |
| 2020-10-18T00:12:55Z | Steam | 1 | 9 |
| 2020-10-18T00:52:38Z | Electric | 1 | 259 |
| 2021-06-09T00:08:27Z | Gas | 1 | 15 |
| 2021-06-08T23:55:20Z | Gas | 1 | 7 |
| 2021-06-09T00:35:51Z | Steam | 1 | 2 |
| 2020-10-18T00:04:58Z | Steam | 1 | 5 |
| 2021-06-09T00:41:20Z | Electric | 1 | 203 |
| 2021-06-09T00:23:35Z | Electric | 1 | 56 |
| 2021-06-09T00:26:40Z | Steam | 1 | 5 |
| 2021-06-09T00:13:39Z | Steam | 1 | 2 |
| 2020-10-18T00:12:55Z | Electric | 1 | 86 |
| 2021-06-09T00:08:27Z | Steam | 1 | 22 |
| 2020-10-18T00:06:36Z | Gas | 1 | 2 |
| 2021-06-09T00:16:33Z | Gas | 1 | 6 |
| 2021-06-09T00:38:39Z | Gas | 1 | 19 |
| 2021-06-08T23:55:20Z | Steam | 1 | 13 |
| 2020-10-18T00:40:11Z | Gas | 1 | 1 |
| 2020-10-18T00:23:47Z | Electric | 1 | 114 |
| 2020-10-18T00:52:38Z | Gas | 1 | 2 |
| 2020-10-18T00:46:52Z | Steam | 1 | 9 |
| 2021-06-09T00:43:47Z | Steam | 1 | 4 |
| 2020-10-18T00:40:11Z | Steam | 1 | 48 |
| 2021-06-09T00:02:37Z | Electric | 1 | 571 |
| 2020-10-18T00:35:16Z | Gas | 1 | 2 |
| 2021-06-09T00:29:31Z | Electric | 1 | 223 |
| 2020-10-18T00:06:36Z | Steam | 1 | 10 |
| 2021-06-09T00:38:39Z | Steam | 1 | 7 |
| 2021-06-09T00:20:35Z | Gas | 1 | 4 |
| 2021-06-09T00:16:33Z | Steam | 1 | 1 |
+-------------------------+---------------+---+--------------+
Total number of partitions: 72
==============================
Collected partitions
====================
Partition=00, rows_in_part=6
Partition=01, rows_in_part=7
Partition=02, rows_in_part=23
Partition=03, rows_in_part=0
Partition=04, rows_in_part=0
Partition=05, rows_in_part=208
Partition=06, rows_in_part=8
Partition=07, rows_in_part=227
Partition=08, rows_in_part=236
Partition=09, rows_in_part=0
Partition=10, rows_in_part=2
Partition=11, rows_in_part=2
Partition=12, rows_in_part=0
Partition=13, rows_in_part=63
Partition=14, rows_in_part=88
Partition=15, rows_in_part=0
Partition=16, rows_in_part=584
Partition=17, rows_in_part=0
Partition=18, rows_in_part=10
Partition=19, rows_in_part=0
Partition=20, rows_in_part=0
Partition=21, rows_in_part=0
Partition=22, rows_in_part=61
Partition=23, rows_in_part=7
Partition=24, rows_in_part=32
Partition=25, rows_in_part=7
Partition=26, rows_in_part=114
Partition=27, rows_in_part=29
Partition=28, rows_in_part=0
Partition=29, rows_in_part=0
Partition=30, rows_in_part=6
Partition=31, rows_in_part=231
Partition=32, rows_in_part=0
Partition=33, rows_in_part=18
Partition=34, rows_in_part=0
Partition=35, rows_in_part=943
Partition=36, rows_in_part=0
Partition=37, rows_in_part=0
Partition=38, rows_in_part=131
Partition=39, rows_in_part=260
Partition=40, rows_in_part=0
Partition=41, rows_in_part=479
Partition=42, rows_in_part=0
Partition=43, rows_in_part=4
Partition=44, rows_in_part=0
Partition=45, rows_in_part=203
Partition=46, rows_in_part=27
Partition=47, rows_in_part=0
Partition=48, rows_in_part=10
Partition=49, rows_in_part=2
Partition=50, rows_in_part=0
Partition=51, rows_in_part=304
Partition=52, rows_in_part=35
Partition=53, rows_in_part=40
Partition=54, rows_in_part=0
Partition=55, rows_in_part=64
Partition=56, rows_in_part=19
Partition=57, rows_in_part=0
Partition=58, rows_in_part=0
Partition=59, rows_in_part=0
Partition=60, rows_in_part=89
Partition=61, rows_in_part=5
Partition=62, rows_in_part=8
Partition=63, rows_in_part=137
Partition=64, rows_in_part=0
Partition=65, rows_in_part=259
Partition=66, rows_in_part=656
Partition=67, rows_in_part=0
Partition=68, rows_in_part=19
Partition=69, rows_in_part=0
Partition=70, rows_in_part=0
Partition=71, rows_in_part=621
- The following WRONG things can be observed:
- some of the collected partitions are empty; they have no rows
- the number of rows in the collected partitions does not match the source data
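As a rough sanity check on the first observation, the output above contains 28 lines with rows_in_part=0. The sketch below computes how many empty partitions one would expect if 72 distinct keys were spread over 72 partitions by an idealized hash, assuming each key lands in a partition independently and uniformly at random. That uniformity is an assumption about hashing in general, not a statement about DataFusion's actual hash function, and `expected_empty` is a hypothetical helper name.

```rust
/// Expected number of empty buckets when `keys` distinct keys are assigned
/// independently and uniformly at random to `buckets` buckets:
/// buckets * (1 - 1/buckets)^keys.
fn expected_empty(keys: u32, buckets: u32) -> f64 {
    let b = f64::from(buckets);
    b * (1.0 - 1.0 / b).powi(keys as i32)
}

fn main() {
    // 72 distinct (nlm_dimension_load_date, src_fuel_type) pairs,
    // 72 requested partitions, as in the report.
    let expected = expected_empty(72, 72);
    // Number of `rows_in_part=0` lines counted in the output above.
    let observed = 28;
    println!(
        "expected empty partitions: {:.1}, observed: {}",
        expected, observed
    );
}
```

Under that assumption the expectation is about 26 empty partitions, so 28 is in the range that ordinary hash collisions would produce.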
Expected behavior
The collected partitions after using Partitioning::Hash should match the desired partitioning.
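To make the expectation concrete: the desired result is one group per distinct key, with no empty groups and no keys sharing a group. A minimal sketch of that behavior in plain Rust follows. It uses only the standard library, not the DataFusion API, and `partition_by_key` and the sample rows are hypothetical names and values chosen for illustration.

```rust
use std::collections::BTreeMap;

/// Groups rows by key so that every distinct key gets exactly one group.
/// Returns a map from key to the values of the rows that carry it.
fn partition_by_key<K: Ord, V>(rows: Vec<(K, V)>) -> BTreeMap<K, Vec<V>> {
    let mut groups: BTreeMap<K, Vec<V>> = BTreeMap::new();
    for (key, value) in rows {
        groups.entry(key).or_default().push(value);
    }
    groups
}

fn main() {
    // Illustrative rows: ((load_date, fuel_type), row id).
    let rows = vec![
        (("2020-10-18T00:29:41Z", "Steam"), 1),
        (("2021-06-09T00:32:40Z", "Gas"), 2),
        (("2020-10-18T00:29:41Z", "Steam"), 3),
    ];
    let groups = partition_by_key(rows);
    // One group per distinct key; a group only exists if it has rows.
    for (key, values) in &groups {
        println!("{:?}: {} rows", key, values.len());
    }
}
```

Here the number of groups follows from the data, whereas Partitioning::Hash takes the number of partitions as an input.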
Additional context
cargo 1.58.0-nightly (294967c53 2021-11-29)
rustc 1.59.0-nightly (efec54529 2021-12-04)
About this issue
- State: closed
- Created 3 years ago
- Comments: 15 (6 by maintainers)
Commits related to this issue
- Add support for PartitionBy functionality https://github.com/apache/arrow-datafusion/issues/1404 — committed to andrei-ionescu/arrow-datafusion by andrei-ionescu 3 years ago
That’s right, I agree 💯.
partition now works like Spark partition, where equal values end up in the same group/partition, but there might be multiple distinct values per partition, or none at all for a certain partition.
partitionBy and improved writing/table partition support seem the way to go for this kind of feature.
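The behavior described in this comment can be reproduced with a small standalone sketch. It assigns each key to `hash(key) % num_partitions`, using the standard library's `DefaultHasher` as a stand-in; DataFusion uses its own hashing, so the exact assignment will differ. The keys are 72 made-up (day, fuel) pairs, and `partition_index` and `keys_per_partition` are hypothetical helper names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Maps a key to a partition index as hash(key) % num_partitions.
/// Equal keys always map to the same index.
fn partition_index<K: Hash>(key: &K, num_partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_partitions as u64) as usize
}

/// Returns, for each partition, how many of the given keys were assigned to it.
fn keys_per_partition<K: Hash>(keys: &[K], num_partitions: usize) -> Vec<usize> {
    let mut counts = vec![0; num_partitions];
    for key in keys {
        counts[partition_index(key, num_partitions)] += 1;
    }
    counts
}

fn main() {
    // 72 made-up distinct keys: 24 "days" x 3 fuel types.
    let fuels = ["Electric", "Gas", "Steam"];
    let mut keys = Vec::new();
    for day in 0..24u32 {
        for fuel in fuels.iter() {
            keys.push((day, *fuel));
        }
    }

    let counts = keys_per_partition(&keys, 72);
    let empty = counts.iter().filter(|&&c| c == 0).count();
    let shared = counts.iter().filter(|&&c| c > 1).count();
    println!("partitions with no key: {}", empty);
    println!("partitions holding several distinct keys: {}", shared);
}
```

Every key is assigned to exactly one partition, so no rows are lost, but nothing forces 72 distinct keys to occupy 72 different partitions.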