beam: [Bug]: KinesisIO processing-time watermarking can cause data loss

What happened?

Beam 2.46.0, Java 11, Flink 1.15

According to the docs, the FlinkRunner supports the options parallelism and maxParallelism. When parallelism is reduced between restarts from a savepoint, some bundles are never dispatched.

This might also be an issue related to window or sink deserialization when the app is restarted from serialized state.

How to reproduce:

  1. Start a Beam-Flink streaming job with parallelism=3 and maxParallelism not set. The job in the example had a window and a file sink.
  2. Stop the job with a savepoint.
  3. Re-start the job from the savepoint with reduced parallelism (e.g. parallelism=2). Some records never reach the sink.
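The cycle above can be sketched with the Flink CLI; the job id, savepoint directory, and jar path below are placeholders, not taken from the toy project:

```shell
# Step 1: job is already running with --parallelism=3.

# Step 2: stop the job and write a savepoint to the given directory.
flink stop --savepointPath /mnt/savepoints <job-id>

# Step 3: re-start from that savepoint with reduced parallelism
# (--parallelism here is a Beam pipeline option passed to the jar).
flink run -s /mnt/savepoints/savepoint-<id> \
    /path/to/pipeline-bundled.jar \
    --parallelism=2
```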

If the job is then re-started again from that savepoint with the original parallelism=3, the bundles are dispatched and pass through.

Setting maxParallelism=3 and keeping it across app restarts does not change this behaviour. The issue does not occur when parallelism is increased: e.g. going from parallelism=1 to parallelism=3 between savepoint-ed restarts lets all data reach the sink.
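For context on why changing parallelism redistributes state: Flink assigns keyed state to subtasks via key groups, and maxParallelism fixes the number of key groups, so rescaling moves key groups between subtasks. A simplified sketch follows; the class and method names are illustrative, and the real logic (in Flink's KeyGroupRangeAssignment) uses a murmur hash rather than a plain modulo:

```java
// Simplified sketch of Flink's key-group to subtask mapping.
// Not Flink's API: names and hashing are illustrative.
public class KeyGroupSketch {

    // Flink's default maxParallelism is 128 for small parallelism values.
    static final int DEFAULT_MAX_PARALLELISM = 128;

    // A key is assigned to a key group from its hash, modulo maxParallelism
    // (Flink additionally murmur-hashes the key's hashCode).
    static int keyGroup(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    // Each subtask owns a contiguous range of key groups; this is the
    // same integer arithmetic Flink uses to pick the owning subtask.
    static int operatorIndex(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int max = DEFAULT_MAX_PARALLELISM;
        // With parallelism=3, key group 100 belongs to subtask 100*3/128 = 2.
        System.out.println(operatorIndex(100, 3, max));
        // After rescaling to parallelism=2 it moves to subtask 100*2/128 = 1.
        System.out.println(operatorIndex(100, 2, max));
    }
}
```

On a correct rescale the savepoint's key groups are reassigned like this and every record still has an owner; the bug report suggests some state or bundles are dropped instead when parallelism shrinks.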

This is a simplified job code (AWS Kinesis source, 3 shards):

p.apply("Source", reader)
        .apply("Fixed windows", Window.<KinesisRecord>into(FixedWindows.of(Duration.standardSeconds(60))))
        ...
        .apply(
                "Sink to S3",
                FileIO.<GenericRecord>write()
                        .via(ParquetIO.sink(ConsumedEvent.SCHEMA$)
                                .withCompressionCodec(CompressionCodecName.SNAPPY))
                        .to(opts.getSinkLocation()));

Toy project: https://github.com/psolomin/beam-playground/tree/parallelism-issue/kinesis-io-with-enhanced-fan-out#vanilla-flink


Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Java SDK
  • Component: IO connector
  • Component: Flink Runner

About this issue

  • State: open
  • Created a year ago
  • Comments: 17 (17 by maintainers)

Most upvoted comments

@psolomin The mailing list thread is not conclusive, but generally I think we can try to unify the behavior of KafkaIO and KinesisIO; that would be option 2). Would you like to work on the change?

Can you try setting --experiments=use_deprecated_read for KafkaIO?

Tried it, and it still worked without issues: reducing Flink parallelism from 3 to 2, with 3 partitions in the source topic, did not cause record loss. This is how I ran it:

docker exec -u flink -it kafka-consumer-flink-jm-1 flink run \
    -s file:///mnt/savepoints/savepoint-< savepoint id > \
    --class com.kfk.consumer.FlinkRunnerMain --detached \
    /mnt/artifacts/example-com.kfk.consumer.FlinkRunnerMain-bundled-0.1-SNAPSHOT.jar \
    --bootstrapServers=kafka:9092 --inputTopics=raw,raw2 --outputDir=/mnt/output \
    --autoWatermarkInterval=10000 \
    --externalizedCheckpointsEnabled=true \
    --checkpointingMode=EXACTLY_ONCE \
    --numConcurrentCheckpoints=1 \
    --checkpointTimeoutMillis=500000 \
    --checkpointingInterval=60000 \
    --minPauseBetweenCheckpoints=5000 \
    --stateBackend=rocksdb \
    --stateBackendStoragePath=file:///tmp/flink-state \
    --parallelism=2 \
    --experiments=use_deprecated_read

My toy project: https://github.com/psolomin/beam-playground/tree/25975/kafka-consumer

Increasing this to P1 as there’s a risk of data loss.