pipelines: Nested ParallelFor: can't resolve parameter from outer loop operation

What happened: I’m creating a pipeline that runs many training jobs in parallel. Parameters are passed as arrays, and a job is run for every combination of parameter values.

First, new data entering the system is validated (step 1). This happens only once per pipeline run. Each value of one parameter (a) requires an entirely different dataset to be generated (step 2). For every combination of two other parameters (b and c), that dataset is transformed and exported to a different format (step 3). Finally, every combination of the remaining parameters (layers, learning_rate, decay) trains a model on one of the generated exports.
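To show how quickly this fans out, here is a plain-Python sketch (no Kubeflow involved) that lists the tasks the nested loops describe. The function name plan_tasks and the example parameter values are hypothetical; the parameter names match the pipeline arguments. Step 1 (validation) runs once and is left out.

```python
from itertools import product


def plan_tasks(a, b, c, layers, learning_rate, decay):
    """List the tasks the nested ParallelFor loops fan out into."""
    # Step 2: one dataset per value of a.
    datasets = [("create_dataset", _a) for _a in a]
    # Step 3: one export per (a, b, c) combination.
    exports = [("export", _a, _b, _c) for _a, _b, _c in product(a, b, c)]
    # Training: one model per export and per (layers, learning_rate, decay)
    # combination. Each layers entry is a list of dicts, so it is referenced
    # by index here.
    trainings = [
        ("make_model", _a, _b, _c, layer_idx, lr, d)
        for _a, _b, _c, layer_idx, lr, d in product(
            a, b, c, range(len(layers)), learning_rate, decay
        )
    ]
    return datasets, exports, trainings


datasets, exports, trainings = plan_tasks(
    a=[5.0, 10.0],
    b=[30.0, 60.0],
    c=[True, False],
    layers=[[{"lstm_nodes": 128, "dropout": 0.2}]],
    learning_rate=[1e-3, 1e-4],
    decay=[1e-5],
)
print(len(datasets), len(exports), len(trainings))  # 2 8 16
```

With two values for each of a, b, c and learning_rate, the pipeline already has 2 datasets, 8 exports and 16 training jobs. Each export depends on a dataset built in an outer loop.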

I think the problem is that outputs of operations in an outer ParallelFor loop (create_dataset_op's dataset_id, which the error names, and the export_op outputs) are being consumed by operations in the ParallelFor loops nested inside it.
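To make the suspected problem concrete, this plain-Python sketch counts how many nested ParallelFor levels each outer-loop output passes through before it is consumed. The depth and dependency tables are transcribed by hand from the pipeline code in this issue. The idea that every crossed level must forward the value as a loop-template input is an assumption about how the compiled workflow is structured, not something confirmed here.

```python
# Loop nesting depth of each task in the pipeline code: validate is outside
# all loops, create_dataset is inside ParallelFor(a), export_op is inside
# a/b/c, and bb_make_model_op is inside a/b/c/layers/learning_rate/decay.
LOOP_DEPTH = {"validate": 0, "create_dataset": 1, "export": 3, "make_model": 6}

# Task outputs passed as arguments to other tasks (validate is only an
# .after() ordering dependency, so it carries no data).
CONSUMES = {
    "export": ["create_dataset"],
    "make_model": ["create_dataset", "export"],
}


def loop_boundaries_crossed(depth, consumes):
    """Return (producer, consumer, n) for each output consumed n loops deeper.

    Assumes each consumer is nested inside its producer's loop scope,
    which holds for this pipeline.
    """
    crossings = []
    for consumer, producers in consumes.items():
        for producer in producers:
            n = depth[consumer] - depth[producer]
            if n > 0:
                crossings.append((producer, consumer, n))
    return crossings


for producer, consumer, n in loop_boundaries_crossed(LOOP_DEPTH, CONSUMES):
    print(f"{producer} -> {consumer}: crosses {n} nested ParallelFor loop(s)")
# create_dataset -> export: crosses 2 nested ParallelFor loop(s)
# create_dataset -> make_model: crosses 5 nested ParallelFor loop(s)
# export -> make_model: crosses 3 nested ParallelFor loop(s)
```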

Any advice greatly appreciated, thank you.

Error message:

invalid spec: templates.for-loop-for-loop-99fab5de-1.tasks.for-loop-for-loop-fea34ed3-2 failed to resolve {{inputs.parameters.create-dataset-dataset_id}}
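The message says the outer loop template tries to use {{inputs.parameters.create-dataset-dataset_id}}, but that template has no such input. The toy model below (not KFP's compiler or Argo's validator) reproduces that check. The template input name loop-item-param-1 is hypothetical. The second call shows an Argo-style {{tasks.<name>.outputs.parameters.<name>}} reference, the form normally used for a sibling task's output inside the same DAG. Reading the bug as "the compiler emitted an inputs reference where a tasks reference was needed" is a guess, not a confirmed diagnosis.

```python
import re

# Matches input references such as {{inputs.parameters.create-dataset-dataset_id}}.
INPUT_REF = re.compile(r"\{\{inputs\.parameters\.([\w-]+)\}\}")


def unresolved_inputs(declared_inputs, arguments):
    """Return input-parameter names referenced in `arguments` but not declared.

    `declared_inputs` models the input parameters a template declares;
    `arguments` models the values it passes to a child task (name -> string).
    """
    missing = []
    for value in arguments.values():
        for name in INPUT_REF.findall(value):
            if name not in declared_inputs:
                missing.append(name)
    return missing


# Hypothetical outer loop template: it declares only its loop item, while the
# dataset id is produced by a task inside this same template.
outer_inputs = {"loop-item-param-1"}

# Referencing the id as a template input fails, mirroring the reported error.
print(unresolved_inputs(outer_inputs, {
    "create-dataset-dataset_id": "{{inputs.parameters.create-dataset-dataset_id}}",
}))  # ['create-dataset-dataset_id']

# Referencing it as the sibling task's output is not an inputs lookup at all.
print(unresolved_inputs(outer_inputs, {
    "create-dataset-dataset_id":
        "{{tasks.create-dataset.outputs.parameters.create-dataset-dataset_id}}",
}))  # []
```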

Trimmed (pseudo)code. It compiles and uploads fine, but no run can be created from it:

```python
import kfp
from kfp import compiler, dsl

# validate_op, create_dataset_op, export_op and bb_make_model_op are this
# pipeline's own component factories; their definitions are trimmed here.

# Hypothetical resource settings: the trimmed code used these names without
# defining them.
cpu_request = '1'
memory_request = '2G'
cpu_limit = '2'
memory_limit = '4G'


@dsl.pipeline(
    name='Hyperparameter Tuning',
    description='Hyperparameter Tuning'
)
def ht_op(
    someparam,
    dataset_label,
    a=[5.0],
    b=[30.0],
    epochs=20,
    c=[True],
    layers=[[{"lstm_nodes":128,"dropout":0.2},{"lstm_nodes":128,"dropout":0.2}]],
    learning_rate=[1e-3],
    decay=[1e-5]
    ):

    # step 1: validate new data, once per pipeline run
    validate = validate_op()

    with dsl.ParallelFor(a) as _a:

        # step 2: one dataset per value of a
        new_dataset = create_dataset_op(
            label=("%s %ss") % (dataset_label, _a),
            a=_a,
            someparam=someparam
        )
        new_dataset.after(validate)

        # step 3: we'll need a new export for each (b, c) pair
        with dsl.ParallelFor(b) as _b:
            with dsl.ParallelFor(c) as _c:
                export_dataset = export_op(
                    dataset_id=new_dataset.output, # here's the problem?
                    dataset_label=dataset_label,
                    b=_b,
                    c=_c)

                # training: one model per (layers, learning_rate, decay) combination
                with dsl.ParallelFor(layers) as _layers:
                    with dsl.ParallelFor(learning_rate) as _learning_rate:
                        with dsl.ParallelFor(decay) as _decay:
                            make_model = bb_make_model_op(
                                label=("%s %ss hptuning") % (dataset_label, _a),
                                dataset_id=new_dataset.output, # used again here
                                training_data_url=export_dataset.outputs['training_data_uri'],
                                validation_data_url=export_dataset.outputs['validation_data_uri'],
                                test_data_url=export_dataset.outputs['test_data_uri'],
                                layers=_layers,
                                learning_rate=_learning_rate,
                                decay=_decay,
                                version=export_dataset.outputs['version']
                            )

                            make_model.container.add_resource_request('cpu', cpu_request)
                            make_model.container.add_resource_request('memory', memory_request)
                            make_model.container.add_resource_limit('cpu', cpu_limit)
                            make_model.container.add_resource_limit('memory', memory_limit)


# compile pipeline now
pipeline_func = ht_op
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)

# assumes default KFP client connection settings
client = kfp.Client()
client.upload_pipeline(pipeline_package_path=pipeline_filename, pipeline_name='Hyperparameter Tuning')
```

What did you expect to happen: The run to be created and its graph to be displayed.

What steps did you take: Ran the code above, then tried to create a run.

Other: This nested sample:

https://github.com/kubeflow/pipelines/blob/master/sdk/python/tests/compiler/testdata/withitem_nested.py#L42

…appears to work, but it doesn’t use any output from the outer loop. Should op11 there be able to take an output of op1 as its input?

About this issue

  • State: closed
  • Created 4 years ago
  • Reactions: 2
  • Comments: 18 (4 by maintainers)

Most upvoted comments

I’m still having the same problem. Is this fixed?