airflow: infinite recursion in StreamLogWriter

Apache Airflow version

2.2.2

What happened

When a task customizes logging (for example, by adding a stream handler) and then raises an exception, Airflow's logging code recurses infinitely, doubling the log buffer on every iteration and quickly using up all the memory. The call stack looks something like this:

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
    ti._run_raw_task(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1390, in _run_raw_task
    self.handle_failure(e, test_mode, error_file=error_file, session=session)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1704, in handle_failure
    self.log.error("Task failed with exception", exc_info=error)
  File "/usr/lib64/python3.9/logging/__init__.py", line 1475, in error
    self._log(ERROR, msg, args, **kwargs)
  File "/usr/lib64/python3.9/logging/__init__.py", line 1589, in _log
    self.handle(record)
  File "/usr/lib64/python3.9/logging/__init__.py", line 1599, in handle
    self.callHandlers(record)
  File "/usr/lib64/python3.9/logging/__init__.py", line 1661, in callHandlers
    hdlr.handle(record)
  File "/usr/lib64/python3.9/logging/__init__.py", line 952, in handle
    self.emit(record)
  File "/usr/lib64/python3.9/logging/__init__.py", line 1086, in emit
    stream.write(msg + self.terminator)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 119, in write
    self.flush()
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 126, in flush
    self._propagate_log(b)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 109, in _propagate_log
    self.logger.log(self.level, remove_escape_codes(message))
  File "/usr/lib64/python3.9/logging/__init__.py", line 1512, in log
    self._log(level, msg, args, **kwargs)
  File "/usr/lib64/python3.9/logging/__init__.py", line 1589, in _log
    self.handle(record)
<and so on ...>
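The loop can be reproduced without Airflow. Below is a minimal, hypothetical simulation that assumes only what the traceback shows: the task's `StreamHandler` writes into an object that replaced `sys.stderr`, and that object forwards each complete line back to a logger that reaches the same handler. It further assumes the forwarding object clears its buffer only after the nested logging call returns, which is one way to get the buffer doubling described above. `DoublingWriter` and `demonstrate_loop` are illustrative names, not Airflow code. The `max_depth` cap exists only so the demo stops instead of exhausting memory, and the sketch relies on Python 3.8+ behavior, where `StreamHandler.emit` re-raises `RecursionError`.

```python
import contextlib
import logging


class DoublingWriter:
    """Hypothetical stand-in for a stderr-to-logger redirector (NOT Airflow's code).

    write() buffers text; flush() forwards the buffer to a logger and clears
    the buffer only after that logger call has returned.
    """

    def __init__(self, logger, level, max_depth=5):
        self.logger = logger
        self.level = level
        self.max_depth = max_depth  # demo-only cap so the loop terminates
        self.buffer_sizes = []      # buffer length seen by each nested flush()
        self._buffer = ""
        self._depth = 0

    def write(self, message):
        self._buffer += message
        if message.endswith("\n"):
            self.flush()

    def flush(self):
        buf = self._buffer
        if not buf:
            return
        self.buffer_sizes.append(len(buf))
        if self._depth >= self.max_depth:
            raise RecursionError("writer re-entered %d times" % self._depth)
        self._depth += 1
        try:
            # The record goes back through the same handler that called write().
            self.logger.log(self.level, buf.rstrip("\n"))
        finally:
            self._depth -= 1
        self._buffer = ""  # too late: nested writes already appended to it


def demonstrate_loop():
    logger = logging.getLogger("demo.task")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    writer = DoublingWriter(logger, logging.WARNING)
    with contextlib.redirect_stderr(writer):
        # Like the task code: StreamHandler() with no argument uses the
        # current sys.stderr, which is the redirector at this point.
        handler = logging.StreamHandler()
    logger.addHandler(handler)
    try:
        logger.error("boom")
    except RecursionError:
        pass  # re-raised by StreamHandler.emit on Python 3.8+
    finally:
        logger.removeHandler(handler)
    return writer.buffer_sizes


print(demonstrate_loop())  # [5, 10, 20, 40, 80, 160]
```

Each nested `flush()` sees a buffer twice as long as the previous one, because the nested `write()` appends to text that has not been cleared yet.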

What you expected to happen

The exception is logged once and the task fails normally, without recursive logging.

How to reproduce

Inside a PythonOperator callable, add a StreamHandler to the root logger and then raise an exception. Below is an example of a function to be called:

        import logging

        def trigger_logging():
            # Attach a handler that writes to the default stream (sys.stderr)
            logging.getLogger().addHandler(logging.StreamHandler())
            raise RuntimeError('trigger logging')

Operating System

CentOS 8 with Python 3.9

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

pip3 install 'apache-airflow[celery,mysql,ssh]==2.2.2' 'celery<5.2' 'click<8' 'SQLAlchemy<1.4'

Anything else

I am not sure whether this is indeed a bug or a misuse of the logging handlers. Any suggestions are highly appreciated!
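One possible task-side workaround, offered as an assumption rather than an Airflow-endorsed fix, is to give the handler an explicit stream instead of relying on the default. `logging.StreamHandler()` with no argument uses whatever `sys.stderr` is at the moment the handler is created, and inside the task that is the object forwarding back into logging (the traceback shows `StreamHandler.emit` calling `write` in `logging_mixin.py`). `sys.__stderr__` holds the stream the interpreter started with. `make_task_handler` is a hypothetical helper name, and `io.StringIO` merely stands in for the redirecting object.

```python
import contextlib
import io
import logging
import sys


def make_task_handler():
    """Hypothetical helper: a StreamHandler that bypasses a redirected sys.stderr.

    sys.__stderr__ is the interpreter's original stderr, so records written
    to it are not fed back into a logger that captures sys.stderr.
    """
    return logging.StreamHandler(sys.__stderr__)


captured = io.StringIO()  # stands in for a stderr-capturing object
with contextlib.redirect_stderr(captured):
    default_handler = logging.StreamHandler()  # binds to the redirected stream
    explicit_handler = make_task_handler()     # binds to the original stream

print(default_handler.stream is captured)          # True
print(explicit_handler.stream is sys.__stderr__)   # True
```

The trade-off is that output sent to `sys.__stderr__` bypasses Airflow's capture, so it will probably appear in the worker's own output rather than in the task log.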

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

About this issue

  • State: closed
  • Created 3 years ago
  • Reactions: 2
  • Comments: 15 (10 by maintainers)

Most upvoted comments

Here you go, #20541

Thanks for your time.