airflow: Can't pass in **context kwarg to the PythonVirtualenvOperator's python_callable

Apache Airflow version

2.2.3 (latest released)

What happened

The PythonVirtualenvOperator guide states that to access context variables you need to pass system_site_packages=True to the operator. However, passing **context to the operator's python_callable fails with:

[2022-01-19, 23:16:17 UTC] {process_utils.py:160} INFO - Executing cmd: /usr/local/bin/python -m virtualenv /tmp/venvwp_95bpf --system-site-packages
[2022-01-19, 23:16:17 UTC] {process_utils.py:164} INFO - Output:
[2022-01-19, 23:16:18 UTC] {process_utils.py:168} INFO - created virtual environment CPython3.9.9.final.0-64 in 563ms
[2022-01-19, 23:16:18 UTC] {process_utils.py:168} INFO -   creator CPython3Posix(dest=/tmp/venvwp_95bpf, clear=False, no_vcs_ignore=False, global=True)
[2022-01-19, 23:16:18 UTC] {process_utils.py:168} INFO -   seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/home/astro/.local/share/virtualenv)
[2022-01-19, 23:16:18 UTC] {process_utils.py:168} INFO -     added seed packages: pip==21.3.1, setuptools==58.3.0, wheel==0.37.0
[2022-01-19, 23:16:18 UTC] {process_utils.py:168} INFO -   activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
[2022-01-19, 23:16:18 UTC] {logging_mixin.py:109} WARNING - /usr/local/lib/python3.9/site-packages/airflow/utils/context.py:152 AirflowContextDeprecationWarning: Accessing 'execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_start' or 'logical_date' instead.
[2022-01-19, 23:16:18 UTC] {logging_mixin.py:109} WARNING - /usr/local/lib/python3.9/site-packages/airflow/utils/context.py:152 AirflowContextDeprecationWarning: Accessing 'next_ds' from the template is deprecated and will be removed in a future version. Please use '{{ data_interval_end | ds }}' instead.
[2022-01-19, 23:16:18 UTC] {logging_mixin.py:109} WARNING - /usr/local/lib/python3.9/site-packages/airflow/utils/context.py:152 AirflowContextDeprecationWarning: Accessing 'next_ds_nodash' from the template is deprecated and will be removed in a future version. Please use '{{ data_interval_end | ds_nodash }}' instead.
[2022-01-19, 23:16:18 UTC] {logging_mixin.py:109} WARNING - /usr/local/lib/python3.9/site-packages/airflow/utils/context.py:152 AirflowContextDeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
[2022-01-19, 23:16:18 UTC] {logging_mixin.py:109} WARNING - /usr/local/lib/python3.9/site-packages/airflow/utils/context.py:152 AirflowContextDeprecationWarning: Accessing 'prev_ds' from the template is deprecated and will be removed in a future version.
[2022-01-19, 23:16:18 UTC] {logging_mixin.py:109} WARNING - /usr/local/lib/python3.9/site-packages/airflow/utils/context.py:152 AirflowContextDeprecationWarning: Accessing 'prev_ds_nodash' from the template is deprecated and will be removed in a future version.
[2022-01-19, 23:16:18 UTC] {logging_mixin.py:109} WARNING - /usr/local/lib/python3.9/site-packages/airflow/utils/context.py:152 AirflowContextDeprecationWarning: Accessing 'prev_execution_date' from the template is deprecated and will be removed in a future version.
[2022-01-19, 23:16:18 UTC] {logging_mixin.py:109} WARNING - /usr/local/lib/python3.9/site-packages/airflow/utils/context.py:152 AirflowContextDeprecationWarning: Accessing 'prev_execution_date_success' from the template is deprecated and will be removed in a future version. Please use 'prev_data_interval_start_success' instead.
[2022-01-19, 23:16:18 UTC] {logging_mixin.py:109} WARNING - /usr/local/lib/python3.9/site-packages/airflow/utils/context.py:152 AirflowContextDeprecationWarning: Accessing 'tomorrow_ds' from the template is deprecated and will be removed in a future version.
[2022-01-19, 23:16:18 UTC] {logging_mixin.py:109} WARNING - /usr/local/lib/python3.9/site-packages/airflow/utils/context.py:152 AirflowContextDeprecationWarning: Accessing 'tomorrow_ds_nodash' from the template is deprecated and will be removed in a future version.
[2022-01-19, 23:16:18 UTC] {logging_mixin.py:109} WARNING - /usr/local/lib/python3.9/site-packages/airflow/utils/context.py:152 AirflowContextDeprecationWarning: Accessing 'yesterday_ds' from the template is deprecated and will be removed in a future version.
[2022-01-19, 23:16:18 UTC] {logging_mixin.py:109} WARNING - /usr/local/lib/python3.9/site-packages/airflow/utils/context.py:152 AirflowContextDeprecationWarning: Accessing 'yesterday_ds_nodash' from the template is deprecated and will be removed in a future version.
[2022-01-19, 23:16:18 UTC] {taskinstance.py:1700} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1329, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1455, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1511, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 393, in execute
    return super().execute(context=serializable_context)
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 174, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 412, in execute_callable
    self._write_args(input_filename)
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 448, in _write_args
    self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
TypeError: cannot pickle 'module' object
[2022-01-19, 23:16:18 UTC] {taskinstance.py:1267} INFO - Marking task as FAILED. dag_id=venv_op_not_accepting_context_kwarg, task_id=test, execution_date=20220119T231616, start_date=20220119T231617, end_date=20220119T231618
[2022-01-19, 23:16:18 UTC] {standard_task_runner.py:89} ERROR - Failed to execute job 36 for task test
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
    ti._run_raw_task(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1329, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1455, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1511, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 393, in execute
    return super().execute(context=serializable_context)
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 174, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 412, in execute_callable
    self._write_args(input_filename)
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 448, in _write_args
    self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
TypeError: cannot pickle 'module' object
[2022-01-19, 23:16:18 UTC] {local_task_job.py:154} INFO - Task exited with return code 1
[2022-01-19, 23:16:18 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
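The TypeError can be reproduced without Airflow: the rendered context contains module objects (the `macros` entry, for example), and the stdlib pickler refuses modules. A minimal sketch, where `fake_context` only stands in for the real Airflow context:

```python
import pickle
import types

# The rendered Airflow context includes module objects (e.g. the `macros`
# entry), and the stdlib pickler refuses module objects. The dict below
# only stands in for the real context; `types` stands in for any module.
fake_context = {"ds": "2022-01-19", "macros": types}

err = None
try:
    pickle.dumps({"args": (), "kwargs": fake_context})
except TypeError as exc:
    err = exc
print(err)  # cannot pickle 'module' object
```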

What you expected to happen

I expected no traceback, since the callable's body is just pass.

How to reproduce

from airflow.models import DAG
from airflow.operators.python import PythonVirtualenvOperator, PythonOperator
from airflow.utils.dates import days_ago

def test_venv_func(**context):
    pass

with DAG(
    dag_id="venv_op_not_accepting_context_kwarg",
    schedule_interval=None,
    start_date=days_ago(2),
) as dag:

    test = PythonVirtualenvOperator(
        task_id="test",
        python_callable=test_venv_func,
        system_site_packages=True,
    )

Operating System

Docker (debian:buster)

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

Astro CLI with this image:

quay.io/astronomer/ap-airflow-dev:2.2.3-2.dev-onbuild

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

About this issue

  • State: closed
  • Created 2 years ago
  • Reactions: 4
  • Comments: 15 (14 by maintainers)

Most upvoted comments

Hey I set use_dill=True and I’m getting this traceback:

INFO - Executing cmd: /tmp/venv12se6ywa/bin/python /tmp/venv12se6ywa/script.py /tmp/venv12se6ywa/script.in /tmp/venv12se6ywa/script.out /tmp/venv12se6ywa/string_args.txt
[2022-02-22, 19:37:47 UTC] {process_utils.py:169} INFO - Output:
[2022-02-22, 19:37:48 UTC] {process_utils.py:173} INFO - Traceback (most recent call last):
[2022-02-22, 19:37:48 UTC] {process_utils.py:173} INFO -   File "/tmp/venv12se6ywa/script.py", line 15, in <module>
[2022-02-22, 19:37:48 UTC] {process_utils.py:173} INFO -     arg_dict = dill.load(file)
[2022-02-22, 19:37:48 UTC] {process_utils.py:173} INFO -   File "/usr/local/lib/python3.9/site-packages/dill/_dill.py", line 270, in load
[2022-02-22, 19:37:48 UTC] {process_utils.py:173} INFO -     return Unpickler(file, ignore=ignore, **kwds).load()
[2022-02-22, 19:37:48 UTC] {process_utils.py:173} INFO -   File "/usr/local/lib/python3.9/site-packages/dill/_dill.py", line 472, in load
[2022-02-22, 19:37:48 UTC] {process_utils.py:173} INFO -     obj = StockUnpickler.load(self)
[2022-02-22, 19:37:48 UTC] {process_utils.py:173} INFO -   File "/usr/local/lib/python3.9/site-packages/dill/_dill.py", line 462, in find_class
[2022-02-22, 19:37:48 UTC] {process_utils.py:173} INFO -     return StockUnpickler.find_class(self, module, name)
[2022-02-22, 19:37:48 UTC] {process_utils.py:173} INFO - ModuleNotFoundError: No module named 'unusual_prefix_50f8c09f995678d8614dafc9dd846f4d45c5416e_venv_xcoms'

This is the dag I’m using to get that traceback:

from airflow.models import DAG
from airflow.operators.python import PythonVirtualenvOperator, PythonOperator
from airflow.utils.dates import days_ago


def push_some_data():
    value = [1, "two", 3]
    return value

def pull_that_data(**context):
    pulled_data = context['ti'].xcom_pull(task_ids='push_data', key="return_value")
    print(type(pulled_data))
    print(pulled_data)
    
with DAG(
    dag_id="test_venv_op_xcoms",
    start_date=days_ago(1),
    schedule_interval="@once",
    is_paused_upon_creation=False,
    tags=["core", "extended_tags", "venv_op"],
    render_template_as_native_obj=True,
) as dag:

    pusher = PythonVirtualenvOperator(
        task_id="push_data",
        python_callable=push_some_data,

    )

    puller = PythonVirtualenvOperator(
        task_id="pull_data",
        python_callable=pull_that_data,
        #op_args=['{{ ti.xcom_pull(task_ids=["push_data"]) }}'],
        use_dill=True,
    )

pusher >> puller

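That ModuleNotFoundError has a plausible mechanism that can be sketched without Airflow or dill: Airflow imports each DAG file under a generated `unusual_prefix_<hash>_<name>` module name, and a callable serialized by reference records that module name, which the virtualenv process cannot import. The mangled name below is made up; only the mechanism matches the traceback:

```python
import importlib.util
import pickle
import sys

# Build a module under a mangled name, the way Airflow loads DAG files.
# (Hypothetical name; only the mechanism matches the real traceback.)
module_name = "unusual_prefix_deadbeef_venv_xcoms"
spec = importlib.util.spec_from_loader(module_name, loader=None)
mod = importlib.util.module_from_spec(spec)
exec("def pull_that_data(**context):\n    pass\n", mod.__dict__)
sys.modules[module_name] = mod

# Serializing the callable stores a (module, qualname) reference,
# not the function body.
payload = pickle.dumps(mod.pull_that_data)

# Simulate the virtualenv process, which has no such module to import.
del sys.modules[module_name]
err = None
try:
    pickle.loads(payload)
except ModuleNotFoundError as exc:
    err = exc
print(err)
```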
You can do this instead

from datetime import timedelta

from airflow.decorators import task
from airflow.operators.python import get_current_context

@task.virtualenv(...)
def add_days(days_to_add: int) -> str:
    data_interval_start = get_current_context()["data_interval_start"]
    return (data_interval_start + timedelta(days=days_to_add)).isoformat()

with DAG(...):
    add_days(10)

See documentation:

https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#accessing-context-variables-in-decorated-tasks

I suspect the pervasive usage of **context comes from people copy-pasting code and assuming it's the only way, without ever reading the documentation. Although, now re-reading the section, the writing really doesn't do the feature justice, and people might not realise what's possible even after reading it. I'll find some time to improve it.

Moving to 2.3 since this is not a “bug” per se but not working by design.

Ah yeah I think this is a combination nobody anticipated. Given that dill is already a core Airflow requirement, maybe we should just disallow the two options to be used together?

It should be possible, but there’s no test for this, and IIRC this has been broken for a while (much earlier than the custom object’s introduction).

I’d very much not recommend using **context for PythonVirtualenvOperator (and other operators that run the function with isolation, like DockerOperator) due to how cross-process data interchange works in general, but we should still fix this.
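The rule of thumb for such isolated operators: only self-contained, plain data survives the process boundary. A quick illustration with plain pickle, mirroring roughly what the operator does with op_args/op_kwargs:

```python
import pickle

# Plain, self-contained values round-trip across the process boundary fine;
# objects tied to the parent process (modules, DB sessions, loggers) do not.
plain_kwargs = {"ds": "2022-01-19", "rows": [1, "two", 3]}
payload = pickle.dumps({"args": (), "kwargs": plain_kwargs})
restored = pickle.loads(payload)
print(restored["kwargs"]["rows"])  # [1, 'two', 3]
```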