tfx: StatisticsGen fails with OutOfMemoryError when using Flink, an external Beam worker pool, and a large amount of data

Hi there!

Environment: Flink 1.10.3, Beam 2.27.0, TFX (and subpackages) 0.27.0, MinIO (S3) as storage.

I’m developing a TFX pipeline: ImportExampleGen, StatisticsGen, etc.

I have ~5 GB of TF Example data, about 10 million rows.

I configured the Flink cluster to use almost 15 GB of RAM, but StatisticsGen fails with an OutOfMemoryError.

This is not my production dataset. The production datasets are much larger.

The error occurs in the GenerateStatistics[train]/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/CombineCountsAndWeights/GroupByKey -> [2]GenerateStatistics[train]/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/{CombineCountsAndWeights, Rearrange} -> ([2]GenerateStatistics[train]/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/{Uniques_Keys, Uniques_CountPerFeatureName} -> ToKeyedWorkItem, ToKeyedWorkItem) task.

Detailed error from Flink logs:

Exception in thread "grpc-default-executor-4" java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:3520)
        at org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString$ArraysByteArrayCopier.copyFrom(ByteString.java:126)
        at org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(ByteString.java:362)
        at org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString.copyFrom(ByteString.java:372)
        at org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.CodedInputStream$StreamDecoder.readBytesSlowPath(CodedInputStream.java:2978)
        at org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.CodedInputStream$StreamDecoder.readBytes(CodedInputStream.java:2386)
        at org.apache.beam.model.fnexecution.v1.BeamFnApi$Elements$Data.<init>(BeamFnApi.java:29659)
        at org.apache.beam.model.fnexecution.v1.BeamFnApi$Elements$Data.<init>(BeamFnApi.java:29600)
        at org.apache.beam.model.fnexecution.v1.BeamFnApi$Elements$Data$1.parsePartialFrom(BeamFnApi.java:30539)
        at org.apache.beam.model.fnexecution.v1.BeamFnApi$Elements$Data$1.parsePartialFrom(BeamFnApi.java:30533)
        at org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.CodedInputStream$StreamDecoder.readMessage(CodedInputStream.java:2366)
        at org.apache.beam.model.fnexecution.v1.BeamFnApi$Elements.<init>(BeamFnApi.java:29463)
        at org.apache.beam.model.fnexecution.v1.BeamFnApi$Elements.<init>(BeamFnApi.java:29412)
        at org.apache.beam.model.fnexecution.v1.BeamFnApi$Elements$1.parsePartialFrom(BeamFnApi.java:32943)
        at org.apache.beam.model.fnexecution.v1.BeamFnApi$Elements$1.parsePartialFrom(BeamFnApi.java:32937)
        at org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:86)
        at org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parseFrom(ProtoLiteUtils.java:223)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:215)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:118)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.MethodDescriptor.parseRequest(MethodDescriptor.java:299)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

How can I decrease memory usage?

Thanks!


Pipeline options:

--runner=FlinkRunner
--flink_master=flink-cluster:8081
--streaming
--auto_balance_write_files_sharding_enabled
--faster_copy
--max_parallelism=5
--object_reuse
--environment_type=EXTERNAL
--environment_config=127.0.0.1:50000
--flink_submit_uber_jar
--parallelism=5
--sdk_worker_parallelism=5
--execution_mode_for_batch=BATCH_FORCED
--s3_access_key=valid-access-key
--s3_secret_access_key=valid-secret-key
--s3_endpoint_url=minio:9000
--s3_verify=false
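
For readers who pass these flags from Python (TFX pipelines accept them as a list of strings via `beam_pipeline_args`), here is a minimal sketch of assembling the same options. The helper name `build_beam_pipeline_args` and the environment variable names `S3_ACCESS_KEY` and `S3_SECRET_ACCESS_KEY` are assumptions made for illustration; the flag names and values are copied from the list above.

```python
import os


def build_beam_pipeline_args(parallelism, worker_pool="127.0.0.1:50000"):
    """Return the Beam flags from the post as a list of strings.

    The helper itself is hypothetical; only the flags come from the post.
    The three parallelism flags share one value so they stay in sync.
    """
    # Hypothetical env var names; the defaults are the placeholders from the post.
    access_key = os.environ.get("S3_ACCESS_KEY", "valid-access-key")
    secret_key = os.environ.get("S3_SECRET_ACCESS_KEY", "valid-secret-key")
    return [
        "--runner=FlinkRunner",
        "--flink_master=flink-cluster:8081",
        "--streaming",
        "--auto_balance_write_files_sharding_enabled",
        "--faster_copy",
        f"--max_parallelism={parallelism}",
        "--object_reuse",
        "--environment_type=EXTERNAL",
        f"--environment_config={worker_pool}",
        "--flink_submit_uber_jar",
        f"--parallelism={parallelism}",
        f"--sdk_worker_parallelism={parallelism}",
        "--execution_mode_for_batch=BATCH_FORCED",
        f"--s3_access_key={access_key}",
        f"--s3_secret_access_key={secret_key}",
        "--s3_endpoint_url=minio:9000",
        "--s3_verify=false",
    ]


beam_pipeline_args = build_beam_pipeline_args(parallelism=5)
```

Keeping the parallelism in a single parameter makes it easier to try lower values while investigating memory pressure, without the three related flags drifting apart.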

Flink memory configuration:

jobmanager.memory.process.size: 2048m
jobmanager.heap.size: 4096m
taskmanager.runtime.large-record-handler: true
taskmanager.runtime.hashjoin-bloom-filters: true
compiler.delimited-informat.max-line-samples: 100
taskmanager.memory.task.heap.size: 12gb
taskmanager.memory.managed.size: 4gb
taskmanager.memory.jvm-overhead.max: 4gb
taskmanager.memory.jvm-overhead.min: 256m
taskmanager.memory.jvm-overhead.fraction: 0.4
taskmanager.memory.task.off-heap.size: 4gb
taskmanager.memory.framework.heap.size: 512m
taskmanager.memory.framework.off-heap.size: 2gb
taskmanager.numberOfTaskSlots: 5
taskmanager.memory.network.fraction: 0.4
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 4gb
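
As a quick sanity check on the configuration above, the explicitly sized TaskManager components can be added up. This is a rough sketch, assuming the Flink 1.10 memory model (JVM heap = framework heap + task heap; total Flink memory also includes managed, off-heap, and network memory) and that Flink reads `m`/`mb` as MiB and `g`/`gb` as GiB. The helper names `to_mib` and `summarize` are made up for illustration, and network memory, metaspace, and JVM overhead are left out because they are given here only as fractions or bounds.

```python
import re

# Flink size suffixes are binary units; only the ones used above are handled.
_UNITS_MIB = {"m": 1, "mb": 1, "g": 1024, "gb": 1024}

# Explicitly sized TaskManager components, copied from the configuration above.
CONFIG = {
    "taskmanager.memory.task.heap.size": "12gb",
    "taskmanager.memory.managed.size": "4gb",
    "taskmanager.memory.task.off-heap.size": "4gb",
    "taskmanager.memory.framework.heap.size": "512m",
    "taskmanager.memory.framework.off-heap.size": "2gb",
}


def to_mib(size):
    """Convert a size string such as '12gb' or '512m' to MiB."""
    match = re.fullmatch(r"(\d+)\s*([a-z]+)", size.strip().lower())
    if match is None or match.group(2) not in _UNITS_MIB:
        raise ValueError(f"unsupported size: {size!r}")
    return int(match.group(1)) * _UNITS_MIB[match.group(2)]


def summarize(config):
    """Sum the JVM heap and all explicitly sized components, in MiB."""
    sizes = {key: to_mib(value) for key, value in config.items()}
    jvm_heap = (
        sizes["taskmanager.memory.framework.heap.size"]
        + sizes["taskmanager.memory.task.heap.size"]
    )
    return {"jvm_heap_mib": jvm_heap, "explicit_total_mib": sum(sizes.values())}


summary = summarize(CONFIG)
print(f"JVM heap: {summary['jvm_heap_mib'] / 1024} GiB")
print(f"Explicitly sized total: {summary['explicit_total_mib'] / 1024} GiB")
```

Under these assumptions the explicitly sized components alone add up to 22.5 GiB before network memory, metaspace, and JVM overhead are counted, which may be worth comparing against the roughly 15 GB mentioned earlier and against what the host or container actually provides.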

Here are the Flink TaskManager metrics from Grafana (image: flink-task-manager-metrics). As you can see, ImportExampleGen completed successfully. Then, after memory usage goes idle, StatisticsGen starts and fails.

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 25 (6 by maintainers)

Most upvoted comments

@ibzib Thank you for all the tips so far, I’ve added image_domain {} to my features and it seems to help.

Do you have recommended parallelism values that you have found to work well for TFX?

  • How much memory and CPU per TaskManager?
  • How many slots per TaskManager?
  • How many TaskManagers?
  • What job parallelism?
  • What sdk_worker_parallelism?
  • What do I scale, if I want to scale? The number of TaskManagers? The size of the TaskManagers? The number of slots?

It’s rather a lot of values to configure. I’ve read the flink documentation, and one taskmanager with two slots is roughly worth two taskmanagers with 1 slot half the size in memory.

As trial and error can be quite slow, I was wondering if you could give some ballpark values for optimal TFX performance?

schema (StatsOptions.schema) with an image_domain

Thanks, I think I’ve encountered this and didn’t know there was a solution!

TFX does not test its components on Flink runner and the team has no expertise

I myself have had very mixed success with Flink from release 0.21 onward. It has helped with larger datasets, but it’s its own can of worms (jobs sometimes hang inexplicably). The thing is, when datasets start hitting a few gigabytes, one invariably ends up hitting OOM with DirectRunner, even if the dataset should in theory easily fit in RAM. E.g. in this issue https://github.com/tensorflow/tfx/issues/1907 it’s recommended to use Spark/Flink/Dataflow. I’ve also found https://github.com/tensorflow/transform/issues/143 to be a problem (at least in 0.22; I haven’t confirmed it in 0.27).

I think Spark is even less likely to work in general than Flink, and it seems to have basically no support from the Beam folks. As far as I can see, this makes Dataflow the only option for running datasets larger than a few GB, which gives people who are just starting out and not on GCP a hard time.

I think it would be great to test with larger datasets on DirectRunner and Flink, at least to document some expectation of what should work in production with a chunky dataset. Being able to run more reliably on large datasets without Dataflow also means one can use Jupyter notebooks more readily without crashing, which I think offers a lot of value.

TLDR: I would love if you guys tested the components with a huge dataset if you don’t already.

@ibzib @ferryvg

Also to clarify what “support” means in a comment left by @arghyaganguly

Flink runner is currently not supported. Closing this issue.

As @ibzib correctly pointed out, the Flink runner is supported by Apache Beam, but TFX does not test its components on Flink runner and the team has no expertise on the Flink runner. TFX at this time can only provide best-effort support here (someone may be able to provide debugging hints like the comment I posted earlier, but it’s difficult for us to try to reproduce and drill into your issue).

OTOH, Apache Beam may have committed support for the Flink runner.