dapr: Concurrent streaming requests fail with dapr grpc proxy

In what area(s)?

/area runtime

What version of Dapr?

1.7.0

Expected Behavior

Multiple grpc streaming requests over the dapr grpc proxy should be supported.

Actual Behavior

A single streaming request through dapr works. After adding a 2nd concurrent streaming request the following error is observed:

== APP == <AioRpcError of RPC that terminated with:
== APP ==       status = StatusCode.CANCELLED
== APP ==       details = "grpc: the client connection is closing"
== APP ==       debug_error_string = "{"created":"@1650462614.016521900","description":"Error received from peer ipv6:[::1]:45649","file":"src/core/lib/surface/call.cc","file_line":903,"grpc_message":"grpc: the client connection is closing","grpc_status":1}"
== APP == >

We have also seen the 2nd streaming call get established but return the responses of the other call. This may be timing related.
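One way to make the cross-talk case visible is to check each reply against the request that produced it. The helper below is a minimal sketch, not part of the original report. It assumes replies have the form `Hello number <i>, <name>!` produced by the server in the reproduction steps, and the names `REPLY_PATTERN` and `foreign_replies` are hypothetical.

```python
import re
from typing import Iterable, List

# Matches the reply format used by the greeter server in this report.
REPLY_PATTERN = re.compile(r"^Hello number (\d+), (.+)!$")


def foreign_replies(expected_name: str, messages: Iterable[str]) -> List[str]:
    """Return the messages that do not belong to the request named expected_name."""
    foreign = []
    for message in messages:
        match = REPLY_PATTERN.match(message)
        # A reply is foreign if it is malformed or carries another request's name.
        if match is None or match.group(2) != expected_name:
            foreign.append(message)
    return foreign
```

Calling `foreign_replies("Request2", received)` on the messages collected by the second stream returns an empty list when no cross-talk occurred.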

Steps to Reproduce the Problem

Take the Python hellostreamingworld example from gRPC.

async_greeter_server.py

# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The Python AsyncIO implementation of the GRPC hellostreamingworld.MultiGreeter server."""

import asyncio
import logging
import time

import grpc
from hellostreamingworld_pb2 import HelloReply
from hellostreamingworld_pb2 import HelloRequest
from hellostreamingworld_pb2_grpc import MultiGreeterServicer
from hellostreamingworld_pb2_grpc import add_MultiGreeterServicer_to_server

NUMBER_OF_REPLY = 10


class Greeter(MultiGreeterServicer):

    async def sayHello(
        self, request: HelloRequest,
        context: grpc.aio.ServicerContext
    ) -> HelloReply:
        logging.info("Serving sayHello request %s", request)
        for i in range(NUMBER_OF_REPLY):
            time.sleep(2)
            yield HelloReply(message=f"Hello number {i}, {request.name}!")


async def serve() -> None:
    server = grpc.aio.server()
    add_MultiGreeterServicer_to_server(Greeter(), server)
    listen_addr = "[::]:55555"
    server.add_insecure_port(listen_addr)
    logging.info("Starting server on %s", listen_addr)
    await server.start()
    await server.wait_for_termination()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(serve())

Also take the corresponding proto files unchanged.

hellostreamingworld_pb2_grpc.py

# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

import hellostreamingworld_pb2 as hellostreamingworld__pb2


class MultiGreeterStub(object):
    """The greeting service definition.
    """

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        self.sayHello = channel.unary_stream(
                '/hellostreamingworld.MultiGreeter/sayHello',
                request_serializer=hellostreamingworld__pb2.HelloRequest.SerializeToString,
                response_deserializer=hellostreamingworld__pb2.HelloReply.FromString,
                )


class MultiGreeterServicer(object):
    """The greeting service definition.
    """

    def sayHello(self, request, context):
        """Sends multiple greetings
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')


def add_MultiGreeterServicer_to_server(servicer, server):
    rpc_method_handlers = {
            'sayHello': grpc.unary_stream_rpc_method_handler(
                    servicer.sayHello,
                    request_deserializer=hellostreamingworld__pb2.HelloRequest.FromString,
                    response_serializer=hellostreamingworld__pb2.HelloReply.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'hellostreamingworld.MultiGreeter', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))


 # This class is part of an EXPERIMENTAL API.
class MultiGreeter(object):
    """The greeting service definition.
    """

    @staticmethod
    def sayHello(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        return grpc.experimental.unary_stream(request, target, '/hellostreamingworld.MultiGreeter/sayHello',
            hellostreamingworld__pb2.HelloRequest.SerializeToString,
            hellostreamingworld__pb2.HelloReply.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

hellostreamingworld_pb2.py

# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: hellostreamingworld.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()




DESCRIPTOR = _descriptor.FileDescriptor(
  name='hellostreamingworld.proto',
  package='hellostreamingworld',
  syntax='proto3',
  serialized_options=None,
  create_key=_descriptor._internal_create_key,
  serialized_pb=b'\n\x19hellostreamingworld.proto\x12\x13hellostreamingworld\"3\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x15\n\rnum_greetings\x18\x02 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2b\n\x0cMultiGreeter\x12R\n\x08sayHello\x12!.hellostreamingworld.HelloRequest\x1a\x1f.hellostreamingworld.HelloReply\"\x00\x30\x01\x62\x06proto3'
)




_HELLOREQUEST = _descriptor.Descriptor(
  name='HelloRequest',
  full_name='hellostreamingworld.HelloRequest',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  create_key=_descriptor._internal_create_key,
  fields=[
    _descriptor.FieldDescriptor(
      name='name', full_name='hellostreamingworld.HelloRequest.name', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=b"".decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
    _descriptor.FieldDescriptor(
      name='num_greetings', full_name='hellostreamingworld.HelloRequest.num_greetings', index=1,
      number=2, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=b"".decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
  ],
  extensions=[
  ],
  nested_types=[],
  enum_types=[
  ],
  serialized_options=None,
  is_extendable=False,
  syntax='proto3',
  extension_ranges=[],
  oneofs=[
  ],
  serialized_start=50,
  serialized_end=101,
)


_HELLOREPLY = _descriptor.Descriptor(
  name='HelloReply',
  full_name='hellostreamingworld.HelloReply',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  create_key=_descriptor._internal_create_key,
  fields=[
    _descriptor.FieldDescriptor(
      name='message', full_name='hellostreamingworld.HelloReply.message', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=b"".decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
  ],
  extensions=[
  ],
  nested_types=[],
  enum_types=[
  ],
  serialized_options=None,
  is_extendable=False,
  syntax='proto3',
  extension_ranges=[],
  oneofs=[
  ],
  serialized_start=103,
  serialized_end=132,
)

DESCRIPTOR.message_types_by_name['HelloRequest'] = _HELLOREQUEST
DESCRIPTOR.message_types_by_name['HelloReply'] = _HELLOREPLY
_sym_db.RegisterFileDescriptor(DESCRIPTOR)

HelloRequest = _reflection.GeneratedProtocolMessageType('HelloRequest', (_message.Message,), {
  'DESCRIPTOR' : _HELLOREQUEST,
  '__module__' : 'hellostreamingworld_pb2'
  # @@protoc_insertion_point(class_scope:hellostreamingworld.HelloRequest)
  })
_sym_db.RegisterMessage(HelloRequest)

HelloReply = _reflection.GeneratedProtocolMessageType('HelloReply', (_message.Message,), {
  'DESCRIPTOR' : _HELLOREPLY,
  '__module__' : 'hellostreamingworld_pb2'
  # @@protoc_insertion_point(class_scope:hellostreamingworld.HelloReply)
  })
_sym_db.RegisterMessage(HelloReply)



_MULTIGREETER = _descriptor.ServiceDescriptor(
  name='MultiGreeter',
  full_name='hellostreamingworld.MultiGreeter',
  file=DESCRIPTOR,
  index=0,
  serialized_options=None,
  create_key=_descriptor._internal_create_key,
  serialized_start=134,
  serialized_end=232,
  methods=[
  _descriptor.MethodDescriptor(
    name='sayHello',
    full_name='hellostreamingworld.MultiGreeter.sayHello',
    index=0,
    containing_service=None,
    input_type=_HELLOREQUEST,
    output_type=_HELLOREPLY,
    serialized_options=None,
    create_key=_descriptor._internal_create_key,
  ),
])
_sym_db.RegisterServiceDescriptor(_MULTIGREETER)

DESCRIPTOR.services_by_name['MultiGreeter'] = _MULTIGREETER

# @@protoc_insertion_point(module_scope)

Then run the async_greeter_server with dapr run:

dapr run \
  --app-id greeterservice \
  --app-protocol grpc \
  --app-port 55555 \
  --dapr-grpc-port 52001 \
python async_greeter_server.py

Take the async_greeter_client.py example and modify it so that two concurrent requests run on separate asyncio tasks against the server through the Dapr gRPC proxy:

async_greeter_client.py

# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The Python AsyncIO implementation of the GRPC hellostreamingworld.MultiGreeter client."""

import asyncio
import logging

import grpc
import hellostreamingworld_pb2
import hellostreamingworld_pb2_grpc
import os


async def subs2(mess, stub, metadata):
    # Direct read from the stub
    hello_stream = stub.sayHello(
        hellostreamingworld_pb2.HelloRequest(name=mess), metadata=metadata)
    while True:
        response = await hello_stream.read()
        if response == grpc.aio.EOF:
            break
        print("Greeter client received from direct read: " + response.message, flush=True)


async def subs(request, stub, metadata):
    async for response in stub.sayHello(request, metadata=metadata):
        print("Greeter client received from async generator: " +
              response.message, flush=True)


async def run() -> None:
    try:
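        # Port of the Dapr sidecar's gRPC endpoint. A missing variable raises
        # KeyError, which the except block below prints.
        port = int(os.environ["DAPR_GRPC_PORT"])
        # Not working through the Dapr proxy; change the port to 55555 to call
        # the server directly.
        address = f"localhost:{port}"
        channel = grpc.aio.insecure_channel(address)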
        metadata = (('dapr-app-id', 'greeterservice'),)
        stub = hellostreamingworld_pb2_grpc.MultiGreeterStub(channel)

        # Read from an async generator
        await asyncio.gather(
            asyncio.create_task(subs(hellostreamingworld_pb2.HelloRequest(name="Request1"), stub, metadata)),
            asyncio.create_task(subs(hellostreamingworld_pb2.HelloRequest(name="Request2"), stub, metadata)),
        )

    except Exception as ex:
        print(ex)

if __name__ == "__main__":
    logging.basicConfig()
    asyncio.run(run())

Run the async_greeter_client.py with dapr run:

dapr run \
  --app-id clientapp \
  --app-protocol grpc \
python3 async_greeter_client.py

Observations:

  • Either Request1 or Request2 arrives at the server, but not both
  • Client throws the following error:
== APP == <AioRpcError of RPC that terminated with:
== APP ==       status = StatusCode.CANCELLED
== APP ==       details = "grpc: the client connection is closing"
== APP ==       debug_error_string = "{"created":"@1650464025.032784900","description":"Error received from peer ipv6:[::1]:43623","file":"src/core/lib/surface/call.cc","file_line":903,"grpc_message":"grpc: the client connection is closing","grpc_status":1}"
== APP == >
  • If the gRPC port in the client is changed to call the gRPC server port (55555) directly, bypassing Dapr, both requests run concurrently and succeed, with the following output:
== APP == Greeter client received from async generator: Hello number 0, Request1!
== APP == Greeter client received from async generator: Hello number 0, Request2!
== APP == Greeter client received from async generator: Hello number 1, Request1!
== APP == Greeter client received from async generator: Hello number 1, Request2!
== APP == Greeter client received from async generator: Hello number 2, Request1!
== APP == Greeter client received from async generator: Hello number 2, Request2!
== APP == Greeter client received from async generator: Hello number 3, Request1!
== APP == Greeter client received from async generator: Hello number 3, Request2!
== APP == Greeter client received from async generator: Hello number 4, Request1!
== APP == Greeter client received from async generator: Hello number 4, Request2!
== APP == Greeter client received from async generator: Hello number 5, Request1!
== APP == Greeter client received from async generator: Hello number 5, Request2!
== APP == Greeter client received from async generator: Hello number 6, Request1!
== APP == Greeter client received from async generator: Hello number 6, Request2!
== APP == Greeter client received from async generator: Hello number 7, Request1!
== APP == Greeter client received from async generator: Hello number 7, Request2!
== APP == Greeter client received from async generator: Hello number 8, Request1!
== APP == Greeter client received from async generator: Hello number 8, Request2!
== APP == Greeter client received from async generator: Hello number 9, Request1!
== APP == Greeter client received from async generator: Hello number 9, Request2!
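The client above wraps both tasks in a single `except` block, so only the first error is printed and the outcome of the other stream is lost. The sketch below shows how `asyncio.gather(..., return_exceptions=True)` keeps the outcomes separate. It is illustrative only: `fake_stream`, `consume` and `run_both` are hypothetical names, and `fake_stream` is a stand-in for `stub.sayHello(...)` that makes no real gRPC call.

```python
import asyncio
from typing import AsyncIterator, List, Union


async def fake_stream(name: str, fail: bool) -> AsyncIterator[str]:
    """Hypothetical stand-in for stub.sayHello(...); not a real gRPC call."""
    for i in range(3):
        await asyncio.sleep(0)
        if fail and i == 1:
            raise ConnectionError("grpc: the client connection is closing")
        yield f"Hello number {i}, {name}!"


async def consume(name: str, fail: bool) -> List[str]:
    """Collect every message of one stream."""
    return [message async for message in fake_stream(name, fail)]


async def run_both() -> List[Union[List[str], BaseException]]:
    # return_exceptions=True keeps the outcome of each task separate,
    # so a failure in one stream does not hide the result of the other.
    return await asyncio.gather(
        consume("Request1", fail=False),
        consume("Request2", fail=True),
        return_exceptions=True,
    )


if __name__ == "__main__":
    for name, outcome in zip(("Request1", "Request2"), asyncio.run(run_both())):
        print(name, "->", repr(outcome))
```

In the real client, the same pattern would apply to the two `subs(...)` tasks, which makes it easier to tell which of the two requests was cancelled.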

Release Note

RELEASE NOTE:

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 20 (12 by maintainers)

Most upvoted comments

@yaron2

If we have a timeout on the connection, will that not be enough to clean up unused connections?

It is hard to check whether a connection is unused. Although a timeout is set on the connection by context.Timeout, it is independent of the connection's state (only ClientConn#Close() can change the state).

Plan 3 looks ok with me, any concerns with it?

My only concern is that plan 3 is a little hard to read; it is not a technical concern.

I will try plan 3 and make a fix PR.

@hanxinimm the fix for this issue will go into 1.8. It wasn’t released in 1.7.4.

The errors you are showing are fine as Dapr runs without mTLS in self-hosted mode by default.

@Syuparn Plan 3 looks ok with me, any concerns with it?

plan 1. keep an old connection alive even after connectionPool replaces it with the new one. This will cause a goroutine leak.

If we have a timeout on the connection, will that not be enough to clean up unused connections?
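The trade-off behind plan 1 can be illustrated with a toy model. This is a sketch under stated assumptions, not Dapr's implementation: it assumes, as the plan 1 quote suggests, that the pool replaces the connection for a target while an earlier stream is still using the old one. `ToyConn`, `ToyPool`, `call` and `demo` are hypothetical names.

```python
import asyncio
from typing import Dict, List


class ToyConn:
    """Hypothetical stand-in for a client connection; not Dapr's or gRPC's type."""

    def __init__(self, conn_id: int) -> None:
        self.conn_id = conn_id
        self.closed = False

    async def stream(self, name: str, replies: int) -> List[str]:
        out = []
        for i in range(replies):
            await asyncio.sleep(0.1)
            if self.closed:
                raise ConnectionError("the client connection is closing")
            out.append(f"Hello number {i}, {name}!")
        return out


class ToyPool:
    """Keeps one connection per target and replaces it on every register() call."""

    def __init__(self, close_old: bool) -> None:
        self.close_old = close_old
        self.conns: Dict[str, ToyConn] = {}
        self.created: List[ToyConn] = []

    def register(self, target: str) -> ToyConn:
        old = self.conns.get(target)
        if old is not None and self.close_old:
            old.closed = True  # in-flight streams on the old connection now fail
        conn = ToyConn(len(self.created))
        self.created.append(conn)
        self.conns[target] = conn
        return conn


async def call(pool: ToyPool, name: str, delay: float) -> List[str]:
    await asyncio.sleep(delay)
    conn = pool.register("greeterservice")
    return await conn.stream(name, replies=3)


async def demo(close_old: bool):
    pool = ToyPool(close_old)
    results = await asyncio.gather(
        call(pool, "Request1", 0.0),
        call(pool, "Request2", 0.05),  # arrives while Request1 is still streaming
        return_exceptions=True,
    )
    never_closed = [c.conn_id for c in pool.created if not c.closed]
    return results, never_closed
```

With `close_old=True` the first stream fails once the second call replaces its connection, which resembles the error in this report. With `close_old=False` both streams finish, but the replaced connection is never closed; in this toy model that stands in for the leak mentioned under plan 1.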