dask: dropna() is not working properly

I’m trying to drop NA values using dask, but I’m not able to do it. I keep getting the same error message. I just updated to the latest versions of pandas and dask, which were released earlier today.

Information for reproducing the error.

  • OS: Microsoft Windows 10 Pro
  • OS Version: 10.0.17763
  • dask: 2.10.1
  • pandas: 1.0.0

Get the data from www.kaggle.com/new-york-city/nyc-parking-tickets

import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import numpy as np

# Explicit column dtypes for dd.read_csv. The builtin ``str`` replaces the
# deprecated ``np.str`` alias (an alias for the same builtin, deprecated in
# NumPy 1.20 and removed in 1.24), so loading behavior is unchanged while the
# script keeps working on current NumPy releases.
dtypes = {
    'Date First Observed': str,
    'Days Parking In Effect    ': str,
    'Double Parking Violation': str,
    'Feet From Curb': np.float32,
    'From Hours In Effect': str,
    'House Number': str,
    'Hydrant Violation': str,
    'Intersecting Street': str,
    'Issue Date': str,
    'Issuer Code': np.float32,
    'Issuer Command': str,
    'Issuer Precinct': np.float32,
    'Issuer Squad': str,
    'Issuing Agency': str,
    'Law Section': np.float32,
    'Meter Number': str,
    'No Standing or Stopping Violation': str,
    'Plate ID': str,
    'Plate Type': str,
    'Registration State': str,
    'Street Code1': np.uint32,
    'Street Code2': np.uint32,
    'Street Code3': np.uint32,
    'Street Name': str,
    'Sub Division': str,
    'Summons Number': np.uint32,
    'Time First Observed': str,
    'To Hours In Effect': str,
    'Unregistered Vehicle?': str,
    'Vehicle Body Type': str,
    'Vehicle Color': str,
    'Vehicle Expiration Date': str,
    'Vehicle Make': str,
    'Vehicle Year': np.float32,
    'Violation Code': np.uint16,
    'Violation County': str,
    'Violation Description': str,
    'Violation In Front Of Or Opposite': str,
    'Violation Legal Code': str,
    'Violation Location': str,
    'Violation Post Code': str,
    'Violation Precinct': np.float32,
    'Violation Time': str
}

# Normalise a raw header like 'Street Code1' to lower snake_case.
def _snake(col):
    return col.lower().strip().replace(' ', '_')

# Load every CSV in the working directory with the explicit dtypes above,
# restricting to exactly those columns, then normalise the column names.
nyc_data_raw = dd.read_csv('*.csv', dtype=dtypes, usecols=dtypes.keys()).rename(columns=_snake)
nyc_data_raw.head()

# Per-column null counts (lazy until .compute()).
missing_values = nyc_data_raw.isnull().sum()

with ProgressBar():
    percent_missing = (100 * missing_values / nyc_data_raw.index.size).compute()

# Columns that are at least half empty are dropped outright.
columns_to_drop = percent_missing[percent_missing >= 50].index.tolist()
nyc_data_clean_stage1 = nyc_data_raw.drop(columns_to_drop, axis=1)

# Impute vehicle_color with its modal value.
with ProgressBar():
    count_of_vehicle_colors = nyc_data_clean_stage1['vehicle_color'].value_counts().compute()

most_common_color = count_of_vehicle_colors.sort_values(ascending=False).index[0]
nyc_data_clean_stage2 = nyc_data_clean_stage1.fillna({'vehicle_color': most_common_color})

# For columns with only a little missing data (0-5%), drop the affected rows.
rows_to_drop = percent_missing[(percent_missing > 0) & (percent_missing < 5)].index.tolist()
nyc_data_clean_stage3 = nyc_data_clean_stage2.dropna(subset=rows_to_drop)

nyc_data_clean_stage3.head()

Error:

TypeError                                 Traceback (most recent call last)
<ipython-input-19-c4ac39ee4122> in <module>
----> 1 nyc_data_clean_stage3.head()

~\Anaconda3\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute)
    969             Whether to compute the result, default is True.
    970         """
--> 971         return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
    972 
    973     def _head(self, n, npartitions, compute, safe):

~\Anaconda3\lib\site-packages\dask\dataframe\core.py in _head(self, n, npartitions, compute, safe)
   1002 
   1003         if compute:
-> 1004             result = result.compute()
   1005         return result
   1006 

~\Anaconda3\lib\site-packages\dask\base.py in compute(self, **kwargs)
    163         dask.base.compute
    164         """
--> 165         (result,) = compute(self, traverse=False, **kwargs)
    166         return result
    167 

~\Anaconda3\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    434     keys = [x.__dask_keys__() for x in collections]
    435     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436     results = schedule(dsk, keys, **kwargs)
    437     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    438 

~\Anaconda3\lib\site-packages\dask\threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     79         get_id=_thread_get_id,
     80         pack_exception=pack_exception,
---> 81         **kwargs
     82     )
     83 

~\Anaconda3\lib\site-packages\dask\local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    484                         _execute_task(task, data)  # Re-execute locally
    485                     else:
--> 486                         raise_exception(exc, tb)
    487                 res, worker_id = loads(res_info)
    488                 state["cache"][key] = res

~\Anaconda3\lib\site-packages\dask\local.py in reraise(exc, tb)
    314     if exc.__traceback__ is not tb:
    315         raise exc.with_traceback(tb)
--> 316     raise exc
    317 
    318 

~\Anaconda3\lib\site-packages\dask\local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

~\Anaconda3\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

~\Anaconda3\lib\site-packages\dask\core.py in <listcomp>(.0)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

~\Anaconda3\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~\Anaconda3\lib\site-packages\dask\optimization.py in __call__(self, *args)
    980         if not len(args) == len(self.inkeys):
    981             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 982         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    983 
    984     def __reduce__(self):

~\Anaconda3\lib\site-packages\dask\core.py in get(dsk, out, cache)
    147     for key in toposort(dsk):
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result
    151     result = _execute_task(out, cache)

~\Anaconda3\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~\Anaconda3\lib\site-packages\dask\utils.py in apply(func, args, kwargs)
     27 def apply(func, args, kwargs=None):
     28     if kwargs:
---> 29         return func(*args, **kwargs)
     30     else:
     31         return func(*args)

TypeError: apply() got an unexpected keyword argument 'how'

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 19 (13 by maintainers)

Commits related to this issue

Most upvoted comments

I’m glad to hear it. Thanks!