hudi: [BUG] MOR Table Hard Deletes Break Athena Queries on RT Tables

Hello all, hope you are doing well. I want to report this as a bug on MOR tables with Apache Hudi. Here is what happened: I created an Apache Hudi table with MERGE_ON_READ and performed appends and updates, and everything worked perfectly. I saw two tables created, _ro (read optimized) and _rt (real time), with the RT table showing all of the latest commits. I was able to query both tables using Athena. Things broke when I started performing hard deletes. I inserted some sample data into the data lake (emp_id 0, 1, 3, 4, 5, 6, 7) and then deleted emp_id 4. The delete was successful, but I am no longer able to query my RT table using Athena; it fails with the error shown below.
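For reference, these are the two tables that the Hive sync creates and that I query from Athena. A minimal sketch of the checks, run here through spark.sql and assuming the default _ro/_rt table name suffixes that the sync settings below produce (the same SELECTs are what I run from Athena):

# Read-optimized vs. real-time views of the same MOR table
spark.sql("SELECT * FROM hudidb.hudi_table_ro").show()   # base (columnar) files only
spark.sql("SELECT * FROM hudidb.hudi_table_rt").show()   # base files merged with the delta log files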

Test 1 (no deletes done so far): append and updates

(screenshots)

Deletes

(screenshot)

Code

try:
    import os
    import sys
    import uuid

    import pyspark
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, asc, desc
    from awsglue.utils import getResolvedOptions
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.context import GlueContext

    from faker import Faker

    print("All modules are loaded .....")

except Exception as e:
    print("Some modules are missing {} ".format(e))


# ----------------------------------------------------------------------------------------
#                 Settings
# -----------------------------------------------------------------------------------------

database_name1 = "hudidb"
table_name = "hudi_table"
base_s3_path = "s3a://glue-learn-begineers"
final_base_path = "{base_s3_path}/{table_name}".format(
    base_s3_path=base_s3_path, table_name=table_name
)

# ----------------------------------------------------------------------------------------------------
global faker
faker = Faker()


class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                x,
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                faker.random_int(min=10000, max=150000),
                faker.random_int(min=18, max=60),
                faker.random_int(min=0, max=100000),
                faker.unix_time()
            ) for x in range(5)
        ]


def create_spark_session():
    spark = SparkSession \
        .builder \
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
        .getOrCreate()
    return spark


spark = create_spark_session()
sc = spark.sparkContext
glueContext = GlueContext(sc)

"""
CHOOSE ONE 
"hoodie.datasource.write.storage.type": "MERGE_ON_READ",
"hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
"""


hudi_options = {
    'hoodie.table.name': table_name,
    "hoodie.datasource.write.storage.type": "MERGE_ON_READ",
    'hoodie.datasource.write.recordkey.field': 'emp_id',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',

    'hoodie.datasource.hive_sync.enable': 'true',
    "hoodie.datasource.hive_sync.mode":"hms",
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': database_name1,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',

}


# ====================================================
"""Create Spark Data Frame """
# ====================================================
# data = DataGenerator.get_data()
#
# columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
# df = spark.createDataFrame(data=data, schema=columns)
# df.write.format("hudi").options(**hudi_options).mode("overwrite").save(final_base_path)


# ====================================================
"""APPEND """
# ====================================================

# impleDataUpd = [
#     (6, "This is APPEND", "Sales", "RJ", 81000, 30, 23000, 827307999),
#     (7, "This is APPEND", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
# ]
#
# columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
# usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
# usr_up_df.write.format("hudi").options(**hudi_options).mode("append").save(final_base_path)
#

# ====================================================
"""UPDATE """
# ====================================================
# impleDataUpd = [
#     (3, "this is update on data lake", "Sales", "RJ", 81000, 30, 23000, 827307999),
# ]
# columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
# usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
# usr_up_df.write.format("hudi").options(**hudi_options).mode("append").save(final_base_path)
#
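# ====================================================
"""OPTIONAL: verify the table contents from Spark (my sketch, not part of the original job) """
# ====================================================
# A plain snapshot read of the table path; a sketch of how the upserted rows
# can be confirmed from Spark before checking Athena.
# snapshot_df = spark.read.format("hudi").load(final_base_path)
# snapshot_df.select("emp_id", "employee_name", "department", "ts").orderBy("emp_id").show()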

# ====================================================
"""HARD DELETE """
# ====================================================

hudi_hard_delete_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': 'emp_id',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': 'delete',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}


print("\n")
hard_delete_df = spark.sql("SELECT * FROM hudidb.hudi_table_rt where emp_id='4' ")
hard_delete_df.show()
print("\n")
hard_delete_df.write.format("hudi").options(**hudi_hard_delete_options).mode("append").save(final_base_path)
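
# ====================================================
"""OPTIONAL: confirm the hard delete from Spark (sketch; assumes the delete above has run) """
# ====================================================
# A snapshot read straight from the table path should still work after the
# delete, so emp_id 4 is expected to be gone here even while the Athena
# query on the _rt table fails.
# after_delete_df = spark.read.format("hudi").load(final_base_path)
# after_delete_df.filter(col("emp_id") == 4).show()   # expected: no rows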

If I am doing something wrong I am happy to change it, but as far as I can tell, once I performed the hard delete I can no longer query the data on Athena.

Error: GENERIC_INTERNAL_ERROR: org/objenesis/strategy/InstantiatorStrategy

Glue Version 4

(screenshot)

Steps for how I configured the Glue job:

https://drive.google.com/file/d/1mkED3AUZBARsgeRCzk0K0XMvSyajo7mQ/view

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Comments: 31 (13 by maintainers)

Most upvoted comments

Sure, I will tell my company sysops to create a support ticket 😄