google-cloud-go: pubsub: Receive Does Not Return After Cancellation When Blocked in Streaming Pull

Client PubSub

Environment Unit test using pstest

Code https://github.com/ian-mi/knative-gcp/blob/wait-test-pubsub-receive/pkg/broker/ingress/handler_test.go

In particular, this code calls pubsub Receive with ctx, but then waits forever for Receive to return during test cleanup:

	pChan := make(chan error, 1)
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		pChan <- p.OpenInbound(cecontext.WithLogger(ctx, logtest.TestLogger(t)))
	}()
	t.Cleanup(func() {
		cancel()
		pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
		<-pChan
	})

Expected behavior: When the context is cancelled, any blocked StreamingPull requests should be cancelled so that the receiver can exit cleanly.

Actual behavior: The receiver waits indefinitely for blocked StreamingPull requests to finish.

Additional context

The stack trace shows that goroutine 212 is blocked waiting for a messageIterator to stop. However, goroutine 84 is blocked closing its pullStream while goroutine 85 is blocked calling StreamingPull. Since the pullStream is created on the background context, it will not be cancelled: https://github.com/googleapis/google-cloud-go/blob/master/pubsub/iterator.go#L78.

panic: test timed out after 10m0s

goroutine 290 [running]:
testing.(*M).startAlarm.func1()
	/usr/lib/go/src/testing/testing.go:1459 +0x11c
created by time.goFunc
	/usr/lib/go/src/time/sleep.go:168 +0x52

goroutine 1 [chan receive, 10 minutes]:
testing.(*T).Run(0xc0002e0000, 0x217ba36, 0xb, 0x2222e38, 0x0)
	/usr/lib/go/src/testing/testing.go:1043 +0x699
testing.runTests.func1(0xc0002e0000)
	/usr/lib/go/src/testing/testing.go:1284 +0xa7
testing.tRunner(0xc0002e0000, 0xc00013fd50)
	/usr/lib/go/src/testing/testing.go:991 +0x1ec
testing.runTests(0xc00000e1a0, 0x33968e0, 0x2, 0x2, 0x0)
	/usr/lib/go/src/testing/testing.go:1282 +0x528
testing.(*M).Run(0xc000052a80, 0x0)
	/usr/lib/go/src/testing/testing.go:1199 +0x300
main.main()
	_testmain.go:48 +0x224

goroutine 4 [select]:
go.opencensus.io/stats/view.(*worker).start(0xc0000b0b40)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/go.opencensus.io/stats/view/worker.go:154 +0x1d6
created by go.opencensus.io/stats/view.init.0
	/home/ian/go/src/github.com/google/knative-gcp/vendor/go.opencensus.io/stats/view/worker.go:32 +0x9a

goroutine 11 [chan receive]:
k8s.io/klog.(*loggingT).flushDaemon(0x33b4260)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/k8s.io/klog/klog.go:1010 +0xae
created by k8s.io/klog.init.0
	/home/ian/go/src/github.com/google/knative-gcp/vendor/k8s.io/klog/klog.go:411 +0x18f

goroutine 13 [chan receive, 10 minutes]:
testing.(*T).Run(0xc0002e0120, 0x217ac50, 0xa, 0xc00000e8c0, 0xc0002e0b08)
	/usr/lib/go/src/testing/testing.go:1043 +0x699
github.com/google/knative-gcp/pkg/broker/ingress.TestHandler(0xc0002e0120)
	/home/ian/go/src/github.com/google/knative-gcp/pkg/broker/ingress/handler_test.go:237 +0x1c4a
testing.tRunner(0xc0002e0120, 0x2222e38)
	/usr/lib/go/src/testing/testing.go:991 +0x1ec
created by testing.(*T).Run
	/usr/lib/go/src/testing/testing.go:1042 +0x661

goroutine 14 [chan receive, 9 minutes]:
github.com/google/knative-gcp/pkg/broker/ingress.setupTestReceiver.func2()
	/home/ian/go/src/github.com/google/knative-gcp/pkg/broker/ingress/handler_test.go:379 +0xc4
testing.(*common).Cleanup.func1()
	/usr/lib/go/src/testing/testing.go:789 +0x4b
testing.(*common).runCleanup(0xc0002e0b40, 0x0, 0x0, 0x0)
	/usr/lib/go/src/testing/testing.go:819 +0xc1
testing.tRunner.func2(0xc0002e0b40)
	/usr/lib/go/src/testing/testing.go:985 +0x72
testing.tRunner(0xc0002e0b40, 0xc00000e8c0)
	/usr/lib/go/src/testing/testing.go:995 +0x20e
created by testing.(*T).Run
	/usr/lib/go/src/testing/testing.go:1042 +0x661

goroutine 52 [select]:
google.golang.org/grpc.(*ccBalancerWrapper).watcher(0xc0008b2140)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/google.golang.org/grpc/balancer_conn_wrappers.go:69 +0x1a2
created by google.golang.org/grpc.newCCBalancerWrapper
	/home/ian/go/src/github.com/google/knative-gcp/vendor/google.golang.org/grpc/balancer_conn_wrappers.go:60 +0x2f4

