azure-sdk-for-python: [Eventhubs] Cannot send encoded bytes

  • Package Name: azure-eventhub
  • Package Version: 5.1.0
  • Operating System: python:3.8 (dockerhub image)
  • Python Version: 3.8

Describe the bug

I fetch rows from a SQL Server database and send them to Event Hubs in batches using the async client. Multiple instances of the service have been running against different databases without problems, but with some new tables I now get random send failures.

To Reproduce

I can’t reproduce the error reliably because the failures seem random. They appear to be related to some specific tables, but even those don’t fail right away.

Log from the container (newest entries first):


    2020-09-11 14.19.59.732  2020-09-11 14:19:59.731 INFO uamqp.c_uamqp: b'Invalid argument (list=(nil), item=0x55e6b2bd9d10)' (b'/data/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/src/singlylinkedlist.c':b'singlylinkedlist_remove':108)
    2020-09-11 14.19.59.732  2020-09-11 14:19:59.731 INFO uamqp.c_uamqp: b'Error removing pending delivery from the list' (b'/data/src/vendor/azure-uamqp-c/src/link.c':b'link_transfer_async':1596)
    2020-09-11 14.19.59.732  malloc(): corrupted unsorted chunks
    2020-09-11 14.19.59.731  2020-09-11 14:19:59.730 INFO uamqp.connection: Connection b'EHProducer-50b50369-4815-41fd-8639-1d6390535ff6' state changed from <ConnectionState.OPENED: 9> to <ConnectionState.END: 13>
    2020-09-11 14.19.59.731  2020-09-11 14:19:59.731 INFO uamqp.connection: Connection with ID b'EHProducer-50b50369-4815-41fd-8639-1d6390535ff6' unexpectedly in an error state. Closing: False, Error: None
    2020-09-11 14.19.59.731  2020-09-11 14:19:59.731 INFO uamqp.c_uamqp: CBS error occurred on connection b'EHProducer-50b50369-4815-41fd-8639-1d6390535ff6'.
    2020-09-11 14.19.59.731  2020-09-11 14:19:59.731 INFO uamqp.sender: Message sender b'sender-link-9d531a90-358d-446a-952b-241c5d93f492' state changed from <MessageSenderState.Open: 2> to <MessageSenderState.Idle: 0> on connection: b'EHProducer-50b50369-4815-41fd-8639-1d6390535ff6'
    2020-09-11 14.19.59.731  2020-09-11 14:19:59.731 INFO uamqp.c_uamqp: b'Connection not open' (b'/data/src/vendor/azure-uamqp-c/src/connection.c':b'connection_encode_frame':2048)
    2020-09-11 14.19.59.731  2020-09-11 14:19:59.731 INFO uamqp.c_uamqp: b'Failed session send transfer' (b'/data/src/vendor/azure-uamqp-c/src/link.c':b'link_transfer_async':1593)
    2020-09-11 14.19.59.729  2020-09-11 14:19:59.728 INFO uamqp.c_uamqp: b'Failure: sending socket failed. errno=104 (Connection reset by peer).' (b'/data/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/socketio_berkeley.c':b'socketio_send':884)
    2020-09-11 14.19.59.729  2020-09-11 14:19:59.729 INFO uamqp.c_uamqp: b'Error in xio_send.' (b'/data/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'write_outgoing_bytes':641)
    2020-09-11 14.19.59.729  2020-09-11 14:19:59.729 INFO uamqp.c_uamqp: b'Error in write_outgoing_bytes.' (b'/data/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'tlsio_openssl_send':1374)
    2020-09-11 14.19.59.729  2020-09-11 14:19:59.729 INFO uamqp.c_uamqp: b'xio_send failed' (b'/data/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_send_async':1190)
    2020-09-11 14.19.59.729  2020-09-11 14:19:59.729 INFO uamqp.c_uamqp: b'Cannot send encoded bytes' (b'/data/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':268)

Sending code:

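    from typing import Any, Dict, List

    from azure.eventhub import EventData

    async def send_batch(self, batch: List[Dict]) -> Any:
        """Send records in size-limited batches; return the LSN of the last record sent."""
        idx = 0
        last_lsn = None
        while idx < len(batch):
            # Split the records into batches no larger than Event Hubs accepts
            event_data_batch = await self.client.create_batch()
            for record in batch[idx:]:
                try:
                    event_data_batch.add(EventData(record["event"].json_string))
                except ValueError:
                    # Max batch size reached; send what we have and start a new batch
                    break
                idx += 1
                last_lsn = record["lsn"]

            if len(event_data_batch) == 0:
                # A single record is larger than an empty batch can hold;
                # without this check the loop would never advance.
                raise ValueError(f"Record {idx} is too large for a single batch")

            await self.client.send_batch(event_data_batch)
        return last_lsn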
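The loop above relies on `EventDataBatch.add` raising `ValueError` once the batch is full, and then starts a new batch from the first record that did not fit. The same greedy packing can be sketched without a live Event Hub. This is an approximation under stated assumptions: `MAX_BATCH_BYTES` and `split_into_batches` are hypothetical names, and the size check only counts the JSON payload, whereas the real batch limit comes from the service and also includes per-event AMQP overhead.

```python
import json
from typing import Any, Dict, Iterator, List

# Hypothetical limit for illustration only; the real EventDataBatch limit is
# set by the service and also counts per-event AMQP overhead.
MAX_BATCH_BYTES = 1024


def split_into_batches(
    records: List[Dict[str, Any]], max_bytes: int = MAX_BATCH_BYTES
) -> Iterator[List[bytes]]:
    """Greedily pack JSON-encoded records into batches of at most max_bytes."""
    current: List[bytes] = []
    current_size = 0
    for record in records:
        payload = json.dumps(record).encode("utf-8")
        if len(payload) > max_bytes:
            # Without this check, a loop like the one above would keep
            # creating empty batches for this record and never terminate.
            raise ValueError(f"record of {len(payload)} bytes exceeds {max_bytes}")
        if current_size + len(payload) > max_bytes:
            # The record does not fit: emit the full batch and start a new one.
            yield current
            current, current_size = [], 0
        current.append(payload)
        current_size += len(payload)
    if current:
        yield current
```

Each yielded list corresponds to one `send_batch` call. Raising on an oversized record, rather than skipping it, keeps the caller from silently dropping a row.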

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 15 (8 by maintainers)


Most upvoted comments

Can you also share the uamqp version installed in your Python environment? It would help us locate the issue.

    $ pip freeze | grep uamqp
    uamqp==1.2.11
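The `uamqp.c_uamqp` lines in the logs come from the uamqp C extension that azure-eventhub 5.x uses for its AMQP transport, which is why its version matters here. As an alternative to `pip freeze`, a minimal sketch that reads the versions from inside the affected environment with the standard library's `importlib.metadata` (Python 3.8+); `installed_versions` is a hypothetical helper name.

```python
from importlib.metadata import PackageNotFoundError, version  # Python 3.8+
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Return the installed version of each distribution, or None if it is missing."""
    report: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            report[name] = version(name)
        except PackageNotFoundError:
            report[name] = None
    return report


if __name__ == "__main__":
    # The two distributions relevant to this issue.
    for name, ver in installed_versions(["azure-eventhub", "uamqp"]).items():
        print(f"{name}=={ver}" if ver else f"{name}: not installed")
```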

Hi

I updated the azure-eventhub library to 5.2.0 but still get the same error. I revised the test and added a custom logger; the log output follows the test code.

Note: I’m testing against an event hub that has 4 partitions.

import pytest
from typing import AsyncIterator, Dict, List
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
import os
import sys
import random
import string
import json
import asyncio
import logging
from logging.handlers import RotatingFileHandler


def get_logger(filename, level=logging.INFO):
    azure_logger = logging.getLogger("azure.eventhub")
    azure_logger.setLevel(level)
    uamqp_logger = logging.getLogger("uamqp")
    uamqp_logger.setLevel(logging.INFO)

    formatter = logging.Formatter("%(asctime)s %(name)-12s %(levelname)-8s %(message)s")
    console_handler = logging.StreamHandler(stream=sys.stdout)
    console_handler.setFormatter(formatter)
    if not azure_logger.handlers:
        azure_logger.addHandler(console_handler)
    if not uamqp_logger.handlers:
        uamqp_logger.addHandler(console_handler)

    if filename:
        file_handler = RotatingFileHandler(
            filename, maxBytes=5 * 1024 * 1024, backupCount=2
        )
        file_handler.setFormatter(formatter)
        # Write both the azure.eventhub and the uamqp records to the file
        azure_logger.addHandler(file_handler)
        uamqp_logger.addHandler(file_handler)

    return azure_logger


logger = get_logger("/tmp/ehub_debug.log", level=logging.DEBUG)


@pytest.fixture
async def client() -> AsyncIterator[EventHubProducerClient]:
    client = new_client()
    yield client
    await client.close()


async def recycle_client(client: EventHubProducerClient) -> EventHubProducerClient:
    try:
        await client.close()
    except Exception:
        print("Could not close old client cleanly. Creating a new client anyway.")
    return new_client()


def new_client():
    eventhub_uri = os.environ["EVENTHUB_URI"]
    if "test-01" not in eventhub_uri:
        raise Exception("Send test events to hub test-01")
    return EventHubProducerClient.from_connection_string(eventhub_uri)


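async def send_batch(client: EventHubProducerClient, batch: List[Dict]) -> None:
    idx = 0
    while idx < len(batch):
        # Split the records into batches no larger than Event Hubs accepts
        event_data_batch = await client.create_batch()
        for record in batch[idx:]:
            try:
                event_data_batch.add(EventData(json.dumps(record)))
            except ValueError:
                # Max batch size reached; send what we have and start a new batch
                break
            idx += 1

        if len(event_data_batch) == 0:
            # A single record is larger than an empty batch can hold;
            # without this check the loop would never advance.
            raise ValueError(f"Record {idx} is too large for a single batch")

        await client.send_batch(event_data_batch)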


def random_string(max_length: int) -> str:
    length = random.randint(1, max_length)
    letters = string.printable
    return "".join(random.choice(letters) for _ in range(length))


def create_batch(rows: int, field_count: int) -> List[Dict]:
    fields = [f"field_{i}" for i in range(field_count)]
    batch = []
    for i in range(rows):
        batch.append({field: random_string(10) for field in fields})
    return batch

@pytest.mark.asyncio
async def test_send_large_batch_after_5min_delay_norecycle(client):
    logger.info("Sending first batch...")
    await send_batch(client, create_batch(10, 40))
    logger.info("Sleeping 5 minutes...")
    await asyncio.sleep(300)
    logger.info("Sending second batch...")
    await send_batch(client, create_batch(200, 40))

Log output:

$ pytest -sv tests/integration/eventhubs/test_eventhubs_timeout_issue.py 
========================================================================================================================== test session starts ==========================================================================================================================
platform linux -- Python 3.8.2, pytest-5.4.2, py-1.8.1, pluggy-0.13.1 -- /usr/bin/python3
cachedir: .pytest_cache
rootdir: -
plugins: asyncio-0.12.0, logbook-1.2.0
collected 1 item

tests/integration/eventhubs/test_eventhubs_timeout_issue.py::test_send_large_batch_after_5min_delay_norecycle 2020-10-12 09:59:58,381 azure.eventhub INFO     Sending first batch...
2020-10-12 09:59:58,392 uamqp.connection INFO     Connection b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55' state changed from <ConnectionState.UNKNOWN: 999> to <ConnectionState.START: 0>
2020-10-12 09:59:58,559 uamqp.connection INFO     Connection b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
2020-10-12 09:59:58,769 uamqp.connection INFO     Connection b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55' state changed from <ConnectionState.START: 0> to <ConnectionState.HDR_SENT: 2>
2020-10-12 09:59:58,821 uamqp.connection INFO     Connection b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55' state changed from <ConnectionState.HDR_SENT: 2> to <ConnectionState.HDR_EXCH: 3>
2020-10-12 09:59:58,822 uamqp.connection INFO     Connection b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55' state changed from <ConnectionState.HDR_EXCH: 3> to <ConnectionState.OPEN_SENT: 7>
2020-10-12 09:59:58,873 uamqp.connection INFO     Connection b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55' state changed from <ConnectionState.OPEN_SENT: 7> to <ConnectionState.OPENED: 9>
2020-10-12 09:59:59,080 uamqp.c_uamqp INFO     CBS for connection b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55' completed opening with status: 0
2020-10-12 09:59:59,183 uamqp.c_uamqp INFO     Token put complete with result: 0, status: 202, description: b'Accepted', connection: b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55'
2020-10-12 09:59:59,236 uamqp.sender INFO     Message sender b'sender-link-d21a54f5-6bd1-478f-b6a3-0572d323debb' state changed from <MessageSenderState.Idle: 0> to <MessageSenderState.Opening: 1> on connection: b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55'
2020-10-12 09:59:59,341 uamqp.sender INFO     Message sender b'sender-link-d21a54f5-6bd1-478f-b6a3-0572d323debb' state changed from <MessageSenderState.Opening: 1> to <MessageSenderState.Open: 2> on connection: b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55'
2020-10-12 09:59:59,456 azure.eventhub INFO     Sleeping 5 minutes...
2020-10-12 10:04:59,513 azure.eventhub INFO     Sending second batch...
2020-10-12 10:04:59,566 uamqp.c_uamqp INFO     b'Failure: sending socket failed. errno=104 (Connection reset by peer).' (b'/data/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/socketio_berkeley.c':b'socketio_send':884)
2020-10-12 10:04:59,566 uamqp.c_uamqp INFO     b'Error in xio_send.' (b'/data/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'write_outgoing_bytes':641)
2020-10-12 10:04:59,566 uamqp.c_uamqp INFO     b'Error in write_outgoing_bytes.' (b'/data/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'tlsio_openssl_send':1374)
2020-10-12 10:04:59,566 uamqp.c_uamqp INFO     b'xio_send failed' (b'/data/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_send_async':1190)
2020-10-12 10:04:59,566 uamqp.c_uamqp INFO     b'Cannot send encoded bytes' (b'/data/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':268)
2020-10-12 10:04:59,566 uamqp.connection INFO     Connection b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55' state changed from <ConnectionState.OPENED: 9> to <ConnectionState.END: 13>
2020-10-12 10:04:59,567 uamqp.connection INFO     Connection with ID b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55' unexpectedly in an error state. Closing: False, Error: None
2020-10-12 10:04:59,567 uamqp.c_uamqp INFO     CBS error occurred on connection b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55'.
2020-10-12 10:04:59,567 uamqp.sender INFO     Message sender b'sender-link-d21a54f5-6bd1-478f-b6a3-0572d323debb' state changed from <MessageSenderState.Open: 2> to <MessageSenderState.Idle: 0> on connection: b'EHProducer-943e27bd-796f-4e19-9d75-c14ac4c62c55'
2020-10-12 10:04:59,567 uamqp.c_uamqp INFO     b'Connection not open' (b'/data/src/vendor/azure-uamqp-c/src/connection.c':b'connection_encode_frame':2048)
2020-10-12 10:04:59,567 uamqp.c_uamqp INFO     b'Failed session send transfer' (b'/data/src/vendor/azure-uamqp-c/src/link.c':b'link_transfer_async':1525)
2020-10-12 10:04:59,567 uamqp.c_uamqp INFO     b'Invalid argument (list=(nil), item=0x242fe00)' (b'/data/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/src/singlylinkedlist.c':b'singlylinkedlist_remove':108)
2020-10-12 10:04:59,567 uamqp.c_uamqp INFO     b'Error removing pending delivery from the list' (b'/data/src/vendor/azure-uamqp-c/src/link.c':b'link_transfer_async':1528)
2020-10-12 10:04:59,567 uamqp.c_uamqp INFO     b'Error in link transfer' (b'/data/src/vendor/azure-uamqp-c/src/message_sender.c':b'send_one_message':544)
2020-10-12 10:04:59,567 uamqp.c_uamqp INFO     b'Error sending message' (b'/data/src/vendor/azure-uamqp-c/src/message_sender.c':b'messagesender_send_async':916)
Fatal Python error: Segmentation fault

Current thread 0x00007fea19c27740 (most recent call first):
  File "/home/janne/.local/lib/python3.8/site-packages/uamqp/async_ops/sender_async.py", line 157 in send_async
  File "/usr/lib/python3.8/asyncio/events.py", line 81 in _run
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1859 in _run_once
  File "/usr/lib/python3.8/asyncio/base_events.py", line 570 in run_forever
  File "/usr/lib/python3.8/asyncio/base_events.py", line 603 in run_until_complete
  File "/home/janne/.local/lib/python3.8/site-packages/pytest_asyncio/plugin.py", line 155 in inner
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/python.py", line 182 in pytest_pyfunc_call
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/callers.py", line 187 in _multicall
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/manager.py", line 84 in <lambda>
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/manager.py", line 93 in _hookexec
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/hooks.py", line 286 in __call__
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/python.py", line 1477 in runtest
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/runner.py", line 135 in pytest_runtest_call
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/callers.py", line 187 in _multicall
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/manager.py", line 84 in <lambda>
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/manager.py", line 93 in _hookexec
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/hooks.py", line 286 in __call__
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/runner.py", line 217 in <lambda>
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/runner.py", line 244 in from_call
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/runner.py", line 216 in call_runtest_hook
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/runner.py", line 186 in call_and_report
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/runner.py", line 100 in runtestprotocol
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/runner.py", line 85 in pytest_runtest_protocol
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/callers.py", line 187 in _multicall
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/manager.py", line 84 in <lambda>
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/manager.py", line 93 in _hookexec
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/hooks.py", line 286 in __call__
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/main.py", line 272 in pytest_runtestloop
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/callers.py", line 187 in _multicall
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/manager.py", line 84 in <lambda>
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/manager.py", line 93 in _hookexec
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/hooks.py", line 286 in __call__
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/main.py", line 247 in _main
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/main.py", line 191 in wrap_session
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/main.py", line 240 in pytest_cmdline_main
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/callers.py", line 187 in _multicall
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/manager.py", line 84 in <lambda>
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/manager.py", line 93 in _hookexec
  File "/home/janne/.local/lib/python3.8/site-packages/pluggy/hooks.py", line 286 in __call__
  File "/home/janne/.local/lib/python3.8/site-packages/_pytest/config/__init__.py", line 124 in main
  File "/home/janne/.local/bin/pytest", line 8 in <module>
Segmentation fault (core dumped)
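In this run the first batch is sent successfully, the producer then sits idle for five minutes, and the first write of the second batch fails with `errno=104 (Connection reset by peer)` before the process segfaults inside `send_async`. A segmentation fault cannot be caught with `try`/`except`, so one possible mitigation, in the spirit of the test's `recycle_client` helper, is to replace the producer proactively once it has been idle for a while. This is a hypothetical sketch, not a verified fix for this bug: `RecyclingProducer` and the 180-second default are assumptions, not an azure-eventhub API or a documented service limit.

```python
import time
from typing import Any, Callable

# Assumed threshold, not a documented Event Hubs value: recycle well before
# the idle period after which connections were observed to be reset.
DEFAULT_IDLE_SECONDS = 180.0


class RecyclingProducer:
    """Hypothetical wrapper that replaces a producer client after a long idle period.

    `factory` builds a fresh client (for example the test's `new_client`). The
    client only needs `create_batch`, `send_batch` and `close` coroutines.
    """

    def __init__(
        self,
        factory: Callable[[], Any],
        idle_seconds: float = DEFAULT_IDLE_SECONDS,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._factory = factory
        self._idle_seconds = idle_seconds
        self._clock = clock
        self._client = factory()
        self._last_used = clock()

    @property
    def client(self) -> Any:
        return self._client

    async def _recycle_if_idle(self) -> None:
        if self._clock() - self._last_used < self._idle_seconds:
            return
        try:
            await self._client.close()
        except Exception:
            pass  # the old connection may already be broken; replace it anyway
        self._client = self._factory()
        self._last_used = self._clock()

    async def create_batch(self, **kwargs: Any) -> Any:
        # Recycle before building the batch so the batch and the send that
        # follows it go through the same client.
        await self._recycle_if_idle()
        return await self._client.create_batch(**kwargs)

    async def send_batch(self, batch: Any) -> None:
        await self._client.send_batch(batch)
        self._last_used = self._clock()

    async def close(self) -> None:
        await self._client.close()
```

Because the wrapper exposes the same `create_batch`, `send_batch` and `close` coroutines, the test's `send_batch(client, ...)` helper could take `RecyclingProducer(new_client)` in place of the raw client. Recycling in `create_batch` rather than in `send_batch` keeps each batch and its send on the same client.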