google-cloud-python: Pub/Sub: Flow Control does not work as expected.
The flow control settings for the Cloud Pub/Sub subscriber should bound the number of unacknowledged messages outstanding to the callback at any one time. However, the setting appears to have no effect on the client: the number of outstanding messages always ends up being 10.
- OS type and version: Mac OS X 10.14.3, Debian Linux
- Python version: Python 2.7.10 on Mac, Python 3.6.5 on Linux
- google-cloud-pubsub version: 0.40.0
Steps to reproduce
- Create a Cloud Pub/Sub topic and subscription
- Publish many messages to the topic
- Update the project name and subscription name in the code below.
- Run the code.
Expected behavior: Since the flow control settings have max_messages set to 2, only two “Received Message” lines should print at a time, with two “Acking Message” lines printing out 20 seconds later. Instead, 10 messages come to the callback at a time. Changing the value of max_messages has no impact on the number of messages sent to the callback.
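The expected behavior can be modeled with a small standard-library simulation. This is a toy sketch, not the client library's implementation. The names `MAX_MESSAGES`, `slots` and `ack()` are hypothetical, a `threading.BoundedSemaphore` stands in for flow control, and a short sleep replaces the 20-second delay. Each dispatch takes a slot and each ack returns it, so at most `max_messages` messages can be unacknowledged at once, even though the thread pool has more workers than that.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_MESSAGES = 2   # mirrors FlowControl(max_messages=2) in the report
NUM_MESSAGES = 6   # hypothetical backlog of published messages

outstanding = 0        # messages handed to a callback but not yet acked
peak_outstanding = 0   # highest value of `outstanding` seen so far
lock = threading.Lock()
slots = threading.BoundedSemaphore(MAX_MESSAGES)  # hypothetical flow-control gate


def ack():
    global outstanding
    with lock:
        outstanding -= 1
    slots.release()  # acking frees a slot for the next message


def callback(message_id):
    time.sleep(0.05)  # stands in for the 20 s of work before acking
    ack()


def dispatch_all():
    global outstanding, peak_outstanding
    # The pool has more threads than MAX_MESSAGES, so the pool is not the limit.
    with ThreadPoolExecutor(max_workers=10) as pool:
        for message_id in range(NUM_MESSAGES):
            slots.acquire()  # block until fewer than MAX_MESSAGES are unacked
            with lock:
                outstanding += 1
                peak_outstanding = max(peak_outstanding, outstanding)
            pool.submit(callback, message_id)
    # Leaving the `with` block waits for every callback to finish.


dispatch_all()
print("peak outstanding:", peak_outstanding)
```

The report says the client behaves as if this gate were missing: the number of messages in flight tracks the number of available worker threads rather than `max_messages`.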
Code example
import time
from google.cloud import pubsub_v1

project_name = "<insert project name here>"
subscription_name = "<insert subscription name here>"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_name, subscription_name)

def callback(message):
    print("Received Message")
    time.sleep(20)  # simulate slow processing before acking
    print("Acking Message")
    message.ack()

# Allow at most 2 unacknowledged messages to be outstanding at once.
flow_control = pubsub_v1.types.FlowControl(max_messages=2)
subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control)

print('Subscriber started for {}'.format(subscription_path))
# subscribe() does not block, so keep the main thread alive.
while True:
    time.sleep(60)
About this issue
- State: closed
- Created 5 years ago
- Comments: 22 (12 by maintainers)
@plamut Thanks for the info.
I continued to run Pub/Sub over the weekend and didn't experience any lost messages. The earlier lost messages could have been caused by an issue on my side. Also, there are still no issues with max_messages, so your fix definitely works.
Thanks again for your help!
@jtressle I am happy to inform you that the PR has just been approved and merged.
@jtressle All review comments on the PR have been addressed (nothing major), currently awaiting another review. I would say it will most likely be merged some time next week.
Hi @jtressle, there is a PR linked just above your comment that addresses this very issue. 😉 It has not yet been merged, though.
Until the fix is merged and released, there might be a workaround, albeit an ugly one. If your message callback blocks the thread before acknowledging the received message, limiting the scheduler's max_workers to 1 might work.
When a message is received, the callback is invoked asynchronously using the underlying scheduler. If the scheduler's thread pool has only 1 available thread, only a single callback will execute at a time, unless, of course, that callback dispatches its work to some other thread.
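The reasoning behind this workaround can be checked with the standard library alone. The sketch below submits several blocking "callbacks" to a `concurrent.futures.ThreadPoolExecutor` and records how many run at the same time. With `max_workers=1` they are serialized. The helper names (`blocking_callback`, `run_with_workers`) are hypothetical. In the real client, the executor would be handed over through a scheduler object. Our assumption, which should be checked against the installed version, is that `pubsub_v1.subscriber.scheduler.ThreadScheduler` accepts an `executor` argument and that `subscribe()` accepts a `scheduler` argument.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

running = 0        # callbacks currently executing
peak_running = 0   # highest concurrency observed
lock = threading.Lock()


def blocking_callback(message_id):
    """Stands in for a callback that blocks before acking (like time.sleep(20))."""
    global running, peak_running
    with lock:
        running += 1
        peak_running = max(peak_running, running)
    time.sleep(0.05)  # the blocking work; the message would be acked after this
    with lock:
        running -= 1
    return message_id


def run_with_workers(max_workers, num_messages=5):
    """Dispatch num_messages callbacks and report the peak concurrency."""
    global running, peak_running
    running = 0
    peak_running = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(blocking_callback, i) for i in range(num_messages)]
        results = [f.result() for f in futures]
    return peak_running, results


single_peak, single_results = run_with_workers(max_workers=1)
multi_peak, _ = run_with_workers(max_workers=10)
print("max_workers=1  -> peak concurrent callbacks:", single_peak)
print("max_workers=10 -> peak concurrent callbacks:", multi_peak)
```

As noted above, this only bounds concurrency while the callback itself blocks. A callback that hands its work off to another thread and returns immediately frees the single worker, so more messages would be dispatched.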