confluent-kafka-dotnet: ProduceAsync blocks indefinitely

Hi, this is more of a question. Since I don’t have a Slack invitation, I figured this would be the best place to ask.

I need to write a simple producer for our automation tests and make sure each message is successfully pushed before I continue with test execution, but sadly the producer blocks indefinitely.

public async Task PublishPurchases(IEnumerable<BetPurchase> purchases)
{
    foreach (var purchase in purchases)
    {
        await Producer.ProduceAsync("incoming-purchases", purchase.GetHashCode().ToString(), purchase);
    }
}

What I noticed so far:

  • I am using the Burrow dashboard to monitor Kafka, and there I can see that the messages are successfully pushed, but the call is still blocked.
  • I noticed the remarks comment on Producer<>.ProduceAsync and subscribed to the Producer<>.OnError event to check whether the send queue is full, but no error was raised there.

This also comes from the remarks:

Warning: if background polling is disabled and Poll is not being called in another thread, this will block indefinitely.

Sadly, I am not sure what this means exactly. Which Poll method needs to be called in another thread? The one on Consumer<>? If so, why would the producer be blocked until the consumer polls? And how can I enable background polling?

About this issue

  • State: closed
  • Created 6 years ago
  • Reactions: 4
  • Comments: 34 (18 by maintainers)

Most upvoted comments

IIRC, 0.11.4 contains a ‘fix’ for a closely related issue, so the difference in behavior from 0.11.2 is not completely surprising. It looks like I fixed one scenario and broke another.

I’ll prioritize getting this fixed for 0.11.5.

Hi mhowlett,

We use a Web API as the producer.

This API works fine in Debug mode, but after building and deploying it to our integration environment (Windows Server / IIS), it doesn’t produce any messages or logs.

We are in the same situation as the one described by massarakh.

We use Confluent.Kafka (1.0.0-experimental-8).

@andreycha I agree, the problem occurs only in the test; the console app works correctly.

I have the same issue. I try to push data to Kafka, and the data is pushed correctly with no errors, but the produce method never completes.

public async Task ProduceToKafka<TKey, TValue>(List<Tuple<TKey, TValue>> batch, TopicNamesEnum topicName)
{
    using (var producer = new Producer<TKey, TValue>(_config, new AvroSerializer<TKey>(), new AvroSerializer<TValue>()))
    {
        producer.OnError += (_, e) =>
        {
            _logger.Error(e);
        };

        var tasks = batch.Select(x => producer.ProduceAsync(topicName.ToString(), x.Item1, x.Item2)).ToList();
        await Task.WhenAll(tasks);
        _logger.Trace($"{batch.Count} events are written to Kafka. Topic name: {topicName}.");
    }
}

[TestMethod]
public async Task TestKafkaProducerAndConsumer()
{
    const string testKey = "test_key";
    const string testSValue = "test_string_value";
    const int testIValue = 10;

    var kafka = new KafkaUtils("192.168.56.101:9092", "http://192.168.56.101:8081", 1000);

    await kafka.ProduceToKafka(new System.Collections.Generic.List<System.Tuple<TestKey, Test>> {
        new System.Tuple<TestKey, Test>
        (
            new TestKey
            {
                name = testKey
            },
            new Test
            {
                ival = testIValue,
                sval = testSValue
            }
        )
    }, TopicNamesEnum.test10);

The config is default. I set only the bootstrap servers and the schema registry.
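
For anyone hitting the same hang in a test run, here is a minimal sketch of a timeout guard, so that a ProduceAsync task that never completes fails the test instead of blocking it forever. It uses only Task.WhenAny and Task.Delay from the BCL. The names ProduceGuard and WithTimeout are hypothetical and not part of Confluent.Kafka, and this does not address the underlying cause of the hang; it only makes it visible.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of Confluent.Kafka.
public static class ProduceGuard
{
    // Awaits `operation`, but throws TimeoutException if it has not
    // completed within `timeout`. The original task is not cancelled;
    // it is simply no longer awaited.
    public static async Task<T> WithTimeout<T>(Task<T> operation, TimeSpan timeout)
    {
        // WhenAny completes as soon as either the operation or the delay finishes.
        var finished = await Task.WhenAny(operation, Task.Delay(timeout)).ConfigureAwait(false);
        if (finished != operation)
        {
            throw new TimeoutException($"Operation did not complete within {timeout}.");
        }

        // The operation has already finished; awaiting it returns the
        // result or rethrows its exception.
        return await operation.ConfigureAwait(false);
    }
}
```

In the snippets above, this would mean wrapping the task returned by ProduceAsync (or the Task.WhenAll over the batch, if it is typed) in ProduceGuard.WithTimeout with a timeout of your choosing, for example TimeSpan.FromSeconds(10).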