components-contrib: Mqtt pubsub components fails to read messages on startup when multiple subscriptions configured
Expected Behavior
Messages that are persisted on queue while dapr sidecar was down should be processed after dapr sidecar is started.
Actual Behavior
After darp sidecar is started messages which are one queue are not processed.
Steps to Reproduce the Problem
Configure 2 subscribtions for mqtt component:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: testapp-pubsub
spec:
type: pubsub.mqtt
version: v1
metadata:
- name: url
value: "abc"
- name: qos
value: 1
- name: retain
value: "false"
- name: cleanSession
value: "false"
- name: backOffMaxRetries
value: "5"
Subscription 1
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: subscription1
spec:
pubsubname: testapp-pubsub
topic: topic1
route: /api/route1
Subscription 2
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: subscription2
spec:
pubsubname: testapp-pubsub
topic: topic2
route: /api/route2
- Start dapr + application for first time so that it creates queue
- Stop dapr
- Send at least 2 messages for topic1
- Start dapr for second time
- You should see similar behavior:
time="2022-11-08T16:27:09.771556034Z" level=info msg="app is subscribed to the following topics: [topic1 topic2] through pubsub=testapp-pubsub" app_id=test.testapp.api instance=536c465db880 scope=dapr.runtime type=log ver=1.9.3
time="2022-11-08T16:27:09.771606899Z" level=debug msg="subscribing to topic='topic1' on pubsub='testapp-pubsub'" app_id=test.testapp.api instance=536c465db880 scope=dapr.runtime type=log ver=1.9.3
time="2022-11-08T16:27:09.771680668Z" level=info msg="initializing the subscriber" app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:27:09.777840715Z" level=debug msg="subscribing to topic='topic2' on pubsub='testapp-pubsub'" app_id=test.testapp.api instance=536c465db880 scope=dapr.runtime type=log ver=1.9.3
time="2022-11-08T16:27:09.777891519Z" level=debug msg="Processing MQTT message topic1#130 (retained=false)" app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:27:09.77789804Z" level=info msg="re-initializing the subscriber" app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:27:09.798402511Z" level=debug msg="Done processing MQTT message topic1#130; sending ACK" app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:27:39.689842717Z" level=debug msg="Refreshing all mDNS addresses." app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:27:39.689979132Z" level=debug msg="no mDNS apps to refresh." app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:28:09.690739406Z" level=debug msg="Refreshing all mDNS addresses." app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:28:09.690850391Z" level=debug msg="no mDNS apps to refresh." app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:28:39.691071766Z" level=debug msg="Refreshing all mDNS addresses." app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
time="2022-11-08T16:28:39.691134361Z" level=debug msg="no mDNS apps to refresh." app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
- Subscribing for topic1
- Subscribing for topic2
- Only one message is being processed but isn’t removed from queue
- No more messages which are on queue are processed
- Also messages that are being send after start are not processed
Notice that:
- if there is less than 2 messages on queue (0 or 1) mqtt component works fine and messages are processed
- in scenario above if there were messages for topic2 on queue then it would work fine
It seems that adding second subscription causes race condition.
time="2022-11-08T16:27:09.77789804Z" level=info msg="re-initializing the subscriber" app_id=test.testapp.api instance=536c465db880 scope=dapr.contrib type=log ver=1.9.3
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 17 (11 by maintainers)
Summary -
@bwojdyla Feel free to close this.
@bwojdyla I could not reproduce the issue, it seems to work fine for me. Please find below details steps and logs. Please let us know if there needs to any modification to the steps below -
Logs -
1. Subscriber and publisher both were up initially -
Publisher Logs
INFO[0000] app is subscribed to the following topics: [B A] through pubsub=orderpubsub app_id=checkout-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.runtime type=log ver=edge INFO[0000] initializing the subscriber app_id=checkout-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.contrib type=log ver=edge == APP == Published data to Topic A: {“orderId_A”:1} INFO[0000] re-initializing the subscriber app_id=checkout-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.contrib type=log ver=edge INFO[0000] placement tables updated, version: 0 app_id=checkout-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.runtime.actor.internal.placement type=log ver=edge INFO[0000] dapr initialized. Status: Running. Init Elapsed 30ms app_id=checkout-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.runtime type=log ver=edge ℹ️ Updating metadata for app command: node . ✅ You’re up and running! Both Dapr and your app logs will appear here.
== APP == Published data to Topic A: {“orderId_A”:2} == APP == Published data to Topic A: {“orderId_A”:3} == APP == Published data to Topic A: {“orderId_A”:4} == APP == Published data to Topic A: {“orderId_A”:5}
Subscriber Logs
== APP == Subscriber received message on Topic A: { orderId_A: 1 } == APP == Subscriber received message on Topic A: { orderId_A: 2 } == APP == Subscriber received message on Topic A: { orderId_A: 2 } == APP == Subscriber received message on Topic A: { orderId_A: 3 } == APP == Subscriber received message on Topic A: { orderId_A: 3 } == APP == Subscriber received message on Topic A: { orderId_A: 4 } == APP == Subscriber received message on Topic A: { orderId_A: 4 } == APP == Subscriber received message on Topic A: { orderId_A: 5 } == APP == Subscriber received message on Topic A: { orderId_A: 5 }
2. Put down subscriber while publisher keeps on sending -
Publisher log - == APP == Published data to Topic A: {“orderId_A”:6} == APP == Published data to Topic A: {"orderId_A”:7} == APP == Published data to Topic A: {"orderId_A”:8}
Bring subscriber app online - Subscriber logs - INFO[0000] app is subscribed to the following topics: [B A] through pubsub=orderpubsub app_id=order-processor-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.runtime type=log ver=edge INFO[0000] initializing the subscriber app_id=order-processor-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.contrib type=log ver=edge INFO[0000] re-initializing the subscriber app_id=order-processor-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.contrib type=log ver=edge INFO[0000] placement tables updated, version: 0 app_id=order-processor-http instance=Pravins-MBP.guest.corp.microsoft.com scope=dapr.runtime.actor.internal.placement type=log ver=edge == APP == Subscriber received message on Topic A: { orderId_A: 7 } == APP == Subscriber received message on Topic A: { orderId_A: 8 } == APP == Subscriber received message on Topic A: { orderId_A: 6 }
3. Later logs -
Publisher -
== APP == Published data to Topic A: {“orderId_A”:9} == APP == Published data to Topic A: {“orderId_A”:10} == APP == Published data to Topic A: {“orderId_A”:11} == APP == Published data to Topic A: {“orderId_A”:12} == APP == Published data to Topic A: {“orderId_A”:13} == APP == Published data to Topic A: {“orderId_A”:14}
Subscriber - == APP == Subscriber received message on Topic A: { orderId_A: 9 } == APP == Subscriber received message on Topic A: { orderId_A: 10 } == APP == Subscriber received message on Topic A: { orderId_A: 11 } == APP == Subscriber received message on Topic A: { orderId_A: 12 } == APP == Subscriber received message on Topic A: { orderId_A: 13 } == APP == Subscriber received message on Topic A: { orderId_A: 14 }