pangeo-forge-recipes: Dask executor is broken with dask upstream
In our Dask executor, we create a high-level graph as follows:
Our upstream test (dask commit 725110f9367931291a3e68c9d582544cdb032f77) has revealed the following error, triggered by the line:

```python
delayed = Delayed(prev_token[0], hlg)
```
```
self = Delayed('finalize_target-8dfa49acbe8cb65c6a2f82ebca37350d')
key = 'finalize_target-8dfa49acbe8cb65c6a2f82ebca37350d'
dsk = HighLevelGraph with 5 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f338c0b3280>
 0. config
 1. cache_input
 2. prepare_target
 3. store_chunk
 4. finalize_target

length = None, layer = None

    def __init__(self, key, dsk, length=None, layer=None):
        self._key = key
        self._dask = dsk
        self._length = length

        # NOTE: Layer is used by `to_delayed` in other collections, but not in normal Delayed use
        self._layer = layer or key
        if isinstance(dsk, HighLevelGraph) and self._layer not in dsk.layers:
>           raise ValueError(
                f"Layer {self._layer} not in the HighLevelGraph's layers: {list(dsk.layers)}"
            )
E           ValueError: Layer finalize_target-8dfa49acbe8cb65c6a2f82ebca37350d not in the HighLevelGraph's layers: ['config', 'cache_input', 'prepare_target', 'store_chunk', 'finalize_target']
```
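The failure comes down to the first lines of `Delayed.__init__` in the traceback: with no `layer` argument, the full key (token included) is used as the layer name, and `finalize_target-<token>` is not one of the graph's layer names. Below is a minimal, dask-free sketch that reproduces just that check; `check_delayed_layer` is a hypothetical helper written for illustration, not part of dask.

```python
def check_delayed_layer(key, layer_names, layer=None):
    """Mimic the layer check from Delayed.__init__ (hypothetical helper, not dask API)."""
    # Without an explicit `layer`, the full key (token included) is used as the layer name.
    resolved = layer or key
    if resolved not in layer_names:
        raise ValueError(
            f"Layer {resolved} not in the HighLevelGraph's layers: {list(layer_names)}"
        )
    return resolved


layer_names = ["config", "cache_input", "prepare_target", "store_chunk", "finalize_target"]
key = "finalize_target-8dfa49acbe8cb65c6a2f82ebca37350d"

try:
    check_delayed_layer(key, layer_names)  # same situation as in the traceback
except ValueError as err:
    print(err)

# Passing the untokenized layer name explicitly satisfies the check.
print(check_delayed_layer(key, layer_names, layer="finalize_target"))
```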
It looks like Dask does not like how we are creating the `Delayed` object. Is this a Dask regression, or do we need to change our code?
@TomAugspurger - any insights here?
About this issue
- State: closed
- Created 2 years ago
- Comments: 19 (19 by maintainers)
Yes, this code was previously silently creating invalid Delayed objects; now doing so is an error.
From skimming the code, I think you could do `Delayed(prev_token[0], hlg, layer=prev_token[0].split("-")[0])`. Or, the more appropriate thing to do might be to use the same names for HLG layers as you’re using for keys. I’m not sure what the purpose is of appending a token to the keys but not to the layer names. Could the layer names just have tokens too?

Seems like that check was added in https://github.com/dask/dask/pull/8452 (cc @gjoseph92).
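Going back to the suggestion of giving the HLG layers the same tokenized names as the keys: here is a minimal, dask-free sketch of that naming scheme. `stage_token` and `tokenized_names` are hypothetical helpers, and md5 over the stage name is only a stand-in for however the executor actually tokenizes its stages (for example with dask's `tokenize`).

```python
import hashlib


def stage_token(stage_name):
    # Hypothetical stand-in for a real tokenizer: a deterministic 32-character md5 hex digest.
    # A real executor would hash whatever inputs actually define the stage.
    return hashlib.md5(stage_name.encode()).hexdigest()


def tokenized_names(stages):
    """Map each stage to one tokenized name, used for BOTH its layer and its key."""
    return {stage: f"{stage}-{stage_token(stage)}" for stage in stages}


stages = ["config", "cache_input", "prepare_target", "store_chunk", "finalize_target"]
names = tokenized_names(stages)

final_key = names["finalize_target"]
layer_names = list(names.values())

# The final key is itself a layer name, so Delayed(final_key, hlg) would not need `layer=`.
print(final_key in layer_names)
```

The design trade-off is that every place that refers to a layer by name must then use the tokenized form, whereas the `layer=` approach leaves the existing layer names untouched.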
Maybe we need a fix similar to https://github.com/dask/dask-ml/pull/898/files#diff-db396642d72db7ff2df3330275627030696e1ae2dbd1972b86b881529cd519b0R131-R134, providing the layer name when creating the `Delayed`?
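Providing the layer name amounts to computing a `layer=` value from the tokenized key and passing it to `Delayed(key, dsk, length=None, layer=None)`, whose signature appears in the traceback. The sketch below assumes keys always have the form `<layer name>-<token>`; `layer_kwargs` is a hypothetical helper. It uses `rsplit("-", 1)` rather than `split("-")[0]`, which gives the same result for these stage names but would also tolerate a hyphen inside a stage name.

```python
def layer_kwargs(key, layer_names):
    """Hypothetical helper: the `layer=` argument to pass alongside a tokenized key.

    Intended use (assumption): Delayed(key, hlg, **layer_kwargs(key, hlg.layers)).
    """
    # Drop only the trailing "-<token>"; rsplit keeps any hyphens inside the stage name.
    layer = key.rsplit("-", 1)[0]
    if layer not in layer_names:
        raise KeyError(f"no layer named {layer!r} for key {key!r}")
    return {"layer": layer}


layer_names = ["config", "cache_input", "prepare_target", "store_chunk", "finalize_target"]
print(layer_kwargs("finalize_target-8dfa49acbe8cb65c6a2f82ebca37350d", layer_names))
# {'layer': 'finalize_target'}
```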