azure-functions-dotnet-worker: With a ServiceBusTrigger, how to go from string/byte[] to Message?

Hi,

using Service Bus and the new functions model, I need to integrate with MassTransit. MT’s integration library for Service Bus expects that I pass in a Microsoft.Azure.ServiceBus.Message.

The new functions model doesn’t allow this type to be bound directly; the relevant samples all act as if the message were a regular string. I checked out Message and noted it has a constructor taking a byte[]. So I changed the type of the incoming message to byte[] - which is one of the supported types, as far as I understand - and then just construct a Message from that.

However, if I pass that to MassTransit, I get an InvalidOperationException that actually comes from Microsoft.Azure.ServiceBus:

---> System.InvalidOperationException: Operation is not valid due to the current state of the object.
[2021-04-08T18:05:51.756Z]    at Microsoft.Azure.ServiceBus.Message.SystemPropertiesCollection.ThrowIfNotReceived()
[2021-04-08T18:05:51.756Z]    at Microsoft.Azure.ServiceBus.Message.SystemPropertiesCollection.get_DeliveryCount()
[2021-04-08T18:05:51.757Z]    at MassTransit.Azure.ServiceBus.Core.Contexts.ServiceBusReceiveContext..ctor(Message message, ReceiveEndpointContext receiveEndpointContext)
...

So it seems that my idea of constructing a Message from the byte[] is not the right way to go. When I break into the function right after the Message instance has been constructed, it looks kinda incomplete, too: only the Body seems to be filled. Further investigation shows that the Message constructor taking an array does exactly this - it just fills the Body property from the array and initializes SystemProperties and UserProperties as empty collections. Later in the process, when MassTransit tries to read Message.SystemProperties.DeliveryCount, the exception is thrown because SystemProperties is empty.
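A minimal sketch that should reproduce the behaviour described above outside of MassTransit, assuming the Microsoft.Azure.ServiceBus package is referenced. The class name Repro and the sample payload are made up for illustration; IsReceived is printed only as a diagnostic.

```csharp
using System;
using System.Text;
using Microsoft.Azure.ServiceBus;

public static class Repro
{
    public static void Main()
    {
        // The byte[] constructor only fills Body; nothing marks the message as received.
        var message = new Message(Encoding.UTF8.GetBytes("{\"hello\":\"world\"}"));

        // Diagnostic: reports whether the SDK considers this message as received from the broker.
        Console.WriteLine(message.SystemProperties.IsReceived);

        try
        {
            // Same property MassTransit reads in the stack trace above.
            _ = message.SystemProperties.DeliveryCount;
        }
        catch (InvalidOperationException ex)
        {
            // Expected per the stack trace: thrown from ThrowIfNotReceived().
            Console.WriteLine(ex.Message);
        }
    }
}
```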

The question is: what do I need to do to create a complete Message instance from within my trigger function?

This is really critical for me. Unless I find a way to make this work, we will have to switch away from Azure Functions to dockerized services, which would be a shame because the autoscaling of Azure Functions is a very cool feature.

Any help would be greatly appreciated!

Many thanks, MR

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 21 (6 by maintainers)

Most upvoted comments

When will it be possible to use ‘ServiceBusReceivedMessage’ on the ServiceBusTrigger binding with .NET 5 out-of-process (isolated)? Or is this .NET 6? Or 7? (We have various existing 3.1 in-proc functions and need to make some decisions.)

I don’t represent the Functions team, but if I understand the intention correctly, the idea is that the Isolated Worker SDK will not take a dependency on specific SDK packages, to avoid repeating with Message and ServiceBusMessage (the latest ASB SDK) the mess that existed in the past with BrokeredMessage and Message. For that, some sort of abstraction will be needed, and it will likely come later. As to documenting a workaround in the official documentation: I’m not sure that’s a good idea. That kind of “documentation” would be more suitable for an issue such as this one.

I didn’t need to fully construct a Message so didn’t bother with the reflection. As to getting the custom properties, this is what I’ve done

if (functionContext.BindingContext.BindingData.TryGetValue("UserProperties", out var customProperties) && customProperties != null)
{
  var customHeaders = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, string>>(customProperties.ToString()
        ?? throw new InvalidOperationException());
  // ...
}

For system-set properties such as Label or MessageId, accessing BindingData with the property name as the key will give you the serialized values.
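As a rough sketch of that lookup, the helper below reads a single entry from BindingData by property name. GetBindingValue and BindingDataExtensions are hypothetical names introduced here, not part of the Functions SDK, and which keys are actually present depends on the trigger and the message.

```csharp
using Microsoft.Azure.Functions.Worker;

public static class BindingDataExtensions
{
    // Hypothetical helper: returns the serialized binding-data value for the
    // given key (e.g. "MessageId" or "Label"), or null if the key is absent.
    public static string GetBindingValue(this FunctionContext context, string key)
    {
        return context.BindingContext.BindingData.TryGetValue(key, out var value)
            ? value?.ToString()
            : null;
    }
}
```

Inside a function this could be called as functionContext.GetBindingValue("MessageId"); a null result means the host did not supply that key.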

As to getting the custom properties, this is what I’ve done …

[Struck out by the commenter: This shouldn’t be necessary; you should be able to inject them directly,] e.g.

public async Task<Message> Run( 
   [ServiceBusTrigger(queueName: "queueName", Connection = "AzureServiceBus")]
   string body,
   string correlationId,
   string messageId,
   string sessionId,
   IDictionary<string, object> userProperties,
   int deliveryCount,
   FunctionContext functionContext
)

Still no update? Please, we really want to upgrade our function to .Net 5 but this is blocking.

You are better off upgrading to .NET 6 (October) and staying with in-proc SDK as you won’t be able to use SDK types with the new Isolated Worker Functions SDK at this point in time.

Hi,

so I’m now able to receive messages using the info @SeanFeldman provided. While it works, it’s horrible because I rely on internal implementation details of Microsoft.Azure.ServiceBus.Message.

I will have to discuss internally whether we as a company want to build the infrastructure of our product on this rather shaky foundation or whether it’s maybe the better course of action to steer away from Azure Functions.

However, maybe it’ll be helpful for someone to see the complete code for creating a Message object from a byte[] and a FunctionContext:

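using System;
using System.Linq.Expressions;
using System.Reflection;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.ServiceBus;
using Newtonsoft.Json;

public static class MessageFactory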
{
    public static Message CreateMessage(byte[] body, FunctionContext context)
    {
        var result = new Message(body);
        result.SetPrimitiveProperty(m => m.MessageId, context);
        result.SetPrimitiveProperty(m => m.ContentType, context);
        result.SetJsonProperty(m => m.UserProperties, context);

        var sysProperties = result.SystemProperties;
        sysProperties.SetPrimitiveProperty(s => s.DeliveryCount, context);
        sysProperties.SetPrimitiveProperty(s => s.SequenceNumber, context);
        sysProperties.SetPrimitiveProperty(s => s.EnqueuedTimeUtc, context);

        // this one we cannot directly set because it's computed from a field
        var lockToken =
            context.GetPrimitiveValue<Message.SystemPropertiesCollection, string>(s => s.LockToken);
        var lockTokenGuid = Guid.Parse(lockToken);
        sysProperties.SetField("lockTokenGuid", lockTokenGuid);

        // this one we need to do indirectly because  ExpiresAtUtc is computed
        // while TTL is settable
        var expiresAtUtc = context.GetPrimitiveValue<Message, DateTime>(m => m.ExpiresAtUtc);
        result.TimeToLive = expiresAtUtc.Subtract(sysProperties.EnqueuedTimeUtc);

        return result;
    }

    static TProperty GetPrimitiveValue<TOwner, TProperty>(this FunctionContext self,
        Expression<Func<TOwner, TProperty>> accessor)
    {
        var rawValue = self.GetRawValue(accessor);
        if (typeof(TProperty) == typeof(DateTime)) rawValue = rawValue?.ToString()?.Trim('"');
        return (TProperty) Convert.ChangeType(rawValue, typeof(TProperty));
    }

    static object GetRawValue<TOwner, TProperty>(this FunctionContext self,
        Expression<Func<TOwner, TProperty>> accessor)
    {
        var property = PropertyInfo(accessor);
        var name = property.Name;
        if (self.BindingContext.BindingData.TryGetValue(name, out var rawValue)) return rawValue;
        return default;
    }

    static PropertyInfo PropertyInfo<TOwner, TProperty>(Expression<Func<TOwner, TProperty>> accessor)
    {
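        // Read the accessed member directly from the lambda body, using only
        // System.Linq.Expressions, so no external extension method is required.
        var result = (accessor.Body as MemberExpression)?.Member as PropertyInfo;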
        if (result == default)
            throw new ArgumentException("The accessor doesn't access a property", nameof(accessor));
        return result;
    }

    static void SetField(this object self, string fieldName, object value)
    {
        var field = self.GetType()
            .GetField(fieldName,
                BindingFlags.NonPublic |
                BindingFlags.Instance);
        if (field == default) throw new ArgumentException("There is no such field", fieldName);
        field.SetValue(self, value);
    }

    static void SetJsonProperty<TOwner, TProperty>(this TOwner self,
        Expression<Func<TOwner, TProperty>> accessor,
        FunctionContext context)
    {
        var json = (string) context.GetRawValue(accessor);
        var value = JsonConvert.DeserializeObject(json, typeof(TProperty));
        self.SetProperty(accessor, value);
    }

    static void SetPrimitiveProperty<TOwner, TProperty>(this TOwner self,
        Expression<Func<TOwner, TProperty>> accessor,
        FunctionContext context)
    {
        var value = context.GetPrimitiveValue(accessor);
        self.SetProperty(accessor, value);
    }

    static void SetProperty<TOwner, TProperty>(this TOwner self,
        Expression<Func<TOwner, TProperty>> accessor,
        object value)
    {
        var property = PropertyInfo(accessor);
        var setter = property.SetMethod;
        if (setter == default)
            throw new ArgumentException("The property does not have any setter", nameof(accessor));

        setter.Invoke(self, new[] {value});
    }
}

Note that in my tests all the properties this code sets were always present in FunctionContext. Other properties, for example PartitionKey, were never present. This may or may not be related to my use of MassTransit or to how my tests are set up; I cannot say yet. If I find out more, I’ll try to remember to update this thread 😃
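For completeness, a sketch of how the factory above might be called from an isolated-worker function. The function name, queue name and connection setting name are placeholders, and the hand-off to MassTransit is left as a comment because it depends on how the receiver is wired up.

```csharp
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.ServiceBus;

public class QueueFunction
{
    [Function("QueueFunction")] // placeholder function name
    public void Run(
        [ServiceBusTrigger("queueName", Connection = "AzureServiceBus")] byte[] body,
        FunctionContext context)
    {
        // Rebuild a Message from the raw body plus the binding data
        // exposed on the FunctionContext (see MessageFactory above).
        Message message = MessageFactory.CreateMessage(body, context);

        // Hand 'message' over to the MassTransit receiver here.
    }
}
```

Binding the body as byte[] rather than string keeps the payload unchanged, which matches what the Message constructor expects.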

@repo-owners: While the code above technically answers my question, I don’t think this issue should be closed, because this simply is not a good solution; it’s a hack.

I’ve been looking at this as well. It is hidden in the documentation, but the .NET 5 ServiceBusTrigger does support ServiceBusReceivedMessage from the new Azure.Messaging.ServiceBus library. https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-trigger?tabs=csharp#usage

I believe this is only for the old (non-isolated) model. Have you tried it on .NET 5?

MassTransit is still using Microsoft.Azure.ServiceBus, which only supports Message.

If MassTransit upgrades to support the new Azure.Messaging.ServiceBus client library then upgrading the Azure Function integrations would be simple, but the transport upgrade is the tricky one.