azure-sdk-for-js: [JS][Event Hubs] BatchingReceiver.receive() occasionally fails

A number of Event Hubs live Azure tests are flaky, due to what appears to be a race condition bug in either the Event Hubs client library, one of its dependencies, or the Event Hubs service.

Flaky Tests

  • Misc tests should be able to send and receive batched messages correctly
  • Misc tests should be able to send and receive batched messages as JSON objects correctly
  • EventHub Receiver with EventPosition specified as ‘after a particular offset’ should receive messages correctly
  • EventHub Receiver with EventPosition specified as ‘from end of stream’ should receive messages correctly
  • EventHub Receiver with EventPosition specified as ‘from a particular enqueued time’ should receive messages correctly

https://dev.azure.com/azure-sdk/playground/_test/analytics?buildDefinitionId=131

Standalone Repro

I created a console app that runs the code from one of the flaky tests in a loop until it fails. Add the file shown below (after the debug log output) under packages/@azure/eventhubs/client/examples and run it via ts-node. The console app should eventually fail, though it may take hundreds of iterations. When the app does fail, it always receives either 0 or 4 messages instead of the expected 5. When it receives 4 messages, it is always missing the first message.

I tried enabling the debug logs (azure:event-hubs:receiverbatching, azure:event-hubs:error), but they did not reveal the cause. Output from a failing iteration:

Successfully sent 5 messages batched together.
  azure:event-hubs:error [connection-122] Receiver '3fdb5f4a-a98b-4063-a906-b66a52521ec7' with address 'hub1/ConsumerGroups/$default/Partitions/0' is open? -> true +49ms
  azure:event-hubs:receiverbatching [connection-122] Receiver '3fdb5f4a-a98b-4063-a906-b66a52521ec7', adding credit for receiving 5 messages. +205ms
  azure:event-hubs:receiverbatching [connection-122] Setting the wait timer for 5 seconds for receiver '3fdb5f4a-a98b-4063-a906-b66a52521ec7'. Receiver link already present, hence reusing it. +2ms
  azure:event-hubs:receiverbatching [connection-122] Batching Receiver '3fdb5f4a-a98b-4063-a906-b66a52521ec7', 4 messages received when max wait time in seconds 5 is over. +5s
  azure:event-hubs:error [connection-122] Deleted the receiver '3fdb5f4a-a98b-4063-a906-b66a52521ec7' from the client cache. +5s
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import { EventHubClient, EventData, EventPosition } from "../lib";
import { BatchingReceiver } from "../lib/batchingReceiver";
import dotenv from "dotenv";

dotenv.config();

const connectionString = process.env["EVENTHUB_CONNECTION_STRING"] || "";
const entityPath = process.env["EVENTHUB_NAME"] || "";
const messageCount = 5;

const debug = console.log;

async function doTest(): Promise<void> {
  const client = EventHubClient.createFromConnectionString(connectionString, entityPath);

  try {
    const hubInfo = await client.getHubRuntimeInformation();
    const partitionId = hubInfo.partitionIds[0];
    const offset = (await client.getPartitionInformation(partitionId)).lastEnqueuedOffset;
    debug(`Partition ${partitionId} has last message with offset ${offset}.`);

    // BatchingReceiver is internal, so reach into the client's private context.
    const breceiver = BatchingReceiver.create((client as any)._context, partitionId, {
      eventPosition: EventPosition.fromOffset(offset)
    });

    // First receive: nothing has been sent yet, so this should return 0 events after 1 second.
    let data = await breceiver.receive(messageCount, 1);
    debug(data.length);

    const batch: EventData[] = [];
    for (let i = 0; i < messageCount; i++) {
      batch.push({ body: `Hello EH ${i}` });
    }
    batch[0].partitionKey = "pk1234656";
    await client.sendBatch(batch, partitionId);
    debug(`Successfully sent ${messageCount} messages batched together.`);

    // Second receive on the same receiver: expect all 5 messages within 5 seconds.
    data = await breceiver.receive(messageCount, 5);
    await breceiver.close();
    debug(data.length);
    if (data.length !== messageCount) {
      debug("received messages: ", data);
      process.exit(1);
    }

    // Blank line between iterations.
    debug("");
  } catch (err) {
    debug("should not have happened, uber catch....", err);
    throw err;
  } finally {
    await client.close();
  }
}

async function main(): Promise<void> {
  for (let i = 0; i < 10000; i++) {
    await doTest();
  }
}

main().catch((err) => {
  console.log("error: ", err);
  process.exitCode = 1;
});
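When an iteration fails, the repro above dumps the raw EventData array. A small helper like the one below can instead report which of the sent bodies never arrived, which makes the "always missing the first message" pattern easy to confirm across many runs. findMissingBodies is hypothetical (not part of the SDK) and assumes the bodies are the strings "Hello EH 0" through "Hello EH 4" sent by the repro; inside doTest() it could be called as findMissingBodies(sentBodies, data.map((e) => e.body)).

```typescript
// Hypothetical helper (not part of @azure/event-hubs): given the bodies that were sent and
// the bodies that came back, return the sent bodies that are missing, in send order.
function findMissingBodies(sent: string[], received: unknown[]): string[] {
  const receivedSet = new Set(received.map((body) => String(body)));
  return sent.filter((body) => !receivedSet.has(body));
}

const sent = [0, 1, 2, 3, 4].map((i) => `Hello EH ${i}`);

// A failing iteration that lost the first message, as described in the issue.
const missing = findMissingBodies(sent, ["Hello EH 1", "Hello EH 2", "Hello EH 3", "Hello EH 4"]);
console.log(missing); // [ 'Hello EH 0' ]
```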

Attempted Repro in .NET

I attempted to repro this issue with the .NET Event Hubs client, but was unable to make it fail. This suggests the bug is in the JS Event Hubs client rather than in the service. The .NET client APIs are slightly different (for example, the ReceiveBatchAsync helper below emulates the JS BatchingReceiver.receive() call), but I tried to keep the repro as close to the JS code as possible.

using Microsoft.Azure.EventHubs;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace EventHubSendReceive
{
    class Program
    {
        static async Task Main()
        {
            var connectionString = new EventHubsConnectionStringBuilder(Environment.GetEnvironmentVariable("EVENTHUB_CONNECTION_STRING"))
            {
                EntityPath = Environment.GetEnvironmentVariable("EVENTHUB_NAME")
            }.ToString();


            // Way too complicated.  Should not throw ArgumentNullException with parameter name I don't own.
            var client = EventHubClient.CreateFromConnectionString(connectionString);
            var runtimeInfo = await client.GetRuntimeInformationAsync();
            var partitionId = runtimeInfo.PartitionIds[0];

            for (var i = 0; i < 100; i++)
            {
                await SendReceive(client, partitionId);
            }

            await client.CloseAsync();
        }

        private static async Task SendReceive(EventHubClient client, string partitionId)
        {
            var partitionInfo = await client.GetPartitionRuntimeInformationAsync(partitionId);

            var lastOffset = partitionInfo.LastEnqueuedOffset;
            var receiver = client.CreateReceiver("$Default", partitionId, EventPosition.FromOffset(lastOffset));

            var eventData = new EventData[5];
            for (var i = 0; i < eventData.Length; i++)
            {
                eventData[i] = new EventData(Encoding.UTF8.GetBytes($"Hello EH {i}"));
            }

            var sender = client.CreatePartitionSender(partitionId);
            await sender.SendAsync(eventData);

            var sw = Stopwatch.StartNew();
            var receivedData = await ReceiveBatchAsync(receiver, 5, TimeSpan.FromSeconds(5));
            sw.Stop();

            Console.Write(receivedData.Count() + " ");
            foreach (var d in receivedData) {
                Console.Write(Encoding.UTF8.GetString(d.Body) + " ");
            }
            Console.WriteLine(sw.Elapsed);

            foreach (var data in eventData.Concat(receivedData))
            {
                data.Dispose();
            }

            await receiver.CloseAsync();
            await sender.CloseAsync();
        }

        private static async Task<IEnumerable<EventData>> ReceiveBatchAsync(PartitionReceiver receiver, int maxMessageCount, TimeSpan waitTime)
        {
            var sw = Stopwatch.StartNew();
            var eventData = new List<EventData>(maxMessageCount);

            while (sw.Elapsed < waitTime && eventData.Count < maxMessageCount)
            {
                // ReceiveAsync returns null (not an empty batch) when nothing arrives within the wait time.
                var newData = await receiver.ReceiveAsync(maxMessageCount - eventData.Count, waitTime - sw.Elapsed);
                if (newData != null)
                {
                    eventData.AddRange(newData);
                }
            }

            return eventData;
        }
    }
}
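The .NET ReceiveBatchAsync helper keeps asking for the remaining count until it has maxMessageCount events or the overall wait time runs out. The TypeScript sketch below expresses the same loop so it can be compared with what the JS BatchingReceiver.receive() is expected to return. ReceiveSome and receiveBatch are hypothetical names, not SDK APIs, and the sketch assumes that receiveSome actually waits up to waitMs when nothing is available (a fake that returns [] instantly would make the loop spin until the deadline).

```typescript
// Hypothetical shape of a receive call that may return fewer events than requested
// (or none) within waitMs. This is an assumption, not an SDK type.
type ReceiveSome<T> = (maxCount: number, waitMs: number) => Promise<T[]>;

// Mirrors the .NET ReceiveBatchAsync helper: request the remaining count until
// maxCount events have arrived or the overall wait time is used up.
async function receiveBatch<T>(receiveSome: ReceiveSome<T>, maxCount: number, waitMs: number): Promise<T[]> {
  const start = Date.now();
  const events: T[] = [];
  while (events.length < maxCount) {
    const remainingMs = waitMs - (Date.now() - start);
    if (remainingMs <= 0) {
      break;
    }
    const batch = await receiveSome(maxCount - events.length, remainingMs);
    events.push(...batch);
  }
  return events;
}

// Usage with an in-memory fake that hands out at most 2 events per call.
async function demo(): Promise<void> {
  const pending = ["Hello EH 0", "Hello EH 1", "Hello EH 2", "Hello EH 3", "Hello EH 4"];
  const fake: ReceiveSome<string> = async (maxCount) => pending.splice(0, Math.min(2, maxCount));
  const events = await receiveBatch(fake, 5, 5000);
  console.log(events.length); // 5
}

demo().catch((err) => console.error(err));
```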

Attempted Repro with rhea-promise

@HarshaNalluru also tried to repro by using rhea-promise directly, but was also unable to make it fail. This suggests the issue is in the Event Hubs layer rather than in rhea-promise or below.

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache License. See License in the project root for license information.

import {
  Connection, Receiver, EventContext, ConnectionOptions, delay, ReceiverEvents, types, Sender
} from "rhea-promise";
import * as dotenv from "dotenv";
dotenv.config();

const host = process.env.AMQP_HOST || "";
const username = process.env.AMQP_USERNAME || "sharedAccessKeyName";
const password = process.env.AMQP_PASSWORD;
const port = parseInt(process.env.AMQP_PORT || "5671");
const receiverAddress = process.env.RECEIVER_ADDRESS;
const senderAddress = process.env.SENDER_ADDRESS;

// Adds one credit and resolves with the next message, or with undefined if none arrives within 5 seconds.
async function receiveOne(receiver: Receiver): Promise<any> {
  return new Promise<any>((resolve) => {
    const onReceiveMessage = (context: EventContext) => {
      clearTimeout(messageWaitTimer);
      receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
      receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
      resolve(context.message);
    };

    const onReceiveDrain = () => {
      receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
      receiver.drain = false;
      resolve(undefined);
    };

    const messageWaitTimer = setTimeout(() => {
      receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
      if (receiver.credit > 0) {
        // Credit is still outstanding: request a drain so the link ends up with no credit.
        receiver.drain = true;
        receiver.addCredit(1);
      } else {
        receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
      }
      resolve(undefined);
    }, 5000);

    receiver.addCredit(1);
    receiver.on(ReceiverEvents.message, onReceiveMessage);
    receiver.on(ReceiverEvents.receiverDrained, onReceiveDrain);
  });
}

async function main(): Promise<void> {
  const connectionOptions: ConnectionOptions = {
    transport: "tls",
    host: host,
    hostname: host,
    username: username,
    password: password,
    port: port,
    reconnect: false
  };
  const connection: Connection = new Connection(connectionOptions);
  const receiverName = "receiver-1";

  await connection.open();
  let count = 0;
  let failed = false;
  while (!failed && count < 10000) {
    count++;
    if (count % 10 === 0) {
      console.log("Iteration - " + count);
    }

    // First receive: only events enqueued after "now" pass the filter, so nothing should arrive.
    let date = Date.now();
    let filterClause = `amqp.annotation.x-opt-enqueued-time > '${date}'`;
    let receiver: Receiver = await connection.createReceiver({
      name: receiverName,
      source: {
        address: receiverAddress,
        filter: {
          "apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468C00000004)
        }
      }
    });
    let msg = await receiveOne(receiver);
    if (msg !== undefined) {
      console.log("received message at the first receive batch");
    }
    // Close the first receiver before sending.
    await receiver.close();

    date = Date.now();
    const sender: Sender = await connection.createSender({
      name: "sender-1",
      target: {
        address: senderAddress
      }
    });
    await sender.send({ body: "Hello World " + date });
    await sender.close();

    // Second receive: a new receiver filtered on events enqueued after the send started.
    filterClause = `amqp.annotation.x-opt-enqueued-time > '${date}'`;
    receiver = await connection.createReceiver({
      name: receiverName,
      source: {
        address: receiverAddress,
        filter: {
          "apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468C00000004)
        }
      }
    });
    msg = await receiveOne(receiver);
    // msg is undefined if the receive timed out.
    if (msg === undefined || msg.body !== "Hello World " + date) {
      failed = true;
      console.log("message body didn't match in the second receivebatch");
      console.log("failure at the iteration - ", count);
    }
    // Sleep for 2s to let the receiver receive any further messages, then close it.
    await delay(2000);
    await receiver.close();
  }
  await connection.close();
}

main().catch((err) => console.log(err));
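The core of receiveOne() above is "wait for one message or a timeout, and clean up the listener and timer on both paths". The sketch below isolates just that bookkeeping with Node's EventEmitter; receiveOneOrTimeout is a hypothetical name and the emitter stands in for an AMQP receiver. It deliberately leaves out credit and drain handling, which the repro needs because outstanding link credit lets the service deliver a message after the timeout has fired and the listener is gone.

```typescript
import { EventEmitter } from "events";

// Resolves with the first "message" event emitted by `source`, or with undefined if none
// arrives within timeoutMs. Both paths remove the listener; the message path also clears the timer.
function receiveOneOrTimeout<T>(source: EventEmitter, timeoutMs: number): Promise<T | undefined> {
  return new Promise((resolve) => {
    const onMessage = (message: T) => {
      clearTimeout(timer);
      source.removeListener("message", onMessage);
      resolve(message);
    };
    const timer = setTimeout(() => {
      source.removeListener("message", onMessage);
      resolve(undefined);
    }, timeoutMs);
    source.on("message", onMessage);
  });
}

// Usage: the listener is attached synchronously, so an immediate emit is captured.
const emitter = new EventEmitter();
receiveOneOrTimeout<string>(emitter, 1000).then((message) => console.log(message)); // Hello World
emitter.emit("message", "Hello World");
```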

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 17 (17 by maintainers)

Most upvoted comments

Very good insights! I believe I have a better picture now of what was the intent behind the original tests that we inherited here.

The 2 tests in misc.spec.ts: these simply test whether the sent messages can be received. We should be able to skip the initial receive() here without changing the intent of the test.

The 3 tests in receiver.spec.ts:

  • These tests use 3 receive() calls in total, checking for 0 events in the first and last calls.
  • The intent here seems to be to ensure that the same AMQP receiver link, once created, gives the expected results at different points in time:
    • it receives no events when started at the end of the stream (because no new events have been sent yet)
    • after an event is sent, the same link receives it
    • no more events are received on this link afterwards.
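The three checks described above can be written down as a minimal sketch against a hypothetical in-memory link. FakeLink and checkSameLinkOverTime are illustrative names, not part of the SDK or the test suite, and the sketch assumes that an event sent between receive() calls is not lost by the link, which is what the second check depends on. receive() returns immediately instead of waiting, to keep the sketch deterministic.

```typescript
// Hypothetical in-memory stand-in for one long-lived receiver link. Events delivered
// while no receive() call is pending stay buffered instead of being dropped.
class FakeLink {
  private buffer: string[] = [];

  deliver(body: string): void {
    this.buffer.push(body);
  }

  // Returns up to maxCount buffered events without waiting.
  receive(maxCount: number): string[] {
    return this.buffer.splice(0, maxCount);
  }
}

// The three checks the receiver.spec.ts tests appear to intend, all on the same link.
function checkSameLinkOverTime(link: FakeLink): boolean {
  const before = link.receive(10); // started at end of stream: nothing yet
  link.deliver("Hello EH 0");      // an event is sent...
  const during = link.receive(10); // ...and received on the same link
  const after = link.receive(10);  // no further events
  return before.length === 0 && during.length === 1 && during[0] === "Hello EH 0" && after.length === 0;
}

console.log(checkSameLinkOverTime(new FakeLink())); // true
```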

Our decision to close the receiver link between the receive calls would go against the original intent.

Also, since receiveBatch() always closed the receiver link, the “features” being tested above were never really exposed to our users.

Therefore, it is safe to say that we can remove the multiple receive() calls altogether.

In Track 2, we are introducing the “receiver” concept to Event Hubs users, just as we did in Service Bus: Track 2 no longer has client.receiveBatch(), but it does have client.createReceiver(). The intent above will come back into play at that point; we will track that in https://github.com/Azure/azure-sdk-for-js/issues/3714