azure-sdk-for-python: ADF v2 Error submitting action scripts and accessing ADLS files in Spark Activity. azure-mgmt-datafactory

My HDInsightSparkActivity() fails to run the entry_file_path script, with the error 'Spark job failed, batch id:0' shown in the pipeline runs monitor. I am also getting an error when running pySpark code against ADLS from Jupyter on the cluster provisioned by the SDK: 'fs.adl.oauth2.client.id not found'. Finally, installing libraries from Conda-Forge on the head node fails with 'OSError(13, 'Permission denied')'.

My Service Principal ID and Key have been validated and have access to each of the services. Yet the cluster I have spun up with the SDK cannot run commands on the head node, access its primary Blob storage, or read data from an Azure service like ADLS.
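For reference, setting the ADLS OAuth properties on the Hadoop configuration inside the notebook is the kind of workaround I have been trying for the 'fs.adl.oauth2.client.id not found' error (a minimal sketch, assuming ADLS Gen1 and reusing the same service principal; the <spID>, <spKey>, <tenant>, <account>, and path values are placeholders, not my real values):

# Sketch only: set the ADLS Gen1 OAuth settings on the active Spark session.
hconf = spark.sparkContext._jsc.hadoopConfiguration()
hconf.set("fs.adl.oauth2.access.token.provider.type", "ClientCredential")
hconf.set("fs.adl.oauth2.client.id", "<spID>")
hconf.set("fs.adl.oauth2.credential", "<spKey>")
hconf.set("fs.adl.oauth2.refresh.url",
          "https://login.microsoftonline.com/<tenant>/oauth2/token")
df = spark.read.csv("adl://<account>.azuredatalakestore.net/input/sample.csv")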

Here is a portal snapshot of the additional ADLS storage access that I am not able to provision with the SDK (see the sketch after the on-demand linked service code below). Although this should have nothing to do with the cluster executing post-action scripts from Blob storage, I believe it may be related to the errors.

(portal screenshot: 20180321_153237)

Here is the code I am using to create the cluster (please assume I have imported all the associated libraries and defined the subscription, resource group, data factory, credential, and client names):

ls_name = 'storageLinkedService'
storage_string = SecureString('MyBlobStorage')
ls_azure_storage = AzureStorageLinkedService(connection_string=storage_string)
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
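Note that the value wrapped in SecureString is the full storage account connection string; 'MyBlobStorage' above stands in for something like the following (the account name and key are placeholders, not my real values):

storage_string = SecureString(
    'DefaultEndpointsProtocol=https;AccountName=<storageaccountname>;AccountKey=<storageaccountkey>')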
    
#Create HDInsight Spark cluster on-demand linked service
spark_ls_name = 'newrunSDKservice'
spark_cluster = HDInsightOnDemandLinkedService(cluster_size=4,
                                               time_to_live='08:00:00',
                                               version='3.6',
                                               linked_service_name=LinkedServiceReference(ls_name),
                                               host_subscription_id=subscription_id,
                                               tenant=tenantkey,
                                               cluster_resource_group=rg,
                                               service_principal_id=spID,
                                               service_principal_key=spKey,
                                               cluster_user_name=clusterUser,
                                               cluster_password=clusterPass,
                                               cluster_ssh_user_name=clusterSSH,
                                               cluster_ssh_password=sshPass,
                                               cluster_type='spark',
                                               spark_version='2.1',
                                               cluster_name_prefix='nrun')
spark_ls_ref= LinkedServiceReference(spark_ls_name)
spark_ls = adf_client.linked_services.create_or_update(rg_name, df_name, spark_ls_name, spark_cluster)
print_item(spark_ls)
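What I believe I still need is a way to attach the ADLS account from the screenshot as additional storage for the on-demand cluster. This is roughly what I have been attempting (a sketch only; the 'adlsLinkedService' name and data_lake_store_uri value are placeholders, and I am not sure whether an ADLS linked service is accepted by additional_linked_service_names, which may be the root of the problem):

#Sketch: declare an ADLS (Gen1) linked service and try to hand it to the
#on-demand cluster as an additional linked service.
adls_ls_name = 'adlsLinkedService'
adls_ls = AzureDataLakeStoreLinkedService(
    data_lake_store_uri='https://<adlsaccount>.azuredatalakestore.net/webhdfs/v1',
    service_principal_id=spID,
    service_principal_key=spKey,
    tenant=tenantkey)
adf_client.linked_services.create_or_update(rg_name, df_name, adls_ls_name, adls_ls)

#...then pass additional_linked_service_names=[LinkedServiceReference(adls_ls_name)]
#when constructing HDInsightOnDemandLinkedService above.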

#Spark Activity
root_path = 'input/actions'
entry_file_path = 'localcp.sh'
#Create Action Activity
spark_act_name = 'pySpark-adls'
spark_activity = HDInsightSparkActivity(spark_act_name,
                                        root_path,
                                        entry_file_path,
                                        linked_service_name=spark_ls_ref,
                                        get_debug_info='Failure',
                                        spark_job_linked_service=ds_ls,
                                        proxy_user=clusterUser)
 
spark_p_name = 'adls-spark'
params_for_spark_pipeline = {}
spark_p_obj = PipelineResource(activities=[spark_activity], parameters=params_for_spark_pipeline)
spark_p = adf_client.pipelines.create_or_update(rg_name, df_name, spark_p_name, spark_p_obj)
print_item(spark_p)

spark_run_response = adf_client.pipelines.create_run(rg_name, df_name , spark_p_name, {})
spark_pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, spark_run_response.run_id)
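For completeness, this is roughly how I check the outcome afterwards (a sketch following the data factory quickstart pattern for the azure-mgmt-datafactory 0.x versions current at the time; the 30-second wait is arbitrary, and newer SDK versions expose a different query method for activity runs):

#Sketch: wait briefly, then fetch the pipeline run status and the first
#activity run's error details.
import time
from datetime import datetime, timedelta

time.sleep(30)
spark_pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, spark_run_response.run_id)
print("Pipeline run status: {}".format(spark_pipeline_run.status))

activity_runs = list(adf_client.activity_runs.list_by_pipeline_run(
    rg_name, df_name, spark_pipeline_run.run_id,
    datetime.now() - timedelta(1), datetime.now() + timedelta(1)))
print(activity_runs[0].error)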

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 16 (8 by maintainers)

Most upvoted comments

ADLS V2 support has been added to HDI OnDemand.