airflow: Jinja templating doesn't work with container_resources when using dynamic task mapping with Kubernetes Pod Operator
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
Google Cloud Composer version: 2.1.5
Airflow version: 2.4.3
We are trying to use dynamic task mapping with KubernetesPodOperator. Our use case is to return the pod's CPU and memory requirements from functions registered as user-defined macros in the DAG.
Without dynamic task mapping this works perfectly, but with dynamic task mapping the macros are never resolved.
container_resources is a templated field as per the docs; the feature was introduced in this PR.
We also tried toggling the boolean render_template_as_native_obj, but still no luck.
Below is a trimmed version of our DAG to help reproduce the issue (the functions returning CPU and memory are trivial here, just to illustrate the pattern).
What you think should happen instead
It should work the same with or without dynamic task mapping.
How to reproduce
We deployed the following DAG in Google Cloud Composer.
import datetime
import os

from airflow import models
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

dvt_image = os.environ.get("DVT_IMAGE")

default_dag_args = {"start_date": datetime.datetime(2022, 1, 1)}


def pod_mem():
    return "4000M"


def pod_cpu():
    return "1000m"


with models.DAG(
    "sample_dag",
    schedule_interval=None,
    default_args=default_dag_args,
    render_template_as_native_obj=True,
    user_defined_macros={
        "pod_mem": pod_mem,
        "pod_cpu": pod_cpu,
    },
) as dag:
    task_1 = KubernetesPodOperator(
        task_id="task_1",
        name="task_1",
        namespace="default",
        image=dvt_image,
        cmds=["bash", "-cx"],
        arguments=["echo hello"],
        service_account_name="sa-k8s",
        container_resources=k8s_models.V1ResourceRequirements(
            limits={
                "memory": "{{ pod_mem() }}",
                "cpu": "{{ pod_cpu() }}",
            }
        ),
        startup_timeout_seconds=1800,
        get_logs=True,
        image_pull_policy="Always",
        config_file="/home/airflow/composer_kube_config",
        dag=dag,
    )

    task_2 = KubernetesPodOperator.partial(
        task_id="task_2",
        name="task_2",
        namespace="default",
        image=dvt_image,
        cmds=["bash", "-cx"],
        service_account_name="sa-k8s",
        container_resources=k8s_models.V1ResourceRequirements(
            limits={
                "memory": "{{ pod_mem() }}",
                "cpu": "{{ pod_cpu() }}",
            }
        ),
        startup_timeout_seconds=1800,
        get_logs=True,
        image_pull_policy="Always",
        config_file="/home/airflow/composer_kube_config",
        dag=dag,
    ).expand(arguments=[["echo hello"]])

    task_1 >> task_2
task_1 (without dynamic task mapping) completes successfully, while task_2 (with dynamic task mapping) fails.
Looking at the error logs, pod creation fails because the calls to pod_cpu() and pod_mem() are left unrendered in the pod spec.
Here is the traceback:
Exception when attempting to create Namespaced Pod: { "apiVersion": "v1", "kind": "Pod", "metadata": { "annotations": {}, "labels": { "dag_id": "sample_dag", "task_id": "task_2", "run_id": "manual__2023-02-08T183926.890852Z-eee90e4ee", "kubernetes_pod_operator": "True", "map_index": "0", "try_number": "2", "airflow_version": "2.4.3-composer", "airflow_kpo_in_cluster": "False" }, "name": "task-2-46f76eb0432d42ae9a331a6fc53835b3", "namespace": "default" }, "spec": { "affinity": {}, "containers": [ { "args": [ "echo hello" ], "command": [ "bash", "-cx" ], "env": [], "envFrom": [], "image": "us.gcr.io/ams-e2e-testing/edw-dvt-tool", "imagePullPolicy": "Always", "name": "base", "ports": [], "resources": { "limits": { "memory": "{{ pod_mem() }}", "cpu": "{{ pod_cpu() }}" } }, "volumeMounts": [] } ], "hostNetwork": false, "imagePullSecrets": [], "initContainers": [], "nodeSelector": {}, "restartPolicy": "Never", "securityContext": {}, "serviceAccountName": "sa-k8s", "tolerations": [], "volumes": [] } }
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 143, in run_pod_async
    resp = self._client.create_namespaced_pod(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7356, in create_namespaced_pod
    return self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 7455, in create_namespaced_pod_with_http_info
    return self.api_client.call_api(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 391, in request
    return self.rest_client.POST(url,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 275, in POST
    return self.request("POST", url,
  File "/opt/python3.8/lib/python3.8/site-packages/kubernetes/client/rest.py", line 234, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': '1ef20c0b-6980-4173-b9cc-9af5b4792e86', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '1b263a21-4c75-4ef8-8147-c18780a13f0e', 'X-Kubernetes-Pf-Prioritylevel-Uid': '3cd4cda4-908c-4944-a422-5512b0fb88d6', 'Date': 'Wed, 08 Feb 2023 18:45:23 GMT', 'Content-Length': '256'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version \"v1\" cannot be handled as a Pod: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'","reason":"BadRequest","code":400}
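As an aside, in this trimmed example the macros are plain Python functions defined in the DAG file, so the quantities could also be resolved eagerly at parse time by calling the functions directly instead of through Jinja; a minimal illustration reusing the names from the DAG above (this bypasses runtime templating entirely, so it only helps when the values do not depend on the task context):

# Eager alternative: resolve the quantities when the DAG file is parsed,
# instead of deferring to Jinja macros at render time.
eager_resources = k8s_models.V1ResourceRequirements(
    limits={
        "memory": pod_mem(),  # evaluates to "4000M" at parse time
        "cpu": pod_cpu(),     # evaluates to "1000m" at parse time
    }
)
# eager_resources could then be passed as container_resources= to either task.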
Operating System
Google Composer Kubernetes Cluster
Versions of Apache Airflow Providers
No response
Deployment
Composer
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
About this issue
- Original URL
- State: closed
- Created a year ago
- Reactions: 1
- Comments: 16 (12 by maintainers)
@jose-lpa Using Variable.get() in the DAG script is not recommended, because the DagFileProcessor processes the script every X minutes and loads this variable from the DB each time. Also, this approach works for Variables but not for all the other Jinja templates.

@pshrivastava27 Here is a solution for your need:
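(The snippet below is a minimal sketch of such a subclass, assuming the private _render_nested_template_fields and _do_render_template_fields hooks available on BaseOperator in Airflow 2.4; the class name is illustrative.)

from kubernetes.client import models as k8s_models

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)


class ResourceTemplatingKubernetesPodOperator(KubernetesPodOperator):
    """Also renders Jinja expressions nested inside V1ResourceRequirements."""

    def _render_nested_template_fields(self, content, context, jinja_env, seen_oids):
        # Treat the "limits" and "requests" dicts of V1ResourceRequirements as
        # template fields so that "{{ pod_mem() }}" / "{{ pod_cpu() }}" get rendered.
        if id(content) not in seen_oids and isinstance(
            content, k8s_models.V1ResourceRequirements
        ):
            seen_oids.add(id(content))
            self._do_render_template_fields(
                content, ("limits", "requests"), context, jinja_env, seen_oids
            )
            return
        super()._render_nested_template_fields(content, context, jinja_env, seen_oids)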
You can do the same for the other classes if needed.
Done
@hussein-awala Is the solution in this case to use a custom pod operator, so that the Jinja templating variables can be used?
We are using Cloud Composer and don't have the flexibility to upgrade to the latest Airflow version where the fix is implemented, hence we are wondering if we should be using a custom Pod Operator now.
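For illustration, a hypothetical way to plug a subclass like the one sketched above into the mapped task from the reproduction DAG (same macros, image and config as before) might look like this:

# Hypothetical: only the mapped task needs the custom operator class.
task_2 = ResourceTemplatingKubernetesPodOperator.partial(
    task_id="task_2",
    name="task_2",
    namespace="default",
    image=dvt_image,
    cmds=["bash", "-cx"],
    service_account_name="sa-k8s",
    container_resources=k8s_models.V1ResourceRequirements(
        limits={
            "memory": "{{ pod_mem() }}",
            "cpu": "{{ pod_cpu() }}",
        }
    ),
    config_file="/home/airflow/composer_kube_config",
).expand(arguments=[["echo hello"]])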