spark: [BUG]: Error while running a .NET application containing a UDF against a remote Databricks Apache Spark cluster (e.g. Azure Databricks)
Describe the bug: A .NET for Apache Spark application that defines a UDF fails when run against a remote Databricks Apache Spark cluster (e.g. Azure Databricks) via databricks-connect; the same application runs properly against a local Apache Spark instance.
To Reproduce
Prerequisites: a simple .NET application with a simple SQL query and a UDF, which runs properly against a local Apache Spark instance (a minimal sketch of such an application follows the steps below). Steps to reproduce the behavior:
- Set up a connection to a remote Databricks Apache Spark cluster (tested against Azure Databricks) on your local dev machine using “databricks-connect configure”. Test the connection using “databricks-connect test”.
- Test the .NET application without the UDF first: comment out the UDF-related lines of code and make sure the simple query runs against the configured remote Azure Databricks cluster.
- Uncomment the UDF-related lines of code.
- See error.
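For reference, a minimal sketch of the kind of application described above; the table/column names, the sample data, and the upper-casing UDF are illustrative assumptions, not the original code:

using System;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

namespace TestApp
{
    class Program
    {
        static void Main(string[] args)
        {
            // The connection to the remote cluster comes from databricks-connect /
            // SPARK_HOME, not from code, so the session is created as usual.
            SparkSession spark = SparkSession.Builder().AppName("TestApp").GetOrCreate();

            // Hypothetical data; the original app queries a table stored on the cluster.
            DataFrame df = spark.Sql("SELECT 'john' AS name UNION ALL SELECT 'jane' AS name");
            df.CreateOrReplaceTempView("people");

            // Simple SQL query -- this part works against the remote Azure Databricks cluster.
            spark.Sql("SELECT name FROM people").Show();

            // UDF -- uncommenting these lines triggers the error below.
            Func<Column, Column> toUpper = Udf<string, string>(s => s.ToUpper());
            df.Select(toUpper(df["name"])).Show();
        }
    }
}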
Error (copied from the command line)
[2020-01-06T08:04:29.4589366Z] [L-020538381857] [Error] [JvmBridge] JVM method execution failed: Nonstatic method showString failed for class 15 when called with 3 arguments ([Index=1, Type=Int32, Value=20], [Index=2, Type=Int32, Value=20], [Index=3, Type=Boolean, Value=False], )
[2020-01-06T08:04:29.4590049Z] [L-020538381857] [Error] [JvmBridge] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.139.64.14, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 342, in main
("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.7 than that in driver 0.4.0, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:534)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:488)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
at org.apache.spark.scheduler.Task.run(Task.scala:113)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:533)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:539)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2581)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:270)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:280)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:80)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:86)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:55)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:300)
at com.databricks.service.SparkServiceImpl$$anonfun$executePlan$1$$anonfun$apply$2.apply(SparkServiceImpl.scala:84)
at com.databricks.service.SparkServiceImpl$$anonfun$executePlan$1$$anonfun$apply$2.apply(SparkServiceImpl.scala:78)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
at com.databricks.service.SparkServiceImpl$$anonfun$executePlan$1.apply(SparkServiceImpl.scala:77)
at com.databricks.service.SparkServiceImpl$$anonfun$executePlan$1.apply(SparkServiceImpl.scala:74)
at com.databricks.logging.UsageLogging$$anonfun$recordOperation$1.apply(UsageLogging.scala:417)
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:239)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:234)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:276)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
at com.databricks.logging.UsageLogging$class.recordOperation(UsageLogging.scala:398)
at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
at com.databricks.spark.util.UsageLogger$class.recordOperation(UsageLogger.scala:67)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
at com.databricks.spark.util.UsageLogging$class.recordOperation(UsageLogger.scala:342)
at com.databricks.service.SparkServiceImpl$.recordOperation(SparkServiceImpl.scala:54)
at com.databricks.service.SparkServiceImpl$.executePlan(SparkServiceImpl.scala:74)
at com.databricks.service.SparkServiceRPCHandler.com$databricks$service$SparkServiceRPCHandler$$execute0(SparkServiceRPCHandler.scala:487)
at com.databricks.service.SparkServiceRPCHandler$$anonfun$executeRPC0$1.apply(SparkServiceRPCHandler.scala:376)
at com.databricks.service.SparkServiceRPCHandler$$anonfun$executeRPC0$1.apply(SparkServiceRPCHandler.scala:317)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at com.databricks.service.SparkServiceRPCHandler.executeRPC0(SparkServiceRPCHandler.scala:317)
at com.databricks.service.SparkServiceRPCHandler$$anon$3.call(SparkServiceRPCHandler.scala:272)
at com.databricks.service.SparkServiceRPCHandler$$anon$3.call(SparkServiceRPCHandler.scala:260)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at com.databricks.service.SparkServiceRPCHandler$$anonfun$executeRPC$1.apply(SparkServiceRPCHandler.scala:304)
at com.databricks.service.SparkServiceRPCHandler$$anonfun$executeRPC$1.apply(SparkServiceRPCHandler.scala:284)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at com.databricks.service.SparkServiceRPCHandler.executeRPC(SparkServiceRPCHandler.scala:283)
at com.databricks.service.SparkServiceRPCServlet.doPost(SparkServiceRPCServer.scala:124)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.eclipse.jetty.server.Server.handle(Server.java:539)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:333)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:108)
at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 342, in main
("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.7 than that in driver 0.4.0, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:534)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:488)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
at org.apache.spark.scheduler.Task.run(Task.scala:113)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:533)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:539)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
[2020-01-06T08:04:29.5052265Z] [L-020538381857] [Exception] [JvmBridge] JVM method execution failed: Nonstatic method showString failed for class 15 when called with 3 arguments ([Index=1, Type=Int32, Value=20], [Index=2, Type=Int32, Value=20], [Index=3, Type=Boolean, Value=False], )
at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
Unhandled exception. System.Exception: JVM method execution failed: Nonstatic method showString failed for class 15 when called with 3 arguments ([Index=1, Type=Int32, Value=20], [Index=2, Type=Int32, Value=20], [Index=3, Type=Boolean, Value=False], )
at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
at Microsoft.Spark.Interop.Ipc.JvmBridge.CallNonStaticJavaMethod(JvmObjectReference objectId, String methodName, Object[] args)
at Microsoft.Spark.Interop.Ipc.JvmObjectReference.Invoke(String methodName, Object[] args)
at Microsoft.Spark.Sql.DataFrame.Show(Int32 numRows, Int32 truncate, Boolean vertical)
at TestApp.Program.Main(String[] args) in C:\_projects\TestApp\test-data-science\Sources\TestApp\Program.cs:line 40
Expected behavior: A .NET application containing a UDF works properly when using a connection to a remote Databricks cluster.
Additional context: A similar program in Python (same SQL query and a similar UDF) works properly using the same configured remote connection.
The application has been started using spark-submit, i.e.
%SPARK_HOME%\bin\spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local microsoft-spark-2.4.x-0.7.0.jar dotnet TestApp.dll
About this issue
- Original URL
- State: open
- Created 4 years ago
- Reactions: 1
- Comments: 28 (7 by maintainers)
@zwitbaum : We had a discussion with Databricks today and will get back to you once there are any updates from their side.
@zwitbaum: Just FYI, we have opened a ticket with Azure Databricks; this is currently being investigated. I will update you once I hear back from them.
@zwitbaum We have been investigating this scenario. I can reproduce the error you mentioned. It looks like there are some architectural differences that cause the PySpark worker to be invoked instead of the SparkDotnet worker. We may need more time to investigate this, but I am on top of it. Thanks for your patience, and feel free to let me know if you have any new findings/updates!
Details of my repro; settings are as below:
I query the data table stored on the cluster and run a UDF on it, and I get the same error as you have. The logical plan from the submission is as below; the SparkDotnet worker is passed correctly but not invoked.
@imback82: we develop and debug our application on the local Windows machine exactly as described in the link you’ve provided:
%SPARK_HOME%\bin\spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local microsoft-spark-2.4.x-0.7.0.jar debug
Because we want to query the data from Azure Databricks, we do the following:
Because the data is not stored on the local machine but in Azure Databricks, we need to configure a connection to this Databricks workspace.
We have configured the connection to Azure Databricks using “databricks-connect configure” and tested it with “databricks-connect test”, as described here: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/databricks-connect. This requires that SPARK_HOME is set properly:
setx SPARK_HOME "c:\users\zwitbaum\appdata\local\programs\python\python37\lib\site-packages\pyspark"
Now we can run our .NET app:
– Open a command prompt and start %SPARK_HOME%\bin\spark-submit … debug as described above.
– In Visual Studio, set a breakpoint (e.g. on the first line of the Main method) and hit F5.
– The app runs properly and shows the queried data.
Now let’s extend the application with a UDF, e.g. as sketched below:
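A minimal sketch of such an extension using the Microsoft.Spark API; it assumes a DataFrame df with a string column name (as in the sketch earlier in this issue) and an upper-casing UDF, consistent with the later note that the UDF converts values to upper case:

// Assumes: using System; using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions;
// and a DataFrame "df" with a string column "name" obtained from the query above (illustrative names).
Func<Column, Column> toUpper = Udf<string, string>(s => s.ToUpper());

// Apply the UDF and show the result; this is the call that fails against the remote cluster.
df.Select(toUpper(df["name"])).Show();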
Start the app in Visual Studio again by hitting F5. The app fails with the exception, which you can find in the description of the issue.
These are the steps you can follow to reproduce the error easily.
What we additionally tested before submitting the bug:
– Open a command prompt and start %SPARK_HOME%\bin\spark-submit … debug as described above.
– Comment out the lines that relate to the remote Azure Databricks:
Start the app in Visual Studio again by hitting F5. The app runs properly and shows the queried data. The UDF works well, converting the values to upper case.
setx SPARK_HOME "c:\users\zwitbaum\appdata\local\programs\python\python37\lib\site-packages\pyspark"
and testing it with “databricks-connect test”.
– Run the Python script locally: it works properly.
I hope this information is useful for you to investigate the bug.
@zwitbaum did you follow this https://github.com/dotnet/spark/blob/master/deployment/README.md#databricks?
cc: @suhsteve @Niharikadutta @elvaliuliuliu