swift-async-algorithms: `throttle` does not emit the latest element at interval occurrence when latest is `true`

Unlike the similar operator in Combine, throttle’s latest parameter is confusing: it does not emit the latest element, but the next element instead.

When the latest parameter is set to true:

Actual behavior:

The throttle emits the next element from the base sequence, if any, after the interval occurs. This means that if no new elements are emitted after the interval occurs, the latest element is simply skipped, so only the first element is emitted.

Expected behavior:

The throttle emits the latest cached element instantly when the interval occurs.

About this issue

  • State: open
  • Created a year ago
  • Comments: 23 (6 by maintainers)

Most upvoted comments

We have been working on a 1.0.0 for quite some time now and are just going through the final open issues that are blocking the release. The goal for 1.0.0 is not feature parity or providing all possible algorithms but to have a baseline of algorithms that are working and functional. There are many more algorithms that we want to support but the 1.0.0 is important for us to settle the foundation. Furthermore, it is important for the ecosystem since nobody can really depend on this package before the 1.0.0 is out.

A side topic, but I was somewhat surprised to see a 1.0.0 release already. There is no description or changelog for it. Just wondering, are there breaking changes that justify the major version increment, or was it more of a milestone thing? I’d personally like to see 1.0.0 once we have feature parity with Combine (at least its first version as of the iOS 13 SDK), and throttle is one thing that is currently missing and is in high demand among UI app developers.

Could you do me a favour and try to write a validation test that represents what is currently not working the way you expect it to?

Let me help there. I believe this validation test should pass but it doesn’t:

    validate {
       "ab--cd-e-f---gh|"
       $0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: true)
       "a--b--d--f---g--[h|]"
     }
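
To make the expected timeline in that validation test easier to check by hand, here is a minimal discrete-time model of the behaviour being asked for. It is only a sketch: `throttleLatestModel` is a hypothetical helper, not part of swift-async-algorithms, and it assumes that the first element is emitted immediately, that the latest pending element is emitted when each interval elapses, and that a pending element is still flushed after the base sequence ends.

```swift
/// Hypothetical model of "throttle with latest: true" on integer ticks.
/// Not the library implementation; the input must be sorted by time.
func throttleLatestModel(
    _ input: [(time: Int, value: Character)],
    interval: Int
) -> [(time: Int, value: Character)] {
    var output: [(time: Int, value: Character)] = []
    var windowEnd: Int? = nil      // tick at which the current interval elapses
    var pending: Character? = nil  // latest element seen inside the current interval

    for (time, value) in input {
        // Handle every interval boundary that lies strictly before this arrival.
        while let end = windowEnd, end < time {
            if let latest = pending {
                // Emit the cached latest element at the boundary and start a new interval.
                output.append((time: end, value: latest))
                pending = nil
                windowEnd = end + interval
            } else {
                // Nothing was cached, so the throttle goes idle.
                windowEnd = nil
            }
        }
        if windowEnd == nil {
            // Idle: emit immediately and open a new interval.
            output.append((time: time, value: value))
            windowEnd = time + interval
        } else {
            // Inside an interval: remember only the latest element.
            pending = value
        }
    }
    // Assumption: a pending element is still emitted after the base sequence ends.
    if let end = windowEnd, let latest = pending {
        output.append((time: end, value: latest))
    }
    return output
}

// The input "ab--cd-e-f---gh|" expressed as (tick, element) pairs.
let marbleInput: [(time: Int, value: Character)] = [
    (0, "a"), (1, "b"), (4, "c"), (5, "d"), (7, "e"), (9, "f"), (13, "g"), (14, "h"),
]
let marbleOutput = throttleLatestModel(marbleInput, interval: 3)
```

Under these assumptions the model emits a, b, d, f, g and h at ticks 0, 3, 6, 9, 13 and 16, which lines up with the expected diagram "a--b--d--f---g--[h|]".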

@phausler just put up a PR https://github.com/apple/swift-async-algorithms/pull/292 to change the behaviour. We would appreciate it if you could all check that it now does what you expected it to do.

@kdubb I think that is the expected behaviour from throttle though. It will apply infinite demand as long as it has demand on itself. If you have a root asynchronous sequence that produces infinite values instantly then throttle will produce exactly one value at every interval.

I think you want another algorithm for what you describe here.

Just as a use case, I attempted to use throttle to sample the audio level from an AVAudioRecorder, which immediately returns the current value. During a simple test I realized it was consuming values at an unchecked pace.
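
For a source like this, which can always return a value immediately, one possible shape for "another algorithm" is to pull one value per interval instead of throttling an unbounded producer. The following is only a sketch under that assumption: `sampled(everyNanoseconds:read:)` is a hypothetical helper, not an API of swift-async-algorithms, and the `read` closure stands in for whatever reads the current level.

```swift
/// Hypothetical helper: produces one value per interval by sleeping first
/// and then reading the current value. Not part of swift-async-algorithms.
func sampled<Value>(
    everyNanoseconds interval: UInt64,
    read: @escaping @Sendable () -> Value
) -> AsyncStream<Value> {
    AsyncStream(unfolding: {
        do {
            // Wait one interval before each read, so the source is polled
            // at a bounded rate instead of as fast as it can answer.
            try await Task.sleep(nanoseconds: interval)
        } catch {
            // The consuming task was cancelled: finish the stream.
            return nil
        }
        return read()
    })
}
```

A consumer would then iterate with `for await level in sampled(everyNanoseconds: 100_000_000, read: { ... })`, and the source is read at most once per interval.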

One would indeed expect to receive the last element when using throttle. A very common use case for example is to throttle a sequence of progress values where receiving the last element (i.e. 100%) is crucial.

In this example:

    import AsyncAlgorithms

    var numbers = Array((0...100).reversed())
    // Produces 0, 1, ..., 100, one element every 50 ms, then finishes.
    let stream = AsyncStream {
        try! await Task.sleep(nanoseconds: 50_000_000)
        return numbers.popLast()
    }

    Task {
        let sequence = stream
            .throttle(for: .seconds(1))
        for try await num in sequence {
            print(num)
        }
    }

The output is:

0 19 38 58 77 96

Which is unexpected as the latest element is 100.

The problem is that in the AsyncThrottleSequence implementation we can wait for the next element longer than the interval, which can result in the latest element being ‘stuck’. Wouldn’t it make sense to add a timeout when awaiting the base sequence’s next(), so that the latest element is always emitted once the interval has passed?

    public mutating func next() async rethrows -> Reduced? {
        var reduced: Reduced?
        let start = last ?? clock.now
        repeat {
            // Return reduced if we wait longer than the interval (and reduced was not returned previously)
            guard let element = try await base.next() else {
                return nil
            }
            let reduction = await reducing(reduced, element)
            let now = clock.now
            if start.duration(to: now) >= interval || last == nil {
                last = now
                return reduction
            } else {
                reduced = reduction
            }
        } while true
    }