pipelines: Unable to mount VolumeOp volumes inside recursive graph components

Small Backstory: I am attempting to design a pipeline that processes a large dataset in batches, with the batch size controlled by the batch_size pipeline parameter. Once the data has been split into batches, I am attempting to use a graph_component to loop over all batches and spawn a processing node for each one.

Each of these processing nodes needs to have the working directory created by the initial VolumeOp mounted to it.

What happened: When I attempt to mount the VolumeOp-based volume into the recursively spawned ContainerOps, the step fails with: This step is in Error state with this message: volume '<pvc_that_was_just_created_by_volumeop>' not found in workflow spec

What did you expect to happen: The VolumeOp-based volume mounts into all ContainerOp containers, regardless of whether they were spawned recursively.

What steps did you take: I have tried several approaches to make this work: simply calling add_pvolumes on the recursively spawned ContainerOps, manually creating the k8s resources and attaching them (`.add_volume`, `.container.add_volume_mount`), and even hand-editing the generated YAML based on what the Argo docs say about looping.
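
For reference, the manual k8s-resource variant of those attempts looked roughly like the sketch below (`mount_workdir` is a hypothetical helper; `work_vol_name` and `work_dir` mirror the parameters used in the reproduction further down):

import kubernetes.client as k8s

def mount_workdir(op, work_vol_name, work_dir):
  # Declare the PVC-backed volume on the op, then mount it into its container.
  op.add_volume(k8s.V1Volume(
    name=work_vol_name,
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
      claim_name=work_vol_name)))
  op.container.add_volume_mount(k8s.V1VolumeMount(
    name=work_vol_name, mount_path=work_dir))
  return op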

Anything else you would like to add: Here is a self-contained example that should reproduce this (I attempted to attach it to the issue, but GitHub was choking on the upload).

Instructions

1. Build a base image using the included Dockerfile
2. Upload this base image to Docker Hub
3. Replace the string `BUILD_IMAGE_WITH_INCLUDED_DOCKERFILE` in `pipeline.py` with the newly created image+tag
4. Replace the string `CHANGE_ME_TO_A_GCS_BUCKET` with an existing GCS bucket you have access to
5. Ensure the file `500.json` exists in the bucket (a one-liner to generate it is included below).
6. Compile and upload the pipeline (a minimal compile sketch follows this list)
7. Run the pipeline
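
A minimal compile step for step 6, assuming the KFP v1 SDK (`kfp`) is installed locally; the resulting `pipeline.tar.gz` can then be uploaded through the Pipelines UI:

import kfp.compiler as compiler
from pipeline import pipeline_func  # pipeline.py from this example

compiler.Compiler().compile(pipeline_func, 'pipeline.tar.gz')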

Dockerfile

FROM python:3.7-alpine
RUN pip install \
  requests \
  google-cloud-storage

One-liner to generate 500.json

python -c 'import json; fd = open("500.json", "w"); json.dump([1] * 500, fd); fd.close()'

pipeline.py

import kfp.dsl as dsl
import kfp.components as components
import kubernetes.client as k8s
from collections import namedtuple
import inspect

def data_load(bucket: str, key: str, work_dir: str) -> str:
  from google.cloud import storage
  from os.path import basename, join
  c = storage.Client()
  bucket = c.get_bucket(bucket)
  blob = bucket.get_blob(key)
  if blob:
    filename = join(work_dir, basename(key))
    blob.download_to_filename(filename)
    return filename
data_load_op = components.func_to_container_op(data_load, base_image='BUILD_IMAGE_WITH_INCLUDED_DOCKERFILE')

def data_split(batch_size: int, data_filename: str, work_dir: str) -> str:
  print(f'data_split(batch_size={batch_size}, data_filename={data_filename})')
  def chunk(l: list, c: int):
    print(f'  -> chunk(l=[{len(l)}], c={c})')
    for i in range(0, len(l), c):
      print(f'    -> yield [{len(l[i:i+c])}]')
      yield l[i:i+c]
  import json, os, uuid
  with open(data_filename, 'r') as fd:
    all_data = json.load(fd)
    print(f'all data json contained {len(all_data)} records')
  chunks = list(chunk(all_data, batch_size))
  chunk_files = []
  for i, chnk in enumerate(chunks):
    chunk_file = os.path.join(work_dir, f'{i}.chunk')
    chunk_files.append(chunk_file)
    with open(chunk_file, 'w') as fd:
      json.dump(chnk, fd)
  return json.dumps(chunk_files)
data_split_op = components.func_to_container_op(data_split, base_image='BUILD_IMAGE_WITH_INCLUDED_DOCKERFILE')

Next = namedtuple('result', 'index batch')
def next_batch(index: int, batches: str) -> Next:
  print(f'next_batch(index={index}, batches={batches}')
  import json
  batches_ = json.loads(batches)
  if index <= len(batches_) - 1:
    nxt = batches_[index]
    print(f'  -> return Next(index={index+1}, batch={nxt})')
    return Next(index=index+1, batch=nxt)
  print(f'-> return Next(index=-1, batch=None)')
  return Next(index=-1, batch=None)
next_op = components.func_to_container_op(
  next_batch, 
  base_image='python:3.7-alpine',
  extra_code="from collections import namedtuple; Next = namedtuple('result', 'index batch')"
)

def process_batch(batch: str) -> None:
  import json
  with open(batch, 'r') as fd:
    batch_data = json.load(fd)
  print(f'loaded {len(batch_data)} records in batch')
  import time
  import random
  sleep_secs = random.randrange(0, 100)
  print(f'sleeping for {sleep_secs} seconds')
  time.sleep(sleep_secs)
process_op = components.func_to_container_op(process_batch, base_image='BUILD_IMAGE_WITH_INCLUDED_DOCKERFILE')

def print_message(message: str) -> None:
  print(message or 'NO MESSAGE SUPPLIED!')
print_op = components.func_to_container_op(print_message, base_image='python:3.7-alpine')

@dsl.graph_component
def looper(batch, batches, index, work_vol_name, work_dir):
  import kfp.gcp as gcp
  p1 = process_op(batch).apply(gcp.use_gcp_secret('user-gcp-sa'))
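  # This is the mount that fails with "volume '<pvc>' not found in workflow spec"
  # when the graph_component is expanded recursively.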
  p1.add_pvolumes({work_dir: k8s.V1Volume(name=work_vol_name)})
  next_ = next_op(index, batches)
  with dsl.Condition(next_.outputs['index'] != '-1'):
    looper(next_.outputs['batch'], batches, next_.outputs['index'], work_vol_name, work_dir)

@dsl.pipeline(name='Self-Contained Example')
def pipeline_func(bucket="CHANGE_ME_TO_A_GCS_BUCKET",
                  key="500.json",
                  batch_size=250,
                  work_dir='/var/pipeline'):
  #################################################
  # meta init   
  import time
  import kfp.gcp as gcp
  from os import path
  from uuid import uuid4

  WORK_DIR = work_dir
  WORK_VOLUME_NAME = 'pipeline-workdir'
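  # Shared scratch PVC that every step, including the recursively spawned
  # batch processors, is expected to mount at work_dir.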
  workdirop = dsl.VolumeOp(
    name='Create Scratch Volume',
    resource_name=WORK_VOLUME_NAME,
    size='1Gi',
    modes=dsl.VOLUME_MODE_RWM,
    storage_class='nfs'
  )

  load_op = data_load_op(
    bucket=bucket,
    key=key,
    work_dir=WORK_DIR
  ).apply(gcp.use_gcp_secret('user-gcp-sa')).add_pvolumes({WORK_DIR: workdirop.volume}).after(workdirop)
  load_op.container.set_image_pull_policy('Always')

  split_op = data_split_op(
    batch_size=batch_size,
    work_dir=WORK_DIR,
    data_filename=load_op.output
  ).apply(gcp.use_gcp_secret('user-gcp-sa')).add_pvolumes({WORK_DIR: workdirop.volume}).after(load_op)

  first_op = next_op(
    index=0,
    batches=split_op.output
  )

  loop_op = looper(
    batch=first_op.outputs['batch'], 
    batches=split_op.output, 
    index=first_op.outputs['index'],
    work_vol_name=workdirop.outputs['name'],
    work_dir=WORK_DIR
  ).after(workdirop).after(load_op).after(split_op).after(first_op)

  complete_op = print_op(message='DONE').after(loop_op)

Most upvoted comments

It seems to me that we would just need to include PipelineVolume in dsl._component.py here

like:

from ._pipeline_volume import PipelineVolume

...

if not isinstance(input, (PipelineParam, PipelineVolume)):
  raise ValueError('arguments to ' + func.__name__ + ' should be PipelineParams or PipelineVolumes.')
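
If that change were in place, the recursive component could presumably take the VolumeOp's PipelineVolume directly instead of just its name. A sketch of what the looper above might then look like (untested; `process_op` and `next_op` are the ops from the reproduction, and `work_vol` would be `workdirop.volume` passed in from the pipeline):

import kfp.dsl as dsl

@dsl.graph_component
def looper(batch, batches, index, work_vol, work_dir):
  # work_vol is the PipelineVolume created by the VolumeOp in the pipeline.
  p1 = process_op(batch)
  p1.add_pvolumes({work_dir: work_vol})
  next_ = next_op(index, batches)
  with dsl.Condition(next_.outputs['index'] != '-1'):
    looper(next_.outputs['batch'], batches, next_.outputs['index'], work_vol, work_dir)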