zenml: [BUG]: StepInterfaceError: Unable to find materializer for output 'output' of type `` in step 'trainer'.

Contact Details [Optional]

vishalisrinivasan97@gmail.com

System Information

ZenML version: 0.13.1 Install path: /home/<username>/.pyenv/versions/3.7.6/lib/python3.7/site-packages/zenml Python version: 3.7.6 Platform information: {‘os’: ‘linux’, ‘linux_distro’: ‘ubuntu’, ‘linux_distro_like’: ‘debian’, ‘linux_distro_version’: ‘18.04’} Environment: native Integrations: [‘kubeflow’, ‘kubernetes’, ‘tensorboard’, ‘tensorflow’]

What happened?

When I try to implement this code “https://github.com/zenml-io/zenml/tree/main/examples/kubeflow_pipelines_orchestration”; at the step “Run pipeline”: _

Initialize the pipeline

first_pipeline = mnist_pipeline( importer=importer(), normalizer=normalizer(), trainer=trainer(), evaluator=evaluator(), )

first_pipeline.run()

_

When I try to run the above snippet, I face StepInterfaceError

StepInterfaceError: Unable to find materializer for output ‘output’ of type <class 'keras.engine.training.Model'> in step ‘trainer’. Please make sure to either explicitly set a materializer for step outputs using step.with_return_materializers(...) or registering a default materializer for specific types by subclassing BaseMaterializer and setting its ASSOCIATED_TYPES class variable. For more information, visit https://docs.zenml.io/developer-guide/advanced-usage/materializer.

Reproduction steps

You can get the entire source code from here: https://github.com/zenml-io/zenml/tree/main/examples/kubeflow_pipelines_orchestration

Initialize the pipeline

first_pipeline = mnist_pipeline( importer=importer(), normalizer=normalizer(), trainer=trainer(), evaluator=evaluator(), )

first_pipeline.run()

Relevant log output

Creating run for pipeline: mnist_pipeline
Cache enabled for pipeline mnist_pipeline
Using stack default to run pipeline mnist_pipeline...

╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ <ipython-input-19-3e8dc075b972>:9 in <module>                                                    │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/pipelines/base_pipeline.py:449 in run               │
│                                                                                                  │
│   446 │   │   constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True                                 │
│   447 │   │   try:                                                                               │
│   448 │   │   │   return_value = stack.deploy_pipeline(                                          │
│ ❱ 449 │   │   │   │   self, runtime_configuration=runtime_configuration                          │
│   450 │   │   │   )                                                                              │
│   451 │   │   finally:                                                                           │
│   452 │   │   │   constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False                            │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/stack/stack.py:832 in deploy_pipeline               │
│                                                                                                  │
│   829 │   │   )                                                                                  │
│   830 │   │                                                                                      │
│   831 │   │   return_value = self.orchestrator.run(                                              │
│ ❱ 832 │   │   │   pipeline, stack=self, runtime_configuration=runtime_configuration              │
│   833 │   │   )                                                                                  │
│   834 │   │                                                                                      │
│   835 │   │   # Put pipeline level cache policy back to make sure the next runs                  │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/orchestrators/base_orchestrator.py:250 in run       │
│                                                                                                  │
│   247 │   │   # Create the protobuf pipeline which will be needed for various reasons            │
│   248 │   │   # in the following steps                                                           │
│   249 │   │   pb2_pipeline: Pb2Pipeline = Compiler().compile(                                    │
│ ❱ 250 │   │   │   create_tfx_pipeline(pipeline, stack=stack)                                     │
│   251 │   │   )                                                                                  │
│   252 │   │                                                                                      │
│   253 │   │   self._configure_node_context(                                                      │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/orchestrators/utils.py:50 in create_tfx_pipeline    │
│                                                                                                  │
│    47 │   │   │   the pipeline.                                                                  │
│    48 │   """                                                                                    │
│    49 │   # Connect the inputs/outputs of all steps in the pipeline                              │
│ ❱  50 │   zenml_pipeline.connect(**zenml_pipeline.steps)                                         │
│    51 │                                                                                          │
│    52 │   tfx_components = {                                                                     │
│    53 │   │   step.name: step.component for step in zenml_pipeline.steps.values()                │
│ <ipython-input-18-3f1c2fa0b83f>:11 in mnist_pipeline                                             │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/steps/base_step.py:707 in __call__                  │
│                                                                                                  │
│   704 │   │   }                                                                                  │
│   705 │   │                                                                                      │
│   706 │   │   # make sure we have registered materializers for each output                       │
│ ❱ 707 │   │   materializers = self.get_materializers(ensure_complete=True)                       │
│   708 │   │                                                                                      │
│   709 │   │   # Prepare the output artifacts and spec                                            │
│   710 │   │   for key, value in self.OUTPUT_SIGNATURE.items():                                   │
│                                                                                                  │
│ /usr/local/lib/python3.7/dist-packages/zenml/steps/base_step.py:366 in get_materializers         │
│                                                                                                  │
│   363 │   │   │   │   │   │   f"registering a default materializer for specific "                │
│   364 │   │   │   │   │   │   f"types by subclassing `BaseMaterializer` and setting "            │
│   365 │   │   │   │   │   │   f"its `ASSOCIATED_TYPES` class variable.",                         │
│ ❱ 366 │   │   │   │   │   │   url="https://docs.zenml.io/developer-guide/advanced-usage/materi   │
│   367 │   │   │   │   │   )                                                                      │
│   368 │   │                                                                                      │
│   369 │   │   return materializers                                                               │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
StepInterfaceError: Unable to find materializer for output 'output' of type `<class 'keras.engine.training.Model'>`
in step 'trainer'. Please make sure to either explicitly set a materializer for step outputs using 
`step.with_return_materializers(...)` or registering a default materializer for specific types by subclassing 
`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable. For more information, visit 
https://docs.zenml.io/developer-guide/advanced-usage/materializer.

Code of Conduct

  • I agree to follow this project’s Code of Conduct

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Comments: 15 (10 by maintainers)

Most upvoted comments

@dudeperf3ct I see… sorry I misunderstood! Thank you so much for your help!

@ahmadmustafaanis yes, for every object that passes through the pipeline, you need to tell zenml how to serialize/deserialize it. In this way this object can be cached and stored. In zenml this is done using something called a Materializer. For most Python objects you don’t need to write your own Materializer, these are already builtin within zenml. But for custom objects you’d need to write a custom materializer for it.

Here’s a guide how to write a custom materializer https://docs.zenml.io/v/0.6.0/guides/index/custom-materializer

A Keras model is not a standard Python object so you’d need to write one. But this has been done by someone in zenml and to use it you’ll have to install the integration with zenml integration install tensorflow.

I am getting the same error for my simple code. Here is my code

from zenml.steps import step, Output

@step
def data_loader() -> Output(x_train=np.ndarray, y_train=np.ndarray, x_test=np.ndarray, y_test=np.ndarray):
    """
    return MNIST x_train, y_train, x_test, y_test
    """
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    
    return x_train, y_train, x_test, y_test

@step
def initialize_model() -> Output(untrained_model=tf.keras.Sequential):
    model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(num_classes, activation="softmax"),
            ])
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
    return model

@step
def preprocess_data(x:np.ndarray, y:np.ndarray) ->Output(x_processed= np.ndarray, y_processed=np.ndarray):
    x_processed = x.astype("float32") / 255
    x_processed = np.expand_dims(x_processed, -1)
    
    y_processed = keras.utils.to_categorical(y, num_classes)
    
    return x_processed, y_processed

@step
def train_model(x_train:np.ndarray, y_train:np.ndarray, untrained_model:tf.keras.Sequential) -> Output(trained_model=tf.keras.Sequential, history=dict):
    history = untrained_model.fit(x_train, y_train, epochs=epochs)
    return untrained_model, history 

@step
def eval_model(trained_model: tf.keras.Sequential, x_test:np.ndarray, y_test:np.ndarray) -> None:
    trainedmodel.evaluate(x_test, y_test)

from zenml.pipelines import pipeline

@pipeline
def runner(data_loader, initialize_model, preprocess_data, train_model):
    x_train, y_train, x_test, y_test = data_loader()
    x_train_processed, y_train_processed = preprocess_data(x_train, y_train)    
    model = initialize_model()
    model, history = train_model(x_train_processed, y_train_processed, model)
    
mnist_pipeline = runner(data_loader(), initialize_model(), preprocess_data(), train_model())

mnist_pipeline.run()

The error is

Creating run for pipeline: runner
Cache enabled for pipeline runner
Using stack default to run pipeline runner...
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /tmp/ipykernel_26594/730396930.py:1 in <cell line: 1>                                            │
│                                                                                                  │
│ [Errno 2] No such file or directory: '/tmp/ipykernel_26594/730396930.py'                         │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/pipelines/base_pipeline.py:44 │
│ 8 in run                                                                                         │
│                                                                                                  │
│   445 │   │   # behavior                                                                         │
│   446 │   │   constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True                                 │
│   447 │   │   try:                                                                               │
│ ❱ 448 │   │   │   return_value = stack.deploy_pipeline(                                          │
│   449 │   │   │   │   self, runtime_configuration=runtime_configuration                          │
│   450 │   │   │   )                                                                              │
│   451 │   │   finally:                                                                           │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/stack/stack.py:831 in         │
│ deploy_pipeline                                                                                  │
│                                                                                                  │
│   828 │   │   │   pipeline=pipeline, runtime_configuration=runtime_configuration                 │
│   829 │   │   )                                                                                  │
│   830 │   │                                                                                      │
│ ❱ 831 │   │   return_value = self.orchestrator.run(                                              │
│   832 │   │   │   pipeline, stack=self, runtime_configuration=runtime_configuration              │
│   833 │   │   )                                                                                  │
│   834                                                                                            │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/orchestrators/base_orchestrat │
│ or.py:250 in run                                                                                 │
│                                                                                                  │
│   247 │   │   # Create the protobuf pipeline which will be needed for various reasons            │
│   248 │   │   # in the following steps                                                           │
│   249 │   │   pb2_pipeline: Pb2Pipeline = Compiler().compile(                                    │
│ ❱ 250 │   │   │   create_tfx_pipeline(pipeline, stack=stack)                                     │
│   251 │   │   )                                                                                  │
│   252 │   │                                                                                      │
│   253 │   │   self._configure_node_context(                                                      │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/orchestrators/utils.py:50 in  │
│ create_tfx_pipeline                                                                              │
│                                                                                                  │
│    47 │   │   │   the pipeline.                                                                  │
│    48 │   """                                                                                    │
│    49 │   # Connect the inputs/outputs of all steps in the pipeline                              │
│ ❱  50 │   zenml_pipeline.connect(**zenml_pipeline.steps)                                         │
│    51 │                                                                                          │
│    52 │   tfx_components = {                                                                     │
│    53 │   │   step.name: step.component for step in zenml_pipeline.steps.values()                │
│                                                                                                  │
│ /tmp/ipykernel_26594/100900222.py:7 in runner                                                    │
│                                                                                                  │
│ [Errno 2] No such file or directory: '/tmp/ipykernel_26594/100900222.py'                         │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/steps/base_step.py:707 in     │
│ __call__                                                                                         │
│                                                                                                  │
│   704 │   │   }                                                                                  │
│   705 │   │                                                                                      │
│   706 │   │   # make sure we have registered materializers for each output                       │
│ ❱ 707 │   │   materializers = self.get_materializers(ensure_complete=True)                       │
│   708 │   │                                                                                      │
│   709 │   │   # Prepare the output artifacts and spec                                            │
│   710 │   │   for key, value in self.OUTPUT_SIGNATURE.items():                                   │
│                                                                                                  │
│ /home/ahmad/anaconda3/envs/zenml/lib/python3.8/site-packages/zenml/steps/base_step.py:357 in     │
│ get_materializers                                                                                │
│                                                                                                  │
│   354 │   │   │   │   materializers[output_name] = materializer                                  │
│   355 │   │   │   else:                                                                          │
│   356 │   │   │   │   if ensure_complete:                                                        │
│ ❱ 357 │   │   │   │   │   raise StepInterfaceError(                                              │
│   358 │   │   │   │   │   │   f"Unable to find materializer for output "                         │
│   359 │   │   │   │   │   │   f"'{output_name}' of type `{output_type}` in step "                │
│   360 │   │   │   │   │   │   f"'{self.name}'. Please make sure to either "                      │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
StepInterfaceError: Unable to find materializer for output 'untrained_model' of type `<class 
'keras.engine.sequential.Sequential'>` in step 'initialize_model'. Please make sure to either explicitly set a 
materializer for step outputs using `step.with_return_materializers(...)` or registering a default materializer for
specific types by subclassing `BaseMaterializer` and setting its `ASSOCIATED_TYPES` class variable. For more 
information, visit https://docs.zenml.io/developer-guide/advanced-usage/materializer.