airflow: ECONNRESET error in scheduler using KubernetesExecutor on AKS
Apache Airflow version: 2.0.0
Kubernetes version:
Client Version: version.Info{Major:"1", Minor:"15", GitVersion:"v1.15.3", GitCommit:"2d3c76f9091b6bec110a5e63777c332469e0cba2", GitTreeState:"clean", BuildDate:"2019-08-19T11:13:54Z", GoVersion:"go1.12.9", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"16", GitVersion:"v1.16.13", GitCommit:"37c06f456fdb4d25e402b5fbcb72cd6a77a021a9", GitTreeState:"clean", BuildDate:"2020-09-18T21:59:14Z", GoVersion:"go1.13.9", Compiler:"gc", Platform:"linux/amd64"}
Environment:
- Cloud provider or hardware configuration: Azure Kubernetes Service
- Image : apache/airflow/2.0.0-python3.6
- Config Variables:
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__DONOT_PICKLE=false
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=false
AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
AIRFLOW__CORE__FERNET_KEY=*****
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD=bash -c 'eval "$DATABASE_SQLALCHEMY_CMD"'
AIRFLOW__ELASTICSEARCH__WRITE_STDOUT=True
AIRFLOW__KUBERNETES__ENV_FROM_CONFIGMAP_REF=my-name-env
AIRFLOW__KUBERNETES__NAMESPACE=airflow
AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE=/home/airflow/scripts/pod-template.yaml
AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME=my-name
AIRFLOW__LOGGING__BASE_LOG_FOLDER=/opt/airflow/logs
AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=wasb://airflow-logs@******.blob.core.windows.net
AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY=/opt/airflow/logs/scheduler
AIRFLOW__WEBSERVER__BASE_URL=http://****/my-name
AIRFLOW__WEBSERVER__WEB_SERVER_PORT=8080
What happened:
After installing airflow in AKS via helm charts, webserver and scheduler start up as expected. After some time (with activity or while sitting idly) scheduler spits out the following:
scheduler error messages
[2021-01-26 16:22:08,620] {kubernetes_executor.py:111} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/contrib/pyopenssl.py", line 313, in recv_into
return self.connection.recv_into(*args, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/OpenSSL/SSL.py", line 1840, in recv_into
self._raise_ssl_error(self._ssl, result)
File "/home/airflow/.local/lib/python3.6/site-packages/OpenSSL/SSL.py", line 1663, in _raise_ssl_error
raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (104, 'ECONNRESET')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 436, in _error_catcher
yield
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 763, in read_chunked
self._update_chunk_length()
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 693, in _update_chunk_length
line = self._fp.fp.readline()
File "/usr/local/lib/python3.6/socket.py", line 586, in readinto
return self._sock.recv_into(b)
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/contrib/pyopenssl.py", line 318, in recv_into
raise SocketError(str(e))
OSError: (104, 'ECONNRESET')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 103, in run
kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 145, in _run
for event in list_worker_pods():
File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 144, in stream
for line in iter_resp_lines(resp):
File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 46, in iter_resp_lines
for seg in resp.read_chunked(decode_content=False):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 792, in read_chunked
self._original_response.close()
File "/usr/local/lib/python3.6/contextlib.py", line 99, in __exit__
self.gen.throw(type, value, traceback)
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 454, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(104, \'ECONNRESET\')",)', OSError("(104, 'ECONNRESET')",))
Process KubernetesJobWatcher-3:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/contrib/pyopenssl.py", line 313, in recv_into
return self.connection.recv_into(*args, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/OpenSSL/SSL.py", line 1840, in recv_into
self._raise_ssl_error(self._ssl, result)
File "/home/airflow/.local/lib/python3.6/site-packages/OpenSSL/SSL.py", line 1663, in _raise_ssl_error
raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (104, 'ECONNRESET')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 436, in _error_catcher
yield
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 763, in read_chunked
self._update_chunk_length()
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 693, in _update_chunk_length
line = self._fp.fp.readline()
File "/usr/local/lib/python3.6/socket.py", line 586, in readinto
return self._sock.recv_into(b)
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/contrib/pyopenssl.py", line 318, in recv_into
raise SocketError(str(e))
OSError: (104, 'ECONNRESET')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 103, in run
kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 145, in _run
for event in list_worker_pods():
File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 144, in stream
for line in iter_resp_lines(resp):
File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 46, in iter_resp_lines
for seg in resp.read_chunked(decode_content=False):
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 792, in read_chunked
self._original_response.close()
File "/usr/local/lib/python3.6/contextlib.py", line 99, in __exit__
self.gen.throw(type, value, traceback)
File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 454, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(104, \'ECONNRESET\')",)', OSError("(104, 'ECONNRESET')",))
[2021-01-26 16:22:10,177] {kubernetes_executor.py:266} ERROR - Error while health checking kube watcher process. Process died for unknown reasons
[2021-01-26 16:22:10,189] {kubernetes_executor.py:126} INFO - Event: and now my watch begins starting at resource_version: 0
[2021-01-26 16:23:00,720] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
What you expected to happen:
Scheduler should run (or sit idly) without error
How to reproduce it: Unknown
Anything else we need to know:
Steps I’ve taken to debug:
Based on the location of the errors in the stack trace, I assumed the error was related to the KubernetesExecutor making an API request for a list of pods. To debug this I exec'ed into the pod and ran:
KUBE_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
curl -sSk -H "Authorization: Bearer $KUBE_TOKEN" https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_PORT_443_TCP_PORT/api/v1/pods/
which initially gave me a 403 Forbidden error. I then created the following ClusterRoleBinding:
rbac-read.yaml
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: system:serviceaccount:airflow:my-name:read-pods
  namespace: kube-system
subjects:
- kind: ServiceAccount
  name: my-name
  namespace: airflow
roleRef:
  kind: ClusterRole
  name: cluster-admin
  apiGroup: rbac.authorization.k8s.io
Afterward the above bash commands successfully returned a list of pods in the cluster. I then opened a Python shell (still within the scheduler pod) and successfully ran:
>>> from kubernetes import client, config
>>> config.load_incluster_config()
>>> v1 = client.CoreV1Api()
>>> pods = v1.list_pod_for_all_namespaces(watch=False)
>>> airflow_pods = v1.list_namespaced_pod("airflow")
Given that this ran successfully, I’m at a loss as to why I’m still getting the ECONNRESET error.
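One possible reason the checks above do not reproduce the error, offered as an assumption rather than a confirmed diagnosis: they are short request/response calls, whereas the traceback comes from `watch.stream`, which keeps a single connection open for a long time. A minimal sketch for exercising that long-lived path from the same Python shell follows. `stream_with_restart` and `watch_airflow_pods` are hypothetical helpers, and the namespace, timeout, and retry values are illustrative.

```python
import time


def stream_with_restart(open_stream, on_event, max_restarts=3,
                        retry_on=(OSError,), pause=0.0):
    """Consume events from open_stream(), reopening it if the connection breaks.

    Returns the number of restarts that were needed. A restart opens a fresh
    stream, so events may be delivered again after a reconnect.
    """
    restarts = 0
    while True:
        try:
            for event in open_stream():
                on_event(event)
            return restarts  # the stream ended normally
        except retry_on:
            if restarts >= max_restarts:
                raise  # give up and surface the last error
            restarts += 1
            time.sleep(pause)


def watch_airflow_pods():
    """Hypothetical usage in a real cluster (needs the kubernetes package)."""
    from kubernetes import client, config, watch
    from urllib3.exceptions import ProtocolError

    config.load_incluster_config()
    v1 = client.CoreV1Api()
    return stream_with_restart(
        # Long-lived watch on the namespace, like the executor's job watcher.
        lambda: watch.Watch().stream(
            v1.list_namespaced_pod, "airflow", timeout_seconds=600
        ),
        lambda event: print(event["type"], event["object"].metadata.name),
        retry_on=(ProtocolError,),
        pause=1.0,
    )
```

Calling `watch_airflow_pods()` and leaving it idle would show whether the connection is reset after some time without traffic, which the one-off list calls cannot reveal.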
About this issue
- State: closed
- Created 3 years ago
- Reactions: 2
- Comments: 24 (6 by maintainers)
Guys, I work with @alete89. Another solution for this, especially if you are on an older Airflow version that doesn’t yet have the AIRFLOW__KUBERNETES__ENABLE_TCP_KEEPALIVE configuration key, is to run this in a Python script at some point during Airflow startup:
This apparently worked for us. It basically sets on urllib3 (the library Airflow uses for connectivity under the hood) the same parameters that were mentioned in this issue and elsewhere on the internet.
In our case, apparently, some TCP hangups caused all of the executor’s available parallelism to be consumed.
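A minimal sketch of what a workaround of this kind could look like, assuming the approach is to extend urllib3's default socket options with TCP keepalive settings. The function names and the idle, interval, and count values are illustrative assumptions, not the snippet that was originally posted.

```python
import socket


def tcp_keepalive_options(idle=120, interval=30, count=6):
    """Return socket options that turn on TCP keepalive probes.

    The default values are illustrative assumptions.
    """
    options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
    # The fine-grained constants are platform dependent (present on Linux),
    # so only add the ones this platform exposes.
    for name, value in (
        ("TCP_KEEPIDLE", idle),       # idle seconds before the first probe
        ("TCP_KEEPINTVL", interval),  # seconds between probes
        ("TCP_KEEPCNT", count),       # failed probes before giving up
    ):
        if hasattr(socket, name):
            options.append((socket.IPPROTO_TCP, getattr(socket, name), value))
    return options


def enable_urllib3_keepalive(**kwargs):
    """Append keepalive options to urllib3's default socket options."""
    from urllib3.connection import HTTPConnection, HTTPSConnection

    options = tcp_keepalive_options(**kwargs)
    # HTTPS first: it inherits the attribute from HTTPConnection, so this
    # order avoids appending the options twice.
    HTTPSConnection.default_socket_options = (
        HTTPSConnection.default_socket_options + options
    )
    HTTPConnection.default_socket_options = (
        HTTPConnection.default_socket_options + options
    )
```

`enable_urllib3_keepalive()` would need to run in the scheduler process before the Kubernetes client opens its connections, since the options only apply to sockets created afterward.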
We’re having the same issue running Airflow 2.0.1 on AKS. It seems this issue is related: https://github.com/apache/airflow/issues/13916
I’m experiencing something similar but in 1.10.14, with the same symptoms @mrpowerus described in https://github.com/apache/airflow/issues/14974 (like the absence of Event: in logging) and also open_slots going down to 0, because it seems like the executor never realizes tasks are getting done and never frees the slot (although they are marked as done in the UI). The problem is that according to the 1.10.14 docs, there’s no AIRFLOW__KUBERNETES__ENABLE_TCP_KEEPALIVE env to set, and I’m not sure if there’s another way to set it.
Any movement here? Currently I’m using CeleryExecutor instead of KubernetesExecutor and am not facing the same issue, but I would like to move back to KubernetesExecutor.
@gillbuchanan Indeed. Too many tabs and too late! 😉 https://github.com/apache/airflow/issues/14175
For me it fails no matter which executor. Here’s my helm command:
I added the ClusterRoleBinding but it doesn’t help. Any help appreciated!