google-cloud-go: pubsub: Large number of duplicate messages suddenly

I’ve been noticing an influx of duplicate messages. Previously I don’t think I had ever come across one; then I suddenly started seeing large volumes of dupes. For example, out of 50 messages that were all ACKed just recently, I saw 48 duplicates. It sounds similar to this issue: https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2465

When I look at the API requests in the Google Cloud Console, I’m seeing large numbers of 499 error codes. In the last 4 days I have 1,312 requests with status code 200, but 7,580 with status code 499.

Settings:

  • MaxOutstandingMessages = 10
  • MaxExtension = 10 minutes
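
For reference, a minimal sketch of how those settings map onto the Go client’s ReceiveSettings (the project and subscription names here are placeholders):

```go
package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()

	// "my-project" and "my-sub" are placeholders.
	client, err := pubsub.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	sub := client.Subscription("my-sub")
	sub.ReceiveSettings.MaxOutstandingMessages = 10     // at most 10 unacked messages in the handler at once
	sub.ReceiveSettings.MaxExtension = 10 * time.Minute // extend the ack deadline for up to 10 minutes

	// Receive blocks until ctx is canceled or an unrecoverable error occurs.
	if err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		// ... process the message ...
		m.Ack()
	}); err != nil {
		log.Fatal(err)
	}
}
```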

Versions:

  • cloud.google.com/go: aeeacb092ec71c83668e4828857289d89c34b610
  • github.com/googleapis/gax-go: 317e0006254c44a0ac427cc52a0e083ff0b9622f
  • google.golang.org/genproto: 1e559d0a00eef8a9a43151db4665280bd8dd5886
  • google.golang.org/grpc: f92cdcd7dcdc69e81b2d7b338479a19a8723cfa3

About this issue

  • State: closed
  • Created 7 years ago
  • Reactions: 5
  • Comments: 72 (37 by maintainers)

Most upvoted comments

I am running into the same issue and it’s incredibly frustrating. In my debugging, I set ReceiveSettings.NumGoroutines = 1, ReceiveSettings.MaxOutstandingMessages = 1, and ReceiveSettings.MaxExtension = 10 * time.Minute.

With the default subscription.AckDeadline = 10 seconds:

  1. If I ack the msg as soon as I receive it, I get 100% deletion rate.
  2. If I wait 100ms after (just sleep), I deleted 196 of 200 msgs.
  3. If I wait 200ms after, I deleted 102 of 200 msgs.
  4. If I wait 300ms after, I deleted only 68 of 200 msgs.

I also created a subscription w/ AckDeadline: 5 * time.Minute. The deletion/ack rate is slightly better: the last run I did w/ a 300ms delay ack’ed 123 of 200 msgs.
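
A rough sketch of that kind of test, assuming the same Go client (the function and its counting logic are illustrative, not the original test code):

```go
import (
	"context"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
)

// ackAfterDelay receives messages from sub, sleeps for `delay` before acking
// each one, and stops after roughly n acks, logging how many went through.
func ackAfterDelay(ctx context.Context, sub *pubsub.Subscription, delay time.Duration, n int) error {
	sub.ReceiveSettings.NumGoroutines = 1
	sub.ReceiveSettings.MaxOutstandingMessages = 1
	sub.ReceiveSettings.MaxExtension = 10 * time.Minute

	var acked int32
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	err := sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
		time.Sleep(delay) // simulate slow processing before acking
		m.Ack()
		if atomic.AddInt32(&acked, 1) >= int32(n) {
			cancel() // stop receiving once we've acked n messages
		}
	})
	log.Printf("acked %d messages with a %v delay", atomic.LoadInt32(&acked), delay)
	return err
}
```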

Did Google Pub/Sub change something on their end? It seems like this all started in the last couple of days.

@cristiangraz I’m curious why is this issue closed?

It seems like this issue is haunting us, as we now have it in multiple projects. After having followed this whole discussion and also spent time studying the internals of this library, we came to the conclusion that the current library is designed for pipelines with a somewhat stable flow of incoming messages and high processing throughput.

In our case, we receive spikes of messages every 5 minutes, the processing time of a single message varies (sometimes up to a couple of seconds), and there is no expected correlation between the number of incoming messages and the speed at which we want to process them.

If our understanding is correct, the streaming pull strategy used in this library can eventually fetch more messages than MaxOutstandingMessages, which from a developer-experience point of view is a bit hard to understand. I do understand now that this allows for very high throughput in some scenarios. However, it also introduces all the issues discussed in this thread.

On our side, we tried leveraging the non-streaming pull approach, and so far it seems to address the problems. However, our solution required us to re-implement parts of this pubsub client in order to re-create some of the features we needed.

Is there any chance you could introduce a parameter letting the user choose between the experimental streaming subscription pull and the plain Pull API endpoint? It seems like the latter respects MaxOutstandingMessages and would work fine for our use case.

Otherwise, if you plan to somehow deprecate the Pull endpoint in favour of StreamingPull, is there any chance you could implement an option forcing the client to respect the max outstanding messages? Even a hack to begin with, for example having the client immediately Nack any messages received beyond the MaxOutstandingMessages amount, could help us solve our issue.

I hope this all makes sense. We feel like our current implementation re-invents the wheel, and given that you mentioned earlier that you were working on this case, I wanted to share our experiments and expectations. I hope this is somewhat useful.

As of afb80096eae340697e1153d7af9a5a418ba75067, we support synchronous mode for Receive. If you want more control over throughput, please try it. (It ensures that you will never pull more than MaxOutstandingMessages from the Pub/Sub service at a time.)
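
For anyone who wants to try it, a minimal sketch, assuming a client and context set up as in the earlier snippet (in current releases the option is exposed as ReceiveSettings.Synchronous; the exact field name at that commit may differ):

```go
sub := client.Subscription("my-sub") // "my-sub" is a placeholder

// Synchronous mode uses the unary Pull API instead of StreamingPull, so the
// client never holds more than MaxOutstandingMessages at a time.
sub.ReceiveSettings.Synchronous = true
sub.ReceiveSettings.MaxOutstandingMessages = 10

err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
	// ... process the message ...
	m.Ack()
})
if err != nil {
	log.Fatal(err)
}
```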

I’m going to close this now. Reopen if you are still experiencing many duplicates and synchronous mode isn’t helping.

Hey @pongad,

I’ve experimented with multiple consumers, both within GCP’s network and outside it, with multiple variations of the consumer code (Python and Go).

I tried reducing the handler to simply ack the message, and I still end up with a consumption rate of 10% of the publishing rate.

I also experimented with the Cloud Dataflow Pub/Sub-to-BigQuery template, and its consumption rate was similar.

I have a single sub with a 10m deadline, and consumers with a minimal handler. Checking the publishTime field vs. now yields 0, so there’s no delivery delay… And yet I’m seeing less than 100 messages per second consumed vs. 3k being published.
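
For illustration, a minimal delay-checking handler looks roughly like this (a sketch; subscription setup omitted):

```go
err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
	// PublishTime is stamped by the Pub/Sub service when the message is published.
	log.Printf("delivery delay: %v", time.Since(m.PublishTime))
	m.Ack()
})
```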

@tmatsuo From our analysis, things look slightly different from what you just mentioned.

In your table, the server sends and the client receives at about the same time. Our analysis shows that this isn’t really the case. There could be significant buffering between client and server. So things might look kind of like this instead:

| Time | Client | Server |
|------|--------|--------|
| 0 | Buffers (the client hasn’t “seen” the message) | Sends a message |
| 6 | Sees message | |
| 10 | | Message expires |
| 11 | Sends modack | Got it, but too late |

Your solution might help to a degree. However, it’s always possible for enough messages to back up that changing the keep-alive period won’t help.

We’re fixing this partly server-side so things look like this:

| Time | Client | Server |
|------|--------|--------|
| 0 | Buffers (the client hasn’t “seen” the message) | Sends a message, but pauses the “clock” |
| 6 | Sees message; notifies server [1] | Starts the clock; the message expires at time=16 |
| 11 | Sends modack | Got it, with time to spare |

The change [1] is already made to the client here.

Last week, we estimated that the server-side fix should be rolling out this week. I’ll keep this thread updated.

@jba Can we reopen this since people are still experiencing problems? I don’t want this issue to lose visibility.

duplicates would be on a per client basis

I don’t think that’s right. The service will load balance messages (including re-sent ones) across all streams in a subscription. However, if you only have a handful of streams, a significant fraction will end up on the same one. For instance, if you have two streams (processes), then each will see half the messages, and get half the dups. So a per-client solution will weed out a quarter of the dups. I guess that’s not a great ratio, now that I do the math. In any case, my point was that a per-client solution is much simpler architecturally, and maybe it gives you enough de-duping to make your system perform adequately.

@jba

So you exit from the call to Receive, then wait a while, then call Receive again?

Yes and no. We notice duplicates in two circumstances:

  • Indeed, stopping the client and then restarting it later.
  • When we have to Nack messages, e.g. a validation issue on our end: we suddenly start Nack’ing messages to the point that the queue isn’t being consumed at all. Note that we see the duplicates when we start consuming the queue again. What I mean is we only see duplicates when the queue has accumulated and we’re trying to drain it.

[chart: pubsub_dup_rate]

Also, slightly unrelated I guess, but it looks like Nack’ed messages are put back at the front of the queue, so that they are immediately fetched again by the clients. A few times we saw our pipeline completely stall because of a relatively small number of “invalid” messages that kept being retried.

For example […]

Thanks for the example. If I understand this right:

  • When the client pulls messages, an undefined(?) number of messages is actually pulled from the server (more than MaxOutstandingMessages, I presume).
  • After 10 minutes + the subscription deadline, if they haven’t been seen by the client, they will time out and eventually be sent again, even though they will still eventually be processed by the client, which can lead to duplicates.

So that’d mean that if a queue has accumulated too many undelivered messages, such that they can’t be processed within (10 minutes + subscription deadline), then duplicates are to be expected. Is that right?

That’d match what we see then. What could we possibly do to mitigate this?

It would help if you could give us more data.

What would help?

In terms of settings, we use:

  • Subscription has a 600s deadline
  • Receiver settings has NumGoroutines=1, MaxOutstandingMessages=8.
  • Message processing time: p99 is < 10s, p95 is < 1s

Thanks a lot for the clarification. In the meantime, what we did is add a cache layer to filter out messages based on their ID.
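
For anyone curious, such a dedup layer can be as small as an in-memory set keyed on the message ID. A minimal sketch of the idea (not our exact implementation; an unbounded map leaks memory and only dedupes within a single process, so a shared or expiring cache is preferable):

```go
import (
	"context"
	"sync"

	"cloud.google.com/go/pubsub"
)

// dedupHandler wraps a handler and acks (skips) any message whose ID has
// already been seen by this process.
func dedupHandler(handle func(context.Context, *pubsub.Message)) func(context.Context, *pubsub.Message) {
	var mu sync.Mutex
	seen := make(map[string]bool)

	return func(ctx context.Context, m *pubsub.Message) {
		mu.Lock()
		dup := seen[m.ID]
		seen[m.ID] = true
		mu.Unlock()

		if dup {
			m.Ack() // already processed; ack so it isn't redelivered yet again
			return
		}
		handle(ctx, m)
	}
}
```

Usage would then be sub.Receive(ctx, dedupHandler(myHandler)).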

@sfriquet:

stop consuming a subscription and then starting to acknowledge messages again

So you exit from the call to Receive, then wait a while, then call Receive again?

When we start consuming the subscription again and start acking messages, we see a surge in duplicated messages…

The timer for a message’s ack deadline begins when the message arrives on the client, or after 10 minutes, whichever comes first. Messages arrive on the client in bulk, and may be buffered (by proxies or the client’s network layer) before they arrive. If you ack slowly, messages may time out, and then you will see duplicates.

For example, say there are 10,000 undelivered messages when you call Receive, and you process them one at a time (one subscriber process, NumGoroutines=1, MaxOutstandingMessages=1). You ack each message after 1 second. After 10 minutes you have seen 600 messages, but there may be many more that have left the server and are in a buffer somewhere. Ten seconds later (assuming the default ack deadline of 10s), these messages will time out. You will see and ack them as you continue processing, but the server will have already marked them as expired and will redeliver them.

The bottom line is that it can be hard to distinguish this behavior—which is “normal,” though undesirable—from a bug in the PubSub client or service.

It would help if you could give us more data.

I’ll unassign myself from this, but I’ll keep an eye on it.

@pongad We changed our consumer to ack immediately (to remove any possibility of a timeout). It behaves this way right away. Notably, this behavior only occurs in Kubernetes; running locally (inside a Docker container) it executes perfectly.

+1 on this issue. We’ve been trying for days to determine why we aren’t making it through our queues, and reverting to v0.11.0 seems to have resolved the issue. We were seeing a ton of Google Cloud Pub/Sub API errors in our Cloud Console.