hudi: [SUPPORT] Cannot sync to spark embedded derby hive meta store (the default one)

Tips before filing an issue

  • Have you gone through our FAQs? yes

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • If you have triaged this as a bug, then file an issue directly. Not sure yet whether this is a bug or configuration problem.

Describe the problem you faced

I would like to test hudi locally within a spark session. However, it fails with java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient (details below).

To Reproduce

Steps to reproduce the behavior:

Install pyspark 3.2.2 via pip:

python -m pip install pyspark==3.2.2

Then open ipython (needs to be pip-installed as well) or a plain python shell and execute the following:

from pyspark.sql import SparkSession
from pathlib import Path
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
    # hudi config
    "--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0",
    "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer",
    "--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
    # "--conf spark.sql.hive.convertMetastoreParquet=false", # taken from AWS example
    # others
    # "--conf spark.eventLog.enabled=false",
    # "--conf spark.sql.catalogImplementation=hive",
    # "--conf spark.sql.hive.metastore.schema.verification=false",
    # "--conf spark.sql.hive.metastore.schema.verification.record.version=false",
    # f"--conf spark.sql.warehouse.dir={Path('.').absolute() / 'metastore_warehouse'}",
    # f"--conf spark.hadoop.hive.metastore.warehouse.dir={Path('.').absolute() / 'metastore_warehouse'}",
    # necessary last string
    "pyspark-shell",
])

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext

sc.setLogLevel("WARN")
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
    dataGen.generateInserts(10)
)
from pyspark.sql.functions import expr

df = spark.read.json(spark.sparkContext.parallelize(inserts, 10)).withColumn(
    "part", expr("'foo'")
)

tableName = "test_hudi_pyspark_local"
basePath = f"{Path('.').absolute()}/tmp/{tableName}"

hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.partitionpath.field": "part",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.upsert.shuffle.parallelism": 2,
    "hoodie.insert.shuffle.parallelism": 2,
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.partition_fields": "part",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "index.global.enabled": "true",
    "hoodie.index.type": "GLOBAL_BLOOM",
}
(df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))

This fails. See the stacktrace at the end. The example was adapted from https://github.com/apache/hudi/issues/4506

Expected behavior

Proper interaction with the default hive metastore, so that afterwards I can run spark.sql("SHOW TABLES FROM default") and see the newly created table, or use spark.table(tableName).
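
For concreteness, this is the check I would expect to pass after the write (a small sketch of my own, reusing tableName from the script above):

```
# Expected outcome once hive sync succeeds (sketch, not output from the failing run):
spark.sql("SHOW TABLES FROM default").show()  # should list test_hudi_pyspark_local
print(spark.table(tableName).count())         # should print 10 (the generated inserts)
```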

Environment Description

  • Hudi version : 0.12.0

  • Spark version : 3.2.2

  • Hive version : ? default

  • Hadoop version : ? default

  • Storage (HDFS/S3/GCS…) : local filesystem

  • Running on Docker? (yes/no) : no

  • Python version: 3.9.13

Additional context

EDIT ADDED: Others also report problems with the derby hive metastore and MetaException(message:Version information not found in metastore. ); see https://stackoverflow.com/questions/69555717/metaexceptionmessageversion-information-not-found-in-metastore-hive-3-1-1

Stacktrace

[...]
java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
[...]
Caused by: java.lang.reflect.InvocationTargetException
[...]
Caused by: MetaException(message:Version information not found in metastore. )
[...]
Caused by: MetaException(message:Version information not found in metastore. )
[...]
```
22/09/27 08:33:36 WARN HoodieSparkSqlWriter$: hoodie table at /home/ssahm/Projects_Freelance/Fielmann/tmp/hudi_test_local/testtable2 already exists. Deleting existing data & overwriting with new data.
22/09/27 08:33:37 WARN HoodieBackedTableMetadata: Metadata table was not found at path /home/ssahm/Projects_Freelance/Fielmann/tmp/hudi_test_local/testtable2/.hoodie/metadata
22/09/27 08:33:37 WARN HoodieBackedTableMetadata: Metadata table was not found at path /home/ssahm/Projects_Freelance/Fielmann/tmp/hudi_test_local/testtable2/.hoodie/metadata
22/09/27 08:33:37 WARN HoodieBackedTableMetadata: Metadata table was not found at path /home/ssahm/Projects_Freelance/Fielmann/tmp/hudi_test_local/testtable2/.hoodie/metadata
22/09/27 08:33:37 WARN HoodieBackedTableMetadata: Metadata table was not found at path /home/ssahm/Projects_Freelance/Fielmann/tmp/hudi_test_local/testtable2/.hoodie/metadata
22/09/27 08:33:39 WARN Hive: Failed to register all functions.
java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1742)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:83)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:133)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
    at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3607)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3659)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3639)
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3901)
    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248)
    at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
    at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:395)
    at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:339)
    at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:319)
    at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:288)
    at org.apache.hudi.hive.ddl.HiveQueryDDLExecutor.<init>(HiveQueryDDLExecutor.java:62)
    at org.apache.hudi.hive.HoodieHiveSyncClient.<init>(HoodieHiveSyncClient.java:82)
    at org.apache.hudi.hive.HiveSyncTool.initSyncClient(HiveSyncTool.java:101)
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:95)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
    at org.apache.hudi.sync.common.util.SyncUtilHelpers.instantiateMetaSyncTool(SyncUtilHelpers.java:75)
    at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:56)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$2(HoodieSparkSqlWriter.scala:648)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$2$adapted(HoodieSparkSqlWriter.scala:647)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:647)
    at org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:592)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:178)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:183)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1740)
    ... 72 more
Caused by: MetaException(message:Version information not found in metastore. )
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:83)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:92)
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:6902)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:162)
    at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:70)
    ... 77 more
Caused by: MetaException(message:Version information not found in metastore. )
    at org.apache.hadoop.hive.metastore.ObjectStore.checkSchema(ObjectStore.java:7810)
    at org.apache.hadoop.hive.metastore.ObjectStore.verifySchema(ObjectStore.java:7788)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:101)
    at com.sun.proxy.$Proxy44.verifySchema(Unknown Source)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMSForConf(HiveMetaStore.java:595)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:588)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:655)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:431)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:148)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:79)
    ... 81 more
```

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 19 (6 by maintainers)

Most upvoted comments

// Metastore init hook that pre-creates the Hive transaction tables in the embedded Derby
// database before the metastore is used. The imports below assume the Hive 3.x package
// layout; the location of TxnDbUtil can differ across Hive versions.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreInitContext;
import org.apache.hadoop.hive.metastore.MetaStoreInitListener;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;

public class MetaStoreTxnDbUtilPrep extends MetaStoreInitListener {
    public MetaStoreTxnDbUtilPrep(Configuration config) {
        super(config);
    }

    @Override
    public void onInit(MetaStoreInitContext metaStoreInitContext) {
        try {
            // Create the transaction-related metastore tables that the embedded
            // Derby schema is missing on first initialization.
            TxnDbUtil.prepDb(new HiveConf(this.getConf(), HiveConf.class));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
		
// Pseudocode for setting Spark conf
.config("javax.jdo.option.ConnectionURL", "jdbc:derby:;databaseName=APP;create=true")
.config("datanucleus.schema.autoCreateTables", "true")
.config("datanucleus.autoStartMechanism", "SchemaTable")
.config("hive.metastore.schema.verification", "false")
.config("hive.txn.strict.locking.mode", "false")
.config("hive.metastore.init.hooks", MetaStoreTxnDbUtilPrep.class.getCanonicalName())
.config("spark.sql.warehouse.dir", <set to unique value>)

System.setProperty("derby.system.home", <set to unique value>)
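
For reference, here is a minimal PySpark sketch of that pseudocode (my own adaptation, not verified in this thread). It assumes the compiled MetaStoreTxnDbUtilPrep class is on the driver classpath, uses placeholder /tmp paths for the "unique value" slots, and leaves out the Hudi/Iceberg bundle itself (which would still need to be added, e.g. via --packages):

```
import os
import uuid
from pyspark.sql import SparkSession

# Unique per-run locations so parallel test runs don't share a Derby instance.
run_id = uuid.uuid4().hex
warehouse_dir = f"/tmp/spark-warehouse-{run_id}"
derby_home = f"/tmp/derby-{run_id}"

# Equivalent of System.setProperty("derby.system.home", ...): pass it as a driver JVM
# option before the JVM starts (same trick as PYSPARK_SUBMIT_ARGS in the report above).
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    f"--driver-java-options -Dderby.system.home={derby_home} pyspark-shell"
)

spark = (
    SparkSession.builder
    .config("javax.jdo.option.ConnectionURL", "jdbc:derby:;databaseName=APP;create=true")
    .config("datanucleus.schema.autoCreateTables", "true")
    .config("datanucleus.autoStartMechanism", "SchemaTable")
    .config("hive.metastore.schema.verification", "false")
    .config("hive.txn.strict.locking.mode", "false")
    # Fully qualified name of the init-hook class shown above; it must be on the classpath.
    .config("hive.metastore.init.hooks", "MetaStoreTxnDbUtilPrep")
    .config("spark.sql.warehouse.dir", warehouse_dir)
    .enableHiveSupport()
    .getOrCreate()
)
```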

FYI I was able to have some success getting Iceberg working with the above settings; I used a MetaStore init hook to ensure the transaction-related tables were created…

It still doesn't fully work due to this Iceberg issue (https://github.com/apache/iceberg/issues/370); I'm thinking a dockerised Hive Metastore might be the way to go…

@schlichtanders I've been doing a bit of digging on this issue and stumbled across this thread. I'm trying to get Iceberg unit tests working with embedded Derby too. I think the issue is related to this one: https://issues.apache.org/jira/browse/HIVE-21302 (I don't believe it's a Hudi issue, but a Hive one). Basically, the embedded Derby database does not have all the correct tables when it is initialised, and thus can't function correctly.

@schlichtanders the derby url is not following the pattern specified here: https://db.apache.org/derby/docs/10.14/ref/rrefjdbc37352.html
If you use a named attribute like databaseName=xxx, it has to go after the ";". Otherwise, the URL should just be jdbc:derby:memory:default;create=true
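
To illustrate the two valid forms (both already appear elsewhere in this thread; the values are placeholders):

```
# Embedded Derby JDBC URLs following the documented pattern:
in_memory_url  = "jdbc:derby:memory:default;create=true"     # database name in the URL itself
named_attr_url = "jdbc:derby:;databaseName=APP;create=true"  # named attribute placed after the ';'
```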

I used the settings below to test in-memory derby, which is working (hive 3.1.3, spark 3.1.3).

hive-site.xml

<configuration>
  <property>
    <name>system:user.name</name>
    <value>${user.name}</value>
  </property>
  <property>
    <name>system:java.io.tmpdir</name>
    <value>file:///tmp/hudi-bundles/hive/java</value>
  </property>
  <property>
    <name>hive.exec.scratchdir</name>
    <value>file:///tmp/hudi-bundles/hive/exec</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>file:///tmp/hudi-bundles/hive/warehouse</value>
  </property>
  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
  </property>
  <property>
    <name>datanucleus.schema.autoCreateAll</name>
    <value>true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.apache.derby.jdbc.EmbeddedDriver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:derby:memory:default;create=true</value>
  </property>
</configuration>

Also copy it to the Spark conf directory:

cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf/hive-site.xml
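
If copying hive-site.xml is inconvenient (e.g. from pyspark, as in the original report), I believe the relevant settings can instead be forwarded with the spark.hadoop.* prefix. A hedged sketch, assuming the metastore service started in the next step is running on localhost:9083 and the Hudi bundle is supplied separately (e.g. via --packages):

```
from pyspark.sql import SparkSession

# Sketch: point Spark at the standalone Hive metastore instead of shipping hive-site.xml.
# Keys under spark.hadoop.* are forwarded into the Hadoop/Hive configuration.
spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
    .config("spark.hadoop.hive.metastore.warehouse.dir", "file:///tmp/hudi-bundles/hive/warehouse")
    .enableHiveSupport()
    .getOrCreate()
)
```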

Then start the Hive Metastore Service (HMS):

$HIVE_HOME/bin/hive --service metastore

Then start spark-shell:

spark-shell --jars hudi-spark3.1-bundle_2.12-0.13.0-SNAPSHOT.jar \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'   \
  --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension

Run the quickstart example:

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val expected = 10
val database = "default"
val tableName = "trips"
val basePath = "file:///tmp/hudi-bundles/tests/" + tableName
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(expected))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.datasource.meta.sync.enable", "true").
  option("hoodie.datasource.hive_sync.database", database).
  option("hoodie.datasource.hive_sync.table", tableName).
  option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.SinglePartPartitionValueExtractor").
  option("hoodie.datasource.hive_sync.mode", "hms").
  option("hoodie.datasource.hive_sync.metastore.uris", "thrift://localhost:9083/").
  mode(Overwrite).
  save(basePath)

spark.sql("desc " + tableName).show
+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
| _hoodie_commit_time|   string|   null|
|_hoodie_commit_seqno|   string|   null|
|  _hoodie_record_key|   string|   null|
|_hoodie_partition...|   string|   null|
|   _hoodie_file_name|   string|   null|
|           begin_lat|   double|   null|
|           begin_lon|   double|   null|
|              driver|   string|   null|
|             end_lat|   double|   null|
|             end_lon|   double|   null|
|                fare|   double|   null|
|               rider|   string|   null|
|                  ts|   bigint|   null|
|                uuid|   string|   null|
|       partitionpath|   string|   null|
|# Partition Infor...|         |       |
|          # col_name|data_type|comment|
|       partitionpath|   string|   null|
+--------------------+---------+-------+

The updated links are: Dockerfile, validate.sh (see the test_spark_hadoop_mr_bundles function), and the configuration files: hive-site.xml, hudi-defaults.conf, spark-defaults.conf.