beam: [Bug]: [Python SDK] Memory leak in 2.47.0 - 2.51.0 SDKs.

What happened?

We have identified a memory leak that affects Beam Python SDK versions 2.47.0 through 2.51.0. The leak was triggered by an upgrade to protobuf==4.x.x. We root-caused the leak to https://github.com/protocolbuffers/protobuf/issues/14571, and it has been remediated in Beam 2.52.0.

[update: 2023-12-19]: Due to another issue related to the protobuf upgrade, Python streaming users should continue to apply the mitigation steps below with Beam 2.52.0, or switch to Beam 2.53.0 once available.

Mitigation

Until Beam 2.52.0 is released, consider any of the following workarounds:

  • Use apache-beam==2.46.0 or below.

  • Install protobuf 3.x in the submission and runtime environment. For example, you can use a --requirements_file pipeline option with a file that includes:

    protobuf==3.20.3
    grpcio-status==1.48.2
    

    For more information, see: https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

  • Use the pure-Python implementation of protobuf by setting the PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python environment variable in the runtime environment. This might degrade performance, since the Python implementation is less efficient. For example, you could create a custom Beam SDK container from a Dockerfile like the following:

    FROM apache/beam_python3.10_sdk:2.47.0
    ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
    

    For more information, see: https://beam.apache.org/documentation/runtime/environments/

  • Install protobuf==4.25.0 or newer in the submission and runtime environment.

Users of Beam 2.50.0 SDK should additionally follow mitigation options for https://github.com/apache/beam/issues/28318.
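
To see whether any of the mitigations above has actually taken effect in a given environment, the installed protobuf version and active implementation can be inspected. The following is a minimal sketch, not an official Beam or protobuf check: `parse_version`, `leak_suspected`, and `current_environment` are hypothetical helper names, and the rule encoded here (protobuf 4.x older than 4.25.0 with the upb implementation) is only a restatement of the workarounds listed in this issue. It does not cover the separate issues mentioned in the update or in https://github.com/apache/beam/issues/28318.

```python
import re


def parse_version(version):
    """Turn a string such as '4.24.4' into (4, 24, 4).

    Only the leading digits of the first three components are used, so
    '4.25.0rc1' becomes (4, 25, 0).
    """
    parts = []
    for piece in version.split(".")[:3]:
        match = re.match(r"\d+", piece)
        parts.append(int(match.group()) if match else 0)
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def leak_suspected(protobuf_version, implementation):
    """Heuristic from this issue: protobuf 4.x before 4.25.0 using upb."""
    version = parse_version(protobuf_version)
    return implementation == "upb" and (4, 0, 0) <= version < (4, 25, 0)


def current_environment():
    """Return (version, implementation) of the installed protobuf, or None."""
    try:
        import google.protobuf
        from google.protobuf.internal import api_implementation
    except ImportError:
        return None
    return google.protobuf.__version__, api_implementation.Type()


if __name__ == "__main__":
    env = current_environment()
    if env is None:
        print("protobuf is not installed")
    else:
        print(env, "leak suspected:", leak_suspected(*env))
```

Because the leak occurs on the workers, a check like this is only meaningful when run in the runtime environment (for example, inside the SDK container), not just on the machine that submits the job.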

Additional details

The leak can be reproduced with the following pipeline:

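  import apache_beam as beam
  from apache_beam.io import ReadFromPubSub
  from apache_beam.options.pipeline_options import PipelineOptions

  # Assumed setup: streaming mode, which unbounded Pub/Sub reads require.
  pipeline_options = PipelineOptions(streaming=True)

  with beam.Pipeline(options=pipeline_options) as p:
    # Duplicate reads to increase throughput.
    inputs = []
    for i in range(32):
      inputs.append(
          p | f"Read pubsub{i}" >> ReadFromPubSub(
              topic='projects/pubsub-public-data/topics/taxirides-realtime',
              with_attributes=True)
      )

    inputs | beam.Flatten()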

Dataflow pipeline options for the above pipeline: --max_num_workers=1 --autoscaling_algorithm=NONE --worker_machine_type=n2-standard-32

The leak was triggered by Beam switching its default protobuf package version from 3.19.x to 4.22.x in https://github.com/apache/beam/pull/24599. The new versions of protobuf also switched the default protobuf implementation to the upb implementation. The upb implementation had two known leaks that have since been mitigated by the protobuf team in https://github.com/protocolbuffers/protobuf/issues/10088 and https://github.com/protocolbuffers/upb/issues/1243. The latest available protobuf==4.24.4 does not yet have the fix, but we have confirmed that using a patched version built in https://github.com/protocolbuffers/upb/actions/runs/6028136812 fixes the leak.
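
A leak in a native extension such as upb is not visible to Python-level tools like `tracemalloc`, so one rough way to look for it outside of Dataflow is to watch the process's peak resident memory while a protobuf-heavy workload runs repeatedly. The sketch below is an illustration only and is not the method used to diagnose this issue: `peak_rss_kib` and `rss_growth` are hypothetical helper names, and `workload` stands for any callable that exercises protobuf. It relies on the Unix-only `resource` module, where `ru_maxrss` is reported in KiB on Linux and in bytes on macOS.

```python
import resource
import sys


def peak_rss_kib():
    """Peak resident set size of this process, normalized to KiB."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # Linux reports KiB; macOS reports bytes.
    return peak // 1024 if sys.platform == "darwin" else peak


def rss_growth(workload, rounds=5, calls_per_round=1000):
    """Call `workload` repeatedly and sample peak RSS (KiB) after each round."""
    samples = []
    for _ in range(rounds):
        for _ in range(calls_per_round):
            workload()
        samples.append(peak_rss_kib())
    return samples


if __name__ == "__main__":
    # Placeholder workload; replace it with code that parses or builds
    # protobuf messages to exercise the implementation under test.
    print(rss_growth(lambda: bytes(1024)))
```

Peak RSS never decreases, so a steady climb across many rounds only suggests a leak; a plateau after warm-up is the expected shape for a healthy process.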

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

About this issue

  • State: closed
  • Created 10 months ago
  • Comments: 23 (15 by maintainers)

Most upvoted comments

hey @tvalentyn - any luck fixing the mem issue in 2.51.0?

Remaining work: upgrade protobuf lower bound once their fixes are released.

i meant the leak might carry over to 2.51.0 unless i find a fix before the release and CP it.

However, apache beam 2.50.0 depends on protobuf (>=3.20.3,<4.24.0). Is this comment meant to address that?

you should be able to force-install and use the newer version of protobuf without adverse effects in this case, even though it doesn’t fit the restriction.

Beam community produces a release roughly every 6 weeks.

re comment: I was hoping to have a restriction protobuf>=4.24.3, but it is a bit more involved.