grpc: python grpc server with multiprocessing fails

What version of gRPC and what language are you using?

grpc 1.13.0 with python 3.6.5

What operating system (Linux, Windows, …) and version?

CentOS 7 (Linux 3.10.0-862.3.2.el7.x86_64)

What runtime / compiler are you using (e.g. python version or version of gcc)

CPython 3.6.5

What did you do?

from concurrent import futures

import grpc

server = grpc.server(futures.ProcessPoolExecutor(max_workers=4))
service_pb2_grpc.add_FindPortServicer_to_server(FindPort(), server)
server.add_insecure_port('[::]:' + port)
server.start()

What did you expect to see?

Based on the fork support doc (https://github.com/grpc/grpc/blob/master/doc/fork_support.md), the gRPC server should run across 4 worker processes.

What did you see instead?

The server crashed:

8156 tcp_server_posix.cc:210]    Failed accept4: Invalid argument
Traceback (most recent call last):
  File "/usr/lib64/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib64/python3.6/multiprocessing/reduction.py", line 52, in dumps
    cls(buf, protocol).dump(obj)
  File "stringsource", line 2, in grpc._cython.cygrpc.RequestCallEvent.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

About this issue

  • State: closed
  • Created 6 years ago
  • Reactions: 5
  • Comments: 27 (11 by maintainers)

Most upvoted comments

The following two approaches should help with combining gRPC Python servers and the fork() syscall (e.g., via Python’s multiprocessing library). The main constraint that must be satisfied is that your process must invoke the fork() syscall only before you create your gRPC server(s).
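
As a minimal sketch of that ordering constraint (the worker function here is a hypothetical stand-in for your CPU-bound work, and servicer registration is elided):

import multiprocessing
from concurrent import futures

import grpc

def cpu_bound_worker():
    pass  # CPU-intensive work; no gRPC objects are touched here

# Safe: fork the worker processes first...
workers = [multiprocessing.Process(target=cpu_bound_worker) for _ in range(4)]
for w in workers:
    w.start()

# ...and only then create the gRPC server in the parent process.
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))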

Option 1:

If your environment supports the SO_REUSEPORT socket option, you can run multiple copies of your gRPC server in individual processes started via multiprocessing.Process, and all of the servers can listen on the same port. SO_REUSEPORT is set by default if you build gRPC from source on Linux, but it is not available in the manylinux spec used by the binaries we distribute on pip, so depending on your environment you may have to resort to option 2.

This type of pre-fork + SO_REUSEPORT would look something like the following:

import multiprocessing
import time
from concurrent import futures

import grpc

def startGrpcServer():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # Register your servicer(s) here before starting the server.
    server.add_insecure_port('[::]:50051')
    server.start()
    while True:  # start() does not block; keep the process alive
        time.sleep(3600)

for i in range(5):
    multiprocessing.Process(target=startGrpcServer).start()

Option 2:

Run a single gRPC Python server, but offload all CPU-intensive work to a multiprocessing.Pool started before you create the gRPC server.

This would look like the following:

import multiprocessing
from concurrent import futures

import grpc

# Create the pool *before* the gRPC server so that fork() happens first.
pool = multiprocessing.Pool(processes=4)

# The implementation of your RPC method (inside your servicer class). This
# runs in the main process, but the work is done asynchronously by the
# pool of pre-forked workers.
def rpcMethod(self, request, context):
    result = pool.apply_async(someExpensiveFunction, (request,))
    return result.get(timeout=1)

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
server.add_insecure_port('[::]:50051')
server.start()

It’s worth noting that our manylinux wheels do not support SO_REUSEPORT, so for the moment, if you want to take advantage of this feature on Linux, you’ll need to compile from source. You can do this by installing with pip install grpcio --no-binary grpcio.
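
If your build does support it, SO_REUSEPORT can also be requested explicitly through the grpc.so_reuseport channel argument; a minimal sketch, assuming a grpcio build compiled with SO_REUSEPORT support (where it is already on by default):

from concurrent import futures

import grpc

# 'grpc.so_reuseport' maps to GRPC_ARG_ALLOW_REUSEPORT; setting it to 1
# only takes effect if the underlying grpcio build supports SO_REUSEPORT.
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    options=(('grpc.so_reuseport', 1),))
server.add_insecure_port('[::]:50051')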

Based on more testing, I’m now convinced that pre-forking via subprocess pools isn’t a viable method for handling this (too many failure scenarios to deal with). Here’s a summary of techniques that can work, along with their pitfalls:

Pools (including multiprocessing.Pool, billiard.Pool):

  • Set up the pool before the gRPC server is started (pre-fork).
  • Must ensure that no subprocesses crash or stop, otherwise the pool becomes broken.
  • Must ensure that no subprocesses are restarted, otherwise the main thread segfaults.

Processes w/ SO_REUSEPORT:

  • Set up processes; each process runs its own gRPC server bound to the same port w/ SO_REUSEPORT (pre-fork).
  • SO_REUSEPORT uses IP/port hashing for load balancing and will lead to uneven task distribution if a single client sends many requests.
  • SO_REUSEPORT may lead to uneven task distribution when used with a frontend proxy/load balancer (e.g. Envoy) (untested).
  • RPC queue not shared across processes.

Processes w/o SO_REUSEPORT (sketched below):

  • Set up processes; each process runs its own gRPC server bound to a unique port (pre-fork).
  • Requires a local load balancer to distribute tasks.
  • RPC queue not shared across processes.
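
A minimal sketch of the unique-port variant (the port range and worker count here are hypothetical, and servicer registration is elided; a local load balancer would then be pointed at these ports):

import multiprocessing
import time
from concurrent import futures

import grpc

BASE_PORT = 50051  # hypothetical; your load balancer must know these ports

def serve(port):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    # Register your servicer(s) here before starting the server.
    server.add_insecure_port('[::]:%d' % port)
    server.start()
    while True:  # start() does not block; keep the process alive
        time.sleep(3600)

# Pre-fork: create the server processes before any gRPC server exists
# in this (parent) process.
for i in range(4):
    multiprocessing.Process(target=serve, args=(BASE_PORT + i,)).start()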

I’d like to share this as a proposed workaround for this problem and seek feedback on this technique. Here’s a proof-of-concept for how I’ve got this working (warning, some pseudo-code is present here):

import time
from concurrent import futures
from concurrent.futures import TimeoutError

import grpc
import pebble

import widget_pb2
import widget_pb2_grpc

def make_widget(i):
    return i * 2

class WidgetServicer(widget_pb2_grpc.WidgetServicer):
    def __init__(self, pool):
        self.pool = pool

    def WidgetRequest(self, request, context):
        response = widget_pb2.Response()
        # Bound each task by the RPC's remaining deadline.
        future = self.pool.schedule(make_widget, args=[request.Value],
                                    timeout=context.time_remaining())

        try:
            response.Widget = future.result()
        except TimeoutError:
            context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, 'Timeout')  # this handles process timeout
        except pebble.common.ProcessExpired:
            context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, 'Abnormal termination')  # this handles process crashes

        return response

def main():
    processes = 4  # keep this equal to the executor's worker count (see below)
    tasks = 100    # recycle each worker process after this many tasks
    pool = pebble.ProcessPool(max_workers=processes, max_tasks=tasks)
    executor = futures.ThreadPoolExecutor(max_workers=processes)
    server = grpc.server(executor)
    widget_pb2_grpc.add_WidgetServicer_to_server(WidgetServicer(pool), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    while True:  # start() does not block; keep the main thread alive
        time.sleep(3600)

if __name__ == '__main__':
    main()

Here’s how I think this works: when you set both the ProcessPool and the ThreadPoolExecutor to the same worker count, the gRPC server can only feed requests to as many child processes as are available. If you were to increase the ThreadPoolExecutor count, requests would queue in the ProcessPool, creating a backlog of work that might never be delivered on. By keeping the counts equal, the gRPC server maintains control over the life of each request and never queues requests in the ProcessPool; if the number of in-flight requests exhausts the available processes, the client eventually gets a timeout.

There’s also some error checking for timeouts and process crashes in the ProcessPool.
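
A complementary knob, assuming your grpcio version accepts the maximum_concurrent_rpcs argument to grpc.server, is to have the server itself reject excess in-flight RPCs rather than letting them queue:

from concurrent import futures

import grpc

processes = 4  # hypothetical worker count, matching the pool size above

# Cap in-flight RPCs at the worker count so excess requests fail fast
# with RESOURCE_EXHAUSTED instead of queuing behind a busy ProcessPool.
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=processes),
    maximum_concurrent_rpcs=processes)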

[PYTHON] @ericgribkoff, I have run into a use case where I am using gRPC for a server-side CPU-bound task (and need to use Python multiprocessing, as there isn’t an alternative in pure Python). Can you please suggest a solution here, as I see no server-side support for fork?

I am facing grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.INTERNAL, Received RST_STREAM with error code 2)>

@evanj How did you deal with https://github.com/grpc/grpc/issues/15334#issuecomment-397048855 ?