hudi: [BUG] MOR Table Hard Deletes Create issue with Athena Querying RT Tables
Hello all, hope you are doing well. I wanted to report this as a bug on MOR tables with Apache Hudi. Here is what happened: I selected MOR, created an Apache Hudi table, and performed appends and updates, and things worked perfectly. I saw two tables created, RO (read optimized) and RT (real time), where RT has all the latest commits. I was able to query both tables using Athena. Things break when I start performing hard deletes. I inserted some sample data into the data lake (emp 0, 1, 3, 4, 5, 6, 7), then deleted emp_4. The delete was successful, but I am no longer able to query my RT table using Athena; it shows the following error.
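For context, this is roughly how I verify both synced tables from Spark before the delete (a minimal sketch, assuming the Glue/Hive catalog already has the hudi_table_ro and hudi_table_rt tables that Hive sync registered under the hudidb database, and that the session has the Hudi libraries available):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read-optimized view: only compacted base files
spark.sql("SELECT * FROM hudidb.hudi_table_ro").show()

# Real-time view: base files merged with log files, so it reflects the latest commits
spark.sql("SELECT * FROM hudidb.hudi_table_rt").show()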
Test 1 (no deletes done so far): append and updates

Deletes

Code
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, asc, desc
    from awsglue.utils import getResolvedOptions
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.context import GlueContext
    from faker import Faker
    print("All modules are loaded .....")
except Exception as e:
    print("Some modules are missing {} ".format(e))
# ----------------------------------------------------------------------------------------
# Settings
# -----------------------------------------------------------------------------------------
database_name1 = "hudidb"
table_name = "hudi_table"
base_s3_path = "s3a://glue-learn-begineers"
final_base_path = "{base_s3_path}/{table_name}".format(
    base_s3_path=base_s3_path, table_name=table_name
)
# ----------------------------------------------------------------------------------------------------
global faker
faker = Faker()
class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                x,
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                faker.random_int(min=10000, max=150000),
                faker.random_int(min=18, max=60),
                faker.random_int(min=0, max=100000),
                faker.unix_time()
            ) for x in range(5)
        ]
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
        .getOrCreate()
    return spark
spark = create_spark_session()
sc = spark.sparkContext
glueContext = GlueContext(sc)
"""
CHOOSE ONE
"hoodie.datasource.write.storage.type": "MERGE_ON_READ",
"hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
"""
hudi_options = {
    'hoodie.table.name': table_name,
    "hoodie.datasource.write.storage.type": "MERGE_ON_READ",
    'hoodie.datasource.write.recordkey.field': 'emp_id',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.datasource.hive_sync.enable': 'true',
    "hoodie.datasource.hive_sync.mode": "hms",
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': database_name1,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
}
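# (Sketch, not part of the original run: my understanding is that the _ro / _rt
# tables map to Hudi's read_optimized and snapshot query types when reading the
# table path directly. Assuming the same final_base_path, both views could be
# checked from Spark like this.)
# ro_view = spark.read.format("hudi") \
#     .option("hoodie.datasource.query.type", "read_optimized") \
#     .load(final_base_path)
# rt_view = spark.read.format("hudi") \
#     .option("hoodie.datasource.query.type", "snapshot") \
#     .load(final_base_path)
# ro_view.show()
# rt_view.show()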
# ====================================================
"""Create Spark Data Frame """
# ====================================================
# data = DataGenerator.get_data()
#
# columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
# df = spark.createDataFrame(data=data, schema=columns)
# df.write.format("hudi").options(**hudi_options).mode("overwrite").save(final_base_path)
# ====================================================
"""APPEND """
# ====================================================
# impleDataUpd = [
# (6, "This is APPEND", "Sales", "RJ", 81000, 30, 23000, 827307999),
# (7, "This is APPEND", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
# ]
#
# columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
# usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
# usr_up_df.write.format("hudi").options(**hudi_options).mode("append").save(final_base_path)
#
# ====================================================
"""UPDATE """
# ====================================================
# impleDataUpd = [
# (3, "this is update on data lake", "Sales", "RJ", 81000, 30, 23000, 827307999),
# ]
# columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
# usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
# usr_up_df.write.format("hudi").options(**hudi_options).mode("append").save(final_base_path)
#
# ====================================================
"""HARD DELETE """
# ====================================================
hudi_hard_delete_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': 'emp_id',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': 'delete',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}
print("\n")
hard_delete_df = spark.sql("SELECT * FROM hudidb.hudi_table_rt where emp_id='4' ")
print(hard_delete_df.show())
print("\n")
hard_delete_df.write.format("hudi").options(**hudi_hard_delete_options).mode("append").save(final_base_path)
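# (Sketch, not something I ran: I assume the same hard delete could also be
# issued from a keys-only DataFrame instead of selecting the whole row from the
# _rt table, since the 'delete' operation only needs the record key plus the
# configured precombine field. Keeping it here in case the way the deleted rows
# are sourced matters.)
# delete_keys_df = spark.createDataFrame([(4, 0)], ["emp_id", "ts"])
# delete_keys_df.write.format("hudi") \
#     .options(**hudi_hard_delete_options) \
#     .mode("append") \
#     .save(final_base_path)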
If I am doing something wrong, I am happy to change it. I think the problem is tied to the hard delete: once I performed it, I can no longer query the data on Athena.

Error: GENERIC_INTERNAL_ERROR: org/objenesis/strategy/InstantiatorStrategy
Glue version: 4

Steps showing how I configured the Glue job:
https://drive.google.com/file/d/1mkED3AUZBARsgeRCzk0K0XMvSyajo7mQ/view
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 31 (13 by maintainers)
Sure, I will tell my company sysops to create a support ticket 😄