azure-sdk-for-python: EventHub unexpectedly closes connection

  • Package Name: azure-eventhub
  • Package Version: 5.11.2
  • Operating System: Ubuntu 22.04.2
  • Python Version: Python 3.10.6

Describe the bug

We process messages from the built-in Event Hub endpoint of our IoT Hub, across 8 partitions.

We use a checkpoint store and update the checkpoint every couple of minutes: we batch messages from a single device for a couple of minutes and always set the checkpoint to the oldest successfully processed event. As a result, the checkpoint always lags 10 minutes behind the current event.
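One way to read this checkpointing scheme is sketched below. It is only an illustration under assumptions, not the reporter's code: the `PartitionProgress` class and its method names are hypothetical, and it assumes the safe checkpoint is the highest processed sequence number that lies below every event still waiting in a batch.

```python
class PartitionProgress:
    """Tracks in-flight events for one partition (hypothetical helper)."""

    def __init__(self):
        self.pending = set()  # sequence numbers received but not yet processed
        self.done = set()     # sequence numbers processed successfully

    def received(self, seq):
        self.pending.add(seq)

    def processed(self, seq):
        self.pending.discard(seq)
        self.done.add(seq)

    def safe_checkpoint(self):
        """Highest processed sequence number with no pending event below it."""
        if self.pending:
            limit = min(self.pending)
            candidates = [s for s in self.done if s < limit]
        else:
            candidates = list(self.done)
        return max(candidates, default=None)


progress = PartitionProgress()
for seq in (1, 2, 3):
    progress.received(seq)
progress.processed(1)
progress.processed(3)
print(progress.safe_checkpoint())  # event 2 is still pending, so only 1 is safe
```

With this kind of bookkeeping, a slow batch for one device holds the checkpoint back, which is why the checkpoint trails the current event.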

Every couple of days (mostly at night or in the morning), we receive the following error:

[2023-04-19 06:09:24,631] azure.eventhub._pyamqp.aio._connection_async - ERROR - END frame received on invalid channel. Closing connection.

This affects all partitions but the errors are spread over ~15 minutes. After the error, we continue to receive events, but we seem to have restarted at the oldest events in the affected partition, not at the checkpoint.

The following logs show the state before the error and after the error. The numbers are the sequence numbers, and the watermark is the difference of the latest sequence number to the current sequence number.

[2023-04-19 06:09:17,928] - INFO - Partition 2: Checkpoint: 182280527, Current: 182284438, Latest: 182284438, Watermark: 0
[2023-04-19 06:09:17,928] - INFO - Partition 7: Checkpoint: 35976136, Current: 35980359, Latest: 35980359, Watermark: 0
[2023-04-19 06:09:17,928] - INFO - Partition 1: Checkpoint: 165200412, Current: 165207858, Latest: 165207858, Watermark: 0
[2023-04-19 06:09:17,928] - INFO - Partition 0: Checkpoint: 176898458, Current: 176908895, Latest: 176908895, Watermark: 0
[2023-04-19 06:09:17,928] - INFO - Partition 5: Checkpoint: 38114026, Current: 38118748, Latest: 38118748, Watermark: 0
[2023-04-19 06:09:17,928] - INFO - Partition 4: Checkpoint: 35067842, Current: 35075820, Latest: 35075820, Watermark: 0
[2023-04-19 06:09:17,928] - INFO - Partition 3: Checkpoint: 154759160, Current: 154763232, Latest: 154763232, Watermark: 0
[2023-04-19 06:09:17,928] - INFO - Partition 6: Checkpoint: 43058778, Current: 43065847, Latest: 43065847, Watermark: 0

[2023-04-19 06:09:24,631] azure.eventhub._pyamqp.aio._connection_async - ERROR - END frame received on invalid channel. Closing connection.
[2023-04-19 06:09:24,703] azure.eventhub._pyamqp.aio._connection_async - ERROR - END frame received on invalid channel. Closing connection.

[2023-04-19 06:09:27,929] - INFO - Event rate: 62 evt/s. Pending batches: 0
[2023-04-19 06:09:27,929] - INFO - Partition 2: Checkpoint: 182281169, Current: 182284438, Latest: 182284438, Watermark: 0
[2023-04-19 06:09:27,929] - INFO - Partition 7: Checkpoint: 35976136, Current: 35980359, Latest: 35980359, Watermark: 0
[2023-04-19 06:09:27,929] - INFO - Partition 1: Checkpoint: 165200520, Current: 165207977, Latest: 165207977, Watermark: 0
[2023-04-19 06:09:27,929] - INFO - Partition 0: Checkpoint: 176898504, Current: 176909095, Latest: 176909095, Watermark: 0
[2023-04-19 06:09:27,929] - INFO - Partition 5: Checkpoint: 38114026, Current: 37137155, Latest: 38123512, Watermark: 986357
[2023-04-19 06:09:27,929] - INFO - Partition 4: Checkpoint: 35067842, Current: 35075928, Latest: 35075928, Watermark: 0
[2023-04-19 06:09:27,929] - INFO - Partition 3: Checkpoint: 154759234, Current: 153722074, Latest: 154767118, Watermark: 1045044
[2023-04-19 06:09:27,929] - INFO - Partition 6: Checkpoint: 43059007, Current: 43066009, Latest: 43066009, Watermark: 0
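The affected partitions can be picked out of these logs mechanically: a partition has rewound when its current sequence number is below its checkpoint. The sketch below assumes the log format shown above; the `find_rewinds` helper and the regular expression are illustrative and not part of the service. It recomputes the watermark as latest minus current.

```python
import re

# Matches one "Partition N: Checkpoint: ..., Current: ..., Latest: ..." entry.
ENTRY = re.compile(
    r"Partition (?P<partition>\d+): Checkpoint: (?P<checkpoint>\d+), "
    r"Current: (?P<current>\d+), Latest: (?P<latest>\d+)"
)


def find_rewinds(log_text):
    """Return (partition, watermark) for partitions reading behind their checkpoint."""
    rewinds = []
    for m in ENTRY.finditer(log_text):
        partition = int(m["partition"])
        checkpoint = int(m["checkpoint"])
        current = int(m["current"])
        latest = int(m["latest"])
        if current < checkpoint:
            rewinds.append((partition, latest - current))
    return rewinds


sample = (
    "Partition 5: Checkpoint: 38114026, Current: 37137155, Latest: 38123512, Watermark: 986357\n"
    "Partition 4: Checkpoint: 35067842, Current: 35075928, Latest: 35075928, Watermark: 0\n"
    "Partition 3: Checkpoint: 154759234, Current: 153722074, Latest: 154767118, Watermark: 1045044\n"
)
print(find_rewinds(sample))  # [(5, 986357), (3, 1045044)]
```

In the logs above, only partitions 5 and 3 have rewound at 06:09:27; the other partitions are still at watermark 0.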

The consumer is created as follows:

self.consumer = EventHubConsumerClient.from_connection_string(
    conn_str=config.EVENT_HUB_CONN_STR,
    consumer_group=config.EVENT_HUB_CONSUMER_GROUP,
    eventhub_name=config.EVENT_HUB_NAME,
    checkpoint_store=self.checkpoint_store,
    uamqp_transport=False,
)

And we run the receiver:

await self.consumer.receive_batch(
    on_event_batch=self._on_event_batch,
    on_partition_close=self.on_partition_close,
    on_partition_initialize=self.on_partition_initialize,
    starting_position='-1',
    starting_position_inclusive=False,
    track_last_enqueued_event_properties=True,
    prefetch=1000,
    partition_id=partition_id,
    on_error=self.on_error,
)

Am I correct that the starting_position parameter is only used if there is no checkpoint? If not, that could at least explain this behaviour. When I stop and restart the service, the event consumer starts at the expected checkpoint.
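For reference, the azure-eventhub documentation describes starting_position as the position used when a partition has no checkpoint data, with checkpoint data taking precedence when it is available. The sketch below only models that documented precedence in plain Python. It is not SDK code: `resolve_start` and the dict-shaped `checkpoints` argument are hypothetical stand-ins for whatever the checkpoint store returns.

```python
def resolve_start(partition_id, checkpoints, starting_position="-1"):
    """Return (source, position) for where a partition would begin reading.

    `checkpoints` is a hypothetical dict of partition_id -> checkpointed
    sequence number.
    """
    if partition_id in checkpoints:
        # A stored checkpoint takes precedence over starting_position.
        return ("checkpoint", checkpoints[partition_id])
    # No checkpoint for this partition: fall back to starting_position.
    return ("starting_position", starting_position)


checkpoints = {"5": 38114026}
print(resolve_start("5", checkpoints))  # ('checkpoint', 38114026)
print(resolve_start("3", checkpoints))  # ('starting_position', '-1')
```

Since "-1" denotes the beginning of the partition, a reconnect that did not pick up the stored checkpoint would fall back to the oldest retained events, which would be consistent with the rewind seen in the logs.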

About this issue

  • State: closed
  • Created a year ago
  • Reactions: 1
  • Comments: 34 (18 by maintainers)

Most upvoted comments

In the meantime, I would encourage you to open another issue, as this one is a different ask from the original thread. It'll help with tracking things 😃

✅ 👉 https://github.com/Azure/azure-sdk-for-python/issues/31229

@kashifkhan

I will try as soon as possible to reproduce the issue with the checkpoint service using the latest version. However, this will take a couple of days.

Just wanted to update this issue and apologies for the long delay in between responses. I am working on resolving this issue during this sprint.

This is caused by a low-level issue: the service issues a disconnect once its timeout period has been reached.

@macieyng, for the Service Bus issue, could you please create a new issue? Any logs you have that could help would be great.

In the meantime, I checked again with separate checkpoint stores for updating and reading.

It does not help.

I changed the log levels and will report back with the logs the next time it happens.

Thank you @claria. I just need you to set the following, please.

The logging_enable setting is optional; it outputs what's going across the wire. If there is sensitive information, I can provide my email for you to send the logs to; otherwise it's optional 😃

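import logging
import sys

# Emit DEBUG-level output from the Event Hubs SDK.
logger = logging.getLogger("azure.eventhub")
logger.setLevel(logging.DEBUG)
# Assumption: the otherwise unused `sys` import was meant for a stdout handler.
logger.addHandler(logging.StreamHandler(stream=sys.stdout))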

self.consumer = EventHubConsumerClient.from_connection_string(
    conn_str=config.EVENT_HUB_CONN_STR,
    consumer_group=config.EVENT_HUB_CONSUMER_GROUP,
    eventhub_name=config.EVENT_HUB_NAME,
    checkpoint_store=self.checkpoint_store,
    uamqp_transport=False,
    logging_enable=True,
)