google-cloud-python: Can't use/pickle datastore Clients on google dataflow (apache beam)

I am attempting to use datastore.Client() from within a Google Cloud Dataflow (Apache Beam) pipeline.

Beam pickles the objects passed to processing stages (whether captured lexically or passed as arguments), but unfortunately the Client is not pickleable:

  File "lib/apache_beam/transforms/ptransform.py", line 474, in __init__
    self.args = pickler.loads(pickler.dumps(self.args))
  File "lib/apache_beam/internal/pickler.py", line 212, in loads
    return dill.loads(s)
  File "/Users/me/Library/Python/2.7/lib/python/site-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/Users/me/Library/Python/2.7/lib/python/site-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/local/Cellar/python/2.7.12_1/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/usr/local/Cellar/python/2.7.12_1/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1089, in load_newobj
    obj = cls.__new__(cls, *args)
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 35, in grpc._cython.cygrpc.Channel.__cinit__ (src/python/grpcio/grpc/_cython/cygrpc.c:4022)
TypeError: __cinit__() takes at least 2 positional arguments (0 given)

I believe the correct fix is to discard the Connection when serializing, and rebuild it when deserialized.
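As a rough illustration of what "discard on serialize, rebuild on deserialize" could look like, here is a minimal sketch using Python's `__getstate__`/`__setstate__` hooks. `FakeConnection` and `PicklableClient` are hypothetical stand-ins, not classes from google-cloud-datastore, and the lock simply plays the role of the unpicklable gRPC channel.

```python
import pickle
import threading


class FakeConnection(object):
    """Hypothetical stand-in for a gRPC-backed connection."""

    def __init__(self, project):
        self.project = project
        self._lock = threading.Lock()  # locks cannot be pickled


class PicklableClient(object):
    """Hypothetical client that drops its connection when pickled."""

    def __init__(self, project):
        self.project = project
        self._connection = FakeConnection(project)

    def __getstate__(self):
        # Copy the instance dict and discard the live connection.
        state = self.__dict__.copy()
        state.pop("_connection", None)
        return state

    def __setstate__(self, state):
        # Restore plain attributes, then rebuild the connection from them.
        self.__dict__.update(state)
        self._connection = FakeConnection(self.project)


original = PicklableClient("my-project")
restored = pickle.loads(pickle.dumps(original))
```

The restored object carries the same configuration but a freshly built connection, so nothing tied to the original process crosses the pickle boundary.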

I could recreate the Client inside each processing stage, but that can cause O(records) Client creations. In my testing, I see the following printed on each creation:

DEBUG:google_auth_httplib2:Making request: POST https://accounts.google.com/o/oauth2/token

so I imagine Google SRE would really prefer we not do this O(N) times.

This is a tricky cross-team interaction issue (it only occurs for those pickling Clients; in my case, google-cloud-datastore with apache-beam on google-dataflow), so I'm not sure of the proper place to file this. I've cross-posted it to the Apache Beam JIRA as well (https://issues.apache.org/jira/browse/BEAM-1788), though the issue is in the Google Cloud Datastore code.

Mac 10.12.3, Python 2.7.12, google-cloud-dataflow 0.23.0

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 29 (13 by maintainers)

Most upvoted comments

@mikelambert The correct and idiomatic solution is to build client creation into your DoFn:

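import apache_beam as beam
from google.cloud import datastore

class MyDoFn(beam.DoFn):

  def start_bundle(self, process_context):
    # Called once per bundle, so the client is built per bundle, not per record.
    self._dsclient = datastore.Client()

  def process(self, context, *args, **kwargs):
    # do stuff with self._dsclient
    pass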

No change is needed here from the client library team. This is a very standard pattern in Beam. It will not result in an O(records) cost, only an O(shards) cost, which the Beam runner will likely factor in when deciding how large to make bundles.
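The O(records) versus O(shards) difference can be illustrated with a toy simulation. This is not Beam code: `CountingClient`, `PerBundleFn`, and `run_bundles` are hypothetical names that only mimic the lifecycle described above, where `start_bundle` runs once per bundle and `process` runs once per record.

```python
class CountingClient(object):
    """Hypothetical stand-in for datastore.Client that counts constructions."""

    created = 0

    def __init__(self):
        CountingClient.created += 1


class PerBundleFn(object):
    """Mimics the DoFn lifecycle: one client per bundle, reused per record."""

    def start_bundle(self):
        self._client = CountingClient()

    def process(self, record):
        # A real DoFn would use self._client here; this one echoes the record.
        return record


def run_bundles(fn, bundles):
    """Toy runner: start_bundle once per bundle, process once per record."""
    results = []
    for bundle in bundles:
        fn.start_bundle()
        for record in bundle:
            results.append(fn.process(record))
    return results


# 1000 records split into 2 bundles.
bundles = [list(range(0, 500)), list(range(500, 1000))]
results = run_bundles(PerBundleFn(), bundles)
clients_created = CountingClient.created
```

Here the number of clients created tracks the number of bundles rather than the number of records, which is the cost model the comment describes.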

I do, however, have a proposal to allow creation of beam.DoFns from Python generators (see https://github.com/elibixby/incubator-beam/pull/1), which may give you slightly nicer syntax.

Still, the team is very justified in not reopening.

EDIT: Also @mikelambert RE serialization, Beam allows you to write your own “coders” to control the serialization method. This would still be a better solution than changing the client to be pickleable, as you could serialize ONLY the information that is unique to your clients, rather than the entire client.
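To make "serialize ONLY the information that is unique to your clients" concrete, here is a hedged, standard-library sketch of a coder-shaped class. `ClientConfig` and `ClientConfigCoder` are hypothetical; in a real pipeline the coder would presumably subclass `apache_beam.coders.Coder`, which defines `encode` and `decode`, and the choice of project and namespace as the identifying fields is an assumption.

```python
import json


class ClientConfig(object):
    """Hypothetical holder for the settings that identify a client."""

    def __init__(self, project, namespace=None):
        self.project = project
        self.namespace = namespace


class ClientConfigCoder(object):
    """Coder-shaped class: turns the config into bytes and back."""

    def encode(self, value):
        # Only the identifying fields are serialized, never a live connection.
        payload = {"project": value.project, "namespace": value.namespace}
        return json.dumps(payload, sort_keys=True).encode("utf-8")

    def decode(self, encoded):
        payload = json.loads(encoded.decode("utf-8"))
        return ClientConfig(payload["project"], payload["namespace"])


coder = ClientConfigCoder()
encoded = coder.encode(ClientConfig("my-project", "staging"))
decoded = coder.decode(encoded)
```

The receiving side would then construct its own client from the decoded settings, so credentials and channels are never shipped between workers.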

Hi, I just wanted to post here because this answer was hugely useful for me as well, and I did not find this in any of the regular Apache Beam tutorials. In fact, I first saw the “start_bundle” method mentioned here, and then had to look into the source code to see how it works. Since it seems to be quite an important method, it would be nice to have it mentioned more explicitly in the docs.

In case somebody ends up here with a problem similar to mine: I believe this happens when using multiple CPU cores through multiprocessing rather than threads. For example, using multiprocessing.pool.ThreadPool works, but multiprocessing.Pool throws this error (in my case, for BigQuery).

So make sure everything runs in a single process? (I’m taking a shot in the dark here, but using Pool caused this exception for me and ThreadPool resolved it.)
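A likely explanation for the difference is that a process pool has to pickle the callable and its arguments to send them to worker processes, while a thread pool shares memory and pickles nothing. The sketch below shows this with a hypothetical `LockedClient`, whose lock stands in for the unpicklable channel inside a real client.

```python
import pickle
import threading
from multiprocessing.pool import ThreadPool


class LockedClient(object):
    """Hypothetical client holding an unpicklable resource (a lock)."""

    def __init__(self):
        self._lock = threading.Lock()

    def fetch(self, key):
        with self._lock:
            return key * 2


client = LockedClient()

# Threads share memory, so the bound method is called directly, no pickling.
pool = ThreadPool(2)
try:
    thread_results = pool.map(client.fetch, [1, 2, 3])
finally:
    pool.close()
    pool.join()

# A process pool would first have to pickle the client, which fails here.
try:
    pickle.dumps(client)
    picklable = True
except (TypeError, pickle.PicklingError):
    picklable = False
```

If separate processes are needed, creating the client inside each worker (for example in a pool initializer) avoids sending it across the process boundary.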