components-contrib: MQTT pubsub component fails to read messages on startup when multiple subscriptions are configured

Expected Behavior

Messages that were persisted on the queue while the Dapr sidecar was down should be processed after the sidecar is started.

Actual Behavior

After the Dapr sidecar is started, messages that are on the queue are not processed.

Steps to Reproduce the Problem

Configure 2 subscriptions for the MQTT component:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: testapp-pubsub
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
  - name: url
    value: "abc"
  - name: qos
    value: 1
  - name: retain
    value: "false"
  - name: cleanSession
    value: "false"
  - name: backOffMaxRetries
    value: "5"

Subscription 1

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: subscription1 
spec:
  pubsubname: testapp-pubsub
  topic: topic1
  route: /api/route1

Subscription 2

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: subscription2
spec:
  pubsubname: testapp-pubsub
  topic: topic2
  route: /api/route2

  1. Start Dapr and the application for the first time so that the queue is created
  2. Stop Dapr
  3. Send at least 2 messages to topic1
  4. Start Dapr a second time
  5. You should see behavior similar to this:

time="2022-11-08T16:27:09.771556034Z" level=info msg="app is subscribed to the following topics: [topic1 topic2] through pubsub=testapp-pubsub" app_id=test.testapp.api instance=536c465db880 scope=dapr.runtime type=log ver=1.9.3
time="2022-11-08T16:27:09.771606899Z" level=debug msg="subscribing to topic='topic1' on pubsub='testapp-pubsub'" app_id=test.testapp.api instance=536c465db880 scope=dapr.runtime type=log ver=1.9.3
time="2022-11-08T16:27:09.771680668Z" level=info msg="initializing the subscriber" app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:27:09.777840715Z" level=debug msg="subscribing to topic='topic2' on pubsub='testapp-pubsub'" app_id=test.testapp.api instance=536c465db880 scope=dapr.runtime type=log ver=1.9.3
time="2022-11-08T16:27:09.777891519Z" level=debug msg="Processing MQTT message topic1#130 (retained=false)" app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:27:09.77789804Z" level=info msg="re-initializing the subscriber" app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:27:09.798402511Z" level=debug msg="Done processing MQTT message topic1#130; sending ACK" app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:27:39.689842717Z" level=debug msg="Refreshing all mDNS addresses." app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:27:39.689979132Z" level=debug msg="no mDNS apps to refresh." app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:28:09.690739406Z" level=debug msg="Refreshing all mDNS addresses." app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:28:09.690850391Z" level=debug msg="no mDNS apps to refresh." app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:28:39.691071766Z" level=debug msg="Refreshing all mDNS addresses." app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:28:39.691134361Z" level=debug msg="no mDNS apps to refresh." app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3

In the log above:

  1. Subscribing to topic1
  2. Subscribing to topic2
  3. Only one message is processed, but it isn’t removed from the queue
  4. No more of the messages on the queue are processed
  5. Messages sent after startup are not processed either
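
The observations above can be checked mechanically against a captured sidecar log. The following is a minimal Python sketch, not part of Dapr: it assumes the key=value log format shown in this issue, and the names `parse_line` and `summarize` are made up for illustration.

```python
import re

# key=value pairs; a value is either a double-quoted string or a bare token
_PAIR = re.compile(r'(\w+)=("(?:[^"\\]|\\.)*"|\S+)')
_SUBSCRIBING = re.compile(r"^subscribing to topic='([^']+)'")
_STARTED = re.compile(r"^Processing MQTT message (\S+)")
_DONE = re.compile(r"^Done processing MQTT message ([^;\s]+)")


def parse_line(line):
    """Return the key=value fields of one sidecar log line as a dict."""
    fields = {}
    for key, value in _PAIR.findall(line):
        if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
            value = value[1:-1].replace('\\"', '"')
        fields[key] = value
    return fields


def summarize(lines):
    """Collect subscribe, re-initialize and MQTT processing events."""
    summary = {"subscribed": [], "started": [], "done": [], "reinit": 0}
    for line in lines:
        msg = parse_line(line).get("msg", "")
        match = _SUBSCRIBING.match(msg)
        if match:
            summary["subscribed"].append(match.group(1))
            continue
        if msg == "re-initializing the subscriber":
            summary["reinit"] += 1
            continue
        match = _STARTED.match(msg)
        if match:
            summary["started"].append(match.group(1))
            continue
        match = _DONE.match(msg)
        if match:
            summary["done"].append(match.group(1))
    return summary
```

Calling `summarize` on the lines of a log file returns the topics subscribed to, the message ids that started and finished processing, and the number of re-initializations. A log where `started` holds a single message id although several messages were queued, and `reinit` is non-zero, matches the behavior described here.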

Notice that:

  • if there are fewer than 2 messages on the queue (0 or 1), the MQTT component works fine and messages are processed
  • in the scenario above, if there were messages for topic2 on the queue, it would work fine

It seems that adding a second subscription causes a race condition. Note the re-initialization that is logged right after the subscription to topic2:

time="2022-11-08T16:27:09.77789804Z" level=info msg="re-initializing the subscriber" app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 17 (11 by maintainers)

Most upvoted comments

Summary -

  • The issue is reproducible with 1.9.3 and needs a fix.
  • The issue does not exist with the Dapr edge version; however, the pending messages are delivered multiple times.
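
Because the component is configured with qos 1, which MQTT defines as at-least-once delivery, a subscriber may see the same message more than once and is usually written to tolerate that. Below is a minimal Python sketch of an idempotent handler. It is an illustration only: the names `SeenIds` and `handle` are hypothetical, and it assumes that the `orderId_A` field from the test payloads uniquely identifies a message.

```python
from collections import OrderedDict


class SeenIds:
    """Remember the most recently seen message ids, up to a fixed capacity."""

    def __init__(self, capacity=1000):
        self._capacity = capacity
        self._ids = OrderedDict()

    def first_time(self, message_id):
        """Return True the first time an id is seen, False for a repeat."""
        if message_id in self._ids:
            self._ids.move_to_end(message_id)
            return False
        self._ids[message_id] = None
        if len(self._ids) > self._capacity:
            self._ids.popitem(last=False)  # evict the oldest id
        return True


def handle(event, seen, process):
    """Run process(event) once per key; skip repeats of the same key."""
    key = event["orderId_A"]  # assumption: this field identifies the message
    if seen.first_time(key):
        process(event)
        return "processed"
    return "duplicate"
```

With a handler like this, a redelivered message can still be acknowledged, but the business logic in `process` runs only once per key. The in-memory store is lost on restart, so a real service would need a persistent store for the seen ids.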

@bwojdyla Feel free to close this.

@bwojdyla I could not reproduce the issue; it seems to work fine for me. Please find the detailed steps and logs below, and let us know if the steps need any modification -

  • Start the subscriber app, subscribed to 2 topics, A and B. I took the YAML files shared above but changed the topic names to keep things simple.
  • Start the publisher app and send 5 messages to topic A only. As stated in the issue description, there should not be any message on the queue for Topic ‘B’.
  • Stop the subscriber app but keep sending data to topic A. While the subscriber app was down, the publisher sent 3 more messages.
  • Bring the subscriber app up again.
  • [Result] All messages that were sent were received:
      1. Initially, when both apps were up, all messages sent were received by the subscriber on topic A.
      2. All messages that were sent while the subscriber was down were received when the subscriber came online again.
      3. All messages that were sent thereafter were received normally.

Logs -

1. Subscriber and publisher both were up initially -

Publisher Logs

INFO[0000] app is subscribed to the following topics: [B A] through pubsub=orderpubsub app_id=checkout-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.runtime type=log ver=edge
INFO[0000] initializing the subscriber app_id=checkout-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.contrib type=log ver=edge
== APP == Published data to Topic A: {"orderId_A":1}
INFO[0000] re-initializing the subscriber app_id=checkout-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.contrib type=log ver=edge
INFO[0000] placement tables updated, version: 0 app_id=checkout-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.runtime.actor.internal.placement type=log ver=edge
INFO[0000] dapr initialized. Status: Running. Init Elapsed 30ms app_id=checkout-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.runtime type=log ver=edge
ℹ️ Updating metadata for app command: node .
✅ You’re up and running! Both Dapr and your app logs will appear here.

== APP == Published data to Topic A: {"orderId_A":2}
== APP == Published data to Topic A: {"orderId_A":3}
== APP == Published data to Topic A: {"orderId_A":4}
== APP == Published data to Topic A: {"orderId_A":5}

Subscriber Logs

== APP == Subscriber received message on Topic A: { orderId_A: 1 }
== APP == Subscriber received message on Topic A: { orderId_A: 2 }
== APP == Subscriber received message on Topic A: { orderId_A: 2 }
== APP == Subscriber received message on Topic A: { orderId_A: 3 }
== APP == Subscriber received message on Topic A: { orderId_A: 3 }
== APP == Subscriber received message on Topic A: { orderId_A: 4 }
== APP == Subscriber received message on Topic A: { orderId_A: 4 }
== APP == Subscriber received message on Topic A: { orderId_A: 5 }
== APP == Subscriber received message on Topic A: { orderId_A: 5 }

2. Subscriber taken down while the publisher keeps sending -

Publisher log -

== APP == Published data to Topic A: {"orderId_A":6}
== APP == Published data to Topic A: {"orderId_A":7}
== APP == Published data to Topic A: {"orderId_A":8}

Bring subscriber app online -

Subscriber logs -

INFO[0000] app is subscribed to the following topics: [B A] through pubsub=orderpubsub app_id=order-processor-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.runtime type=log ver=edge
INFO[0000] initializing the subscriber app_id=order-processor-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.contrib type=log ver=edge
INFO[0000] re-initializing the subscriber app_id=order-processor-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.contrib type=log ver=edge
INFO[0000] placement tables updated, version: 0 app_id=order-processor-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.runtime.actor.internal.placement type=log ver=edge
== APP == Subscriber received message on Topic A: { orderId_A: 7 }
== APP == Subscriber received message on Topic A: { orderId_A: 8 }
== APP == Subscriber received message on Topic A: { orderId_A: 6 }

3. Later logs -

Publisher -

== APP == Published data to Topic A: {"orderId_A":9}
== APP == Published data to Topic A: {"orderId_A":10}
== APP == Published data to Topic A: {"orderId_A":11}
== APP == Published data to Topic A: {"orderId_A":12}
== APP == Published data to Topic A: {"orderId_A":13}
== APP == Published data to Topic A: {"orderId_A":14}

Subscriber -

== APP == Subscriber received message on Topic A: { orderId_A: 9 }
== APP == Subscriber received message on Topic A: { orderId_A: 10 }
== APP == Subscriber received message on Topic A: { orderId_A: 11 }
== APP == Subscriber received message on Topic A: { orderId_A: 12 }
== APP == Subscriber received message on Topic A: { orderId_A: 13 }
== APP == Subscriber received message on Topic A: { orderId_A: 14 }