beam: [Bug]: Python SDK gets stuck when using Unbounded PCollection in streaming mode on GroupByKey after ReadFromKafka on DirectRunner, FlinkRunner and DataflowRunner
What happened?
Consider the trivial example pipeline below:
"""
Reproduce the KafkaIO + Unbounded source + streaming mode bug.
"""
import logging
import os
import apache_beam as beam
from apache_beam.io.external import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window
logging.getLogger().setLevel(logging.DEBUG)
def kafka_consumer_config():
"""
Returns config for the KafkaIO source.
"""
return {
"bootstrap.servers": os.getenv("KAFKA_BROKER_URL"),
"auto.offset.reset": "latest",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"group.id": os.getenv("KAFKA_GROUP_ID"),
"enable.auto.commit": "true",
"sasl.jaas.config": f"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{os.getenv('KAFKA_SASL_USERNAME')}\" password=\"{os.getenv('KAFKA_SASL_PASSWORD')}\";",
}
with beam.Pipeline(options=PipelineOptions()) as pipeline:
_ = (
pipeline
| "Read from kafka" >> kafka.ReadFromKafka(
kafka_consumer_config(),
[os.getenv("KAFKA_TOPIC")])
| "Fixed window 5s" >> beam.WindowInto(window.FixedWindows(5))
| "Group by key" >> beam.GroupByKey()
| "Print" >> beam.Map(lambda t: logging.warning("%s - %s", t[0], t[1]))
)
When this pipeline is run in at least these three environments:
- DataflowRunner (streaming mode)
- FlinkRunner (streaming mode, locally, not on a cluster; haven't tested on a cluster)
- DirectRunner (streaming mode)
all of them get stuck on the `GroupByKey` PTransform. The trigger apparently never fires, though it is impossible to tell from the logging I get.
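For reference, a minimal sketch of one way to enable streaming mode explicitly through `PipelineOptions`; the runner name here is illustrative, and the exact invocation used above is not shown:

```python
# Minimal sketch, not taken from the report: enabling streaming mode explicitly
# instead of relying on command-line flags. The runner choice is illustrative.
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions(runner="DirectRunner")  # or FlinkRunner / DataflowRunner
options.view_as(StandardOptions).streaming = True

# The pipeline above would then be constructed with these options:
# with beam.Pipeline(options=options) as pipeline: ...
```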
When adding `max_num_records` to the `ReadFromKafka` step, effectively turning the source collection into a bounded collection, the pipeline works, both in batch and streaming mode, in all of the environments listed above.
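For illustration, a hedged sketch of that workaround; `max_num_records=1000` is an arbitrary value, not taken from the report:

```python
# Drop-in replacement for the "Read from kafka" step above; bounding the read
# makes the downstream window + GroupByKey fire. The record count is illustrative.
kafka.ReadFromKafka(
    kafka_consumer_config(),
    [os.getenv("KAFKA_TOPIC")],
    max_num_records=1000,
)
```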
Data is timestamped in Kafka using processing time, although it is unclear from the documentation whether the KafkaIO adapter in Beam automatically timestamps the entries in the source PCollection it generates. I have also tried timestamping them manually using `with_metadata` and the `msg.timestamp` property it returns, to no avail.
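For completeness, a hedged sketch of that manual timestamping attempt, assuming `with_metadata=True` yields records exposing `key`, `value` and a millisecond `timestamp` attribute (the exact metadata schema may vary between SDK versions):

```python
# Sketch of re-stamping elements with the Kafka record timestamp before windowing.
# Assumes the metadata record exposes .key, .value and .timestamp (epoch millis).
def stamp_with_kafka_time(record):
    return window.TimestampedValue((record.key, record.value),
                                   record.timestamp / 1000.0)

# ... | kafka.ReadFromKafka(kafka_consumer_config(),
#                           [os.getenv("KAFKA_TOPIC")],
#                           with_metadata=True)
#     | beam.Map(stamp_with_kafka_time)
#     | beam.WindowInto(window.FixedWindows(5))
#     | ...
```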
Looking at the Beam test suite, I see that the `ReadFromKafka` PTransform is only tested without windowing and without grouping in Python. Should such a test maybe be added?
This impacts all Python workloads reading from Kafka, and it seems rather surprising that no one else has run into it yet.
Issue Priority
Priority: 3
Issue Component
Component: io-java-kafka
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 31 (23 by maintainers)
tyvm to @damccorm for the idea