distributed: Dask's Arrow serialization slow & memory intensive
I’m creating a dummy 80MB single-partition Dask distributed DataFrame, and attempting to convert it to a PyArrow Table.
Doing so makes the notebook emit GC warnings and consistently takes over 20 seconds.
Versions: PyArrow 0.12.0, Dask 1.1.1
Repro:
from dask.distributed import Client, wait, LocalCluster
import pyarrow as pa
ip = '0.0.0.0'
cluster = LocalCluster(ip=ip)
client = Client(cluster)
import dask.array as da
import dask.dataframe as dd
n_rows = 5000000
n_keys = 5000000
ddf = dd.concat([
    da.random.random(n_rows).to_dask_dataframe(columns='x'),
    da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
], axis=1).persist()
def get_arrow(df):
    return pa.Table.from_pandas(df)
%time arrow_tables = ddf.map_partitions(get_arrow).compute()
Result:
distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 26% CPU time recently (threshold: 10%)
CPU times: user 20.6 s, sys: 1.17 s, total: 21.7 s
Wall time: 22.5 s
About this issue
- State: closed
- Created 5 years ago
- Comments: 16 (13 by maintainers)
Yup, totally agree. My example above shows that this might be because we’re putting these things into Pandas Series, and apparently putting an Arrow Table into a Pandas Series takes several seconds. This probably isn’t a problem with serialization or communication on the Dask end; rather, by using Dask dataframe you’re trying to keep these things around in Pandas, which is currently borking. (At least until @TomAugspurger refactors the Pandas constructor logic).
Short term, the solution here is probably to avoid calling map_partitions(pa.Table.from_pandas), which will try to form another dataframe object, and instead use Dask Delayed as @TomAugspurger suggests above. This will avoid trying to put PyArrow table objects in Pandas Series, which seems to be the fundamental bug here.