xarray: Is there any way of having `.map_blocks` be even more opaque to dask?
Is your feature request related to a problem?
Currently I have a workload which does something a bit like:
import xarray as xr

ds = xr.open_zarr(source)
(
    ds.assign(
        x=ds.foo * ds.bar,
        y=ds.foo + ds.bar,
    ).to_zarr(dest)
)
(the actual calc is a bit more complicated! And while I don’t have an MCVE of the full calc, I pasted a task graph below)
Dask — while very impressive in many ways — handles this extremely badly, because it attempts to load the whole of ds
into memory before writing out any chunks. There are lots of issues on this in the dask repo; it seems like an intractable problem for dask.
Describe the solution you’d like
I was hoping to make the internals of this task opaque to dask, so it became a much dumber task runner — just map over the blocks, running the function and writing the result, block by block. I thought I had some success with .map_blocks last week — the internals of the calc are now opaque at least. But the dask cluster is falling over again, I think because the write is seen as a separate task.
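Roughly what I’m doing now, as a minimal sketch (the calc function here is just a stand-in for the real thing, and source / dest are placeholders):

import xarray as xr

def calc(block):
    # stand-in for the real, more complicated calc
    return block.assign(x=block.foo * block.bar, y=block.foo + block.bar)

ds = xr.open_zarr(source)
result = ds.map_blocks(calc)  # the internals of calc are now opaque to dask...
result.to_zarr(dest)          # ...but the write still shows up as its own layer of tasks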
Is there any way to make the write more opaque too?
Describe alternatives you’ve considered
I’ve built a really hacky homegrown thing which does this on a custom scheduler: it just runs the functions and writes with region. I’d much prefer to use & contribute to the broader ecosystem…
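For context, the shape of that hack is roughly this (only a sketch, assuming everything is chunked along a single time dimension and the coordinate handling works out; the real version runs the per-block step on the custom scheduler rather than a plain loop):

import numpy as np
import xarray as xr

def calc(block):
    # stand-in for the real calc
    return block.assign(x=block.foo * block.bar, y=block.foo + block.bar)

ds = xr.open_zarr(source)

# write the metadata / array shapes up front without computing any data
calc(ds).to_zarr(dest, compute=False)

# then run each block eagerly and write it into its slot with region=
# (depending on the dataset you may need to drop coords that don't
# include the region dimension before writing)
bounds = np.cumsum((0,) + ds.chunks["time"])
for start, stop in zip(bounds[:-1], bounds[1:]):
    block = ds.isel(time=slice(int(start), int(stop))).compute()
    calc(block).to_zarr(dest, mode="r+", region={"time": slice(int(start), int(stop))})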
Additional context
(It’s also possible I’m making some basic error — and I do remember it working much better last week — so please feel free to direct me / ask me for more examples, if this doesn’t ring true)
About this issue
- State: closed
- Created 8 months ago
- Comments: 23 (12 by maintainers)
Yes, indeed. That already includes the fix. However, I’m working on a follow-up that should work even better here: https://github.com/dask/dask/pull/10557
At least for your MCVE this new PR works as intended. The following graphs show the prioritization of tasks / the order in which we run tasks.
main / 2023.10.0:
Note how all tasks after the 0 in the center are effectively scheduled consecutively before anything else runs. Those tasks are loading data, and this is hurting you.

New PR:
The tasks are only loaded when necessary.
The PR in question has a couple of performance issues to figure out, but it would help a lot if you could try it out on your real problem and let me know if this is indeed helping you. FWIW the performance problems of this PR would show up as a longer delay before your computation kicks off. However, judging by the graph I’m seeing, you may not even be impacted by this.
I really appreciate that you put in the effort to create the minimal example. We can put this into our benchmark suite and add a unit test to dask/dask with this. This is very helpful!

I apologize that this is almost off-topic, but I just merged https://github.com/dask/dask/pull/10660 which should fix the performance problems that triggered this conversation. From what I can tell this fixes the issue and I believe the new approach is more robust to your problems. If you encounter more issues like this, please let me know. I believe with the new implementation we can be more responsive to those issues.
This looks like a big improvement, @fjetter! Thank you for all your hard work here; user experience should be greatly improved by these efforts.
I’m still confused by why we can’t just dispatch to map_blocks, but I’ve split that question off into a separate issue (https://github.com/pydata/xarray/issues/8545), so that we can close this as hopefully addressing @max-sixty’s original problem.

I still don’t understand what’s stopping us rewriting xarray.map_blocks in terms of passing the underlying arrays to dask.array.map_blocks. If we did that then swapping out to try cubed would be trivial. I must be missing something obvious?

Isn’t this pretty much the model of xarray-beam?
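To spell out the map_blocks idea above (just a sketch of the concept, not a claim about how the actual implementation would look; calc_blocks and the variable names are made up):

import dask.array as da
import xarray as xr

ds = xr.open_zarr(source)

# pull the underlying chunked arrays out of the xarray objects...
foo, bar = ds.foo.data, ds.bar.data

def calc_blocks(foo_block, bar_block):
    # per-block numpy-level computation
    return foo_block * bar_block

# ...hand them to the array library at the block level...
x = da.map_blocks(calc_blocks, foo, bar, dtype=foo.dtype)

# ...and re-attach the xarray metadata afterwards
result = ds.assign(x=(ds.foo.dims, x))

The point being that if the block-level call were the only thing xarray handed to the chunk manager, swapping dask out for cubed would just mean swapping that one function.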
This is basically the same as https://github.com/pangeo-data/distributed-array-examples/issues/2 which @fjetter tried to fix with https://github.com/dask/dask/pull/10535
EDIT: And yeah the viz was misleading. It’s not a shuffle, it’s blockwise ops on different arrays, but only purely blockwise if you’re unchunked on y.
OK, here’s an MCVE. It requires more specificity than I expected — I think my read that things were working last week was correct actually, because the calcs were slightly different. It relies on the nested calcs, such that return d doesn’t trigger the issue.

(though that was the beauty of .map_blocks for me — that it made the graph opaque to dask, and small changes in the inputs couldn’t cause catastrophic runtime impact)

Here’s the graph — you can see the lack of basic parallelism that @dcherian pointed out above — though obv less severe than the big example above:
Eeeek, sounds like this is a common issue then… Without wanting to blast @s anytime anything is a problem, I’ll politely CC @martindurant & @mrocklin for info, as I count four alternatives that folks have built, within a few hours of me posting the issue…

(Goes without saying that I am very happy to be wrong, not trying to throw stones at the hard work your team has done)
That would work well, but it needs the indexes, and that’s effectively prevented by https://github.com/pydata/xarray/issues/8409, because we get >1GB task graphs.
Yes, good observation.
Speculatively — I think this is splitting the source read by data variable, then shuffling to do the calc, then writing each destination data variable.
If that’s correct, then another approach could be to try and keep the tasks for each block together, rather than organizing by each data variable.
I’ll try and get to checking whether that assertion is correct.
I’m very up for checking this out, and some of the team pointed me towards it (hence my DM last week, @TomNicholas!). I’m already well over my quota of trying new things vs getting work done, though, and I would need to set up an env for running cubed.
I’d also like to try beam!