arrow: [Python] Only convert in parallel for the ConsolidatedBlockCreator class for large data

Describe the enhancement requested

The ConsolidatedBlockCreator runs the column conversion in parallel, creating a Scalar Memo Table for each column and using up to pa.cpu_count() threads.

For performance reasons, jemalloc and mimalloc maintain allocations in per-thread arenas/segments to reduce contention between threads.

What this means is that if a user calls table.to_pandas(split_blocks=False) on a small table, a disproportionately large amount of memory gets allocated to build the Scalar Memo Tables: because each column is converted on a different thread, both jemalloc and mimalloc will essentially allocate a separate chunk of memory per column.

Here is some code:

import pyarrow as pa

def test_memory_usage():
    # For the 26-column runs, the dict was extended with columns 'H': ['h'] through 'Z': ['z'].
    table = pa.table({'A': ['a'], 'B': ['b'], 'C': ['c'], 'D': ['d'], 'E': ['e'], 'F': ['f'], 'G': ['g']})
    df = table.to_pandas()

if __name__ == '__main__':
    test_memory_usage()
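
The jemalloc and mimalloc numbers below come from running this script once per allocator. One way to switch Arrow's default memory pool between runs, assuming your pyarrow build includes both allocators, is the ARROW_DEFAULT_MEMORY_POOL environment variable (this snippet is illustrative, not necessarily how the measurements in this issue were taken):

import os
# Must be set before pyarrow creates its default memory pool;
# accepted values include "jemalloc", "mimalloc" and "system".
os.environ['ARROW_DEFAULT_MEMORY_POOL'] = 'jemalloc'
import pyarrow as pa
print(pa.default_memory_pool().backend_name)  # confirm which allocator is active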

Here are the resulting memory allocations summarised with memray:

jemalloc with 7 columns:

📦 Total memory allocated:
        178.127MB

📊 Histogram of allocation size:
        min: 1.000B
        --------------------------------------------
        < 4.000B   :  3548 ▇▇
        < 24.000B  :  1253 ▇
        < 119.000B : 14686 ▇▇▇▇▇▇▇
        < 588.000B :  3746 ▇▇
        < 2.827KB  : 58959 ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇
        < 13.924KB :  2550 ▇▇
        < 68.569KB :   403 ▇
        < 337.661KB:    84 ▇
        < 1.624MB  :    24 ▇
        <=7.996MB  :    20 ▇
        --------------------------------------------
        max: 7.996MB

jemalloc with 26 columns:

📦 Total memory allocated:
        238.229MB

📊 Histogram of allocation size:
        min: 1.000B
        --------------------------------------------
        < 4.000B   :  3545 ▇▇
        < 24.000B  :  1627 ▇
        < 119.000B : 15086 ▇▇▇▇▇▇▇
        < 588.000B :  3882 ▇▇
        < 2.828KB  : 58971 ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇
        < 13.929KB :  2552 ▇▇
        < 68.593KB :   403 ▇
        < 337.794KB:    84 ▇
        < 1.625MB  :    24 ▇
        <=8.000MB  :    47 ▇
        --------------------------------------------
        max: 8.000MB

mimalloc with 7 columns:

📦 Total memory allocated:
        380.166MB

📊 Histogram of allocation size:
        min: 1.000B
        --------------------------------------------
        < 6.000B   :  3548 ▇▇
        < 36.000B  :  7470 ▇▇▇▇
        < 222.000B :  9524 ▇▇▇▇▇
        < 1.319KB  : 57271 ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇
        < 7.999KB  :  6492 ▇▇▇
        < 48.503KB :   775 ▇
        < 294.066KB:   150 ▇
        < 1.741MB  :    26 ▇
        < 10.556MB :     1 ▇
        <=64.000MB :     4 ▇
        --------------------------------------------
        max: 64.000MB

mimalloc with 26 columns:

📦 Total memory allocated:
        1.434GB

📊 Histogram of allocation size:
        min: 1.000B
        --------------------------------------------
        < 6.000B   :  3545 ▇▇
        < 36.000B  :  7845 ▇▇▇▇
        < 222.000B : 10001 ▇▇▇▇▇
        < 1.319KB  : 57332 ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇
        < 7.999KB  :  6501 ▇▇▇
        < 48.503KB :   794 ▇
        < 294.066KB:   150 ▇
        < 1.741MB  :    26 ▇
        < 10.556MB :     1 ▇
        <=64.000MB :    21 ▇
        --------------------------------------------
        max: 64.000MB

You can see how dramatically the allocated memory increases with the number of columns, even for a very small table: with mimalloc, total allocations grow from roughly 380 MB for 7 columns to about 1.4 GB for 26 columns.
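
One way to sanity-check that the parallel conversion path is what drives these numbers, rather than pandas itself, is to re-run the same script with threading disabled and compare the memray summaries. A minimal sketch of such a run (the function name is mine; use_threads=False and pa.set_cpu_count(1) are the standard pyarrow knobs for keeping the conversion on a single thread):

import pyarrow as pa

def test_memory_usage_single_threaded():
    # Same 7-column, single-row table as above, but the conversion stays on
    # the calling thread, so only one allocator arena/segment should be touched.
    table = pa.table({c: [c.lower()] for c in 'ABCDEFG'})
    df = table.to_pandas(use_threads=False)  # alternatively: pa.set_cpu_count(1)

if __name__ == '__main__':
    test_memory_usage_single_threaded()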

My proposal is that we only run the conversion in parallel when the table is large enough for parallelism to make a substantial performance difference. I’m not yet sure what that threshold should be, but once the code has been refactored we can run experiments to come to a data-informed decision.
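
To make the idea concrete, here is a rough sketch of the kind of size check I have in mind, written as Python pseudocode; the real change would live in the C++ ConsolidatedBlockCreator, and the threshold constant and names below are placeholders to be tuned by those experiments:

from concurrent.futures import ThreadPoolExecutor

# Placeholder threshold; the right value should come out of benchmarks.
PARALLEL_CONVERSION_MIN_CELLS = 1_000_000

def convert_columns(table, convert_column):
    # Small tables: a plain serial loop, so we don't pay the per-thread
    # allocator overhead when there is little work to parallelize.
    if table.num_rows * table.num_columns < PARALLEL_CONVERSION_MIN_CELLS:
        return [convert_column(col) for col in table.columns]
    # Large tables: fan the per-column conversion out across a thread pool.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(convert_column, table.columns))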

Component(s)

C++

About this issue

  • State: closed
  • Created 4 months ago
  • Comments: 22 (19 by maintainers)

Most upvoted comments

We ran benchmarks on @felipecrv’s ParallelFor code and unfortunately the results weren’t promising. Between @pitrou’s explanation of how to interpret the memory allocation results and learning more about how the ThreadPool works, it turns out that reworking ParallelFor for this one step of the Arrow conversion doesn’t significantly reduce memory usage. And the memory usage isn’t as high as I was previously concerned it was.

I’m going to go ahead and close this issue for now! Thanks everyone for helping me look into this. @jorisvandenbossche @felipecrv and @pitrou ❤️

Note that it’s not really spinning up new threads, unless it’s the first time the thread pool is being used 😃

So before trying to add heuristics for parallelization, perhaps we should investigate this?

Different people take different approaches to problems. I saw ParallelFor very naively posting small tasks to a ThreadPool and came up with a version that doesn’t do that.

You saw this problem and decided to fix how the profiler measures memory consumption. Which is a valid strategy, but not the only one, and certainly an even bigger rabbit hole for me personally if I were to take it.