aws-sdk-pandas: Slow performance when using read_parquet from s3

Hi,

I would like to open an issue as we have seen quite unsatisfactory performance using the read_parquet function. Here is our setup and data:

  • data is in S3; there are 1164 individual date-time prefixes under the main folder, and the total size of all files is barely 25.6 MB. So quite a lot of small individual files, organized into individual prefixes by date-time
  • the way we gather these files is by passing path: s3://.../ and using the partition_filter. The function call looks like this:
wr.s3.read_parquet(
    path,
    dataset=True,
    partition_filter=filter(),
)

I’ve run a couple of tests to verify whether there would be any speed improvement if I passed a list of prefixes for the function to combine instead of using the partition_filter, but the gain was marginal. Enabling use_threads=True gave no improvement. Overall it takes around 13 minutes to collect all files… this is just too long. Downloading them with aws s3 sync takes a few seconds.
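For reference, the two variants being compared above look roughly like this; a minimal sketch, assuming hive-style year/month/day partitions and a placeholder bucket (awswrangler passes partition values to the callable as strings):

import awswrangler as wr

# Variant 1: let the library list the whole prefix and filter on
# partition values with a callable (values arrive as strings).
def keep_first_of_january(partitions: dict) -> bool:
    return partitions["month"] == "01" and partitions["day"] == "01"

df = wr.s3.read_parquet(
    path="s3://my-bucket/my-prefix/",  # placeholder prefix
    dataset=True,
    partition_filter=keep_first_of_january,
)

# Variant 2: build the list of date-time prefixes yourself and pass it
# directly, skipping the listing/filtering step. Note that without
# dataset=True the partition values are not appended as columns.
paths = [
    "s3://my-bucket/my-prefix/year=2022/month=01/day=01/",
    "s3://my-bucket/my-prefix/year=2022/month=01/day=02/",
]
df = wr.s3.read_parquet(path=paths)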

Our main use case for operating on streams is in AWS Batch. We have some data loaders that use the data wrangler when we train our ML model in AWS Batch. We realized after some time that the main contributor to the extended training time is the part where the data is collected from AWS using the data wrangler (primarily wr.s3.read_parquet). Please also note that we’re not talking about big data here; most of our use cases are like the one described above.

At the moment we’re wondering whether this can be optimized or if we should move away from the streaming approach and simply download the data onto the container for model training. Could you give some advice? What’s your take on that?
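For completeness, the “download first, then read locally” alternative can be sketched roughly like this (the bucket, prefix, and local directory are placeholders; assumes the AWS CLI and pyarrow are available on the container):

import subprocess

import pandas as pd

local_dir = "/tmp/training-data"

# Pull the whole prefix down in one go (this is the step that takes seconds with the CLI).
subprocess.run(
    ["aws", "s3", "sync", "s3://my-bucket/my-prefix/", local_dir],
    check=True,
)

# pandas + pyarrow can read a directory of parquet files, including
# hive-style partition directories, in a single call.
df = pd.read_parquet(local_dir)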

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 16 (8 by maintainers)

Most upvoted comments

@konradsemsch Happy to hear that 🙂 I’ve now also created a PR addressing the issue. It is now possible to directly set the number of threads via the use_threads parameter, which might increase your performance even further 😉, e.g.

import time

import awswrangler as wr

start_time = time.perf_counter()
df3 = wr.s3.read_parquet(s3_location, partition_filter=lambda x: x["day"] == 1, dataset=True, use_threads=16)
print(f"Loading one day with threads took {time.perf_counter() - start_time} seconds.")
# >>> Loading one day with threads took 4.47 seconds. (instead of ~9 seconds with just use_threads=True)

Even when using use_threads = True, loading and writing data with awswrangler is extraordinarily slow. I have data partitioned by day and awswrangler takes at least 10x longer to read data than directly loading the parquet files.
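As a point of reference, “directly loading the parquet files” presumably means something along these lines (a sketch, assuming pyarrow and s3fs are installed; the path is a placeholder):

import pandas as pd

# Reads all parquet files under the day's prefix via pyarrow/s3fs,
# bypassing awswrangler entirely.
df = pd.read_parquet("s3://my-bucket/dataset/day=1/", engine="pyarrow")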

FWIW, I ran this through a fairly large dataset as well and saw a ~20% speedup and a 6% increase in memory usage 🎉

This was for a single partition that’s part of a parquet dataset, with chunked compressed files (12 chunks within the partition) on a machine with 48 CPUs and a 10Gbps network interface (r5.12xl).

Before:

peak memory: 273454.70 MiB, increment: 273326.68 MiB
CPU times: user 10min 22s, sys: 6min 5s, total: 16min 28s
Wall time: 12min 48s

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 110985989 entries, 0 to 110985988
Data columns (total 99 columns):
...
dtypes: Int32(1), Int64(54), category(1), datetime64[ns](2), float64(35), string(6)
memory usage: 123.0 GB

After:

peak memory: 290745.77 MiB, increment: 290646.48 MiB
CPU times: user 12min 28s, sys: 7min 55s, total: 20min 23s
Wall time: 10min 32s

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 110985989 entries, 0 to 110985988
Data columns (total 99 columns):
...
dtypes: Int32(1), Int64(54), category(1), datetime64[ns](2), float64(35), string(6)
memory usage: 123.0 GB
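The measurements above look like output from IPython’s %time/%memit magics plus DataFrame.info; a rough sketch of collecting that kind of measurement (the path is a placeholder and memory_profiler must be installed):

# In an IPython session:
#   %load_ext memory_profiler
#   %memit df = wr.s3.read_parquet(PATH, dataset=True)   # peak memory / increment
#   %time  df = wr.s3.read_parquet(PATH, dataset=True)   # CPU times / wall time
import awswrangler as wr

PATH = "s3://my-bucket/dataset/part=x/"  # placeholder partition prefix
df = wr.s3.read_parquet(PATH, dataset=True, use_threads=True)
df.info(memory_usage="deep")  # prints the dtype breakdown and memory usage shown above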

In which setup/environment are you using awswrangler? If it’s something like a small EC2 instance, Lambda, etc., which might only have 1 or 2 CPUs, it might be worth setting the number of threads directly, like:

    for path_prefix, paths in paths_to_merge.items():
        df_merged = wr.s3.read_parquet(path=paths, ignore_index=True, use_threads=8)
        merged_folder_name = f'merged-{folder}'
        merged_output_path = f'{path_prefix.replace(folder, merged_folder_name)}/{merged_parquet_filename}'
        print(f'saving merged file to s3 in path: {merged_output_path}')
        wr.s3.to_parquet(df=df_merged, path=merged_output_path)

@maxispeicher tnx, works 😃 General question: I don’t see an improvement in terms of times. I suspect it is because I have a large amount of small files (~5KB each) and am calling wr.s3.read_parquet with a list of 1-50 paths from a loop; attached code:

    for path_prefix, paths in paths_to_merge.items():
        df_merged = wr.s3.read_parquet(path=paths, ignore_index=True, use_threads=True)
        merged_folder_name = f'merged-{folder}'
        merged_output_path = f'{path_prefix.replace(folder, merged_folder_name)}/{merged_parquet_filename}'
        print(f'saving merged file to s3 in path: {merged_output_path}')
        wr.s3.to_parquet(df=df_merged, path=merged_output_path)

Can you suggest any additional tuning for the wrangler?
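One additional tuning idea, offered only as a sketch (not a suggestion from the maintainers in this thread): with files this small the per-call overhead dominates, so overlapping the read/write calls across prefixes may help. This assumes the same paths_to_merge, folder, and merged_parquet_filename variables as the snippet above; tune max_workers and use_threads together so a small container is not oversubscribed.

from concurrent.futures import ThreadPoolExecutor

import awswrangler as wr

def merge_one_prefix(item):
    path_prefix, paths = item
    df_merged = wr.s3.read_parquet(path=paths, ignore_index=True, use_threads=True)
    merged_folder_name = f'merged-{folder}'
    merged_output_path = f'{path_prefix.replace(folder, merged_folder_name)}/{merged_parquet_filename}'
    wr.s3.to_parquet(df=df_merged, path=merged_output_path)
    return merged_output_path

# Run several prefixes concurrently so the many small GET/PUT requests overlap.
with ThreadPoolExecutor(max_workers=8) as pool:
    for output_path in pool.map(merge_one_prefix, paths_to_merge.items()):
        print(f'saved merged file to s3 in path: {output_path}')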

@maxispeicher, I’ve slotted time this week to test this. I’ll report back.