argo-workflows: Output artifact duplicated in a fan-in topology with the use of retryStrategy

Summary

An output artifact is duplicated in a fan-in topology when a pre-fan-in step producing the artifact specifies retryStrategy; the duplication occurs regardless of whether there is an actual retry or not. I can work around this duplication behavior by deduping the result on my end. That said, if this is a bug, itโ€™d be great if it could be addressed on the Argo side.

Diagnostics

What Kubernetes provider are you using? v1.18

What version of Argo Workflows are you running? v2.11.7

Reproduction steps are borrowed from StackOverflow (and this enhancement request) with a minor addition that retryStrategy is added in the write step (the retry limit 3 does not matter for reproducibility and can be arbitrary).

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result-
spec:
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    - - name: write
        template: output-number
        arguments:
          parameters:
          - name: number
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"
    - - name: fan-in
        template: fan-in
        arguments:
          parameters:
          - name: numbers
            value: "{{steps.write.outputs.parameters}}"

  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

  - name: output-number
    inputs:
      parameters:
      - name: number
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo {{inputs.parameters.number}} > /tmp/number.txt"]
    outputs:
      parameters:
        - name: number
          valueFrom:
            path: /tmp/number.txt
    retryStrategy:
      limit: 3

  - name: fan-in
    inputs:
      parameters:
        - name: numbers
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo received {{inputs.parameters.numbers}}"]

The above fan-in step produces an output that looks like

[{"number":"20"},{"number":"20"},{"number":"21"},{"number":"21"},{"number":"22"},{"number":"22"},{"number":"23"},{"number":"23"},{"number":"24"},{"number":"24"},{"number":"25"},{"number":"25"},{"number":"26"},{"number":"26"},{"number":"27"},{"number":"27"},{"number":"28"},{"number":"28"},{"number":"29"},{"number":"29"},{"number":"30"},{"number":"30"}]

when we should expect each number to appear only once.

Also, the same issue can be reproducible with a DAG instead of steps:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result-
spec:
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    dag:
      tasks:
      - name: generate
        template: gen-number-list
      - name: write
        depends: "generate.Succeeded"
        template: output-number
        arguments:
          parameters:
          - name: number
            value: "{{item}}"
        withParam: "{{tasks.generate.outputs.result}}"
      - name: fan-in
        depends: "write.Succeeded"
        template: fan-in
        arguments:
          parameters:
          - name: numbers
            value: "{{tasks.write.outputs.parameters}}"

  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

  - name: output-number
    inputs:
      parameters:
      - name: number
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo {{inputs.parameters.number}} > /tmp/number.txt"]
    outputs:
      parameters:
        - name: number
          valueFrom:
            path: /tmp/number.txt
    retryStrategy:
      limit: 3

  - name: fan-in
    inputs:
      parameters:
        - name: numbers
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo received {{inputs.parameters.numbers}}"]

Message from the maintainers:

Impacted by this bug? Give it a ๐Ÿ‘. We prioritise the issues with the most ๐Ÿ‘.

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Reactions: 13
  • Comments: 18 (3 by maintainers)

Most upvoted comments

Keeping this alive

There is a bug in processAggregateNodeOutputs for dag or steps in workflow-controller.

The bug source code is:

For fanout task/step, argo will generate a nodeName based these output items with prefix โ€œ(โ€:

For retryStrategy, argo will generate a childNode for node with โ€œ(childIndex)โ€:

So, both the parentNodeName and childNodeName are contains nodeName and prefix โ€œ(โ€, but in bug source code only โ€œstrings.HasPrefix(node.Name, ancestorNode.Name+โ€(โ€œ)โ€ is not enough. And the result will contains both parentNode and childNodes.

Hi @ebr,

Yes, at the least this workaround has been working on my end, and here is what looks like, reusing my workflow loop-param-result-example from above.

  - name: fan-in
    inputs:
      parameters:
        - name: numbers
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json

        numbers = {{inputs.parameters.numbers}}

        # For instance, n looks like {"number": "20"} and we hash on the value part, e.g. "20" in this case.
        numbers_deduped = {n['number']: n for n in numbers}.values()

        print(json.dumps(list(numbers_deduped)))

Basically, I filter duplicates in the last step fan-in, using a Pythonโ€™s dict. The print function then yields:

[{"number": "20"}, {"number": "21"}, {"number": "22"}, {"number": "23"}, {"number": "24"}, {"number": "25"}, {"number": "26"}, {"number": "2
7"}, {"number": "28"}, {"number": "29"}, {"number": "30"}]

Should this issue get fixed, I will remove this band-aid fix. Hope this answers your question?