airflow: example_python_operator not working

Hi,

I’m still testing Airflow and still can’t get all of the examples running correctly. The tutorial example runs, but now I tried the following:

airflow backfill example_python_operator -s 2015-07-25

Note: this is a basic install. I did update models.py according to https://github.com/airbnb/airflow/commit/97c98b12f517b06c21f834841cb5cf4c0b59255d, then dropped the database and ran airflow initdb.

The worker (started with “airflow worker”) gave the following errors:


[2015-07-31 02:43:26,993: WARNING/MainProcess] celery@BNA001082-SRV01 ready.
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 10, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 100, in run
    DagPickle).filter(DagPickle.id == args.pickle).first()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2445, in first
    ret = list(self[0:1])
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2281, in __getitem__
    return list(res)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 86, in instances
    util.raise_from_cause(err)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 71, in instances
    rows = [proc(row) for row in fetch]
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 428, in _instance
    loaded_instance, populate_existing, populators)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 486, in _populate_full
    dict_[key] = getter(row)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/sqltypes.py", line 1261, in process
    return loads(value)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 209, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 199, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 353, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named unusual_prefix_example_python_operator

[2015-07-31 02:26:16,474: ERROR/Worker-2] 1
[2015-07-31 02:26:16,502: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[c788e876-86bf-4be3-87b3-7276b7a4b4c8] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 42, in execute_command
    raise AirflowException('Celery command failed')
AirflowException: Celery command failed

What I also see is that the message broker is not really cleaned up after such errors; messages keep piling up.

Most upvoted comments

Hi guys, after some digging I think this issue is related to the pickler used in Airflow, dill. In general, dill is not able to unpickle modules.

In short, if you want Airflow to be able to execute a specific task on a remote box, you have to put the ‘unusual_prefix_…’ module on the PYTHONPATH (the short sketch after the list below illustrates why). There are two ways you can do that:

  1. Copy the whole $AIRFLOW_HOME/dags directory to the remote box and start the task instance with the ‘-sd’ argument, i.e. 'airflow run example_dag … -sd '. That way Airflow scans the provided subdir and imports the DAG modules under their ‘unusual_prefix_**’ names, so the pickling feature of Airflow is effectively not used at all.
  2. Ask the author of dill to add module-serialization support. The related issue is here.
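
To make the failure concrete, here is a minimal sketch (my own example, not Airflow code; the module body and function name are made up) of why dill blows up on the worker: the function is pickled by reference, so the unpickling side has to be able to import the ‘unusual_prefix_*’ module again, which it cannot unless that module is on the PYTHONPATH.

import sys
import types

import dill

# Simulate a DAG file that Airflow has loaded under a mangled module name.
mod = types.ModuleType('unusual_prefix_example_python_operator')
exec("def print_context():\n    print('hello from the DAG')", mod.__dict__)
sys.modules[mod.__name__] = mod

# dill stores the function by reference (module name + attribute name),
# because the module is importable in this process.
payload = dill.dumps(mod.print_context)

# Simulate the remote worker, where that module is not importable.
del sys.modules[mod.__name__]
dill.loads(payload)  # ImportError: No module named unusual_prefix_example_python_operator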

Hi @mistercrunch, could you please give any hints on this?

The pickle is the mechanism by which your local DAG gets serialized and shipped to the worker. Pickling is the common term for serializing in Python.

Airflow attempts to pickle your DAG, ships it to the metadata database, and gets an id associated with it. This id is passed around to the workers so that they pick up the pickle, deserialize it and run it.
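
As a rough illustration of that flow (just a sketch with sqlite3 and dill, not Airflow’s actual DagPickle schema or API): the scheduler side stores the pickle and keeps only its id, and the worker side fetches the blob back by that id and deserializes it.

import sqlite3

import dill

class Dag(object):
    def __init__(self, dag_id):
        self.dag_id = dag_id

db = sqlite3.connect(':memory:')
db.execute('CREATE TABLE dag_pickle (id INTEGER PRIMARY KEY, pickle BLOB)')

# "Scheduler" side: serialize the DAG, store it, keep only the id.
cur = db.execute('INSERT INTO dag_pickle (pickle) VALUES (?)',
                 (sqlite3.Binary(dill.dumps(Dag('example_python_operator'))),))
pickle_id = cur.lastrowid  # this id is what gets handed to the worker

# "Worker" side: look the pickle up by id and deserialize it.
(blob,) = db.execute('SELECT pickle FROM dag_pickle WHERE id = ?',
                     (pickle_id,)).fetchone()
dag = dill.loads(bytes(blob))
print(dag.dag_id)  # example_python_operator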

unusual_prefix_ is a hack to avoid conflicts in sys.modules as Airflow attempts to refresh your DagBag from the underlying changing code.
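
For what it’s worth, the trick looks roughly like this (a sketch, not the actual DagBag code; the function name is mine): the DAG file gets imported under a mangled name, so the entry it creates in sys.modules can’t conflict with anything else while files are re-parsed, and that mangled name is exactly what ends up referenced inside the pickle.

import imp
import os

def load_dag_file(filepath):
    base = os.path.splitext(os.path.basename(filepath))[0]
    mod_name = 'unusual_prefix_' + base  # e.g. unusual_prefix_example_python_operator
    # imp.load_source compiles the file and registers it in sys.modules under
    # mod_name, which is the name the remote worker later has to be able to
    # import when it unpickles the DAG.
    return imp.load_source(mod_name, filepath)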

For now, until I manage to recreate the bug and fix it, I’d say use --local when running backfills. Is the scheduler working OK for you?