dapr: PubSub wildcard supscriptions do not work with grpc
In what area(s)?
/area runtime
/area operator
/area placement
/area docs
/area test-and-release
What version of Dapr?
edge: v1.5.1-rc.3-551-g36a7c440
Expected Behavior
When I subscribe to a wildcard (or shared subscription) topic over grpc, I want to receive the messages.
Actual Behavior
I receive the error from the go-sdk:
ERRO[0002] Failed processing MQTT message: test/0: error returned from app while processing pub/sub event d5948877-322d-4b7c-adb6-e98c8296b0aa: rpc error: code = Unknown desc = pub/sub and topic combination not configured: mqtt-pubsub/test app_id=checkout instance= scope=dapr.contrib type=log ver=edge
also see: https://github.com/dapr/go-sdk/blob/main/service/grpc/topic.go#L87
This may seem like a problem with the sdk at first, but the TopicSubscription type does not include a identifier for the subscription, so there is nothing for the sdk to match message on except for the topic, and this does not work here because of the wildcard subscription.
Steps to Reproduce the Problem
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mqtt-pubsub
namespace: default
spec:
type: pubsub.mqtt
version: v1
metadata:
- name: url
value: "tcp://localhost:1883"
- name: qos
value: 1
- name: retain
value: "false"
- name: cleanSession
value: "true"
- name: backOffMaxRetries
value: "0"
package main
import (
"context"
"log"
"net/http"
"github.com/dapr/go-sdk/service/common"
daprd "github.com/dapr/go-sdk/service/grpc"
)
var sub = &common.Subscription{
PubsubName: "mqtt-pubsub",
Topic: "#",
Metadata: map[string]string{"rawPayload": "true"},
}
func main() {
s, _ := daprd.NewService(":6002")
if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}
if err := s.Start(); err != nil && err != http.ErrServerClosed {
log.Fatalf("error listenning: %v", err)
}
}
func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("Subscriber received: %s", e.RawData)
return false, nil
}
Release Note
RELEASE NOTE:
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 15 (13 by maintainers)
@yaron2 is there another issue for the ID field of subscriptions to be added which you discussed above? This will likely be needed to support multiple wildcard subscriptions. Right now neither Go SDK nor Python SDK support multiple wildcard subscriptions.
I just tried it and it works fine š Currently we only have a single subscription per microservice to this solves it for our usecase
Correct, we donāt have a global wildcard identifier so this is a no-go.
Iāll look into sending this as part of the metadata.