hudi: [SUPPORT] Unable to read Hudi MOR data set in a test on 0.7
I’m trying out Hudi 0.7 locally via PySpark, and oddly I can write data that can be read back as parquet but not as Hudi. It seems an InMemoryFileIndex constructor is missing?
Stacktrace for pytest test:
============================================================== FAILURES ==============================================================
_____________________________________________________________ test_hudi ______________________________________________________________
answer = 'xro66', gateway_client = <py4j.java_gateway.GatewayClient object at 0x1069a83d0>, target_id = 'o65', name = 'load'
def get_return_value(answer, gateway_client, target_id=None, name=None):
    """Converts an answer received from the Java gateway into a Python object.

    For example, string representation of integers are converted to Python
    integer, string representation of objects are converted to JavaObject
    instances, etc.

    :param answer: the string returned by the Java gateway
    :param gateway_client: the gateway client used to communicate with the Java
        Gateway. Only necessary if the answer is a reference (e.g., object,
        list, map)
    :param target_id: the name of the object from which the answer comes from
        (e.g., *object1* in `object1.hello()`). Optional.
    :param name: the name of the member from which the answer comes from
        (e.g., *hello* in `object1.hello()`). Optional.
    """
    if is_error(answer)[0]:
        if len(answer) > 1:
            type = answer[1]
            value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
            if answer[1] == REFERENCE_TYPE:
                raise Py4JJavaError(
                    "An error occurred while calling {0}{1}{2}.\n".
>                       format(target_id, ".", name), value)
E py4j.protocol.Py4JJavaError: An error occurred while calling o65.load.
E : java.lang.NoSuchMethodError: 'void org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(org.apache.spark.sql.SparkSession, scala.collection.Seq, scala.collection.immutable.Map, scala.Option, org.apache.spark.sql.execution.datasources.FileStatusCache)'
E at org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:89)
E at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
E at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
E at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
E at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
E at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:339)
E at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
E at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
E at scala.Option.getOrElse(Option.scala:189)
E at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
E at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:214)
E at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E at java.base/java.lang.reflect.Method.invoke(Method.java:567)
E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E at py4j.Gateway.invoke(Gateway.java:282)
E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E at py4j.commands.CallCommand.execute(CallCommand.java:79)
E at py4j.GatewayConnection.run(GatewayConnection.java:238)
E at java.base/java.lang.Thread.run(Thread.java:830)
../../.pyenv/versions/3.7.6/envs/spark/lib/python3.7/site-packages/py4j/protocol.py:328: Py4JJavaError
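Not part of the original report, but a quick way to pin down what this NoSuchMethodError means is to list the InMemoryFileIndex constructors the running JVM actually has and compare them against the five-argument signature Hudi is calling. A minimal sketch, assuming a live PySpark session; note that sparkContext._jvm is py4j's internal bridge into the driver JVM, not a public API:

# Diagnostic sketch (illustrative, not from the original report): print the
# constructors of InMemoryFileIndex that are actually on the runtime
# classpath, so they can be compared with the 5-argument <init> in the
# stack trace above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
jvm = spark.sparkContext._jvm  # py4j-internal handle into the driver JVM
clazz = jvm.java.lang.Class.forName(
    "org.apache.spark.sql.execution.datasources.InMemoryFileIndex"
)
for ctor in clazz.getConstructors():
    print(ctor)  # one line per constructor, with its full parameter list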
To Reproduce
Steps to reproduce the behavior:
- Sample code using pytest:
import pytest
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row


def test_hudi(tmp_path):
    SparkContext.getOrCreate(
        conf=SparkConf()
        .setAppName("testing")
        .setMaster("local[1]")
        .set(
            "spark.jars.packages",
            "org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1",
        )
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.sql.hive.convertMetastoreParquet", "false")
    )
    spark = SparkSession.builder.getOrCreate()
    hudi_options = {
        "hoodie.table.name": "test",
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
        "hoodie.datasource.write.partitionpath.field": "year,month,day",
        "hoodie.datasource.write.table.name": "test",
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.precombine.field": "ts",
    }
    df = spark.createDataFrame(
        [
            Row(id=1, year=2020, month=7, day=5, ts=1),
        ]
    )
    df.write.format("hudi").options(**hudi_options).mode("append").save(str(tmp_path))

    read_df = spark.read.format("parquet").load(str(tmp_path) + "/*/*/*")
    # This works and prints:
    # [Row(_hoodie_commit_time='20210210160002', _hoodie_commit_seqno='20210210160002_0_1', _hoodie_record_key='id:1', _hoodie_partition_path='2020/7/5', _hoodie_file_name='e8febcc9-58b6-4174-8e83-90842d5492b0-0_0-21-12005_20210210160002.parquet', id=1, year=2020, month=7, day=5, ts=1)]
    print(read_df.collect())

    read_df = spark.read.format("hudi").load(str(tmp_path) + "/*/*/*")
    # This does not work; it raises the Py4JJavaError above.
    print(read_df.collect())
- Install pytest (I’m using 6.1.1) and run:
py.test -s --verbose test_hudi.py
(NB: tmp_path is a pytest fixture for creating a temporary directory; see https://docs.pytest.org/en/stable/tmpdir.html)
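A NoSuchMethodError like this usually indicates a binary mismatch between what a library was compiled against and what is on the runtime classpath, rather than a missing jar, so a version sanity check is worth running first. A sketch, assuming a pip-installed pyspark; again, _jvm is py4j-internal:

# Sanity-check sketch: print the Spark and Scala versions the test JVM is
# actually running, since the bundle named in spark.jars.packages must be
# built for the same Spark line. scala.util.Properties.versionString is
# standard Scala; reaching it through _jvm is an assumption of this sketch.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print("Spark:", spark.version)  # e.g. '3.0.0'
print("Scala:", spark.sparkContext._jvm.scala.util.Properties.versionString())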
Expected behavior
The read should work; I’m not sure why the InMemoryFileIndex constructor can’t be found. It could be something wrong with my setup. I’m using Scala 2.12.10 locally.
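One hypothesis worth ruling out: Hudi 0.7.0 publishes two Spark bundles, hudi-spark-bundle_2.12 built for Spark 2.4 and hudi-spark3-bundle_2.12 built for Spark 3, and the five-argument InMemoryFileIndex constructor in the trace appears to match the Spark 2.4-era signature. A hedged variant of the conf from the test above, swapping in the Spark 3 bundle (untested here; compatibility with this exact setup is assumed, not verified):

# Sketch (not from the original report): the same SparkConf, but loading
# Hudi's Spark 3 bundle instead of the default (Spark 2) bundle.
from pyspark import SparkConf

conf = (
    SparkConf()
    .setAppName("testing")
    .setMaster("local[1]")
    .set(
        "spark.jars.packages",
        "org.apache.hudi:hudi-spark3-bundle_2.12:0.7.0,"
        "org.apache.spark:spark-avro_2.12:3.0.1",
    )
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.sql.hive.convertMetastoreParquet", "false")
)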
Environment Description
- Hudi version: 0.7.0
- Spark version: 3.0.0
- Hive version: N/A
- Hadoop version: N/A
- Storage (HDFS/S3/GCS…): Local
- Running on Docker? (yes/no): no
About this issue
- State: closed
- Created 3 years ago
- Comments: 16 (10 by maintainers)
I am still triaging the issue. Here is the tracking ticket in the meantime: https://issues.apache.org/jira/browse/HUDI-1568. It is happening even for plain Spark (not just PySpark).
Got it. Yes, I was able to repro it now. Will investigate further. Thanks.