hudi: [SUPPORT] Hudi MERGE_ON_READ load to DataFrame fails for versions [0.6.0], [0.7.0] and works for [0.5.3]

  • Hudi is not able to read a MERGE_ON_READ table when using versions [0.6.0] or [0.7.0]. When I run the same code with version [0.5.3], I am able to read the table that was written with the merge-on-read option.

Steps to reproduce the behavior:

**1.** Start a pyspark shell:

**2.**

```
pyspark --packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```

or

```
pyspark --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```

**3.**

```python
>>> S3_SNAPSHOT = <snapshot location>
>>> S3_MERGE_ON_READ = <location to replicate data>
>>> from pyspark.sql.functions import *
>>> df = spark.read.parquet(S3_SNAPSHOT)
>>> df.count()
21/01/27 14:49:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
950897550
>>> hudi_options_insert = {
...     "hoodie.table.name": "sample_schema.table_name",
...     "hoodie.datasource.write.storage.type": "MERGE_ON_READ",
...     "hoodie.datasource.write.recordkey.field": "id",
...     "hoodie.datasource.write.operation": "bulk_insert",
...     "hoodie.datasource.write.partitionpath.field": "ds",
...     "hoodie.datasource.write.precombine.field": "id",
...     "hoodie.insert.shuffle.parallelism": 135
... }
>>> df.write.format("hudi").options(**hudi_options_insert).mode("overwrite").save(S3_MERGE_ON_READ)
```



**4.** Read the table back into a DataFrame; this fails (see **Stacktrace** below).

**Expected behavior**

Data is loaded into a DataFrame correctly when the spark shell is started with:

```
pyspark --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.3,org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```

**Environment Description**
* Running on: EMR

* Hudi version : [0.7.0] and [0.6.0] give the error; [0.5.3] works

* Spark version : [2.4.4], [3.0.1]

* Hive version : 

* Hadoop version :

* Storage (HDFS/S3/GCS..) : S3

* Running on Docker? (yes/no) : no 




**Stacktrace**

    >>> df_mor = spark.read.format("hudi").load(S3_MERGE_ON_READ + "/*")

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 178, in load
    return self._df(self._jreader.load(path))
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 128, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o86.load.
: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(Lorg/apache/spark/sql/SparkSession;Lscala/collection/Seq;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/execution/datasources/FileStatusCache;)V
	at org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:89)
	at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
	at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
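A `NoSuchMethodError` on a Spark-internal constructor like `InMemoryFileIndex.<init>` usually means the Hudi bundle on the classpath was compiled against a different Spark version than the one actually running. As a quick sanity check (a sketch, not part of the original report), you can print the session's Spark version and the jars it was started with:

```python
# Compare the running Spark version against the Spark version the
# hudi-spark-bundle was built for (e.g. a 2.x bundle on a 3.x runtime).
print(spark.version)
print(spark.sparkContext.getConf().get("spark.jars", ""))
```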


About this issue

  • State: closed
  • Created 3 years ago
  • Reactions: 1
  • Comments: 43 (25 by maintainers)

Most upvoted comments

My team was running into this issue. We are running EMR 6.4, Spark 3.1.2, Hudi 0.8.0. We eventually found that you need to use the EMR-provided dependencies in your spark-submit/shell/notebook: Amazon ships its own slightly modified builds of Spark, Hudi, etc. So in your POM, set the Hudi Spark bundle, spark-avro, spark-core, and spark-sql all to `provided`. EMR does not put hudi/avro on the classpath natively, though, so you need to include them in your `--jars` config as noted here: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-work-with-dataset.html
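A sketch of that invocation, using the on-cluster jar paths from the linked AWS doc (exact paths can vary by EMR release):

```
pyspark \
  --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```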

Hopefully it saves someone the trouble that my team went through.

I am seeing this issue with MOR tables using Apache Spark 3.1.2 (not on AWS EMR) and Hudi 0.7.0. Is it possible to re-open, please? Or is it fixed in 0.8.0?

Context: I have created a table using DeltaStreamer. DeltaStreamer appears to work fine, but later, when I try to create a DataFrame to read the table in pyspark, I get the following error:

Traceback (most recent call last):
  File "./init_hudi_for_billing_ds", line 45, in <module>
    billingDF = sqlContext.read.format("hudi").load(basePath + "/*/*")
  File "/home/spark_311/py1/lib64/python3.6/dist-packages/pyspark/sql/readwriter.py", line 204, in load
    return self._df(self._jreader.load(path))
  File "/home/spark_311/py1/lib64/python3.6/dist-packages/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/spark_311/py1/lib64/python3.6/dist-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/spark_311/py1/lib64/python3.6/dist-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o30.load.
: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(Lorg/apache/spark/sql/SparkSession;Lscala/collection/Seq;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/execution/datasources/FileStatusCache;)V
	at org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:89)
	at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
	at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

I’ve tried setting the table.type as MERGE_ON_READ in the hudi options, but it has no effect. These errors are not seen with COPY_ON_WRITE tables.
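As a hedged aside (not from this thread's resolution): a read-optimized query scans only the compacted base files through the plain Parquet path, so it may avoid the `MergeOnReadSnapshotRelation` code path shown in the stack trace above. It is not equivalent to a snapshot query, since recent log-file updates are skipped. A sketch, with `basePath` standing in for the table location:

```python
# Read-optimized query on a MoR table: reads base files only, which may
# sidestep the failing snapshot relation. Not a full snapshot of the data.
df_ro = (spark.read.format("hudi")
         .option("hoodie.datasource.query.type", "read_optimized")
         .load(basePath + "/*/*"))
df_ro.count()
```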

Hi @nsivabalan

We created a MOR Hudi table with Hudi DeltaStreamer [0.7.0]. We tried to read the table with PySpark (Python) and Spark (Scala), and in both cases we got the above-mentioned error.

We created a COW Hudi table with DeltaStreamer and could read it with PySpark (Python) and Spark (Scala).

I’m also having the same type of issue on EMR 6.4 after building and deploying Hudi 0.9.0. Note that, as mentioned above, the default binaries work just fine (EMR 6.4 with Hudi 0.8.0).

It seems that there’s likely something off with the build or referencing. I used `mvn clean package -DskipTests -Dspark3 -Dscala-2.12 -T 30`.

What’s really interesting is that I can create an MoR table without issue, but loading it back produces a DataFrame that appears to load and then becomes unusable.

This tip also worked for me (i.e. using spark.sql and referencing the table from the Glue data catalog). Unfortunately, querying the data this way seems to be much slower (compared to 0.8.0).
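A minimal sketch of that workaround, assuming the MoR table is synced to the Glue data catalog (Hudi registers `_ro` and `_rt` views for MoR tables; `my_db.my_table` is a hypothetical name):

```python
# Query through the catalog instead of a path-based load. The _rt (real-time)
# view gives snapshot semantics; _ro (read-optimized) reads base files only.
df = spark.sql("SELECT * FROM my_db.my_table_rt LIMIT 10")
df.show()
```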

I documented my build and installation process in this slack thread.

Edit: I tested this with a CoW table and I did not have the issue, i.e. the following works just fine. It did, however, take 2.7x longer to do the read than it did in 0.8.0.

df = spark.read.format("org.apache.hudi").load(path)
df.show()

@nsivabalan I was able to execute all steps in the quick start successfully, and I could reproduce the issue by changing the storage type in the hudi options: I changed the storage type in the quick start example to MERGE_ON_READ and it failed as well. Here is the modification I applied.

pyspark --packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

tableName = "hudi_trips_cow"
basePath = "S3:///tmp/hudi_trips_mor"
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

The storage type below has been modified, and I get an error when I read the table.

hudi_options = {
  'hoodie.table.name': tableName,
  "hoodie.datasource.write.storage.type": "MERGE_ON_READ",  
  'hoodie.datasource.write.recordkey.field': 'uuid',
  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
  'hoodie.datasource.write.table.name': tableName,
  'hoodie.datasource.write.operation': 'insert',
  'hoodie.datasource.write.precombine.field': 'ts',
  'hoodie.upsert.shuffle.parallelism': 2, 
  'hoodie.insert.shuffle.parallelism': 2
}

df.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(basePath)

tripsSnapshotDF = spark. \
  read. \
  format("hudi"). \
  load(basePath + "/*/*/*/*")

Please find the error stack below.

An error occurred while calling o267.load.
: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(Lorg/apache/spark/sql/SparkSession;Lscala/collection/Seq;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/execution/datasources/FileStatusCache;)V
	at org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:89)
	at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
	at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

Hi, I am still getting a similar error when reading.

```python
hudi_options_insert = {
    "hoodie.table.name": "the_table_name",
    "hoodie.datasource.write.storage.type": "MERGE_ON_READ",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.operation": "bulk_insert",
    "hoodie.datasource.write.partitionpath.field": "ds",
    "hoodie.datasource.write.precombine.field": "id",
    "hoodie.insert.shuffle.parallelism": 135
}
df.write.format("hudi").options(**hudi_options_insert).mode("overwrite").save(S3_MERGE_ON_READ)
df_mor = spark.read.format("hudi").load(S3_MERGE_ON_READ + "/*")
```

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 178, in load
    return self._df(self._jreader.load(path))
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 128, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o87.load.
: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(Lorg/apache/spark/sql/SparkSession;Lscala/collection/Seq;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/execution/datasources/FileStatusCache;)V
	at org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:89)
	at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
	at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)