pulsar: [Bug] Client with shared subscription is blocked

Search before asking

  • I searched in the issues and found nothing similar.

Version

Client: 3.1.0
Pulsar: 3.1.0 (and later builds)

Also reported on 3.0.1

Minimal reproduce step

My reproducible steps:

  1. Create a persistent topic with 3 partitions
  2. Publish 1 million messages (30 KB each); see the producer sketch after the snippet below
  3. Run the client and consumer:
    PulsarClient client = PulsarClient.builder()
        .serviceUrl(this.pulsarBrokerUrl)
        .build();

    Consumer<byte[]> consumer = client.newConsumer()
        .topic(sourceTopic)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName(subscriptionName)
        .subscriptionType(SubscriptionType.Shared)
        .receiverQueueSize(8)
        .ackTimeout(5, TimeUnit.SECONDS)
        .subscribe();
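
For step 2, a producer along these lines can be used (a minimal sketch: it reuses client and sourceTopic from the snippet above and the 30 KB message size from the description, while blockIfQueueFull and the use of RandomStringUtils are assumptions):

    Producer<String> producer = client.newProducer(Schema.STRING)
        .topic(sourceTopic)
        .blockIfQueueFull(true)
        .create();

    for (int i = 0; i < 1_000_000; i++) {
        // 30 KB alphabetic payload per message
        producer.send(RandomStringUtils.randomAlphabetic(30_000));
    }
    producer.close();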

What did you expect to see?

All messages are received

What did you see instead?

The client stops receiving messages. Restarting the client helps, but it gets stuck again after some time.

Anything else?

The issue was originally described in #21082. @MichalKoziorowski-TomTom also faces the issue.

I’ve created a new issue because in #21082 the author says that a broker restart helps. In this case it looks like the problem is client-related: a race condition in the 3.x.x client observed after the introduction of ackTimeout.

Are you willing to submit a PR?

  • I’m willing to submit a PR!

About this issue

  • State: open
  • Created 10 months ago
  • Reactions: 1
  • Comments: 19 (6 by maintainers)

Most upvoted comments

It looks like I was able to reproduce the issue in two runs today (failed 2/2).

  • I’ve created a SmallRye project (one producer that produces 1 million messages, one consumer that reads them using a shared subscription)
  • The cluster is created on GCP using the Pulsar operator from SN (great tool!)

The code is here: https://github.com/michalcukierman/pulsar-21104

In general it’s very much like the bug description. Produce 1 million messages of 30 KB each:

  @Outgoing("requests-out")
  public Multi<String> produce() {
    return Multi.createBy().repeating()
        .uni(() -> Uni
            .createFrom()
            .item(() -> RandomStringUtils.randomAlphabetic(30_000))
            .onItem()
            .invoke(() -> System.out.println("+ Produced: " + outCount.incrementAndGet()))
        )
        .atMost(1_000_000);
  }

Read it using client with shared subscription and write to another topic:

@ApplicationScoped
public class Processor {

  private final AtomicLong inCount = new AtomicLong(0);
  @Incoming("requests-in")
  @Outgoing("dump-out")
  @Blocking
  PulsarOutgoingMessage<String> process(PulsarIncomingMessage<String> in) {
    System.out.println(" - Processed: " + inCount.incrementAndGet());
    return PulsarOutgoingMessage.from(in);
  }
}

The settings of the client are:

pulsar.client.serviceUrl=pulsar://brokers-broker:6650

mp.messaging.incoming.requests-in.subscriptionType=Shared
mp.messaging.incoming.requests-in.numIoThreads=4
mp.messaging.incoming.requests-in.subscriptionName=request-shared-subscription
mp.messaging.incoming.requests-in.ackTimeoutMillis=5000
mp.messaging.incoming.requests-in.subscriptionInitialPosition=Earliest
mp.messaging.incoming.requests-in.receiverQueueSize=8
mp.messaging.incoming.requests-in.topic=persistent://public/default/requests_4
mp.messaging.incoming.requests-in.connector=smallrye-pulsar

mp.messaging.outgoing.dump-out.topic=persistent://public/default/dump
mp.messaging.outgoing.dump-out.connector=smallrye-pulsar
mp.messaging.outgoing.dump-out.blockIfQueueFull=true
mp.messaging.outgoing.dump-out.maxPendingMessages=8
mp.messaging.outgoing.dump-out.maxPendingMessagesAcrossPartitions=12
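
For reference, the outgoing connector settings above roughly map to the following raw Pulsar producer configuration (a sketch under that assumption; the topic comes from the properties, the variable name is mine):

    Producer<String> dumpProducer = client.newProducer(Schema.STRING)
        .topic("persistent://public/default/dump")
        .blockIfQueueFull(true)
        .maxPendingMessages(8)
        .maxPendingMessagesAcrossPartitions(12)
        .create();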

The retention of the requests topic is set to -1 -1 (infinite time and size) using the Pulsar Admin API in Java.
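
A minimal sketch of setting that retention with the Java admin client (the HTTP service URL is assumed, and retention is shown at namespace level; topic-level policies would work the same way):

    PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://brokers-broker:8080") // assumed admin endpoint
        .build();

    // -1 / -1 means infinite retention time and size
    admin.namespaces().setRetention("public/default",
        new RetentionPolicies(-1, -1));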

During both runs the consumer got stuck (screenshot attached: Screenshot 2023-10-02 at 21 12 42).

@mattisonchao I’ll have time next week to get back to it.

I think it happens when:

  • ackTimeout is set on the consumer
  • the throughput is high
  • the receiverQueueSize is low

There may be a race condition in the 3.1.0 client, as the situation was not observed with 2.10.4 (we’ve downgraded; @MichalKoziorowski-TomTom also reported the downgrade as a fix).