dask: read_parquet is slower than expected with S3
I was looking at a read_parquet profile with @th3ed, @ncclementi, and @gjoseph92.
Looking at this performance report: https://raw.githubusercontent.com/coiled/h2o-benchmarks/main/performance-reports-pyarr_str-50GB/q1_50GB_pyarr.html, I see the following analysis (two-minute video): https://www.loom.com/share/4c8ad1c5251a4e658c1c47ee2113f34a
We’re spending only about 20-25% of our time reading from S3, and about 5% of our time converting data to Pandas. We’re spending a lot of our time doing something else.
@gjoseph92 took a look at this with py-spy and generated reports like the following: tls-10_0_0_177-42425.json
I’m copying a note from him below:
What you’ll see from this is that pyarrow isn’t doing the actual reads. Because dask uses s3fs, the C++ arrow code has to call back into Python for each read. Ultimately, the reads are actually happening on the fsspec event loop (see the
fsspecIO thread in the profiles). If we look there, about 40% of CPU time is spent waiting for something (aka data from S3, good), but 60% is spent doing stuff in Python (which I’d consider overhead, to some degree).
We can also see that 30% of the total time is spent blocking on Python’s GIL (all the
pthread_cond_timedwaits) (look at the functions calling into this and the corresponding lines in the Python source if you don’t believe me; they’re all Py_END_ALLOW_THREADS). This is an issue known as the convoy effect: https://bugs.python.org/issue7946, https://github.com/dask/distributed/issues/6325.
My takeaway is that using fsspec means dask is using Python for reads, which might be adding significant overhead / reducing parallelism due to the GIL.
I’d be interested in doing a comparison by hacking together a version that bypasses fsspec and uses pyarrow’s native S3FileSystem directly. Before that, though, it might be good to get some baseline numbers on how fast we can pull the raw data from S3 (just as bytes), to understand what performance we can expect.
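Something like this rough sketch is what I have in mind for that baseline (bucket, key, and region below are placeholders, and pyarrow's native filesystem is just one way to do it):

```python
# Sketch: time a raw byte pull from S3 with pyarrow's native filesystem,
# with no parquet parsing involved. Bucket/key/region are placeholders.
import time
import pyarrow.fs as pa_fs

fs = pa_fs.S3FileSystem(region="us-east-1")

start = time.perf_counter()
with fs.open_input_stream("my-bucket/path/to/part.0.parquet") as f:
    nbytes = len(f.read())
elapsed = time.perf_counter() - start

print(f"{nbytes / 1e6:.1f} MB in {elapsed:.1f}s "
      f"({nbytes / 1e6 / elapsed:.1f} MB/s)")
```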
FYI I also tried https://developer.nvidia.com/blog/optimizing-access-to-parquet-data-with-fsspec/, but it was ~2x slower. Haven’t tried repeating that though, so not sure if it’s a real result.
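For reference, the approach in that post goes through fsspec.parquet.open_parquet_file; roughly something like this sketch (placeholder path, and keyword support may vary by fsspec version):

```python
# Sketch of the fsspec.parquet path referenced above (placeholder S3 path).
import pandas as pd
from fsspec.parquet import open_parquet_file

with open_parquet_file(
    "s3://my-bucket/path/to/part.0.parquet",
    engine="pyarrow",
) as f:
    df = pd.read_parquet(f, engine="pyarrow")
```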
One other thing I find surprising is that polars appears to be using fsspec for reads as well, rather than the native S3FileSystem or GCSFileSystem: https://github.com/pola-rs/polars/blob/445c550e8f965d9e8f2da1cb2d01b6c15874f6c8/py-polars/polars/io.py#L949-L956 https://github.com/pola-rs/polars/blob/445c550e8f965d9e8f2da1cb2d01b6c15874f6c8/py-polars/polars/internals/io.py#L114-L121
I would have expected polars and dask read performance to be closer in this case. We should probably confirm for ourselves that they’re not.
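A quick, unscientific check could look like this sketch (placeholder path; both libraries read the same file, and it should be run close to the data):

```python
# Sketch: time polars vs dask.dataframe on the same parquet file in S3.
import time

import dask.dataframe as dd
import polars as pl

path = "s3://my-bucket/path/to/part.0.parquet"  # placeholder

start = time.perf_counter()
pl_df = pl.read_parquet(path)
print(f"polars: {time.perf_counter() - start:.1f}s")

start = time.perf_counter()
dask_df = dd.read_parquet(path).compute()
print(f"dask:   {time.perf_counter() - start:.1f}s")
```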
It looks like we could make things a lot faster. I’m curious about the right steps to isolate the problem further.
cc’ing @martindurant @rjzamora @ritchie46 @fjetter
That solved my problem; I’m no longer seeing the error in my use case. Thanks @jrbourbeau!
@dbalabka, thanks for your report. It would be very useful to know more details about your workflow and setup. How many threads versus processes are you using, for example? How big are your typical chunks in bytes?
If you turn on the “s3fs” logger, you can also see exactly what calls it is making. You suggest there are too many, so we need to know which could in theory be eliminated. Could excessive calls by themselves lead to your slowdown due to AWS’s throttling?
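For example, something along these lines (plain stdlib logging) should print the calls s3fs makes:

```python
# Turn on debug logging for the "s3fs" logger to see each S3 request it issues.
import logging

logging.basicConfig(format="%(name)s %(levelname)s %(message)s")
logging.getLogger("s3fs").setLevel(logging.DEBUG)
```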
Even if uvloop solved the problem I would still push for this change. Many people don’t use uvloop and if we can give those people a 2x speedup for presumably no cost then we should.
You might also be underestimating the importance of this speed boost. We’ve just discovered a way for Dask to go 2x faster on very common workloads. This is HUGE for a lot of users.
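For anyone who wants to test the uvloop route, my understanding is that distributed has an event-loop setting in its config; something like this sketch (the exact key name is worth double-checking against your version’s distributed.yaml, and uvloop must be installed):

```python
# Sketch: ask distributed to use uvloop for its event loop before starting
# a cluster. The config key is from memory; verify it in distributed.yaml.
import dask

dask.config.set({"distributed.admin.event-loop": "uvloop"})

from distributed import Client

client = Client()  # processes started after this should pick up the setting
```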
It’s worth turning on logging.
Creating the dataframe:
(4 head calls, 4 get calls, all the same range)
and the read:
(fetched 266 MB in 16 serial but unordered calls in 30s, including parse time, which is near my available bandwidth)
When loading with pyarrow’s FS using the snippet above, I get
The time to download the whole file with s3fs in a single continuous call is 27.7s.
With s3fs, it takes 29s with two concurrent reads (35s if actually parsing the data).
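For clarity, the single continuous s3fs download timed above is roughly this shape (placeholder path, not the exact snippet):

```python
# Sketch: time one continuous s3fs download of the whole file.
import time
import s3fs

fs = s3fs.S3FileSystem()

start = time.perf_counter()
data = fs.cat_file("my-bucket/path/to/part.0.parquet")  # placeholder path
print(f"{len(data) / 1e6:.1f} MB in {time.perf_counter() - start:.1f}s")
```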
Please run these on machines closer to the data! Note that Rick’s blog (https://developer.nvidia.com/blog/optimizing-access-to-parquet-data-with-fsspec/) specifically measured s3fs with various caching options versus arrow’s FS versus fsspec.parquet. I would appreciate some deeper benchmarking and consideration before jumping to conclusions.
Note that you should already be able to do this by passing open_file_options={"open_file_func": <pyarrow-file-open-func>} to dd.read_parquet. Using fs.open_input_file does cut my wall time by ~50% for this simple example.
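For concreteness, a sketch of that call (placeholder path and region; fs here is pyarrow’s native S3 filesystem):

```python
# Sketch: bypass fsspec for the byte reads by handing dask a pyarrow-native
# file-open function. Path and region are placeholders.
import dask.dataframe as dd
import pyarrow.fs as pa_fs

fs = pa_fs.S3FileSystem(region="us-east-1")

df = dd.read_parquet(
    "s3://my-bucket/dataset/",
    engine="pyarrow",
    open_file_options={"open_file_func": fs.open_input_file},
)
```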