hudi: [SUPPORT] Unable to read Hudi MOR data set in a test on 0.7

Tips before filing an issue

I’m trying out Hudi 0.7 locally via PySpark and oddly I can write data that can be read back as parquet but not as Hudi. It seems InMemoryFileIndex is missing?

Stacktrace for pytest test:

============================================================== FAILURES ==============================================================
_____________________________________________________________ test_hudi ______________________________________________________________
answer = 'xro66', gateway_client = <py4j.java_gateway.GatewayClient object at 0x1069a83d0>, target_id = 'o65', name = 'load'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.

        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.

        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
                    raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
>                       format(target_id, ".", name), value)
E                   py4j.protocol.Py4JJavaError: An error occurred while calling o65.load.
E                   : java.lang.NoSuchMethodError: 'void org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(org.apache.spark.sql.SparkSession, scala.collection.Seq, scala.collection.immutable.Map, scala.Option, org.apache.spark.sql.execution.datasources.FileStatusCache)'
E                       at org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:89)
E                       at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
E                       at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
E                       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
E                       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
E                       at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:339)
E                       at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
E                       at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
E                       at scala.Option.getOrElse(Option.scala:189)
E                       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
E                       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:214)
E                       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E                       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E                       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E                       at java.base/java.lang.reflect.Method.invoke(Method.java:567)
E                       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E                       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E                       at py4j.Gateway.invoke(Gateway.java:282)
E                       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E                       at py4j.commands.CallCommand.execute(CallCommand.java:79)
E                       at py4j.GatewayConnection.run(GatewayConnection.java:238)
E                       at java.base/java.lang.Thread.run(Thread.java:830)

../../.pyenv/versions/3.7.6/envs/spark/lib/python3.7/site-packages/py4j/protocol.py:328: Py4JJavaError

To Reproduce

Steps to reproduce the behavior:

  1. Sample code using pytest:
import pytest
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql import Row


def test_hudi(tmp_path):
    SparkContext.getOrCreate(
        conf=SparkConf()
        .setAppName("testing")
        .setMaster("local[1]")
        .set(
            "spark.jars.packages",
            "org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1",
        )
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.sql.hive.convertMetastoreParquet", "false")
    )
    spark = SparkSession.builder.getOrCreate()

    hudi_options = {
        "hoodie.table.name": "test",
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
        "hoodie.datasource.write.partitionpath.field": "year,month,day",
        "hoodie.datasource.write.table.name": "test",
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.precombine.field": "ts",
    }
    df = spark.createDataFrame(
        [
            Row(id=1, year=2020, month=7, day=5, ts=1),
        ]
    )
    df.write.format("hudi").options(**hudi_options).mode("append").save(str(tmp_path))
    read_df = spark.read.format("parquet").load(str(tmp_path) + "/*/*/*")
    # This works and prints:
    # [Row(_hoodie_commit_time='20210210160002', _hoodie_commit_seqno='20210210160002_0_1', _hoodie_record_key='id:1', _hoodie_partition_path='2020/7/5', _hoodie_file_name='e8febcc9-58b6-4174-8e83-90842d5492b0-0_0-21-12005_20210210160002.parquet', id=1, year=2020, month=7, day=5, ts=1)]
    print(read_df.collect())

    read_df = spark.read.format("hudi").load(str(tmp_path) + "/*/*/*")
    # This does not
    print(read_df.collect())
  1. Install pytest (I’m using 6.1.1) and run: py.test -s --verbose test_hudi.py (NB: tmp_path is a pytest fixture https://docs.pytest.org/en/stable/tmpdir.html for creating a temporary directory)

Expected behavior

The read should work, not sure why InMemoryFileIndex is missing. Could be something wrong with my setup. I’m using Scala 2.12.10 locally.

Environment Description

  • Hudi version : 0.7.0

  • Spark version : 3.0.0

  • Hive version : N/A

  • Hadoop version : N/A

  • Storage (HDFS/S3/GCS…) : Local

  • Running on Docker? (yes/no) : no

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 16 (10 by maintainers)

Most upvoted comments

I am still triaging the issue. here is the tracking ticket in the mean time: https://issues.apache.org/jira/browse/HUDI-1568. Happening even for spark.

got it. yes, I could able to repro now. will investigate further. thnx.