airflow: infinite recursion in StreamLogWriter
Apache Airflow version
2.2.2
What happened
When a task customizes logging by adding a stream handler and then raises an exception, the Airflow logging code recurses without bound, doubling the log buffer on each pass and quickly exhausting all available memory. The call stack looks something like this:
During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
    ti._run_raw_task(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1390, in _run_raw_task
    self.handle_failure(e, test_mode, error_file=error_file, session=session)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1704, in handle_failure
    self.log.error("Task failed with exception", exc_info=error)
  File "/usr/lib64/python3.9/logging/__init__.py", line 1475, in error
    self._log(ERROR, msg, args, **kwargs)
  File "/usr/lib64/python3.9/logging/__init__.py", line 1589, in _log
    self.handle(record)
  File "/usr/lib64/python3.9/logging/__init__.py", line 1599, in handle
    self.callHandlers(record)
  File "/usr/lib64/python3.9/logging/__init__.py", line 1661, in callHandlers
    hdlr.handle(record)
  File "/usr/lib64/python3.9/logging/__init__.py", line 952, in handle
    self.emit(record)
  File "/usr/lib64/python3.9/logging/__init__.py", line 1086, in emit
    stream.write(msg + self.terminator)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 119, in write
    self.flush()
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 126, in flush
    self._propagate_log(b)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 109, in _propagate_log
    self.logger.log(self.level, remove_escape_codes(message))
  File "/usr/lib64/python3.9/logging/__init__.py", line 1512, in log
    self._log(level, msg, args, **kwargs)
  File "/usr/lib64/python3.9/logging/__init__.py", line 1589, in _log
    self.handle(record)
<and so on ...>
What you expected to happen
No log recursion.
How to reproduce
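Raise an exception inside the callable of a PythonOperator after adding a stream handler to the root logger. An example of such a callable is shown below (the function name trigger_logging is only illustrative).
import logging

def trigger_logging():
    # Add an extra stream handler to the root logger, then fail the task.
    logging.getLogger().addHandler(logging.StreamHandler())
    raise RuntimeError('trigger logging')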
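As a side note on why the handler in the snippet above ends up writing into Airflow's own log writer: logging.StreamHandler() called with no argument binds to whatever sys.stderr is at construction time. The traceback shows the handler's stream is the writer in logging_mixin.py, so presumably sys.stderr is redirected while the task runs. The sketch below is a standalone illustration of that binding behaviour, not Airflow code; the function name handler_streams and the io.StringIO stand-in for the redirected stream are assumptions made for the example.

```python
import contextlib
import io
import logging
import sys


def handler_streams():
    """Build two handlers while stderr is redirected and report their streams."""
    # Hypothetical stand-in for the stream that is installed during a task run.
    redirected = io.StringIO()
    with contextlib.redirect_stderr(redirected):
        # No argument: the handler captures the *current* sys.stderr,
        # which is the redirected stream at this point.
        default_handler = logging.StreamHandler()
        # Explicit argument: the handler is bound to the interpreter's
        # original stderr, regardless of any redirection in effect.
        explicit_handler = logging.StreamHandler(sys.__stderr__)
    return redirected, default_handler.stream, explicit_handler.stream


if __name__ == "__main__":
    redirected, default_stream, explicit_stream = handler_streams()
    print("default handler uses redirected stream:", default_stream is redirected)
    print("explicit handler uses original stderr:", explicit_stream is sys.__stderr__)
```

If that assumption about redirection holds, passing an explicit stream such as sys.__stderr__ to the handler may be a possible workaround inside a task, since the handler would then no longer write back into the redirected stream. This is untested against Airflow 2.2.2 and is offered only as a suggestion.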
Operating System
CentOS 8 with Python 3.9
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
pip3 install apache-airflow[celery,mysql,ssh]==2.2.2 'celery<5.2' 'click<8' 'SQLAlchemy<1.4'
Anything else
I am not sure whether this is a genuine bug or a misuse of the logging handlers. Any suggestions are highly appreciated!
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
About this issue
- State: closed
- Created 3 years ago
- Reactions: 2
- Comments: 15 (10 by maintainers)
Here you go, #20541
Thanks for your time.