sagemaker-python-sdk: FrameworkProcessor is broken with SageMaker Pipelines

Describe the bug Any Processor derived from FrameworkProcessor is broken when used with SageMaker Pipelines. There is a problem with the command and entrypoint parameters: the generated job command does not include python3, so the Python entrypoint script is executed as a shell script, causing the following error:

line 2: import: command not found

To reproduce

  1. Create a FrameworkProcessor (e.g. PyTorchProcessor, TensorFlowProcessor)
  2. Create a ProcessingStep and a Pipeline
  3. Execute it
  4. See it fail
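
For reference, a minimal sketch of the failing setup looks like this (the processor class, framework/Python versions, and names are only illustrative):

from sagemaker import get_execution_role
from sagemaker.pytorch.processing import PyTorchProcessor
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep

# Any FrameworkProcessor subclass reproduces the problem
processor = PyTorchProcessor(
    framework_version="1.8",
    py_version="py36",
    role=get_execution_role(),
    instance_count=1,
    instance_type="ml.m5.xlarge",
)

step = ProcessingStep(
    name="ReproProcessingStep",
    processor=processor,
    code="processing.py",  # uploaded as the entrypoint, but not invoked with python3
)

pipeline = Pipeline(name="ReproPipeline", steps=[step])
pipeline.upsert(role_arn=get_execution_role())
pipeline.start()  # the processing job fails with "import: command not found"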

Expected behavior The pipeline should run successfully.

Screenshots or logs

Screenshot from Pipelines: (image attached in the original issue)

Logs from CloudWatch:

/opt/ml/processing/input/entrypoint/inference_with_processing.py: line 2: import: command not found
/opt/ml/processing/input/entrypoint/inference_with_processing.py: line 3: import: command not found
/opt/ml/processing/input/entrypoint/inference_with_processing.py: line 4: import: command not found
/opt/ml/processing/input/entrypoint/inference_with_processing.py: line 5: import: command not found
/opt/ml/processing/input/entrypoint/inference_with_processing.py: line 6: from: command not found

System information A description of your system. Please provide:

  • SageMaker Python SDK version: 2.57.0
  • Framework name (eg. PyTorch) or algorithm (eg. KMeans): Every Framework
  • Framework version: Every version supported by SM
  • Python version: 3.8
  • CPU or GPU: CPU and GPU
  • Custom Docker image (Y/N): N

Additional context N/A

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Reactions: 11
  • Comments: 23 (6 by maintainers)

Most upvoted comments

Still the case for now.

However, you can now use the new sagemaker.workflow.pipeline_context.PipelineSession so that .run() generates the step arguments without actually launching the Processing job. I tried this in a Jupyter Notebook with a custom FrameworkProcessor, but it should work with any FrameworkProcessor. Your code would look like:

from sagemaker import get_execution_role
from sagemaker.sklearn import SKLearn
from sagemaker.processing import FrameworkProcessor  # or any concrete FrameworkProcessor, e.g. HuggingFaceProcessor
from sagemaker.workflow.pipeline_context import PipelineSession

session = PipelineSession()

skpv2 = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version='0.23-1',
    role=get_execution_role(),
    instance_count=1,
    instance_type='ml.m5.large',
    sagemaker_session=session
)

step_args = skpv2.run(
    code='processing.py',
    source_dir="code", # add processing.py and requirements.txt here
    inputs=[...], outputs=[...]
)

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep

processing_step = ProcessingStep(
    name="MyProcessingStep",
    step_args=step_args
)

# [ define the other steps if any ]

pipeline = Pipeline(steps=[...])

Just make sure to update the SageMaker Python SDK to the latest version 😃
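
To actually register and run the pipeline once it is defined, a minimal follow-up could look like the sketch below (note the pipeline needs a name to be upserted; the name here is a placeholder):

pipeline = Pipeline(name="MyFrameworkProcessorPipeline", steps=[processing_step])

# Create or update the pipeline definition in SageMaker, then start an execution
pipeline.upsert(role_arn=get_execution_role())
execution = pipeline.start()
execution.wait()  # optional: block until the execution finishes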

UPDATE 2: ScriptProcessor does work; however, there is no support for the source_dir parameter (as commented above by @athewsey). If you need custom dependencies or a multi-file script, create your own custom container by extending the SageMaker images for TF/PyTorch/HuggingFace/MXNet.

For those who need some directions on how to switch from FrameworkProcessor to ScriptProcessor, here is an example for TF 2.3:

##### COMMENT THE TENSORFLOWPROCESSOR
 
# from sagemaker.tensorflow import TensorFlowProcessor
# tp = TensorFlowProcessor(
#     framework_version='2.3',
#     role = get_execution_role(),
#     instance_count=1,
#     instance_type='ml.m5.large',
#     base_job_name='DSM-TF-Demo-Process',
#     py_version='py37'
# )
 
 
##### AND REPLACE WITH
 
from sagemaker.image_uris import retrieve
from sagemaker.processing import ScriptProcessor
from sagemaker import get_execution_role
 
image_uri = retrieve(
    framework='tensorflow', 
    region='eu-west-1', 
    version='2.3', 
    py_version='py37', 
    image_scope='training',
    instance_type='ml.m5.xlarge'
)
sp = ScriptProcessor(
    role=get_execution_role(),
    image_uri=image_uri,
    command=['python3'],
    instance_count=1,
    instance_type='ml.m5.xlarge'
)
# Now, either call sp.run() or create a sagemaker.workflow.steps.ProcessingStep(), as needed
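
# For example (hypothetical step and pipeline names), wiring the ScriptProcessor
# into a pipeline could look like this. Note that `code` must point to a single
# file, since ScriptProcessor does not support source_dir:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep

processing_step = ProcessingStep(
    name="TF-Script-Processing",
    processor=sp,
    code="preprocessing.py",
    inputs=[...],   # your ProcessingInput objects, if any
    outputs=[...],  # your ProcessingOutput objects, if any
)
pipeline = Pipeline(name="my-pipeline", steps=[processing_step])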

A very short example of a Dockerfile to extend the default TF container and install dependencies (not tested yet):

FROM 763104351884.dkr.ecr.eu-west-1.amazonaws.com/tensorflow-training:2.3-cpu-py37
COPY requirements.txt /opt/ml/processing/input/code/requirements.txt
RUN pip install -r /opt/ml/processing/input/code/requirements.txt
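
Once that image is built and pushed to your own ECR repository, you would point image_uri at it instead of calling retrieve(); the URI below is only an example:

custom_image_uri = "123456789012.dkr.ecr.eu-west-1.amazonaws.com/my-tf-processing:2.3-cpu-py37"  # example URI

sp = ScriptProcessor(
    role=get_execution_role(),
    image_uri=custom_image_uri,  # custom image with requirements.txt pre-installed
    command=['python3'],
    instance_count=1,
    instance_type='ml.m5.xlarge'
)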

The FrameworkProcessor has a method called get_run_args (doc here) that is designed to help integrate this processor with a ProcessingStep, which can then be put into a SageMaker pipeline. If you want to add pip dependencies, you can add a requirements.txt file under BASE_DIR.

Here is simplified code that connects the dots between FrameworkProcessor, get_run_args, ProcessingStep, and Pipeline.


import os

from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    FrameworkProcessor
)

from sagemaker.workflow.functions import Join
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep

from sagemaker.tensorflow import TensorFlow

# role, data_s3_bucket and data_s3_key are assumed to be defined elsewhere
BASE_DIR = os.path.dirname(os.path.realpath(__file__))

preprocessing_processor = FrameworkProcessor(
    estimator_cls=TensorFlow,
    framework_version='2.4.3',
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    py_version='py37',
    command=["python3"],
    base_job_name="some-preprocessing-step"
)

train_data_in_s3 = ProcessingOutput(
    source="/opt/ml/processing/output/train/",
    destination=Join(
        on="/",
        values=[
            "s3:/",
            data_s3_bucket,
            os.environ["SAGEMAKER_PROJECT_NAME"],
            data_s3_key,
            'train/'
        ],
    ),
    output_name='train',
    s3_upload_mode='Continuous',
)

test_data_in_s3 = ProcessingOutput(
    source="/opt/ml/processing/output/test/",
    destination=Join(
        on="/",
        values=[
            "s3:/",
            data_s3_bucket,
            os.environ["SAGEMAKER_PROJECT_NAME"],
            data_s3_key,
            'test/'
        ],
    ),
    output_name='test',
    s3_upload_mode='Continuous',
)

data_s3_key_in_project = Join(
    on="/",
    values=[
        os.environ["SAGEMAKER_PROJECT_NAME"],
        data_s3_key
    ],
)

preprocessing_run_args = preprocessing_processor.get_run_args(
    code="preprocess.py",
    source_dir=BASE_DIR,
    inputs=[],
    outputs=[train_data_in_s3, test_data_in_s3],
    arguments=[
        '--data-s3-bucket', "your bucket name",
        '--data-s3-key', "your key"
    ]
)

preprocessing_step = ProcessingStep(
    name="your-preprocessing-step-name",
    processor=preprocessing_processor,
    inputs=preprocessing_run_args.inputs,
    outputs=preprocessing_run_args.outputs,
    job_arguments=preprocessing_run_args.arguments,
    code=preprocessing_run_args.code
)

pipeline_name = "your-pipeline-name"

distributed_ml_training_pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        # your pipeline parameters here
    ],
    steps=[preprocessing_step, ...]
)

If you are using this inside a SageMaker Studio MLOps Project, make sure to declare your requirements.txt in a MANIFEST.in file so that it is shipped with the package: https://packaging.python.org/en/latest/guides/using-manifest-in/.
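
For example, assuming the processing code and its requirements.txt live under pipelines/preprocess/ (the path is only an illustration), the MANIFEST.in entry could look like:

# MANIFEST.in (example path, adjust to your repository layout)
include pipelines/preprocess/requirements.txt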

Is this issue fixed?

Thanks @dgallitelli

We would encourage users to adopt this new way to construct TrainingStep, ProcessingStep, TransformStep, TuningStep, and ModelStep.

We have a Read the Docs page about to be released that introduces all the improvements we made to the SageMaker Python SDK Pipelines module.