iceberg: Unable to merge CDC data into snapshot data. java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long

Apache Iceberg version

1.3.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

  1. I created an Iceberg table from an existing Parquet file.
  2. I am then trying to merge CDC data into it, and I get the error below.

–> I am not sure whether this issue is caused by bad data. If so, how do I debug it?

emr: 6.9.0 spark: 3.3.0 scala: 2_12

spark shell command

spark-shell \
  --driver-memory 1g --executor-memory 1g --executor-cores 1 --driver-cores 1 \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.minPartitionNum=1 \
  --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
  --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
  --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
  --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
  --conf spark.yarn.maxAppAttempts=1 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --files /home/hadoop/jars/log4j2.properties \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.local.type=hadoop \
  --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
  --conf "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2" \
  --name ravic \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1 \
  --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar

Exception:

Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 2711) (ip-172-25-26-205.prod.phenom.local executor 23): java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long
	at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
	at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
	at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:95)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_32_62$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.addMetadataColumnsIfNeeded(FileScanRDD.scala:291)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:318)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:184)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:138)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:

code

  val sess = Application.spark();
    val snapshotDf = sess.read.parquet("s3://bucket/snapshots2/ge11-partitioned/")
    // val _snapshotDf = snapshotDf.sortWithinPartitions("__created_date_")
    snapshotDf.createOrReplaceTempView("snapshot")
    sess.sql(""" CREATE TABLE ge112 
                  USING iceberg 
                  TBLPROPERTIES ('key'='_id.oid') 
                  location 's3://bucket/snapshots2/ge11-ice2/'  
                  as 
                    select * from snapshot """)
    sess.sql(""" alter table ge112 add partition field __created_date_  """)             

    // val cdcSchema1 = SparkUtils.getSchema("s3://bucket/schemas/GE11GLOBAL_candidates-CandidatesList.json")
    val cdcDf = sess.read.schema(snapshotDf.schema).json("s3://bucket/inputs2/ge11-partitioned/")
    cdcDf.createOrReplaceTempView("cdc")
    val _cdcDf =sess.sql(""" select * 
                             from cdc c1 
                             where cdc_pk in ( 
                                                select max(cdc_pk) 
                                                from cdc c2 
                                                where _id.oid is not null 
                                                  and _id.oid !='' 
                                                  and c2.__created_date_=c1.__created_date_  
                                                group by _id.oid)  """)
    _cdcDf.createOrReplaceTempView("_cdc") // registerTempTable is deprecated; this call is equivalent
    sess.sql(""" MERGE INTO ge112 t 
                using ( 
                        select * 
                        from _cdc )u 
                on t._id.oid = u._id.oid 
                  when matched then update set * 
                  when not matched then insert * """)
    
  }

About this issue

  • State: open
  • Created 10 months ago
  • Comments: 28 (9 by maintainers)

Most upvoted comments

As @RussellSpitzer mentioned above, I would try to look at the schemas produced by the DataFrames.
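One way to follow this suggestion is to compare the two schemas column by column. The sketch below is only an illustration: `flattenSchema` and `schemaMismatches` are hypothetical helper names, not part of Spark or Iceberg, and a mismatch found this way is not confirmed to be the cause of the exception in this issue.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper: flatten a schema into (dotted path, type name) pairs,
// in declaration order. Only nested structs are expanded; arrays and maps
// are reported as a single column.
def flattenSchema(schema: StructType, prefix: String = ""): Seq[(String, String)] =
  schema.fields.toSeq.flatMap { f =>
    val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
    f.dataType match {
      case s: StructType => flattenSchema(s, path)
      case other         => Seq(path -> other.simpleString)
    }
  }

// Hypothetical helper: list every position where the name or the type differs.
def schemaMismatches(left: StructType, right: StructType): Seq[String] = {
  val missing = ("<missing>", "-")
  flattenSchema(left)
    .zipAll(flattenSchema(right), missing, missing)
    .zipWithIndex
    .collect { case ((a, b), i) if a != b =>
      s"position $i: ${a._1}:${a._2} vs ${b._1}:${b._2}"
    }
}
```

With the names from the report, this could be called as `schemaMismatches(sess.table("ge112").schema, _cdcDf.schema).foreach(println)`. The comparison is positional on purpose, because a string value read where a long is expected may point to a column at an unexpected ordinal.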

I’ve been discussing this offline with @sabyasachinandy and @harshith-bolar-rapido. I can reproduce the issue using the minimal reproduction steps at https://github.com/apache/iceberg/issues/8333#issuecomment-1854204500, but only on Spark 3.3.0; I cannot reproduce it on Spark 3.3.4 or later.

@harshith-bolar-rapido confirmed that he cannot repro on 3.3.4 either.

So something changed in the patch releases between 3.3.0 and 3.3.4.

I’d encourage folks on 3.3.0 to upgrade to 3.3.4 or higher if possible. I’ll leave this open for a bit in case other folks hit this on higher Spark versions.
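To check whether a running session is on an affected version, something like the following sketch may help. `parseVersion` and `isAtLeast` are hypothetical helpers; the pattern assumes the version string starts with `major.minor.patch` and ignores any vendor suffix that may follow.

```scala
// Matches "3.3.0" as well as strings with a trailing suffix after the patch number.
val VersionPattern = """^(\d+)\.(\d+)\.(\d+).*""".r

// Hypothetical helper: extract (major, minor, patch), or None if the string does not match.
def parseVersion(v: String): Option[(Int, Int, Int)] = v match {
  case VersionPattern(major, minor, patch) => Some((major.toInt, minor.toInt, patch.toInt))
  case _                                   => None
}

// Hypothetical helper: true only if the version parses and is >= min.
def isAtLeast(version: String, min: (Int, Int, Int)): Boolean =
  parseVersion(version).exists(v => Ordering[(Int, Int, Int)].gteq(v, min))
```

In the spark-shell from the report, `isAtLeast(sess.version, (3, 3, 4))` would return `false` on Spark 3.3.0, the version where the issue was reproduced.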

@chandu-1101, thanks for the welcome. I am sharing two screenshots. The first simulates, in a DBeaver session connected to Spark, the operations that dbt executes internally: dbt creates a temporary view from a select over the table where we look for new data. Then, when it tries to merge the new data into the destination table, we get the cast exception.

In the second screenshot, we replace the create temporary view with a create table statement; this avoids the exception, and the merge operation works fine.
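Applied to the Scala code from the report, the same workaround might look like the sketch below. It assumes the behaviour described for the dbt case carries over, which has not been confirmed for the original report; `mergeSql`, `mergeViaStagingTable`, and the staging table name are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Build the MERGE statement with the same join key and clauses as in the report.
def mergeSql(target: String, source: String): String =
  s"""MERGE INTO $target t
     |USING $source u
     |ON t._id.oid = u._id.oid
     |WHEN MATCHED THEN UPDATE SET *
     |WHEN NOT MATCHED THEN INSERT *""".stripMargin

// Write the CDC rows to a real table first, then merge from that table
// instead of from a temporary view.
def mergeViaStagingTable(sess: SparkSession, cdc: DataFrame,
                         target: String, staging: String): Unit = {
  cdc.writeTo(staging).using("iceberg").createOrReplace()
  sess.sql(mergeSql(target, staging))
}
```

A call such as `mergeViaStagingTable(sess, _cdcDf, "ge112", "cdc_staging")` would replace the `_cdc` temporary view and the final `MERGE INTO` from the report; the staging table costs an extra write and needs to be dropped afterwards.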

FYI: if you use an insert/overwrite instead, it works.