hudi: [SUPPORT] Concurrent write (OCC) on distinct partitions random errors
Hudi 0.9.0, Spark 3.1
To experiment with OCC I set up these local tools:
- local hive metastore
- pyspark script
- run concurrently with xargs
Sometimes it works as expected (mostly with 2 concurrent processes), but with 4 processes I randomly get one of the following stack traces:
Type 1 error:
: org.apache.hudi.exception.HoodieLockException: Unable to acquire lock, lock object LockResponse(lockid:255, state:WAITING)
at org.apache.hudi.client.transaction.lock.LockManager.lock(LockManager.java:82)
at org.apache.hudi.client.transaction.TransactionManager.beginTransaction(TransactionManager.java:64)
Type 2 error:
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20210921153357
at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)
at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)
Caused by: java.lang.IllegalArgumentException
Type 3 error:
/tmp/test_hudi_pyspark_local/.hoodie/20210921151138.commit.requested
at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:544)
at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createFileInMetaPath(HoodieActiveTimeline.java:505)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: file:/tmp/test_hudi_pyspark_local/.hoodie/20210921151138.commit.requested
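For the Type 1 timeout specifically, the script below only gives each writer hoodie.write.lock.wait_time_ms=12000 with hoodie.write.lock.num_retries=4. As a purely illustrative sketch (not a confirmed fix for any of the three errors), those same two keys can be raised so writers queue longer on the metastore lock before giving up:
# Illustrative override of the two lock settings already used in hudi_options below;
# larger values only make writers wait longer and do not touch the Type 2/3 errors.
hudi_options.update({
    "hoodie.write.lock.wait_time_ms": "60000",  # the script below uses 12000
    "hoodie.write.lock.num_retries": "10",      # the script below uses 4
})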
Reproduction steps:
Python script:
## The idea is to generate a distinct random partition for each run
## The runs start after a small random delay; this was added to understand why I got the error
## on the same commit timestamp, but it is not actually needed
## The final COUNT should be (NB+1) * 10, where NB is the number of concurrent Spark jobs
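## Worked example of the expected total: each run calls generateInserts(10), so with
## NB=4 the first run writes 10 rows and the 4 concurrent runs write 40 more into
## their own partitions, giving COUNT = (4+1) * 10 = 50 when every commit succeeds.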
from pyspark.sql import SparkSession
import pyspark
from numpy import random
from time import sleep
sleeptime = random.uniform(2, 5)
print("sleeping for:", sleeptime, "seconds")
sleep(sleeptime)
conf = pyspark.SparkConf()
spark_conf = [
(
"spark.jars.packages",
"org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.1.2",
),
("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083"),
("spark.hadoop.javax.jdo.option.ConnectionUserName", "hive"),
("spark.hadoop.javax.jdo.option.ConnectionPassword", "hive"),
("spark.hadoop.hive.server2.thrift.url", "jdbc:hive2://localhost:10000"),
]
conf.setAll(spark_conf)
spark = (
SparkSession.builder.appName("test-hudi-hive-sync")
.config(conf=conf)
.enableHiveSupport()
.getOrCreate()
)
sc = spark.sparkContext
# Create a table
sc.setLogLevel("ERROR")
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
dataGen.generateInserts(10)
)
from pyspark.sql.functions import expr
df = (
spark.read.json(spark.sparkContext.parallelize(inserts, 10))
.withColumn("part", expr(f"'foo{sleeptime}'"))
# One partition per run !!
.withColumn("id", expr("row_number() over(partition by 1 order by 1)"))
)
databaseName = "default"
tableName = "test_hudi_pyspark_local"
basePath = f"/tmp/{tableName}"
hudi_options = {
"hoodie.table.name": tableName,
"hoodie.datasource.write.recordkey.field": "uuid",
"hoodie.datasource.write.partitionpath.field": "part",
"hoodie.datasource.write.table.name": tableName,
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.upsert.shuffle.parallelism": 2,
"hoodie.insert.shuffle.parallelism": 2,
# For hive sync metastore
"hoodie.datasource.hive_sync.database": databaseName,
"hoodie.datasource.hive_sync.table": tableName,
"hoodie.datasource.hive_sync.mode": "jdbc",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.partition_fields": "part",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
# For concurrency write locks with hive metastore
"hoodie.write.concurrency.mode": "optimistic_concurrency_control",
"hoodie.cleaner.policy.failed.writes": "LAZY",
"hoodie.write.lock.provider": "org.apache.hudi.hive.HiveMetastoreBasedLockProvider",
"hoodie.write.lock.hivemetastore.database": databaseName,
"hoodie.write.lock.hivemetastore.table": tableName,
"hoodie.write.lock.wait_time_ms": "12000",
"hoodie.write.lock.num_retries": "4",
"hoodie.embed.timeline.server": "false",
"hoodie.datasource.write.commitmeta.key.prefix": "deltastreamer.checkpoint.key",
}
(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
print(
"@@@@@@@@@@@@@@@@ COUNT={} @@@@@@@@@@@@@@@@@@".format(
spark.read.format("hudi").load(basePath).count()
)
)
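To check whether two writers really picked the same commit timestamp (the FileAlreadyExistsException in the Type 3 error), the timeline can be listed after a run. A minimal sketch, assuming the same basePath as the script:
import os

timeline_dir = "/tmp/test_hudi_pyspark_local/.hoodie"
# Each instant appears as <timestamp>.<action>[.requested|.inflight]; two writers
# racing on the same timestamp would explain the "File already exists" failure.
for name in sorted(os.listdir(timeline_dir)):
    if name and name[0].isdigit():  # skip hoodie.properties, archived/, .aux, etc.
        print(name)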
Bash script (concurrent.sh):
#!/usr/bin/env bash
NB=$1
# Start from a clean table
rm -rf /tmp/test_hudi_pyspark_local/
# First run creates the table with an initial commit
python3 concurrent.py
# Then launch NB writers in parallel, each writing to its own partition
seq 1 $NB | xargs -n 1 -P $NB python3 concurrent.py
Run it:
./concurrent.sh 4
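A small follow-up check, assuming the expected total from the comments in the script ((NB+1) * 10); this hypothetical helper just re-reads the table and lists the partitions, one per successful run:
# check_count.py -- hypothetical helper, reusing the same Hudi/Avro packages as concurrent.py
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("check-hudi-count")
    .config(
        "spark.jars.packages",
        "org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.1.2",
    )
    .getOrCreate()
)
df = spark.read.format("hudi").load("/tmp/test_hudi_pyspark_local")
print("COUNT =", df.count())           # expected (NB+1) * 10 when every commit succeeds
df.select("part").distinct().show()    # one 'foo<sleeptime>' partition per run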
About this issue
- State: closed
- Created 3 years ago
- Reactions: 1
- Comments: 21 (20 by maintainers)
@jdattani AFAIK, only Spark SQL features are broken on 3.1