amazon-kinesis-data-analytics-examples: Run python process failed when running tumbling-window.py on KDA

When running the tumbling window sample (Python), the application does not start in KDA. I followed the instructions in the docs, and CloudWatch shows the following logs:

{
    "locationInformation": "org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:107)",
    "logger": "org.apache.flink.client.python.PythonDriver",
    "message": "Run python process failed",
    "throwableInformation": "java.lang.RuntimeException: Python process exits with code: 1\n\tat org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:104) ~[flink-python_2.12-1.11.1.jar:1.11.1]\n\tat jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]\n\tat jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]\n\tat jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]\n\tat java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:294) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:203) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]\n\tat java.lang.Thread.run(Thread.java:834) [?:?]\n",
    "threadName": "Flink-DispatcherRestEndpoint-thread-2",
    "applicationARN": "arn:aws:kinesisanalytics:us-east-1:XXXXXXXXXXXX:application/pythonsample",
    "applicationVersionId": "5",
    "messageSchemaVersion": "1",
    "messageType": "ERROR"
}
{
    "locationInformation": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:206)",
    "logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
    "message": "Error occurred when trying to start the job",
    "throwableInformation": "org.apache.flink.client.program.ProgramAbortException\n\tat org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:111) ~[?:?]\n\tat jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]\n\tat jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]\n\tat jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]\n\tat java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:294) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:203) ~[flink-dist_2.12-1.11.1.jar:1.11.1]\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]\n\t... 6 more\nWrapped by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramAbortException\n\tat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]\n\tat java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) [?:?]\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) [?:?]\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]\n\tat java.lang.Thread.run(Thread.java:834) [?:?]\n",
    "threadName": "Flink-DispatcherRestEndpoint-thread-2",
    "applicationARN": "arn:aws:kinesisanalytics:us-east-1:XXXXXXXXXXXX:application/pythonsample",
    "applicationVersionId": "5",
    "messageSchemaVersion": "1",
    "messageType": "ERROR"
}
{
    "locationInformation": "org.apache.flink.runtime.rest.handler.AbstractHandler.handleException(AbstractHandler.java:210)",
    "logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
    "message": "Exception occurred in REST handler: Could not execute application.",
    "threadName": "Flink-DispatcherRestEndpoint-thread-2",
    "applicationARN": "arn:aws:kinesisanalytics:us-east-1:XXXXXXXXXXXX:application/pythonsample",
    "applicationVersionId": "5",
    "messageSchemaVersion": "1",
    "messageType": "ERROR"
}
{
    "applicationARN": "arn:aws:kinesisanalytics:us-east-1:XXXXXXXXXXXX:application/pythonsample",
    "applicationVersionId": 5,
    "message": "org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:207)\n\tat java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)\n\tat java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramAbortException\n\tat java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)\n\tat java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)\n\t... 6 more\nCaused by: org.apache.flink.client.program.ProgramAbortException\n\tat org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:111)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:294)\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:201)\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:203)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)\n\t... 6 more\n",
    "messageType": "ERROR",
    "messageSchemaVersion": "1",
    "errorCode": "CodeError.InvalidApplicationCode"
}

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 23 (3 by maintainers)

Most upvoted comments

Hi all, thank you for your patience with this issue.

The Amazon Kinesis SQL Connector for Flink (v2.0.3) is now available on Maven and is the recommended connector for PyFlink applications that read from Kinesis.

You can find the jar here.

I'm closing this issue, since including this jar resolves the issues reported in this thread.
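When running the sample locally (outside KDA), PyFlink also needs to know where the connector jar lives, commonly through Flink's pipeline.jars option, which takes a semicolon-separated list of jar URLs. A minimal sketch, assuming the jar was downloaded into a local lib/ directory; the file name amazon-kinesis-sql-connector-flink-2.0.3.jar and the helper pipeline_jars_value are hypothetical:

```python
from pathlib import Path


def pipeline_jars_value(jar_paths):
    """Build a value for Flink's `pipeline.jars` option.

    The option takes a semicolon-separated list of jar URLs, so each path is
    resolved to an absolute path and converted to a file:// URI.
    """
    uris = []
    for jar in jar_paths:
        path = Path(jar).resolve()
        if path.suffix != ".jar":
            raise ValueError(f"not a jar file: {path}")
        uris.append(path.as_uri())
    return ";".join(uris)


if __name__ == "__main__":
    # Hypothetical location of the downloaded connector jar.
    value = pipeline_jars_value(["lib/amazon-kinesis-sql-connector-flink-2.0.3.jar"])
    print(value)
    # With PyFlink installed, the value could then be applied locally with:
    # table_env.get_config().get_configuration().set_string("pipeline.jars", value)
```

On KDA itself, the jar is typically packaged with the application and referenced through its runtime properties instead of pipeline.jars.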

Hi @flo-mair, without more context on the error (are there any other logs that might help?), I wonder whether you provided the environment variables required to run the application.
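KDA hands a PyFlink application its configuration as runtime property groups, each a PropertyGroupId with a PropertyMap. One way to rule out missing values is to check those groups before building the job. A minimal sketch: the kinesis.analytics.flink.run.options group with python and jarfile keys follows the KDA PyFlink setup, while consumer.config.0, input.stream.name, and aws.region are assumptions modelled on the samples, and the helpers property_map and missing_properties are hypothetical:

```python
import json

# Property group KDA reads to locate the PyFlink entry point and dependency jar.
# Treating both keys as required here is an assumption; adjust for your app.
RUN_OPTIONS_GROUP = "kinesis.analytics.flink.run.options"
REQUIRED_RUN_OPTIONS = ("python", "jarfile")


def property_map(props, group_id):
    """Return the PropertyMap for group_id, or None if the group is absent."""
    for group in props:
        if group.get("PropertyGroupId") == group_id:
            return group.get("PropertyMap", {})
    return None


def missing_properties(props, required):
    """List 'group.key' entries from `required` that are absent or empty.

    `required` maps a property group id to the keys that must be set in it.
    """
    missing = []
    for group_id, keys in required.items():
        pmap = property_map(props, group_id) or {}
        for key in keys:
            if not pmap.get(key):
                missing.append(f"{group_id}.{key}")
    return missing


if __name__ == "__main__":
    # Hypothetical properties in which "jarfile" and "aws.region" were forgotten.
    raw = json.dumps([
        {"PropertyGroupId": RUN_OPTIONS_GROUP,
         "PropertyMap": {"python": "tumbling-window.py"}},
        {"PropertyGroupId": "consumer.config.0",
         "PropertyMap": {"input.stream.name": "ExampleInputStream"}},
    ])
    required = {
        RUN_OPTIONS_GROUP: REQUIRED_RUN_OPTIONS,
        # Assumed group and key names, modelled on the samples.
        "consumer.config.0": ("input.stream.name", "aws.region"),
    }
    print(missing_properties(json.loads(raw), required))
    # ['kinesis.analytics.flink.run.options.jarfile', 'consumer.config.0.aws.region']
```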

Searching CloudWatch for errors containing PythonDriver has helped me debug my PyFlink application when it fails.
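The same search can be run offline over log records exported from CloudWatch, one JSON object per line, like the records in the original report. A minimal sketch with a hypothetical helper python_driver_errors: it keeps ERROR records that mention PythonDriver and returns each message together with the first line of its stack trace:

```python
import json


def python_driver_errors(lines):
    """Yield (message, first stack-trace line) for ERROR records mentioning PythonDriver."""
    for line in lines:
        if "PythonDriver" not in line:
            continue  # plain text match on the raw record, like a simple term filter
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip lines that are not structured JSON records
        if not isinstance(record, dict) or record.get("messageType") != "ERROR":
            continue
        trace = record.get("throwableInformation") or ""
        yield record.get("message", ""), trace.split("\n", 1)[0]


if __name__ == "__main__":
    # Trimmed versions of two records from the report above.
    sample = [
        json.dumps({"logger": "org.apache.flink.client.python.PythonDriver",
                    "message": "Run python process failed",
                    "throwableInformation": "java.lang.RuntimeException: Python process exits with code: 1\n\tat ...",
                    "messageType": "ERROR"}),
        json.dumps({"logger": "org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler",
                    "message": "Exception occurred in REST handler: Could not execute application.",
                    "messageType": "ERROR"}),
    ]
    for message, first_line in python_driver_errors(sample):
        print(message, "|", first_line)
```

"Python process exits with code: 1" only says that the Python script failed; the Python-side cause usually has to be found in the surrounding log entries.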

Thanks @jeremyber-aws! I got my job running by adding two dependencies to the includes list (marked "Added" below):

<includes>
    <include>com.amazonaws:*</include>
    <include>com.google.protobuf:*</include>
    <include>org.apache.httpcomponents:*</include>
    <include>software.amazon.awssdk:*</include>
    <include>software.amazon.eventstream:*</include>
    <include>software.amazon.ion:*</include>
    <include>org.reactivestreams:*</include>
    <include>io.netty:*</include>
    <include>com.typesafe.netty:*</include>
    <include>com.amazonaws:amazon-kinesis-producer</include>
    <include>com.google.guava:*</include>
    <include>commons-logging:*</include>
    <include>com.fasterxml.jackson.core:*</include>
    <!-- Added -->
    <include>com.fasterxml.jackson.dataformat:*</include>
    <include>joda-time:joda-time</include>
</includes>
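After rebuilding, a quick way to confirm that the added artifacts actually made it into the packaged jar is to inspect its entries. A minimal sketch using Python's standard zipfile module; the helper missing_packages and the package prefixes mapped to each include are assumptions, and if the build relocates (shades) these packages into another namespace, the prefixes have to be adjusted:

```python
import zipfile

# Assumed package paths for the two includes marked "Added" above.
EXPECTED_PREFIXES = {
    "com.fasterxml.jackson.dataformat:*": "com/fasterxml/jackson/dataformat/",
    "joda-time:joda-time": "org/joda/time/",
}


def missing_packages(jar_path, expected=EXPECTED_PREFIXES):
    """Return the include patterns whose package prefix has no entry in the jar."""
    with zipfile.ZipFile(jar_path) as jar:
        names = jar.namelist()
    return [artifact for artifact, prefix in expected.items()
            if not any(name.startswith(prefix) for name in names)]


if __name__ == "__main__":
    import os
    import tempfile

    # Demo on a throwaway jar that only contains a joda-time class.
    with tempfile.TemporaryDirectory() as tmp:
        demo = os.path.join(tmp, "demo.jar")
        with zipfile.ZipFile(demo, "w") as jar:
            jar.writestr("org/joda/time/DateTime.class", b"")
        print(missing_packages(demo))  # ['com.fasterxml.jackson.dataformat:*']
```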