airflow: ECONNRESET error in scheduler using KubernetesExecutor on AKS

Apache Airflow version: 2.0.0

Kubernetes version:

Client Version: version.Info{Major:"1", Minor:"15", GitVersion:"v1.15.3", GitCommit:"2d3c76f9091b6bec110a5e63777c332469e0cba2", GitTreeState:"clean", BuildDate:"2019-08-19T11:13:54Z", GoVersion:"go1.12.9", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"16", GitVersion:"v1.16.13", GitCommit:"37c06f456fdb4d25e402b5fbcb72cd6a77a021a9", GitTreeState:"clean", BuildDate:"2020-09-18T21:59:14Z", GoVersion:"go1.13.9", Compiler:"gc", Platform:"linux/amd64"}

Environment:

  • Cloud provider or hardware configuration: Azure Kubernetes Service
  • Image: apache/airflow:2.0.0-python3.6
  • Config Variables:
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__DONOT_PICKLE=false
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=false
AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
AIRFLOW__CORE__FERNET_KEY=*****
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD=bash -c 'eval "$DATABASE_SQLALCHEMY_CMD"'
AIRFLOW__ELASTICSEARCH__WRITE_STDOUT=True
AIRFLOW__KUBERNETES__ENV_FROM_CONFIGMAP_REF=my-name-env
AIRFLOW__KUBERNETES__NAMESPACE=airflow
AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE=/home/airflow/scripts/pod-template.yaml
AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME=my-name
AIRFLOW__LOGGING__BASE_LOG_FOLDER=/opt/airflow/logs
AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=wasb://airflow-logs@******.blob.core.windows.net
AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY=/opt/airflow/logs/scheduler
AIRFLOW__WEBSERVER__BASE_URL=http://****/my-name
AIRFLOW__WEBSERVER__WEB_SERVER_PORT=8080

What happened:

After installing Airflow on AKS via the Helm chart, the webserver and scheduler start up as expected. After some time (with activity or while sitting idle), the scheduler spits out the following:

scheduler error messages
[2021-01-26 16:22:08,620] {kubernetes_executor.py:111} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/contrib/pyopenssl.py", line 313, in recv_into
    return self.connection.recv_into(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/OpenSSL/SSL.py", line 1840, in recv_into
    self._raise_ssl_error(self._ssl, result)
  File "/home/airflow/.local/lib/python3.6/site-packages/OpenSSL/SSL.py", line 1663, in _raise_ssl_error
    raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (104, 'ECONNRESET')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 436, in _error_catcher
    yield
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 763, in read_chunked
    self._update_chunk_length()
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 693, in _update_chunk_length
    line = self._fp.fp.readline()
  File "/usr/local/lib/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/contrib/pyopenssl.py", line 318, in recv_into
    raise SocketError(str(e))
OSError: (104, 'ECONNRESET')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 103, in run
    kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 145, in _run
    for event in list_worker_pods():
  File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 144, in stream
    for line in iter_resp_lines(resp):
  File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 46, in iter_resp_lines
    for seg in resp.read_chunked(decode_content=False):
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 792, in read_chunked
    self._original_response.close()
  File "/usr/local/lib/python3.6/contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 454, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(104, \'ECONNRESET\')",)', OSError("(104, 'ECONNRESET')",))
Process KubernetesJobWatcher-3:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/contrib/pyopenssl.py", line 313, in recv_into
    return self.connection.recv_into(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/OpenSSL/SSL.py", line 1840, in recv_into
    self._raise_ssl_error(self._ssl, result)
  File "/home/airflow/.local/lib/python3.6/site-packages/OpenSSL/SSL.py", line 1663, in _raise_ssl_error
    raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (104, 'ECONNRESET')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 436, in _error_catcher
    yield
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 763, in read_chunked
    self._update_chunk_length()
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 693, in _update_chunk_length
    line = self._fp.fp.readline()
  File "/usr/local/lib/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/contrib/pyopenssl.py", line 318, in recv_into
    raise SocketError(str(e))
OSError: (104, 'ECONNRESET')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 103, in run
    kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 145, in _run
    for event in list_worker_pods():
  File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 144, in stream
    for line in iter_resp_lines(resp):
  File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 46, in iter_resp_lines
    for seg in resp.read_chunked(decode_content=False):
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 792, in read_chunked
    self._original_response.close()
  File "/usr/local/lib/python3.6/contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/response.py", line 454, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(104, \'ECONNRESET\')",)', OSError("(104, 'ECONNRESET')",))
[2021-01-26 16:22:10,177] {kubernetes_executor.py:266} ERROR - Error while health checking kube watcher process. Process died for unknown reasons
[2021-01-26 16:22:10,189] {kubernetes_executor.py:126} INFO - Event: and now my watch begins starting at resource_version: 0
[2021-01-26 16:23:00,720] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs

What you expected to happen:

The scheduler should run (or sit idle) without errors.

How to reproduce it: Unknown

Anything else we need to know:

Steps I’ve taken to debug: Based on the location of the errors in the stack trace, I assumed the error was related to the KubernetesExecutor making an API request for a list of pods. To debug this I exec'd into the pod and ran

KUBE_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
curl -sSk -H "Authorization: Bearer $KUBE_TOKEN" https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_PORT_443_TCP_PORT/api/v1/pods/

which initially gave me a 403 Forbidden error. I then created the following ClusterRoleBinding:

rbac-read.yaml
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: system:serviceaccount:airflow:my-name:read-pods
  namespace: kube-system  # ignored: ClusterRoleBindings are cluster-scoped
subjects:
  - kind: ServiceAccount
    name: my-name
    namespace: airflow
roleRef:
  kind: ClusterRole
  name: cluster-admin  # far broader than needed to read pods; debugging only
  apiGroup: rbac.authorization.k8s.io
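
A quick way to confirm the binding took effect (assuming kubectl access to the cluster, and the airflow/my-name service account above):

kubectl auth can-i list pods --all-namespaces --as=system:serviceaccount:airflow:my-name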

Afterward, the above bash commands successfully returned a list of pods in the cluster. I then opened a Python shell (still within the scheduler pod) and successfully ran

>>> from kubernetes import client, config
>>> config.load_incluster_config()
>>> v1 = client.CoreV1Api()
>>> pods = v1.list_pod_for_all_namespaces(watch=False)
>>> airflow_pods = v1.list_namespaced_pod("airflow")

Given that this ran successfully, I’m at a loss as to why I’m still getting the ECONNRESET error.
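
One difference worth noting: the calls above are one-shot lists, while the failing code path in KubernetesJobWatcher is a long-lived streaming watch. A closer reproduction (just a sketch using the same kubernetes client, assuming the airflow namespace) would be:

from kubernetes import client, config, watch

config.load_incluster_config()
v1 = client.CoreV1Api()

# Hold a streaming watch open, as KubernetesJobWatcher does; a reset of an
# idle connection (e.g. by an intermediate load balancer) surfaces here.
w = watch.Watch()
for event in w.stream(v1.list_namespaced_pod, namespace="airflow"):
    print(event["type"], event["object"].metadata.name)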


Most upvoted comments

Guys, I work with @alete89. Another solution for this, especially if you are on older Airflow versions that don’t yet have the AIRFLOW__KUBERNETES__ENABLE_TCP_KEEPALIVE configuration key, is to run the following in a Python script at some point during Airflow startup:

from urllib3.connection import HTTPConnection
import socket

# Enable TCP keepalive on every socket urllib3 opens, so idle watch
# connections get probed instead of silently dying behind a load balancer:
HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + [
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),    # turn keepalive on
    (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 20),  # seconds idle before first probe
    (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 5),  # seconds between probes
    (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 10),   # failed probes before dropping
]

This apparently worked for us. It basically sets on urllib3 (the library Airflow uses for connectivity under the hood) the same parameters mentioned in this issue and in other places on the internet.
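
A convenient place to run it (an assumption about your deployment, not a requirement) is an airflow_local_settings.py module on the PYTHONPATH, since Airflow imports that module early during initialization.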

In our case, apparently, some TCP hang-ups caused all of the executor’s available parallelism slots to be consumed.

We’re having the same issue running Airflow 2.0.1 on AKS. It seems this issue is related: https://github.com/apache/airflow/issues/13916

I’m experiencing something similar, but on 1.10.14, with the same symptoms @mrpowerus described in https://github.com/apache/airflow/issues/14974 (such as the absence of Event: lines in the logging), and also open_slots dropping to 0 because the executor never seems to realize tasks are finished and never frees their slots (even though they are marked as done in the UI).

The problem is that, according to the 1.10.14 docs, there’s no AIRFLOW__KUBERNETES__ENABLE_TCP_KEEPALIVE env var to set, and I’m not sure if there’s another way to set it.
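
For reference, on versions that do ship that key, the equivalent environment configuration would look like this (values mirroring the socket options in the snippet above; the actual defaults may differ):

AIRFLOW__KUBERNETES__ENABLE_TCP_KEEPALIVE=True
AIRFLOW__KUBERNETES__TCP_KEEP_IDLE=20
AIRFLOW__KUBERNETES__TCP_KEEP_INTVL=5
AIRFLOW__KUBERNETES__TCP_KEEP_CNT=10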

Any movement here? Currently I’m using CeleryExecutor instead of KubernetesExecutor and am not facing the same issue, but I would like to move back to KubernetesExecutor.

For me it fails no matter which executor. Here’s my helm command:

helm upgrade --install airflow . --namespace airflow --create-namespace \
  --values ../../airflow-values.yaml \
  --set executor="SequentialExecutor" \
  --set webserver.allowPodLogReading=true \
  --set webserver.defaultUser.password="xxx"  \
  --set ingress.enabled=true \
  --set ingress.web.host="airflow.ingress.xxx.com" \
  --set ingress.web.tls.enabled=true \
  --set ingress.web.annotations."cert-manager\.io/cluster-issuer"=letsencrypt-prod \
  --set ingress.web.tls.secretName=airflow-tls-secret \
  --set dags.persistence.enabled=false \
  --set dags.gitSync.enabled=true \
  --set dags.gitSync.repo="https://github.com/ams0/dags.git" \
  --set dags.gitSync.subPath="dags" \
  --set dags.gitSync.branch=main \
  --set images.gitSync.repository="k8s.gcr.io/git-sync/git-sync" \
  --set images.gitSync.tag="v3.2.0" \
  --set airflow.fernetKey="xxx="

I added the ClusterRoleBinding but it doesn’t help. Any help appreciated!