spring-cloud-gcp: Inconsistent Message Ordering with High-Frequency Publishing Interval

Issue

When publishing multiple events/message having the same ordering key on a interval of 1 to 500 ms, the message ordering is not maintained consistently.

Despite taking all the necessary steps to ensure proper ordering, (mentioned below) the consumed events are received out of order.

  • publishing messages to a regional end-point.
  • enabling message ordering flag at subscription level and
  • enabling message ordering flag at the publisher level.

However, if a sleep delay of approximately 1 second is introduced before publishing the individual events, the messages are received in the expected order.

Steps to reproduce

  1. Set up a publisher-subscriber system with proper configuration for message ordering.
  2. Publish a sequence of events with the same ordering key, each separated by a 1 to 500 ms interval. (Don’t bring up the consumer yet)
  3. Start the consumer that subscribes to a subscription which has message ordering flag enabled.
  4. Observe the received order of the consumed events.

Code example

Configurations

  @Bean
  public CredentialsProvider credentialsProvider() {
    return () ->
        ServiceAccountCredentials.fromStream(Files.newInputStream(Paths.get(credentialsFilePath)))
            .createScoped(PublisherStubSettings.getDefaultServiceScopes());
  }

  @Bean
  public PubSubPublisherTemplate pubSubPublisherTemplate(
      CredentialsProvider credentialsProvider, PubSubMessageConverter pubSubMessageConverter) {

    DefaultPublisherFactory factory = new DefaultPublisherFactory(() -> projectId);
    factory.setEnableMessageOrdering(true);
    factory.setCredentialsProvider(credentialsProvider);
    factory.setEndpoint("us-east1-pubsub.googleapis.com:443");

    PubSubPublisherTemplate template = new PubSubPublisherTemplate(factory);
    template.setMessageConverter(pubSubMessageConverter);
    return template;
  }

  @Bean
  public PubSubSubscriberTemplate subscriberTemplate(
      CredentialsProvider credentialsProvider, PubSubMessageConverter pubSubMessageConverter) {

    GcpPubSubProperties gcpPubSubProperties = new GcpPubSubProperties();
    gcpPubSubProperties.setKeepAliveIntervalMinutes(1);
    gcpPubSubProperties.initialize(projectId);

    DefaultSubscriberFactory factory =
        new DefaultSubscriberFactory(() -> projectId, gcpPubSubProperties);
    factory.setCredentialsProvider(credentialsProvider);

    PubSubSubscriberTemplate template = new PubSubSubscriberTemplate(factory);
    template.setMessageConverter(pubSubMessageConverter);
    return template;
  }

  @Bean
  public PubSubTemplate pubSubTemplateBean(
      PubSubPublisherTemplate pubSubPublisherTemplate,
      PubSubSubscriberTemplate subscriberTemplate) {
    return new PubSubTemplate(pubSubPublisherTemplate, subscriberTemplate);
  }

Publisher and Subscriber

  private void publishWithOrdering(List<Item> items) {
    items.forEach(
        item -> {
          log.info(
              "Publishing item (with ordering key) : [{}] with event description : [{}] ",
              item.getItemNumber(),
              item.getItemDescription());

          pubSubTemplate.publish(
              topic(), item, Collections.singletonMap(GcpPubSubHeaders.ORDERING_KEY, "1234"));

          try {
            Thread.sleep(100);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
        });
  }

@Configuration
public class PubSubListener {
  @Bean
  public Subscriber itemListener(PubSubTemplate pubSubTemplate) {

    return pubSubTemplate.subscribeAndConvert(getSubscription(), this::handleMessage, Item.class);
  }

  public void handleMessage(ConvertedBasicAcknowledgeablePubsubMessage<Item> message) {
    Item payload = message.getPayload();
    log.info(
        "Item received with item ID : [{}] and event description : [{}] with ordering key [{}]",
        payload.getItemNumber(),
        payload.getItemDescription(),
        message.getPubsubMessage().getOrderingKey());

    message.ack();
  }

  public String getSubscription() {
    return "item-topic-subscription";
  }
}

Dependencies

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-dependencies</artifactId>
      <version>4.5.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>spring-cloud-gcp-starter</artifactId>
</dependency>

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Reactions: 2
  • Comments: 17 (10 by maintainers)

Most upvoted comments

The Google Pub/Sub team has marked the issue as closed and suggested transferring it to the Spring Cloud GCP board. However, I will attempt to submit a support ticket to address the matter.

Can you identify any possible bug in this scenario?

@rastogirishabh : how were you able to print the entire payload in the logs as shown below?

I modified the PubSubPublisherTemplate implementation in my local spring-cloud-gcp checkout, installed to my local maven repository, and then modified the reproducer project to be dependent on the just-build snapshot version.

Thanks @meltsufin and @burkedavison. Tested and it is working as expected.

Verified this fixes the reproducer project. PubSubConfig.pubSubPublisherTemplate():

    DefaultPublisherFactory factory = new DefaultPublisherFactory(() -> projectId);
    // ... configure factory ...
    PubSubPublisherTemplate template = new PubSubPublisherTemplate(new CachingPublisherFactory(factory));
    // ...

@rastogirishabh : Closing this issue, but please reopen if needed.

@maitrimangal Never mind, we were actually using a different Publisher instance each time. @rastogirishabh You’re not actually using auto-configuration which would provide you with a CachingPublisherFactory that would re-use the same publisher. @burkedavison will try wrapping your instance of DefaultPublisherFactory with CachingPublisherFactory to see if it resolves the problem.

Thank you @rastogirishabh for the clear reproducer.

I’ve reproduced the issue with your project.

image

So it appears the PubSubPublisherTemplate is passing each message to a different executor, thus the onSuccess callbacks are out of order because the publishing events are too.

I don’t see anything obvious, but I think Cloud Support is the way to go for a more complete investigation. If you want to debug on your own, I would try to use the Pub/Sub client library directly without Spring Cloud GCP.

I don’t see anything obviously wrong in your setup. Can you please file a Google Cloud Support ticket for further investigation? Another thing you can try is to use pure Pub/Sub client library without Spring Cloud GCP, and see if you observe the same result.