iceberg: Unable to merge CDC data into snapshot data. java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long
Apache Iceberg version
1.3.1 (latest release)
Query engine
Spark
Please describe the bug 🐞
- I created an Iceberg table from an existing Parquet file.
- Then I tried to insert CDC data into it and got the error below.
-> I am not sure whether this issue is caused by bad data. If so, how do I debug it?
EMR: 6.9.0, Spark: 3.3.0, Scala: 2.12
spark shell command
spark-shell --driver-memory 1g --executor-memory 1g --executor-cores 1 --driver-cores 1 --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.coalescePartitions.enabled=true --conf spark.sql.adaptive.coalescePartitions.minPartitionNum=1 --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --conf spark.yarn.maxAppAttempts=1 --conf spark.yarn.submit.waitAppCompletion=false --files /home/hadoop/jars/log4j2.properties --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf spark.sql.catalog.spark_catalog.type=hive --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.local.type=hadoop --conf spark.sql.catalog.local.warehouse=$PWD/warehouse --conf "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2" --name ravic --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1 --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar
Exception:
Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 2711) (ip-172-25-26-205.prod.phenom.local executor 23): java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:95)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_32_62$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.addMetadataColumnsIfNeeded(FileScanRDD.scala:291)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:318)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:184)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
code
val sess = Application.spark()

// Load the existing snapshot data from Parquet
val snapshotDf = sess.read.parquet("s3://bucket/snapshots2/ge11-partitioned/")
// val _snapshotDf = snapshotDf.sortWithinPartitions("__created_date_")
snapshotDf.createOrReplaceTempView("snapshot")

// Create the Iceberg table from the snapshot (CTAS), then add the partition field
sess.sql(""" CREATE TABLE ge112
             USING iceberg
             TBLPROPERTIES ('key'='_id.oid')
             LOCATION 's3://bucket/snapshots2/ge11-ice2/'
             AS
             SELECT * FROM snapshot """)
sess.sql(""" ALTER TABLE ge112 ADD PARTITION FIELD __created_date_ """)

// Read the CDC data as JSON, forcing the snapshot schema
// val cdcSchema1 = SparkUtils.getSchema("s3://bucket/schemas/GE11GLOBAL_candidates-CandidatesList.json")
val cdcDf = sess.read.schema(snapshotDf.schema).json("s3://bucket/inputs2/ge11-partitioned/")
cdcDf.createOrReplaceTempView("cdc")

// Keep only the latest CDC record per _id.oid within each partition
val _cdcDf = sess.sql(""" SELECT *
                          FROM cdc c1
                          WHERE cdc_pk IN (
                            SELECT max(cdc_pk)
                            FROM cdc c2
                            WHERE _id.oid IS NOT NULL
                              AND _id.oid != ''
                              AND c2.__created_date_ = c1.__created_date_
                            GROUP BY _id.oid) """)
_cdcDf.createOrReplaceTempView("_cdc")

// Merge the deduplicated CDC data into the Iceberg table; this is where the ClassCastException is thrown
sess.sql(""" MERGE INTO ge112 t
             USING (
               SELECT *
               FROM _cdc ) u
             ON t._id.oid = u._id.oid
             WHEN MATCHED THEN UPDATE SET *
             WHEN NOT MATCHED THEN INSERT * """)
About this issue
- Original URL
- State: open
- Created 10 months ago
- Comments: 28 (9 by maintainers)
As @RussellSpitzer mentioned above, I would start by looking at the schemas produced by the data frames.
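A minimal sketch of that check, using `sess` and `cdcDf` from the reproduction above: print both schemas and report any top-level field whose Spark type differs between the CDC data and the Iceberg table (a long column read back as string from the CDC JSON would show up here). Nested struct fields are not compared by this sketch.

// Compare the CDC DataFrame schema with the Iceberg table schema, field by field
val tableDf     = sess.table("ge112")
val cdcFields   = cdcDf.schema.fields.map(f => f.name -> f.dataType).toMap
val tableFields = tableDf.schema.fields.map(f => f.name -> f.dataType).toMap

(cdcFields.keySet ++ tableFields.keySet).toSeq.sorted.foreach { name =>
  (cdcFields.get(name), tableFields.get(name)) match {
    case (Some(a), Some(b)) if a != b => println(s"type mismatch: $name cdc=$a table=$b")
    case (None, Some(b))              => println(s"only in table: $name ($b)")
    case (Some(a), None)              => println(s"only in cdc:   $name ($a)")
    case _                            => // types match
  }
}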
I’ve been discussing with @sabyasachinandy and @harshith-bolar-rapido offline; I can reproduce the issue from the minimal reproduction steps here https://github.com/apache/iceberg/issues/8333#issuecomment-1854204500, but only on Spark 3.3.0, and cannot reproduce it on Spark 3.3.4 or later.
@harshith-bolar-rapido confirmed that he cannot repro on 3.3.4 either.
So something changed in the patch releases between 3.3.0 and 3.3.4.
I’d encourage folks on 3.3.0 to upgrade to 3.3.4 or higher if possible. I’ll leave this open for a bit in case other folks hit this on higher Spark versions.
@chandu-1101, thanks. I am sharing two screenshots. The first simulates, in a DBeaver session connected to Spark, the operations that dbt executes internally: dbt creates a temporary view from a select over the table where we look for new data, and when it then tries to merge the new data into the destination table we get the cast exception.
In the second screenshot we replace the CREATE TEMPORARY VIEW with a CREATE TABLE statement; the exception goes away and the merge operation works fine.
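A sketch of that workaround expressed as Spark SQL, with illustrative names (staging_src, target_tbl, source_view, id are placeholders, not taken from the screenshots): materialize the MERGE source as a physical table instead of a temporary view before running the MERGE.

// Workaround sketch: write the source rows to a real table, then merge from it.
// All object names below are illustrative placeholders.
sess.sql("""CREATE TABLE staging_src
            USING iceberg
            AS SELECT * FROM source_view""")   // instead of CREATE TEMPORARY VIEW

sess.sql("""MERGE INTO target_tbl t
            USING staging_src s
            ON t.id = s.id
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *""")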
FYI: if you use an INSERT OVERWRITE instead of the MERGE, it works.
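For reference, a sketch of that alternative against the table from the reproduction above. Note it is not a drop-in substitute for MERGE: with dynamic partition overwrite it replaces every partition that receives rows from the query, rather than updating matched rows.

// Alternative sketch: INSERT OVERWRITE the deduplicated CDC data instead of MERGE.
// Any existing rows in the overwritten partitions that are absent from _cdc are lost.
sess.sql("SET spark.sql.sources.partitionOverwriteMode = dynamic")
sess.sql("""INSERT OVERWRITE ge112
            SELECT * FROM _cdc""")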