google-cloud-dotnet: Deadlock when disposing underlying gRPC channel in `SubscriberClient`

This is a follow-up to #10304. Now that clients are being disposed, a deadlock has started to manifest, caused by what appears to be a race condition between cancelling a gRPC streaming call and disposing the channel.

I am not sure whether to report it here or to grpc-dotnet, but since it’s occurring due to how the channels are used by this library, I figured I would start here.

This is difficult to reproduce reliably, but it happens quite often in my integration tests. As mentioned in the linked issue, I am still unable to run all of my integration tests, but now because the teardown hangs due to this race condition.

Environment details

  • OS: macOS Ventura
  • .NET version: 7
  • Package name and version: Google.Cloud.PubSub.V1 @ 3.5.1

Steps to reproduce

Run the following application with a debugger attached, the latest version of Google.Cloud.PubSub.V1 installed, and the emulator running on localhost:8085. When no more output is produced, pause the debugger and inspect the thread stacks.

using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using Grpc.Core;

/*
 * Run this application with a debugger attached. When the output stops (usually ending
 * with "Received message" and "Stopping subscriber"), pause execution and inspect
 * the thread stacks. You will see the "cancelinator" thread stuck
 * at `GrpcChannel.FinishActiveCall() -> Monitor.Enter`, and one of the .NET ThreadPool worker
 * threads stuck at `GrpcCall.Cleanup() -> CancellationTokenSource.Registrations.WaitForCallbackToComplete`.
 *
 * Be patient; it may take more than 1000 iterations to manifest, depending on your setup.
 * I don't know of a more reliable way to reproduce this.
 */

// Emulator.
const string endpoint = "http://localhost:8085";

// Run ID, used to ensure unique topics and subs.
var runId = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
var i = 1;
while (true)
{
    var iteration = i++;
    // Create the topic and subscription.
    var topic = await CreateTopic(iteration);
    var subscription = await CreateSubscription(topic);

    // Publish a message to the topic.
    var messageData = await PublishMessage(topic, iteration);

    // Set up a subscriber and wait for the message to be received.
    // That's how we know we have an open streaming call.
    var subscriber = await new SubscriberClientBuilder
    {
        Endpoint = endpoint,
        ChannelCredentials = ChannelCredentials.Insecure,
        ClientCount = 1,
        SubscriptionName = subscription,
    }.BuildAsync();

    var receivedMessageTcs = new TaskCompletionSource();
    Console.WriteLine($"{iteration}: Starting subscriber");

    // Simulate a "background worker". This is not uncommon in ASP.NET with BackgroundServices. 
    await Task.Run(async () =>
    {
        var startTask = subscriber.StartAsync(async (message, t) =>
        {
            await Task.Run(() =>
            {
                if (message.Data.ToStringUtf8() != messageData)
                {
                    return;
                }

                // Notify that we received the message we sent.
                receivedMessageTcs.SetResult();

                // Stop the subscriber from a different thread; StopAsync cancels a CTS
                // that the streaming pull call has registered on.
                var cancelinatorThread = new Thread(() =>
                {
                    Console.WriteLine($"{iteration}: Stopping subscriber");
                    subscriber.StopAsync(default(CancellationToken));
                })
                {
                    Name = "cancelinator"
                };
                cancelinatorThread.Start();
            }, t);

            return SubscriberClient.Reply.Ack;
        });

        // Wait for the message.
        Console.WriteLine($"{iteration}: Waiting for message");
        await receivedMessageTcs.Task;

        Console.WriteLine($"{iteration}: Received message");
        await startTask;
    });

    Console.WriteLine($"{iteration}: Complete");
    Console.WriteLine($"-------------------------");
}

// Creates the topic.
async Task<TopicName> CreateTopic(int iteration)
{
    var publisherServiceClientBuilder = new PublisherServiceApiClientBuilder
    {
        ChannelCredentials = ChannelCredentials.Insecure,
        Endpoint = endpoint
    };

    var publisherClient = await publisherServiceClientBuilder.BuildAsync();
    var topic = new TopicName("deadlock-testing", $"topic-{iteration}-{runId}");
    await publisherClient.CreateTopicAsync(topic);
    MaybeDispose(publisherServiceClientBuilder.LastCreatedChannel);
    return topic;
}

// Creates the subscription.
async Task<SubscriptionName> CreateSubscription(TopicName topic)
{
    var subscriberServiceClientBuilder = new SubscriberServiceApiClientBuilder()
    {
        ChannelCredentials = ChannelCredentials.Insecure,
        Endpoint = endpoint
    };

    var subscriberClient = await subscriberServiceClientBuilder.BuildAsync();
    var subscription = new SubscriptionName("deadlock-testing", $"{topic.TopicId}-subscription");
    await subscriberClient.CreateSubscriptionAsync(
        name: subscription,
        topic: topic,
        pushConfig: null,
        ackDeadlineSeconds: 10);

    return subscription;
}

// Publishes a message to the topic.
async Task<string> PublishMessage(TopicName topic, int iteration)
{
    var publisherClientBuilder = new PublisherClientBuilder()
    {
        ChannelCredentials = ChannelCredentials.Insecure,
        Endpoint = endpoint,
        ClientCount = 1,
        TopicName = topic,
        Settings = new PublisherClient.Settings
        {
            BatchingSettings = new BatchingSettings(
                elementCountThreshold: 1,
                byteCountThreshold: 1,
                delayThreshold: TimeSpan.FromMicroseconds(value: 1))
        }
    };

    await using var publisher = await publisherClientBuilder.BuildAsync();
    var message = FormatMessageData(iteration);
    await publisher.PublishAsync(message);

    return message;
}

// Dispose the channel if possible.
static void MaybeDispose(ChannelBase? channel)
{
    if (channel is IDisposable disposable)
    {
        disposable.Dispose();
    }
}

string FormatMessageData(int iteration)
{
    return $"{runId}-{iteration}";
}

Eventually, the output will halt, looking something like:

2337: Stopping subscriber
2337: Complete
-------------------------
2338: Starting subscriber
2338: Waiting for message
2338: Received message
2338: Stopping subscriber
2338: Complete
-------------------------
2339: Starting subscriber
2339: Waiting for message
2339: Received message
2339: Stopping subscriber

At that point, pause your debugger and look at the thread stacks. Irrelevant/inactive threads omitted for brevity:

.NET ThreadPool Worker @2220808
	Thread.Sleep()
	SpinWait.SpinOnceCore()
	CancellationTokenSource.Registrations.WaitForCallbackToComplete()
	GrpcCall<StreamingPullRequest, StreamingPullResponse>.Cleanup()
	GrpcCall<StreamingPullRequest, StreamingPullResponse>.Dispose()
	GrpcChannel.Dispose()
	SubscriberClientBuilder.<BuildAsyncImpl>g__DisposeChannelAsync|23_1()
	SubscriberClientBuilder.<>c__DisplayClass23_1.<BuildAsyncImpl>b__2()
	SubscriberClientBuilder.<>c.<BuildAsyncImpl>b__23_3()
	Enumerable.SelectArrayIterator<__Canon, __Canon>.MoveNext()
	Task.WhenAll()
	SubscriberClientBuilder.<>c__DisplayClass23_0.<BuildAsyncImpl>b__0()
	Extensions.<>c__DisplayClass4_0.<<ConfigureAwaitHideErrors>g__Inner|0>d.MoveNext()
	AsyncMethodBuilderCore.Start<Google.Cloud.PubSub.V1.Tasks.Extensions.<>c__DisplayClass4_0.<<ConfigureAwaitHideErrors>g__Inner|0>d>()
	AsyncTaskMethodBuilder<Exception>.Start<Google.Cloud.PubSub.V1.Tasks.Extensions.<>c__DisplayClass4_0.<<ConfigureAwaitHideErrors>g__Inner|0>d>()
	Extensions.<>c__DisplayClass4_0.<ConfigureAwaitHideErrors>g__Inner|0()
	Extensions.ConfigureAwaitHideErrors()
	SubscriberClientImpl.<StopCompletionAsync>d__28.MoveNext()
	AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<SubscriberClientImpl.<StopCompletionAsync>d__28>.ExecutionContextCallback()
	ExecutionContext.RunInternal() [6]
	AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<SubscriberClientImpl.<StopCompletionAsync>d__28>.MoveNext()
	AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<SubscriberClientImpl.<StopCompletionAsync>d__28>.MoveNext()
	AwaitTaskContinuation.RunOrScheduleAction() [3]
	Task.RunContinuations() [8]
	Task<__Canon>.TrySetResult()
	TaskFactory.CompleteOnInvokePromise.Invoke()
	Task.RunContinuations() [7]
	Task<VoidTaskResult>.TrySetResult() [2]
	UnwrapPromise<VoidTaskResult>.TrySetFromTask()
	UnwrapPromise<VoidTaskResult>.Invoke()
	Task.RunContinuations() [6]
	Task<VoidTaskResult>.TrySetResult() [1]
	AsyncTaskMethodBuilder<VoidTaskResult>.SetExistingTaskResult()
	AsyncTaskMethodBuilder.SetResult()
	SubscriberClientImpl.SingleChannel.<StartAsync>d__47.MoveNext()
	AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<SubscriberClientImpl.SingleChannel.<StartAsync>d__47>.ExecutionContextCallback()
	ExecutionContext.RunInternal() [5]
	AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<SubscriberClientImpl.SingleChannel.<StartAsync>d__47>.MoveNext()
	AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<SubscriberClientImpl.SingleChannel.<StartAsync>d__47>.MoveNext()
	AwaitTaskContinuation.RunOrScheduleAction() [2]
	Task.RunContinuations() [5]
	Task<SubscriberClientImpl.SingleChannel.TaskNextAction>.TrySetResult()
	AsyncTaskMethodBuilder<SubscriberClientImpl.SingleChannel.TaskNextAction>.SetExistingTaskResult()
	AsyncTaskMethodBuilder<SubscriberClientImpl.SingleChannel.TaskNextAction>.SetResult()
	SubscriberClientImpl.AsyncSingleRecvQueue<SubscriberClientImpl.SingleChannel.TaskNextAction>.<DequeueAsync>d__6.MoveNext()
	AsyncTaskMethodBuilder<SubscriberClientImpl.SingleChannel.TaskNextAction>.AsyncStateMachineBox<SubscriberClientImpl.AsyncSingleRecvQueue<SubscriberClientImpl.SingleChannel.TaskNextAction>.<DequeueAsync>d__6>.ExecutionContextCallback()
	ExecutionContext.RunInternal() [4]
	AsyncTaskMethodBuilder<SubscriberClientImpl.SingleChannel.TaskNextAction>.AsyncStateMachineBox<SubscriberClientImpl.AsyncSingleRecvQueue<SubscriberClientImpl.SingleChannel.TaskNextAction>.<DequeueAsync>d__6>.MoveNext()
	AsyncTaskMethodBuilder<SubscriberClientImpl.SingleChannel.TaskNextAction>.AsyncStateMachineBox<SubscriberClientImpl.AsyncSingleRecvQueue<SubscriberClientImpl.SingleChannel.TaskNextAction>.<DequeueAsync>d__6>.MoveNext()
	AwaitTaskContinuation.RunOrScheduleAction() [1]
	Task.RunContinuations() [4]
	Task<int>.TrySetResult()
	TaskCompletionSource<int>.TrySetResult()
	SubscriberClientImpl.AsyncSingleRecvQueue<SubscriberClientImpl.SingleChannel.TaskNextAction>.Enqueue()
	SubscriberClientImpl.SingleChannel.<>c__DisplayClass50_0.<Add>b__0()
	ExecutionContext.RunInternal() [3]
	Task.ExecuteWithThreadLocal()
	ThreadPoolTaskScheduler.TryExecuteTaskInline()
	TaskScheduler.TryRunInline()
	TaskContinuation.InlineIfPossibleOrElseQueue()
	ContinueWithTaskContinuation.Run()
	Task.RunContinuations() [3]
	Task.FinishSlow()
	Task.TrySetException()
	AsyncTaskMethodBuilder<bool>.SetException()
	HttpContentClientStreamReader<StreamingPullRequest, StreamingPullResponse>.<MoveNextCore>d__18.MoveNext()
	AsyncTaskMethodBuilder<bool>.AsyncStateMachineBox<HttpContentClientStreamReader<StreamingPullRequest, StreamingPullResponse>.<MoveNextCore>d__18>.ExecutionContextCallback()
	ExecutionContext.RunInternal() [2]
	AsyncTaskMethodBuilder<bool>.AsyncStateMachineBox<HttpContentClientStreamReader<StreamingPullRequest, StreamingPullResponse>.<MoveNextCore>d__18>.MoveNext()
	AsyncTaskMethodBuilder<bool>.AsyncStateMachineBox<HttpContentClientStreamReader<StreamingPullRequest, StreamingPullResponse>.<MoveNextCore>d__18>.MoveNext()
	AwaitTaskContinuation.RunOrScheduleAction() [2]
	Task.RunContinuations() [2]
	Task.TrySetCanceled() [2]
	AsyncTaskMethodBuilder<StreamingPullResponse>.SetException()
	StreamExtensions.<ReadMessageAsync>d__4<StreamingPullResponse>.MoveNext()
	AsyncTaskMethodBuilder<StreamingPullResponse>.AsyncStateMachineBox<StreamExtensions.<ReadMessageAsync>d__4<StreamingPullResponse>>.ExecutionContextCallback()
	ExecutionContext.RunInternal() [1]
	AsyncTaskMethodBuilder<StreamingPullResponse>.AsyncStateMachineBox<StreamExtensions.<ReadMessageAsync>d__4<StreamingPullResponse>>.MoveNext()
	AsyncTaskMethodBuilder<StreamingPullResponse>.AsyncStateMachineBox<StreamExtensions.<ReadMessageAsync>d__4<StreamingPullResponse>>.MoveNext()
	AwaitTaskContinuation.RunOrScheduleAction() [1]
	Task.RunContinuations() [1]
	Task.TrySetCanceled() [1]
	AsyncTaskMethodBuilder<int>.SetException()
	Http2Connection.Http2Stream.<ReadDataAsync>d__76.MoveNext()
	AsyncTaskMethodBuilder<int>.AsyncStateMachineBox<Http2Connection.Http2Stream.<ReadDataAsync>d__76>.ExecutionContextCallback()
	ExecutionContext.RunFromThreadPoolDispatchLoop()
	AsyncTaskMethodBuilder<int>.AsyncStateMachineBox<Http2Connection.Http2Stream.<ReadDataAsync>d__76>.MoveNext()
	AsyncTaskMethodBuilder<int>.AsyncStateMachineBox<Http2Connection.Http2Stream.<ReadDataAsync>d__76>.ExecuteFromThreadPool()
	ThreadPoolWorkQueue.Dispatch()
	PortableThreadPool.WorkerThread.WorkerThreadStart()
	Thread.StartCallback()
	[Native to Managed Transition]

cancelinator @2222686
	Monitor.Enter()
	GrpcChannel.FinishActiveCall()
	GrpcCall<StreamingPullRequest, StreamingPullResponse>.Cleanup()
	GrpcCall<StreamingPullRequest, StreamingPullResponse>.<RunCall>d__73.MoveNext()
	AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<GrpcCall<StreamingPullRequest, StreamingPullResponse>.<RunCall>d__73>.ExecutionContextCallback()
	ExecutionContext.RunInternal() [2]
	AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<GrpcCall<StreamingPullRequest, StreamingPullResponse>.<RunCall>d__73>.MoveNext()
	AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<GrpcCall<StreamingPullRequest, StreamingPullResponse>.<RunCall>d__73>.MoveNext()
	AwaitTaskContinuation.RunOrScheduleAction()
	Task.RunContinuations()
	Task<Status>.TrySetResult()
	TaskCompletionSource<Status>.TrySetResult()
	GrpcCall<StreamingPullRequest, StreamingPullResponse>.CancelCall()
	GrpcCall<StreamingPullRequest, StreamingPullResponse>.CancelCallFromCancellationToken()
	GrpcCall<StreamingPullRequest, StreamingPullResponse>.<>c.<InitializeCall>b__78_0()
	ExecutionContext.RunInternal() [1]
	CancellationTokenSource.ExecuteCallbackHandlers()
	SubscriberClientImpl.StopAsync()
	Program.<>c__DisplayClass0_1.<<Main>$>b__8()
	Thread.StartHelper.Run()
	Thread.StartCallback()
	[Native to Managed Transition]

From the looks of it, the disposal of the channel is racing with the cancellation of the streaming call: GrpcChannel.Dispose() presumably holds the channel's internal lock while it waits for the in-flight cancellation callback to complete, and that callback (running on the "cancelinator" thread) is blocked trying to take the same lock in GrpcChannel.FinishActiveCall().
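
For what it's worth, here is a minimal stand-alone model of what I think those two stacks amount to (paste it into a console app with implicit usings, like the repro above). The lock object and the callback body are only stand-ins for the channel's internal lock and for FinishActiveCall; this is my reading of the stacks, not the actual Grpc.Net.Client code:

// Minimal model of the suspected deadlock (stand-ins only, not real Grpc.Net.Client internals).
var channelLock = new object();                 // stands in for GrpcChannel's internal lock
var cts = new CancellationTokenSource();

// Stand-in for the call's cancellation callback, which ends up in
// GrpcChannel.FinishActiveCall() and needs the channel lock.
var registration = cts.Token.Register(() =>
{
    lock (channelLock)
    {
        Console.WriteLine("callback: finished active call");
    }
});

lock (channelLock) // stand-in for GrpcChannel.Dispose() holding the channel lock
{
    // "cancelinator": cancel on another thread; the callback starts running there
    // and blocks on channelLock.
    new Thread(() => cts.Cancel()) { Name = "cancelinator" }.Start();
    Thread.Sleep(500); // crude: give the callback time to start running (timing assumption)

    // Stand-in for GrpcCall.Cleanup() disposing the token registration:
    // Dispose waits for the in-flight callback, which is waiting for the lock we hold.
    registration.Dispose(); // deadlocks here (WaitForCallbackToComplete never returns)
}

Console.WriteLine("never reached");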

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Comments: 17 (7 by maintainers)

Most upvoted comments

Sorry, I wanted to wait a bit so I didn’t speak too soon.

Completely understand! Thank you so much for being so patient and responsive. It makes an enormous difference. “AAA++ customer, would try to resolve issues again” 😉

Will close this issue now, but ping it again when we’ve got a new release.

@jeffijoe: We’ve released version 3.6.0-beta01 with the workaround.

Update: We were finally able to reproduce the issue after ~3000 iterations, and the stack trace matches the one shared above. We are trying a possible fix in the library which can work independently of the fix in Grpc.Net.Client. Will keep this thread posted as we make progress.

Thanks @jeffijoe, yep, it’s useful to see it actually happening. But even if we haven’t seen it happen on our side, I’m pretty certain there’s a race condition; I understand why and how we are triggering it. It’s just a matter of luck who gets to actually reproduce it.

We are on it, and hopefully we can release a fix on our side soon, independent of what happens with Grpc.Net.Client.

I’ve filed https://github.com/grpc/grpc-dotnet/issues/2119 describing the Grpc.Net.Client race condition that manifests after we have started disposing of the channel, and what I think is a possible fix.

We might be able to make changes on our side so as not to trigger the race condition ourselves, while we wait for Grpc.Net.Client to confirm/fix, etc. We’ll know more about that tomorrow, May 11th.
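
Purely as an illustration of the kind of sequencing change that could avoid triggering the race (a sketch with a hypothetical helper name, not the actual fix): dispose the channel only after the streaming pull call has fully completed, so its cancellation callbacks can no longer overlap with GrpcChannel.Dispose().

using Grpc.Core;

// Hypothetical helper (illustrative only): sequence channel disposal strictly after the
// streaming call has finished, so no cancellation callback can still be in flight when
// GrpcChannel.Dispose() runs.
static async Task DisposeChannelAfterCallAsync(ChannelBase channel, Task streamingCallCompletion)
{
    try
    {
        // Wait for the pull loop / RPC to complete first.
        await streamingCallCompletion.ConfigureAwait(false);
    }
    catch
    {
        // During shutdown the RPC typically ends as Cancelled; ignore that here.
    }

    if (channel is IDisposable disposable)
    {
        disposable.Dispose();
    }
}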