luigi: Pickle crashing when trying to pickle "update_tracking_url" in luigi.worker?

This is on Windows with the latest Luigi. I’m trying to run some tasks with --workers 8, but Luigi crashes every time:

INFO: Done scheduling tasks
INFO: Running Worker with 8 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Anaconda\lib\multiprocessing\forking.py", line 380, in main
INFO: Worker Worker(salt=938199001, workers=8, host=ai-fieldtest, username=azureuser, pid=4552) was stopped. Shutting down Keep-Alive thread
    prepare(preparation_data)
  File "C:\Anaconda\lib\multiprocessing\forking.py", line 488, in prepare
    assert main_name not in sys.modules, main_name
ERROR: Uncaught exception in luigi
Traceback (most recent call last):
  File "C:\Anaconda\lib\site-packages\luigi\retcodes.py", line 61, in run_with_retcodes
    worker = luigi.interface._run(argv)['worker']
  File "C:\Anaconda\lib\site-packages\luigi\interface.py", line 237, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "C:\Anaconda\lib\site-packages\luigi\interface.py", line 194, in _schedule_and_run
    success &= w.run()
  File "C:\Anaconda\lib\site-packages\luigi\worker.py", line 862, in run
    self._run_task(task_id)
  File "C:\Anaconda\lib\site-packages\luigi\worker.py", line 693, in _run_task
    p.start()
  File "C:\Anaconda\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Anaconda\lib\multiprocessing\forking.py", line 277, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Anaconda\lib\multiprocessing\forking.py", line 199, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Anaconda\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Anaconda\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Anaconda\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Anaconda\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Anaconda\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Anaconda\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Anaconda\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Anaconda\lib\pickle.py", line 748, in save_global
    (obj, module, name))
PicklingError: Can't pickle <function update_tracking_url at 0x0000000001E100B8>: it's not found as luigi.worker.update_tracking_url
AssertionError: __main__

Any idea how I might get around this?

Edit: It seems pickle (at least before Python 3.4’s qualified-name support) can’t pickle nested functions like update_tracking_url. Since I’m not going to use the hadoop functionality, I just commented it out and it works :\
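
For anyone who hits this, here’s a minimal sketch of the limitation (outer is a made-up name; only update_tracking_url mirrors the traceback):

    import pickle

    def outer():
        # A nested function has no importable module-level name,
        # so pickle can't locate it by name and refuses to serialize it.
        def update_tracking_url(url):
            pass
        return update_tracking_url

    # Raises PicklingError on Python 2 ("it's not found as ...");
    # Python 3 raises a similar "Can't pickle local object" error.
    pickle.dumps(outer())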

About this issue

  • State: closed
  • Created 9 years ago
  • Reactions: 1
  • Comments: 20 (9 by maintainers)

Most upvoted comments

Hello, what’s the status of this bug? I recently got bitten by this error, which forces me to stay on version 1.2.1 of luigi on Windows.

Thanks for the input. I tried two approaches, both of which ended in multiprocessing throwing an error about not being able to pickle thread.lock:

Approach 1 - using functools.partial

I tried this approach before I saw your comment.

In worker.py:

    # (requires `from functools import partial` at the top of worker.py)

    def _update_tracking_url_callback(self, tracking_url, **kwargs):
        task_id = kwargs["task_id"]
        self._scheduler.add_task(
            task_id=task_id,
            worker=self._id,
            status=RUNNING,
            tracking_url=tracking_url,
        )

    def _create_task_process(self, task):
        # Bind the task_id up front so the callback only needs the URL later
        tracking_url_callback = partial(self._update_tracking_url_callback, task_id=task.task_id)
        return TaskProcess(
            task, self._id, self._task_result_queue,
            random_seed=bool(self.worker_processes > 1),
            worker_timeout=self._config.timeout,
            tracking_url_callback=tracking_url_callback,
        )

Approach 2 - function tuples as suggested above

I wasn’t entirely sure what you intended in your comment, so this is how I interpreted it. Apologies if this isn’t what you meant!

In worker.py:

    def _update_tracking_url_callback(self, tracking_url, task_id):
        self._scheduler.add_task(
            task_id=task_id,
            worker=self._id,
            status=RUNNING,
            tracking_url=tracking_url,
        )

    def _create_task_process(self, task):
        # Note the trailing comma: (task.task_id,) is a one-element tuple,
        # whereas (task.task_id) is just a parenthesized value
        tracking_url_callback = (self._update_tracking_url_callback, (task.task_id,))
        return TaskProcess(
            task, self._id, self._task_result_queue,
            random_seed=bool(self.worker_processes > 1),
            worker_timeout=self._config.timeout,
            tracking_url_callback=tracking_url_callback,
        )

In hadoop.py:

    if tracking_url_match:
        tracking_url = tracking_url_match.group('url')
        try:
            # Unpack the (function, args) tuple and invoke it
            tracking_url_callback_func, tracking_url_callback_args = tracking_url_callback
            tracking_url_callback_func(tracking_url, *tracking_url_callback_args)
        except Exception as e:
            logger.error("Error in tracking_url_callback, disabling! %s", e)
            # Keep the (function, args) shape so later calls still unpack cleanly
            tracking_url_callback = (lambda url, *args: None, ())

Both approaches resulted in this error dump:

ERROR: Uncaught exception in luigi
Traceback (most recent call last):
  File "C:\Anaconda\lib\site-packages\luigi\retcodes.py", line 61, in run_with_retcodes
    worker = luigi.interface._run(argv)['worker']
  File "C:\Anaconda\lib\site-packages\luigi\interface.py", line 237, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "C:\Anaconda\lib\site-packages\luigi\interface.py", line 194, in _schedule_and_run
    success &= w.run()
  File "C:\Anaconda\lib\site-packages\luigi\worker.py", line 864, in run
    self._run_task(task_id)
  File "C:\Anaconda\lib\site-packages\luigi\worker.py", line 694, in _run_task
    p.start()
  File "C:\Anaconda\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Anaconda\lib\multiprocessing\forking.py", line 277, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Anaconda\lib\multiprocessing\forking.py", line 199, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Anaconda\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Anaconda\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Anaconda\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Anaconda\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Anaconda\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Anaconda\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Anaconda\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Anaconda\lib\pickle.py", line 548, in save_tuple
    save(element)
  File "C:\Anaconda\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Anaconda\lib\multiprocessing\forking.py", line 67, in dispatcher
    self.save_reduce(obj=obj, *rv)
  File "C:\Anaconda\lib\pickle.py", line 401, in save_reduce
    save(args)
  File "C:\Anaconda\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Anaconda\lib\pickle.py", line 548, in save_tuple
    save(element)
  File "C:\Anaconda\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Anaconda\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Anaconda\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Anaconda\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Anaconda\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Anaconda\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Anaconda\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Anaconda\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Anaconda\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Anaconda\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Anaconda\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Anaconda\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Anaconda\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Anaconda\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Anaconda\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Anaconda\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Anaconda\lib\pickle.py", line 396, in save_reduce
    save(cls)
  File "C:\Anaconda\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Anaconda\lib\pickle.py", line 748, in save_global
    (obj, module, name))
PicklingError: Can't pickle <type 'thread.lock'>: it's not found as thread.lock
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Anaconda\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError

However, commenting out this line, thereby bypassing the pickling of the callback:

    def _create_task_process(self, task):
        tracking_url_callback = partial(self._update_tracking_url_callback, task_id=task.task_id)
        return TaskProcess(
            task, self._id, self._task_result_queue,
            random_seed=bool(self.worker_processes > 1),
            worker_timeout=self._config.timeout,
            # tracking_url_callback=tracking_url_callback,
        )

resolved the error and allowed multiprocessing to work as expected.

I suspect pickle is unable to pickle callback functions the way they’re being used here. I’m not familiar with the inner workings of multiprocessing or pickle, so I’m not sure how much further I can debug this on my own.
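
Here’s a minimal sketch of what I think is happening (FakeWorker and its attributes are made up for illustration): pickling a partial over a bound method also pickles the instance it’s bound to, and the Worker instance holds unpicklable state such as locks:

    import pickle
    import threading
    from functools import partial

    class FakeWorker(object):
        def __init__(self):
            self._lock = threading.Lock()  # stand-in for worker-internal state

        def callback(self, tracking_url, task_id):
            pass

    w = FakeWorker()
    cb = partial(w.callback, task_id='Task_123')
    # Serializing cb drags in w, whose __dict__ holds the lock. On Python 3
    # this raises "TypeError: cannot pickle '_thread.lock' object"; Python 2's
    # multiprocessing pickler hits the same wall, surfacing as the
    # thread.lock PicklingError above.
    pickle.dumps(cb)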

Edit: I wasn’t able to find much relevant material online, but I did find some posts suggesting that pickle should at least be able to pickle partials: https://bugs.python.org/issue5228. Other posts suggest using other libraries (like pathos.multiprocessing), which is probably out of the question.
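
If it helps, one pattern that does survive pickling is a module-level callable carrying only plain data. This is just a sketch of the general technique, not luigi’s actual API; TrackingUrlReporter and its arguments are made up:

    import pickle

    class TrackingUrlReporter(object):
        """Hypothetical picklable callback: holds only plain data, so nothing
        drags the Worker (and its locks) across the process boundary.
        Defined at module level so pickle can find it by name."""

        def __init__(self, scheduler_url, worker_id, task_id):
            self.scheduler_url = scheduler_url  # plain strings pickle fine
            self.worker_id = worker_id
            self.task_id = task_id

        def __call__(self, tracking_url):
            # A real version would rebuild a scheduler client from
            # self.scheduler_url here and call add_task(...); omitted.
            print('would report %s for %s' % (tracking_url, self.task_id))

    cb = pickle.loads(pickle.dumps(
        TrackingUrlReporter('http://localhost:8082', 'worker-1', 'Task_123')))
    cb('http://tracker/app_42')  # round-trips and still works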