dapr: Concurrent gRPC streaming requests fail with the Dapr gRPC proxy
In what area(s)?
/area runtime
What version of Dapr?
1.7.0
Expected Behavior
Multiple concurrent gRPC streaming requests through the Dapr gRPC proxy should be supported.
Actual Behavior
A single streaming request through Dapr works. After adding a second concurrent streaming request, the following error is observed:
== APP == <AioRpcError of RPC that terminated with:
== APP == status = StatusCode.CANCELLED
== APP == details = "grpc: the client connection is closing"
== APP == debug_error_string = "{"created":"@1650462614.016521900","description":"Error received from peer ipv6:[::1]:45649","file":"src/core/lib/surface/call.cc","file_line":903,"grpc_message":"grpc: the client connection is closing","grpc_status":1}"
== APP == >
We have also seen cases where the second streaming call was established but returned the responses belonging to the other call. This may be timing-related.
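For reference, a single streaming call through the proxy does work. Below is a minimal sketch of that working baseline (an illustration only, not part of the original reproduction); it assumes the DAPR_GRPC_PORT environment variable injected by dapr run and the same dapr-app-id metadata used in the full client further down:
# Minimal single-stream client (illustrative sketch): one sayHello stream
# through the Dapr gRPC proxy behaves as expected.
import asyncio
import os

import grpc

import hellostreamingworld_pb2
import hellostreamingworld_pb2_grpc


async def single_stream() -> None:
    # DAPR_GRPC_PORT is injected by "dapr run"; the dapr-app-id metadata
    # routes the call to the greeterservice app via the sidecar.
    address = f"localhost:{os.getenv('DAPR_GRPC_PORT')}"
    metadata = (('dapr-app-id', 'greeterservice'),)
    async with grpc.aio.insecure_channel(address) as channel:
        stub = hellostreamingworld_pb2_grpc.MultiGreeterStub(channel)
        async for reply in stub.sayHello(
                hellostreamingworld_pb2.HelloRequest(name="Single"),
                metadata=metadata):
            print(reply.message, flush=True)


if __name__ == "__main__":
    asyncio.run(single_stream())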
Steps to Reproduce the Problem
Take the Python hellostreamingworld example from gRPC.
async_greeter_server.py
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python AsyncIO implementation of the GRPC hellostreamingworld.MultiGreeter server."""
import asyncio
import logging

import grpc
from hellostreamingworld_pb2 import HelloReply
from hellostreamingworld_pb2 import HelloRequest
from hellostreamingworld_pb2_grpc import MultiGreeterServicer
from hellostreamingworld_pb2_grpc import add_MultiGreeterServicer_to_server

NUMBER_OF_REPLY = 10


class Greeter(MultiGreeterServicer):

    async def sayHello(
        self, request: HelloRequest,
        context: grpc.aio.ServicerContext
    ) -> HelloReply:
        logging.info("Serving sayHello request %s", request)
        for i in range(NUMBER_OF_REPLY):
            # Pause between replies; asyncio.sleep keeps the event loop free
            # so both concurrent streams can be served.
            await asyncio.sleep(2)
            yield HelloReply(message=f"Hello number {i}, {request.name}!")


async def serve() -> None:
    server = grpc.aio.server()
    add_MultiGreeterServicer_to_server(Greeter(), server)
    listen_addr = "[::]:55555"
    server.add_insecure_port(listen_addr)
    logging.info("Starting server on %s", listen_addr)
    await server.start()
    await server.wait_for_termination()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(serve())
Also take the corresponding proto files unchanged.
hellostreamingworld_pb2_grpc.py
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

import hellostreamingworld_pb2 as hellostreamingworld__pb2


class MultiGreeterStub(object):
    """The greeting service definition.
    """

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        self.sayHello = channel.unary_stream(
            '/hellostreamingworld.MultiGreeter/sayHello',
            request_serializer=hellostreamingworld__pb2.HelloRequest.SerializeToString,
            response_deserializer=hellostreamingworld__pb2.HelloReply.FromString,
        )


class MultiGreeterServicer(object):
    """The greeting service definition.
    """

    def sayHello(self, request, context):
        """Sends multiple greetings
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')


def add_MultiGreeterServicer_to_server(servicer, server):
    rpc_method_handlers = {
        'sayHello': grpc.unary_stream_rpc_method_handler(
            servicer.sayHello,
            request_deserializer=hellostreamingworld__pb2.HelloRequest.FromString,
            response_serializer=hellostreamingworld__pb2.HelloReply.SerializeToString,
        ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
        'hellostreamingworld.MultiGreeter', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class MultiGreeter(object):
    """The greeting service definition.
    """

    @staticmethod
    def sayHello(request,
                 target,
                 options=(),
                 channel_credentials=None,
                 call_credentials=None,
                 insecure=False,
                 compression=None,
                 wait_for_ready=None,
                 timeout=None,
                 metadata=None):
        return grpc.experimental.unary_stream(
            request, target, '/hellostreamingworld.MultiGreeter/sayHello',
            hellostreamingworld__pb2.HelloRequest.SerializeToString,
            hellostreamingworld__pb2.HelloReply.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
hellostreamingworld_pb2.py
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: hellostreamingworld.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='hellostreamingworld.proto',
package='hellostreamingworld',
syntax='proto3',
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_pb=b'\n\x19hellostreamingworld.proto\x12\x13hellostreamingworld\"3\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x15\n\rnum_greetings\x18\x02 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2b\n\x0cMultiGreeter\x12R\n\x08sayHello\x12!.hellostreamingworld.HelloRequest\x1a\x1f.hellostreamingworld.HelloReply\"\x00\x30\x01\x62\x06proto3'
)
_HELLOREQUEST = _descriptor.Descriptor(
name='HelloRequest',
full_name='hellostreamingworld.HelloRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='name', full_name='hellostreamingworld.HelloRequest.name', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='num_greetings', full_name='hellostreamingworld.HelloRequest.num_greetings', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=50,
serialized_end=101,
)
_HELLOREPLY = _descriptor.Descriptor(
name='HelloReply',
full_name='hellostreamingworld.HelloReply',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='message', full_name='hellostreamingworld.HelloReply.message', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=103,
serialized_end=132,
)
DESCRIPTOR.message_types_by_name['HelloRequest'] = _HELLOREQUEST
DESCRIPTOR.message_types_by_name['HelloReply'] = _HELLOREPLY
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
HelloRequest = _reflection.GeneratedProtocolMessageType('HelloRequest', (_message.Message,), {
'DESCRIPTOR' : _HELLOREQUEST,
'__module__' : 'hellostreamingworld_pb2'
# @@protoc_insertion_point(class_scope:hellostreamingworld.HelloRequest)
})
_sym_db.RegisterMessage(HelloRequest)
HelloReply = _reflection.GeneratedProtocolMessageType('HelloReply', (_message.Message,), {
'DESCRIPTOR' : _HELLOREPLY,
'__module__' : 'hellostreamingworld_pb2'
# @@protoc_insertion_point(class_scope:hellostreamingworld.HelloReply)
})
_sym_db.RegisterMessage(HelloReply)
_MULTIGREETER = _descriptor.ServiceDescriptor(
name='MultiGreeter',
full_name='hellostreamingworld.MultiGreeter',
file=DESCRIPTOR,
index=0,
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_start=134,
serialized_end=232,
methods=[
_descriptor.MethodDescriptor(
name='sayHello',
full_name='hellostreamingworld.MultiGreeter.sayHello',
index=0,
containing_service=None,
input_type=_HELLOREQUEST,
output_type=_HELLOREPLY,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
])
_sym_db.RegisterServiceDescriptor(_MULTIGREETER)
DESCRIPTOR.services_by_name['MultiGreeter'] = _MULTIGREETER
# @@protoc_insertion_point(module_scope)
Then run the async_greeter_server with dapr run:
dapr run \
--app-id greeterservice \
--app-protocol grpc \
--app-port 55555 \
--dapr-grpc-port 52001 \
python async_greeter_server.py
Take the async_greeter_client.py example and modify it so that two concurrent requests are executed as separate asyncio tasks against the server through the Dapr gRPC proxy:
async_greeter_client.py
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python AsyncIO implementation of the GRPC hellostreamingworld.MultiGreeter client."""
import asyncio
import logging
import grpc
import hellostreamingworld_pb2
import hellostreamingworld_pb2_grpc
import os
async def subs2(mess, stub, metadata):
    # Direct read from the stub
    hello_stream = stub.sayHello(
        hellostreamingworld_pb2.HelloRequest(name=mess), metadata=metadata)
    while True:
        response = await hello_stream.read()
        if response == grpc.aio.EOF:
            break
        print("Greeter client received from direct read: " + response.message,
              flush=True)


async def subs(request, stub, metadata):
    async for response in stub.sayHello(request, metadata=metadata):
        print("Greeter client received from async generator: " +
              response.message, flush=True)


async def run() -> None:
    try:
        port = int(str(os.getenv("DAPR_GRPC_PORT")))
        if port is not None:
            # NOT WORKING through the Dapr proxy; change the port to 55555
            # (directly to the server) and it works.
            address = f"localhost:{port}"
            channel = grpc.aio.insecure_channel(address)
            metadata = (('dapr-app-id', 'greeterservice'),)
            stub = hellostreamingworld_pb2_grpc.MultiGreeterStub(channel)
            # Read from an async generator
            await asyncio.gather(
                asyncio.create_task(subs(hellostreamingworld_pb2.HelloRequest(name="Request1"), stub, metadata)),
                asyncio.create_task(subs(hellostreamingworld_pb2.HelloRequest(name="Request2"), stub, metadata)),
            )
    except Exception as ex:
        print(ex)


if __name__ == "__main__":
    logging.basicConfig()
    asyncio.run(run())
Run the async_greeter_client.py with dapr run:
dapr run \
--app-id clientapp \
--app-protocol grpc \
python3 async_greeter_client.py
Observations:
- Either Request1 or Request2 arrives at the server, but not both.
- The client throws the following error:
== APP == <AioRpcError of RPC that terminated with:
== APP == status = StatusCode.CANCELLED
== APP == details = "grpc: the client connection is closing"
== APP == debug_error_string = "{"created":"@1650464025.032784900","description":"Error received from peer ipv6:[::1]:43623","file":"src/core/lib/surface/call.cc","file_line":903,"grpc_message":"grpc: the client connection is closing","grpc_status":1}"
== APP == >
- If the client is changed to call the gRPC server port (55555) directly, bypassing Dapr, both requests execute concurrently and successfully, with the following output:
== APP == Greeter client received from async generator: Hello number 0, Request1!
== APP == Greeter client received from async generator: Hello number 0, Request2!
== APP == Greeter client received from async generator: Hello number 1, Request1!
== APP == Greeter client received from async generator: Hello number 1, Request2!
== APP == Greeter client received from async generator: Hello number 2, Request1!
== APP == Greeter client received from async generator: Hello number 2, Request2!
== APP == Greeter client received from async generator: Hello number 3, Request1!
== APP == Greeter client received from async generator: Hello number 3, Request2!
== APP == Greeter client received from async generator: Hello number 4, Request1!
== APP == Greeter client received from async generator: Hello number 4, Request2!
== APP == Greeter client received from async generator: Hello number 5, Request1!
== APP == Greeter client received from async generator: Hello number 5, Request2!
== APP == Greeter client received from async generator: Hello number 6, Request1!
== APP == Greeter client received from async generator: Hello number 6, Request2!
== APP == Greeter client received from async generator: Hello number 7, Request1!
== APP == Greeter client received from async generator: Hello number 7, Request2!
== APP == Greeter client received from async generator: Hello number 8, Request1!
== APP == Greeter client received from async generator: Hello number 8, Request2!
== APP == Greeter client received from async generator: Hello number 9, Request1!
== APP == Greeter client received from async generator: Hello number 9, Request2!
Release Note
RELEASE NOTE:
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 20 (12 by maintainers)
Commits related to this issue
- Fix resiliency logic in gRPC proxying In the case of gRPC proxying, we need to record all the calls that have occurred and replay them if we need to perform retries. The array of calls was being shar... — committed to halspang/dapr by halspang 2 years ago
- Fix resiliency logic in gRPC proxying In the case of gRPC proxying, we need to record all the calls that have occurred and replay them if we need to perform retries. The array of calls was being shar... — committed to halspang/dapr by halspang 2 years ago
- Fix resiliency logic in gRPC proxying (#4829) In the case of gRPC proxying, we need to record all the calls that have occurred and replay them if we need to perform retries. The array of calls was ... — committed to dapr/dapr by halspang 2 years ago
- Fix resiliency logic in gRPC proxying (#4829) In the case of gRPC proxying, we need to record all the calls that have occurred and replay them if we need to perform retries. The array of calls was ... — committed to berndverst/dapr by halspang 2 years ago
- Fix resiliency logic in gRPC proxying (#4829) In the case of gRPC proxying, we need to record all the calls that have occurred and replay them if we need to perform retries. The array of calls was be... — committed to berndverst/dapr by halspang 2 years ago
- Fix resiliency logic in gRPC proxying (#4829) In the case of gRPC proxying, we need to record all the calls that have occurred and replay them if we need to perform retries. The array of calls was be... — committed to addjuarez/dapr by halspang 2 years ago
- Updating Feature Branch with Master (#5001) * Contexts and timeouts in appmanager (#4774) * Contexts and timeouts in appmanager Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.git... — committed to dapr/dapr by RyanLettieri 2 years ago
- Fix unregister reminder concurrency issue (#4842) * Added timeout to "Wait for ACR deployment" (#4810) Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Co-authored-by: ... — committed to dapr/dapr by addjuarez 2 years ago
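Based on the commit messages above, the root cause appears to be that the buffer of recorded calls (kept so the proxy can replay a call when resiliency retries kick in) was shared across concurrent proxied streams. Dapr's proxy is written in Go; the following is only a Python sketch with hypothetical names showing why a shared replay buffer lets one stream observe the other's messages, and why scoping the buffer per call avoids it:
# Illustrative sketch only (not Dapr's actual code): cross-talk caused by a
# replay buffer shared between two concurrent proxied streams.
import asyncio

shared_replay_buffer = []  # buggy: one buffer for every in-flight stream


async def proxy_stream_buggy(messages):
    for msg in messages:
        await asyncio.sleep(0)              # let the other stream interleave
        shared_replay_buffer.append(msg)    # both streams record here
    # A retry would now replay messages that belong to the other stream.
    return list(shared_replay_buffer)


async def proxy_stream_fixed(messages):
    replay_buffer = []                      # fixed: buffer scoped to this call
    for msg in messages:
        await asyncio.sleep(0)
        replay_buffer.append(msg)
    return replay_buffer


async def main():
    buggy = await asyncio.gather(
        proxy_stream_buggy(["r1-0", "r1-1"]),
        proxy_stream_buggy(["r2-0", "r2-1"]),
    )
    fixed = await asyncio.gather(
        proxy_stream_fixed(["r1-0", "r1-1"]),
        proxy_stream_fixed(["r2-0", "r2-1"]),
    )
    print("shared buffer (buggy):", buggy)   # each result mixes both streams
    print("per-call buffer (fixed):", fixed)  # each result has only its own


if __name__ == "__main__":
    asyncio.run(main())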
@yaron2
It is hard to check whether the connection is unused. Although a timeout is set on the connection via context.Timeout, this is independent of the connection's state (only ClientConn#Close() can change the state). I am just concerned that plan 3 is a little hard to read; there are no technical reasons.
I will try plan 3 and make a fix PR.
@hanxinimm the fix for this issue will go into 1.8. It wasn’t released in 1.7.4.
The errors you are showing are fine as Dapr runs without mTLS in self-hosted mode by default.
@Syuparn Plan 3 looks OK to me, any concerns with it?
If we have a timeout on the connection, will that not be enough to clean up unused connections?