airflow: Commit failed: Local: No offset stored while using AwaitMessageTriggerFunctionSensor
Apache Airflow version
2.6.3
What happened
While using AwaitMessageTriggerFunctionSensor with an increased number of DAG runs, I encountered the exception cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}.
I tried setting the number of consumers lower than, equal to, and higher than the number of partitions, but the error occurred every time.
Here is a log:
[2023-07-13, 14:37:07 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:37:07 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-07-13, 14:37:07 UTC] {taskinstance.py:1327} INFO - Executing <Task(AwaitMessageTriggerFunctionSensor): await_message> on 2023-07-13 14:35:00+00:00
[2023-07-13, 14:37:07 UTC] {standard_task_runner.py:57} INFO - Started process 8918 to run task
[2023-07-13, 14:37:07 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'kafka_test_dag', 'await_message', 'scheduled__2023-07-13T14:35:00+00:00', '--job-id', '629111', '--raw', '--subdir', 'DAGS_FOLDER/dags/kafka_consumers_dag.py', '--cfg-path', '/tmp/tmp3de57b65']
[2023-07-13, 14:37:07 UTC] {standard_task_runner.py:85} INFO - Job 629111: Subtask await_message
[2023-07-13, 14:37:08 UTC] {task_command.py:410} INFO - Running <TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [running]> on host airflow-worker-1.airflow-worker.syn-airflow-dev.svc.opus.s.mesh
[2023-07-13, 14:37:08 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='kafka_test_dag' AIRFLOW_CTX_TASK_ID='await_message' AIRFLOW_CTX_EXECUTION_DATE='2023-07-13T14:35:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-13T14:35:00+00:00'
[2023-07-13, 14:37:09 UTC] {taskinstance.py:1415} INFO - Pausing task as DEFERRED. dag_id=kafka_test_dag, task_id=await_message, execution_date=20230713T143500, start_date=20230713T143707
[2023-07-13, 14:37:09 UTC] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-07-13, 14:38:43 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:38:43 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:38:43 UTC] {taskinstance.py:1306} INFO - Resuming after deferral
[2023-07-13, 14:38:44 UTC] {taskinstance.py:1327} INFO - Executing <Task(AwaitMessageTriggerFunctionSensor): await_message> on 2023-07-13 14:35:00+00:00
[2023-07-13, 14:38:44 UTC] {standard_task_runner.py:57} INFO - Started process 9001 to run task
[2023-07-13, 14:38:44 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'kafka_test_dag', 'await_message', 'scheduled__2023-07-13T14:35:00+00:00', '--job-id', '629114', '--raw', '--subdir', 'DAGS_FOLDER/dags/kafka_consumers_dag.py', '--cfg-path', '/tmp/tmpo6xz234q']
[2023-07-13, 14:38:44 UTC] {standard_task_runner.py:85} INFO - Job 629114: Subtask await_message
[2023-07-13, 14:38:45 UTC] {task_command.py:410} INFO - Running <TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [running]> on host airflow-worker-1.airflow-worker.airflow-dev.svc.opus.s.mesh
[2023-07-13, 14:38:46 UTC] {taskinstance.py:1598} ERROR - Trigger failed:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 537, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 615, in run_trigger
    async for event in trigger.run():
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/apache/kafka/triggers/await_message.py", line 114, in run
    await async_commit(asynchronous=False)
  File "/home/airflow/.local/lib/python3.11/site-packages/asgiref/sync.py", line 479, in __call__
    ret: _R = await loop.run_in_executor(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/asgiref/sync.py", line 538, in thread_handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}
[2023-07-13, 14:38:47 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
airflow.exceptions.TaskDeferralError: Trigger failure
[2023-07-13, 14:38:47 UTC] {taskinstance.py:1345} INFO - Marking task as FAILED. dag_id=kafka_test_dag, task_id=await_message, execution_date=20230713T143500, start_date=20230713T143707, end_date=20230713T143847
[2023-07-13, 14:38:48 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 629114 for task await_message (Trigger failure; 9001)
[2023-07-13, 14:38:48 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2023-07-13, 14:38:48 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check
What you think should happen instead
The sensor should receive a message without errors, and each message should be committed exactly once.
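A minimal client-side sketch of "commit each message exactly once", assuming a confluent_kafka-style consumer that exposes poll(timeout) and commit(message=..., asynchronous=...), with "enable.auto.commit" disabled so that only the explicit commit ever runs. The function consume_one, its max_polls bound, and the config values are hypothetical; this is not the provider's trigger implementation.

```python
from typing import Any, Callable, Optional

# Hypothetical consumer settings: auto-commit is off, so offsets are only
# committed by the explicit commit() call in consume_one().
CONSUMER_CONFIG = {
    "bootstrap.servers": "localhost:9092",  # hypothetical broker address
    "group.id": "kafka_test_dag",  # hypothetical consumer group
    "enable.auto.commit": False,
}


def consume_one(
    consumer: Any,
    apply_function: Callable[[Any], Any],
    poll_timeout: float = 1.0,
    max_polls: int = 10,  # hypothetical bound so the sketch terminates
) -> Optional[Any]:
    """Poll until apply_function accepts a message, then commit that message once.

    `consumer` is assumed to behave like confluent_kafka.Consumer.
    """
    for _ in range(max_polls):
        message = consumer.poll(poll_timeout)
        if message is None:
            continue  # nothing arrived within the timeout
        if message.error():
            raise RuntimeError(f"Kafka error: {message.error()}")
        result = apply_function(message)
        if result:
            # Commit this specific message (confluent_kafka commits its
            # offset + 1) instead of "whatever offsets are currently stored".
            consumer.commit(message=message, asynchronous=False)
            return result
    return None
```

Committing the message explicitly ties the commit to the offset that was actually processed, so it does not depend on locally stored offsets still being pending.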
How to reproduce
Example of a DAG:
from airflow.decorators import dag
from airflow.models import Variable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from airflow.providers.apache.kafka.sensors.kafka import \
    AwaitMessageTriggerFunctionSensor
import uuid


def check_message(message):
    if message:
        return True


def trigger_dag(**context):
    TriggerDagRunOperator(
        trigger_dag_id='triggerer_test_dag',
        task_id=f"triggered_downstream_dag_{uuid.uuid4()}"
    ).execute(context)


@dag(
    description="This DAG listens kafka topic and triggers DAGs "
                "based on received message.",
    schedule_interval='* * * * *',
    start_date=days_ago(2),
    max_active_runs=4,
    catchup=False
)
def kafka_test_dag():
    AwaitMessageTriggerFunctionSensor(
        task_id="await_message",
        topics=['my_test_topic'],
        apply_function="dags.kafka_consumers_dag.check_message",
        event_triggered_function=trigger_dag
    )


kafka_test_dag()
Operating System
Debian GNU/Linux 11 (bullseye)
Versions of Apache Airflow Providers
apache-airflow-providers-apache-kafka==1.1.2
Deployment
Other 3rd-party Helm chart
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project’s Code of Conduct
About this issue
- State: closed
- Created a year ago
- Comments: 15 (8 by maintainers)
@ikholodkov @ddione84 I just created #36272 to fix the issue. Could you please test it?
Also, I suggest testing the current version of the operator with the config "enable.auto.commit": false. As you can see in my PR, we commit the consumed messages manually, so if auto-commit is enabled, the consumer will also try to commit the consumed offsets periodically, which may be the reason for your issue (once the automatic commit has run, the manual commit doesn't find any offset to commit).
Hello, I can confirm that it fixes the issue. Thanks @hussein-awala.
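To make the suspected race concrete, here is a toy, pure-Python model; it is not librdkafka and only mimics the behavior described in the comment above. Polling stores offset + 1 locally, the periodic auto-commit commits whatever is pending, and a later bare commit() then finds nothing new and fails the way librdkafka reports _NO_OFFSET. ToyConsumer, NoOffsetError and run_scenario are hypothetical names.

```python
from typing import Optional


class NoOffsetError(Exception):
    """Stand-in for KafkaError _NO_OFFSET ("Commit failed: Local: No offset stored")."""


class ToyConsumer:
    """Toy model of a single partition's stored vs. committed offsets."""

    def __init__(self, enable_auto_commit: bool) -> None:
        self.enable_auto_commit = enable_auto_commit
        self.next_offset = 0                  # offset of the next message to poll
        self.stored: Optional[int] = None     # offset stored locally for the next commit
        self.committed: Optional[int] = None  # last offset committed to the group

    def poll(self) -> int:
        # Automatic offset store: consuming a message stores offset + 1.
        offset = self.next_offset
        self.next_offset += 1
        self.stored = offset + 1
        return offset

    def _has_pending(self) -> bool:
        return self.stored is not None and self.stored != self.committed

    def auto_commit_tick(self) -> None:
        # Periodic background commit; does nothing when auto-commit is disabled.
        if self.enable_auto_commit and self._has_pending():
            self.committed = self.stored

    def commit(self) -> None:
        # Bare commit(): commits the stored offset, fails if nothing new is stored.
        if not self._has_pending():
            raise NoOffsetError("Commit failed: Local: No offset stored")
        self.committed = self.stored


def run_scenario(enable_auto_commit: bool) -> str:
    """Poll one message, let the auto-commit timer fire, then commit manually."""
    consumer = ToyConsumer(enable_auto_commit)
    consumer.poll()
    consumer.auto_commit_tick()  # the background commit may win the race
    try:
        consumer.commit()  # models the trigger's manual, synchronous commit
    except NoOffsetError as exc:
        return f"failed: {exc}"
    return f"committed offset {consumer.committed}"
```

In this model, run_scenario(True) returns "failed: Commit failed: Local: No offset stored", while run_scenario(False) returns "committed offset 1", which is the intuition behind disabling "enable.auto.commit" when the code commits manually.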
I can also trigger the error again by adding "enable.auto.offset.store": false to the consumer connection config.
Would it make sense to add another integration test to cover "enable.auto.offset.store": false? I can make a PR.
Hi, I have tested this for my pipelines and it seems to be working fine. Thank you.
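For the proposed test of "enable.auto.offset.store": false, one possible shape is sketched below with a fake consumer instead of a real broker, so it is a unit-level sketch rather than the integration test itself. The assumption is that with automatic offset storing disabled, a bare commit() has nothing to commit unless the application first calls store_offsets(message=...), a real confluent_kafka.Consumer method that stores the message's offset + 1. FakeConsumer, FakeMessage, NoOffsetStored, commit_processed and the test functions are hypothetical names.

```python
from typing import Dict


class NoOffsetStored(Exception):
    """Stand-in for KafkaError _NO_OFFSET."""


class FakeMessage:
    def __init__(self, partition: int, offset: int) -> None:
        self._partition = partition
        self._offset = offset

    def partition(self) -> int:
        return self._partition

    def offset(self) -> int:
        return self._offset


class FakeConsumer:
    """Mimics a consumer configured with "enable.auto.offset.store": false."""

    def __init__(self) -> None:
        self.stored: Dict[int, int] = {}     # partition -> offset to commit next
        self.committed: Dict[int, int] = {}  # partition -> last committed offset

    def store_offsets(self, message: FakeMessage) -> None:
        # Like confluent_kafka, store the message's offset + 1.
        self.stored[message.partition()] = message.offset() + 1

    def commit(self, asynchronous: bool = True) -> None:
        # Bare commit: only offsets stored but not yet committed are eligible.
        pending = {p: o for p, o in self.stored.items() if self.committed.get(p) != o}
        if not pending:
            raise NoOffsetStored("Commit failed: Local: No offset stored")
        self.committed.update(pending)


def commit_processed(consumer, message) -> None:
    """Hypothetical helper: store the processed message's offset, then commit synchronously."""
    consumer.store_offsets(message=message)
    consumer.commit(asynchronous=False)


def test_bare_commit_fails_without_offset_store() -> None:
    consumer = FakeConsumer()
    try:
        consumer.commit(asynchronous=False)
    except NoOffsetStored:
        return
    raise AssertionError("expected NoOffsetStored")


def test_commit_after_store_offsets_succeeds() -> None:
    consumer = FakeConsumer()
    commit_processed(consumer, FakeMessage(partition=0, offset=41))
    assert consumer.committed == {0: 42}
```

A real integration test would exercise the same two paths against a broker, with the connection's extra config setting "enable.auto.offset.store": false.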