azure-sdk-for-python: [Eventhubs] Cannot send encoded bytes
- Package Name: azure-eventhub
- Package Version: 5.1.0
- Operating System: python:3.8 (dockerhub image)
- Python Version: 3.8
Describe the bug I fetch rows from a SQL server database and send those in batches to Eventhubs using the async client. I have had multiple instances of the service running against different databases without problems until now with some new tables I get random send failures.
To Reproduce
I can’t reproduce the error as the failure seems random. It seems to be related to some specific tables, but even they don’t immediately fail.
Log from the container:
2020-09-11 14.19.59.732	2020-09-11 14:19:59.731 INFO uamqp.c_uamqp: b'Invalid argument (list=(nil), item=0x55e6b2bd9d10)' (b'/data/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/src/singlylinkedlist.c':b'singlylinkedlist_remove':108) 	
	
    2020-09-11 14.19.59.732	2020-09-11 14:19:59.731 INFO uamqp.c_uamqp: b'Error removing pending delivery from the list' (b'/data/src/vendor/azure-uamqp-c/src/link.c':b'link_transfer_async':1596) 	
	
    2020-09-11 14.19.59.732	malloc(): corrupted unsorted chunks 	
	
    2020-09-11 14.19.59.731	2020-09-11 14:19:59.730 INFO uamqp.connection: Connection b'EHProducer-50b50369-4815-41fd-8639-1d6390535ff6' state changed from <ConnectionState.OPENED: 9> to <ConnectionState.END: 13> 	
	
    2020-09-11 14.19.59.731	2020-09-11 14:19:59.731 INFO uamqp.connection: Connection with ID b'EHProducer-50b50369-4815-41fd-8639-1d6390535ff6' unexpectedly in an error state. Closing: False, Error: None 	
	
    2020-09-11 14.19.59.731	2020-09-11 14:19:59.731 INFO uamqp.c_uamqp: CBS error occurred on connection b'EHProducer-50b50369-4815-41fd-8639-1d6390535ff6'. 	
	
    2020-09-11 14.19.59.731	2020-09-11 14:19:59.731 INFO uamqp.sender: Message sender b'sender-link-9d531a90-358d-446a-952b-241c5d93f492' state changed from <MessageSenderState.Open: 2> to <MessageSenderState.Idle: 0> on connection: b'EHProducer-50b50369-4815-41fd-8639-1d6390535ff6' 	
	
    2020-09-11 14.19.59.731	2020-09-11 14:19:59.731 INFO uamqp.c_uamqp: b'Connection not open' (b'/data/src/vendor/azure-uamqp-c/src/connection.c':b'connection_encode_frame':2048) 	
	
    2020-09-11 14.19.59.731	2020-09-11 14:19:59.731 INFO uamqp.c_uamqp: b'Failed session send transfer' (b'/data/src/vendor/azure-uamqp-c/src/link.c':b'link_transfer_async':1593) 	
	
    2020-09-11 14.19.59.729	2020-09-11 14:19:59.728 INFO uamqp.c_uamqp: b'Failure: sending socket failed. errno=104 (Connection reset by peer).' (b'/data/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/socketio_berkeley.c':b'socketio_send':884) 	
	
    2020-09-11 14.19.59.729	2020-09-11 14:19:59.729 INFO uamqp.c_uamqp: b'Error in xio_send.' (b'/data/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'write_outgoing_bytes':641) 	
	
    2020-09-11 14.19.59.729	2020-09-11 14:19:59.729 INFO uamqp.c_uamqp: b'Error in write_outgoing_bytes.' (b'/data/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'tlsio_openssl_send':1374) 	
	
    2020-09-11 14.19.59.729	2020-09-11 14:19:59.729 INFO uamqp.c_uamqp: b'xio_send failed' (b'/data/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_send_async':1190) 	
	
    2020-09-11 14.19.59.729	2020-09-11 14:19:59.729 INFO uamqp.c_uamqp: b'Cannot send encoded bytes' (b'/data/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':268) 
Sending code:
    async def send_batch(self, batch: List[Dict]) -> Dict[str, str]:
        idx = 0
        last_lsn = None
        while True:
            # Batch is split to max send size Eventhub accepts
            event_data_batch = await self.client.create_batch()
            for record in batch[idx:]:
                try:
                    event_data_batch.add(EventData(record["event"].json_string))
                    idx += 1
                    last_lsn = record["lsn"]
                except ValueError:
                    # Max size reached
                    break
            if len(event_data_batch) > 0:
                await self.client.send_batch(event_data_batch)
            if idx == len(batch):
                break
        return last_lsn
About this issue
- Original URL
- State: closed
- Created 4 years ago
- Comments: 15 (8 by maintainers)
Commits related to this issue
- CodeGen from PR 13737 in Azure/azure-rest-api-specs Merge Dev-containerservice-microsoft.containerservice-2021-03-01 API to master (#13737) * Adds base for updating Microsoft.ContainerService from v... — committed to AzureSDKAutomation/azure-sdk-for-python by deleted user 3 years ago
- CodeGen from PR 13737 in Azure/azure-rest-api-specs Merge Dev-containerservice-microsoft.containerservice-2021-03-01 API to master (#13737) * Adds base for updating Microsoft.ContainerService from v... — committed to AzureSDKAutomation/azure-sdk-for-python by deleted user 3 years ago
Hi
I updated the eventhubs library to 5.2.0, but still get the same error. I revised the test and added a custom logger. You can find the log output after the test code.
Note: I’m testing against an eventhub which has 4 partitions.
Log output: