arrow: [Python][Parquet] Parquet deserialization speeds slower on Linux

Describe the bug, including details regarding any error messages, version, and platform.

I’m debugging slow performance in Dask DataFrame and have tracked things down, I think, to slow Parquet deserialization in PyArrow. Based on what I know of Arrow, I expect to get GB/s, but I’m getting more like 100-200 MB/s. What’s more, this seems to depend strongly on the environment (Linux / OSX) I’m using. I could use help tracking this down.

Experiment

I’ve isolated the performance difference down to the following simple experiment (notebook here):

# Create dataset
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import time
import io

x = np.random.randint(0, 100000, size=(1000000, 100))
df = pd.DataFrame(x)
t = pa.Table.from_pandas(df)


# Write to local parquet file

pq.write_table(t, "foo.parquet")


# Time Disk speeds

start = time.time()
with open("foo.parquet", mode="rb") as f:
    bytes = f.read()
    nbytes = len(bytes)
stop = time.time()
print("Disk Bandwidth:", int(nbytes / (stop - start) / 2**20), "MiB/s")


# Time Arrow Parquet Speeds

start = time.time()
_ = pq.read_table("foo.parquet")
stop = time.time()
print("PyArrow Read Bandwidth:", int(nbytes / (stop - start) / 2**20), "MiB/s")


# Time In-Memory Read Speeds

start = time.time()
pq.read_table(io.BytesIO(bytes))
stop = time.time()

print("PyArrow In-Memory Bandwidth:", int(nbytes / (stop - start) / 2**20), "MiB/s")

Results

I’ve tried this on a variety of cloud machines (Intel/ARM, VMs/metal, 8-core/64-core, AWS/GCP) and they all get fast disk speeds (probably cached), but only about 150 MB/s Parquet deserialization speeds. I’ve also tried this on two laptops, one an MBP and one a ThinkPad running Ubuntu, and I get …

  • MacBook Pro: 1 GiB/s PyArrow deserialization performance (what I expect)
  • Ubuntu/ThinkPad: 150 MB/s PyArrow deserialization

In all cases I’ve installed the latest release, PyArrow 13, from conda-forge.
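
A quick way to compare the two builds (my own suggestion, not from the original report) is to print PyArrow’s version, runtime SIMD level, and C++ build info in each environment:

import pyarrow as pa

print(pa.__version__)      # e.g. 13.0.0
print(pa.runtime_info())   # includes the SIMD level detected on this CPU
print(pa.cpp_build_info)   # compiler and build details of the underlying Arrow C++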

Summary

I’m confused by this. I’ve seen Arrow go way faster than this. I’ve tried to isolate the problem as much as possible to identify something in my environment that is the cause, but I can’t. Everything seems to point to the conclusion that “PyArrow Parquet is just slow on Linux” which doesn’t make any sense to me.

I’d welcome any help. Thank you all for your work historically.

Component(s)

Parquet, Python

About this issue

  • Original URL
  • State: open
  • Created 8 months ago
  • Reactions: 1
  • Comments: 85 (59 by maintainers)

Most upvoted comments

re: the pipelining/IO discussion, you may find the discussion here interesting: https://lists.apache.org/thread/cdfkm8oflm2zvd25yn4k6gh2o7pc9z88

Some (but not all) of those proposals were implemented in Arrow (“pre-buffering” primarily), though pre-buffering is probably not the ideal way to implement it (too much memory usage). One thing that didn’t make it was the global concurrency manager, which would have approximated priority by not actually issuing reads for a file until all reads for previous files have been issued (of course, this only makes sense if there’s an ordering between the files in the first place - not necessarily true for dataset)
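
For reference, pre-buffering is exposed directly on the Python read path; a minimal sketch (the file name is a placeholder):

import pyarrow.parquet as pq

# pre_buffer=True coalesces and prefetches the byte ranges needed for the
# requested columns/row groups before decoding, trading memory for fewer
# small reads on high-latency filesystems such as S3.
table = pq.read_table("data.parquet", pre_buffer=True)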

That said, I believe datasets does parallelize at the row-group level already @mapleFU

That said, ~1 GB/s for uncompressed PLAIN-encoded fixed-width data is still very mediocre. I think this has to do with the fact that pq.read_table concatenates the row groups together instead of keeping one chunk per row group:

>>> tab = pq.read_table('lineitem-uncompressed.pq', use_threads=False, read_dictionary=['l_returnflag', 'l_linestatus', 'l_shipinstruct', 'l_shipmode'], columns=['l_orderkey', 'l_partkey', 'l_suppkey', 'l_linenumber', 'l_shipdate', 'l_commitdate'])
>>> [{n: c.num_chunks} for n, c in zip(tab.column_names, tab.columns)]
[{'l_orderkey': 1},
 {'l_partkey': 1},
 {'l_suppkey': 1},
 {'l_linenumber': 1},
 {'l_shipdate': 1},
 {'l_commitdate': 1}]

If I deliberately read row groups separately for these PLAIN-encoded columns, I get almost twice the speed:

>>> %timeit pq.read_table('lineitem-uncompressed.pq', use_threads=False, read_dictionary=['l_returnflag', 'l_linestatus', 'l_shipinstruct', 'l_shipmode'], columns=['l_orderkey', 'l_partkey', 'l_suppkey', 'l_linenumber', 'l_shipdate', 'l_commitdate'])
67.1 ms ± 1.39 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

>>> f = pq.ParquetFile('lineitem-uncompressed.pq', read_dictionary=['l_returnflag', 'l_linestatus', 'l_shipinstruct', 'l_shipmode'])
>>> %timeit [f.read_row_group(i, use_threads=False, columns=['l_orderkey', 'l_partkey', 'l_suppkey', 'l_linenumber', 'l_shipdate', 'l_commitdate']) for i in range(f.num_row_groups)]
36.6 ms ± 841 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
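
If a single Table is still needed afterwards, the per-row-group tables can be combined without copying buffers; a sketch building on the snippet above (pa.concat_tables keeps one chunk per row group rather than concatenating them):

import pyarrow as pa

tables = [
    f.read_row_group(i, use_threads=False, columns=['l_orderkey', 'l_partkey', 'l_suppkey', 'l_linenumber', 'l_shipdate', 'l_commitdate'])
    for i in range(f.num_row_groups)
]
tab = pa.concat_tables(tables)  # columns stay chunked: one chunk per row group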

What’s weird is that most columns in this file come back as a single chunk, even though the file has 21 row groups. This doesn’t look right:

>>> [(name, a.num_chunks) for name, a in zip(tab.column_names, tab.columns)]
[('l_orderkey', 1),
 ('l_partkey', 1),
 ('l_suppkey', 1),
 ('l_linenumber', 1),
 ('l_quantity', 1),
 ('l_extendedprice', 1),
 ('l_discount', 1),
 ('l_tax', 1),
 ('l_returnflag', 21),
 ('l_linestatus', 21),
 ('l_shipdate', 1),
 ('l_commitdate', 1),
 ('l_receiptdate', 1),
 ('l_shipinstruct', 21),
 ('l_shipmode', 21),
 ('l_comment', 1)]
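
Note that the 21-chunk columns are exactly the ones passed to read_dictionary above. To dig further, the encodings recorded in the column-chunk metadata can be inspected; a sketch assuming the same file:

import pyarrow.parquet as pq

pf = pq.ParquetFile('lineitem-uncompressed.pq')
rg = pf.metadata.row_group(0)
for i in range(rg.num_columns):
    col = rg.column(i)
    print(col.path_in_schema, col.encodings, col.compression)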

>>> pf = pq.ParquetFile('~/arrow/data/lineitem/lineitem_0002072d-7283-43ae-b645-b26640318053.parquet')
>>> pf.metadata
<pyarrow._parquet.FileMetaData object at 0x7f236076dcb0>
  created_by: DuckDB
  num_columns: 16
  num_rows: 2568534
  num_row_groups: 21
  format_version: 1.0
  serialized_size: 29792

We have a public version here: s3://coiled-data/tpch/scale-1000/ (it should be available with anonymous access).
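
For anyone reproducing against that bucket, anonymous access can be requested explicitly; a sketch (the region may need to be set depending on where the bucket lives):

import pyarrow.dataset as ds
from pyarrow import fs

s3 = fs.S3FileSystem(anonymous=True)
dataset = ds.dataset("coiled-data/tpch/scale-1000/", format="parquet", filesystem=s3)
print(len(dataset.files))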

Sorry for being non-specific. I meant PyArrow Parquet performance specifically. My apologies for the lack of precision.

In particular, the configuration setting I’m using is

pq.ParquetFile(s3_filename, pre_buffer=True).read(
    columns=columns,
    use_threads=False,
    use_pandas_metadata=True,
)

As mentioned above, on a 4-core machine, running this in parallel with 4-8 threads gets me up to about 200-250 MB/s. CPU utilization during this period is high. I can get around 500 MB/s of S3 bandwidth on the same machine.

I’m also open to other APIs, but that has performed the best for me in my circumstances so far.
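
For concreteness, the parallel-read setup described above might look roughly like this; a sketch under my own assumptions, with s3_filenames and columns as placeholders:

from concurrent.futures import ThreadPoolExecutor
import pyarrow.parquet as pq

s3_filenames = [...]   # placeholder: list of Parquet files to read
columns = [...]        # placeholder: column selection

def read_one(path):
    # One file per task; each read is single-threaded, so the thread pool
    # provides the only parallelism.
    return pq.ParquetFile(path, pre_buffer=True).read(
        columns=columns,
        use_threads=False,
        use_pandas_metadata=True,
    )

with ThreadPoolExecutor(max_workers=8) as pool:
    tables = list(pool.map(read_one, s3_filenames))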

Also: the benchmark is great.

From the perspective of the paper ( https://www.durner.dev/app/media/papers/anyblob-vldb23.pdf ), the I/O size is also important.

And different machines might have different bandwidth, which should also be taken into account.

My intuition says that Arrow deserialization should be much faster than S3 download speeds, and so we can stop when we’re within ~80% of S3 download speeds

But this is Parquet deserialization, not Arrow deserialization 😃 Parquet features many encodings and compression schemes, most of which trade CPU overhead for disk footprint (and network transmission times).

You’re also looking at single-thread Parquet decoding speed here, but presumably you’re going to download and decode multiple files simultaneously, which will give you a multiple of the decoding speed and may reach S3 download limits.

(also, I don’t know what the expected download speeds from S3 are…)

This also shows, btw, that MiB/s can be a very misleading metric. In the end, you’re interested in reading values, not bytes, so I find it more useful to reason in terms of Mitems/s.
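
For example, a rough conversion of the earlier timing into Mitems/s (my own sketch, reusing foo.parquet from the original snippet):

import time
import pyarrow.parquet as pq

start = time.time()
tab = pq.read_table("foo.parquet")
elapsed = time.time() - start

nvalues = tab.num_rows * tab.num_columns
print("Throughput:", round(nvalues / elapsed / 1e6), "Mitems/s")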

About (1), some optimizations will be included later, see:

  1. https://github.com/apache/arrow/pull/37868 (this patch should be revisited)

It seems we can enable a larger prefetch depth so that Arrow fetches multiple files concurrently.
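
A hedged sketch of what tuning that readahead could look like through the datasets API (batch_readahead and fragment_readahead are existing scanner options; the path, columns, and values below are placeholders):

import pyarrow.dataset as ds

dataset = ds.dataset("tpch/scale-1000/", format="parquet")
scanner = dataset.scanner(
    columns=["l_orderkey", "l_partkey"],
    batch_readahead=16,    # record batches prefetched within each file
    fragment_readahead=8,  # files (fragments) prefetched concurrently
)
table = scanner.to_table()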