hudi: [SUPPORT] Invalid number of file groups for partition:column_stats

Problem: When a DeltaStreamer job running in continuous mode is killed and resumed, the error below is thrown.

23/01/12 13:28:06 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://datalake_bucket/test/hudi_poc/continuous_cow/loan_applications commits 20230112132517252
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
	at org.apache.hudi.async.AsyncCleanerService.waitForCompletion(AsyncCleanerService.java:75)
	at org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:605)
	at org.apache.hudi.client.BaseHoodieWriteClient.postCommit(BaseHoodieWriteClient.java:529)
	at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:236)
	at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:122)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:624)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:333)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:679)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://datalake_bucket/test/hudi_poc/continuous_cow/loan_applications commits 20230112132517252
	at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:779)
	at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1189)
	at org.apache.hudi.client.BaseHoodieWriteClient.rollbackFailedWrites(BaseHoodieWriteClient.java:1172)
	at org.apache.hudi.client.BaseHoodieWriteClient.lambda$clean$33796fd2$1(BaseHoodieWriteClient.java:852)
	at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:142)
	at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:851)
	at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:821)
	at org.apache.hudi.async.AsyncCleanerService.lambda$startService$0(AsyncCleanerService.java:55)
	... 4 more
Caused by: java.lang.IllegalArgumentException: Invalid number of file groups for partition:column_stats, found=0, required=1
	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.prepRecords(HoodieBackedTableMetadataWriter.java:968)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:132)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:924)
	at org.apache.hudi.table.action.BaseActionExecutor.lambda$writeTableMetadata$2(BaseActionExecutor.java:77)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
	at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:77)
	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.finishRollback(BaseRollbackActionExecutor.java:255)
	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:124)
	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:145)
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:281)
	at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:762)
	... 11 more
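The failed check means the metadata table's column_stats partition had no initialized file groups at rollback time. As a diagnostic sketch (assuming the standard metadata table layout under the table base path), the metadata partitions can be listed directly on S3:

# sketch: check which metadata partitions exist and whether column_stats contains any files
aws s3 ls s3://datalake_bucket/test/hudi_poc/continuous_cow/loan_applications/.hoodie/metadata/
aws s3 ls s3://datalake_bucket/test/hudi_poc/continuous_cow/loan_applications/.hoodie/metadata/column_stats/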

Steps to reproduce the behavior:

  1. Run HoodieDeltaStreamer with the config below:
spark-submit --master yarn \
--jars /usr/lib/spark/external/lib/spark-avro.jar,s3://<some_domain>/jars/hudi-utilities-bundle_2.12-0.11.1.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--deploy-mode cluster s3://<some_domain>/jars/deltastreamer-addons-1.3.jar \
--enable-sync \
--hoodie-conf hoodie.deltastreamer.source.kafka.auto.reset.offsets=earliest \
--hoodie-conf hoodie.parquet.compression.codec=snappy \
--hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor \
--hoodie-conf hive.metastore.disallow.incompatible.col.type.changes=false \
--hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false \
--hoodie-conf auto.offset.reset=earliest \
--table-type COPY_ON_WRITE \
--source-class com.<some_domain>.sources.ConfluentAvroKafkaSource \
--schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
--props s3://<artifacts>/config/hudi/clusteringjob.properties \
--source-limit 10000000 \
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://<some_domain>-tech.in/subjects/db_name.public.table_name-value/versions/latest \
--hoodie-conf hoodie.datasource.hive_sync.database=test_clustering \
--hoodie-conf hoodie.datasource.hive_sync.table=cow_table_name \
--hoodie-conf hoodie.datasource.write.recordkey.field=id \
--hoodie-conf hoodie.datasource.write.precombine.field=__lsn \
--hoodie-conf hoodie.deltastreamer.source.kafka.topic=db_name.public.table_name \
--hoodie-conf group.id=hudi-cow-continuous-loan-applications \
--source-ordering-field __lsn \
--target-base-path s3://<some_domain>/test/hudi_poc/continuous_cow/table_name \
--target-table cow_table_name \
--payload-class com.<some_domain>.payload.PostgresSoftDeleteDebeziumAvroPayload \
--hoodie-conf hoodie.bloom.index.update.partition.path=false \
--hoodie-conf hoodie.metrics.on=true \
--hoodie-conf hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY \
--hoodie-conf hoodie.metrics.pushgateway.host=pushgateway.prod.<some_domain>-tech.in \
--hoodie-conf hoodie.metrics.pushgateway.port=443 \
--hoodie-conf hoodie.metrics.pushgateway.delete.on.shutdown=false \
--hoodie-conf hoodie.metrics.pushgateway.job.name=hudi_cow_continuous_table_name \
--hoodie-conf hoodie.metrics.pushgateway.random.job.name.suffix=false \
--hoodie-conf hoodie.metrics.reporter.metricsname.prefix=hudi \
--hoodie-conf hoodie.datasource.write.partitionpath.field='' \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
--transformer-class com.<some_domain>.transform.DebeziumTransformer \
--hoodie-conf hoodie.clean.automatic=true \
--hoodie-conf hoodie.clean.async=true \
--hoodie-conf hoodie.clean.max.commits=10 \
--hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_BY_HOURS \
--hoodie-conf hoodie.keep.max.commits=800 \
--hoodie-conf hoodie.keep.min.commits=600 \
--hoodie-conf hoodie.cleaner.hours.retained=1 \
--hoodie-conf hoodie.cleaner.parallelism=500 \
--hoodie-conf hoodie.clean.allow.multiple=false \
--hoodie-conf hoodie.cleaner.incremental.mode=true \
--hoodie-conf hoodie.archive.async=true \
--hoodie-conf hoodie.archive.automatic=true \
--hoodie-conf hoodie.archive.merge.files.batch.size=20 \
--hoodie-conf hoodie.commits.archival.batch=20 \
--hoodie-conf hoodie.archive.delete.parallelism=500 \
--hoodie-conf hoodie.archive.merge.enable=true \
--hoodie-conf hoodie.clustering.inline=false \
--hoodie-conf hoodie.bloom.index.use.metadata=false \
--hoodie-conf hoodie.index.type=BLOOM \
--hoodie-conf hoodie.metadata.enable=true \
--hoodie-conf hoodie.metadata.index.column.stats.parallelism=50 \
--hoodie-conf hoodie.metadata.compact.max.delta.commits=10 \
--hoodie-conf hoodie.metadata.index.column.stats.enable=true \
--hoodie-conf hoodie.metadata.metrics.enable=true \
--hoodie-conf hoodie.metadata.index.bloom.filter.file.group.count=20 \
--hoodie-conf hoodie.metadata.cleaner.commits.retained=60 \
--hoodie-conf hoodie.metadata.index.check.timeout.seconds=900 \
--hoodie-conf hoodie.metadata.populate.meta.fields=true \
--hoodie-conf hoodie.metadata.index.async=true \
--hoodie-conf hoodie.file.listing.parallelism=800 \
--hoodie-conf hoodie.metadata.index.bloom.filter.enable=true \
--hoodie-conf hoodie.metadata.index.bloom.filter.parallelism=500 \
--hoodie-conf hoodie.metadata.clean.async=true \
--hoodie-conf hoodie.metadata.keep.max.commits=80 \
--hoodie-conf hoodie.metadata.insert.parallelism=20 \
--hoodie-conf hoodie.metadata.keep.min.commits=70 \
--hoodie-conf hoodie.write.markers.type=DIRECT \
--hoodie-conf hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL \
--hoodie-conf hoodie.cleaner.policy.failed.writes=LAZY \
--hoodie-conf hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider \
--hoodie-conf hoodie.deltastreamer.source.kafka.enable.commit.offset=true \
--sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
--continuous
  2. Kill the application.
  3. Restart the application; the job fails with the above-mentioned exception. When the metadata table is disabled (see the snippet below), the job runs fine.
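For reference, the metadata-disabled run mentioned in step 3 is the same spark-submit with only this override changed (a minimal sketch; all other flags unchanged):

--hoodie-conf hoodie.metadata.enable=false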

Expected behavior

The job should resume without any problem when restarted.

Environment Description

  • Hudi version : 0.11.1

  • Spark version : 3.1.1

  • Hive version : 3.1.2

  • Hadoop version :

  • Storage (HDFS/S3/GCS…) : S3

  • Running on Docker? (yes/no) : No

Let me know if you need more info. Thanks in advance.

About this issue

  • State: open
  • Created a year ago
  • Comments: 17 (9 by maintainers)

Most upvoted comments

@nsivabalan - we disabled metadata altogether to avoid the issues.

There are some issues with the metadata configs in use: some of the metadata table configs are not meant to be overridden.

--hoodie-conf hoodie.metadata.compact.max.delta.commits=10
--hoodie-conf hoodie.metadata.cleaner.commits.retained=60
--hoodie-conf hoodie.metadata.populate.meta.fields=true
--hoodie-conf hoodie.metadata.clean.async=true
--hoodie-conf hoodie.metadata.keep.max.commits=80
--hoodie-conf hoodie.metadata.insert.parallelism=20
--hoodie-conf hoodie.metadata.keep.min.commits=70

Please do not override any of these configs. I have created a follow-up ticket to stop exposing these configs: https://issues.apache.org/jira/browse/HUDI-5582
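As a sketch (not an authoritative list), the metadata-related flags in the spark-submit above could be trimmed to the user-facing ones already present in that command, leaving the table-internal settings at their defaults:

--hoodie-conf hoodie.metadata.enable=true \
--hoodie-conf hoodie.metadata.metrics.enable=true \
--hoodie-conf hoodie.metadata.index.column.stats.enable=true \
--hoodie-conf hoodie.metadata.index.column.stats.parallelism=50 \
--hoodie-conf hoodie.metadata.index.bloom.filter.enable=true \
--hoodie-conf hoodie.metadata.index.bloom.filter.parallelism=500 \
--hoodie-conf hoodie.metadata.index.bloom.filter.file.group.count=20 \
--hoodie-conf hoodie.metadata.index.check.timeout.seconds=900 \
--hoodie-conf hoodie.metadata.index.async=true \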

Can you restart the metadata table from scratch with the right set of configs and let us know how it goes? I have a clarification as well: I see you are setting async indexing for the metadata table to true (hoodie.metadata.index.async). Are you spinning up a separate spark-submit job (other than the DeltaStreamer) to build the metadata indexes? If not, you may have to fix that as well. More details can be found here: https://hudi.apache.org/docs/metadata_indexing.
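For completeness, a minimal sketch of the separate async indexing job described in the linked metadata_indexing docs (the properties file path is a placeholder, and the class and flag names should be verified against the 0.11.1 hudi-utilities bundle) would look roughly like:

spark-submit --master yarn --deploy-mode cluster \
--class org.apache.hudi.utilities.HoodieIndexer \
s3://<some_domain>/jars/hudi-utilities-bundle_2.12-0.11.1.jar \
--props /path/to/indexer.properties \
--mode scheduleAndExecute \
--base-path s3://<some_domain>/test/hudi_poc/continuous_cow/table_name \
--table-name cow_table_name \
--index-types COLUMN_STATS \
--parallelism 50 \
--spark-memory 4g

To rebuild the metadata table from scratch, a commonly used approach is to run one ingestion cycle with hoodie.metadata.enable=false (which should drop the existing metadata table), or to remove <base_path>/.hoodie/metadata while no writer is running, and then re-enable the metadata configs.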