aio-pika: aio_pika is slower than pika

I have prepared two scripts that each publish 10k messages, one using aio_pika and the other using pika, and I measure the time each needs to send them. First script:

import asyncio
import aio_pika
import datetime


async def main(loop):
    connection = await aio_pika.connect("amqp://guest:guest@172.17.0.2/", loop=loop)
    routing_key = "test_queue"
    channel = await connection.channel()
    num_of_messages = 10000
    start = datetime.datetime.now()
    msg = aio_pika.Message(body=b'hello')
    for _ in range(num_of_messages):
        await channel.default_exchange.publish(
            msg,
            routing_key=routing_key
        )
    tt = datetime.datetime.now() - start
    print(tt.total_seconds(), num_of_messages/tt.total_seconds())
    await connection.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

Result:

2.227745 4488.844100199978

Second:

import pika
import datetime


connection = pika.BlockingConnection(pika.URLParameters("amqp://guest:guest@172.17.0.2/"))
channel = connection.channel()

num_of_messages = 10000

routing_key = "test_queue"

start = datetime.datetime.now()

for _ in range(num_of_messages):
    channel.basic_publish(exchange='',
                        routing_key=routing_key,
                        body=b'hello')

tt = datetime.datetime.now() - start
print(tt.total_seconds(), num_of_messages/tt.total_seconds())

connection.close()

Result:

0.37214 26871.60745955823

It looks like pika is about 6 times faster than aio_pika. Why is there such a difference?

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 16 (8 by maintainers)

Most upvoted comments

I ran both @mosquito’s benchmarks (I’ll call them bench1 and bench2 respectively) on my laptop and got the following results:

$ python -V
Python 3.6.3
$ pip freeze | grep aio-pika
aio-pika==2.6.0

bench1: 8.663177 1154.3109415864412
bench2: 1.87766 5325.77782985205

Then I modified the first benchmark:

--- bench1.py	2018-04-19 23:34:41.000000000 +0600
+++ bench3.py	2018-04-19 23:34:11.000000000 +0600
@@ -10,11 +10,14 @@
     num_of_messages = 10000
     start = datetime.datetime.now()
     msg = aio_pika.Message(body=b'hello')
+    futs = []
+    publish = channel.default_exchange.publish
     for _ in range(num_of_messages):
-        await channel.default_exchange.publish(
+        futs.append(publish(
             msg,
             routing_key=routing_key
-        )
+        ))
+    await asyncio.gather(*futs)
     tt = datetime.datetime.now() - start
     print(tt.total_seconds(), num_of_messages/tt.total_seconds())
     await connection.close()

(Basically, I removed the per-iteration await and instead used asyncio.gather to await all the coroutines at once.)
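To see the effect of that change without a broker, here is a minimal sketch that swaps the real publish call for a stand-in coroutine. The name fake_publish and the 1 ms delay are assumptions made for illustration; they are not part of aio-pika, and the stand-in does no CPU work, so the gap it shows is far larger than the roughly 2x measured against a real RabbitMQ.

```python
import asyncio
import time


async def fake_publish(delay: float = 0.001) -> None:
    # Hypothetical stand-in for channel.default_exchange.publish():
    # it only sleeps, it does not talk to a broker.
    await asyncio.sleep(delay)


async def publish_sequentially(n: int) -> float:
    # One await per iteration: each publish must finish before the next starts.
    start = time.perf_counter()
    for _ in range(n):
        await fake_publish()
    return time.perf_counter() - start


async def publish_with_gather(n: int) -> float:
    # Create all coroutines first, then wait for all of them together.
    start = time.perf_counter()
    await asyncio.gather(*(fake_publish() for _ in range(n)))
    return time.perf_counter() - start


async def compare(n: int = 200):
    sequential = await publish_sequentially(n)
    gathered = await publish_with_gather(n)
    return sequential, gathered


if __name__ == "__main__":
    seq, conc = asyncio.run(compare())
    print(f"sequential: {seq:.3f}s, gather: {conc:.3f}s")
```

With a real broker the per-message cost is dominated by framing and socket writes rather than waiting, which is why the measured improvement is much more modest.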

bench3: 4.197344 2382.4590026454825

That is roughly two times faster than bench1 (not quite, see below) and two times slower than bench2.

Then I ran bench2 and bench3 under cProfile. Here are the top 20 calls sorted by tottime.

bench2:

1036805 function calls (1034522 primitive calls) in 2.106 seconds

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    30007    0.440    0.000    0.440    0.000 {method 'send' of '_socket.socket' objects}
    10000    0.213    0.000    1.397    0.000 /venv/lib/python3.6/site-packages/pika/connection.py:1573(_send_message)
    10007    0.102    0.000    0.560    0.000 /venv/lib/python3.6/site-packages/pika/adapters/base_connection.py:418(_handle_write)
    10000    0.086    0.000    1.570    0.000 /venv/lib/python3.6/site-packages/pika/channel.py:307(basic_publish)
    30006    0.080    0.000    0.142    0.000 /venv/lib/python3.6/site-packages/pika/frame.py:32(_marshal)
    10002    0.071    0.000    0.248    0.000 /venv/lib/python3.6/site-packages/pika/adapters/blocking_connection.py:1154(_flush_output)
   100053    0.066    0.000    0.066    0.000 {built-in method _struct.pack}
    10000    0.062    0.000    0.211    0.000 /venv/lib/python3.6/site-packages/pika/spec.py:1511(encode)
    20017    0.048    0.000    0.119    0.000 /venv/lib/python3.6/site-packages/pika/data.py:11(encode_short_string)
    10010    0.046    0.000    0.094    0.000 /venv/lib/python3.6/site-packages/pika/adapters/blocking_connection.py:403(<lambda>)
    10000    0.043    0.000    0.051    0.000 /venv/lib/python3.6/site-packages/pika/spec.py:2157(encode)
    10000    0.041    0.000    1.841    0.000 /venv/lib/python3.6/site-packages/pika/adapters/blocking_connection.py:1984(publish)
    10004    0.037    0.000    0.168    0.000 /venv/lib/python3.6/site-packages/pika/adapters/blocking_connection.py:383(_flush_output)
    10006    0.029    0.000    0.305    0.000 /venv/lib/python3.6/site-packages/pika/frame.py:68(marshal)
56073/56072    0.028    0.000    0.028    0.000 {built-in method builtins.isinstance}
    30012    0.027    0.000    0.027    0.000 /venv/lib/python3.6/site-packages/pika/frame.py:22(__init__)
    10000    0.027    0.000    0.134    0.000 /venv/lib/python3.6/site-packages/pika/frame.py:98(marshal)
103434/103318    0.027    0.000    0.027    0.000 {built-in method builtins.len}
    30012    0.025    0.000    0.025    0.000 /venv/lib/python3.6/site-packages/pika/compat.py:49(byte)
    10000    0.025    0.000    0.025    0.000 /venv/lib/python3.6/site-packages/pika/spec.py:2070(__init__)

bench3:

4349192 function calls (4337014 primitive calls) in 6.526 seconds

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    30007    0.539    0.000    0.539    0.000 {method 'send' of '_socket.socket' objects}
   280016    0.319    0.000    0.486    0.000 /venv/lib/python3.6/site-packages/aio_pika/message.py:197(__setattr__)
    10000    0.267    0.000    1.862    0.000 /venv/lib/python3.6/site-packages/pika/connection.py:1573(_send_message)
    30005    0.200    0.000    0.200    0.000 /venv/lib/python3.6/site-packages/pika/connection.py:1625(_trim_frame_buffer)
    30010    0.188    0.000    0.511    0.000 /venv/lib/python3.6/site-packages/pika/frame.py:189(decode_frame)
    10000    0.155    0.000    2.884    0.000 /venv/lib/python3.6/site-packages/aio_pika/channel.py:204(_publish)
386604/386603    0.125    0.000    0.125    0.000 {built-in method builtins.isinstance}
    10001    0.123    0.000    0.520    0.000 /venv/lib/python3.6/site-packages/aio_pika/message.py:68(__init__)
    10007    0.122    0.000    0.680    0.000 /venv/lib/python3.6/site-packages/pika/adapters/base_connection.py:418(_handle_write)
    50012    0.122    0.000    0.243    0.000 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/base_events.py:594(_call_soon)
    60116    0.121    0.000    0.240    0.000 /venv/lib/python3.6/site-packages/pika/callback.py:15(name_or_value)
    10000    0.114    0.000    0.863    0.000 /venv/lib/python3.6/site-packages/aio_pika/message.py:255(__init__)
    30006    0.105    0.000    0.227    0.000 /venv/lib/python3.6/site-packages/pika/frame.py:32(_marshal)
    10000    0.101    0.000    0.135    0.000 /venv/lib/python3.6/site-packages/aio_pika/message.py:172(properties)
    50022    0.098    0.000    6.269    0.000 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/events.py:125(_run)
    60011    0.098    0.000    0.098    0.000 /venv/lib/python3.6/site-packages/pika/compat.py:49(byte)
    50012    0.098    0.000    0.359    0.000 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/base_events.py:564(call_soon)
    10000    0.097    0.000    0.164    0.000 /venv/lib/python3.6/site-packages/pika/spec.py:2157(encode)
   120053    0.094    0.000    0.094    0.000 {built-in method _struct.pack}
    50015    0.092    0.000    0.108    0.000 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/events.py:86(__init__)

Obviously, there are too many function calls happening: about 4 times more than required (taking bench2 as the baseline). In Python, abstractions aren’t free. Let’s get rid of Message and publish messages directly:

--- bench3.py	2018-04-19 23:34:11.000000000 +0600
+++ bench4.py	2018-04-19 23:59:58.000000000 +0600
@@ -9,13 +9,17 @@
     channel = await connection.channel(publisher_confirms=False)
     num_of_messages = 10000
     start = datetime.datetime.now()
-    msg = aio_pika.Message(body=b'hello')
+    props = aio_pika.message.BasicProperties()
     futs = []
-    publish = channel.default_exchange.publish
+    publish = channel._publish
     for _ in range(num_of_messages):
         futs.append(publish(
-            msg,
-            routing_key=routing_key
+            '',
+            body=b'hello',
+            routing_key=routing_key,
+            properties=props,
+            mandatory=False,
+            immediate=False,
         ))
     await asyncio.gather(*futs)
     tt = datetime.datetime.now() - start

bench4: 3.662427 2730.429848840673

Now let’s sort the cProfile output by cumulative time:

         1840853 function calls (1828263 primitive calls) in 3.949 seconds

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    191/1    0.016    0.000    3.949    3.949 {built-in method builtins.exec}
        1    0.000    0.000    3.949    3.949 bench4.py:1(<module>)
        1    0.000    0.000    3.713    3.713 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/base_events.py:432(run_until_complete)
        1    0.000    0.000    3.713    3.713 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/base_events.py:404(run_forever)
       15    0.073    0.005    3.712    0.247 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/base_events.py:1330(_run_once)
    50017    0.100    0.000    3.609    0.000 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/events.py:125(_run)
    10000    0.062    0.000    3.196    0.000 /venv/lib/python3.6/site-packages/aio_pika/common.py:116(wrap)
    10000    0.189    0.000    3.117    0.000 /venv/lib/python3.6/site-packages/aio_pika/channel.py:204(_publish)
    10000    0.091    0.000    2.154    0.000 /venv/lib/python3.6/site-packages/pika/channel.py:307(basic_publish)
    10002    0.020    0.000    2.010    0.000 /venv/lib/python3.6/site-packages/pika/channel.py:1141(_send_method)
    10006    0.025    0.000    1.991    0.000 /venv/lib/python3.6/site-packages/pika/connection.py:1558(_send_method)
    10000    0.287    0.000    1.965    0.000 /venv/lib/python3.6/site-packages/pika/connection.py:1573(_send_message)
    10007    0.035    0.000    0.875    0.000 /venv/lib/python3.6/site-packages/pika/adapters/base_connection.py:278(_flush_outbound)
    10007    0.153    0.000    0.816    0.000 /venv/lib/python3.6/site-packages/pika/adapters/base_connection.py:418(_handle_write)
    30007    0.640    0.000    0.640    0.000 {method 'send' of '_socket.socket' objects}
    10006    0.041    0.000    0.426    0.000 /venv/lib/python3.6/site-packages/pika/frame.py:68(marshal)
    50012    0.089    0.000    0.400    0.000 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/base_events.py:564(call_soon)
    10004    0.069    0.000    0.360    0.000 {method 'set_result' of '_asyncio.Future' objects}
    50012    0.165    0.000    0.290    0.000 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/base_events.py:594(_call_soon)
    10000    0.090    0.000    0.283    0.000 /venv/lib/python3.6/site-packages/pika/spec.py:1511(encode)

We can clearly see the spike here:

    10000    0.189    0.000    3.117    0.000 /venv/lib/python3.6/site-packages/aio_pika/channel.py:204(_publish)
    10000    0.091    0.000    2.154    0.000 /venv/lib/python3.6/site-packages/pika/channel.py:307(basic_publish)

So aio-pika’s publish is taking a lot of time. Why?

I can see two possible culprits: https://github.com/mosquito/aio-pika/blob/master/aio_pika/channel.py#L207 and https://github.com/mosquito/aio-pika/blob/master/aio_pika/channel.py#L212. The latter is actually quite heavy on function calls (the previous cProfile output, sorted by tottime):

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    30007    0.640    0.000    0.640    0.000 {method 'send' of '_socket.socket' objects}
    10000    0.287    0.000    1.965    0.000 /venv/lib/python3.6/site-packages/pika/connection.py:1573(_send_message)
    10000    0.189    0.000    3.117    0.000 /venv/lib/python3.6/site-packages/aio_pika/channel.py:204(_publish)
    50012    0.165    0.000    0.290    0.000 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/base_events.py:594(_call_soon)
    10007    0.153    0.000    0.816    0.000 /venv/lib/python3.6/site-packages/pika/adapters/base_connection.py:418(_handle_write)
    30006    0.110    0.000    0.196    0.000 venv/lib/python3.6/site-packages/pika/frame.py:32(_marshal)
    50017    0.100    0.000    3.609    0.000 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/events.py:125(_run)
   100053    0.096    0.000    0.096    0.000 {built-in method _struct.pack}
    50015    0.094    0.000    0.110    0.000 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/events.py:86(__init__)
    10000    0.091    0.000    2.154    0.000 /venv/lib/python3.6/site-packages/pika/channel.py:307(basic_publish)
    10000    0.090    0.000    0.283    0.000 /venv/lib/python3.6/site-packages/pika/spec.py:1511(encode)
    50012    0.089    0.000    0.400    0.000 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/base_events.py:564(call_soon)
    10000    0.077    0.000    0.090    0.000 /venv/lib/python3.6/site-packages/pika/spec.py:2157(encode)
       15    0.073    0.005    3.712    0.247 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/base_events.py:1330(_run_once)
    10004    0.069    0.000    0.360    0.000 {method 'set_result' of '_asyncio.Future' objects}
    20017    0.063    0.000    0.153    0.000 /venv/lib/python3.6/site-packages/pika/data.py:11(encode_short_string)
    10000    0.062    0.000    3.196    0.000 /venv/lib/python3.6/site-packages/aio_pika/common.py:116(wrap)
30004/20003    0.050    0.000    0.071    0.000 /venv/lib/python3.6/site-packages/aio_pika/common.py:65(add)
    10000    0.044    0.000    0.212    0.000 /venv/lib/python3.6/site-packages/pika/frame.py:98(marshal)
    10005    0.041    0.000    0.045    0.000 /Users/malinoff/.pyenv/versions/3.6.3/lib/python3.6/asyncio/base_events.py:273(create_future)

I modified my installed aio-pika channel.py directly: I removed the write lock completely and changed self._create_future() to asyncio.Future().

bench5: 2.64704 3777.8046421663444

That is 3.3 times faster than the original benchmark, and only 1.4 times slower (instead of 4.6 times) than pika’s BlockingConnection.
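The ratios quoted above follow directly from the elapsed times reported for each benchmark. A small check, using only the numbers from this comment (the timings dictionary and the ratio helper are just illustrative names):

```python
# Elapsed seconds for 10000 messages, as reported above.
timings = {
    "bench1": 8.663177,  # aio-pika, await per publish
    "bench2": 1.87766,   # pika BlockingConnection
    "bench3": 4.197344,  # aio-pika, asyncio.gather
    "bench4": 3.662427,  # aio-pika, channel._publish without Message
    "bench5": 2.64704,   # aio-pika, write lock removed, plain asyncio.Future
}


def ratio(slower: str, faster: str) -> float:
    """How many times longer `slower` took than `faster`."""
    return timings[slower] / timings[faster]


if __name__ == "__main__":
    print(round(ratio("bench1", "bench5"), 1))  # speedup of bench5 over bench1
    print(round(ratio("bench5", "bench2"), 1))  # remaining gap to pika
    print(round(ratio("bench1", "bench2"), 1))  # original gap to pika
```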

I’m sure we can identify more bottlenecks by looking at the cProfile output more closely; I got these results in 10 minutes or so. I hope they will be helpful 😃

After the release of aio-pika>=5, the tests run about two times faster than the equivalent ones for aio-pika<=4.

@krieghan could you please repeat your tests?

@malinoff aio-pika now uses aiormq, which has only channel-scope locks. One more thing is the buffered queues for incoming AMQP frames: the library parses the AMQP protocol ahead of time, before the consumer coroutine has finished.

@michailj @akhoronko feel free to join this performance party. Please repeat your tests; that would be awesome.

@michailj you are publishing to a single channel. Try using more channels to get the benefit of asyncio. Also, afaik, aio-pika uses publisher confirms (https://www.rabbitmq.com/confirms.html#publisher-confirms), but pika doesn’t by default.

I mean that, for instance, when we need to publish a lot of messages using await channel.default_exchange.publish, it is inefficient because of the write locks at the channel level. So if there were something like this:

channel_pool = await ChannelPool(connection, ...)
await channel_pool.default_exchange.publish(message, ...)

then there wouldn’t be any problems with publishing speed, would there? Correct me if I’m wrong.
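As a rough illustration of that idea, here is a minimal sketch of a round-robin pool. Everything in it is hypothetical: ChannelPool is the name proposed above, not an existing aio-pika class, and FakeChannel is a stand-in that only mimics a per-channel write lock so the example runs without a broker. Whether this actually improves throughput against RabbitMQ is not something the sketch can show.

```python
import asyncio
import itertools


class FakeChannel:
    """Hypothetical stand-in for a channel with its own write lock."""

    def __init__(self, name):
        self.name = name
        self.sent = []
        self._lock = asyncio.Lock()

    async def publish(self, body, routing_key):
        # Publishes on the same channel are serialized by its lock.
        async with self._lock:
            await asyncio.sleep(0)  # yield to the loop, as real I/O would
            self.sent.append((routing_key, body))


class ChannelPool:
    """Hypothetical pool that spreads publishes over channels round-robin."""

    def __init__(self, channels):
        self._channels = list(channels)
        if not self._channels:
            raise ValueError("ChannelPool needs at least one channel")
        self._next = itertools.cycle(self._channels)

    async def publish(self, body, routing_key):
        # Pick the next channel so that no single lock is the bottleneck.
        channel = next(self._next)
        await channel.publish(body, routing_key)


async def demo(num_channels=3, num_messages=6):
    # Channels are created inside the coroutine so their locks belong
    # to the running event loop.
    channels = [FakeChannel(f"ch{i}") for i in range(num_channels)]
    pool = ChannelPool(channels)
    await asyncio.gather(
        *(pool.publish(b"hello", "test_queue") for _ in range(num_messages))
    )
    return [len(ch.sent) for ch in channels]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real connection the pool would be filled by opening several channels on it. Note that spreading messages across channels gives up ordering guarantees between messages sent on different channels.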