airflow: Can't view running tasks logs using KubernetesExecutor.

Description

I want to be able to see logs for running tasks in the Airflow web UI when I’m using the “KubernetesExecutor”. Airflow tries to read the logs of the worker pod, but those do not contain my task logs, because the “airflow tasks run” command redirects all stdout/stderr to the log file.

Here is what I see when I try to read logs of running tasks:

*** Falling back to local log
*** Trying to get logs (last 100 lines) from worker pod datainfraexporterhourlydagwaitforpublishedpostdump.a825336790e24b40ad43b2411ef617c5 ***

BACKEND=postgres
DB_HOST=divar-infra-db-cluster-master-service.skubel-76f219d5-eb73-4f23-96b3-a8cf4ab4f00a
DB_PORT=5432

[2021-05-22 19:00:30,157] {dagbag.py:451} INFO - Filling up the DagBag from /git/airflow/dags/data_infra/exporter/hourly.py

Use case / motivation

It’s not convenient to wait for tasks to finish in order to read their logs.

Are you willing to submit a PR?

I can help.

Related Issues

I don’t think so.

About this issue

  • State: closed
  • Created 3 years ago
  • Reactions: 4
  • Comments: 19 (15 by maintainers)

Most upvoted comments

Sure, there are plenty of unit tests. I doubt it would not work, otherwise it would have been quite a problem for other people, but let’s see, maybe someone will pick it up. @dstandish, this is the discussion that led to my PR, so maybe there is indeed a problem with Stackdriver (see the issue created by the user).

I’ve had this issue for a while now, but I was able to resolve it by adding this to the top of my dag.py file.

import logging
import sys

# Set Task Logger to INFO for better task logs
log = logging.getLogger("airflow.task")
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
log.addHandler(handler)

It does pretty much what @hsnprsd had in his class, but doesn’t require overriding the default logging config.
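For reference, the effect of that workaround can be exercised outside Airflow. In this sketch the "airflow.task" logger name matches the one Airflow uses for task logs, but the formatter and the logged message are just stand-ins for illustration:

```python
import logging
import sys

# Attach a stdout StreamHandler to the "airflow.task" logger, mirroring the
# workaround above. Anything the task logs then also reaches the pod's
# stdout, where the Kubernetes pod-log fetcher can see it while the task
# is still running.
log = logging.getLogger("airflow.task")
log.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
log.addHandler(handler)

log.info("this line would also appear in the worker pod's stdout")
```

Because the handler is added at module import time, putting it at the top of the DAG file means it is installed in the worker process before the task starts logging.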

I came up with a solution today. I changed the default logging config class like this:

import sys
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

LOGGING_CONFIG["handlers"]["custom_console"] = {
    "class": "logging.StreamHandler",
    "formatter": "airflow",
    "stream": sys.__stdout__,
}
LOGGING_CONFIG["loggers"]["airflow.task"]["handlers"].append("custom_console")

I moved this file to $AIRFLOW_HOME/config/logging_config.py and set AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS to logging_config.LOGGING_CONFIG.
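That last step can be expressed as environment variables. A minimal sketch, assuming the file layout above; the PYTHONPATH addition is an assumption to make sure Python can import logging_config from $AIRFLOW_HOME/config:

```shell
# Point Airflow at the custom logging config (paths assume the layout above)
export AIRFLOW_HOME="${AIRFLOW_HOME:-$HOME/airflow}"
export PYTHONPATH="$AIRFLOW_HOME/config:$PYTHONPATH"
export AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS="logging_config.LOGGING_CONFIG"
```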