airflow: provide_context=True not working with PythonVirtualenvOperator

Apache Airflow version: 1.10.9

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.14.8

Environment: Docker (Ubuntu 18.4 - Python 3.7)

  • Cloud provider or hardware configuration: Azure
  • OS (e.g. from /etc/os-release): Docker (Ubuntu 18.4 - Python 3.7)
  • Kernel (e.g. uname -a): Docker (Ubuntu 18.4 - Python 3.7)
  • Install tools: N/A
  • Others: N/A What happened:

When we enable provide_context=True for CustomPythonVirtualenvOperator we get the error below.

[2020-04-07 15:08:51,940] {taskinstance.py:1128} ERROR - can't pickle module objects
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 297, in execute_callable
    self._write_args(input_filename)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 339, in _write_args
    pickle.dump(arg_dict, f)
TypeError: can't pickle module objects

One way to get around this issue is to create your own CustomPythonVirtualenvOperator and overwrite _write_args, but this should not be the case. Feel free to use this if you’re encountering the same issue:

class CustomPythonVirtualenvOperator(PythonVirtualenvOperator):
    def _write_args(self, input_filename):
        # serialize args to file
        if self._pass_op_args():
            with open(input_filename, 'wb') as f:
                # we only need dag_run to access conf at run time
                arg_dict = ({'args': self.op_args, 'kwargs': {'dag_run': self.op_kwargs['dag_run']}})
                if self.use_dill:
                    dill.dump(arg_dict, f)
                else:
                    pickle.dump(arg_dict, f)

What you expected to happen:

Ideally we should be able to use the context so we can run these tasks with run-time arguments via the CLI or the REST API.

How to reproduce it:

from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow import DAG
import pickle
import dill

default_args = {
    'owner': 'Luis M',
    'depends_on_past': False,
    'start_date': days_ago(0),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue'
}
dag = DAG(
    'bug',
    default_args=default_args,
    description='bug',
    schedule_interval=timedelta(days=1))


class CustomPythonVirtualenvOperator(PythonVirtualenvOperator):
    def _write_args(self, input_filename):
        # serialize args to file
        if self._pass_op_args():
            with open(input_filename, 'wb') as f:
                arg_dict = ({'args': self.op_args, 'kwargs': {'dag_run': self.op_kwargs['dag_run']}})
                if self.use_dill:
                    dill.dump(arg_dict, f)
                else:
                    pickle.dump(arg_dict, f)


def passf(**kwargs):
    pass

def failf(**kwargs):
    pass
 
task1 = CustomPythonVirtualenvOperator(
        task_id='task1',
        python_callable=passf,
        python_version='3',
        dag=dag,
        provide_context=True
)

task2 = PythonVirtualenvOperator(
        task_id='task2',
        python_callable=failf,
        python_version='3',
        dag=dag,
        provide_context=True
)

Anything else we need to know:

If you run the DAG provided you should see task1 passing and task2 failing.

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 18 (10 by maintainers)

Most upvoted comments

Hi,

Was this bug fixed in Airflow version 2?

@mik-laj Hi Kamil - I created the PR #8256 to fix this issue on the branch v1-10-stable and the CI tests are passing, there seems to be an issue with requirements, but I that’s related to this change. Could you let me know the next steps?

There were too many changes on the master branch, I will revisit this bug there once 2.0 is out.

Can you post some details of your methods/functions? Imports /logs ? I’d be curious if we can improve the error message to tell exactly what’s wrong.

Hi, facing the same issue with airflow 2.1.2

Hi @jatejeda Please check my github, I used the PythonVirtualenvOperator in a personal project using the version 2.

https://github.com/Wittline/uber-expenses-tracking

no, it wasn’t fixed … I have the same bug in 2.0.2

Hi,

Was this bug fixed in Airflow version 2?

Yeah thanks, will include it, planning to cut 1.10.12rc2 later tonight

Is it right that this issue is not going to be fixed in 1.10.x?

As a workaround one could use dill=True, it is able to serialize modules.

This problem seems indeed significant. I wonder if it appears in the master version. Have you tried to check which objects in the context are causing the problem? Maybe we can exclude one or two objects to restore the correct behavior of this option in Airflow 1.10.