airflow: Commit failed: Local: No offset stored while using AwaitMessageTriggerFunctionSensor

Apache Airflow version

2.6.3

What happened

While using AwaitMessageTriggerFunctionSensor with a growing number of DAG runs, I encountered the exception cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}. I tried setting the number of consumers to fewer than, equal to, and more than the number of partitions, but the error occurred every time. Here is the log:

[2023-07-13, 14:37:07 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:37:07 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-07-13, 14:37:07 UTC] {taskinstance.py:1327} INFO - Executing <Task(AwaitMessageTriggerFunctionSensor): await_message> on 2023-07-13 14:35:00+00:00
[2023-07-13, 14:37:07 UTC] {standard_task_runner.py:57} INFO - Started process 8918 to run task
[2023-07-13, 14:37:07 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'kafka_test_dag', 'await_message', 'scheduled__2023-07-13T14:35:00+00:00', '--job-id', '629111', '--raw', '--subdir', 'DAGS_FOLDER/dags/kafka_consumers_dag.py', '--cfg-path', '/tmp/tmp3de57b65']
[2023-07-13, 14:37:07 UTC] {standard_task_runner.py:85} INFO - Job 629111: Subtask await_message
[2023-07-13, 14:37:08 UTC] {task_command.py:410} INFO - Running <TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [running]> on host airflow-worker-1.airflow-worker.syn-airflow-dev.svc.opus.s.mesh
[2023-07-13, 14:37:08 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='kafka_test_dag' AIRFLOW_CTX_TASK_ID='await_message' AIRFLOW_CTX_EXECUTION_DATE='2023-07-13T14:35:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-13T14:35:00+00:00'
[2023-07-13, 14:37:09 UTC] {taskinstance.py:1415} INFO - Pausing task as DEFERRED. dag_id=kafka_test_dag, task_id=await_message, execution_date=20230713T143500, start_date=20230713T143707
[2023-07-13, 14:37:09 UTC] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-07-13, 14:38:43 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:38:43 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [queued]>
[2023-07-13, 14:38:43 UTC] {taskinstance.py:1306} INFO - Resuming after deferral
[2023-07-13, 14:38:44 UTC] {taskinstance.py:1327} INFO - Executing <Task(AwaitMessageTriggerFunctionSensor): await_message> on 2023-07-13 14:35:00+00:00
[2023-07-13, 14:38:44 UTC] {standard_task_runner.py:57} INFO - Started process 9001 to run task
[2023-07-13, 14:38:44 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'kafka_test_dag', 'await_message', 'scheduled__2023-07-13T14:35:00+00:00', '--job-id', '629114', '--raw', '--subdir', 'DAGS_FOLDER/dags/kafka_consumers_dag.py', '--cfg-path', '/tmp/tmpo6xz234q']
[2023-07-13, 14:38:44 UTC] {standard_task_runner.py:85} INFO - Job 629114: Subtask await_message
[2023-07-13, 14:38:45 UTC] {task_command.py:410} INFO - Running <TaskInstance: kafka_test_dag.await_message scheduled__2023-07-13T14:35:00+00:00 [running]> on host airflow-worker-1.airflow-worker.airflow-dev.svc.opus.s.mesh
[2023-07-13, 14:38:46 UTC] {taskinstance.py:1598} ERROR - Trigger failed:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 537, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 615, in run_trigger
    async for event in trigger.run():
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/apache/kafka/triggers/await_message.py", line 114, in run
    await async_commit(asynchronous=False)
  File "/home/airflow/.local/lib/python3.11/site-packages/asgiref/sync.py", line 479, in __call__
    ret: _R = await loop.run_in_executor(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/asgiref/sync.py", line 538, in thread_handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}
[2023-07-13, 14:38:47 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
airflow.exceptions.TaskDeferralError: Trigger failure
[2023-07-13, 14:38:47 UTC] {taskinstance.py:1345} INFO - Marking task as FAILED. dag_id=kafka_test_dag, task_id=await_message, execution_date=20230713T143500, start_date=20230713T143707, end_date=20230713T143847
[2023-07-13, 14:38:48 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 629114 for task await_message (Trigger failure; 9001)
[2023-07-13, 14:38:48 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2023-07-13, 14:38:48 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check

What you think should happen instead

The sensor should receive messages without errors, and each message should be committed exactly once.

How to reproduce

Example of a DAG:

from airflow.decorators import dag
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from airflow.providers.apache.kafka.sensors.kafka import \
    AwaitMessageTriggerFunctionSensor

import uuid


def check_message(message):
    # apply_function: returning a truthy value fires the trigger event.
    if message:
        return True


def trigger_dag(event, **context):
    # event_triggered_function is called with the trigger event plus the
    # task context, so it needs a positional parameter for the event.
    TriggerDagRunOperator(
        trigger_dag_id='triggerer_test_dag',
        task_id=f"triggered_downstream_dag_{uuid.uuid4()}"
    ).execute(context)


@dag(
    description="This DAG listens kafka topic and triggers DAGs "
                "based on received message.",
    schedule_interval='* * * * *',
    start_date=days_ago(2),
    max_active_runs=4,
    catchup=False
)
def kafka_test_dag():
    AwaitMessageTriggerFunctionSensor(
        task_id="await_message",
        topics=['my_test_topic'],
        apply_function="dags.kafka_consumers_dag.check_message",
        event_triggered_function=trigger_dag
    )


kafka_test_dag()
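
To actually run this DAG, the sensor also needs a Kafka connection for its default kafka_config_id ("kafka_default"). A minimal sketch of that setup, assuming a local broker; the address, group id, and offset-reset policy below are placeholder values:

airflow connections add kafka_default \
    --conn-type kafka \
    --conn-extra '{"bootstrap.servers": "localhost:9092", "group.id": "kafka_test_dag", "auto.offset.reset": "earliest"}'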

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow-providers-apache-kafka==1.1.2

Deployment

Other 3rd-party Helm chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

About this issue

  • State: closed
  • Created a year ago
  • Comments: 15 (8 by maintainers)

Most upvoted comments

@ikholodkov @ddione84 I just created #36272 to fix the issue. Could you please test it?

Also, I suggest testing the current version of the operator with the config enable.auto.commit: false. As you can see in my PR, we commit the consumed messages manually, so if auto-commit is enabled, the consumer will try to commit the consumed offsets periodically, which may be the cause of your issue (the automatic commit doesn’t find any offset to commit).
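
If it helps, that test would amount to a consumer config along these lines in the Kafka connection extra (a sketch; the broker address and group id are placeholders), since the provider passes the connection extra through to the underlying confluent-kafka consumer:

{
    "bootstrap.servers": "localhost:9092",
    "group.id": "kafka_test_dag",
    "enable.auto.commit": false
}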

Hello, I can confirm that it fixes the issue. Thanks @hussein-awala.

I can also trigger the error again by:

  1. reverting the fix
  2. adding "enable.auto.offset.store": false to the consumer connection config

Would it make sense to add another integration test to cover "enable.auto.offset.store": false? I can make a PR.
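
For anyone following along, the mechanism can be sketched outside Airflow with confluent-kafka directly (the broker, topic, and group id below are placeholders): with enable.auto.offset.store disabled, nothing is placed in the local offset store, so a bare commit() has nothing to flush and raises the same _NO_OFFSET error; committing the consumed message's offsets explicitly, as the fix does, avoids it.

from confluent_kafka import Consumer, KafkaException

# Placeholder broker/topic/group; assumes at least one message is available.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "no-offset-demo",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "enable.auto.offset.store": False,  # offsets are no longer stored automatically
})
consumer.subscribe(["my_test_topic"])

msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    try:
        # The local offset store is empty, so there is nothing to commit:
        consumer.commit(asynchronous=False)
    except KafkaException as e:
        print(e)  # KafkaError{code=_NO_OFFSET,...,"Commit failed: Local: No offset stored"}
    # Committing the consumed message's offsets explicitly succeeds:
    consumer.commit(message=msg, asynchronous=False)
consumer.close()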

Hi, I have tested this for my pipelines and it seems to be working fine. Thank you.