hudi: [SUPPORT] Concurrent write (OCC) on distinct partitions random errors

hudi 0.9.0, spark3.1

To experiment with OCC I setup this local tools:

  • local hive metastore
  • pyspark script
  • run concurrently with xargs

Sometimes it works as expected (mostrly with 2 concurrent process). But with 4 process I get randomly one of those stacktrace:

Type 1 error:

 : org.apache.hudi.exception.HoodieLockException: Unable to acquire lock, lock object LockResponse(lockid:255, state:WAITING)
 at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:82)
 at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:64)

Type 2 error:

 : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20210921153357
 at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)
 at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)
 Caused by: java.lang.IllegalArgumentException

Type 3 error:

 /tmp/test_hudi_pyspark_local/.hoodie/20210921151138.commit.requested
 at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:544)
 at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createFileInMetaPath(HoodieActiveTimeline.java:505)
 Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: file:/tmp/test_hudi_pyspark_local/.hoodie/20210921151138.commit.requested

Reproduce step:

Python script:

## The idea is to generate a random partition
## They are run with a little delay in order to understand why I got the error onthe same commit timestamp
## but this is not actually needed
## There should be a COUNT=(NB+1) * 10 , where NB is the number of concurrent spark jobs

from pyspark.sql import SparkSession

import pyspark
from numpy import random
from time import sleep

sleeptime = random.uniform(2, 5)
print("sleeping for:", sleeptime, "seconds")
sleep(sleeptime)
conf = pyspark.SparkConf()
spark_conf = [
    (
        "spark.jars.packages",
        "org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.1.2",
    ),
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    ("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083"),
    ("spark.hadoop.javax.jdo.option.ConnectionUserName", "hive"),
    ("spark.hadoop.javax.jdo.option.ConnectionPassword", "hive"),
    ("spark.hadoop.hive.server2.thrift.url", "jdbc:hive2://localhost:10000"),
]
conf.setAll(spark_conf)
spark = (
    SparkSession.builder.appName("test-hudi-hive-sync")
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)
sc = spark.sparkContext

# Create a table
sc.setLogLevel("ERROR")
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
    dataGen.generateInserts(10)
)
from pyspark.sql.functions import expr

df = (
    spark.read.json(spark.sparkContext.parallelize(inserts, 10))
    .withColumn("part", expr(f"'foo{sleeptime}'"))
 # One partition per run !!
    .withColumn("id", expr("row_number() over(partition by 1 order by 1)"))
)


databaseName = "default"
tableName = "test_hudi_pyspark_local"
basePath = f"/tmp/{tableName}"

hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.partitionpath.field": "part",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.upsert.shuffle.parallelism": 2,
    "hoodie.insert.shuffle.parallelism": 2,
    # For hive sync metastore
    "hoodie.datasource.hive_sync.database": databaseName,
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.mode": "jdbc",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.partition_fields": "part",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    # For concurrency write locks with hive metastore
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.write.lock.provider": "org.apache.hudi.hive.HiveMetastoreBasedLockProvider",
    "hoodie.write.lock.hivemetastore.database": databaseName,
    "hoodie.write.lock.hivemetastore.table": tableName,
    "hoodie.write.lock.wait_time_ms": "12000",
    "hoodie.write.lock.num_retries": "4",
    "hoodie.embed.timeline.server": "false",
    "hoodie.datasource.write.commitmeta.key.prefix": "deltastreamer.checkpoint.key",
}

(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
print(
    "@@@@@@@@@@@@@@@@ COUNT={} @@@@@@@@@@@@@@@@@@".format(
        spark.read.format("hudi").load(basePath).count()
    )
)

Bash script:

#!/usr/bin/env bash
NB=$1
rm -rf /tmp/test_hudi_pyspark_local/
python3 concurrent.py
seq 1 $NB  | xargs -n 1 -P $NB python3 concurrent.py

Run it:

./conccurrent.sh 4

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Reactions: 1
  • Comments: 21 (20 by maintainers)

Most upvoted comments

@jdattani AFAIK, only spark sql features are broken on 3.1