beam: python CombineGlobally().with_fanout() cause duplicate combine results for sliding windows
Not only is there more than one result per window; the results for each window are duplicated as well.
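For reference, here is my own back-of-envelope model (plain Python, not Beam code; the helper name is mine) of how `SlidingWindows(10, 5)` assigns timestamps to windows. With the eight timestamps in the repro below, the inputs span four windows, so the correct output should be exactly one combined list per window — four lists in total:

```python
# Illustrative model of sliding-window assignment (size=10s, period=5s,
# offset=0); `sliding_windows_for` is a hypothetical helper, not a Beam API.
def sliding_windows_for(ts, size=10, period=5):
    """Return the [start, end) windows that contain timestamp ts."""
    last_start = ts - ts % period      # latest window start at or before ts
    starts = []
    s = last_start
    while s > ts - size:               # keep windows whose span still covers ts
        starts.append(s)
        s -= period
    return [(s, s + size) for s in starts]

# Every timestamp falls into exactly two windows; across the repro's inputs
# the covering windows are [...390, ...400), [...395, ...405),
# [...400, ...410), and [...405, ...415).
for ts in (1596216396, 1596216400, 1596216405):
    print(ts, sliding_windows_for(ts))
```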
Here is some code I wrote to reproduce the issue; just run it with and without `.with_fanout`. If running with the Dataflow runner, add an appropriate `gs://path/` in `WriteToText`.
```python
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp


class ListFn(beam.CombineFn):
    def create_accumulator(self):
        return []

    def add_input(self, mutable_accumulator, element):
        return mutable_accumulator + [element]

    def merge_accumulators(self, accumulators):
        res = []
        for accu in accumulators:
            res = res + accu
        return res

    def extract_output(self, accumulator):
        return accumulator


p = beam.Pipeline()
(
    p
    | beam.Create([
        window.TimestampedValue(1, Timestamp(seconds=1596216396)),
        window.TimestampedValue(2, Timestamp(seconds=1596216397)),
        window.TimestampedValue(3, Timestamp(seconds=1596216398)),
        window.TimestampedValue(4, Timestamp(seconds=1596216399)),
        window.TimestampedValue(5, Timestamp(seconds=1596216400)),
        window.TimestampedValue(6, Timestamp(seconds=1596216402)),
        window.TimestampedValue(7, Timestamp(seconds=1596216403)),
        window.TimestampedValue(8, Timestamp(seconds=1596216405))])
    | beam.WindowInto(window.SlidingWindows(10, 5))
    | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
    | beam.Map(repr)
    | beam.io.WriteToText("py-test-result", file_name_suffix='.json', num_shards=1))
p.run()
```
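For context on where the duplication could creep in: `.with_fanout(n)` turns the single global combine into a two-stage combine — elements are pre-combined under `n` intermediate keys, and the partial accumulators are then merged. Below is a minimal plain-Python sketch of that two-stage flow using the same accumulator semantics as `ListFn` (the function names are mine, not Beam's API). A correct runner merges each partial exactly once per window; the duplicated results in this report look like partials being emitted or merged more than once:

```python
import random


def create_accumulator():
    return []


def add_input(acc, element):
    return acc + [element]


def merge_accumulators(accumulators):
    res = []
    for acc in accumulators:
        res = res + acc
    return res


def combine_globally_with_fanout(elements, fanout=5, seed=0):
    """Two-stage combine: pre-combine per intermediate key, then merge once."""
    rng = random.Random(seed)
    # Stage 1: scatter elements across `fanout` partial accumulators.
    partials = {}
    for e in elements:
        key = rng.randrange(fanout)
        partials[key] = add_input(partials.get(key, create_accumulator()), e)
    # Stage 2: gather -- merge every partial into the final accumulator,
    # exactly once.
    return merge_accumulators(list(partials.values()))


print(sorted(combine_globally_with_fanout([1, 2, 3, 4, 5, 6, 7, 8])))
# -> [1, 2, 3, 4, 5, 6, 7, 8] (each input appears exactly once)
```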
Imported from Jira BEAM-10617. Original Jira may contain additional context. Reported by: leiyiz.
About this issue
- Original URL
- State: open
- Created 2 years ago
- Comments: 33 (31 by maintainers)
Started working on this today; no PR yet.