airflow: Attempting to read an xcom produced by KubernetesPodOperator results in UnicodeDecodeError
Apache Airflow version: 1.10.13
Kubernetes version (if you are using kubernetes) (use `kubectl version`): v1.15.11-eks
Client Version: version.Info{Major:"1", Minor:"16", GitVersion:"v1.16.0", GitCommit:"2bd9643cee5b3b3a5ecbd3af49d09018f0773c77", GitTreeState:"clean", BuildDate:"2019-09-18T14:36:53Z", GoVersion:"go1.12.9", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"15+", GitVersion:"v1.15.11-eks-065dce", GitCommit:"065dcecfcd2a91bd68a17ee0b5e895088430bd05", GitTreeState:"clean", BuildDate:"2020-07-16T01:44:47Z", GoVersion:"go1.12.17", Compiler:"gc", Platform:"linux/amd64"}
Environment:
- Cloud provider or hardware configuration: AWS (EKS)
- OS (e.g. from /etc/os-release):
PRETTY_NAME="Debian GNU/Linux 10 (buster)"
NAME="Debian GNU/Linux"
VERSION_ID="10"
VERSION="10 (buster)"
VERSION_CODENAME=buster
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
- Kernel (e.g. `uname -a`): Linux ddac867b589a 4.19.76-linuxkit #1 SMP Tue May 26 11:42:35 UTC 2020 x86_64 GNU/Linux
- Python: 3.7.9
- Others: On Docker, built on top of apache/airflow image (FROM apache/airflow)
What happened:
`xcom_pull(...)` throws `UnicodeDecodeError` when the task that produced the xcom is a KubernetesPodOperator:

```
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 161, in get_one
    return json.loads(result.value.decode('UTF-8'))
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
```

(Full stack trace below.)
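For context, my guess at what is going on (an assumption, not confirmed from the Airflow source): byte `0x80` is the opcode that starts a pickled payload, so the stored xcom value looks like it was written with pickling enabled while `XCom.get_one` on line 161 assumes JSON. A minimal sketch reproducing that exact error, using an illustrative payload in place of the pod's real `/airflow/xcom/return.json` contents:

```python
import json
import pickle

# Hypothetical payload standing in for the dict the pod writes to
# /airflow/xcom/return.json; the mismatch, not the data, is the point.
payload = {"key1": "value1", "key2": "value2"}

# If the xcom value was stored pickled, it starts with the pickle
# protocol marker 0x80 -- the "invalid start byte" from the error.
pickled = pickle.dumps(payload)
assert pickled[0] == 0x80

# This is the shape of the call that fails in xcom.py line 161:
try:
    json.loads(pickled.decode("UTF-8"))
except UnicodeDecodeError as exc:
    print(exc)  # 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
```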
Note that this is happening across all of our DAGs as I try to migrate from 1.10.10 to 1.10.13.
How to reproduce it:
Here are two tasks that should reproduce it (part of a DAG I use to test K8s features while making an internal update):
```python
bash_echo_1 = KubernetesPodOperator(
    task_id="bash_echo_1",
    image="bash:4.4",
    name="bash-echo-1",  # kubernetes pod names do not accept '_'
    cmds=[
        "bash",
        "-c",
        (
            "mkdir -p /airflow/xcom "
            '&& echo \'{"key1":"value1", "key2": "value2"}\' > /airflow/xcom/return.json'
        ),
    ],  # that's how xcom works for KubernetesPodOperator
    # on_failure_callback=alert_opsgenie,  # uncomment to test Opsgenie
    do_xcom_push=True,  # must be True for /airflow/xcom/return.json to be pushed as an xcom object
)

bash_echo_2 = KubernetesPodOperator(
    task_id="bash_echo_2",
    name="bash-echo-2",  # kubernetes pod names do not accept '_'
    image="bash:4.4",
    arguments=[
        "echo",
        'key1 was: {{ ti.xcom_pull("bash_echo_1")["key1"] }}',
        ',key2 was: {{ ti.xcom_pull("bash_echo_1")["key2"] }}',
        ',the entire object was: {{ ti.xcom_pull("bash_echo_1") }}',
    ],
)

bash_echo_1 >> bash_echo_2
```
Stack trace:
```
Process DagFileProcessor324119-Process:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 159, in _run_file_processor
    pickle_dags)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1620, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1299, in _process_dags
    self._process_task_instances(dag, tis_out)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 793, in _process_task_instances
    ready_tis = run.update_state(session=session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 281, in update_state
    ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 340, in _get_ready_tis
    session=session):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 659, in are_dependencies_met
    session=session):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 683, in get_failed_dep_statuses
    dep_context):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 106, in get_dep_statuses
    for dep_status in self._get_dep_statuses(ti, session, dep_context):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/ti_deps/deps/not_previously_skipped_dep.py", line 58, in _get_dep_statuses
    task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1564, in xcom_pull
    return pull_fn(task_id=task_ids)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 161, in get_one
    return json.loads(result.value.decode('UTF-8'))
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
```
Note that bash_echo_2 simply does not get scheduled; the stack trace above comes from the scheduler pod (we run Airflow with the KubernetesExecutor).
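As a possible workaround direction (a sketch only, not Airflow's API: `deserialize_xcom` is a hypothetical helper name), a reader that tolerates both serialization formats could try JSON first and fall back to pickle when the stored bytes are not valid UTF-8 JSON:

```python
import json
import pickle


def deserialize_xcom(value: bytes):
    """Hypothetical helper: decode an xcom value written either as
    UTF-8 JSON or as a pickle, whichever the producer used."""
    try:
        return json.loads(value.decode("UTF-8"))
    except (UnicodeDecodeError, ValueError):
        # Not valid UTF-8 JSON; assume it was stored pickled.
        return pickle.loads(value)


# Both storage formats round-trip to the same object:
assert deserialize_xcom(b'{"key1": "value1"}') == {"key1": "value1"}
assert deserialize_xcom(pickle.dumps({"key1": "value1"})) == {"key1": "value1"}
```

Only use something like this against xcom values you trust: unpickling arbitrary bytes can execute code.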
About this issue
- Original URL
- State: closed
- Created 4 years ago
- Comments: 17 (13 by maintainers)
Let’s collaborate on this @ephraimbuddy