spring-cloud-gcp: Inconsistent Message Ordering with High-Frequency Publishing Interval
Issue
When publishing multiple events/message having the same ordering key on a interval of 1 to 500 ms, the message ordering is not maintained consistently.
Despite taking all the necessary steps to ensure proper ordering, (mentioned below) the consumed events are received out of order.
- publishing messages to a regional end-point.
- enabling message ordering flag at subscription level and
- enabling message ordering flag at the publisher level.
However, if a sleep delay of approximately 1 second is introduced before publishing the individual events, the messages are received in the expected order.
Steps to reproduce
- Set up a publisher-subscriber system with proper configuration for message ordering.
- Publish a sequence of events with the same ordering key, each separated by a 1 to 500 ms interval. (Don’t bring up the consumer yet)
- Start the consumer that subscribes to a subscription which has message ordering flag enabled.
- Observe the received order of the consumed events.
Code example
Configurations
@Bean
public CredentialsProvider credentialsProvider() {
return () ->
ServiceAccountCredentials.fromStream(Files.newInputStream(Paths.get(credentialsFilePath)))
.createScoped(PublisherStubSettings.getDefaultServiceScopes());
}
@Bean
public PubSubPublisherTemplate pubSubPublisherTemplate(
CredentialsProvider credentialsProvider, PubSubMessageConverter pubSubMessageConverter) {
DefaultPublisherFactory factory = new DefaultPublisherFactory(() -> projectId);
factory.setEnableMessageOrdering(true);
factory.setCredentialsProvider(credentialsProvider);
factory.setEndpoint("us-east1-pubsub.googleapis.com:443");
PubSubPublisherTemplate template = new PubSubPublisherTemplate(factory);
template.setMessageConverter(pubSubMessageConverter);
return template;
}
@Bean
public PubSubSubscriberTemplate subscriberTemplate(
CredentialsProvider credentialsProvider, PubSubMessageConverter pubSubMessageConverter) {
GcpPubSubProperties gcpPubSubProperties = new GcpPubSubProperties();
gcpPubSubProperties.setKeepAliveIntervalMinutes(1);
gcpPubSubProperties.initialize(projectId);
DefaultSubscriberFactory factory =
new DefaultSubscriberFactory(() -> projectId, gcpPubSubProperties);
factory.setCredentialsProvider(credentialsProvider);
PubSubSubscriberTemplate template = new PubSubSubscriberTemplate(factory);
template.setMessageConverter(pubSubMessageConverter);
return template;
}
@Bean
public PubSubTemplate pubSubTemplateBean(
PubSubPublisherTemplate pubSubPublisherTemplate,
PubSubSubscriberTemplate subscriberTemplate) {
return new PubSubTemplate(pubSubPublisherTemplate, subscriberTemplate);
}
Publisher and Subscriber
private void publishWithOrdering(List<Item> items) {
items.forEach(
item -> {
log.info(
"Publishing item (with ordering key) : [{}] with event description : [{}] ",
item.getItemNumber(),
item.getItemDescription());
pubSubTemplate.publish(
topic(), item, Collections.singletonMap(GcpPubSubHeaders.ORDERING_KEY, "1234"));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
@Configuration
public class PubSubListener {
@Bean
public Subscriber itemListener(PubSubTemplate pubSubTemplate) {
return pubSubTemplate.subscribeAndConvert(getSubscription(), this::handleMessage, Item.class);
}
public void handleMessage(ConvertedBasicAcknowledgeablePubsubMessage<Item> message) {
Item payload = message.getPayload();
log.info(
"Item received with item ID : [{}] and event description : [{}] with ordering key [{}]",
payload.getItemNumber(),
payload.getItemDescription(),
message.getPubsubMessage().getOrderingKey());
message.ack();
}
public String getSubscription() {
return "item-topic-subscription";
}
}
Dependencies
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-dependencies</artifactId>
<version>4.5.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter</artifactId>
</dependency>
About this issue
- Original URL
- State: closed
- Created a year ago
- Reactions: 2
- Comments: 17 (10 by maintainers)
The Google Pub/Sub team has marked the issue as closed and suggested transferring it to the Spring Cloud GCP board. However, I will attempt to submit a support ticket to address the matter.
Can you identify any possible bug in this scenario?
I modified the PubSubPublisherTemplate implementation in my local spring-cloud-gcp checkout, installed to my local maven repository, and then modified the reproducer project to be dependent on the just-build snapshot version.
Thanks @meltsufin and @burkedavison. Tested and it is working as expected.
Verified this fixes the reproducer project.
PubSubConfig.pubSubPublisherTemplate():@rastogirishabh : Closing this issue, but please reopen if needed.
@maitrimangal Never mind, we were actually using a different
Publisherinstance each time. @rastogirishabh You’re not actually using auto-configuration which would provide you with aCachingPublisherFactorythat would re-use the same publisher. @burkedavison will try wrapping your instance ofDefaultPublisherFactorywithCachingPublisherFactoryto see if it resolves the problem.Thank you @rastogirishabh for the clear reproducer.
I’ve reproduced the issue with your project.
PubSubPublisherTemplate.publish(topic, message)uses itspublisherFactorytocreatePublisherfor each invocation.DefaultPublisherFactory.createPublisher(topic)always builds a newPublisher.Publisher.Builderuses anInstantiatingExecutorProviderby default.InstantiatingExecutorProviderconstructs a newScheduledExecutorServiceevery timegetExecutor()is called.So it appears the PubSubPublisherTemplate is passing each message to a different executor, thus the onSuccess callbacks are out of order because the publishing events are too.
I don’t see anything obvious, but I think Cloud Support is the way to go for a more complete investigation. If you want to debug on your own, I would try to use the Pub/Sub client library directly without Spring Cloud GCP.
I don’t see anything obviously wrong in your setup. Can you please file a Google Cloud Support ticket for further investigation? Another thing you can try is to use pure Pub/Sub client library without Spring Cloud GCP, and see if you observe the same result.