airflow: DockerOperator Could not serialize the XCom value into JSON
I'm running Airflow 2.0.0 with the Docker image from Docker Hub (tested with apache/airflow:2.0.0-python3.8 and apache/airflow:2.0.0).
I tried this example docker DAG: https://github.com/apache/airflow/blob/master/airflow/providers/docker/example_dags/example_docker.py
I changed the task configuration to:
t3 = DockerOperator(
    # api_version="1.19",
    docker_url="tcp://myhost:2376",  # Set your docker URL
    command="/bin/sleep 30",
    image="alpine:latest",
    network_mode="bridge",
    task_id="docker_op_tester",
    tls_ca_cert="/path/to/ca.pem",
    tls_client_cert="/path/to/cert.pem",
    tls_client_key="/path/to/key.pem",
    tls_hostname="myhost",
    dag=dag,
)
API versions below 1.21 are no longer supported, which is why api_version is commented out above.
When I run the DAG, the container starts and sleeps for 30 seconds, but then this error appears:
[2021-01-05 12:32:27,514] {xcom.py:237} ERROR - Could not serialize the XCom value into JSON. If you are using pickles instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
If I add do_xcom_push=False to the configuration above, it works as expected, but then there is no way to get the XCom output at all. I even tried echo hello instead of the sleep command, with the same result.
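From the traceback, the failure looks independent of the command itself: the value pushed to XCom is the container output as bytes, and the standard json module refuses bytes. A minimal repro outside Airflow (illustrative only):

import json

# Same TypeError as in the log below: json.dumps() cannot encode bytes,
# which is the type of the container output being pushed to XCom.
json.dumps(b"hello\n")
# TypeError: Object of type bytes is not JSON serializable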
Full log:
*** Reading local file: /opt/airflow/logs/docker_sample/docker_op_tester/2021-01-05T12:31:53.033985+00:00/1.log
[2021-01-05 12:31:56,689] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: docker_sample.docker_op_tester 2021-01-05T12:31:53.033985+00:00 [queued]>
[2021-01-05 12:31:56,699] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: docker_sample.docker_op_tester 2021-01-05T12:31:53.033985+00:00 [queued]>
[2021-01-05 12:31:56,699] {taskinstance.py:1017} INFO -
--------------------------------------------------------------------------------
[2021-01-05 12:31:56,699] {taskinstance.py:1018} INFO - Starting attempt 1 of 2
[2021-01-05 12:31:56,699] {taskinstance.py:1019} INFO -
--------------------------------------------------------------------------------
[2021-01-05 12:31:56,705] {taskinstance.py:1038} INFO - Executing <Task(DockerOperator): docker_op_tester> on 2021-01-05T12:31:53.033985+00:00
[2021-01-05 12:31:56,709] {standard_task_runner.py:51} INFO - Started process 72747 to run task
[2021-01-05 12:31:56,712] {standard_task_runner.py:75} INFO - Running: ['airflow', 'tasks', 'run', 'docker_sample', 'docker_op_tester', '2021-01-05T12:31:53.033985+00:00', '--job-id', '61', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/mydocker.py', '--cfg-path', '/tmp/tmppdqnibne']
[2021-01-05 12:31:56,713] {standard_task_runner.py:76} INFO - Job 61: Subtask docker_op_tester
[2021-01-05 12:31:56,745] {logging_mixin.py:103} INFO - Running <TaskInstance: docker_sample.docker_op_tester 2021-01-05T12:31:53.033985+00:00 [running]> on host fdfca86817af
[2021-01-05 12:31:56,775] {taskinstance.py:1230} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=airflow@example.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=docker_sample
AIRFLOW_CTX_TASK_ID=docker_op_tester
AIRFLOW_CTX_EXECUTION_DATE=2021-01-05T12:31:53.033985+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-01-05T12:31:53.033985+00:00
[2021-01-05 12:31:56,849] {docker.py:224} INFO - Starting docker container from image alpine:latest
[2021-01-05 12:32:27,514] {xcom.py:237} ERROR - Could not serialize the XCom value into JSON. If you are using pickles instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2021-01-05 12:32:27,515] {taskinstance.py:1396} ERROR - Object of type bytes is not JSON serializable
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1086, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1260, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1303, in _execute_task
self.xcom_push(key=XCOM_RETURN_KEY, value=result)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1827, in xcom_push
XCom.set(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 88, in set
value = XCom.serialize_value(value)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 235, in serialize_value
return json.dumps(value).encode('UTF-8')
File "/usr/local/lib/python3.8/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.8/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.8/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type bytes is not JSON serializable
[2021-01-05 12:32:27,516] {taskinstance.py:1433} INFO - Marking task as UP_FOR_RETRY. dag_id=docker_sample, task_id=docker_op_tester, execution_date=20210105T123153, start_date=20210105T123156, end_date=20210105T123227
[2021-01-05 12:32:27,548] {local_task_job.py:118} INFO - Task exited with return code 1
Log with do_xcom_push=False:
*** Reading local file: /opt/airflow/logs/docker_sample/docker_op_tester/2021-01-05T12:37:21.850166+00:00/1.log
[2021-01-05 12:37:25,451] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: docker_sample.docker_op_tester 2021-01-05T12:37:21.850166+00:00 [queued]>
[2021-01-05 12:37:25,462] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: docker_sample.docker_op_tester 2021-01-05T12:37:21.850166+00:00 [queued]>
[2021-01-05 12:37:25,463] {taskinstance.py:1017} INFO -
--------------------------------------------------------------------------------
[2021-01-05 12:37:25,463] {taskinstance.py:1018} INFO - Starting attempt 1 of 2
[2021-01-05 12:37:25,463] {taskinstance.py:1019} INFO -
--------------------------------------------------------------------------------
[2021-01-05 12:37:25,468] {taskinstance.py:1038} INFO - Executing <Task(DockerOperator): docker_op_tester> on 2021-01-05T12:37:21.850166+00:00
[2021-01-05 12:37:25,471] {standard_task_runner.py:51} INFO - Started process 76866 to run task
[2021-01-05 12:37:25,475] {standard_task_runner.py:75} INFO - Running: ['airflow', 'tasks', 'run', 'docker_sample', 'docker_op_tester', '2021-01-05T12:37:21.850166+00:00', '--job-id', '65', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/mydocker.py', '--cfg-path', '/tmp/tmpusz_sweq']
[2021-01-05 12:37:25,476] {standard_task_runner.py:76} INFO - Job 65: Subtask docker_op_tester
[2021-01-05 12:37:25,508] {logging_mixin.py:103} INFO - Running <TaskInstance: docker_sample.docker_op_tester 2021-01-05T12:37:21.850166+00:00 [running]> on host fdfca86817af
[2021-01-05 12:37:25,539] {taskinstance.py:1230} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=airflow@example.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=docker_sample
AIRFLOW_CTX_TASK_ID=docker_op_tester
AIRFLOW_CTX_EXECUTION_DATE=2021-01-05T12:37:21.850166+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-01-05T12:37:21.850166+00:00
[2021-01-05 12:37:25,614] {docker.py:224} INFO - Starting docker container from image alpine:latest
[2021-01-05 12:37:56,306] {taskinstance.py:1135} INFO - Marking task as SUCCESS. dag_id=docker_sample, task_id=docker_op_tester, execution_date=20210105T123721, start_date=20210105T123725, end_date=20210105T123756
[2021-01-05 12:37:56,326] {taskinstance.py:1195} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2021-01-05 12:37:56,344] {local_task_job.py:118} INFO - Task exited with return code 0
Worked for me too. I use it with docker-compose, so I had to add the environment variable AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'.
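A minimal sketch of where that variable goes in a docker-compose file (the service name here is an assumption; set it on every Airflow service, i.e. webserver, scheduler, and workers):

services:
  airflow-scheduler:   # repeat for the webserver/worker services
    environment:
      AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'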
I was encountering the same issue, but I changed one line in the config file to enable_xcom_pickling = True and it's working fine now.

Pass only JSON-serializable data. You can convert your datetime into strings or ints, for example.
This worked for me too.
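For reference, sketches of both suggestions above. The config option lives in the [core] section of airflow.cfg (the file's path depends on your install):

[core]
enable_xcom_pickling = True

And a sketch of the convert-before-pushing approach for a PythonOperator callable (the function name and values are illustrative):

from datetime import datetime, timezone

def produce_timestamp(**context):
    # Return only JSON-safe types: the default XCom backend runs
    # json.dumps() on the return value, and a raw datetime would hit
    # the same TypeError as in the question.
    now = datetime.now(timezone.utc)
    return now.isoformat()            # string, e.g. "2021-01-05T12:31:53+00:00"
    # or: return int(now.timestamp())  # integer epoch seconds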