airflow: example_python_operator not working
Hi,
I’m still testing Airflow and still can’t get all the examples running correctly. The tutorial example runs, but now I tried the following:
airflow backfill example_python_operator -s 2015-07-25
Note: this is the basic install. I did update models.py according to https://github.com/airbnb/airflow/commit/97c98b12f517b06c21f834841cb5cf4c0b59255d, then dropped the database and ran airflow initdb.
The worker (started with “airflow worker”) gave the following errors:
[2015-07-31 02:43:26,993: WARNING/MainProcess] celery@BNA001082-SRV01 ready.
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 10, in <module>
args.func(args)
File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 100, in run
DagPickle).filter(DagPickle.id == args.pickle).first()
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2445, in first
ret = list(self[0:1])
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2281, in __getitem__
return list(res)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 86, in instances
util.raise_from_cause(err)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 71, in instances
rows = [proc(row) for row in fetch]
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 428, in _instance
loaded_instance, populate_existing, populators)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/loading.py", line 486, in _populate_full
dict_[key] = getter(row)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/sqltypes.py", line 1261, in process
return loads(value)
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 209, in loads
return load(file)
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 199, in load
obj = pik.load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
klass = self.find_class(module, name)
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 353, in find_class
return StockUnpickler.find_class(self, module, name)
File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
__import__(module)
ImportError: No module named unusual_prefix_example_python_operator
[2015-07-31 02:26:16,474: ERROR/Worker-2] 1
[2015-07-31 02:26:16,502: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[c788e876-86bf-4be3-87b3-7276b7a4b4c8] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 42, in execute_command
raise AirflowException('Celery command failed')
AirflowException: Celery command failed
What I also see is that the message broker is not really cleaned up after such errors; messages keep piling up.
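(A way to clear them by hand might be Celery’s purge command, assuming the broker is the one behind Airflow’s default Celery app module: celery -A airflow.executors.celery_executor purge. Not sure if that’s the intended approach, though.)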
Hi guys, after some digging, I guess this issue is related to the pickler used in Airflow, dill. Generally, dill is not able to unpickle modules.
In short, if you want Airflow to be able to execute a specific task on a remote box, you have to put the ‘unusual_prefix_…’ module on the PYTHONPATH. There are two ways you can do that:
Hi, @mistercrunch, could you please give any hints on this?
The pickle is the mechanism by which your local DAG gets serialized and shipped to the worker. Pickling is the common term for serializing in Python.
Airflow attempts to pickle your DAG, ships it to the metadata database, and gets an id associated with it. This id is passed around to the workers so that they pick up the pickle, deserialize it, and run it.
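To illustrate with a standalone sketch (this is not Airflow’s actual code, just the same mechanism reproduced with dill directly, which is the pickler DagPickle uses per the traceback above):

import imp
import os
import sys
import tempfile

import dill  # the pickler Airflow uses for DagPickle

# Write a tiny stand-in for a DAG file and import it under a synthesized
# module name, roughly what DagBag does to avoid clashes in sys.modules.
path = os.path.join(tempfile.mkdtemp(), 'example_python_operator.py')
with open(path, 'w') as f:
    f.write("def hello():\n    return 'hello'\n")
mod = imp.load_source('unusual_prefix_example_python_operator', path)

# dill stores the function *by reference* (module name + attribute name),
# not by value, because it was defined in an importable module.
payload = dill.dumps(mod.hello)

# Simulate a worker box that cannot import that module.
del sys.modules['unusual_prefix_example_python_operator']
os.remove(path)
try:
    dill.loads(payload)
except ImportError as e:
    # e.g. "No module named unusual_prefix_example_python_operator",
    # the same error the Celery worker hits above.
    print(e)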
unusual_prefix_ is a hack to avoid conflicts in sys.modules as Airflow attempts to refresh your DagBag from the underlying changing code. For now, and until I manage to recreate the bug and fix it, I’d say use --local when running backfills. Is the scheduler working ok for you?
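For the backfill above, that would look something like:
airflow backfill example_python_operator -s 2015-07-25 --local
The --local flag runs the tasks with the LocalExecutor on the machine issuing the command instead of routing them through the Celery workers.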