dask: Expand not working when splitting string column

Trying to split a string column using a delimiter raises an exception. Here is a minimal example:

df = pd.DataFrame({'channel_hash': ['1&2&3']})
ddf = dd.from_pandas(df, npartitions=1)
ddf.channel_hash.str.split('&', expand=True, n=3)

This fails with the following exception:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-98-fe4c812d7362> in <module>
----> 1 ddf.channel_hash.str.split(pat='&', expand=True, n=3).compute()

~/virtualenvs/statistics3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    173         dask.base.compute
    174         """
--> 175         (result,) = compute(self, traverse=False, **kwargs)
    176         return result
    177 

~/virtualenvs/statistics3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    444     keys = [x.__dask_keys__() for x in collections]
    445     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446     results = schedule(dsk, keys, **kwargs)
    447     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    448 

~/virtualenvs/statistics3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2508                     should_rejoin = False
   2509             try:
-> 2510                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2511             finally:
   2512                 for f in futures.values():

~/virtualenvs/statistics3/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1799                 direct=direct,
   1800                 local_worker=local_worker,
-> 1801                 asynchronous=asynchronous,
   1802             )
   1803 

~/virtualenvs/statistics3/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    762             return future
    763         else:
--> 764             return sync(self.loop, func, *args, **kwargs)
    765 
    766     def __repr__(self):

~/virtualenvs/statistics3/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    329             e.wait(10)
    330     if error[0]:
--> 331         six.reraise(*error[0])
    332     else:
    333         return result[0]

~/virtualenvs/statistics3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/virtualenvs/statistics3/lib/python3.6/site-packages/distributed/utils.py in f()
    314             if timeout is not None:
    315                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 316             result[0] = yield future
    317         except Exception as exc:
    318             error[0] = sys.exc_info()

~/virtualenvs/statistics3/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/virtualenvs/statistics3/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/virtualenvs/statistics3/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1656                             exc = CancelledError(key)
   1657                         else:
-> 1658                             six.reraise(type(exception), exception, traceback)
   1659                         raise exc
   1660                     if errors == "skip":

~/virtualenvs/statistics3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/virtualenvs/statistics3/lib/python3.6/site-packages/dask/optimization.py in __call__()
   1057         if not len(args) == len(self.inkeys):
   1058             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
-> 1059         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
   1060 
   1061     def __reduce__(self):

~/virtualenvs/statistics3/lib/python3.6/site-packages/dask/core.py in get()
    147     for key in toposort(dsk):
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result
    151     result = _execute_task(out, cache)

~/virtualenvs/statistics3/lib/python3.6/site-packages/dask/core.py in _execute_task()
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~/virtualenvs/statistics3/lib/python3.6/site-packages/dask/compatibility.py in apply()
    105     def apply(func, args, kwargs=None):
    106         if kwargs:
--> 107             return func(*args, **kwargs)
    108         else:
    109             return func(*args)

~/virtualenvs/statistics3/lib/python3.6/site-packages/dask/dataframe/core.py in apply_and_enforce()
   4780             ):
   4781                 raise ValueError(
-> 4782                     "The columns in the computed data do not match"
   4783                     " the columns in the provided metadata"
   4784                 )

ValueError: The columns in the computed data do not match the columns in the provided metadata

I also tried splitting without expansion, but it still raises the following exception:

---------------------------------------------------------------------------
UnboundLocalError                         Traceback (most recent call last)
<ipython-input-99-a5b359767346> in <module>
----> 1 ddf.channel_hash.str.split(pat='&' ).compute()

~/virtualenvs/statistics3/lib/python3.6/site-packages/dask/dataframe/accessor.py in split(self, pat, n, expand)
    148                 meta = type(self._series._meta)([" ".join(["a"] * 2 * n)])
    149                 meta = meta.str.split(n=n, expand=expand, pat=pat)
--> 150         return self._function_map("split", pat=pat, n=n, expand=expand, meta=meta)
    151 
    152     @derived_from(pd.core.strings.StringMethods)

UnboundLocalError: local variable 'meta' referenced before assignment

Python version: 3.6.8 Dask-version: 2.0.0

About this issue

  • Original URL
  • State: open
  • Created 5 years ago
  • Comments: 17 (14 by maintainers)

Most upvoted comments

Hey, is there a current workaround for this? @avlahop I need to split a string in a pandas series for which I know the maximum number of occurrences of the delimiter.