RawRabbit: RawRabbit Subscriber does not work in ASP.NET Core

I created a thread in Stackoverflow, no one answered though, I ask it here.

I am creating a RestApi in asp.net core, and in one of my services, I publish a message to RabbitMQ using RawRabbit. That being said, I see the message is published in the RabbitMQ control panel when I have commented the subscriber part, and the number of consumers is 0, when I add the subscriber part number of consumers become 1 and the messages are getting consumed so there is no message in the control panel, BUT what is weird is that none of the codes ( in this case it’s just a log ) in the subscriber does not run.

Publisher part:

public async void RaiseAsync(string event_name, ShoppingCartItemAdded data) {
  const string EXCHANGE_NAME = "myRabbit";
  Action<IPublishContext> x = (ctx) => ctx
    .UsePublishConfiguration(xfg => xfg.OnExchange(EXCHANGE_NAME));
    //.WithRoutingKey("shoppingcartitemadded"));
  await this.Client.PublishAsync<ShoppingCartItemAdded>(data, x );
}

Subscriber part:

public class ShoppingCartItemAddedConsumer
{
  private readonly ILogger<ShoppingCartItemAddedConsumer> logger;
  private readonly IBusClient client;

  public ShoppingCartItemAddedConsumer(ILogger<ShoppingCartItemAddedConsumer> logger, IBusClient client)
  {
      this.logger = logger;
      this.client = client;
      this.logger.LogInformation("Subscriber created");
  }

  public async void Run()
  {
    const string QUEUE_NAME = "myWebApi";
    const string EXCHANGE_NAME = "myRabbit";

    this.logger.LogInformation("Registering subscriber");

    await client.SubscribeAsync<ShoppingCartItemAdded>(async msg =>
    {
      this.logger.LogInformation("Message received from rabbitmq : {message}", msg);
    }, ctx => ctx
      .UseSubscribeConfiguration(cfg => cfg
        .OnDeclaredExchange(dex => dex
          .WithName(EXCHANGE_NAME)
          .WithAutoDelete(false)
          .WithDurability(true)
          .WithType(ExchangeType.Topic))
      .FromDeclaredQueue(dq => dq
          .WithName(QUEUE_NAME)
          .WithExclusivity(false)
          .WithDurability(true)
          .WithAutoDelete(false)))
      );

      this.logger.LogInformation("Subscriber registered");
    }
}```

And I registered the subscriber as `singleton` so later I can do: 

```cs
var consumer = app.ApplicationServices.GetRequiredService<ShoppingCartItemAddedConsumer>();
consumer.Run();
  • I have also tried creating an IHostedService, to make sure the process does exists the whole time the app is running, got same result.

and this is the log at Startup of the services:

    [09:02:25 INF] Subscriber created
    [09:02:26 INF] Registering subscriber
    [09:02:26 INF] Configuration action for shoppingcartitemadded found.
    [09:02:26 INF] Declaring queue myWebApi.
    [09:02:26 INF] Declaring exchange myRabbit.
    [09:02:26 INF] Binding queue myWebApi to exchange myRabbit with routing key shoppingcartitemadded
    [09:02:26 INF] Preparing to consume message from queue 'myWebApi'.
    [09:02:26 INF] Subscriber registered

and the following is the log when publishing a message:

    [14:13:35 INF] Setting 'Publish Acknowledge' for channel '3'
    [14:13:35 INF] Setting up publish acknowledgement for 1 with timeout 0:00:01
    [14:13:35 INF] Sequence 1 added to dictionary
    [14:13:35 WRN] No body found in the Pipe context.
    [14:13:35 INF] Performing basic publish with routing key shoppingcartitemadded on exchange myRabbitAPI.
    [14:13:35 INF] Setting up publish acknowledgement for 2 with timeout 0:00:01
    [14:13:35 INF] Sequence 2 added to dictionary
    [14:13:35 WRN] No body found in the Pipe context.
    [14:13:35 INF] Performing basic publish with routing key shoppingcartitemadded on exchange myRabbitAPI.
    [14:13:35 INF] Executed action method HelloMicroServices.Controllers.ShoppingCartController.Post (HelloMicroServices), returned result Microsoft.AspNetCore.Mvc.OkObjectResult in 304.6292ms.
    [14:13:35 INF] Executing ObjectResult, writing value of type 'HelloMicroServices.Datastores.Models.ShoppingCart'.
    [14:13:35 INF] Executed action     HelloMicroServices.Controllers.ShoppingCartController.Post (HelloMicroServices) in 414.4309ms
    [14:13:35 INF] Request finished in 475.1868ms 200 application/json; charset=utf-8
    [14:13:35 INF] Recieived ack for 1
    [14:13:35 INF] Recieived ack for 2

The RabbitMQ control panel when the subscriber part is commented:

rmq

when uncommenting, the value in front of Consumer is 1

About this issue

  • Original URL
  • State: closed
  • Created 6 years ago
  • Comments: 16 (2 by maintainers)

Most upvoted comments

@shahabganji

Here is the solution I found. I am not sure the cause, but how the IBusClient is created is making a difference. My guess is that the pipeline is different. We ran into this as well and switched to creating our IBusClient instance(s) via the factory instead of dependency injection.

      this.Client = RawRabbitFactory.CreateSingleton(new RawRabbitOptions()
         {
             ClientConfiguration = new RawRabbitConfiguration()
             {
                 Hostnames = new System.Collections.Generic.List<string>() { "localhost"},
                 Password = "guest",
                 VirtualHost = "/",
                 Port = 5672,
                 Username = "guest"
                 
                
             }
         });

When I changed your publisher in the event store to the above code it started publishing the object correctly to rabbit. I used the rabbit UI GetMessage feature to show it was publishing the json. I wouldn’t recommend putting this in the ctor for production code obviously, since that couples object construction with I/O.

Anyway, I had to shut the project down to use the GetMessage feature in the UI because there was a deadlock with the subscriber in that the messages were left in an unacked state. This means that the subscriber was being sent them but your method was not being raised; it was holding onto the message.

From there I went ahead and also updated your subscriber code to construct the IBusClient the same way and viola, works just fine.

@pardahlman I am not sure if that helps you at all with what the difference could be between using DI to get the IBusClient versus using the factory, but there appears to be something different that causes the behavior. He doesn’t seem to be configuring anything out of the ordinary for DI.

			RawRabbitConfiguration config = new RawRabbitConfiguration();
			this.Configuration.Bind("RabbitMQ", config);

			services.AddRawRabbit(new RawRabbitOptions() {
				ClientConfiguration = config,
					DependencyInjection = di => di.AddSingleton<ISerializer, RawRabbit.Serialization.JsonSerializer>()
			});

-Red