dapr: PubSub wildcard subscriptions do not work with gRPC

In what area(s)?

/area runtime

/area operator

/area placement

/area docs

/area test-and-release

What version of Dapr?

edge: v1.5.1-rc.3-551-g36a7c440

Expected Behavior

When I subscribe to a wildcard (or shared subscription) topic over gRPC, I expect to receive the messages published to matching topics.

Actual Behavior

I receive the following error, which is returned by the go-sdk:

ERRO[0002] Failed processing MQTT message: test/0: error returned from app while processing pub/sub event d5948877-322d-4b7c-adb6-e98c8296b0aa: rpc error: code = Unknown desc = pub/sub and topic combination not configured: mqtt-pubsub/test  app_id=checkout instance= scope=dapr.contrib type=log ver=edge

also see: https://github.com/dapr/go-sdk/blob/main/service/grpc/topic.go#L87

This may seem like a problem with the sdk at first, but the TopicSubscription type does not include an identifier for the subscription. The only thing the sdk can match an incoming message on is the topic, and that fails here: the message carries the concrete topic it was published to, not the wildcard the app subscribed with.

https://github.com/dapr/dapr/blob/57f82682498d2e58558a0bcd0b2ae94175c45a42/dapr/proto/runtime/v1/appcallback.proto#L150-L163
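To illustrate the mismatch, here is a minimal sketch of an exact-match handler lookup. The `registry` type and its `lookup` method are hypothetical stand-ins written for this example and are not the go-sdk's actual data structures; they only assume that handlers are keyed on the pubsub name plus the topic string used at subscription time.

```go
package main

import "fmt"

// registry is a hypothetical stand-in for a handler table keyed on
// "<pubsub>/<topic>", using the topic exactly as it was subscribed.
type registry map[string]func(data []byte)

// key builds the lookup key from a pubsub name and a topic string.
func key(pubsub, topic string) string { return pubsub + "/" + topic }

// lookup finds a handler by exact key; it has no notion of wildcards.
func (r registry) lookup(pubsub, topic string) (func([]byte), bool) {
	h, ok := r[key(pubsub, topic)]
	return h, ok
}

func main() {
	r := registry{}
	// The app subscribes with the MQTT multi-level wildcard.
	r[key("mqtt-pubsub", "#")] = func(data []byte) {}

	// The delivered message carries the concrete topic, so the lookup misses.
	_, ok := r.lookup("mqtt-pubsub", "test/0")
	fmt.Println(ok) // false

	// Only a message whose topic is literally "#" would hit the handler.
	_, ok = r.lookup("mqtt-pubsub", "#")
	fmt.Println(ok) // true
}
```

With a subscription identifier in the message, the lookup could key on that identifier instead of the topic string.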

Steps to Reproduce the Problem

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
  namespace: default
spec:
  type: pubsub.mqtt
  version: v1
  metadata:
  - name: url
    value: "tcp://localhost:1883"
  - name: qos
    value: 1
  - name: retain
    value: "false"
  - name: cleanSession
    value: "true"
  - name: backOffMaxRetries
    value: "0"

package main

import (
	"context"
	"log"
	"net/http"

	"github.com/dapr/go-sdk/service/common"

	daprd "github.com/dapr/go-sdk/service/grpc"
)

var sub = &common.Subscription{
	PubsubName: "mqtt-pubsub",
	Topic:      "#",
	Metadata:   map[string]string{"rawPayload": "true"},
}

func main() {
	s, err := daprd.NewService(":6002")
	if err != nil {
		log.Fatalf("error creating service: %v", err)
	}
	if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
		log.Fatalf("error adding topic subscription: %v", err)
	}
	if err := s.Start(); err != nil && err != http.ErrServerClosed {
		log.Fatalf("error listening: %v", err)
	}
}

func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
	log.Printf("Subscriber received: %s", e.RawData)
	return false, nil
}

Release Note

RELEASE NOTE:

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 15 (13 by maintainers)

Most upvoted comments

@yaron2 is there another issue for the ID field of subscriptions to be added which you discussed above? This will likely be needed to support multiple wildcard subscriptions. Right now neither Go SDK nor Python SDK support multiple wildcard subscriptions.

I just tried it and it works fine 😃 Currently we only have a single subscription per microservice, so this solves it for our use case.

This would mean that the sdk would need to understand mqtt wildcard/shared-subscription rules, as the topic could also be "hello/+/world" or "$share/GROUPID/TOPIC".
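For a sense of what "understanding the rules" would involve, here is a rough sketch of MQTT topic-filter matching in Go. The function names `matchFilter` and `stripShare` are made up for this example and are not part of any SDK; the sketch assumes the usual MQTT semantics (`+` matches one level, a trailing `#` matches the remaining levels including the parent, and a `$share/<group>/` prefix wraps an ordinary filter) and does not validate malformed filters.

```go
package main

import (
	"fmt"
	"strings"
)

// stripShare removes a "$share/<group>/" prefix and returns the
// underlying topic filter; other filters are returned unchanged.
func stripShare(filter string) string {
	if strings.HasPrefix(filter, "$share/") {
		parts := strings.SplitN(filter, "/", 3)
		if len(parts) == 3 {
			return parts[2]
		}
	}
	return filter
}

// matchFilter reports whether a concrete topic matches a subscription filter.
func matchFilter(filter, topic string) bool {
	filter = stripShare(filter)
	// Topics starting with '$' are not matched by filters that start with a wildcard.
	if strings.HasPrefix(topic, "$") &&
		(strings.HasPrefix(filter, "#") || strings.HasPrefix(filter, "+")) {
		return false
	}
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			// '#' matches everything from here on, but only as the last level.
			return i == len(f)-1
		}
		if i >= len(t) {
			return false // filter has more levels than the topic
		}
		if level != "+" && level != t[i] {
			return false // literal level differs
		}
	}
	return len(f) == len(t)
}

func main() {
	fmt.Println(matchFilter("#", "test/0"))
	fmt.Println(matchFilter("hello/+/world", "hello/big/world"))
	fmt.Println(matchFilter("$share/GROUPID/test/+", "test/0"))
	fmt.Println(matchFilter("hello/+/world", "hello/world"))
}
```

Every SDK would have to carry logic like this for each broker's wildcard syntax, which is the objection raised here.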

Correct, we don't have a global wildcard identifier so this is a no-go.

Adding the original (subscribed) topic as well as the actual one to the message sent to the app could help, though.

I'll look into sending this as part of the metadata.
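As a rough sketch of how the app side could use that, assume the runtime passes the subscribed topic in the message metadata. The key name `subscribedTopic`, the `event` type and the `routeKey` function below are all hypothetical; nothing in this thread fixes the actual field name or shape.

```go
package main

import "fmt"

// subscribedTopicKey is a hypothetical metadata key; the thread does not name one.
const subscribedTopicKey = "subscribedTopic"

// event is an illustrative stand-in for a delivered pub/sub message.
type event struct {
	PubsubName string
	Topic      string            // concrete topic the message was published to
	Metadata   map[string]string // extra fields assumed to be set by the runtime
}

// routeKey prefers the subscribed topic from metadata and falls back to
// the concrete topic when the metadata entry is absent or empty.
func routeKey(e event) string {
	if orig, ok := e.Metadata[subscribedTopicKey]; ok && orig != "" {
		return e.PubsubName + "/" + orig
	}
	return e.PubsubName + "/" + e.Topic
}

func main() {
	// Handler table keyed on the topic string used at subscription time.
	handlers := map[string]string{"mqtt-pubsub/#": "wildcardHandler"}

	e := event{
		PubsubName: "mqtt-pubsub",
		Topic:      "test/0",
		Metadata:   map[string]string{subscribedTopicKey: "#"},
	}
	fmt.Println(handlers[routeKey(e)]) // wildcardHandler
}
```

This keeps the sdk free of broker-specific wildcard rules, since it only compares the subscribed topic string it already knows.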