MassTransit: IServiceProvider not available for Saga messages.

Is this a bug report?

YES

Using a custom message-level filter, MyMessageFilter<TContext, TMessage> : IFilter<TContext> where TContext : class, ConsumeContext<TMessage> where TMessage : class, together with the Autofac integration, it seems MT creates a message-specific DI scope just fine when dealing with a typical consumer. However, when dealing with state machines, it either does not create a scope at all or creates it much later in the consume pipeline.

public async Task Send(TContext context, IPipe<TContext> next)
{
    var sp = context.GetPayload<IServiceProvider>(); // works fine for message consumers, fails for sagas
}
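
For anyone hitting the same exception, here is a minimal sketch of a more defensive version of the filter. It is not the fix for the underlying behaviour. It only swaps GetPayload for TryGetPayload so the saga path can be detected instead of throwing. The namespace layout is an assumption (MassTransit 8 style, with the older GreenPipes import noted in a comment), the probe scope name "myMessageFilter" is made up for illustration, and the fallback branch is a placeholder.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
// using GreenPipes; // assumption: needed on MassTransit 7 and earlier, where IFilter, IPipe and ProbeContext live

public class MyMessageFilter<TContext, TMessage> : IFilter<TContext>
    where TContext : class, ConsumeContext<TMessage>
    where TMessage : class
{
    public async Task Send(TContext context, IPipe<TContext> next)
    {
        // TryGetPayload returns false instead of throwing when no payload
        // of the requested type has been attached to the context yet.
        if (context.TryGetPayload<IServiceProvider>(out var sp))
        {
            // A scope is available here (the consumer case described above);
            // scoped services can be resolved from sp.
        }
        else
        {
            // No scope on the context at this point in the pipe
            // (the saga case described above). Placeholder: log or skip.
        }

        // Always hand the message to the rest of the pipeline.
        await next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
        // Hypothetical scope name, used only for diagnostics output.
        context.CreateFilterScope("myMessageFilter");
    }
}
```

With this shape the filter keeps working for consumers and degrades quietly for sagas, which at least makes it easy to confirm where in the pipeline the scope is missing.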

The filter is registered using the fairly standard observer-based approach:

public static void UseMyFilterOnConsume(this IConsumePipeConfigurator configurator, ...)
{
    var observer = new MyFilterSpecificationObserver(...);

    configurator.ConnectHandlerConfigurationObserver(observer);
    configurator.ConnectConsumerConfigurationObserver(observer);
    configurator.ConnectSagaConfigurationObserver(observer);
}

public class MyFilterSpecificationObserver : IConsumerConfigurationObserver, ISagaConfigurationObserver, IHandlerConfigurationObserver
{
    // ...

    private class SagaFilterSpecification<TSaga, TMessage> : IPipeSpecification<SagaConsumeContext<TSaga, TMessage>> where TSaga : class, ISaga where TMessage : class
    {
        public void Apply(IPipeBuilder<SagaConsumeContext<TSaga, TMessage>> builder)
        {
            builder.AddFilter(new MyMessageFilter<SagaConsumeContext<TSaga, TMessage>, TMessage>(...));
        }
    }

    // ...

    public void SagaMessageConfigured<TSaga, TMessage>(ISagaMessageConfigurator<TSaga, TMessage> configurator) where TSaga : class, ISaga where TMessage : class
    {
        configurator.AddPipeSpecification(new SagaFilterSpecification<TSaga, TMessage>(this));
    }
}

The bus setup is fairly simple:

builder.AddMassTransit(cfg =>
{
    cfg.AddSagaStateMachine<...>().RedisRepository(...);

    cfg.UsingAmazonSqs((ctx, bfc) =>
    {
        bfc.Host(...);

        bfc.UseMyFilterOnConsume(...);

        bfc.ReceiveEndpoint(..., rec =>
        {
            rec.ConfigureSaga<...>(ctx);
        });
    });
});

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 17 (10 by maintainers)

Most upvoted comments

So the change here is that your filter will be resolved within the saga/consumer scope. And what you want, a single lifetime scope from the consumer all the way to sent/received messages, is already how it is supposed to behave.