hudi: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException


Describe the problem you faced

We are using Deltastreamer with a PostgresDebeziumSource to consume data from Confluent Kafka, using Confluent Schema Registry as the schema provider. The job runs fine for some time and then suddenly fails with a java.lang.NullPointerException.

We believe this is caused by Kafka messages with empty/null values, such as Debezium tombstone records (see "About tombstone records" in the Debezium documentation). We do not currently have the ability to modify the Debezium connectors to turn off tombstone emission.

We are looking for a way to have Deltastreamer ignore/skip tombstone Kafka messages, i.e. messages whose value is null.
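
For illustration, here is a minimal sketch of the kind of filtering we have in mind, written against a plain Kafka consumer rather than Hudi/Deltastreamer internals (the broker address, group id, and topic below are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SkipTombstonesSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "tombstone-skip-demo");     // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some.topic"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // A Debezium tombstone has a key but a null value; passing the null
                // payload on to Avro deserialization is what we want to avoid.
                if (record.value() == null) {
                    continue; // skip the tombstone
                }
                // ... deserialize and process the non-null payload here
            }
        }
    }
}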

Thanks for any input!

To Reproduce

Steps to reproduce the behavior:

  1. Use the Debezium Kafka connector to publish data from a Postgres server to Kafka.
  2. Use PostgresDebeziumSource with Confluent Schema Registry to consume the data.
  3. The job runs fine for a while and writes the data to files in storage.
  4. Run a delete operation on the Postgres DB so that the connector emits a tombstone record (the connector-side setting involved is sketched after this list).
  5. After some time the job fails with a NullPointerException that gives little detail.
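
For reference, this is the connector-side setting that controls tombstone emission; it is the option we are not able to change on our Debezium Postgres connectors (values are illustrative, not our actual connector config):

connector.class=io.debezium.connector.postgresql.PostgresConnector
# defaults to true; false would stop the connector from emitting tombstone records after deletes
tombstones.on.delete=false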

Expected behavior

The job should handle Kafka messages with empty/null values without failing.

Environment Description

  • Hudi version : 0.13.0

  • Spark version : 3.1

  • Hive version : N/A

  • Hadoop version : N/A

  • Storage (HDFS/S3/GCS…) : S3

  • Running on Docker? (yes/no) : both

Additional context

hoodie configs:

--target-base-path s3a://{{ bucket }}/{{ table_path }}
--target-table {{ table_name }}
--continuous
--props gs://path/to/tablename.properties
--min-sync-interval-seconds 15
--source-ordering-field updated_at
--source-limit 5000
--table-type COPY_ON_WRITE
--op UPSERT
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
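
For completeness, a rough sketch of how options like the ones above are typically passed to Deltastreamer via spark-submit (the jar path, bundle version, and master are placeholders and may not match our deployment):

spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --master yarn \
  /path/to/hudi-utilities-bundle_2.12-0.13.0.jar \
  --target-base-path s3a://{{ bucket }}/{{ table_path }} \
  --target-table {{ table_name }} \
  --props gs://path/to/tablename.properties \
  --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
  --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload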

tablename.properties

hoodie.deltastreamer.schemaprovider.registry.url={{ schema_url }}.confluent.cloud/subjects/{{ topic }}-value/versions/latest
hoodie.deltastreamer.source.kafka.topic=some.topic
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=inserted_at
hoodie.datasource.write.precombine.field=updated_at
schema.registry.url={{ schema_url }}
schema.registry.basic.auth.user.info={{ schema_user }}:{{ schema_key }}
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ kafka_user }}' password='{{ kafka_key }}';
bootstrap.servers={{ bootstrap_server }}
hoodie.embed.timeline.server=false
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
group.id=hudi-deltastreamer
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
basic.auth.credentials.source=USER_INFO
heartbeat.interval.ms=5000
session.timeout.ms=120000
request.timeout.ms=900000
retry.backoff.ms=500
hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true
max.rounds.without.new.data.to.shutdown=5
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.client.heartbeat.interval_in_ms=120000
hoodie.client.heartbeat.tolerable.misses=10
hoodie.write.lock.client.wait_time_ms_between_retry=1000
hoodie.write.lock.max_wait_time_ms_between_retry=1000
hoodie.write.lock.wait_time_ms_between_retry=500
hoodie.write.lock.wait_time_ms=5000
hoodie.write.lock.client.num_retries=10
hoodie.metadata.enable=false
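
As a sanity check on the key generator settings above, a small standalone sketch (the epoch-millis value and UTC timezone are assumptions, not taken from our data) of the yyyy/MM/dd partition path we expect TimestampBasedKeyGenerator to derive from an EPOCHMILLISECONDS inserted_at value:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class PartitionPathPreview {
    public static void main(String[] args) {
        long insertedAt = 1696118400000L; // hypothetical epoch-millis inserted_at value
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // actual output may also depend on the key generator's timezone settings
        // With timestamp.type=EPOCHMILLISECONDS and output.dateformat=yyyy/MM/dd,
        // the partition path derived from inserted_at should look like this:
        System.out.println(fmt.format(new Date(insertedAt))); // 2023/10/01
    }
}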

Stacktrace

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 14) (10.253.229.42 executor 1): java.lang.NullPointerException
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2244)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
	at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
	at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
	at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:177)
	at org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:142)
	at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
	at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
	at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:54)
	at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:36)
	at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
	... 24 more
Caused by: java.lang.NullPointerException

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Comments: 29 (8 by maintainers)

Most upvoted comments

@Sam-Serpoosh Hi, Debezium does not do any serialization. It just prepares a data structure described by a Kafka Connect schema. The serialization itself is done by the Avro converter provided by Confluent. Debezium is unable to influence the serialization in any way.
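
To illustrate the point above: in a typical Kafka Connect deployment the Avro serialization is configured through the Confluent converter on the connector/worker, not inside Debezium itself, for example (illustrative values only):

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://{{ schema_url }}
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info={{ schema_user }}:{{ schema_key }}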