beam: [Bug]: Python SDK gets stuck when using Unbounded PCollection in streaming mode on GroupByKey after ReadFromKafka on DirectRunner, FlinkRunner and DataflowRunner

What happened?

Consider the trivial example pipeline below:

"""
Reproduce the KafkaIO + Unbounded source + streaming mode bug.
"""

import logging
import os

import apache_beam as beam
from apache_beam.io.external import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

logging.getLogger().setLevel(logging.DEBUG)

def kafka_consumer_config():
    """
    Returns config for the KafkaIO source.
    """
    return {
        "bootstrap.servers": os.getenv("KAFKA_BROKER_URL"),
        "auto.offset.reset": "latest",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "group.id": os.getenv("KAFKA_GROUP_ID"),
        "enable.auto.commit": "true",
        "sasl.jaas.config": f"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{os.getenv('KAFKA_SASL_USERNAME')}\" password=\"{os.getenv('KAFKA_SASL_PASSWORD')}\";",
    }

with beam.Pipeline(options=PipelineOptions()) as pipeline:
    _ = (
        pipeline
        | "Read from kafka" >> kafka.ReadFromKafka(
            kafka_consumer_config(),
            [os.getenv("KAFKA_TOPIC")])
        | "Fixed window 5s" >> beam.WindowInto(window.FixedWindows(5))
        | "Group by key" >> beam.GroupByKey()
        | "Print" >> beam.Map(lambda t: logging.warning("%s - %s", t[0], t[1]))
    )

I ran this pipeline in at least these three environments:

  • DataflowRunner (streaming mode)
  • FlinkRunner (streaming mode, run locally; not tested on a cluster)
  • DirectRunner (streaming mode)

All of them get stuck on the GroupByKey PTransform. The trigger apparently never fires, although I cannot confirm this from the logging I get.
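For readers less familiar with event-time triggering, the symptom can be pictured with a toy model. This is only a sketch in plain Python, assuming the default trigger (a window's group is emitted once the watermark passes the end of the window). The class ToyGroupByKey and the function window_start are hypothetical names made up for illustration; this is not Beam's implementation and it does not establish the root cause of the bug.

```python
from collections import defaultdict

WINDOW_SIZE = 5  # seconds, mirroring FixedWindows(5) in the pipeline above


def window_start(ts, size=WINDOW_SIZE):
    """Return the start of the fixed window that contains timestamp ts."""
    return ts - (ts % size)


class ToyGroupByKey:
    """Hypothetical, simplified model of a windowed GroupByKey."""

    def __init__(self, size=WINDOW_SIZE):
        self.size = size
        self.buffers = defaultdict(list)  # (key, window start) -> values

    def add(self, key, value, ts):
        # Elements are only buffered here; nothing is emitted on arrival.
        self.buffers[(key, window_start(ts, self.size))].append(value)

    def advance_watermark(self, watermark):
        """Emit and drop every window whose end is at or before the watermark."""
        ready = sorted(k for k in self.buffers if k[1] + self.size <= watermark)
        return [(key, start, self.buffers.pop((key, start))) for key, start in ready]


gbk = ToyGroupByKey()
gbk.add("a", 1, ts=1)
gbk.add("a", 2, ts=3)
gbk.add("a", 3, ts=6)
stalled = gbk.advance_watermark(4)  # watermark has not reached the end of [0, 5)
fired = gbk.advance_watermark(5)    # watermark reaches 5, so window [0, 5) is emitted
```

In this model, if the watermark never advances past the end of a window, the buffered elements are never emitted, which would look like the behaviour described above.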

When I add max_num_records to the ReadFromKafka step, effectively turning the source into a bounded collection, the pipeline works in both batch and streaming mode in all of the environments listed above.
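A minimal sketch of that workaround follows. The helper names kafka_read_kwargs and build_read are hypothetical; consumer_config, topics and max_num_records are keyword arguments of ReadFromKafka, imported here from apache_beam.io.kafka. The Beam import is done lazily only so the argument-building helper can be inspected without Beam installed.

```python
def kafka_read_kwargs(consumer_config, topic, max_num_records=None):
    """Build keyword arguments for ReadFromKafka (hypothetical helper)."""
    kwargs = {"consumer_config": consumer_config, "topics": [topic]}
    if max_num_records is not None:
        # Capping the number of records makes the source bounded.
        kwargs["max_num_records"] = max_num_records
    return kwargs


def build_read(consumer_config, topic, max_num_records=None):
    """Return a ReadFromKafka transform; requires apache_beam to be installed."""
    from apache_beam.io.kafka import ReadFromKafka  # lazy import, see lead-in
    return ReadFromKafka(
        **kafka_read_kwargs(consumer_config, topic, max_num_records))


# Placeholder broker and topic values, for illustration only.
bounded = kafka_read_kwargs({"bootstrap.servers": "localhost:9092"}, "events", 100)
unbounded = kafka_read_kwargs({"bootstrap.servers": "localhost:9092"}, "events")
```

In the pipeline above, build_read(kafka_consumer_config(), os.getenv("KAFKA_TOPIC"), max_num_records=100) would take the place of the kafka.ReadFromKafka(...) call.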

Data is timestamped in Kafka using processing time, although it is unclear from the documentation whether the KafkaIO adapter in Beam automatically timestamps the entries in the source PCollection it generates.

I have also tried timestamping them manually using with_metadata and the msg.timestamp property returned, to no avail.
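The manual timestamping attempt looked roughly like the sketch below. It assumes that records read with with_metadata=True expose key, value and timestamp attributes, and that timestamp is in epoch milliseconds (as Kafka record timestamps are). The function names are hypothetical, and Beam is imported lazily so the conversion helper can be checked on its own.

```python
def kafka_millis_to_seconds(timestamp_ms):
    """Convert a Kafka record timestamp (epoch milliseconds) to seconds."""
    return timestamp_ms / 1000.0


def with_kafka_timestamp(msg):
    """Re-emit a Kafka record as a (key, value) pair with an explicit event time.

    Assumes msg has .key, .value and .timestamp attributes (with_metadata=True).
    """
    from apache_beam.transforms import window  # lazy import, see lead-in
    return window.TimestampedValue(
        (msg.key, msg.value), kafka_millis_to_seconds(msg.timestamp))


seconds = kafka_millis_to_seconds(1500)
```

It would be applied as beam.Map(with_kafka_timestamp) between the ReadFromKafka step (with with_metadata=True) and the WindowInto step.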

Looking at the Beam test suite, the ReadFromKafka PTransform is only tested without windowing and without grouping in Python. Should such a test be added?

This impacts all Python workloads running on Kafka, and it seems rather surprising that no one else has run into this yet.

Issue Priority

Priority: 3

Issue Component

Component: io-java-kafka

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 31 (23 by maintainers)

Most upvoted comments

tyvm to @damccorm for the idea