hudi: [SUPPORT] Failure to delete records with missing attributes from PostgresDebeziumSource

Describe the problem you faced

Hi there, we are facing an issue where Deltastreamer fails to delete records from a PostgresDebeziumSource.

We are seeing that the delete messages from Debezium/Kafka only carry the primary-key id of the record to be deleted, with all other columns set to default values (null/""/0). Since inserted_at comes through as 0 ms (i.e., 1970-01-01), Deltastreamer tries to delete that ID in the 1970/01/01 partition, cannot find the record, and the commit fails (unless commit-on-errors is used, in which case the delete is ignored and the table retains rows it should not).
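To illustrate the partition mix-up, here is a small Python sketch (not Hudi's actual key-generator code; the yyyy/MM/dd output format and the unit handling are assumptions based on our configuration):

```python
# Illustration only: why a defaulted inserted_at of 0 lands in the
# 1970/01/01 partition. The timestamp-based key generator formats the
# epoch value with a pattern like yyyy/MM/dd; epoch 0 is 1970-01-01 UTC
# regardless of whether the value is in millis or micros.
from datetime import datetime, timezone

def partition_path(epoch_value, unit="millis"):
    divisor = {"seconds": 1, "millis": 1_000, "micros": 1_000_000}[unit]
    ts = datetime.fromtimestamp(epoch_value / divisor, tz=timezone.utc)
    return ts.strftime("%Y/%m/%d")

print(partition_path(0, "millis"))              # 1970/01/01
print(partition_path(0, "micros"))              # 1970/01/01
print(partition_path(1688672165304, "millis"))  # 2023/07/06
```

So any delete event whose before-image defaults inserted_at to 0 will be routed to the 1970/01/01 partition, no matter which partition the record actually lives in.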

One question is whether this PR accounts for the partition column being passed as 0 or is that a separate issue?

It could possibly mean that the upstream Postgres databases are using REPLICA IDENTITY DEFAULT on the source tables, though the behavior here differs slightly from the documentation (the docs suggest the “before” image would contain only the PK column, whereas our “before” has all columns, just with their default values).
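For reference, assuming standard Postgres, the replica identity of a source table can be inspected and changed as follows (table name here is a placeholder); FULL makes delete/update events carry the real old-row values, at a WAL-size cost:

```sql
-- Check current replica identity: d = default (PK only), f = full,
-- n = nothing, i = index
SELECT relname, relreplident FROM pg_class WHERE relname = 'my_table';

-- Make delete/update events carry the full old row (has downsides, see below)
ALTER TABLE my_table REPLICA IDENTITY FULL;
```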

Wondering if anyone else has faced this before, or if there could be a workaround in Deltastreamer that allows it to delete the record without knowing what partition it is from.

We will document any findings during our investigation/experimentation with upstream sources.

Thanks in advance!

Example delete message from Debezium/Kafka:

    {
        "topic_name": "<redacted>",
        "partition": 0,
        "offset": 136251,
        "value": {
            "before": {
                "id": "4b-<redacted>-09",
                "col1": "",
                "col2": "",
                "col3": 0,
                "col4": 0,
                "col5": null,
                "col6": 0,
            "inserted_at": 0
            },
            "after": null,
            "source": {
                "version": "1.9.6.Final",
                "connector": "postgresql",
                "name": "<redacted>",
                "ts_ms": 1688672165304,
                "snapshot": "false",
                "db": "<redacted>",
                "sequence": "[\"121985431416\",\"121985482592\"]",
                "schema": "<redacted>",
                "table": "<redacted>",
                "txId": 45263257,
                "lsn": 121985482592,
                "xmin": null
            },
            "op": "d",
            "ts_ms": 1688672165397,
            "transaction": null
        }
    }

To Reproduce

Steps to reproduce the behavior:

  1. Run Deltastreamer on a PostgresDebeziumSource connected to a Kafka topic, using the timestamp-based key generator
  2. Perform a delete on the Postgres DB and emit the record to Kafka through the Debezium replication slot
  3. Monitor the Deltastreamer logs for this error:
     Error for key:HoodieKey { recordKey=4b-<redacted>-09 partitionPath=1970/01/01} is java.util.NoSuchElementException
  4. The Deltastreamer commit fails and rolls back; it cannot proceed past that message in Kafka
  5. With commit-on-errors enabled, the error is ignored but the record is not deleted

Expected behavior

The delete should be applied to the record in its actual partition, removing the row from the Hudi table.

Environment Description

  • Hudi version : 0.13

  • Spark version : 3.1

  • Hive version : N/A

  • Hadoop version : N/A

  • Storage (HDFS/S3/GCS…) : S3

  • Running on Docker? (yes/no) : yes

Additional context

A previous issue may provide more context (this issue is a continuation after the tombstone/NullPointerException issue was solved).

Stacktrace

23/07/07 17:31:24 ERROR org.apache.hudi.utilities.deltastreamer.DeltaSync: Delta Sync found errors when writing. Errors/Total=2448/4998
23/07/07 17:31:24 ERROR org.apache.hudi.utilities.deltastreamer.DeltaSync: Printing out the top 100 errors
23/07/07 17:31:25 ERROR org.apache.hudi.utilities.deltastreamer.DeltaSync: Global error :
23/07/07 17:31:25 INFO org.apache.hudi.utilities.deltastreamer.DeltaSync: Error for key:HoodieKey { recordKey=<redacted>  partitionPath=1970/01/01} is java.util.NoSuchElementException: No value present in Option
23/07/07 17:31:25 INFO org.apache.hudi.utilities.deltastreamer.DeltaSync: Error for key:HoodieKey { recordKey=<redacted>  partitionPath=1970/01/01} is java.util.NoSuchElementException: No value present in Option
23/07/07 17:31:25 INFO org.apache.hudi.utilities.deltastreamer.DeltaSync: Error for key:HoodieKey { recordKey=<redacted>  partitionPath=1970/01/01} is java.util.NoSuchElementException: No value present in Option
23/07/07 17:31:25 INFO org.apache.hudi.utilities.deltastreamer.DeltaSync: Error for key:HoodieKey { recordKey=<redacted> partitionPath=1970/01/01} is java.util.NoSuchElementException: No value present in Option
<...>
23/07/07 17:31:32 INFO org.apache.hudi.utilities.deltastreamer.DeltaSync: Shutting down embedded timeline server
23/07/07 17:31:32 ERROR org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer: error while running MultiTableDeltaStreamer for table: <redacted>
org.apache.hudi.exception.HoodieException: Commit 20230707173107555 failed and rolled-back !
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:860)

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Comments: 18 (10 by maintainers)

Most upvoted comments

@sydneyhoran @Sam-Serpoosh Does this solution work?

@ad1happy2go Looks like REPLICA IDENTITY FULL is generally discouraged for Postgres (interesting article and SO thread). It would be ideal not to have to change this setting to FULL, to avoid those downsides.

I know Hudi has the limitation on global uniqueness when dealing with partitioned Hudi Tables. So is there any way to make this work with partitioned Hudi Tables without having to set REPLICA IDENTITY to FULL?
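For what it's worth, one workaround sometimes discussed for deletes that arrive without a usable partition value is a global index, which resolves a record key across all partitions instead of only within the incoming partition path. A minimal sketch, assuming Hudi's global Bloom index (configuration keys as I understand them; please verify against the Hudi docs for your version):

```properties
# Use a global index so record keys are resolved across all partitions,
# letting a delete find its record even when the incoming partition is wrong.
hoodie.index.type=GLOBAL_BLOOM
# Controls whether an incoming record with a different partition path moves
# the record to the new partition (true) or is matched to the record in its
# existing partition (false).
hoodie.bloom.index.update.partition.path=false
```

Note that global indexes trade lookup cost for this cross-partition uniqueness, so this is a sketch to evaluate, not a definitive recommendation.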

Hey @ad1happy2go @danny0405 @soumilshah1995, unfortunately with the latest master branch the no-op deletes are still happening and the job is not fully processing our deletes. However, it no longer throws DeltaSync global errors (“Error for key:HoodieKey”), so the commit-on-errors flag no longer appears to be needed with the latest.

Test steps: insert records and confirm they appear in the S3/Redshift table. We receive 3 Kafka messages for each record: a create, an update, and then a delete. The first two arrive with all details populated in the "after" field. The third arrives with only the ID populated in "before" and everything else set to default values, so inserted_at is 0 (the default), and Deltastreamer looks for the record in the 1970/01/01 partition instead of the correct partition.

This is what the first 2 Kafka messages look like for record insertion c & then update u:

Screenshot 2023-07-11 at 9 24 06 PM

And then the u is followed by a d on the record we were testing:

Screenshot 2023-07-11 at 9 24 18 PM

Based on the hoodie.commit file, we see that 5 records were to be deleted from the 1970/01/01 partition (this test was with 5 inserts/deletes):

Screenshot 2023-07-11 at 9 18 41 PM

In this .commit file from another test, you can see that the records were inserted into the current 2023/07/11 partition but the delete operation was on the 1970/01/01 partition:

Screenshot 2023-07-12 at 12 53 44 PM

The records are still present in Redshift after the "d" operation:

Screenshot 2023-07-12 at 12 10 41 PM

One more thing I can think of: we use the timestamp key generator with hoodie.deltastreamer.keygen.timebased.timestamp.type = EPOCHMICROSECONDS. We used a fresh master jar for this test, with one exception: the EPOCHMICROSECONDS timestamp type had to be added to TimestampBasedAvroKeyGenerator.java. I wondered whether the delete logic for a record with no timestamp attached somehow defaults to milliseconds, but that wouldn’t make much difference: the partition value in the incoming record is 0, which maps to 1970/01/01 under either conversion.
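The reasoning in that last sentence is easy to verify (illustrative Python, not Hudi's actual conversion code):

```python
# An epoch value of 0 formats to 1970/01/01 whether it is interpreted as
# milliseconds or microseconds, so a unit mismatch alone cannot explain
# the wrong partition; the defaulted 0 value is the problem.
from datetime import datetime, timezone

def fmt(epoch, divisor):
    ts = datetime.fromtimestamp(epoch / divisor, tz=timezone.utc)
    return ts.strftime("%Y/%m/%d")

assert fmt(0, 1_000) == "1970/01/01"      # as EPOCHMILLISECONDS
assert fmt(0, 1_000_000) == "1970/01/01"  # as EPOCHMICROSECONDS
```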

How can Deltastreamer find out which partition to delete a record from if it's not told the partition timestamp?

Any thoughts on this?