google-cloud-dotnet: Deadlock when disposing underlying gRPC channel in `SubscriberClient`
This is a follow-up of #10304 — now that clients are being disposed, a deadlock caused by what appears to be a race condition between cancelling a gRPC streaming call and disposing the channel has started to manifest.
I am not sure whether to report this here or to grpc-dotnet, but since it occurs due to how this library uses its channels, I figured I would start here.
This is difficult to reproduce reliably, but it happens quite often in my integration tests. As mentioned in the linked issue, I am still unable to run my full integration test suite, now because the teardown hangs due to this race condition.
**Environment details**
- OS: macOS Ventura
- .NET version: 7
- Package name and version: Google.Cloud.PubSub.V1 @ 3.5.1
**Steps to reproduce**
Run the following application with a debugger attached, the latest version of Google.Cloud.PubSub.V1 installed, and the emulator running on `localhost:8085`. When no more output is produced, pause the debugger and inspect the thread stacks.
```csharp
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using Grpc.Core;

/*
 * Run this application with a debugger attached. When the output stops being produced,
 * usually ending with "Received message" and "Stopping subscriber", pause execution and inspect
 * the thread stacks. You will see the "cancelinator" thread being stuck
 * at `GrpcChannel.FinishActiveCall() -> Monitor.Enter`, and one of the .NET Worker Threads being
 * stuck at `GrpcCall.Cleanup() -> CancellationTokenSource.Registrations.WaitForCallbackToComplete`.
 *
 * Be patient, it may take >1000 iterations to manifest depending on your setup. I don't know of
 * a more reliable way to reproduce this.
 */

// Emulator.
const string endpoint = "http://localhost:8085";

// Run ID, used to ensure unique topics and subs.
var runId = DateTimeOffset.UtcNow.ToUnixTimeSeconds();

var i = 1;
while (true)
{
    var iteration = i++;

    // Create the topic and subscription.
    var topic = await CreateTopic(iteration);
    var subscription = await CreateSubscription(topic);

    // Publish a message to the topic.
    var messageData = await PublishMessage(topic, iteration);

    // Set up a subscriber and wait for the message to be received.
    // That's how we know we have an open streaming call.
    var subscriber = await new SubscriberClientBuilder
    {
        Endpoint = endpoint,
        ChannelCredentials = ChannelCredentials.Insecure,
        ClientCount = 1,
        SubscriptionName = subscription,
    }.BuildAsync();

    var receivedMessageTcs = new TaskCompletionSource();

    Console.WriteLine($"{iteration}: Starting subscriber");

    // Simulate a "background worker". This is not uncommon in ASP.NET with BackgroundServices.
    await Task.Run(async () =>
    {
        var startTask = subscriber.StartAsync(async (message, t) =>
        {
            await Task.Run(() =>
            {
                if (message.Data.ToStringUtf8() != messageData)
                {
                    return;
                }

                // Notify that we received the message we sent.
                receivedMessageTcs.SetResult();

                // Cancel the CTS in a different thread.
                var cancelinatorThread = new Thread(() =>
                {
                    Console.WriteLine($"{iteration}: Stopping subscriber");
                    subscriber.StopAsync(default(CancellationToken));
                })
                {
                    Name = "cancelinator"
                };
                cancelinatorThread.Start();
            }, t);
            return SubscriberClient.Reply.Ack;
        });

        // Wait for the message.
        Console.WriteLine($"{iteration}: Waiting for message");
        await receivedMessageTcs.Task;
        Console.WriteLine($"{iteration}: Received message");
        await startTask;
    });

    Console.WriteLine($"{iteration}: Complete");
    Console.WriteLine($"-------------------------");
}

// Creates the topic.
async Task<TopicName> CreateTopic(int iteration)
{
    var publisherServiceClientBuilder = new PublisherServiceApiClientBuilder
    {
        ChannelCredentials = ChannelCredentials.Insecure,
        Endpoint = endpoint
    };
    var publisherClient = await publisherServiceClientBuilder.BuildAsync();
    var topic = new TopicName("deadlock-testing", $"topic-{iteration}-{runId}");
    await publisherClient.CreateTopicAsync(topic);
    MaybeDispose(publisherServiceClientBuilder.LastCreatedChannel);
    return topic;
}

// Creates the subscription.
async Task<SubscriptionName> CreateSubscription(TopicName topic)
{
    var subscriberServiceClientBuilder = new SubscriberServiceApiClientBuilder()
    {
        ChannelCredentials = ChannelCredentials.Insecure,
        Endpoint = endpoint
    };
    var subscriberClient = await subscriberServiceClientBuilder.BuildAsync();
    var subscription = new SubscriptionName("deadlock-testing", $"{topic.TopicId}-subscription");
    await subscriberClient.CreateSubscriptionAsync(
        name: subscription,
        topic: topic,
        pushConfig: null,
        ackDeadlineSeconds: 10);
    return subscription;
}

// Publishes a message to the topic.
async Task<string> PublishMessage(TopicName topic, int iteration)
{
    var publisherClientBuilder = new PublisherClientBuilder()
    {
        ChannelCredentials = ChannelCredentials.Insecure,
        Endpoint = endpoint,
        ClientCount = 1,
        TopicName = topic,
        Settings = new PublisherClient.Settings
        {
            BatchingSettings = new BatchingSettings(
                elementCountThreshold: 1,
                byteCountThreshold: 1,
                delayThreshold: TimeSpan.FromMicroseconds(value: 1))
        }
    };
    await using var publisher = await publisherClientBuilder.BuildAsync();
    var message = FormatMessageData(iteration);
    await publisher.PublishAsync(message);
    return message;
}

// Dispose the channel if possible.
static void MaybeDispose(ChannelBase? channel)
{
    if (channel is IDisposable disposable)
    {
        disposable.Dispose();
    }
}

string FormatMessageData(int iteration)
{
    return $"{runId}-{iteration}";
}
```
Eventually, the output will halt, looking something like:
```text
2337: Stopping subscriber
2337: Complete
-------------------------
2338: Starting subscriber
2338: Waiting for message
2338: Received message
2338: Stopping subscriber
2338: Complete
-------------------------
2339: Starting subscriber
2339: Waiting for message
2339: Received message
2339: Stopping subscriber
```
At that point, pause your debugger and look at the thread stacks. Irrelevant/inactive threads omitted for brevity:
```text
.NET ThreadPool Worker @2220808
Thread.Sleep()
SpinWait.SpinOnceCore()
CancellationTokenSource.Registrations.WaitForCallbackToComplete()
GrpcCall<StreamingPullRequest, StreamingPullResponse>.Cleanup()
GrpcCall<StreamingPullRequest, StreamingPullResponse>.Dispose()
GrpcChannel.Dispose()
SubscriberClientBuilder.<BuildAsyncImpl>g__DisposeChannelAsync|23_1()
SubscriberClientBuilder.<>c__DisplayClass23_1.<BuildAsyncImpl>b__2()
SubscriberClientBuilder.<>c.<BuildAsyncImpl>b__23_3()
Enumerable.SelectArrayIterator<__Canon, __Canon>.MoveNext()
Task.WhenAll()
SubscriberClientBuilder.<>c__DisplayClass23_0.<BuildAsyncImpl>b__0()
Extensions.<>c__DisplayClass4_0.<<ConfigureAwaitHideErrors>g__Inner|0>d.MoveNext()
AsyncMethodBuilderCore.Start<Google.Cloud.PubSub.V1.Tasks.Extensions.<>c__DisplayClass4_0.<<ConfigureAwaitHideErrors>g__Inner|0>d>()
AsyncTaskMethodBuilder<Exception>.Start<Google.Cloud.PubSub.V1.Tasks.Extensions.<>c__DisplayClass4_0.<<ConfigureAwaitHideErrors>g__Inner|0>d>()
Extensions.<>c__DisplayClass4_0.<ConfigureAwaitHideErrors>g__Inner|0()
Extensions.ConfigureAwaitHideErrors()
SubscriberClientImpl.<StopCompletionAsync>d__28.MoveNext()
AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<SubscriberClientImpl.<StopCompletionAsync>d__28>.ExecutionContextCallback()
ExecutionContext.RunInternal() [6]
AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<SubscriberClientImpl.<StopCompletionAsync>d__28>.MoveNext()
AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<SubscriberClientImpl.<StopCompletionAsync>d__28>.MoveNext()
AwaitTaskContinuation.RunOrScheduleAction() [3]
Task.RunContinuations() [8]
Task<__Canon>.TrySetResult()
TaskFactory.CompleteOnInvokePromise.Invoke()
Task.RunContinuations() [7]
Task<VoidTaskResult>.TrySetResult() [2]
UnwrapPromise<VoidTaskResult>.TrySetFromTask()
UnwrapPromise<VoidTaskResult>.Invoke()
Task.RunContinuations() [6]
Task<VoidTaskResult>.TrySetResult() [1]
AsyncTaskMethodBuilder<VoidTaskResult>.SetExistingTaskResult()
AsyncTaskMethodBuilder.SetResult()
SubscriberClientImpl.SingleChannel.<StartAsync>d__47.MoveNext()
AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<SubscriberClientImpl.SingleChannel.<StartAsync>d__47>.ExecutionContextCallback()
ExecutionContext.RunInternal() [5]
AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<SubscriberClientImpl.SingleChannel.<StartAsync>d__47>.MoveNext()
AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<SubscriberClientImpl.SingleChannel.<StartAsync>d__47>.MoveNext()
AwaitTaskContinuation.RunOrScheduleAction() [2]
Task.RunContinuations() [5]
Task<SubscriberClientImpl.SingleChannel.TaskNextAction>.TrySetResult()
AsyncTaskMethodBuilder<SubscriberClientImpl.SingleChannel.TaskNextAction>.SetExistingTaskResult()
AsyncTaskMethodBuilder<SubscriberClientImpl.SingleChannel.TaskNextAction>.SetResult()
SubscriberClientImpl.AsyncSingleRecvQueue<SubscriberClientImpl.SingleChannel.TaskNextAction>.<DequeueAsync>d__6.MoveNext()
AsyncTaskMethodBuilder<SubscriberClientImpl.SingleChannel.TaskNextAction>.AsyncStateMachineBox<SubscriberClientImpl.AsyncSingleRecvQueue<SubscriberClientImpl.SingleChannel.TaskNextAction>.<DequeueAsync>d__6>.ExecutionContextCallback()
ExecutionContext.RunInternal() [4]
AsyncTaskMethodBuilder<SubscriberClientImpl.SingleChannel.TaskNextAction>.AsyncStateMachineBox<SubscriberClientImpl.AsyncSingleRecvQueue<SubscriberClientImpl.SingleChannel.TaskNextAction>.<DequeueAsync>d__6>.MoveNext()
AsyncTaskMethodBuilder<SubscriberClientImpl.SingleChannel.TaskNextAction>.AsyncStateMachineBox<SubscriberClientImpl.AsyncSingleRecvQueue<SubscriberClientImpl.SingleChannel.TaskNextAction>.<DequeueAsync>d__6>.MoveNext()
AwaitTaskContinuation.RunOrScheduleAction() [1]
Task.RunContinuations() [4]
Task<int>.TrySetResult()
TaskCompletionSource<int>.TrySetResult()
SubscriberClientImpl.AsyncSingleRecvQueue<SubscriberClientImpl.SingleChannel.TaskNextAction>.Enqueue()
SubscriberClientImpl.SingleChannel.<>c__DisplayClass50_0.<Add>b__0()
ExecutionContext.RunInternal() [3]
Task.ExecuteWithThreadLocal()
ThreadPoolTaskScheduler.TryExecuteTaskInline()
TaskScheduler.TryRunInline()
TaskContinuation.InlineIfPossibleOrElseQueue()
ContinueWithTaskContinuation.Run()
Task.RunContinuations() [3]
Task.FinishSlow()
Task.TrySetException()
AsyncTaskMethodBuilder<bool>.SetException()
HttpContentClientStreamReader<StreamingPullRequest, StreamingPullResponse>.<MoveNextCore>d__18.MoveNext()
AsyncTaskMethodBuilder<bool>.AsyncStateMachineBox<HttpContentClientStreamReader<StreamingPullRequest, StreamingPullResponse>.<MoveNextCore>d__18>.ExecutionContextCallback()
ExecutionContext.RunInternal() [2]
AsyncTaskMethodBuilder<bool>.AsyncStateMachineBox<HttpContentClientStreamReader<StreamingPullRequest, StreamingPullResponse>.<MoveNextCore>d__18>.MoveNext()
AsyncTaskMethodBuilder<bool>.AsyncStateMachineBox<HttpContentClientStreamReader<StreamingPullRequest, StreamingPullResponse>.<MoveNextCore>d__18>.MoveNext()
AwaitTaskContinuation.RunOrScheduleAction() [2]
Task.RunContinuations() [2]
Task.TrySetCanceled() [2]
AsyncTaskMethodBuilder<StreamingPullResponse>.SetException()
StreamExtensions.<ReadMessageAsync>d__4<StreamingPullResponse>.MoveNext()
AsyncTaskMethodBuilder<StreamingPullResponse>.AsyncStateMachineBox<StreamExtensions.<ReadMessageAsync>d__4<StreamingPullResponse>>.ExecutionContextCallback()
ExecutionContext.RunInternal() [1]
AsyncTaskMethodBuilder<StreamingPullResponse>.AsyncStateMachineBox<StreamExtensions.<ReadMessageAsync>d__4<StreamingPullResponse>>.MoveNext()
AsyncTaskMethodBuilder<StreamingPullResponse>.AsyncStateMachineBox<StreamExtensions.<ReadMessageAsync>d__4<StreamingPullResponse>>.MoveNext()
AwaitTaskContinuation.RunOrScheduleAction() [1]
Task.RunContinuations() [1]
Task.TrySetCanceled() [1]
AsyncTaskMethodBuilder<int>.SetException()
Http2Connection.Http2Stream.<ReadDataAsync>d__76.MoveNext()
AsyncTaskMethodBuilder<int>.AsyncStateMachineBox<Http2Connection.Http2Stream.<ReadDataAsync>d__76>.ExecutionContextCallback()
ExecutionContext.RunFromThreadPoolDispatchLoop()
AsyncTaskMethodBuilder<int>.AsyncStateMachineBox<Http2Connection.Http2Stream.<ReadDataAsync>d__76>.MoveNext()
AsyncTaskMethodBuilder<int>.AsyncStateMachineBox<Http2Connection.Http2Stream.<ReadDataAsync>d__76>.ExecuteFromThreadPool()
ThreadPoolWorkQueue.Dispatch()
PortableThreadPool.WorkerThread.WorkerThreadStart()
Thread.StartCallback()
[Native to Managed Transition]

cancelinator @2222686
Monitor.Enter()
GrpcChannel.FinishActiveCall()
GrpcCall<StreamingPullRequest, StreamingPullResponse>.Cleanup()
GrpcCall<StreamingPullRequest, StreamingPullResponse>.<RunCall>d__73.MoveNext()
AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<GrpcCall<StreamingPullRequest, StreamingPullResponse>.<RunCall>d__73>.ExecutionContextCallback()
ExecutionContext.RunInternal() [2]
AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<GrpcCall<StreamingPullRequest, StreamingPullResponse>.<RunCall>d__73>.MoveNext()
AsyncTaskMethodBuilder<VoidTaskResult>.AsyncStateMachineBox<GrpcCall<StreamingPullRequest, StreamingPullResponse>.<RunCall>d__73>.MoveNext()
AwaitTaskContinuation.RunOrScheduleAction()
Task.RunContinuations()
Task<Status>.TrySetResult()
TaskCompletionSource<Status>.TrySetResult()
GrpcCall<StreamingPullRequest, StreamingPullResponse>.CancelCall()
GrpcCall<StreamingPullRequest, StreamingPullResponse>.CancelCallFromCancellationToken()
GrpcCall<StreamingPullRequest, StreamingPullResponse>.<>c.<InitializeCall>b__78_0()
ExecutionContext.RunInternal() [1]
CancellationTokenSource.ExecuteCallbackHandlers()
SubscriberClientImpl.StopAsync()
Program.<>c__DisplayClass0_1.<<Main>$>b__8()
Thread.StartHelper.Run()
Thread.StartCallback()
[Native to Managed Transition]
```
From the looks of it, the `Dispose` of the channel appears to be racing with the cancellation of the streaming call.
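The cycle visible in the two stacks above can be reduced to a classic lock-versus-callback ordering: one thread takes a lock and then waits for a cancellation callback to finish, while the callback itself needs that same lock. The following is a minimal, self-contained sketch of that shape; all names are stand-ins for illustration, not actual library code.

```csharp
using System;
using System.Threading;

// Stand-in for the internal lock that GrpcChannel.FinishActiveCall() takes.
var channelLock = new object();
var cts = new CancellationTokenSource();
var callbackStarted = new ManualResetEventSlim();
var lockHeld = new ManualResetEventSlim();

// Plays the role of CancelCallFromCancellationToken -> Cleanup ->
// GrpcChannel.FinishActiveCall(), which needs the channel lock.
var registration = cts.Token.Register(() =>
{
    callbackStarted.Set();
    lockHeld.Wait();           // let the disposer grab the lock first
    lock (channelLock) { }     // blocks forever: the disposer holds the lock
});

// The "cancelinator": Cancel() runs the registered callback on this thread.
var cancelinator = new Thread(() => cts.Cancel())
{
    Name = "cancelinator",
    IsBackground = true
};
cancelinator.Start();
callbackStarted.Wait();

// Plays the role of GrpcChannel.Dispose() -> GrpcCall.Cleanup(): it holds the
// channel lock and then waits for in-flight cancellation callbacks, because
// CancellationTokenRegistration.Dispose() blocks until a running callback
// on another thread has completed.
var disposer = new Thread(() =>
{
    lock (channelLock)
    {
        lockHeld.Set();
        registration.Dispose();  // blocks forever: the callback never finishes
    }
})
{
    IsBackground = true
};
disposer.Start();

// Neither thread can make progress: the disposer holds the lock and waits for
// the callback; the callback waits for the lock.
Console.WriteLine(disposer.Join(TimeSpan.FromSeconds(2)) ? "no deadlock" : "deadlocked");
```

Both threads are marked as background so the demonstration process can still exit after reporting the deadlock.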
About this issue
- Original URL
- State: closed
- Created a year ago
- Comments: 17 (7 by maintainers)
Completely understand! Thank you so much for being so patient and responsive. It makes an enormous difference. “AAA++ customer, would try to resolve issues again” 😉
Will close this issue now, but ping it again when we’ve got a new release.
@jeffijoe: We’ve released version 3.6.0-beta01 with the workaround.
Update: We were finally able to reproduce the issue after ~3000 iterations, and the stack trace matches the one shared above. We are trying a possible fix in the library which can work independently of the fix in Grpc.Net.Client. Will keep this thread posted as we make progress.

Thanks @jeffijoe, yep, it’s useful to see it actually happening. But even if we haven’t seen it happen on our side, I’m pretty certain there’s a race condition; I understand why and how we are triggering it. It’s just a matter of luck who gets to actually reproduce it.
We are on it, and hopefully we can release a fix on our side soon, independent of what happens with Grpc.Net.Client.
I’ve filed https://github.com/grpc/grpc-dotnet/issues/2119 describing the Grpc.Net.Client race condition that manifests after we have started disposing of the channel, and what I think is a possible fix.
We might be able to make changes on our side so as not to trigger the race condition ourselves, while we wait for Grpc.Net.Client to confirm/fix etc. We’ll know more about that tomorrow, May 11th.
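For context, one direction such a change could take (purely a hypothetical sketch, not the actual library fix) is to ensure the streaming pull, and therefore any cancellation callback attached to it, has fully completed before the channel is disposed, so disposal never overlaps with a running cancellation callback:

```csharp
using System;
using System.Threading.Tasks;

var disposed = false;
var pullCompletion = Task.Delay(100); // stands in for the streaming pull completing

await StopAndDisposeAsync(pullCompletion, new DisposableAction(() => disposed = true));
Console.WriteLine(disposed);

// Dispose the channel only after the streaming call has completed, so no
// cancellation callback can still be racing against the disposal.
static async Task StopAndDisposeAsync(Task pullCompletion, IDisposable channel)
{
    await pullCompletion;
    channel.Dispose();
}

// Minimal disposable wrapper used only for this demonstration.
sealed class DisposableAction : IDisposable
{
    private readonly Action _action;
    public DisposableAction(Action action) => _action = action;
    public void Dispose() => _action();
}
```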