dask: da.map_blocks with drop_axis stopped working in dask 1.1.4

Hi!

First and foremost, a big thank you for the amazing work you’re doing on dask! It’s a fantastic piece of software!

We’re using dask as the parallelization engine for a large-scale data-analysis package (still in early development). Our computational routines work on independent chunks of one big source dataset and then write to a single file (accessible to all workers). We used map_blocks for this in dask 1.0.0 and things worked great, but unfortunately the same code no longer works in dask 1.1.4. Here’s an MCVE:

import dask
import dask.array as da
import numpy as np

# in reality used to write `chk` to a (pre-formatted) file
def da_dummy(chk, block_info=None):
    idx = block_info[0]["chunk-location"][-1]
    #
    # use `idx` to store `chk` at correct location
    # 
    return idx

np_a = np.ones((10,50*20))
a = da.from_array(np_a, chunks=(10,20))

# works in dask 1.0.0, raises an IndexError in dask 1.1.4
result = a.map_blocks(da_dummy, dtype="int", chunks=(1,), drop_axis=[0])
res = result.compute()

In dask 1.0.0 this snippet produces an array `res` that looks like this:

array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
       34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49])

In dask 1.1.4 the same code triggers an `IndexError: index 2 is out of bounds for axis 0 with size 2`; here is the full traceback:

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
~/Documents/job/dask_fail.py in <module>
     15 
     16 # works in dask 1.0.0, raises an IndexError in dask 1.1.4
---> 17 result = a.map_blocks(da_dummy, dtype="int", chunks=(1,), drop_axis=[0])
     18 res = result.compute()
     19 

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in map_blocks(self, func, *args, **kwargs)
   1569     @wraps(map_blocks)
   1570     def map_blocks(self, func, *args, **kwargs):
-> 1571         return map_blocks(func, self, *args, **kwargs)
   1572 
   1573     def map_overlap(self, func, depth, boundary=None, trim=True, **kwargs):

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in map_blocks(func, *args, **kwargs)
    534                                            for ij, j in enumerate(old_k[1:])],
    535                         'chunk-location': old_k[1:]}
--> 536                     for i in shapes}
    537 
    538             v = copy.copy(v)  # Need to copy and unpack subgraph callable

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in <dictcomp>(.0)
    534                                            for ij, j in enumerate(old_k[1:])],
    535                         'chunk-location': old_k[1:]}
--> 536                     for i in shapes}
    537 
    538             v = copy.copy(v)  # Need to copy and unpack subgraph callable

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in <listcomp>(.0)
    532                         'num-chunks': num_chunks[i],
    533                         'array-location': [(starts[i][ij][j], starts[i][ij][j + 1])
--> 534                                            for ij, j in enumerate(old_k[1:])],
    535                         'chunk-location': old_k[1:]}
    536                     for i in shapes}

IndexError: index 2 is out of bounds for axis 0 with size 2
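
Digging into the traceback, this looks like an axis misalignment introduced by drop_axis: the output key `old_k` is one-dimensional after the axis is dropped, but `starts` still describes both axes of the input, so the chunk index along the surviving axis gets looked up in axis 0's chunk boundaries. Here is a dask-free sketch of my reading of the failing list comprehension (reconstructed from the traceback, not dask's actual code):

```python
import numpy as np

# chunk boundaries per *input* axis: axis 0 has one 10-row chunk,
# axis 1 has fifty 20-column chunks (cf. chunks=(10, 20) above)
starts = [np.array([0, 10]), np.arange(0, 1001, 20)]

# output key for the third block; only one chunk index left after drop_axis=[0]
old_k = ("da_dummy-deadbeef", 2)

# enumerate() pairs each surviving chunk index with input axis `ij`, so the
# chunk index 2 along the surviving axis is looked up in axis 0's boundaries,
# which only have 2 entries -- hence the IndexError
try:
    [(starts[ij][j], starts[ij][j + 1]) for ij, j in enumerate(old_k[1:])]
except IndexError as err:
    print(err)  # index 2 is out of bounds for axis 0 with size 2
```

This would explain why the error only shows up once there are more chunks along the surviving axis than along the dropped one.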

A (desperate) attempt to work around this was to avoid drop_axis altogether, leave the chunk size as is, and return a bogus tuple instead, but no luck either:

import dask
import dask.array as da
import numpy as np

np_a = np.ones((10,50*20))
a = da.from_array(np_a, chunks=(10,20))

# desperate workaround for dask 1.1.4 - still not working
def da_dummy2(chk, block_info=None):
    print(block_info) # show what's going on in here...
    idx = block_info[0]["chunk-location"][-1]
    return (idx, 1)
result = a.map_blocks(da_dummy2, dtype="int", chunks=(1, 1))
res = result.compute()

This raises an `IndexError: tuple index out of range`; the full traceback:

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
~/Documents/job/dask_fail.py in <module>
     20     return (idx, 1)
     21 result = a.map_blocks(da_dummy2, dtype="int", chunks=(1, 1))
---> 22 res = result.compute()
     23 

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
    398     results = schedule(dsk, keys, **kwargs)
--> 399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 
    401 

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/base.py in <listcomp>(.0)
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
    398     results = schedule(dsk, keys, **kwargs)
--> 399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 
    401 

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in finalize(results)
    777     while isinstance(results2, (tuple, list)):
    778         if len(results2) > 1:
--> 779             return concatenate3(results)
    780         else:
    781             results2 = results2[0]

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in concatenate3(arrays)
   3495     if not ndim:
   3496         return arrays
-> 3497     chunks = chunks_from_arrays(arrays)
   3498     shape = tuple(map(sum, chunks))
   3499 

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in chunks_from_arrays(arrays)
   3325 
   3326     while isinstance(arrays, (list, tuple)):
-> 3327         result.append(tuple([shape(deepfirst(a))[dim] for a in arrays]))
   3328         arrays = arrays[0]
   3329         dim += 1

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in <listcomp>(.0)
   3325 
   3326     while isinstance(arrays, (list, tuple)):
-> 3327         result.append(tuple([shape(deepfirst(a))[dim] for a in arrays]))
   3328         arrays = arrays[0]
   3329         dim += 1

IndexError: tuple index out of range
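
Incidentally, this second error looks unrelated to drop_axis: da_dummy2 returns a plain Python tuple, so when dask finalizes the result, concatenate3 descends into nested tuples and runs out of dimensions. Returning a proper (1, 1) ndarray matching chunks=(1, 1) should at least sidestep that part (a sketch only, not verified against dask 1.1.4):

```python
import numpy as np

def da_dummy2_fixed(chk, block_info=None):
    idx = block_info[0]["chunk-location"][-1]
    # return a (1, 1) ndarray so each output block matches chunks=(1, 1),
    # instead of a bare tuple that concatenate3 cannot descend into
    return np.array([[idx]])

# quick dask-free check with a hand-built block_info dict
out = da_dummy2_fixed(np.ones((10, 20)),
                      block_info={0: {"chunk-location": (0, 3)}})
print(out.shape, out[0, 0])  # (1, 1) 3
```

That said, even with an ndarray return value the original drop_axis problem above remains.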

Any help with this is greatly appreciated. Thank you!

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Comments: 23 (17 by maintainers)

Most upvoted comments

@jcrist @jakirkham if either of you have time can I ask you to look into this?

On Wed, Apr 3, 2019 at 5:20 AM Stefan Fuertinger notifications@github.com wrote:

Hi Tom!

I was wondering if you already had a chance to further investigate the root cause of the problem? Thank you!

— Reply to this email directly or view it on GitHub: https://github.com/dask/dask/issues/4584#issuecomment-479429833