airflow: Can't read S3 remote logs when using gevent/eventlet webserver workers.

Hey everyone. I’ve upgraded to 1.10.9. It appears that the logging changes broke reading S3 remote logs in the Web UI (writing is ok). The changelog mentions that Airflow’s logging mechanism has been refactored to use Python’s builtin logging module:

[AIRFLOW-1611] Customize logging

I followed the directions in the changelog and created the following log config:

import os
import six

from airflow import AirflowException
from airflow.configuration import conf
from airflow.utils.file import mkdirs
from typing import Dict, Any

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()

FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()

LOG_FORMAT = conf.get('core', 'LOG_FORMAT')

COLORED_LOG_FORMAT = conf.get('core', 'COLORED_LOG_FORMAT')

COLORED_LOG = conf.getboolean('core', 'COLORED_CONSOLE_LOG')

COLORED_FORMATTER_CLASS = conf.get('core', 'COLORED_FORMATTER_CLASS')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')

DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')

FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')

PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')

FORMATTER_CLASS_KEY = '()' if six.PY2 else 'class'


#
# Getting this from the environment because the 1.10.9 changelog says to set
# the `REMOTE_BASE_LOG_FOLDER` path explicitly in the logging config; the
# `REMOTE_BASE_LOG_FOLDER` config key is no longer read automatically.
#
REMOTE_BASE_LOG_FOLDER = os.environ.get('AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
        'airflow_coloured': {
            'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            FORMATTER_CLASS_KEY: COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter'
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}  # type: Dict[str, Any]

However, the task log reader always defaults to the FileTaskHandler. This should not occur, because I have the following settings in airflow.cfg:

remote_logging = True
remote_base_log_folder = s3://my-bucket-name
remote_log_conn_id = aws_default
task_log_reader = s3.task

The s3.task handler passed to the task_log_reader setting should cause an instance of the S3TaskHandler class to read the task logs from S3. This happens when rendering the get_logs_with_metadata view in www/views.py.
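
For context, the lookup is roughly the following (paraphrased from memory of the 1.10.x www/views.py, so treat it as a sketch rather than the exact source): the view resolves the handler by name from the airflow.task logger, and handlers built by logging.config.dictConfig get their name set to the dict key, so 's3.task' should match the handler defined above.

import logging
from airflow.configuration import conf

# Sketch of the handler lookup performed when rendering task logs in the Web UI.
logger = logging.getLogger('airflow.task')
task_log_reader = conf.get('core', 'task_log_reader')  # 's3.task' in my config
handler = next(
    (h for h in logger.handlers if h.name == task_log_reader),
    None,
)

Note also that S3TaskHandler subclasses FileTaskHandler and falls back to the local/worker read when the S3 lookup fails, so errors that look like they come from FileTaskHandler do not necessarily mean the S3 handler was never selected.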


Apache Airflow version: 1.10.9
Kubernetes version: 1.15

Environment:

  • Cloud provider or hardware configuration: AWS
  • OS (e.g. from /etc/os-release): python:3.7.6-buster docker image

What happened: Logs did not appear in the Airflow Web UI. The FileTaskHandler tries to fetch the file locally or from the worker on port 8793 (see the sketch after the error output below). However, the logs do not exist in either location since we are using the Kubernetes Executor. This produces the following error messages:

*** Log file does not exist: /usr/local/airflow/logs/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log
*** Fetching from: http://MY_DAG_NAME-0dde5ff5a786437cb14234:8793/log/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='MY_DAG_NAME-0dde5ff5a786437cb14234', port=8793): Max retries exceeded with url: /log/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f708332fc90>: Failed to establish a new connection: [Errno -2] Name or service not known'))
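
For reference, the worker fetch that fails above is roughly this (paraphrased, not the exact 1.10.x source; the hostname and relative path are placeholders copied from the error output):

import requests
from airflow.configuration import conf

# Rough sketch of FileTaskHandler's fallback: when the log file is not on
# local disk it asks the worker's log-serving endpoint for the same relative
# path. The values below are placeholders taken from the error output above.
hostname = 'MY_DAG_NAME-0dde5ff5a786437cb14234'
log_relative_path = 'MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log'

url = 'http://{}:{}/log/{}'.format(
    hostname,
    conf.get('celery', 'WORKER_LOG_SERVER_PORT'),  # 8793 by default
    log_relative_path,
)
response = requests.get(url, timeout=5)  # fails: the pod's hostname no longer resolves

With the Kubernetes Executor the worker pod is gone by the time the Web UI asks for the log, so the hostname no longer resolves, hence the "Name or service not known" error.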

What you expected to happen:

The logs should be rendered in the Web UI using the S3TaskHandler class.

Most upvoted comments

Update: the new solution is adding gevent monkey patching to the top of config_templates/airflow_local_settings.py. It works.

Previously I had to fix this by manually patching botocore so that it would stop bypassing the patched SSLContext, but that broke in Airflow 1.10.12.

I ran into IOErrors on the scheduler when using monkey.patch_all(), similar to this, and the suggested fix monkey.patch_all(thread=False, socket=False) caused threading warnings.

In the end I figured out that the issue was only with monkey-patching SSL, as ssl is somehow imported earlier than gunicorn expects, as seen in the warning in the webserver logs:

Sep 15 02:10:45 ubuntu-xenial pipenv[22660]: /etc/airflow/.local/share/virtualenvs/airflow-bTdwlyD1/lib/python3.8/site-packages/gunicorn/workers/ggevent.py:53: MonkeyPatchWarning: Monkey-patching ssl after ssl has already been imported may lead to errors, including RecursionError on Python 3.6. It may also silently lead to incorrect behaviour on Python 3.7. Please monkey-patch earlier. See https://github.com/gevent/gevent/issues/1016. Modules that had direct imports (NOT patched): ['urllib3.util (/etc/airflow/.local/share/virtualenvs/airflow-bTdwlyD1/lib/python3.8/site-packages/urllib3/util/__init__.py)', 'urllib3.util.ssl_ (/etc/airflow/.local/share/virtualenvs/airflow-bTdwlyD1/lib/python3.8/site-packages/urllib3/util/ssl_.py)'].
Sep 15 02:10:45 ubuntu-xenial pipenv[22660]:   monkey.patch_all()

In the end adding this to the top of airflow_local_settings.py worked for me.

from gevent import monkey
monkey.patch_ssl()

If anyone comes across this issue, I wanted to mention that as of 2.1.3 the issue is still present, and the solution that still works is prepending this:

from gevent import monkey; monkey.patch_ssl()

to the very top of airflow/config_templates/airflow_local_settings.py (or your own custom settings file, if you have made one) in the Airflow Webserver component.

Make sure not to touch any other Airflow components in your cluster, such as the Airflow Worker or Airflow Scheduler (so if you have a single-node setup, this may not work for you), as those will not behave correctly with SSL monkey patching.
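
If every component shares the same settings file, one option is to guard the patch so it only runs in the webserver process. The argv check below is an assumption about how the process is launched (airflow webserver), not something Airflow documents, so treat it as a sketch:

import sys

# Hypothetical guard: only monkey-patch SSL when this process was started as
# the webserver, so scheduler/worker processes importing the same settings
# file are left untouched.
if 'webserver' in sys.argv:
    from gevent import monkey
    monkey.patch_ssl()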

Unfortunately, the hack mentioned above worked in 2.1.4 but no longer works on 2.2.0. Some code refactoring has affected the loading flow of the Airflow Webserver, so we’re no longer patching SSL early enough (the monkey-patching warnings are popping up again). I’m still digging through the code to identify where the hack should be plugged in now.

Update a few hours later: now you need to add it to the very top of airflow/settings.py instead of airflow_local_settings.py.

The issue seems to be infinite recursion due to an interaction between gevent’s monkey patching and the botocore library used by S3TaskHandler:

Traceback (most recent call last):
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/airflow/utils/log/s3_task_handler.py", line 131, in s3_log_exists
    return self.hook.get_key(remote_log_location) is not None
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/airflow/hooks/S3_hook.py", line 224, in get_key
    obj = self.get_resource_type('s3').Object(bucket_name, key)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/airflow/contrib/hooks/aws_hook.py", line 186, in get_resource_type
    config=config, verify=self.verify)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/boto3/session.py", line 389, in resource
    aws_session_token=aws_session_token, config=config)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/boto3/session.py", line 263, in client
    aws_session_token=aws_session_token, config=config)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 823, in create_client
    credentials = self.get_credentials()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 428, in get_credentials
    'credential_provider').load_credentials()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 919, in get_component
    self._components[name] = factory()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 149, in _create_credential_resolver
    self, region_name=self._last_client_region_used
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/credentials.py", line 70, in create_credential_resolver
    container_provider = ContainerProvider()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/credentials.py", line 1803, in __init__
    fetcher = ContainerMetadataFetcher()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/utils.py", line 1578, in __init__
    timeout=self.TIMEOUT_SECONDS
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 180, in __init__
    self._manager = PoolManager(**self._get_pool_manager_kwargs())
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 188, in _get_pool_manager_kwargs
    'ssl_context': self._get_ssl_context(),
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 197, in _get_ssl_context
    return create_urllib3_context()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 72, in create_urllib3_context
    context.options |= options
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  [Previous line repeated 409 more times]
RecursionError: maximum recursion depth exceeded
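
For intuition, here is a minimal, self-contained sketch of the failure mode (it is not botocore or gevent code): a property setter that resolves its own class by name from module globals recurses once that global is rebound to a subclass, which is effectively what a late SSL monkey patch does to ssl.SSLContext.

class _RawContext:
    """Stands in for the C-level _ssl._SSLContext base class."""
    def __init__(self):
        self._options = 0

    @property
    def options(self):
        return self._options

    @options.setter
    def options(self, value):
        self._options = value


class Context(_RawContext):
    """Stands in for the pure-Python ssl.SSLContext wrapper."""
    @property
    def options(self):
        return super().options

    @options.setter
    def options(self, value):
        # 'Context' is looked up in module globals at call time, just like
        # 'SSLContext' inside the stdlib ssl.py options setter.
        super(Context, Context).options.__set__(self, value)


class PatchedContext(Context):
    """Stands in for the subclass gevent installs when monkey patching."""


# Simulate the late monkey patch: rebind the module-level name to the subclass.
Context = PatchedContext

ctx = PatchedContext()
ctx.options |= 1  # RecursionError: the setter keeps resolving 'Context' to
                  # PatchedContext and re-entering itself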

I still need to dive deeper into this

@potiuk will test this out

I faced a similar issue when I tried to get a secret from the secrets backend (HashiCorp Vault) in webserver_config.py.

from airflow.models import Variable
...
AUTH_LDAP_BIND_USER = Variable.get("ldap-bind-user")
AUTH_LDAP_BIND_PASSWORD = Variable.get("ldap-bind-password")

So, based on the answers from @dimon222, I had to patch airflow/settings.py during my custom Docker image build. Below is the proof of concept:

FROM apache/airflow:2.2.3-python3.8
USER airflow
RUN sed -i '1s/^/from gevent import monkey; monkey.patch_ssl() \n/' /home/airflow/.local/lib/python3.8/site-packages/airflow/settings.py

And it works fine! I hope this patch will be included in the official Airflow code.

This is the solution I mentioned above.

@cmlad yes, that’s the reason for sure. And HTTP vs HTTPS makes no difference.

I randomly found a potential fix in one of the previously mentioned issues, #8164 (comment). It works: prepending the monkey patching in dagbag.py. What invokes dagbag.py so early? Not sure.

This no longer works in 1.10.12. Perhaps something gets loaded earlier again and messes up urllib3. The issue is still present.

How can I read logs from the worker pods? This is important since we need to see logs in real time to see what’s happening; S3 logs are available only once the task has completed. I am currently getting the error below:

Log file does not exist: /opt/airflow/logs/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log
*** Fetching from: http://taskpod-49ccd964791a4740b199:8793/log/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='taskpod-49ccd964791a4740b199', port=8793): Max retries exceeded with url: /log/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f486c9b8be0>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution',))

Can confirm that using a non-sync gunicorn worker class (gevent in our case) breaks the web component’s ability to show task logs that are in S3. Moving back to sync works for us, and since we’re running on Kubernetes, a few more web pods to absorb the ALB/ELB health checks isn’t (so far) a performance concern.

Would love to see this fixed, however.

Using the Airflow default aws_default connection for S3. Relevant configuration shared among the web, scheduler, and worker pods is below:
  - AIRFLOW__CORE__REMOTE_LOGGING=True
  - AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws_default
  - AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://${OUR-BUCKET}/logs
  - AIRFLOW__WEBSERVER__WORKER_CLASS=sync
  - AIRFLOW__WEBSERVER__WORKER_REFRESH_INTERVAL=3600 # the default 30s worker lifetime is *way* too short
  - AWS_DEFAULT_REGION=us-east-1
  - AWS_ROLE_ARN=arn:aws:iam::${OUR_AWS_ACCOUNT_ID}:role/${OUR_AIRFLOW_APP_ROLE}
  - AWS_WEB_IDENTITY_TOKEN_FILE=/var/run/secrets/eks.amazonaws.com/serviceaccount/token # This is how EKS does IAM<>Pod stuff