distributed: Workers stuck, increased memory usage while processing large CSV from S3.
I'm processing a dataframe stored as a (relatively) large CSV on S3, using the distributed scheduler with multiprocessing (1 thread per worker process, --no-nanny). Workers seem to be accumulating data and getting stuck; in some cases this also leads to failure of the whole job.
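For context, the cluster was started roughly like this (the exact invocation isn't in the issue; the scheduler address, process count, and memory limit below are assumptions based on the log output further down):

dask-scheduler
dask-worker tcp://scheduler-host:8786 --nprocs 4 --nthreads 1 --no-nanny --memory-limit 8e9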
I came up with a minimal reproducing example, below (it only reads and writes the CSV):
from dask.distributed import Client, progress
import dask.dataframe as df

# client, input_url and output_url are defined elsewhere;
# input_url/output_url are s3:// paths to the source and destination CSVs.
frame = df.read_csv(input_url,
                    collection=True,
                    blocksize=1024 * 1024,
                    compression=None,
                    lineterminator='\n',
                    dtype=str,
                    sep=',',
                    quotechar='"',
                    encoding='utf-8')
fun_list = frame.to_csv(output_url,
                        compute=False,   # return delayed writes instead of computing eagerly
                        encoding='utf-8',
                        index=False,
                        index_label=False)
futures = client.compute(fun_list)
progress(futures)
client.gather(futures)
This hangs forever with progress stuck at 0%.
In the worker log:
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 7.16 GB -- Worker memory limit: 8.02 GB
The file itself is only 1.2 GB, though. Using distributed 1.19.2 and dask 0.15.4.
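For scale, a blocksize of 1024*1024 bytes splits a ~1.2 GB file into roughly 1200 partitions, each of which becomes its own read/write task. A quick way to confirm this (assuming frame is the DataFrame from the snippet above):

# Each partition is one task; with a 1 MiB blocksize a 1.2 GB file
# yields on the order of 1.2e9 / 2**20 ≈ 1150 partitions.
print(frame.npartitions)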
About this issue
- State: closed
- Created 7 years ago
- Reactions: 12
- Comments: 32 (12 by maintainers)
Just commenting: disabling the chained assignment pandas option made my ETL job go from running out of memory after 90 minutes to taking 17 minutes! I think we can close this issue since it's related to pandas (and thanks @jeffreyliu, a year and a half later, for your comment!)
Yes, that seemed to be the issue. This thread helped get it to work.
Turning off the pandas option:
pd.options.mode.chained_assignment = None
allows the dataframe to load in a reasonable amount of time.
With the default 8 GB limit it seems to get stuck on read_block.
My current idea is that it gets stuck when reading the data and looking for record terminators.
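For reference, a minimal sketch of applying the pandas-option fix in a distributed setting; this assumes the option has to be set in every worker process (pandas options are process-local), and the scheduler address is a placeholder:

import pandas as pd
from dask.distributed import Client

def relax_chained_assignment_check():
    # pandas options are per-process, so set it inside each worker too.
    import pandas as pd
    pd.options.mode.chained_assignment = None

client = Client('tcp://scheduler-host:8786')   # placeholder address
pd.options.mode.chained_assignment = None      # client/driver process
client.run(relax_chained_assignment_check)     # run on every worker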