goroutine 53 [select]:
google.golang.org/grpc.(*addrConn).resetTransport(0xc000b8c2c0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/google.golang.org/grpc/clientconn.go:1126 +0x57f
created by google.golang.org/grpc.(*addrConn).connect
	/home/ian/go/src/github.com/google/knative-gcp/vendor/google.golang.org/grpc/clientconn.go:799 +0x104

goroutine 56 [select]:
google.golang.org/grpc.(*ccBalancerWrapper).watcher(0xc0008b24c0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/google.golang.org/grpc/balancer_conn_wrappers.go:69 +0x1a2
created by google.golang.org/grpc.newCCBalancerWrapper
	/home/ian/go/src/github.com/google/knative-gcp/vendor/google.golang.org/grpc/balancer_conn_wrappers.go:60 +0x2f4

goroutine 57 [select]:
google.golang.org/grpc.(*addrConn).resetTransport(0xc000b8c580)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/google.golang.org/grpc/clientconn.go:1126 +0x57f
created by google.golang.org/grpc.(*addrConn).connect
	/home/ian/go/src/github.com/google/knative-gcp/vendor/google.golang.org/grpc/clientconn.go:799 +0x104

goroutine 100 [runnable]:
github.com/cloudevents/sdk-go/v2/protocol/pubsub.(*Protocol).OpenInbound(0xc0004fd2d0, 0x2596ee0, 0xc0007684e0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/cloudevents/sdk-go/v2/protocol/pubsub/protocol.go:215 +0x3cd
github.com/google/knative-gcp/pkg/broker/ingress.setupTestReceiver.func1(0xc0001131a0, 0xc0004fd2d0, 0x2596e20, 0xc000902080, 0x25c1ec0, 0xc0002e0b40)
	/home/ian/go/src/github.com/google/knative-gcp/pkg/broker/ingress/handler_test.go:374 +0xa5
created by github.com/google/knative-gcp/pkg/broker/ingress.setupTestReceiver
	/home/ian/go/src/github.com/google/knative-gcp/pkg/broker/ingress/handler_test.go:373 +0x51e

goroutine 83 [semacquire, 9 minutes]:
sync.runtime_Semacquire(0xc000768be0)
	/usr/lib/go/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0xc000768bd8)
	/usr/lib/go/src/sync/waitgroup.go:130 +0xd4
golang.org/x/sync/errgroup.(*Group).Wait(0xc000768bd0, 0x2221d88, 0x2596e20)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:40 +0x43
cloud.google.com/go/pubsub.(*Subscription).Receive(0xc000766660, 0x2596e20, 0xc000518180, 0xc0005306e0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:818 +0xc21
github.com/cloudevents/sdk-go/v2/protocol/pubsub/internal.(*Connection).Receive(0xc0005fad90, 0x2596e20, 0xc000518180, 0xc0004b6960, 0xc0008d9ea8, 0x2)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/cloudevents/sdk-go/v2/protocol/pubsub/internal/connection.go:307 +0x157
github.com/cloudevents/sdk-go/v2/protocol/pubsub.(*Protocol).startSubscriber(0xc0004fd2d0, 0x2596e20, 0xc000518180, 0x2175f50, 0x6, 0x217ea18, 0xd, 0xc000530200, 0xc0006c7fa0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/cloudevents/sdk-go/v2/protocol/pubsub/protocol.go:185 +0x2e0
github.com/cloudevents/sdk-go/v2/protocol/pubsub.(*Protocol).OpenInbound.func1(0xc0004fd2d0, 0x2596e20, 0xc000518180, 0xc0007664e0, 0xc00076a2a0, 0x2596ee0, 0xc0007684e0, 0x2175f50, 0x6, 0x217ea18, ...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/cloudevents/sdk-go/v2/protocol/pubsub/protocol.go:202 +0x88
created by github.com/cloudevents/sdk-go/v2/protocol/pubsub.(*Protocol).OpenInbound
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/cloudevents/sdk-go/v2/protocol/pubsub/protocol.go:201 +0x243

goroutine 84 [semacquire, 9 minutes]:
sync.runtime_SemacquireMutex(0xc00051849c, 0x900000000, 0x1)
	/usr/lib/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc000518498)
	/usr/lib/go/src/sync/mutex.go:138 +0x1c1
sync.(*Mutex).Lock(0xc000518498)
	/usr/lib/go/src/sync/mutex.go:81 +0x7d
cloud.google.com/go/pubsub.(*pullStream).get(0xc000518480, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:70 +0x76
cloud.google.com/go/pubsub.(*pullStream).call(0xc000518480, 0x2221da8, 0x0, 0x0, 0x0, 0x7ffcc, 0xc000c9dbe8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).CloseSend(0xc000518480, 0xc0001039a8, 0xc000c9dc70)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:182 +0x5a
cloud.google.com/go/pubsub.(*messageIterator).sender.func1(0xc00076e9c0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:269 +0x7f
cloud.google.com/go/pubsub.(*messageIterator).sender(0xc00076e9c0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:351 +0x6e2
created by cloud.google.com/go/pubsub.newMessageIterator
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:110 +0x773

goroutine 85 [select]:
github.com/googleapis/gax-go/v2.Sleep(0x2596ee0, 0xc000c92b40, 0xdf74178bb, 0xdf74178bb, 0xffffffffffffff01)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:55 +0x12c
github.com/googleapis/gax-go/v2.invoke(0x2596ee0, 0xc000c92b40, 0xc000a83628, 0x2221e70, 0xc00051ca60, 0x1, 0x1, 0x2222e60, 0xc0008cc580, 0x2)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:95 +0x17b
github.com/googleapis/gax-go/v2.Invoke(0x2596ee0, 0xc000c92b40, 0xc000a83628, 0xc000c967e0, 0x2, 0x2, 0x1, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:48 +0x160
cloud.google.com/go/pubsub/apiv1.(*SubscriberClient).StreamingPull(0xc0008ce390, 0x2596ee0, 0xc000c92b40, 0xc00051ca70, 0x1, 0x1, 0x47d290, 0xc000a836b8, 0xc000a836c0, 0x43b5c8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/apiv1/subscriber_client.go:525 +0x2e9
cloud.google.com/go/pubsub.newPullStream.func1(0xc000518490, 0xc000768c60, 0xc000c96760, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:47 +0x1f7
cloud.google.com/go/pubsub.(*pullStream).openWithRetry(0xc000518480, 0xc00051ca50, 0x0, 0xc000a83910, 0xc000a838e8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:101 +0x199
cloud.google.com/go/pubsub.(*pullStream).get(0xc000518480, 0xc0008e4260, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:93 +0x20b
cloud.google.com/go/pubsub.(*pullStream).call(0xc000518480, 0xc000a83ab8, 0xc000a83aa8, 0x1, 0x1, 0x1ffffffff, 0xc000000000)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Recv(0xc000518480, 0xc0008b0c00, 0xc00077a600, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:172 +0xc9
cloud.google.com/go/pubsub.(*messageIterator).recvMessages(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:254
cloud.google.com/go/pubsub.(*messageIterator).receive(0xc00076e9c0, 0x0, 0x0, 0x1f939a0, 0xc0001eb2c0, 0xc000c92300, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:196 +0xad9
cloud.google.com/go/pubsub.(*Subscription).Receive.func2(0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:764 +0x7cc
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000768bd0, 0xc0007669c0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:57 +0x86
created by golang.org/x/sync/errgroup.(*Group).Go
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:54 +0x74

goroutine 86 [semacquire, 9 minutes]:
sync.runtime_SemacquireMutex(0xc0005185dc, 0x900000000, 0x1)
	/usr/lib/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc0005185d8)
	/usr/lib/go/src/sync/mutex.go:138 +0x1c1
sync.(*Mutex).Lock(0xc0005185d8)
	/usr/lib/go/src/sync/mutex.go:81 +0x7d
cloud.google.com/go/pubsub.(*pullStream).get(0xc0005185c0, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:70 +0x76
cloud.google.com/go/pubsub.(*pullStream).call(0xc0005185c0, 0xc000095bf0, 0x0, 0x0, 0x0, 0xc000103c80, 0xc000095c08)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Send(0xc0005185c0, 0xc000938000, 0x6, 0x5)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:155 +0x81
cloud.google.com/go/pubsub.(*messageIterator).pingStream(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:474
cloud.google.com/go/pubsub.(*messageIterator).sender(0xc00076ea90)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:355 +0x68a
created by cloud.google.com/go/pubsub.newMessageIterator
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:110 +0x773

goroutine 87 [select]:
github.com/googleapis/gax-go/v2.Sleep(0x2596ee0, 0xc000e0c8a0, 0x8c362af40, 0x8c362af40, 0xffffffffffffff01)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:55 +0x12c
github.com/googleapis/gax-go/v2.invoke(0x2596ee0, 0xc000e0c8a0, 0xc000a11628, 0x2221e70, 0xc000c905e0, 0x1, 0x1, 0x2222e60, 0xc0008cc580, 0x2)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:95 +0x17b
github.com/googleapis/gax-go/v2.Invoke(0x2596ee0, 0xc000e0c8a0, 0xc000a11628, 0xc00000f220, 0x2, 0x2, 0x1, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:48 +0x160
cloud.google.com/go/pubsub/apiv1.(*SubscriberClient).StreamingPull(0xc0008ce390, 0x2596ee0, 0xc000e0c8a0, 0xc000c905f0, 0x1, 0x1, 0x47d290, 0xc000a116b8, 0xc000a116c0, 0x43b5c8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/apiv1/subscriber_client.go:525 +0x2e9
cloud.google.com/go/pubsub.newPullStream.func1(0xc0005185d0, 0xc000768db0, 0xc00000f060, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:47 +0x1f7
cloud.google.com/go/pubsub.(*pullStream).openWithRetry(0xc0005185c0, 0xc000c905d0, 0x0, 0xc000a11910, 0xc000a118e8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:101 +0x199
cloud.google.com/go/pubsub.(*pullStream).get(0xc0005185c0, 0xc000260820, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:93 +0x20b
cloud.google.com/go/pubsub.(*pullStream).call(0xc0005185c0, 0xc000a11ab8, 0xc000765aa8, 0x1, 0x1, 0xffffffff, 0xc000000000)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Recv(0xc0005185c0, 0xc00055c600, 0x0, 0x7f41d05b2080)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:172 +0xc9
cloud.google.com/go/pubsub.(*messageIterator).recvMessages(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:254
cloud.google.com/go/pubsub.(*messageIterator).receive(0xc00076ea90, 0x7f4100000000, 0x1bf08eab64, 0xe9, 0x1bf08eab64, 0x1bf08eab64, 0x18)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:196 +0xad9
cloud.google.com/go/pubsub.(*Subscription).Receive.func2(0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:764 +0x7cc
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000768bd0, 0xc000766ba0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:57 +0x86
created by golang.org/x/sync/errgroup.(*Group).Go
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:54 +0x74

goroutine 88 [semacquire, 9 minutes]:
sync.runtime_SemacquireMutex(0xc0005186dc, 0x900000000, 0x1)
	/usr/lib/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc0005186d8)
	/usr/lib/go/src/sync/mutex.go:138 +0x1c1
sync.(*Mutex).Lock(0xc0005186d8)
	/usr/lib/go/src/sync/mutex.go:81 +0x7d
cloud.google.com/go/pubsub.(*pullStream).get(0xc0005186c0, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:70 +0x76
cloud.google.com/go/pubsub.(*pullStream).call(0xc0005186c0, 0xc000c86bf0, 0x0, 0x0, 0x0, 0xc0009a4000, 0xc000c86c08)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Send(0xc0005186c0, 0xc00055e360, 0x6, 0x5)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:155 +0x81
cloud.google.com/go/pubsub.(*messageIterator).pingStream(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:474
cloud.google.com/go/pubsub.(*messageIterator).sender(0xc00076eb60)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:355 +0x68a
created by cloud.google.com/go/pubsub.newMessageIterator
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:110 +0x773

goroutine 89 [select]:
github.com/googleapis/gax-go/v2.Sleep(0x2596ee0, 0xc0006bd3b0, 0xd6da9abd9, 0xd6da9abd9, 0xffffffffffffff01)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:55 +0x12c
github.com/googleapis/gax-go/v2.invoke(0x2596ee0, 0xc0006bd3b0, 0xc000a0d628, 0x2221e70, 0xc0004c9090, 0x1, 0x1, 0x2222e60, 0xc0008cc580, 0x2)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:95 +0x17b
github.com/googleapis/gax-go/v2.Invoke(0x2596ee0, 0xc0006bd3b0, 0xc000a0d628, 0xc0008ea8a0, 0x2, 0x2, 0x1, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:48 +0x160
cloud.google.com/go/pubsub/apiv1.(*SubscriberClient).StreamingPull(0xc0008ce390, 0x2596ee0, 0xc0006bd3b0, 0xc0004c90a0, 0x1, 0x1, 0x47d290, 0xc000a0d6b8, 0xc000a0d6c0, 0x43b5c8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/apiv1/subscriber_client.go:525 +0x2e9
cloud.google.com/go/pubsub.newPullStream.func1(0xc0005186d0, 0xc000768f00, 0xc0008ea820, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:47 +0x1f7
cloud.google.com/go/pubsub.(*pullStream).openWithRetry(0xc0005186c0, 0xc0004c9070, 0x0, 0xc000a0d910, 0xc000a0d8e8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:101 +0x199
cloud.google.com/go/pubsub.(*pullStream).get(0xc0005186c0, 0xc000ba2020, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:93 +0x20b
cloud.google.com/go/pubsub.(*pullStream).call(0xc0005186c0, 0xc000a0dab8, 0xc000c82aa8, 0x1, 0x1, 0xffffffff, 0xc000000000)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Recv(0xc0005186c0, 0xc000ba4000, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:172 +0xc9
cloud.google.com/go/pubsub.(*messageIterator).recvMessages(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:254
cloud.google.com/go/pubsub.(*messageIterator).receive(0xc00076eb60, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:196 +0xad9
cloud.google.com/go/pubsub.(*Subscription).Receive.func2(0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:764 +0x7cc
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000768bd0, 0xc000766d80)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:57 +0x86
created by golang.org/x/sync/errgroup.(*Group).Go
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:54 +0x74

goroutine 90 [semacquire, 9 minutes]:
sync.runtime_SemacquireMutex(0xc00051881c, 0x900000000, 0x1)
	/usr/lib/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc000518818)
	/usr/lib/go/src/sync/mutex.go:138 +0x1c1
sync.(*Mutex).Lock(0xc000518818)
	/usr/lib/go/src/sync/mutex.go:81 +0x7d
cloud.google.com/go/pubsub.(*pullStream).get(0xc000518800, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:70 +0x76
cloud.google.com/go/pubsub.(*pullStream).call(0xc000518800, 0xc000091bf0, 0x0, 0x0, 0x0, 0xc0009a4300, 0xc000091c08)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Send(0xc000518800, 0xc00055e3f0, 0x6, 0x5)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:155 +0x81
cloud.google.com/go/pubsub.(*messageIterator).pingStream(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:474
cloud.google.com/go/pubsub.(*messageIterator).sender(0xc00076ec30)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:355 +0x68a
created by cloud.google.com/go/pubsub.newMessageIterator
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:110 +0x773

goroutine 91 [select]:
github.com/googleapis/gax-go/v2.Sleep(0x2596ee0, 0xc0006bc450, 0xa7d7279a9, 0xa7d7279a9, 0xffffffffffffff01)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:55 +0x12c
github.com/googleapis/gax-go/v2.invoke(0x2596ee0, 0xc0006bc450, 0xc000bc7628, 0x2221e70, 0xc0004c85c0, 0x1, 0x1, 0x2222e60, 0xc0008cc580, 0x2)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:95 +0x17b
github.com/googleapis/gax-go/v2.Invoke(0x2596ee0, 0xc0006bc450, 0xc000bc7628, 0xc0008ea3a0, 0x2, 0x2, 0x1, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:48 +0x160
cloud.google.com/go/pubsub/apiv1.(*SubscriberClient).StreamingPull(0xc0008ce390, 0x2596ee0, 0xc0006bc450, 0xc0004c85e0, 0x1, 0x1, 0x47d290, 0xc000bc76b8, 0xc000bc76c0, 0x43b5c8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/apiv1/subscriber_client.go:525 +0x2e9
cloud.google.com/go/pubsub.newPullStream.func1(0xc000518810, 0xc000769050, 0xc0008ea260, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:47 +0x1f7
cloud.google.com/go/pubsub.(*pullStream).openWithRetry(0xc000518800, 0xc0004c85b0, 0x0, 0xc000bc7910, 0xc000bc78e8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:101 +0x199
cloud.google.com/go/pubsub.(*pullStream).get(0xc000518800, 0xc000ba2100, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:93 +0x20b
cloud.google.com/go/pubsub.(*pullStream).call(0xc000518800, 0xc000bc7ab8, 0xc000c82aa8, 0x1, 0x1, 0xffffffff, 0xc000000000)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Recv(0xc000518800, 0xc000ba41e0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:172 +0xc9
cloud.google.com/go/pubsub.(*messageIterator).recvMessages(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:254
cloud.google.com/go/pubsub.(*messageIterator).receive(0xc00076ec30, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:196 +0xad9
cloud.google.com/go/pubsub.(*Subscription).Receive.func2(0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:764 +0x7cc
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000768bd0, 0xc000766f60)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:57 +0x86
created by golang.org/x/sync/errgroup.(*Group).Go
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:54 +0x74

goroutine 92 [semacquire, 10 minutes]:
sync.runtime_SemacquireMutex(0xc00051891c, 0x900000000, 0x1)
	/usr/lib/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc000518918)
	/usr/lib/go/src/sync/mutex.go:138 +0x1c1
sync.(*Mutex).Lock(0xc000518918)
	/usr/lib/go/src/sync/mutex.go:81 +0x7d
cloud.google.com/go/pubsub.(*pullStream).get(0xc000518900, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:70 +0x76
cloud.google.com/go/pubsub.(*pullStream).call(0xc000518900, 0xc000e06bf0, 0x0, 0x0, 0x0, 0xc0009a4600, 0xc000e06c08)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Send(0xc000518900, 0xc00055e480, 0x6, 0x5)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:155 +0x81
cloud.google.com/go/pubsub.(*messageIterator).pingStream(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:474
cloud.google.com/go/pubsub.(*messageIterator).sender(0xc00076ed00)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:355 +0x68a
created by cloud.google.com/go/pubsub.newMessageIterator
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:110 +0x773

goroutine 93 [select]:
github.com/googleapis/gax-go/v2.Sleep(0x2596ee0, 0xc0006bd110, 0x9b0a87cbf, 0x9b0a87cbf, 0xffffffffffffff01)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:55 +0x12c
github.com/googleapis/gax-go/v2.invoke(0x2596ee0, 0xc0006bd110, 0xc000bc3628, 0x2221e70, 0xc0004c8bd0, 0x1, 0x1, 0x2222e60, 0xc0008cc580, 0x2)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:95 +0x17b
github.com/googleapis/gax-go/v2.Invoke(0x2596ee0, 0xc0006bd110, 0xc000bc3628, 0xc0008ea680, 0x2, 0x2, 0x1, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:48 +0x160
cloud.google.com/go/pubsub/apiv1.(*SubscriberClient).StreamingPull(0xc0008ce390, 0x2596ee0, 0xc0006bd110, 0xc0004c8be0, 0x1, 0x1, 0x47d290, 0xc000bc36b8, 0xc000bc36c0, 0x43b5c8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/apiv1/subscriber_client.go:525 +0x2e9
cloud.google.com/go/pubsub.newPullStream.func1(0xc000518910, 0xc0007691a0, 0xc0008ea600, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:47 +0x1f7
cloud.google.com/go/pubsub.(*pullStream).openWithRetry(0xc000518900, 0xc0004c8bc0, 0x0, 0xc000bc3910, 0xc000bc38e8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:101 +0x199
cloud.google.com/go/pubsub.(*pullStream).get(0xc000518900, 0xc00051c140, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:93 +0x20b
cloud.google.com/go/pubsub.(*pullStream).call(0xc000518900, 0xc000bc3ab8, 0xc00069caa8, 0x1, 0x1, 0xc0ffffffff, 0xc000000000)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Recv(0xc000518900, 0xc000d100c0, 0x100000000, 0xc000c74000)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:172 +0xc9
cloud.google.com/go/pubsub.(*messageIterator).recvMessages(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:254
cloud.google.com/go/pubsub.(*messageIterator).receive(0xc00076ed00, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:196 +0xad9
cloud.google.com/go/pubsub.(*Subscription).Receive.func2(0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:764 +0x7cc
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000768bd0, 0xc000767140)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:57 +0x86
created by golang.org/x/sync/errgroup.(*Group).Go
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:54 +0x74

goroutine 75 [semacquire, 10 minutes]:
sync.runtime_SemacquireMutex(0xc00056035c, 0x900000000, 0x1)
	/usr/lib/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc000560358)
	/usr/lib/go/src/sync/mutex.go:138 +0x1c1
sync.(*Mutex).Lock(0xc000560358)
	/usr/lib/go/src/sync/mutex.go:81 +0x7d
cloud.google.com/go/pubsub.(*pullStream).get(0xc000560340, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:70 +0x76
cloud.google.com/go/pubsub.(*pullStream).call(0xc000560340, 0xc00069dbf0, 0x0, 0x0, 0x0, 0xc000683080, 0xc00069dc08)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Send(0xc000560340, 0xc00055e510, 0x6, 0x5)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:155 +0x81
cloud.google.com/go/pubsub.(*messageIterator).pingStream(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:474
cloud.google.com/go/pubsub.(*messageIterator).sender(0xc0006ec9c0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:355 +0x68a
created by cloud.google.com/go/pubsub.newMessageIterator
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:110 +0x773

goroutine 178 [select]:
github.com/googleapis/gax-go/v2.Sleep(0x2596ee0, 0xc000e0c450, 0x31d0580d8, 0x31d0580d8, 0xffffffffffffff01)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:55 +0x12c
github.com/googleapis/gax-go/v2.invoke(0x2596ee0, 0xc000e0c450, 0xc000f2f628, 0x2221e70, 0xc000c90140, 0x1, 0x1, 0x2222e60, 0xc0008cc580, 0x2)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:95 +0x17b
github.com/googleapis/gax-go/v2.Invoke(0x2596ee0, 0xc000e0c450, 0xc000f2f628, 0xc00000ece0, 0x2, 0x2, 0x1, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:48 +0x160
cloud.google.com/go/pubsub/apiv1.(*SubscriberClient).StreamingPull(0xc0008ce390, 0x2596ee0, 0xc000e0c450, 0xc000c90150, 0x1, 0x1, 0x47d290, 0xc000f2f6b8, 0xc000f2f6c0, 0x43b5c8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/apiv1/subscriber_client.go:525 +0x2e9
cloud.google.com/go/pubsub.newPullStream.func1(0xc000560350, 0xc000544540, 0xc00000ea80, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:47 +0x1f7
cloud.google.com/go/pubsub.(*pullStream).openWithRetry(0xc000560340, 0xc000c90130, 0x0, 0xc000f2f910, 0xc000f2f8e8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:101 +0x199
cloud.google.com/go/pubsub.(*pullStream).get(0xc000560340, 0xc000f02020, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:93 +0x20b
cloud.google.com/go/pubsub.(*pullStream).call(0xc000560340, 0xc000f2fab8, 0xc000f10aa8, 0x1, 0x1, 0xc0ffffffff, 0xc000000000)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Recv(0xc000560340, 0xc000f12000, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:172 +0xc9
cloud.google.com/go/pubsub.(*messageIterator).recvMessages(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:254
cloud.google.com/go/pubsub.(*messageIterator).receive(0xc0006ec9c0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:196 +0xad9
cloud.google.com/go/pubsub.(*Subscription).Receive.func2(0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:764 +0x7cc
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000768bd0, 0xc000f06000)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:57 +0x86
created by golang.org/x/sync/errgroup.(*Group).Go
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:54 +0x74

goroutine 45 [semacquire, 10 minutes]:
sync.runtime_SemacquireMutex(0xc0005604dc, 0x900000000, 0x1)
	/usr/lib/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc0005604d8)
	/usr/lib/go/src/sync/mutex.go:138 +0x1c1
sync.(*Mutex).Lock(0xc0005604d8)
	/usr/lib/go/src/sync/mutex.go:81 +0x7d
cloud.google.com/go/pubsub.(*pullStream).get(0xc0005604c0, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:70 +0x76
cloud.google.com/go/pubsub.(*pullStream).call(0xc0005604c0, 0xc00068ebf0, 0x0, 0x0, 0x0, 0xc00057c600, 0xc00068ec08)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Send(0xc0005604c0, 0xc00055e5a0, 0x6, 0x5)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:155 +0x81
cloud.google.com/go/pubsub.(*messageIterator).pingStream(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:474
cloud.google.com/go/pubsub.(*messageIterator).sender(0xc00054e340)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:355 +0x68a
created by cloud.google.com/go/pubsub.newMessageIterator
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:110 +0x773

goroutine 62 [select]:
github.com/googleapis/gax-go/v2.Sleep(0x2596ee0, 0xc000768270, 0xa60ae24db, 0xa60ae24db, 0xffffffffffffff01)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:55 +0x12c
github.com/googleapis/gax-go/v2.invoke(0x2596ee0, 0xc000768270, 0xc000f2b628, 0x2221e70, 0xc0008e4910, 0x1, 0x1, 0x2222e60, 0xc0008cc580, 0x2)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:95 +0x17b
github.com/googleapis/gax-go/v2.Invoke(0x2596ee0, 0xc000768270, 0xc000f2b628, 0xc000501060, 0x2, 0x2, 0x1, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:48 +0x160
cloud.google.com/go/pubsub/apiv1.(*SubscriberClient).StreamingPull(0xc0008ce390, 0x2596ee0, 0xc000768270, 0xc0008e4920, 0x1, 0x1, 0x47d290, 0xc000f2b6b8, 0xc000f2b6c0, 0x43b5c8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/apiv1/subscriber_client.go:525 +0x2e9
cloud.google.com/go/pubsub.newPullStream.func1(0xc0005604d0, 0xc000544750, 0xc000500ea0, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:47 +0x1f7
cloud.google.com/go/pubsub.(*pullStream).openWithRetry(0xc0005604c0, 0xc0008e4900, 0x0, 0xc000f2b910, 0xc000f2b8e8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:101 +0x199
cloud.google.com/go/pubsub.(*pullStream).get(0xc0005604c0, 0xc000261090, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:93 +0x20b
cloud.google.com/go/pubsub.(*pullStream).call(0xc0005604c0, 0xc000f2bab8, 0xc000762aa8, 0x1, 0x1, 0xc0ffffffff, 0xc000000000)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Recv(0xc0005604c0, 0xc00055ca20, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:172 +0xc9
cloud.google.com/go/pubsub.(*messageIterator).recvMessages(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:254
cloud.google.com/go/pubsub.(*messageIterator).receive(0xc00054e340, 0x0, 0x33b3da0, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:196 +0xad9
cloud.google.com/go/pubsub.(*Subscription).Receive.func2(0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:764 +0x7cc
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000768bd0, 0xc0008ee360)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:57 +0x86
created by golang.org/x/sync/errgroup.(*Group).Go
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:54 +0x74

goroutine 163 [semacquire, 10 minutes]:
sync.runtime_SemacquireMutex(0xc000e2c19c, 0x900000000, 0x1)
	/usr/lib/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc000e2c198)
	/usr/lib/go/src/sync/mutex.go:138 +0x1c1
sync.(*Mutex).Lock(0xc000e2c198)
	/usr/lib/go/src/sync/mutex.go:81 +0x7d
cloud.google.com/go/pubsub.(*pullStream).get(0xc000e2c180, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:70 +0x76
cloud.google.com/go/pubsub.(*pullStream).call(0xc000e2c180, 0xc000097bf0, 0x0, 0x0, 0x0, 0xc000d9a300, 0xc000097c08)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Send(0xc000e2c180, 0xc000938090, 0x6, 0x5)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:155 +0x81
cloud.google.com/go/pubsub.(*messageIterator).pingStream(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:474
cloud.google.com/go/pubsub.(*messageIterator).sender(0xc000db4410)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:355 +0x68a
created by cloud.google.com/go/pubsub.newMessageIterator
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:110 +0x773

goroutine 63 [select]:
github.com/googleapis/gax-go/v2.Sleep(0x2596ee0, 0xc000e0ca20, 0xd8608f7cc, 0xd8608f7cc, 0xffffffffffffff01)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:55 +0x12c
github.com/googleapis/gax-go/v2.invoke(0x2596ee0, 0xc000e0ca20, 0xc0008c1628, 0x2221e70, 0xc000c90760, 0x1, 0x1, 0x2222e60, 0xc0008cc580, 0x2)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:95 +0x17b
github.com/googleapis/gax-go/v2.Invoke(0x2596ee0, 0xc000e0ca20, 0xc0008c1628, 0xc00000f4c0, 0x2, 0x2, 0x1, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:48 +0x160
cloud.google.com/go/pubsub/apiv1.(*SubscriberClient).StreamingPull(0xc0008ce390, 0x2596ee0, 0xc000e0ca20, 0xc000c90770, 0x1, 0x1, 0x47d290, 0xc0008c16b8, 0xc0008c16c0, 0x43b5c8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/apiv1/subscriber_client.go:525 +0x2e9
cloud.google.com/go/pubsub.newPullStream.func1(0xc000e2c190, 0xc000e0c270, 0xc00000f320, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:47 +0x1f7
cloud.google.com/go/pubsub.(*pullStream).openWithRetry(0xc000e2c180, 0xc000c90750, 0x0, 0xc0008c1910, 0xc0008c18e8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:101 +0x199
cloud.google.com/go/pubsub.(*pullStream).get(0xc000e2c180, 0xc0003ee220, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:93 +0x20b
cloud.google.com/go/pubsub.(*pullStream).call(0xc000e2c180, 0xc0008c1ab8, 0xc000c80aa8, 0x1, 0x1, 0xffffffff, 0xc000000000)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Recv(0xc000e2c180, 0xc000ba43c0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:172 +0xc9
cloud.google.com/go/pubsub.(*messageIterator).recvMessages(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:254
cloud.google.com/go/pubsub.(*messageIterator).receive(0xc000db4410, 0x0, 0xc0006ba000, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:196 +0xad9
cloud.google.com/go/pubsub.(*Subscription).Receive.func2(0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:764 +0x7cc
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000768bd0, 0xc000e080c0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:57 +0x86
created by golang.org/x/sync/errgroup.(*Group).Go
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:54 +0x74

goroutine 64 [semacquire, 10 minutes]:
sync.runtime_SemacquireMutex(0xc0006ce05c, 0x900000000, 0x1)
	/usr/lib/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc0006ce058)
	/usr/lib/go/src/sync/mutex.go:138 +0x1c1
sync.(*Mutex).Lock(0xc0006ce058)
	/usr/lib/go/src/sync/mutex.go:81 +0x7d
cloud.google.com/go/pubsub.(*pullStream).get(0xc0006ce040, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:70 +0x76
cloud.google.com/go/pubsub.(*pullStream).call(0xc0006ce040, 0xc00069cbf0, 0x0, 0x0, 0x0, 0xc000f08480, 0xc00069cc08)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Send(0xc0006ce040, 0xc00055e630, 0x6, 0x5)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:155 +0x81
cloud.google.com/go/pubsub.(*messageIterator).pingStream(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:474
cloud.google.com/go/pubsub.(*messageIterator).sender(0xc0003c41a0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:355 +0x68a
created by cloud.google.com/go/pubsub.newMessageIterator
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:110 +0x773

goroutine 65 [select]:
github.com/googleapis/gax-go/v2.Sleep(0x2596ee0, 0xc00022b920, 0x9524328fb, 0x9524328fb, 0xffffffffffffff01)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:55 +0x12c
github.com/googleapis/gax-go/v2.invoke(0x2596ee0, 0xc00022b920, 0xc000ac3628, 0x2221e70, 0xc0008e46f0, 0x1, 0x1, 0x2222e60, 0xc0008cc580, 0x2)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:95 +0x17b
github.com/googleapis/gax-go/v2.Invoke(0x2596ee0, 0xc00022b920, 0xc000ac3628, 0xc000500e00, 0x2, 0x2, 0x1, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:48 +0x160
cloud.google.com/go/pubsub/apiv1.(*SubscriberClient).StreamingPull(0xc0008ce390, 0x2596ee0, 0xc00022b920, 0xc0008e4700, 0x1, 0x1, 0x47d290, 0xc000ac36b8, 0xc000ac36c0, 0x43b5c8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/apiv1/subscriber_client.go:525 +0x2e9
cloud.google.com/go/pubsub.newPullStream.func1(0xc0006ce050, 0xc00012c150, 0xc000500d80, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:47 +0x1f7
cloud.google.com/go/pubsub.(*pullStream).openWithRetry(0xc0006ce040, 0xc0008e46e0, 0x0, 0xc000ac3910, 0xc000ac38e8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:101 +0x199
cloud.google.com/go/pubsub.(*pullStream).get(0xc0006ce040, 0xc0008e4090, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:93 +0x20b
cloud.google.com/go/pubsub.(*pullStream).call(0xc0006ce040, 0xc000ac3ab8, 0xc000e04aa8, 0x1, 0x1, 0xffffffff, 0xc000000000)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Recv(0xc0006ce040, 0xc0008b0240, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:172 +0xc9
cloud.google.com/go/pubsub.(*messageIterator).recvMessages(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:254
cloud.google.com/go/pubsub.(*messageIterator).receive(0xc0003c41a0, 0xc000000000, 0x4740fa, 0xc000672c00, 0xc000b8c4d8, 0x499ffc, 0x4b51ff)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:196 +0xad9
cloud.google.com/go/pubsub.(*Subscription).Receive.func2(0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:764 +0x7cc
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000768bd0, 0xc000e082a0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:57 +0x86
created by golang.org/x/sync/errgroup.(*Group).Go
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:54 +0x74

goroutine 210 [semacquire, 10 minutes]:
sync.runtime_SemacquireMutex(0xc0006ce31c, 0x900000000, 0x1)
	/usr/lib/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc0006ce318)
	/usr/lib/go/src/sync/mutex.go:138 +0x1c1
sync.(*Mutex).Lock(0xc0006ce318)
	/usr/lib/go/src/sync/mutex.go:81 +0x7d
cloud.google.com/go/pubsub.(*pullStream).get(0xc0006ce300, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:70 +0x76
cloud.google.com/go/pubsub.(*pullStream).call(0xc0006ce300, 0xc000da8bf0, 0x0, 0x0, 0x0, 0xc000f08780, 0xc000da8c08)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Send(0xc0006ce300, 0xc00055e6c0, 0x6, 0x5)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:155 +0x81
cloud.google.com/go/pubsub.(*messageIterator).pingStream(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:474
cloud.google.com/go/pubsub.(*messageIterator).sender(0xc0003c44e0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:355 +0x68a
created by cloud.google.com/go/pubsub.newMessageIterator
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:110 +0x773

goroutine 211 [select]:
github.com/googleapis/gax-go/v2.Sleep(0x2596ee0, 0xc00022b290, 0x4642e7203, 0x4642e7203, 0xffffffffffffff01)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:55 +0x12c
github.com/googleapis/gax-go/v2.invoke(0x2596ee0, 0xc00022b290, 0xc000abf628, 0x2221e70, 0xc0008e4570, 0x1, 0x1, 0x2222e60, 0xc0008cc580, 0x2)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:95 +0x17b
github.com/googleapis/gax-go/v2.Invoke(0x2596ee0, 0xc00022b290, 0xc000abf628, 0xc000500c80, 0x2, 0x2, 0x1, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/github.com/googleapis/gax-go/v2/invoke.go:48 +0x160
cloud.google.com/go/pubsub/apiv1.(*SubscriberClient).StreamingPull(0xc0008ce390, 0x2596ee0, 0xc00022b290, 0xc0008e4580, 0x1, 0x1, 0x47d290, 0xc000abf6b8, 0xc000abf6c0, 0x43b5c8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/apiv1/subscriber_client.go:525 +0x2e9
cloud.google.com/go/pubsub.newPullStream.func1(0xc0006ce310, 0xc00003d320, 0xc000500c00, 0x1)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:47 +0x1f7
cloud.google.com/go/pubsub.(*pullStream).openWithRetry(0xc0006ce300, 0xc0008e4560, 0x0, 0xc000abf910, 0xc000abf8e8)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:101 +0x199
cloud.google.com/go/pubsub.(*pullStream).get(0xc0006ce300, 0xc000e16140, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:93 +0x20b
cloud.google.com/go/pubsub.(*pullStream).call(0xc0006ce300, 0xc000abfab8, 0xc000c83aa8, 0x1, 0x1, 0xffffffff, 0xc000000000)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:129 +0x1b2
cloud.google.com/go/pubsub.(*pullStream).Recv(0xc0006ce300, 0xc0000b20c0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/pullstream.go:172 +0xc9
cloud.google.com/go/pubsub.(*messageIterator).recvMessages(...)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:254
cloud.google.com/go/pubsub.(*messageIterator).receive(0xc0003c44e0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:196 +0xad9
cloud.google.com/go/pubsub.(*Subscription).Receive.func2(0x0, 0x0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:764 +0x7cc
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000768bd0, 0xc000e08480)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:57 +0x86
created by golang.org/x/sync/errgroup.(*Group).Go
	/home/ian/go/src/github.com/google/knative-gcp/vendor/golang.org/x/sync/errgroup/errgroup.go:54 +0x74

goroutine 212 [semacquire, 10 minutes]:
sync.runtime_Semacquire(0xc00076ea40)
	/usr/lib/go/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0xc00076ea38)
	/usr/lib/go/src/sync/waitgroup.go:130 +0xd4
cloud.google.com/go/pubsub.(*messageIterator).stop(0xc00076e9c0)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/iterator.go:123 +0x99
cloud.google.com/go/pubsub.(*Subscription).Receive.func3(0x2596e20, 0xc000518400, 0xc0006f2000, 0xa, 0x10, 0xc000530700)
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:808 +0xb2
created by cloud.google.com/go/pubsub.(*Subscription).Receive
	/home/ian/go/src/github.com/google/knative-gcp/vendor/cloud.google.com/go/pubsub/subscription.go:803 +0xc10
exit status 2
FAIL	github.com/google/knative-gcp/pkg/broker/ingress	600.461s

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 18 (10 by maintainers)

Most upvoted comments

I haven’t encountered this outside of testing; however, I think this issue would manifest when attempting to cancel a subscription that is not currently receiving messages, since the real service also does not return a StreamingPullResponse when there are no messages. This is a case that I expect we could hit with a live service, since we use multiple retry topics which may receive messages infrequently. I would not expect successfully received messages to be cancelled here, only that a blocked StreamingPullRequest can be cancelled.