hadoopcryptoledger: Bug while executing example "Analyzing the Ethereum Blockchain with Apache Flink"

Hi everyone!

I followed the steps written in the wiki, but I get an error when I execute the jar with Flink.

  1. I downloaded part of the Ethereum blockchain with geth and exported it to .bin files (see the sketch after this list).
  2. I put the files on HDFS (Sandbox HDP 2.6).
  3. I built the example using the following command: sbt +clean +assembly +it:test. I do get the jar file in the target directory.
  4. I downloaded the Flink tar.gz from http://www.apache.org/dyn/closer.lua/flink/flink-1.5.0/flink-1.5.0-bin-scala_2.11.tgz (I chose the version without bundled Hadoop).
  5. I ran the jar with the following command: bin/flink run example-hcl-flink-scala-ethereumblock.jar --input hdfs://localhost:8020/user/ethereum/input --output hdfs://localhost:8020/user/ethereum/output
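For reference, steps 1 and 2 amount to something like the following (a sketch: the export file, block range, and HDFS paths are hypothetical and need to be adapted):

  # export a range of blocks from the local geth chain data to a binary file
  geth export /tmp/ethereum-blocks.bin 0 100000
  # copy the file to HDFS where the job will read its input
  hdfs dfs -mkdir -p /user/ethereum/input
  hdfs dfs -put /tmp/ethereum-blocks.bin /user/ethereum/input/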

When I run the command in step 5, I get this output:

Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
Starting execution of program


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.lang.RuntimeException: Cannot create Hadoop WritableTypeInfo.
    at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2155)
    at org.zuinnote.flink.ethereum.example.FlinkScalaEthereumBlockCounter$.countTotalTransactions(FlinkScalaEthereumBlockCounter.scala:41)
    at org.zuinnote.flink.ethereum.example.FlinkScalaEthereumBlockCounter$.main(FlinkScalaEthereumBlockCounter.scala:34)
    at org.zuinnote.flink.ethereum.example.FlinkScalaEthereumBlockCounter.main(FlinkScalaEthereumBlockCounter.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    ... 9 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/io/Writable
    at org.apache.flink.api.java.typeutils.WritableTypeInfo.<init>(WritableTypeInfo.java:54)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2148)
    ... 17 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.io.Writable
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 23 more
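The last cause in the trace is the relevant one: java.lang.ClassNotFoundException: org.apache.hadoop.io.Writable means no Hadoop client classes are on Flink's classpath, which is exactly what the "without bundled Hadoop" download gives you. A quick way to verify is to search the jars in the distribution's lib folder for the class (a sketch; jar names differ between distributions):

  # print any jar in Flink's lib folder that contains the Writable class;
  # with the Hadoop-free download this prints nothing
  for j in flink-1.5.0/lib/*.jar; do jar tf "$j" | grep -q 'org/apache/hadoop/io/Writable.class' && echo "$j"; done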

Am I missing something?

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 16 (10 by maintainers)

Most upvoted comments

It should not be localhost if you work on an HDP VM. If you look in the log files you will see that it cannot resolve localhost.

Instead of localhost you have to use the DNS name I provided (sandbox-hdp.hortonworks.com). On older HDPs you had to replace localhost with sandbox.hortonworks.com.
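For example, keeping everything else from the failing command and only swapping the host (adapt the paths to your setup):

  bin/flink run example-hcl-flink-scala-ethereumblock.jar --input hdfs://sandbox-hdp.hortonworks.com:8020/user/ethereum/input --output hdfs://sandbox-hdp.hortonworks.com:8020/user/ethereum/output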

On 11. Jul 2018, at 11:06, Christophe notifications@github.com wrote:

I was missing the Flink distribution with Hadoop, thanks. I am now able to generate the jar with the modifications. But I get a new error when I execute the jar with Flink using this command:

bin/flink run example-hcl-flink-scala-ethereumblock.jar --input hdfs://localhost:8020/user/ethereum/input --output hdfs://localhost:8020/user/ethereum/output

I don't understand where it comes from: the HDFS directory is OK, I have the Hadoop-compatibility jar in the lib folder, and I started the cluster with the start-cluster.sh script. I don't think this is related to dependencies this time.

Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
Starting execution of program

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:452)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:540)
    at org.zuinnote.flink.ethereum.example.FlinkScalaEthereumBlockCounter$.main(FlinkScalaEthereumBlockCounter.scala:35)
    at org.zuinnote.flink.ethereum.example.FlinkScalaEthereumBlockCounter.main(FlinkScalaEthereumBlockCounter.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$5(RestClusterClient.java:357)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
    at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
    at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
    at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
    at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
    ... 12 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
    ... 10 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error.]
    at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
    at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
    at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    ... 4 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error.]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:225)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:209)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    ... 5 more
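The REST client only surfaces [Internal server error.]; the actual cause lands in the JobManager log on the server side. With a standalone cluster started via start-cluster.sh it should be in the log folder of the Flink distribution (the exact file name contains the user and host, so the glob below is an assumption):

  # look for the real stack trace behind the REST "Internal server error"
  less flink-1.5.0/log/flink-*-standalonesession-*.log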


Besides a cluster, I tried with the above build.sbt and HDP 2.6.5 (newer than yours, but it does not matter). I downloaded the Flink 1.5 distribution with Hadoop 2.7. I added the Hadoop-compatibility jar for 1.5 to the lib folder of the Flink distribution. I started the local Flink cluster using the start-cluster.sh script in the bin folder of the distribution. I ran the following command (you need to adapt the paths according to the folders that you have): ./flink-1.5.0/bin/flink run example-hcl-flink-scala-ethereumblock.jar --input hdfs://sandbox-hdp.hortonworks.com:8020/home/input/ethereum/ethgenesis --output hdfs://sandbox-hdp.hortonworks.com:8020/home/output/ethereum
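Spelled out as commands, that working setup is roughly the following (a sketch: the download URL follows the Apache archive naming scheme for the Hadoop 2.7 build and the compatibility jar comes from Maven Central; double-check the versions against your setup):

  # Flink 1.5.0 bundled with Hadoop 2.7, instead of the Hadoop-free build
  wget https://archive.apache.org/dist/flink/flink-1.5.0/flink-1.5.0-bin-hadoop27-scala_2.11.tgz
  tar xzf flink-1.5.0-bin-hadoop27-scala_2.11.tgz
  # fetch the Flink Hadoop-compatibility jar and drop it into lib
  wget https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_2.11/1.5.0/flink-hadoop-compatibility_2.11-1.5.0.jar
  cp flink-hadoop-compatibility_2.11-1.5.0.jar flink-1.5.0/lib/
  # start the local cluster and submit the example against the sandbox host
  ./flink-1.5.0/bin/start-cluster.sh
  ./flink-1.5.0/bin/flink run example-hcl-flink-scala-ethereumblock.jar --input hdfs://sandbox-hdp.hortonworks.com:8020/home/input/ethereum/ethgenesis --output hdfs://sandbox-hdp.hortonworks.com:8020/home/output/ethereum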

I find the correct output in /home/output/ethereum.