beam: python CombineGlobally().with_fanout() causes duplicate combine results for sliding windows

Not only is there more than one result per window, the results for each window are duplicated as well.
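For context, *with_fanout(n)* splits the combine into a pre-combine over intermediate sub-keys followed by a final merge, so a correct implementation should still emit exactly one result per window. Below is a plain-Python sketch of that two-stage contract (not Beam code; `combine_with_fanout` is a made-up helper), using the same list-concatenation combine logic as the `ListFn` in the repro below:

```python
import random

# The same accumulator logic as ListFn, written as plain functions.
def add_input(acc, element):
    return acc + [element]

def merge_accumulators(accs):
    res = []
    for a in accs:
        res = res + a
    return res

def combine_with_fanout(elements, fanout=5, seed=0):
    """Two-stage combine over a single window's elements."""
    rng = random.Random(seed)
    # Stage 1: scatter the elements across `fanout` sub-keys and
    # pre-combine each shard independently.
    shards = [[] for _ in range(fanout)]
    for e in elements:
        i = rng.randrange(fanout)
        shards[i] = add_input(shards[i], e)
    # Stage 2: merge the per-shard accumulators into one final result.
    return merge_accumulators(shards)

# Each input appears exactly once in the single combined output.
print(sorted(combine_with_fanout([1, 2, 3, 4, 5, 6, 7, 8])))
```

Whatever order the shards introduce, every input element should appear exactly once in one output list per window; the bug is that the pipeline emits several such lists per window instead.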

Here is some code to reproduce the issue; just run it with and without *.with_fanout*.

If running with the Dataflow runner, add an appropriate *gs://path/* in *WriteToText*.


import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp

class ListFn(beam.CombineFn):
  def create_accumulator(self):
    return []

  def add_input(self, mutable_accumulator, element):
    return mutable_accumulator + [element]

  def merge_accumulators(self, accumulators):
    res = []
    for accu in accumulators:
      res = res + accu
    return res

  def extract_output(self, accumulator):
    return accumulator


p = beam.Pipeline()

(
    p
    | beam.Create([
        window.TimestampedValue(1, Timestamp(seconds=1596216396)),
        window.TimestampedValue(2, Timestamp(seconds=1596216397)),
        window.TimestampedValue(3, Timestamp(seconds=1596216398)),
        window.TimestampedValue(4, Timestamp(seconds=1596216399)),
        window.TimestampedValue(5, Timestamp(seconds=1596216400)),
        window.TimestampedValue(6, Timestamp(seconds=1596216402)),
        window.TimestampedValue(7, Timestamp(seconds=1596216403)),
        window.TimestampedValue(8, Timestamp(seconds=1596216405))])
    | beam.WindowInto(window.SlidingWindows(10, 5))
    | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
    | beam.Map(repr)
    | beam.io.WriteToText("py-test-result", file_name_suffix='.json', num_shards=1))

p.run()
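For reference, the eight timestamps above fall into four distinct sliding windows, so the pipeline should write exactly four lists. Here is a plain-Python sketch that derives this (not Beam code; `window_starts` is a hypothetical helper), assuming `SlidingWindows(10, 5)` with the default offset of 0:

```python
SIZE, PERIOD = 10, 5
timestamps = [1596216396, 1596216397, 1596216398, 1596216399,
              1596216400, 1596216402, 1596216403, 1596216405]

def window_starts(t, size=SIZE, period=PERIOD):
    # A timestamp t belongs to every window [s, s + size) whose start s
    # is a multiple of the period and satisfies s <= t < s + size.
    last = t - (t % period)            # latest window start containing t
    first = last - size + period       # earliest window start containing t
    return list(range(first, last + 1, period))

windows = {}
for t in timestamps:
    for s in window_starts(t):
        windows.setdefault(s, []).append(t)

# Four distinct windows -> the pipeline should emit exactly four lists,
# one per window, regardless of fanout.
print(len(windows))  # → 4
```

With *.with_fanout(5)* the output file contains more than four lines, and the lists for a given window are repeated.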

Imported from Jira BEAM-10617. Original Jira may contain additional context. Reported by: leiyiz.

About this issue

  • State: open
  • Created 2 years ago
  • Comments: 33 (31 by maintainers)

Most upvoted comments

started working on this today, no PR yet.