kafka-spark-consumer: Offset is still updated when exception occurs during processing
Hi,
I have a problem when trying to simulate an error during RDD processing. I do it by throwing an exception, as in the following (simplified) snippet:
```scala
val tmp_stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.fromString(storageLevelReceiver))
val partitionOffset_stream = ProcessedOffsetManager.getPartitionOffset(tmp_stream, props)

tmp_stream.foreachRDD(rdd => {
  Thread.sleep(5000)
  // simulate a failure during batch processing
  throw new Exception("error")
})

ProcessedOffsetManager.persists(partitionOffset_stream, props)
```
I run the Spark job in YARN cluster mode.
The problem is that the RDD stage fails, but the offset persist is still executed. Then the driver is restarted. This results in message loss, since the messages of the failed batch are skipped.
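One idea I am considering as a workaround (just a sketch building on the snippet above, and I have not verified that it actually keeps the persist from running for the failed batch) is to catch the failure inside foreachRDD and stop the streaming context right away, so the driver fails fast instead of going on to commit the offsets:

```scala
import scala.util.control.NonFatal

tmp_stream.foreachRDD(rdd => {
  try {
    // real processing would go here; the sleep + throw just simulate a failure
    Thread.sleep(5000)
    throw new Exception("error")
  } catch {
    case NonFatal(e) =>
      // stop the StreamingContext (and the SparkContext) immediately so the
      // driver does not carry on to the offset-commit job for this batch
      ssc.stop(stopSparkContext = true, stopGracefully = false)
      throw e
  }
})

ProcessedOffsetManager.persists(partitionOffset_stream, props)
```

Whether this is actually enough to keep ProcessedOffsetManager.persists from committing the offsets of the failed batch is exactly what I am unsure about.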
Versions: Scala 2.11, Spark 2.3.0, Kafka 0.10.2, kafka-spark-consumer 1.0.17
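For completeness, the corresponding dependencies in my build.sbt look roughly like this (coordinates written from memory, so please double-check them):

```scala
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Spark Streaming, provided by the YARN cluster at runtime
  "org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
  // the receiver-based consumer from this repo, published under groupId "dibbhatt"
  "dibbhatt" % "kafka-spark-consumer" % "1.0.17"
)
```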
Any idea why this is happening?
About this issue
- Original URL
- State: closed
- Created 5 years ago
- Comments: 28 (15 by maintainers)
You're welcome @dibbhatt. I did some tests last Friday and it looks OK. I may do more testing this week.