beam: [Bug]: KinesisIO processing-time watermarking can cause data loss
What happened?
Beam 2.46.0, Java 11, Flink 1.15
According to the docs, FlinkRunner supports the options `parallelism` and `maxParallelism`. When `parallelism` is reduced between restarts with a savepoint, some bundles are never dispatched. This might also be an issue related to window or sink de-serialization when the app is re-started from serialized state.
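For reference, a minimal sketch of how these two options can be set programmatically, assuming the Flink runner artifact is on the classpath (the class name and the omitted pipeline body are illustrative only):

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ParallelismOptionsSketch {
  public static void main(String[] args) {
    // Equivalent to passing --runner=FlinkRunner --parallelism=3 on the command line.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setParallelism(3);
    // maxParallelism was left unset in the original report; it caps Flink's key-group count.
    // options.setMaxParallelism(3);

    Pipeline p = Pipeline.create(options);
    // ... build the job here, then run it.
    p.run().waitUntilFinish();
  }
}
```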
How to reproduce:
- Start a Beam-Flink streaming job with `parallelism=3` and `maxParallelism` not set. The job in the example had a window and a file sink.
- Stop the job with a savepoint.
- Re-start the job from the savepoint with reduced parallelism: some bundles are never dispatched.

If the job is then re-started again from that savepoint with the original `parallelism=3`, the bundles are dispatched and pass through.
Setting `maxParallelism=3` and keeping it across app restarts does not change this behaviour. The behaviour does not occur when `parallelism` is increased: e.g. raising `parallelism` from 1 to 3 between savepoint-ed restarts lets all the data reach the sink.
This is a simplified version of the job code (AWS Kinesis source, 3 shards):
```java
PCollection<KinesisRecord> windowedRecords = p.apply("Source", reader)
    .apply("Fixed windows", Window.<KinesisRecord>into(FixedWindows.of(Duration.standardSeconds(60))))
    // ... (intermediate transforms elided)
    .apply(
        "Sink to S3",
        FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(ConsumedEvent.SCHEMA$)
                .withCompressionCodec(CompressionCodecName.SNAPPY))
            .to(opts.getSinkLocation()));
```
Toy project: https://github.com/psolomin/beam-playground/tree/parallelism-issue/kinesis-io-with-enhanced-fan-out#vanilla-flink
Full code snippet class
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner
About this issue
- Original URL
- State: open
- Created a year ago
- Comments: 17 (17 by maintainers)
@psolomin The mailing list thread is not conclusive, but generally I think we can try to unify the behavior of KafkaIO and KinesisIO; that would be option 2). Would you like to work on the change?
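For context on the processing-time watermarking mentioned in the title, a minimal sketch of the two built-in KinesisIO watermark policies (the aws2 module and the stream name are assumptions; whether switching policies avoids the loss is not established in this thread):

```java
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;

public class WatermarkPolicySketch {
  // "my-stream" is a placeholder; only the watermark-policy calls matter here.
  static KinesisIO.Read processingTimeRead() {
    // Watermarks follow wall-clock time, independent of the records actually read;
    // this is the policy the issue title points at.
    return KinesisIO.read()
        .withStreamName("my-stream")
        .withProcessingTimeWatermarkPolicy();
  }

  static KinesisIO.Read arrivalTimeRead() {
    // Built-in alternative: watermarks follow each record's approximate arrival time.
    return KinesisIO.read()
        .withStreamName("my-stream")
        .withArrivalTimeWatermarkPolicy();
  }
}
```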
Tried. And it still worked without issues: reducing Flink parallelism from 3 to 2, with 3 partitions in the source topic, did not cause record loss. This is how I run it:
My toy project: https://github.com/psolomin/beam-playground/tree/25975/kafka-consumer
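For reference, a minimal sketch of the kind of KafkaIO source such a comparison would use (bootstrap servers and topic name are placeholders; the actual test pipeline lives in the linked toy project):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaComparisonSketch {
  // Sketch of a comparable KafkaIO source; values are placeholders.
  static PCollection<KV<String, String>> readEvents(Pipeline p) {
    return p.apply("Kafka source",
        KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("events")                       // topic with 3 partitions in the test
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata());                       // drop Kafka metadata, keep KV pairs
  }
}
```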
Increasing this to P1 as there’s a risk of data loss.