mlflow: [BUG] autolog not working from pyspark because of _capture_modules.py stuck

Willingness to contribute

No. I cannot contribute a bug fix at this time.

MLflow version

1.28/1.29

System information

Kubernetes v1.20.8, Ubuntu 20.04, JupyterLab 3.4.3, Python 3.8.13, Spark 3.2.1 (PySpark), MinIO quay.io/minio/minio:RELEASE.2022-02-18T01-50-10Z

Describe the problem

Hi team, I'm using MLflow to log a PySpark ML model. My MLflow tracking setup is configured to use a local MinIO instance as storage.

Model logging works from pure Python (sklearn), but there seems to be a problem in two cases:

  • when calling mlflow.spark.log_model without specifying a conda_env, i.e. omitting the conda_env argument from a call such as
mlflow.spark.log_model(pipeline_model,"model",conda_env=conda_env)
  • or when using Spark ML autolog

In both cases, after correctly logging the Spark ML pipeline files to MinIO, MLflow seems to call a script, "_capture_modules.py", to prepare the main MLmodel file:

jovyan 7228 0.5 0.4 583672 109928 ? Sl 09:41 0:04 /home/jovyan/condaenv/edge/bin/python3.8 /home/jovyan/condaenv/edge/lib/python3.8/site-packages/mlflow/utils/_capture_modules.py --model-path /tmp/tmprxa5r8da --flavor spark --output-file /tmp/tmporxuqx4o/imported_modules.txt --sys-path ["/home/jovyan/notebook/working", "/home/jovyan/condaenv/edge/lib/python3.8/site-packages/git/ext/gitdb", "/tmp/spark-b0b87cac-185a-455a-a4f7-2541285e2836/userFiles-2989435d-54ac-48f2-92e7-86ae1cfb781a", "/usr/local/spark/python/lib/py4j-0.10.9.3-src.zip", "/usr/local/spark/python", "/home/jovyan/notebook/working", "/home/jovyan/condaenv/edge/lib/python38.zip", "/home/jovyan/condaenv/edge/lib/python3.8", "/home/jovyan/condaenv/edge/lib/python3.8/lib-dynload", "", "/home/jovyan/condaenv/edge/lib/python3.8/site-packages", "/home/jovyan/condaenv/edge/lib/python3.8/site-packages/gitdb/ext/smmap"]

This script starts a Spark instance that does not seem to be properly configured:

jovyan 7237 2.6 1.5 6196088 368348 ? Sl 09:41 0:19 /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/* -Xmx1g -Dio.netty.tryReflectionSetAccessible=true org.apache.spark.deploy.SparkSubmit --conf spark.master=local[1] --conf spark.databricks.io.cache.enabled=False --conf spark.driver.bindAddress=127.0.0.1 --conf spark.python.worker.reuse=True --conf spark.executor.allowSparkContext=true pyspark-shell

This instance tries to reach Amazon S3 (amazonaws.com) instead of the local MinIO; see the stack trace under "Other info / logs" below.

The run usually finishes after about 1.5 hours: following a timeout, the remaining files (the MLmodel file and its "siblings") are logged.

Is there any chance to pass spark and minio configuration to this process?
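As noted above, the problem shows up when mlflow.spark.log_model is called without a conda_env. A possible workaround for the explicit log_model case (not for autolog) is to always pass a conda_env, on the assumption that MLflow then has no requirements to infer and so has no reason to launch _capture_modules.py. The sketch below is illustrative only: build_conda_env and log_spark_model are hypothetical helpers, the environment name and channel are placeholders, and "1.28.0" is an assumed patch version for the 1.28 release mentioned in this report.

```python
def build_conda_env(python_version, mlflow_version, pyspark_version):
    """Return a conda environment spec (as a dict) with pinned versions.

    Hypothetical helper: the name and channel below are placeholders.
    """
    return {
        "name": "mlflow-env",
        "channels": ["conda-forge"],
        "dependencies": [
            f"python={python_version}",
            "pip",
            {"pip": [f"mlflow=={mlflow_version}", f"pyspark=={pyspark_version}"]},
        ],
    }


def log_spark_model(pipeline_model, conda_env):
    """Log a fitted Spark ML pipeline with an explicit conda_env."""
    # Imported lazily so build_conda_env can be used without MLflow installed.
    import mlflow.spark

    # Same call shape as in the report; conda_env is supplied explicitly.
    mlflow.spark.log_model(pipeline_model, "model", conda_env=conda_env)


# Versions taken from the "System information" section of this report
# ("1.28.0" is an assumed patch version).
conda_env = build_conda_env("3.8.13", "1.28.0", "3.2.1")
```

Inside an active run, log_spark_model(pipeline_model, conda_env) would then replace the bare log_model call. This does not pass any Spark or MinIO configuration to the child process; it only tries to avoid starting it.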

Tracking information

No response

Code to reproduce issue

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from sklearn.datasets import load_iris

import mlflow

mlflow.set_experiment("spark_pipeline_example")

mlflow.pyspark.ml.autolog()

# `sc` is not defined in this snippet; it is assumed to be the notebook's
# existing Spark session, already configured for MinIO.
df = load_iris(as_frame=True).frame.rename(columns={"target": "label"})
df = sc.createDataFrame(df)
train, test = df.randomSplit([0.8, 0.2])

assembler = VectorAssembler(inputCols=df.columns[:-1], outputCol="features")
scaler = StandardScaler(inputCol=assembler.getOutputCol(), outputCol="scaledFeatures")
lor = LogisticRegression(maxIter=5, featuresCol=scaler.getOutputCol())

# Non-nested pipeline
pipeline = Pipeline(stages=[assembler, scaler, lor])
with mlflow.start_run():
    pipeline_model = pipeline.fit(train)

# predictions
columns = ["features", "prediction"]
pipeline_model.transform(test).select(columns).show()

Stack trace

Other info / logs

2022-09-27 09:53:14,412 +0000 [Thread-4] DEBUG (MainClientExec.java:234) - Opening connection {s}->https://miniosergio.s3.amazonaws.com:443
2022-09-27 09:53:14,413 +0000 [Thread-4] DEBUG (DefaultHttpClientConnectionOperator.java:139) - Connecting to miniosergio.s3.amazonaws.com/52.217.88.84:443
2022-09-27 09:53:14,413 +0000 [Thread-4] DEBUG (SSLConnectionSocketFactory.java:366) - Connecting socket to miniosergio.s3.amazonaws.com/52.217.88.84:443 with timeout 5000
2022-09-27 09:53:19,419 +0000 [Thread-4] DEBUG (ClientConnectionManagerFactory.java:82) - java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
    at com.amazonaws.http.conn.$Proxy15.connect(Unknown Source)
    at com.amazonaws.thirdparty.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
    at com.amazonaws.thirdparty.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
    at com.amazonaws.thirdparty.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
    at com.amazonaws.thirdparty.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at com.amazonaws.thirdparty.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at com.amazonaws.thirdparty.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1333)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5227)
    at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:6189)
    at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:6162)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5211)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5173)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1360)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$6(S3AFileSystem.java:2066)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:412)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:375)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2056)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2032)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3273)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3185)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3053)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4263)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.amazonaws.thirdparty.apache.http.conn.ConnectTimeoutException: Connect to miniosergio.s3.amazonaws.com:443 [miniosergio.s3.amazonaws.com/52.217.88.84] failed: connect timed out
    at com.amazonaws.thirdparty.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:151)
    at com.amazonaws.thirdparty.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
    ... 49 more
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at com.amazonaws.thirdparty.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368)
    at com.amazonaws.thirdparty.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
    ... 50 more
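
In the log, the S3A client addresses the bucket as miniosergio.s3.amazonaws.com, which is what one would expect from a Spark session that has no custom S3A endpoint set. For comparison, here is a minimal sketch of the Hadoop S3A settings typically used to point a Spark session at MinIO. The endpoint URL and credentials are placeholders, and minio_s3a_conf and build_session are hypothetical helpers, not part of MLflow or Spark.

```python
def minio_s3a_conf(endpoint, access_key, secret_key):
    """Return Spark conf entries that point the S3A connector at a MinIO endpoint."""
    use_ssl = endpoint.startswith("https://")
    return {
        # Send requests to MinIO instead of the default AWS endpoint.
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        # Path-style URLs (endpoint/bucket) rather than bucket.endpoint hostnames.
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.connection.ssl.enabled": str(use_ssl).lower(),
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
    }


def build_session(conf):
    """Create (or reuse) a SparkSession with the given conf entries applied."""
    # Imported lazily so minio_s3a_conf can be used without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Placeholder endpoint and credentials, for illustration only.
conf = minio_s3a_conf("http://minio.example.local:9000", "ACCESS_KEY", "SECRET_KEY")
```

Settings applied this way affect only the session that build_session creates. Whether the Spark instance started by _capture_modules.py would see them is not established in this report.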

What component(s) does this bug affect?

  • area/artifacts: Artifact stores and artifact logging
  • area/build: Build and test infrastructure for MLflow
  • area/docs: MLflow documentation pages
  • area/examples: Example code
  • area/model-registry: Model Registry service, APIs, and the fluent client calls for Model Registry
  • area/models: MLmodel format, model serialization/deserialization, flavors
  • area/pipelines: Pipelines, Pipeline APIs, Pipeline configs, Pipeline Templates
  • area/projects: MLproject format, project running backends
  • area/scoring: MLflow Model server, model deployment tools, Spark UDFs
  • area/server-infra: MLflow Tracking server backend
  • area/tracking: Tracking Service, tracking client APIs, autologging

What interface(s) does this bug affect?

  • area/uiux: Front-end, user experience, plotting, JavaScript, JavaScript dev server
  • area/docker: Docker use across MLflow’s components, such as MLflow Projects and MLflow Models
  • area/sqlalchemy: Use of SQLAlchemy in the Tracking Service or Model Registry
  • area/windows: Windows support

What language(s) does this bug affect?

  • language/r: R APIs and clients
  • language/java: Java APIs and clients
  • language/new: Proposals for new client languages

What integration(s) does this bug affect?

  • integrations/azure: Azure and Azure ML integrations
  • integrations/sagemaker: SageMaker integrations
  • integrations/databricks: Databricks integrations

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 28 (9 by maintainers)

Most upvoted comments

The issue is probably: https://github.com/mlflow/mlflow/blob/3d6827f1775d91bffe401c4dfa57c262eeff7333/mlflow/utils/_capture_modules.py#L115

We should also clear 'PYSPARK_GATEWAY_PORT' and 'PYSPARK_GATEWAY_SECRET' in the OSS Spark case. Otherwise, the child process will try to connect to the Spark driver process in the parent process, which causes issues.
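
The idea in this comment can be sketched as follows: before spawning a child Python process, copy the environment and drop the two gateway variables so the child cannot attach to the parent's Spark gateway. This is an illustration of the technique only, not the actual MLflow patch; env_without_gateway and run_child are hypothetical names, and the port and secret values are dummies.

```python
import os
import subprocess
import sys

# Variables that PySpark uses to locate an already-running gateway.
GATEWAY_VARS = ("PYSPARK_GATEWAY_PORT", "PYSPARK_GATEWAY_SECRET")


def env_without_gateway(base_env=None):
    """Return a copy of the environment with the PySpark gateway variables removed."""
    env = dict(os.environ if base_env is None else base_env)
    for name in GATEWAY_VARS:
        env.pop(name, None)
    return env


def run_child(code, env):
    """Run a Python snippet in a child process and return its stripped stdout."""
    result = subprocess.run(
        [sys.executable, "-c", code],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


# Simulate a parent process that has a Spark gateway running (dummy values).
parent_env = {**os.environ, "PYSPARK_GATEWAY_PORT": "12345", "PYSPARK_GATEWAY_SECRET": "dummy"}
child_env = env_without_gateway(parent_env)

# The child reports whether it can still see the gateway port variable.
seen = run_child("import os; print('PYSPARK_GATEWAY_PORT' in os.environ)", child_env)
```

With the variables removed, a child that imports PySpark would have to start its own gateway rather than reuse the parent's.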