datafusion: Out of memory when sorting

Describe the bug
Original bug was filed against Python bindings: https://github.com/apache/arrow-datafusion-python/issues/157

Running a sort and exporting a Parquet file in Colab generates an out-of-memory error.

To Reproduce

!curl -L 'https://drive.google.com/uc?export=download&id=18gv0Yd_a-Zc7CSolol8qeYVAAzSthnSN&confirm=t' > lineitem.parquet
from datafusion import SessionContext
ctx = SessionContext()
ctx.register_parquet('lineitem', 'lineitem.parquet')
df = ctx.sql("select * from lineitem order by l_shipdate")
df.write_parquet("lineitem_Datafusion.parquet")

Expected behavior
I expected DataFusion to use only the available memory.

Here is a notebook comparing the same workload using Polars and DuckDB: https://colab.research.google.com/drive/1pfAPpIG7jpvGB_aHj-PXX66vRaRT0xlj#scrollTo=O8-lyg1y6RT2

About this issue

  • Original URL
  • State: open
  • Created a year ago
  • Comments: 20 (19 by maintainers)

Most upvoted comments

@DDtKey I can look into your case next week.

The channel refactoring I did could increase memory usage if some child node of the repartition node starts to buffer data in an uncontrolled manner. It also changes scheduling slightly, so an edge case that was hidden before could now show up.

And it is probably related to the use of unbounded channels.

Interestingly, the point of #4867 was in fact to remove the unbounded channels. I am looking further into this issue.

As I already said, my case is not about sorting, so sorry for referring to it in this issue. It is related to join repartitions and channels (it started happening after the unbounded channels were removed). I like that the unbounded channels were refactored away, but the change somehow introduced the issue I am facing.

It returns the correct error with 16.0.0 but simply does not respect the memory limit with 17.0.0. However, I can't reproduce it with small files and lower memory limits (both versions work correctly there and return an error), so I think this might be related to buffering.

The code is attached above in my message, and I'm attaching the file (about 1.3 GiB) - GDrive link. It's an artificial case, but it shows that some kind of regression exists.

@comphead these are likely different issues. It probably makes sense to create another issue with this description.

FYI: in my example there is no explicit ordering at all; it's only about repartitions.

It definitely doesn't respect the memory pool in my cases 🤔 and is probably related to the use of unbounded channels.

Thanks for the reference, I’ll check

This behavior (at least in my case described above) was introduced here (a9ddcd3a7558437361835120659b946b903468e1, PR link).

Before, it returned Resources exhausted when I used a memory pool; now memory usage grows until OOM.

It can be reproduced with code like the following. (UPD: an easier way to reproduce this is described in #5162.)

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Cap the memory pool at 4 GiB. With 16.0.0, exceeding it returned
    // "Resources exhausted"; with 17.0.0, memory grows until OOM instead.
    let ctx = SessionContext::with_config_rt(
        SessionConfig::default(),
        Arc::new(
            RuntimeEnv::new(
                RuntimeConfig::new()
                    .with_memory_pool(Arc::new(FairSpillPool::new(4 * 1024 * 1024 * 1024))),
            )
            .unwrap(),
        ),
    );

    // I can share the file - it's kind of random data, but not sure what I can use to do that.
    // However, it's reproducible for any file whose joins can produce a result
    // larger than the memory pool limit.
    ctx.register_csv("hr", file_path, CsvReadOptions::default())
        .await?;

    // 4 joins - just to represent the problem
    let data_frame = ctx
        .sql(
            r#"
        SELECT hr1."Emp_ID"
        from hr hr1
        left join hr hr2 on hr1."Emp_ID" = hr2."Emp_ID"
        left join hr hr3 on hr2."Emp_ID" = hr3."Emp_ID"
        left join hr hr4 on hr3."Emp_ID" = hr4."Emp_ID"
    "#,
        )
        .await?;

    data_frame.write_csv(output_path).await?;

    Ok(())
}

Additional notes

It could be useful to mention that memory consumption jumps at the 3rd join. With 2 joins it works totally fine and doesn't even return an error. The query also works fine with optimizer.repartition_joins = false; it just doesn't consume as much memory without repartitioning for joins (but it takes much more time, for sure).
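As a sketch of the workaround mentioned above (assuming the `SessionConfig::with_repartition_joins` setter from DataFusion's 17.x API), the option can be set programmatically when building the context:

```rust
use datafusion::prelude::*;

// Workaround sketch: disable repartitioning for joins so RepartitionExec
// (and its channels) is not inserted into the plan. This avoids the memory
// blow-up described above at the cost of reduced join parallelism.
let config = SessionConfig::new().with_repartition_joins(false);
let ctx = SessionContext::with_config(config);
```

The same option can also be toggled via the `datafusion.optimizer.repartition_joins` configuration key without code changes.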