cudf: AttributeError("'DataFrame' object has no attribute 'apply'")

System: Red Hat Enterprise Linux Server release 7.7 (Maipo)
CUDA: 10.2
dask-cuda: 0.19.0
python: 3.7.10
GPU: 4 NVIDIA Titan V-100

I’m trying to use dask_cudf.apply on a dataframe but am getting the following error:

Traceback (most recent call last):
  File "/home/aclifton/anaconda3/envs/dask-cudf/lib/python3.7/site-packages/dask/dataframe/utils.py", line 175, in raise_on_meta_error
    yield
  File "/home/aclifton/anaconda3/envs/dask-cudf/lib/python3.7/site-packages/dask/dataframe/core.py", line 5510, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/home/aclifton/anaconda3/envs/dask-cudf/lib/python3.7/site-packages/dask/utils.py", line 900, in __call__
    return getattr(obj, self.method)(*args, **kwargs)
  File "/home/aclifton/anaconda3/envs/dask-cudf/lib/python3.7/site-packages/cudf/core/dataframe.py", line 668, in __getattr__
    raise AttributeError("'DataFrame' object has no attribute %r" % key)
AttributeError: 'DataFrame' object has no attribute 'apply'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "lcs_dask.py", line 99, in <module>
    new_strs = data_series.apply(process_text, axis=1, args=(run_type,))
  File "/home/aclifton/anaconda3/envs/dask-cudf/lib/python3.7/site-packages/dask/dataframe/core.py", line 4679, in apply
    M.apply, self._meta_nonempty, func, args=args, udf=True, **kwds
  File "/home/aclifton/anaconda3/envs/dask-cudf/lib/python3.7/site-packages/dask/dataframe/core.py", line 5510, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/home/aclifton/anaconda3/envs/dask-cudf/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/aclifton/anaconda3/envs/dask-cudf/lib/python3.7/site-packages/dask/dataframe/utils.py", line 196, in raise_on_meta_error
    raise ValueError(msg) from e
ValueError: Metadata inference failed in `apply`.

You have supplied a custom function and Dask is unable to
determine the type of output that that function returns.

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
AttributeError("'DataFrame' object has no attribute 'apply'")

Traceback:
---------
  File "/home/aclifton/anaconda3/envs/dask-cudf/lib/python3.7/site-packages/dask/dataframe/utils.py", line 175, in raise_on_meta_error
    yield
  File "/home/aclifton/anaconda3/envs/dask-cudf/lib/python3.7/site-packages/dask/dataframe/core.py", line 5510, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/home/aclifton/anaconda3/envs/dask-cudf/lib/python3.7/site-packages/dask/utils.py", line 900, in __call__
    return getattr(obj, self.method)(*args, **kwargs)
  File "/home/aclifton/anaconda3/envs/dask-cudf/lib/python3.7/site-packages/cudf/core/dataframe.py", line 668, in __getattr__
    raise AttributeError("'DataFrame' object has no attribute %r" % key)

Apologies for the newbie question, but am I going about this correctly? Here is a code example:

import subprocess  # used below to obtain the local IP address
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, progress
import dask_cudf
import spacy

if __name__ == "__main__":
    print('loading spacy model...')
    nlp = spacy.load('en_core_web_sm')
    def process_text(text: str, process_method: str):
        if 'no_punct_lower' in process_method:
            doc = nlp(text)
            new_strs_list = []
            proc_list = []
            for token in doc:
                if token.pos_ != 'PUNCT' and token.pos_ != 'SPACE':
                    proc_list.append(token.text.lower())
                new_strs_list.append(' '.join(proc_list))
            return dask_cudf.Series(new_strs_list)
        else:
            return text


    def setup_dask():
        cmd = "hostname --all-ip-addresses"
        process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
        output, error = process.communicate()
        IPADDR = str(output.decode()).split()[0]

        cluster = LocalCUDACluster(ip=IPADDR)
        client = Client(cluster)
        return cluster, client

    data_path = '/path/to/data.csv'

    print('reading data...')
    data_df = dask_cudf.read_csv(data_path, encoding='ISO-8859-1')

    print('setting up gpu_cluster and dask_client...')
    gpu_cluster, dask_client = setup_dask()

    run_types = ['raw', 'raw_no_punct_lower', 'lemma', 'lemma_no_punct_lower']

    for run_type in run_types:
        if 'raw' in run_type:
            data_series = data_df['raw'].to_frame()
        else:
            data_series = data_df['lemmatized'].to_frame()

        new_strs = data_series.apply(process_text, axis=1, args=(run_type,))
        new_strs.compute()

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 17 (9 by maintainers)

Most upvoted comments

If you change your function to take a Series, or a dataframe plus a column name, rather than an individual row, you can combine GPU spaCy with cuDF/Dask via map_partitions.

Something like:

def parser(df, col, batch_size=256):
    spacy.require_gpu()
    nlp = spacy.load("en_core_web_sm")
    docs = nlp.pipe(df[col], batch_size=batch_size)
    out = []
    for doc in docs:
        # do the per-record processing and append to out, e.g.:
        out.append(' '.join(token.text.lower() for token in doc))
    df["out"] = out
    return df

ddf.map_partitions(parser, col="raw")
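For intuition, map_partitions calls the function once per partition (a whole chunk of rows) rather than once per row, which is why parser receives a dataframe instead of a single record. A stdlib-only sketch of that idea, with plain dicts standing in for dataframe partitions (illustration only, not the cuDF/Dask API):

```python
# Sketch of the map_partitions idea: the user function runs once per
# partition (a chunk of rows), not once per row.
def parser(partition, col):
    # Per-partition work: lowercase every string in the given column
    # and store the result in a new "out" column.
    partition["out"] = [s.lower() for s in partition[col]]
    return partition

# Two "partitions", each a dict mapping column name -> list of values.
partitions = [
    {"raw": ["Hello, World!", "FOO bar"]},
    {"raw": ["Another ROW"]},
]

# What ddf.map_partitions(parser, col="raw") does, in spirit:
results = [parser(p, col="raw") for p in partitions]
print(results[0]["out"])  # ['hello, world!', 'foo bar']
```

This is also why the per-row apply approach fails here: the function never sees individual rows, only whole partitions.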

Note that spaCy by default does a lot of computation in nlp.pipe, so you may want to explicitly disable certain pipeline components.

@jakirkham Thank you for transferring this!