dask: read_parquet is slower than expected with S3

I was looking at a read_parquet profile with @th3ed @ncclementi and @gjoseph92

Looking at this performance report: https://raw.githubusercontent.com/coiled/h2o-benchmarks/main/performance-reports-pyarr_str-50GB/q1_50GB_pyarr.html I see the following analysis (two minute video): https://www.loom.com/share/4c8ad1c5251a4e658c1c47ee2113f34a

We’re spending only about 20-25% of our time reading from S3, and about 5% of our time converting data to Pandas. We’re spending a lot of our time doing something else.

@gjoseph92 took a look at this with py-spy and generated reports like the following: tls-10_0_0_177-42425.json

I’m copying a note from him below:

What you’ll see from this is that pyarrow isn’t doing the actual reads. Because dask uses s3fs, the C++ arrow code has to call back into Python for each read. Ultimately, the reads are actually happening on the fsspec event loop (see the fsspecIO thread in profiles). If we look there, about 40% of CPU time is spent waiting for something (aka data from S3, good), but 60% is spent doing stuff in Python (which I’d consider overhead, to some degree).
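To make that call pattern concrete, here is a minimal illustrative sketch (not dask's exact code path; the bucket and key are the public taxi dataset referenced later in this thread): an s3fs file object is handed to pyarrow's reader, so every read Arrow's C++ code issues comes back through Python and the fsspec event loop.

import pyarrow.parquet as pq
import s3fs

# Illustrative only: an fsspec/s3fs file object passed to pyarrow.
# Each read() that Arrow performs calls back into this Python object,
# which schedules the actual S3 request on the fsspec asyncio event loop.
fs = s3fs.S3FileSystem(anon=True)
with fs.open("ursa-labs-taxi-data/2009/01/data.parquet") as f:
    table = pq.ParquetFile(f).read_row_group(0)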

We can also see that 30% of the total time is spent blocking on Python’s GIL (all the pthread_cond_timedwaits) (look at the functions calling into this and the corresponding lines in the Python source if you don’t believe me; they’re all Py_END_ALLOW_THREADS). This is an issue known as the convoy effect: https://bugs.python.org/issue7946, https://github.com/dask/distributed/issues/6325.

My takeaway is that using fsspec means dask is using Python for reads, which might be adding significant overhead / reducing parallelism due to the GIL.

I’d be interested in doing a comparison by hacking together a version that bypasses fsspec, and uses pyarrow’s native S3FileSystem directly. Before that though, it might be good to get some baseline numbers on how fast we can pull the raw data from S3 (just as bytes), to understand what performance we can expect.
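A rough sketch of such a baseline, assuming pyarrow's native S3 filesystem and the public taxi dataset used later in this thread (bucket, key, and region are illustrative):

import time
import pyarrow.fs as pa_fs

# Baseline: pull the raw object bytes with pyarrow's native S3 filesystem,
# bypassing fsspec/s3fs entirely.
fs = pa_fs.S3FileSystem(anonymous=True, region="us-east-2")
start = time.perf_counter()
with fs.open_input_file("ursa-labs-taxi-data/2009/01/data.parquet") as f:
    data = f.read()
print(f"{len(data) / 2**20:.0f} MiB in {time.perf_counter() - start:.1f}s")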

FYI I also tried https://developer.nvidia.com/blog/optimizing-access-to-parquet-data-with-fsspec/, but it was ~2x slower. Haven’t tried repeating that though, so not sure if it’s a real result.

One other thing I find surprising is that polars appears to be using fsspec for reads as well, rather than the native S3FileSystem or GCSFileSystem: https://github.com/pola-rs/polars/blob/445c550e8f965d9e8f2da1cb2d01b6c15874f6c8/py-polars/polars/io.py#L949-L956 https://github.com/pola-rs/polars/blob/445c550e8f965d9e8f2da1cb2d01b6c15874f6c8/py-polars/polars/internals/io.py#L114-L121

I would have expected polars and dask read performance to be closer in this case. We should probably confirm for ourselves that they’re not.

It looks like we could make things a lot faster. I’m curious about the right steps to isolate the problem further.

cc’ing @martindurant @rjzamora @ritchie46 @fjetter


Most upvoted comments

Thanks for the feedback @the-matt-morris. I’m curious if you run into the same problem if you use the new filesystem= keyword argument in the read_parquet call?

That solved my problem, no longer seeing the error in my use case. Thanks @jrbourbeau !
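For reference, a minimal sketch of that approach, assuming a dask version recent enough that read_parquet accepts a filesystem= argument (path is illustrative):

import dask.dataframe as dd
import pyarrow.fs as pa_fs

# Pass pyarrow's native S3 filesystem so the reads bypass fsspec/s3fs.
fs = pa_fs.S3FileSystem(anonymous=True)
ddf = dd.read_parquet(
    "s3://ursa-labs-taxi-data/2009/01/data.parquet",
    filesystem=fs,
)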

@dbalabka, thanks for your report. It would be very useful to know more details about your workflow and setup. How many threads versus processes are you using, for example? How big are your typical chunks in bytes?

If you turn on the “s3fs” logger, you can also know what calls it is making. You suggest there are too many, so we need to know which could in theory be eliminated. Could excessive calls by themselves lead to your slowdown due to AWS’ throttling?

Even if uvloop solved the problem I would still push for this change. Many people don’t use uvloop and if we can give those people a 2x speedup for presumably no cost then we should.

You might also be underestimating the importance of this speed boost. We’ve just discovered a way for Dask to go 2x faster on very common workloads. This is HUGE for a lot of users.

It’s worth turning on logging:

import fsspec.utils

fsspec.utils.setup_logging(logger_name="s3fs")

creating dataframe:

2022-11-03 20:47:02,664 - s3fs - DEBUG - _lsdir -- Get directory listing page for ursa-labs-taxi-data/2009/01/data.parquet
2022-11-03 20:47:02,937 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:47:02,993 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:47:03,047 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461900991-461966527
2022-11-03 20:47:03,048 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461900991-461966526', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:47:03,162 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461642441-461900991
2022-11-03 20:47:03,162 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461642441-461900990', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:47:03,324 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:47:03,384 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:47:03,454 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461900991-461966527
2022-11-03 20:47:03,455 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461900991-461966526', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:47:03,524 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461642441-461900991
2022-11-03 20:47:03,525 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461642441-461900990', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}

(4 head calls and 4 get calls, with the same two ranges each fetched twice)

and reading:

2022-11-03 20:48:11,558 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:48:11,816 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461900991-461966527
2022-11-03 20:48:11,817 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461900991-461966526', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:11,922 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461642441-461966519
2022-11-03 20:48:11,922 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461642441-461966518', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:12,089 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 4-33347731
2022-11-03 20:48:12,090 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=4-33347730', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:14,017 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 266106048-299556487
2022-11-03 20:48:14,017 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=266106048-299556486', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:15,890 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 299556576-332631576
2022-11-03 20:48:15,891 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=299556576-332631575', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:17,532 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 332631643-366119909
2022-11-03 20:48:17,533 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=332631643-366119908', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:19,814 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 366119998-399192441
2022-11-03 20:48:19,815 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=366119998-399192440', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:21,581 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 399192508-432738114
2022-11-03 20:48:21,582 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=399192508-432738113', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:23,680 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 432738203-461642359
2022-11-03 20:48:23,680 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=432738203-461642358', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:25,287 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 166582521-199685105
2022-11-03 20:48:25,288 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=166582521-199685104', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:27,760 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 33347817-66443866
2022-11-03 20:48:27,762 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=33347817-66443865', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:29,462 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 99952366-133117918
2022-11-03 20:48:29,462 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=99952366-133117917', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:31,172 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 199685171-233041894
2022-11-03 20:48:31,173 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=199685171-233041893', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:32,837 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 66443929-99952280
2022-11-03 20:48:32,838 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=66443929-99952279', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:34,515 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 133117983-166582432
2022-11-03 20:48:34,517 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=133117983-166582431', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:36,159 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 233041983-266105981
2022-11-03 20:48:36,170 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=233041983-266105980', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}

(fetched 266 MB in 16 serial (but unordered) calls in 30 s, including parse time, which is near my available bandwidth)

When loading with pyarrow’s FS using the snippet above, I get

File ~/conda/envs/py39/lib/python3.9/site-packages/pyarrow/_fs.pyx:763, in pyarrow._fs.FileSystem.open_input_file()

File ~/conda/envs/py39/lib/python3.9/site-packages/pyarrow/error.pxi:144, in pyarrow.lib.pyarrow_internal_check_status()

File ~/conda/envs/py39/lib/python3.9/site-packages/pyarrow/error.pxi:115, in pyarrow.lib.check_status()

OSError: When reading information for key '2009/01/data.parquet' in bucket 'ursa-labs-taxi-data': AWS Error [code 100]: No response body.

The time to download the whole file with s3fs in a single continuous call is 27.7s.

import fsspec
import fsspec.parquet

path = "s3://ursa-labs-taxi-data/2009/01/data.parquet"
fs = fsspec.filesystem("s3", anon=True)
%time fs.cat(path)
fsspec.parquet.open_parquet_file(path, storage_options={"anon": True}, engine="pyarrow")

The open_parquet_file route takes 29 s with s3fs, using two concurrent reads (35 s if actually parsing the data).

Please run these on your machines closer to the data! Note that Rick’s blog ( https://developer.nvidia.com/blog/optimizing-access-to-parquet-data-with-fsspec/ ) specifically measured s3fs with various caching versus arrow’s FS versus fsspec.parquet. I would appreciate some deeper benchmarking and consideration before jumping to conclusions.

I’d be interested in doing a comparison by hacking together a version that bypasses fsspec, and uses pyarrow’s native S3FileSystem directly. Before that though, it might be good to get some baseline numbers on how fast we can pull the raw data from S3 (just as bytes), to understand what performance we can expect.

Note that you should already be able to do this by passing open_file_options={"open_file_func": <pyarrow-file-open-func>} to dd.read_parquet. For example:

import dask.dataframe as dd
import pyarrow.fs as pa_fs

path = "s3://ursa-labs-taxi-data/2009/01/data.parquet"
fs = pa_fs.S3FileSystem(anonymous=True)

ddf = dd.read_parquet(
    path,
    engine="pyarrow",
    storage_options={"anon": True},
    open_file_options={
        "open_file_func": fs.open_input_file,
    },
)

ddf.partitions[0].compute()

Using fs.open_input_file does cut my wall time by ~50% for this simple example.
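For a rough side-by-side check, a sketch like the following (reusing fs, path, and dd from the snippet above) could time the first partition under both code paths:

import time

# Time reading the first partition with the default s3fs path versus
# pyarrow's open_input_file (fs, path, and dd reused from the snippet above).
for label, extra in [
    ("s3fs (default)", {}),
    ("pyarrow open_input_file", {"open_file_options": {"open_file_func": fs.open_input_file}}),
]:
    ddf = dd.read_parquet(path, engine="pyarrow", storage_options={"anon": True}, **extra)
    start = time.perf_counter()
    ddf.partitions[0].compute()
    print(label, f"{time.perf_counter() - start:.1f}s")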