dask: Pyarrow metadata `RuntimeError` in `to_parquet`

Offline, a user reported getting `RuntimeError: file metadata is only available after writer close` when writing a Dask DataFrame to Parquet with our pyarrow engine. The traceback they were shown was:

Traceback (most recent call last):
  File "example.py", line 349, in <module>
    main(date_dict, example_conf)
  File "example.py", line 338, in main
    make_example_datasets(
  File "example.py", line 311, in make_example_datasets
    default_to_parquet(sub_ddf, v["path"], engine="pyarrow", overwrite=True)
  File "example.py", line 232, in default_to_parquet
    ddf.to_parquet(path=path, engine=engine, overwrite=overwrite, write_metadata_file=False)
  File "/opt/conda/lib/python3.8/site-packages/dask/dataframe/core.py", line 4453, in to_parquet
    return to_parquet(self, path, *args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 721, in to_parquet
    out = out.compute(**compute_kwargs)
  File "/opt/conda/lib/python3.8/site-packages/dask/base.py", line 286, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/dask/base.py", line 568, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 2743, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 2020, in gather
    return self.sync(
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 861, in sync
    return sync(
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py", line 326, in sync
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py", line 309, in f
    result[0] = yield future
  File "/opt/conda/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 1885, in _gather
    raise exception.with_traceback(traceback)
  File "/opt/conda/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py", line 947, in write_partition
    pq.write_table(
  File "/opt/conda/lib/python3.8/site-packages/pyarrow/parquet.py", line 1817, in write_table
    writer.write_table(table, row_group_size=row_group_size)
  File "/opt/conda/lib/python3.8/site-packages/pyarrow/parquet.py", line 662, in __exit__
    self.close()
  File "/opt/conda/lib/python3.8/site-packages/pyarrow/parquet.py", line 684, in close
    self._metadata_collector.append(self.writer.metadata)
  File "pyarrow/_parquet.pyx", line 1434, in pyarrow._parquet.ParquetWriter.metadata.__get__
RuntimeError: file metadata is only available after writer close

cc @rjzamora in case you’ve seen this before or have an idea of what might be causing this
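
For context, the failing call had roughly this shape (a hypothetical sketch: the column name, date range, partition count, and output path are invented here, and running this locally may well succeed):

import pandas as pd
import dask.dataframe as dd

# a datetime column, since the discussion below points at timestamp coercion
df = pd.DataFrame({"time": pd.date_range("2022-01-01", "2022-01-02", periods=500)})
ddf = dd.from_pandas(df, npartitions=2)

# same keyword arguments as in the traceback above
ddf.to_parquet(
    path="example_output/",
    engine="pyarrow",
    overwrite=True,
    write_metadata_file=False,
)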

About this issue

  • State: open
  • Created 3 years ago
  • Reactions: 1
  • Comments: 26 (13 by maintainers)

Most upvoted comments

Thanks for the reproducer! I can reproduce it with the above dask example, but if I try to extract the relevant pyarrow example, I don’t see the failure:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({"time": pd.date_range("2022-01-01", "2022-01-02", periods=500)})
table = pa.table(df)

metadata_collector = []

with open("test_invalid.parquet", "wb") as fil:
    pq.write_table(table, fil, coerce_timestamps="us", allow_truncated_timestamps=False, metadata_collector=metadata_collector)

(I get the correct error about “Casting from timestamp[ns] to timestamp[us] would lose data”, and the metadata_collector actually gets filled with a FileMetaData object)

Would the fact that it is executed in threads when using dask influence it somehow?
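
One way to probe that (a rough sketch; the file names, thread count, and helper name are arbitrary, and it is not guaranteed to trigger anything) would be to run the same intentionally failing write from a thread pool instead of the main thread:

from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({"time": pd.date_range("2022-01-01", "2022-01-02", periods=500)})
table = pa.table(df)

def attempt_write(i):
    # same intentionally failing write as above, one output file per task
    collector = []
    with open(f"test_invalid_{i}.parquet", "wb") as fil:
        pq.write_table(
            table,
            fil,
            coerce_timestamps="us",
            allow_truncated_timestamps=False,
            metadata_collector=collector,
        )
    return collector

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(attempt_write, i) for i in range(8)]
    for fut in futures:
        try:
            fut.result()
        except Exception as exc:
            # if threads are the culprit, some of these might surface the
            # RuntimeError instead of the expected casting error
            print(type(exc).__name__, "-", exc)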

So if it fixes the error for you, we can certainly apply the patch. But it would be nice to have a reproducer for our own test suite as well that doesn’t rely on dask.

Unfortunately not, I thought I had it, and it went away again…

I tried a small reproducer with a public S3 bucket for which I don’t have write permissions:

import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

table = pa.table({'a': [1, 2, 3]})
fs = s3fs.S3FileSystem(anon=True)
md_list = []

with fs.open("ursa-labs-taxi-data/data.parquet", "wb") as f:
    with pq.ParquetWriter(f, table.schema, metadata_collector=md_list) as writer:
        writer.write_table(table)
        # small edit to the example: don't access the metadata here; since the writer hasn't been closed yet, this always errors
        # meta = writer.writer.metadata

For the latest s3fs, that gives a "PermissionError: Access Denied" / "ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied". But trying the older s3fs 0.4.2 that you listed, I see the RuntimeError as well (although also the AccessDenied).

Full traceback
In [9]: with fs.open("ursa-labs-taxi-data/data.parquet", "wb") as f:
   ...:     with pq.ParquetWriter(f, table.schema) as writer:
   ...:         writer.write_table(table)
   ...:         meta = writer.writer.metadata
   ...: 
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-9-60f024fa1700> in <module>
      3         writer.write_table(table)
----> 4         meta = writer.writer.metadata
      5 

~/scipy/repos/arrow/python/pyarrow/_parquet.pyx in pyarrow._parquet.ParquetWriter.metadata.__get__()

RuntimeError: file metadata is only available after writer close

During handling of the above exception, another exception occurred:

ClientError                               Traceback (most recent call last)
<ipython-input-9-60f024fa1700> in <module>
      2     with pq.ParquetWriter(f, table.schema) as writer:
      3         writer.write_table(table)
----> 4         meta = writer.writer.metadata
      5 

~/miniconda3/envs/arrow-dev/lib/python3.8/site-packages/fsspec/spec.py in __exit__(self, *args)
   1600 
   1601     def __exit__(self, *args):
-> 1602         self.close()

~/miniconda3/envs/arrow-dev/lib/python3.8/site-packages/fsspec/spec.py in close(self)
   1567         else:
   1568             if not self.forced:
-> 1569                 self.flush(force=True)
   1570 
   1571             if self.fs is not None:

~/miniconda3/envs/arrow-dev/lib/python3.8/site-packages/fsspec/spec.py in flush(self, force)
   1438                 raise
   1439 
-> 1440         if self._upload_chunk(final=force) is not False:
   1441             self.offset += self.buffer.seek(0, 2)
   1442             self.buffer = io.BytesIO()

~/miniconda3/envs/arrow-dev/lib/python3.8/site-packages/s3fs/core.py in _upload_chunk(self, final)
   1250 
   1251         if self.autocommit and final:
-> 1252             self.commit()
   1253         return not final
   1254 

~/miniconda3/envs/arrow-dev/lib/python3.8/site-packages/s3fs/core.py in commit(self)
   1265                 self.buffer.seek(0)
   1266                 data = self.buffer.read()
-> 1267                 write_result = self._call_s3(
   1268                     self.fs.s3.put_object,
   1269                     Key=self.key, Bucket=self.bucket, Body=data, **self.kwargs

~/miniconda3/envs/arrow-dev/lib/python3.8/site-packages/s3fs/core.py in _call_s3(self, method, *kwarglist, **kwargs)
   1128 
   1129     def _call_s3(self, method, *kwarglist, **kwargs):
-> 1130         return self.fs._call_s3(method, self.s3_additional_kwargs, *kwarglist,
   1131                                 **kwargs)
   1132 

~/miniconda3/envs/arrow-dev/lib/python3.8/site-packages/s3fs/core.py in _call_s3(self, method, *akwarglist, **kwargs)
    198         additional_kwargs = self._get_s3_method_kwargs(method, *akwarglist,
    199                                                        **kwargs)
--> 200         return method(**additional_kwargs)
    201 
    202     def _get_s3_method_kwargs(self, method, *akwarglist, **kwargs):

~/miniconda3/envs/arrow-dev/lib/python3.8/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    355                     "%s() only accepts keyword arguments." % py_operation_name)
    356             # The "self" in this scope is referring to the BaseClient.
--> 357             return self._make_api_call(operation_name, kwargs)
    358 
    359         _api_call.__name__ = str(py_operation_name)

~/miniconda3/envs/arrow-dev/lib/python3.8/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
    674             error_code = parsed_response.get("Error", {}).get("Code")
    675             error_class = self.exceptions.from_code(error_code)
--> 676             raise error_class(parsed_response, operation_name)
    677         else:
    678             return parsed_response

ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
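
The shape of that traceback (the RuntimeError first, then "During handling of the above exception, another exception occurred", then the ClientError) is just Python's implicit exception chaining: the error inside the `with` block triggers the file's close on exit, the close itself fails, and the close error is what propagates. A minimal local sketch of the same pattern, with no S3 involved (the class and messages here are made up):

class FailingFile:
    # stand-in for an S3 file object whose buffered upload fails on close
    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # like s3fs flushing its buffer in __exit__ and getting AccessDenied
        raise PermissionError("Access Denied on close")

try:
    with FailingFile():
        # like touching writer.writer.metadata before the writer is closed
        raise RuntimeError("file metadata is only available after writer close")
except Exception as err:
    print("propagated:", type(err).__name__)                # PermissionError
    print("chained from:", type(err.__context__).__name__)  # RuntimeError

The RuntimeError in the original dask traceback at the top is similarly raised from ParquetWriter.__exit__ while it is cleaning up, so it may well be masking whatever went wrong first inside write_table.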

@kinghuang could you try with a more recent version of s3fs to see if that gives a more informative error message? You mentioned https://github.com/dask/dask/issues/6782 as a reason to have pinned s3fs to 0.4.2, but I think those issues should be resolved in the latest versions of s3fs / fsspec / pyarrow.
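
(If it helps when retrying, a quick way to report the versions in play:)

import fsspec
import pyarrow
import s3fs

print("s3fs:", s3fs.__version__)
print("fsspec:", fsspec.__version__)
print("pyarrow:", pyarrow.__version__)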